1use nu_engine::command_prelude::*;
2use nu_protocol::{ListStream, shell_error::io::IoError};
3use std::{
4 io::{BufRead, Cursor, ErrorKind},
5 num::NonZeroUsize,
6};
7
/// The `chunks` filter command, which divides list, table, or binary input into
/// chunks of a user-supplied `chunk_size`.
#[derive(Clone)]
pub struct Chunks;
10
11impl Command for Chunks {
12 fn name(&self) -> &str {
13 "chunks"
14 }
15
16 fn signature(&self) -> Signature {
17 Signature::build("chunks")
18 .input_output_types(vec![
19 (Type::table(), Type::list(Type::table())),
20 (Type::list(Type::Any), Type::list(Type::list(Type::Any))),
21 (Type::Binary, Type::list(Type::Binary)),
22 ])
23 .required("chunk_size", SyntaxShape::Int, "The size of each chunk.")
24 .category(Category::Filters)
25 }
26
27 fn description(&self) -> &str {
28 "Divide a list, table or binary input into chunks of `chunk_size`."
29 }
30
31 fn extra_description(&self) -> &str {
32 "This command will error if `chunk_size` is negative or zero."
33 }
34
35 fn search_terms(&self) -> Vec<&str> {
36 vec!["batch", "group", "split", "bytes"]
37 }
38
39 fn examples(&self) -> Vec<Example> {
40 vec![
41 Example {
42 example: "[1 2 3 4] | chunks 2",
43 description: "Chunk a list into pairs",
44 result: Some(Value::test_list(vec![
45 Value::test_list(vec![Value::test_int(1), Value::test_int(2)]),
46 Value::test_list(vec![Value::test_int(3), Value::test_int(4)]),
47 ])),
48 },
49 Example {
50 example: "[[foo bar]; [0 1] [2 3] [4 5] [6 7] [8 9]] | chunks 3",
51 description: "Chunk the rows of a table into triplets",
52 result: Some(Value::test_list(vec![
53 Value::test_list(vec![
54 Value::test_record(record! {
55 "foo" => Value::test_int(0),
56 "bar" => Value::test_int(1),
57 }),
58 Value::test_record(record! {
59 "foo" => Value::test_int(2),
60 "bar" => Value::test_int(3),
61 }),
62 Value::test_record(record! {
63 "foo" => Value::test_int(4),
64 "bar" => Value::test_int(5),
65 }),
66 ]),
67 Value::test_list(vec![
68 Value::test_record(record! {
69 "foo" => Value::test_int(6),
70 "bar" => Value::test_int(7),
71 }),
72 Value::test_record(record! {
73 "foo" => Value::test_int(8),
74 "bar" => Value::test_int(9),
75 }),
76 ]),
77 ])),
78 },
79 Example {
80 example: "0x[11 22 33 44 55 66 77 88] | chunks 3",
81 description: "Chunk the bytes of a binary into triplets",
82 result: Some(Value::test_list(vec![
83 Value::test_binary(vec![0x11, 0x22, 0x33]),
84 Value::test_binary(vec![0x44, 0x55, 0x66]),
85 Value::test_binary(vec![0x77, 0x88]),
86 ])),
87 },
88 ]
89 }
90
91 fn run(
92 &self,
93 engine_state: &EngineState,
94 stack: &mut Stack,
95 call: &Call,
96 input: PipelineData,
97 ) -> Result<PipelineData, ShellError> {
98 let head = call.head;
99 let chunk_size: Value = call.req(engine_state, stack, 0)?;
100
101 let size =
102 usize::try_from(chunk_size.as_int()?).map_err(|_| ShellError::NeedsPositiveValue {
103 span: chunk_size.span(),
104 })?;
105
106 let size = NonZeroUsize::try_from(size).map_err(|_| ShellError::IncorrectValue {
107 msg: "`chunk_size` cannot be zero".into(),
108 val_span: chunk_size.span(),
109 call_span: head,
110 })?;
111
112 chunks(engine_state, input, size, head)
113 }
114}
115
116pub fn chunks(
117 engine_state: &EngineState,
118 input: PipelineData,
119 chunk_size: NonZeroUsize,
120 span: Span,
121) -> Result<PipelineData, ShellError> {
122 let from_io_error = IoError::factory(span, None);
123 match input {
124 PipelineData::Value(Value::List { vals, .. }, metadata) => {
125 let chunks = ChunksIter::new(vals, chunk_size, span);
126 let stream = ListStream::new(chunks, span, engine_state.signals().clone());
127 Ok(PipelineData::list_stream(stream, metadata))
128 }
129 PipelineData::ListStream(stream, metadata) => {
130 let stream = stream.modify(|iter| ChunksIter::new(iter, chunk_size, span));
131 Ok(PipelineData::list_stream(stream, metadata))
132 }
133 PipelineData::Value(Value::Binary { val, .. }, metadata) => {
134 let chunk_read = ChunkRead {
135 reader: Cursor::new(val),
136 size: chunk_size,
137 };
138 let value_stream = chunk_read.map(move |chunk| match chunk {
139 Ok(chunk) => Value::binary(chunk, span),
140 Err(e) => Value::error(from_io_error(e).into(), span),
141 });
142 let pipeline_data_with_metadata = value_stream.into_pipeline_data_with_metadata(
143 span,
144 engine_state.signals().clone(),
145 metadata,
146 );
147 Ok(pipeline_data_with_metadata)
148 }
149 PipelineData::ByteStream(stream, metadata) => {
150 let pipeline_data = match stream.reader() {
151 None => PipelineData::empty(),
152 Some(reader) => {
153 let chunk_read = ChunkRead {
154 reader,
155 size: chunk_size,
156 };
157 let value_stream = chunk_read.map(move |chunk| match chunk {
158 Ok(chunk) => Value::binary(chunk, span),
159 Err(e) => Value::error(from_io_error(e).into(), span),
160 });
161 value_stream.into_pipeline_data_with_metadata(
162 span,
163 engine_state.signals().clone(),
164 metadata,
165 )
166 }
167 };
168 Ok(pipeline_data)
169 }
170 input => Err(input.unsupported_input_error("list", span)),
171 }
172}
173
174struct ChunksIter<I: Iterator<Item = Value>> {
175 iter: I,
176 size: usize,
177 span: Span,
178}
179
180impl<I: Iterator<Item = Value>> ChunksIter<I> {
181 fn new(iter: impl IntoIterator<IntoIter = I>, size: NonZeroUsize, span: Span) -> Self {
182 Self {
183 iter: iter.into_iter(),
184 size: size.into(),
185 span,
186 }
187 }
188}
189
190impl<I: Iterator<Item = Value>> Iterator for ChunksIter<I> {
191 type Item = Value;
192
193 fn next(&mut self) -> Option<Self::Item> {
194 let first = self.iter.next()?;
195 let mut chunk = Vec::with_capacity(self.size); chunk.push(first);
197 chunk.extend((&mut self.iter).take(self.size - 1));
198 Some(Value::list(chunk, self.span))
199 }
200}
201
/// Iterator that reads `reader` in chunks of `size` bytes; only the final chunk may be shorter.
///
/// Each item is `Ok(chunk)` or the first I/O error other than `ErrorKind::Interrupted`
/// (interrupted reads are retried). If an error occurs partway through a chunk, the bytes
/// already collected for that chunk are discarded.
struct ChunkRead<R: BufRead> {
    reader: R,
    size: NonZeroUsize,
}

