use melodium_core::*;
use melodium_macro::{check, mel_treatment};
#[mel_treatment(
input vector Stream<Vec<void>>
output value Stream<void>
)]
pub async fn flatten() {
'main: while let Ok(vectors) = vector.recv_vec_void().await {
for vec in vectors {
check!('main, value.send_void(vec).await)
}
}
}
#[mel_treatment(
input first Stream<Vec<void>>
input second Stream<Vec<void>>
output chained Stream<Vec<void>>
)]
pub async fn chain() {
while let Ok(vectors) = first.recv_vec_void().await {
check!(chained.send_vec_void(vectors).await)
}
while let Ok(vectors) = second.recv_vec_void().await {
check!(chained.send_vec_void(vectors).await)
}
}
#[mel_treatment(
input a Stream<Vec<void>>
input b Stream<Vec<void>>
input select Stream<bool>
output value Stream<Vec<void>>
)]
pub async fn merge() {
while let Ok(select) = select.recv_one_bool().await {
let val;
if select {
if let Ok(v) = a.recv_one_vec_void().await {
val = v;
} else {
break;
}
} else {
if let Ok(v) = b.recv_one_vec_void().await {
val = v;
} else {
break;
}
}
check!(value.send_one_vec_void(val).await)
}
}
#[mel_treatment(
input value Stream<Vec<void>>
input select Stream<bool>
output accepted Stream<Vec<void>>
output rejected Stream<Vec<void>>
)]
pub async fn filter() {
let mut accepted_op = true;
let mut rejected_op = true;
while let (Ok(value), Ok(select)) =
futures::join!(value.recv_one_vec_void(), select.recv_one_bool())
{
if select {
if let Err(_) = accepted.send_one_vec_void(value).await {
accepted_op = false;
if !rejected_op {
break;
}
}
} else {
if let Err(_) = rejected.send_one_vec_void(value).await {
rejected_op = false;
if !accepted_op {
break;
}
}
}
}
}
#[mel_treatment(
input stream Stream<Vec<void>>
output start Block<void>
output end Block<void>
output first Block<Vec<void>>
output last Block<Vec<void>>
)]
pub async fn trigger() {
let mut last_value = None;
if let Ok(values) = stream.recv_vec_void().await {
let _ = start.send_one_void(()).await;
if let Some(val) = values.first().cloned() {
let _ = first.send_one_vec_void(val).await;
}
last_value = values.last().cloned();
let _ = futures::join!(start.close(), first.close());
}
while let Ok(values) = stream.recv_vec_void().await {
last_value = values.last().cloned();
}
let _ = end.send_one_void(()).await;
if let Some(val) = last_value {
let _ = last.send_one_vec_void(val).await;
}
}
#[mel_treatment(
input block Block<Vec<void>>
output stream Stream<Vec<void>>
)]
pub async fn stream() {
if let Ok(val) = block.recv_one_vec_void().await {
let _ = stream.send_one_vec_void(val).await;
}
}
#[mel_treatment(
input trigger Block<void>
output emit Block<Vec<void>>
)]
pub async fn emit(value: Vec<void>) {
if let Ok(_) = trigger.recv_one_void().await {
let _ = emit.send_one_vec_void(value).await;
}
}
#[mel_treatment(
input stream Stream<Vec<void>>
output count Stream<u128>
)]
pub async fn count() {
let mut i: u128 = 0;
while let Ok(iter) = stream.recv_vec_void().await {
let next_i = i + iter.len() as u128;
check!(count.send_u128((i..next_i).collect()).await);
i = next_i;
}
}
#[mel_treatment(
input vector Stream<Vec<void>>
output size Stream<u64>
)]
pub async fn size() {
while let Ok(iter) = vector.recv_vec_void().await {
check!(
size.send_u64(iter.into_iter().map(|v| v.len() as u64).collect())
.await
);
}
}
#[mel_treatment(
input vector Stream<Vec<void>>
input size Stream<u64>
output resized Stream<Vec<void>>
)]
pub async fn resize() {
while let Ok(size) = size.recv_one_u64().await {
if let Ok(mut vec) = vector.recv_one_vec_void().await {
vec.resize(size as usize, ());
check!(resized.send_one_vec_void(vec).await);
} else {
break;
}
}
}
#[mel_treatment(
input length Block<u128>
output stream Stream<Vec<void>>
)]
pub async fn generate() {
if let Ok(length) = length.recv_one_u128().await {
const CHUNK: u128 = 2u128.pow(20);
let mut total = 0u128;
while total < length {
let chunk = u128::min(CHUNK, length - total) as usize;
check!(stream.send_vec_void(vec![vec![]; chunk]).await);
total += chunk as u128;
}
}
}
#[mel_treatment(
input trigger Block<void>
output stream Stream<Vec<void>>
)]
pub async fn generate_indefinitely() {
if let Ok(_) = trigger.recv_one_void().await {
const CHUNK: usize = 2usize.pow(20);
loop {
check!(stream.send_vec_void(vec![vec![]; CHUNK]).await);
}
}
}