nu_command/filters/
interleave.rs1use nu_engine::{ClosureEvalOnce, command_prelude::*};
2use nu_protocol::{engine::Closure, shell_error::io::IoError};
3use std::{sync::mpsc, thread};
4
/// Unit struct backing the `interleave` command, which reads multiple streams in
/// parallel and combines them into one stream.
#[derive(Clone)]
pub struct Interleave;
7
8impl Command for Interleave {
9 fn name(&self) -> &str {
10 "interleave"
11 }
12
13 fn description(&self) -> &str {
14 "Read multiple streams in parallel and combine them into one stream."
15 }
16
17 fn extra_description(&self) -> &str {
18 r#"This combinator is useful for reading output from multiple commands.
19
20If input is provided to `interleave`, the input will be combined with the
21output of the closures. This enables `interleave` to be used at any position
22within a pipeline.
23
24Because items from each stream will be inserted into the final stream as soon
25as they are available, there is no guarantee of how the final output will be
26ordered. However, the order of items from any given stream is guaranteed to be
27preserved as they were in that stream.
28
29If interleaving streams in a fair (round-robin) manner is desired, consider
30using `zip { ... } | flatten` instead."#
31 }
32
33 fn signature(&self) -> Signature {
34 Signature::build("interleave")
35 .input_output_types(vec![
36 (Type::List(Type::Any.into()), Type::List(Type::Any.into())),
37 (Type::Nothing, Type::List(Type::Any.into())),
38 ])
39 .named(
40 "buffer-size",
41 SyntaxShape::Int,
42 "Number of items to buffer from the streams. Increases memory usage, but can help \
43 performance when lots of output is produced.",
44 Some('b'),
45 )
46 .rest(
47 "closures",
48 SyntaxShape::Closure(None),
49 "The closures that will generate streams to be combined.",
50 )
51 .allow_variants_without_examples(true)
52 .category(Category::Filters)
53 }
54
55 fn examples(&self) -> Vec<Example<'_>> {
56 vec![
57 Example {
58 example: "seq 1 50 | wrap a | interleave { seq 1 50 | wrap b }",
59 description: r#"Read two sequences of numbers into separate columns of a table.
60Note that the order of rows with 'a' columns and rows with 'b' columns is arbitrary."#,
61 result: None,
62 },
63 Example {
64 example: "seq 1 3 | interleave { seq 4 6 } | sort",
65 description: "Read two sequences of numbers, one from input. Sort for consistency.",
66 result: Some(Value::test_list(vec![
67 Value::test_int(1),
68 Value::test_int(2),
69 Value::test_int(3),
70 Value::test_int(4),
71 Value::test_int(5),
72 Value::test_int(6),
73 ])),
74 },
75 Example {
76 example: r#"interleave { "foo\nbar\n" | lines } { "baz\nquux\n" | lines } | sort"#,
77 description: "Read two sequences, but without any input. Sort for consistency.",
78 result: Some(Value::test_list(vec![
79 Value::test_string("bar"),
80 Value::test_string("baz"),
81 Value::test_string("foo"),
82 Value::test_string("quux"),
83 ])),
84 },
85 Example {
86 example: r#"(
87interleave
88 { nu -c "print hello; print world" | lines | each { "greeter: " ++ $in } }
89 { nu -c "print nushell; print rocks" | lines | each { "evangelist: " ++ $in } }
90)"#,
91 description: "Run two commands in parallel and annotate their output.",
92 result: None,
93 },
94 Example {
95 example: "seq 1 20000 | interleave --buffer-size 16 { seq 1 20000 } | math sum",
96 description: "Use a buffer to increase the performance of high-volume streams.",
97 result: None,
98 },
99 ]
100 }
101
102 fn run(
103 &self,
104 engine_state: &EngineState,
105 stack: &mut Stack,
106 call: &Call,
107 input: PipelineData,
108 ) -> Result<PipelineData, ShellError> {
109 let head = call.head;
110 let closures: Vec<Closure> = call.rest(engine_state, stack, 0)?;
111 let buffer_size: usize = call
112 .get_flag(engine_state, stack, "buffer-size")?
113 .unwrap_or(0);
114
115 let (tx, rx) = mpsc::sync_channel(buffer_size);
116
117 (!input.is_nothing())
119 .then(|| Ok(input))
120 .into_iter()
121 .chain(closures.into_iter().map(|closure| {
122 ClosureEvalOnce::new(engine_state, stack, closure)
123 .run_with_input(PipelineData::empty())
124 }))
125 .try_for_each(|stream| {
126 stream.and_then(|stream| {
127 let tx = tx.clone();
129 thread::Builder::new()
130 .name("interleave consumer".into())
131 .spawn(move || {
132 for value in stream {
133 if tx.send(value).is_err() {
134 break;
136 }
137 }
138 })
139 .map(|_| ())
140 .map_err(|err| IoError::new(err, head, None).into())
141 })
142 })?;
143
144 Ok(rx
146 .into_iter()
147 .into_pipeline_data(head, engine_state.signals().clone()))
148 }
149}
150
#[cfg(test)]
mod test {
    use super::Interleave;

    /// Passes the command to the crate-level `test_examples` helper
    /// (presumably it checks the declared examples; confirm in `crate::test_examples`).
    #[test]
    fn test_examples() {
        crate::test_examples(Interleave)
    }
}