use proc_macro2::TokenStream;
use quote::quote_spanned;
use syn::{Ident, parse_quote};
use super::{
DelayType, OperatorCategory, OperatorConstraints, OperatorWriteOutput, Persistence, RANGE_0,
RANGE_1, WriteContextArgs,
};
use crate::diagnostic::Diagnostic;
pub const JOIN_FUSED: OperatorConstraints = OperatorConstraints {
name: "join_fused",
categories: &[OperatorCategory::MultiIn],
hard_range_inn: &(2..=2),
soft_range_inn: &(2..=2),
hard_range_out: RANGE_1,
soft_range_out: RANGE_1,
num_args: 2,
persistence_args: &(0..=2),
type_args: RANGE_0,
is_external_input: false,
flo_type: None,
ports_inn: Some(|| super::PortListSpec::Fixed(parse_quote! { 0, 1 })),
ports_out: None,
input_delaytype_fn: |_| Some(DelayType::Stratum),
write_fn: |wc @ &WriteContextArgs {
root,
op_span,
work_fn_async,
ident,
inputs,
is_pull,
arguments,
..
},
diagnostics| {
assert!(is_pull);
let persistences: [_; 2] = wc.persistence_args_disallow_mutable(diagnostics);
let (lhs_prologue, lhs_tick_end, lhs_pre_write_iter, lhs_borrow) =
make_joindata(wc, persistences[0], "lhs").map_err(|err| diagnostics.push(err))?;
let (rhs_prologue, rhs_tick_end, rhs_pre_write_iter, rhs_borrow) =
make_joindata(wc, persistences[1], "rhs").map_err(|err| diagnostics.push(err))?;
let lhs = &inputs[0];
let rhs = &inputs[1];
let lhs_accum = &arguments[0];
let rhs_accum = &arguments[1];
let write_iterator = quote_spanned! {op_span=>
#lhs_pre_write_iter
#rhs_pre_write_iter
let #ident = {
async fn __check_accum<Accumulator, Key, Accum, Prev, Hasher, Item>(accum: &mut Accumulator, borrow: &mut ::std::collections::HashMap<Key, Accum, Hasher>, prev: Prev)
where
Accumulator: #root::dfir_pipes::pull::Accumulator<Accum, Item>,
Key: ::std::cmp::Eq + ::std::hash::Hash + ::std::clone::Clone,
Prev: #root::dfir_pipes::pull::Pull<Item = (Key, Item)>,
Hasher: ::std::hash::BuildHasher,
Item: ::std::clone::Clone,
{
let () = #root::dfir_pipes::pull::accumulate_all(accum, borrow, prev).await;
}
#work_fn_async(__check_accum(&mut #lhs_accum, &mut *#lhs_borrow, #lhs)).await;
#work_fn_async(__check_accum(&mut #rhs_accum, &mut *#rhs_borrow, #rhs)).await;
#[allow(suspicious_double_ref_op, clippy::clone_on_copy)]
#[allow(clippy::disallowed_methods, reason = "FxHasher is deterministic")]
let iter = #rhs_borrow
.iter()
.filter_map(|(k, v2)| {
#lhs_borrow.get(k).map(|v1| (k.clone(), (v1.clone(), v2.clone())))
});
#root::dfir_pipes::pull::iter(iter)
};
};
Ok(OperatorWriteOutput {
write_prologue: quote_spanned! {op_span=>
#lhs_prologue
#rhs_prologue
},
write_iterator,
write_iterator_after: Default::default(),
write_tick_end: quote_spanned! {op_span=>
#lhs_tick_end
#rhs_tick_end
},
})
},
};
pub(crate) fn make_joindata(
wc: &WriteContextArgs,
persistence: Persistence,
side: &str,
) -> Result<(TokenStream, TokenStream, TokenStream, Ident), Diagnostic> {
let joindata_ident = wc.make_ident(format!("joindata_{}", side));
let borrow_ident = wc.make_ident(format!("joindata_{}_borrow", side));
let &WriteContextArgs {
root,
op_span,
..
} = wc;
Ok(match persistence {
Persistence::None => (
Default::default(),
Default::default(),
quote_spanned! {op_span=>
let #borrow_ident = &mut #root::rustc_hash::FxHashMap::default();
},
borrow_ident,
),
Persistence::Tick | Persistence::Loop | Persistence::Static => {
let tick_end = match persistence {
Persistence::Tick => quote_spanned! {op_span=>
#joindata_ident.clear();
},
_ => Default::default(),
};
(
quote_spanned! {op_span=>
let mut #joindata_ident = #root::rustc_hash::FxHashMap::default();
},
tick_end,
quote_spanned! {op_span=>
let #borrow_ident = &mut #joindata_ident;
},
borrow_ident
)
}
Persistence::Mutable => panic!(),
})
}