1use std::collections::BTreeMap;
2
3use crate::{Error, Result, Symbol};
4
5use super::{
6 CatalogEvent, CatalogEventOp, CatalogRow, CatalogSnapshot, CatalogSnapshotRow, CatalogStore,
7 CatalogTableSpec, CatalogWritePolicy,
8};
9
10#[derive(Clone, Debug, PartialEq, Eq)]
16pub struct CatalogDelta {
17 pub from_epoch: u64,
19 pub to_epoch: u64,
21 pub table_specs: BTreeMap<Symbol, CatalogTableSpec>,
23 pub rows_changed: BTreeMap<Symbol, BTreeMap<Symbol, CatalogSnapshotRow>>,
25 pub rows_deleted: BTreeMap<Symbol, BTreeMap<Symbol, CatalogDeletedRow>>,
27 pub sequence_changes: BTreeMap<Symbol, CatalogSequenceChange>,
29}
30
31#[derive(Clone, Debug, PartialEq, Eq)]
33pub struct CatalogDeletedRow {
34 pub epoch: u64,
36 pub table: Symbol,
38 pub key: Symbol,
40}
41
42#[derive(Clone, Debug, PartialEq, Eq)]
44pub struct CatalogSequenceChange {
45 pub epoch: u64,
47 pub name: Symbol,
49 pub next: u64,
51}
52
53impl CatalogStore {
54 pub fn delta_since(&self, from_epoch: u64) -> Result<CatalogDelta> {
57 let to_epoch = self.epoch();
58 if from_epoch > to_epoch {
59 return Err(delta_error(
60 "delta source epoch is newer than catalog epoch",
61 ));
62 }
63
64 let snapshot = CatalogSnapshot::from_store(self);
65 let mut rows_changed = BTreeMap::new();
66 let mut rows_deleted = BTreeMap::new();
67 let mut sequence_changes = BTreeMap::new();
68
69 if from_epoch == 0 {
70 rows_changed = snapshot.rows.clone();
71 for (name, next) in &snapshot.sequences {
72 sequence_changes.insert(
73 name.clone(),
74 CatalogSequenceChange {
75 epoch: to_epoch,
76 name: name.clone(),
77 next: *next,
78 },
79 );
80 }
81 }
82
83 for event in self
84 .journal()
85 .iter()
86 .filter(|event| event.epoch > from_epoch)
87 {
88 match &event.op {
89 CatalogEventOp::PutRow { table, key } => {
90 remove_deleted_row(&mut rows_deleted, table, key);
91 if let Some(row) = snapshot.rows(table).and_then(|rows| rows.get(key)) {
92 rows_changed
93 .entry(table.clone())
94 .or_default()
95 .insert(key.clone(), row.clone());
96 }
97 }
98 CatalogEventOp::DeleteRow { table, key } => {
99 remove_changed_row(&mut rows_changed, table, key);
100 rows_deleted.entry(table.clone()).or_default().insert(
101 key.clone(),
102 CatalogDeletedRow {
103 epoch: event.epoch,
104 table: table.clone(),
105 key: key.clone(),
106 },
107 );
108 }
109 CatalogEventOp::Sequence { name, next } => {
110 sequence_changes.insert(
111 name.clone(),
112 CatalogSequenceChange {
113 epoch: event.epoch,
114 name: name.clone(),
115 next: *next,
116 },
117 );
118 }
119 }
120 }
121
122 Ok(CatalogDelta {
123 from_epoch,
124 to_epoch,
125 table_specs: snapshot.tables,
126 rows_changed,
127 rows_deleted,
128 sequence_changes,
129 })
130 }
131
132 pub fn apply_delta(&mut self, delta: CatalogDelta) -> Result<()> {
136 let mut next = self.clone();
137 next.apply_delta_in_place(delta)?;
138 *self = next;
139 Ok(())
140 }
141
142 fn apply_delta_in_place(&mut self, delta: CatalogDelta) -> Result<()> {
143 if self.overlay.is_some() {
144 return Err(delta_error(
145 "cannot apply catalog delta while overlay is active",
146 ));
147 }
148 if self.epoch != delta.from_epoch {
149 return Err(delta_error("catalog delta source epoch mismatch"));
150 }
151 if delta.to_epoch < delta.from_epoch {
152 return Err(delta_error(
153 "catalog delta target epoch precedes source epoch",
154 ));
155 }
156
157 for spec in delta.table_specs.values() {
158 self.install_or_validate_delta_table(spec)?;
159 }
160 self.validate_delta_rows(&delta)?;
161 self.validate_delta_deletes(&delta)?;
162 validate_delta_sequences(&delta)?;
163
164 for rows in delta.rows_changed.values() {
165 for row in rows.values() {
166 self.apply_delta_row(row.clone());
167 }
168 }
169 for rows in delta.rows_deleted.values() {
170 for deleted in rows.values() {
171 self.apply_delta_delete(deleted);
172 }
173 }
174 for change in delta.sequence_changes.values() {
175 self.sequences.insert(change.name.clone(), change.next);
176 self.journal.push(CatalogEvent {
177 epoch: change.epoch,
178 op: CatalogEventOp::Sequence {
179 name: change.name.clone(),
180 next: change.next,
181 },
182 });
183 }
184
185 self.epoch = delta.to_epoch;
186 if self.epoch == delta.to_epoch {
187 Ok(())
188 } else {
189 Err(delta_error("catalog delta target epoch was not reached"))
190 }
191 }
192
193 fn install_or_validate_delta_table(&mut self, spec: &CatalogTableSpec) -> Result<()> {
194 match self.tables.get(&spec.name) {
195 Some(existing) if existing == spec => Ok(()),
196 Some(_) => Err(Error::CatalogSchema {
197 table: spec.name.clone(),
198 message: "incompatible catalog table spec".to_owned(),
199 }),
200 None => {
201 self.tables.insert(spec.name.clone(), spec.clone());
202 Ok(())
203 }
204 }
205 }
206
207 fn validate_delta_rows(&self, delta: &CatalogDelta) -> Result<()> {
208 for (table, rows) in &delta.rows_changed {
209 let spec = self.table(table).ok_or_else(|| Error::CatalogSchema {
210 table: table.clone(),
211 message: "unknown catalog table".to_owned(),
212 })?;
213 for (key, row) in rows {
214 validate_row_key(table, key, row)?;
215 validate_change_epoch(row.epoch, delta, table)?;
216 validate_required_fields(spec, row)?;
217 if spec.policy == CatalogWritePolicy::Sealed && self.row(table, key).is_some() {
218 return Err(Error::CatalogConflict {
219 table: table.clone(),
220 key: key.clone(),
221 });
222 }
223 }
224 }
225 Ok(())
226 }
227
228 fn validate_delta_deletes(&self, delta: &CatalogDelta) -> Result<()> {
229 for (table, rows) in &delta.rows_deleted {
230 if self.table(table).is_none() {
231 return Err(Error::CatalogSchema {
232 table: table.clone(),
233 message: "unknown catalog table".to_owned(),
234 });
235 }
236 for (key, deleted) in rows {
237 if &deleted.table != table || &deleted.key != key {
238 return Err(Error::CatalogSchema {
239 table: table.clone(),
240 message: "deleted row key does not match row data".to_owned(),
241 });
242 }
243 validate_change_epoch(deleted.epoch, delta, table)?;
244 }
245 }
246 Ok(())
247 }
248
249 fn apply_delta_row(&mut self, snapshot_row: CatalogSnapshotRow) {
250 let mut row = CatalogRow::new(snapshot_row.table.clone(), snapshot_row.key.clone());
251 row.data = snapshot_row.data;
252 row.set_epoch(snapshot_row.epoch);
253 self.rows
254 .entry(row.table.clone())
255 .or_default()
256 .insert(row.key.clone(), row);
257 self.journal.push(CatalogEvent {
258 epoch: snapshot_row.epoch,
259 op: CatalogEventOp::PutRow {
260 table: snapshot_row.table,
261 key: snapshot_row.key,
262 },
263 });
264 }
265
266 fn apply_delta_delete(&mut self, deleted: &CatalogDeletedRow) {
267 if let Some(rows) = self.rows.get_mut(&deleted.table) {
268 rows.remove(&deleted.key);
269 if rows.is_empty() {
270 self.rows.remove(&deleted.table);
271 }
272 }
273 self.journal.push(CatalogEvent {
274 epoch: deleted.epoch,
275 op: CatalogEventOp::DeleteRow {
276 table: deleted.table.clone(),
277 key: deleted.key.clone(),
278 },
279 });
280 }
281}
282
283fn validate_row_key(table: &Symbol, key: &Symbol, row: &CatalogSnapshotRow) -> Result<()> {
284 if &row.table == table && &row.key == key {
285 Ok(())
286 } else {
287 Err(Error::CatalogSchema {
288 table: table.clone(),
289 message: "changed row key does not match row data".to_owned(),
290 })
291 }
292}
293
294fn validate_required_fields(spec: &CatalogTableSpec, row: &CatalogSnapshotRow) -> Result<()> {
295 for field in &spec.required_fields {
296 if !row.data.contains_key(field) {
297 return Err(Error::CatalogSchema {
298 table: row.table.clone(),
299 message: format!("missing required catalog field {field}"),
300 });
301 }
302 }
303 Ok(())
304}
305
306fn validate_delta_sequences(delta: &CatalogDelta) -> Result<()> {
307 for change in delta.sequence_changes.values() {
308 validate_change_epoch(change.epoch, delta, &change.name)?;
309 }
310 Ok(())
311}
312
313fn validate_change_epoch(epoch: u64, delta: &CatalogDelta, table: &Symbol) -> Result<()> {
314 if epoch <= delta.to_epoch && (epoch > delta.from_epoch || delta.from_epoch == 0) {
315 Ok(())
316 } else {
317 Err(Error::CatalogSchema {
318 table: table.clone(),
319 message: "catalog delta change epoch is outside delta bounds".to_owned(),
320 })
321 }
322}
323
324fn remove_changed_row(
325 rows_changed: &mut BTreeMap<Symbol, BTreeMap<Symbol, CatalogSnapshotRow>>,
326 table: &Symbol,
327 key: &Symbol,
328) {
329 if let Some(rows) = rows_changed.get_mut(table) {
330 rows.remove(key);
331 if rows.is_empty() {
332 rows_changed.remove(table);
333 }
334 }
335}
336
337fn remove_deleted_row(
338 rows_deleted: &mut BTreeMap<Symbol, BTreeMap<Symbol, CatalogDeletedRow>>,
339 table: &Symbol,
340 key: &Symbol,
341) {
342 if let Some(rows) = rows_deleted.get_mut(table) {
343 rows.remove(key);
344 if rows.is_empty() {
345 rows_deleted.remove(table);
346 }
347 }
348}
349
350fn delta_error(message: &'static str) -> Error {
351 Error::CatalogSchema {
352 table: Symbol::qualified("catalog", "delta"),
353 message: message.to_owned(),
354 }
355}