flow_mel/
string.rs

1use melodium_core::*;
2use melodium_macro::{check, mel_treatment};
3
4/// Chain two streams of `string`.
5///
6///
7/// ```mermaid
8/// graph LR
9///     T("chain()")
10///     A["🟨 🟨 🟨 🟨 🟨 🟨"] -->|first| T
11///     B["… 🟪 🟪 🟪"] -->|second| T
12///     
13///     T -->|chained| O["… 🟪 🟪 🟪 🟨 🟨 🟨 🟨 🟨 🟨"]
14///
15///     style A fill:#ffff,stroke:#ffff
16///     style B fill:#ffff,stroke:#ffff
17///     style O fill:#ffff,stroke:#ffff
18/// ```
19#[mel_treatment(
20    input first Stream<string>
21    input second Stream<string>
22    output chained Stream<string>
23)]
24pub async fn chain() {
25    while let Ok(values) = first.recv_string().await {
26        check!(chained.send_string(values).await)
27    }
28
29    while let Ok(values) = second.recv_string().await {
30        check!(chained.send_string(values).await)
31    }
32}
33
34/// Trigger on `string` stream start and end.
35///
36/// Emit `start` when a first value is send through the stream.
37/// Emit `end` when stream is finally over.
38///
39/// Emit `first` with the first value coming in the stream.
40/// Emit `last` with the last value coming in the stream.
41///
42/// ℹ️ `start` and `first` are always emitted together.
43/// If the stream only contains one element, `first` and `last` both contains it.
44/// If the stream never transmit any data before being ended, only `end` is emitted.
45///
46/// ```mermaid
47/// graph LR
48///     T("trigger()")
49///     B["🟥 … 🟨 🟨 🟨 🟨 🟨 🟨 … 🟩"] -->|stream| T
50///     
51///     T -->|start| S["〈🟦〉"]
52///     T -->|first| F["〈🟩〉"]
53///     T -->|last| L["〈🟥〉"]
54///     T -->|end| E["〈🟦〉"]
55///
56///     style B fill:#ffff,stroke:#ffff
57///     style S fill:#ffff,stroke:#ffff
58///     style F fill:#ffff,stroke:#ffff
59///     style L fill:#ffff,stroke:#ffff
60///     style E fill:#ffff,stroke:#ffff
61/// ```
62#[mel_treatment(
63    input stream Stream<string>
64    output start Block<void>
65    output end Block<void>
66    output first Block<string>
67    output last Block<string>
68)]
69pub async fn trigger() {
70    let mut last_value = None;
71
72    if let Ok(values) = stream.recv_string().await {
73        let _ = start.send_one_void(()).await;
74        if let Some(val) = values.first().cloned() {
75            let _ = first.send_one_string(val).await;
76        }
77        last_value = values.last().cloned();
78        let _ = futures::join!(start.close(), first.close());
79    }
80
81    while let Ok(values) = stream.recv_string().await {
82        last_value = values.last().cloned();
83    }
84
85    let _ = end.send_one_void(()).await;
86    if let Some(val) = last_value {
87        let _ = last.send_one_string(val).await;
88    }
89
90    // We don't close `end` and `last` explicitly here,
91    // because it would be redundant with boilerplate
92    // implementation of treatments.
93}
94
95/// Stream a block `string` value.
96///
97/// ```mermaid
98/// graph LR
99///     T("stream()")
100///     B["〈🟦〉"] -->|block| T
101///         
102///     T -->|stream| S["🟦"]
103///     
104///     
105///     style B fill:#ffff,stroke:#ffff
106///     style S fill:#ffff,stroke:#ffff
107/// ```
108#[mel_treatment(
109    input block Block<string>
110    output stream Stream<string>
111)]
112pub async fn stream() {
113    if let Ok(val) = block.recv_one_string().await {
114        let _ = stream.send_one_string(val).await;
115    }
116}
117
118/// Merge two streams of `string`.
119///
120/// The two streams are merged using the `select` stream:
121/// - when `true`, value from `a` is used;
122/// - when `false`, value from `b` is used.
123///
124/// ℹ️ No value from either `a` or `b` are discarded, they are used when `select` give turn.
125///
126/// ⚠️ When `select` ends merge terminates without treating the remaining values from `a` and `b`.
127/// When `select` give turn to `a` or `b` while the concerned stream is ended, the merge terminates.
128/// Merge continues as long as `select` and concerned stream does, while the other can be ended.
129///
130/// ```mermaid
131/// graph LR
132///     T("merge()")
133///     A["… 🟦 🟫 …"] -->|a| T
134///     B["… 🟧 🟪 🟨 …"] -->|b| T
135///     O["… 🟩 🟥 🟥 🟩 🟥 …"] -->|select|T
136///     
137///
138///     T -->|value| V["… 🟦 🟧 🟪 🟫 🟨 …"]
139///
140///     style V fill:#ffff,stroke:#ffff
141///     style O fill:#ffff,stroke:#ffff
142///     style A fill:#ffff,stroke:#ffff
143///     style B fill:#ffff,stroke:#ffff
144/// ```
145#[mel_treatment(
146    input a Stream<string>
147    input b Stream<string>
148    input select Stream<bool>
149    output value Stream<string>
150)]
151pub async fn merge() {
152    while let Ok(select) = select.recv_one_bool().await {
153        let val;
154        if select {
155            if let Ok(v) = a.recv_one_string().await {
156                val = v;
157            } else {
158                break;
159            }
160        } else {
161            if let Ok(v) = b.recv_one_string().await {
162                val = v;
163            } else {
164                break;
165            }
166        }
167
168        check!(value.send_one_string(val).await)
169    }
170}
171
172/// Fill a pattern stream with a `string` value.
173///
174/// ```mermaid
175/// graph LR
176/// T("fill(value=🟧)")
177/// B["… 🟦 🟦 🟦 …"] -->|pattern| T
178///
179/// T -->|filled| O["… 🟧 🟧 🟧 …"]
180///
181/// style B fill:#ffff,stroke:#ffff
182/// style O fill:#ffff,stroke:#ffff
183/// ```
184#[mel_treatment(
185    default value ""
186    input pattern Stream<void>
187    output filled Stream<string>
188)]
189pub async fn fill(value: string) {
190    while let Ok(pat) = pattern.recv_void().await {
191        check!(filled.send_string(vec![value.clone(); pat.len()]).await)
192    }
193}
194
195/// Filter a `string` stream according to `bool` stream.
196///
197/// ℹ️ If both streams are not the same size nothing is sent through accepted nor rejected.
198///  
199/// ```mermaid
200/// graph LR
201///     T("filter()")
202///     V["… 🟦 🟧 🟪 🟫 🟨 …"] -->|value| T
203///     D["… 🟩 🟥 🟥 🟩 🟥 …"] -->|select|T
204///     
205///     T -->|accepted| A["… 🟦 🟫 …"]
206///     T -->|rejected| R["… 🟧 🟪 🟨 …"]
207///
208///     style V fill:#ffff,stroke:#ffff
209///     style D fill:#ffff,stroke:#ffff
210///     style A fill:#ffff,stroke:#ffff
211///     style R fill:#ffff,stroke:#ffff
212/// ```
213#[mel_treatment(
214    input value Stream<string>
215    input select Stream<bool>
216    output accepted Stream<string>
217    output rejected Stream<string>
218)]
219pub async fn filter() {
220    let mut accepted_op = true;
221    let mut rejected_op = true;
222
223    while let (Ok(value), Ok(select)) =
224        futures::join!(value.recv_one_string(), select.recv_one_bool())
225    {
226        if select {
227            if let Err(_) = accepted.send_one_string(value).await {
228                // If we cannot send anymore on accepted, we note it,
229                // and check if rejected is still valid, else just terminate.
230                accepted_op = false;
231                if !rejected_op {
232                    break;
233                }
234            }
235        } else {
236            if let Err(_) = rejected.send_one_string(value).await {
237                // If we cannot send anymore on rejected, we note it,
238                // and check if accepted is still valid, else just terminate.
239                rejected_op = false;
240                if !accepted_op {
241                    break;
242                }
243            }
244        }
245    }
246}
247
248/// Fit a stream of `string` into a pattern.
249///
250/// ℹ️ If some remaining values doesn't fit into the pattern, they are trashed.
251///
252/// ```mermaid
253/// graph LR
254///     T("fit()")
255///     A["… 🟨 🟨 🟨 🟨 🟨 🟨"] -->|value| T
256///     B["🟦 🟦 🟦 🟦"] -->|pattern| T
257///     
258///     T -->|fitted| O["🟨 🟨 🟨 🟨"]
259///
260///     style A fill:#ffff,stroke:#ffff
261///     style B fill:#ffff,stroke:#ffff
262///     style O fill:#ffff,stroke:#ffff
263/// ```
264#[mel_treatment(
265    input value Stream<string>
266    input pattern Stream<void>
267    output fitted Stream<string>
268)]
269pub async fn fit() {
270    'main: while let Ok(pattern) = pattern.recv_void().await {
271        for _ in pattern {
272            if let Ok(val) = value.recv_one_string().await {
273                check!('main, fitted.send_one_string(val).await)
274            } else {
275                break 'main;
276            }
277        }
278    }
279}
280
281/// Emit a block `string` value.
282///
283/// When `trigger` is enabled, `value` is emitted as block.
284///
285/// ```mermaid
286/// graph LR
287///     T("emit(value=🟨)")
288///     B["〈🟦〉"] -->|trigger| T
289///         
290///     T -->|emit| S["〈🟨〉"]
291///     
292///     style B fill:#ffff,stroke:#ffff
293///     style S fill:#ffff,stroke:#ffff
294/// ```
295#[mel_treatment(
296    input trigger Block<void>
297    output emit Block<string>
298)]
299pub async fn emit(value: string) {
300    if let Ok(_) = trigger.recv_one_void().await {
301        let _ = emit.send_one_string(value).await;
302    }
303}