std_mel/flow/vec.rs
1use melodium_core::*;
2use melodium_macro::{check, mel_treatment};
3
4/// Flatten a stream of vector.
5///
6/// All the input vectors are turned into continuous stream of scalar values, keeping order.
7/// ```mermaid
8/// graph LR
9/// T("flatten()")
10/// B["[🟦 🟦][🟦][🟦 🟦 🟦]"] -->|vector| T
11///
12/// T -->|value| O["🟦 🟦 🟦 🟦 🟦 🟦"]
13///
14/// style B fill:#ffff,stroke:#ffff
15/// style O fill:#ffff,stroke:#ffff
16/// ```
17#[mel_treatment(
18 generic T ()
19 input vector Stream<Vec<T>>
20 output value Stream<T>
21)]
22pub async fn flatten() {
23 'main: while let Ok(mut vectors) = vector
24 .recv_many()
25 .await
26 .map(|values| Into::<VecDeque<Value>>::into(values))
27 {
28 while let Some(vector) = vectors.pop_front().map(|val| match val {
29 Value::Vec(vec) => vec,
30 _ => panic!("Vec expected"),
31 }) {
32 for val in vector {
33 check!('main, value.send_one(val).await)
34 }
35 }
36 }
37}
38
39/// Gives pattern of a stream of vectors.
40///
41/// ```mermaid
42/// graph LR
43/// T("pattern()")
44/// A["…[🟨 🟨][🟨][🟨 🟨 🟨]"] -->|stream| T
45///
46/// T -->|pattern| O["… [🟦 🟦][🟦][🟦 🟦 🟦]"]
47///
48/// style A fill:#ffff,stroke:#ffff
49/// style O fill:#ffff,stroke:#ffff
50/// ```
51#[mel_treatment(
52 generic T ()
53 input stream Stream<Vec<T>>
54 output pattern Stream<Vec<void>>
55)]
56pub async fn pattern() {
57 'main: while let Ok(vectors) = stream
58 .recv_many()
59 .await
60 .map(|values| Into::<VecDeque<Value>>::into(values))
61 {
62 for val in vectors {
63 match val {
64 Value::Vec(vec) => {
65 check!('main, pattern.send_one(vec![(); vec.len()].into()).await)
66 }
67 _ => panic!("Vec expected"),
68 }
69 }
70 }
71}
72
73/// Fit a stream of raw values into stream of vectors using a pattern.
74///
75/// ℹ️ If some remaining values doesn't fit into the pattern, they are trashed.
76/// If there are not enough values to fit the pattern, uncomplete vector is trashed.
77///
78/// ```mermaid
79/// graph LR
80/// T("fit()")
81/// A["… 🟨 🟨 🟨 🟨 🟨 🟨"] -->|value| T
82/// B["[🟦 🟦][🟦][🟦 🟦 🟦]"] -->|pattern| T
83///
84/// T -->|fitted| O["[🟨 🟨][🟨][🟨 🟨 🟨]"]
85///
86/// style A fill:#ffff,stroke:#ffff
87/// style B fill:#ffff,stroke:#ffff
88/// style O fill:#ffff,stroke:#ffff
89/// ```
90#[mel_treatment(
91 generic T ()
92 input value Stream<T>
93 input pattern Stream<Vec<void>>
94 output fitted Stream<Vec<T>>
95)]
96pub async fn fit() {
97 'main: while let Ok(patterns) = pattern
98 .recv_many()
99 .await
100 .map(|values| Into::<VecDeque<Value>>::into(values))
101 {
102 for pattern in patterns {
103 match pattern {
104 Value::Vec(pattern) => {
105 let mut vector = Vec::with_capacity(pattern.len());
106 for _ in 0..pattern.len() {
107 if let Ok(val) = value.recv_one().await {
108 vector.push(val);
109 } else {
110 // Uncomplete, we 'trash' vector
111 break 'main;
112 }
113 }
114 check!('main, fitted.send_one(vector.into()).await)
115 }
116 _ => panic!("Vec expected"),
117 }
118 }
119 }
120}
121
122/// Fill a pattern stream with a `i64` value.
123///
124/// ```mermaid
125/// graph LR
126/// T("fill(value=🟧)")
127/// B["…[🟦 🟦][🟦][🟦 🟦 🟦]…"] -->|pattern| T
128///
129/// T -->|filled| O["…[🟧 🟧][🟧][🟧 🟧 🟧]…"]
130///
131/// style B fill:#ffff,stroke:#ffff
132/// style O fill:#ffff,stroke:#ffff
133/// ```
134#[mel_treatment(
135 generic T ()
136 input pattern Stream<Vec<void>>
137 output filled Stream<Vec<T>>
138)]
139pub async fn fill(value: T) {
140 'main: while let Ok(patterns) = pattern
141 .recv_many()
142 .await
143 .map(|values| Into::<VecDeque<Value>>::into(values))
144 {
145 for pattern in patterns {
146 match pattern {
147 Value::Vec(pattern) => {
148 check!('main, filled.send_one(vec![value.clone(); pattern.len()].into()).await)
149 }
150
151 _ => panic!("Vec expected"),
152 }
153 }
154 }
155}
156
157/// Gives size of vectors passing through stream.
158///
159/// For each vector one `size` value is sent, giving the number of elements contained within matching vector.
160///
161/// ```mermaid
162/// graph LR
163/// T("size()")
164/// V["[🟦 🟦][🟦][][🟦 🟦 🟦]…"] -->|vector| T
165///
166/// T -->|size| P["2️⃣ 1️⃣ 0️⃣ 3️⃣ …"]
167///
168/// style V fill:#ffff,stroke:#ffff
169/// style P fill:#ffff,stroke:#ffff
170/// ```
171#[mel_treatment(
172 generic T ()
173 input vector Stream<Vec<T>>
174 output size Stream<u64>
175)]
176pub async fn size() {
177 while let Ok(iter) = vector
178 .recv_many()
179 .await
180 .map(|values| Into::<VecDeque<Value>>::into(values))
181 {
182 check!(
183 size.send_many(
184 iter.into_iter()
185 .map(|v| match v {
186 Value::Vec(v) => v.len() as u64,
187 _ => panic!("Vec expected"),
188 })
189 .collect::<VecDeque<_>>()
190 .into()
191 )
192 .await
193 );
194 }
195}
196
197/// Resize vectors according to given streamed size.
198///
199/// If a vector is smaller than expected size, it is extended using the `default` value.
200///
201/// ```mermaid
202/// graph LR
203/// T("resize(default=🟨)")
204/// V["[🟦 🟦][🟦][][🟦 🟦 🟦]…"] -->|vector| T
205/// S["3️⃣ 2️⃣ 3️⃣ 2️⃣ …"] -->|size| T
206///
207/// T -->|resized| P["[🟦 🟦 🟨][🟦 🟨][🟨 🟨 🟨][🟦 🟦]…"]
208///
209/// style V fill:#ffff,stroke:#ffff
210/// style S fill:#ffff,stroke:#ffff
211/// style P fill:#ffff,stroke:#ffff
212/// ```
213#[mel_treatment(
214 generic T ()
215 input vector Stream<Vec<T>>
216 input size Stream<u64>
217 output resized Stream<Vec<T>>
218)]
219pub async fn resize(default: T) {
220 while let Ok(size) = size
221 .recv_one()
222 .await
223 .map(|val| GetData::<u64>::try_data(val).unwrap())
224 {
225 if let Ok(vec) = vector.recv_one().await {
226 match vec {
227 Value::Vec(mut vec) => {
228 vec.resize(size as usize, default.clone());
229 check!(resized.send_one(vec.into()).await);
230 }
231 _ => panic!("Vec expected"),
232 }
233 } else {
234 break;
235 }
236 }
237}