flow_mel/void.rs
1use melodium_core::*;
2use melodium_macro::{check, mel_treatment};
3
4/// Chain two streams of `void`.
5///
6///
7/// ```mermaid
8/// graph LR
9/// T("chain()")
10/// A["🟨 🟨 🟨 🟨 🟨 🟨"] -->|first| T
11/// B["… 🟪 🟪 🟪"] -->|second| T
12///
13/// T -->|chained| O["… 🟪 🟪 🟪 🟨 🟨 🟨 🟨 🟨 🟨"]
14///
15/// style A fill:#ffff,stroke:#ffff
16/// style B fill:#ffff,stroke:#ffff
17/// style O fill:#ffff,stroke:#ffff
18/// ```
19#[mel_treatment(
20 input first Stream<void>
21 input second Stream<void>
22 output chained Stream<void>
23)]
24pub async fn chain() {
25 while let Ok(values) = first.recv_void().await {
26 check!(chained.send_void(values).await)
27 }
28
29 while let Ok(values) = second.recv_void().await {
30 check!(chained.send_void(values).await)
31 }
32}
33
34/// Trigger on `void` stream start and end.
35///
36/// Emit `start` when a first value is send through the stream.
37/// Emit `end` when stream is finally over.
38///
39/// Emit `first` with the first value coming in the stream.
40/// Emit `last` with the last value coming in the stream.
41///
42/// ℹ️ `start` and `first` are always emitted together.
43/// If the stream only contains one element, `first` and `last` both contains it.
44/// If the stream never transmit any data before being ended, only `end` is emitted.
45///
46/// ```mermaid
47/// graph LR
48/// T("trigger()")
49/// B["🟥 … 🟨 🟨 🟨 🟨 🟨 🟨 … 🟩"] -->|stream| T
50///
51/// T -->|start| S["〈🟦〉"]
52/// T -->|first| F["〈🟩〉"]
53/// T -->|last| L["〈🟥〉"]
54/// T -->|end| E["〈🟦〉"]
55///
56/// style B fill:#ffff,stroke:#ffff
57/// style S fill:#ffff,stroke:#ffff
58/// style F fill:#ffff,stroke:#ffff
59/// style L fill:#ffff,stroke:#ffff
60/// style E fill:#ffff,stroke:#ffff
61/// ```
62#[mel_treatment(
63 input stream Stream<void>
64 output start Block<void>
65 output end Block<void>
66 output first Block<void>
67 output last Block<void>
68)]
69pub async fn trigger() {
70 let mut last_value = None;
71
72 if let Ok(values) = stream.recv_void().await {
73 let _ = start.send_one_void(()).await;
74 if let Some(val) = values.first().cloned() {
75 let _ = first.send_one_void(val).await;
76 }
77 last_value = values.last().cloned();
78 let _ = futures::join!(start.close(), first.close());
79 }
80
81 while let Ok(values) = stream.recv_void().await {
82 last_value = values.last().cloned();
83 }
84
85 let _ = end.send_one_void(()).await;
86 if let Some(val) = last_value {
87 let _ = last.send_one_void(val).await;
88 }
89
90 // We don't close `end` and `last` explicitly here,
91 // because it would be redundant with boilerplate
92 // implementation of treatments.
93}
94
95/// Stream a block `void` value.
96///
97/// ```mermaid
98/// graph LR
99/// T("stream()")
100/// B["〈🟦〉"] -->|block| T
101///
102/// T -->|stream| S["🟦"]
103///
104///
105/// style B fill:#ffff,stroke:#ffff
106/// style S fill:#ffff,stroke:#ffff
107/// ```
108#[mel_treatment(
109 input block Block<void>
110 output stream Stream<void>
111)]
112pub async fn stream() {
113 if let Ok(val) = block.recv_one_void().await {
114 let _ = stream.send_one_void(val).await;
115 }
116}
117
118/// Gives count of elements passing through stream.
119///
120/// This count increment one for each element within the stream, starting at 1.
121///
122/// ```mermaid
123/// graph LR
124/// T("count()")
125/// V["🟦 🟦 🟦 …"] -->|iter| T
126///
127/// T -->|count| P["1️⃣ 2️⃣ 3️⃣ …"]
128///
129/// style V fill:#ffff,stroke:#ffff
130/// style P fill:#ffff,stroke:#ffff
131/// ```
132#[mel_treatment(
133 input stream Stream<void>
134 output count Stream<u128>
135)]
136pub async fn count() {
137 let mut i: u128 = 0;
138 while let Ok(iter) = stream.recv_void().await {
139 let next_i = i + iter.len() as u128;
140 check!(count.send_u128((i..next_i).collect()).await);
141 i = next_i;
142 }
143}
144
145/// Generate a stream of `void` according to a length.
146///
147/// ```mermaid
148/// graph LR
149/// T("generate()")
150/// B["〈🟨〉"] -->|length| T
151///
152/// T -->|stream| S["… 🟦 🟦 🟦 🟦 🟦 🟦"]
153///
154///
155/// style B fill:#ffff,stroke:#ffff
156/// style S fill:#ffff,stroke:#ffff
157/// ```
158#[mel_treatment(
159 input length Block<u128>
160 output stream Stream<void>
161)]
162pub async fn generate() {
163 if let Ok(length) = length.recv_one_u128().await {
164 const CHUNK: u128 = 2u128.pow(20);
165 let mut total = 0u128;
166 while total < length {
167 let chunk = u128::min(CHUNK, length - total) as usize;
168 check!(stream.send_void(vec![(); chunk]).await);
169 total += chunk as u128;
170 }
171 }
172}
173
174/// Generate a stream of `void` indefinitely.
175///
176/// This generates a continuous stream of `void`, until stream consumers closes it.
177///
178/// ```mermaid
179/// graph LR
180/// T("generateIndefinitely()")
181/// B["〈🟦〉"] -->|trigger| T
182///
183/// T -->|stream| S["… 🟦 🟦 🟦 🟦 🟦 🟦"]
184///
185///
186/// style B fill:#ffff,stroke:#ffff
187/// style S fill:#ffff,stroke:#ffff
188/// ```
189#[mel_treatment(
190 input trigger Block<void>
191 output stream Stream<void>
192)]
193pub async fn generate_indefinitely() {
194 if let Ok(_) = trigger.recv_one_void().await {
195 const CHUNK: usize = 2usize.pow(20);
196 loop {
197 check!(stream.send_void(vec![(); CHUNK]).await);
198 }
199 }
200}