#[derive(Debug, Clone)]
pub struct Grouped<KS, VS> {
#[allow(dead_code)]
pub(crate) key_serde: KS,
#[allow(dead_code)]
pub(crate) value_serde: VS,
pub(crate) name: Option<String>,
}
impl<KS, VS> Grouped<KS, VS> {
pub fn with(key_serde: KS, value_serde: VS) -> Self {
Self {
key_serde,
value_serde,
name: None,
}
}
#[must_use]
pub fn with_name(mut self, name: impl Into<String>) -> Self {
self.name = Some(name.into());
self
}
}
#[derive(Debug, Clone, Copy)]
pub struct VersionedConfig {
pub history_retention_ms: i64,
pub segment_interval_ms: Option<i64>,
}
#[derive(Debug, Clone)]
pub struct Materialized<KS, VS> {
#[allow(dead_code)]
pub(crate) key_serde: KS,
#[allow(dead_code)]
pub(crate) value_serde: VS,
pub(crate) store_name: Option<String>,
pub(crate) logging: bool,
pub(crate) caching: bool,
pub(crate) versioned: Option<VersionedConfig>,
}
impl<KS, VS> Materialized<KS, VS> {
pub fn with(key_serde: KS, value_serde: VS) -> Self {
Self {
key_serde,
value_serde,
store_name: None,
logging: true,
caching: true,
versioned: None,
}
}
#[must_use]
pub fn as_store(mut self, name: impl Into<String>) -> Self {
self.store_name = Some(name.into());
self
}
#[must_use]
pub fn with_logging(mut self, on: bool) -> Self {
self.logging = on;
self
}
#[must_use]
pub fn with_caching(mut self, on: bool) -> Self {
self.caching = on;
self
}
#[must_use]
pub fn caching_enabled(&self) -> bool {
self.caching
}
#[must_use]
pub fn as_versioned(mut self, name: impl Into<String>, history_retention_ms: i64) -> Self {
self.store_name = Some(name.into());
self.versioned = Some(VersionedConfig {
history_retention_ms,
segment_interval_ms: None,
});
self
}
}
impl<KS, VS> From<(KS, VS)> for Materialized<KS, VS> {
fn from((key_serde, value_serde): (KS, VS)) -> Self {
Self::with(key_serde, value_serde)
}
}
impl<KS, VS> From<(KS, VS)> for Grouped<KS, VS> {
fn from((key_serde, value_serde): (KS, VS)) -> Self {
Self::with(key_serde, value_serde)
}
}
impl<KS, VS> From<(KS, VS)> for Repartitioned<KS, VS> {
fn from((key_serde, value_serde): (KS, VS)) -> Self {
Self::with(key_serde, value_serde)
}
}
impl<KS, V1S, V2S> From<(KS, V1S, V2S)> for StreamJoined<KS, V1S, V2S> {
fn from((key_serde, value1_serde, value2_serde): (KS, V1S, V2S)) -> Self {
Self::with(key_serde, value1_serde, value2_serde)
}
}
#[allow(clippy::struct_field_names)] #[derive(Debug, Clone, Copy)]
pub struct StreamJoined<KS, V1S, V2S> {
pub(crate) key_serde: KS,
pub(crate) value1_serde: V1S,
pub(crate) value2_serde: V2S,
}
impl<KS, V1S, V2S> StreamJoined<KS, V1S, V2S> {
pub fn with(key_serde: KS, value1_serde: V1S, value2_serde: V2S) -> Self {
Self {
key_serde,
value1_serde,
value2_serde,
}
}
}
#[derive(Debug, Clone, Default)]
pub struct Joined {
pub(crate) grace_ms: Option<i64>,
pub(crate) name: Option<String>,
}
impl Joined {
#[must_use]
pub fn with_grace_period(grace_ms: i64) -> Self {
Self {
grace_ms: Some(grace_ms),
name: None,
}
}
#[must_use]
pub fn as_named(mut self, name: impl Into<String>) -> Self {
self.name = Some(name.into());
self
}
}
#[derive(Debug, Clone)]
pub struct Repartitioned<KS, VS> {
pub(crate) key_serde: KS,
pub(crate) value_serde: VS,
pub(crate) name: Option<String>,
pub(crate) partitions: Option<i32>,
}
impl<KS, VS> Repartitioned<KS, VS> {
pub fn with(key_serde: KS, value_serde: VS) -> Self {
Self {
key_serde,
value_serde,
name: None,
partitions: None,
}
}
#[must_use]
pub fn with_name(mut self, name: impl Into<String>) -> Self {
self.name = Some(name.into());
self
}
#[must_use]
pub fn num_partitions(mut self, n: i32) -> Self {
self.partitions = Some(n);
self
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::processor::serde::{I64Serde, StringSerde};
use assert2::check;
#[test]
fn grouped_materialized_repartitioned_carry_serdes_and_names() {
let g = Grouped::with(StringSerde, I64Serde).with_name("g1");
check!(g.name.as_deref() == Some("g1"));
let m = Materialized::with(StringSerde, I64Serde).as_store("counts");
check!(m.store_name.as_deref() == Some("counts"));
check!(m.logging);
let r = Repartitioned::with(StringSerde, I64Serde)
.with_name("rp")
.num_partitions(4);
check!(r.name.as_deref() == Some("rp"));
check!(r.partitions == Some(4));
}
#[test]
fn materialized_caching_defaults_on_and_toggles() {
let m = Materialized::with(StringSerde, I64Serde);
check!(m.caching_enabled()); let m = m.with_caching(false);
check!(!m.caching_enabled());
}
#[test]
fn joined_carries_grace_and_name() {
let j = Joined::with_grace_period(5_000).as_named("jb");
check!(j.grace_ms == Some(5_000));
check!(j.name.as_deref() == Some("jb"));
check!(Joined::default().grace_ms == None);
}
#[test]
fn materialized_as_versioned_sets_config() {
let m = Materialized::with(StringSerde, I64Serde).as_versioned("vstore", 600_000);
check!(m.store_name.as_deref() == Some("vstore"));
let vc = m.versioned.expect("versioned config");
check!(vc.history_retention_ms == 600_000);
}
}