flow_mel/string.rs
1use melodium_core::*;
2use melodium_macro::{check, mel_treatment};
3
4/// Chain two streams of `string`.
5///
6///
7/// ```mermaid
8/// graph LR
9/// T("chain()")
10/// A["🟨 🟨 🟨 🟨 🟨 🟨"] -->|first| T
11/// B["… 🟪 🟪 🟪"] -->|second| T
12///
13/// T -->|chained| O["… 🟪 🟪 🟪 🟨 🟨 🟨 🟨 🟨 🟨"]
14///
15/// style A fill:#ffff,stroke:#ffff
16/// style B fill:#ffff,stroke:#ffff
17/// style O fill:#ffff,stroke:#ffff
18/// ```
19#[mel_treatment(
20 input first Stream<string>
21 input second Stream<string>
22 output chained Stream<string>
23)]
24pub async fn chain() {
25 while let Ok(values) = first.recv_string().await {
26 check!(chained.send_string(values).await)
27 }
28
29 while let Ok(values) = second.recv_string().await {
30 check!(chained.send_string(values).await)
31 }
32}
33
34/// Trigger on `string` stream start and end.
35///
36/// Emit `start` when a first value is send through the stream.
37/// Emit `end` when stream is finally over.
38///
39/// Emit `first` with the first value coming in the stream.
40/// Emit `last` with the last value coming in the stream.
41///
42/// ℹ️ `start` and `first` are always emitted together.
43/// If the stream only contains one element, `first` and `last` both contains it.
44/// If the stream never transmit any data before being ended, only `end` is emitted.
45///
46/// ```mermaid
47/// graph LR
48/// T("trigger()")
49/// B["🟥 … 🟨 🟨 🟨 🟨 🟨 🟨 … 🟩"] -->|stream| T
50///
51/// T -->|start| S["〈🟦〉"]
52/// T -->|first| F["〈🟩〉"]
53/// T -->|last| L["〈🟥〉"]
54/// T -->|end| E["〈🟦〉"]
55///
56/// style B fill:#ffff,stroke:#ffff
57/// style S fill:#ffff,stroke:#ffff
58/// style F fill:#ffff,stroke:#ffff
59/// style L fill:#ffff,stroke:#ffff
60/// style E fill:#ffff,stroke:#ffff
61/// ```
62#[mel_treatment(
63 input stream Stream<string>
64 output start Block<void>
65 output end Block<void>
66 output first Block<string>
67 output last Block<string>
68)]
69pub async fn trigger() {
70 let mut last_value = None;
71
72 if let Ok(values) = stream.recv_string().await {
73 let _ = start.send_one_void(()).await;
74 if let Some(val) = values.first().cloned() {
75 let _ = first.send_one_string(val).await;
76 }
77 last_value = values.last().cloned();
78 let _ = futures::join!(start.close(), first.close());
79 }
80
81 while let Ok(values) = stream.recv_string().await {
82 last_value = values.last().cloned();
83 }
84
85 let _ = end.send_one_void(()).await;
86 if let Some(val) = last_value {
87 let _ = last.send_one_string(val).await;
88 }
89
90 // We don't close `end` and `last` explicitly here,
91 // because it would be redundant with boilerplate
92 // implementation of treatments.
93}
94
95/// Stream a block `string` value.
96///
97/// ```mermaid
98/// graph LR
99/// T("stream()")
100/// B["〈🟦〉"] -->|block| T
101///
102/// T -->|stream| S["🟦"]
103///
104///
105/// style B fill:#ffff,stroke:#ffff
106/// style S fill:#ffff,stroke:#ffff
107/// ```
108#[mel_treatment(
109 input block Block<string>
110 output stream Stream<string>
111)]
112pub async fn stream() {
113 if let Ok(val) = block.recv_one_string().await {
114 let _ = stream.send_one_string(val).await;
115 }
116}
117
118/// Merge two streams of `string`.
119///
120/// The two streams are merged using the `select` stream:
121/// - when `true`, value from `a` is used;
122/// - when `false`, value from `b` is used.
123///
124/// ℹ️ No value from either `a` or `b` are discarded, they are used when `select` give turn.
125///
126/// ⚠️ When `select` ends merge terminates without treating the remaining values from `a` and `b`.
127/// When `select` give turn to `a` or `b` while the concerned stream is ended, the merge terminates.
128/// Merge continues as long as `select` and concerned stream does, while the other can be ended.
129///
130/// ```mermaid
131/// graph LR
132/// T("merge()")
133/// A["… 🟦 🟫 …"] -->|a| T
134/// B["… 🟧 🟪 🟨 …"] -->|b| T
135/// O["… 🟩 🟥 🟥 🟩 🟥 …"] -->|select|T
136///
137///
138/// T -->|value| V["… 🟦 🟧 🟪 🟫 🟨 …"]
139///
140/// style V fill:#ffff,stroke:#ffff
141/// style O fill:#ffff,stroke:#ffff
142/// style A fill:#ffff,stroke:#ffff
143/// style B fill:#ffff,stroke:#ffff
144/// ```
145#[mel_treatment(
146 input a Stream<string>
147 input b Stream<string>
148 input select Stream<bool>
149 output value Stream<string>
150)]
151pub async fn merge() {
152 while let Ok(select) = select.recv_one_bool().await {
153 let val;
154 if select {
155 if let Ok(v) = a.recv_one_string().await {
156 val = v;
157 } else {
158 break;
159 }
160 } else {
161 if let Ok(v) = b.recv_one_string().await {
162 val = v;
163 } else {
164 break;
165 }
166 }
167
168 check!(value.send_one_string(val).await)
169 }
170}
171
172/// Fill a pattern stream with a `string` value.
173///
174/// ```mermaid
175/// graph LR
176/// T("fill(value=🟧)")
177/// B["… 🟦 🟦 🟦 …"] -->|pattern| T
178///
179/// T -->|filled| O["… 🟧 🟧 🟧 …"]
180///
181/// style B fill:#ffff,stroke:#ffff
182/// style O fill:#ffff,stroke:#ffff
183/// ```
184#[mel_treatment(
185 default value ""
186 input pattern Stream<void>
187 output filled Stream<string>
188)]
189pub async fn fill(value: string) {
190 while let Ok(pat) = pattern.recv_void().await {
191 check!(filled.send_string(vec![value.clone(); pat.len()]).await)
192 }
193}
194
195/// Filter a `string` stream according to `bool` stream.
196///
197/// ℹ️ If both streams are not the same size nothing is sent through accepted nor rejected.
198///
199/// ```mermaid
200/// graph LR
201/// T("filter()")
202/// V["… 🟦 🟧 🟪 🟫 🟨 …"] -->|value| T
203/// D["… 🟩 🟥 🟥 🟩 🟥 …"] -->|select|T
204///
205/// T -->|accepted| A["… 🟦 🟫 …"]
206/// T -->|rejected| R["… 🟧 🟪 🟨 …"]
207///
208/// style V fill:#ffff,stroke:#ffff
209/// style D fill:#ffff,stroke:#ffff
210/// style A fill:#ffff,stroke:#ffff
211/// style R fill:#ffff,stroke:#ffff
212/// ```
213#[mel_treatment(
214 input value Stream<string>
215 input select Stream<bool>
216 output accepted Stream<string>
217 output rejected Stream<string>
218)]
219pub async fn filter() {
220 let mut accepted_op = true;
221 let mut rejected_op = true;
222
223 while let (Ok(value), Ok(select)) =
224 futures::join!(value.recv_one_string(), select.recv_one_bool())
225 {
226 if select {
227 if let Err(_) = accepted.send_one_string(value).await {
228 // If we cannot send anymore on accepted, we note it,
229 // and check if rejected is still valid, else just terminate.
230 accepted_op = false;
231 if !rejected_op {
232 break;
233 }
234 }
235 } else {
236 if let Err(_) = rejected.send_one_string(value).await {
237 // If we cannot send anymore on rejected, we note it,
238 // and check if accepted is still valid, else just terminate.
239 rejected_op = false;
240 if !accepted_op {
241 break;
242 }
243 }
244 }
245 }
246}
247
248/// Fit a stream of `string` into a pattern.
249///
250/// ℹ️ If some remaining values doesn't fit into the pattern, they are trashed.
251///
252/// ```mermaid
253/// graph LR
254/// T("fit()")
255/// A["… 🟨 🟨 🟨 🟨 🟨 🟨"] -->|value| T
256/// B["🟦 🟦 🟦 🟦"] -->|pattern| T
257///
258/// T -->|fitted| O["🟨 🟨 🟨 🟨"]
259///
260/// style A fill:#ffff,stroke:#ffff
261/// style B fill:#ffff,stroke:#ffff
262/// style O fill:#ffff,stroke:#ffff
263/// ```
264#[mel_treatment(
265 input value Stream<string>
266 input pattern Stream<void>
267 output fitted Stream<string>
268)]
269pub async fn fit() {
270 'main: while let Ok(pattern) = pattern.recv_void().await {
271 for _ in pattern {
272 if let Ok(val) = value.recv_one_string().await {
273 check!('main, fitted.send_one_string(val).await)
274 } else {
275 break 'main;
276 }
277 }
278 }
279}
280
281/// Emit a block `string` value.
282///
283/// When `trigger` is enabled, `value` is emitted as block.
284///
285/// ```mermaid
286/// graph LR
287/// T("emit(value=🟨)")
288/// B["〈🟦〉"] -->|trigger| T
289///
290/// T -->|emit| S["〈🟨〉"]
291///
292/// style B fill:#ffff,stroke:#ffff
293/// style S fill:#ffff,stroke:#ffff
294/// ```
295#[mel_treatment(
296 input trigger Block<void>
297 output emit Block<string>
298)]
299pub async fn emit(value: string) {
300 if let Ok(_) = trigger.recv_one_void().await {
301 let _ = emit.send_one_string(value).await;
302 }
303}