use std::any::Any;
use std::cell::RefCell;
use std::marker::PhantomData;
use std::rc::Rc;
use std::sync::Arc;
use crate::dsl::builder::InternalStreamsBuilder;
use crate::dsl::graph::{GraphNodeKind, LowerState, NodeId};
use crate::dsl::kstream::KStream;
use crate::dsl::names;
use crate::dsl::processors::change::Change;
use crate::dsl::processors::ktable_join::{
JoinKind, KTableKTableJoinOtherProcessor, KTableKTableJoinThisProcessor,
};
use crate::dsl::processors::stateless::MergeProcessor;
use crate::dsl::processors::table::{
KTableFilterProcessor, KTableMapValuesProcessor, KTableMapValuesViewProcessor,
KTableToStreamProcessor,
};
use crate::processor::serde::Serde;
use crate::topology::NodeHandle;
/// Shared, type-erased callback that registers the buffer store of a suppress node.
///
/// It is invoked as `factory(state, store_name, proc_name, logging)`. The factory built
/// by `kv_suppress_factory` forwards those arguments, together with the serdes it
/// captured, to `state.topology.add_suppress_store`.
pub(crate) type SuppressStoreFactory = Arc<dyn Fn(&mut LowerState, &str, &str, bool) + Send + Sync>;
/// Builds a [`SuppressStoreFactory`] that owns the given key and value serdes.
///
/// Every call of the returned factory registers one suppress store on the lowering
/// state's topology via `add_suppress_store`, passing the store name, a fresh clone of
/// each serde, the logging flag, and the single processor name it was handed.
pub(crate) fn kv_suppress_factory<K, V, KS, VS>(
    key_serde: KS,
    value_serde: VS,
) -> SuppressStoreFactory
where
    K: Any + Send + Sync + Clone,
    V: Any + Send + Clone,
    KS: Serde<K> + Clone + 'static,
    VS: Serde<V> + Clone + 'static,
{
    let register = move |lowering: &mut LowerState, store: &str, processor: &str, logging: bool| {
        // The serdes are cloned per call so the factory itself stays reusable.
        lowering.topology.add_suppress_store::<K, V, KS, VS>(
            store.to_owned(),
            key_serde.clone(),
            value_serde.clone(),
            logging,
            [processor.to_owned()],
        );
    };
    Arc::new(register)
}
/// Handle to a table node in the DSL graph owned by a shared `InternalStreamsBuilder`.
///
/// Operators derived from a table address this node's output as
/// `NodeHandle<K, Change<V>>` when they are lowered.
pub struct KTable<K, V> {
/// Builder shared with every stream/table derived from this one (`Rc::clone` on derive).
builder: Rc<RefCell<InternalStreamsBuilder>>,
/// Graph node that produces this table.
node: NodeId,
/// Name of the store associated with this table, if any; KTable-KTable joins require it.
#[allow(dead_code)]
store_name: Option<String>,
/// Source topic recorded for this table, if any; joins use it to register a
/// co-partition group when both sides have one.
#[allow(dead_code)]
source_topic: Option<String>,
/// Grace period in milliseconds carried along derived tables; `suppress` uses it as the
/// wait time for `WaitKind::UpstreamGrace` (falling back to 0 when absent).
window_grace_ms: Option<i64>,
/// Callback that registers the suppress buffer store; `suppress` panics when it is `None`.
suppress_store_factory: Option<SuppressStoreFactory>,
/// Marks the key/value types without storing values of them.
_pd: PhantomData<fn() -> (K, V)>,
}
impl<K, V> KTable<K, V> {
pub(crate) fn new(
builder: Rc<RefCell<InternalStreamsBuilder>>,
node: NodeId,
store_name: Option<String>,
source_topic: Option<String>,
) -> Self {
Self {
builder,
node,
store_name,
source_topic,
window_grace_ms: None,
suppress_store_factory: None,
_pd: PhantomData,
}
}
#[allow(dead_code)]
pub(crate) fn store_name(&self) -> Option<&str> {
self.store_name.as_deref()
}
#[allow(dead_code)]
pub(crate) fn source_topic(&self) -> Option<&str> {
self.source_topic.as_deref()
}
#[must_use]
pub(crate) fn with_window_grace(mut self, grace_ms: Option<i64>) -> Self {
self.window_grace_ms = grace_ms;
self
}
#[must_use]
pub(crate) fn with_suppress_factory(mut self, factory: Option<SuppressStoreFactory>) -> Self {
self.suppress_store_factory = factory;
self
}
}
impl<K, V> KTable<K, V>
where
K: Any + Send + Sync + Clone,
V: Any + Send + Clone,
{
/// Exposes this table as a [`KStream`] by appending a `KTableToStreamProcessor` node.
///
/// The new graph node has this table's node as its only parent and no store. When
/// lowered it registers a processor declared with the types `K, Change<V>, K, V` and
/// records the resulting handle name for the new node.
#[must_use]
pub fn to_stream(&self) -> KStream<K, V> {
    let upstream = self.node;
    let stream_node = {
        // Keep the mutable borrow of the shared builder confined to this scope.
        let mut builder = self.builder.borrow_mut();
        let processor_name = builder.new_processor_name(names::TABLE_TOSTREAM);
        let node_id = builder.graph.add(
            processor_name.clone(),
            GraphNodeKind::TableProcessor { store_name: None },
            vec![upstream],
        );
        builder.graph.nodes[node_id].lower = Some(Box::new(move |lowering: &mut LowerState| {
            let upstream_name = lowering.handle_name[&upstream].clone();
            let input = NodeHandle::<K, Change<V>>::from_name(upstream_name);
            let handle = lowering.topology.add_processor::<K, Change<V>, K, V, _, _, _>(
                processor_name.clone(),
                || KTableToStreamProcessor { _pd: PhantomData },
                [input],
            );
            lowering
                .handle_name
                .insert(node_id, handle.name().to_string());
        }));
        node_id
    };
    KStream::new(Rc::clone(&self.builder), stream_node)
}
/// Derives a table whose values are produced by `f`, without registering a state store.
///
/// A `KTableMapValuesViewProcessor` node is appended below this table's node. The
/// returned table inherits this table's window grace; it has no store name, no source
/// topic and no suppress store factory.
pub fn map_values<V2, F>(&self, f: F) -> KTable<K, V2>
where
    V2: Any + Send + Clone,
    F: Fn(&V) -> V2 + Clone + Send + Sync + 'static,
{
    let grace = self.window_grace_ms;
    let parent_id = self.node;
    let mut g = self.builder.borrow_mut();
    let name = g.new_processor_name(names::TABLE_MAPVALUES);
    let id = g.graph.add(
        name.clone(),
        GraphNodeKind::TableProcessor { store_name: None },
        vec![parent_id],
    );
    // `f` is owned and not needed afterwards, so it is moved into the lowering thunk
    // directly instead of being cloned first.
    g.graph.nodes[id].lower = Some(Box::new(move |state: &mut LowerState| {
        let parent =
            NodeHandle::<K, Change<V>>::from_name(state.handle_name[&parent_id].clone());
        let h = state
            .topology
            .add_processor::<K, Change<V>, K, Change<V2>, _, _, _>(
                name.clone(),
                // Each call of the supplier hands out its own clone of the mapper.
                move || KTableMapValuesViewProcessor {
                    f: f.clone(),
                    _pd: PhantomData,
                },
                [parent],
            );
        state.handle_name.insert(id, h.name().to_string());
    }));
    drop(g);
    KTable::new(Rc::clone(&self.builder), id, None, None).with_window_grace(grace)
}
/// Derives a table whose values are produced by `f` and registers a state store for it.
///
/// The store name is `materialized.store_name` when given; otherwise a fresh name is
/// generated from the builder with the `names::TABLE_MAPVALUES` prefix. The store is
/// registered with the key/value serdes taken from `materialized` and connected to the
/// new `KTableMapValuesProcessor`.
///
/// The returned table carries the store name and inherits this table's window grace; it
/// has no source topic and no suppress store factory.
pub fn map_values_materialized<V2, KS, VS, F>(
    &self,
    f: F,
    materialized: crate::dsl::config::Materialized<KS, VS>,
) -> KTable<K, V2>
where
    V2: Any + Send + Clone,
    KS: Serde<K> + Clone + 'static,
    VS: Serde<V2> + Clone + 'static,
    F: Fn(&V) -> V2 + Clone + Send + Sync + 'static,
{
    let grace = self.window_grace_ms;
    // Resolve the store name before taking the long-lived builder borrow below;
    // `mint_table_store` borrows the builder itself.
    let store_name = mint_table_store(&self.builder, &materialized, names::TABLE_MAPVALUES);
    let crate::dsl::config::Materialized {
        key_serde,
        value_serde,
        ..
    } = materialized;
    let parent_id = self.node;
    let mut g = self.builder.borrow_mut();
    let name = g.new_processor_name(names::TABLE_MAPVALUES);
    let id = g.graph.add(
        name.clone(),
        GraphNodeKind::TableProcessor {
            store_name: Some(store_name.clone()),
        },
        vec![parent_id],
    );
    // `store_name` is still needed for the returned handle, so the thunk gets a copy;
    // `f` is not needed again and is moved in as-is.
    let store_for_thunk = store_name.clone();
    g.graph.nodes[id].lower = Some(Box::new(move |state: &mut LowerState| {
        let parent =
            NodeHandle::<K, Change<V>>::from_name(state.handle_name[&parent_id].clone());
        let store_for_proc = store_for_thunk.clone();
        let h = state
            .topology
            .add_processor::<K, Change<V>, K, Change<V2>, _, _, _>(
                name.clone(),
                move || KTableMapValuesProcessor {
                    f: f.clone(),
                    store_name: store_for_proc.clone(),
                    _pd: PhantomData,
                },
                [parent],
            );
        state.topology.add_state_store::<K, V2, KS, VS>(
            store_for_thunk.clone(),
            key_serde.clone(),
            value_serde.clone(),
            [h.name().to_string()],
        );
        state.handle_name.insert(id, h.name().to_string());
    }));
    drop(g);
    KTable::new(Rc::clone(&self.builder), id, Some(store_name), None).with_window_grace(grace)
}
/// Derives a table by running `predicate` through a `KTableFilterProcessor` backed by
/// its own state store.
///
/// The store name is `materialized.store_name` when given; otherwise a fresh name is
/// generated from the builder with the `names::TABLE_FILTER` prefix. The store is
/// registered with the serdes taken from `materialized` and connected to the new
/// processor. Presumably entries are kept when `predicate` returns `true`; the exact
/// semantics live in `KTableFilterProcessor` (not visible here).
///
/// The returned table carries the store name and inherits this table's window grace and
/// suppress store factory; it has no source topic.
#[must_use]
pub fn filter<KS, VS, P>(
    &self,
    predicate: P,
    materialized: crate::dsl::config::Materialized<KS, VS>,
) -> KTable<K, V>
where
    KS: Serde<K> + Clone + 'static,
    VS: Serde<V> + Clone + 'static,
    P: Fn(&K, &V) -> bool + Clone + Send + Sync + 'static,
{
    let grace = self.window_grace_ms;
    let suppress_factory = self.suppress_store_factory.clone();
    // Resolve the store name before taking the long-lived builder borrow below;
    // `mint_table_store` borrows the builder itself.
    let store_name = mint_table_store(&self.builder, &materialized, names::TABLE_FILTER);
    let crate::dsl::config::Materialized {
        key_serde,
        value_serde,
        ..
    } = materialized;
    let parent_id = self.node;
    let mut g = self.builder.borrow_mut();
    let name = g.new_processor_name(names::TABLE_FILTER);
    let id = g.graph.add(
        name.clone(),
        GraphNodeKind::TableProcessor {
            store_name: Some(store_name.clone()),
        },
        vec![parent_id],
    );
    // `store_name` is still needed for the returned handle, so the thunk gets a copy;
    // `predicate` is not needed again and is moved in as-is.
    let store_for_thunk = store_name.clone();
    g.graph.nodes[id].lower = Some(Box::new(move |state: &mut LowerState| {
        let parent =
            NodeHandle::<K, Change<V>>::from_name(state.handle_name[&parent_id].clone());
        let store_for_proc = store_for_thunk.clone();
        let h = state
            .topology
            .add_processor::<K, Change<V>, K, Change<V>, _, _, _>(
                name.clone(),
                move || KTableFilterProcessor {
                    predicate: predicate.clone(),
                    store_name: store_for_proc.clone(),
                    _pd: PhantomData,
                },
                [parent],
            );
        state.topology.add_state_store::<K, V, KS, VS>(
            store_for_thunk.clone(),
            key_serde.clone(),
            value_serde.clone(),
            [h.name().to_string()],
        );
        state.handle_name.insert(id, h.name().to_string());
    }));
    drop(g);
    KTable::new(Rc::clone(&self.builder), id, Some(store_name), None)
        .with_window_grace(grace)
        .with_suppress_factory(suppress_factory)
}
/// Inner-joins this table with `other`, combining matching values with `joiner`.
///
/// `joiner` is adapted to the optional-argument shape used by `join_impl`; the adapter
/// unwraps both sides, so it relies on the join processors only invoking it with both
/// values present for `JoinKind::inner()` (not verifiable from this file).
///
/// # Panics
///
/// Panics if either table has no store name (see `join_impl`), or — at processing
/// time — if the adapter is called with a missing left or right value.
pub fn join<VB, VR, F>(&self, other: &KTable<K, VB>, joiner: F) -> KTable<K, VR>
where
    VB: Any + Send + Clone,
    VR: Any + Send + Clone,
    F: Fn(&V, &VB) -> VR + Clone + Send + Sync + 'static,
{
    let adapted = move |left: Option<&V>, right: Option<&VB>| {
        let left = left.expect("inner join: a present");
        let right = right.expect("inner join: b present");
        joiner(left, right)
    };
    self.join_impl(other, adapted, JoinKind::inner())
}
/// Left-joins this table with `other`; `joiner` receives the right value as an `Option`.
///
/// `joiner` is adapted to the optional-argument shape used by `join_impl`; the adapter
/// unwraps the left side only and passes the right side through unchanged.
///
/// # Panics
///
/// Panics if either table has no store name (see `join_impl`), or — at processing
/// time — if the adapter is called without a left value.
pub fn left_join<VB, VR, F>(&self, other: &KTable<K, VB>, joiner: F) -> KTable<K, VR>
where
    VB: Any + Send + Clone,
    VR: Any + Send + Clone,
    F: Fn(&V, Option<&VB>) -> VR + Clone + Send + Sync + 'static,
{
    let adapted = move |left: Option<&V>, right: Option<&VB>| {
        let left = left.expect("left join: a present");
        joiner(left, right)
    };
    self.join_impl(other, adapted, JoinKind::left())
}
/// Outer-joins this table with `other`; `joiner` receives both values as `Option`s.
///
/// `joiner` already has the shape `join_impl` expects, so it is forwarded unchanged
/// together with `JoinKind::outer()`.
///
/// # Panics
///
/// Panics if either table has no store name (see `join_impl`).
pub fn outer_join<VB, VR, F>(&self, other: &KTable<K, VB>, joiner: F) -> KTable<K, VR>
where
    VB: Any + Send + Clone,
    VR: Any + Send + Clone,
    F: Fn(Option<&V>, Option<&VB>) -> VR + Clone + Send + Sync + 'static,
{
    let kind = JoinKind::outer();
    self.join_impl(other, joiner, kind)
}
/// Shared implementation behind `join`, `left_join` and `outer_join`.
///
/// Adds three graph nodes to this table's builder:
///
/// 1. a `KTableKTableJoinThisProcessor` below this table, connected to `other`'s store;
/// 2. a `KTableKTableJoinOtherProcessor` below `other`, connected to this table's store;
/// 3. a `MergeProcessor` with both join processors as parents, whose node backs the
///    returned table.
///
/// When both tables have a source topic, lowering the merge node also registers the two
/// topics as a co-partition group. The returned table has no store name, no source
/// topic, no window grace and no suppress store factory.
///
/// NOTE(review): all nodes are added through `self.builder`; `other` is presumably
/// expected to come from the same builder — nothing here checks that.
///
/// # Panics
///
/// Panics if either table has no store name.
#[allow(clippy::too_many_lines)]
fn join_impl<VB, VR, JF>(&self, other: &KTable<K, VB>, jf: JF, kind: JoinKind) -> KTable<K, VR>
where
    VB: Any + Send + Clone,
    VR: Any + Send + Clone,
    JF: Fn(Option<&V>, Option<&VB>) -> VR + Clone + Send + Sync + 'static,
{
    let a_store = self
        .store_name()
        .expect("KTable-KTable join: left table must be materialized")
        .to_string();
    let b_store = other
        .store_name()
        .expect("KTable-KTable join: right table must be materialized")
        .to_string();
    let a_src = self.source_topic().map(str::to_string);
    let b_src = other.source_topic().map(str::to_string);
    let self_node = self.node;
    let other_node = other.node;
    let mut g = self.builder.borrow_mut();
    let join_this = g.new_processor_name(names::KTABLE_JOIN_THIS);
    let join_other = g.new_processor_name(names::KTABLE_JOIN_OTHER);
    let merge = g.new_processor_name(names::KTABLE_MERGE);

    // This side: reacts to changes of `self` and looks values up in `other`'s store.
    let this_id = g.graph.add(
        join_this.clone(),
        GraphNodeKind::StatelessProcessor {
            repartition_required: false,
        },
        vec![self_node],
    );
    // The joiner is needed by both sides, so this side takes a clone; `b_store` and
    // `join_this` are only needed here and are moved into the thunk.
    let jf_this = jf.clone();
    g.graph.nodes[this_id].lower = Some(Box::new(move |state: &mut LowerState| {
        let parent =
            NodeHandle::<K, Change<V>>::from_name(state.handle_name[&self_node].clone());
        let store_for_proc = b_store.clone();
        let jf_for_proc = jf_this.clone();
        let h = state
            .topology
            .add_processor::<K, Change<V>, K, Change<VR>, _, _, _>(
                join_this.clone(),
                move || KTableKTableJoinThisProcessor {
                    other_store: store_for_proc.clone(),
                    joiner: jf_for_proc.clone(),
                    kind,
                    _pd: PhantomData,
                },
                [parent],
            );
        state.topology.connect_processor_store(h.name(), &b_store);
        state.handle_name.insert(this_id, h.name().to_string());
    }));

    // Other side: reacts to changes of `other` and looks values up in `self`'s store.
    let other_id = g.graph.add(
        join_other.clone(),
        GraphNodeKind::StatelessProcessor {
            repartition_required: false,
        },
        vec![other_node],
    );
    // Last use of `jf`, `a_store` and `join_other`: all three are moved into the thunk.
    g.graph.nodes[other_id].lower = Some(Box::new(move |state: &mut LowerState| {
        let parent =
            NodeHandle::<K, Change<VB>>::from_name(state.handle_name[&other_node].clone());
        let store_for_proc = a_store.clone();
        let jf_for_proc = jf.clone();
        let h = state
            .topology
            .add_processor::<K, Change<VB>, K, Change<VR>, _, _, _>(
                join_other.clone(),
                move || KTableKTableJoinOtherProcessor {
                    other_store: store_for_proc.clone(),
                    joiner: jf_for_proc.clone(),
                    kind,
                    _pd: PhantomData,
                },
                [parent],
            );
        state.topology.connect_processor_store(h.name(), &a_store);
        state.handle_name.insert(other_id, h.name().to_string());
    }));

    // Merge: funnels the output of both join processors into one node.
    let merge_id = g.graph.add(
        merge.clone(),
        GraphNodeKind::StatelessProcessor {
            repartition_required: false,
        },
        vec![this_id, other_id],
    );
    g.graph.nodes[merge_id].lower = Some(Box::new(move |state: &mut LowerState| {
        let this_parent =
            NodeHandle::<K, Change<VR>>::from_name(state.handle_name[&this_id].clone());
        let other_parent =
            NodeHandle::<K, Change<VR>>::from_name(state.handle_name[&other_id].clone());
        let h = state
            .topology
            .add_processor::<K, Change<VR>, K, Change<VR>, _, _, _>(
                merge.clone(),
                || MergeProcessor::<K, Change<VR>> { _pd: PhantomData },
                [this_parent, other_parent],
            );
        // A co-partition group is only registered when both source topics are known.
        if let (Some(a), Some(bb)) = (&a_src, &b_src) {
            state
                .topology
                .add_copartition_group([a.clone(), bb.clone()]);
        }
        state.handle_name.insert(merge_id, h.name().to_string());
    }));
    drop(g);
    KTable::new(Rc::clone(&self.builder), merge_id, None, None)
}
}
impl<K, V> KTable<K, V>
where
K: Any + Send + Sync + Clone,
V: Any + Send + Clone,
{
#[must_use]
pub fn suppress(&self, suppressed: crate::dsl::suppress::Suppressed<K>) -> KTable<K, V> {
let wait_ms = match suppressed.wait {
crate::dsl::suppress::WaitKind::UpstreamGrace => self.window_grace_ms.unwrap_or(0),
crate::dsl::suppress::WaitKind::Fixed(ms) => ms,
};
let buffer_time = suppressed.buffer_time;
let max_records = suppressed.buffer.record_cap();
let max_bytes = suppressed.buffer.byte_cap();
let emit_early = suppressed.buffer.is_emit_early();
let logging = suppressed.logging;
let factory = self.suppress_store_factory.clone().expect(
"suppress requires a serde-carrying KTable (a windowed/session aggregation \
or builder.table); a mapValues-derived view has no value serde for the buffer",
);
let parent_id = self.node;
let mut g = self.builder.borrow_mut();
let name = g.new_processor_name(names::KTABLE_SUPPRESS);
let store_name = g.new_processor_name(names::KTABLE_SUPPRESS_STORE);
let store_for_thunk = store_name.clone();
let id = g.graph.add(
name.clone(),
GraphNodeKind::TableProcessor { store_name: None },
vec![parent_id],
);
g.graph.nodes[id].lower = Some(Box::new(move |state: &mut LowerState| {
let parent =
NodeHandle::<K, Change<V>>::from_name(state.handle_name[&parent_id].clone());
let store_for_proc = store_for_thunk.clone();
let h = state
.topology
.add_processor::<K, Change<V>, K, Change<V>, _, _, _>(
name.clone(),
move || {
crate::dsl::processors::suppress::KTableSuppressProcessor::<K, V>::new(
store_for_proc.clone(),
wait_ms,
buffer_time,
max_records,
max_bytes,
emit_early,
)
},
[parent],
);
let proc_name = h.name().to_string();
factory(state, &store_for_thunk, &proc_name, logging);
state.handle_name.insert(id, proc_name);
}));
drop(g);
KTable::new(Rc::clone(&self.builder), id, Some(store_name), None)
.with_window_grace(self.window_grace_ms)
.with_suppress_factory(self.suppress_store_factory.clone())
}
}
/// Picks the store name for a materialized table operator.
///
/// Returns a copy of `materialized.store_name` when one was supplied; otherwise asks the
/// builder for a freshly generated name using `prefix`. The builder is only borrowed
/// (mutably) in the latter case, and the borrow ends before this function returns.
fn mint_table_store<KS, VS>(
    builder: &Rc<RefCell<InternalStreamsBuilder>>,
    materialized: &crate::dsl::config::Materialized<KS, VS>,
    prefix: &str,
) -> String {
    materialized
        .store_name
        .clone()
        .unwrap_or_else(|| builder.borrow_mut().new_processor_name(prefix))
}