1use crate::prelude::*;
2use nu_errors::ShellError;
3use nu_protocol::{Primitive, Type, UntaggedValue, Value};
4use nu_source::{HasFallibleSpan, PrettyDebug, Tag, Tagged, TaggedItem};
5
6pub struct InputStream {
7 values: Box<dyn Iterator<Item = Value> + Send + Sync>,
8
9 empty: bool,
11}
12
13impl Iterator for InputStream {
14 type Item = Value;
15
16 fn next(&mut self) -> Option<Self::Item> {
17 self.values.next()
18 }
19}
20
21impl InputStream {
22 pub fn empty() -> InputStream {
23 InputStream {
24 values: Box::new(std::iter::empty()),
25 empty: true,
26 }
27 }
28
29 pub fn one(item: impl Into<Value>) -> InputStream {
30 InputStream {
31 values: Box::new(std::iter::once(item.into())),
32 empty: false,
33 }
34 }
35
36 pub fn into_vec(self) -> Vec<Value> {
37 self.values.collect()
38 }
39
40 pub fn is_empty(&self) -> bool {
41 self.empty
42 }
43
44 pub fn drain_vec(&mut self) -> Vec<Value> {
45 let mut output = vec![];
46 for x in &mut self.values {
47 output.push(x);
48 }
49 output
50 }
51
52 pub fn from_stream(input: impl Iterator<Item = Value> + Send + Sync + 'static) -> InputStream {
53 InputStream {
54 values: Box::new(input),
55 empty: false,
56 }
57 }
58
59 pub fn collect_string(mut self, tag: Tag) -> Result<Tagged<String>, ShellError> {
60 let mut bytes = vec![];
61 let mut value_tag = tag.clone();
62
63 loop {
64 match self.values.next() {
65 Some(Value {
66 value: UntaggedValue::Primitive(Primitive::String(s)),
67 tag: value_t,
68 }) => {
69 value_tag = value_t;
70 bytes.extend_from_slice(&s.into_bytes());
71 }
72 Some(Value {
73 value: UntaggedValue::Primitive(Primitive::Binary(b)),
74 tag: value_t,
75 }) => {
76 value_tag = value_t;
77 bytes.extend_from_slice(&b);
78 }
79 Some(Value {
80 value: UntaggedValue::Primitive(Primitive::Nothing),
81 tag: value_t,
82 }) => {
83 value_tag = value_t;
84 }
85 Some(Value {
86 tag: value_tag,
87 value,
88 }) => {
89 return Err(ShellError::labeled_error_with_secondary(
90 "Expected a string from pipeline",
91 "requires string input",
92 tag,
93 format!(
94 "{} originates from here",
95 Type::from_value(&value).plain_string(100000)
96 ),
97 value_tag,
98 ))
99 }
100 None => break,
101 }
102 }
103
104 match String::from_utf8(bytes) {
105 Ok(s) => Ok(s.tagged(value_tag)),
106 Err(_) => Err(ShellError::labeled_error_with_secondary(
107 "Expected a string from pipeline",
108 "requires string input",
109 tag,
110 "value originates from here",
111 value_tag,
112 )),
113 }
114 }
115
116 pub fn collect_binary(mut self, tag: Tag) -> Result<Tagged<Vec<u8>>, ShellError> {
117 let mut bytes = vec![];
118 let mut value_tag = tag.clone();
119
120 loop {
121 match self.values.next() {
122 Some(Value {
123 value: UntaggedValue::Primitive(Primitive::Binary(b)),
124 tag: value_t,
125 }) => {
126 value_tag = value_t;
127 bytes.extend_from_slice(&b);
128 }
129 Some(Value {
130 tag: value_tag,
131 value: _,
132 }) => {
133 return Err(ShellError::labeled_error_with_secondary(
134 "Expected binary from pipeline",
135 "requires binary input",
136 tag,
137 "value originates from here",
138 value_tag,
139 ));
140 }
141 None => break,
142 }
143 }
144
145 Ok(bytes.tagged(value_tag))
146 }
147}
148
149impl From<VecDeque<Value>> for InputStream {
150 fn from(input: VecDeque<Value>) -> InputStream {
151 InputStream {
152 values: Box::new(input.into_iter()),
153 empty: false,
154 }
155 }
156}
157
158impl From<Vec<Value>> for InputStream {
159 fn from(input: Vec<Value>) -> InputStream {
160 InputStream {
161 values: Box::new(input.into_iter()),
162 empty: false,
163 }
164 }
165}
166
167pub trait IntoInputStream {
168 fn into_input_stream(self) -> InputStream;
169}
170
171impl<T, U> IntoInputStream for T
172where
173 T: Iterator<Item = U> + Send + Sync + 'static,
174 U: Into<Result<nu_protocol::Value, nu_errors::ShellError>>,
175{
176 fn into_input_stream(self) -> InputStream {
177 InputStream {
178 empty: false,
179 values: Box::new(self.map(|item| match item.into() {
180 Ok(result) => result,
181 Err(err) => match HasFallibleSpan::maybe_span(&err) {
182 Some(span) => nu_protocol::UntaggedValue::Error(err).into_value(span),
183 None => nu_protocol::UntaggedValue::Error(err).into_untagged_value(),
184 },
185 })),
186 }
187 }
188}