flow_mel/vec/void.rs
1use melodium_core::*;
2use melodium_macro::{check, mel_treatment};
3
4/// Flatten a stream of `Vec<void>`.
5///
6/// All the input vectors are turned into continuous stream of scalar values, keeping order.
7/// ```mermaid
8/// graph LR
9/// T("flatten()")
10/// B["[🟦 🟦][🟦][🟦 🟦 🟦]"] -->|vector| T
11///
12/// T -->|value| O["🟦 🟦 🟦 🟦 🟦 🟦"]
13///
14/// style B fill:#ffff,stroke:#ffff
15/// style O fill:#ffff,stroke:#ffff
16/// ```
17#[mel_treatment(
18 input vector Stream<Vec<void>>
19 output value Stream<void>
20)]
21pub async fn flatten() {
22 'main: while let Ok(vectors) = vector.recv_vec_void().await {
23 for vec in vectors {
24 check!('main, value.send_void(vec).await)
25 }
26 }
27}
28
29/// Chain two streams of `Vec<void>`.
30///
31///
32/// ```mermaid
33/// graph LR
34/// T("chain()")
35/// A["[🟨 🟨][🟨 🟨 🟨][🟨]"] -->|first| T
36/// B["…[🟪][🟪 🟪]"] -->|second| T
37///
38/// T -->|chained| O["…[🟪][🟪 🟪][🟨 🟨][🟨 🟨 🟨][🟨]"]
39///
40/// style A fill:#ffff,stroke:#ffff
41/// style B fill:#ffff,stroke:#ffff
42/// style O fill:#ffff,stroke:#ffff
43/// ```
44#[mel_treatment(
45 input first Stream<Vec<void>>
46 input second Stream<Vec<void>>
47 output chained Stream<Vec<void>>
48)]
49pub async fn chain() {
50 while let Ok(vectors) = first.recv_vec_void().await {
51 check!(chained.send_vec_void(vectors).await)
52 }
53
54 while let Ok(vectors) = second.recv_vec_void().await {
55 check!(chained.send_vec_void(vectors).await)
56 }
57}
58
59/// Merge two streams of `Vec<void>`.
60///
61/// The two streams are merged using the `select` stream:
62/// - when `true`, vector from `a` is used;
63/// - when `false`, vector from `b` is used.
64///
65/// ℹ️ No vector from either `a` or `b` are discarded, they are used when `select` give turn.
66///
67/// ⚠️ When `select` ends merge terminates without treating the remaining vectors from `a` and `b`.
68/// When `select` give turn to `a` or `b` while the concerned stream is ended, the merge terminates.
69/// Merge continues as long as `select` and concerned stream does, while the other can be ended.
70///
71/// ```mermaid
72/// graph LR
73/// T("merge()")
74/// A["…[🟪 🟪 🟪][🟪 🟪]…"] -->|a| T
75/// B["…[🟨 🟨][🟨][🟨 🟨 🟨]…"] -->|b| T
76/// O["… 🟩 🟥 🟥 🟩 🟥 …"] -->|select|T
77///
78///
79/// T -->|value| V["…[🟪 🟪 🟪][🟨 🟨][🟨][🟪 🟪][🟨 🟨 🟨]…"]
80///
81/// style V fill:#ffff,stroke:#ffff
82/// style O fill:#ffff,stroke:#ffff
83/// style A fill:#ffff,stroke:#ffff
84/// style B fill:#ffff,stroke:#ffff
85/// ```
86#[mel_treatment(
87 input a Stream<Vec<void>>
88 input b Stream<Vec<void>>
89 input select Stream<bool>
90 output value Stream<Vec<void>>
91)]
92pub async fn merge() {
93 while let Ok(select) = select.recv_one_bool().await {
94 let val;
95 if select {
96 if let Ok(v) = a.recv_one_vec_void().await {
97 val = v;
98 } else {
99 break;
100 }
101 } else {
102 if let Ok(v) = b.recv_one_vec_void().await {
103 val = v;
104 } else {
105 break;
106 }
107 }
108
109 check!(value.send_one_vec_void(val).await)
110 }
111}
112
113/// Filter a `Vec<void>` stream according to `bool` stream.
114///
115/// ℹ️ If both streams are not the same size nothing is sent through accepted nor rejected.
116///
117/// ```mermaid
118/// graph LR
119/// T("filter()")
120/// V["…[🟪 🟪 🟪][🟨 🟨][🟨][🟪 🟪][🟨 🟨 🟨]…"] -->|value| T
121/// D["… 🟩 🟥 🟥 🟩 🟥 …"] -->|select|T
122///
123/// T -->|accepted| A["…[🟪 🟪 🟪][🟪 🟪]…"]
124/// T -->|rejected| R["…[🟨 🟨][🟨][🟨 🟨 🟨]…"]
125///
126/// style V fill:#ffff,stroke:#ffff
127/// style D fill:#ffff,stroke:#ffff
128/// style A fill:#ffff,stroke:#ffff
129/// style R fill:#ffff,stroke:#ffff
130/// ```
131#[mel_treatment(
132 input value Stream<Vec<void>>
133 input select Stream<bool>
134 output accepted Stream<Vec<void>>
135 output rejected Stream<Vec<void>>
136)]
137pub async fn filter() {
138 let mut accepted_op = true;
139 let mut rejected_op = true;
140
141 while let (Ok(value), Ok(select)) =
142 futures::join!(value.recv_one_vec_void(), select.recv_one_bool())
143 {
144 if select {
145 if let Err(_) = accepted.send_one_vec_void(value).await {
146 // If we cannot send anymore on accepted, we note it,
147 // and check if rejected is still valid, else just terminate.
148 accepted_op = false;
149 if !rejected_op {
150 break;
151 }
152 }
153 } else {
154 if let Err(_) = rejected.send_one_vec_void(value).await {
155 // If we cannot send anymore on rejected, we note it,
156 // and check if accepted is still valid, else just terminate.
157 rejected_op = false;
158 if !accepted_op {
159 break;
160 }
161 }
162 }
163 }
164}
165
166/// Trigger on `Vec<void>` stream start and end.
167///
168/// Emit `start` when a first value is send through the stream.
169/// Emit `end` when stream is finally over.
170///
171/// Emit `first` with the first vector coming in the stream.
172/// Emit `last` with the last vector coming in the stream.
173///
174/// ℹ️ `start` and `first` are always emitted together.
175/// If the stream only contains one vector, `first` and `last` both contains it.
176/// If the stream never transmit any data before being ended, only `end` is emitted.
177///
178/// ```mermaid
179/// graph LR
180/// T("trigger()")
181/// B["[🟥 🟥] … [🟨 🟨] [🟨 🟨] [🟨 🟨] … [🟩 🟩]"] -->|stream| T
182///
183/// T -->|start| S["〈🟦〉"]
184/// T -->|first| F["〈[🟩 🟩]〉"]
185/// T -->|last| L["〈[🟥 🟥]〉"]
186/// T -->|end| E["〈🟦〉"]
187///
188/// style B fill:#ffff,stroke:#ffff
189/// style S fill:#ffff,stroke:#ffff
190/// style F fill:#ffff,stroke:#ffff
191/// style L fill:#ffff,stroke:#ffff
192/// style E fill:#ffff,stroke:#ffff
193/// ```
194#[mel_treatment(
195 input stream Stream<Vec<void>>
196 output start Block<void>
197 output end Block<void>
198 output first Block<Vec<void>>
199 output last Block<Vec<void>>
200)]
201pub async fn trigger() {
202 let mut last_value = None;
203
204 if let Ok(values) = stream.recv_vec_void().await {
205 let _ = start.send_one_void(()).await;
206 if let Some(val) = values.first().cloned() {
207 let _ = first.send_one_vec_void(val).await;
208 }
209 last_value = values.last().cloned();
210 let _ = futures::join!(start.close(), first.close());
211 }
212
213 while let Ok(values) = stream.recv_vec_void().await {
214 last_value = values.last().cloned();
215 }
216
217 let _ = end.send_one_void(()).await;
218 if let Some(val) = last_value {
219 let _ = last.send_one_vec_void(val).await;
220 }
221
222 // We don't close `end` and `last` explicitly here,
223 // because it would be redundant with boilerplate
224 // implementation of treatments.
225}
226
227/// Stream a block `Vec<void>` element.
228///
229/// ```mermaid
230/// graph LR
231/// T("stream()")
232/// B["〈[🟦]〉"] -->|block| T
233///
234/// T -->|stream| S["[🟦]"]
235///
236///
237/// style B fill:#ffff,stroke:#ffff
238/// style S fill:#ffff,stroke:#ffff
239/// ```
240#[mel_treatment(
241 input block Block<Vec<void>>
242 output stream Stream<Vec<void>>
243)]
244pub async fn stream() {
245 if let Ok(val) = block.recv_one_vec_void().await {
246 let _ = stream.send_one_vec_void(val).await;
247 }
248}
249
250/// Emit a block `Vec<void>` value.
251///
252/// When `trigger` is enabled, `value` is emitted as block.
253///
254/// ```mermaid
255/// graph LR
256/// T("emit(value=[🟨])")
257/// B["〈🟦〉"] -->|trigger| T
258///
259/// T -->|emit| S["〈[🟨]〉"]
260///
261/// style B fill:#ffff,stroke:#ffff
262/// style S fill:#ffff,stroke:#ffff
263/// ```
264#[mel_treatment(
265 input trigger Block<void>
266 output emit Block<Vec<void>>
267)]
268pub async fn emit(value: Vec<void>) {
269 if let Ok(_) = trigger.recv_one_void().await {
270 let _ = emit.send_one_vec_void(value).await;
271 }
272}
273
274/// Gives count of elements passing through stream.
275///
276/// This count increment one for each vector within the stream, starting at 1.
277/// ℹ️ The count is independant from vector sizes.
278///
279/// ```mermaid
280/// graph LR
281/// T("count()")
282/// V["[🟦 🟦][🟦][🟦 🟦 🟦]…"] -->|stream| T
283///
284/// T -->|count| P["1️⃣ 2️⃣ 3️⃣ …"]
285///
286/// style V fill:#ffff,stroke:#ffff
287/// style P fill:#ffff,stroke:#ffff
288/// ```
289#[mel_treatment(
290 input stream Stream<Vec<void>>
291 output count Stream<u128>
292)]
293pub async fn count() {
294 let mut i: u128 = 0;
295 while let Ok(iter) = stream.recv_vec_void().await {
296 let next_i = i + iter.len() as u128;
297 check!(count.send_u128((i..next_i).collect()).await);
298 i = next_i;
299 }
300}
301
302/// Gives size of vectors passing through stream.
303///
304/// For each vector one `size` value is sent, giving the number of elements contained within matching vector.
305///
306/// ```mermaid
307/// graph LR
308/// T("size()")
309/// V["[🟦 🟦][🟦][][🟦 🟦 🟦]…"] -->|vector| T
310///
311/// T -->|size| P["2️⃣ 1️⃣ 0️⃣ 3️⃣ …"]
312///
313/// style V fill:#ffff,stroke:#ffff
314/// style P fill:#ffff,stroke:#ffff
315/// ```
316#[mel_treatment(
317 input vector Stream<Vec<void>>
318 output size Stream<u64>
319)]
320pub async fn size() {
321 while let Ok(iter) = vector.recv_vec_void().await {
322 check!(
323 size.send_u64(iter.into_iter().map(|v| v.len() as u64).collect())
324 .await
325 );
326 }
327}
328
329/// Resize vectors according to given streamed size.
330///
331/// ```mermaid
332/// graph LR
333/// T("resize()")
334/// V["[🟦 🟦][🟦][][🟦 🟦 🟦]…"] -->|vector| T
335/// S["3️⃣ 2️⃣ 3️⃣ 2️⃣ …"] -->|size| T
336///
337/// T -->|resized| P["[🟦 🟦 🟦][🟦 🟦][🟦 🟦 🟦][🟦 🟦]…"]
338///
339/// style V fill:#ffff,stroke:#ffff
340/// style S fill:#ffff,stroke:#ffff
341/// style P fill:#ffff,stroke:#ffff
342/// ```
343#[mel_treatment(
344 input vector Stream<Vec<void>>
345 input size Stream<u64>
346 output resized Stream<Vec<void>>
347)]
348pub async fn resize() {
349 while let Ok(size) = size.recv_one_u64().await {
350 if let Ok(mut vec) = vector.recv_one_vec_void().await {
351 vec.resize(size as usize, ());
352 check!(resized.send_one_vec_void(vec).await);
353 } else {
354 break;
355 }
356 }
357}
358
359/// Generate a stream of empty `Vec<void>` according to a length.
360///
361/// ```mermaid
362/// graph LR
363/// T("generate()")
364/// B["〈🟨〉"] -->|length| T
365///
366/// T -->|stream| S["… [][][][][]"]
367///
368///
369/// style B fill:#ffff,stroke:#ffff
370/// style S fill:#ffff,stroke:#ffff
371/// ```
372#[mel_treatment(
373 input length Block<u128>
374 output stream Stream<Vec<void>>
375)]
376pub async fn generate() {
377 if let Ok(length) = length.recv_one_u128().await {
378 const CHUNK: u128 = 2u128.pow(20);
379 let mut total = 0u128;
380 while total < length {
381 let chunk = u128::min(CHUNK, length - total) as usize;
382 check!(stream.send_vec_void(vec![vec![]; chunk]).await);
383 total += chunk as u128;
384 }
385 }
386}
387
388/// Generate a stream of empty `Vec<void>` indefinitely.
389///
390/// This generates a continuous stream of `Vec<void>`, until stream consumers closes it.
391///
392/// ```mermaid
393/// graph LR
394/// T("generateIndefinitely()")
395/// B["〈🟦〉"] -->|trigger| T
396///
397/// T -->|stream| S["… [][][][][]"]
398///
399///
400/// style B fill:#ffff,stroke:#ffff
401/// style S fill:#ffff,stroke:#ffff
402/// ```
403#[mel_treatment(
404 input trigger Block<void>
405 output stream Stream<Vec<void>>
406)]
407pub async fn generate_indefinitely() {
408 if let Ok(_) = trigger.recv_one_void().await {
409 const CHUNK: usize = 2usize.pow(20);
410 loop {
411 check!(stream.send_vec_void(vec![vec![]; CHUNK]).await);
412 }
413 }
414}