use quote::quote_spanned;
use syn::Ident;
use super::{
OperatorCategory, OperatorConstraints, OperatorWriteOutput, RANGE_0, RANGE_1, WriteContextArgs,
};
pub const RESOLVE_FUTURES: OperatorConstraints = OperatorConstraints {
name: "resolve_futures",
categories: &[OperatorCategory::Map],
hard_range_inn: RANGE_1,
soft_range_inn: RANGE_1,
hard_range_out: RANGE_1,
soft_range_out: RANGE_1,
num_args: 0,
persistence_args: RANGE_0,
type_args: RANGE_0,
is_external_input: false,
has_singleton_output: false,
flo_type: None,
ports_inn: None,
ports_out: None,
input_delaytype_fn: |_| None,
write_fn: move |wc, _| {
resolve_futures_writer(Ident::new("FuturesUnordered", wc.op_span), false, wc)
},
};
pub fn resolve_futures_writer(
future_type: Ident,
blocking: bool,
wc @ &WriteContextArgs {
root,
context,
op_span,
work_fn_async,
ident,
inputs,
outputs,
is_pull,
..
}: &WriteContextArgs,
) -> Result<OperatorWriteOutput, ()> {
let futures_ident = wc.make_ident("futures");
let queue_ident = wc.make_ident("queue");
let write_prologue = quote_spanned! {op_span=>
let #futures_ident = df.add_state(
::std::cell::RefCell::new(
#root::futures::stream::#future_type::new()
)
);
};
let opt_waker = if blocking {
quote_spanned! {op_span=> None }
} else {
quote_spanned! {op_span=> Some(#context.waker()) }
};
let stream_or_sink = if is_pull {
let input = &inputs[0];
let task_cx = if blocking {
quote_spanned! {op_span=> _cx }
} else {
quote_spanned! {op_span=> &mut ::std::task::Context::from_waker(&#context.waker()) }
};
let if_pending = if blocking {
quote_spanned! {op_span=> #root::dfir_pipes::pull::PullStep::<_, _, #root::dfir_pipes::Yes, _>::pending() }
} else {
quote_spanned! {op_span=> #root::dfir_pipes::pull::PullStep::<_, _, #root::dfir_pipes::No, _>::ended() }
};
quote_spanned! {op_span=>
{
let first_item_opt = #work_fn_async(async {
let () = #root::dfir_pipes::pull::Pull::for_each(#input, |fut| {
::std::iter::Extend::extend(&mut *#queue_ident, ::std::iter::once(fut));
}).await;
if let ::std::task::Poll::Ready(opt) = ::std::future::poll_fn(
|_cx| ::std::task::Poll::Ready(#root::futures::stream::Stream::poll_next(::std::pin::Pin::new(&mut *#queue_ident), #task_cx))
).await {
opt
} else {
::std::option::Option::None
}
}).await;
#root::dfir_pipes::pull::Pull::chain(
#root::dfir_pipes::pull::iter(first_item_opt),
#root::dfir_pipes::pull::poll_fn(|_cx| {
match #root::futures::stream::Stream::poll_next(::std::pin::Pin::new(&mut *#queue_ident), #task_cx) {
::std::task::Poll::Pending => #if_pending,
::std::task::Poll::Ready(::std::option::Option::Some(item)) => #root::dfir_pipes::pull::PullStep::Ready(item, ()),
::std::task::Poll::Ready(::std::option::Option::None) => #root::dfir_pipes::pull::PullStep::Ended(#root::dfir_pipes::Yes),
}
}),
)
}
}
} else {
let output = &outputs[0];
quote_spanned! {op_span=>
#root::dfir_pipes::push::resolve_futures_state(&mut *#queue_ident, #opt_waker, #output)
}
};
let write_iterator = quote_spanned! {op_span=>
let mut #queue_ident = unsafe {
#context.state_ref_unchecked(#futures_ident).borrow_mut()
};
let #ident = #stream_or_sink;
};
Ok(OperatorWriteOutput {
write_prologue,
write_iterator,
..Default::default()
})
}