1#![forbid(unsafe_code)]
8
9use crate::catalog::TableCatalog;
10use crate::constraints::{
11 ConstraintId, ConstraintKind, ConstraintRecord, ConstraintState, ForeignKeyAction,
12 ForeignKeyConstraint, PrimaryKeyConstraint, UniqueConstraint,
13};
14use crate::constraints::{ForeignKeyTableInfo, ValidatedForeignKey, validate_foreign_keys};
15use crate::reserved;
16use crate::resolvers::resolve_table_name;
17use crate::sys_catalog::{ConstraintNameRecord, SysCatalog};
18use crate::sys_catalog::{MultiColumnIndexEntryMeta, SingleColumnIndexEntryMeta};
19use crate::table::Table;
20use crate::types::{FieldId, TableColumn, TableId};
21use crate::view::{ForeignKeyView, TableView};
22use crate::{ColMeta, TableMeta, TableMultiColumnIndexMeta};
23use arrow::datatypes::DataType;
24use llkv_column_map::ColumnStore;
25use llkv_column_map::store::IndexKind;
26use llkv_column_map::types::LogicalFieldId;
27use llkv_plan::ForeignKeySpec;
28use llkv_result::{Error, Result as LlkvResult};
29use llkv_storage::pager::Pager;
30use rustc_hash::{FxHashMap, FxHashSet};
31use simd_r_drive_entry_handle::EntryHandle;
32use std::sync::{Arc, RwLock};
33
34#[derive(Clone, Debug, Default, PartialEq, Eq)]
35pub struct SingleColumnIndexEntry {
36 pub index_name: String,
37 pub canonical_name: String,
38 pub column_id: FieldId,
39 pub column_name: String,
40 pub unique: bool,
41 pub ascending: bool,
42 pub nulls_first: bool,
43}
44
45#[derive(Clone, Debug, Default)]
46struct TableSnapshot {
47 table_meta: Option<TableMeta>,
48 column_metas: FxHashMap<FieldId, ColMeta>,
49 constraints: FxHashMap<ConstraintId, ConstraintRecord>,
50 constraint_names: FxHashMap<ConstraintId, String>,
51 single_indexes: FxHashMap<String, SingleColumnIndexEntry>,
52 multi_column_indexes: FxHashMap<String, MultiColumnIndexEntryMeta>,
53 sort_indexes: FxHashSet<FieldId>,
54}
55
56impl TableSnapshot {
57 fn new(
58 table_meta: Option<TableMeta>,
59 column_metas: FxHashMap<FieldId, ColMeta>,
60 constraints: FxHashMap<ConstraintId, ConstraintRecord>,
61 constraint_names: FxHashMap<ConstraintId, String>,
62 single_indexes: FxHashMap<String, SingleColumnIndexEntry>,
63 multi_column_indexes: FxHashMap<String, MultiColumnIndexEntryMeta>,
64 sort_indexes: FxHashSet<FieldId>,
65 ) -> Self {
66 Self {
67 table_meta,
68 column_metas,
69 constraints,
70 constraint_names,
71 single_indexes,
72 multi_column_indexes,
73 sort_indexes,
74 }
75 }
76}
77
78#[derive(Clone, Debug)]
79struct TableState {
80 current: TableSnapshot,
81 persisted: TableSnapshot,
82}
83
84impl TableState {
85 fn from_snapshot(snapshot: TableSnapshot) -> Self {
86 Self {
87 current: snapshot.clone(),
88 persisted: snapshot,
89 }
90 }
91}
92
93#[derive(Default)]
94struct ReferencingIndex {
95 parent_to_children: FxHashMap<TableId, FxHashSet<(TableId, ConstraintId)>>,
96 child_to_parents: FxHashMap<TableId, FxHashSet<TableId>>,
97 initialized: bool,
98}
99
100impl ReferencingIndex {
101 fn remove_child(&mut self, child_id: TableId) {
102 if let Some(parents) = self.child_to_parents.remove(&child_id) {
103 for parent_id in parents {
104 if let Some(children) = self.parent_to_children.get_mut(&parent_id) {
105 children.retain(|(entry_child, _)| *entry_child != child_id);
106 if children.is_empty() {
107 self.parent_to_children.remove(&parent_id);
108 }
109 }
110 }
111 }
112 }
113
114 fn insert(&mut self, parent_id: TableId, child_id: TableId, constraint_id: ConstraintId) {
115 self.parent_to_children
116 .entry(parent_id)
117 .or_default()
118 .insert((child_id, constraint_id));
119 self.child_to_parents
120 .entry(child_id)
121 .or_default()
122 .insert(parent_id);
123 self.initialized = true;
124 }
125
126 fn children(&self, parent_id: TableId) -> Vec<(TableId, ConstraintId)> {
127 self.parent_to_children
128 .get(&parent_id)
129 .map(|set| set.iter().cloned().collect())
130 .unwrap_or_default()
131 }
132
133 fn mark_initialized(&mut self) {
134 self.initialized = true;
135 }
136
137 fn is_initialized(&self) -> bool {
138 self.initialized
139 }
140}
141
142pub struct MetadataManager<P>
144where
145 P: Pager<Blob = EntryHandle> + Send + Sync,
146{
147 store: Arc<ColumnStore<P>>,
148 tables: RwLock<FxHashMap<TableId, TableState>>,
149 referencing_index: RwLock<ReferencingIndex>,
150}
151
152impl<P> MetadataManager<P>
153where
154 P: Pager<Blob = EntryHandle> + Send + Sync,
155{
156 pub fn new(store: Arc<ColumnStore<P>>) -> Self {
158 Self {
159 store,
160 tables: RwLock::new(FxHashMap::default()),
161 referencing_index: RwLock::new(ReferencingIndex::default()),
162 }
163 }
164
165 fn ensure_table_state(&self, table_id: TableId) -> LlkvResult<()> {
167 if self.tables.read().unwrap().contains_key(&table_id) {
168 return Ok(());
169 }
170 let state = self.load_table_state(table_id)?;
171 {
172 let mut tables = self.tables.write().unwrap();
173 tables.entry(table_id).or_insert(state);
174 }
175 self.refresh_referencing_index_for_table(table_id);
176 Ok(())
177 }
178
179 fn load_table_state(&self, table_id: TableId) -> LlkvResult<TableState> {
180 let catalog = SysCatalog::new(&self.store);
181 let table_meta = catalog.get_table_meta(table_id);
182 let constraint_records = catalog.constraint_records_for_table(table_id)?;
183 let constraint_ids: Vec<ConstraintId> = constraint_records
184 .iter()
185 .map(|record| record.constraint_id)
186 .collect();
187 let constraint_name_entries = if constraint_ids.is_empty() {
188 Vec::new()
189 } else {
190 catalog.get_constraint_names(table_id, &constraint_ids)?
191 };
192 let multi_uniques = catalog.get_multi_column_indexes(table_id)?;
193 let single_index_metas = catalog.get_single_column_indexes(table_id)?;
194 let multi_column_index_metas = catalog.get_multi_column_indexes(table_id)?;
195 let mut constraints = FxHashMap::default();
196 let mut constraint_names = FxHashMap::default();
197 let mut single_indexes = FxHashMap::default();
198 let mut multi_column_indexes = FxHashMap::default();
199 let mut sort_indexes = FxHashSet::default();
200 for meta in single_index_metas {
201 sort_indexes.insert(meta.column_id);
202 single_indexes.insert(
203 meta.canonical_name.clone(),
204 SingleColumnIndexEntry {
205 index_name: meta.index_name,
206 canonical_name: meta.canonical_name,
207 column_id: meta.column_id,
208 column_name: meta.column_name,
209 unique: meta.unique,
210 ascending: meta.ascending,
211 nulls_first: meta.nulls_first,
212 },
213 );
214 }
215 for meta in multi_uniques {
216 multi_column_indexes.insert(meta.canonical_name.clone(), meta);
217 }
218 for meta in multi_column_index_metas {
219 multi_column_indexes.insert(meta.canonical_name.clone(), meta);
220 }
221 for (record, name) in constraint_records
222 .into_iter()
223 .zip(constraint_name_entries.into_iter())
224 {
225 if let Some(name) = name {
226 constraint_names.insert(record.constraint_id, name);
227 }
228 constraints.insert(record.constraint_id, record);
229 }
230 let snapshot = TableSnapshot::new(
231 table_meta,
232 FxHashMap::default(),
233 constraints,
234 constraint_names,
235 single_indexes,
236 multi_column_indexes,
237 sort_indexes,
238 );
239 Ok(TableState::from_snapshot(snapshot))
240 }
241
242 fn refresh_referencing_index_for_table(&self, table_id: TableId) {
243 let foreign_keys: Vec<(TableId, ConstraintId)> = {
244 let tables = self.tables.read().unwrap();
245 match tables.get(&table_id) {
246 Some(state) => state
247 .current
248 .constraints
249 .iter()
250 .filter(|(_, record)| record.is_active())
251 .filter_map(|(constraint_id, record)| {
252 if let ConstraintKind::ForeignKey(fk) = &record.kind {
253 Some((fk.referenced_table, *constraint_id))
254 } else {
255 None
256 }
257 })
258 .collect(),
259 None => Vec::new(),
260 }
261 };
262
263 let mut index = self.referencing_index.write().unwrap();
264 index.remove_child(table_id);
265 for (parent_table, constraint_id) in foreign_keys {
266 index.insert(parent_table, table_id, constraint_id);
267 }
268 }
269
270 fn constraint_name_for(
271 &self,
272 table_id: TableId,
273 constraint_id: ConstraintId,
274 ) -> LlkvResult<Option<String>> {
275 self.ensure_table_state(table_id)?;
276 let tables = self.tables.read().unwrap();
277 let state = tables.get(&table_id).unwrap();
278 Ok(state.current.constraint_names.get(&constraint_id).cloned())
279 }
280
281 fn ensure_referencing_index_initialized(&self) -> LlkvResult<()> {
282 let needs_init = {
283 let index = self.referencing_index.read().unwrap();
284 !index.is_initialized()
285 };
286
287 if !needs_init {
288 return Ok(());
289 }
290
291 let metas = self.all_table_metas()?;
292 for (table_id, _) in metas {
293 self.ensure_table_state(table_id)?;
294 self.refresh_referencing_index_for_table(table_id);
295 }
296
297 let mut index = self.referencing_index.write().unwrap();
298 index.mark_initialized();
299 Ok(())
300 }
301
302 pub fn table_meta(&self, table_id: TableId) -> LlkvResult<Option<TableMeta>> {
304 self.ensure_table_state(table_id)?;
305 let tables = self.tables.read().unwrap();
306 Ok(tables
307 .get(&table_id)
308 .and_then(|state| state.current.table_meta.clone()))
309 }
310
311 pub fn foreign_keys_referencing(
313 &self,
314 referenced_table: TableId,
315 ) -> LlkvResult<Vec<(TableId, ConstraintId)>> {
316 self.ensure_referencing_index_initialized()?;
317 let index = self.referencing_index.read().unwrap();
318 Ok(index.children(referenced_table))
319 }
320
321 pub fn set_table_meta(&self, table_id: TableId, meta: TableMeta) -> LlkvResult<()> {
323 self.ensure_table_state(table_id)?;
324 let mut tables = self.tables.write().unwrap();
325 let state = tables.get_mut(&table_id).unwrap();
326 state.current.table_meta = Some(meta);
327 Ok(())
328 }
329
330 pub fn column_metas(
332 &self,
333 table_id: TableId,
334 field_ids: &[FieldId],
335 ) -> LlkvResult<Vec<Option<ColMeta>>> {
336 self.ensure_table_state(table_id)?;
337
338 let missing_ids = {
340 let tables = self.tables.read().unwrap();
341 let state = tables.get(&table_id).unwrap();
342 field_ids
343 .iter()
344 .copied()
345 .filter(|field_id| !state.current.column_metas.contains_key(field_id))
346 .collect::<Vec<_>>()
347 };
348
349 if !missing_ids.is_empty() {
350 let catalog = SysCatalog::new(&self.store);
351 let fetched = catalog.get_cols_meta(table_id, &missing_ids);
352 let mut tables = self.tables.write().unwrap();
353 let state = tables.get_mut(&table_id).unwrap();
354 for (idx, field_id) in missing_ids.iter().enumerate() {
355 if let Some(meta) = fetched[idx].clone() {
356 state.current.column_metas.insert(*field_id, meta.clone());
357 state
358 .persisted
359 .column_metas
360 .entry(*field_id)
361 .or_insert(meta);
362 }
363 }
364 }
365
366 let tables = self.tables.read().unwrap();
367 let state = tables.get(&table_id).unwrap();
368 Ok(field_ids
369 .iter()
370 .map(|field_id| state.current.column_metas.get(field_id).cloned())
371 .collect())
372 }
373
374 pub fn set_column_meta(&self, table_id: TableId, meta: ColMeta) -> LlkvResult<()> {
376 self.ensure_table_state(table_id)?;
377 let mut tables = self.tables.write().unwrap();
378 let state = tables.get_mut(&table_id).unwrap();
379 state.current.column_metas.insert(meta.col_id, meta);
380 Ok(())
381 }
382
383 pub fn multi_column_uniques(
385 &self,
386 table_id: TableId,
387 ) -> LlkvResult<Vec<MultiColumnIndexEntryMeta>> {
388 self.ensure_table_state(table_id)?;
389 let tables = self.tables.read().unwrap();
390 let state = tables.get(&table_id).unwrap();
391 Ok(state
392 .current
393 .multi_column_indexes
394 .values()
395 .filter(|entry| entry.unique)
396 .cloned()
397 .collect())
398 }
399
400 pub fn set_multi_column_uniques(
402 &self,
403 table_id: TableId,
404 uniques: Vec<MultiColumnIndexEntryMeta>,
405 ) -> LlkvResult<()> {
406 self.ensure_table_state(table_id)?;
407 let mut tables = self.tables.write().unwrap();
408 let state = tables.get_mut(&table_id).unwrap();
409 state
411 .current
412 .multi_column_indexes
413 .retain(|_, entry| !entry.unique);
414 for unique in uniques {
416 state
417 .current
418 .multi_column_indexes
419 .insert(unique.canonical_name.clone(), unique);
420 }
421 Ok(())
422 }
423
424 pub fn single_column_indexes(
426 &self,
427 table_id: TableId,
428 ) -> LlkvResult<Vec<SingleColumnIndexEntry>> {
429 self.ensure_table_state(table_id)?;
430 let tables = self.tables.read().unwrap();
431 let state = tables.get(&table_id).unwrap();
432 Ok(state.current.single_indexes.values().cloned().collect())
433 }
434
435 pub fn single_column_index(
437 &self,
438 table_id: TableId,
439 canonical_index_name: &str,
440 ) -> LlkvResult<Option<SingleColumnIndexEntry>> {
441 self.ensure_table_state(table_id)?;
442 let tables = self.tables.read().unwrap();
443 let state = tables.get(&table_id).unwrap();
444 Ok(state
445 .current
446 .single_indexes
447 .get(canonical_index_name)
448 .cloned())
449 }
450
451 pub fn put_single_column_index(
453 &self,
454 table_id: TableId,
455 entry: SingleColumnIndexEntry,
456 ) -> LlkvResult<()> {
457 self.ensure_table_state(table_id)?;
458 let mut tables = self.tables.write().unwrap();
459 let state = tables.get_mut(&table_id).unwrap();
460 let column_id = entry.column_id;
461 state
462 .current
463 .single_indexes
464 .insert(entry.canonical_name.clone(), entry);
465 state.current.sort_indexes.insert(column_id);
466 Ok(())
467 }
468
469 pub fn remove_single_column_index(
471 &self,
472 table_id: TableId,
473 canonical_index_name: &str,
474 ) -> LlkvResult<Option<SingleColumnIndexEntry>> {
475 self.ensure_table_state(table_id)?;
476 let mut tables = self.tables.write().unwrap();
477 let state = tables.get_mut(&table_id).unwrap();
478 let removed = state.current.single_indexes.remove(canonical_index_name);
479 if let Some(ref entry) = removed {
480 let still_indexed = state
481 .current
482 .single_indexes
483 .values()
484 .any(|existing| existing.column_id == entry.column_id);
485 if !still_indexed {
486 state.current.sort_indexes.remove(&entry.column_id);
487 }
488 }
489 Ok(removed)
490 }
491
492 pub fn put_multi_column_index(
494 &self,
495 table_id: TableId,
496 entry: MultiColumnIndexEntryMeta,
497 ) -> LlkvResult<()> {
498 self.ensure_table_state(table_id)?;
499 let mut tables = self.tables.write().unwrap();
500 let state = tables.get_mut(&table_id).unwrap();
501 state
502 .current
503 .multi_column_indexes
504 .insert(entry.canonical_name.clone(), entry);
505 Ok(())
506 }
507
508 pub fn remove_multi_column_index(
510 &self,
511 table_id: TableId,
512 canonical_index_name: &str,
513 ) -> LlkvResult<Option<MultiColumnIndexEntryMeta>> {
514 self.ensure_table_state(table_id)?;
515 let mut tables = self.tables.write().unwrap();
516 let state = tables.get_mut(&table_id).unwrap();
517 Ok(state
518 .current
519 .multi_column_indexes
520 .remove(canonical_index_name))
521 }
522
523 pub fn get_multi_column_index(
525 &self,
526 table_id: TableId,
527 canonical_index_name: &str,
528 ) -> LlkvResult<Option<MultiColumnIndexEntryMeta>> {
529 self.ensure_table_state(table_id)?;
530 let tables = self.tables.read().unwrap();
531 Ok(tables.get(&table_id).and_then(|state| {
532 state
533 .current
534 .multi_column_indexes
535 .get(canonical_index_name)
536 .cloned()
537 }))
538 }
539
540 pub fn register_sort_index(&self, table_id: TableId, field_id: FieldId) -> LlkvResult<()> {
542 self.ensure_table_state(table_id)?;
543
544 {
545 let mut tables = self.tables.write().unwrap();
546 let state = tables.get_mut(&table_id).unwrap();
547 if state.persisted.sort_indexes.contains(&field_id)
548 || state.current.sort_indexes.contains(&field_id)
549 {
550 state.current.sort_indexes.insert(field_id);
551 return Ok(());
552 }
553 }
554
555 if self.field_has_sort_index(table_id, field_id)? {
556 let mut tables = self.tables.write().unwrap();
557 let state = tables.get_mut(&table_id).unwrap();
558 state.persisted.sort_indexes.insert(field_id);
559 state.current.sort_indexes.insert(field_id);
560 return Ok(());
561 }
562
563 let mut tables = self.tables.write().unwrap();
564 let state = tables.get_mut(&table_id).unwrap();
565 state.current.sort_indexes.insert(field_id);
566 Ok(())
567 }
568
569 pub fn unregister_sort_index(&self, table_id: TableId, field_id: FieldId) -> LlkvResult<()> {
571 self.ensure_table_state(table_id)?;
572
573 let mut tables = self.tables.write().unwrap();
574 let state = tables.get_mut(&table_id).unwrap();
575 state.current.sort_indexes.remove(&field_id);
576
577 if !state.persisted.sort_indexes.contains(&field_id) {
578 drop(tables);
579 if self.field_has_sort_index(table_id, field_id)? {
580 let mut tables = self.tables.write().unwrap();
581 let state = tables.get_mut(&table_id).unwrap();
582 state.persisted.sort_indexes.insert(field_id);
583 }
584 }
585
586 Ok(())
587 }
588
589 pub fn update_multi_column_uniques<F, T>(&self, table_id: TableId, f: F) -> LlkvResult<T>
591 where
592 F: FnOnce(&mut Vec<MultiColumnIndexEntryMeta>) -> T,
593 {
594 self.ensure_table_state(table_id)?;
595 let mut tables = self.tables.write().unwrap();
596 let state = tables.get_mut(&table_id).unwrap();
597 let mut uniques: Vec<MultiColumnIndexEntryMeta> = state
598 .current
599 .multi_column_indexes
600 .values()
601 .filter(|entry| entry.unique)
602 .cloned()
603 .collect();
604 let result = f(&mut uniques);
605 state
607 .current
608 .multi_column_indexes
609 .retain(|_, entry| !entry.unique);
610 for unique in uniques {
612 state
613 .current
614 .multi_column_indexes
615 .insert(unique.canonical_name.clone(), unique);
616 }
617 Ok(result)
618 }
619
620 pub fn prepare_table_drop(&self, table_id: TableId, column_ids: &[FieldId]) -> LlkvResult<()> {
625 if !column_ids.is_empty() {
626 let _ = self.column_metas(table_id, column_ids)?;
627 } else {
628 self.ensure_table_state(table_id)?;
629 }
630
631 let mut tables = self.tables.write().unwrap();
632 if let Some(state) = tables.get_mut(&table_id) {
633 state.current.table_meta = None;
634 state.current.column_metas.clear();
635 state.current.constraints.clear();
636 state.current.constraint_names.clear();
637 state.current.single_indexes.clear();
638 state.current.multi_column_indexes.clear();
639 state.current.sort_indexes.clear();
640 }
641 drop(tables);
642 self.refresh_referencing_index_for_table(table_id);
643 Ok(())
644 }
645
646 pub fn remove_table_state(&self, table_id: TableId) {
648 self.tables.write().unwrap().remove(&table_id);
649 self.referencing_index
650 .write()
651 .unwrap()
652 .remove_child(table_id);
653 }
654
655 pub fn constraint_records(&self, table_id: TableId) -> LlkvResult<Vec<ConstraintRecord>> {
657 self.ensure_table_state(table_id)?;
658 let tables = self.tables.read().unwrap();
659 let state = tables.get(&table_id).unwrap();
660 Ok(state.current.constraints.values().cloned().collect())
661 }
662
663 pub fn constraint_records_by_id(
665 &self,
666 table_id: TableId,
667 constraint_ids: &[ConstraintId],
668 ) -> LlkvResult<Vec<Option<ConstraintRecord>>> {
669 self.ensure_table_state(table_id)?;
670 let tables = self.tables.read().unwrap();
671 let state = tables.get(&table_id).unwrap();
672 Ok(constraint_ids
673 .iter()
674 .map(|constraint_id| state.current.constraints.get(constraint_id).cloned())
675 .collect())
676 }
677
678 pub fn put_constraint_records(
680 &self,
681 table_id: TableId,
682 records: &[ConstraintRecord],
683 ) -> LlkvResult<()> {
684 self.ensure_table_state(table_id)?;
685 let mut tables = self.tables.write().unwrap();
686 let state = tables.get_mut(&table_id).unwrap();
687 for record in records {
688 state
689 .current
690 .constraints
691 .insert(record.constraint_id, record.clone());
692 }
693 drop(tables);
694 self.refresh_referencing_index_for_table(table_id);
695 Ok(())
696 }
697
698 pub fn put_constraint_names(
700 &self,
701 table_id: TableId,
702 names: &[(ConstraintId, Option<String>)],
703 ) -> LlkvResult<()> {
704 if names.is_empty() {
705 return Ok(());
706 }
707 self.ensure_table_state(table_id)?;
708 let mut tables = self.tables.write().unwrap();
709 if let Some(state) = tables.get_mut(&table_id) {
710 for (constraint_id, name) in names {
711 if let Some(name) = name {
712 state
713 .current
714 .constraint_names
715 .insert(*constraint_id, name.clone());
716 } else {
717 state.current.constraint_names.remove(constraint_id);
718 }
719 }
720 }
721 Ok(())
722 }
723
724 pub fn constraint_record_map(
726 &self,
727 table_id: TableId,
728 ) -> LlkvResult<FxHashMap<ConstraintId, ConstraintRecord>> {
729 self.ensure_table_state(table_id)?;
730 let tables = self.tables.read().unwrap();
731 let state = tables.get(&table_id).unwrap();
732 Ok(state.current.constraints.clone())
733 }
734
735 pub fn flush_table(&self, table_id: TableId) -> LlkvResult<()> {
737 self.ensure_table_state(table_id)?;
738 let mut tables = self.tables.write().unwrap();
739 let state = tables.get_mut(&table_id).unwrap();
740
741 let catalog = SysCatalog::new(&self.store);
742
743 match (
744 state.current.table_meta.as_ref(),
745 state.persisted.table_meta.as_ref(),
746 ) {
747 (Some(meta), Some(existing)) if meta != existing => {
748 catalog.put_table_meta(meta);
749 state.persisted.table_meta = Some(meta.clone());
750 }
751 (Some(meta), None) => {
752 catalog.put_table_meta(meta);
753 state.persisted.table_meta = Some(meta.clone());
754 }
755 (None, Some(_)) => {
756 catalog.delete_table_meta(table_id)?;
757 state.persisted.table_meta = None;
758 }
759 _ => {}
760 }
761
762 let mut dirty_columns: Vec<(FieldId, ColMeta)> = Vec::new();
763 for (field_id, meta) in &state.current.column_metas {
764 match state.persisted.column_metas.get(field_id) {
765 Some(existing) if existing == meta => {}
766 _ => dirty_columns.push((*field_id, meta.clone())),
767 }
768 }
769 for (field_id, meta) in dirty_columns.iter() {
770 catalog.put_col_meta(table_id, meta);
771 state.persisted.column_metas.insert(*field_id, meta.clone());
772 }
773
774 let removed_columns: Vec<FieldId> = state
775 .persisted
776 .column_metas
777 .keys()
778 .copied()
779 .filter(|field_id| !state.current.column_metas.contains_key(field_id))
780 .collect();
781 if !removed_columns.is_empty() {
782 catalog.delete_col_meta(table_id, &removed_columns)?;
783 for field_id in removed_columns {
784 state.persisted.column_metas.remove(&field_id);
785 }
786 }
787
788 let mut dirty_constraints: Vec<ConstraintRecord> = Vec::new();
789 for (constraint_id, record) in &state.current.constraints {
790 match state.persisted.constraints.get(constraint_id) {
791 Some(existing) if existing == record => {}
792 _ => dirty_constraints.push(record.clone()),
793 }
794 }
795 if !dirty_constraints.is_empty() {
796 catalog.put_constraint_records(table_id, &dirty_constraints)?;
797 for record in dirty_constraints {
798 state
799 .persisted
800 .constraints
801 .insert(record.constraint_id, record);
802 }
803 }
804
805 let removed_constraints: Vec<ConstraintId> = state
806 .persisted
807 .constraints
808 .keys()
809 .copied()
810 .filter(|constraint_id| !state.current.constraints.contains_key(constraint_id))
811 .collect();
812 if !removed_constraints.is_empty() {
813 catalog.delete_constraint_records(table_id, &removed_constraints)?;
814 for constraint_id in removed_constraints {
815 state.persisted.constraints.remove(&constraint_id);
816 }
817 }
818
819 let mut dirty_constraint_names: Vec<(ConstraintId, String)> = Vec::new();
820 for (constraint_id, name) in &state.current.constraint_names {
821 match state.persisted.constraint_names.get(constraint_id) {
822 Some(existing) if existing == name => {}
823 _ => dirty_constraint_names.push((*constraint_id, name.clone())),
824 }
825 }
826 if !dirty_constraint_names.is_empty() {
827 let records: Vec<ConstraintNameRecord> = dirty_constraint_names
828 .iter()
829 .map(|(constraint_id, name)| ConstraintNameRecord {
830 constraint_id: *constraint_id,
831 name: Some(name.clone()),
832 })
833 .collect();
834 catalog.put_constraint_names(table_id, &records)?;
835 for (constraint_id, name) in dirty_constraint_names {
836 state.persisted.constraint_names.insert(constraint_id, name);
837 }
838 }
839
840 let removed_constraint_names: Vec<ConstraintId> = state
841 .persisted
842 .constraint_names
843 .keys()
844 .copied()
845 .filter(|constraint_id| !state.current.constraint_names.contains_key(constraint_id))
846 .collect();
847 if !removed_constraint_names.is_empty() {
848 catalog.delete_constraint_names(table_id, &removed_constraint_names)?;
849 for constraint_id in removed_constraint_names {
850 state.persisted.constraint_names.remove(&constraint_id);
851 }
852 }
853
854 if state.current.multi_column_indexes != state.persisted.multi_column_indexes {
856 if state.current.multi_column_indexes.is_empty() {
857 catalog.delete_multi_column_indexes(table_id)?;
858 } else {
859 let mut entries: Vec<MultiColumnIndexEntryMeta> = state
860 .current
861 .multi_column_indexes
862 .values()
863 .cloned()
864 .collect();
865 entries.sort_by(|a, b| a.canonical_name.cmp(&b.canonical_name));
866 catalog.put_multi_column_indexes(table_id, &entries)?;
867 }
868 state.persisted.multi_column_indexes = state.current.multi_column_indexes.clone();
869 }
870
871 if state.current.single_indexes != state.persisted.single_indexes {
872 if state.current.single_indexes.is_empty() {
873 catalog.delete_single_column_indexes(table_id)?;
874 state.persisted.single_indexes.clear();
875 } else {
876 let mut entries: Vec<SingleColumnIndexEntryMeta> = state
877 .current
878 .single_indexes
879 .values()
880 .cloned()
881 .map(|entry| SingleColumnIndexEntryMeta {
882 index_name: entry.index_name,
883 canonical_name: entry.canonical_name,
884 column_id: entry.column_id,
885 column_name: entry.column_name,
886 unique: entry.unique,
887 ascending: entry.ascending,
888 nulls_first: entry.nulls_first,
889 })
890 .collect();
891 entries.sort_by(|a, b| a.canonical_name.cmp(&b.canonical_name));
892 catalog.put_single_column_indexes(table_id, &entries)?;
893 state.persisted.single_indexes = state.current.single_indexes.clone();
894 }
895 }
896
897 let sort_adds: Vec<FieldId> = state
898 .current
899 .sort_indexes
900 .iter()
901 .copied()
902 .filter(|field_id| !state.persisted.sort_indexes.contains(field_id))
903 .collect();
904 let sort_removes: Vec<FieldId> = state
905 .persisted
906 .sort_indexes
907 .iter()
908 .copied()
909 .filter(|field_id| !state.current.sort_indexes.contains(field_id))
910 .collect();
911 if !sort_adds.is_empty() || !sort_removes.is_empty() {
912 let table = Table::from_id_and_store(table_id, Arc::clone(&self.store))?;
913 for field_id in &sort_adds {
914 table.register_sort_index(*field_id)?;
915 state.persisted.sort_indexes.insert(*field_id);
916 }
917 for field_id in &sort_removes {
918 table.unregister_sort_index(*field_id)?;
919 state.persisted.sort_indexes.remove(field_id);
920 }
921 }
922
923 Ok(())
924 }
925
926 pub fn flush_all(&self) -> LlkvResult<()> {
928 let table_ids: Vec<TableId> = {
929 let tables = self.tables.read().unwrap();
930 tables.keys().copied().collect()
931 };
932 for table_id in table_ids {
933 self.flush_table(table_id)?;
934 }
935 Ok(())
936 }
937
938 pub fn all_table_metas(&self) -> LlkvResult<Vec<(TableId, TableMeta)>> {
940 let catalog = SysCatalog::new(&self.store);
941 catalog.all_table_metas()
942 }
943
944 pub fn all_multi_column_unique_metas(&self) -> LlkvResult<Vec<TableMultiColumnIndexMeta>> {
946 let catalog = SysCatalog::new(&self.store);
947 let all = catalog.all_multi_column_index_metas()?;
948 Ok(all
950 .into_iter()
951 .map(|mut meta| {
952 meta.indexes.retain(|idx| idx.unique);
953 meta
954 })
955 .filter(|meta| !meta.indexes.is_empty())
956 .collect())
957 }
958
959 pub fn foreign_key_descriptors(
961 &self,
962 table_id: TableId,
963 ) -> LlkvResult<Vec<ForeignKeyDescriptor>> {
964 let records = self.constraint_records(table_id)?;
965 let mut descriptors = Vec::new();
966
967 for record in records {
968 if !record.is_active() {
969 continue;
970 }
971
972 let ConstraintKind::ForeignKey(fk) = record.kind else {
973 continue;
974 };
975
976 descriptors.push(ForeignKeyDescriptor {
977 constraint_id: record.constraint_id,
978 referencing_table_id: table_id,
979 referencing_field_ids: fk.referencing_field_ids.clone(),
980 referenced_table_id: fk.referenced_table,
981 referenced_field_ids: fk.referenced_field_ids.clone(),
982 on_delete: fk.on_delete,
983 on_update: fk.on_update,
984 });
985 }
986
987 Ok(descriptors)
988 }
989
990 pub fn foreign_key_views(
992 &self,
993 catalog: &TableCatalog,
994 table_id: TableId,
995 ) -> LlkvResult<Vec<ForeignKeyView>> {
996 let descriptors = self.foreign_key_descriptors(table_id)?;
997
998 if descriptors.is_empty() {
999 return Ok(Vec::new());
1000 }
1001
1002 let (referencing_display, referencing_canonical) =
1003 resolve_table_name(catalog, self, table_id)?;
1004
1005 let mut details = Vec::with_capacity(descriptors.len());
1006 for descriptor in descriptors {
1007 let referenced_table_id = descriptor.referenced_table_id;
1008 let (referenced_display, referenced_canonical) =
1009 resolve_table_name(catalog, self, referenced_table_id)?;
1010
1011 let referencing_column_names =
1012 self.column_names(table_id, &descriptor.referencing_field_ids)?;
1013 let referenced_column_names =
1014 self.column_names(referenced_table_id, &descriptor.referenced_field_ids)?;
1015 let constraint_name = self.constraint_name_for(table_id, descriptor.constraint_id)?;
1016
1017 details.push(ForeignKeyView {
1018 constraint_id: descriptor.constraint_id,
1019 constraint_name,
1020 referencing_table_id: descriptor.referencing_table_id,
1021 referencing_table_display: referencing_display.clone(),
1022 referencing_table_canonical: referencing_canonical.clone(),
1023 referencing_field_ids: descriptor.referencing_field_ids.clone(),
1024 referencing_column_names,
1025 referenced_table_id,
1026 referenced_table_display: referenced_display.clone(),
1027 referenced_table_canonical: referenced_canonical.clone(),
1028 referenced_field_ids: descriptor.referenced_field_ids.clone(),
1029 referenced_column_names,
1030 on_delete: descriptor.on_delete,
1031 on_update: descriptor.on_update,
1032 });
1033 }
1034
1035 Ok(details)
1036 }
1037
1038 pub fn table_view(
1040 &self,
1041 catalog: &TableCatalog,
1042 table_id: TableId,
1043 field_ids: &[FieldId],
1044 ) -> LlkvResult<TableView> {
1045 let table_meta = self.table_meta(table_id)?;
1046 let column_metas = self.column_metas(table_id, field_ids)?;
1047 let constraint_records = self.constraint_records(table_id)?;
1048 let multi_column_uniques = self.multi_column_uniques(table_id)?;
1049 let foreign_keys = self.foreign_key_views(catalog, table_id)?;
1050
1051 Ok(TableView {
1052 table_meta,
1053 column_metas,
1054 constraint_records,
1055 multi_column_uniques,
1056 foreign_keys,
1057 })
1058 }
1059
1060 pub fn validate_and_register_foreign_keys<F>(
1062 &self,
1063 referencing_table: &ForeignKeyTableInfo,
1064 specs: &[ForeignKeySpec],
1065 lookup_table: F,
1066 timestamp_micros: u64,
1067 ) -> LlkvResult<Vec<ValidatedForeignKey>>
1068 where
1069 F: FnMut(&str) -> LlkvResult<ForeignKeyTableInfo>,
1070 {
1071 let validated = validate_foreign_keys(referencing_table, specs, lookup_table)?;
1072 self.register_foreign_keys(referencing_table.table_id, &validated, timestamp_micros)?;
1073 Ok(validated)
1074 }
1075
1076 pub fn register_foreign_keys(
1078 &self,
1079 table_id: TableId,
1080 foreign_keys: &[ValidatedForeignKey],
1081 timestamp_micros: u64,
1082 ) -> LlkvResult<()> {
1083 if foreign_keys.is_empty() {
1084 return Ok(());
1085 }
1086
1087 let existing_constraints = self.constraint_record_map(table_id)?;
1088 let mut next_constraint_id = existing_constraints
1089 .keys()
1090 .copied()
1091 .max()
1092 .unwrap_or(0)
1093 .saturating_add(1);
1094
1095 let mut constraint_records = Vec::with_capacity(foreign_keys.len());
1096 let mut constraint_names: Vec<(ConstraintId, Option<String>)> =
1097 Vec::with_capacity(foreign_keys.len());
1098
1099 for fk in foreign_keys {
1100 let constraint_id = next_constraint_id;
1101 constraint_records.push(ConstraintRecord {
1102 constraint_id,
1103 kind: ConstraintKind::ForeignKey(ForeignKeyConstraint {
1104 referencing_field_ids: fk.referencing_field_ids.clone(),
1105 referenced_table: fk.referenced_table_id,
1106 referenced_field_ids: fk.referenced_field_ids.clone(),
1107 on_delete: fk.on_delete,
1108 on_update: fk.on_update,
1109 }),
1110 state: ConstraintState::Active,
1111 revision: 1,
1112 last_modified_micros: timestamp_micros,
1113 });
1114 constraint_names.push((constraint_id, fk.name.clone()));
1115 next_constraint_id = next_constraint_id.saturating_add(1);
1116 }
1117
1118 self.put_constraint_records(table_id, &constraint_records)?;
1119 self.put_constraint_names(table_id, &constraint_names)?;
1120 self.flush_table(table_id)?;
1121
1122 Ok(())
1123 }
1124
1125 pub fn apply_column_definitions(
1127 &self,
1128 table_id: TableId,
1129 columns: &[TableColumn],
1130 timestamp_micros: u64,
1131 ) -> LlkvResult<()> {
1132 if columns.is_empty() {
1133 return Ok(());
1134 }
1135
1136 self.ensure_table_state(table_id)?;
1137
1138 for column in columns {
1139 let column_meta = ColMeta {
1140 col_id: column.field_id,
1141 name: Some(column.name.clone()),
1142 flags: 0,
1143 default: None,
1144 };
1145 self.set_column_meta(table_id, column_meta)?;
1146 }
1147
1148 let table = Table::from_id_and_store(table_id, Arc::clone(&self.store))?;
1149 let store = table.store();
1150
1151 for column in columns {
1152 let logical_field_id = LogicalFieldId::for_user(table_id, column.field_id);
1153 store.ensure_column_registered(logical_field_id, &column.data_type)?;
1154 store.data_type(logical_field_id)?;
1155 }
1156
1157 let created_by_lfid = LogicalFieldId::for_mvcc_created_by(table_id);
1158 store.ensure_column_registered(created_by_lfid, &DataType::UInt64)?;
1159
1160 let deleted_by_lfid = LogicalFieldId::for_mvcc_deleted_by(table_id);
1161 store.ensure_column_registered(deleted_by_lfid, &DataType::UInt64)?;
1162
1163 let existing = self.constraint_record_map(table_id)?;
1164 let mut next_constraint_id = existing
1165 .keys()
1166 .copied()
1167 .max()
1168 .unwrap_or(0)
1169 .saturating_add(1);
1170
1171 let mut constraints = Vec::new();
1172
1173 let primary_key_fields: Vec<FieldId> = columns
1174 .iter()
1175 .filter(|col| col.primary_key)
1176 .map(|col| col.field_id)
1177 .collect();
1178 if !primary_key_fields.is_empty() {
1179 constraints.push(ConstraintRecord {
1180 constraint_id: next_constraint_id,
1181 kind: ConstraintKind::PrimaryKey(PrimaryKeyConstraint {
1182 field_ids: primary_key_fields,
1183 }),
1184 state: ConstraintState::Active,
1185 revision: 1,
1186 last_modified_micros: timestamp_micros,
1187 });
1188 next_constraint_id = next_constraint_id.saturating_add(1);
1189 }
1190
1191 for column in columns.iter().filter(|col| col.unique && !col.primary_key) {
1192 constraints.push(ConstraintRecord {
1193 constraint_id: next_constraint_id,
1194 kind: ConstraintKind::Unique(UniqueConstraint {
1195 field_ids: vec![column.field_id],
1196 }),
1197 state: ConstraintState::Active,
1198 revision: 1,
1199 last_modified_micros: timestamp_micros,
1200 });
1201 next_constraint_id = next_constraint_id.saturating_add(1);
1202 }
1203
1204 if !constraints.is_empty() {
1205 self.put_constraint_records(table_id, &constraints)?;
1206 }
1207
1208 Ok(())
1209 }
1210
1211 pub fn column_data_type(&self, table_id: TableId, field_id: FieldId) -> LlkvResult<DataType> {
1212 let table = Table::from_id_and_store(table_id, Arc::clone(&self.store))?;
1213 let store = table.store();
1214 let logical_field_id = LogicalFieldId::for_user(table_id, field_id);
1215 store.data_type(logical_field_id)
1216 }
1217
1218 pub fn register_multi_column_unique(
1220 &self,
1221 table_id: TableId,
1222 column_ids: &[FieldId],
1223 index_name: Option<String>,
1224 ) -> LlkvResult<MultiColumnUniqueRegistration> {
1225 let mut created = false;
1226 let mut existing_name: Option<Option<String>> = None;
1227 let column_vec: Vec<FieldId> = column_ids.to_vec();
1228
1229 let canonical_name = format!(
1231 "__unique_{}_{}",
1232 table_id,
1233 column_vec
1234 .iter()
1235 .map(|id| id.to_string())
1236 .collect::<Vec<_>>()
1237 .join("_")
1238 );
1239
1240 self.update_multi_column_uniques(table_id, |entries| {
1241 if let Some(existing) = entries.iter().find(|entry| entry.column_ids == column_vec) {
1242 existing_name = Some(existing.index_name.clone());
1243 } else {
1244 entries.push(MultiColumnIndexEntryMeta {
1245 index_name: index_name.clone(),
1246 canonical_name: canonical_name.clone(),
1247 column_ids: column_vec.clone(),
1248 unique: true,
1249 });
1250 created = true;
1251 }
1252 })?;
1253
1254 if created {
1255 Ok(MultiColumnUniqueRegistration::Created)
1256 } else {
1257 Ok(MultiColumnUniqueRegistration::AlreadyExists {
1258 index_name: existing_name.unwrap_or(None),
1259 })
1260 }
1261 }
1262
1263 fn column_names(&self, table_id: TableId, field_ids: &[FieldId]) -> LlkvResult<Vec<String>> {
1264 if field_ids.is_empty() {
1265 return Ok(Vec::new());
1266 }
1267
1268 let metas = self.column_metas(table_id, field_ids)?;
1269 let mut names = Vec::with_capacity(field_ids.len());
1270 for (idx, field_id) in field_ids.iter().enumerate() {
1271 let name = metas
1272 .get(idx)
1273 .and_then(|meta| meta.as_ref())
1274 .and_then(|meta| meta.name.clone())
1275 .unwrap_or_else(|| format!("col_{}", field_id));
1276 names.push(name);
1277 }
1278 Ok(names)
1279 }
1280
1281 pub fn reserve_table_id(&self) -> LlkvResult<TableId> {
1283 let catalog = SysCatalog::new(&self.store);
1284
1285 let mut next = match catalog.get_next_table_id()? {
1286 Some(value) => value,
1287 None => {
1288 let seed = catalog
1289 .max_table_id()?
1290 .unwrap_or(reserved::CATALOG_TABLE_ID);
1291 let initial = seed.checked_add(1).ok_or_else(|| {
1292 Error::InvalidArgumentError("exhausted available table ids".into())
1293 })?;
1294 catalog.put_next_table_id(initial)?;
1295 initial
1296 }
1297 };
1298
1299 while reserved::is_reserved_table_id(next) {
1300 next = next.checked_add(1).ok_or_else(|| {
1301 Error::InvalidArgumentError("exhausted available table ids".into())
1302 })?;
1303 }
1304
1305 let mut following = next
1306 .checked_add(1)
1307 .ok_or_else(|| Error::InvalidArgumentError("exhausted available table ids".into()))?;
1308
1309 while reserved::is_reserved_table_id(following) {
1310 following = following.checked_add(1).ok_or_else(|| {
1311 Error::InvalidArgumentError("exhausted available table ids".into())
1312 })?;
1313 }
1314
1315 catalog.put_next_table_id(following)?;
1316 Ok(next)
1317 }
1318
1319 fn field_has_sort_index(&self, table_id: TableId, field_id: FieldId) -> LlkvResult<bool> {
1326 let table = Table::from_id_and_store(table_id, Arc::clone(&self.store))?;
1327 let indexes = table.list_registered_indexes(field_id)?;
1328 Ok(indexes.contains(&IndexKind::Sort))
1329 }
1330}
1331
#[cfg(test)]
mod tests {
    //! Tests for the metadata manager against an in-memory pager.

