nu_protocol/pipeline/
list_stream.rs1use crate::{Config, PipelineData, ShellError, Signals, Span, Value};
6use std::fmt::Debug;
7
8pub type ValueIterator = Box<dyn Iterator<Item = Value> + Send + 'static>;
9
10pub struct ListStream {
16 stream: ValueIterator,
17 span: Span,
18 caller_spans: Vec<Span>,
19}
20
21impl ListStream {
22 pub fn new(
24 iter: impl Iterator<Item = Value> + Send + 'static,
25 span: Span,
26 signals: Signals,
27 ) -> Self {
28 Self {
29 stream: Box::new(InterruptIter::new(iter, signals)),
30 span,
31 caller_spans: vec![],
32 }
33 }
34
35 pub fn span(&self) -> Span {
37 self.span
38 }
39
40 pub fn push_caller_span(&mut self, span: Span) {
42 if span != self.span {
43 self.caller_spans.push(span)
44 }
45 }
46
47 pub fn get_caller_spans(&self) -> &Vec<Span> {
49 &self.caller_spans
50 }
51
52 pub fn with_span(mut self, span: Span) -> Self {
54 self.span = span;
55 self
56 }
57
58 pub fn into_inner(self) -> ValueIterator {
60 self.stream
61 }
62
63 pub fn next_value(&mut self) -> Option<Value> {
65 self.stream.next()
66 }
67
68 pub fn into_string(self, separator: &str, config: &Config) -> String {
71 self.into_iter()
72 .map(|val| val.to_expanded_string(", ", config))
73 .collect::<Vec<String>>()
74 .join(separator)
75 }
76
77 pub fn into_value(self) -> Result<Value, ShellError> {
81 Ok(Value::list(
82 self.stream
83 .map(Value::unwrap_error)
84 .collect::<Result<_, _>>()?,
85 self.span,
86 ))
87 }
88
89 pub fn into_debug_value(self) -> Value {
92 Value::list(self.stream.collect(), self.span)
93 }
94
95 pub fn drain(self) -> Result<(), ShellError> {
97 for next in self {
98 if let Value::Error { error, .. } = next {
99 return Err(*error);
100 }
101 }
102 Ok(())
103 }
104
105 pub fn modify<I>(self, f: impl FnOnce(ValueIterator) -> I) -> Self
118 where
119 I: Iterator<Item = Value> + Send + 'static,
120 {
121 Self {
122 stream: Box::new(f(self.stream)),
123 span: self.span,
124 caller_spans: self.caller_spans,
125 }
126 }
127
128 pub fn map(self, mapping: impl FnMut(Value) -> Value + Send + 'static) -> Self {
131 self.modify(|iter| iter.map(mapping))
132 }
133}
134
135impl Debug for ListStream {
136 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
137 f.debug_struct("ListStream").finish()
138 }
139}
140
141impl IntoIterator for ListStream {
142 type Item = Value;
143
144 type IntoIter = IntoIter;
145
146 fn into_iter(self) -> Self::IntoIter {
147 IntoIter {
148 stream: self.into_inner(),
149 }
150 }
151}
152
153impl From<ListStream> for PipelineData {
154 fn from(stream: ListStream) -> Self {
155 Self::list_stream(stream, None)
156 }
157}
158
159pub struct IntoIter {
160 stream: ValueIterator,
161}
162
163impl Iterator for IntoIter {
164 type Item = Value;
165
166 fn next(&mut self) -> Option<Self::Item> {
167 self.stream.next()
168 }
169}
170
171struct InterruptIter<I: Iterator> {
172 iter: I,
173 signals: Signals,
174}
175
176impl<I: Iterator> InterruptIter<I> {
177 fn new(iter: I, signals: Signals) -> Self {
178 Self { iter, signals }
179 }
180}
181
182impl<I: Iterator> Iterator for InterruptIter<I> {
183 type Item = <I as Iterator>::Item;
184
185 fn next(&mut self) -> Option<Self::Item> {
186 if self.signals.interrupted() {
187 None
188 } else {
189 self.iter.next()
190 }
191 }
192
193 fn size_hint(&self) -> (usize, Option<usize>) {
194 self.iter.size_hint()
195 }
196}