nu_command/filters/
chunks.rs

1use nu_engine::command_prelude::*;
2use nu_protocol::{ListStream, shell_error::io::IoError};
3use std::{
4    io::{BufRead, Cursor, ErrorKind},
5    num::NonZeroUsize,
6};
7
8#[derive(Clone)]
9pub struct Chunks;
10
11impl Command for Chunks {
12    fn name(&self) -> &str {
13        "chunks"
14    }
15
16    fn signature(&self) -> Signature {
17        Signature::build("chunks")
18            .input_output_types(vec![
19                (Type::table(), Type::list(Type::table())),
20                (Type::list(Type::Any), Type::list(Type::list(Type::Any))),
21                (Type::Binary, Type::list(Type::Binary)),
22            ])
23            .required("chunk_size", SyntaxShape::Int, "The size of each chunk.")
24            .category(Category::Filters)
25    }
26
27    fn description(&self) -> &str {
28        "Divide a list, table or binary input into chunks of `chunk_size`."
29    }
30
31    fn extra_description(&self) -> &str {
32        "This command will error if `chunk_size` is negative or zero."
33    }
34
35    fn search_terms(&self) -> Vec<&str> {
36        vec!["batch", "group", "split", "bytes"]
37    }
38
39    fn examples(&self) -> Vec<Example> {
40        vec![
41            Example {
42                example: "[1 2 3 4] | chunks 2",
43                description: "Chunk a list into pairs",
44                result: Some(Value::test_list(vec![
45                    Value::test_list(vec![Value::test_int(1), Value::test_int(2)]),
46                    Value::test_list(vec![Value::test_int(3), Value::test_int(4)]),
47                ])),
48            },
49            Example {
50                example: "[[foo bar]; [0 1] [2 3] [4 5] [6 7] [8 9]] | chunks 3",
51                description: "Chunk the rows of a table into triplets",
52                result: Some(Value::test_list(vec![
53                    Value::test_list(vec![
54                        Value::test_record(record! {
55                            "foo" => Value::test_int(0),
56                            "bar" => Value::test_int(1),
57                        }),
58                        Value::test_record(record! {
59                            "foo" => Value::test_int(2),
60                            "bar" => Value::test_int(3),
61                        }),
62                        Value::test_record(record! {
63                            "foo" => Value::test_int(4),
64                            "bar" => Value::test_int(5),
65                        }),
66                    ]),
67                    Value::test_list(vec![
68                        Value::test_record(record! {
69                            "foo" => Value::test_int(6),
70                            "bar" => Value::test_int(7),
71                        }),
72                        Value::test_record(record! {
73                            "foo" => Value::test_int(8),
74                            "bar" => Value::test_int(9),
75                        }),
76                    ]),
77                ])),
78            },
79            Example {
80                example: "0x[11 22 33 44 55 66 77 88] | chunks 3",
81                description: "Chunk the bytes of a binary into triplets",
82                result: Some(Value::test_list(vec![
83                    Value::test_binary(vec![0x11, 0x22, 0x33]),
84                    Value::test_binary(vec![0x44, 0x55, 0x66]),
85                    Value::test_binary(vec![0x77, 0x88]),
86                ])),
87            },
88        ]
89    }
90
91    fn run(
92        &self,
93        engine_state: &EngineState,
94        stack: &mut Stack,
95        call: &Call,
96        input: PipelineData,
97    ) -> Result<PipelineData, ShellError> {
98        let head = call.head;
99        let chunk_size: Value = call.req(engine_state, stack, 0)?;
100
101        let size =
102            usize::try_from(chunk_size.as_int()?).map_err(|_| ShellError::NeedsPositiveValue {
103                span: chunk_size.span(),
104            })?;
105
106        let size = NonZeroUsize::try_from(size).map_err(|_| ShellError::IncorrectValue {
107            msg: "`chunk_size` cannot be zero".into(),
108            val_span: chunk_size.span(),
109            call_span: head,
110        })?;
111
112        chunks(engine_state, input, size, head)
113    }
114}
115
116pub fn chunks(
117    engine_state: &EngineState,
118    input: PipelineData,
119    chunk_size: NonZeroUsize,
120    span: Span,
121) -> Result<PipelineData, ShellError> {
122    let from_io_error = IoError::factory(span, None);
123    match input {
124        PipelineData::Value(Value::List { vals, .. }, metadata) => {
125            let chunks = ChunksIter::new(vals, chunk_size, span);
126            let stream = ListStream::new(chunks, span, engine_state.signals().clone());
127            Ok(PipelineData::list_stream(stream, metadata))
128        }
129        PipelineData::ListStream(stream, metadata) => {
130            let stream = stream.modify(|iter| ChunksIter::new(iter, chunk_size, span));
131            Ok(PipelineData::list_stream(stream, metadata))
132        }
133        PipelineData::Value(Value::Binary { val, .. }, metadata) => {
134            let chunk_read = ChunkRead {
135                reader: Cursor::new(val),
136                size: chunk_size,
137            };
138            let value_stream = chunk_read.map(move |chunk| match chunk {
139                Ok(chunk) => Value::binary(chunk, span),
140                Err(e) => Value::error(from_io_error(e).into(), span),
141            });
142            let pipeline_data_with_metadata = value_stream.into_pipeline_data_with_metadata(
143                span,
144                engine_state.signals().clone(),
145                metadata,
146            );
147            Ok(pipeline_data_with_metadata)
148        }
149        PipelineData::ByteStream(stream, metadata) => {
150            let pipeline_data = match stream.reader() {
151                None => PipelineData::empty(),
152                Some(reader) => {
153                    let chunk_read = ChunkRead {
154                        reader,
155                        size: chunk_size,
156                    };
157                    let value_stream = chunk_read.map(move |chunk| match chunk {
158                        Ok(chunk) => Value::binary(chunk, span),
159                        Err(e) => Value::error(from_io_error(e).into(), span),
160                    });
161                    value_stream.into_pipeline_data_with_metadata(
162                        span,
163                        engine_state.signals().clone(),
164                        metadata,
165                    )
166                }
167            };
168            Ok(pipeline_data)
169        }
170        input => Err(input.unsupported_input_error("list", span)),
171    }
172}
173
174struct ChunksIter<I: Iterator<Item = Value>> {
175    iter: I,
176    size: usize,
177    span: Span,
178}
179
180impl<I: Iterator<Item = Value>> ChunksIter<I> {
181    fn new(iter: impl IntoIterator<IntoIter = I>, size: NonZeroUsize, span: Span) -> Self {
182        Self {
183            iter: iter.into_iter(),
184            size: size.into(),
185            span,
186        }
187    }
188}
189
190impl<I: Iterator<Item = Value>> Iterator for ChunksIter<I> {
191    type Item = Value;
192
193    fn next(&mut self) -> Option<Self::Item> {
194        let first = self.iter.next()?;
195        let mut chunk = Vec::with_capacity(self.size); // delay allocation to optimize for empty iter
196        chunk.push(first);
197        chunk.extend((&mut self.iter).take(self.size - 1));
198        Some(Value::list(chunk, self.span))
199    }
200}
201
/// Iterator that reads a buffered reader in chunks of up to `size` bytes.
struct ChunkRead<R: BufRead> {
    /// Source of the bytes.
    reader: R,
    /// Number of bytes per chunk; only the final chunk may be shorter.
    size: NonZeroUsize,
}

