use reifydb_core::{
encoded::{key::EncodedKey, row::EncodedRow},
interface::cdc::SystemChange,
key::{Key, kind::KeyKind},
};
use reifydb_transaction::transaction::Transaction;
use crate::{Result, catalog::Catalog, error::CatalogChangeError};
mod binding;
mod column;
mod config;
mod dictionary;
mod flow;
mod granted_role;
mod handler;
mod identity;
mod migration;
mod namespace;
mod passthrough;
mod policy;
mod primary_key;
mod procedure;
mod procedure_param;
mod retention;
mod ringbuffer;
mod row_shape;
mod series;
mod sink;
mod source;
mod sumtype;
mod table;
mod ttl;
mod view;
mod role;
use binding::BindingApplier;
use column::ColumnApplier;
use config::ConfigApplier;
use dictionary::DictionaryApplier;
use flow::FlowApplier;
use granted_role::GrantedRoleApplier;
use handler::HandlerApplier;
use identity::IdentityApplier;
use migration::{MigrationApplier, MigrationEventApplier};
use namespace::NamespaceApplier;
use passthrough::PassthroughApplier;
use policy::PolicyApplier;
use primary_key::PrimaryKeyApplier;
use procedure::ProcedureApplier;
use procedure_param::ProcedureParamApplier;
use retention::{OperatorRetentionStrategyApplier, ShapeRetentionStrategyApplier};
use ringbuffer::RingBufferApplier;
use role::RoleApplier;
use row_shape::{RowShapeFieldApplier, RowShapeHeaderApplier};
use series::SeriesApplier;
use sink::SinkApplier;
use source::SourceApplier;
use sumtype::SumTypeApplier;
use table::TableApplier;
use ttl::RowTtlApplier;
use view::ViewApplier;
pub trait CatalogChangeApplier {
fn set(catalog: &Catalog, txn: &mut Transaction<'_>, key: &EncodedKey, row: &EncodedRow) -> Result<()>;
fn remove(catalog: &Catalog, txn: &mut Transaction<'_>, key: &EncodedKey) -> Result<()>;
}
pub fn apply_system_change(catalog: &Catalog, txn: &mut Transaction<'_>, change: &SystemChange) -> Result<()> {
let kind = match Key::kind(change.key()) {
Some(k) => k,
None => {
return Err(CatalogChangeError::UnrecognizedKey {
raw: change.key().as_ref().to_vec(),
}
.into());
}
};
match kind {
KeyKind::Binding => dispatch::<BindingApplier>(catalog, txn, change),
KeyKind::Namespace => dispatch::<NamespaceApplier>(catalog, txn, change),
KeyKind::Table => dispatch::<TableApplier>(catalog, txn, change),
KeyKind::View => dispatch::<ViewApplier>(catalog, txn, change),
KeyKind::PrimaryKey => dispatch::<PrimaryKeyApplier>(catalog, txn, change),
KeyKind::Flow => dispatch::<FlowApplier>(catalog, txn, change),
KeyKind::Handler => dispatch::<HandlerApplier>(catalog, txn, change),
KeyKind::Dictionary => dispatch::<DictionaryApplier>(catalog, txn, change),
KeyKind::SumType => dispatch::<SumTypeApplier>(catalog, txn, change),
KeyKind::RingBuffer => dispatch::<RingBufferApplier>(catalog, txn, change),
KeyKind::Identity => dispatch::<IdentityApplier>(catalog, txn, change),
KeyKind::Role => dispatch::<RoleApplier>(catalog, txn, change),
KeyKind::GrantedRole => dispatch::<GrantedRoleApplier>(catalog, txn, change),
KeyKind::Policy => dispatch::<PolicyApplier>(catalog, txn, change),
KeyKind::Source => dispatch::<SourceApplier>(catalog, txn, change),
KeyKind::Sink => dispatch::<SinkApplier>(catalog, txn, change),
KeyKind::Migration => dispatch::<MigrationApplier>(catalog, txn, change),
KeyKind::MigrationEvent => dispatch::<MigrationEventApplier>(catalog, txn, change),
KeyKind::ConfigStorage => dispatch::<ConfigApplier>(catalog, txn, change),
KeyKind::Series => dispatch::<SeriesApplier>(catalog, txn, change),
KeyKind::ShapeRetentionStrategy => dispatch::<ShapeRetentionStrategyApplier>(catalog, txn, change),
KeyKind::OperatorRetentionStrategy => {
dispatch::<OperatorRetentionStrategyApplier>(catalog, txn, change)
}
KeyKind::RowTtl => dispatch::<RowTtlApplier>(catalog, txn, change),
KeyKind::Shape => dispatch::<RowShapeHeaderApplier>(catalog, txn, change),
KeyKind::RowShapeField => dispatch::<RowShapeFieldApplier>(catalog, txn, change),
KeyKind::Procedure => dispatch::<ProcedureApplier>(catalog, txn, change),
KeyKind::ProcedureParam => dispatch::<ProcedureParamApplier>(catalog, txn, change),
KeyKind::Column | KeyKind::Columns => dispatch::<ColumnApplier>(catalog, txn, change),
KeyKind::NamespaceTable
| KeyKind::NamespaceView
| KeyKind::NamespaceFlow
| KeyKind::NamespaceRingBuffer
| KeyKind::NamespaceDictionary
| KeyKind::NamespaceSumType
| KeyKind::NamespaceHandler
| KeyKind::NamespaceBinding
| KeyKind::NamespaceProcedure
| KeyKind::NamespaceSource
| KeyKind::NamespaceSink
| KeyKind::NamespaceSeries
| KeyKind::VariantHandler
| KeyKind::PolicyOp => dispatch::<PassthroughApplier>(catalog, txn, change),
_ => dispatch::<PassthroughApplier>(catalog, txn, change),
}
}
fn dispatch<T: CatalogChangeApplier>(
catalog: &Catalog,
txn: &mut Transaction<'_>,
change: &SystemChange,
) -> Result<()> {
match change {
SystemChange::Insert {
key,
post,
} => T::set(catalog, txn, key, post),
SystemChange::Update {
key,
post,
..
} => T::set(catalog, txn, key, post),
SystemChange::Delete {
key,
..
} => T::remove(catalog, txn, key),
}
}