std_mel/flow/mod.rs
1use futures::{pin_mut, select, FutureExt};
2use melodium_core::common::executive::{GetData, Value};
3use melodium_macro::{check, mel_treatment};
4use std::collections::VecDeque;
5
6pub mod vec;
7
8/// Chain two streams.
9///
10///
11/// ```mermaid
12/// graph LR
13/// T("chain()")
14/// A["🟨 🟨 🟨 🟨 🟨 🟨"] -->|first| T
15/// B["… 🟪 🟪 🟪"] -->|second| T
16///
17/// T -->|chained| O["… 🟪 🟪 🟪 🟨 🟨 🟨 🟨 🟨 🟨"]
18///
19/// style A fill:#ffff,stroke:#ffff
20/// style B fill:#ffff,stroke:#ffff
21/// style O fill:#ffff,stroke:#ffff
22/// ```
23#[mel_treatment(
24 generic T ()
25 input first Stream<T>
26 input second Stream<T>
27 output chained Stream<T>
28)]
29pub async fn chain() {
30 while let Ok(values) = first.recv_many().await {
31 check!(chained.send_many(values).await)
32 }
33
34 while let Ok(values) = second.recv_many().await {
35 check!(chained.send_many(values).await)
36 }
37}
38
39/// Trigger on a stream start and end.
40///
41/// Emit `start` when a first value is send through the stream.
42/// Emit `end` when stream is finally over.
43///
44/// Emit `first` with the first value coming in the stream.
45/// Emit `last` with the last value coming in the stream.
46///
47/// ℹ️ `start` and `first` are always emitted together.
48/// If the stream only contains one element, `first` and `last` both contains it.
49/// If the stream never transmit any data before being ended, only `end` is emitted.
50///
51/// ```mermaid
52/// graph LR
53/// T("trigger()")
54/// B["🟥 … 🟨 🟨 🟨 🟨 🟨 🟨 … 🟩"] -->|stream| T
55///
56/// T -->|start| S["〈🟦〉"]
57/// T -->|first| F["〈🟩〉"]
58/// T -->|last| L["〈🟥〉"]
59/// T -->|end| E["〈🟦〉"]
60///
61/// style B fill:#ffff,stroke:#ffff
62/// style S fill:#ffff,stroke:#ffff
63/// style F fill:#ffff,stroke:#ffff
64/// style L fill:#ffff,stroke:#ffff
65/// style E fill:#ffff,stroke:#ffff
66/// ```
67#[mel_treatment(
68 generic T ()
69 input stream Stream<T>
70 output start Block<void>
71 output end Block<void>
72 output first Block<T>
73 output last Block<T>
74)]
75pub async fn trigger() {
76 let mut last_value = None;
77
78 if let Ok(mut values) = stream.recv_many().await {
79 let _ = start.send_one(().into()).await;
80 if let Some(val) = values.pop_front() {
81 let _ = first.send_one(val).await;
82 }
83 last_value = Into::<VecDeque<Value>>::into(values).pop_back();
84 let _ = futures::join!(start.close(), first.close());
85 }
86
87 while let Ok(values) = stream.recv_many().await {
88 last_value = Into::<VecDeque<Value>>::into(values).pop_back();
89 }
90
91 let _ = end.send_one(().into()).await;
92 if let Some(val) = last_value {
93 let _ = last.send_one(val).await;
94 }
95
96 // We don't close `end` and `last` explicitly here,
97 // because it would be redundant with boilerplate
98 // implementation of treatments.
99}
100
101/// Check a blocking value.
102///
103/// When `value` block is received, `check` is emitted.
104///
105/// ```mermaid
106/// graph LR
107/// T("check()")
108/// B["〈🟨〉"] -->|value| T
109///
110/// T -->|check| S["〈🟦〉"]
111///
112/// style B fill:#ffff,stroke:#ffff
113/// style S fill:#ffff,stroke:#ffff
114/// ```
115#[mel_treatment(
116 generic T ()
117 input value Block<T>
118 output check Block<void>
119)]
120pub async fn check() {
121 if let Ok(_) = value.recv_one().await {
122 let _ = check.send_one(().into()).await;
123 }
124}
125
126/// Emit a blocking value.
127///
128/// When `trigger` is enabled, `value` is emitted as block.
129///
130/// ```mermaid
131/// graph LR
132/// T("emit(value=🟨)")
133/// B["〈🟦〉"] -->|trigger| T
134///
135/// T -->|emit| S["〈🟨〉"]
136///
137/// style B fill:#ffff,stroke:#ffff
138/// style S fill:#ffff,stroke:#ffff
139/// ```
140#[mel_treatment(
141 generic T ()
142 input trigger Block<void>
143 output emit Block<T>
144)]
145pub async fn emit(value: T) {
146 if let Ok(_) = trigger.recv_one().await {
147 let _ = emit.send_one(value).await;
148 }
149}
150
151/// Stream a blocking value.
152///
153/// ```mermaid
154/// graph LR
155/// T("stream()")
156/// B["〈🟦〉"] -->|block| T
157///
158/// T -->|stream| S["🟦"]
159///
160///
161/// style B fill:#ffff,stroke:#ffff
162/// style S fill:#ffff,stroke:#ffff
163/// ```
164#[mel_treatment(
165 generic T ()
166 input block Block<T>
167 output stream Stream<T>
168)]
169pub async fn stream() {
170 if let Ok(val) = block.recv_one().await {
171 let _ = stream.send_one(val).await;
172 }
173}
174
175/// Merge two streams.
176///
177/// The two streams are merged without predictible order.
178///
179/// ℹ️ Merge continues as long as `a` or `b` continues too, while the other can be ended.
180///
181/// ```mermaid
182/// graph LR
183/// T("merge()")
184/// A["… 🟦 🟫 …"] -->|a| T
185/// B["… 🟧 🟪 🟨 …"] -->|b| T
186///
187///
188/// T -->|value| V["… 🟦 🟧 🟪 🟫 🟨 …"]
189///
190/// style V fill:#ffff,stroke:#ffff
191/// style A fill:#ffff,stroke:#ffff
192/// style B fill:#ffff,stroke:#ffff
193/// ```
194#[mel_treatment(
195 generic T ()
196 input a Stream<T>
197 input b Stream<T>
198 output value Stream<T>
199)]
200pub async fn merge() {
201 let xa = async {
202 while let Ok(a) = (&a).recv_many().await {
203 check!(value.send_many(a).await);
204 }
205 }
206 .fuse();
207 let xb = async {
208 while let Ok(b) = (&b).recv_many().await {
209 check!(value.send_many(b).await);
210 }
211 }
212 .fuse();
213
214 pin_mut!(xa, xb);
215
216 loop {
217 select! {
218 () = xa => {},
219 () = xb => {},
220 complete => break,
221 };
222 }
223}
224
225/// Arrange two streams as one.
226///
227/// The two streams are merged using the `select` stream:
228/// - when `true`, value from `a` is used;
229/// - when `false`, value from `b` is used.
230///
231/// ℹ️ No value from either `a` or `b` are discarded, they are used when `select` give turn.
232///
233/// ⚠️ When `select` ends merge terminates without treating the remaining values from `a` and `b`.
234/// When `select` give turn to `a` or `b` while the concerned stream is ended, the merge terminates.
235/// Merge continues as long as `select` and concerned stream does, while the other can be ended.
236///
237/// ```mermaid
238/// graph LR
239/// T("arrange()")
240/// A["… 🟦 🟫 …"] -->|a| T
241/// B["… 🟧 🟪 🟨 …"] -->|b| T
242/// O["… 🟩 🟥 🟥 🟩 🟥 …"] -->|select|T
243///
244///
245/// T -->|value| V["… 🟦 🟧 🟪 🟫 🟨 …"]
246///
247/// style V fill:#ffff,stroke:#ffff
248/// style O fill:#ffff,stroke:#ffff
249/// style A fill:#ffff,stroke:#ffff
250/// style B fill:#ffff,stroke:#ffff
251/// ```
252#[mel_treatment(
253 generic T ()
254 input a Stream<T>
255 input b Stream<T>
256 input select Stream<bool>
257 output value Stream<T>
258)]
259pub async fn arrange() {
260 while let Ok(select) = select
261 .recv_one()
262 .await
263 .map(|val| GetData::<bool>::try_data(val).unwrap())
264 {
265 let val;
266 if select {
267 if let Ok(v) = a.recv_one().await {
268 val = v;
269 } else {
270 break;
271 }
272 } else {
273 if let Ok(v) = b.recv_one().await {
274 val = v;
275 } else {
276 break;
277 }
278 }
279
280 check!(value.send_one(val).await)
281 }
282}
283
284/// Fill a pattern stream with a `value.
285///
286/// ```mermaid
287/// graph LR
288/// T("fill(value=🟧)")
289/// B["… 🟦 🟦 🟦 …"] -->|pattern| T
290///
291/// T -->|filled| O["… 🟧 🟧 🟧 …"]
292///
293/// style B fill:#ffff,stroke:#ffff
294/// style O fill:#ffff,stroke:#ffff
295/// ```
296#[mel_treatment(
297 generic T ()
298 input pattern Stream<void>
299 output filled Stream<T>
300)]
301pub async fn fill(value: T) {
302 while let Ok(pat) = pattern.recv_many().await {
303 let mut transmission = melodium_core::TransmissionValue::new(value.clone());
304 for _ in 1..pat.len() {
305 transmission.push(value.clone());
306 }
307 check!(filled.send_many(transmission).await)
308 }
309}
310
311/// Filter a stream according to `bool` stream.
312///
313/// ℹ️ If both streams are not the same size nothing is sent through accepted nor rejected.
314///
315/// ```mermaid
316/// graph LR
317/// T("filter()")
318/// V["… 🟦 🟧 🟪 🟫 🟨 …"] -->|value| T
319/// D["… 🟩 🟥 🟥 🟩 🟥 …"] -->|select|T
320///
321/// T -->|accepted| A["… 🟦 🟫 …"]
322/// T -->|rejected| R["… 🟧 🟪 🟨 …"]
323///
324/// style V fill:#ffff,stroke:#ffff
325/// style D fill:#ffff,stroke:#ffff
326/// style A fill:#ffff,stroke:#ffff
327/// style R fill:#ffff,stroke:#ffff
328/// ```
329#[mel_treatment(
330 generic T ()
331 input value Stream<T>
332 input select Stream<bool>
333 output accepted Stream<T>
334 output rejected Stream<T>
335)]
336pub async fn filter() {
337 let mut accepted_op = true;
338 let mut rejected_op = true;
339
340 while let (Ok(value), Ok(select)) = futures::join!(value.recv_one(), select.recv_one()) {
341 let select = GetData::<bool>::try_data(select).unwrap();
342 if select {
343 if let Err(_) = accepted.send_one(value).await {
344 // If we cannot send anymore on accepted, we note it,
345 // and check if rejected is still valid, else just terminate.
346 accepted_op = false;
347 if !rejected_op {
348 break;
349 }
350 }
351 } else {
352 if let Err(_) = rejected.send_one(value).await {
353 // If we cannot send anymore on rejected, we note it,
354 // and check if accepted is still valid, else just terminate.
355 rejected_op = false;
356 if !accepted_op {
357 break;
358 }
359 }
360 }
361 }
362}
363
364/// Fit a stream into a pattern.
365///
366/// ℹ️ If some remaining values doesn't fit into the pattern, they are trashed.
367///
368/// ```mermaid
369/// graph LR
370/// T("fit()")
371/// A["… 🟨 🟨 🟨 🟨 🟨 🟨"] -->|value| T
372/// B["🟦 🟦 🟦 🟦"] -->|pattern| T
373///
374/// T -->|fitted| O["🟨 🟨 🟨 🟨"]
375///
376/// style A fill:#ffff,stroke:#ffff
377/// style B fill:#ffff,stroke:#ffff
378/// style O fill:#ffff,stroke:#ffff
379/// ```
380#[mel_treatment(
381 generic T ()
382 input value Stream<T>
383 input pattern Stream<void>
384 output fitted Stream<T>
385)]
386pub async fn fit() {
387 'main: while let Ok(pattern) = pattern
388 .recv_many()
389 .await
390 .map(|values| TryInto::<Vec<()>>::try_into(values).unwrap())
391 {
392 for _ in pattern {
393 if let Ok(val) = value.recv_one().await {
394 check!('main, fitted.send_one(val).await)
395 } else {
396 break 'main;
397 }
398 }
399 }
400}
401
402/// Gives count of elements passing through stream.
403///
404/// This count increment one for each element within the stream, starting at 1.
405///
406/// ```mermaid
407/// graph LR
408/// T("count()")
409/// V["🟦 🟦 🟦 …"] -->|iter| T
410///
411/// T -->|count| P["1️⃣ 2️⃣ 3️⃣ …"]
412///
413/// style V fill:#ffff,stroke:#ffff
414/// style P fill:#ffff,stroke:#ffff
415/// ```
416#[mel_treatment(
417 generic T ()
418 input stream Stream<T>
419 output count Stream<u128>
420)]
421pub async fn count() {
422 let mut i: u128 = 0;
423 while let Ok(iter) = stream.recv_many().await {
424 let next_i = i + iter.len() as u128;
425 check!(
426 count
427 .send_many((i..next_i).collect::<VecDeque<_>>().into())
428 .await
429 );
430 i = next_i;
431 }
432}
433
434/// Generate a stream with a given length.
435///
436/// ```mermaid
437/// graph LR
438/// T("generate()")
439/// B["〈🟨〉"] -->|length| T
440///
441/// T -->|stream| S["… 🟦 🟦 🟦 🟦 🟦 🟦"]
442///
443///
444/// style B fill:#ffff,stroke:#ffff
445/// style S fill:#ffff,stroke:#ffff
446/// ```
447#[mel_treatment(
448 generic T ()
449 input length Block<u128>
450 output stream Stream<T>
451)]
452pub async fn generate(data: T) {
453 if let Ok(length) = length
454 .recv_one()
455 .await
456 .map(|val| GetData::<u128>::try_data(val).unwrap())
457 {
458 const CHUNK: u128 = 2u128.pow(20);
459 let mut total = 0u128;
460 while total < length {
461 let chunk = u128::min(CHUNK, length - total) as usize;
462 let mut transmission = melodium_core::TransmissionValue::new(data.clone());
463 for _ in 1..chunk {
464 transmission.push(data.clone());
465 }
466 check!(stream.send_many(transmission).await);
467 total += chunk as u128;
468 }
469 }
470}
471
472/// Generate a stream indefinitely.
473///
474/// This generates a continuous stream, until stream consumers closes it.
475///
476/// ```mermaid
477/// graph LR
478/// T("generateIndefinitely()")
479/// B["〈🟦〉"] -->|trigger| T
480///
481/// T -->|stream| S["… 🟦 🟦 🟦 🟦 🟦 🟦"]
482///
483///
484/// style B fill:#ffff,stroke:#ffff
485/// style S fill:#ffff,stroke:#ffff
486/// ```
487#[mel_treatment(
488 generic T ()
489 input trigger Block<void>
490 output stream Stream<T>
491)]
492pub async fn generate_indefinitely(data: T) {
493 if let Ok(_) = trigger.recv_one().await {
494 const CHUNK: usize = 2usize.pow(20);
495 loop {
496 let mut transmission = melodium_core::TransmissionValue::new(data.clone());
497 for _ in 1..CHUNK {
498 transmission.push(data.clone());
499 }
500 check!(stream.send_many(transmission).await);
501 }
502 }
503}
504
505/// Insert a block into a stream.
506///
507/// `block` is inserted into `stream` when it comes and everything is streamed to `output`.
508///
509/// ℹ️ No assumption on block insertion position in stream can be made.
510///
511/// ```mermaid
512/// graph LR
513/// T("insert()")
514/// A["… 🟦 🟦 🟦 🟦 …"] -->|stream| T
515/// B["〈🟧〉"] -->|block| T
516///
517///
518/// T -->|output| V["… 🟦 🟧 🟦 🟦 🟦 …"]
519///
520/// style V fill:#ffff,stroke:#ffff
521/// style A fill:#ffff,stroke:#ffff
522/// style B fill:#ffff,stroke:#ffff
523/// ```
524#[mel_treatment(
525 generic T ()
526 input stream Stream<T>
527 input block Block<T>
528 output output Stream<T>
529)]
530pub async fn insert() {
531 let streaming = async {
532 while let Ok(values) = (&stream).recv_many().await {
533 check!(output.send_many(values).await);
534 }
535 }
536 .fuse();
537 let insert_block = async {
538 if let Ok(val) = (&block).recv_one().await {
539 let _ = output.send_one(val).await;
540 }
541 }
542 .fuse();
543
544 pin_mut!(streaming, insert_block);
545
546 loop {
547 select! {
548 () = streaming => {},
549 () = insert_block => {},
550 complete => break,
551 };
552 }
553}
554
555/// Merge two incoming blocks as a stream.
556///
557/// Each block is taken when it arrives and send through `stream`.
558///
559/// ℹ️ No priority on blocks order in stream can be assumed.
560///
561/// ```mermaid
562/// graph LR
563/// T("flock()")
564/// A["〈🟦〉"] -->|a| T
565/// B["〈🟧〉"] -->|b| T
566///
567///
568/// T -->|stream| V["🟧 🟦"]
569///
570/// style V fill:#ffff,stroke:#ffff
571/// style A fill:#ffff,stroke:#ffff
572/// style B fill:#ffff,stroke:#ffff
573/// ```
574#[mel_treatment(
575 generic T ()
576 input a Block<T>
577 input b Block<T>
578 output stream Stream<T>
579)]
580pub async fn flock() {
581 let xa = async {
582 if let Ok(a) = (&a).recv_one().await {
583 let _ = stream.send_one(a).await;
584 }
585 }
586 .fuse();
587 let xb = async {
588 if let Ok(b) = (&b).recv_one().await {
589 let _ = stream.send_one(b).await;
590 }
591 }
592 .fuse();
593
594 pin_mut!(xa, xb);
595
596 loop {
597 select! {
598 () = xa => {},
599 () = xb => {},
600 complete => break,
601 };
602 }
603}
604
605/// Emit one block.
606///
607/// Take first block coming among `a` or `b` and emit it in `value`, ignoring the remaining one.
608///
609/// ℹ️ No priority between blocks can be assumed if they are ready at same moment.
610///
611/// ```mermaid
612/// graph LR
613/// T("one()")
614/// A["…"] -->|a| T
615/// B["〈🟧〉"] -->|b| T
616///
617///
618/// T -->|value| V["〈🟧〉"]
619///
620/// style V fill:#ffff,stroke:#ffff
621/// style A fill:#ffff,stroke:#ffff
622/// style B fill:#ffff,stroke:#ffff
623/// ```
624#[mel_treatment(
625 generic T ()
626 input a Block<T>
627 input b Block<T>
628 output value Block<T>
629)]
630pub async fn one() {
631 let xa = async { (&a).recv_one().await.ok() }.fuse();
632 let xb = async { (&b).recv_one().await.ok() }.fuse();
633
634 pin_mut!(xa, xb);
635
636 loop {
637 let val = select! {
638 val = xa => val,
639 val = xb => val,
640 complete => break,
641 };
642
643 if let Some(val) = val {
644 let _ = value.send_one(val).await;
645 break;
646 }
647 }
648}