std_mel/flow/
mod.rs

1use futures::{pin_mut, select, FutureExt};
2use melodium_core::common::executive::{GetData, Value};
3use melodium_macro::{check, mel_treatment};
4use std::collections::VecDeque;
5
6pub mod vec;
7
8/// Chain two streams.
9///
10///
11/// ```mermaid
12/// graph LR
13///     T("chain()")
14///     A["🟨 🟨 🟨 🟨 🟨 🟨"] -->|first| T
15///     B["… 🟪 🟪 🟪"] -->|second| T
16///     
17///     T -->|chained| O["… 🟪 🟪 🟪 🟨 🟨 🟨 🟨 🟨 🟨"]
18///
19///     style A fill:#ffff,stroke:#ffff
20///     style B fill:#ffff,stroke:#ffff
21///     style O fill:#ffff,stroke:#ffff
22/// ```
23#[mel_treatment(
24    generic T ()
25    input first Stream<T>
26    input second Stream<T>
27    output chained Stream<T>
28)]
29pub async fn chain() {
30    while let Ok(values) = first.recv_many().await {
31        check!(chained.send_many(values).await)
32    }
33
34    while let Ok(values) = second.recv_many().await {
35        check!(chained.send_many(values).await)
36    }
37}
38
39/// Trigger on a stream start and end.
40///
41/// Emit `start` when a first value is send through the stream.
42/// Emit `end` when stream is finally over.
43///
44/// Emit `first` with the first value coming in the stream.
45/// Emit `last` with the last value coming in the stream.
46///
47/// ℹ️ `start` and `first` are always emitted together.
48/// If the stream only contains one element, `first` and `last` both contains it.
49/// If the stream never transmit any data before being ended, only `end` is emitted.
50///
51/// ```mermaid
52/// graph LR
53///     T("trigger()")
54///     B["🟥 … 🟨 🟨 🟨 🟨 🟨 🟨 … 🟩"] -->|stream| T
55///     
56///     T -->|start| S["〈🟦〉"]
57///     T -->|first| F["〈🟩〉"]
58///     T -->|last| L["〈🟥〉"]
59///     T -->|end| E["〈🟦〉"]
60///
61///     style B fill:#ffff,stroke:#ffff
62///     style S fill:#ffff,stroke:#ffff
63///     style F fill:#ffff,stroke:#ffff
64///     style L fill:#ffff,stroke:#ffff
65///     style E fill:#ffff,stroke:#ffff
66/// ```
67#[mel_treatment(
68    generic T ()
69    input stream Stream<T>
70    output start Block<void>
71    output end Block<void>
72    output first Block<T>
73    output last Block<T>
74)]
75pub async fn trigger() {
76    let mut last_value = None;
77
78    if let Ok(mut values) = stream.recv_many().await {
79        let _ = start.send_one(().into()).await;
80        if let Some(val) = values.pop_front() {
81            let _ = first.send_one(val).await;
82        }
83        last_value = Into::<VecDeque<Value>>::into(values).pop_back();
84        let _ = futures::join!(start.close(), first.close());
85    }
86
87    while let Ok(values) = stream.recv_many().await {
88        last_value = Into::<VecDeque<Value>>::into(values).pop_back();
89    }
90
91    let _ = end.send_one(().into()).await;
92    if let Some(val) = last_value {
93        let _ = last.send_one(val).await;
94    }
95
96    // We don't close `end` and `last` explicitly here,
97    // because it would be redundant with boilerplate
98    // implementation of treatments.
99}
100
101/// Check a blocking value.
102///
103/// When `value` block is received, `check` is emitted.
104///
105/// ```mermaid
106/// graph LR
107///     T("check()")
108///     B["〈🟨〉"] -->|value| T
109///         
110///     T -->|check| S["〈🟦〉"]
111///     
112///     style B fill:#ffff,stroke:#ffff
113///     style S fill:#ffff,stroke:#ffff
114/// ```
115#[mel_treatment(
116    generic T ()
117    input value Block<T>
118    output check Block<void>
119)]
120pub async fn check() {
121    if let Ok(_) = value.recv_one().await {
122        let _ = check.send_one(().into()).await;
123    }
124}
125
126/// Emit a blocking value.
127///
128/// When `trigger` is enabled, `value` is emitted as block.
129///
130/// ```mermaid
131/// graph LR
132///     T("emit(value=🟨)")
133///     B["〈🟦〉"] -->|trigger| T
134///         
135///     T -->|emit| S["〈🟨〉"]
136///     
137///     style B fill:#ffff,stroke:#ffff
138///     style S fill:#ffff,stroke:#ffff
139/// ```
140#[mel_treatment(
141    generic T ()
142    input trigger Block<void>
143    output emit Block<T>
144)]
145pub async fn emit(value: T) {
146    if let Ok(_) = trigger.recv_one().await {
147        let _ = emit.send_one(value).await;
148    }
149}
150
151/// Stream a blocking value.
152///
153/// ```mermaid
154/// graph LR
155///     T("stream()")
156///     B["〈🟦〉"] -->|block| T
157///         
158///     T -->|stream| S["🟦"]
159///     
160///     
161///     style B fill:#ffff,stroke:#ffff
162///     style S fill:#ffff,stroke:#ffff
163/// ```
164#[mel_treatment(
165    generic T ()
166    input block Block<T>
167    output stream Stream<T>
168)]
169pub async fn stream() {
170    if let Ok(val) = block.recv_one().await {
171        let _ = stream.send_one(val).await;
172    }
173}
174
175/// Merge two streams.
176///
177/// The two streams are merged without predictible order.
178///
179/// ℹ️ Merge continues as long as `a` or `b` continues too, while the other can be ended.
180///
181/// ```mermaid
182/// graph LR
183///     T("merge()")
184///     A["… 🟦 🟫 …"] -->|a| T
185///     B["… 🟧 🟪 🟨 …"] -->|b| T
186///     
187///
188///     T -->|value| V["… 🟦 🟧 🟪 🟫 🟨 …"]
189///
190///     style V fill:#ffff,stroke:#ffff
191///     style A fill:#ffff,stroke:#ffff
192///     style B fill:#ffff,stroke:#ffff
193/// ```
194#[mel_treatment(
195    generic T ()
196    input a Stream<T>
197    input b Stream<T>
198    output value Stream<T>
199)]
200pub async fn merge() {
201    let xa = async {
202        while let Ok(a) = (&a).recv_many().await {
203            check!(value.send_many(a).await);
204        }
205    }
206    .fuse();
207    let xb = async {
208        while let Ok(b) = (&b).recv_many().await {
209            check!(value.send_many(b).await);
210        }
211    }
212    .fuse();
213
214    pin_mut!(xa, xb);
215
216    loop {
217        select! {
218            () = xa => {},
219            () = xb => {},
220            complete => break,
221        };
222    }
223}
224
225/// Arrange two streams as one.
226///
227/// The two streams are merged using the `select` stream:
228/// - when `true`, value from `a` is used;
229/// - when `false`, value from `b` is used.
230///
231/// ℹ️ No value from either `a` or `b` are discarded, they are used when `select` give turn.
232///
233/// ⚠️ When `select` ends merge terminates without treating the remaining values from `a` and `b`.
234/// When `select` give turn to `a` or `b` while the concerned stream is ended, the merge terminates.
235/// Merge continues as long as `select` and concerned stream does, while the other can be ended.
236///
237/// ```mermaid
238/// graph LR
239///     T("arrange()")
240///     A["… 🟦 🟫 …"] -->|a| T
241///     B["… 🟧 🟪 🟨 …"] -->|b| T
242///     O["… 🟩 🟥 🟥 🟩 🟥 …"] -->|select|T
243///     
244///
245///     T -->|value| V["… 🟦 🟧 🟪 🟫 🟨 …"]
246///
247///     style V fill:#ffff,stroke:#ffff
248///     style O fill:#ffff,stroke:#ffff
249///     style A fill:#ffff,stroke:#ffff
250///     style B fill:#ffff,stroke:#ffff
251/// ```
252#[mel_treatment(
253    generic T ()
254    input a Stream<T>
255    input b Stream<T>
256    input select Stream<bool>
257    output value Stream<T>
258)]
259pub async fn arrange() {
260    while let Ok(select) = select
261        .recv_one()
262        .await
263        .map(|val| GetData::<bool>::try_data(val).unwrap())
264    {
265        let val;
266        if select {
267            if let Ok(v) = a.recv_one().await {
268                val = v;
269            } else {
270                break;
271            }
272        } else {
273            if let Ok(v) = b.recv_one().await {
274                val = v;
275            } else {
276                break;
277            }
278        }
279
280        check!(value.send_one(val).await)
281    }
282}
283
284/// Fill a pattern stream with a `value.
285///
286/// ```mermaid
287/// graph LR
288/// T("fill(value=🟧)")
289/// B["… 🟦 🟦 🟦 …"] -->|pattern| T
290///
291/// T -->|filled| O["… 🟧 🟧 🟧 …"]
292///
293/// style B fill:#ffff,stroke:#ffff
294/// style O fill:#ffff,stroke:#ffff
295/// ```
296#[mel_treatment(
297    generic T ()
298    input pattern Stream<void>
299    output filled Stream<T>
300)]
301pub async fn fill(value: T) {
302    while let Ok(pat) = pattern.recv_many().await {
303        let mut transmission = melodium_core::TransmissionValue::new(value.clone());
304        for _ in 1..pat.len() {
305            transmission.push(value.clone());
306        }
307        check!(filled.send_many(transmission).await)
308    }
309}
310
311/// Filter a stream according to `bool` stream.
312///
313/// ℹ️ If both streams are not the same size nothing is sent through accepted nor rejected.
314///  
315/// ```mermaid
316/// graph LR
317///     T("filter()")
318///     V["… 🟦 🟧 🟪 🟫 🟨 …"] -->|value| T
319///     D["… 🟩 🟥 🟥 🟩 🟥 …"] -->|select|T
320///     
321///     T -->|accepted| A["… 🟦 🟫 …"]
322///     T -->|rejected| R["… 🟧 🟪 🟨 …"]
323///
324///     style V fill:#ffff,stroke:#ffff
325///     style D fill:#ffff,stroke:#ffff
326///     style A fill:#ffff,stroke:#ffff
327///     style R fill:#ffff,stroke:#ffff
328/// ```
329#[mel_treatment(
330    generic T ()
331    input value Stream<T>
332    input select Stream<bool>
333    output accepted Stream<T>
334    output rejected Stream<T>
335)]
336pub async fn filter() {
337    let mut accepted_op = true;
338    let mut rejected_op = true;
339
340    while let (Ok(value), Ok(select)) = futures::join!(value.recv_one(), select.recv_one()) {
341        let select = GetData::<bool>::try_data(select).unwrap();
342        if select {
343            if let Err(_) = accepted.send_one(value).await {
344                // If we cannot send anymore on accepted, we note it,
345                // and check if rejected is still valid, else just terminate.
346                accepted_op = false;
347                if !rejected_op {
348                    break;
349                }
350            }
351        } else {
352            if let Err(_) = rejected.send_one(value).await {
353                // If we cannot send anymore on rejected, we note it,
354                // and check if accepted is still valid, else just terminate.
355                rejected_op = false;
356                if !accepted_op {
357                    break;
358                }
359            }
360        }
361    }
362}
363
364/// Fit a stream into a pattern.
365///
366/// ℹ️ If some remaining values doesn't fit into the pattern, they are trashed.
367///
368/// ```mermaid
369/// graph LR
370///     T("fit()")
371///     A["… 🟨 🟨 🟨 🟨 🟨 🟨"] -->|value| T
372///     B["🟦 🟦 🟦 🟦"] -->|pattern| T
373///     
374///     T -->|fitted| O["🟨 🟨 🟨 🟨"]
375///
376///     style A fill:#ffff,stroke:#ffff
377///     style B fill:#ffff,stroke:#ffff
378///     style O fill:#ffff,stroke:#ffff
379/// ```
380#[mel_treatment(
381    generic T ()
382    input value Stream<T>
383    input pattern Stream<void>
384    output fitted Stream<T>
385)]
386pub async fn fit() {
387    'main: while let Ok(pattern) = pattern
388        .recv_many()
389        .await
390        .map(|values| TryInto::<Vec<()>>::try_into(values).unwrap())
391    {
392        for _ in pattern {
393            if let Ok(val) = value.recv_one().await {
394                check!('main, fitted.send_one(val).await)
395            } else {
396                break 'main;
397            }
398        }
399    }
400}
401
402/// Gives count of elements passing through stream.
403///
404/// This count increment one for each element within the stream, starting at 1.
405///
406/// ```mermaid
407/// graph LR
408///     T("count()")
409///     V["🟦 🟦 🟦 …"] -->|iter| T
410///     
411///     T -->|count| P["1️⃣ 2️⃣ 3️⃣ …"]
412///
413///     style V fill:#ffff,stroke:#ffff
414///     style P fill:#ffff,stroke:#ffff
415/// ```
416#[mel_treatment(
417    generic T ()
418    input stream Stream<T>
419    output count Stream<u128>
420)]
421pub async fn count() {
422    let mut i: u128 = 0;
423    while let Ok(iter) = stream.recv_many().await {
424        let next_i = i + iter.len() as u128;
425        check!(
426            count
427                .send_many((i..next_i).collect::<VecDeque<_>>().into())
428                .await
429        );
430        i = next_i;
431    }
432}
433
434/// Generate a stream with a given length.
435///
436/// ```mermaid
437/// graph LR
438///     T("generate()")
439///     B["〈🟨〉"] -->|length| T
440///         
441///     T -->|stream| S["… 🟦 🟦 🟦 🟦 🟦 🟦"]
442///     
443///     
444///     style B fill:#ffff,stroke:#ffff
445///     style S fill:#ffff,stroke:#ffff
446/// ```
447#[mel_treatment(
448    generic T ()
449    input length Block<u128>
450    output stream Stream<T>
451)]
452pub async fn generate(data: T) {
453    if let Ok(length) = length
454        .recv_one()
455        .await
456        .map(|val| GetData::<u128>::try_data(val).unwrap())
457    {
458        const CHUNK: u128 = 2u128.pow(20);
459        let mut total = 0u128;
460        while total < length {
461            let chunk = u128::min(CHUNK, length - total) as usize;
462            let mut transmission = melodium_core::TransmissionValue::new(data.clone());
463            for _ in 1..chunk {
464                transmission.push(data.clone());
465            }
466            check!(stream.send_many(transmission).await);
467            total += chunk as u128;
468        }
469    }
470}
471
472/// Generate a stream indefinitely.
473///
474/// This generates a continuous stream, until stream consumers closes it.
475///
476/// ```mermaid
477/// graph LR
478///     T("generateIndefinitely()")
479///     B["〈🟦〉"] -->|trigger| T
480///         
481///     T -->|stream| S["… 🟦 🟦 🟦 🟦 🟦 🟦"]
482///     
483///     
484///     style B fill:#ffff,stroke:#ffff
485///     style S fill:#ffff,stroke:#ffff
486/// ```
487#[mel_treatment(
488    generic T ()
489    input trigger Block<void>
490    output stream Stream<T>
491)]
492pub async fn generate_indefinitely(data: T) {
493    if let Ok(_) = trigger.recv_one().await {
494        const CHUNK: usize = 2usize.pow(20);
495        loop {
496            let mut transmission = melodium_core::TransmissionValue::new(data.clone());
497            for _ in 1..CHUNK {
498                transmission.push(data.clone());
499            }
500            check!(stream.send_many(transmission).await);
501        }
502    }
503}
504
505/// Insert a block into a stream.
506///
507/// `block` is inserted into `stream` when it comes and everything is streamed to `output`.
508///
509/// ℹ️ No assumption on block insertion position in stream can be made.
510///
511/// ```mermaid
512/// graph LR
513///     T("insert()")
514///     A["… 🟦 🟦 🟦 🟦 …"] -->|stream| T
515///     B["〈🟧〉"] -->|block| T
516///     
517///
518///     T -->|output| V["… 🟦 🟧 🟦 🟦 🟦 …"]
519///
520///     style V fill:#ffff,stroke:#ffff
521///     style A fill:#ffff,stroke:#ffff
522///     style B fill:#ffff,stroke:#ffff
523/// ```
524#[mel_treatment(
525    generic T ()
526    input stream Stream<T>
527    input block Block<T>
528    output output Stream<T>
529)]
530pub async fn insert() {
531    let streaming = async {
532        while let Ok(values) = (&stream).recv_many().await {
533            check!(output.send_many(values).await);
534        }
535    }
536    .fuse();
537    let insert_block = async {
538        if let Ok(val) = (&block).recv_one().await {
539            let _ = output.send_one(val).await;
540        }
541    }
542    .fuse();
543
544    pin_mut!(streaming, insert_block);
545
546    loop {
547        select! {
548            () = streaming => {},
549            () = insert_block => {},
550            complete => break,
551        };
552    }
553}
554
555/// Merge two incoming blocks as a stream.
556///
557/// Each block is taken when it arrives and send through `stream`.
558///
559/// ℹ️ No priority on blocks order in stream can be assumed.
560///
561/// ```mermaid
562/// graph LR
563///     T("flock()")
564///     A["〈🟦〉"] -->|a| T
565///     B["〈🟧〉"] -->|b| T
566///     
567///
568///     T -->|stream| V["🟧 🟦"]
569///
570///     style V fill:#ffff,stroke:#ffff
571///     style A fill:#ffff,stroke:#ffff
572///     style B fill:#ffff,stroke:#ffff
573/// ```
574#[mel_treatment(
575    generic T ()
576    input a Block<T>
577    input b Block<T>
578    output stream Stream<T>
579)]
580pub async fn flock() {
581    let xa = async {
582        if let Ok(a) = (&a).recv_one().await {
583            let _ = stream.send_one(a).await;
584        }
585    }
586    .fuse();
587    let xb = async {
588        if let Ok(b) = (&b).recv_one().await {
589            let _ = stream.send_one(b).await;
590        }
591    }
592    .fuse();
593
594    pin_mut!(xa, xb);
595
596    loop {
597        select! {
598            () = xa => {},
599            () = xb => {},
600            complete => break,
601        };
602    }
603}
604
605/// Emit one block.
606///
607/// Take first block coming among `a` or `b` and emit it in `value`, ignoring the remaining one.
608///
609/// ℹ️ No priority between blocks can be assumed if they are ready at same moment.
610///
611/// ```mermaid
612/// graph LR
613///     T("one()")
614///     A["…"] -->|a| T
615///     B["〈🟧〉"] -->|b| T
616///     
617///
618///     T -->|value| V["〈🟧〉"]
619///
620///     style V fill:#ffff,stroke:#ffff
621///     style A fill:#ffff,stroke:#ffff
622///     style B fill:#ffff,stroke:#ffff
623/// ```
624#[mel_treatment(
625    generic T ()
626    input a Block<T>
627    input b Block<T>
628    output value Block<T>
629)]
630pub async fn one() {
631    let xa = async { (&a).recv_one().await.ok() }.fuse();
632    let xb = async { (&b).recv_one().await.ok() }.fuse();
633
634    pin_mut!(xa, xb);
635
636    loop {
637        let val = select! {
638            val = xa => val,
639            val = xb => val,
640            complete => break,
641        };
642
643        if let Some(val) = val {
644            let _ = value.send_one(val).await;
645            break;
646        }
647    }
648}