flow_mel/vec/
i8.rs

1use melodium_core::*;
2use melodium_macro::{check, mel_treatment};
3
4/// Flatten a stream of `Vec<i8>`.
5///
6/// All the input vectors are turned into continuous stream of scalar values, keeping order.
7/// ```mermaid
8/// graph LR
9///     T("flatten()")
10///     B["[🟦 🟦][🟦][🟦 🟦 🟦]"] -->|vector| T
11///     
12///     T -->|value| O["🟦 🟦 🟦 🟦 🟦 🟦"]
13///
14///     style B fill:#ffff,stroke:#ffff
15///     style O fill:#ffff,stroke:#ffff
16/// ```
17#[mel_treatment(
18    input vector Stream<Vec<i8>>
19    output value Stream<i8>
20)]
21pub async fn flatten() {
22    'main: while let Ok(vectors) = vector.recv_vec_i8().await {
23        for vec in vectors {
24            check!('main, value.send_i8(vec).await)
25        }
26    }
27}
28
29/// Chain two streams of `Vec<i8>`.
30///
31///
32/// ```mermaid
33/// graph LR
34///     T("chain()")
35///     A["[🟨 🟨][🟨 🟨 🟨][🟨]"] -->|first| T
36///     B["…[🟪][🟪 🟪]"] -->|second| T
37///     
38///     T -->|chained| O["…[🟪][🟪 🟪][🟨 🟨][🟨 🟨 🟨][🟨]"]
39///
40///     style A fill:#ffff,stroke:#ffff
41///     style B fill:#ffff,stroke:#ffff
42///     style O fill:#ffff,stroke:#ffff
43/// ```
44#[mel_treatment(
45    input first Stream<Vec<i8>>
46    input second Stream<Vec<i8>>
47    output chained Stream<Vec<i8>>
48)]
49pub async fn chain() {
50    while let Ok(vectors) = first.recv_vec_i8().await {
51        check!(chained.send_vec_i8(vectors).await)
52    }
53
54    while let Ok(vectors) = second.recv_vec_i8().await {
55        check!(chained.send_vec_i8(vectors).await)
56    }
57}
58
59/// Merge two streams of `Vec<i8>`.
60///
61/// The two streams are merged using the `select` stream:
62/// - when `true`, vector from `a` is used;
63/// - when `false`, vector from `b` is used.
64///
65/// ℹ️ No vector from either `a` or `b` are discarded, they are used when `select` give turn.
66///
67/// ⚠️ When `select` ends merge terminates without treating the remaining vectors from `a` and `b`.
68/// When `select` give turn to `a` or `b` while the concerned stream is ended, the merge terminates.
69/// Merge continues as long as `select` and concerned stream does, while the other can be ended.
70///
71/// ```mermaid
72/// graph LR
73///     T("merge()")
74///     A["…[🟪 🟪 🟪][🟪 🟪]…"] -->|a| T
75///     B["…[🟨 🟨][🟨][🟨 🟨 🟨]…"] -->|b| T
76///     O["… 🟩 🟥 🟥 🟩 🟥 …"] -->|select|T
77///     
78///
79///     T -->|value| V["…[🟪 🟪 🟪][🟨 🟨][🟨][🟪 🟪][🟨 🟨 🟨]…"]
80///
81///     style V fill:#ffff,stroke:#ffff
82///     style O fill:#ffff,stroke:#ffff
83///     style A fill:#ffff,stroke:#ffff
84///     style B fill:#ffff,stroke:#ffff
85/// ```
86#[mel_treatment(
87    input a Stream<Vec<i8>>
88    input b Stream<Vec<i8>>
89    input select Stream<bool>
90    output value Stream<Vec<i8>>
91)]
92pub async fn merge() {
93    while let Ok(select) = select.recv_one_bool().await {
94        let val;
95        if select {
96            if let Ok(v) = a.recv_one_vec_i8().await {
97                val = v;
98            } else {
99                break;
100            }
101        } else {
102            if let Ok(v) = b.recv_one_vec_i8().await {
103                val = v;
104            } else {
105                break;
106            }
107        }
108
109        check!(value.send_one_vec_i8(val).await)
110    }
111}
112
113/// Filter a `Vec<i8>` stream according to `bool` stream.
114///
115/// ℹ️ If both streams are not the same size nothing is sent through accepted nor rejected.
116///  
117/// ```mermaid
118/// graph LR
119///     T("filter()")
120///     V["…[🟪 🟪 🟪][🟨 🟨][🟨][🟪 🟪][🟨 🟨 🟨]…"] -->|value| T
121///     D["… 🟩 🟥 🟥 🟩 🟥 …"] -->|select|T
122///     
123///     T -->|accepted| A["…[🟪 🟪 🟪][🟪 🟪]…"]
124///     T -->|rejected| R["…[🟨 🟨][🟨][🟨 🟨 🟨]…"]
125///
126///     style V fill:#ffff,stroke:#ffff
127///     style D fill:#ffff,stroke:#ffff
128///     style A fill:#ffff,stroke:#ffff
129///     style R fill:#ffff,stroke:#ffff
130/// ```
131#[mel_treatment(
132    input value Stream<Vec<i8>>
133    input select Stream<bool>
134    output accepted Stream<Vec<i8>>
135    output rejected Stream<Vec<i8>>
136)]
137pub async fn filter() {
138    let mut accepted_op = true;
139    let mut rejected_op = true;
140
141    while let (Ok(value), Ok(select)) =
142        futures::join!(value.recv_one_vec_i8(), select.recv_one_bool())
143    {
144        if select {
145            if let Err(_) = accepted.send_one_vec_i8(value).await {
146                // If we cannot send anymore on accepted, we note it,
147                // and check if rejected is still valid, else just terminate.
148                accepted_op = false;
149                if !rejected_op {
150                    break;
151                }
152            }
153        } else {
154            if let Err(_) = rejected.send_one_vec_i8(value).await {
155                // If we cannot send anymore on rejected, we note it,
156                // and check if accepted is still valid, else just terminate.
157                rejected_op = false;
158                if !accepted_op {
159                    break;
160                }
161            }
162        }
163    }
164}
165
166/// Trigger on `Vec<i8>` stream start and end.
167///
168/// Emit `start` when a first value is send through the stream.
169/// Emit `end` when stream is finally over.
170///
171/// Emit `first` with the first vector coming in the stream.
172/// Emit `last` with the last vector coming in the stream.
173///
174/// ℹ️ `start` and `first` are always emitted together.
175/// If the stream only contains one vector, `first` and `last` both contains it.
176/// If the stream never transmit any data before being ended, only `end` is emitted.
177///
178/// ```mermaid
179/// graph LR
180///     T("trigger()")
181///     B["[🟥 🟥] … [🟨 🟨] [🟨 🟨] [🟨 🟨] … [🟩 🟩]"] -->|stream| T
182///     
183///     T -->|start| S["〈🟦〉"]
184///     T -->|first| F["〈[🟩 🟩]〉"]
185///     T -->|last| L["〈[🟥 🟥]〉"]
186///     T -->|end| E["〈🟦〉"]
187///
188///     style B fill:#ffff,stroke:#ffff
189///     style S fill:#ffff,stroke:#ffff
190///     style F fill:#ffff,stroke:#ffff
191///     style L fill:#ffff,stroke:#ffff
192///     style E fill:#ffff,stroke:#ffff
193/// ```
194#[mel_treatment(
195    input stream Stream<Vec<i8>>
196    output start Block<void>
197    output end Block<void>
198    output first Block<Vec<i8>>
199    output last Block<Vec<i8>>
200)]
201pub async fn trigger() {
202    let mut last_value = None;
203
204    if let Ok(values) = stream.recv_vec_i8().await {
205        let _ = start.send_one_void(()).await;
206        if let Some(val) = values.first().cloned() {
207            let _ = first.send_one_vec_i8(val).await;
208        }
209        last_value = values.last().cloned();
210        let _ = futures::join!(start.close(), first.close());
211    }
212
213    while let Ok(values) = stream.recv_vec_i8().await {
214        last_value = values.last().cloned();
215    }
216
217    let _ = end.send_one_void(()).await;
218    if let Some(val) = last_value {
219        let _ = last.send_one_vec_i8(val).await;
220    }
221
222    // We don't close `end` and `last` explicitly here,
223    // because it would be redundant with boilerplate
224    // implementation of treatments.
225}
226
227/// Stream a block `Vec<i8>` element.
228///
229/// ```mermaid
230/// graph LR
231///     T("stream()")
232///     B["〈[🟦]〉"] -->|block| T
233///         
234///     T -->|stream| S["[🟦]"]
235///     
236///     
237///     style B fill:#ffff,stroke:#ffff
238///     style S fill:#ffff,stroke:#ffff
239/// ```
240#[mel_treatment(
241    input block Block<Vec<i8>>
242    output stream Stream<Vec<i8>>
243)]
244pub async fn stream() {
245    if let Ok(val) = block.recv_one_vec_i8().await {
246        let _ = stream.send_one_vec_i8(val).await;
247    }
248}
249
250/// Emit a block `Vec<i8>` value.
251///
252/// When `trigger` is enabled, `value` is emitted as block.
253///
254/// ```mermaid
255/// graph LR
256///     T("emit(value=[🟨])")
257///     B["〈🟦〉"] -->|trigger| T
258///         
259///     T -->|emit| S["〈[🟨]〉"]
260///     
261///     style B fill:#ffff,stroke:#ffff
262///     style S fill:#ffff,stroke:#ffff
263/// ```
264#[mel_treatment(
265    input trigger Block<void>
266    output emit Block<Vec<i8>>
267)]
268pub async fn emit(value: Vec<i8>) {
269    if let Ok(_) = trigger.recv_one_void().await {
270        let _ = emit.send_one_vec_i8(value).await;
271    }
272}
273
274/// Gives pattern of a `Vec<i8>` stream.
275///
276/// ```mermaid
277/// graph LR
278///     T("pattern()")
279///     A["…[🟨 🟨][🟨][🟨 🟨 🟨]"] -->|stream| T
280///     
281///     T -->|pattern| O["… [🟦 🟦][🟦][🟦 🟦 🟦]"]
282///
283///     style A fill:#ffff,stroke:#ffff
284///     style O fill:#ffff,stroke:#ffff
285/// ```
286#[mel_treatment(
287    input stream Stream<Vec<i8>>
288    output pattern Stream<Vec<void>>
289)]
290pub async fn pattern() {
291    while let Ok(vectors) = stream.recv_vec_i8().await {
292        check!(
293            pattern
294                .send_vec_void(vectors.into_iter().map(|vec| vec![(); vec.len()]).collect())
295                .await
296        )
297    }
298}
299
300/// Fit a stream of `i8` into stream of `Vec<i8>`, using a pattern.
301///
302/// ℹ️ If some remaining values doesn't fit into the pattern, they are trashed.
303/// If there are not enough values to fit the pattern, uncomplete vector is trashed.
304///
305/// ```mermaid
306/// graph LR
307///     T("fit()")
308///     A["… 🟨 🟨 🟨 🟨 🟨 🟨"] -->|value| T
309///     B["[🟦 🟦][🟦][🟦 🟦 🟦]"] -->|pattern| T
310///     
311///     T -->|fitted| O["[🟨 🟨][🟨][🟨 🟨 🟨]"]
312///
313///     style A fill:#ffff,stroke:#ffff
314///     style B fill:#ffff,stroke:#ffff
315///     style O fill:#ffff,stroke:#ffff
316/// ```
317#[mel_treatment(
318    input value Stream<i8>
319    input pattern Stream<Vec<void>>
320    output fitted Stream<Vec<i8>>
321)]
322pub async fn fit() {
323    'main: while let Ok(patterns) = pattern.recv_vec_void().await {
324        for pattern in patterns {
325            let mut vector = Vec::with_capacity(pattern.len());
326            for _ in 0..pattern.len() {
327                if let Ok(val) = value.recv_one_i8().await {
328                    vector.push(val);
329                } else {
330                    // Uncomplete, we 'trash' vector
331                    break 'main;
332                }
333            }
334            check!('main, fitted.send_one_vec_i8(vector).await)
335        }
336    }
337}
338
339/// Fill a pattern stream with a `i8` value.
340///
341/// ```mermaid
342/// graph LR
343/// T("fill(value=🟧)")
344/// B["…[🟦 🟦][🟦][🟦 🟦 🟦]…"] -->|pattern| T
345///
346/// T -->|filled| O["…[🟧 🟧][🟧][🟧 🟧 🟧]…"]
347///
348/// style B fill:#ffff,stroke:#ffff
349/// style O fill:#ffff,stroke:#ffff
350/// ```
351#[mel_treatment(
352    default value 0
353    input pattern Stream<Vec<void>>
354    output filled Stream<Vec<i8>>
355)]
356pub async fn fill(value: i8) {
357    while let Ok(pat) = pattern.recv_vec_void().await {
358        check!(
359            filled
360                .send_vec_i8(
361                    pat.into_iter()
362                        .map(|p| vec![value.clone(); p.len()])
363                        .collect()
364                )
365                .await
366        )
367    }
368}
369
370/// Resize vectors according to given streamed size.
371///
372/// If a vector is smaller than expected size, it is extended using the `default` value.
373///
374/// ```mermaid
375/// graph LR
376///     T("resize(default=🟨)")
377///     V["[🟦 🟦][🟦][][🟦 🟦 🟦]…"] -->|vector| T
378///     S["3️⃣ 2️⃣ 3️⃣ 2️⃣ …"] -->|size| T
379///     
380///     T -->|resized| P["[🟦 🟦 🟨][🟦 🟨][🟨 🟨 🟨][🟦 🟦]…"]
381///
382///     style V fill:#ffff,stroke:#ffff
383///     style S fill:#ffff,stroke:#ffff
384///     style P fill:#ffff,stroke:#ffff
385/// ```
386#[mel_treatment(
387    default default 0
388    input vector Stream<Vec<i8>>
389    input size Stream<u64>
390    output resized Stream<Vec<i8>>
391)]
392pub async fn resize(default: i8) {
393    while let Ok(size) = size.recv_one_u64().await {
394        if let Ok(mut vec) = vector.recv_one_vec_i8().await {
395            vec.resize(size as usize, default.clone());
396            check!(resized.send_one_vec_i8(vec).await);
397        } else {
398            break;
399        }
400    }
401}