    use super::*;
    use crate::constraints::{ConstraintKind, ConstraintState, PrimaryKeyConstraint};
    use crate::{MultiColumnIndexEntryMeta, Table};
    use llkv_column_map::ColumnStore;
    use llkv_column_map::store::IndexKind;
    use llkv_storage::pager::MemPager;
    use std::sync::Arc;

    /// Writes table/column metadata, a sort index, a constraint, and a
    /// multi-column unique entry through the manager, flushes, and checks
    /// that both a fresh `SysCatalog` and the manager itself return the same
    /// values.
    #[test]
    fn metadata_manager_persists_and_loads() {
        let pager = Arc::new(MemPager::default());
        let store = Arc::new(ColumnStore::open(Arc::clone(&pager)).unwrap());
        let manager = MetadataManager::new(Arc::clone(&store));

        let table_id: TableId = 42;
        let table_meta = TableMeta {
            table_id,
            name: Some("users".into()),
            created_at_micros: 123,
            flags: 0,
            epoch: 1,
            view_definition: None,
        };
        manager
            .set_table_meta(table_id, table_meta.clone())
            .unwrap();

        // The manager's in-memory state should already hold the table meta.
        {
            let tables = manager.tables.read().unwrap();
            let state = tables.get(&table_id).unwrap();
            assert!(state.current.table_meta.is_some());
        }

        let column_meta = ColMeta {
            col_id: 1,
            name: Some("id".into()),
            flags: 0,
            default: None,
        };
        manager
            .set_column_meta(table_id, column_meta.clone())
            .unwrap();

        // Register the column with the store before requesting a sort index on it.
        let logical_field_id =
            llkv_column_map::types::LogicalFieldId::for_user(table_id, column_meta.col_id);
        store
            .ensure_column_registered(logical_field_id, &arrow::datatypes::DataType::Utf8)
            .unwrap();

        manager
            .register_sort_index(table_id, column_meta.col_id)
            .unwrap();

        let constraint = ConstraintRecord {
            constraint_id: 7,
            kind: ConstraintKind::PrimaryKey(PrimaryKeyConstraint {
                field_ids: vec![column_meta.col_id],
            }),
            state: ConstraintState::Active,
            revision: 1,
            last_modified_micros: 456,
        };
        manager
            .put_constraint_records(table_id, std::slice::from_ref(&constraint))
            .unwrap();

        let multi_unique = MultiColumnIndexEntryMeta {
            index_name: Some("uniq_users_name".into()),
            canonical_name: "uniq_users_name".to_lowercase(),
            column_ids: vec![column_meta.col_id],
            unique: true,
        };
        manager
            .set_multi_column_uniques(table_id, vec![multi_unique.clone()])
            .unwrap();

        // Readable through the manager before any flush.
        assert_eq!(
            manager.table_meta(table_id).unwrap(),
            Some(table_meta.clone())
        );

        manager.flush_table(table_id).unwrap();

        // After the flush the sort index is visible on a fresh table handle.
        let table = Table::from_id_and_store(table_id, Arc::clone(&store)).unwrap();
        let indexes = table.list_registered_indexes(column_meta.col_id).unwrap();
        assert!(indexes.contains(&IndexKind::Sort));

        // Read back through a separate SysCatalog over the same store.
        let verify_catalog = SysCatalog::new(&store);
        let column_roundtrip = verify_catalog.get_cols_meta(table_id, &[column_meta.col_id]);
        assert_eq!(column_roundtrip[0].as_ref(), Some(&column_meta));
        let constraints = verify_catalog
            .constraint_records_for_table(table_id)
            .unwrap();
        assert_eq!(constraints, vec![constraint.clone()]);
        let unique_roundtrip = verify_catalog.get_multi_column_indexes(table_id).unwrap();
        assert_eq!(unique_roundtrip, vec![multi_unique.clone()]);

        // The manager must keep returning the same values after the flush.
        let meta_from_cache = manager.table_meta(table_id).unwrap();
        assert_eq!(meta_from_cache, Some(table_meta.clone()));

        let columns_from_cache = manager
            .column_metas(table_id, &[column_meta.col_id])
            .unwrap();
        assert_eq!(columns_from_cache[0].as_ref(), Some(&column_meta));

        let constraints_from_cache = manager.constraint_records(table_id).unwrap();
        assert_eq!(constraints_from_cache, vec![constraint.clone()]);

        let uniques_from_cache = manager.multi_column_uniques(table_id).unwrap();
        assert_eq!(uniques_from_cache, vec![multi_unique]);

        // A second flush with nothing new written must still succeed.
        manager.flush_table(table_id).unwrap();
    }

