use melodium_core::*;
use melodium_macro::{check, mel_treatment};
#[mel_treatment(
generic T ()
input vector Stream<Vec<T>>
output value Stream<T>
)]
pub async fn flatten() {
'main: while let Ok(mut vectors) = vector
.recv_many()
.await
.map(|values| Into::<VecDeque<Value>>::into(values))
{
while let Some(vector) = vectors.pop_front().map(|val| match val {
Value::Vec(vec) => vec,
_ => panic!("Vec expected"),
}) {
for val in vector {
check!('main, value.send_one(val).await)
}
}
}
}
#[mel_treatment(
generic T ()
input stream Stream<Vec<T>>
output pattern Stream<Vec<void>>
)]
pub async fn pattern() {
'main: while let Ok(vectors) = stream
.recv_many()
.await
.map(|values| Into::<VecDeque<Value>>::into(values))
{
for val in vectors {
match val {
Value::Vec(vec) => {
check!('main, pattern.send_one(vec![(); vec.len()].into()).await)
}
_ => panic!("Vec expected"),
}
}
}
}
#[mel_treatment(
generic T ()
input value Stream<T>
input pattern Stream<Vec<void>>
output fitted Stream<Vec<T>>
)]
pub async fn fit() {
'main: while let Ok(patterns) = pattern
.recv_many()
.await
.map(|values| Into::<VecDeque<Value>>::into(values))
{
for pattern in patterns {
match pattern {
Value::Vec(pattern) => {
let mut vector = Vec::with_capacity(pattern.len());
for _ in 0..pattern.len() {
if let Ok(val) = value.recv_one().await {
vector.push(val);
} else {
break 'main;
}
}
check!('main, fitted.send_one(vector.into()).await)
}
_ => panic!("Vec expected"),
}
}
}
}
#[mel_treatment(
generic T ()
input pattern Stream<Vec<void>>
output filled Stream<Vec<T>>
)]
pub async fn fill(value: T) {
'main: while let Ok(patterns) = pattern
.recv_many()
.await
.map(|values| Into::<VecDeque<Value>>::into(values))
{
for pattern in patterns {
match pattern {
Value::Vec(pattern) => {
check!('main, filled.send_one(vec![value.clone(); pattern.len()].into()).await)
}
_ => panic!("Vec expected"),
}
}
}
}
#[mel_treatment(
generic T ()
input vector Stream<Vec<T>>
output size Stream<u64>
)]
pub async fn size() {
while let Ok(iter) = vector
.recv_many()
.await
.map(|values| Into::<VecDeque<Value>>::into(values))
{
check!(
size.send_many(
iter.into_iter()
.map(|v| match v {
Value::Vec(v) => v.len() as u64,
_ => panic!("Vec expected"),
})
.collect::<VecDeque<_>>()
.into()
)
.await
);
}
}
#[mel_treatment(
generic T ()
input vector Stream<Vec<T>>
input size Stream<u64>
output resized Stream<Vec<T>>
)]
pub async fn resize(default: T) {
while let Ok(size) = size
.recv_one()
.await
.map(|val| GetData::<u64>::try_data(val).unwrap())
{
if let Ok(vec) = vector.recv_one().await {
match vec {
Value::Vec(mut vec) => {
vec.resize(size as usize, default.clone());
check!(resized.send_one(vec.into()).await);
}
_ => panic!("Vec expected"),
}
} else {
break;
}
}
}