/// Per-operation switches: one flag each for the `"INSERT"`, `"UPDATE"` and
/// `"DELETE"` operation names.
///
/// [`OpFilter::matches`] returns the flag that corresponds to the operation
/// name it is given; [`OpFilter::all`] (also the `Default`) sets every flag.
#[derive(Debug, Clone, zerompk::ToMessagePack, zerompk::FromMessagePack)]
pub struct OpFilter {
/// Flag consulted for `"INSERT"` operations.
pub insert: bool,
/// Flag consulted for `"UPDATE"` operations.
pub update: bool,
/// Flag consulted for `"DELETE"` operations.
pub delete: bool,
}
impl OpFilter {
pub fn all() -> Self {
Self {
insert: true,
update: true,
delete: true,
}
}
pub fn matches(&self, op: &str) -> bool {
match op {
"INSERT" => self.insert,
"UPDATE" => self.update,
"DELETE" => self.delete,
_ => false,
}
}
pub fn display(&self) -> String {
let mut parts = Vec::new();
if self.insert {
parts.push("INSERT");
}
if self.update {
parts.push("UPDATE");
}
if self.delete {
parts.push("DELETE");
}
parts.join(",")
}
}
impl Default for OpFilter {
fn default() -> Self {
Self::all()
}
}
/// Format selector stored in `ChangeStreamDef::format`.
///
/// A fieldless enum with explicit `u8` discriminants; `Json` is the `Default`.
/// The textual names are provided by `StreamFormat::as_str` and parsed by
/// `StreamFormat::from_str_opt`.
///
/// NOTE(review): `#[msgpack(c_enum)]` presumably makes zerompk encode the
/// value by its discriminant, in which case the numbers below must not be
/// renumbered — confirm against the zerompk documentation.
#[derive(
Debug, Clone, Copy, PartialEq, Eq, Default, zerompk::ToMessagePack, zerompk::FromMessagePack,
)]
#[repr(u8)]
#[msgpack(c_enum)]
pub enum StreamFormat {
/// JSON (textual name `"json"`); the default variant.
#[default]
Json = 0,
/// MessagePack (textual name `"msgpack"`).
Msgpack = 1,
}
impl StreamFormat {
pub fn as_str(self) -> &'static str {
match self {
Self::Json => "json",
Self::Msgpack => "msgpack",
}
}
pub fn from_str_opt(s: &str) -> Option<Self> {
match s.to_lowercase().as_str() {
"json" => Some(Self::Json),
"msgpack" | "messagepack" => Some(Self::Msgpack),
_ => None,
}
}
}
/// Retention limits attached to a change stream (`ChangeStreamDef::retention`).
///
/// The `Default` impl in this file uses 1_000_000 events and 86_400 seconds.
/// NOTE(review): how and where these limits are enforced is not visible in
/// this file.
#[derive(Debug, Clone, zerompk::ToMessagePack, zerompk::FromMessagePack)]
pub struct RetentionConfig {
/// Event-count limit — presumably the maximum number of retained events;
/// confirm against the code that enforces it.
pub max_events: u64,
/// Age limit in seconds (per the `_secs` suffix) — presumably the maximum
/// age of a retained event; confirm against the code that enforces it.
pub max_age_secs: u64,
}
impl Default for RetentionConfig {
fn default() -> Self {
Self {
max_events: 1_000_000,
max_age_secs: 86_400,
}
}
}
/// Late-data policy stored in `ChangeStreamDef::late_data`.
///
/// A fieldless enum with explicit `u8` discriminants; `Allow` is the
/// `Default`. Textual names (`"ALLOW"`, `"DROP"`, `"RECOMPUTE"`) come from
/// `LateDataPolicy::as_str`.
///
/// NOTE(review): what each policy does to late events is implemented outside
/// this file; the per-variant notes below only restate the names.
/// `#[msgpack(c_enum)]` presumably encodes the value by discriminant, so the
/// numbers should stay stable — confirm against the zerompk documentation.
#[derive(
Debug, Clone, Copy, PartialEq, Eq, Default, zerompk::ToMessagePack, zerompk::FromMessagePack,
)]
#[repr(u8)]
#[msgpack(c_enum)]
pub enum LateDataPolicy {
/// Textual name `"ALLOW"`; the default variant.
#[default]
Allow = 0,
/// Textual name `"DROP"`.
Drop = 1,
/// Textual name `"RECOMPUTE"`.
Recompute = 2,
}
impl LateDataPolicy {
    /// Returns the canonical upper-case name of the policy.
    pub fn as_str(self) -> &'static str {
        match self {
            Self::Allow => "ALLOW",
            Self::Drop => "DROP",
            Self::Recompute => "RECOMPUTE",
        }
    }

    /// Parses a policy name, ignoring letter case.
    ///
    /// Returns the variant whose [`as_str`](Self::as_str) name equals `s`
    /// case-insensitively, or `None` when no variant matches.
    pub fn from_str_opt(s: &str) -> Option<Self> {
        // Every policy name is plain ASCII, so an ASCII case-insensitive
        // comparison is enough and avoids allocating an upper-cased copy of
        // the input. Driving the lookup from `as_str` keeps the two
        // directions of the mapping in one place.
        [Self::Allow, Self::Drop, Self::Recompute]
            .iter()
            .copied()
            .find(|policy| s.eq_ignore_ascii_case(policy.as_str()))
    }
}
/// Compaction settings stored in `ChangeStreamDef::compaction`.
///
/// The derived `Default` yields `enabled = false`, an empty `key_field` and a
/// grace period of `0`; `CompactionConfig::key` builds an enabled
/// configuration instead.
/// NOTE(review): how compaction uses these values is implemented outside this
/// file.
#[derive(Debug, Clone, Default, zerompk::ToMessagePack, zerompk::FromMessagePack)]
pub struct CompactionConfig {
/// Whether compaction is switched on.
pub enabled: bool,
/// Name of the field used as the compaction key — presumably a field of the
/// event payload; confirm against the compaction code.
pub key_field: String,
/// Grace period in seconds (per the `_secs` suffix) — presumably how long
/// tombstones are kept before removal; confirm against the compaction code.
pub tombstone_grace_secs: u64,
}
impl CompactionConfig {
pub fn key(field: impl Into<String>) -> Self {
Self {
enabled: true,
key_field: field.into(),
tombstone_grace_secs: 86_400,
}
}
}
/// Definition of a change stream: which collection it watches plus its
/// filtering, format, retention and delivery settings.
///
/// NOTE(review): `#[msgpack(map, allow_unknown_fields)]` presumably encodes
/// the struct as a map keyed by field name and ignores unrecognised keys on
/// decode, and `#[msgpack(default)]` presumably falls back to the field
/// type's `Default` when the key is absent — confirm against the zerompk
/// documentation before relying on it for schema evolution.
#[derive(Debug, Clone, zerompk::ToMessagePack, zerompk::FromMessagePack)]
#[msgpack(map, allow_unknown_fields)]
pub struct ChangeStreamDef {
/// Identifier of the tenant the stream belongs to.
pub tenant_id: u64,
/// Name of the stream.
pub name: String,
/// Collection the stream watches; the literal `"*"` is treated as a
/// wildcard by `is_wildcard` / `matches_collection`.
pub collection: String,
/// Per-operation (`INSERT` / `UPDATE` / `DELETE`) switches.
pub op_filter: OpFilter,
/// Format selector for the stream.
pub format: StreamFormat,
/// Retention limits.
pub retention: RetentionConfig,
/// Compaction settings.
#[msgpack(default)]
pub compaction: CompactionConfig,
/// Webhook configuration (type defined in `crate::event::webhook`).
#[msgpack(default)]
pub webhook: crate::event::webhook::WebhookConfig,
/// Late-data policy.
#[msgpack(default)]
pub late_data: LateDataPolicy,
/// Kafka delivery configuration (type defined in `crate::event::kafka`).
#[msgpack(default)]
pub kafka: crate::event::kafka::KafkaDeliveryConfig,
/// Owner of the stream (the tests in this file use `"admin"`).
pub owner: String,
/// Creation timestamp — NOTE(review): unit and epoch are not established
/// in this file; confirm against the code that sets it.
pub created_at: u64,
}
impl ChangeStreamDef {
    /// Returns `true` when the stream's collection is the literal `"*"`.
    pub fn is_wildcard(&self) -> bool {
        matches!(self.collection.as_str(), "*")
    }