    /// Writes metadata directly through `SysCatalog` (bypassing the manager)
    /// and checks that the manager returns it on first access.
    #[test]
    fn metadata_manager_lazy_loads_columns_and_constraints() {
        let pager = Arc::new(MemPager::default());
        let store = Arc::new(ColumnStore::open(Arc::clone(&pager)).unwrap());
        let manager = MetadataManager::new(Arc::clone(&store));

        let table_id: TableId = 99;
        let column_meta = ColMeta {
            col_id: 3,
            name: Some("value".into()),
            flags: 0,
            default: None,
        };
        // Seed the catalog behind the manager's back.
        let initial_catalog = SysCatalog::new(&store);
        initial_catalog.put_col_meta(table_id, &column_meta);

        let constraint = ConstraintRecord {
            constraint_id: 15,
            kind: ConstraintKind::PrimaryKey(PrimaryKeyConstraint {
                field_ids: vec![column_meta.col_id],
            }),
            state: ConstraintState::Active,
            revision: 1,
            last_modified_micros: 0,
        };
        initial_catalog
            .put_constraint_records(table_id, std::slice::from_ref(&constraint))
            .unwrap();
        let multi_unique = MultiColumnIndexEntryMeta {
            index_name: Some("uniq_value".into()),
            canonical_name: "uniq_value".to_lowercase(),
            column_ids: vec![column_meta.col_id],
            unique: true,
        };
        initial_catalog
            .put_multi_column_indexes(table_id, std::slice::from_ref(&multi_unique))
            .unwrap();

        // The manager has not seen this table yet; each read must find the
        // values written above.
        let columns = manager
            .column_metas(table_id, &[column_meta.col_id])
            .unwrap();
        assert_eq!(columns[0].as_ref(), Some(&column_meta));

        let constraints = manager.constraint_records(table_id).unwrap();
        assert_eq!(constraints, vec![constraint]);

        let uniques = manager.multi_column_uniques(table_id).unwrap();
        assert_eq!(uniques, vec![multi_unique]);
    }
}
1499
/// Flat description of a single foreign key constraint.
///
/// Carries the constraint id, the tables and field ids on both sides of the
/// relationship, and the [`ForeignKeyAction`]s configured for delete and
/// update.
#[derive(Clone, Debug)]
pub struct ForeignKeyDescriptor {
    /// Id of the constraint record this descriptor was built from
    /// (presumably unique per referencing table — confirm against callers).
    pub constraint_id: ConstraintId,
    /// Table that declares the foreign key.
    pub referencing_table_id: TableId,
    /// Field ids in the referencing table that make up the key.
    pub referencing_field_ids: Vec<FieldId>,
    /// Table the foreign key points at.
    pub referenced_table_id: TableId,
    /// Field ids in the referenced table that the key refers to.
    pub referenced_field_ids: Vec<FieldId>,
    /// Action configured for deletes.
    pub on_delete: ForeignKeyAction,
    /// Action configured for updates.
    pub on_update: ForeignKeyAction,
}
1511
/// Outcome of `register_multi_column_unique`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum MultiColumnUniqueRegistration {
    /// A new multi-column unique entry was added.
    Created,
    /// An entry over the same column ids was already present; `index_name`
    /// is the name stored on that existing entry, if it has one.
    AlreadyExists { index_name: Option<String> },
}
1517}