Skip to main content

nu_command/filters/
chunks.rs

1use nu_engine::command_prelude::*;
2use nu_protocol::{ListStream, shell_error::io::IoError};
3use std::{
4    io::{BufRead, Cursor, ErrorKind},
5    num::NonZeroUsize,
6};
7
8#[derive(Clone)]
9pub struct Chunks;
10
11impl Command for Chunks {
12    fn name(&self) -> &str {
13        "chunks"
14    }
15
16    fn signature(&self) -> Signature {
17        Signature::build("chunks")
18            .input_output_types(vec![
19                (Type::table(), Type::list(Type::table())),
20                (Type::list(Type::Any), Type::list(Type::list(Type::Any))),
21                (Type::Binary, Type::list(Type::Binary)),
22            ])
23            .required("chunk_size", SyntaxShape::Int, "The size of each chunk.")
24            .category(Category::Filters)
25    }
26
27    fn description(&self) -> &str {
28        "Divide a list, table or binary input into chunks of `chunk_size`."
29    }
30
31    fn extra_description(&self) -> &str {
32        "This command will error if `chunk_size` is negative or zero."
33    }
34
35    fn search_terms(&self) -> Vec<&str> {
36        vec!["batch", "group", "split", "bytes"]
37    }
38
39    fn examples(&self) -> Vec<Example<'_>> {
40        vec![
41            Example {
42                example: "[1 2 3 4] | chunks 2",
43                description: "Chunk a list into pairs",
44                result: Some(Value::test_list(vec![
45                    Value::test_list(vec![Value::test_int(1), Value::test_int(2)]),
46                    Value::test_list(vec![Value::test_int(3), Value::test_int(4)]),
47                ])),
48            },
49            Example {
50                example: "[[foo bar]; [0 1] [2 3] [4 5] [6 7] [8 9]] | chunks 3",
51                description: "Chunk the rows of a table into triplets",
52                result: Some(Value::test_list(vec![
53                    Value::test_list(vec![
54                        Value::test_record(record! {
55                            "foo" => Value::test_int(0),
56                            "bar" => Value::test_int(1),
57                        }),
58                        Value::test_record(record! {
59                            "foo" => Value::test_int(2),
60                            "bar" => Value::test_int(3),
61                        }),
62                        Value::test_record(record! {
63                            "foo" => Value::test_int(4),
64                            "bar" => Value::test_int(5),
65                        }),
66                    ]),
67                    Value::test_list(vec![
68                        Value::test_record(record! {
69                            "foo" => Value::test_int(6),
70                            "bar" => Value::test_int(7),
71                        }),
72                        Value::test_record(record! {
73                            "foo" => Value::test_int(8),
74                            "bar" => Value::test_int(9),
75                        }),
76                    ]),
77                ])),
78            },
79            Example {
80                example: "0x[11 22 33 44 55 66 77 88] | chunks 3",
81                description: "Chunk the bytes of a binary into triplets",
82                result: Some(Value::test_list(vec![
83                    Value::test_binary(vec![0x11, 0x22, 0x33]),
84                    Value::test_binary(vec![0x44, 0x55, 0x66]),
85                    Value::test_binary(vec![0x77, 0x88]),
86                ])),
87            },
88        ]
89    }
90
91    fn run(
92        &self,
93        engine_state: &EngineState,
94        stack: &mut Stack,
95        call: &Call,
96        input: PipelineData,
97    ) -> Result<PipelineData, ShellError> {
98        let input = input.into_stream_or_original(engine_state);
99        let head = call.head;
100        let chunk_size: Value = call.req(engine_state, stack, 0)?;
101
102        let size =
103            usize::try_from(chunk_size.as_int()?).map_err(|_| ShellError::NeedsPositiveValue {
104                span: chunk_size.span(),
105            })?;
106
107        let size = NonZeroUsize::try_from(size).map_err(|_| ShellError::IncorrectValue {
108            msg: "`chunk_size` cannot be zero".into(),
109            val_span: chunk_size.span(),
110            call_span: head,
111        })?;
112
113        chunks(engine_state, input, size, head)
114    }
115}
116
117pub fn chunks(
118    engine_state: &EngineState,
119    input: PipelineData,
120    chunk_size: NonZeroUsize,
121    span: Span,
122) -> Result<PipelineData, ShellError> {
123    let from_io_error = IoError::factory(span, None);
124    match input {
125        PipelineData::Value(Value::List { vals, .. }, metadata) => {
126            let chunks = ChunksIter::new(vals, chunk_size, span);
127            let stream = ListStream::new(chunks, span, engine_state.signals().clone());
128            Ok(PipelineData::list_stream(stream, metadata))
129        }
130        PipelineData::ListStream(stream, metadata) => {
131            let stream = stream.modify(|iter| ChunksIter::new(iter, chunk_size, span));
132            Ok(PipelineData::list_stream(stream, metadata))
133        }
134        PipelineData::Value(Value::Binary { val, .. }, metadata) => {
135            let chunk_read = ChunkRead {
136                reader: Cursor::new(val),
137                size: chunk_size,
138            };
139            let value_stream = chunk_read.map(move |chunk| match chunk {
140                Ok(chunk) => Value::binary(chunk, span),
141                Err(e) => Value::error(from_io_error(e).into(), span),
142            });
143            let pipeline_data_with_metadata = value_stream.into_pipeline_data_with_metadata(
144                span,
145                engine_state.signals().clone(),
146                metadata,
147            );
148            Ok(pipeline_data_with_metadata)
149        }
150        PipelineData::ByteStream(stream, metadata) => {
151            let pipeline_data = match stream.reader() {
152                None => PipelineData::empty(),
153                Some(reader) => {
154                    let chunk_read = ChunkRead {
155                        reader,
156                        size: chunk_size,
157                    };
158                    let value_stream = chunk_read.map(move |chunk| match chunk {
159                        Ok(chunk) => Value::binary(chunk, span),
160                        Err(e) => Value::error(from_io_error(e).into(), span),
161                    });
162                    value_stream.into_pipeline_data_with_metadata(
163                        span,
164                        engine_state.signals().clone(),
165                        metadata,
166                    )
167                }
168            };
169            Ok(pipeline_data)
170        }
171        input => Err(input.unsupported_input_error("list", span)),
172    }
173}
174
175struct ChunksIter<I: Iterator<Item = Value>> {
176    iter: I,
177    size: usize,
178    span: Span,
179}
180
181impl<I: Iterator<Item = Value>> ChunksIter<I> {
182    fn new(iter: impl IntoIterator<IntoIter = I>, size: NonZeroUsize, span: Span) -> Self {
183        Self {
184            iter: iter.into_iter(),
185            size: size.into(),
186            span,
187        }
188    }
189}
190
191impl<I: Iterator<Item = Value>> Iterator for ChunksIter<I> {
192    type Item = Value;
193
194    fn next(&mut self) -> Option<Self::Item> {
195        let first = self.iter.next()?;
196        let mut chunk = Vec::with_capacity(self.size); // delay allocation to optimize for empty iter
197        chunk.push(first);
198        chunk.extend((&mut self.iter).take(self.size - 1));
199        Some(Value::list(chunk, self.span))
200    }
201}
202
/// Iterator that reads a [`BufRead`] source in consecutive byte chunks of at
/// most `size` bytes each.
struct ChunkRead<R: BufRead> {
    /// Source of the bytes being chunked.
    reader: R,
    /// Maximum number of bytes per chunk.
    size: NonZeroUsize,
}

