1use super::utils::chain_error_with_input;
2use nu_engine::{ClosureEval, command_prelude::*};
3use nu_protocol::Signals;
4use nu_protocol::engine::Closure;
5
/// The `chunk-by` filter command: splits its input into runs of consecutive
/// elements for which a user-supplied closure yields equal results.
#[derive(Clone)]
pub struct ChunkBy;
8
9impl Command for ChunkBy {
10 fn name(&self) -> &str {
11 "chunk-by"
12 }
13
14 fn signature(&self) -> Signature {
15 Signature::build("chunk-by")
16 .input_output_types(vec![
17 (
18 Type::List(Box::new(Type::Any)),
19 Type::list(Type::list(Type::Any)),
20 ),
21 (Type::Range, Type::list(Type::list(Type::Any))),
22 ])
23 .required(
24 "closure",
25 SyntaxShape::Closure(Some(vec![SyntaxShape::Any])),
26 "The closure to run.",
27 )
28 .category(Category::Filters)
29 }
30
31 fn description(&self) -> &str {
32 "Divides a sequence into sub-sequences based on a closure."
33 }
34
35 fn extra_description(&self) -> &str {
36 "chunk-by applies the given closure to each value of the input list, and groups
37consecutive elements that share the same closure result value into lists."
38 }
39
40 fn run(
41 &self,
42 engine_state: &EngineState,
43 stack: &mut Stack,
44 call: &Call,
45 input: PipelineData,
46 ) -> Result<PipelineData, ShellError> {
47 chunk_by(engine_state, stack, call, input)
48 }
49
50 fn examples(&self) -> Vec<Example<'_>> {
51 vec![
52 Example {
53 description: "Chunk data into runs of larger than zero or not.",
54 example: "[1, 3, -2, -2, 0, 1, 2] | chunk-by {|it| $it >= 0 }",
55 result: Some(Value::test_list(vec![
56 Value::test_list(vec![Value::test_int(1), Value::test_int(3)]),
57 Value::test_list(vec![Value::test_int(-2), Value::test_int(-2)]),
58 Value::test_list(vec![
59 Value::test_int(0),
60 Value::test_int(1),
61 Value::test_int(2),
62 ]),
63 ])),
64 },
65 Example {
66 description: "Identify repetitions in a string",
67 example: "[a b b c c c] | chunk-by { |it| $it }",
68 result: Some(Value::test_list(vec![
69 Value::test_list(vec![Value::test_string("a")]),
70 Value::test_list(vec![Value::test_string("b"), Value::test_string("b")]),
71 Value::test_list(vec![
72 Value::test_string("c"),
73 Value::test_string("c"),
74 Value::test_string("c"),
75 ]),
76 ])),
77 },
78 Example {
79 description: "Chunk values of range by predicate",
80 example: "(0..8) | chunk-by { |it| $it // 3 }",
81 result: Some(Value::test_list(vec![
82 Value::test_list(vec![
83 Value::test_int(0),
84 Value::test_int(1),
85 Value::test_int(2),
86 ]),
87 Value::test_list(vec![
88 Value::test_int(3),
89 Value::test_int(4),
90 Value::test_int(5),
91 ]),
92 Value::test_list(vec![
93 Value::test_int(6),
94 Value::test_int(7),
95 Value::test_int(8),
96 ]),
97 ])),
98 },
99 ]
100 }
101}
102
/// Iterator adaptor that yields `Vec<T>` chunks of consecutive items from
/// `iterator` whose keys (as computed by `closure`) compare equal.
struct Chunk<I, T, F, K> {
    /// Source of the items being chunked.
    iterator: I,
    /// First item of the upcoming chunk together with its key. It is stashed
    /// here when an item with a different key ends the current chunk.
    last_value: Option<(T, K)>,
    /// Computes the key of an item; called once per item.
    closure: F,
    /// When `true`, `next` returns `None` immediately. Set on interrupt or
    /// when the source runs out while a chunk is being extended.
    done: bool,
    /// Checked for an interrupt before every pull from `iterator`.
    signals: Signals,
}
110
111impl<I, T, F, K> Chunk<I, T, F, K>
112where
113 I: Iterator<Item = T>,
114 F: FnMut(&T) -> K,
115 K: PartialEq,
116{
117 fn inner_iterator_next(&mut self) -> Option<I::Item> {
118 if self.signals.interrupted() {
119 self.done = true;
120 return None;
121 }
122 self.iterator.next()
123 }
124}
125
126impl<I, T, F, K> Iterator for Chunk<I, T, F, K>
127where
128 I: Iterator<Item = T>,
129 F: FnMut(&T) -> K,
130 K: PartialEq,
131{
132 type Item = Vec<T>;
133
134 fn next(&mut self) -> Option<Self::Item> {
135 if self.done {
136 return None;
137 }
138
139 let (head, head_key) = match self.last_value.take() {
140 None => {
141 let head = self.inner_iterator_next()?;
142
143 let key = (self.closure)(&head);
144
145 (head, key)
146 }
147
148 Some((value, key)) => (value, key),
149 };
150
151 let mut result = vec![head];
152
153 loop {
154 match self.inner_iterator_next() {
155 None => {
156 self.done = true;
157 return Some(result);
158 }
159 Some(value) => {
160 let value_key = (self.closure)(&value);
161
162 if value_key == head_key {
163 result.push(value);
164 } else {
165 self.last_value = Some((value, value_key));
166 return Some(result);
167 }
168 }
169 }
170 }
171 }
172}
173
174fn chunk_iter_by<I, T, F, K>(iterator: I, signals: Signals, closure: F) -> Chunk<I, T, F, K>
176where
177 I: Iterator<Item = T>,
178 F: FnMut(&T) -> K,
179 K: PartialEq,
180{
181 Chunk {
182 closure,
183 iterator,
184 last_value: None,
185 done: false,
186 signals,
187 }
188}
189
190pub fn chunk_by(
191 engine_state: &EngineState,
192 stack: &mut Stack,
193 call: &Call,
194 input: PipelineData,
195) -> Result<PipelineData, ShellError> {
196 let mut input = input.into_stream_or_original(engine_state);
197
198 let head = call.head;
199 let closure: Closure = call.req(engine_state, stack, 0)?;
200
201 let metadata = input.take_metadata();
202
203 match input {
204 PipelineData::Empty => Ok(PipelineData::empty()),
205 PipelineData::Value(Value::Range { .. }, ..)
206 | PipelineData::Value(Value::List { .. }, ..)
207 | PipelineData::ListStream(..) => {
208 let closure = ClosureEval::new(engine_state, stack, closure);
209
210 let result = chunk_value_stream(
211 input.into_iter(),
212 closure,
213 head,
214 engine_state.signals().clone(),
215 );
216
217 Ok(result.into_pipeline_data(head, engine_state.signals().clone()))
218 }
219
220 PipelineData::ByteStream(..) | PipelineData::Value(..) => {
221 Err(input.unsupported_input_error("list", head))
222 }
223 }
224 .map(|data| data.set_metadata(metadata))
225}
226
227fn chunk_value_stream<I>(
228 iterator: I,
229 mut closure: ClosureEval,
230 head: Span,
231 signals: Signals,
232) -> impl Iterator<Item = Value> + 'static + Send
233where
234 I: Iterator<Item = Value> + 'static + Send,
235{
236 chunk_iter_by(iterator, signals, move |value| {
237 match closure.run_with_value(value.clone()) {
238 Ok(data) => data.into_value(head).unwrap_or_else(|error| {
239 Value::error(chain_error_with_input(error, value.is_error(), head), head)
240 }),
241
242 Err(error) => Value::error(chain_error_with_input(error, value.is_error(), head), head),
243 }
244 })
245 .map(move |it| Value::list(it, head))
246}
247
#[cfg(test)]
mod test {
    use super::*;

    // Hands `ChunkBy` to the `nu_test_support` example checker; presumably this
    // runs each entry of `ChunkBy::examples` and compares it with its declared
    // `result` (confirm against `nu_test_support`).
    #[test]
    fn test_examples() -> nu_test_support::Result {
        nu_test_support::test().examples(ChunkBy)
    }
}