Skip to main content

nu_protocol/pipeline/
list_stream.rs

1//! Module managing the streaming of individual [`Value`]s as a [`ListStream`] between pipeline
2//! elements
3//!
//! For more general information regarding our pipelining model, refer to [`PipelineData`].
5use crate::{Config, PipelineData, ShellError, Signals, Span, Value};
6
/// Boxed, type-erased iterator of [`Value`]s.
///
/// This is the type stored inside a [`ListStream`] and handed out by
/// [`ListStream::into_inner`]. The `Send + 'static` bounds are part of the alias, so any
/// iterator boxed as a `ValueIterator` must satisfy them.
pub type ValueIterator = Box<dyn Iterator<Item = Value> + Send + 'static>;
8
/// A potentially infinite, interruptible stream of [`Value`]s.
///
/// In practice, a "stream" here means anything which can be iterated and produces Values.
/// Like other iterators in Rust, observing values from this stream will drain the items
/// as you view them and the stream cannot be replayed.
///
/// The `Debug` output is the fixed string `ListStream` (see the `#[debug(...)]` attribute), so
/// formatting a stream for debugging does not consume any of its values.
#[derive(derive_more::Debug)]
#[debug("ListStream")]
pub struct ListStream {
    /// The boxed iterator that values are pulled from.
    stream: ValueIterator,
    /// The [`Span`] associated with this stream; returned by [`ListStream::span`].
    span: Span,
    /// Spans recorded through [`ListStream::push_caller_span`], useful to construct a backtrace.
    caller_spans: Vec<Span>,
}
21
22impl ListStream {
23    /// Create a new [`ListStream`] from a [`Value`] `Iterator`.
24    pub fn new(
25        iter: impl Iterator<Item = Value> + Send + 'static,
26        span: Span,
27        signals: Signals,
28    ) -> Self {
29        Self {
30            stream: Box::new(InterruptIter::new(iter, signals)),
31            span,
32            caller_spans: vec![],
33        }
34    }
35
36    /// Returns the [`Span`] associated with this [`ListStream`].
37    pub fn span(&self) -> Span {
38        self.span
39    }
40
41    /// Push a caller [`Span`] to the bytestream, it's useful to construct a backtrace.
42    pub fn push_caller_span(&mut self, span: Span) {
43        if span != self.span {
44            self.caller_spans.push(span)
45        }
46    }
47
48    /// Get all caller [`Span`], it's useful to construct a backtrace.
49    pub fn get_caller_spans(&self) -> &Vec<Span> {
50        &self.caller_spans
51    }
52
53    /// Changes the [`Span`] associated with this [`ListStream`].
54    pub fn with_span(mut self, span: Span) -> Self {
55        self.span = span;
56        self
57    }
58
59    /// Convert a [`ListStream`] into its inner [`Value`] `Iterator`.
60    pub fn into_inner(self) -> ValueIterator {
61        self.stream
62    }
63
64    /// Take a single value from the inner `Iterator`, modifying the stream.
65    pub fn next_value(&mut self) -> Option<Value> {
66        self.stream.next()
67    }
68
69    /// Converts each value in a [`ListStream`] into a string and then joins the strings together
70    /// using the given separator.
71    pub fn into_string(self, separator: &str, config: &Config) -> String {
72        self.into_iter()
73            .map(|val| val.to_expanded_string(", ", config))
74            .collect::<Vec<String>>()
75            .join(separator)
76    }
77
78    /// Collect the values of a [`ListStream`] into a list [`Value`].
79    ///
80    /// If any of the values in the stream is a [Value::Error], its inner [ShellError] is returned.
81    pub fn into_value(self) -> Result<Value, ShellError> {
82        Ok(Value::list(
83            self.stream
84                .map(Value::unwrap_error)
85                .collect::<Result<_, _>>()?,
86            self.span,
87        ))
88    }
89
90    /// Collect the values of a [`ListStream`] into a [`Value::List`], preserving [Value::Error]
91    /// items for debugging purposes.
92    pub fn into_debug_value(self) -> Value {
93        Value::list(self.stream.collect(), self.span)
94    }
95
96    /// Consume all values in the stream, returning an error if any of the values is a `Value::Error`.
97    pub fn drain(self) -> Result<(), ShellError> {
98        for next in self {
99            if let Value::Error { error, .. } = next {
100                return Err(*error);
101            }
102        }
103        Ok(())
104    }
105
106    /// Modify the inner iterator of a [`ListStream`] using a function.
107    ///
108    /// This can be used to call any number of standard iterator functions on the [`ListStream`].
109    /// E.g., `take`, `filter`, `step_by`, and more.
110    ///
111    /// ```
112    /// use nu_protocol::{ListStream, Signals, Span, Value};
113    ///
114    /// let span = Span::unknown();
115    /// let stream = ListStream::new(std::iter::repeat(Value::int(0, span)), span, Signals::empty());
116    /// let new_stream = stream.modify(|iter| iter.take(100));
117    /// ```
118    pub fn modify<I>(self, f: impl FnOnce(ValueIterator) -> I) -> Self
119    where
120        I: Iterator<Item = Value> + Send + 'static,
121    {
122        Self {
123            stream: Box::new(f(self.stream)),
124            span: self.span,
125            caller_spans: self.caller_spans,
126        }
127    }
128
129    /// Create a new [`ListStream`] whose values are the results of applying the given function
130    /// to each of the values in the original [`ListStream`].
131    pub fn map(self, mapping: impl FnMut(Value) -> Value + Send + 'static) -> Self {
132        self.modify(|iter| iter.map(mapping))
133    }
134}
135
136impl IntoIterator for ListStream {
137    type Item = Value;
138
139    type IntoIter = IntoIter;
140
141    fn into_iter(self) -> Self::IntoIter {
142        IntoIter {
143            stream: self.into_inner(),
144        }
145    }
146}
147
148impl From<ListStream> for PipelineData {
149    fn from(stream: ListStream) -> Self {
150        Self::list_stream(stream, None)
151    }
152}
153
/// Owning iterator over the [`Value`]s of a [`ListStream`].
///
/// Created by the [`IntoIterator`] implementation of [`ListStream`].
pub struct IntoIter {
    /// The iterator taken out of the originating [`ListStream`].
    stream: ValueIterator,
}
157
158impl Iterator for IntoIter {
159    type Item = Value;
160
161    fn next(&mut self) -> Option<Self::Item> {
162        self.stream.next()
163    }
164}
165
166struct InterruptIter<I: Iterator> {
167    iter: I,
168    signals: Signals,
169}
170
171impl<I: Iterator> InterruptIter<I> {
172    fn new(iter: I, signals: Signals) -> Self {
173        Self { iter, signals }
174    }
175}
176
177impl<I: Iterator> Iterator for InterruptIter<I> {
178    type Item = <I as Iterator>::Item;
179
180    fn next(&mut self) -> Option<Self::Item> {
181        if self.signals.interrupted() {
182            None
183        } else {
184            self.iter.next()
185        }
186    }
187
188    fn size_hint(&self) -> (usize, Option<usize>) {
189        self.iter.size_hint()
190    }
191}