Skip to main content

sim_kernel/catalog/
tx.rs

1use std::collections::BTreeSet;
2
3use crate::{Error, Result, Symbol};
4
5use super::{
6    CatalogEvent, CatalogEventOp, CatalogRow, CatalogStore, CatalogTableSpec, CatalogWritePolicy,
7};
8
9/// An atomic catalog transaction: an ordered batch of [`CatalogOp`]s applied
10/// together against a [`CatalogStore`].
11///
12/// # Examples
13///
14/// ```
15/// # use sim_kernel::catalog::{CatalogRow, CatalogTx};
16/// # use sim_kernel::Symbol;
17/// let mut tx = CatalogTx::new();
18/// tx.put_row(CatalogRow::new(Symbol::new("registry/libs"), Symbol::new("demo")));
19/// assert_eq!(tx.ops().len(), 1);
20/// ```
21#[derive(Clone, Debug, Default, PartialEq, Eq)]
22pub struct CatalogTx {
23    ops: Vec<CatalogOp>,
24}
25
26impl CatalogTx {
27    /// Creates an empty transaction.
28    pub fn new() -> Self {
29        Self::default()
30    }
31
32    /// Appends one operation to the transaction.
33    pub fn push(&mut self, op: CatalogOp) {
34        self.ops.push(op);
35    }
36
37    /// Queues a row insert or replace.
38    pub fn put_row(&mut self, row: CatalogRow) {
39        self.push(CatalogOp::PutRow(row));
40    }
41
42    /// Queues deletion of the row at `table`/`key`.
43    pub fn delete_row(&mut self, table: Symbol, key: Symbol) {
44        self.push(CatalogOp::DeleteRow { table, key });
45    }
46
47    /// Queues reserving `reserved` ids from the named sequence.
48    pub fn bump_sequence(&mut self, name: Symbol, reserved: u64) {
49        self.push(CatalogOp::BumpSequence { name, reserved });
50    }
51
52    /// Returns the queued operations in order.
53    pub fn ops(&self) -> &[CatalogOp] {
54        &self.ops
55    }
56
57    /// Checks the transaction against `store` without mutating it, enforcing
58    /// table existence, write policies, required fields, and per-row uniqueness.
59    pub fn validate(&self, store: &CatalogStore) -> Result<()> {
60        let mut touched_rows = BTreeSet::new();
61        for op in &self.ops {
62            match op {
63                CatalogOp::PutRow(row) => {
64                    validate_unique_row_op(&mut touched_rows, &row.table, &row.key)?;
65                    let spec = table_spec(store, &row.table)?;
66                    validate_put(store, spec, row)?;
67                }
68                CatalogOp::DeleteRow { table, key } => {
69                    validate_unique_row_op(&mut touched_rows, table, key)?;
70                    let spec = table_spec(store, table)?;
71                    validate_delete(spec)?;
72                }
73                CatalogOp::BumpSequence { .. } => {}
74            }
75        }
76        Ok(())
77    }
78
79    /// Validates and applies the transaction, returning the new catalog epoch
80    /// and recording an audit event per operation.
81    pub fn commit(self, store: &mut CatalogStore) -> Result<u64> {
82        self.validate(store)?;
83        let epoch = store.bump_epoch();
84        for op in self.ops {
85            match op {
86                CatalogOp::PutRow(mut row) => {
87                    row.set_epoch(epoch);
88                    let table = row.table.clone();
89                    let key = row.key.clone();
90                    store.put_row(row);
91                    store.push_event(CatalogEvent {
92                        epoch,
93                        op: CatalogEventOp::PutRow { table, key },
94                    });
95                }
96                CatalogOp::DeleteRow { table, key } => {
97                    store.delete_row(&table, &key);
98                    store.push_event(CatalogEvent {
99                        epoch,
100                        op: CatalogEventOp::DeleteRow { table, key },
101                    });
102                }
103                CatalogOp::BumpSequence { name, reserved } => {
104                    let next = store.bump_sequence(name.clone(), reserved);
105                    store.push_event(CatalogEvent {
106                        epoch,
107                        op: CatalogEventOp::Sequence { name, next },
108                    });
109                }
110            }
111        }
112        Ok(epoch)
113    }
114}
115
116/// A single operation within a [`CatalogTx`].
117#[derive(Clone, Debug, PartialEq, Eq)]
118pub enum CatalogOp {
119    /// Insert or replace a row.
120    PutRow(CatalogRow),
121    /// Delete the row at the given table and key.
122    DeleteRow {
123        /// Table holding the row.
124        table: Symbol,
125        /// Key of the row to delete.
126        key: Symbol,
127    },
128    /// Reserve ids from a named sequence.
129    BumpSequence {
130        /// Sequence to advance.
131        name: Symbol,
132        /// Number of ids to reserve.
133        reserved: u64,
134    },
135}
136
137fn validate_unique_row_op(
138    touched_rows: &mut BTreeSet<(Symbol, Symbol)>,
139    table: &Symbol,
140    key: &Symbol,
141) -> Result<()> {
142    if touched_rows.insert((table.clone(), key.clone())) {
143        Ok(())
144    } else {
145        Err(Error::CatalogConflict {
146            table: table.clone(),
147            key: key.clone(),
148        })
149    }
150}
151
152fn table_spec<'a>(store: &'a CatalogStore, table: &Symbol) -> Result<&'a CatalogTableSpec> {
153    store.table(table).ok_or_else(|| Error::CatalogSchema {
154        table: table.clone(),
155        message: "unknown catalog table".to_owned(),
156    })
157}
158
159fn validate_put(store: &CatalogStore, spec: &CatalogTableSpec, row: &CatalogRow) -> Result<()> {
160    match spec.policy {
161        CatalogWritePolicy::Derived => {
162            return Err(Error::CatalogReadOnly {
163                table: spec.name.clone(),
164            });
165        }
166        CatalogWritePolicy::Sealed | CatalogWritePolicy::AppendOnly
167            if store.row(&row.table, &row.key).is_some() =>
168        {
169            return Err(Error::CatalogConflict {
170                table: row.table.clone(),
171                key: row.key.clone(),
172            });
173        }
174        CatalogWritePolicy::Mutable
175        | CatalogWritePolicy::Sealed
176        | CatalogWritePolicy::AppendOnly => {}
177    }
178
179    for field in &spec.required_fields {
180        if !row.has_field(field) {
181            return Err(Error::CatalogSchema {
182                table: row.table.clone(),
183                message: format!("missing required catalog field {field}"),
184            });
185        }
186    }
187    Ok(())
188}
189
190fn validate_delete(spec: &CatalogTableSpec) -> Result<()> {
191    match spec.policy {
192        CatalogWritePolicy::Mutable => Ok(()),
193        CatalogWritePolicy::Sealed
194        | CatalogWritePolicy::AppendOnly
195        | CatalogWritePolicy::Derived => Err(Error::CatalogReadOnly {
196            table: spec.name.clone(),
197        }),
198    }
199}