std_mel/flow/
vec.rs

1use melodium_core::*;
2use melodium_macro::{check, mel_treatment};
3
4/// Flatten a stream of vector.
5///
6/// All the input vectors are turned into continuous stream of scalar values, keeping order.
7/// ```mermaid
8/// graph LR
9///     T("flatten()")
10///     B["[🟦 🟦][🟦][🟦 🟦 🟦]"] -->|vector| T
11///     
12///     T -->|value| O["🟦 🟦 🟦 🟦 🟦 🟦"]
13///
14///     style B fill:#ffff,stroke:#ffff
15///     style O fill:#ffff,stroke:#ffff
16/// ```
17#[mel_treatment(
18    generic T ()
19    input vector Stream<Vec<T>>
20    output value Stream<T>
21)]
22pub async fn flatten() {
23    'main: while let Ok(mut vectors) = vector
24        .recv_many()
25        .await
26        .map(|values| Into::<VecDeque<Value>>::into(values))
27    {
28        while let Some(vector) = vectors.pop_front().map(|val| match val {
29            Value::Vec(vec) => vec,
30            _ => panic!("Vec expected"),
31        }) {
32            for val in vector {
33                check!('main, value.send_one(val).await)
34            }
35        }
36    }
37}
38
39/// Gives pattern of a stream of vectors.
40///
41/// ```mermaid
42/// graph LR
43///     T("pattern()")
44///     A["…[🟨 🟨][🟨][🟨 🟨 🟨]"] -->|stream| T
45///     
46///     T -->|pattern| O["… [🟦 🟦][🟦][🟦 🟦 🟦]"]
47///
48///     style A fill:#ffff,stroke:#ffff
49///     style O fill:#ffff,stroke:#ffff
50/// ```
51#[mel_treatment(
52    generic T ()
53    input stream Stream<Vec<T>>
54    output pattern Stream<Vec<void>>
55)]
56pub async fn pattern() {
57    'main: while let Ok(vectors) = stream
58        .recv_many()
59        .await
60        .map(|values| Into::<VecDeque<Value>>::into(values))
61    {
62        for val in vectors {
63            match val {
64                Value::Vec(vec) => {
65                    check!('main, pattern.send_one(vec![(); vec.len()].into()).await)
66                }
67                _ => panic!("Vec expected"),
68            }
69        }
70    }
71}
72
73/// Fit a stream of raw values into stream of vectors using a pattern.
74///
75/// ℹ️ If some remaining values doesn't fit into the pattern, they are trashed.
76/// If there are not enough values to fit the pattern, uncomplete vector is trashed.
77///
78/// ```mermaid
79/// graph LR
80///     T("fit()")
81///     A["… 🟨 🟨 🟨 🟨 🟨 🟨"] -->|value| T
82///     B["[🟦 🟦][🟦][🟦 🟦 🟦]"] -->|pattern| T
83///     
84///     T -->|fitted| O["[🟨 🟨][🟨][🟨 🟨 🟨]"]
85///
86///     style A fill:#ffff,stroke:#ffff
87///     style B fill:#ffff,stroke:#ffff
88///     style O fill:#ffff,stroke:#ffff
89/// ```
90#[mel_treatment(
91    generic T ()
92    input value Stream<T>
93    input pattern Stream<Vec<void>>
94    output fitted Stream<Vec<T>>
95)]
96pub async fn fit() {
97    'main: while let Ok(patterns) = pattern
98        .recv_many()
99        .await
100        .map(|values| Into::<VecDeque<Value>>::into(values))
101    {
102        for pattern in patterns {
103            match pattern {
104                Value::Vec(pattern) => {
105                    let mut vector = Vec::with_capacity(pattern.len());
106                    for _ in 0..pattern.len() {
107                        if let Ok(val) = value.recv_one().await {
108                            vector.push(val);
109                        } else {
110                            // Uncomplete, we 'trash' vector
111                            break 'main;
112                        }
113                    }
114                    check!('main, fitted.send_one(vector.into()).await)
115                }
116                _ => panic!("Vec expected"),
117            }
118        }
119    }
120}
121
122/// Fill a pattern stream with a `i64` value.
123///
124/// ```mermaid
125/// graph LR
126/// T("fill(value=🟧)")
127/// B["…[🟦 🟦][🟦][🟦 🟦 🟦]…"] -->|pattern| T
128///
129/// T -->|filled| O["…[🟧 🟧][🟧][🟧 🟧 🟧]…"]
130///
131/// style B fill:#ffff,stroke:#ffff
132/// style O fill:#ffff,stroke:#ffff
133/// ```
134#[mel_treatment(
135    generic T ()
136    input pattern Stream<Vec<void>>
137    output filled Stream<Vec<T>>
138)]
139pub async fn fill(value: T) {
140    'main: while let Ok(patterns) = pattern
141        .recv_many()
142        .await
143        .map(|values| Into::<VecDeque<Value>>::into(values))
144    {
145        for pattern in patterns {
146            match pattern {
147                Value::Vec(pattern) => {
148                    check!('main, filled.send_one(vec![value.clone(); pattern.len()].into()).await)
149                }
150
151                _ => panic!("Vec expected"),
152            }
153        }
154    }
155}
156
157/// Gives size of vectors passing through stream.
158///
159/// For each vector one `size` value is sent, giving the number of elements contained within matching vector.
160///
161/// ```mermaid
162/// graph LR
163///     T("size()")
164///     V["[🟦 🟦][🟦][][🟦 🟦 🟦]…"] -->|vector| T
165///     
166///     T -->|size| P["2️⃣ 1️⃣ 0️⃣ 3️⃣ …"]
167///
168///     style V fill:#ffff,stroke:#ffff
169///     style P fill:#ffff,stroke:#ffff
170/// ```
171#[mel_treatment(
172    generic T ()
173    input vector Stream<Vec<T>>
174    output size Stream<u64>
175)]
176pub async fn size() {
177    while let Ok(iter) = vector
178        .recv_many()
179        .await
180        .map(|values| Into::<VecDeque<Value>>::into(values))
181    {
182        check!(
183            size.send_many(
184                iter.into_iter()
185                    .map(|v| match v {
186                        Value::Vec(v) => v.len() as u64,
187                        _ => panic!("Vec expected"),
188                    })
189                    .collect::<VecDeque<_>>()
190                    .into()
191            )
192            .await
193        );
194    }
195}
196
197/// Resize vectors according to given streamed size.
198///
199/// If a vector is smaller than expected size, it is extended using the `default` value.
200///
201/// ```mermaid
202/// graph LR
203///     T("resize(default=🟨)")
204///     V["[🟦 🟦][🟦][][🟦 🟦 🟦]…"] -->|vector| T
205///     S["3️⃣ 2️⃣ 3️⃣ 2️⃣ …"] -->|size| T
206///     
207///     T -->|resized| P["[🟦 🟦 🟨][🟦 🟨][🟨 🟨 🟨][🟦 🟦]…"]
208///
209///     style V fill:#ffff,stroke:#ffff
210///     style S fill:#ffff,stroke:#ffff
211///     style P fill:#ffff,stroke:#ffff
212/// ```
213#[mel_treatment(
214    generic T ()
215    input vector Stream<Vec<T>>
216    input size Stream<u64>
217    output resized Stream<Vec<T>>
218)]
219pub async fn resize(default: T) {
220    while let Ok(size) = size
221        .recv_one()
222        .await
223        .map(|val| GetData::<u64>::try_data(val).unwrap())
224    {
225        if let Ok(vec) = vector.recv_one().await {
226            match vec {
227                Value::Vec(mut vec) => {
228                    vec.resize(size as usize, default.clone());
229                    check!(resized.send_one(vec.into()).await);
230                }
231                _ => panic!("Vec expected"),
232            }
233        } else {
234            break;
235        }
236    }
237}