nu_stream/
input.rs

1use crate::prelude::*;
2use nu_errors::ShellError;
3use nu_protocol::{Primitive, Type, UntaggedValue, Value};
4use nu_source::{HasFallibleSpan, PrettyDebug, Tag, Tagged, TaggedItem};
5
6pub struct InputStream {
7    values: Box<dyn Iterator<Item = Value> + Send + Sync>,
8
9    // Whether or not an empty stream was explicitly requested via InputStream::empty
10    empty: bool,
11}
12
13impl Iterator for InputStream {
14    type Item = Value;
15
16    fn next(&mut self) -> Option<Self::Item> {
17        self.values.next()
18    }
19}
20
21impl InputStream {
22    pub fn empty() -> InputStream {
23        InputStream {
24            values: Box::new(std::iter::empty()),
25            empty: true,
26        }
27    }
28
29    pub fn one(item: impl Into<Value>) -> InputStream {
30        InputStream {
31            values: Box::new(std::iter::once(item.into())),
32            empty: false,
33        }
34    }
35
36    pub fn into_vec(self) -> Vec<Value> {
37        self.values.collect()
38    }
39
40    pub fn is_empty(&self) -> bool {
41        self.empty
42    }
43
44    pub fn drain_vec(&mut self) -> Vec<Value> {
45        let mut output = vec![];
46        for x in &mut self.values {
47            output.push(x);
48        }
49        output
50    }
51
52    pub fn from_stream(input: impl Iterator<Item = Value> + Send + Sync + 'static) -> InputStream {
53        InputStream {
54            values: Box::new(input),
55            empty: false,
56        }
57    }
58
59    pub fn collect_string(mut self, tag: Tag) -> Result<Tagged<String>, ShellError> {
60        let mut bytes = vec![];
61        let mut value_tag = tag.clone();
62
63        loop {
64            match self.values.next() {
65                Some(Value {
66                    value: UntaggedValue::Primitive(Primitive::String(s)),
67                    tag: value_t,
68                }) => {
69                    value_tag = value_t;
70                    bytes.extend_from_slice(&s.into_bytes());
71                }
72                Some(Value {
73                    value: UntaggedValue::Primitive(Primitive::Binary(b)),
74                    tag: value_t,
75                }) => {
76                    value_tag = value_t;
77                    bytes.extend_from_slice(&b);
78                }
79                Some(Value {
80                    value: UntaggedValue::Primitive(Primitive::Nothing),
81                    tag: value_t,
82                }) => {
83                    value_tag = value_t;
84                }
85                Some(Value {
86                    tag: value_tag,
87                    value,
88                }) => {
89                    return Err(ShellError::labeled_error_with_secondary(
90                        "Expected a string from pipeline",
91                        "requires string input",
92                        tag,
93                        format!(
94                            "{} originates from here",
95                            Type::from_value(&value).plain_string(100000)
96                        ),
97                        value_tag,
98                    ))
99                }
100                None => break,
101            }
102        }
103
104        match String::from_utf8(bytes) {
105            Ok(s) => Ok(s.tagged(value_tag)),
106            Err(_) => Err(ShellError::labeled_error_with_secondary(
107                "Expected a string from pipeline",
108                "requires string input",
109                tag,
110                "value originates from here",
111                value_tag,
112            )),
113        }
114    }
115
116    pub fn collect_binary(mut self, tag: Tag) -> Result<Tagged<Vec<u8>>, ShellError> {
117        let mut bytes = vec![];
118        let mut value_tag = tag.clone();
119
120        loop {
121            match self.values.next() {
122                Some(Value {
123                    value: UntaggedValue::Primitive(Primitive::Binary(b)),
124                    tag: value_t,
125                }) => {
126                    value_tag = value_t;
127                    bytes.extend_from_slice(&b);
128                }
129                Some(Value {
130                    tag: value_tag,
131                    value: _,
132                }) => {
133                    return Err(ShellError::labeled_error_with_secondary(
134                        "Expected binary from pipeline",
135                        "requires binary input",
136                        tag,
137                        "value originates from here",
138                        value_tag,
139                    ));
140                }
141                None => break,
142            }
143        }
144
145        Ok(bytes.tagged(value_tag))
146    }
147}
148
149impl From<VecDeque<Value>> for InputStream {
150    fn from(input: VecDeque<Value>) -> InputStream {
151        InputStream {
152            values: Box::new(input.into_iter()),
153            empty: false,
154        }
155    }
156}
157
158impl From<Vec<Value>> for InputStream {
159    fn from(input: Vec<Value>) -> InputStream {
160        InputStream {
161            values: Box::new(input.into_iter()),
162            empty: false,
163        }
164    }
165}
166
167pub trait IntoInputStream {
168    fn into_input_stream(self) -> InputStream;
169}
170
171impl<T, U> IntoInputStream for T
172where
173    T: Iterator<Item = U> + Send + Sync + 'static,
174    U: Into<Result<nu_protocol::Value, nu_errors::ShellError>>,
175{
176    fn into_input_stream(self) -> InputStream {
177        InputStream {
178            empty: false,
179            values: Box::new(self.map(|item| match item.into() {
180                Ok(result) => result,
181                Err(err) => match HasFallibleSpan::maybe_span(&err) {
182                    Some(span) => nu_protocol::UntaggedValue::Error(err).into_value(span),
183                    None => nu_protocol::UntaggedValue::Error(err).into_untagged_value(),
184                },
185            })),
186        }
187    }
188}