use std::marker::PhantomData;
use async_trait::async_trait;
use bytes::Bytes;
use crate::dsl::processors::change::Change;
use crate::dsl::processors::fk::murmur3::hash128;
use crate::dsl::processors::fk::subscription::{
Instruction, SubscriptionResponseWrapper, SubscriptionWrapper,
};
use crate::processor::api::{Processor, ProcessorContext};
use crate::processor::record::Record;
use crate::processor::serde::Serde;
type Marker<T> = PhantomData<fn() -> T>;
pub(crate) struct SubscriptionSendProcessor<K, VA, KO, FKE> {
pub fk_extractor: FKE,
pub va_serde: Box<dyn Serde<VA>>,
pub ko_serde: Box<dyn Serde<KO>>,
pub k_serde: Box<dyn Serde<K>>,
pub is_left: bool,
pub _pd: Marker<(K, VA, KO)>,
}
impl<K, VA, KO, FKE> SubscriptionSendProcessor<K, VA, KO, FKE>
where
K: Send + 'static,
VA: Send + 'static,
KO: Send + Clone + 'static,
FKE: Fn(&VA) -> KO + Send + 'static,
{
fn forward(
&self,
ctx: &mut ProcessorContext<'_, '_, KO, SubscriptionWrapper>,
key: &K,
fk: KO,
instruction: Instruction,
hash: Option<Vec<u8>>,
timestamp: i64,
) {
let primary_key = self.k_serde.serialize("", key);
let primary_partition = ctx.record_context().partition;
ctx.forward(Record::new(
Some(fk),
SubscriptionWrapper {
instruction,
hash,
primary_key,
primary_partition,
},
timestamp,
));
}
}
#[async_trait]
impl<K, VA, KO, FKE> Processor<K, Change<VA>, KO, SubscriptionWrapper>
for SubscriptionSendProcessor<K, VA, KO, FKE>
where
K: Send + Sync + 'static,
VA: Send + Sync + 'static,
KO: Send + Sync + Clone + 'static,
FKE: Fn(&VA) -> KO + Send + Sync + 'static,
{
async fn process(
&mut self,
ctx: &mut ProcessorContext<'_, '_, KO, SubscriptionWrapper>,
r: Record<K, Change<VA>>,
) {
let key = r.key.expect("FK subscription-send requires a non-null key");
let ts = r.timestamp;
let hash = r
.value
.new
.as_ref()
.map(|v| hash128(&self.va_serde.serialize("", v)).to_vec());
let old_fk = r.value.old.as_ref().map(|v| (self.fk_extractor)(v));
let new_fk = r.value.new.as_ref().map(|v| (self.fk_extractor)(v));
let had_old = r.value.old.is_some();
if self.is_left {
self.left_join_instructions(ctx, &key, had_old, old_fk, new_fk, hash.as_deref(), ts);
} else {
self.default_join_instructions(ctx, &key, had_old, old_fk, new_fk, hash.as_deref(), ts);
}
}
}
impl<K, VA, KO, FKE> SubscriptionSendProcessor<K, VA, KO, FKE>
where
K: Send + 'static,
VA: Send + 'static,
KO: Send + Clone + 'static,
FKE: Fn(&VA) -> KO + Send + 'static,
{
fn fk_differs(&self, a: &KO, b: &KO) -> bool {
self.ko_serde.serialize("", a) != self.ko_serde.serialize("", b)
}
#[allow(clippy::too_many_arguments)] fn left_join_instructions(
&self,
ctx: &mut ProcessorContext<'_, '_, KO, SubscriptionWrapper>,
key: &K,
had_old: bool,
old_fk: Option<KO>,
new_fk: Option<KO>,
hash: Option<&[u8]>,
ts: i64,
) {
use Instruction::{DeleteKeyNoPropagate, PropagateNullIfNoFkValAvailable};
if had_old {
if let Some(ofk) = old_fk
&& new_fk.as_ref().is_none_or(|nfk| self.fk_differs(nfk, &ofk))
{
self.forward(
ctx,
key,
ofk,
DeleteKeyNoPropagate,
hash.map(<[u8]>::to_vec),
ts,
);
}
if let Some(nfk) = new_fk {
self.forward(
ctx,
key,
nfk,
PropagateNullIfNoFkValAvailable,
hash.map(<[u8]>::to_vec),
ts,
);
}
} else if let Some(nfk) = new_fk {
self.forward(
ctx,
key,
nfk,
PropagateNullIfNoFkValAvailable,
hash.map(<[u8]>::to_vec),
ts,
);
}
}
#[allow(clippy::too_many_arguments)] fn default_join_instructions(
&self,
ctx: &mut ProcessorContext<'_, '_, KO, SubscriptionWrapper>,
key: &K,
had_old: bool,
old_fk: Option<KO>,
new_fk: Option<KO>,
hash: Option<&[u8]>,
ts: i64,
) {
use Instruction::{
DeleteKeyAndPropagate, DeleteKeyNoPropagate, PropagateNullIfNoFkValAvailable,
PropagateOnlyIfFkValAvailable,
};
let h = || hash.map(<[u8]>::to_vec);
if !had_old {
if let Some(nfk) = new_fk {
self.forward(ctx, key, nfk, PropagateOnlyIfFkValAvailable, h(), ts);
}
return;
}
match (old_fk, new_fk) {
(None, None) => { }
(Some(ofk), None) => {
self.forward(ctx, key, ofk, DeleteKeyAndPropagate, h(), ts);
}
(Some(ofk), Some(nfk)) if self.fk_differs(&ofk, &nfk) => {
self.forward(ctx, key, ofk, DeleteKeyNoPropagate, h(), ts);
self.forward(ctx, key, nfk, PropagateNullIfNoFkValAvailable, h(), ts);
}
(_, Some(nfk)) => {
self.forward(ctx, key, nfk, PropagateOnlyIfFkValAvailable, h(), ts);
}
}
}
}
pub(crate) struct SubscriptionReceiveProcessor<KO> {
pub store_name: String,
pub ko_serde: Box<dyn Serde<KO>>,
pub _pd: Marker<KO>,
}
#[async_trait]
impl<KO> Processor<KO, SubscriptionWrapper, KO, SubscriptionWrapper>
for SubscriptionReceiveProcessor<KO>
where
KO: Send + Sync + Clone + 'static,
{
async fn process(
&mut self,
ctx: &mut ProcessorContext<'_, '_, KO, SubscriptionWrapper>,
r: Record<KO, SubscriptionWrapper>,
) {
let fk = r
.key
.expect("FK subscription-receive requires a non-null key");
let fk_bytes = self.ko_serde.serialize(&self.store_name, &fk);
let w = r.value;
let pk = w.primary_key.clone();
let is_delete = matches!(
w.instruction,
Instruction::DeleteKeyAndPropagate | Instruction::DeleteKeyNoPropagate
);
{
let store = ctx
.get_fk_subscription_store(&self.store_name)
.expect("FK subscription store not found");
if is_delete {
store.delete(&fk_bytes, &pk).await;
} else {
store.put(&fk_bytes, &pk, &w, r.timestamp).await;
}
}
ctx.forward(Record::new(Some(fk), w, r.timestamp));
}
}
pub(crate) struct SubscriptionJoinProcessor<KO, K, VB> {
pub b_store: String,
pub k_serde: Box<dyn Serde<K>>,
pub vb_serde: Box<dyn Serde<VB>>,
pub _pd: Marker<(KO, K, VB)>,
}
#[async_trait]
impl<KO, K, VB> Processor<KO, SubscriptionWrapper, K, SubscriptionResponseWrapper>
for SubscriptionJoinProcessor<KO, K, VB>
where
KO: Send + Sync + Clone + 'static,
K: Send + Sync + Clone + 'static,
VB: Send + Sync + Clone + 'static,
{
async fn process(
&mut self,
ctx: &mut ProcessorContext<'_, '_, K, SubscriptionResponseWrapper>,
r: Record<KO, SubscriptionWrapper>,
) {
let fk = r.key.expect("FK subscription-join requires a non-null key");
let w = r.value;
let fk_val: Option<VB> = match ctx.get_state_store::<KO, VB>(&self.b_store) {
Some(s) => s.get(&fk).await,
None => None,
};
let fk_val_bytes = fk_val
.as_ref()
.map(|v| self.vb_serde.serialize(&self.b_store, v));
let pk = self.k_serde.deserialize(&self.b_store, &w.primary_key).ok();
let response = match w.instruction {
Instruction::DeleteKeyAndPropagate => Some(SubscriptionResponseWrapper {
hash: w.hash.clone(),
foreign_value: None,
}),
Instruction::PropagateNullIfNoFkValAvailable => Some(SubscriptionResponseWrapper {
hash: w.hash.clone(),
foreign_value: fk_val_bytes,
}),
Instruction::PropagateOnlyIfFkValAvailable => {
fk_val_bytes.map(|fv| SubscriptionResponseWrapper {
hash: w.hash.clone(),
foreign_value: Some(fv),
})
}
Instruction::DeleteKeyNoPropagate => None,
};
if let Some(resp) = response {
ctx.forward(Record::new(pk, resp, r.timestamp));
}
}
}
pub(crate) struct ForeignTableJoinProcessor<KO, K, VB> {
pub store_name: String,
pub ko_serde: Box<dyn Serde<KO>>,
pub k_serde: Box<dyn Serde<K>>,
pub vb_serde: Box<dyn Serde<VB>>,
pub _pd: Marker<(KO, K, VB)>,
}
#[async_trait]
impl<KO, K, VB> Processor<KO, Change<VB>, K, SubscriptionResponseWrapper>
for ForeignTableJoinProcessor<KO, K, VB>
where
KO: Send + Sync + Clone + 'static,
K: Send + Sync + Clone + 'static,
VB: Send + Sync + Clone + 'static,
{
async fn process(
&mut self,
ctx: &mut ProcessorContext<'_, '_, K, SubscriptionResponseWrapper>,
r: Record<KO, Change<VB>>,
) {
let fk = r
.key
.expect("FK foreign-table-join requires a non-null key");
let fk_bytes = self.ko_serde.serialize(&self.store_name, &fk);
let new_vb: Option<Bytes> = r
.value
.new
.as_ref()
.map(|v| self.vb_serde.serialize(&self.store_name, v));
let subs: Vec<(Bytes, SubscriptionWrapper)> = {
let store = ctx
.get_fk_subscription_store(&self.store_name)
.expect("FK subscription store not found");
store.range_by_foreign(&fk_bytes).await
};
for (pk_bytes, w) in subs {
let Ok(pk) = self.k_serde.deserialize(&self.store_name, &pk_bytes) else {
continue;
};
ctx.forward(Record::new(
Some(pk),
SubscriptionResponseWrapper {
hash: w.hash.clone(),
foreign_value: new_vb.clone(),
},
r.timestamp,
));
}
}
}
pub(crate) struct SubscriptionResolverProcessor<K, VA, VB, VR, J> {
pub a_store: String,
pub va_serde: Box<dyn Serde<VA>>,
pub vb_serde: Box<dyn Serde<VB>>,
pub joiner: J,
pub is_left: bool,
pub _pd: Marker<(K, VA, VB, VR)>,
}
#[async_trait]
impl<K, VA, VB, VR, J> Processor<K, SubscriptionResponseWrapper, K, Change<VR>>
for SubscriptionResolverProcessor<K, VA, VB, VR, J>
where
K: Send + Sync + Clone + 'static,
VA: Send + Sync + Clone + 'static,
VB: Send + Sync + Clone + 'static,
VR: Send + Sync + Clone + 'static,
J: Fn(&VA, Option<&VB>) -> VR + Send + Sync + 'static,
{
async fn process(
&mut self,
ctx: &mut ProcessorContext<'_, '_, K, Change<VR>>,
r: Record<K, SubscriptionResponseWrapper>,
) {
let key = r.key.expect("FK resolver requires a non-null key");
let resp = r.value;
let current_va: Option<VA> = match ctx.get_state_store::<K, VA>(&self.a_store) {
Some(s) => s.get(&key).await,
None => None,
};
let current_hash: Option<Vec<u8>> = current_va
.as_ref()
.map(|v| hash128(&self.va_serde.serialize("", v)).to_vec());
if resp.hash != current_hash {
return;
}
let foreign_vb: Option<VB> = resp
.foreign_value
.as_ref()
.and_then(|b| self.vb_serde.deserialize(&self.a_store, b).ok());
let result: Option<VR> =
if resp.foreign_value.is_none() && (!self.is_left || current_va.is_none()) {
None } else {
current_va
.as_ref()
.map(|va| (self.joiner)(va, foreign_vb.as_ref()))
};
ctx.forward(Record::new(
Some(key),
Change {
old: None,
new: result,
},
r.timestamp,
));
}
}
pub(crate) struct FkJoinOutputProcessor<K, VR> {
pub _pd: Marker<(K, VR)>,
}
#[async_trait]
impl<K, VR> Processor<K, Change<VR>, K, Change<VR>> for FkJoinOutputProcessor<K, VR>
where
K: Send + Sync + Clone + 'static,
VR: Send + Sync + Clone + 'static,
{
async fn process(
&mut self,
ctx: &mut ProcessorContext<'_, '_, K, Change<VR>>,
r: Record<K, Change<VR>>,
) {
ctx.forward(r);
}
}
#[cfg(test)]
pub(crate) type ChangeBuffer<K, V> = std::sync::Arc<std::sync::Mutex<Vec<(Option<K>, Option<V>)>>>;
#[cfg(test)]
pub(crate) struct ChangeCollectorProcessor<K, V> {
pub buf: ChangeBuffer<K, V>,
pub _pd: Marker<(K, V)>,
}
#[cfg(test)]
#[async_trait]
impl<K, V> Processor<K, Change<V>, K, V> for ChangeCollectorProcessor<K, V>
where
K: Send + Sync + Clone + 'static,
V: Send + Sync + Clone + 'static,
{
async fn process(
&mut self,
_ctx: &mut ProcessorContext<'_, '_, K, V>,
r: Record<K, Change<V>>,
) {
self.buf.lock().unwrap().push((r.key, r.value.new));
}
}