llkv_table/
metadata.rs

1//! Shared metadata manager that consolidates catalog I/O for tables, columns, and constraints.
2//!
3//! This module offers a single entry point for querying and mutating persisted metadata. It keeps
4//! an in-memory snapshot per table, performs diff-aware persistence, and always uses batch catalog
5//! operations to minimise I/O.
6
7#![forbid(unsafe_code)]
8
9use crate::catalog::TableCatalog;
10use crate::constraints::{
11    ConstraintId, ConstraintKind, ConstraintRecord, ConstraintState, ForeignKeyAction,
12    ForeignKeyConstraint, PrimaryKeyConstraint, UniqueConstraint,
13};
14use crate::constraints::{ForeignKeyTableInfo, ValidatedForeignKey, validate_foreign_keys};
15use crate::reserved;
16use crate::resolvers::resolve_table_name;
17use crate::sys_catalog::{ConstraintNameRecord, SysCatalog};
18use crate::sys_catalog::{MultiColumnIndexEntryMeta, SingleColumnIndexEntryMeta};
19use crate::table::Table;
20use crate::types::{FieldId, TableColumn, TableId};
21use crate::view::{ForeignKeyView, TableView};
22use crate::{ColMeta, TableMeta, TableMultiColumnIndexMeta};
23use arrow::datatypes::DataType;
24use llkv_column_map::ColumnStore;
25use llkv_column_map::store::IndexKind;
26use llkv_column_map::types::LogicalFieldId;
27use llkv_plan::ForeignKeySpec;
28use llkv_result::{Error, Result as LlkvResult};
29use llkv_storage::pager::Pager;
30use rustc_hash::{FxHashMap, FxHashSet};
31use simd_r_drive_entry_handle::EntryHandle;
32use std::sync::{Arc, RwLock};
33
34#[derive(Clone, Debug, Default, PartialEq, Eq)]
35pub struct SingleColumnIndexEntry {
36    pub index_name: String,
37    pub canonical_name: String,
38    pub column_id: FieldId,
39    pub column_name: String,
40    pub unique: bool,
41    pub ascending: bool,
42    pub nulls_first: bool,
43}
44
45#[derive(Clone, Debug, Default)]
46struct TableSnapshot {
47    table_meta: Option<TableMeta>,
48    column_metas: FxHashMap<FieldId, ColMeta>,
49    constraints: FxHashMap<ConstraintId, ConstraintRecord>,
50    constraint_names: FxHashMap<ConstraintId, String>,
51    single_indexes: FxHashMap<String, SingleColumnIndexEntry>,
52    multi_column_indexes: FxHashMap<String, MultiColumnIndexEntryMeta>,
53    sort_indexes: FxHashSet<FieldId>,
54}
55
56impl TableSnapshot {
57    fn new(
58        table_meta: Option<TableMeta>,
59        column_metas: FxHashMap<FieldId, ColMeta>,
60        constraints: FxHashMap<ConstraintId, ConstraintRecord>,
61        constraint_names: FxHashMap<ConstraintId, String>,
62        single_indexes: FxHashMap<String, SingleColumnIndexEntry>,
63        multi_column_indexes: FxHashMap<String, MultiColumnIndexEntryMeta>,
64        sort_indexes: FxHashSet<FieldId>,
65    ) -> Self {
66        Self {
67            table_meta,
68            column_metas,
69            constraints,
70            constraint_names,
71            single_indexes,
72            multi_column_indexes,
73            sort_indexes,
74        }
75    }
76}
77
78#[derive(Clone, Debug)]
79struct TableState {
80    current: TableSnapshot,
81    persisted: TableSnapshot,
82}
83
84impl TableState {
85    fn from_snapshot(snapshot: TableSnapshot) -> Self {
86        Self {
87            current: snapshot.clone(),
88            persisted: snapshot,
89        }
90    }
91}
92
93#[derive(Default)]
94struct ReferencingIndex {
95    parent_to_children: FxHashMap<TableId, FxHashSet<(TableId, ConstraintId)>>,
96    child_to_parents: FxHashMap<TableId, FxHashSet<TableId>>,
97    initialized: bool,
98}
99
100impl ReferencingIndex {
101    fn remove_child(&mut self, child_id: TableId) {
102        if let Some(parents) = self.child_to_parents.remove(&child_id) {
103            for parent_id in parents {
104                if let Some(children) = self.parent_to_children.get_mut(&parent_id) {
105                    children.retain(|(entry_child, _)| *entry_child != child_id);
106                    if children.is_empty() {
107                        self.parent_to_children.remove(&parent_id);
108                    }
109                }
110            }
111        }
112    }
113
114    fn insert(&mut self, parent_id: TableId, child_id: TableId, constraint_id: ConstraintId) {
115        self.parent_to_children
116            .entry(parent_id)
117            .or_default()
118            .insert((child_id, constraint_id));
119        self.child_to_parents
120            .entry(child_id)
121            .or_default()
122            .insert(parent_id);
123        self.initialized = true;
124    }
125
126    fn children(&self, parent_id: TableId) -> Vec<(TableId, ConstraintId)> {
127        self.parent_to_children
128            .get(&parent_id)
129            .map(|set| set.iter().cloned().collect())
130            .unwrap_or_default()
131    }
132
133    fn mark_initialized(&mut self) {
134        self.initialized = true;
135    }
136
137    fn is_initialized(&self) -> bool {
138        self.initialized
139    }
140}
141
142/// Central metadata facade that hides the raw catalog implementation details.
143pub struct MetadataManager<P>
144where
145    P: Pager<Blob = EntryHandle> + Send + Sync,
146{
147    store: Arc<ColumnStore<P>>,
148    tables: RwLock<FxHashMap<TableId, TableState>>,
149    referencing_index: RwLock<ReferencingIndex>,
150}
151
152impl<P> MetadataManager<P>
153where
154    P: Pager<Blob = EntryHandle> + Send + Sync,
155{
156    /// Create a new metadata manager backed by the provided column store.
157    pub fn new(store: Arc<ColumnStore<P>>) -> Self {
158        Self {
159            store,
160            tables: RwLock::new(FxHashMap::default()),
161            referencing_index: RwLock::new(ReferencingIndex::default()),
162        }
163    }
164
165    /// Load metadata for a table from the catalog if not already cached.
166    fn ensure_table_state(&self, table_id: TableId) -> LlkvResult<()> {
167        if self.tables.read().unwrap().contains_key(&table_id) {
168            return Ok(());
169        }
170        let state = self.load_table_state(table_id)?;
171        {
172            let mut tables = self.tables.write().unwrap();
173            tables.entry(table_id).or_insert(state);
174        }
175        self.refresh_referencing_index_for_table(table_id);
176        Ok(())
177    }
178
179    fn load_table_state(&self, table_id: TableId) -> LlkvResult<TableState> {
180        let catalog = SysCatalog::new(&self.store);
181        let table_meta = catalog.get_table_meta(table_id);
182        let constraint_records = catalog.constraint_records_for_table(table_id)?;
183        let constraint_ids: Vec<ConstraintId> = constraint_records
184            .iter()
185            .map(|record| record.constraint_id)
186            .collect();
187        let constraint_name_entries = if constraint_ids.is_empty() {
188            Vec::new()
189        } else {
190            catalog.get_constraint_names(table_id, &constraint_ids)?
191        };
192        let multi_uniques = catalog.get_multi_column_indexes(table_id)?;
193        let single_index_metas = catalog.get_single_column_indexes(table_id)?;
194        let multi_column_index_metas = catalog.get_multi_column_indexes(table_id)?;
195        let mut constraints = FxHashMap::default();
196        let mut constraint_names = FxHashMap::default();
197        let mut single_indexes = FxHashMap::default();
198        let mut multi_column_indexes = FxHashMap::default();
199        let mut sort_indexes = FxHashSet::default();
200        for meta in single_index_metas {
201            sort_indexes.insert(meta.column_id);
202            single_indexes.insert(
203                meta.canonical_name.clone(),
204                SingleColumnIndexEntry {
205                    index_name: meta.index_name,
206                    canonical_name: meta.canonical_name,
207                    column_id: meta.column_id,
208                    column_name: meta.column_name,
209                    unique: meta.unique,
210                    ascending: meta.ascending,
211                    nulls_first: meta.nulls_first,
212                },
213            );
214        }
215        for meta in multi_uniques {
216            multi_column_indexes.insert(meta.canonical_name.clone(), meta);
217        }
218        for meta in multi_column_index_metas {
219            multi_column_indexes.insert(meta.canonical_name.clone(), meta);
220        }
221        for (record, name) in constraint_records
222            .into_iter()
223            .zip(constraint_name_entries.into_iter())
224        {
225            if let Some(name) = name {
226                constraint_names.insert(record.constraint_id, name);
227            }
228            constraints.insert(record.constraint_id, record);
229        }
230        let snapshot = TableSnapshot::new(
231            table_meta,
232            FxHashMap::default(),
233            constraints,
234            constraint_names,
235            single_indexes,
236            multi_column_indexes,
237            sort_indexes,
238        );
239        Ok(TableState::from_snapshot(snapshot))
240    }
241
242    fn refresh_referencing_index_for_table(&self, table_id: TableId) {
243        let foreign_keys: Vec<(TableId, ConstraintId)> = {
244            let tables = self.tables.read().unwrap();
245            match tables.get(&table_id) {
246                Some(state) => state
247                    .current
248                    .constraints
249                    .iter()
250                    .filter(|(_, record)| record.is_active())
251                    .filter_map(|(constraint_id, record)| {
252                        if let ConstraintKind::ForeignKey(fk) = &record.kind {
253                            Some((fk.referenced_table, *constraint_id))
254                        } else {
255                            None
256                        }
257                    })
258                    .collect(),
259                None => Vec::new(),
260            }
261        };
262
263        let mut index = self.referencing_index.write().unwrap();
264        index.remove_child(table_id);
265        for (parent_table, constraint_id) in foreign_keys {
266            index.insert(parent_table, table_id, constraint_id);
267        }
268    }
269
270    fn constraint_name_for(
271        &self,
272        table_id: TableId,
273        constraint_id: ConstraintId,
274    ) -> LlkvResult<Option<String>> {
275        self.ensure_table_state(table_id)?;
276        let tables = self.tables.read().unwrap();
277        let state = tables.get(&table_id).unwrap();
278        Ok(state.current.constraint_names.get(&constraint_id).cloned())
279    }
280
281    fn ensure_referencing_index_initialized(&self) -> LlkvResult<()> {
282        let needs_init = {
283            let index = self.referencing_index.read().unwrap();
284            !index.is_initialized()
285        };
286
287        if !needs_init {
288            return Ok(());
289        }
290
291        let metas = self.all_table_metas()?;
292        for (table_id, _) in metas {
293            self.ensure_table_state(table_id)?;
294            self.refresh_referencing_index_for_table(table_id);
295        }
296
297        let mut index = self.referencing_index.write().unwrap();
298        index.mark_initialized();
299        Ok(())
300    }
301
302    /// Retrieve the current table metadata snapshot (loaded lazily if required).
303    pub fn table_meta(&self, table_id: TableId) -> LlkvResult<Option<TableMeta>> {
304        self.ensure_table_state(table_id)?;
305        let tables = self.tables.read().unwrap();
306        Ok(tables
307            .get(&table_id)
308            .and_then(|state| state.current.table_meta.clone()))
309    }
310
311    /// Return the list of child table + constraint identifiers that reference the provided table.
312    pub fn foreign_keys_referencing(
313        &self,
314        referenced_table: TableId,
315    ) -> LlkvResult<Vec<(TableId, ConstraintId)>> {
316        self.ensure_referencing_index_initialized()?;
317        let index = self.referencing_index.read().unwrap();
318        Ok(index.children(referenced_table))
319    }
320
321    /// Update the in-memory table metadata. Changes are flushed on demand.
322    pub fn set_table_meta(&self, table_id: TableId, meta: TableMeta) -> LlkvResult<()> {
323        self.ensure_table_state(table_id)?;
324        let mut tables = self.tables.write().unwrap();
325        let state = tables.get_mut(&table_id).unwrap();
326        state.current.table_meta = Some(meta);
327        Ok(())
328    }
329
330    /// Fetch column metadata for the requested field identifiers, loading missing entries lazily.
331    pub fn column_metas(
332        &self,
333        table_id: TableId,
334        field_ids: &[FieldId],
335    ) -> LlkvResult<Vec<Option<ColMeta>>> {
336        self.ensure_table_state(table_id)?;
337
338        // Determine which columns still need to be loaded from the catalog.
339        let missing_ids = {
340            let tables = self.tables.read().unwrap();
341            let state = tables.get(&table_id).unwrap();
342            field_ids
343                .iter()
344                .copied()
345                .filter(|field_id| !state.current.column_metas.contains_key(field_id))
346                .collect::<Vec<_>>()
347        };
348
349        if !missing_ids.is_empty() {
350            let catalog = SysCatalog::new(&self.store);
351            let fetched = catalog.get_cols_meta(table_id, &missing_ids);
352            let mut tables = self.tables.write().unwrap();
353            let state = tables.get_mut(&table_id).unwrap();
354            for (idx, field_id) in missing_ids.iter().enumerate() {
355                if let Some(meta) = fetched[idx].clone() {
356                    state.current.column_metas.insert(*field_id, meta.clone());
357                    state
358                        .persisted
359                        .column_metas
360                        .entry(*field_id)
361                        .or_insert(meta);
362                }
363            }
364        }
365
366        let tables = self.tables.read().unwrap();
367        let state = tables.get(&table_id).unwrap();
368        Ok(field_ids
369            .iter()
370            .map(|field_id| state.current.column_metas.get(field_id).cloned())
371            .collect())
372    }
373
374    /// Upsert a single column metadata record in the in-memory snapshot.
375    pub fn set_column_meta(&self, table_id: TableId, meta: ColMeta) -> LlkvResult<()> {
376        self.ensure_table_state(table_id)?;
377        let mut tables = self.tables.write().unwrap();
378        let state = tables.get_mut(&table_id).unwrap();
379        state.current.column_metas.insert(meta.col_id, meta);
380        Ok(())
381    }
382
383    /// Return the multi-column UNIQUE definitions cached for the table.
384    pub fn multi_column_uniques(
385        &self,
386        table_id: TableId,
387    ) -> LlkvResult<Vec<MultiColumnIndexEntryMeta>> {
388        self.ensure_table_state(table_id)?;
389        let tables = self.tables.read().unwrap();
390        let state = tables.get(&table_id).unwrap();
391        Ok(state
392            .current
393            .multi_column_indexes
394            .values()
395            .filter(|entry| entry.unique)
396            .cloned()
397            .collect())
398    }
399
400    /// Replace the cached multi-column UNIQUE definitions for the table.
401    pub fn set_multi_column_uniques(
402        &self,
403        table_id: TableId,
404        uniques: Vec<MultiColumnIndexEntryMeta>,
405    ) -> LlkvResult<()> {
406        self.ensure_table_state(table_id)?;
407        let mut tables = self.tables.write().unwrap();
408        let state = tables.get_mut(&table_id).unwrap();
409        // Remove all existing unique multi-column indexes
410        state
411            .current
412            .multi_column_indexes
413            .retain(|_, entry| !entry.unique);
414        // Add the new unique indexes
415        for unique in uniques {
416            state
417                .current
418                .multi_column_indexes
419                .insert(unique.canonical_name.clone(), unique);
420        }
421        Ok(())
422    }
423
424    /// Return the named single-column indexes registered for a table.
425    pub fn single_column_indexes(
426        &self,
427        table_id: TableId,
428    ) -> LlkvResult<Vec<SingleColumnIndexEntry>> {
429        self.ensure_table_state(table_id)?;
430        let tables = self.tables.read().unwrap();
431        let state = tables.get(&table_id).unwrap();
432        Ok(state.current.single_indexes.values().cloned().collect())
433    }
434
435    /// Lookup a single-column index by canonical name.
436    pub fn single_column_index(
437        &self,
438        table_id: TableId,
439        canonical_index_name: &str,
440    ) -> LlkvResult<Option<SingleColumnIndexEntry>> {
441        self.ensure_table_state(table_id)?;
442        let tables = self.tables.read().unwrap();
443        let state = tables.get(&table_id).unwrap();
444        Ok(state
445            .current
446            .single_indexes
447            .get(canonical_index_name)
448            .cloned())
449    }
450
451    /// Register or replace a single-column index metadata entry in the cached snapshot.
452    pub fn put_single_column_index(
453        &self,
454        table_id: TableId,
455        entry: SingleColumnIndexEntry,
456    ) -> LlkvResult<()> {
457        self.ensure_table_state(table_id)?;
458        let mut tables = self.tables.write().unwrap();
459        let state = tables.get_mut(&table_id).unwrap();
460        let column_id = entry.column_id;
461        state
462            .current
463            .single_indexes
464            .insert(entry.canonical_name.clone(), entry);
465        state.current.sort_indexes.insert(column_id);
466        Ok(())
467    }
468
469    /// Remove a single-column index metadata entry from the cached snapshot.
470    pub fn remove_single_column_index(
471        &self,
472        table_id: TableId,
473        canonical_index_name: &str,
474    ) -> LlkvResult<Option<SingleColumnIndexEntry>> {
475        self.ensure_table_state(table_id)?;
476        let mut tables = self.tables.write().unwrap();
477        let state = tables.get_mut(&table_id).unwrap();
478        let removed = state.current.single_indexes.remove(canonical_index_name);
479        if let Some(ref entry) = removed {
480            let still_indexed = state
481                .current
482                .single_indexes
483                .values()
484                .any(|existing| existing.column_id == entry.column_id);
485            if !still_indexed {
486                state.current.sort_indexes.remove(&entry.column_id);
487            }
488        }
489        Ok(removed)
490    }
491
492    /// Register or replace a multi-column index metadata entry in the cached snapshot.
493    pub fn put_multi_column_index(
494        &self,
495        table_id: TableId,
496        entry: MultiColumnIndexEntryMeta,
497    ) -> LlkvResult<()> {
498        self.ensure_table_state(table_id)?;
499        let mut tables = self.tables.write().unwrap();
500        let state = tables.get_mut(&table_id).unwrap();
501        state
502            .current
503            .multi_column_indexes
504            .insert(entry.canonical_name.clone(), entry);
505        Ok(())
506    }
507
508    /// Remove a multi-column index metadata entry from the cached snapshot.
509    pub fn remove_multi_column_index(
510        &self,
511        table_id: TableId,
512        canonical_index_name: &str,
513    ) -> LlkvResult<Option<MultiColumnIndexEntryMeta>> {
514        self.ensure_table_state(table_id)?;
515        let mut tables = self.tables.write().unwrap();
516        let state = tables.get_mut(&table_id).unwrap();
517        Ok(state
518            .current
519            .multi_column_indexes
520            .remove(canonical_index_name))
521    }
522
523    /// Retrieve a multi-column index metadata entry by canonical name.
524    pub fn get_multi_column_index(
525        &self,
526        table_id: TableId,
527        canonical_index_name: &str,
528    ) -> LlkvResult<Option<MultiColumnIndexEntryMeta>> {
529        self.ensure_table_state(table_id)?;
530        let tables = self.tables.read().unwrap();
531        Ok(tables.get(&table_id).and_then(|state| {
532            state
533                .current
534                .multi_column_indexes
535                .get(canonical_index_name)
536                .cloned()
537        }))
538    }
539
540    /// Register a sort index for a column at the metadata level, staging the change for the next flush.
541    pub fn register_sort_index(&self, table_id: TableId, field_id: FieldId) -> LlkvResult<()> {
542        self.ensure_table_state(table_id)?;
543
544        {
545            let mut tables = self.tables.write().unwrap();
546            let state = tables.get_mut(&table_id).unwrap();
547            if state.persisted.sort_indexes.contains(&field_id)
548                || state.current.sort_indexes.contains(&field_id)
549            {
550                state.current.sort_indexes.insert(field_id);
551                return Ok(());
552            }
553        }
554
555        if self.field_has_sort_index(table_id, field_id)? {
556            let mut tables = self.tables.write().unwrap();
557            let state = tables.get_mut(&table_id).unwrap();
558            state.persisted.sort_indexes.insert(field_id);
559            state.current.sort_indexes.insert(field_id);
560            return Ok(());
561        }
562
563        let mut tables = self.tables.write().unwrap();
564        let state = tables.get_mut(&table_id).unwrap();
565        state.current.sort_indexes.insert(field_id);
566        Ok(())
567    }
568
569    /// Unregister a sort index for a column, staging removal for the next flush.
570    pub fn unregister_sort_index(&self, table_id: TableId, field_id: FieldId) -> LlkvResult<()> {
571        self.ensure_table_state(table_id)?;
572
573        let mut tables = self.tables.write().unwrap();
574        let state = tables.get_mut(&table_id).unwrap();
575        state.current.sort_indexes.remove(&field_id);
576
577        if !state.persisted.sort_indexes.contains(&field_id) {
578            drop(tables);
579            if self.field_has_sort_index(table_id, field_id)? {
580                let mut tables = self.tables.write().unwrap();
581                let state = tables.get_mut(&table_id).unwrap();
582                state.persisted.sort_indexes.insert(field_id);
583            }
584        }
585
586        Ok(())
587    }
588
589    /// Mutate the cached multi-column UNIQUE definitions for a table in-place.
590    pub fn update_multi_column_uniques<F, T>(&self, table_id: TableId, f: F) -> LlkvResult<T>
591    where
592        F: FnOnce(&mut Vec<MultiColumnIndexEntryMeta>) -> T,
593    {
594        self.ensure_table_state(table_id)?;
595        let mut tables = self.tables.write().unwrap();
596        let state = tables.get_mut(&table_id).unwrap();
597        let mut uniques: Vec<MultiColumnIndexEntryMeta> = state
598            .current
599            .multi_column_indexes
600            .values()
601            .filter(|entry| entry.unique)
602            .cloned()
603            .collect();
604        let result = f(&mut uniques);
605        // Remove all existing unique multi-column indexes
606        state
607            .current
608            .multi_column_indexes
609            .retain(|_, entry| !entry.unique);
610        // Add back the modified unique indexes
611        for unique in uniques {
612            state
613                .current
614                .multi_column_indexes
615                .insert(unique.canonical_name.clone(), unique);
616        }
617        Ok(result)
618    }
619
620    /// Prepare the metadata state for dropping a table by clearing cached entries.
621    ///
622    /// Column metadata is loaded eagerly for the provided field identifiers so deletions
623    /// are persisted on the next flush.
624    pub fn prepare_table_drop(&self, table_id: TableId, column_ids: &[FieldId]) -> LlkvResult<()> {
625        if !column_ids.is_empty() {
626            let _ = self.column_metas(table_id, column_ids)?;
627        } else {
628            self.ensure_table_state(table_id)?;
629        }
630
631        let mut tables = self.tables.write().unwrap();
632        if let Some(state) = tables.get_mut(&table_id) {
633            state.current.table_meta = None;
634            state.current.column_metas.clear();
635            state.current.constraints.clear();
636            state.current.constraint_names.clear();
637            state.current.single_indexes.clear();
638            state.current.multi_column_indexes.clear();
639            state.current.sort_indexes.clear();
640        }
641        drop(tables);
642        self.refresh_referencing_index_for_table(table_id);
643        Ok(())
644    }
645
646    /// Remove any cached snapshots for the specified table.
647    pub fn remove_table_state(&self, table_id: TableId) {
648        self.tables.write().unwrap().remove(&table_id);
649        self.referencing_index
650            .write()
651            .unwrap()
652            .remove_child(table_id);
653    }
654
655    /// Return all constraint records currently cached for the table.
656    pub fn constraint_records(&self, table_id: TableId) -> LlkvResult<Vec<ConstraintRecord>> {
657        self.ensure_table_state(table_id)?;
658        let tables = self.tables.read().unwrap();
659        let state = tables.get(&table_id).unwrap();
660        Ok(state.current.constraints.values().cloned().collect())
661    }
662
663    /// Fetch a subset of constraint records by their identifiers.
664    pub fn constraint_records_by_id(
665        &self,
666        table_id: TableId,
667        constraint_ids: &[ConstraintId],
668    ) -> LlkvResult<Vec<Option<ConstraintRecord>>> {
669        self.ensure_table_state(table_id)?;
670        let tables = self.tables.read().unwrap();
671        let state = tables.get(&table_id).unwrap();
672        Ok(constraint_ids
673            .iter()
674            .map(|constraint_id| state.current.constraints.get(constraint_id).cloned())
675            .collect())
676    }
677
678    /// Upsert constraint records in the in-memory snapshot.
679    pub fn put_constraint_records(
680        &self,
681        table_id: TableId,
682        records: &[ConstraintRecord],
683    ) -> LlkvResult<()> {
684        self.ensure_table_state(table_id)?;
685        let mut tables = self.tables.write().unwrap();
686        let state = tables.get_mut(&table_id).unwrap();
687        for record in records {
688            state
689                .current
690                .constraints
691                .insert(record.constraint_id, record.clone());
692        }
693        drop(tables);
694        self.refresh_referencing_index_for_table(table_id);
695        Ok(())
696    }
697
698    /// Upsert constraint names in the in-memory snapshot.
699    pub fn put_constraint_names(
700        &self,
701        table_id: TableId,
702        names: &[(ConstraintId, Option<String>)],
703    ) -> LlkvResult<()> {
704        if names.is_empty() {
705            return Ok(());
706        }
707        self.ensure_table_state(table_id)?;
708        let mut tables = self.tables.write().unwrap();
709        if let Some(state) = tables.get_mut(&table_id) {
710            for (constraint_id, name) in names {
711                if let Some(name) = name {
712                    state
713                        .current
714                        .constraint_names
715                        .insert(*constraint_id, name.clone());
716                } else {
717                    state.current.constraint_names.remove(constraint_id);
718                }
719            }
720        }
721        Ok(())
722    }
723
724    /// Produce a map of constraint records keyed by identifier.
725    pub fn constraint_record_map(
726        &self,
727        table_id: TableId,
728    ) -> LlkvResult<FxHashMap<ConstraintId, ConstraintRecord>> {
729        self.ensure_table_state(table_id)?;
730        let tables = self.tables.read().unwrap();
731        let state = tables.get(&table_id).unwrap();
732        Ok(state.current.constraints.clone())
733    }
734
735    /// Persist changes for a single table to the underlying catalog, writing only the diffs.
736    pub fn flush_table(&self, table_id: TableId) -> LlkvResult<()> {
737        self.ensure_table_state(table_id)?;
738        let mut tables = self.tables.write().unwrap();
739        let state = tables.get_mut(&table_id).unwrap();
740
741        let catalog = SysCatalog::new(&self.store);
742
743        match (
744            state.current.table_meta.as_ref(),
745            state.persisted.table_meta.as_ref(),
746        ) {
747            (Some(meta), Some(existing)) if meta != existing => {
748                catalog.put_table_meta(meta);
749                state.persisted.table_meta = Some(meta.clone());
750            }
751            (Some(meta), None) => {
752                catalog.put_table_meta(meta);
753                state.persisted.table_meta = Some(meta.clone());
754            }
755            (None, Some(_)) => {
756                catalog.delete_table_meta(table_id)?;
757                state.persisted.table_meta = None;
758            }
759            _ => {}
760        }
761
762        let mut dirty_columns: Vec<(FieldId, ColMeta)> = Vec::new();
763        for (field_id, meta) in &state.current.column_metas {
764            match state.persisted.column_metas.get(field_id) {
765                Some(existing) if existing == meta => {}
766                _ => dirty_columns.push((*field_id, meta.clone())),
767            }
768        }
769        for (field_id, meta) in dirty_columns.iter() {
770            catalog.put_col_meta(table_id, meta);
771            state.persisted.column_metas.insert(*field_id, meta.clone());
772        }
773
774        let removed_columns: Vec<FieldId> = state
775            .persisted
776            .column_metas
777            .keys()
778            .copied()
779            .filter(|field_id| !state.current.column_metas.contains_key(field_id))
780            .collect();
781        if !removed_columns.is_empty() {
782            catalog.delete_col_meta(table_id, &removed_columns)?;
783            for field_id in removed_columns {
784                state.persisted.column_metas.remove(&field_id);
785            }
786        }
787
788        let mut dirty_constraints: Vec<ConstraintRecord> = Vec::new();
789        for (constraint_id, record) in &state.current.constraints {
790            match state.persisted.constraints.get(constraint_id) {
791                Some(existing) if existing == record => {}
792                _ => dirty_constraints.push(record.clone()),
793            }
794        }
795        if !dirty_constraints.is_empty() {
796            catalog.put_constraint_records(table_id, &dirty_constraints)?;
797            for record in dirty_constraints {
798                state
799                    .persisted
800                    .constraints
801                    .insert(record.constraint_id, record);
802            }
803        }
804
805        let removed_constraints: Vec<ConstraintId> = state
806            .persisted
807            .constraints
808            .keys()
809            .copied()
810            .filter(|constraint_id| !state.current.constraints.contains_key(constraint_id))
811            .collect();
812        if !removed_constraints.is_empty() {
813            catalog.delete_constraint_records(table_id, &removed_constraints)?;
814            for constraint_id in removed_constraints {
815                state.persisted.constraints.remove(&constraint_id);
816            }
817        }
818
819        let mut dirty_constraint_names: Vec<(ConstraintId, String)> = Vec::new();
820        for (constraint_id, name) in &state.current.constraint_names {
821            match state.persisted.constraint_names.get(constraint_id) {
822                Some(existing) if existing == name => {}
823                _ => dirty_constraint_names.push((*constraint_id, name.clone())),
824            }
825        }
826        if !dirty_constraint_names.is_empty() {
827            let records: Vec<ConstraintNameRecord> = dirty_constraint_names
828                .iter()
829                .map(|(constraint_id, name)| ConstraintNameRecord {
830                    constraint_id: *constraint_id,
831                    name: Some(name.clone()),
832                })
833                .collect();
834            catalog.put_constraint_names(table_id, &records)?;
835            for (constraint_id, name) in dirty_constraint_names {
836                state.persisted.constraint_names.insert(constraint_id, name);
837            }
838        }
839
840        let removed_constraint_names: Vec<ConstraintId> = state
841            .persisted
842            .constraint_names
843            .keys()
844            .copied()
845            .filter(|constraint_id| !state.current.constraint_names.contains_key(constraint_id))
846            .collect();
847        if !removed_constraint_names.is_empty() {
848            catalog.delete_constraint_names(table_id, &removed_constraint_names)?;
849            for constraint_id in removed_constraint_names {
850                state.persisted.constraint_names.remove(&constraint_id);
851            }
852        }
853
854        // Flush all multi-column indexes (both unique and non-unique) to the catalog
855        if state.current.multi_column_indexes != state.persisted.multi_column_indexes {
856            if state.current.multi_column_indexes.is_empty() {
857                catalog.delete_multi_column_indexes(table_id)?;
858            } else {
859                let mut entries: Vec<MultiColumnIndexEntryMeta> = state
860                    .current
861                    .multi_column_indexes
862                    .values()
863                    .cloned()
864                    .collect();
865                entries.sort_by(|a, b| a.canonical_name.cmp(&b.canonical_name));
866                catalog.put_multi_column_indexes(table_id, &entries)?;
867            }
868            state.persisted.multi_column_indexes = state.current.multi_column_indexes.clone();
869        }
870
871        if state.current.single_indexes != state.persisted.single_indexes {
872            if state.current.single_indexes.is_empty() {
873                catalog.delete_single_column_indexes(table_id)?;
874                state.persisted.single_indexes.clear();
875            } else {
876                let mut entries: Vec<SingleColumnIndexEntryMeta> = state
877                    .current
878                    .single_indexes
879                    .values()
880                    .cloned()
881                    .map(|entry| SingleColumnIndexEntryMeta {
882                        index_name: entry.index_name,
883                        canonical_name: entry.canonical_name,
884                        column_id: entry.column_id,
885                        column_name: entry.column_name,
886                        unique: entry.unique,
887                        ascending: entry.ascending,
888                        nulls_first: entry.nulls_first,
889                    })
890                    .collect();
891                entries.sort_by(|a, b| a.canonical_name.cmp(&b.canonical_name));
892                catalog.put_single_column_indexes(table_id, &entries)?;
893                state.persisted.single_indexes = state.current.single_indexes.clone();
894            }
895        }
896
897        let sort_adds: Vec<FieldId> = state
898            .current
899            .sort_indexes
900            .iter()
901            .copied()
902            .filter(|field_id| !state.persisted.sort_indexes.contains(field_id))
903            .collect();
904        let sort_removes: Vec<FieldId> = state
905            .persisted
906            .sort_indexes
907            .iter()
908            .copied()
909            .filter(|field_id| !state.current.sort_indexes.contains(field_id))
910            .collect();
911        if !sort_adds.is_empty() || !sort_removes.is_empty() {
912            let table = Table::from_id_and_store(table_id, Arc::clone(&self.store))?;
913            for field_id in &sort_adds {
914                table.register_sort_index(*field_id)?;
915                state.persisted.sort_indexes.insert(*field_id);
916            }
917            for field_id in &sort_removes {
918                table.unregister_sort_index(*field_id)?;
919                state.persisted.sort_indexes.remove(field_id);
920            }
921        }
922
923        Ok(())
924    }
925
926    /// Persist changes for all tracked tables.
927    pub fn flush_all(&self) -> LlkvResult<()> {
928        let table_ids: Vec<TableId> = {
929            let tables = self.tables.read().unwrap();
930            tables.keys().copied().collect()
931        };
932        for table_id in table_ids {
933            self.flush_table(table_id)?;
934        }
935        Ok(())
936    }
937
938    /// Return all persisted table metadata.
939    pub fn all_table_metas(&self) -> LlkvResult<Vec<(TableId, TableMeta)>> {
940        let catalog = SysCatalog::new(&self.store);
941        catalog.all_table_metas()
942    }
943
944    /// Return all persisted multi-column unique metadata.
945    pub fn all_multi_column_unique_metas(&self) -> LlkvResult<Vec<TableMultiColumnIndexMeta>> {
946        let catalog = SysCatalog::new(&self.store);
947        let all = catalog.all_multi_column_index_metas()?;
948        // Filter to unique indexes only
949        Ok(all
950            .into_iter()
951            .map(|mut meta| {
952                meta.indexes.retain(|idx| idx.unique);
953                meta
954            })
955            .filter(|meta| !meta.indexes.is_empty())
956            .collect())
957    }
958
959    /// Assemble foreign key descriptors for the table using cached metadata.
960    pub fn foreign_key_descriptors(
961        &self,
962        table_id: TableId,
963    ) -> LlkvResult<Vec<ForeignKeyDescriptor>> {
964        let records = self.constraint_records(table_id)?;
965        let mut descriptors = Vec::new();
966
967        for record in records {
968            if !record.is_active() {
969                continue;
970            }
971
972            let ConstraintKind::ForeignKey(fk) = record.kind else {
973                continue;
974            };
975
976            descriptors.push(ForeignKeyDescriptor {
977                constraint_id: record.constraint_id,
978                referencing_table_id: table_id,
979                referencing_field_ids: fk.referencing_field_ids.clone(),
980                referenced_table_id: fk.referenced_table,
981                referenced_field_ids: fk.referenced_field_ids.clone(),
982                on_delete: fk.on_delete,
983                on_update: fk.on_update,
984            });
985        }
986
987        Ok(descriptors)
988    }
989
990    /// Resolve foreign key descriptors into names suitable for runtime consumers.
991    pub fn foreign_key_views(
992        &self,
993        catalog: &TableCatalog,
994        table_id: TableId,
995    ) -> LlkvResult<Vec<ForeignKeyView>> {
996        let descriptors = self.foreign_key_descriptors(table_id)?;
997
998        if descriptors.is_empty() {
999            return Ok(Vec::new());
1000        }
1001
1002        let (referencing_display, referencing_canonical) =
1003            resolve_table_name(catalog, self, table_id)?;
1004
1005        let mut details = Vec::with_capacity(descriptors.len());
1006        for descriptor in descriptors {
1007            let referenced_table_id = descriptor.referenced_table_id;
1008            let (referenced_display, referenced_canonical) =
1009                resolve_table_name(catalog, self, referenced_table_id)?;
1010
1011            let referencing_column_names =
1012                self.column_names(table_id, &descriptor.referencing_field_ids)?;
1013            let referenced_column_names =
1014                self.column_names(referenced_table_id, &descriptor.referenced_field_ids)?;
1015            let constraint_name = self.constraint_name_for(table_id, descriptor.constraint_id)?;
1016
1017            details.push(ForeignKeyView {
1018                constraint_id: descriptor.constraint_id,
1019                constraint_name,
1020                referencing_table_id: descriptor.referencing_table_id,
1021                referencing_table_display: referencing_display.clone(),
1022                referencing_table_canonical: referencing_canonical.clone(),
1023                referencing_field_ids: descriptor.referencing_field_ids.clone(),
1024                referencing_column_names,
1025                referenced_table_id,
1026                referenced_table_display: referenced_display.clone(),
1027                referenced_table_canonical: referenced_canonical.clone(),
1028                referenced_field_ids: descriptor.referenced_field_ids.clone(),
1029                referenced_column_names,
1030                on_delete: descriptor.on_delete,
1031                on_update: descriptor.on_update,
1032            });
1033        }
1034
1035        Ok(details)
1036    }
1037
1038    /// Assemble a consolidated read-only view of table metadata.
1039    pub fn table_view(
1040        &self,
1041        catalog: &TableCatalog,
1042        table_id: TableId,
1043        field_ids: &[FieldId],
1044    ) -> LlkvResult<TableView> {
1045        let table_meta = self.table_meta(table_id)?;
1046        let column_metas = self.column_metas(table_id, field_ids)?;
1047        let constraint_records = self.constraint_records(table_id)?;
1048        let multi_column_uniques = self.multi_column_uniques(table_id)?;
1049        let foreign_keys = self.foreign_key_views(catalog, table_id)?;
1050
1051        Ok(TableView {
1052            table_meta,
1053            column_metas,
1054            constraint_records,
1055            multi_column_uniques,
1056            foreign_keys,
1057        })
1058    }
1059
1060    /// Validate foreign key specifications and persist them for the referencing table.
1061    pub fn validate_and_register_foreign_keys<F>(
1062        &self,
1063        referencing_table: &ForeignKeyTableInfo,
1064        specs: &[ForeignKeySpec],
1065        lookup_table: F,
1066        timestamp_micros: u64,
1067    ) -> LlkvResult<Vec<ValidatedForeignKey>>
1068    where
1069        F: FnMut(&str) -> LlkvResult<ForeignKeyTableInfo>,
1070    {
1071        let validated = validate_foreign_keys(referencing_table, specs, lookup_table)?;
1072        self.register_foreign_keys(referencing_table.table_id, &validated, timestamp_micros)?;
1073        Ok(validated)
1074    }
1075
1076    /// Register validated foreign key definitions for a table.
1077    pub fn register_foreign_keys(
1078        &self,
1079        table_id: TableId,
1080        foreign_keys: &[ValidatedForeignKey],
1081        timestamp_micros: u64,
1082    ) -> LlkvResult<()> {
1083        if foreign_keys.is_empty() {
1084            return Ok(());
1085        }
1086
1087        let existing_constraints = self.constraint_record_map(table_id)?;
1088        let mut next_constraint_id = existing_constraints
1089            .keys()
1090            .copied()
1091            .max()
1092            .unwrap_or(0)
1093            .saturating_add(1);
1094
1095        let mut constraint_records = Vec::with_capacity(foreign_keys.len());
1096        let mut constraint_names: Vec<(ConstraintId, Option<String>)> =
1097            Vec::with_capacity(foreign_keys.len());
1098
1099        for fk in foreign_keys {
1100            let constraint_id = next_constraint_id;
1101            constraint_records.push(ConstraintRecord {
1102                constraint_id,
1103                kind: ConstraintKind::ForeignKey(ForeignKeyConstraint {
1104                    referencing_field_ids: fk.referencing_field_ids.clone(),
1105                    referenced_table: fk.referenced_table_id,
1106                    referenced_field_ids: fk.referenced_field_ids.clone(),
1107                    on_delete: fk.on_delete,
1108                    on_update: fk.on_update,
1109                }),
1110                state: ConstraintState::Active,
1111                revision: 1,
1112                last_modified_micros: timestamp_micros,
1113            });
1114            constraint_names.push((constraint_id, fk.name.clone()));
1115            next_constraint_id = next_constraint_id.saturating_add(1);
1116        }
1117
1118        self.put_constraint_records(table_id, &constraint_records)?;
1119        self.put_constraint_names(table_id, &constraint_names)?;
1120        self.flush_table(table_id)?;
1121
1122        Ok(())
1123    }
1124
1125    /// Register column metadata, physical storage columns, and primary/unique constraints.
1126    pub fn apply_column_definitions(
1127        &self,
1128        table_id: TableId,
1129        columns: &[TableColumn],
1130        timestamp_micros: u64,
1131    ) -> LlkvResult<()> {
1132        if columns.is_empty() {
1133            return Ok(());
1134        }
1135
1136        self.ensure_table_state(table_id)?;
1137
1138        for column in columns {
1139            let column_meta = ColMeta {
1140                col_id: column.field_id,
1141                name: Some(column.name.clone()),
1142                flags: 0,
1143                default: None,
1144            };
1145            self.set_column_meta(table_id, column_meta)?;
1146        }
1147
1148        let table = Table::from_id_and_store(table_id, Arc::clone(&self.store))?;
1149        let store = table.store();
1150
1151        for column in columns {
1152            let logical_field_id = LogicalFieldId::for_user(table_id, column.field_id);
1153            store.ensure_column_registered(logical_field_id, &column.data_type)?;
1154            store.data_type(logical_field_id)?;
1155        }
1156
1157        let created_by_lfid = LogicalFieldId::for_mvcc_created_by(table_id);
1158        store.ensure_column_registered(created_by_lfid, &DataType::UInt64)?;
1159
1160        let deleted_by_lfid = LogicalFieldId::for_mvcc_deleted_by(table_id);
1161        store.ensure_column_registered(deleted_by_lfid, &DataType::UInt64)?;
1162
1163        let existing = self.constraint_record_map(table_id)?;
1164        let mut next_constraint_id = existing
1165            .keys()
1166            .copied()
1167            .max()
1168            .unwrap_or(0)
1169            .saturating_add(1);
1170
1171        let mut constraints = Vec::new();
1172
1173        let primary_key_fields: Vec<FieldId> = columns
1174            .iter()
1175            .filter(|col| col.primary_key)
1176            .map(|col| col.field_id)
1177            .collect();
1178        if !primary_key_fields.is_empty() {
1179            constraints.push(ConstraintRecord {
1180                constraint_id: next_constraint_id,
1181                kind: ConstraintKind::PrimaryKey(PrimaryKeyConstraint {
1182                    field_ids: primary_key_fields,
1183                }),
1184                state: ConstraintState::Active,
1185                revision: 1,
1186                last_modified_micros: timestamp_micros,
1187            });
1188            next_constraint_id = next_constraint_id.saturating_add(1);
1189        }
1190
1191        for column in columns.iter().filter(|col| col.unique && !col.primary_key) {
1192            constraints.push(ConstraintRecord {
1193                constraint_id: next_constraint_id,
1194                kind: ConstraintKind::Unique(UniqueConstraint {
1195                    field_ids: vec![column.field_id],
1196                }),
1197                state: ConstraintState::Active,
1198                revision: 1,
1199                last_modified_micros: timestamp_micros,
1200            });
1201            next_constraint_id = next_constraint_id.saturating_add(1);
1202        }
1203
1204        if !constraints.is_empty() {
1205            self.put_constraint_records(table_id, &constraints)?;
1206        }
1207
1208        Ok(())
1209    }
1210
1211    pub fn column_data_type(&self, table_id: TableId, field_id: FieldId) -> LlkvResult<DataType> {
1212        let table = Table::from_id_and_store(table_id, Arc::clone(&self.store))?;
1213        let store = table.store();
1214        let logical_field_id = LogicalFieldId::for_user(table_id, field_id);
1215        store.data_type(logical_field_id)
1216    }
1217
1218    /// Register a multi-column UNIQUE definition for a table.
1219    pub fn register_multi_column_unique(
1220        &self,
1221        table_id: TableId,
1222        column_ids: &[FieldId],
1223        index_name: Option<String>,
1224    ) -> LlkvResult<MultiColumnUniqueRegistration> {
1225        let mut created = false;
1226        let mut existing_name: Option<Option<String>> = None;
1227        let column_vec: Vec<FieldId> = column_ids.to_vec();
1228
1229        // Generate canonical name from column IDs
1230        let canonical_name = format!(
1231            "__unique_{}_{}",
1232            table_id,
1233            column_vec
1234                .iter()
1235                .map(|id| id.to_string())
1236                .collect::<Vec<_>>()
1237                .join("_")
1238        );
1239
1240        self.update_multi_column_uniques(table_id, |entries| {
1241            if let Some(existing) = entries.iter().find(|entry| entry.column_ids == column_vec) {
1242                existing_name = Some(existing.index_name.clone());
1243            } else {
1244                entries.push(MultiColumnIndexEntryMeta {
1245                    index_name: index_name.clone(),
1246                    canonical_name: canonical_name.clone(),
1247                    column_ids: column_vec.clone(),
1248                    unique: true,
1249                });
1250                created = true;
1251            }
1252        })?;
1253
1254        if created {
1255            Ok(MultiColumnUniqueRegistration::Created)
1256        } else {
1257            Ok(MultiColumnUniqueRegistration::AlreadyExists {
1258                index_name: existing_name.unwrap_or(None),
1259            })
1260        }
1261    }
1262
1263    fn column_names(&self, table_id: TableId, field_ids: &[FieldId]) -> LlkvResult<Vec<String>> {
1264        if field_ids.is_empty() {
1265            return Ok(Vec::new());
1266        }
1267
1268        let metas = self.column_metas(table_id, field_ids)?;
1269        let mut names = Vec::with_capacity(field_ids.len());
1270        for (idx, field_id) in field_ids.iter().enumerate() {
1271            let name = metas
1272                .get(idx)
1273                .and_then(|meta| meta.as_ref())
1274                .and_then(|meta| meta.name.clone())
1275                .unwrap_or_else(|| format!("col_{}", field_id));
1276            names.push(name);
1277        }
1278        Ok(names)
1279    }
1280
1281    /// Reserve and return the next available table id.
1282    pub fn reserve_table_id(&self) -> LlkvResult<TableId> {
1283        let catalog = SysCatalog::new(&self.store);
1284
1285        let mut next = match catalog.get_next_table_id()? {
1286            Some(value) => value,
1287            None => {
1288                let seed = catalog
1289                    .max_table_id()?
1290                    .unwrap_or(reserved::CATALOG_TABLE_ID);
1291                let initial = seed.checked_add(1).ok_or_else(|| {
1292                    Error::InvalidArgumentError("exhausted available table ids".into())
1293                })?;
1294                catalog.put_next_table_id(initial)?;
1295                initial
1296            }
1297        };
1298
1299        while reserved::is_reserved_table_id(next) {
1300            next = next.checked_add(1).ok_or_else(|| {
1301                Error::InvalidArgumentError("exhausted available table ids".into())
1302            })?;
1303        }
1304
1305        let mut following = next
1306            .checked_add(1)
1307            .ok_or_else(|| Error::InvalidArgumentError("exhausted available table ids".into()))?;
1308
1309        while reserved::is_reserved_table_id(following) {
1310            following = following.checked_add(1).ok_or_else(|| {
1311                Error::InvalidArgumentError("exhausted available table ids".into())
1312            })?;
1313        }
1314
1315        catalog.put_next_table_id(following)?;
1316        Ok(next)
1317    }
1318
1319    /// Check if a field has a sort index in the underlying store.
1320    ///
1321    /// Note: Creates a temporary Table instance to access index metadata.
1322    /// This is acceptable since Table::from_id_and_store is lightweight (just wraps
1323    /// table_id + `Arc<ColumnStore>`) and this method is only called during index
1324    /// registration/unregistration, not in query hot paths.
1325    fn field_has_sort_index(&self, table_id: TableId, field_id: FieldId) -> LlkvResult<bool> {
1326        let table = Table::from_id_and_store(table_id, Arc::clone(&self.store))?;
1327        let indexes = table.list_registered_indexes(field_id)?;
1328        Ok(indexes.contains(&IndexKind::Sort))
1329    }
1330}
1331
1332#[cfg(test)]
1333mod tests {
1334    use super::*;
1335    use crate::constraints::{ConstraintKind, ConstraintState, PrimaryKeyConstraint};
1336    use crate::{MultiColumnIndexEntryMeta, Table};
1337    use llkv_column_map::ColumnStore;
1338    use llkv_column_map::store::IndexKind;
1339    use llkv_storage::pager::MemPager;
1340    use std::sync::Arc;
1341
1342    #[test]
1343    fn metadata_manager_persists_and_loads() {
1344        let pager = Arc::new(MemPager::default());
1345        let store = Arc::new(ColumnStore::open(Arc::clone(&pager)).unwrap());
1346        let manager = MetadataManager::new(Arc::clone(&store));
1347
1348        let table_id: TableId = 42;
1349        let table_meta = TableMeta {
1350            table_id,
1351            name: Some("users".into()),
1352            created_at_micros: 123,
1353            flags: 0,
1354            epoch: 1,
1355            view_definition: None,
1356        };
1357        manager
1358            .set_table_meta(table_id, table_meta.clone())
1359            .unwrap();
1360
1361        {
1362            let tables = manager.tables.read().unwrap();
1363            let state = tables.get(&table_id).unwrap();
1364            assert!(state.current.table_meta.is_some());
1365        }
1366
1367        let column_meta = ColMeta {
1368            col_id: 1,
1369            name: Some("id".into()),
1370            flags: 0,
1371            default: None,
1372        };
1373        manager
1374            .set_column_meta(table_id, column_meta.clone())
1375            .unwrap();
1376
1377        let logical_field_id =
1378            llkv_column_map::types::LogicalFieldId::for_user(table_id, column_meta.col_id);
1379        store
1380            .ensure_column_registered(logical_field_id, &arrow::datatypes::DataType::Utf8)
1381            .unwrap();
1382
1383        manager
1384            .register_sort_index(table_id, column_meta.col_id)
1385            .unwrap();
1386
1387        let constraint = ConstraintRecord {
1388            constraint_id: 7,
1389            kind: ConstraintKind::PrimaryKey(PrimaryKeyConstraint {
1390                field_ids: vec![column_meta.col_id],
1391            }),
1392            state: ConstraintState::Active,
1393            revision: 1,
1394            last_modified_micros: 456,
1395        };
1396        manager
1397            .put_constraint_records(table_id, std::slice::from_ref(&constraint))
1398            .unwrap();
1399
1400        let multi_unique = MultiColumnIndexEntryMeta {
1401            index_name: Some("uniq_users_name".into()),
1402            canonical_name: "uniq_users_name".to_lowercase(),
1403            column_ids: vec![column_meta.col_id],
1404            unique: true,
1405        };
1406        manager
1407            .set_multi_column_uniques(table_id, vec![multi_unique.clone()])
1408            .unwrap();
1409
1410        assert_eq!(
1411            manager.table_meta(table_id).unwrap(),
1412            Some(table_meta.clone())
1413        );
1414
1415        manager.flush_table(table_id).unwrap();
1416
1417        let table = Table::from_id_and_store(table_id, Arc::clone(&store)).unwrap();
1418        let indexes = table.list_registered_indexes(column_meta.col_id).unwrap();
1419        assert!(indexes.contains(&IndexKind::Sort));
1420
1421        let verify_catalog = SysCatalog::new(&store);
1422        let column_roundtrip = verify_catalog.get_cols_meta(table_id, &[column_meta.col_id]);
1423        assert_eq!(column_roundtrip[0].as_ref(), Some(&column_meta));
1424        let constraints = verify_catalog
1425            .constraint_records_for_table(table_id)
1426            .unwrap();
1427        assert_eq!(constraints, vec![constraint.clone()]);
1428        let unique_roundtrip = verify_catalog.get_multi_column_indexes(table_id).unwrap();
1429        assert_eq!(unique_roundtrip, vec![multi_unique.clone()]);
1430
1431        let meta_from_cache = manager.table_meta(table_id).unwrap();
1432        assert_eq!(meta_from_cache, Some(table_meta.clone()));
1433
1434        let columns_from_cache = manager
1435            .column_metas(table_id, &[column_meta.col_id])
1436            .unwrap();
1437        assert_eq!(columns_from_cache[0].as_ref(), Some(&column_meta));
1438
1439        let constraints_from_cache = manager.constraint_records(table_id).unwrap();
1440        assert_eq!(constraints_from_cache, vec![constraint.clone()]);
1441
1442        let uniques_from_cache = manager.multi_column_uniques(table_id).unwrap();
1443        assert_eq!(uniques_from_cache, vec![multi_unique]);
1444
1445        // No additional writes should occur on subsequent flushes without modifications.
1446        manager.flush_table(table_id).unwrap();
1447    }
1448
1449    #[test]
1450    fn metadata_manager_lazy_loads_columns_and_constraints() {
1451        let pager = Arc::new(MemPager::default());
1452        let store = Arc::new(ColumnStore::open(Arc::clone(&pager)).unwrap());
1453        let manager = MetadataManager::new(Arc::clone(&store));
1454
1455        let table_id: TableId = 99;
1456        let column_meta = ColMeta {
1457            col_id: 3,
1458            name: Some("value".into()),
1459            flags: 0,
1460            default: None,
1461        };
1462        let initial_catalog = SysCatalog::new(&store);
1463        initial_catalog.put_col_meta(table_id, &column_meta);
1464
1465        let constraint = ConstraintRecord {
1466            constraint_id: 15,
1467            kind: ConstraintKind::PrimaryKey(PrimaryKeyConstraint {
1468                field_ids: vec![column_meta.col_id],
1469            }),
1470            state: ConstraintState::Active,
1471            revision: 1,
1472            last_modified_micros: 0,
1473        };
1474        initial_catalog
1475            .put_constraint_records(table_id, std::slice::from_ref(&constraint))
1476            .unwrap();
1477        let multi_unique = MultiColumnIndexEntryMeta {
1478            index_name: Some("uniq_value".into()),
1479            canonical_name: "uniq_value".to_lowercase(),
1480            column_ids: vec![column_meta.col_id],
1481            unique: true,
1482        };
1483        initial_catalog
1484            .put_multi_column_indexes(table_id, std::slice::from_ref(&multi_unique))
1485            .unwrap();
1486
1487        let columns = manager
1488            .column_metas(table_id, &[column_meta.col_id])
1489            .unwrap();
1490        assert_eq!(columns[0].as_ref(), Some(&column_meta));
1491
1492        let constraints = manager.constraint_records(table_id).unwrap();
1493        assert_eq!(constraints, vec![constraint]);
1494
1495        let uniques = manager.multi_column_uniques(table_id).unwrap();
1496        assert_eq!(uniques, vec![multi_unique]);
1497    }
1498}
1499
1500/// Descriptor describing a foreign key constraint scoped to field identifiers.
1501#[derive(Clone, Debug)]
1502pub struct ForeignKeyDescriptor {
1503    pub constraint_id: ConstraintId,
1504    pub referencing_table_id: TableId,
1505    pub referencing_field_ids: Vec<FieldId>,
1506    pub referenced_table_id: TableId,
1507    pub referenced_field_ids: Vec<FieldId>,
1508    pub on_delete: ForeignKeyAction,
1509    pub on_update: ForeignKeyAction,
1510}
1511
1512/// Result of attempting to register a multi-column unique definition.
1513#[derive(Debug, Clone, PartialEq, Eq)]
1514pub enum MultiColumnUniqueRegistration {
1515    Created,
1516    AlreadyExists { index_name: Option<String> },
1517}