    /// Returns `true` when this stream applies to `collection`: either the
    /// stream is a wildcard or its collection name equals `collection` exactly.
    pub fn matches_collection(&self, collection: &str) -> bool {
        if self.is_wildcard() {
            return true;
        }
        self.collection.as_str() == collection
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a stream definition for tenant 1 owned by `"admin"`, with all
    /// operations enabled, JSON format and default settings everywhere else.
    fn stream_def(name: &str, collection: &str) -> ChangeStreamDef {
        ChangeStreamDef {
            tenant_id: 1,
            name: name.to_owned(),
            collection: collection.to_owned(),
            op_filter: OpFilter::all(),
            format: StreamFormat::Json,
            retention: RetentionConfig::default(),
            compaction: CompactionConfig::default(),
            webhook: crate::event::webhook::WebhookConfig::default(),
            late_data: LateDataPolicy::default(),
            kafka: crate::event::kafka::KafkaDeliveryConfig::default(),
            owner: "admin".to_owned(),
            created_at: 0,
        }
    }

    /// A filter with `update` disabled lets INSERT and DELETE through only.
    #[test]
    fn op_filter_matches() {
        let filter = OpFilter {
            insert: true,
            update: false,
            delete: true,
        };
        let expectations = [("INSERT", true), ("UPDATE", false), ("DELETE", true)];
        for (op, expected) in expectations.iter() {
            assert_eq!(filter.matches(op), *expected);
        }
    }

    /// A `"*"` stream is a wildcard and applies to any collection.
    #[test]
    fn wildcard_matches_all() {
        let def = stream_def("all", "*");
        assert!(def.matches_collection("orders"));
        assert!(def.matches_collection("users"));
        assert!(def.is_wildcard());
    }

    /// A named stream applies only to its own collection.
    #[test]
    fn specific_collection_filter() {
        let def = stream_def("orders_stream", "orders");
        assert!(def.matches_collection("orders"));
        assert!(!def.matches_collection("users"));
        assert!(!def.is_wildcard());
    }
}