flow_mel/
i16.rs

1use melodium_core::*;
2use melodium_macro::{check, mel_treatment};
3
4/// Chain two streams of `i16`.
5///
6///
7/// ```mermaid
8/// graph LR
9///     T("chain()")
10///     A["🟨 🟨 🟨 🟨 🟨 🟨"] -->|first| T
11///     B["… 🟪 🟪 🟪"] -->|second| T
12///     
13///     T -->|chained| O["… 🟪 🟪 🟪 🟨 🟨 🟨 🟨 🟨 🟨"]
14///
15///     style A fill:#ffff,stroke:#ffff
16///     style B fill:#ffff,stroke:#ffff
17///     style O fill:#ffff,stroke:#ffff
18/// ```
19#[mel_treatment(
20    input first Stream<i16>
21    input second Stream<i16>
22    output chained Stream<i16>
23)]
24pub async fn chain() {
25    while let Ok(values) = first.recv_i16().await {
26        check!(chained.send_i16(values).await)
27    }
28
29    while let Ok(values) = second.recv_i16().await {
30        check!(chained.send_i16(values).await)
31    }
32}
33
34/// Trigger on `i16` stream start and end.
35///
36/// Emit `start` when a first value is send through the stream.
37/// Emit `end` when stream is finally over.
38///
39/// Emit `first` with the first value coming in the stream.
40/// Emit `last` with the last value coming in the stream.
41///
42/// ℹ️ `start` and `first` are always emitted together.
43/// If the stream only contains one element, `first` and `last` both contains it.
44/// If the stream never transmit any data before being ended, only `end` is emitted.
45///
46/// ```mermaid
47/// graph LR
48///     T("trigger()")
49///     B["🟥 … 🟨 🟨 🟨 🟨 🟨 🟨 … 🟩"] -->|stream| T
50///     
51///     T -->|start| S["〈🟦〉"]
52///     T -->|first| F["〈🟩〉"]
53///     T -->|last| L["〈🟥〉"]
54///     T -->|end| E["〈🟦〉"]
55///
56///     style B fill:#ffff,stroke:#ffff
57///     style S fill:#ffff,stroke:#ffff
58///     style F fill:#ffff,stroke:#ffff
59///     style L fill:#ffff,stroke:#ffff
60///     style E fill:#ffff,stroke:#ffff
61/// ```
62#[mel_treatment(
63    input stream Stream<i16>
64    output start Block<void>
65    output end Block<void>
66    output first Block<i16>
67    output last Block<i16>
68)]
69pub async fn trigger() {
70    let mut last_value = None;
71
72    if let Ok(values) = stream.recv_i16().await {
73        let _ = start.send_one_void(()).await;
74        if let Some(val) = values.first().cloned() {
75            let _ = first.send_one_i16(val).await;
76        }
77        last_value = values.last().cloned();
78        let _ = futures::join!(start.close(), first.close());
79    }
80
81    while let Ok(values) = stream.recv_i16().await {
82        last_value = values.last().cloned();
83    }
84
85    let _ = end.send_one_void(()).await;
86    if let Some(val) = last_value {
87        let _ = last.send_one_i16(val).await;
88    }
89
90    // We don't close `end` and `last` explicitly here,
91    // because it would be redundant with boilerplate
92    // implementation of treatments.
93}
94
95/// Stream a block `i16` value.
96///
97/// ```mermaid
98/// graph LR
99///     T("stream()")
100///     B["〈🟦〉"] -->|block| T
101///         
102///     T -->|stream| S["🟦"]
103///     
104///     
105///     style B fill:#ffff,stroke:#ffff
106///     style S fill:#ffff,stroke:#ffff
107/// ```
108#[mel_treatment(
109    input block Block<i16>
110    output stream Stream<i16>
111)]
112pub async fn stream() {
113    if let Ok(val) = block.recv_one_i16().await {
114        let _ = stream.send_one_i16(val).await;
115    }
116}
117
118/// Merge two streams of `i16`.
119///
120/// The two streams are merged using the `select` stream:
121/// - when `true`, value from `a` is used;
122/// - when `false`, value from `b` is used.
123///
124/// ℹ️ No value from either `a` or `b` are discarded, they are used when `select` give turn.
125///
126/// ⚠️ When `select` ends merge terminates without treating the remaining values from `a` and `b`.
127/// When `select` give turn to `a` or `b` while the concerned stream is ended, the merge terminates.
128/// Merge continues as long as `select` and concerned stream does, while the other can be ended.
129///
130/// ```mermaid
131/// graph LR
132///     T("merge()")
133///     A["… 🟦 🟫 …"] -->|a| T
134///     B["… 🟧 🟪 🟨 …"] -->|b| T
135///     O["… 🟩 🟥 🟥 🟩 🟥 …"] -->|select|T
136///     
137///
138///     T -->|value| V["… 🟦 🟧 🟪 🟫 🟨 …"]
139///
140///     style V fill:#ffff,stroke:#ffff
141///     style O fill:#ffff,stroke:#ffff
142///     style A fill:#ffff,stroke:#ffff
143///     style B fill:#ffff,stroke:#ffff
144/// ```
145#[mel_treatment(
146    input a Stream<i16>
147    input b Stream<i16>
148    input select Stream<bool>
149    output value Stream<i16>
150)]
151pub async fn merge() {
152    while let Ok(select) = select.recv_one_bool().await {
153        let val;
154        if select {
155            if let Ok(v) = a.recv_one_i16().await {
156                val = v;
157            } else {
158                break;
159            }
160        } else {
161            if let Ok(v) = b.recv_one_i16().await {
162                val = v;
163            } else {
164                break;
165            }
166        }
167
168        check!(value.send_one_i16(val).await)
169    }
170}
171
172/// Fill a pattern stream with a `i16` value.
173///
174/// ```mermaid
175/// graph LR
176/// T("fill(value=🟧)")
177/// B["… 🟦 🟦 🟦 …"] -->|pattern| T
178///
179/// T -->|filled| O["… 🟧 🟧 🟧 …"]
180///
181/// style B fill:#ffff,stroke:#ffff
182/// style O fill:#ffff,stroke:#ffff
183/// ```
184#[mel_treatment(
185    default value 0
186    input pattern Stream<void>
187    output filled Stream<i16>
188)]
189pub async fn fill(value: i16) {
190    while let Ok(pat) = pattern.recv_void().await {
191        check!(filled.send_i16(vec![value.clone(); pat.len()]).await)
192    }
193}
194
195/// Filter a `i16` stream according to `bool` stream.
196///
197/// ℹ️ If both streams are not the same size nothing is sent through accepted nor rejected.
198///  
199/// ```mermaid
200/// graph LR
201///     T("filter()")
202///     V["… 🟦 🟧 🟪 🟫 🟨 …"] -->|value| T
203///     D["… 🟩 🟥 🟥 🟩 🟥 …"] -->|select|T
204///     
205///     T -->|accepted| A["… 🟦 🟫 …"]
206///     T -->|rejected| R["… 🟧 🟪 🟨 …"]
207///
208///     style V fill:#ffff,stroke:#ffff
209///     style D fill:#ffff,stroke:#ffff
210///     style A fill:#ffff,stroke:#ffff
211///     style R fill:#ffff,stroke:#ffff
212/// ```
213#[mel_treatment(
214    input value Stream<i16>
215    input select Stream<bool>
216    output accepted Stream<i16>
217    output rejected Stream<i16>
218)]
219pub async fn filter() {
220    let mut accepted_op = true;
221    let mut rejected_op = true;
222
223    while let (Ok(value), Ok(select)) = futures::join!(value.recv_one_i16(), select.recv_one_bool())
224    {
225        if select {
226            if let Err(_) = accepted.send_one_i16(value).await {
227                // If we cannot send anymore on accepted, we note it,
228                // and check if rejected is still valid, else just terminate.
229                accepted_op = false;
230                if !rejected_op {
231                    break;
232                }
233            }
234        } else {
235            if let Err(_) = rejected.send_one_i16(value).await {
236                // If we cannot send anymore on rejected, we note it,
237                // and check if accepted is still valid, else just terminate.
238                rejected_op = false;
239                if !accepted_op {
240                    break;
241                }
242            }
243        }
244    }
245}
246
247/// Fit a stream of `i16` into a pattern.
248///
249/// ℹ️ If some remaining values doesn't fit into the pattern, they are trashed.
250///
251/// ```mermaid
252/// graph LR
253///     T("fit()")
254///     A["… 🟨 🟨 🟨 🟨 🟨 🟨"] -->|value| T
255///     B["🟦 🟦 🟦 🟦"] -->|pattern| T
256///     
257///     T -->|fitted| O["🟨 🟨 🟨 🟨"]
258///
259///     style A fill:#ffff,stroke:#ffff
260///     style B fill:#ffff,stroke:#ffff
261///     style O fill:#ffff,stroke:#ffff
262/// ```
263#[mel_treatment(
264    input value Stream<i16>
265    input pattern Stream<void>
266    output fitted Stream<i16>
267)]
268pub async fn fit() {
269    'main: while let Ok(pattern) = pattern.recv_void().await {
270        for _ in pattern {
271            if let Ok(val) = value.recv_one_i16().await {
272                check!('main, fitted.send_one_i16(val).await)
273            } else {
274                break 'main;
275            }
276        }
277    }
278}
279
280/// Emit a block `i16` value.
281///
282/// When `trigger` is enabled, `value` is emitted as block.
283///
284/// ```mermaid
285/// graph LR
286///     T("emit(value=🟨)")
287///     B["〈🟦〉"] -->|trigger| T
288///         
289///     T -->|emit| S["〈🟨〉"]
290///     
291///     style B fill:#ffff,stroke:#ffff
292///     style S fill:#ffff,stroke:#ffff
293/// ```
294#[mel_treatment(
295    input trigger Block<void>
296    output emit Block<i16>
297)]
298pub async fn emit(value: i16) {
299    if let Ok(_) = trigger.recv_one_void().await {
300        let _ = emit.send_one_i16(value).await;
301    }
302}