use std::any::Any;
use std::borrow::Borrow;
use std::collections::{BTreeMap, HashMap};
use std::marker::PhantomData;
/// Type-erased constructor for a state store.
///
/// Invoked with the store name, the changelog topic name, and the byte-level
/// backend to wrap; returns the boxed [`crate::store::api::StateStore`].
/// Individual factories may ignore the changelog name or the backend.
/// The `Send + Sync` bounds let a factory be shared across threads.
pub(crate) type StoreFactory = Box<
    dyn Fn(
            &str,
            String,
            Box<dyn crate::store::byte::ByteKeyValueStore>,
        ) -> Box<dyn crate::store::api::StateStore>
        + Send
        + Sync,
>;
use crabka_protocol::owned::streams_group_heartbeat_request::Topology as WireTopology;
use super::grouping::group_nodes;
use super::node::{NodeKind, NodeRegistry};
use super::wire::to_wire;
use crate::processor::api::ProcessorSupplier;
use crate::processor::erased::ProcessorError;
use crate::processor::factory::{MakeDeser, NodeFactory};
use crate::processor::graph::{Graph, GraphSource};
use crate::processor::node::{ErasedNode, ProcessorNode, SinkNode, SourceNode};
use crate::processor::serde::{Consumed, Produced, Serde};
/// Errors reported by [`Topology::build`].
#[derive(Debug, thiserror::Error)]
pub enum TopologyError {
    /// A node name was registered more than once.
    #[error("duplicate node name: {0}")]
    DuplicateNode(String),
    /// A node lists a predecessor that is not a node of this topology (for
    /// example, a handle obtained from a different `Topology`).
    #[error("node {node} references unknown predecessor {predecessor}")]
    UnknownPredecessor { node: String, predecessor: String },
    /// Nothing to run: `build` found no subtopologies after grouping nodes.
    #[error("topology has no source nodes")]
    Empty,
}
/// Internal, `Clone`-able copy of [`TopologyError`].
///
/// [`Topology`] uses it to remember the first registration error until
/// `build` converts it back and returns it.
#[derive(Debug, Clone)]
enum StoredError {
    /// Mirrors [`TopologyError::DuplicateNode`].
    DuplicateNode(String),
    /// Mirrors [`TopologyError::UnknownPredecessor`].
    UnknownPredecessor { node: String, predecessor: String },
    /// Mirrors [`TopologyError::Empty`].
    Empty,
}
impl From<StoredError> for TopologyError {
fn from(e: StoredError) -> Self {
match e {
StoredError::DuplicateNode(n) => TopologyError::DuplicateNode(n),
StoredError::UnknownPredecessor { node, predecessor } => {
TopologyError::UnknownPredecessor { node, predecessor }
}
StoredError::Empty => TopologyError::Empty,
}
}
}
impl From<TopologyError> for StoredError {
fn from(e: TopologyError) -> Self {
match e {
TopologyError::DuplicateNode(n) => StoredError::DuplicateNode(n),
TopologyError::UnknownPredecessor { node, predecessor } => {
StoredError::UnknownPredecessor { node, predecessor }
}
TopologyError::Empty => StoredError::Empty,
}
}
}
/// Mutable builder for a stream-processing topology.
///
/// Nodes and stores are added with the `add_*` methods; [`Topology::build`]
/// validates the result and freezes it into a [`BuiltTopology`].
#[derive(Default)]
pub struct Topology {
    /// Names, kinds and wiring of nodes, stores and topics; grouped and
    /// wire-encoded by `build`.
    reg: NodeRegistry,
    /// First error reported by a registration call; returned from `build`.
    error: Option<StoredError>,
    /// Per-node constructors keyed by node name (`make_deser` for sources,
    /// `make_node` for processors and sinks).
    factories: HashMap<String, NodeFactory>,
    /// Local store constructors keyed by store name, each paired with an
    /// optional explicit changelog topic.
    store_factories: HashMap<String, (Option<String>, StoreFactory)>,
    /// Global store constructors keyed by store name.
    global_store_factories: HashMap<String, (Option<String>, StoreFactory)>,
    /// Global store name -> topic registered for it in `add_global_store`.
    global_store_topics: HashMap<String, String>,
}
impl std::fmt::Debug for Topology {
    /// Prints the registry, any pending error and the global-topic map;
    /// the boxed factory maps (which are not `Debug`) are summarized by count.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let factory_summary = format!("<{} factories>", self.factories.len());
        let store_summary = format!("<{} store_factories>", self.store_factories.len());
        let global_summary = format!(
            "<{} global_store_factories>",
            self.global_store_factories.len()
        );
        f.debug_struct("Topology")
            .field("reg", &self.reg)
            .field("error", &self.error)
            .field("factories", &factory_summary)
            .field("store_factories", &store_summary)
            .field("global_store_factories", &global_summary)
            .field("global_store_topics", &self.global_store_topics)
            .finish()
    }
}
/// Typed reference to a node added to a [`Topology`].
///
/// `K` and `V` are the key and value types of the records the node emits;
/// the handle itself stores only the node name.
pub struct NodeHandle<K, V> {
    name: String,
    // `fn() -> (K, V)` keeps the handle covariant in `K`/`V` and `Send + Sync`
    // regardless of them, without implying ownership of a `K` or `V`.
    _pd: PhantomData<fn() -> (K, V)>,
}
impl<K, V> NodeHandle<K, V> {
fn new(name: String) -> Self {
Self {
name,
_pd: PhantomData,
}
}
pub(crate) fn from_name(name: String) -> Self {
Self::new(name)
}
#[must_use]
pub fn name(&self) -> &str {
&self.name
}
}
impl<K, V> Clone for NodeHandle<K, V> {
fn clone(&self) -> Self {
Self::new(self.name.clone())
}
}
impl<K, V> std::fmt::Debug for NodeHandle<K, V> {
    /// Shows only the node name, so `K` and `V` need not implement `Debug`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut builder = f.debug_struct("NodeHandle");
        builder.field("name", &self.name);
        builder.finish()
    }
}
impl Topology {
#[must_use]
pub fn new() -> Self {
Self::default()
}
/// Registers source node `name`, subscribed to `topics`.
///
/// Records are deserialized with the key/value serdes from `consumed`.
/// A registration error (for example a duplicate name) is recorded and
/// reported later by [`Topology::build`]. The returned handle carries the
/// deserialized key/value types and can be passed as a parent to later
/// `add_processor` / `add_sink` calls.
pub fn add_source<K, V, KS, VS>(
    &mut self,
    name: impl Into<String>,
    topics: impl IntoIterator<Item = impl Into<String>>,
    consumed: impl Into<Consumed<KS, VS>>,
) -> NodeHandle<K, V>
where
    K: Any + Send + Clone,
    V: Any + Send + Clone,
    KS: Serde<K> + Clone,
    VS: Serde<V> + Clone,
{
    let Consumed {
        key_serde,
        value_serde,
    } = consumed.into();
    let name: String = name.into();
    let topics: Vec<String> = topics.into_iter().map(Into::into).collect();
    // `topics` is not used after registration, so move it instead of cloning.
    let r = self.reg.add_source(&name, topics);
    self.record(r);
    let make_deser: MakeDeser = {
        let ks = key_serde;
        let vs = value_serde;
        let n = name.clone();
        // Each call builds a fresh `SourceNode` and returns a boxed
        // deserializer closure that owns it.
        Box::new(move || {
            let node = SourceNode::new(n.clone(), ks.clone(), vs.clone());
            Box::new(move |k: Option<&[u8]>, v: &[u8], ts: i64| node.deserialize(k, v, ts))
                as Box<dyn Fn(Option<&[u8]>, &[u8], i64) -> Result<_, ProcessorError> + Send>
        })
    };
    self.factories.insert(
        name.clone(),
        NodeFactory {
            make_node: None,
            make_deser: Some(make_deser),
        },
    );
    NodeHandle::new(name)
}
/// Registers processor node `name`, fed by every handle in `parents`.
///
/// Per-instance processors are built from `supplier`. Returns a handle typed
/// with the processor's output key/value types. Registration errors are
/// recorded and reported by [`Topology::build`].
pub fn add_processor<KIn, VIn, KOut, VOut, S, P, I>(
    &mut self,
    name: impl Into<String>,
    supplier: S,
    parents: I,
) -> NodeHandle<KOut, VOut>
where
    KIn: Any + Send,
    VIn: Any + Send,
    KOut: Any + Send + Clone,
    VOut: Any + Send + Clone,
    S: ProcessorSupplier<KIn, VIn, KOut, VOut> + Clone,
    I: IntoIterator<Item = P>,
    P: Borrow<NodeHandle<KIn, VIn>>,
{
    let name: String = name.into();
    let mut parent_names: Vec<String> = Vec::new();
    for parent in parents {
        parent_names.push(parent.borrow().name.clone());
    }
    let outcome = self.reg.add_processor(&name, parent_names);
    self.record(outcome);

    let node_name = name.clone();
    let make_node: crate::processor::factory::MakeNode = Box::new(move || {
        Box::new(ProcessorNode::<KIn, VIn, KOut, VOut>::new(
            node_name.clone(),
            &supplier,
        )) as Box<dyn ErasedNode>
    });
    let factory = NodeFactory {
        make_node: Some(make_node),
        make_deser: None,
    };
    self.factories.insert(name.clone(), factory);
    NodeHandle::new(name)
}
/// Registers sink node `name`, which writes records to `topic`.
///
/// Records from every handle in `parents` are serialized with the serdes
/// from `produced`. Registration errors are recorded and reported by
/// [`Topology::build`].
pub fn add_sink<K, V, KS, VS, P, I>(
    &mut self,
    name: impl Into<String>,
    topic: impl Into<String>,
    parents: I,
    produced: impl Into<Produced<KS, VS>>,
) where
    K: Any + Send,
    V: Any + Send,
    KS: Serde<K> + Clone,
    VS: Serde<V> + Clone,
    I: IntoIterator<Item = P>,
    P: Borrow<NodeHandle<K, V>>,
{
    let Produced {
        key_serde,
        value_serde,
    } = produced.into();
    let name: String = name.into();
    let topic: String = topic.into();
    let preds: Vec<String> = parents
        .into_iter()
        .map(|p| p.borrow().name.clone())
        .collect();
    let r = self.reg.add_sink(&name, topic.clone(), preds);
    self.record(r);
    let make_node: crate::processor::factory::MakeNode = {
        let n = name.clone();
        // The registry received its own copy above, so the factory closure
        // can take ownership of `topic` instead of cloning it a second time.
        let t = topic;
        let ks = key_serde;
        let vs = value_serde;
        Box::new(move || {
            Box::new(SinkNode::<K, V, KS, VS>::new(
                n.clone(),
                t.clone(),
                ks.clone(),
                vs.clone(),
            )) as Box<dyn ErasedNode>
        })
    };
    self.factories.insert(
        name,
        NodeFactory {
            make_node: Some(make_node),
            make_deser: None,
        },
    );
}
/// Adds a key-value state store named `name`, attached to `processors`.
///
/// No changelog override is set, so the changelog topic is derived at
/// instantiation as `{app_id}-{name}-changelog`.
pub fn add_state_store<K, V, KS, VS>(
    &mut self,
    name: impl Into<String>,
    key_serde: KS,
    value_serde: VS,
    processors: impl IntoIterator<Item = impl Into<String>>,
) -> &mut Self
where
    K: Send + 'static,
    V: Send + 'static,
    KS: Serde<K> + Clone,
    VS: Serde<V> + Clone,
{
    let no_override: Option<String> = None;
    self.add_state_store_inner::<K, V, KS, VS>(
        name,
        key_serde,
        value_serde,
        processors,
        no_override,
    )
}
/// Adds a key-value state store named `name`, attached to `processors`.
///
/// The store logs to the explicitly named `changelog_topic` instead of the
/// derived default.
pub fn add_state_store_with_changelog<K, V, KS, VS>(
    &mut self,
    name: impl Into<String>,
    key_serde: KS,
    value_serde: VS,
    processors: impl IntoIterator<Item = impl Into<String>>,
    changelog_topic: impl Into<String>,
) -> &mut Self
where
    K: Send + 'static,
    V: Send + 'static,
    KS: Serde<K> + Clone,
    VS: Serde<V> + Clone,
{
    let changelog: String = changelog_topic.into();
    self.add_state_store_inner::<K, V, KS, VS>(
        name,
        key_serde,
        value_serde,
        processors,
        Some(changelog),
    )
}
fn add_state_store_inner<K, V, KS, VS>(
&mut self,
name: impl Into<String>,
key_serde: KS,
value_serde: VS,
processors: impl IntoIterator<Item = impl Into<String>>,
changelog_override: Option<String>,
) -> &mut Self
where
K: Send + 'static,
V: Send + 'static,
KS: Serde<K> + Clone,
VS: Serde<V> + Clone,
{
let name: String = name.into();
let procs: Vec<String> = processors.into_iter().map(Into::into).collect();
self.reg.add_store(&name, procs, changelog_override.clone());
self.store_factories.insert(
name,
(
changelog_override,
Box::new(
move |store_name: &str,
changelog: String,
backend: Box<dyn crate::store::byte::ByteKeyValueStore>| {
Box::new(crate::store::kv::KeyValueBytesStore::<K, V>::new(
store_name.to_string(),
backend,
Box::new(key_serde.clone()),
Box::new(value_serde.clone()),
changelog,
)) as Box<dyn crate::store::api::StateStore>
},
),
),
);
self
}
pub fn add_window_store<K, V, KS, VS>(
&mut self,
name: impl Into<String>,
key_serde: KS,
value_serde: VS,
size_ms: i64,
grace_ms: i64,
processors: impl IntoIterator<Item = impl Into<String>>,
) -> &mut Self
where
K: Send + 'static,
V: Send + 'static,
KS: Serde<K> + Clone,
VS: Serde<V> + Clone,
{
let name: String = name.into();
let retention_ms = size_ms + grace_ms + 86_400_000;
let procs: Vec<String> = processors.into_iter().map(Into::into).collect();
self.reg.add_window_store(&name, procs, None, retention_ms);
self.store_factories.insert(
name.clone(),
(
None,
Box::new(
move |store_name: &str,
changelog: String,
backend: Box<dyn crate::store::byte::ByteKeyValueStore>| {
Box::new(crate::store::window::WindowBytesStore::<K, V>::new(
store_name.to_string(),
backend,
Box::new(key_serde.clone()),
Box::new(value_serde.clone()),
changelog,
)) as Box<dyn crate::store::api::StateStore>
},
),
),
);
self
}
#[allow(clippy::too_many_arguments)] pub fn add_join_window_store<K, V, KS, VS>(
&mut self,
name: impl Into<String>,
key_serde: KS,
value_serde: VS,
before_ms: i64,
after_ms: i64,
grace_ms: i64,
processors: impl IntoIterator<Item = impl Into<String>>,
) -> &mut Self
where
K: Send + 'static,
V: Send + 'static,
KS: Serde<K> + Clone,
VS: Serde<V> + Clone,
{
let name: String = name.into();
let retention_ms = before_ms + after_ms + grace_ms + 86_400_000;
let procs: Vec<String> = processors.into_iter().map(Into::into).collect();
self.reg
.add_join_window_store(&name, procs, None, retention_ms);
self.store_factories.insert(
name.clone(),
(
None,
Box::new(
move |store_name: &str,
changelog: String,
backend: Box<dyn crate::store::byte::ByteKeyValueStore>| {
Box::new(crate::store::join_window::JoinWindowBytesStore::<K, V>::new(
store_name.to_string(),
backend,
Box::new(key_serde.clone()),
Box::new(value_serde.clone()),
changelog,
)) as Box<dyn crate::store::api::StateStore>
},
),
),
);
self
}
pub fn add_session_store<K, V, KS, VS>(
&mut self,
name: impl Into<String>,
key_serde: KS,
value_serde: VS,
gap_ms: i64,
grace_ms: i64,
processors: impl IntoIterator<Item = impl Into<String>>,
) -> &mut Self
where
K: Send + 'static,
V: Send + 'static,
KS: Serde<K> + Clone,
VS: Serde<V> + Clone,
{
let name: String = name.into();
let retention_ms = gap_ms + grace_ms + 86_400_000;
let procs: Vec<String> = processors.into_iter().map(Into::into).collect();
self.reg.add_window_store(&name, procs, None, retention_ms);
self.store_factories.insert(
name.clone(),
(
None,
Box::new(
move |store_name: &str,
changelog: String,
backend: Box<dyn crate::store::byte::ByteKeyValueStore>| {
Box::new(crate::store::session::SessionBytesStore::<K, V>::new(
store_name.to_string(),
backend,
Box::new(key_serde.clone()),
Box::new(value_serde.clone()),
changelog,
)) as Box<dyn crate::store::api::StateStore>
},
),
),
);
self
}
pub(crate) fn add_fk_subscription_store(
&mut self,
name: impl Into<String>,
processors: impl IntoIterator<Item = impl Into<String>>,
) -> &mut Self {
let name: String = name.into();
let procs: Vec<String> = processors.into_iter().map(Into::into).collect();
self.reg.add_store(&name, procs, None); self.store_factories.insert(
name,
(
None,
Box::new(
move |store_name: &str,
changelog: String,
backend: Box<dyn crate::store::byte::ByteKeyValueStore>| {
Box::new(crate::store::fk_subscription::SubscriptionBytesStore::new(
store_name.to_string(),
backend,
changelog,
)) as Box<dyn crate::store::api::StateStore>
},
),
),
);
self
}
pub fn add_suppress_store<K, V, KS, VS>(
&mut self,
name: impl Into<String>,
key_serde: KS,
value_serde: VS,
logging: bool,
processors: impl IntoIterator<Item = impl Into<String>>,
) -> &mut Self
where
K: Send + 'static,
V: Send + 'static,
KS: Serde<K> + Clone,
VS: Serde<V> + Clone,
{
let name: String = name.into();
let procs: Vec<String> = processors.into_iter().map(Into::into).collect();
if logging {
self.reg.add_store(&name, procs, None);
}
self.store_factories.insert(
name.clone(),
(
None,
Box::new(
move |store_name: &str,
changelog: String,
_backend: Box<dyn crate::store::byte::ByteKeyValueStore>| {
let cl = if logging { changelog } else { String::new() };
let mut store =
crate::store::suppress_store::SuppressBytesStore::<K, V>::new(
store_name.to_string(),
Box::new(key_serde.clone()),
Box::new(value_serde.clone()),
cl,
);
if !logging {
crate::store::api::StateStore::set_logging(&mut store, false);
}
Box::new(store) as Box<dyn crate::store::api::StateStore>
},
),
),
);
self
}
pub(crate) fn add_state_store_no_changelog<K, V, KS, VS>(
&mut self,
name: impl Into<String>,
key_serde: KS,
value_serde: VS,
) -> &mut Self
where
K: Send + 'static,
V: Send + 'static,
KS: Serde<K> + Clone,
VS: Serde<V> + Clone,
{
let name: String = name.into();
self.store_factories.insert(
name,
(
None, Box::new(
move |store_name: &str,
_changelog: String,
backend: Box<dyn crate::store::byte::ByteKeyValueStore>| {
Box::new(crate::store::kv::KeyValueBytesStore::<K, V>::new(
store_name.to_string(),
backend,
Box::new(key_serde.clone()),
Box::new(value_serde.clone()),
String::new(),
)) as Box<dyn crate::store::api::StateStore>
},
),
),
);
self
}
/// Declares a global store `store_name` fed from `topic`.
///
/// Registers source `source_name` on `topic` and processor `processor_name`
/// downstream of it. It then marks `topic` as a global source, which makes
/// `build` leave both nodes out of the built node list. Finally it keeps a
/// key-value store factory that uses the serdes from `consumed` and an empty
/// changelog name. Registration errors are recorded and reported by
/// [`Topology::build`].
pub fn add_global_store<K, V, KS, VS>(
    &mut self,
    store_name: impl Into<String>,
    source_name: impl Into<String>,
    topic: impl Into<String>,
    processor_name: impl Into<String>,
    consumed: impl Into<Consumed<KS, VS>>,
) -> &mut Self
where
    K: Send + 'static,
    V: Send + 'static,
    KS: Serde<K> + Clone,
    VS: Serde<V> + Clone,
{
    let store_name: String = store_name.into();
    let source_name: String = source_name.into();
    let topic: String = topic.into();
    let processor_name: String = processor_name.into();
    let Consumed {
        key_serde,
        value_serde,
    } = consumed.into();

    let source_outcome = self.reg.add_source(&source_name, vec![topic.clone()]);
    self.record(source_outcome);
    let processor_outcome = self.reg.add_processor(&processor_name, vec![source_name]);
    self.record(processor_outcome);
    self.reg.add_global_source(&topic);

    let factory: StoreFactory = Box::new(
        move |sn: &str,
              _changelog: String,
              backend: Box<dyn crate::store::byte::ByteKeyValueStore>| {
            let store = crate::store::kv::KeyValueBytesStore::<K, V>::new(
                sn.to_string(),
                backend,
                Box::new(key_serde.clone()),
                Box::new(value_serde.clone()),
                String::new(),
            );
            Box::new(store) as Box<dyn crate::store::api::StateStore>
        },
    );
    self.global_store_topics.insert(store_name.clone(), topic);
    self.global_store_factories
        .insert(store_name, (None, factory));
    self
}
/// Attaches `processor` to the already-declared store `store`.
///
/// Per this module's tests, repeating a connection or naming an unknown
/// store leaves the registry unchanged.
pub fn connect_processor_store(&mut self, processor: &str, store: &str) -> &mut Self {
    let registry = &mut self.reg;
    registry.connect_processor_store(processor, store);
    self
}
/// Test helper: processors attached to registry store `store`, or `None`
/// if no such store is registered.
#[cfg(test)]
pub(crate) fn store_entry_for_test(&self, store: &str) -> Option<Vec<String>> {
    let entry = self
        .reg
        .stores
        .iter()
        .find(|candidate| candidate.name == store)?;
    Some(entry.processors.clone())
}
/// Test helper: whether a global store factory named `store` was added.
#[cfg(test)]
pub(crate) fn has_global_store_for_test(&self, store: &str) -> bool {
    self.global_store_factories.get(store).is_some()
}
/// Declares `name` as an internal repartition topic of this topology.
pub fn add_repartition_topic<S: Into<String>>(&mut self, name: S) -> &mut Self {
    let topic: String = name.into();
    self.reg.repartition_topics.insert(topic);
    self
}
/// Declares that `topics` must be co-partitioned; the group is emitted in
/// the wire topology.
pub fn add_copartition_group(
    &mut self,
    topics: impl IntoIterator<Item = impl Into<String>>,
) -> &mut Self {
    let registry = &mut self.reg;
    registry.add_copartition_group(topics.into_iter().map(Into::into).collect());
    self
}
/// Validates the topology and freezes it into a [`BuiltTopology`] for
/// `application_id`.
///
/// Nodes fed by a global-store source topic are left out of the built node
/// list. Source topics are recorded per subtopology id, with external
/// sources first and repartition sources after them.
///
/// # Errors
///
/// * The first error recorded by an earlier `add_*` call, such as
///   [`TopologyError::DuplicateNode`].
/// * Any error from predecessor validation, such as
///   [`TopologyError::UnknownPredecessor`].
/// * [`TopologyError::Empty`] when grouping produces no subtopologies.
pub fn build<S: Into<String>>(
    mut self,
    application_id: S,
) -> Result<BuiltTopology, TopologyError> {
    if let Some(stored) = self.error.take() {
        return Err(TopologyError::from(stored));
    }
    self.reg.validate_predecessors()?;

    let groups = group_nodes(&self.reg);
    if groups.is_empty() {
        return Err(TopologyError::Empty);
    }

    let app: String = application_id.into();
    let wire = to_wire(&groups, &app);

    let source_topics: BTreeMap<String, Vec<String>> = groups
        .iter()
        .map(|group| {
            let topics: Vec<String> = group
                .source_topics
                .iter()
                .chain(group.repartition_source_topics.iter())
                .cloned()
                .collect();
            (group.id.clone(), topics)
        })
        .collect();

    // Names of nodes fed, directly or transitively, by a global source topic.
    // A single forward pass suffices only if every node appears after its
    // predecessors in `reg.nodes` (presumably registration order; confirm).
    let mut global_nodes: std::collections::HashSet<&str> = std::collections::HashSet::new();
    for node in &self.reg.nodes {
        let fed_by_global = match &node.kind {
            NodeKind::Source { topics } => topics
                .iter()
                .any(|topic| self.reg.global_source_topics.contains(topic)),
            NodeKind::Processor { predecessors } | NodeKind::Sink { predecessors, .. } => {
                predecessors
                    .iter()
                    .any(|pred| global_nodes.contains(pred.as_str()))
            }
        };
        if fed_by_global {
            global_nodes.insert(node.name.as_str());
        }
    }

    let node_specs: Vec<NodeSpec> = self
        .reg
        .nodes
        .iter()
        .filter(|node| !global_nodes.contains(node.name.as_str()))
        .map(|node| {
            let name = node.name.clone();
            match &node.kind {
                NodeKind::Source { topics } => NodeSpec {
                    name,
                    kind: "source",
                    predecessors: Vec::new(),
                    source_topics: topics.clone(),
                    sink_topic: None,
                },
                NodeKind::Processor { predecessors } => NodeSpec {
                    name,
                    kind: "processor",
                    predecessors: predecessors.clone(),
                    source_topics: Vec::new(),
                    sink_topic: None,
                },
                NodeKind::Sink {
                    predecessors,
                    topic,
                } => NodeSpec {
                    name,
                    kind: "sink",
                    predecessors: predecessors.clone(),
                    source_topics: Vec::new(),
                    sink_topic: Some(topic.clone()),
                },
            }
        })
        .collect();

    Ok(BuiltTopology {
        wire,
        source_topics,
        application_id: app,
        factories: self.factories,
        node_specs,
        store_factories: self.store_factories,
        global_store_factories: self.global_store_factories,
        global_store_topics: self.global_store_topics,
    })
}
fn record(&mut self, r: Result<(), TopologyError>) {
if self.error.is_none()
&& let Err(e) = r
{
self.error = Some(e.into());
}
}
}
/// Flattened description of one non-global node.
///
/// [`BuiltTopology`] keeps these to instantiate graphs and list topics.
pub(crate) struct NodeSpec {
    /// Node name; also the key into the factory map.
    pub name: String,
    /// One of `"source"`, `"processor"` or `"sink"`.
    pub kind: &'static str,
    /// Names of parent nodes; empty for sources.
    pub predecessors: Vec<String>,
    /// Subscribed topics; filled in only for sources.
    pub source_topics: Vec<String>,
    /// Output topic; `Some` only for sinks.
    pub sink_topic: Option<String>,
}
/// Validated, immutable topology produced by [`Topology::build`].
pub struct BuiltTopology {
    /// Topology in the streams-group-heartbeat request's protocol form.
    wire: WireTopology,
    /// Subtopology id -> source topics (external first, then repartition).
    source_topics: BTreeMap<String, Vec<String>>,
    /// Application id the topology was built for.
    application_id: String,
    /// Node constructors keyed by node name.
    factories: HashMap<String, NodeFactory>,
    /// Non-global nodes, in registry order.
    node_specs: Vec<NodeSpec>,
    /// Local store constructors, each with an optional changelog override.
    store_factories: HashMap<String, (Option<String>, StoreFactory)>,
    /// Global store constructors keyed by store name.
    global_store_factories: HashMap<String, (Option<String>, StoreFactory)>,
    /// Global store name -> source topic.
    // NOTE(review): this field is read by `global_store_topics()`; the allow is
    // presumably for builds where that accessor is unused — confirm or remove.
    #[allow(dead_code)]
    global_store_topics: HashMap<String, String>,
}
impl std::fmt::Debug for BuiltTopology {
    /// Prints the application id and per-subtopology source topics.
    ///
    /// The wire form, factories and node specs are elided, which is why the
    /// output ends with `..`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut builder = f.debug_struct("BuiltTopology");
        builder.field("application_id", &self.application_id);
        builder.field("source_topics", &self.source_topics);
        builder.finish_non_exhaustive()
    }
}
impl BuiltTopology {
#[must_use]
pub fn to_wire(&self) -> super::wire::WireTopology {
super::wire::WireTopology::from(&self.wire)
}
#[must_use]
pub(crate) fn to_wire_request(&self) -> WireTopology {
self.wire.clone()
}
/// Global store constructors keyed by store name, each with its
/// (optional) changelog override.
pub(crate) fn global_store_factories(
    &self,
) -> &HashMap<String, (Option<String>, StoreFactory)> {
    let factories = &self.global_store_factories;
    factories
}
/// Owned copy of the global store name -> source topic map.
pub(crate) fn global_store_topics(&self) -> HashMap<String, String> {
    HashMap::clone(&self.global_store_topics)
}
/// Source topics (external, then repartition) of subtopology
/// `subtopology_id`; empty if the id is unknown.
#[must_use]
pub fn source_topics_for(&self, subtopology_id: &str) -> &[String] {
    match self.source_topics.get(subtopology_id) {
        Some(topics) => topics.as_slice(),
        None => &[],
    }
}
/// Application id passed to [`Topology::build`].
#[must_use]
pub fn application_id(&self) -> &str {
    self.application_id.as_str()
}
/// All topics read by non-global source nodes, in node order.
#[must_use]
pub fn list_source_topics(&self) -> Vec<String> {
    let mut topics = Vec::new();
    for spec in self.node_specs.iter().filter(|spec| spec.kind == "source") {
        topics.extend(spec.source_topics.iter().cloned());
    }
    topics
}
/// All topics written by non-global sink nodes, in node order.
#[must_use]
pub fn list_sink_topics(&self) -> Vec<String> {
    self.node_specs
        .iter()
        .filter_map(|spec| spec.sink_topic.as_ref())
        .cloned()
        .collect()
}
/// Builds a runnable [`Graph`] from the frozen topology.
///
/// Every non-source node is created from its `make_node` factory and wired
/// to its children by index. Every source node yields one [`GraphSource`] per
/// subscribed topic, each with its own deserializer. Each local store factory
/// receives a byte backend from `backend.open(app_id, store_name)` and a
/// changelog name: the registered override, or `{app_id}-{store_name}-changelog`
/// otherwise. Global store factories are not used here.
///
/// # Errors
///
/// Returns [`ProcessorError::Serde`] if a non-source node has no factory,
/// or its factory has no `make_node`.
pub(crate) async fn instantiate(
    &self,
    backend: &crate::store::backend::StoreBackend,
    app_id: &str,
) -> Result<Graph, ProcessorError> {
    // Graph nodes are the non-source specs; a spec's position here is its node index.
    let non_source: Vec<&NodeSpec> = self
        .node_specs
        .iter()
        .filter(|s| s.kind != "source")
        .collect();
    let name_to_idx: HashMap<&str, usize> = non_source
        .iter()
        .enumerate()
        .map(|(i, s)| (s.name.as_str(), i))
        .collect();
    let nodes: Vec<Box<dyn ErasedNode>> = non_source
        .iter()
        .map(|s| {
            let factory = self
                .factories
                .get(&s.name)
                .ok_or_else(|| ProcessorError::Serde {
                    node: s.name.clone(),
                    message: "factory missing".into(),
                })?;
            let make = factory
                .make_node
                .as_ref()
                .ok_or_else(|| ProcessorError::Serde {
                    node: s.name.clone(),
                    message: "make_node missing".into(),
                })?;
            Ok((make)())
        })
        .collect::<Result<Vec<_>, ProcessorError>>()?;
    // children[i] = indices of nodes that list node i as a predecessor.
    // Predecessors missing from `name_to_idx` (source nodes) are skipped here;
    // source -> child edges are stored on each `GraphSource` below.
    // NOTE(review): a predecessor listed twice yields a duplicate edge here —
    // confirm the registry rules that out.
    let mut children: Vec<Vec<usize>> = vec![Vec::new(); nodes.len()];
    for (child_idx, s) in non_source.iter().enumerate() {
        for parent_name in &s.predecessors {
            if let Some(&parent_idx) = name_to_idx.get(parent_name.as_str()) {
                children[parent_idx].push(child_idx);
            }
        }
    }
    let sources: Vec<GraphSource> = self
        .node_specs
        .iter()
        .filter(|s| s.kind == "source")
        .flat_map(|src_spec| {
            let src_name = src_spec.name.as_str();
            let src_children: Vec<usize> = non_source
                .iter()
                .enumerate()
                .filter(|(_, s)| s.predecessors.iter().any(|p| p == src_name))
                .map(|(i, _)| i)
                .collect();
            // NOTE(review): a source without a factory or without `make_deser`
            // is silently dropped rather than reported — confirm this is intended.
            let Some(factory) = self.factories.get(src_spec.name.as_str()) else {
                return vec![];
            };
            let Some(make_deser) = &factory.make_deser else {
                return vec![];
            };
            // One entry per subscribed topic, each with its own deserializer.
            src_spec
                .source_topics
                .iter()
                .map(|topic| {
                    let deser = (make_deser)();
                    GraphSource {
                        topic: topic.clone(),
                        deserialize: deser,
                        children: src_children.clone(),
                    }
                })
                .collect()
        })
        .collect();
    let mut store_registry = crate::store::registry::StoreRegistry::default();
    // `store_factories` is a HashMap, so stores are opened in unspecified order.
    for (store_name, (changelog_override, factory)) in &self.store_factories {
        let changelog = changelog_override
            .clone()
            .unwrap_or_else(|| format!("{app_id}-{store_name}-changelog"));
        let bytes = backend.open(app_id, store_name).await;
        store_registry.insert(factory(store_name, changelog, bytes));
    }
    Ok(Graph {
        nodes,
        children,
        sources,
        output: Vec::new(),
        stores: store_registry,
        globals: crate::runtime::global::GlobalStateManager::default(),
        schedules: Vec::new(),
        stream_time: i64::MIN,
        wall_clock: 0,
    })
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::processor::api::{Processor, ProcessorContext};
use crate::processor::record::Record;
use crate::processor::serde::StringSerde;
use assert2::check;
use async_trait::async_trait;
/// Test processor: upper-cases each value and forwards the record with its
/// key and timestamp unchanged.
struct Upper;
#[async_trait]
impl Processor<String, String, String, String> for Upper {
    async fn process(
        &mut self,
        ctx: &mut ProcessorContext<'_, '_, String, String>,
        r: Record<String, String>,
    ) {
        let shouted = r.value.to_uppercase();
        ctx.forward(Record::new(r.key, shouted, r.timestamp));
    }
}
#[test]
fn copartition_group_emitted_in_wire() {
    use crate::processor::serde::{BytesSerde, Consumed, Produced};
    let mut topo = Topology::new();
    let left = topo.add_source("sa", ["left"], Consumed::with(BytesSerde, BytesSerde));
    let right = topo.add_source("sb", ["right"], Consumed::with(BytesSerde, BytesSerde));
    topo.add_sink(
        "snk",
        "out",
        [&left, &right],
        Produced::with(BytesSerde, BytesSerde),
    );
    topo.add_copartition_group(["left", "right"]);
    let wire = topo.build("app").unwrap().to_wire();
    // Co-partition groups reference source topics by their index in the subtopology.
    let groups = &wire.subtopologies[0].copartition_groups;
    check!(groups.len() == 1);
    check!(groups[0].source_topics == vec![0i16, 1i16]);
}
#[test]
fn global_store_is_invisible_and_bumps_stream_index() {
    let mut topo = Topology::new();
    topo.add_global_store::<String, String, _, _>(
        "global-store",
        "gsrc",
        "global",
        "gproc",
        Consumed::with(StringSerde, StringSerde),
    );
    let source = topo.add_source("src", ["in"], Consumed::with(StringSerde, StringSerde));
    topo.add_sink(
        "snk",
        "out",
        [&source],
        Produced::with(StringSerde, StringSerde),
    );
    check!(topo.has_global_store_for_test("global-store"));
    let wire = topo.build("app").unwrap().to_wire();
    check!(wire.subtopologies.len() == 1);
    let only = &wire.subtopologies[0];
    check!(only.subtopology_id == "1");
    check!(only.source_topics == vec!["in".to_string()]);
    check!(
        wire.subtopologies
            .iter()
            .all(|s| s.state_changelog_topics.is_empty())
    );
    check!(!only.source_topics.contains(&"global".to_string()));
}
#[test]
fn build_single_source_sink_wire_unchanged() {
    let mut topo = Topology::new();
    let source = topo.add_source("src", ["in"], Consumed::with(StringSerde, StringSerde));
    let upper = topo.add_processor("up", || Upper, [&source]);
    topo.add_sink(
        "out",
        "out-topic",
        [&upper],
        Produced::with(StringSerde, StringSerde),
    );
    let built = topo.build("app").unwrap();
    let wire = built.to_wire();
    check!(wire.epoch == 0);
    let first = &wire.subtopologies[0];
    check!(first.subtopology_id == "0");
    check!(first.source_topics == vec!["in".to_string()]);
    check!(built.source_topics_for("0") == ["in".to_string()]);
}
#[test]
fn topology_error_converts_to_stored_error() {
    let empty = StoredError::from(TopologyError::Empty);
    check!(matches!(empty, StoredError::Empty));

    let duplicate = StoredError::from(TopologyError::DuplicateNode("n".into()));
    check!(matches!(duplicate, StoredError::DuplicateNode(_)));

    let unknown = StoredError::from(TopologyError::UnknownPredecessor {
        node: "a".into(),
        predecessor: "b".into(),
    });
    check!(matches!(unknown, StoredError::UnknownPredecessor { .. }));
}
#[test]
fn node_handle_clone_debug_and_owned_wiring() {
    let mut topo = Topology::new();
    let original = topo.add_source("src", ["in"], Consumed::with(StringSerde, StringSerde));
    let copy = original.clone();
    check!(copy.name() == "src");
    let rendered = format!("{copy:?}");
    check!(rendered.contains("src"));
    // The handle is passed by value here, not by reference.
    topo.add_sink(
        "out",
        "out",
        [copy],
        Produced::with(StringSerde, StringSerde),
    );
    check!(topo.build("app").is_ok());
}
#[test]
fn handle_from_another_topology_is_rejected_at_build() {
    let mut owner = Topology::new();
    let foreign = owner.add_source("src", ["in"], Consumed::with(StringSerde, StringSerde));
    let mut other = Topology::new();
    other.add_sink(
        "out",
        "o",
        [&foreign],
        Produced::with(StringSerde, StringSerde),
    );
    let outcome = other.build("app");
    check!(outcome.is_err());
}
#[test]
fn instantiate_runs_records() {
    let mut topo = Topology::new();
    let source = topo.add_source("src", ["in"], Consumed::with(StringSerde, StringSerde));
    let upper = topo.add_processor("up", || Upper, [&source]);
    topo.add_sink(
        "out",
        "out-topic",
        [&upper],
        Produced::with(StringSerde, StringSerde),
    );
    let built = topo.build("app").unwrap();
    let backend = crate::store::backend::StoreBackend::InMemory;
    let mut graph = pollster::block_on(built.instantiate(&backend, "app")).unwrap();
    pollster::block_on(graph.pipe("in", Some(b"k"), b"hi", 0)).unwrap();
    let emitted = graph.take_output();
    check!(emitted.len() == 1);
    check!(emitted[0].value.as_ref().unwrap().as_ref() == b"HI");
}
#[test]
fn empty_topology_is_rejected() {
    check!(Topology::new().build("app").is_err());
}
#[test]
fn build_with_processor_store_and_repartition() {
    let mut topo = Topology::new();
    topo.add_repartition_topic("rp");
    let source = topo.add_source("src", ["in"], Consumed::with(StringSerde, StringSerde));
    let processor = topo.add_processor("proc", || Upper, [&source]);
    topo.add_state_store("store", StringSerde, StringSerde, [processor.name()]);
    topo.add_sink(
        "rsink",
        "rp",
        [&processor],
        Produced::with(StringSerde, StringSerde),
    );
    let repartitioned = topo.add_source("rsrc", ["rp"], Consumed::with(StringSerde, StringSerde));
    topo.add_sink(
        "out",
        "out-topic",
        [&repartitioned],
        Produced::with(StringSerde, StringSerde),
    );
    let built = topo.build("my-app").unwrap();
    check!(built.application_id() == "my-app");
    let wire = built.to_wire();
    check!(wire.subtopologies.len() >= 2);
    let has_changelog = wire
        .subtopologies
        .iter()
        .flat_map(|s| s.state_changelog_topics.iter())
        .any(|c| c.name == "my-app-store-changelog");
    check!(has_changelog);
}
#[test]
fn application_id_accessor() {
    let mut topo = Topology::new();
    let source = topo.add_source("src", ["in"], Consumed::with(StringSerde, StringSerde));
    topo.add_sink(
        "snk",
        "out",
        [&source],
        Produced::with(StringSerde, StringSerde),
    );
    let built = topo.build("my-streams-app").unwrap();
    let id = built.application_id();
    check!(id == "my-streams-app");
}
#[test]
fn source_topics_for_unknown_id_returns_empty() {
    let mut topo = Topology::new();
    let source = topo.add_source("src", ["in"], Consumed::with(StringSerde, StringSerde));
    topo.add_sink(
        "snk",
        "out",
        [&source],
        Produced::with(StringSerde, StringSerde),
    );
    let built = topo.build("app").unwrap();
    let topics = built.source_topics_for("99");
    check!(topics.is_empty());
}
#[test]
fn connect_processor_store_adds_processor_to_store() {
    let mut topo = Topology::new();
    topo.add_state_store::<String, String, _, _>("s", StringSerde, StringSerde, ["p1"])
        .connect_processor_store("p2", "s");
    let expected = vec!["p1".to_string(), "p2".to_string()];
    check!(topo.store_entry_for_test("s").unwrap() == expected);
}
#[test]
fn connect_processor_store_is_idempotent() {
    let mut topo = Topology::new();
    topo.add_state_store::<String, String, _, _>("s", StringSerde, StringSerde, ["p1"])
        .connect_processor_store("p1", "s");
    let expected = vec!["p1".to_string()];
    check!(topo.store_entry_for_test("s").unwrap() == expected);
}
#[test]
fn connect_processor_store_unknown_store_is_noop() {
    let mut topo = Topology::new();
    topo.add_state_store::<String, String, _, _>("s", StringSerde, StringSerde, ["p1"])
        .connect_processor_store("p2", "no_such_store");
    let expected = vec!["p1".to_string()];
    check!(topo.store_entry_for_test("s").unwrap() == expected);
}
#[test]
fn duplicate_node_name_propagates_error() {
    let mut topo = Topology::new();
    topo.add_source("src", ["in"], Consumed::with(StringSerde, StringSerde));
    // Same node name again, different topic: recorded now, reported by `build`.
    topo.add_source("src", ["other"], Consumed::with(StringSerde, StringSerde));
    check!(topo.build("app").is_err());
}
#[test]
fn instantiate_repartition_topology_lists_topics() {
    let mut topo = Topology::new();
    topo.add_repartition_topic("rp");
    let input = topo.add_source("s1", ["in"], Consumed::with(StringSerde, StringSerde));
    let upper = topo.add_processor("p", || Upper, [&input]);
    topo.add_sink(
        "to_rp",
        "rp",
        [&upper],
        Produced::with(StringSerde, StringSerde),
    );
    let repartitioned = topo.add_source("s2", ["rp"], Consumed::with(StringSerde, StringSerde));
    topo.add_sink(
        "out",
        "out",
        [&repartitioned],
        Produced::with(StringSerde, StringSerde),
    );
    let built = topo.build("app").unwrap();

    let mut sources = built.list_source_topics();
    sources.sort();
    check!(sources == vec!["in".to_string(), "rp".to_string()]);
    let mut sinks = built.list_sink_topics();
    sinks.sort();
    check!(sinks == vec!["out".to_string(), "rp".to_string()]);

    let backend = crate::store::backend::StoreBackend::InMemory;
    let mut graph = pollster::block_on(built.instantiate(&backend, "app")).unwrap();
    pollster::block_on(graph.pipe("in", None, b"hi", 0)).unwrap();
    let emitted = graph.take_output();
    check!(emitted.iter().any(|o| o.topic == "rp"));
}
#[test]
fn instantiate_builds_stores_and_processes_statefully() {
    use crate::processor::serde::I64Serde;
    // Counts occurrences of each value in the "counts" store and forwards
    // (value, running count).
    struct Counter;
    #[async_trait]
    impl Processor<String, String, String, i64> for Counter {
        async fn process(
            &mut self,
            ctx: &mut ProcessorContext<'_, '_, String, i64>,
            r: Record<String, String>,
        ) {
            // Scope the store borrow so `ctx` is free again for `forward`.
            let count = {
                let store = ctx.get_state_store::<String, i64>("counts").unwrap();
                let next = store.get(&r.value).await.unwrap_or(0) + 1;
                store.put(r.value.clone(), next).await;
                next
            };
            ctx.forward(Record::new(Some(r.value), count, r.timestamp));
        }
    }

    let mut topo = Topology::new();
    let source = topo.add_source("src", ["in"], Consumed::with(StringSerde, StringSerde));
    let counter = topo.add_processor("c", || Counter, [&source]);
    topo.add_state_store("counts", StringSerde, I64Serde, [counter.name()]);
    topo.add_sink("out", "out", [&counter], Produced::with(StringSerde, I64Serde));
    let built = topo.build("app").unwrap();
    let has_changelog = built
        .to_wire()
        .subtopologies
        .iter()
        .flat_map(|s| s.state_changelog_topics.iter())
        .any(|c| c.name == "app-counts-changelog");
    check!(has_changelog);

    let backend = crate::store::backend::StoreBackend::InMemory;
    let mut graph = pollster::block_on(built.instantiate(&backend, "app")).unwrap();
    pollster::block_on(graph.pipe("in", None, b"x", 0)).unwrap();
    pollster::block_on(graph.pipe("in", None, b"x", 1)).unwrap();
    // The second "x" yields count 2, serialized as a big-endian i64.
    check!(
        graph
            .take_output()
            .last()
            .unwrap()
            .value
            .as_ref()
            .unwrap()
            .as_ref()
            == [0, 0, 0, 0, 0, 0, 0, 2]
    );
}
}