flow_mel/vec/i8.rs
1use melodium_core::*;
2use melodium_macro::{check, mel_treatment};
3
4/// Flatten a stream of `Vec<i8>`.
5///
6/// All the input vectors are turned into continuous stream of scalar values, keeping order.
7/// ```mermaid
8/// graph LR
9/// T("flatten()")
10/// B["[🟦 🟦][🟦][🟦 🟦 🟦]"] -->|vector| T
11///
12/// T -->|value| O["🟦 🟦 🟦 🟦 🟦 🟦"]
13///
14/// style B fill:#ffff,stroke:#ffff
15/// style O fill:#ffff,stroke:#ffff
16/// ```
17#[mel_treatment(
18 input vector Stream<Vec<i8>>
19 output value Stream<i8>
20)]
21pub async fn flatten() {
22 'main: while let Ok(vectors) = vector.recv_vec_i8().await {
23 for vec in vectors {
24 check!('main, value.send_i8(vec).await)
25 }
26 }
27}
28
29/// Chain two streams of `Vec<i8>`.
30///
31///
32/// ```mermaid
33/// graph LR
34/// T("chain()")
35/// A["[🟨 🟨][🟨 🟨 🟨][🟨]"] -->|first| T
36/// B["…[🟪][🟪 🟪]"] -->|second| T
37///
38/// T -->|chained| O["…[🟪][🟪 🟪][🟨 🟨][🟨 🟨 🟨][🟨]"]
39///
40/// style A fill:#ffff,stroke:#ffff
41/// style B fill:#ffff,stroke:#ffff
42/// style O fill:#ffff,stroke:#ffff
43/// ```
44#[mel_treatment(
45 input first Stream<Vec<i8>>
46 input second Stream<Vec<i8>>
47 output chained Stream<Vec<i8>>
48)]
49pub async fn chain() {
50 while let Ok(vectors) = first.recv_vec_i8().await {
51 check!(chained.send_vec_i8(vectors).await)
52 }
53
54 while let Ok(vectors) = second.recv_vec_i8().await {
55 check!(chained.send_vec_i8(vectors).await)
56 }
57}
58
59/// Merge two streams of `Vec<i8>`.
60///
61/// The two streams are merged using the `select` stream:
62/// - when `true`, vector from `a` is used;
63/// - when `false`, vector from `b` is used.
64///
65/// ℹ️ No vector from either `a` or `b` are discarded, they are used when `select` give turn.
66///
67/// ⚠️ When `select` ends merge terminates without treating the remaining vectors from `a` and `b`.
68/// When `select` give turn to `a` or `b` while the concerned stream is ended, the merge terminates.
69/// Merge continues as long as `select` and concerned stream does, while the other can be ended.
70///
71/// ```mermaid
72/// graph LR
73/// T("merge()")
74/// A["…[🟪 🟪 🟪][🟪 🟪]…"] -->|a| T
75/// B["…[🟨 🟨][🟨][🟨 🟨 🟨]…"] -->|b| T
76/// O["… 🟩 🟥 🟥 🟩 🟥 …"] -->|select|T
77///
78///
79/// T -->|value| V["…[🟪 🟪 🟪][🟨 🟨][🟨][🟪 🟪][🟨 🟨 🟨]…"]
80///
81/// style V fill:#ffff,stroke:#ffff
82/// style O fill:#ffff,stroke:#ffff
83/// style A fill:#ffff,stroke:#ffff
84/// style B fill:#ffff,stroke:#ffff
85/// ```
86#[mel_treatment(
87 input a Stream<Vec<i8>>
88 input b Stream<Vec<i8>>
89 input select Stream<bool>
90 output value Stream<Vec<i8>>
91)]
92pub async fn merge() {
93 while let Ok(select) = select.recv_one_bool().await {
94 let val;
95 if select {
96 if let Ok(v) = a.recv_one_vec_i8().await {
97 val = v;
98 } else {
99 break;
100 }
101 } else {
102 if let Ok(v) = b.recv_one_vec_i8().await {
103 val = v;
104 } else {
105 break;
106 }
107 }
108
109 check!(value.send_one_vec_i8(val).await)
110 }
111}
112
113/// Filter a `Vec<i8>` stream according to `bool` stream.
114///
115/// ℹ️ If both streams are not the same size nothing is sent through accepted nor rejected.
116///
117/// ```mermaid
118/// graph LR
119/// T("filter()")
120/// V["…[🟪 🟪 🟪][🟨 🟨][🟨][🟪 🟪][🟨 🟨 🟨]…"] -->|value| T
121/// D["… 🟩 🟥 🟥 🟩 🟥 …"] -->|select|T
122///
123/// T -->|accepted| A["…[🟪 🟪 🟪][🟪 🟪]…"]
124/// T -->|rejected| R["…[🟨 🟨][🟨][🟨 🟨 🟨]…"]
125///
126/// style V fill:#ffff,stroke:#ffff
127/// style D fill:#ffff,stroke:#ffff
128/// style A fill:#ffff,stroke:#ffff
129/// style R fill:#ffff,stroke:#ffff
130/// ```
131#[mel_treatment(
132 input value Stream<Vec<i8>>
133 input select Stream<bool>
134 output accepted Stream<Vec<i8>>
135 output rejected Stream<Vec<i8>>
136)]
137pub async fn filter() {
138 let mut accepted_op = true;
139 let mut rejected_op = true;
140
141 while let (Ok(value), Ok(select)) =
142 futures::join!(value.recv_one_vec_i8(), select.recv_one_bool())
143 {
144 if select {
145 if let Err(_) = accepted.send_one_vec_i8(value).await {
146 // If we cannot send anymore on accepted, we note it,
147 // and check if rejected is still valid, else just terminate.
148 accepted_op = false;
149 if !rejected_op {
150 break;
151 }
152 }
153 } else {
154 if let Err(_) = rejected.send_one_vec_i8(value).await {
155 // If we cannot send anymore on rejected, we note it,
156 // and check if accepted is still valid, else just terminate.
157 rejected_op = false;
158 if !accepted_op {
159 break;
160 }
161 }
162 }
163 }
164}
165
166/// Trigger on `Vec<i8>` stream start and end.
167///
168/// Emit `start` when a first value is send through the stream.
169/// Emit `end` when stream is finally over.
170///
171/// Emit `first` with the first vector coming in the stream.
172/// Emit `last` with the last vector coming in the stream.
173///
174/// ℹ️ `start` and `first` are always emitted together.
175/// If the stream only contains one vector, `first` and `last` both contains it.
176/// If the stream never transmit any data before being ended, only `end` is emitted.
177///
178/// ```mermaid
179/// graph LR
180/// T("trigger()")
181/// B["[🟥 🟥] … [🟨 🟨] [🟨 🟨] [🟨 🟨] … [🟩 🟩]"] -->|stream| T
182///
183/// T -->|start| S["〈🟦〉"]
184/// T -->|first| F["〈[🟩 🟩]〉"]
185/// T -->|last| L["〈[🟥 🟥]〉"]
186/// T -->|end| E["〈🟦〉"]
187///
188/// style B fill:#ffff,stroke:#ffff
189/// style S fill:#ffff,stroke:#ffff
190/// style F fill:#ffff,stroke:#ffff
191/// style L fill:#ffff,stroke:#ffff
192/// style E fill:#ffff,stroke:#ffff
193/// ```
194#[mel_treatment(
195 input stream Stream<Vec<i8>>
196 output start Block<void>
197 output end Block<void>
198 output first Block<Vec<i8>>
199 output last Block<Vec<i8>>
200)]
201pub async fn trigger() {
202 let mut last_value = None;
203
204 if let Ok(values) = stream.recv_vec_i8().await {
205 let _ = start.send_one_void(()).await;
206 if let Some(val) = values.first().cloned() {
207 let _ = first.send_one_vec_i8(val).await;
208 }
209 last_value = values.last().cloned();
210 let _ = futures::join!(start.close(), first.close());
211 }
212
213 while let Ok(values) = stream.recv_vec_i8().await {
214 last_value = values.last().cloned();
215 }
216
217 let _ = end.send_one_void(()).await;
218 if let Some(val) = last_value {
219 let _ = last.send_one_vec_i8(val).await;
220 }
221
222 // We don't close `end` and `last` explicitly here,
223 // because it would be redundant with boilerplate
224 // implementation of treatments.
225}
226
227/// Stream a block `Vec<i8>` element.
228///
229/// ```mermaid
230/// graph LR
231/// T("stream()")
232/// B["〈[🟦]〉"] -->|block| T
233///
234/// T -->|stream| S["[🟦]"]
235///
236///
237/// style B fill:#ffff,stroke:#ffff
238/// style S fill:#ffff,stroke:#ffff
239/// ```
240#[mel_treatment(
241 input block Block<Vec<i8>>
242 output stream Stream<Vec<i8>>
243)]
244pub async fn stream() {
245 if let Ok(val) = block.recv_one_vec_i8().await {
246 let _ = stream.send_one_vec_i8(val).await;
247 }
248}
249
250/// Emit a block `Vec<i8>` value.
251///
252/// When `trigger` is enabled, `value` is emitted as block.
253///
254/// ```mermaid
255/// graph LR
256/// T("emit(value=[🟨])")
257/// B["〈🟦〉"] -->|trigger| T
258///
259/// T -->|emit| S["〈[🟨]〉"]
260///
261/// style B fill:#ffff,stroke:#ffff
262/// style S fill:#ffff,stroke:#ffff
263/// ```
264#[mel_treatment(
265 input trigger Block<void>
266 output emit Block<Vec<i8>>
267)]
268pub async fn emit(value: Vec<i8>) {
269 if let Ok(_) = trigger.recv_one_void().await {
270 let _ = emit.send_one_vec_i8(value).await;
271 }
272}
273
274/// Gives pattern of a `Vec<i8>` stream.
275///
276/// ```mermaid
277/// graph LR
278/// T("pattern()")
279/// A["…[🟨 🟨][🟨][🟨 🟨 🟨]"] -->|stream| T
280///
281/// T -->|pattern| O["… [🟦 🟦][🟦][🟦 🟦 🟦]"]
282///
283/// style A fill:#ffff,stroke:#ffff
284/// style O fill:#ffff,stroke:#ffff
285/// ```
286#[mel_treatment(
287 input stream Stream<Vec<i8>>
288 output pattern Stream<Vec<void>>
289)]
290pub async fn pattern() {
291 while let Ok(vectors) = stream.recv_vec_i8().await {
292 check!(
293 pattern
294 .send_vec_void(vectors.into_iter().map(|vec| vec![(); vec.len()]).collect())
295 .await
296 )
297 }
298}
299
300/// Fit a stream of `i8` into stream of `Vec<i8>`, using a pattern.
301///
302/// ℹ️ If some remaining values doesn't fit into the pattern, they are trashed.
303/// If there are not enough values to fit the pattern, uncomplete vector is trashed.
304///
305/// ```mermaid
306/// graph LR
307/// T("fit()")
308/// A["… 🟨 🟨 🟨 🟨 🟨 🟨"] -->|value| T
309/// B["[🟦 🟦][🟦][🟦 🟦 🟦]"] -->|pattern| T
310///
311/// T -->|fitted| O["[🟨 🟨][🟨][🟨 🟨 🟨]"]
312///
313/// style A fill:#ffff,stroke:#ffff
314/// style B fill:#ffff,stroke:#ffff
315/// style O fill:#ffff,stroke:#ffff
316/// ```
317#[mel_treatment(
318 input value Stream<i8>
319 input pattern Stream<Vec<void>>
320 output fitted Stream<Vec<i8>>
321)]
322pub async fn fit() {
323 'main: while let Ok(patterns) = pattern.recv_vec_void().await {
324 for pattern in patterns {
325 let mut vector = Vec::with_capacity(pattern.len());
326 for _ in 0..pattern.len() {
327 if let Ok(val) = value.recv_one_i8().await {
328 vector.push(val);
329 } else {
330 // Uncomplete, we 'trash' vector
331 break 'main;
332 }
333 }
334 check!('main, fitted.send_one_vec_i8(vector).await)
335 }
336 }
337}
338
339/// Fill a pattern stream with a `i8` value.
340///
341/// ```mermaid
342/// graph LR
343/// T("fill(value=🟧)")
344/// B["…[🟦 🟦][🟦][🟦 🟦 🟦]…"] -->|pattern| T
345///
346/// T -->|filled| O["…[🟧 🟧][🟧][🟧 🟧 🟧]…"]
347///
348/// style B fill:#ffff,stroke:#ffff
349/// style O fill:#ffff,stroke:#ffff
350/// ```
351#[mel_treatment(
352 default value 0
353 input pattern Stream<Vec<void>>
354 output filled Stream<Vec<i8>>
355)]
356pub async fn fill(value: i8) {
357 while let Ok(pat) = pattern.recv_vec_void().await {
358 check!(
359 filled
360 .send_vec_i8(
361 pat.into_iter()
362 .map(|p| vec![value.clone(); p.len()])
363 .collect()
364 )
365 .await
366 )
367 }
368}
369
370/// Resize vectors according to given streamed size.
371///
372/// If a vector is smaller than expected size, it is extended using the `default` value.
373///
374/// ```mermaid
375/// graph LR
376/// T("resize(default=🟨)")
377/// V["[🟦 🟦][🟦][][🟦 🟦 🟦]…"] -->|vector| T
378/// S["3️⃣ 2️⃣ 3️⃣ 2️⃣ …"] -->|size| T
379///
380/// T -->|resized| P["[🟦 🟦 🟨][🟦 🟨][🟨 🟨 🟨][🟦 🟦]…"]
381///
382/// style V fill:#ffff,stroke:#ffff
383/// style S fill:#ffff,stroke:#ffff
384/// style P fill:#ffff,stroke:#ffff
385/// ```
386#[mel_treatment(
387 default default 0
388 input vector Stream<Vec<i8>>
389 input size Stream<u64>
390 output resized Stream<Vec<i8>>
391)]
392pub async fn resize(default: i8) {
393 while let Ok(size) = size.recv_one_u64().await {
394 if let Ok(mut vec) = vector.recv_one_vec_i8().await {
395 vec.resize(size as usize, default.clone());
396 check!(resized.send_one_vec_i8(vec).await);
397 } else {
398 break;
399 }
400 }
401}