nu_protocol/pipeline/
list_stream.rs1use crate::{Config, PipelineData, ShellError, Signals, Span, Value};
6
7pub type ValueIterator = Box<dyn Iterator<Item = Value> + Send + 'static>;
8
9#[derive(derive_more::Debug)]
15#[debug("ListStream")]
16pub struct ListStream {
17 stream: ValueIterator,
18 span: Span,
19 caller_spans: Vec<Span>,
20}
21
22impl ListStream {
23 pub fn new(
25 iter: impl Iterator<Item = Value> + Send + 'static,
26 span: Span,
27 signals: Signals,
28 ) -> Self {
29 Self {
30 stream: Box::new(InterruptIter::new(iter, signals)),
31 span,
32 caller_spans: vec![],
33 }
34 }
35
36 pub fn span(&self) -> Span {
38 self.span
39 }
40
41 pub fn push_caller_span(&mut self, span: Span) {
43 if span != self.span {
44 self.caller_spans.push(span)
45 }
46 }
47
48 pub fn get_caller_spans(&self) -> &Vec<Span> {
50 &self.caller_spans
51 }
52
53 pub fn with_span(mut self, span: Span) -> Self {
55 self.span = span;
56 self
57 }
58
59 pub fn into_inner(self) -> ValueIterator {
61 self.stream
62 }
63
64 pub fn next_value(&mut self) -> Option<Value> {
66 self.stream.next()
67 }
68
69 pub fn into_string(self, separator: &str, config: &Config) -> String {
72 self.into_iter()
73 .map(|val| val.to_expanded_string(", ", config))
74 .collect::<Vec<String>>()
75 .join(separator)
76 }
77
78 pub fn into_value(self) -> Result<Value, ShellError> {
82 Ok(Value::list(
83 self.stream
84 .map(Value::unwrap_error)
85 .collect::<Result<_, _>>()?,
86 self.span,
87 ))
88 }
89
90 pub fn into_debug_value(self) -> Value {
93 Value::list(self.stream.collect(), self.span)
94 }
95
96 pub fn drain(self) -> Result<(), ShellError> {
98 for next in self {
99 if let Value::Error { error, .. } = next {
100 return Err(*error);
101 }
102 }
103 Ok(())
104 }
105
106 pub fn modify<I>(self, f: impl FnOnce(ValueIterator) -> I) -> Self
119 where
120 I: Iterator<Item = Value> + Send + 'static,
121 {
122 Self {
123 stream: Box::new(f(self.stream)),
124 span: self.span,
125 caller_spans: self.caller_spans,
126 }
127 }
128
129 pub fn map(self, mapping: impl FnMut(Value) -> Value + Send + 'static) -> Self {
132 self.modify(|iter| iter.map(mapping))
133 }
134}
135
136impl IntoIterator for ListStream {
137 type Item = Value;
138
139 type IntoIter = IntoIter;
140
141 fn into_iter(self) -> Self::IntoIter {
142 IntoIter {
143 stream: self.into_inner(),
144 }
145 }
146}
147
148impl From<ListStream> for PipelineData {
149 fn from(stream: ListStream) -> Self {
150 Self::list_stream(stream, None)
151 }
152}
153
154pub struct IntoIter {
155 stream: ValueIterator,
156}
157
158impl Iterator for IntoIter {
159 type Item = Value;
160
161 fn next(&mut self) -> Option<Self::Item> {
162 self.stream.next()
163 }
164}
165
166struct InterruptIter<I: Iterator> {
167 iter: I,
168 signals: Signals,
169}
170
171impl<I: Iterator> InterruptIter<I> {
172 fn new(iter: I, signals: Signals) -> Self {
173 Self { iter, signals }
174 }
175}
176
177impl<I: Iterator> Iterator for InterruptIter<I> {
178 type Item = <I as Iterator>::Item;
179
180 fn next(&mut self) -> Option<Self::Item> {
181 if self.signals.interrupted() {
182 None
183 } else {
184 self.iter.next()
185 }
186 }
187
188 fn size_hint(&self) -> (usize, Option<usize>) {
189 self.iter.size_hint()
190 }
191}