1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
use std::{sync::mpsc, thread};

use nu_engine::{eval_block_with_early_return, CallExt};
use nu_protocol::{
    ast::Call,
    engine::{Closure, Command, EngineState, Stack},
    Category, Example, IntoInterruptiblePipelineData, PipelineData, ShellError, Signature,
    SyntaxShape, Type, Value,
};

/// The `interleave` command: reads multiple streams in parallel and combines
/// them into one stream.
///
/// This is a stateless unit type; all behavior lives in its `Command`
/// implementation.
#[derive(Clone)]
pub struct Interleave;

impl Command for Interleave {
    fn name(&self) -> &str {
        "interleave"
    }

    fn usage(&self) -> &str {
        "Read multiple streams in parallel and combine them into one stream."
    }

    fn extra_usage(&self) -> &str {
        r#"This combinator is useful for reading output from multiple commands.

If input is provided to `interleave`, the input will be combined with the
output of the closures. This enables `interleave` to be used at any position
within a pipeline.

Because items from each stream will be inserted into the final stream as soon
as they are available, there is no guarantee of how the final output will be
ordered. However, the order of items from any given stream is guaranteed to be
preserved as they were in that stream.

If interleaving streams in a fair (round-robin) manner is desired, consider
using `zip { ... } | flatten` instead."#
    }

    fn signature(&self) -> Signature {
        Signature::build("interleave")
            .input_output_types(vec![
                (Type::List(Type::Any.into()), Type::List(Type::Any.into())),
                (Type::Nothing, Type::List(Type::Any.into())),
            ])
            .named(
                "buffer-size",
                SyntaxShape::Int,
                "Number of items to buffer from the streams. Increases memory usage, but can help \
                    performance when lots of output is produced.",
                Some('b'),
            )
            .rest(
                "closures",
                SyntaxShape::Closure(None),
                "The closures that will generate streams to be combined.",
            )
            .allow_variants_without_examples(true)
            .category(Category::Filters)
    }

    fn examples(&self) -> Vec<Example> {
        vec![
            Example {
                example: "seq 1 50 | wrap a | interleave { seq 1 50 | wrap b }",
                description: r#"Read two sequences of numbers into separate columns of a table.
Note that the order of rows with 'a' columns and rows with 'b' columns is arbitrary."#,
                result: None,
            },
            Example {
                example: "seq 1 3 | interleave { seq 4 6 } | sort",
                description: "Read two sequences of numbers, one from input. Sort for consistency.",
                result: Some(Value::test_list(vec![
                    Value::test_int(1),
                    Value::test_int(2),
                    Value::test_int(3),
                    Value::test_int(4),
                    Value::test_int(5),
                    Value::test_int(6),
                ])),
            },
            Example {
                example: r#"interleave { "foo\nbar\n" | lines } { "baz\nquux\n" | lines } | sort"#,
                description: "Read two sequences, but without any input. Sort for consistency.",
                result: Some(Value::test_list(vec![
                    Value::test_string("bar"),
                    Value::test_string("baz"),
                    Value::test_string("foo"),
                    Value::test_string("quux"),
                ])),
            },
            Example {
                example: r#"(
interleave
    { nu -c "print hello; print world" | lines | each { "greeter: " ++ $in } }
    { nu -c "print nushell; print rocks" | lines | each { "evangelist: " ++ $in } }
)"#,
                description: "Run two commands in parallel and annotate their output.",
                result: None,
            },
            Example {
                example: "seq 1 20000 | interleave --buffer-size 16 { seq 1 20000 } | math sum",
                description: "Use a buffer to increase the performance of high-volume streams.",
                result: None,
            },
        ]
    }

    fn run(
        &self,
        engine_state: &EngineState,
        stack: &mut Stack,
        call: &Call,
        input: PipelineData,
    ) -> Result<PipelineData, ShellError> {
        let buffer_size: usize = call
            .get_flag(engine_state, stack, "buffer-size")?
            .unwrap_or(0);
        let (tx, rx) = mpsc::sync_channel(buffer_size);

        let closures: Vec<Closure> = call.rest(engine_state, stack, 0)?;

        // Spawn the threads for the input and closure outputs
        (!input.is_nothing())
            .then(|| Ok(input))
            .into_iter()
            .chain(closures.into_iter().map(|closure| {
                // Evaluate the closure on this thread
                let block = engine_state.get_block(closure.block_id);
                let mut stack = stack.captures_to_stack(closure.captures);
                eval_block_with_early_return(
                    engine_state,
                    &mut stack,
                    block,
                    PipelineData::Empty,
                    true,
                    false,
                )
            }))
            .try_for_each(|stream| {
                stream.and_then(|stream| {
                    // Then take the stream and spawn a thread to send it to our channel
                    let tx = tx.clone();
                    thread::Builder::new()
                        .name("interleave consumer".into())
                        .spawn(move || {
                            for value in stream {
                                if tx.send(value).is_err() {
                                    // Stop sending if the channel is dropped
                                    break;
                                }
                            }
                        })
                        .map(|_| ())
                        .map_err(|err| ShellError::IOErrorSpanned {
                            msg: err.to_string(),
                            span: call.head,
                        })
                })
            })?;

        // Now that threads are writing to the channel, we just return it as a stream
        Ok(rx
            .into_iter()
            .into_pipeline_data(engine_state.ctrlc.clone()))
    }
}

#[cfg(test)]
mod test {
    use super::*;

    /// Runs the crate's example checker against the `interleave` command.
    #[test]
    fn test_examples() {
        // Called by full path: a module-level `use` of this helper would clash
        // with the name of this test function.
        crate::test_examples(Interleave)
    }
}