flow_mel/vec/
void.rs

1use melodium_core::*;
2use melodium_macro::{check, mel_treatment};
3
4/// Flatten a stream of `Vec<void>`.
5///
6/// All the input vectors are turned into continuous stream of scalar values, keeping order.
7/// ```mermaid
8/// graph LR
9///     T("flatten()")
10///     B["[🟦 🟦][🟦][🟦 🟦 🟦]"] -->|vector| T
11///     
12///     T -->|value| O["🟦 🟦 🟦 🟦 🟦 🟦"]
13///
14///     style B fill:#ffff,stroke:#ffff
15///     style O fill:#ffff,stroke:#ffff
16/// ```
17#[mel_treatment(
18    input vector Stream<Vec<void>>
19    output value Stream<void>
20)]
21pub async fn flatten() {
22    'main: while let Ok(vectors) = vector.recv_vec_void().await {
23        for vec in vectors {
24            check!('main, value.send_void(vec).await)
25        }
26    }
27}
28
29/// Chain two streams of `Vec<void>`.
30///
31///
32/// ```mermaid
33/// graph LR
34///     T("chain()")
35///     A["[🟨 🟨][🟨 🟨 🟨][🟨]"] -->|first| T
36///     B["…[🟪][🟪 🟪]"] -->|second| T
37///     
38///     T -->|chained| O["…[🟪][🟪 🟪][🟨 🟨][🟨 🟨 🟨][🟨]"]
39///
40///     style A fill:#ffff,stroke:#ffff
41///     style B fill:#ffff,stroke:#ffff
42///     style O fill:#ffff,stroke:#ffff
43/// ```
44#[mel_treatment(
45    input first Stream<Vec<void>>
46    input second Stream<Vec<void>>
47    output chained Stream<Vec<void>>
48)]
49pub async fn chain() {
50    while let Ok(vectors) = first.recv_vec_void().await {
51        check!(chained.send_vec_void(vectors).await)
52    }
53
54    while let Ok(vectors) = second.recv_vec_void().await {
55        check!(chained.send_vec_void(vectors).await)
56    }
57}
58
59/// Merge two streams of `Vec<void>`.
60///
61/// The two streams are merged using the `select` stream:
62/// - when `true`, vector from `a` is used;
63/// - when `false`, vector from `b` is used.
64///
65/// ℹ️ No vector from either `a` or `b` are discarded, they are used when `select` give turn.
66///
67/// ⚠️ When `select` ends merge terminates without treating the remaining vectors from `a` and `b`.
68/// When `select` give turn to `a` or `b` while the concerned stream is ended, the merge terminates.
69/// Merge continues as long as `select` and concerned stream does, while the other can be ended.
70///
71/// ```mermaid
72/// graph LR
73///     T("merge()")
74///     A["…[🟪 🟪 🟪][🟪 🟪]…"] -->|a| T
75///     B["…[🟨 🟨][🟨][🟨 🟨 🟨]…"] -->|b| T
76///     O["… 🟩 🟥 🟥 🟩 🟥 …"] -->|select|T
77///     
78///
79///     T -->|value| V["…[🟪 🟪 🟪][🟨 🟨][🟨][🟪 🟪][🟨 🟨 🟨]…"]
80///
81///     style V fill:#ffff,stroke:#ffff
82///     style O fill:#ffff,stroke:#ffff
83///     style A fill:#ffff,stroke:#ffff
84///     style B fill:#ffff,stroke:#ffff
85/// ```
86#[mel_treatment(
87    input a Stream<Vec<void>>
88    input b Stream<Vec<void>>
89    input select Stream<bool>
90    output value Stream<Vec<void>>
91)]
92pub async fn merge() {
93    while let Ok(select) = select.recv_one_bool().await {
94        let val;
95        if select {
96            if let Ok(v) = a.recv_one_vec_void().await {
97                val = v;
98            } else {
99                break;
100            }
101        } else {
102            if let Ok(v) = b.recv_one_vec_void().await {
103                val = v;
104            } else {
105                break;
106            }
107        }
108
109        check!(value.send_one_vec_void(val).await)
110    }
111}
112
113/// Filter a `Vec<void>` stream according to `bool` stream.
114///
115/// ℹ️ If both streams are not the same size nothing is sent through accepted nor rejected.
116///  
117/// ```mermaid
118/// graph LR
119///     T("filter()")
120///     V["…[🟪 🟪 🟪][🟨 🟨][🟨][🟪 🟪][🟨 🟨 🟨]…"] -->|value| T
121///     D["… 🟩 🟥 🟥 🟩 🟥 …"] -->|select|T
122///     
123///     T -->|accepted| A["…[🟪 🟪 🟪][🟪 🟪]…"]
124///     T -->|rejected| R["…[🟨 🟨][🟨][🟨 🟨 🟨]…"]
125///
126///     style V fill:#ffff,stroke:#ffff
127///     style D fill:#ffff,stroke:#ffff
128///     style A fill:#ffff,stroke:#ffff
129///     style R fill:#ffff,stroke:#ffff
130/// ```
131#[mel_treatment(
132    input value Stream<Vec<void>>
133    input select Stream<bool>
134    output accepted Stream<Vec<void>>
135    output rejected Stream<Vec<void>>
136)]
137pub async fn filter() {
138    let mut accepted_op = true;
139    let mut rejected_op = true;
140
141    while let (Ok(value), Ok(select)) =
142        futures::join!(value.recv_one_vec_void(), select.recv_one_bool())
143    {
144        if select {
145            if let Err(_) = accepted.send_one_vec_void(value).await {
146                // If we cannot send anymore on accepted, we note it,
147                // and check if rejected is still valid, else just terminate.
148                accepted_op = false;
149                if !rejected_op {
150                    break;
151                }
152            }
153        } else {
154            if let Err(_) = rejected.send_one_vec_void(value).await {
155                // If we cannot send anymore on rejected, we note it,
156                // and check if accepted is still valid, else just terminate.
157                rejected_op = false;
158                if !accepted_op {
159                    break;
160                }
161            }
162        }
163    }
164}
165
166/// Trigger on `Vec<void>` stream start and end.
167///
168/// Emit `start` when a first value is send through the stream.
169/// Emit `end` when stream is finally over.
170///
171/// Emit `first` with the first vector coming in the stream.
172/// Emit `last` with the last vector coming in the stream.
173///
174/// ℹ️ `start` and `first` are always emitted together.
175/// If the stream only contains one vector, `first` and `last` both contains it.
176/// If the stream never transmit any data before being ended, only `end` is emitted.
177///
178/// ```mermaid
179/// graph LR
180///     T("trigger()")
181///     B["[🟥 🟥] … [🟨 🟨] [🟨 🟨] [🟨 🟨] … [🟩 🟩]"] -->|stream| T
182///     
183///     T -->|start| S["〈🟦〉"]
184///     T -->|first| F["〈[🟩 🟩]〉"]
185///     T -->|last| L["〈[🟥 🟥]〉"]
186///     T -->|end| E["〈🟦〉"]
187///
188///     style B fill:#ffff,stroke:#ffff
189///     style S fill:#ffff,stroke:#ffff
190///     style F fill:#ffff,stroke:#ffff
191///     style L fill:#ffff,stroke:#ffff
192///     style E fill:#ffff,stroke:#ffff
193/// ```
194#[mel_treatment(
195    input stream Stream<Vec<void>>
196    output start Block<void>
197    output end Block<void>
198    output first Block<Vec<void>>
199    output last Block<Vec<void>>
200)]
201pub async fn trigger() {
202    let mut last_value = None;
203
204    if let Ok(values) = stream.recv_vec_void().await {
205        let _ = start.send_one_void(()).await;
206        if let Some(val) = values.first().cloned() {
207            let _ = first.send_one_vec_void(val).await;
208        }
209        last_value = values.last().cloned();
210        let _ = futures::join!(start.close(), first.close());
211    }
212
213    while let Ok(values) = stream.recv_vec_void().await {
214        last_value = values.last().cloned();
215    }
216
217    let _ = end.send_one_void(()).await;
218    if let Some(val) = last_value {
219        let _ = last.send_one_vec_void(val).await;
220    }
221
222    // We don't close `end` and `last` explicitly here,
223    // because it would be redundant with boilerplate
224    // implementation of treatments.
225}
226
227/// Stream a block `Vec<void>` element.
228///
229/// ```mermaid
230/// graph LR
231///     T("stream()")
232///     B["〈[🟦]〉"] -->|block| T
233///         
234///     T -->|stream| S["[🟦]"]
235///     
236///     
237///     style B fill:#ffff,stroke:#ffff
238///     style S fill:#ffff,stroke:#ffff
239/// ```
240#[mel_treatment(
241    input block Block<Vec<void>>
242    output stream Stream<Vec<void>>
243)]
244pub async fn stream() {
245    if let Ok(val) = block.recv_one_vec_void().await {
246        let _ = stream.send_one_vec_void(val).await;
247    }
248}
249
250/// Emit a block `Vec<void>` value.
251///
252/// When `trigger` is enabled, `value` is emitted as block.
253///
254/// ```mermaid
255/// graph LR
256///     T("emit(value=[🟨])")
257///     B["〈🟦〉"] -->|trigger| T
258///         
259///     T -->|emit| S["〈[🟨]〉"]
260///     
261///     style B fill:#ffff,stroke:#ffff
262///     style S fill:#ffff,stroke:#ffff
263/// ```
264#[mel_treatment(
265    input trigger Block<void>
266    output emit Block<Vec<void>>
267)]
268pub async fn emit(value: Vec<void>) {
269    if let Ok(_) = trigger.recv_one_void().await {
270        let _ = emit.send_one_vec_void(value).await;
271    }
272}
273
274/// Gives count of elements passing through stream.
275///
276/// This count increment one for each vector within the stream, starting at 1.
277/// ℹ️ The count is independant from vector sizes.
278///
279/// ```mermaid
280/// graph LR
281///     T("count()")
282///     V["[🟦 🟦][🟦][🟦 🟦 🟦]…"] -->|stream| T
283///     
284///     T -->|count| P["1️⃣ 2️⃣ 3️⃣ …"]
285///
286///     style V fill:#ffff,stroke:#ffff
287///     style P fill:#ffff,stroke:#ffff
288/// ```
289#[mel_treatment(
290    input stream Stream<Vec<void>>
291    output count Stream<u128>
292)]
293pub async fn count() {
294    let mut i: u128 = 0;
295    while let Ok(iter) = stream.recv_vec_void().await {
296        let next_i = i + iter.len() as u128;
297        check!(count.send_u128((i..next_i).collect()).await);
298        i = next_i;
299    }
300}
301
302/// Gives size of vectors passing through stream.
303///
304/// For each vector one `size` value is sent, giving the number of elements contained within matching vector.
305///
306/// ```mermaid
307/// graph LR
308///     T("size()")
309///     V["[🟦 🟦][🟦][][🟦 🟦 🟦]…"] -->|vector| T
310///     
311///     T -->|size| P["2️⃣ 1️⃣ 0️⃣ 3️⃣ …"]
312///
313///     style V fill:#ffff,stroke:#ffff
314///     style P fill:#ffff,stroke:#ffff
315/// ```
316#[mel_treatment(
317    input vector Stream<Vec<void>>
318    output size Stream<u64>
319)]
320pub async fn size() {
321    while let Ok(iter) = vector.recv_vec_void().await {
322        check!(
323            size.send_u64(iter.into_iter().map(|v| v.len() as u64).collect())
324                .await
325        );
326    }
327}
328
329/// Resize vectors according to given streamed size.
330///
331/// ```mermaid
332/// graph LR
333///     T("resize()")
334///     V["[🟦 🟦][🟦][][🟦 🟦 🟦]…"] -->|vector| T
335///     S["3️⃣ 2️⃣ 3️⃣ 2️⃣ …"] -->|size| T
336///     
337///     T -->|resized| P["[🟦 🟦 🟦][🟦 🟦][🟦 🟦 🟦][🟦 🟦]…"]
338///
339///     style V fill:#ffff,stroke:#ffff
340///     style S fill:#ffff,stroke:#ffff
341///     style P fill:#ffff,stroke:#ffff
342/// ```
343#[mel_treatment(
344    input vector Stream<Vec<void>>
345    input size Stream<u64>
346    output resized Stream<Vec<void>>
347)]
348pub async fn resize() {
349    while let Ok(size) = size.recv_one_u64().await {
350        if let Ok(mut vec) = vector.recv_one_vec_void().await {
351            vec.resize(size as usize, ());
352            check!(resized.send_one_vec_void(vec).await);
353        } else {
354            break;
355        }
356    }
357}
358
359/// Generate a stream of empty `Vec<void>` according to a length.
360///
361/// ```mermaid
362/// graph LR
363///     T("generate()")
364///     B["〈🟨〉"] -->|length| T
365///         
366///     T -->|stream| S["… [][][][][]"]
367///     
368///     
369///     style B fill:#ffff,stroke:#ffff
370///     style S fill:#ffff,stroke:#ffff
371/// ```
372#[mel_treatment(
373    input length Block<u128>
374    output stream Stream<Vec<void>>
375)]
376pub async fn generate() {
377    if let Ok(length) = length.recv_one_u128().await {
378        const CHUNK: u128 = 2u128.pow(20);
379        let mut total = 0u128;
380        while total < length {
381            let chunk = u128::min(CHUNK, length - total) as usize;
382            check!(stream.send_vec_void(vec![vec![]; chunk]).await);
383            total += chunk as u128;
384        }
385    }
386}
387
388/// Generate a stream of empty `Vec<void>` indefinitely.
389///
390/// This generates a continuous stream of `Vec<void>`, until stream consumers closes it.
391///
392/// ```mermaid
393/// graph LR
394///     T("generateIndefinitely()")
395///     B["〈🟦〉"] -->|trigger| T
396///         
397///     T -->|stream| S["… [][][][][]"]
398///     
399///     
400///     style B fill:#ffff,stroke:#ffff
401///     style S fill:#ffff,stroke:#ffff
402/// ```
403#[mel_treatment(
404    input trigger Block<void>
405    output stream Stream<Vec<void>>
406)]
407pub async fn generate_indefinitely() {
408    if let Ok(_) = trigger.recv_one_void().await {
409        const CHUNK: usize = 2usize.pow(20);
410        loop {
411            check!(stream.send_vec_void(vec![vec![]; CHUNK]).await);
412        }
413    }
414}