std_mel/flow/
mod.rs

1use futures::{pin_mut, select, FutureExt};
2use melodium_core::common::executive::{GetData, Value};
3use melodium_macro::{check, mel_treatment};
4use std::collections::VecDeque;
5
6pub mod concentrate;
7pub mod vec;
8
9/// Chain two streams.
10///
11///
12/// ```mermaid
13/// graph LR
14///     T("chain()")
15///     A["🟨 🟨 🟨 🟨 🟨 🟨"] -->|first| T
16///     B["… 🟪 🟪 🟪"] -->|second| T
17///     
18///     T -->|chained| O["… 🟪 🟪 🟪 🟨 🟨 🟨 🟨 🟨 🟨"]
19///
20///     style A fill:#ffff,stroke:#ffff
21///     style B fill:#ffff,stroke:#ffff
22///     style O fill:#ffff,stroke:#ffff
23/// ```
24#[mel_treatment(
25    generic T ()
26    input first Stream<T>
27    input second Stream<T>
28    output chained Stream<T>
29)]
30pub async fn chain() {
31    while let Ok(values) = first.recv_many().await {
32        check!(chained.send_many(values).await)
33    }
34
35    while let Ok(values) = second.recv_many().await {
36        check!(chained.send_many(values).await)
37    }
38}
39
40/// Trigger on a stream start and end.
41///
42/// Emit `start` when a first value is send through the stream.
43/// Emit `end` when stream is finally over.
44///
45/// Emit `first` with the first value coming in the stream.
46/// Emit `last` with the last value coming in the stream.
47///
48/// ℹ️ `start` and `first` are always emitted together.
49/// If the stream only contains one element, `first` and `last` both contains it.
50/// If the stream never transmit any data before being ended, only `end` is emitted.
51///
52/// ```mermaid
53/// graph LR
54///     T("trigger()")
55///     B["🟥 … 🟨 🟨 🟨 🟨 🟨 🟨 … 🟩"] -->|stream| T
56///     
57///     T -->|start| S["〈🟦〉"]
58///     T -->|first| F["〈🟩〉"]
59///     T -->|last| L["〈🟥〉"]
60///     T -->|end| E["〈🟦〉"]
61///
62///     style B fill:#ffff,stroke:#ffff
63///     style S fill:#ffff,stroke:#ffff
64///     style F fill:#ffff,stroke:#ffff
65///     style L fill:#ffff,stroke:#ffff
66///     style E fill:#ffff,stroke:#ffff
67/// ```
68#[mel_treatment(
69    generic T ()
70    input stream Stream<T>
71    output start Block<void>
72    output end Block<void>
73    output first Block<T>
74    output last Block<T>
75)]
76pub async fn trigger() {
77    let mut last_value = None;
78
79    if let Ok(mut values) = stream.recv_many().await {
80        let _ = start.send_one(().into()).await;
81        if let Some(val) = values.pop_front() {
82            let _ = first.send_one(val.clone()).await;
83            last_value = Some(val);
84        }
85        if let Some(val) = Into::<VecDeque<Value>>::into(values).pop_back() {
86            last_value = Some(val);
87        }
88
89        let _ = futures::join!(start.close(), first.close());
90    }
91
92    while let Ok(values) = stream.recv_many().await {
93        last_value = Into::<VecDeque<Value>>::into(values).pop_back();
94    }
95
96    let _ = end.send_one(().into()).await;
97    if let Some(val) = last_value {
98        let _ = last.send_one(val).await;
99    }
100
101    // We don't close `end` and `last` explicitly here,
102    // because it would be redundant with boilerplate
103    // implementation of treatments.
104}
105
106/// Check a blocking value.
107///
108/// When `value` block is received, `check` is emitted.
109///
110/// ```mermaid
111/// graph LR
112///     T("check()")
113///     B["〈🟨〉"] -->|value| T
114///         
115///     T -->|check| S["〈🟦〉"]
116///     
117///     style B fill:#ffff,stroke:#ffff
118///     style S fill:#ffff,stroke:#ffff
119/// ```
120#[mel_treatment(
121    generic T ()
122    input value Block<T>
123    output check Block<void>
124)]
125pub async fn check() {
126    if let Ok(_) = value.recv_one().await {
127        let _ = check.send_one(().into()).await;
128    }
129}
130
131/// Uncheck a blocking value.
132///
133/// When `value` block stream is closed without receiving anything, `uncheck` is emitted.
134///
135/// ```mermaid
136/// graph LR
137///     T("uncheck()")
138///     B["〈🟨〉"] -->|value| T
139///         
140///     T -->|uncheck| S["〈🟦〉"]
141///     
142///     style B fill:#ffff,stroke:#ffff
143///     style S fill:#ffff,stroke:#ffff
144/// ```
145#[mel_treatment(
146    generic T ()
147    input value Block<T>
148    output uncheck Block<void>
149)]
150pub async fn uncheck() {
151    if let Err(_) = value.recv_one().await {
152        let _ = uncheck.send_one(().into()).await;
153    }
154}
155
156/// Emit a blocking value.
157///
158/// When `trigger` is enabled, `value` is emitted as block.
159///
160/// ```mermaid
161/// graph LR
162///     T("emit(value=🟨)")
163///     B["〈🟦〉"] -->|trigger| T
164///         
165///     T -->|emit| S["〈🟨〉"]
166///     
167///     style B fill:#ffff,stroke:#ffff
168///     style S fill:#ffff,stroke:#ffff
169/// ```
170#[mel_treatment(
171    generic T ()
172    input trigger Block<void>
173    output emit Block<T>
174)]
175pub async fn emit(value: T) {
176    if let Ok(_) = trigger.recv_one().await {
177        let _ = emit.send_one(value).await;
178    }
179}
180
181/// Stream a blocking value.
182///
183/// ```mermaid
184/// graph LR
185///     T("stream()")
186///     B["〈🟦〉"] -->|block| T
187///         
188///     T -->|stream| S["🟦"]
189///     
190///     
191///     style B fill:#ffff,stroke:#ffff
192///     style S fill:#ffff,stroke:#ffff
193/// ```
194#[mel_treatment(
195    generic T ()
196    input block Block<T>
197    output stream Stream<T>
198)]
199pub async fn stream() {
200    if let Ok(val) = block.recv_one().await {
201        let _ = stream.send_one(val).await;
202    }
203}
204
205/// Merge two streams.
206///
207/// The two streams are merged without predictible order.
208///
209/// ℹ️ Merge continues as long as `a` or `b` continues too, while the other can be ended.
210///
211/// ```mermaid
212/// graph LR
213///     T("merge()")
214///     A["… 🟦 🟫 …"] -->|a| T
215///     B["… 🟧 🟪 🟨 …"] -->|b| T
216///     
217///
218///     T -->|value| V["… 🟦 🟧 🟪 🟫 🟨 …"]
219///
220///     style V fill:#ffff,stroke:#ffff
221///     style A fill:#ffff,stroke:#ffff
222///     style B fill:#ffff,stroke:#ffff
223/// ```
224#[mel_treatment(
225    generic T ()
226    input a Stream<T>
227    input b Stream<T>
228    output value Stream<T>
229)]
230pub async fn merge() {
231    let xa = async {
232        while let Ok(a) = (&a).recv_many().await {
233            check!(value.send_many(a).await);
234        }
235    }
236    .fuse();
237    let xb = async {
238        while let Ok(b) = (&b).recv_many().await {
239            check!(value.send_many(b).await);
240        }
241    }
242    .fuse();
243
244    pin_mut!(xa, xb);
245
246    loop {
247        select! {
248            () = xa => {},
249            () = xb => {},
250            complete => break,
251        };
252    }
253}
254
255/// Arrange two streams as one.
256///
257/// The two streams are merged using the `select` stream:
258/// - when `true`, value from `a` is used;
259/// - when `false`, value from `b` is used.
260///
261/// ℹ️ No value from either `a` or `b` are discarded, they are used when `select` give turn.
262///
263/// ⚠️ When `select` ends merge terminates without treating the remaining values from `a` and `b`.
264/// When `select` give turn to `a` or `b` while the concerned stream is ended, the merge terminates.
265/// Merge continues as long as `select` and concerned stream does, while the other can be ended.
266///
267/// ```mermaid
268/// graph LR
269///     T("arrange()")
270///     A["… 🟦 🟫 …"] -->|a| T
271///     B["… 🟧 🟪 🟨 …"] -->|b| T
272///     O["… 🟩 🟥 🟥 🟩 🟥 …"] -->|select|T
273///     
274///
275///     T -->|value| V["… 🟦 🟧 🟪 🟫 🟨 …"]
276///
277///     style V fill:#ffff,stroke:#ffff
278///     style O fill:#ffff,stroke:#ffff
279///     style A fill:#ffff,stroke:#ffff
280///     style B fill:#ffff,stroke:#ffff
281/// ```
282#[mel_treatment(
283    generic T ()
284    input a Stream<T>
285    input b Stream<T>
286    input select Stream<bool>
287    output value Stream<T>
288)]
289pub async fn arrange() {
290    while let Ok(select) = select
291        .recv_one()
292        .await
293        .map(|val| GetData::<bool>::try_data(val).unwrap())
294    {
295        let val;
296        if select {
297            if let Ok(v) = a.recv_one().await {
298                val = v;
299            } else {
300                break;
301            }
302        } else {
303            if let Ok(v) = b.recv_one().await {
304                val = v;
305            } else {
306                break;
307            }
308        }
309
310        check!(value.send_one(val).await)
311    }
312}
313
314/// Fill a pattern stream with a `value.
315///
316/// ```mermaid
317/// graph LR
318/// T("fill(value=🟧)")
319/// B["… 🟦 🟦 🟦 …"] -->|pattern| T
320///
321/// T -->|filled| O["… 🟧 🟧 🟧 …"]
322///
323/// style B fill:#ffff,stroke:#ffff
324/// style O fill:#ffff,stroke:#ffff
325/// ```
326#[mel_treatment(
327    generic T ()
328    input pattern Stream<void>
329    output filled Stream<T>
330)]
331pub async fn fill(value: T) {
332    while let Ok(pat) = pattern.recv_many().await {
333        let mut transmission = melodium_core::TransmissionValue::new(value.clone());
334        for _ in 1..pat.len() {
335            transmission.push(value.clone());
336        }
337        check!(filled.send_many(transmission).await)
338    }
339}
340
341/// Filter a stream according to `bool` stream.
342///
343/// ℹ️ If both streams are not the same size nothing is sent through accepted nor rejected.
344///  
345/// ```mermaid
346/// graph LR
347///     T("filter()")
348///     V["… 🟦 🟧 🟪 🟫 🟨 …"] -->|value| T
349///     D["… 🟩 🟥 🟥 🟩 🟥 …"] -->|select|T
350///     
351///     T -->|accepted| A["… 🟦 🟫 …"]
352///     T -->|rejected| R["… 🟧 🟪 🟨 …"]
353///
354///     style V fill:#ffff,stroke:#ffff
355///     style D fill:#ffff,stroke:#ffff
356///     style A fill:#ffff,stroke:#ffff
357///     style R fill:#ffff,stroke:#ffff
358/// ```
359#[mel_treatment(
360    generic T ()
361    input value Stream<T>
362    input select Stream<bool>
363    output accepted Stream<T>
364    output rejected Stream<T>
365)]
366pub async fn filter() {
367    let mut accepted_op = true;
368    let mut rejected_op = true;
369
370    while let (Ok(value), Ok(select)) = futures::join!(value.recv_one(), select.recv_one()) {
371        let select = GetData::<bool>::try_data(select).unwrap();
372        if select {
373            if let Err(_) = accepted.send_one(value).await {
374                // If we cannot send anymore on accepted, we note it,
375                // and check if rejected is still valid, else just terminate.
376                accepted_op = false;
377                if !rejected_op {
378                    break;
379                }
380            }
381        } else {
382            if let Err(_) = rejected.send_one(value).await {
383                // If we cannot send anymore on rejected, we note it,
384                // and check if accepted is still valid, else just terminate.
385                rejected_op = false;
386                if !accepted_op {
387                    break;
388                }
389            }
390        }
391    }
392}
393
394/// Filter a block according to `bool` value.
395///
396/// ℹ️ If `select` is never received nothing is emitted.
397///  
398/// ```mermaid
399/// graph LR
400///     T("filterBlock()")
401///     V["〈🟦〉"] -->|value| T
402///     D["〈🟩〉"] -->|select|T
403///     
404///     T -->|accepted| A["〈🟦〉"]
405///     T -->|rejected| R[" "]
406///
407///     style V fill:#ffff,stroke:#ffff
408///     style D fill:#ffff,stroke:#ffff
409///     style A fill:#ffff,stroke:#ffff
410///     style R fill:#ffff,stroke:#ffff
411/// ```
412#[mel_treatment(
413    generic T ()
414    input value Block<T>
415    input select Block<bool>
416    output accepted Block<T>
417    output rejected Block<T>
418)]
419pub async fn filterBlock() {
420    if let (Ok(value), Ok(select)) = futures::join!(value.recv_one(), select.recv_one()) {
421        let select = GetData::<bool>::try_data(select).unwrap();
422        if select {
423            let _ = accepted.send_one(value).await;
424        } else {
425            let _ = rejected.send_one(value).await;
426        }
427    }
428}
429
430/// Fit a stream into a pattern.
431///
432/// ℹ️ If some remaining values doesn't fit into the pattern, they are trashed.
433///
434/// ```mermaid
435/// graph LR
436///     T("fit()")
437///     A["… 🟨 🟨 🟨 🟨 🟨 🟨"] -->|value| T
438///     B["🟦 🟦 🟦 🟦"] -->|pattern| T
439///     
440///     T -->|fitted| O["🟨 🟨 🟨 🟨"]
441///
442///     style A fill:#ffff,stroke:#ffff
443///     style B fill:#ffff,stroke:#ffff
444///     style O fill:#ffff,stroke:#ffff
445/// ```
446#[mel_treatment(
447    generic T ()
448    input value Stream<T>
449    input pattern Stream<void>
450    output fitted Stream<T>
451)]
452pub async fn fit() {
453    'main: while let Ok(pattern) = pattern
454        .recv_many()
455        .await
456        .map(|values| TryInto::<Vec<()>>::try_into(values).unwrap())
457    {
458        for _ in pattern {
459            if let Ok(val) = value.recv_one().await {
460                check!('main, fitted.send_one(val).await)
461            } else {
462                break 'main;
463            }
464        }
465    }
466}
467
468/// Gives count of elements passing through stream.
469///
470/// This count increment one for each element within the stream, starting at 1.
471///
472/// ```mermaid
473/// graph LR
474///     T("count()")
475///     V["🟦 🟦 🟦 …"] -->|iter| T
476///     
477///     T -->|count| P["1️⃣ 2️⃣ 3️⃣ …"]
478///
479///     style V fill:#ffff,stroke:#ffff
480///     style P fill:#ffff,stroke:#ffff
481/// ```
482#[mel_treatment(
483    generic T ()
484    input stream Stream<T>
485    output count Stream<u128>
486)]
487pub async fn count() {
488    let mut i: u128 = 0;
489    while let Ok(iter) = stream.recv_many().await {
490        let next_i = i + iter.len() as u128;
491        check!(
492            count
493                .send_many((i..next_i).collect::<VecDeque<_>>().into())
494                .await
495        );
496        i = next_i;
497    }
498}
499
500/// Generate a stream with a given length.
501///
502/// ```mermaid
503/// graph LR
504///     T("generate()")
505///     B["〈🟨〉"] -->|length| T
506///         
507///     T -->|stream| S["… 🟦 🟦 🟦 🟦 🟦 🟦"]
508///     
509///     
510///     style B fill:#ffff,stroke:#ffff
511///     style S fill:#ffff,stroke:#ffff
512/// ```
513#[mel_treatment(
514    generic T ()
515    input length Block<u128>
516    output stream Stream<T>
517)]
518pub async fn generate(data: T) {
519    if let Ok(length) = length
520        .recv_one()
521        .await
522        .map(|val| GetData::<u128>::try_data(val).unwrap())
523    {
524        const CHUNK: u128 = 2u128.pow(20);
525        let mut total = 0u128;
526        while total < length {
527            let chunk = u128::min(CHUNK, length - total) as usize;
528            let mut transmission = melodium_core::TransmissionValue::new(data.clone());
529            for _ in 1..chunk {
530                transmission.push(data.clone());
531            }
532            check!(stream.send_many(transmission).await);
533            total += chunk as u128;
534        }
535    }
536}
537
538/// Generate a stream indefinitely.
539///
540/// This generates a continuous stream, until stream consumers closes it.
541///
542/// ```mermaid
543/// graph LR
544///     T("generateIndefinitely()")
545///     B["〈🟦〉"] -->|trigger| T
546///         
547///     T -->|stream| S["… 🟦 🟦 🟦 🟦 🟦 🟦"]
548///     
549///     
550///     style B fill:#ffff,stroke:#ffff
551///     style S fill:#ffff,stroke:#ffff
552/// ```
553#[mel_treatment(
554    generic T ()
555    input trigger Block<void>
556    output stream Stream<T>
557)]
558pub async fn generate_indefinitely(data: T) {
559    if let Ok(_) = trigger.recv_one().await {
560        const CHUNK: usize = 2usize.pow(20);
561        loop {
562            let mut transmission = melodium_core::TransmissionValue::new(data.clone());
563            for _ in 1..CHUNK {
564                transmission.push(data.clone());
565            }
566            check!(stream.send_many(transmission).await);
567        }
568    }
569}
570
571/// Insert a block into a stream.
572///
573/// `block` is inserted into `stream` when it comes and everything is streamed to `output`.
574///
575/// ℹ️ No assumption on block insertion position in stream can be made.
576///
577/// ```mermaid
578/// graph LR
579///     T("insert()")
580///     A["… 🟦 🟦 🟦 🟦 …"] -->|stream| T
581///     B["〈🟧〉"] -->|block| T
582///     
583///
584///     T -->|output| V["… 🟦 🟧 🟦 🟦 🟦 …"]
585///
586///     style V fill:#ffff,stroke:#ffff
587///     style A fill:#ffff,stroke:#ffff
588///     style B fill:#ffff,stroke:#ffff
589/// ```
590#[mel_treatment(
591    generic T ()
592    input stream Stream<T>
593    input block Block<T>
594    output output Stream<T>
595)]
596pub async fn insert() {
597    let streaming = async {
598        while let Ok(values) = (&stream).recv_many().await {
599            check!(output.send_many(values).await);
600        }
601    }
602    .fuse();
603    let insert_block = async {
604        if let Ok(val) = (&block).recv_one().await {
605            let _ = output.send_one(val).await;
606        }
607    }
608    .fuse();
609
610    pin_mut!(streaming, insert_block);
611
612    loop {
613        select! {
614            () = streaming => {},
615            () = insert_block => {},
616            complete => break,
617        };
618    }
619}
620
621/// Merge two incoming blocks as a stream.
622///
623/// Each block is taken when it arrives and send through `stream`.
624///
625/// ℹ️ No priority on blocks order in stream can be assumed.
626///
627/// ```mermaid
628/// graph LR
629///     T("flock()")
630///     A["〈🟦〉"] -->|a| T
631///     B["〈🟧〉"] -->|b| T
632///     
633///
634///     T -->|stream| V["🟧 🟦"]
635///
636///     style V fill:#ffff,stroke:#ffff
637///     style A fill:#ffff,stroke:#ffff
638///     style B fill:#ffff,stroke:#ffff
639/// ```
640#[mel_treatment(
641    generic T ()
642    input a Block<T>
643    input b Block<T>
644    output stream Stream<T>
645)]
646pub async fn flock() {
647    let xa = async {
648        if let Ok(a) = (&a).recv_one().await {
649            let _ = stream.send_one(a).await;
650        }
651    }
652    .fuse();
653    let xb = async {
654        if let Ok(b) = (&b).recv_one().await {
655            let _ = stream.send_one(b).await;
656        }
657    }
658    .fuse();
659
660    pin_mut!(xa, xb);
661
662    loop {
663        select! {
664            () = xa => {},
665            () = xb => {},
666            complete => break,
667        };
668    }
669}
670
671/// Emit one block.
672///
673/// Take first block coming among `a` or `b` and emit it in `value`, ignoring the remaining one.
674///
675/// ℹ️ No priority between blocks can be assumed if they are ready at same moment.
676///
677/// ```mermaid
678/// graph LR
679///     T("one()")
680///     A["〈〉"] -->|a| T
681///     B["〈🟧〉"] -->|b| T
682///     
683///
684///     T -->|value| V["〈🟧〉"]
685///
686///     style V fill:#ffff,stroke:#ffff
687///     style A fill:#ffff,stroke:#ffff
688///     style B fill:#ffff,stroke:#ffff
689/// ```
690#[mel_treatment(
691    generic T ()
692    input a Block<T>
693    input b Block<T>
694    output value Block<T>
695)]
696pub async fn one() {
697    let xa = async { (&a).recv_one().await.ok() }.fuse();
698    let xb = async { (&b).recv_one().await.ok() }.fuse();
699
700    pin_mut!(xa, xb);
701
702    loop {
703        let val = select! {
704            val = xa => val,
705            val = xb => val,
706            complete => break,
707        };
708
709        if let Some(val) = val {
710            let _ = value.send_one(val).await;
711            break;
712        }
713    }
714}
715
716/// Never send any value.
717///
718/// No value is ever sent on `closed` output, which is immediately closed.
719///
720#[mel_treatment(
721    generic T ()
722    input trigger Block<void>
723    output closed Stream<T>
724)]
725pub async fn close() {
726    // Nothing to do
727}
728
729/// Never send any value.
730///
731/// No value is ever sent on `closed` output, which is immediately closed.
732///
733#[mel_treatment(
734    generic T ()
735    input trigger Block<void>
736    output closed Block<T>
737)]
738pub async fn close_block() {
739    // Nothing to do
740}
741
742/// Consume stream indefinitely.
743///
744/// Input `stream` is consumed indefinitely until it becomes closed by previous treatment.
745///
746#[mel_treatment(
747    generic T ()
748    input stream Stream<T>
749)]
750pub async fn consume() {
751    while let Ok(_) = stream.recv_many().await {
752        // Nothing to do.
753    }
754}
755
756/// Pass stream under condition.
757///
758/// If `if` is `true`, pass the stream, else closes it.
759///
760#[mel_treatment(
761    generic T ()
762    input stream Stream<T>
763    output passed Stream<T>
764)]
765pub async fn pass(cond: bool) {
766    if cond {
767        while let Ok(data) = stream.recv_many().await {
768            check!(passed.send_many(data).await)
769        }
770    }
771}
772
773/// Pass block under condition.
774///
775/// If `if` is `true`, pass the block, else nothing.
776///
777#[mel_treatment(
778    generic T ()
779    input block Block<T>
780    output passed Block<T>
781)]
782pub async fn passBlock(cond: bool) {
783    if cond {
784        if let Ok(data) = block.recv_one().await {
785            let _ = passed.send_one(data).await;
786        }
787    }
788}
789
790/// Barrier stream under condition.
791///
792/// Awaits `leverage` to let stream pass if it is `true`, else closes the stream.
793///
794#[mel_treatment(
795    generic T ()
796    input leverage Block<bool>
797    input stream Stream<T>
798    output passed Stream<T>
799)]
800pub async fn barrier() {
801    if let Ok(true) = leverage
802        .recv_one()
803        .await
804        .map(|val| GetData::<bool>::try_data(val).unwrap())
805    {
806        while let Ok(data) = stream.recv_many().await {
807            check!(passed.send_many(data).await)
808        }
809    }
810}
811
812/// Pass stream until cut signal.
813///
814/// Let stream pass until `cut` signal si received.
815///
816#[mel_treatment(
817    generic T ()
818    input cut Block<void>
819    input stream Stream<T>
820    output passed Stream<T>
821)]
822pub async fn cut() {
823    let cut = async { cut.recv_one().await.is_ok() }.fuse();
824    let pass = async {
825        while let Ok(values) = stream.recv_many().await {
826            check!(passed.send_many(values).await)
827        }
828    }
829    .fuse();
830
831    pin_mut!(cut, pass);
832
833    loop {
834        select! {
835            () = pass => break,
836            do_cut = cut => if do_cut {
837                break
838            },
839            complete => break,
840        }
841    }
842}
843
844/// Release stream once signal is received.
845///
846/// Awaits `leverage` to let stream pass, else closes the stream.
847///
848#[mel_treatment(
849    generic T ()
850    input leverage Block<void>
851    input data Stream<T>
852    output released Stream<T>
853)]
854pub async fn release() {
855    if let Ok(_) = leverage.recv_one().await {
856        while let Ok(data) = data.recv_many().await {
857            check!(released.send_many(data).await)
858        }
859    }
860}
861
862/// Release block once signal is received.
863///
864/// Awaits `leverage` to let block pass, else closes the flow.
865///
866#[mel_treatment(
867    generic T ()
868    input leverage Block<void>
869    input data Block<T>
870    output released Block<T>
871)]
872pub async fn releaseBlock() {
873    if let Ok(_) = leverage.recv_one().await {
874        if let Ok(data) = data.recv_one().await {
875            let _ = released.send_one(data).await;
876        }
877    }
878}
879
880/// Await blocks.
881///
882/// Wait for two blocks and send `awaited` once both are received.
883///
884/// ℹ️ If one block is never received, `awaited` is never emitted.
885#[mel_treatment(
886    generic T ()
887    input a Block<T>
888    input b Block<T>
889    output awaited Block<void>
890)]
891pub async fn waitBlock() {
892    let (a, b) = futures::join!(async { a.recv_one().await.is_ok() }, async {
893        b.recv_one().await.is_ok()
894    });
895
896    if a && b {
897        let _ = awaited.send_one(().into()).await;
898    }
899}