std_mel/flow/mod.rs
1use futures::{pin_mut, select, FutureExt};
2use melodium_core::common::executive::{GetData, Value};
3use melodium_macro::{check, mel_treatment};
4use std::collections::VecDeque;
5
6pub mod concentrate;
7pub mod vec;
8
9/// Chain two streams.
10///
11///
12/// ```mermaid
13/// graph LR
14/// T("chain()")
15/// A["🟨 🟨 🟨 🟨 🟨 🟨"] -->|first| T
16/// B["… 🟪 🟪 🟪"] -->|second| T
17///
18/// T -->|chained| O["… 🟪 🟪 🟪 🟨 🟨 🟨 🟨 🟨 🟨"]
19///
20/// style A fill:#ffff,stroke:#ffff
21/// style B fill:#ffff,stroke:#ffff
22/// style O fill:#ffff,stroke:#ffff
23/// ```
24#[mel_treatment(
25 generic T ()
26 input first Stream<T>
27 input second Stream<T>
28 output chained Stream<T>
29)]
30pub async fn chain() {
31 while let Ok(values) = first.recv_many().await {
32 check!(chained.send_many(values).await)
33 }
34
35 while let Ok(values) = second.recv_many().await {
36 check!(chained.send_many(values).await)
37 }
38}
39
40/// Trigger on a stream start and end.
41///
42/// Emit `start` when a first value is send through the stream.
43/// Emit `end` when stream is finally over.
44///
45/// Emit `first` with the first value coming in the stream.
46/// Emit `last` with the last value coming in the stream.
47///
48/// ℹ️ `start` and `first` are always emitted together.
49/// If the stream only contains one element, `first` and `last` both contains it.
50/// If the stream never transmit any data before being ended, only `end` is emitted.
51///
52/// ```mermaid
53/// graph LR
54/// T("trigger()")
55/// B["🟥 … 🟨 🟨 🟨 🟨 🟨 🟨 … 🟩"] -->|stream| T
56///
57/// T -->|start| S["〈🟦〉"]
58/// T -->|first| F["〈🟩〉"]
59/// T -->|last| L["〈🟥〉"]
60/// T -->|end| E["〈🟦〉"]
61///
62/// style B fill:#ffff,stroke:#ffff
63/// style S fill:#ffff,stroke:#ffff
64/// style F fill:#ffff,stroke:#ffff
65/// style L fill:#ffff,stroke:#ffff
66/// style E fill:#ffff,stroke:#ffff
67/// ```
68#[mel_treatment(
69 generic T ()
70 input stream Stream<T>
71 output start Block<void>
72 output end Block<void>
73 output first Block<T>
74 output last Block<T>
75)]
76pub async fn trigger() {
77 let mut last_value = None;
78
79 if let Ok(mut values) = stream.recv_many().await {
80 let _ = start.send_one(().into()).await;
81 if let Some(val) = values.pop_front() {
82 let _ = first.send_one(val.clone()).await;
83 last_value = Some(val);
84 }
85 if let Some(val) = Into::<VecDeque<Value>>::into(values).pop_back() {
86 last_value = Some(val);
87 }
88
89 let _ = futures::join!(start.close(), first.close());
90 }
91
92 while let Ok(values) = stream.recv_many().await {
93 last_value = Into::<VecDeque<Value>>::into(values).pop_back();
94 }
95
96 let _ = end.send_one(().into()).await;
97 if let Some(val) = last_value {
98 let _ = last.send_one(val).await;
99 }
100
101 // We don't close `end` and `last` explicitly here,
102 // because it would be redundant with boilerplate
103 // implementation of treatments.
104}
105
106/// Check a blocking value.
107///
108/// When `value` block is received, `check` is emitted.
109///
110/// ```mermaid
111/// graph LR
112/// T("check()")
113/// B["〈🟨〉"] -->|value| T
114///
115/// T -->|check| S["〈🟦〉"]
116///
117/// style B fill:#ffff,stroke:#ffff
118/// style S fill:#ffff,stroke:#ffff
119/// ```
120#[mel_treatment(
121 generic T ()
122 input value Block<T>
123 output check Block<void>
124)]
125pub async fn check() {
126 if let Ok(_) = value.recv_one().await {
127 let _ = check.send_one(().into()).await;
128 }
129}
130
131/// Uncheck a blocking value.
132///
133/// When `value` block stream is closed without receiving anything, `uncheck` is emitted.
134///
135/// ```mermaid
136/// graph LR
137/// T("uncheck()")
138/// B["〈🟨〉"] -->|value| T
139///
140/// T -->|uncheck| S["〈🟦〉"]
141///
142/// style B fill:#ffff,stroke:#ffff
143/// style S fill:#ffff,stroke:#ffff
144/// ```
145#[mel_treatment(
146 generic T ()
147 input value Block<T>
148 output uncheck Block<void>
149)]
150pub async fn uncheck() {
151 if let Err(_) = value.recv_one().await {
152 let _ = uncheck.send_one(().into()).await;
153 }
154}
155
156/// Emit a blocking value.
157///
158/// When `trigger` is enabled, `value` is emitted as block.
159///
160/// ```mermaid
161/// graph LR
162/// T("emit(value=🟨)")
163/// B["〈🟦〉"] -->|trigger| T
164///
165/// T -->|emit| S["〈🟨〉"]
166///
167/// style B fill:#ffff,stroke:#ffff
168/// style S fill:#ffff,stroke:#ffff
169/// ```
170#[mel_treatment(
171 generic T ()
172 input trigger Block<void>
173 output emit Block<T>
174)]
175pub async fn emit(value: T) {
176 if let Ok(_) = trigger.recv_one().await {
177 let _ = emit.send_one(value).await;
178 }
179}
180
181/// Stream a blocking value.
182///
183/// ```mermaid
184/// graph LR
185/// T("stream()")
186/// B["〈🟦〉"] -->|block| T
187///
188/// T -->|stream| S["🟦"]
189///
190///
191/// style B fill:#ffff,stroke:#ffff
192/// style S fill:#ffff,stroke:#ffff
193/// ```
194#[mel_treatment(
195 generic T ()
196 input block Block<T>
197 output stream Stream<T>
198)]
199pub async fn stream() {
200 if let Ok(val) = block.recv_one().await {
201 let _ = stream.send_one(val).await;
202 }
203}
204
205/// Merge two streams.
206///
207/// The two streams are merged without predictible order.
208///
209/// ℹ️ Merge continues as long as `a` or `b` continues too, while the other can be ended.
210///
211/// ```mermaid
212/// graph LR
213/// T("merge()")
214/// A["… 🟦 🟫 …"] -->|a| T
215/// B["… 🟧 🟪 🟨 …"] -->|b| T
216///
217///
218/// T -->|value| V["… 🟦 🟧 🟪 🟫 🟨 …"]
219///
220/// style V fill:#ffff,stroke:#ffff
221/// style A fill:#ffff,stroke:#ffff
222/// style B fill:#ffff,stroke:#ffff
223/// ```
224#[mel_treatment(
225 generic T ()
226 input a Stream<T>
227 input b Stream<T>
228 output value Stream<T>
229)]
230pub async fn merge() {
231 let xa = async {
232 while let Ok(a) = (&a).recv_many().await {
233 check!(value.send_many(a).await);
234 }
235 }
236 .fuse();
237 let xb = async {
238 while let Ok(b) = (&b).recv_many().await {
239 check!(value.send_many(b).await);
240 }
241 }
242 .fuse();
243
244 pin_mut!(xa, xb);
245
246 loop {
247 select! {
248 () = xa => {},
249 () = xb => {},
250 complete => break,
251 };
252 }
253}
254
255/// Arrange two streams as one.
256///
257/// The two streams are merged using the `select` stream:
258/// - when `true`, value from `a` is used;
259/// - when `false`, value from `b` is used.
260///
261/// ℹ️ No value from either `a` or `b` are discarded, they are used when `select` give turn.
262///
263/// ⚠️ When `select` ends merge terminates without treating the remaining values from `a` and `b`.
264/// When `select` give turn to `a` or `b` while the concerned stream is ended, the merge terminates.
265/// Merge continues as long as `select` and concerned stream does, while the other can be ended.
266///
267/// ```mermaid
268/// graph LR
269/// T("arrange()")
270/// A["… 🟦 🟫 …"] -->|a| T
271/// B["… 🟧 🟪 🟨 …"] -->|b| T
272/// O["… 🟩 🟥 🟥 🟩 🟥 …"] -->|select|T
273///
274///
275/// T -->|value| V["… 🟦 🟧 🟪 🟫 🟨 …"]
276///
277/// style V fill:#ffff,stroke:#ffff
278/// style O fill:#ffff,stroke:#ffff
279/// style A fill:#ffff,stroke:#ffff
280/// style B fill:#ffff,stroke:#ffff
281/// ```
282#[mel_treatment(
283 generic T ()
284 input a Stream<T>
285 input b Stream<T>
286 input select Stream<bool>
287 output value Stream<T>
288)]
289pub async fn arrange() {
290 while let Ok(select) = select
291 .recv_one()
292 .await
293 .map(|val| GetData::<bool>::try_data(val).unwrap())
294 {
295 let val;
296 if select {
297 if let Ok(v) = a.recv_one().await {
298 val = v;
299 } else {
300 break;
301 }
302 } else {
303 if let Ok(v) = b.recv_one().await {
304 val = v;
305 } else {
306 break;
307 }
308 }
309
310 check!(value.send_one(val).await)
311 }
312}
313
314/// Fill a pattern stream with a `value.
315///
316/// ```mermaid
317/// graph LR
318/// T("fill(value=🟧)")
319/// B["… 🟦 🟦 🟦 …"] -->|pattern| T
320///
321/// T -->|filled| O["… 🟧 🟧 🟧 …"]
322///
323/// style B fill:#ffff,stroke:#ffff
324/// style O fill:#ffff,stroke:#ffff
325/// ```
326#[mel_treatment(
327 generic T ()
328 input pattern Stream<void>
329 output filled Stream<T>
330)]
331pub async fn fill(value: T) {
332 while let Ok(pat) = pattern.recv_many().await {
333 let mut transmission = melodium_core::TransmissionValue::new(value.clone());
334 for _ in 1..pat.len() {
335 transmission.push(value.clone());
336 }
337 check!(filled.send_many(transmission).await)
338 }
339}
340
341/// Filter a stream according to `bool` stream.
342///
343/// ℹ️ If both streams are not the same size nothing is sent through accepted nor rejected.
344///
345/// ```mermaid
346/// graph LR
347/// T("filter()")
348/// V["… 🟦 🟧 🟪 🟫 🟨 …"] -->|value| T
349/// D["… 🟩 🟥 🟥 🟩 🟥 …"] -->|select|T
350///
351/// T -->|accepted| A["… 🟦 🟫 …"]
352/// T -->|rejected| R["… 🟧 🟪 🟨 …"]
353///
354/// style V fill:#ffff,stroke:#ffff
355/// style D fill:#ffff,stroke:#ffff
356/// style A fill:#ffff,stroke:#ffff
357/// style R fill:#ffff,stroke:#ffff
358/// ```
359#[mel_treatment(
360 generic T ()
361 input value Stream<T>
362 input select Stream<bool>
363 output accepted Stream<T>
364 output rejected Stream<T>
365)]
366pub async fn filter() {
367 let mut accepted_op = true;
368 let mut rejected_op = true;
369
370 while let (Ok(value), Ok(select)) = futures::join!(value.recv_one(), select.recv_one()) {
371 let select = GetData::<bool>::try_data(select).unwrap();
372 if select {
373 if let Err(_) = accepted.send_one(value).await {
374 // If we cannot send anymore on accepted, we note it,
375 // and check if rejected is still valid, else just terminate.
376 accepted_op = false;
377 if !rejected_op {
378 break;
379 }
380 }
381 } else {
382 if let Err(_) = rejected.send_one(value).await {
383 // If we cannot send anymore on rejected, we note it,
384 // and check if accepted is still valid, else just terminate.
385 rejected_op = false;
386 if !accepted_op {
387 break;
388 }
389 }
390 }
391 }
392}
393
394/// Filter a block according to `bool` value.
395///
396/// ℹ️ If `select` is never received nothing is emitted.
397///
398/// ```mermaid
399/// graph LR
400/// T("filterBlock()")
401/// V["〈🟦〉"] -->|value| T
402/// D["〈🟩〉"] -->|select|T
403///
404/// T -->|accepted| A["〈🟦〉"]
405/// T -->|rejected| R[" "]
406///
407/// style V fill:#ffff,stroke:#ffff
408/// style D fill:#ffff,stroke:#ffff
409/// style A fill:#ffff,stroke:#ffff
410/// style R fill:#ffff,stroke:#ffff
411/// ```
412#[mel_treatment(
413 generic T ()
414 input value Block<T>
415 input select Block<bool>
416 output accepted Block<T>
417 output rejected Block<T>
418)]
419pub async fn filterBlock() {
420 if let (Ok(value), Ok(select)) = futures::join!(value.recv_one(), select.recv_one()) {
421 let select = GetData::<bool>::try_data(select).unwrap();
422 if select {
423 let _ = accepted.send_one(value).await;
424 } else {
425 let _ = rejected.send_one(value).await;
426 }
427 }
428}
429
430/// Fit a stream into a pattern.
431///
432/// ℹ️ If some remaining values doesn't fit into the pattern, they are trashed.
433///
434/// ```mermaid
435/// graph LR
436/// T("fit()")
437/// A["… 🟨 🟨 🟨 🟨 🟨 🟨"] -->|value| T
438/// B["🟦 🟦 🟦 🟦"] -->|pattern| T
439///
440/// T -->|fitted| O["🟨 🟨 🟨 🟨"]
441///
442/// style A fill:#ffff,stroke:#ffff
443/// style B fill:#ffff,stroke:#ffff
444/// style O fill:#ffff,stroke:#ffff
445/// ```
446#[mel_treatment(
447 generic T ()
448 input value Stream<T>
449 input pattern Stream<void>
450 output fitted Stream<T>
451)]
452pub async fn fit() {
453 'main: while let Ok(pattern) = pattern
454 .recv_many()
455 .await
456 .map(|values| TryInto::<Vec<()>>::try_into(values).unwrap())
457 {
458 for _ in pattern {
459 if let Ok(val) = value.recv_one().await {
460 check!('main, fitted.send_one(val).await)
461 } else {
462 break 'main;
463 }
464 }
465 }
466}
467
468/// Gives count of elements passing through stream.
469///
470/// This count increment one for each element within the stream, starting at 1.
471///
472/// ```mermaid
473/// graph LR
474/// T("count()")
475/// V["🟦 🟦 🟦 …"] -->|iter| T
476///
477/// T -->|count| P["1️⃣ 2️⃣ 3️⃣ …"]
478///
479/// style V fill:#ffff,stroke:#ffff
480/// style P fill:#ffff,stroke:#ffff
481/// ```
482#[mel_treatment(
483 generic T ()
484 input stream Stream<T>
485 output count Stream<u128>
486)]
487pub async fn count() {
488 let mut i: u128 = 0;
489 while let Ok(iter) = stream.recv_many().await {
490 let next_i = i + iter.len() as u128;
491 check!(
492 count
493 .send_many((i..next_i).collect::<VecDeque<_>>().into())
494 .await
495 );
496 i = next_i;
497 }
498}
499
500/// Generate a stream with a given length.
501///
502/// ```mermaid
503/// graph LR
504/// T("generate()")
505/// B["〈🟨〉"] -->|length| T
506///
507/// T -->|stream| S["… 🟦 🟦 🟦 🟦 🟦 🟦"]
508///
509///
510/// style B fill:#ffff,stroke:#ffff
511/// style S fill:#ffff,stroke:#ffff
512/// ```
513#[mel_treatment(
514 generic T ()
515 input length Block<u128>
516 output stream Stream<T>
517)]
518pub async fn generate(data: T) {
519 if let Ok(length) = length
520 .recv_one()
521 .await
522 .map(|val| GetData::<u128>::try_data(val).unwrap())
523 {
524 const CHUNK: u128 = 2u128.pow(20);
525 let mut total = 0u128;
526 while total < length {
527 let chunk = u128::min(CHUNK, length - total) as usize;
528 let mut transmission = melodium_core::TransmissionValue::new(data.clone());
529 for _ in 1..chunk {
530 transmission.push(data.clone());
531 }
532 check!(stream.send_many(transmission).await);
533 total += chunk as u128;
534 }
535 }
536}
537
538/// Generate a stream indefinitely.
539///
540/// This generates a continuous stream, until stream consumers closes it.
541///
542/// ```mermaid
543/// graph LR
544/// T("generateIndefinitely()")
545/// B["〈🟦〉"] -->|trigger| T
546///
547/// T -->|stream| S["… 🟦 🟦 🟦 🟦 🟦 🟦"]
548///
549///
550/// style B fill:#ffff,stroke:#ffff
551/// style S fill:#ffff,stroke:#ffff
552/// ```
553#[mel_treatment(
554 generic T ()
555 input trigger Block<void>
556 output stream Stream<T>
557)]
558pub async fn generate_indefinitely(data: T) {
559 if let Ok(_) = trigger.recv_one().await {
560 const CHUNK: usize = 2usize.pow(20);
561 loop {
562 let mut transmission = melodium_core::TransmissionValue::new(data.clone());
563 for _ in 1..CHUNK {
564 transmission.push(data.clone());
565 }
566 check!(stream.send_many(transmission).await);
567 }
568 }
569}
570
571/// Insert a block into a stream.
572///
573/// `block` is inserted into `stream` when it comes and everything is streamed to `output`.
574///
575/// ℹ️ No assumption on block insertion position in stream can be made.
576///
577/// ```mermaid
578/// graph LR
579/// T("insert()")
580/// A["… 🟦 🟦 🟦 🟦 …"] -->|stream| T
581/// B["〈🟧〉"] -->|block| T
582///
583///
584/// T -->|output| V["… 🟦 🟧 🟦 🟦 🟦 …"]
585///
586/// style V fill:#ffff,stroke:#ffff
587/// style A fill:#ffff,stroke:#ffff
588/// style B fill:#ffff,stroke:#ffff
589/// ```
590#[mel_treatment(
591 generic T ()
592 input stream Stream<T>
593 input block Block<T>
594 output output Stream<T>
595)]
596pub async fn insert() {
597 let streaming = async {
598 while let Ok(values) = (&stream).recv_many().await {
599 check!(output.send_many(values).await);
600 }
601 }
602 .fuse();
603 let insert_block = async {
604 if let Ok(val) = (&block).recv_one().await {
605 let _ = output.send_one(val).await;
606 }
607 }
608 .fuse();
609
610 pin_mut!(streaming, insert_block);
611
612 loop {
613 select! {
614 () = streaming => {},
615 () = insert_block => {},
616 complete => break,
617 };
618 }
619}
620
621/// Merge two incoming blocks as a stream.
622///
623/// Each block is taken when it arrives and send through `stream`.
624///
625/// ℹ️ No priority on blocks order in stream can be assumed.
626///
627/// ```mermaid
628/// graph LR
629/// T("flock()")
630/// A["〈🟦〉"] -->|a| T
631/// B["〈🟧〉"] -->|b| T
632///
633///
634/// T -->|stream| V["🟧 🟦"]
635///
636/// style V fill:#ffff,stroke:#ffff
637/// style A fill:#ffff,stroke:#ffff
638/// style B fill:#ffff,stroke:#ffff
639/// ```
640#[mel_treatment(
641 generic T ()
642 input a Block<T>
643 input b Block<T>
644 output stream Stream<T>
645)]
646pub async fn flock() {
647 let xa = async {
648 if let Ok(a) = (&a).recv_one().await {
649 let _ = stream.send_one(a).await;
650 }
651 }
652 .fuse();
653 let xb = async {
654 if let Ok(b) = (&b).recv_one().await {
655 let _ = stream.send_one(b).await;
656 }
657 }
658 .fuse();
659
660 pin_mut!(xa, xb);
661
662 loop {
663 select! {
664 () = xa => {},
665 () = xb => {},
666 complete => break,
667 };
668 }
669}
670
671/// Emit one block.
672///
673/// Take first block coming among `a` or `b` and emit it in `value`, ignoring the remaining one.
674///
675/// ℹ️ No priority between blocks can be assumed if they are ready at same moment.
676///
677/// ```mermaid
678/// graph LR
679/// T("one()")
680/// A["〈〉"] -->|a| T
681/// B["〈🟧〉"] -->|b| T
682///
683///
684/// T -->|value| V["〈🟧〉"]
685///
686/// style V fill:#ffff,stroke:#ffff
687/// style A fill:#ffff,stroke:#ffff
688/// style B fill:#ffff,stroke:#ffff
689/// ```
690#[mel_treatment(
691 generic T ()
692 input a Block<T>
693 input b Block<T>
694 output value Block<T>
695)]
696pub async fn one() {
697 let xa = async { (&a).recv_one().await.ok() }.fuse();
698 let xb = async { (&b).recv_one().await.ok() }.fuse();
699
700 pin_mut!(xa, xb);
701
702 loop {
703 let val = select! {
704 val = xa => val,
705 val = xb => val,
706 complete => break,
707 };
708
709 if let Some(val) = val {
710 let _ = value.send_one(val).await;
711 break;
712 }
713 }
714}
715
716/// Never send any value.
717///
718/// No value is ever sent on `closed` output, which is immediately closed.
719///
720#[mel_treatment(
721 generic T ()
722 input trigger Block<void>
723 output closed Stream<T>
724)]
725pub async fn close() {
726 // Nothing to do
727}
728
729/// Never send any value.
730///
731/// No value is ever sent on `closed` output, which is immediately closed.
732///
733#[mel_treatment(
734 generic T ()
735 input trigger Block<void>
736 output closed Block<T>
737)]
738pub async fn close_block() {
739 // Nothing to do
740}
741
742/// Consume stream indefinitely.
743///
744/// Input `stream` is consumed indefinitely until it becomes closed by previous treatment.
745///
746#[mel_treatment(
747 generic T ()
748 input stream Stream<T>
749)]
750pub async fn consume() {
751 while let Ok(_) = stream.recv_many().await {
752 // Nothing to do.
753 }
754}
755
756/// Pass stream under condition.
757///
758/// If `if` is `true`, pass the stream, else closes it.
759///
760#[mel_treatment(
761 generic T ()
762 input stream Stream<T>
763 output passed Stream<T>
764)]
765pub async fn pass(cond: bool) {
766 if cond {
767 while let Ok(data) = stream.recv_many().await {
768 check!(passed.send_many(data).await)
769 }
770 }
771}
772
773/// Pass block under condition.
774///
775/// If `if` is `true`, pass the block, else nothing.
776///
777#[mel_treatment(
778 generic T ()
779 input block Block<T>
780 output passed Block<T>
781)]
782pub async fn passBlock(cond: bool) {
783 if cond {
784 if let Ok(data) = block.recv_one().await {
785 let _ = passed.send_one(data).await;
786 }
787 }
788}
789
790/// Barrier stream under condition.
791///
792/// Awaits `leverage` to let stream pass if it is `true`, else closes the stream.
793///
794#[mel_treatment(
795 generic T ()
796 input leverage Block<bool>
797 input stream Stream<T>
798 output passed Stream<T>
799)]
800pub async fn barrier() {
801 if let Ok(true) = leverage
802 .recv_one()
803 .await
804 .map(|val| GetData::<bool>::try_data(val).unwrap())
805 {
806 while let Ok(data) = stream.recv_many().await {
807 check!(passed.send_many(data).await)
808 }
809 }
810}
811
812/// Pass stream until cut signal.
813///
814/// Let stream pass until `cut` signal si received.
815///
816#[mel_treatment(
817 generic T ()
818 input cut Block<void>
819 input stream Stream<T>
820 output passed Stream<T>
821)]
822pub async fn cut() {
823 let cut = async { cut.recv_one().await.is_ok() }.fuse();
824 let pass = async {
825 while let Ok(values) = stream.recv_many().await {
826 check!(passed.send_many(values).await)
827 }
828 }
829 .fuse();
830
831 pin_mut!(cut, pass);
832
833 loop {
834 select! {
835 () = pass => break,
836 do_cut = cut => if do_cut {
837 break
838 },
839 complete => break,
840 }
841 }
842}
843
844/// Release stream once signal is received.
845///
846/// Awaits `leverage` to let stream pass, else closes the stream.
847///
848#[mel_treatment(
849 generic T ()
850 input leverage Block<void>
851 input data Stream<T>
852 output released Stream<T>
853)]
854pub async fn release() {
855 if let Ok(_) = leverage.recv_one().await {
856 while let Ok(data) = data.recv_many().await {
857 check!(released.send_many(data).await)
858 }
859 }
860}
861
862/// Release block once signal is received.
863///
864/// Awaits `leverage` to let block pass, else closes the flow.
865///
866#[mel_treatment(
867 generic T ()
868 input leverage Block<void>
869 input data Block<T>
870 output released Block<T>
871)]
872pub async fn releaseBlock() {
873 if let Ok(_) = leverage.recv_one().await {
874 if let Ok(data) = data.recv_one().await {
875 let _ = released.send_one(data).await;
876 }
877 }
878}
879
880/// Await blocks.
881///
882/// Wait for two blocks and send `awaited` once both are received.
883///
884/// ℹ️ If one block is never received, `awaited` is never emitted.
885#[mel_treatment(
886 generic T ()
887 input a Block<T>
888 input b Block<T>
889 output awaited Block<void>
890)]
891pub async fn waitBlock() {
892 let (a, b) = futures::join!(async { a.recv_one().await.is_ok() }, async {
893 b.recv_one().await.is_ok()
894 });
895
896 if a && b {
897 let _ = awaited.send_one(().into()).await;
898 }
899}