flow_mel/
void.rs

1use melodium_core::*;
2use melodium_macro::{check, mel_treatment};
3
4/// Chain two streams of `void`.
5///
6///
7/// ```mermaid
8/// graph LR
9///     T("chain()")
10///     A["🟨 🟨 🟨 🟨 🟨 🟨"] -->|first| T
11///     B["… 🟪 🟪 🟪"] -->|second| T
12///     
13///     T -->|chained| O["… 🟪 🟪 🟪 🟨 🟨 🟨 🟨 🟨 🟨"]
14///
15///     style A fill:#ffff,stroke:#ffff
16///     style B fill:#ffff,stroke:#ffff
17///     style O fill:#ffff,stroke:#ffff
18/// ```
19#[mel_treatment(
20    input first Stream<void>
21    input second Stream<void>
22    output chained Stream<void>
23)]
24pub async fn chain() {
25    while let Ok(values) = first.recv_void().await {
26        check!(chained.send_void(values).await)
27    }
28
29    while let Ok(values) = second.recv_void().await {
30        check!(chained.send_void(values).await)
31    }
32}
33
34/// Trigger on `void` stream start and end.
35///
36/// Emit `start` when a first value is send through the stream.
37/// Emit `end` when stream is finally over.
38///
39/// Emit `first` with the first value coming in the stream.
40/// Emit `last` with the last value coming in the stream.
41///
42/// ℹ️ `start` and `first` are always emitted together.
43/// If the stream only contains one element, `first` and `last` both contains it.
44/// If the stream never transmit any data before being ended, only `end` is emitted.
45///
46/// ```mermaid
47/// graph LR
48///     T("trigger()")
49///     B["🟥 … 🟨 🟨 🟨 🟨 🟨 🟨 … 🟩"] -->|stream| T
50///     
51///     T -->|start| S["〈🟦〉"]
52///     T -->|first| F["〈🟩〉"]
53///     T -->|last| L["〈🟥〉"]
54///     T -->|end| E["〈🟦〉"]
55///
56///     style B fill:#ffff,stroke:#ffff
57///     style S fill:#ffff,stroke:#ffff
58///     style F fill:#ffff,stroke:#ffff
59///     style L fill:#ffff,stroke:#ffff
60///     style E fill:#ffff,stroke:#ffff
61/// ```
62#[mel_treatment(
63    input stream Stream<void>
64    output start Block<void>
65    output end Block<void>
66    output first Block<void>
67    output last Block<void>
68)]
69pub async fn trigger() {
70    let mut last_value = None;
71
72    if let Ok(values) = stream.recv_void().await {
73        let _ = start.send_one_void(()).await;
74        if let Some(val) = values.first().cloned() {
75            let _ = first.send_one_void(val).await;
76        }
77        last_value = values.last().cloned();
78        let _ = futures::join!(start.close(), first.close());
79    }
80
81    while let Ok(values) = stream.recv_void().await {
82        last_value = values.last().cloned();
83    }
84
85    let _ = end.send_one_void(()).await;
86    if let Some(val) = last_value {
87        let _ = last.send_one_void(val).await;
88    }
89
90    // We don't close `end` and `last` explicitly here,
91    // because it would be redundant with boilerplate
92    // implementation of treatments.
93}
94
95/// Stream a block `void` value.
96///
97/// ```mermaid
98/// graph LR
99///     T("stream()")
100///     B["〈🟦〉"] -->|block| T
101///         
102///     T -->|stream| S["🟦"]
103///     
104///     
105///     style B fill:#ffff,stroke:#ffff
106///     style S fill:#ffff,stroke:#ffff
107/// ```
108#[mel_treatment(
109    input block Block<void>
110    output stream Stream<void>
111)]
112pub async fn stream() {
113    if let Ok(val) = block.recv_one_void().await {
114        let _ = stream.send_one_void(val).await;
115    }
116}
117
118/// Gives count of elements passing through stream.
119///
120/// This count increment one for each element within the stream, starting at 1.
121///
122/// ```mermaid
123/// graph LR
124///     T("count()")
125///     V["🟦 🟦 🟦 …"] -->|iter| T
126///     
127///     T -->|count| P["1️⃣ 2️⃣ 3️⃣ …"]
128///
129///     style V fill:#ffff,stroke:#ffff
130///     style P fill:#ffff,stroke:#ffff
131/// ```
132#[mel_treatment(
133    input stream Stream<void>
134    output count Stream<u128>
135)]
136pub async fn count() {
137    let mut i: u128 = 0;
138    while let Ok(iter) = stream.recv_void().await {
139        let next_i = i + iter.len() as u128;
140        check!(count.send_u128((i..next_i).collect()).await);
141        i = next_i;
142    }
143}
144
145/// Generate a stream of `void` according to a length.
146///
147/// ```mermaid
148/// graph LR
149///     T("generate()")
150///     B["〈🟨〉"] -->|length| T
151///         
152///     T -->|stream| S["… 🟦 🟦 🟦 🟦 🟦 🟦"]
153///     
154///     
155///     style B fill:#ffff,stroke:#ffff
156///     style S fill:#ffff,stroke:#ffff
157/// ```
158#[mel_treatment(
159    input length Block<u128>
160    output stream Stream<void>
161)]
162pub async fn generate() {
163    if let Ok(length) = length.recv_one_u128().await {
164        const CHUNK: u128 = 2u128.pow(20);
165        let mut total = 0u128;
166        while total < length {
167            let chunk = u128::min(CHUNK, length - total) as usize;
168            check!(stream.send_void(vec![(); chunk]).await);
169            total += chunk as u128;
170        }
171    }
172}
173
174/// Generate a stream of `void` indefinitely.
175///
176/// This generates a continuous stream of `void`, until stream consumers closes it.
177///
178/// ```mermaid
179/// graph LR
180///     T("generateIndefinitely()")
181///     B["〈🟦〉"] -->|trigger| T
182///         
183///     T -->|stream| S["… 🟦 🟦 🟦 🟦 🟦 🟦"]
184///     
185///     
186///     style B fill:#ffff,stroke:#ffff
187///     style S fill:#ffff,stroke:#ffff
188/// ```
189#[mel_treatment(
190    input trigger Block<void>
191    output stream Stream<void>
192)]
193pub async fn generate_indefinitely() {
194    if let Ok(_) = trigger.recv_one_void().await {
195        const CHUNK: usize = 2usize.pow(20);
196        loop {
197            check!(stream.send_void(vec![(); CHUNK]).await);
198        }
199    }
200}