nu_cmd_lang/core_commands/
collect.rs

1use nu_engine::{command_prelude::*, get_eval_block, redirect_env};
2use nu_protocol::engine::Closure;
3
/// The `collect` command: gathers its pipeline input into a single value and
/// optionally hands that value to a closure.
#[derive(Clone)]
pub struct Collect;
6
7impl Command for Collect {
8    fn name(&self) -> &str {
9        "collect"
10    }
11
12    fn signature(&self) -> Signature {
13        Signature::build("collect")
14            .input_output_types(vec![(Type::Any, Type::Any)])
15            .optional(
16                "closure",
17                SyntaxShape::Closure(Some(vec![SyntaxShape::Any])),
18                "The closure to run once the stream is collected.",
19            )
20            .switch(
21                "keep-env",
22                "let the closure affect environment variables",
23                None,
24            )
25            .category(Category::Filters)
26    }
27
28    fn description(&self) -> &str {
29        "Collect a stream into a value."
30    }
31
32    fn extra_description(&self) -> &str {
33        r#"If provided, run a closure with the collected value as input.
34
35The entire stream will be collected into one value in memory, so if the stream
36is particularly large, this can cause high memory usage."#
37    }
38
39    fn run(
40        &self,
41        engine_state: &EngineState,
42        stack: &mut Stack,
43        call: &Call,
44        input: PipelineData,
45    ) -> Result<PipelineData, ShellError> {
46        let closure: Option<Closure> = call.opt(engine_state, stack, 0)?;
47
48        let metadata = input.metadata().and_then(|m| m.for_collect());
49
50        let input = input.into_value(call.head)?;
51        let result;
52
53        if let Some(closure) = closure {
54            let block = engine_state.get_block(closure.block_id);
55            let mut stack_captures =
56                stack.captures_to_stack_preserve_out_dest(closure.captures.clone());
57
58            let mut saved_positional = None;
59            if let Some(var) = block.signature.get_positional(0)
60                && let Some(var_id) = &var.var_id
61            {
62                stack_captures.add_var(*var_id, input.clone());
63                saved_positional = Some(*var_id);
64            }
65
66            let eval_block = get_eval_block(engine_state);
67
68            result = eval_block(
69                engine_state,
70                &mut stack_captures,
71                block,
72                input.into_pipeline_data_with_metadata(metadata),
73            )
74            .map(|p| p.body);
75
76            if call.has_flag(engine_state, stack, "keep-env")? {
77                redirect_env(engine_state, stack, &stack_captures);
78                // for when we support `data | let x = $in;`
79                // remove the variables added earlier
80                for (var_id, _) in closure.captures {
81                    stack_captures.remove_var(var_id);
82                }
83                if let Some(u) = saved_positional {
84                    stack_captures.remove_var(u);
85                }
86                // add any new variables to the stack
87                stack.vars.extend(stack_captures.vars);
88            }
89        } else {
90            result = Ok(input.into_pipeline_data_with_metadata(metadata));
91        }
92
93        result
94    }
95
96    fn examples(&self) -> Vec<Example<'_>> {
97        vec![
98            Example {
99                description: "Use the second value in the stream",
100                example: "[1 2 3] | collect { |x| $x.1 }",
101                result: Some(Value::test_int(2)),
102            },
103            Example {
104                description: "Read and write to the same file",
105                example: "open file.txt | collect | save -f file.txt",
106                result: None,
107            },
108        ]
109    }
110}
111
#[cfg(test)]
mod test {
    use super::Collect;

    /// Runs the command's declared examples through the crate's
    /// `test_examples` helper.
    #[test]
    fn test_examples() {
        crate::test_examples(Collect)
    }
}