impl<R: BufRead> Iterator for ChunkRead<R> {
    type Item = Result<Vec<u8>, std::io::Error>;

    /// Reads the next chunk.
    ///
    /// Returns `None` when the reader is at end of input before any byte of the
    /// chunk was read, `Some(Ok(bytes))` with up to `size` bytes otherwise, and
    /// `Some(Err(_))` when the reader fails with anything other than
    /// `ErrorKind::Interrupted` (interrupted reads are retried).
    fn next(&mut self) -> Option<Self::Item> {
        // Upper bound on the number of bytes reserved eagerly. The chunk size
        // is user supplied, so reserving all of it up front could panic
        // (capacity overflow) or abort (allocation failure) even when the input
        // is tiny. Larger chunks grow on demand through `extend_from_slice`.
        const MAX_PREALLOCATED: usize = 64 * 1024;

        let size = self.size.get();
        let mut buf = Vec::with_capacity(size.min(MAX_PREALLOCATED));
        while buf.len() < size {
            let available = match self.reader.fill_buf() {
                // End of input with nothing collected: the iterator is done.
                Ok([]) if buf.is_empty() => return None,
                // End of input mid-chunk: emit the short final chunk.
                Ok([]) => return Some(Ok(buf)),
                Ok(bytes) => bytes,
                Err(ref e) if e.kind() == ErrorKind::Interrupted => continue,
                Err(e) => return Some(Err(e)),
            };
            // Take only what is still missing; the rest stays buffered in the
            // reader for the next chunk.
            let needed = size - buf.len();
            let have = available.len().min(needed);
            buf.extend_from_slice(&available[..have]);
            self.reader.consume(have);
        }
        Some(Ok(buf))
    }
}
228
#[cfg(test)]
mod test {
    use std::io::Read;

    use super::*;

    /// Text shared by the reader tests.
    const TEXT: &str = "hello world";

    /// Runs a `ChunkRead` over `reader` with the given chunk size and collects
    /// every chunk, panicking on a read error.
    fn collect_chunks(reader: impl BufRead, size: usize) -> Vec<Vec<u8>> {
        let chunk_read = ChunkRead {
            reader,
            size: NonZeroUsize::new(size).unwrap(),
        };
        chunk_read.map(|chunk| chunk.unwrap()).collect::<Vec<_>>()
    }

    /// A single contiguous reader is split into 4-byte chunks plus a short tail.
    #[test]
    fn chunk_read() {
        let bytes = TEXT.as_bytes();
        let produced = collect_chunks(Cursor::new(TEXT), 4);
        assert_eq!(produced, [&bytes[..4], &bytes[4..8], &bytes[8..]]);
    }

    /// Chunk boundaries are independent of how the underlying reader happens to
    /// be segmented.
    #[test]
    fn chunk_read_stream() {
        let bytes = TEXT.as_bytes();
        let head = Cursor::new(&TEXT[..3]);
        let middle = Cursor::new(&TEXT[3..9]);
        let tail = Cursor::new(&TEXT[9..]);
        let produced = collect_chunks(head.chain(middle).chain(tail), 4);
        assert_eq!(produced, [&bytes[..4], &bytes[4..8], &bytes[8..]]);
    }

    /// The examples advertised by the command must evaluate to their stated results.
    #[test]
    fn test_examples() {
        use crate::test_examples;

        test_examples(Chunks)
    }
}