use quote::quote_spanned;
use super::{
OperatorCategory, OperatorConstraints, OperatorWriteOutput, RANGE_0, RANGE_1, WriteContextArgs,
};
pub const _COUNTER: OperatorConstraints = OperatorConstraints {
name: "_counter",
categories: &[OperatorCategory::Map],
hard_range_inn: RANGE_1,
soft_range_inn: RANGE_1,
hard_range_out: &(0..=1),
soft_range_out: &(0..=1),
num_args: 2,
persistence_args: RANGE_0,
type_args: RANGE_0,
is_external_input: false,
has_singleton_output: false,
flo_type: None,
ports_inn: None,
ports_out: None,
input_delaytype_fn: |_| None,
write_fn: |wc @ &WriteContextArgs {
root,
df_ident,
op_span,
ident,
inputs,
outputs,
is_pull,
arguments,
..
},
_| {
let read_ident = wc.make_ident("read");
let write_ident = wc.make_ident("write");
let tag_expr = &arguments[0];
let tag_ident = wc.make_ident("tag");
let duration_expr = &arguments[1];
let duration_ident = wc.make_ident("duration");
let write_prologue = quote_spanned! {op_span=>
let #write_ident = ::std::rc::Rc::new(::std::cell::Cell::new(0_u64));
let #read_ident = ::std::rc::Rc::clone(&#write_ident);
let #duration_ident = #duration_expr;
let #tag_ident = #tag_expr;
#df_ident.request_task(async move {
loop {
println!("{}: {}", #tag_ident, #read_ident.get());
#root::tokio::time::sleep(#duration_ident).await;
}
});
};
let count_ident = wc.make_ident("count");
let write_iterator = if is_pull {
let input = &inputs[0];
quote_spanned! {op_span=>
let #ident = #root::dfir_pipes::pull::Pull::inspect(#input, |_| { #count_ident += 1; });
}
} else if outputs.is_empty() {
quote_spanned! {op_span=>
let #ident = #root::dfir_pipes::push::inspect(|_| { #count_ident += 1; }, #root::dfir_pipes::push::for_each(::std::mem::drop));
}
} else {
let output = &outputs[0];
quote_spanned! {op_span=>
let #ident = #root::dfir_pipes::push::inspect(|_| { #count_ident += 1; }, #output);
}
};
let write_iterator = quote_spanned! {op_span=>
let mut #count_ident = 0;
#write_iterator
};
let write_iterator_after = quote_spanned! {op_span=>
#write_ident.set(#write_ident.get() + #count_ident);
};
Ok(OperatorWriteOutput {
write_prologue,
write_iterator,
write_iterator_after,
..Default::default()
})
},
};