1use std::collections::BTreeSet;
2
3use crate::{Error, Result, Symbol};
4
5use super::{
6 CatalogEvent, CatalogEventOp, CatalogRow, CatalogStore, CatalogTableSpec, CatalogWritePolicy,
7};
8
9#[derive(Clone, Debug, Default, PartialEq, Eq)]
22pub struct CatalogTx {
23 ops: Vec<CatalogOp>,
24}
25
26impl CatalogTx {
27 pub fn new() -> Self {
29 Self::default()
30 }
31
32 pub fn push(&mut self, op: CatalogOp) {
34 self.ops.push(op);
35 }
36
37 pub fn put_row(&mut self, row: CatalogRow) {
39 self.push(CatalogOp::PutRow(row));
40 }
41
42 pub fn delete_row(&mut self, table: Symbol, key: Symbol) {
44 self.push(CatalogOp::DeleteRow { table, key });
45 }
46
47 pub fn bump_sequence(&mut self, name: Symbol, reserved: u64) {
49 self.push(CatalogOp::BumpSequence { name, reserved });
50 }
51
52 pub fn ops(&self) -> &[CatalogOp] {
54 &self.ops
55 }
56
57 pub fn validate(&self, store: &CatalogStore) -> Result<()> {
60 let mut touched_rows = BTreeSet::new();
61 for op in &self.ops {
62 match op {
63 CatalogOp::PutRow(row) => {
64 validate_unique_row_op(&mut touched_rows, &row.table, &row.key)?;
65 let spec = table_spec(store, &row.table)?;
66 validate_put(store, spec, row)?;
67 }
68 CatalogOp::DeleteRow { table, key } => {
69 validate_unique_row_op(&mut touched_rows, table, key)?;
70 let spec = table_spec(store, table)?;
71 validate_delete(spec)?;
72 }
73 CatalogOp::BumpSequence { .. } => {}
74 }
75 }
76 Ok(())
77 }
78
79 pub fn commit(self, store: &mut CatalogStore) -> Result<u64> {
82 self.validate(store)?;
83 let epoch = store.bump_epoch();
84 for op in self.ops {
85 match op {
86 CatalogOp::PutRow(mut row) => {
87 row.set_epoch(epoch);
88 let table = row.table.clone();
89 let key = row.key.clone();
90 store.put_row(row);
91 store.push_event(CatalogEvent {
92 epoch,
93 op: CatalogEventOp::PutRow { table, key },
94 });
95 }
96 CatalogOp::DeleteRow { table, key } => {
97 store.delete_row(&table, &key);
98 store.push_event(CatalogEvent {
99 epoch,
100 op: CatalogEventOp::DeleteRow { table, key },
101 });
102 }
103 CatalogOp::BumpSequence { name, reserved } => {
104 let next = store.bump_sequence(name.clone(), reserved);
105 store.push_event(CatalogEvent {
106 epoch,
107 op: CatalogEventOp::Sequence { name, next },
108 });
109 }
110 }
111 }
112 Ok(epoch)
113 }
114}
115
116#[derive(Clone, Debug, PartialEq, Eq)]
118pub enum CatalogOp {
119 PutRow(CatalogRow),
121 DeleteRow {
123 table: Symbol,
125 key: Symbol,
127 },
128 BumpSequence {
130 name: Symbol,
132 reserved: u64,
134 },
135}
136
137fn validate_unique_row_op(
138 touched_rows: &mut BTreeSet<(Symbol, Symbol)>,
139 table: &Symbol,
140 key: &Symbol,
141) -> Result<()> {
142 if touched_rows.insert((table.clone(), key.clone())) {
143 Ok(())
144 } else {
145 Err(Error::CatalogConflict {
146 table: table.clone(),
147 key: key.clone(),
148 })
149 }
150}
151
152fn table_spec<'a>(store: &'a CatalogStore, table: &Symbol) -> Result<&'a CatalogTableSpec> {
153 store.table(table).ok_or_else(|| Error::CatalogSchema {
154 table: table.clone(),
155 message: "unknown catalog table".to_owned(),
156 })
157}
158
159fn validate_put(store: &CatalogStore, spec: &CatalogTableSpec, row: &CatalogRow) -> Result<()> {
160 match spec.policy {
161 CatalogWritePolicy::Derived => {
162 return Err(Error::CatalogReadOnly {
163 table: spec.name.clone(),
164 });
165 }
166 CatalogWritePolicy::Sealed | CatalogWritePolicy::AppendOnly
167 if store.row(&row.table, &row.key).is_some() =>
168 {
169 return Err(Error::CatalogConflict {
170 table: row.table.clone(),
171 key: row.key.clone(),
172 });
173 }
174 CatalogWritePolicy::Mutable
175 | CatalogWritePolicy::Sealed
176 | CatalogWritePolicy::AppendOnly => {}
177 }
178
179 for field in &spec.required_fields {
180 if !row.has_field(field) {
181 return Err(Error::CatalogSchema {
182 table: row.table.clone(),
183 message: format!("missing required catalog field {field}"),
184 });
185 }
186 }
187 Ok(())
188}
189
190fn validate_delete(spec: &CatalogTableSpec) -> Result<()> {
191 match spec.policy {
192 CatalogWritePolicy::Mutable => Ok(()),
193 CatalogWritePolicy::Sealed
194 | CatalogWritePolicy::AppendOnly
195 | CatalogWritePolicy::Derived => Err(Error::CatalogReadOnly {
196 table: spec.name.clone(),
197 }),
198 }
199}