impl<R: BufRead> Iterator for ChunkRead<R> {
    type Item = Result<Vec<u8>, std::io::Error>;

    fn next(&mut self) -> Option<Self::Item> {
        // `size` is user-controlled (e.g. `chunks 9223372036854775807`). Preallocating it
        // verbatim panics with capacity overflow or aborts on OOM, so cap the up-front
        // reservation and let the buffer grow as data is actually read.
        const MAX_PREALLOC: usize = 8 * 1024;

        let size = self.size.get();
        let mut buf = Vec::with_capacity(size.min(MAX_PREALLOC));
        while buf.len() < size {
            let available = match self.reader.fill_buf() {
                // EOF before any byte: the iterator is exhausted.
                Ok([]) if buf.is_empty() => return None,
                // EOF mid-chunk: emit the short final chunk.
                Ok([]) => return Some(Ok(buf)),
                Ok(available) => available,
                Err(e) if e.kind() == ErrorKind::Interrupted => continue,
                Err(e) => return Some(Err(e)),
            };
            let take = available.len().min(size - buf.len());
            buf.extend_from_slice(&available[..take]);
            self.reader.consume(take);
        }
        Some(Ok(buf))
    }
}
228
#[cfg(test)]
mod test {
    use std::io::Read;

    use super::*;

    /// Input shared by the `ChunkRead` tests.
    const TEXT: &str = "hello world";

    /// Drains `reader` through a four-byte `ChunkRead`, panicking on any I/O error.
    fn read_in_fours(reader: impl BufRead) -> Vec<Vec<u8>> {
        let chunker = ChunkRead {
            reader,
            size: NonZeroUsize::new(4).unwrap(),
        };
        chunker.map(Result::unwrap).collect()
    }

    /// `TEXT` split at byte offsets 4 and 8.
    fn expected_fours() -> [&'static [u8]; 3] {
        let bytes = TEXT.as_bytes();
        [&bytes[..4], &bytes[4..8], &bytes[8..]]
    }

    #[test]
    fn chunk_read() {
        assert_eq!(read_in_fours(Cursor::new(TEXT)), expected_fours());
    }

    #[test]
    fn chunk_read_stream() {
        // Spread the input over three readers so chunks must straddle reader boundaries.
        let reader = Cursor::new(&TEXT[..3])
            .chain(Cursor::new(&TEXT[3..9]))
            .chain(Cursor::new(&TEXT[9..]));
        assert_eq!(read_in_fours(reader), expected_fours());
    }

    #[test]
    fn test_examples() {
        use crate::test_examples;

        test_examples(Chunks {})
    }
}