use futures::{pin_mut, select, FutureExt};
use melodium_core::common::executive::{GetData, Value};
use melodium_macro::{check, mel_treatment};
use std::collections::VecDeque;
pub mod concentrate;
pub mod vec;
/// Chain two streams.
///
/// Batches received on `first` are forwarded to `chained` until `first` stops
/// yielding data, then batches received on `second` are forwarded the same way.
#[mel_treatment(
    generic T ()
    input first Stream<T>
    input second Stream<T>
    output chained Stream<T>
)]
pub async fn chain() {
    // Phase one: relay everything coming from `first`.
    while let Ok(batch) = first.recv_many().await {
        // `check!` is expected to leave the current loop when the send fails.
        check!(chained.send_many(batch).await);
    }

    // Phase two: relay everything coming from `second`.
    while let Ok(batch) = second.recv_many().await {
        check!(chained.send_many(batch).await);
    }
}
/// Trigger on the start and end of a stream.
///
/// - `start` is emitted as soon as a first batch is received on `stream`.
/// - `first` carries the first value of that first batch.
/// - `end` is emitted once `stream` stops yielding data, whether or not
///   anything was received.
/// - `last` carries the last value received, if there was one.
#[mel_treatment(
    generic T ()
    input stream Stream<T>
    output start Block<void>
    output end Block<void>
    output first Block<T>
    output last Block<T>
)]
pub async fn trigger() {
    // Most recent value seen on `stream`, kept to feed `last` at the end.
    let mut last_value = None;

    // The first batch is handled apart: it drives `start` and `first`.
    if let Ok(mut values) = stream.recv_many().await {
        let _ = start.send_one(().into()).await;
        if let Some(val) = values.pop_front() {
            let _ = first.send_one(val.clone()).await;
            last_value = Some(val);
        }
        // If the batch held more than one value, its tail is the latest one.
        if let Some(val) = Into::<VecDeque<Value>>::into(values).pop_back() {
            last_value = Some(val);
        }
        // Nothing more will be sent on those two outputs.
        let _ = futures::join!(start.close(), first.close());
    }

    while let Ok(values) = stream.recv_many().await {
        // Only overwrite when the batch actually holds a value, so an empty
        // batch cannot erase the last value seen so far (same guard as for
        // the first batch above).
        if let Some(val) = Into::<VecDeque<Value>>::into(values).pop_back() {
            last_value = Some(val);
        }
    }

    let _ = end.send_one(().into()).await;
    if let Some(val) = last_value {
        let _ = last.send_one(val).await;
    }
}
/// Check a blocking value.
///
/// Emits `check` when a value is successfully received on `value`; the value
/// itself is discarded.
#[mel_treatment(
    generic T ()
    input value Block<T>
    output check Block<void>
)]
pub async fn check() {
    let received = value.recv_one().await.is_ok();
    if received {
        let _ = check.send_one(().into()).await;
    }
}
/// Uncheck a blocking value.
///
/// Emits `uncheck` when receiving from `value` fails, i.e. when no value could
/// be obtained from it.
#[mel_treatment(
    generic T ()
    input value Block<T>
    output uncheck Block<void>
)]
pub async fn uncheck() {
    let missing = value.recv_one().await.is_err();
    if missing {
        let _ = uncheck.send_one(().into()).await;
    }
}
/// Emit a blocking value.
///
/// When `trigger` is received, the `value` parameter is sent on `emit`.
/// Nothing is sent if receiving from `trigger` fails.
#[mel_treatment(
    generic T ()
    input trigger Block<void>
    output emit Block<T>
)]
pub async fn emit(value: T) {
    let triggered = trigger.recv_one().await.is_ok();
    if triggered {
        let _ = emit.send_one(value).await;
    }
}
/// Turn a blocking value into a stream.
///
/// The single value received on `block` is sent on `stream`. Nothing is sent
/// if no value could be received.
#[mel_treatment(
    generic T ()
    input block Block<T>
    output stream Stream<T>
)]
pub async fn stream() {
    let received = block.recv_one().await;
    if let Ok(single) = received {
        let _ = stream.send_one(single).await;
    }
}
/// Merge two streams.
///
/// Batches from `a` and `b` are forwarded to `value` as they arrive; the
/// relative order between the two inputs depends on which one is ready.
#[mel_treatment(
    generic T ()
    input a Stream<T>
    input b Stream<T>
    output value Stream<T>
)]
pub async fn merge() {
    // One independent forwarding loop per input, both writing to `value`.
    let forward_a = async {
        while let Ok(batch) = a.recv_many().await {
            check!(value.send_many(batch).await);
        }
    }
    .fuse();
    let forward_b = async {
        while let Ok(batch) = b.recv_many().await {
            check!(value.send_many(batch).await);
        }
    }
    .fuse();
    pin_mut!(forward_a, forward_b);

    // Drive both loops until each of them has finished.
    loop {
        select! {
            () = forward_a => {},
            () = forward_b => {},
            complete => break,
        };
    }
}
/// Arrange two streams as one.
///
/// For each boolean received on `select`, one value is taken from `a` (when
/// `true`) or from `b` (when `false`) and sent on `value`. Processing stops as
/// soon as `select` or the chosen input yields nothing.
#[mel_treatment(
    generic T ()
    input a Stream<T>
    input b Stream<T>
    input select Stream<bool>
    output value Stream<T>
)]
pub async fn arrange() {
    while let Ok(flag) = select.recv_one().await {
        let take_from_a = GetData::<bool>::try_data(flag).unwrap();

        // Pull exactly one value from the designated input.
        let picked = if take_from_a {
            a.recv_one().await.ok()
        } else {
            b.recv_one().await.ok()
        };

        match picked {
            Some(item) => {
                check!(value.send_one(item).await);
            }
            // The designated input has nothing left to give.
            None => break,
        }
    }
}
/// Fill a pattern stream with a value.
///
/// For every batch received on `pattern`, a batch of the same length made of
/// copies of the `value` parameter is sent on `filled`.
#[mel_treatment(
    generic T ()
    input pattern Stream<void>
    output filled Stream<T>
)]
pub async fn fill(value: T) {
    while let Ok(pat) = pattern.recv_many().await {
        let count = pat.len();
        // The transmission below is seeded with one value, so an empty batch
        // must be skipped to keep `filled` the same length as `pattern`.
        if count == 0 {
            continue;
        }

        let mut transmission = melodium_core::TransmissionValue::new(value.clone());
        // Starts at 1: the first copy is already in the transmission.
        for _ in 1..count {
            transmission.push(value.clone());
        }
        // `check!` is expected to leave the loop when the send fails.
        check!(filled.send_many(transmission).await);
    }
}
/// Filter a stream according to `bool` stream.
///
/// Values from `value` and booleans from `select` are consumed in pairs. A
/// value paired with `true` is sent on `accepted`, one paired with `false` on
/// `rejected`. Processing stops when either input yields nothing, or once
/// sending has failed on both outputs.
#[mel_treatment(
    generic T ()
    input value Stream<T>
    input select Stream<bool>
    output accepted Stream<T>
    output rejected Stream<T>
)]
pub async fn filter() {
    // Each flag turns false the first time a send on that output fails.
    let mut accepted_open = true;
    let mut rejected_open = true;

    while let (Ok(item), Ok(flag)) = futures::join!(value.recv_one(), select.recv_one()) {
        let keep = GetData::<bool>::try_data(flag).unwrap();

        if keep {
            if accepted.send_one(item).await.is_err() {
                accepted_open = false;
            }
        } else if rejected.send_one(item).await.is_err() {
            rejected_open = false;
        }

        // Both flags can only be false right after the second failure, so
        // this exits at the same point as a check done in each failure branch.
        if !accepted_open && !rejected_open {
            break;
        }
    }
}
/// Filter a block according to `bool` value.
///
/// Once both `value` and `select` have been received, the value is sent on
/// `accepted` if `select` is `true`, on `rejected` otherwise. Nothing is sent
/// if either input yields nothing.
#[mel_treatment(
    generic T ()
    input value Block<T>
    input select Block<bool>
    output accepted Block<T>
    output rejected Block<T>
)]
pub async fn filterBlock() {
    // Wait for both inputs concurrently.
    let (received, decision) = futures::join!(value.recv_one(), select.recv_one());

    if let (Ok(item), Ok(flag)) = (received, decision) {
        let keep = GetData::<bool>::try_data(flag).unwrap();
        if keep {
            let _ = accepted.send_one(item).await;
        } else {
            let _ = rejected.send_one(item).await;
        }
    }
}
/// Fit a stream into a pattern.
///
/// For every element received on `pattern`, one value is taken from `value`
/// and sent on `fitted`. Processing stops when `pattern` or `value` yields
/// nothing.
#[mel_treatment(
    generic T ()
    input value Stream<T>
    input pattern Stream<void>
    output fitted Stream<T>
)]
pub async fn fit() {
    'main: while let Ok(batch) = pattern.recv_many().await {
        // Only the number of elements in the pattern batch matters.
        let slots = TryInto::<Vec<()>>::try_into(batch).unwrap();

        for _ in slots {
            match value.recv_one().await {
                Ok(item) => {
                    // The label makes `check!` target the outer loop.
                    check!('main, fitted.send_one(item).await);
                }
                Err(_) => break 'main,
            }
        }
    }
}
#[mel_treatment(
generic T ()
input stream Stream<T>
output count Stream<u128>
)]
pub async fn count() {
let mut i: u128 = 0;
while let Ok(iter) = stream.recv_many().await {
let next_i = i + iter.len() as u128;
check!(
count
.send_many((i..next_i).collect::<VecDeque<_>>().into())
.await
);
i = next_i;
}
}
/// Generate a stream of a given length.
///
/// Once `length` is received, that many copies of the `data` parameter are
/// sent on `stream`, in batches of at most 2^20 elements.
#[mel_treatment(
    generic T ()
    input length Block<u128>
    output stream Stream<T>
)]
pub async fn generate(data: T) {
    if let Ok(raw) = length.recv_one().await {
        // Upper bound on the size of a single batch.
        const CHUNK: u128 = 1 << 20;

        // Number of elements that still have to be sent.
        let mut remaining = GetData::<u128>::try_data(raw).unwrap();

        while remaining > 0 {
            // Never exceeds CHUNK, so the narrowing cast cannot truncate.
            let batch_len = remaining.min(CHUNK) as usize;

            let mut transmission = melodium_core::TransmissionValue::new(data.clone());
            // Starts at 1: the first copy is already in the transmission.
            for _ in 1..batch_len {
                transmission.push(data.clone());
            }

            check!(stream.send_many(transmission).await);
            remaining -= batch_len as u128;
        }
    }
}
/// Generate a stream indefinitely.
///
/// Once `trigger` is received, batches of 2^20 copies of the `data` parameter
/// are sent on `stream` over and over.
#[mel_treatment(
    generic T ()
    input trigger Block<void>
    output stream Stream<T>
)]
pub async fn generate_indefinitely(data: T) {
    let triggered = trigger.recv_one().await.is_ok();
    if triggered {
        // Number of elements in every batch.
        const CHUNK: usize = 1 << 20;

        loop {
            let mut transmission = melodium_core::TransmissionValue::new(data.clone());
            // Starts at 1: the first copy is already in the transmission.
            for _ in 1..CHUNK {
                transmission.push(data.clone());
            }
            // The only way out of the loop: `check!` is expected to break
            // when the send fails.
            check!(stream.send_many(transmission).await);
        }
    }
}
/// Insert a block into a stream.
///
/// Batches from `stream` are forwarded to `output`; the value received on
/// `block` is sent on `output` as well, at whatever moment it becomes
/// available.
#[mel_treatment(
    generic T ()
    input stream Stream<T>
    input block Block<T>
    output output Stream<T>
)]
pub async fn insert() {
    // Continuous relay of the stream.
    let relay = async {
        while let Ok(batch) = stream.recv_many().await {
            check!(output.send_many(batch).await);
        }
    }
    .fuse();
    // One-shot insertion of the block value.
    let insertion = async {
        if let Ok(single) = block.recv_one().await {
            let _ = output.send_one(single).await;
        }
    }
    .fuse();
    pin_mut!(relay, insertion);

    // Drive both tasks until each of them has finished.
    loop {
        select! {
            () = relay => {},
            () = insertion => {},
            complete => break,
        };
    }
}
/// Merge two blocks into a stream.
///
/// The values received on `a` and `b` are each sent on `stream`, in the order
/// in which they become available.
#[mel_treatment(
    generic T ()
    input a Block<T>
    input b Block<T>
    output stream Stream<T>
)]
pub async fn flock() {
    let relay_a = async {
        if let Ok(single) = a.recv_one().await {
            let _ = stream.send_one(single).await;
        }
    }
    .fuse();
    let relay_b = async {
        if let Ok(single) = b.recv_one().await {
            let _ = stream.send_one(single).await;
        }
    }
    .fuse();
    pin_mut!(relay_a, relay_b);

    // Drive both relays until each of them has finished.
    loop {
        select! {
            () = relay_a => {},
            () = relay_b => {},
            complete => break,
        };
    }
}
/// Emit one block.
///
/// Waits on both `a` and `b`; the first value successfully received from
/// either of them is sent on `value`, and the other input is then ignored.
/// Nothing is sent if neither input provides a value.
#[mel_treatment(
    generic T ()
    input a Block<T>
    input b Block<T>
    output value Block<T>
)]
pub async fn one() {
    let from_a = async { a.recv_one().await.ok() }.fuse();
    let from_b = async { b.recv_one().await.ok() }.fuse();
    pin_mut!(from_a, from_b);

    loop {
        // `None` means that input finished without providing a value.
        let outcome = select! {
            got = from_a => got,
            got = from_b => got,
            complete => break,
        };

        if let Some(winner) = outcome {
            let _ = value.send_one(winner).await;
            break;
        }
    }
}
/// Never send any value.
///
/// The body is empty: `trigger` is not read and nothing is sent on `closed`.
#[mel_treatment(
generic T ()
input trigger Block<void>
output closed Stream<T>
)]
pub async fn close() {
// Intentionally empty.
// NOTE(review): presumably `closed` is closed by the runtime as soon as this
// treatment returns, without waiting for `trigger` — confirm.
}
/// Never send any block.
///
/// The body is empty: `trigger` is not read and nothing is sent on `closed`.
#[mel_treatment(
generic T ()
input trigger Block<void>
output closed Block<T>
)]
pub async fn close_block() {
// Intentionally empty.
// NOTE(review): presumably `closed` is closed by the runtime as soon as this
// treatment returns, without waiting for `trigger` — confirm.
}
/// Consume a stream indefinitely.
///
/// Every batch received on `stream` is discarded, until receiving fails.
#[mel_treatment(
    generic T ()
    input stream Stream<T>
)]
pub async fn consume() {
    // Each received batch is dropped right away.
    while stream.recv_many().await.is_ok() {}
}
/// Pass a stream under condition.
///
/// When the `cond` parameter is `true`, batches received on `stream` are
/// forwarded to `passed`. When it is `false`, `stream` is not read at all.
#[mel_treatment(
    generic T ()
    input stream Stream<T>
    output passed Stream<T>
)]
pub async fn pass(cond: bool) {
    if cond {
        while let Ok(batch) = stream.recv_many().await {
            // `check!` is expected to leave the loop when the send fails.
            check!(passed.send_many(batch).await);
        }
    }
}
/// Pass a block under condition.
///
/// When the `cond` parameter is `true`, the value received on `block` is sent
/// on `passed`. When it is `false`, `block` is not read at all.
#[mel_treatment(
    generic T ()
    input block Block<T>
    output passed Block<T>
)]
pub async fn passBlock(cond: bool) {
    if cond {
        let received = block.recv_one().await;
        if let Ok(single) = received {
            let _ = passed.send_one(single).await;
        }
    }
}
/// Stream barrier.
///
/// Waits for `leverage`; if it is received and is `true`, batches from
/// `stream` are forwarded to `passed`. If it is `false`, or could not be
/// received, `stream` is not read.
#[mel_treatment(
    generic T ()
    input leverage Block<bool>
    input stream Stream<T>
    output passed Stream<T>
)]
pub async fn barrier() {
    // A missing leverage counts the same as `false`.
    let open = match leverage.recv_one().await {
        Ok(flag) => GetData::<bool>::try_data(flag).unwrap(),
        Err(_) => false,
    };

    if open {
        while let Ok(batch) = stream.recv_many().await {
            check!(passed.send_many(batch).await);
        }
    }
}
/// Pass a stream until cut.
///
/// Batches from `stream` are forwarded to `passed` until forwarding finishes
/// by itself or a value is received on `cut`. If `cut` ends without providing
/// a value, forwarding simply goes on.
#[mel_treatment(
    generic T ()
    input cut Block<void>
    input stream Stream<T>
    output passed Stream<T>
)]
pub async fn cut() {
    // Resolves to `true` only when a value was actually received on `cut`.
    let cut_received = async { cut.recv_one().await.is_ok() }.fuse();
    let forwarding = async {
        while let Ok(batch) = stream.recv_many().await {
            check!(passed.send_many(batch).await);
        }
    }
    .fuse();
    pin_mut!(cut_received, forwarding);

    loop {
        select! {
            () = forwarding => break,
            triggered = cut_received => {
                if triggered {
                    break;
                }
            },
            complete => break,
        }
    }
}
/// Release a stream.
///
/// Once `leverage` is received, batches from `data` are forwarded to
/// `released`. If `leverage` could not be received, `data` is not read.
#[mel_treatment(
    generic T ()
    input leverage Block<void>
    input data Stream<T>
    output released Stream<T>
)]
pub async fn release() {
    let unlocked = leverage.recv_one().await.is_ok();
    if unlocked {
        while let Ok(batch) = data.recv_many().await {
            // `check!` is expected to leave the loop when the send fails.
            check!(released.send_many(batch).await);
        }
    }
}
/// Release a block.
///
/// Once `leverage` is received, the value received on `data` is sent on
/// `released`. If `leverage` could not be received, `data` is not read.
#[mel_treatment(
    generic T ()
    input leverage Block<void>
    input data Block<T>
    output released Block<T>
)]
pub async fn releaseBlock() {
    let unlocked = leverage.recv_one().await.is_ok();
    if unlocked {
        let received = data.recv_one().await;
        if let Ok(single) = received {
            let _ = released.send_one(single).await;
        }
    }
}
/// Await two blocks.
///
/// Waits for both `a` and `b`; `awaited` is emitted only if a value was
/// successfully received on each of them. The values themselves are discarded.
#[mel_treatment(
    generic T ()
    input a Block<T>
    input b Block<T>
    output awaited Block<void>
)]
pub async fn waitBlock() {
    // Both receptions are awaited concurrently and always run to completion.
    let (from_a, from_b) = futures::join!(a.recv_one(), b.recv_one());

    if from_a.is_ok() && from_b.is_ok() {
        let _ = awaited.send_one(().into()).await;
    }
}