1#![forbid(unsafe_code)]
8
9use crate::catalog::TableCatalog;
10use crate::constraints::{
11 ConstraintId, ConstraintKind, ConstraintRecord, ConstraintState, ForeignKeyAction,
12 ForeignKeyConstraint, PrimaryKeyConstraint, UniqueConstraint,
13};
14use crate::constraints::{ForeignKeyTableInfo, ValidatedForeignKey, validate_foreign_keys};
15use crate::reserved;
16use crate::resolvers::resolve_table_name;
17use crate::sys_catalog::{ConstraintNameRecord, SysCatalog};
18use crate::sys_catalog::{MultiColumnIndexEntryMeta, SingleColumnIndexEntryMeta, TriggerEntryMeta};
19use crate::table::Table;
20use crate::types::{FieldId, TableColumn, TableId};
21use crate::view::{ForeignKeyView, TableView};
22use crate::{ColMeta, TableMeta, TableMultiColumnIndexMeta};
23use arrow::datatypes::DataType;
24use llkv_column_map::ColumnStore;
25use llkv_column_map::store::IndexKind;
26use llkv_plan::ForeignKeySpec;
27use llkv_result::{Error, Result as LlkvResult};
28use llkv_storage::pager::Pager;
29use llkv_types::LogicalFieldId;
30use rustc_hash::{FxHashMap, FxHashSet};
31use simd_r_drive_entry_handle::EntryHandle;
32use std::sync::{Arc, RwLock};
33
34#[derive(Clone, Debug, Default, PartialEq, Eq)]
35pub struct SingleColumnIndexEntry {
36 pub index_name: String,
37 pub canonical_name: String,
38 pub column_id: FieldId,
39 pub column_name: String,
40 pub unique: bool,
41 pub ascending: bool,
42 pub nulls_first: bool,
43}
44
45#[derive(Clone, Debug, Default)]
46struct TableSnapshot {
47 table_meta: Option<TableMeta>,
48 column_metas: FxHashMap<FieldId, ColMeta>,
49 constraints: FxHashMap<ConstraintId, ConstraintRecord>,
50 constraint_names: FxHashMap<ConstraintId, String>,
51 single_indexes: FxHashMap<String, SingleColumnIndexEntry>,
52 multi_column_indexes: FxHashMap<String, MultiColumnIndexEntryMeta>,
53 sort_indexes: FxHashSet<FieldId>,
54 triggers: FxHashMap<String, TriggerEntryMeta>,
55}
56
57#[derive(Clone, Debug)]
58struct TableState {
59 current: TableSnapshot,
60 persisted: TableSnapshot,
61}
62
63impl TableState {
64 fn from_snapshot(snapshot: TableSnapshot) -> Self {
65 Self {
66 current: snapshot.clone(),
67 persisted: snapshot,
68 }
69 }
70}
71
72#[derive(Default)]
73struct ReferencingIndex {
74 parent_to_children: FxHashMap<TableId, FxHashSet<(TableId, ConstraintId)>>,
75 child_to_parents: FxHashMap<TableId, FxHashSet<TableId>>,
76 initialized: bool,
77}
78
79impl ReferencingIndex {
80 fn remove_child(&mut self, child_id: TableId) {
81 if let Some(parents) = self.child_to_parents.remove(&child_id) {
82 for parent_id in parents {
83 if let Some(children) = self.parent_to_children.get_mut(&parent_id) {
84 children.retain(|(entry_child, _)| *entry_child != child_id);
85 if children.is_empty() {
86 self.parent_to_children.remove(&parent_id);
87 }
88 }
89 }
90 }
91 }
92
93 fn insert(&mut self, parent_id: TableId, child_id: TableId, constraint_id: ConstraintId) {
94 self.parent_to_children
95 .entry(parent_id)
96 .or_default()
97 .insert((child_id, constraint_id));
98 self.child_to_parents
99 .entry(child_id)
100 .or_default()
101 .insert(parent_id);
102 self.initialized = true;
103 }
104
105 fn children(&self, parent_id: TableId) -> Vec<(TableId, ConstraintId)> {
106 self.parent_to_children
107 .get(&parent_id)
108 .map(|set| set.iter().cloned().collect())
109 .unwrap_or_default()
110 }
111
112 fn mark_initialized(&mut self) {
113 self.initialized = true;
114 }
115
116 fn is_initialized(&self) -> bool {
117 self.initialized
118 }
119}
120
121pub struct MetadataManager<P>
123where
124 P: Pager<Blob = EntryHandle> + Send + Sync,
125{
126 store: Arc<ColumnStore<P>>,
127 tables: RwLock<FxHashMap<TableId, TableState>>,
128 referencing_index: RwLock<ReferencingIndex>,
129}
130
131impl<P> MetadataManager<P>
132where
133 P: Pager<Blob = EntryHandle> + Send + Sync,
134{
135 pub fn new(store: Arc<ColumnStore<P>>) -> Self {
137 Self {
138 store,
139 tables: RwLock::new(FxHashMap::default()),
140 referencing_index: RwLock::new(ReferencingIndex::default()),
141 }
142 }
143
144 fn ensure_table_state(&self, table_id: TableId) -> LlkvResult<()> {
146 if self.tables.read().unwrap().contains_key(&table_id) {
147 return Ok(());
148 }
149 let state = self.load_table_state(table_id)?;
150 {
151 let mut tables = self.tables.write().unwrap();
152 tables.entry(table_id).or_insert(state);
153 }
154 self.refresh_referencing_index_for_table(table_id);
155 Ok(())
156 }
157
158 fn load_table_state(&self, table_id: TableId) -> LlkvResult<TableState> {
159 let catalog = SysCatalog::new(&self.store);
160 let table_meta = catalog.get_table_meta(table_id);
161 let constraint_records = catalog.constraint_records_for_table(table_id)?;
162 let constraint_ids: Vec<ConstraintId> = constraint_records
163 .iter()
164 .map(|record| record.constraint_id)
165 .collect();
166 let constraint_name_entries = if constraint_ids.is_empty() {
167 Vec::new()
168 } else {
169 catalog.get_constraint_names(table_id, &constraint_ids)?
170 };
171 let multi_uniques = catalog.get_multi_column_indexes(table_id)?;
172 let single_index_metas = catalog.get_single_column_indexes(table_id)?;
173 let multi_column_index_metas = catalog.get_multi_column_indexes(table_id)?;
174 let trigger_metas = catalog.get_triggers(table_id)?;
175 let mut constraints = FxHashMap::default();
176 let mut constraint_names = FxHashMap::default();
177 let mut single_indexes = FxHashMap::default();
178 let mut multi_column_indexes = FxHashMap::default();
179 let mut sort_indexes = FxHashSet::default();
180 for meta in single_index_metas {
181 sort_indexes.insert(meta.column_id);
182 single_indexes.insert(
183 meta.canonical_name.clone(),
184 SingleColumnIndexEntry {
185 index_name: meta.index_name,
186 canonical_name: meta.canonical_name,
187 column_id: meta.column_id,
188 column_name: meta.column_name,
189 unique: meta.unique,
190 ascending: meta.ascending,
191 nulls_first: meta.nulls_first,
192 },
193 );
194 }
195 for meta in multi_uniques {
196 multi_column_indexes.insert(meta.canonical_name.clone(), meta);
197 }
198 for meta in multi_column_index_metas {
199 multi_column_indexes.insert(meta.canonical_name.clone(), meta);
200 }
201 let mut triggers = FxHashMap::default();
202 for meta in trigger_metas {
203 triggers.insert(meta.canonical_name.clone(), meta);
204 }
205 for (record, name) in constraint_records
206 .into_iter()
207 .zip(constraint_name_entries.into_iter())
208 {
209 if let Some(name) = name {
210 constraint_names.insert(record.constraint_id, name);
211 }
212 constraints.insert(record.constraint_id, record);
213 }
214 let snapshot = TableSnapshot {
215 table_meta,
216 column_metas: FxHashMap::default(),
217 constraints,
218 constraint_names,
219 single_indexes,
220 multi_column_indexes,
221 sort_indexes,
222 triggers,
223 };
224 Ok(TableState::from_snapshot(snapshot))
225 }
226
227 fn refresh_referencing_index_for_table(&self, table_id: TableId) {
228 let foreign_keys: Vec<(TableId, ConstraintId)> = {
229 let tables = self.tables.read().unwrap();
230 match tables.get(&table_id) {
231 Some(state) => state
232 .current
233 .constraints
234 .iter()
235 .filter(|(_, record)| record.is_active())
236 .filter_map(|(constraint_id, record)| {
237 if let ConstraintKind::ForeignKey(fk) = &record.kind {
238 Some((fk.referenced_table, *constraint_id))
239 } else {
240 None
241 }
242 })
243 .collect(),
244 None => Vec::new(),
245 }
246 };
247
248 let mut index = self.referencing_index.write().unwrap();
249 index.remove_child(table_id);
250 for (parent_table, constraint_id) in foreign_keys {
251 index.insert(parent_table, table_id, constraint_id);
252 }
253 }
254
255 fn constraint_name_for(
256 &self,
257 table_id: TableId,
258 constraint_id: ConstraintId,
259 ) -> LlkvResult<Option<String>> {
260 self.ensure_table_state(table_id)?;
261 let tables = self.tables.read().unwrap();
262 let state = tables.get(&table_id).unwrap();
263 Ok(state.current.constraint_names.get(&constraint_id).cloned())
264 }
265
266 fn ensure_referencing_index_initialized(&self) -> LlkvResult<()> {
267 let needs_init = {
268 let index = self.referencing_index.read().unwrap();
269 !index.is_initialized()
270 };
271
272 if !needs_init {
273 return Ok(());
274 }
275
276 let metas = self.all_table_metas()?;
277 for (table_id, _) in metas {
278 self.ensure_table_state(table_id)?;
279 self.refresh_referencing_index_for_table(table_id);
280 }
281
282 let mut index = self.referencing_index.write().unwrap();
283 index.mark_initialized();
284 Ok(())
285 }
286
287 pub fn table_meta(&self, table_id: TableId) -> LlkvResult<Option<TableMeta>> {
289 self.ensure_table_state(table_id)?;
290 let tables = self.tables.read().unwrap();
291 Ok(tables
292 .get(&table_id)
293 .and_then(|state| state.current.table_meta.clone()))
294 }
295
296 pub fn foreign_keys_referencing(
298 &self,
299 referenced_table: TableId,
300 ) -> LlkvResult<Vec<(TableId, ConstraintId)>> {
301 self.ensure_referencing_index_initialized()?;
302 let index = self.referencing_index.read().unwrap();
303 Ok(index.children(referenced_table))
304 }
305
306 pub fn set_table_meta(&self, table_id: TableId, meta: TableMeta) -> LlkvResult<()> {
308 self.ensure_table_state(table_id)?;
309 let mut tables = self.tables.write().unwrap();
310 let state = tables.get_mut(&table_id).unwrap();
311 state.current.table_meta = Some(meta);
312 Ok(())
313 }
314
315 pub fn column_metas(
317 &self,
318 table_id: TableId,
319 field_ids: &[FieldId],
320 ) -> LlkvResult<Vec<Option<ColMeta>>> {
321 self.ensure_table_state(table_id)?;
322
323 let missing_ids = {
325 let tables = self.tables.read().unwrap();
326 let state = tables.get(&table_id).unwrap();
327 field_ids
328 .iter()
329 .copied()
330 .filter(|field_id| !state.current.column_metas.contains_key(field_id))
331 .collect::<Vec<_>>()
332 };
333
334 if !missing_ids.is_empty() {
335 let catalog = SysCatalog::new(&self.store);
336 let fetched = catalog.get_cols_meta(table_id, &missing_ids);
337 let mut tables = self.tables.write().unwrap();
338 let state = tables.get_mut(&table_id).unwrap();
339 for (idx, field_id) in missing_ids.iter().enumerate() {
340 if let Some(meta) = fetched[idx].clone() {
341 state.current.column_metas.insert(*field_id, meta.clone());
342 state
343 .persisted
344 .column_metas
345 .entry(*field_id)
346 .or_insert(meta);
347 }
348 }
349 }
350
351 let tables = self.tables.read().unwrap();
352 let state = tables.get(&table_id).unwrap();
353 Ok(field_ids
354 .iter()
355 .map(|field_id| state.current.column_metas.get(field_id).cloned())
356 .collect())
357 }
358
359 pub fn set_column_meta(&self, table_id: TableId, meta: ColMeta) -> LlkvResult<()> {
361 self.ensure_table_state(table_id)?;
362 let mut tables = self.tables.write().unwrap();
363 let state = tables.get_mut(&table_id).unwrap();
364 state.current.column_metas.insert(meta.col_id, meta);
365 Ok(())
366 }
367
368 pub fn multi_column_uniques(
370 &self,
371 table_id: TableId,
372 ) -> LlkvResult<Vec<MultiColumnIndexEntryMeta>> {
373 self.ensure_table_state(table_id)?;
374 let tables = self.tables.read().unwrap();
375 let state = tables.get(&table_id).unwrap();
376 Ok(state
377 .current
378 .multi_column_indexes
379 .values()
380 .filter(|entry| entry.unique)
381 .cloned()
382 .collect())
383 }
384
385 pub fn set_multi_column_uniques(
387 &self,
388 table_id: TableId,
389 uniques: Vec<MultiColumnIndexEntryMeta>,
390 ) -> LlkvResult<()> {
391 self.ensure_table_state(table_id)?;
392 let mut tables = self.tables.write().unwrap();
393 let state = tables.get_mut(&table_id).unwrap();
394 state
396 .current
397 .multi_column_indexes
398 .retain(|_, entry| !entry.unique);
399 for unique in uniques {
401 state
402 .current
403 .multi_column_indexes
404 .insert(unique.canonical_name.clone(), unique);
405 }
406 Ok(())
407 }
408
409 pub fn single_column_indexes(
411 &self,
412 table_id: TableId,
413 ) -> LlkvResult<Vec<SingleColumnIndexEntry>> {
414 self.ensure_table_state(table_id)?;
415 let tables = self.tables.read().unwrap();
416 let state = tables.get(&table_id).unwrap();
417 Ok(state.current.single_indexes.values().cloned().collect())
418 }
419
420 pub fn single_column_index(
422 &self,
423 table_id: TableId,
424 canonical_index_name: &str,
425 ) -> LlkvResult<Option<SingleColumnIndexEntry>> {
426 self.ensure_table_state(table_id)?;
427 let tables = self.tables.read().unwrap();
428 let state = tables.get(&table_id).unwrap();
429 Ok(state
430 .current
431 .single_indexes
432 .get(canonical_index_name)
433 .cloned())
434 }
435
436 pub fn put_single_column_index(
438 &self,
439 table_id: TableId,
440 entry: SingleColumnIndexEntry,
441 ) -> LlkvResult<()> {
442 self.ensure_table_state(table_id)?;
443 let mut tables = self.tables.write().unwrap();
444 let state = tables.get_mut(&table_id).unwrap();
445 let column_id = entry.column_id;
446 state
447 .current
448 .single_indexes
449 .insert(entry.canonical_name.clone(), entry);
450 state.current.sort_indexes.insert(column_id);
451 Ok(())
452 }
453
454 pub fn remove_single_column_index(
456 &self,
457 table_id: TableId,
458 canonical_index_name: &str,
459 ) -> LlkvResult<Option<SingleColumnIndexEntry>> {
460 self.ensure_table_state(table_id)?;
461 let mut tables = self.tables.write().unwrap();
462 let state = tables.get_mut(&table_id).unwrap();
463 let removed = state.current.single_indexes.remove(canonical_index_name);
464 if let Some(ref entry) = removed {
465 let still_indexed = state
466 .current
467 .single_indexes
468 .values()
469 .any(|existing| existing.column_id == entry.column_id);
470 if !still_indexed {
471 state.current.sort_indexes.remove(&entry.column_id);
472 }
473 }
474 Ok(removed)
475 }
476
477 pub fn put_multi_column_index(
479 &self,
480 table_id: TableId,
481 entry: MultiColumnIndexEntryMeta,
482 ) -> LlkvResult<()> {
483 self.ensure_table_state(table_id)?;
484 let mut tables = self.tables.write().unwrap();
485 let state = tables.get_mut(&table_id).unwrap();
486 state
487 .current
488 .multi_column_indexes
489 .insert(entry.canonical_name.clone(), entry);
490 Ok(())
491 }
492
493 pub fn remove_multi_column_index(
495 &self,
496 table_id: TableId,
497 canonical_index_name: &str,
498 ) -> LlkvResult<Option<MultiColumnIndexEntryMeta>> {
499 self.ensure_table_state(table_id)?;
500 let mut tables = self.tables.write().unwrap();
501 let state = tables.get_mut(&table_id).unwrap();
502 Ok(state
503 .current
504 .multi_column_indexes
505 .remove(canonical_index_name))
506 }
507
508 pub fn get_multi_column_index(
510 &self,
511 table_id: TableId,
512 canonical_index_name: &str,
513 ) -> LlkvResult<Option<MultiColumnIndexEntryMeta>> {
514 self.ensure_table_state(table_id)?;
515 let tables = self.tables.read().unwrap();
516 Ok(tables.get(&table_id).and_then(|state| {
517 state
518 .current
519 .multi_column_indexes
520 .get(canonical_index_name)
521 .cloned()
522 }))
523 }
524
525 pub fn register_sort_index(&self, table_id: TableId, field_id: FieldId) -> LlkvResult<()> {
527 self.ensure_table_state(table_id)?;
528
529 {
530 let mut tables = self.tables.write().unwrap();
531 let state = tables.get_mut(&table_id).unwrap();
532 if state.persisted.sort_indexes.contains(&field_id)
533 || state.current.sort_indexes.contains(&field_id)
534 {
535 state.current.sort_indexes.insert(field_id);
536 return Ok(());
537 }
538 }
539
540 if self.field_has_sort_index(table_id, field_id)? {
541 let mut tables = self.tables.write().unwrap();
542 let state = tables.get_mut(&table_id).unwrap();
543 state.persisted.sort_indexes.insert(field_id);
544 state.current.sort_indexes.insert(field_id);
545 return Ok(());
546 }
547
548 let mut tables = self.tables.write().unwrap();
549 let state = tables.get_mut(&table_id).unwrap();
550 state.current.sort_indexes.insert(field_id);
551 Ok(())
552 }
553
554 pub fn unregister_sort_index(&self, table_id: TableId, field_id: FieldId) -> LlkvResult<()> {
556 self.ensure_table_state(table_id)?;
557
558 let mut tables = self.tables.write().unwrap();
559 let state = tables.get_mut(&table_id).unwrap();
560 state.current.sort_indexes.remove(&field_id);
561
562 if !state.persisted.sort_indexes.contains(&field_id) {
563 drop(tables);
564 if self.field_has_sort_index(table_id, field_id)? {
565 let mut tables = self.tables.write().unwrap();
566 let state = tables.get_mut(&table_id).unwrap();
567 state.persisted.sort_indexes.insert(field_id);
568 }
569 }
570
571 Ok(())
572 }
573
574 pub fn update_multi_column_uniques<F, T>(&self, table_id: TableId, f: F) -> LlkvResult<T>
576 where
577 F: FnOnce(&mut Vec<MultiColumnIndexEntryMeta>) -> T,
578 {
579 self.ensure_table_state(table_id)?;
580 let mut tables = self.tables.write().unwrap();
581 let state = tables.get_mut(&table_id).unwrap();
582 let mut uniques: Vec<MultiColumnIndexEntryMeta> = state
583 .current
584 .multi_column_indexes
585 .values()
586 .filter(|entry| entry.unique)
587 .cloned()
588 .collect();
589 let result = f(&mut uniques);
590 state
592 .current
593 .multi_column_indexes
594 .retain(|_, entry| !entry.unique);
595 for unique in uniques {
597 state
598 .current
599 .multi_column_indexes
600 .insert(unique.canonical_name.clone(), unique);
601 }
602 Ok(result)
603 }
604
605 pub fn triggers(&self, table_id: TableId) -> LlkvResult<Vec<TriggerEntryMeta>> {
607 self.ensure_table_state(table_id)?;
608 let tables = self.tables.read().unwrap();
609 let state = tables.get(&table_id).unwrap();
610 Ok(state.current.triggers.values().cloned().collect())
611 }
612
613 pub fn trigger(
615 &self,
616 table_id: TableId,
617 canonical_name: &str,
618 ) -> LlkvResult<Option<TriggerEntryMeta>> {
619 self.ensure_table_state(table_id)?;
620 let tables = self.tables.read().unwrap();
621 let state = tables.get(&table_id).unwrap();
622 Ok(state
623 .current
624 .triggers
625 .get(&canonical_name.to_ascii_lowercase())
626 .cloned())
627 }
628
629 pub fn insert_trigger(&self, table_id: TableId, trigger: TriggerEntryMeta) -> LlkvResult<()> {
631 self.ensure_table_state(table_id)?;
632 let mut tables = self.tables.write().unwrap();
633 let state = tables.get_mut(&table_id).unwrap();
634 state
635 .current
636 .triggers
637 .insert(trigger.canonical_name.clone(), trigger);
638 Ok(())
639 }
640
641 pub fn remove_trigger(&self, table_id: TableId, canonical_name: &str) -> LlkvResult<bool> {
643 self.ensure_table_state(table_id)?;
644 let mut tables = self.tables.write().unwrap();
645 let state = tables.get_mut(&table_id).unwrap();
646 Ok(state
647 .current
648 .triggers
649 .remove(&canonical_name.to_ascii_lowercase())
650 .is_some())
651 }
652
653 pub fn prepare_table_drop(&self, table_id: TableId, column_ids: &[FieldId]) -> LlkvResult<()> {
658 if !column_ids.is_empty() {
659 let _ = self.column_metas(table_id, column_ids)?;
660 } else {
661 self.ensure_table_state(table_id)?;
662 }
663
664 let mut tables = self.tables.write().unwrap();
665 if let Some(state) = tables.get_mut(&table_id) {
666 state.current.table_meta = None;
667 state.current.column_metas.clear();
668 state.current.constraints.clear();
669 state.current.constraint_names.clear();
670 state.current.single_indexes.clear();
671 state.current.multi_column_indexes.clear();
672 state.current.sort_indexes.clear();
673 state.current.triggers.clear();
674 }
675 drop(tables);
676 self.refresh_referencing_index_for_table(table_id);
677 Ok(())
678 }
679
680 pub fn remove_table_state(&self, table_id: TableId) {
682 self.tables.write().unwrap().remove(&table_id);
683 self.referencing_index
684 .write()
685 .unwrap()
686 .remove_child(table_id);
687 }
688
689 pub fn constraint_records(&self, table_id: TableId) -> LlkvResult<Vec<ConstraintRecord>> {
691 self.ensure_table_state(table_id)?;
692 let tables = self.tables.read().unwrap();
693 let state = tables.get(&table_id).unwrap();
694 Ok(state.current.constraints.values().cloned().collect())
695 }
696
697 pub fn constraint_names(
699 &self,
700 table_id: TableId,
701 ) -> LlkvResult<FxHashMap<ConstraintId, String>> {
702 self.ensure_table_state(table_id)?;
703 let tables = self.tables.read().unwrap();
704 let state = tables.get(&table_id).unwrap();
705 Ok(state.current.constraint_names.clone())
706 }
707
708 pub fn constraint_records_by_id(
710 &self,
711 table_id: TableId,
712 constraint_ids: &[ConstraintId],
713 ) -> LlkvResult<Vec<Option<ConstraintRecord>>> {
714 self.ensure_table_state(table_id)?;
715 let tables = self.tables.read().unwrap();
716 let state = tables.get(&table_id).unwrap();
717 Ok(constraint_ids
718 .iter()
719 .map(|constraint_id| state.current.constraints.get(constraint_id).cloned())
720 .collect())
721 }
722
723 pub fn put_constraint_records(
725 &self,
726 table_id: TableId,
727 records: &[ConstraintRecord],
728 ) -> LlkvResult<()> {
729 self.ensure_table_state(table_id)?;
730 let mut tables = self.tables.write().unwrap();
731 let state = tables.get_mut(&table_id).unwrap();
732 for record in records {
733 state
734 .current
735 .constraints
736 .insert(record.constraint_id, record.clone());
737 }
738 drop(tables);
739 self.refresh_referencing_index_for_table(table_id);
740 Ok(())
741 }
742
743 pub fn put_constraint_names(
745 &self,
746 table_id: TableId,
747 names: &[(ConstraintId, Option<String>)],
748 ) -> LlkvResult<()> {
749 if names.is_empty() {
750 return Ok(());
751 }
752 self.ensure_table_state(table_id)?;
753 let mut tables = self.tables.write().unwrap();
754 if let Some(state) = tables.get_mut(&table_id) {
755 for (constraint_id, name) in names {
756 if let Some(name) = name {
757 state
758 .current
759 .constraint_names
760 .insert(*constraint_id, name.clone());
761 } else {
762 state.current.constraint_names.remove(constraint_id);
763 }
764 }
765 }
766 Ok(())
767 }
768
769 pub fn constraint_record_map(
771 &self,
772 table_id: TableId,
773 ) -> LlkvResult<FxHashMap<ConstraintId, ConstraintRecord>> {
774 self.ensure_table_state(table_id)?;
775 let tables = self.tables.read().unwrap();
776 let state = tables.get(&table_id).unwrap();
777 Ok(state.current.constraints.clone())
778 }
779
780 pub fn flush_table(&self, table_id: TableId) -> LlkvResult<()> {
782 self.ensure_table_state(table_id)?;
783 let mut tables = self.tables.write().unwrap();
784 let state = tables.get_mut(&table_id).unwrap();
785
786 let catalog = SysCatalog::new(&self.store);
787
788 match (
789 state.current.table_meta.as_ref(),
790 state.persisted.table_meta.as_ref(),
791 ) {
792 (Some(meta), Some(existing)) if meta != existing => {
793 catalog.put_table_meta(meta);
794 state.persisted.table_meta = Some(meta.clone());
795 }
796 (Some(meta), None) => {
797 catalog.put_table_meta(meta);
798 state.persisted.table_meta = Some(meta.clone());
799 }
800 (None, Some(_)) => {
801 catalog.delete_table_meta(table_id)?;
802 state.persisted.table_meta = None;
803 }
804 _ => {}
805 }
806
807 let mut dirty_columns: Vec<(FieldId, ColMeta)> = Vec::new();
808 for (field_id, meta) in &state.current.column_metas {
809 match state.persisted.column_metas.get(field_id) {
810 Some(existing) if existing == meta => {}
811 _ => dirty_columns.push((*field_id, meta.clone())),
812 }
813 }
814 for (field_id, meta) in dirty_columns.iter() {
815 catalog.put_col_meta(table_id, meta);
816 state.persisted.column_metas.insert(*field_id, meta.clone());
817 }
818
819 let removed_columns: Vec<FieldId> = state
820 .persisted
821 .column_metas
822 .keys()
823 .copied()
824 .filter(|field_id| !state.current.column_metas.contains_key(field_id))
825 .collect();
826 if !removed_columns.is_empty() {
827 catalog.delete_col_meta(table_id, &removed_columns)?;
828 for field_id in removed_columns {
829 state.persisted.column_metas.remove(&field_id);
830 }
831 }
832
833 let mut dirty_constraints: Vec<ConstraintRecord> = Vec::new();
834 for (constraint_id, record) in &state.current.constraints {
835 match state.persisted.constraints.get(constraint_id) {
836 Some(existing) if existing == record => {}
837 _ => dirty_constraints.push(record.clone()),
838 }
839 }
840 if !dirty_constraints.is_empty() {
841 catalog.put_constraint_records(table_id, &dirty_constraints)?;
842 for record in dirty_constraints {
843 state
844 .persisted
845 .constraints
846 .insert(record.constraint_id, record);
847 }
848 }
849
850 let removed_constraints: Vec<ConstraintId> = state
851 .persisted
852 .constraints
853 .keys()
854 .copied()
855 .filter(|constraint_id| !state.current.constraints.contains_key(constraint_id))
856 .collect();
857 if !removed_constraints.is_empty() {
858 catalog.delete_constraint_records(table_id, &removed_constraints)?;
859 for constraint_id in removed_constraints {
860 state.persisted.constraints.remove(&constraint_id);
861 }
862 }
863
864 let mut dirty_constraint_names: Vec<(ConstraintId, String)> = Vec::new();
865 for (constraint_id, name) in &state.current.constraint_names {
866 match state.persisted.constraint_names.get(constraint_id) {
867 Some(existing) if existing == name => {}
868 _ => dirty_constraint_names.push((*constraint_id, name.clone())),
869 }
870 }
871 if !dirty_constraint_names.is_empty() {
872 let records: Vec<ConstraintNameRecord> = dirty_constraint_names
873 .iter()
874 .map(|(constraint_id, name)| ConstraintNameRecord {
875 constraint_id: *constraint_id,
876 name: Some(name.clone()),
877 })
878 .collect();
879 catalog.put_constraint_names(table_id, &records)?;
880 for (constraint_id, name) in dirty_constraint_names {
881 state.persisted.constraint_names.insert(constraint_id, name);
882 }
883 }
884
885 let removed_constraint_names: Vec<ConstraintId> = state
886 .persisted
887 .constraint_names
888 .keys()
889 .copied()
890 .filter(|constraint_id| !state.current.constraint_names.contains_key(constraint_id))
891 .collect();
892 if !removed_constraint_names.is_empty() {
893 catalog.delete_constraint_names(table_id, &removed_constraint_names)?;
894 for constraint_id in removed_constraint_names {
895 state.persisted.constraint_names.remove(&constraint_id);
896 }
897 }
898
899 if state.current.multi_column_indexes != state.persisted.multi_column_indexes {
901 if state.current.multi_column_indexes.is_empty() {
902 catalog.delete_multi_column_indexes(table_id)?;
903 } else {
904 let mut entries: Vec<MultiColumnIndexEntryMeta> = state
905 .current
906 .multi_column_indexes
907 .values()
908 .cloned()
909 .collect();
910 entries.sort_by(|a, b| a.canonical_name.cmp(&b.canonical_name));
911 catalog.put_multi_column_indexes(table_id, &entries)?;
912 }
913 state.persisted.multi_column_indexes = state.current.multi_column_indexes.clone();
914 }
915
916 if state.current.single_indexes != state.persisted.single_indexes {
917 if state.current.single_indexes.is_empty() {
918 catalog.delete_single_column_indexes(table_id)?;
919 state.persisted.single_indexes.clear();
920 } else {
921 let mut entries: Vec<SingleColumnIndexEntryMeta> = state
922 .current
923 .single_indexes
924 .values()
925 .cloned()
926 .map(|entry| SingleColumnIndexEntryMeta {
927 index_name: entry.index_name,
928 canonical_name: entry.canonical_name,
929 column_id: entry.column_id,
930 column_name: entry.column_name,
931 unique: entry.unique,
932 ascending: entry.ascending,
933 nulls_first: entry.nulls_first,
934 })
935 .collect();
936 entries.sort_by(|a, b| a.canonical_name.cmp(&b.canonical_name));
937 catalog.put_single_column_indexes(table_id, &entries)?;
938 state.persisted.single_indexes = state.current.single_indexes.clone();
939 }
940 }
941
942 if state.current.triggers != state.persisted.triggers {
943 if state.current.triggers.is_empty() {
944 if !state.persisted.triggers.is_empty() {
945 catalog.delete_triggers(table_id)?;
946 }
947 state.persisted.triggers.clear();
948 } else {
949 let mut entries: Vec<TriggerEntryMeta> =
950 state.current.triggers.values().cloned().collect();
951 entries.sort_by(|a, b| a.canonical_name.cmp(&b.canonical_name));
952 catalog.put_triggers(table_id, &entries)?;
953 state.persisted.triggers = state.current.triggers.clone();
954 }
955 }
956
957 let sort_adds: Vec<FieldId> = state
958 .current
959 .sort_indexes
960 .iter()
961 .copied()
962 .filter(|field_id| !state.persisted.sort_indexes.contains(field_id))
963 .collect();
964 let sort_removes: Vec<FieldId> = state
965 .persisted
966 .sort_indexes
967 .iter()
968 .copied()
969 .filter(|field_id| !state.current.sort_indexes.contains(field_id))
970 .collect();
971 if !sort_adds.is_empty() || !sort_removes.is_empty() {
972 let table = Table::from_id_and_store(table_id, Arc::clone(&self.store))?;
973 for field_id in &sort_adds {
974 table.register_sort_index(*field_id)?;
975 state.persisted.sort_indexes.insert(*field_id);
976 }
977 for field_id in &sort_removes {
978 table.unregister_sort_index(*field_id)?;
979 state.persisted.sort_indexes.remove(field_id);
980 }
981 }
982
983 Ok(())
984 }
985
986 pub fn flush_all(&self) -> LlkvResult<()> {
988 let table_ids: Vec<TableId> = {
989 let tables = self.tables.read().unwrap();
990 tables.keys().copied().collect()
991 };
992 for table_id in table_ids {
993 self.flush_table(table_id)?;
994 }
995 Ok(())
996 }
997
998 pub fn all_table_metas(&self) -> LlkvResult<Vec<(TableId, TableMeta)>> {
1000 let catalog = SysCatalog::new(&self.store);
1001 catalog.all_table_metas()
1002 }
1003
1004 pub fn all_multi_column_unique_metas(&self) -> LlkvResult<Vec<TableMultiColumnIndexMeta>> {
1006 let catalog = SysCatalog::new(&self.store);
1007 let all = catalog.all_multi_column_index_metas()?;
1008 Ok(all
1010 .into_iter()
1011 .map(|mut meta| {
1012 meta.indexes.retain(|idx| idx.unique);
1013 meta
1014 })
1015 .filter(|meta| !meta.indexes.is_empty())
1016 .collect())
1017 }
1018
1019 pub fn foreign_key_descriptors(
1021 &self,
1022 table_id: TableId,
1023 ) -> LlkvResult<Vec<ForeignKeyDescriptor>> {
1024 let records = self.constraint_records(table_id)?;
1025 let mut descriptors = Vec::new();
1026
1027 for record in records {
1028 if !record.is_active() {
1029 continue;
1030 }
1031
1032 let ConstraintKind::ForeignKey(fk) = record.kind else {
1033 continue;
1034 };
1035
1036 descriptors.push(ForeignKeyDescriptor {
1037 constraint_id: record.constraint_id,
1038 referencing_table_id: table_id,
1039 referencing_field_ids: fk.referencing_field_ids.clone(),
1040 referenced_table_id: fk.referenced_table,
1041 referenced_field_ids: fk.referenced_field_ids.clone(),
1042 on_delete: fk.on_delete,
1043 on_update: fk.on_update,
1044 });
1045 }
1046
1047 Ok(descriptors)
1048 }
1049
1050 pub fn foreign_key_views(
1052 &self,
1053 catalog: &TableCatalog,
1054 table_id: TableId,
1055 ) -> LlkvResult<Vec<ForeignKeyView>> {
1056 let descriptors = self.foreign_key_descriptors(table_id)?;
1057
1058 if descriptors.is_empty() {
1059 return Ok(Vec::new());
1060 }
1061
1062 let (referencing_display, referencing_canonical) =
1063 resolve_table_name(catalog, self, table_id)?;
1064
1065 let mut details = Vec::with_capacity(descriptors.len());
1066 for descriptor in descriptors {
1067 let referenced_table_id = descriptor.referenced_table_id;
1068 let (referenced_display, referenced_canonical) =
1069 resolve_table_name(catalog, self, referenced_table_id)?;
1070
1071 let referencing_column_names =
1072 self.column_names(table_id, &descriptor.referencing_field_ids)?;
1073 let referenced_column_names =
1074 self.column_names(referenced_table_id, &descriptor.referenced_field_ids)?;
1075 let constraint_name = self.constraint_name_for(table_id, descriptor.constraint_id)?;
1076
1077 details.push(ForeignKeyView {
1078 constraint_id: descriptor.constraint_id,
1079 constraint_name,
1080 referencing_table_id: descriptor.referencing_table_id,
1081 referencing_table_display: referencing_display.clone(),
1082 referencing_table_canonical: referencing_canonical.clone(),
1083 referencing_field_ids: descriptor.referencing_field_ids.clone(),
1084 referencing_column_names,
1085 referenced_table_id,
1086 referenced_table_display: referenced_display.clone(),
1087 referenced_table_canonical: referenced_canonical.clone(),
1088 referenced_field_ids: descriptor.referenced_field_ids.clone(),
1089 referenced_column_names,
1090 on_delete: descriptor.on_delete,
1091 on_update: descriptor.on_update,
1092 });
1093 }
1094
1095 Ok(details)
1096 }
1097
1098 pub fn table_view(
1100 &self,
1101 catalog: &TableCatalog,
1102 table_id: TableId,
1103 field_ids: &[FieldId],
1104 ) -> LlkvResult<TableView> {
1105 let table_meta = self.table_meta(table_id)?;
1106 let column_metas = self.column_metas(table_id, field_ids)?;
1107 let constraint_records = self.constraint_records(table_id)?;
1108 let multi_column_uniques = self.multi_column_uniques(table_id)?;
1109 let foreign_keys = self.foreign_key_views(catalog, table_id)?;
1110
1111 Ok(TableView {
1112 table_meta,
1113 column_metas,
1114 constraint_records,
1115 multi_column_uniques,
1116 foreign_keys,
1117 })
1118 }
1119
1120 pub fn validate_and_register_foreign_keys<F>(
1122 &self,
1123 referencing_table: &ForeignKeyTableInfo,
1124 specs: &[ForeignKeySpec],
1125 lookup_table: F,
1126 timestamp_micros: u64,
1127 ) -> LlkvResult<Vec<ValidatedForeignKey>>
1128 where
1129 F: FnMut(&str) -> LlkvResult<ForeignKeyTableInfo>,
1130 {
1131 let validated = validate_foreign_keys(referencing_table, specs, lookup_table)?;
1132 self.register_foreign_keys(referencing_table.table_id, &validated, timestamp_micros)?;
1133 Ok(validated)
1134 }
1135
1136 pub fn register_foreign_keys(
1138 &self,
1139 table_id: TableId,
1140 foreign_keys: &[ValidatedForeignKey],
1141 timestamp_micros: u64,
1142 ) -> LlkvResult<()> {
1143 if foreign_keys.is_empty() {
1144 return Ok(());
1145 }
1146
1147 let existing_constraints = self.constraint_record_map(table_id)?;
1148 let mut next_constraint_id = existing_constraints
1149 .keys()
1150 .copied()
1151 .max()
1152 .unwrap_or(0)
1153 .saturating_add(1);
1154
1155 let mut constraint_records = Vec::with_capacity(foreign_keys.len());
1156 let mut constraint_names: Vec<(ConstraintId, Option<String>)> =
1157 Vec::with_capacity(foreign_keys.len());
1158
1159 for fk in foreign_keys {
1160 let constraint_id = next_constraint_id;
1161 constraint_records.push(ConstraintRecord {
1162 constraint_id,
1163 kind: ConstraintKind::ForeignKey(ForeignKeyConstraint {
1164 referencing_field_ids: fk.referencing_field_ids.clone(),
1165 referenced_table: fk.referenced_table_id,
1166 referenced_field_ids: fk.referenced_field_ids.clone(),
1167 on_delete: fk.on_delete,
1168 on_update: fk.on_update,
1169 }),
1170 state: ConstraintState::Active,
1171 revision: 1,
1172 last_modified_micros: timestamp_micros,
1173 });
1174 constraint_names.push((constraint_id, fk.name.clone()));
1175 next_constraint_id = next_constraint_id.saturating_add(1);
1176 }
1177
1178 self.put_constraint_records(table_id, &constraint_records)?;
1179 self.put_constraint_names(table_id, &constraint_names)?;
1180 self.flush_table(table_id)?;
1181
1182 Ok(())
1183 }
1184
1185 pub fn apply_column_definitions(
1187 &self,
1188 table_id: TableId,
1189 columns: &[TableColumn],
1190 timestamp_micros: u64,
1191 ) -> LlkvResult<()> {
1192 if columns.is_empty() {
1193 return Ok(());
1194 }
1195
1196 self.ensure_table_state(table_id)?;
1197
1198 for column in columns {
1199 let column_meta = ColMeta {
1200 col_id: column.field_id,
1201 name: Some(column.name.clone()),
1202 flags: 0,
1203 default: None,
1204 };
1205 self.set_column_meta(table_id, column_meta)?;
1206 }
1207
1208 let table = Table::from_id_and_store(table_id, Arc::clone(&self.store))?;
1209 let store = table.store();
1210
1211 for column in columns {
1212 let logical_field_id = LogicalFieldId::for_user(table_id, column.field_id);
1213 store.ensure_column_registered(logical_field_id, &column.data_type)?;
1214 store.data_type(logical_field_id)?;
1215 }
1216
1217 let created_by_lfid = LogicalFieldId::for_mvcc_created_by(table_id);
1218 store.ensure_column_registered(created_by_lfid, &DataType::UInt64)?;
1219
1220 let deleted_by_lfid = LogicalFieldId::for_mvcc_deleted_by(table_id);
1221 store.ensure_column_registered(deleted_by_lfid, &DataType::UInt64)?;
1222
1223 let existing = self.constraint_record_map(table_id)?;
1224 let mut next_constraint_id = existing
1225 .keys()
1226 .copied()
1227 .max()
1228 .unwrap_or(0)
1229 .saturating_add(1);
1230
1231 let mut constraints = Vec::new();
1232
1233 let primary_key_fields: Vec<FieldId> = columns
1234 .iter()
1235 .filter(|col| col.primary_key)
1236 .map(|col| col.field_id)
1237 .collect();
1238 if !primary_key_fields.is_empty() {
1239 constraints.push(ConstraintRecord {
1240 constraint_id: next_constraint_id,
1241 kind: ConstraintKind::PrimaryKey(PrimaryKeyConstraint {
1242 field_ids: primary_key_fields,
1243 }),
1244 state: ConstraintState::Active,
1245 revision: 1,
1246 last_modified_micros: timestamp_micros,
1247 });
1248 next_constraint_id = next_constraint_id.saturating_add(1);
1249 }
1250
1251 for column in columns.iter().filter(|col| col.unique && !col.primary_key) {
1252 constraints.push(ConstraintRecord {
1253 constraint_id: next_constraint_id,
1254 kind: ConstraintKind::Unique(UniqueConstraint {
1255 field_ids: vec![column.field_id],
1256 }),
1257 state: ConstraintState::Active,
1258 revision: 1,
1259 last_modified_micros: timestamp_micros,
1260 });
1261 next_constraint_id = next_constraint_id.saturating_add(1);
1262 }
1263
1264 if !constraints.is_empty() {
1265 self.put_constraint_records(table_id, &constraints)?;
1266 }
1267
1268 Ok(())
1269 }
1270
1271 pub fn column_data_type(&self, table_id: TableId, field_id: FieldId) -> LlkvResult<DataType> {
1272 let table = Table::from_id_and_store(table_id, Arc::clone(&self.store))?;
1273 let store = table.store();
1274 let logical_field_id = LogicalFieldId::for_user(table_id, field_id);
1275 store.data_type(logical_field_id)
1276 }
1277
1278 pub fn register_multi_column_unique(
1280 &self,
1281 table_id: TableId,
1282 column_ids: &[FieldId],
1283 index_name: Option<String>,
1284 ) -> LlkvResult<MultiColumnUniqueRegistration> {
1285 let mut created = false;
1286 let mut existing_name: Option<Option<String>> = None;
1287 let column_vec: Vec<FieldId> = column_ids.to_vec();
1288
1289 let canonical_name = format!(
1291 "__unique_{}_{}",
1292 table_id,
1293 column_vec
1294 .iter()
1295 .map(|id| id.to_string())
1296 .collect::<Vec<_>>()
1297 .join("_")
1298 );
1299
1300 self.update_multi_column_uniques(table_id, |entries| {
1301 if let Some(existing) = entries.iter().find(|entry| entry.column_ids == column_vec) {
1302 existing_name = Some(existing.index_name.clone());
1303 } else {
1304 entries.push(MultiColumnIndexEntryMeta {
1305 index_name: index_name.clone(),
1306 canonical_name: canonical_name.clone(),
1307 column_ids: column_vec.clone(),
1308 unique: true,
1309 });
1310 created = true;
1311 }
1312 })?;
1313
1314 if created {
1315 Ok(MultiColumnUniqueRegistration::Created)
1316 } else {
1317 Ok(MultiColumnUniqueRegistration::AlreadyExists {
1318 index_name: existing_name.unwrap_or(None),
1319 })
1320 }
1321 }
1322
1323 fn column_names(&self, table_id: TableId, field_ids: &[FieldId]) -> LlkvResult<Vec<String>> {
1324 if field_ids.is_empty() {
1325 return Ok(Vec::new());
1326 }
1327
1328 let metas = self.column_metas(table_id, field_ids)?;
1329 let mut names = Vec::with_capacity(field_ids.len());
1330 for (idx, field_id) in field_ids.iter().enumerate() {
1331 let name = metas
1332 .get(idx)
1333 .and_then(|meta| meta.as_ref())
1334 .and_then(|meta| meta.name.clone())
1335 .unwrap_or_else(|| format!("col_{}", field_id));
1336 names.push(name);
1337 }
1338 Ok(names)
1339 }
1340
1341 pub fn reserve_table_id(&self) -> LlkvResult<TableId> {
1343 let catalog = SysCatalog::new(&self.store);
1344
1345 let mut next = match catalog.get_next_table_id()? {
1346 Some(value) => value,
1347 None => {
1348 let seed = catalog
1349 .max_table_id()?
1350 .unwrap_or(reserved::CATALOG_TABLE_ID);
1351 let initial = seed.checked_add(1).ok_or_else(|| {
1352 Error::InvalidArgumentError("exhausted available table ids".into())
1353 })?;
1354 catalog.put_next_table_id(initial)?;
1355 initial
1356 }
1357 };
1358
1359 while reserved::is_reserved_table_id(next) {
1360 next = next.checked_add(1).ok_or_else(|| {
1361 Error::InvalidArgumentError("exhausted available table ids".into())
1362 })?;
1363 }
1364
1365 let mut following = next
1366 .checked_add(1)
1367 .ok_or_else(|| Error::InvalidArgumentError("exhausted available table ids".into()))?;
1368
1369 while reserved::is_reserved_table_id(following) {
1370 following = following.checked_add(1).ok_or_else(|| {
1371 Error::InvalidArgumentError("exhausted available table ids".into())
1372 })?;
1373 }
1374
1375 catalog.put_next_table_id(following)?;
1376 Ok(next)
1377 }
1378
1379 pub fn ensure_next_table_id_at_least(&self, minimum: TableId) -> LlkvResult<()> {
1386 if reserved::is_reserved_table_id(minimum) {
1387 return Err(Error::InvalidArgumentError(
1388 reserved::reserved_table_id_message(minimum),
1389 ));
1390 }
1391
1392 let catalog = SysCatalog::new(&self.store);
1393 match catalog.get_next_table_id()? {
1394 Some(current) if current >= minimum => Ok(()),
1395 _ => catalog.put_next_table_id(minimum),
1396 }
1397 }
1398
1399 fn field_has_sort_index(&self, table_id: TableId, field_id: FieldId) -> LlkvResult<bool> {
1406 let table = Table::from_id_and_store(table_id, Arc::clone(&self.store))?;
1407 let indexes = table.list_registered_indexes(field_id)?;
1408 Ok(indexes.contains(&IndexKind::Sort))
1409 }
1410}
1411
1412#[cfg(test)]
1413mod tests {
1414 use super::*;
1415 use crate::constraints::{ConstraintKind, ConstraintState, PrimaryKeyConstraint};
1416 use crate::{MultiColumnIndexEntryMeta, Table};
1417 use llkv_column_map::ColumnStore;
1418 use llkv_column_map::store::IndexKind;
1419 use llkv_storage::pager::MemPager;
1420 use std::sync::Arc;
1421
1422 #[test]
1423 fn metadata_manager_persists_and_loads() {
1424 let pager = Arc::new(MemPager::default());
1425 let store = Arc::new(ColumnStore::open(Arc::clone(&pager)).unwrap());
1426 let manager = MetadataManager::new(Arc::clone(&store));
1427
1428 let table_id: TableId = 42;
1429 let table_meta = TableMeta {
1430 table_id,
1431 name: Some("users".into()),
1432 created_at_micros: 123,
1433 flags: 0,
1434 epoch: 1,
1435 view_definition: None,
1436 };
1437 manager
1438 .set_table_meta(table_id, table_meta.clone())
1439 .unwrap();
1440
1441 {
1442 let tables = manager.tables.read().unwrap();
1443 let state = tables.get(&table_id).unwrap();
1444 assert!(state.current.table_meta.is_some());
1445 }
1446
1447 let column_meta = ColMeta {
1448 col_id: 1,
1449 name: Some("id".into()),
1450 flags: 0,
1451 default: None,
1452 };
1453 manager
1454 .set_column_meta(table_id, column_meta.clone())
1455 .unwrap();
1456
1457 let logical_field_id = llkv_types::LogicalFieldId::for_user(table_id, column_meta.col_id);
1458 store
1459 .ensure_column_registered(logical_field_id, &arrow::datatypes::DataType::Utf8)
1460 .unwrap();
1461
1462 manager
1463 .register_sort_index(table_id, column_meta.col_id)
1464 .unwrap();
1465
1466 let constraint = ConstraintRecord {
1467 constraint_id: 7,
1468 kind: ConstraintKind::PrimaryKey(PrimaryKeyConstraint {
1469 field_ids: vec![column_meta.col_id],
1470 }),
1471 state: ConstraintState::Active,
1472 revision: 1,
1473 last_modified_micros: 456,
1474 };
1475 manager
1476 .put_constraint_records(table_id, std::slice::from_ref(&constraint))
1477 .unwrap();
1478
1479 let multi_unique = MultiColumnIndexEntryMeta {
1480 index_name: Some("uniq_users_name".into()),
1481 canonical_name: "uniq_users_name".to_lowercase(),
1482 column_ids: vec![column_meta.col_id],
1483 unique: true,
1484 };
1485 manager
1486 .set_multi_column_uniques(table_id, vec![multi_unique.clone()])
1487 .unwrap();
1488
1489 assert_eq!(
1490 manager.table_meta(table_id).unwrap(),
1491 Some(table_meta.clone())
1492 );
1493
1494 manager.flush_table(table_id).unwrap();
1495
1496 let table = Table::from_id_and_store(table_id, Arc::clone(&store)).unwrap();
1497 let indexes = table.list_registered_indexes(column_meta.col_id).unwrap();
1498 assert!(indexes.contains(&IndexKind::Sort));
1499
1500 let verify_catalog = SysCatalog::new(&store);
1501 let column_roundtrip = verify_catalog.get_cols_meta(table_id, &[column_meta.col_id]);
1502 assert_eq!(column_roundtrip[0].as_ref(), Some(&column_meta));
1503 let constraints = verify_catalog
1504 .constraint_records_for_table(table_id)
1505 .unwrap();
1506 assert_eq!(constraints, vec![constraint.clone()]);
1507 let unique_roundtrip = verify_catalog.get_multi_column_indexes(table_id).unwrap();
1508 assert_eq!(unique_roundtrip, vec![multi_unique.clone()]);
1509
1510 let meta_from_cache = manager.table_meta(table_id).unwrap();
1511 assert_eq!(meta_from_cache, Some(table_meta.clone()));
1512
1513 let columns_from_cache = manager
1514 .column_metas(table_id, &[column_meta.col_id])
1515 .unwrap();
1516 assert_eq!(columns_from_cache[0].as_ref(), Some(&column_meta));
1517
1518 let constraints_from_cache = manager.constraint_records(table_id).unwrap();
1519 assert_eq!(constraints_from_cache, vec![constraint.clone()]);
1520
1521 let uniques_from_cache = manager.multi_column_uniques(table_id).unwrap();
1522 assert_eq!(uniques_from_cache, vec![multi_unique]);
1523
1524 manager.flush_table(table_id).unwrap();
1526 }
1527
1528 #[test]
1529 fn metadata_manager_lazy_loads_columns_and_constraints() {
1530 let pager = Arc::new(MemPager::default());
1531 let store = Arc::new(ColumnStore::open(Arc::clone(&pager)).unwrap());
1532 let manager = MetadataManager::new(Arc::clone(&store));
1533
1534 let table_id: TableId = 99;
1535 let column_meta = ColMeta {
1536 col_id: 3,
1537 name: Some("value".into()),
1538 flags: 0,
1539 default: None,
1540 };
1541 let initial_catalog = SysCatalog::new(&store);
1542 initial_catalog.put_col_meta(table_id, &column_meta);
1543
1544 let constraint = ConstraintRecord {
1545 constraint_id: 15,
1546 kind: ConstraintKind::PrimaryKey(PrimaryKeyConstraint {
1547 field_ids: vec![column_meta.col_id],
1548 }),
1549 state: ConstraintState::Active,
1550 revision: 1,
1551 last_modified_micros: 0,
1552 };
1553 initial_catalog
1554 .put_constraint_records(table_id, std::slice::from_ref(&constraint))
1555 .unwrap();
1556 let multi_unique = MultiColumnIndexEntryMeta {
1557 index_name: Some("uniq_value".into()),
1558 canonical_name: "uniq_value".to_lowercase(),
1559 column_ids: vec![column_meta.col_id],
1560 unique: true,
1561 };
1562 initial_catalog
1563 .put_multi_column_indexes(table_id, std::slice::from_ref(&multi_unique))
1564 .unwrap();
1565
1566 let columns = manager
1567 .column_metas(table_id, &[column_meta.col_id])
1568 .unwrap();
1569 assert_eq!(columns[0].as_ref(), Some(&column_meta));
1570
1571 let constraints = manager.constraint_records(table_id).unwrap();
1572 assert_eq!(constraints, vec![constraint]);
1573
1574 let uniques = manager.multi_column_uniques(table_id).unwrap();
1575 assert_eq!(uniques, vec![multi_unique]);
1576 }
1577}
1578
1579#[derive(Clone, Debug)]
1581pub struct ForeignKeyDescriptor {
1582 pub constraint_id: ConstraintId,
1583 pub referencing_table_id: TableId,
1584 pub referencing_field_ids: Vec<FieldId>,
1585 pub referenced_table_id: TableId,
1586 pub referenced_field_ids: Vec<FieldId>,
1587 pub on_delete: ForeignKeyAction,
1588 pub on_update: ForeignKeyAction,
1589}
1590
1591#[derive(Debug, Clone, PartialEq, Eq)]
1593pub enum MultiColumnUniqueRegistration {
1594 Created,
1595 AlreadyExists { index_name: Option<String> },
1596}