1use nu_engine::command_prelude::*;
2use nu_protocol::{ListStream, shell_error::io::IoError};
3use std::{
4 io::{BufRead, Cursor, ErrorKind},
5 num::NonZeroUsize,
6};
7
8#[derive(Clone)]
9pub struct Chunks;
10
11impl Command for Chunks {
12 fn name(&self) -> &str {
13 "chunks"
14 }
15
16 fn signature(&self) -> Signature {
17 Signature::build("chunks")
18 .input_output_types(vec![
19 (Type::table(), Type::list(Type::table())),
20 (Type::list(Type::Any), Type::list(Type::list(Type::Any))),
21 (Type::Binary, Type::list(Type::Binary)),
22 ])
23 .required("chunk_size", SyntaxShape::Int, "The size of each chunk.")
24 .category(Category::Filters)
25 }
26
27 fn description(&self) -> &str {
28 "Divide a list, table or binary input into chunks of `chunk_size`."
29 }
30
31 fn extra_description(&self) -> &str {
32 "This command will error if `chunk_size` is negative or zero."
33 }
34
35 fn search_terms(&self) -> Vec<&str> {
36 vec!["batch", "group", "split", "bytes"]
37 }
38
39 fn examples(&self) -> Vec<Example<'_>> {
40 vec![
41 Example {
42 example: "[1 2 3 4] | chunks 2",
43 description: "Chunk a list into pairs",
44 result: Some(Value::test_list(vec![
45 Value::test_list(vec![Value::test_int(1), Value::test_int(2)]),
46 Value::test_list(vec![Value::test_int(3), Value::test_int(4)]),
47 ])),
48 },
49 Example {
50 example: "[[foo bar]; [0 1] [2 3] [4 5] [6 7] [8 9]] | chunks 3",
51 description: "Chunk the rows of a table into triplets",
52 result: Some(Value::test_list(vec![
53 Value::test_list(vec![
54 Value::test_record(record! {
55 "foo" => Value::test_int(0),
56 "bar" => Value::test_int(1),
57 }),
58 Value::test_record(record! {
59 "foo" => Value::test_int(2),
60 "bar" => Value::test_int(3),
61 }),
62 Value::test_record(record! {
63 "foo" => Value::test_int(4),
64 "bar" => Value::test_int(5),
65 }),
66 ]),
67 Value::test_list(vec![
68 Value::test_record(record! {
69 "foo" => Value::test_int(6),
70 "bar" => Value::test_int(7),
71 }),
72 Value::test_record(record! {
73 "foo" => Value::test_int(8),
74 "bar" => Value::test_int(9),
75 }),
76 ]),
77 ])),
78 },
79 Example {
80 example: "0x[11 22 33 44 55 66 77 88] | chunks 3",
81 description: "Chunk the bytes of a binary into triplets",
82 result: Some(Value::test_list(vec![
83 Value::test_binary(vec![0x11, 0x22, 0x33]),
84 Value::test_binary(vec![0x44, 0x55, 0x66]),
85 Value::test_binary(vec![0x77, 0x88]),
86 ])),
87 },
88 ]
89 }
90
91 fn run(
92 &self,
93 engine_state: &EngineState,
94 stack: &mut Stack,
95 call: &Call,
96 input: PipelineData,
97 ) -> Result<PipelineData, ShellError> {
98 let input = input.into_stream_or_original(engine_state);
99 let head = call.head;
100 let chunk_size: Value = call.req(engine_state, stack, 0)?;
101
102 let size =
103 usize::try_from(chunk_size.as_int()?).map_err(|_| ShellError::NeedsPositiveValue {
104 span: chunk_size.span(),
105 })?;
106
107 let size = NonZeroUsize::try_from(size).map_err(|_| ShellError::IncorrectValue {
108 msg: "`chunk_size` cannot be zero".into(),
109 val_span: chunk_size.span(),
110 call_span: head,
111 })?;
112
113 chunks(engine_state, input, size, head)
114 }
115}
116
117pub fn chunks(
118 engine_state: &EngineState,
119 input: PipelineData,
120 chunk_size: NonZeroUsize,
121 span: Span,
122) -> Result<PipelineData, ShellError> {
123 let from_io_error = IoError::factory(span, None);
124 match input {
125 PipelineData::Value(Value::List { vals, .. }, metadata) => {
126 let chunks = ChunksIter::new(vals, chunk_size, span);
127 let stream = ListStream::new(chunks, span, engine_state.signals().clone());
128 Ok(PipelineData::list_stream(stream, metadata))
129 }
130 PipelineData::ListStream(stream, metadata) => {
131 let stream = stream.modify(|iter| ChunksIter::new(iter, chunk_size, span));
132 Ok(PipelineData::list_stream(stream, metadata))
133 }
134 PipelineData::Value(Value::Binary { val, .. }, metadata) => {
135 let chunk_read = ChunkRead {
136 reader: Cursor::new(val),
137 size: chunk_size,
138 };
139 let value_stream = chunk_read.map(move |chunk| match chunk {
140 Ok(chunk) => Value::binary(chunk, span),
141 Err(e) => Value::error(from_io_error(e).into(), span),
142 });
143 let pipeline_data_with_metadata = value_stream.into_pipeline_data_with_metadata(
144 span,
145 engine_state.signals().clone(),
146 metadata,
147 );
148 Ok(pipeline_data_with_metadata)
149 }
150 PipelineData::ByteStream(stream, metadata) => {
151 let pipeline_data = match stream.reader() {
152 None => PipelineData::empty(),
153 Some(reader) => {
154 let chunk_read = ChunkRead {
155 reader,
156 size: chunk_size,
157 };
158 let value_stream = chunk_read.map(move |chunk| match chunk {
159 Ok(chunk) => Value::binary(chunk, span),
160 Err(e) => Value::error(from_io_error(e).into(), span),
161 });
162 value_stream.into_pipeline_data_with_metadata(
163 span,
164 engine_state.signals().clone(),
165 metadata,
166 )
167 }
168 };
169 Ok(pipeline_data)
170 }
171 input => Err(input.unsupported_input_error("list", span)),
172 }
173}
174
175struct ChunksIter<I: Iterator<Item = Value>> {
176 iter: I,
177 size: usize,
178 span: Span,
179}
180
181impl<I: Iterator<Item = Value>> ChunksIter<I> {
182 fn new(iter: impl IntoIterator<IntoIter = I>, size: NonZeroUsize, span: Span) -> Self {
183 Self {
184 iter: iter.into_iter(),
185 size: size.into(),
186 span,
187 }
188 }
189}
190
191impl<I: Iterator<Item = Value>> Iterator for ChunksIter<I> {
192 type Item = Value;
193
194 fn next(&mut self) -> Option<Self::Item> {
195 let first = self.iter.next()?;
196 let mut chunk = Vec::with_capacity(self.size); chunk.push(first);
198 chunk.extend((&mut self.iter).take(self.size - 1));
199 Some(Value::list(chunk, self.span))
200 }
201}
202
/// Iterator adapter that splits the bytes of a buffered reader into chunks of `size` bytes.
/// Only the final chunk may be shorter.
struct ChunkRead<R: BufRead> {
    /// Byte source being chunked.
    reader: R,
    /// Number of bytes per chunk.
    size: NonZeroUsize,
}

