use melodium_core::*;
use melodium_macro::{check, mel_treatment};
#[mel_treatment(
input vector Stream<Vec<u64>>
output value Stream<u64>
)]
pub async fn flatten() {
'main: while let Ok(vectors) = vector.recv_vec_u64().await {
for vec in vectors {
check!('main, value.send_u64(vec).await)
}
}
}
#[mel_treatment(
input first Stream<Vec<u64>>
input second Stream<Vec<u64>>
output chained Stream<Vec<u64>>
)]
pub async fn chain() {
while let Ok(vectors) = first.recv_vec_u64().await {
check!(chained.send_vec_u64(vectors).await)
}
while let Ok(vectors) = second.recv_vec_u64().await {
check!(chained.send_vec_u64(vectors).await)
}
}
#[mel_treatment(
input a Stream<Vec<u64>>
input b Stream<Vec<u64>>
input select Stream<bool>
output value Stream<Vec<u64>>
)]
pub async fn merge() {
while let Ok(select) = select.recv_one_bool().await {
let val;
if select {
if let Ok(v) = a.recv_one_vec_u64().await {
val = v;
} else {
break;
}
} else {
if let Ok(v) = b.recv_one_vec_u64().await {
val = v;
} else {
break;
}
}
check!(value.send_one_vec_u64(val).await)
}
}
#[mel_treatment(
input value Stream<Vec<u64>>
input select Stream<bool>
output accepted Stream<Vec<u64>>
output rejected Stream<Vec<u64>>
)]
pub async fn filter() {
let mut accepted_op = true;
let mut rejected_op = true;
while let (Ok(value), Ok(select)) =
futures::join!(value.recv_one_vec_u64(), select.recv_one_bool())
{
if select {
if let Err(_) = accepted.send_one_vec_u64(value).await {
accepted_op = false;
if !rejected_op {
break;
}
}
} else {
if let Err(_) = rejected.send_one_vec_u64(value).await {
rejected_op = false;
if !accepted_op {
break;
}
}
}
}
}
#[mel_treatment(
input stream Stream<Vec<u64>>
output start Block<void>
output end Block<void>
output first Block<Vec<u64>>
output last Block<Vec<u64>>
)]
pub async fn trigger() {
let mut last_value = None;
if let Ok(values) = stream.recv_vec_u64().await {
let _ = start.send_one_void(()).await;
if let Some(val) = values.first().cloned() {
let _ = first.send_one_vec_u64(val).await;
}
last_value = values.last().cloned();
let _ = futures::join!(start.close(), first.close());
}
while let Ok(values) = stream.recv_vec_u64().await {
last_value = values.last().cloned();
}
let _ = end.send_one_void(()).await;
if let Some(val) = last_value {
let _ = last.send_one_vec_u64(val).await;
}
}
#[mel_treatment(
input block Block<Vec<u64>>
output stream Stream<Vec<u64>>
)]
pub async fn stream() {
if let Ok(val) = block.recv_one_vec_u64().await {
let _ = stream.send_one_vec_u64(val).await;
}
}
#[mel_treatment(
input trigger Block<void>
output emit Block<Vec<u64>>
)]
pub async fn emit(value: Vec<u64>) {
if let Ok(_) = trigger.recv_one_void().await {
let _ = emit.send_one_vec_u64(value).await;
}
}
#[mel_treatment(
input stream Stream<Vec<u64>>
output pattern Stream<Vec<void>>
)]
pub async fn pattern() {
while let Ok(vectors) = stream.recv_vec_u64().await {
check!(
pattern
.send_vec_void(vectors.into_iter().map(|vec| vec![(); vec.len()]).collect())
.await
)
}
}
#[mel_treatment(
input value Stream<u64>
input pattern Stream<Vec<void>>
output fitted Stream<Vec<u64>>
)]
pub async fn fit() {
'main: while let Ok(patterns) = pattern.recv_vec_void().await {
for pattern in patterns {
let mut vector = Vec::with_capacity(pattern.len());
for _ in 0..pattern.len() {
if let Ok(val) = value.recv_one_u64().await {
vector.push(val);
} else {
break 'main;
}
}
check!('main, fitted.send_one_vec_u64(vector).await)
}
}
}
#[mel_treatment(
default value 0
input pattern Stream<Vec<void>>
output filled Stream<Vec<u64>>
)]
pub async fn fill(value: u64) {
while let Ok(pat) = pattern.recv_vec_void().await {
check!(
filled
.send_vec_u64(
pat.into_iter()
.map(|p| vec![value.clone(); p.len()])
.collect()
)
.await
)
}
}
#[mel_treatment(
default default 0
input vector Stream<Vec<u64>>
input size Stream<u64>
output resized Stream<Vec<u64>>
)]
pub async fn resize(default: u64) {
while let Ok(size) = size.recv_one_u64().await {
if let Ok(mut vec) = vector.recv_one_vec_u64().await {
vec.resize(size as usize, default.clone());
check!(resized.send_one_vec_u64(vec).await);
} else {
break;
}
}
}