nu_protocol/pipeline/
list_stream.rs

1//! Module managing the streaming of individual [`Value`]s as a [`ListStream`] between pipeline
2//! elements
3//!
4//! For more general infos regarding our pipelining model refer to [`PipelineData`]
5use crate::{Config, PipelineData, ShellError, Signals, Span, Value};
6use std::fmt::Debug;
7
8pub type ValueIterator = Box<dyn Iterator<Item = Value> + Send + 'static>;
9
10/// A potentially infinite, interruptible stream of [`Value`]s.
11///
12/// In practice, a "stream" here means anything which can be iterated and produces Values.
13/// Like other iterators in Rust, observing values from this stream will drain the items
14/// as you view them and the stream cannot be replayed.
15pub struct ListStream {
16    stream: ValueIterator,
17    span: Span,
18    caller_spans: Vec<Span>,
19}
20
21impl ListStream {
22    /// Create a new [`ListStream`] from a [`Value`] `Iterator`.
23    pub fn new(
24        iter: impl Iterator<Item = Value> + Send + 'static,
25        span: Span,
26        signals: Signals,
27    ) -> Self {
28        Self {
29            stream: Box::new(InterruptIter::new(iter, signals)),
30            span,
31            caller_spans: vec![],
32        }
33    }
34
35    /// Returns the [`Span`] associated with this [`ListStream`].
36    pub fn span(&self) -> Span {
37        self.span
38    }
39
40    /// Push a caller [`Span`] to the bytestream, it's useful to construct a backtrace.
41    pub fn push_caller_span(&mut self, span: Span) {
42        if span != self.span {
43            self.caller_spans.push(span)
44        }
45    }
46
47    /// Get all caller [`Span`], it's useful to construct a backtrace.
48    pub fn get_caller_spans(&self) -> &Vec<Span> {
49        &self.caller_spans
50    }
51
52    /// Changes the [`Span`] associated with this [`ListStream`].
53    pub fn with_span(mut self, span: Span) -> Self {
54        self.span = span;
55        self
56    }
57
58    /// Convert a [`ListStream`] into its inner [`Value`] `Iterator`.
59    pub fn into_inner(self) -> ValueIterator {
60        self.stream
61    }
62
63    /// Take a single value from the inner `Iterator`, modifying the stream.
64    pub fn next_value(&mut self) -> Option<Value> {
65        self.stream.next()
66    }
67
68    /// Converts each value in a [`ListStream`] into a string and then joins the strings together
69    /// using the given separator.
70    pub fn into_string(self, separator: &str, config: &Config) -> String {
71        self.into_iter()
72            .map(|val| val.to_expanded_string(", ", config))
73            .collect::<Vec<String>>()
74            .join(separator)
75    }
76
77    /// Collect the values of a [`ListStream`] into a list [`Value`].
78    pub fn into_value(self) -> Value {
79        Value::list(self.stream.collect(), self.span)
80    }
81
82    /// Consume all values in the stream, returning an error if any of the values is a `Value::Error`.
83    pub fn drain(self) -> Result<(), ShellError> {
84        for next in self {
85            if let Value::Error { error, .. } = next {
86                return Err(*error);
87            }
88        }
89        Ok(())
90    }
91
92    /// Modify the inner iterator of a [`ListStream`] using a function.
93    ///
94    /// This can be used to call any number of standard iterator functions on the [`ListStream`].
95    /// E.g., `take`, `filter`, `step_by`, and more.
96    ///
97    /// ```
98    /// use nu_protocol::{ListStream, Signals, Span, Value};
99    ///
100    /// let span = Span::unknown();
101    /// let stream = ListStream::new(std::iter::repeat(Value::int(0, span)), span, Signals::empty());
102    /// let new_stream = stream.modify(|iter| iter.take(100));
103    /// ```
104    pub fn modify<I>(self, f: impl FnOnce(ValueIterator) -> I) -> Self
105    where
106        I: Iterator<Item = Value> + Send + 'static,
107    {
108        Self {
109            stream: Box::new(f(self.stream)),
110            span: self.span,
111            caller_spans: self.caller_spans,
112        }
113    }
114
115    /// Create a new [`ListStream`] whose values are the results of applying the given function
116    /// to each of the values in the original [`ListStream`].
117    pub fn map(self, mapping: impl FnMut(Value) -> Value + Send + 'static) -> Self {
118        self.modify(|iter| iter.map(mapping))
119    }
120}
121
122impl Debug for ListStream {
123    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
124        f.debug_struct("ListStream").finish()
125    }
126}
127
128impl IntoIterator for ListStream {
129    type Item = Value;
130
131    type IntoIter = IntoIter;
132
133    fn into_iter(self) -> Self::IntoIter {
134        IntoIter {
135            stream: self.into_inner(),
136        }
137    }
138}
139
140impl From<ListStream> for PipelineData {
141    fn from(stream: ListStream) -> Self {
142        Self::list_stream(stream, None)
143    }
144}
145
146pub struct IntoIter {
147    stream: ValueIterator,
148}
149
150impl Iterator for IntoIter {
151    type Item = Value;
152
153    fn next(&mut self) -> Option<Self::Item> {
154        self.stream.next()
155    }
156}
157
158struct InterruptIter<I: Iterator> {
159    iter: I,
160    signals: Signals,
161}
162
163impl<I: Iterator> InterruptIter<I> {
164    fn new(iter: I, signals: Signals) -> Self {
165        Self { iter, signals }
166    }
167}
168
169impl<I: Iterator> Iterator for InterruptIter<I> {
170    type Item = <I as Iterator>::Item;
171
172    fn next(&mut self) -> Option<Self::Item> {
173        if self.signals.interrupted() {
174            None
175        } else {
176            self.iter.next()
177        }
178    }
179
180    fn size_hint(&self) -> (usize, Option<usize>) {
181        self.iter.size_hint()
182    }
183}