impl<R: BufRead> Iterator for ChunkRead<R> {
    type Item = Result<Vec<u8>, std::io::Error>;

    /// Reads the next chunk.
    ///
    /// Returns `None` when the reader is at end of input, `Some(Ok(bytes))` with up to `size`
    /// bytes otherwise, and `Some(Err(_))` when the reader fails with anything other than
    /// `ErrorKind::Interrupted` (interrupted reads are retried).
    fn next(&mut self) -> Option<Self::Item> {
        // Upper bound on the eager reservation. `size` is caller-controlled, so reserving all
        // of it up front would panic or abort for very large chunk sizes regardless of how
        // little data the reader actually holds.
        const MAX_PREALLOC: usize = 64 * 1024;

        let size = self.size.get();
        let mut buf = Vec::with_capacity(size.min(MAX_PREALLOC));
        while buf.len() < size {
            let available = match self.reader.fill_buf() {
                // End of input with nothing collected: the iterator is finished.
                Ok([]) if buf.is_empty() => return None,
                // End of input mid-chunk: emit the short final chunk.
                Ok([]) => return Some(Ok(buf)),
                Ok(bytes) => bytes,
                Err(ref e) if e.kind() == ErrorKind::Interrupted => continue,
                Err(e) => return Some(Err(e)),
            };
            // Take only what is still missing from this chunk and leave the rest buffered.
            let have = available.len().min(size - buf.len());
            buf.extend_from_slice(&available[..have]);
            self.reader.consume(have);
        }
        Some(Ok(buf))
    }
}
229
#[cfg(test)]
mod test {
    use std::io::Read;

    use super::*;

    /// Text fed through `ChunkRead` by the reader tests.
    const TEXT: &str = "hello world";

    /// Drains `reader` through `ChunkRead` in 4-byte chunks, panicking on any read error.
    fn read_in_fours(reader: impl BufRead) -> Vec<Vec<u8>> {
        let size = NonZeroUsize::new(4).unwrap();
        ChunkRead { reader, size }
            .map(|chunk| chunk.unwrap())
            .collect::<Vec<_>>()
    }

    /// The three slices of `TEXT` that 4-byte chunking must produce.
    fn expected_chunks() -> [&'static [u8]; 3] {
        let bytes = TEXT.as_bytes();
        [&bytes[..4], &bytes[4..8], &bytes[8..]]
    }

    #[test]
    fn chunk_read() {
        let chunks = read_in_fours(Cursor::new(TEXT));
        assert_eq!(chunks, expected_chunks());
    }

    #[test]
    fn chunk_read_stream() {
        // Segment boundaries (3 and 9) deliberately do not line up with the chunk size.
        let head = Cursor::new(&TEXT[..3]);
        let middle = Cursor::new(&TEXT[3..9]);
        let tail = Cursor::new(&TEXT[9..]);

        let chunks = read_in_fours(head.chain(middle).chain(tail));
        assert_eq!(chunks, expected_chunks());
    }

    #[test]
    fn test_examples() -> nu_test_support::Result {
        nu_test_support::test().examples(Chunks)
    }
}