nu_command/filters/
interleave.rs

1use nu_engine::{ClosureEvalOnce, command_prelude::*};
2use nu_protocol::{engine::Closure, shell_error::io::IoError};
3use std::{sync::mpsc, thread};
4
/// The `interleave` command: a stateless marker type whose behavior is
/// provided entirely by its `Command` implementation.
#[derive(Clone)]
pub struct Interleave;
7
8impl Command for Interleave {
9    fn name(&self) -> &str {
10        "interleave"
11    }
12
13    fn description(&self) -> &str {
14        "Read multiple streams in parallel and combine them into one stream."
15    }
16
17    fn extra_description(&self) -> &str {
18        r#"This combinator is useful for reading output from multiple commands.
19
20If input is provided to `interleave`, the input will be combined with the
21output of the closures. This enables `interleave` to be used at any position
22within a pipeline.
23
24Because items from each stream will be inserted into the final stream as soon
25as they are available, there is no guarantee of how the final output will be
26ordered. However, the order of items from any given stream is guaranteed to be
27preserved as they were in that stream.
28
29If interleaving streams in a fair (round-robin) manner is desired, consider
30using `zip { ... } | flatten` instead."#
31    }
32
33    fn signature(&self) -> Signature {
34        Signature::build("interleave")
35            .input_output_types(vec![
36                (Type::List(Type::Any.into()), Type::List(Type::Any.into())),
37                (Type::Nothing, Type::List(Type::Any.into())),
38            ])
39            .named(
40                "buffer-size",
41                SyntaxShape::Int,
42                "Number of items to buffer from the streams. Increases memory usage, but can help \
43                    performance when lots of output is produced.",
44                Some('b'),
45            )
46            .rest(
47                "closures",
48                SyntaxShape::Closure(None),
49                "The closures that will generate streams to be combined.",
50            )
51            .allow_variants_without_examples(true)
52            .category(Category::Filters)
53    }
54
55    fn examples(&self) -> Vec<Example<'_>> {
56        vec![
57            Example {
58                example: "seq 1 50 | wrap a | interleave { seq 1 50 | wrap b }",
59                description: r#"Read two sequences of numbers into separate columns of a table.
60Note that the order of rows with 'a' columns and rows with 'b' columns is arbitrary."#,
61                result: None,
62            },
63            Example {
64                example: "seq 1 3 | interleave { seq 4 6 } | sort",
65                description: "Read two sequences of numbers, one from input. Sort for consistency.",
66                result: Some(Value::test_list(vec![
67                    Value::test_int(1),
68                    Value::test_int(2),
69                    Value::test_int(3),
70                    Value::test_int(4),
71                    Value::test_int(5),
72                    Value::test_int(6),
73                ])),
74            },
75            Example {
76                example: r#"interleave { "foo\nbar\n" | lines } { "baz\nquux\n" | lines } | sort"#,
77                description: "Read two sequences, but without any input. Sort for consistency.",
78                result: Some(Value::test_list(vec![
79                    Value::test_string("bar"),
80                    Value::test_string("baz"),
81                    Value::test_string("foo"),
82                    Value::test_string("quux"),
83                ])),
84            },
85            Example {
86                example: r#"(
87interleave
88    { nu -c "print hello; print world" | lines | each { "greeter: " ++ $in } }
89    { nu -c "print nushell; print rocks" | lines | each { "evangelist: " ++ $in } }
90)"#,
91                description: "Run two commands in parallel and annotate their output.",
92                result: None,
93            },
94            Example {
95                example: "seq 1 20000 | interleave --buffer-size 16 { seq 1 20000 } | math sum",
96                description: "Use a buffer to increase the performance of high-volume streams.",
97                result: None,
98            },
99        ]
100    }
101
102    fn run(
103        &self,
104        engine_state: &EngineState,
105        stack: &mut Stack,
106        call: &Call,
107        input: PipelineData,
108    ) -> Result<PipelineData, ShellError> {
109        let head = call.head;
110        let closures: Vec<Closure> = call.rest(engine_state, stack, 0)?;
111        let buffer_size: usize = call
112            .get_flag(engine_state, stack, "buffer-size")?
113            .unwrap_or(0);
114
115        let (tx, rx) = mpsc::sync_channel(buffer_size);
116
117        // Spawn the threads for the input and closure outputs
118        (!input.is_nothing())
119            .then(|| Ok(input))
120            .into_iter()
121            .chain(closures.into_iter().map(|closure| {
122                ClosureEvalOnce::new(engine_state, stack, closure)
123                    .run_with_input(PipelineData::empty())
124            }))
125            .try_for_each(|stream| {
126                stream.and_then(|stream| {
127                    // Then take the stream and spawn a thread to send it to our channel
128                    let tx = tx.clone();
129                    thread::Builder::new()
130                        .name("interleave consumer".into())
131                        .spawn(move || {
132                            for value in stream {
133                                if tx.send(value).is_err() {
134                                    // Stop sending if the channel is dropped
135                                    break;
136                                }
137                            }
138                        })
139                        .map(|_| ())
140                        .map_err(|err| IoError::new(err, head, None).into())
141                })
142            })?;
143
144        // Now that threads are writing to the channel, we just return it as a stream
145        Ok(rx
146            .into_iter()
147            .into_pipeline_data(head, engine_state.signals().clone()))
148    }
149}
150
#[cfg(test)]
mod test {
    use super::*;

    /// Passes the command to the crate's `test_examples` helper.
    #[test]
    fn test_examples() {
        use crate::test_examples;

        test_examples(Interleave {})
    }
}