impl<R: BufRead> Iterator for ChunkRead<R> {
    type Item = Result<Vec<u8>, std::io::Error>;

    /// Returns the next chunk, `None` at end of input, or the I/O error
    /// reported by the reader. Only the final chunk may be shorter than `size`.
    fn next(&mut self) -> Option<Self::Item> {
        // `size` is caller-controlled. Reserving all of it up front would make
        // an oversized chunk size panic (capacity overflow) or fail to allocate
        // even for tiny inputs, so the eager reservation is capped and
        // `extend_from_slice` grows the buffer when more data really arrives.
        const MAX_PREALLOCATED_BYTES: usize = 64 * 1024;

        let size = self.size.get();
        let mut buf = Vec::with_capacity(size.min(MAX_PREALLOCATED_BYTES));
        while buf.len() < size {
            let available = match self.reader.fill_buf() {
                // End of input before anything was read: the iterator is done.
                Ok([]) if buf.is_empty() => return None,
                // End of input mid-chunk: emit the short final chunk.
                Ok([]) => return Some(Ok(buf)),
                Ok(bytes) => bytes,
                // An interrupted read is retried.
                Err(ref e) if e.kind() == ErrorKind::Interrupted => continue,
                Err(e) => return Some(Err(e)),
            };
            // Take only what is still missing from this chunk; the rest stays
            // in the reader's buffer for the next chunk.
            let have = available.len().min(size - buf.len());
            buf.extend_from_slice(&available[..have]);
            self.reader.consume(have);
        }
        Some(Ok(buf))
    }
}
229
#[cfg(test)]
mod test {
    use std::io::Read;

    use super::*;

    /// Drains a `ChunkRead` over `reader` with the given chunk size,
    /// panicking on the first read error.
    fn read_all_chunks(reader: impl BufRead, size: usize) -> Vec<Vec<u8>> {
        let size = NonZeroUsize::new(size).unwrap();
        ChunkRead { reader, size }.map(Result::unwrap).collect()
    }

    /// A single contiguous reader is cut into 4-byte pieces plus a short tail.
    #[test]
    fn chunk_read() {
        let text = "hello world";
        let expected: Vec<&[u8]> = text.as_bytes().chunks(4).collect();

        assert_eq!(read_all_chunks(Cursor::new(text), 4), expected);
    }

    /// Chunk boundaries do not depend on where the underlying readers split
    /// the data: the chained readers hand out 3, 6 and 2 bytes.
    #[test]
    fn chunk_read_stream() {
        let text = "hello world";
        let (head, rest) = text.split_at(3);
        let (middle, tail) = rest.split_at(6);
        let reader = Cursor::new(head)
            .chain(Cursor::new(middle))
            .chain(Cursor::new(tail));
        let expected: Vec<&[u8]> = text.as_bytes().chunks(4).collect();

        assert_eq!(read_all_chunks(reader, 4), expected);
    }

    /// Runs the examples declared by `Chunks::examples` through the test harness.
    #[test]
    fn test_examples() -> nu_test_support::Result {
        nu_test_support::test().examples(Chunks)
    }
}