use melodium_core::*;
use melodium_macro::{check, mel_treatment};
#[mel_treatment(
input first Stream<byte>
input second Stream<byte>
output chained Stream<byte>
)]
pub async fn chain() {
while let Ok(values) = first.recv_byte().await {
check!(chained.send_byte(values).await)
}
while let Ok(values) = second.recv_byte().await {
check!(chained.send_byte(values).await)
}
}
#[mel_treatment(
input stream Stream<byte>
output start Block<void>
output end Block<void>
output first Block<byte>
output last Block<byte>
)]
pub async fn trigger() {
let mut last_value = None;
if let Ok(values) = stream.recv_byte().await {
let _ = start.send_one_void(()).await;
if let Some(val) = values.first().cloned() {
let _ = first.send_one_byte(val).await;
}
last_value = values.last().cloned();
let _ = futures::join!(start.close(), first.close());
}
while let Ok(values) = stream.recv_byte().await {
last_value = values.last().cloned();
}
let _ = end.send_one_void(()).await;
if let Some(val) = last_value {
let _ = last.send_one_byte(val).await;
}
}
#[mel_treatment(
input block Block<byte>
output stream Stream<byte>
)]
pub async fn stream() {
if let Ok(val) = block.recv_one_byte().await {
let _ = stream.send_one_byte(val).await;
}
}
#[mel_treatment(
input a Stream<byte>
input b Stream<byte>
input select Stream<bool>
output value Stream<byte>
)]
pub async fn merge() {
while let Ok(select) = select.recv_one_bool().await {
let val;
if select {
if let Ok(v) = a.recv_one_byte().await {
val = v;
} else {
break;
}
} else {
if let Ok(v) = b.recv_one_byte().await {
val = v;
} else {
break;
}
}
check!(value.send_one_byte(val).await)
}
}
#[mel_treatment(
default value 0x00
input pattern Stream<void>
output filled Stream<byte>
)]
pub async fn fill(value: byte) {
while let Ok(pat) = pattern.recv_void().await {
check!(filled.send_byte(vec![value.clone(); pat.len()]).await)
}
}
#[mel_treatment(
input value Stream<byte>
input select Stream<bool>
output accepted Stream<byte>
output rejected Stream<byte>
)]
pub async fn filter() {
let mut accepted_op = true;
let mut rejected_op = true;
while let (Ok(value), Ok(select)) =
futures::join!(value.recv_one_byte(), select.recv_one_bool())
{
if select {
if let Err(_) = accepted.send_one_byte(value).await {
accepted_op = false;
if !rejected_op {
break;
}
}
} else {
if let Err(_) = rejected.send_one_byte(value).await {
rejected_op = false;
if !accepted_op {
break;
}
}
}
}
}
#[mel_treatment(
input value Stream<byte>
input pattern Stream<void>
output fitted Stream<byte>
)]
pub async fn fit() {
'main: while let Ok(pattern) = pattern.recv_void().await {
for _ in pattern {
if let Ok(val) = value.recv_one_byte().await {
check!('main, fitted.send_one_byte(val).await)
} else {
break 'main;
}
}
}
}
#[mel_treatment(
input trigger Block<void>
output emit Block<byte>
)]
pub async fn emit(value: byte) {
if let Ok(_) = trigger.recv_one_void().await {
let _ = emit.send_one_byte(value).await;
}
}