nu_protocol/pipeline/
list_stream.rs1use crate::{Config, PipelineData, ShellError, Signals, Span, Value};
6use std::fmt::Debug;
7
8pub type ValueIterator = Box<dyn Iterator<Item = Value> + Send + 'static>;
9
10pub struct ListStream {
16    stream: ValueIterator,
17    span: Span,
18    caller_spans: Vec<Span>,
19}
20
21impl ListStream {
22    pub fn new(
24        iter: impl Iterator<Item = Value> + Send + 'static,
25        span: Span,
26        signals: Signals,
27    ) -> Self {
28        Self {
29            stream: Box::new(InterruptIter::new(iter, signals)),
30            span,
31            caller_spans: vec![],
32        }
33    }
34
35    pub fn span(&self) -> Span {
37        self.span
38    }
39
40    pub fn push_caller_span(&mut self, span: Span) {
42        if span != self.span {
43            self.caller_spans.push(span)
44        }
45    }
46
47    pub fn get_caller_spans(&self) -> &Vec<Span> {
49        &self.caller_spans
50    }
51
52    pub fn with_span(mut self, span: Span) -> Self {
54        self.span = span;
55        self
56    }
57
58    pub fn into_inner(self) -> ValueIterator {
60        self.stream
61    }
62
63    pub fn next_value(&mut self) -> Option<Value> {
65        self.stream.next()
66    }
67
68    pub fn into_string(self, separator: &str, config: &Config) -> String {
71        self.into_iter()
72            .map(|val| val.to_expanded_string(", ", config))
73            .collect::<Vec<String>>()
74            .join(separator)
75    }
76
77    pub fn into_value(self) -> Result<Value, ShellError> {
81        Ok(Value::list(
82            self.stream
83                .map(Value::unwrap_error)
84                .collect::<Result<_, _>>()?,
85            self.span,
86        ))
87    }
88
89    pub fn into_debug_value(self) -> Value {
92        Value::list(self.stream.collect(), self.span)
93    }
94
95    pub fn drain(self) -> Result<(), ShellError> {
97        for next in self {
98            if let Value::Error { error, .. } = next {
99                return Err(*error);
100            }
101        }
102        Ok(())
103    }
104
105    pub fn modify<I>(self, f: impl FnOnce(ValueIterator) -> I) -> Self
118    where
119        I: Iterator<Item = Value> + Send + 'static,
120    {
121        Self {
122            stream: Box::new(f(self.stream)),
123            span: self.span,
124            caller_spans: self.caller_spans,
125        }
126    }
127
128    pub fn map(self, mapping: impl FnMut(Value) -> Value + Send + 'static) -> Self {
131        self.modify(|iter| iter.map(mapping))
132    }
133}
134
135impl Debug for ListStream {
136    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
137        f.debug_struct("ListStream").finish()
138    }
139}
140
141impl IntoIterator for ListStream {
142    type Item = Value;
143
144    type IntoIter = IntoIter;
145
146    fn into_iter(self) -> Self::IntoIter {
147        IntoIter {
148            stream: self.into_inner(),
149        }
150    }
151}
152
153impl From<ListStream> for PipelineData {
154    fn from(stream: ListStream) -> Self {
155        Self::list_stream(stream, None)
156    }
157}
158
159pub struct IntoIter {
160    stream: ValueIterator,
161}
162
163impl Iterator for IntoIter {
164    type Item = Value;
165
166    fn next(&mut self) -> Option<Self::Item> {
167        self.stream.next()
168    }
169}
170
171struct InterruptIter<I: Iterator> {
172    iter: I,
173    signals: Signals,
174}
175
176impl<I: Iterator> InterruptIter<I> {
177    fn new(iter: I, signals: Signals) -> Self {
178        Self { iter, signals }
179    }
180}
181
182impl<I: Iterator> Iterator for InterruptIter<I> {
183    type Item = <I as Iterator>::Item;
184
185    fn next(&mut self) -> Option<Self::Item> {
186        if self.signals.interrupted() {
187            None
188        } else {
189            self.iter.next()
190        }
191    }
192
193    fn size_hint(&self) -> (usize, Option<usize>) {
194        self.iter.size_hint()
195    }
196}