nu_command/filters/chunk_by.rs

1use super::utils::chain_error_with_input;
2use nu_engine::{ClosureEval, command_prelude::*};
3use nu_protocol::Signals;
4use nu_protocol::engine::Closure;
5
6#[derive(Clone)]
7pub struct ChunkBy;
8
9impl Command for ChunkBy {
10    fn name(&self) -> &str {
11        "chunk-by"
12    }
13
14    fn signature(&self) -> Signature {
15        Signature::build("chunk-by")
16            .input_output_types(vec![
17                (
18                    Type::List(Box::new(Type::Any)),
19                    Type::list(Type::list(Type::Any)),
20                ),
21                (Type::Range, Type::list(Type::list(Type::Any))),
22            ])
23            .required(
24                "closure",
25                SyntaxShape::Closure(Some(vec![SyntaxShape::Any])),
26                "The closure to run.",
27            )
28            .category(Category::Filters)
29    }
30
31    fn description(&self) -> &str {
32        r#"Divides a sequence into sub-sequences based on a closure."#
33    }
34
35    fn extra_description(&self) -> &str {
36        r#"chunk-by applies the given closure to each value of the input list, and groups
37consecutive elements that share the same closure result value into lists."#
38    }
39
40    fn run(
41        &self,
42        engine_state: &EngineState,
43        stack: &mut Stack,
44        call: &Call,
45        input: PipelineData,
46    ) -> Result<PipelineData, ShellError> {
47        chunk_by(engine_state, stack, call, input)
48    }
49
50    fn examples(&self) -> Vec<Example<'_>> {
51        vec![
52            Example {
53                description: "Chunk data into runs of larger than zero or not.",
54                example: "[1, 3, -2, -2, 0, 1, 2] | chunk-by {|it| $it >= 0 }",
55                result: Some(Value::test_list(vec![
56                    Value::test_list(vec![Value::test_int(1), Value::test_int(3)]),
57                    Value::test_list(vec![Value::test_int(-2), Value::test_int(-2)]),
58                    Value::test_list(vec![
59                        Value::test_int(0),
60                        Value::test_int(1),
61                        Value::test_int(2),
62                    ]),
63                ])),
64            },
65            Example {
66                description: "Identify repetitions in a string",
67                example: r#"[a b b c c c] | chunk-by { |it| $it }"#,
68                result: Some(Value::test_list(vec![
69                    Value::test_list(vec![Value::test_string("a")]),
70                    Value::test_list(vec![Value::test_string("b"), Value::test_string("b")]),
71                    Value::test_list(vec![
72                        Value::test_string("c"),
73                        Value::test_string("c"),
74                        Value::test_string("c"),
75                    ]),
76                ])),
77            },
78            Example {
79                description: "Chunk values of range by predicate",
80                example: r#"(0..8) | chunk-by { |it| $it // 3 }"#,
81                result: Some(Value::test_list(vec![
82                    Value::test_list(vec![
83                        Value::test_int(0),
84                        Value::test_int(1),
85                        Value::test_int(2),
86                    ]),
87                    Value::test_list(vec![
88                        Value::test_int(3),
89                        Value::test_int(4),
90                        Value::test_int(5),
91                    ]),
92                    Value::test_list(vec![
93                        Value::test_int(6),
94                        Value::test_int(7),
95                        Value::test_int(8),
96                    ]),
97                ])),
98            },
99        ]
100    }
101}
102
103struct Chunk<I, T, F, K> {
104    iterator: I,
105    last_value: Option<(T, K)>,
106    closure: F,
107    done: bool,
108    signals: Signals,
109}
110
111impl<I, T, F, K> Chunk<I, T, F, K>
112where
113    I: Iterator<Item = T>,
114    F: FnMut(&T) -> K,
115    K: PartialEq,
116{
117    fn inner_iterator_next(&mut self) -> Option<I::Item> {
118        if self.signals.interrupted() {
119            self.done = true;
120            return None;
121        }
122        self.iterator.next()
123    }
124}
125
126impl<I, T, F, K> Iterator for Chunk<I, T, F, K>
127where
128    I: Iterator<Item = T>,
129    F: FnMut(&T) -> K,
130    K: PartialEq,
131{
132    type Item = Vec<T>;
133
134    fn next(&mut self) -> Option<Self::Item> {
135        if self.done {
136            return None;
137        }
138
139        let (head, head_key) = match self.last_value.take() {
140            None => {
141                let head = self.inner_iterator_next()?;
142
143                let key = (self.closure)(&head);
144
145                (head, key)
146            }
147
148            Some((value, key)) => (value, key),
149        };
150
151        let mut result = vec![head];
152
153        loop {
154            match self.inner_iterator_next() {
155                None => {
156                    self.done = true;
157                    return Some(result);
158                }
159                Some(value) => {
160                    let value_key = (self.closure)(&value);
161
162                    if value_key == head_key {
163                        result.push(value);
164                    } else {
165                        self.last_value = Some((value, value_key));
166                        return Some(result);
167                    }
168                }
169            }
170        }
171    }
172}
173
174/// An iterator with the semantics of the chunk_by operation.
175fn chunk_iter_by<I, T, F, K>(iterator: I, signals: Signals, closure: F) -> Chunk<I, T, F, K>
176where
177    I: Iterator<Item = T>,
178    F: FnMut(&T) -> K,
179    K: PartialEq,
180{
181    Chunk {
182        closure,
183        iterator,
184        last_value: None,
185        done: false,
186        signals,
187    }
188}
189
190pub fn chunk_by(
191    engine_state: &EngineState,
192    stack: &mut Stack,
193    call: &Call,
194    input: PipelineData,
195) -> Result<PipelineData, ShellError> {
196    let head = call.head;
197    let closure: Closure = call.req(engine_state, stack, 0)?;
198
199    let metadata = input.metadata();
200
201    match input {
202        PipelineData::Empty => Ok(PipelineData::empty()),
203        PipelineData::Value(Value::Range { .. }, ..)
204        | PipelineData::Value(Value::List { .. }, ..)
205        | PipelineData::ListStream(..) => {
206            let closure = ClosureEval::new(engine_state, stack, closure);
207
208            let result = chunk_value_stream(
209                input.into_iter(),
210                closure,
211                head,
212                engine_state.signals().clone(),
213            );
214
215            Ok(result.into_pipeline_data(head, engine_state.signals().clone()))
216        }
217
218        PipelineData::ByteStream(..) | PipelineData::Value(..) => {
219            Err(input.unsupported_input_error("list", head))
220        }
221    }
222    .map(|data| data.set_metadata(metadata))
223}
224
225fn chunk_value_stream<I>(
226    iterator: I,
227    mut closure: ClosureEval,
228    head: Span,
229    signals: Signals,
230) -> impl Iterator<Item = Value> + 'static + Send
231where
232    I: Iterator<Item = Value> + 'static + Send,
233{
234    chunk_iter_by(iterator, signals, move |value| {
235        match closure.run_with_value(value.clone()) {
236            Ok(data) => data.into_value(head).unwrap_or_else(|error| {
237                Value::error(chain_error_with_input(error, value.is_error(), head), head)
238            }),
239
240            Err(error) => Value::error(chain_error_with_input(error, value.is_error(), head), head),
241        }
242    })
243    .map(move |it| Value::list(it, head))
244}
245
#[cfg(test)]
mod test {
    use super::*;
    use crate::test_examples;

    /// Runs every documented example of `chunk-by` and checks its result.
    #[test]
    fn test_examples() {
        test_examples(ChunkBy)
    }
}