use proc_macro2::TokenStream;
use quote::quote_spanned;
use syn::{Ident, parse_quote};
use super::{
DelayType, OperatorCategory, OperatorConstraints, OperatorWriteOutput, Persistence, RANGE_0,
RANGE_1, WriteContextArgs,
};
use crate::diagnostic::Diagnostic;
pub const JOIN_FUSED: OperatorConstraints = OperatorConstraints {
name: "join_fused",
categories: &[OperatorCategory::MultiIn],
hard_range_inn: &(2..=2),
soft_range_inn: &(2..=2),
hard_range_out: RANGE_1,
soft_range_out: RANGE_1,
num_args: 2,
persistence_args: &(0..=2),
type_args: RANGE_0,
is_external_input: false,
has_singleton_output: false,
flo_type: None,
ports_inn: Some(|| super::PortListSpec::Fixed(parse_quote! { 0, 1 })),
ports_out: None,
input_delaytype_fn: |_| Some(DelayType::Stratum),
write_fn: |wc @ &WriteContextArgs {
root,
context,
op_span,
work_fn_async,
ident,
inputs,
is_pull,
arguments,
..
},
diagnostics| {
assert!(is_pull);
let persistences: [_; 2] = wc.persistence_args_disallow_mutable(diagnostics);
let (lhs_prologue, lhs_prologue_after, lhs_pre_write_iter, lhs_borrow) =
make_joindata(wc, persistences[0], "lhs").map_err(|err| diagnostics.push(err))?;
let (rhs_prologue, rhs_prologue_after, rhs_pre_write_iter, rhs_borrow) =
make_joindata(wc, persistences[1], "rhs").map_err(|err| diagnostics.push(err))?;
let lhs = &inputs[0];
let rhs = &inputs[1];
let lhs_accum = &arguments[0];
let rhs_accum = &arguments[1];
let write_iterator = quote_spanned! {op_span=>
#lhs_pre_write_iter
#rhs_pre_write_iter
let #ident = {
async fn __check_accum<Accumulator, Key, Accum, Prev, Hasher, Item>(accum: &mut Accumulator, borrow: &mut ::std::collections::HashMap<Key, Accum, Hasher>, prev: Prev)
where
Accumulator: #root::dfir_pipes::pull::Accumulator<Accum, Item>,
Key: ::std::cmp::Eq + ::std::hash::Hash + ::std::clone::Clone,
Prev: #root::dfir_pipes::pull::Pull<Item = (Key, Item)>,
Hasher: ::std::hash::BuildHasher,
Item: ::std::clone::Clone,
{
let () = #root::dfir_pipes::pull::accumulate_all(accum, borrow, prev).await;
}
#work_fn_async(__check_accum(&mut #lhs_accum, &mut *#lhs_borrow, #lhs)).await;
#work_fn_async(__check_accum(&mut #rhs_accum, &mut *#rhs_borrow, #rhs)).await;
#[allow(suspicious_double_ref_op, clippy::clone_on_copy)]
#[allow(clippy::disallowed_methods, reason = "FxHasher is deterministic")]
let iter = #rhs_borrow
.iter()
.filter_map(|(k, v2)| {
#lhs_borrow.get(k).map(|v1| (k.clone(), (v1.clone(), v2.clone())))
});
#root::dfir_pipes::pull::iter(iter)
};
};
let write_iterator_after =
if persistences[0] == Persistence::Static || persistences[1] == Persistence::Static {
quote_spanned! {op_span=>
#context.schedule_subgraph(#context.current_subgraph(), false);
}
} else {
quote_spanned! {op_span=>}
};
Ok(OperatorWriteOutput {
write_prologue: quote_spanned! {op_span=>
#lhs_prologue
#rhs_prologue
},
write_prologue_after: quote_spanned! {op_span=>
#lhs_prologue_after
#rhs_prologue_after
},
write_iterator,
write_iterator_after,
})
},
};
pub(crate) fn make_joindata(
wc: &WriteContextArgs,
persistence: Persistence,
side: &str,
) -> Result<(TokenStream, TokenStream, TokenStream, Ident), Diagnostic> {
let joindata_ident = wc.make_ident(format!("joindata_{}", side));
let borrow_ident = wc.make_ident(format!("joindata_{}_borrow", side));
let &WriteContextArgs {
context,
df_ident,
root,
op_span,
..
} = wc;
Ok(match persistence {
Persistence::None => (
Default::default(),
Default::default(),
quote_spanned! {op_span=>
let #borrow_ident = &mut #root::rustc_hash::FxHashMap::default();
},
borrow_ident,
),
Persistence::Tick | Persistence::Loop | Persistence::Static => {
let lifespan = wc.persistence_as_state_lifespan(persistence);
(
quote_spanned! {op_span=>
let #joindata_ident = #df_ident.add_state(::std::cell::RefCell::new(#root::rustc_hash::FxHashMap::default()));
},
lifespan.map(|lifespan| quote_spanned! {op_span=>
#df_ident.set_state_lifespan_hook(#joindata_ident, #lifespan, |rcell| { rcell.take(); });
}).unwrap_or_default(),
quote_spanned! {op_span=>
let mut #borrow_ident = unsafe {
#context.state_ref_unchecked(#joindata_ident)
}.borrow_mut();
},
borrow_ident
)
}
Persistence::Mutable => panic!(),
})
}