llkv_table/
metadata.rs

1//! Shared metadata manager that consolidates catalog I/O for tables, columns, and constraints.
2//!
3//! This module offers a single entry point for querying and mutating persisted metadata. It keeps
4//! an in-memory snapshot per table, performs diff-aware persistence, and always uses batch catalog
5//! operations to minimise I/O.
6
7#![forbid(unsafe_code)]
8
9use crate::catalog::TableCatalog;
10use crate::constraints::{
11    ConstraintId, ConstraintKind, ConstraintRecord, ConstraintState, ForeignKeyAction,
12    ForeignKeyConstraint, PrimaryKeyConstraint, UniqueConstraint,
13};
14use crate::constraints::{ForeignKeyTableInfo, ValidatedForeignKey, validate_foreign_keys};
15use crate::reserved;
16use crate::resolvers::resolve_table_name;
17use crate::sys_catalog::{ConstraintNameRecord, SysCatalog};
18use crate::sys_catalog::{MultiColumnIndexEntryMeta, SingleColumnIndexEntryMeta, TriggerEntryMeta};
19use crate::table::Table;
20use crate::types::{FieldId, TableColumn, TableId};
21use crate::view::{ForeignKeyView, TableView};
22use crate::{ColMeta, TableMeta, TableMultiColumnIndexMeta};
23use arrow::datatypes::DataType;
24use llkv_column_map::ColumnStore;
25use llkv_column_map::store::IndexKind;
26use llkv_plan::ForeignKeySpec;
27use llkv_result::{Error, Result as LlkvResult};
28use llkv_storage::pager::Pager;
29use llkv_types::LogicalFieldId;
30use rustc_hash::{FxHashMap, FxHashSet};
31use simd_r_drive_entry_handle::EntryHandle;
32use std::sync::{Arc, RwLock};
33
34#[derive(Clone, Debug, Default, PartialEq, Eq)]
35pub struct SingleColumnIndexEntry {
36    pub index_name: String,
37    pub canonical_name: String,
38    pub column_id: FieldId,
39    pub column_name: String,
40    pub unique: bool,
41    pub ascending: bool,
42    pub nulls_first: bool,
43}
44
45#[derive(Clone, Debug, Default)]
46struct TableSnapshot {
47    table_meta: Option<TableMeta>,
48    column_metas: FxHashMap<FieldId, ColMeta>,
49    constraints: FxHashMap<ConstraintId, ConstraintRecord>,
50    constraint_names: FxHashMap<ConstraintId, String>,
51    single_indexes: FxHashMap<String, SingleColumnIndexEntry>,
52    multi_column_indexes: FxHashMap<String, MultiColumnIndexEntryMeta>,
53    sort_indexes: FxHashSet<FieldId>,
54    triggers: FxHashMap<String, TriggerEntryMeta>,
55}
56
57#[derive(Clone, Debug)]
58struct TableState {
59    current: TableSnapshot,
60    persisted: TableSnapshot,
61}
62
63impl TableState {
64    fn from_snapshot(snapshot: TableSnapshot) -> Self {
65        Self {
66            current: snapshot.clone(),
67            persisted: snapshot,
68        }
69    }
70}
71
72#[derive(Default)]
73struct ReferencingIndex {
74    parent_to_children: FxHashMap<TableId, FxHashSet<(TableId, ConstraintId)>>,
75    child_to_parents: FxHashMap<TableId, FxHashSet<TableId>>,
76    initialized: bool,
77}
78
79impl ReferencingIndex {
80    fn remove_child(&mut self, child_id: TableId) {
81        if let Some(parents) = self.child_to_parents.remove(&child_id) {
82            for parent_id in parents {
83                if let Some(children) = self.parent_to_children.get_mut(&parent_id) {
84                    children.retain(|(entry_child, _)| *entry_child != child_id);
85                    if children.is_empty() {
86                        self.parent_to_children.remove(&parent_id);
87                    }
88                }
89            }
90        }
91    }
92
93    fn insert(&mut self, parent_id: TableId, child_id: TableId, constraint_id: ConstraintId) {
94        self.parent_to_children
95            .entry(parent_id)
96            .or_default()
97            .insert((child_id, constraint_id));
98        self.child_to_parents
99            .entry(child_id)
100            .or_default()
101            .insert(parent_id);
102        self.initialized = true;
103    }
104
105    fn children(&self, parent_id: TableId) -> Vec<(TableId, ConstraintId)> {
106        self.parent_to_children
107            .get(&parent_id)
108            .map(|set| set.iter().cloned().collect())
109            .unwrap_or_default()
110    }
111
112    fn mark_initialized(&mut self) {
113        self.initialized = true;
114    }
115
116    fn is_initialized(&self) -> bool {
117        self.initialized
118    }
119}
120
121/// Central metadata facade that hides the raw catalog implementation details.
122pub struct MetadataManager<P>
123where
124    P: Pager<Blob = EntryHandle> + Send + Sync,
125{
126    store: Arc<ColumnStore<P>>,
127    tables: RwLock<FxHashMap<TableId, TableState>>,
128    referencing_index: RwLock<ReferencingIndex>,
129}
130
131impl<P> MetadataManager<P>
132where
133    P: Pager<Blob = EntryHandle> + Send + Sync,
134{
135    /// Create a new metadata manager backed by the provided column store.
136    pub fn new(store: Arc<ColumnStore<P>>) -> Self {
137        Self {
138            store,
139            tables: RwLock::new(FxHashMap::default()),
140            referencing_index: RwLock::new(ReferencingIndex::default()),
141        }
142    }
143
144    /// Load metadata for a table from the catalog if not already cached.
145    fn ensure_table_state(&self, table_id: TableId) -> LlkvResult<()> {
146        if self.tables.read().unwrap().contains_key(&table_id) {
147            return Ok(());
148        }
149        let state = self.load_table_state(table_id)?;
150        {
151            let mut tables = self.tables.write().unwrap();
152            tables.entry(table_id).or_insert(state);
153        }
154        self.refresh_referencing_index_for_table(table_id);
155        Ok(())
156    }
157
158    fn load_table_state(&self, table_id: TableId) -> LlkvResult<TableState> {
159        let catalog = SysCatalog::new(&self.store);
160        let table_meta = catalog.get_table_meta(table_id);
161        let constraint_records = catalog.constraint_records_for_table(table_id)?;
162        let constraint_ids: Vec<ConstraintId> = constraint_records
163            .iter()
164            .map(|record| record.constraint_id)
165            .collect();
166        let constraint_name_entries = if constraint_ids.is_empty() {
167            Vec::new()
168        } else {
169            catalog.get_constraint_names(table_id, &constraint_ids)?
170        };
171        let multi_uniques = catalog.get_multi_column_indexes(table_id)?;
172        let single_index_metas = catalog.get_single_column_indexes(table_id)?;
173        let multi_column_index_metas = catalog.get_multi_column_indexes(table_id)?;
174        let trigger_metas = catalog.get_triggers(table_id)?;
175        let mut constraints = FxHashMap::default();
176        let mut constraint_names = FxHashMap::default();
177        let mut single_indexes = FxHashMap::default();
178        let mut multi_column_indexes = FxHashMap::default();
179        let mut sort_indexes = FxHashSet::default();
180        for meta in single_index_metas {
181            sort_indexes.insert(meta.column_id);
182            single_indexes.insert(
183                meta.canonical_name.clone(),
184                SingleColumnIndexEntry {
185                    index_name: meta.index_name,
186                    canonical_name: meta.canonical_name,
187                    column_id: meta.column_id,
188                    column_name: meta.column_name,
189                    unique: meta.unique,
190                    ascending: meta.ascending,
191                    nulls_first: meta.nulls_first,
192                },
193            );
194        }
195        for meta in multi_uniques {
196            multi_column_indexes.insert(meta.canonical_name.clone(), meta);
197        }
198        for meta in multi_column_index_metas {
199            multi_column_indexes.insert(meta.canonical_name.clone(), meta);
200        }
201        let mut triggers = FxHashMap::default();
202        for meta in trigger_metas {
203            triggers.insert(meta.canonical_name.clone(), meta);
204        }
205        for (record, name) in constraint_records
206            .into_iter()
207            .zip(constraint_name_entries.into_iter())
208        {
209            if let Some(name) = name {
210                constraint_names.insert(record.constraint_id, name);
211            }
212            constraints.insert(record.constraint_id, record);
213        }
214        let snapshot = TableSnapshot {
215            table_meta,
216            column_metas: FxHashMap::default(),
217            constraints,
218            constraint_names,
219            single_indexes,
220            multi_column_indexes,
221            sort_indexes,
222            triggers,
223        };
224        Ok(TableState::from_snapshot(snapshot))
225    }
226
227    fn refresh_referencing_index_for_table(&self, table_id: TableId) {
228        let foreign_keys: Vec<(TableId, ConstraintId)> = {
229            let tables = self.tables.read().unwrap();
230            match tables.get(&table_id) {
231                Some(state) => state
232                    .current
233                    .constraints
234                    .iter()
235                    .filter(|(_, record)| record.is_active())
236                    .filter_map(|(constraint_id, record)| {
237                        if let ConstraintKind::ForeignKey(fk) = &record.kind {
238                            Some((fk.referenced_table, *constraint_id))
239                        } else {
240                            None
241                        }
242                    })
243                    .collect(),
244                None => Vec::new(),
245            }
246        };
247
248        let mut index = self.referencing_index.write().unwrap();
249        index.remove_child(table_id);
250        for (parent_table, constraint_id) in foreign_keys {
251            index.insert(parent_table, table_id, constraint_id);
252        }
253    }
254
255    fn constraint_name_for(
256        &self,
257        table_id: TableId,
258        constraint_id: ConstraintId,
259    ) -> LlkvResult<Option<String>> {
260        self.ensure_table_state(table_id)?;
261        let tables = self.tables.read().unwrap();
262        let state = tables.get(&table_id).unwrap();
263        Ok(state.current.constraint_names.get(&constraint_id).cloned())
264    }
265
266    fn ensure_referencing_index_initialized(&self) -> LlkvResult<()> {
267        let needs_init = {
268            let index = self.referencing_index.read().unwrap();
269            !index.is_initialized()
270        };
271
272        if !needs_init {
273            return Ok(());
274        }
275
276        let metas = self.all_table_metas()?;
277        for (table_id, _) in metas {
278            self.ensure_table_state(table_id)?;
279            self.refresh_referencing_index_for_table(table_id);
280        }
281
282        let mut index = self.referencing_index.write().unwrap();
283        index.mark_initialized();
284        Ok(())
285    }
286
287    /// Retrieve the current table metadata snapshot (loaded lazily if required).
288    pub fn table_meta(&self, table_id: TableId) -> LlkvResult<Option<TableMeta>> {
289        self.ensure_table_state(table_id)?;
290        let tables = self.tables.read().unwrap();
291        Ok(tables
292            .get(&table_id)
293            .and_then(|state| state.current.table_meta.clone()))
294    }
295
296    /// Return the list of child table + constraint identifiers that reference the provided table.
297    pub fn foreign_keys_referencing(
298        &self,
299        referenced_table: TableId,
300    ) -> LlkvResult<Vec<(TableId, ConstraintId)>> {
301        self.ensure_referencing_index_initialized()?;
302        let index = self.referencing_index.read().unwrap();
303        Ok(index.children(referenced_table))
304    }
305
306    /// Update the in-memory table metadata. Changes are flushed on demand.
307    pub fn set_table_meta(&self, table_id: TableId, meta: TableMeta) -> LlkvResult<()> {
308        self.ensure_table_state(table_id)?;
309        let mut tables = self.tables.write().unwrap();
310        let state = tables.get_mut(&table_id).unwrap();
311        state.current.table_meta = Some(meta);
312        Ok(())
313    }
314
315    /// Fetch column metadata for the requested field identifiers, loading missing entries lazily.
316    pub fn column_metas(
317        &self,
318        table_id: TableId,
319        field_ids: &[FieldId],
320    ) -> LlkvResult<Vec<Option<ColMeta>>> {
321        self.ensure_table_state(table_id)?;
322
323        // Determine which columns still need to be loaded from the catalog.
324        let missing_ids = {
325            let tables = self.tables.read().unwrap();
326            let state = tables.get(&table_id).unwrap();
327            field_ids
328                .iter()
329                .copied()
330                .filter(|field_id| !state.current.column_metas.contains_key(field_id))
331                .collect::<Vec<_>>()
332        };
333
334        if !missing_ids.is_empty() {
335            let catalog = SysCatalog::new(&self.store);
336            let fetched = catalog.get_cols_meta(table_id, &missing_ids);
337            let mut tables = self.tables.write().unwrap();
338            let state = tables.get_mut(&table_id).unwrap();
339            for (idx, field_id) in missing_ids.iter().enumerate() {
340                if let Some(meta) = fetched[idx].clone() {
341                    state.current.column_metas.insert(*field_id, meta.clone());
342                    state
343                        .persisted
344                        .column_metas
345                        .entry(*field_id)
346                        .or_insert(meta);
347                }
348            }
349        }
350
351        let tables = self.tables.read().unwrap();
352        let state = tables.get(&table_id).unwrap();
353        Ok(field_ids
354            .iter()
355            .map(|field_id| state.current.column_metas.get(field_id).cloned())
356            .collect())
357    }
358
359    /// Upsert a single column metadata record in the in-memory snapshot.
360    pub fn set_column_meta(&self, table_id: TableId, meta: ColMeta) -> LlkvResult<()> {
361        self.ensure_table_state(table_id)?;
362        let mut tables = self.tables.write().unwrap();
363        let state = tables.get_mut(&table_id).unwrap();
364        state.current.column_metas.insert(meta.col_id, meta);
365        Ok(())
366    }
367
368    /// Return the multi-column UNIQUE definitions cached for the table.
369    pub fn multi_column_uniques(
370        &self,
371        table_id: TableId,
372    ) -> LlkvResult<Vec<MultiColumnIndexEntryMeta>> {
373        self.ensure_table_state(table_id)?;
374        let tables = self.tables.read().unwrap();
375        let state = tables.get(&table_id).unwrap();
376        Ok(state
377            .current
378            .multi_column_indexes
379            .values()
380            .filter(|entry| entry.unique)
381            .cloned()
382            .collect())
383    }
384
385    /// Replace the cached multi-column UNIQUE definitions for the table.
386    pub fn set_multi_column_uniques(
387        &self,
388        table_id: TableId,
389        uniques: Vec<MultiColumnIndexEntryMeta>,
390    ) -> LlkvResult<()> {
391        self.ensure_table_state(table_id)?;
392        let mut tables = self.tables.write().unwrap();
393        let state = tables.get_mut(&table_id).unwrap();
394        // Remove all existing unique multi-column indexes
395        state
396            .current
397            .multi_column_indexes
398            .retain(|_, entry| !entry.unique);
399        // Add the new unique indexes
400        for unique in uniques {
401            state
402                .current
403                .multi_column_indexes
404                .insert(unique.canonical_name.clone(), unique);
405        }
406        Ok(())
407    }
408
409    /// Return the named single-column indexes registered for a table.
410    pub fn single_column_indexes(
411        &self,
412        table_id: TableId,
413    ) -> LlkvResult<Vec<SingleColumnIndexEntry>> {
414        self.ensure_table_state(table_id)?;
415        let tables = self.tables.read().unwrap();
416        let state = tables.get(&table_id).unwrap();
417        Ok(state.current.single_indexes.values().cloned().collect())
418    }
419
420    /// Lookup a single-column index by canonical name.
421    pub fn single_column_index(
422        &self,
423        table_id: TableId,
424        canonical_index_name: &str,
425    ) -> LlkvResult<Option<SingleColumnIndexEntry>> {
426        self.ensure_table_state(table_id)?;
427        let tables = self.tables.read().unwrap();
428        let state = tables.get(&table_id).unwrap();
429        Ok(state
430            .current
431            .single_indexes
432            .get(canonical_index_name)
433            .cloned())
434    }
435
436    /// Register or replace a single-column index metadata entry in the cached snapshot.
437    pub fn put_single_column_index(
438        &self,
439        table_id: TableId,
440        entry: SingleColumnIndexEntry,
441    ) -> LlkvResult<()> {
442        self.ensure_table_state(table_id)?;
443        let mut tables = self.tables.write().unwrap();
444        let state = tables.get_mut(&table_id).unwrap();
445        let column_id = entry.column_id;
446        state
447            .current
448            .single_indexes
449            .insert(entry.canonical_name.clone(), entry);
450        state.current.sort_indexes.insert(column_id);
451        Ok(())
452    }
453
454    /// Remove a single-column index metadata entry from the cached snapshot.
455    pub fn remove_single_column_index(
456        &self,
457        table_id: TableId,
458        canonical_index_name: &str,
459    ) -> LlkvResult<Option<SingleColumnIndexEntry>> {
460        self.ensure_table_state(table_id)?;
461        let mut tables = self.tables.write().unwrap();
462        let state = tables.get_mut(&table_id).unwrap();
463        let removed = state.current.single_indexes.remove(canonical_index_name);
464        if let Some(ref entry) = removed {
465            let still_indexed = state
466                .current
467                .single_indexes
468                .values()
469                .any(|existing| existing.column_id == entry.column_id);
470            if !still_indexed {
471                state.current.sort_indexes.remove(&entry.column_id);
472            }
473        }
474        Ok(removed)
475    }
476
477    /// Register or replace a multi-column index metadata entry in the cached snapshot.
478    pub fn put_multi_column_index(
479        &self,
480        table_id: TableId,
481        entry: MultiColumnIndexEntryMeta,
482    ) -> LlkvResult<()> {
483        self.ensure_table_state(table_id)?;
484        let mut tables = self.tables.write().unwrap();
485        let state = tables.get_mut(&table_id).unwrap();
486        state
487            .current
488            .multi_column_indexes
489            .insert(entry.canonical_name.clone(), entry);
490        Ok(())
491    }
492
493    /// Remove a multi-column index metadata entry from the cached snapshot.
494    pub fn remove_multi_column_index(
495        &self,
496        table_id: TableId,
497        canonical_index_name: &str,
498    ) -> LlkvResult<Option<MultiColumnIndexEntryMeta>> {
499        self.ensure_table_state(table_id)?;
500        let mut tables = self.tables.write().unwrap();
501        let state = tables.get_mut(&table_id).unwrap();
502        Ok(state
503            .current
504            .multi_column_indexes
505            .remove(canonical_index_name))
506    }
507
508    /// Retrieve a multi-column index metadata entry by canonical name.
509    pub fn get_multi_column_index(
510        &self,
511        table_id: TableId,
512        canonical_index_name: &str,
513    ) -> LlkvResult<Option<MultiColumnIndexEntryMeta>> {
514        self.ensure_table_state(table_id)?;
515        let tables = self.tables.read().unwrap();
516        Ok(tables.get(&table_id).and_then(|state| {
517            state
518                .current
519                .multi_column_indexes
520                .get(canonical_index_name)
521                .cloned()
522        }))
523    }
524
525    /// Register a sort index for a column at the metadata level, staging the change for the next flush.
526    pub fn register_sort_index(&self, table_id: TableId, field_id: FieldId) -> LlkvResult<()> {
527        self.ensure_table_state(table_id)?;
528
529        {
530            let mut tables = self.tables.write().unwrap();
531            let state = tables.get_mut(&table_id).unwrap();
532            if state.persisted.sort_indexes.contains(&field_id)
533                || state.current.sort_indexes.contains(&field_id)
534            {
535                state.current.sort_indexes.insert(field_id);
536                return Ok(());
537            }
538        }
539
540        if self.field_has_sort_index(table_id, field_id)? {
541            let mut tables = self.tables.write().unwrap();
542            let state = tables.get_mut(&table_id).unwrap();
543            state.persisted.sort_indexes.insert(field_id);
544            state.current.sort_indexes.insert(field_id);
545            return Ok(());
546        }
547
548        let mut tables = self.tables.write().unwrap();
549        let state = tables.get_mut(&table_id).unwrap();
550        state.current.sort_indexes.insert(field_id);
551        Ok(())
552    }
553
554    /// Unregister a sort index for a column, staging removal for the next flush.
555    pub fn unregister_sort_index(&self, table_id: TableId, field_id: FieldId) -> LlkvResult<()> {
556        self.ensure_table_state(table_id)?;
557
558        let mut tables = self.tables.write().unwrap();
559        let state = tables.get_mut(&table_id).unwrap();
560        state.current.sort_indexes.remove(&field_id);
561
562        if !state.persisted.sort_indexes.contains(&field_id) {
563            drop(tables);
564            if self.field_has_sort_index(table_id, field_id)? {
565                let mut tables = self.tables.write().unwrap();
566                let state = tables.get_mut(&table_id).unwrap();
567                state.persisted.sort_indexes.insert(field_id);
568            }
569        }
570
571        Ok(())
572    }
573
574    /// Mutate the cached multi-column UNIQUE definitions for a table in-place.
575    pub fn update_multi_column_uniques<F, T>(&self, table_id: TableId, f: F) -> LlkvResult<T>
576    where
577        F: FnOnce(&mut Vec<MultiColumnIndexEntryMeta>) -> T,
578    {
579        self.ensure_table_state(table_id)?;
580        let mut tables = self.tables.write().unwrap();
581        let state = tables.get_mut(&table_id).unwrap();
582        let mut uniques: Vec<MultiColumnIndexEntryMeta> = state
583            .current
584            .multi_column_indexes
585            .values()
586            .filter(|entry| entry.unique)
587            .cloned()
588            .collect();
589        let result = f(&mut uniques);
590        // Remove all existing unique multi-column indexes
591        state
592            .current
593            .multi_column_indexes
594            .retain(|_, entry| !entry.unique);
595        // Add back the modified unique indexes
596        for unique in uniques {
597            state
598                .current
599                .multi_column_indexes
600                .insert(unique.canonical_name.clone(), unique);
601        }
602        Ok(result)
603    }
604
605    /// Return the trigger definitions cached for the table.
606    pub fn triggers(&self, table_id: TableId) -> LlkvResult<Vec<TriggerEntryMeta>> {
607        self.ensure_table_state(table_id)?;
608        let tables = self.tables.read().unwrap();
609        let state = tables.get(&table_id).unwrap();
610        Ok(state.current.triggers.values().cloned().collect())
611    }
612
613    /// Fetch a trigger definition by its canonical name.
614    pub fn trigger(
615        &self,
616        table_id: TableId,
617        canonical_name: &str,
618    ) -> LlkvResult<Option<TriggerEntryMeta>> {
619        self.ensure_table_state(table_id)?;
620        let tables = self.tables.read().unwrap();
621        let state = tables.get(&table_id).unwrap();
622        Ok(state
623            .current
624            .triggers
625            .get(&canonical_name.to_ascii_lowercase())
626            .cloned())
627    }
628
629    /// Insert or replace a trigger definition in the cached snapshot.
630    pub fn insert_trigger(&self, table_id: TableId, trigger: TriggerEntryMeta) -> LlkvResult<()> {
631        self.ensure_table_state(table_id)?;
632        let mut tables = self.tables.write().unwrap();
633        let state = tables.get_mut(&table_id).unwrap();
634        state
635            .current
636            .triggers
637            .insert(trigger.canonical_name.clone(), trigger);
638        Ok(())
639    }
640
641    /// Remove a trigger definition by canonical name. Returns true when the trigger existed.
642    pub fn remove_trigger(&self, table_id: TableId, canonical_name: &str) -> LlkvResult<bool> {
643        self.ensure_table_state(table_id)?;
644        let mut tables = self.tables.write().unwrap();
645        let state = tables.get_mut(&table_id).unwrap();
646        Ok(state
647            .current
648            .triggers
649            .remove(&canonical_name.to_ascii_lowercase())
650            .is_some())
651    }
652
653    /// Prepare the metadata state for dropping a table by clearing cached entries.
654    ///
655    /// Column metadata is loaded eagerly for the provided field identifiers so deletions
656    /// are persisted on the next flush.
657    pub fn prepare_table_drop(&self, table_id: TableId, column_ids: &[FieldId]) -> LlkvResult<()> {
658        if !column_ids.is_empty() {
659            let _ = self.column_metas(table_id, column_ids)?;
660        } else {
661            self.ensure_table_state(table_id)?;
662        }
663
664        let mut tables = self.tables.write().unwrap();
665        if let Some(state) = tables.get_mut(&table_id) {
666            state.current.table_meta = None;
667            state.current.column_metas.clear();
668            state.current.constraints.clear();
669            state.current.constraint_names.clear();
670            state.current.single_indexes.clear();
671            state.current.multi_column_indexes.clear();
672            state.current.sort_indexes.clear();
673            state.current.triggers.clear();
674        }
675        drop(tables);
676        self.refresh_referencing_index_for_table(table_id);
677        Ok(())
678    }
679
680    /// Remove any cached snapshots for the specified table.
681    pub fn remove_table_state(&self, table_id: TableId) {
682        self.tables.write().unwrap().remove(&table_id);
683        self.referencing_index
684            .write()
685            .unwrap()
686            .remove_child(table_id);
687    }
688
689    /// Return all constraint records currently cached for the table.
690    pub fn constraint_records(&self, table_id: TableId) -> LlkvResult<Vec<ConstraintRecord>> {
691        self.ensure_table_state(table_id)?;
692        let tables = self.tables.read().unwrap();
693        let state = tables.get(&table_id).unwrap();
694        Ok(state.current.constraints.values().cloned().collect())
695    }
696
697    /// Return the map of constraint names keyed by identifier for the table.
698    pub fn constraint_names(
699        &self,
700        table_id: TableId,
701    ) -> LlkvResult<FxHashMap<ConstraintId, String>> {
702        self.ensure_table_state(table_id)?;
703        let tables = self.tables.read().unwrap();
704        let state = tables.get(&table_id).unwrap();
705        Ok(state.current.constraint_names.clone())
706    }
707
708    /// Fetch a subset of constraint records by their identifiers.
709    pub fn constraint_records_by_id(
710        &self,
711        table_id: TableId,
712        constraint_ids: &[ConstraintId],
713    ) -> LlkvResult<Vec<Option<ConstraintRecord>>> {
714        self.ensure_table_state(table_id)?;
715        let tables = self.tables.read().unwrap();
716        let state = tables.get(&table_id).unwrap();
717        Ok(constraint_ids
718            .iter()
719            .map(|constraint_id| state.current.constraints.get(constraint_id).cloned())
720            .collect())
721    }
722
723    /// Upsert constraint records in the in-memory snapshot.
724    pub fn put_constraint_records(
725        &self,
726        table_id: TableId,
727        records: &[ConstraintRecord],
728    ) -> LlkvResult<()> {
729        self.ensure_table_state(table_id)?;
730        let mut tables = self.tables.write().unwrap();
731        let state = tables.get_mut(&table_id).unwrap();
732        for record in records {
733            state
734                .current
735                .constraints
736                .insert(record.constraint_id, record.clone());
737        }
738        drop(tables);
739        self.refresh_referencing_index_for_table(table_id);
740        Ok(())
741    }
742
743    /// Upsert constraint names in the in-memory snapshot.
744    pub fn put_constraint_names(
745        &self,
746        table_id: TableId,
747        names: &[(ConstraintId, Option<String>)],
748    ) -> LlkvResult<()> {
749        if names.is_empty() {
750            return Ok(());
751        }
752        self.ensure_table_state(table_id)?;
753        let mut tables = self.tables.write().unwrap();
754        if let Some(state) = tables.get_mut(&table_id) {
755            for (constraint_id, name) in names {
756                if let Some(name) = name {
757                    state
758                        .current
759                        .constraint_names
760                        .insert(*constraint_id, name.clone());
761                } else {
762                    state.current.constraint_names.remove(constraint_id);
763                }
764            }
765        }
766        Ok(())
767    }
768
769    /// Produce a map of constraint records keyed by identifier.
770    pub fn constraint_record_map(
771        &self,
772        table_id: TableId,
773    ) -> LlkvResult<FxHashMap<ConstraintId, ConstraintRecord>> {
774        self.ensure_table_state(table_id)?;
775        let tables = self.tables.read().unwrap();
776        let state = tables.get(&table_id).unwrap();
777        Ok(state.current.constraints.clone())
778    }
779
    /// Persist changes for a single table to the underlying catalog, writing only the diffs.
    ///
    /// The method compares the table's `current` snapshot against its `persisted` snapshot
    /// and, section by section, writes what differs:
    ///
    /// 1. table metadata (put when new or changed, delete when removed),
    /// 2. column metadata (put dirty entries, batch-delete removed ones),
    /// 3. constraint records and constraint names (batch put / batch delete),
    /// 4. multi-column indexes, single-column indexes and triggers (each list is rewritten
    ///    as a whole, sorted by canonical name, or deleted when it became empty),
    /// 5. sort indexes (registered / unregistered on the table one field at a time).
    ///
    /// After each catalog call the matching part of `persisted` is brought in line with
    /// `current`, so sections that were already written are not written again by a later
    /// flush.
    ///
    /// The write lock on `self.tables` is held for the entire call.
    ///
    /// # Errors
    ///
    /// Returns the first error produced by `ensure_table_state`, by a catalog call that is
    /// followed by `?`, or by the sort-index (un)registration. Sections that come after the
    /// failing call are not attempted.
    ///
    /// # Panics
    ///
    /// Panics if the `tables` lock is poisoned or if no entry exists for `table_id` after
    /// `ensure_table_state` has succeeded.
    pub fn flush_table(&self, table_id: TableId) -> LlkvResult<()> {
        self.ensure_table_state(table_id)?;
        let mut tables = self.tables.write().unwrap();
        let state = tables.get_mut(&table_id).unwrap();

        let catalog = SysCatalog::new(&self.store);

        // --- Table metadata -------------------------------------------------------------
        // NOTE(review): `put_table_meta` is called without `?`; presumably it returns `()`.
        // Confirm that a failed write cannot go unnoticed here.
        match (
            state.current.table_meta.as_ref(),
            state.persisted.table_meta.as_ref(),
        ) {
            // Present on both sides but different: overwrite.
            (Some(meta), Some(existing)) if meta != existing => {
                catalog.put_table_meta(meta);
                state.persisted.table_meta = Some(meta.clone());
            }
            // Not persisted yet: first write.
            (Some(meta), None) => {
                catalog.put_table_meta(meta);
                state.persisted.table_meta = Some(meta.clone());
            }
            // Removed in memory: delete the persisted record.
            (None, Some(_)) => {
                catalog.delete_table_meta(table_id)?;
                state.persisted.table_meta = None;
            }
            // Identical, or absent on both sides: nothing to do.
            _ => {}
        }

        // --- Column metadata ------------------------------------------------------------
        // Collect columns that are new or differ from the persisted copy.
        let mut dirty_columns: Vec<(FieldId, ColMeta)> = Vec::new();
        for (field_id, meta) in &state.current.column_metas {
            match state.persisted.column_metas.get(field_id) {
                Some(existing) if existing == meta => {}
                _ => dirty_columns.push((*field_id, meta.clone())),
            }
        }
        // NOTE(review): `put_col_meta` is also called without `?`; presumably it returns `()`.
        for (field_id, meta) in dirty_columns.iter() {
            catalog.put_col_meta(table_id, meta);
            state.persisted.column_metas.insert(*field_id, meta.clone());
        }

        // Columns that exist in the persisted snapshot but no longer in the current one.
        let removed_columns: Vec<FieldId> = state
            .persisted
            .column_metas
            .keys()
            .copied()
            .filter(|field_id| !state.current.column_metas.contains_key(field_id))
            .collect();
        if !removed_columns.is_empty() {
            catalog.delete_col_meta(table_id, &removed_columns)?;
            for field_id in removed_columns {
                state.persisted.column_metas.remove(&field_id);
            }
        }

        // --- Constraint records ---------------------------------------------------------
        let mut dirty_constraints: Vec<ConstraintRecord> = Vec::new();
        for (constraint_id, record) in &state.current.constraints {
            match state.persisted.constraints.get(constraint_id) {
                Some(existing) if existing == record => {}
                _ => dirty_constraints.push(record.clone()),
            }
        }
        if !dirty_constraints.is_empty() {
            // One batched write for all new/changed records.
            catalog.put_constraint_records(table_id, &dirty_constraints)?;
            for record in dirty_constraints {
                state
                    .persisted
                    .constraints
                    .insert(record.constraint_id, record);
            }
        }

        let removed_constraints: Vec<ConstraintId> = state
            .persisted
            .constraints
            .keys()
            .copied()
            .filter(|constraint_id| !state.current.constraints.contains_key(constraint_id))
            .collect();
        if !removed_constraints.is_empty() {
            catalog.delete_constraint_records(table_id, &removed_constraints)?;
            for constraint_id in removed_constraints {
                state.persisted.constraints.remove(&constraint_id);
            }
        }

        // --- Constraint names -----------------------------------------------------------
        let mut dirty_constraint_names: Vec<(ConstraintId, String)> = Vec::new();
        for (constraint_id, name) in &state.current.constraint_names {
            match state.persisted.constraint_names.get(constraint_id) {
                Some(existing) if existing == name => {}
                _ => dirty_constraint_names.push((*constraint_id, name.clone())),
            }
        }
        if !dirty_constraint_names.is_empty() {
            // The catalog API takes `ConstraintNameRecord`s with an optional name.
            let records: Vec<ConstraintNameRecord> = dirty_constraint_names
                .iter()
                .map(|(constraint_id, name)| ConstraintNameRecord {
                    constraint_id: *constraint_id,
                    name: Some(name.clone()),
                })
                .collect();
            catalog.put_constraint_names(table_id, &records)?;
            for (constraint_id, name) in dirty_constraint_names {
                state.persisted.constraint_names.insert(constraint_id, name);
            }
        }

        let removed_constraint_names: Vec<ConstraintId> = state
            .persisted
            .constraint_names
            .keys()
            .copied()
            .filter(|constraint_id| !state.current.constraint_names.contains_key(constraint_id))
            .collect();
        if !removed_constraint_names.is_empty() {
            catalog.delete_constraint_names(table_id, &removed_constraint_names)?;
            for constraint_id in removed_constraint_names {
                state.persisted.constraint_names.remove(&constraint_id);
            }
        }

        // Flush all multi-column indexes (both unique and non-unique) to the catalog
        // The list is written as a whole rather than diffed entry by entry.
        if state.current.multi_column_indexes != state.persisted.multi_column_indexes {
            if state.current.multi_column_indexes.is_empty() {
                catalog.delete_multi_column_indexes(table_id)?;
            } else {
                let mut entries: Vec<MultiColumnIndexEntryMeta> = state
                    .current
                    .multi_column_indexes
                    .values()
                    .cloned()
                    .collect();
                // Sort by canonical name so the written order does not depend on map iteration.
                entries.sort_by(|a, b| a.canonical_name.cmp(&b.canonical_name));
                catalog.put_multi_column_indexes(table_id, &entries)?;
            }
            state.persisted.multi_column_indexes = state.current.multi_column_indexes.clone();
        }

        // --- Single-column indexes ------------------------------------------------------
        // Same whole-list strategy as the multi-column indexes above.
        if state.current.single_indexes != state.persisted.single_indexes {
            if state.current.single_indexes.is_empty() {
                catalog.delete_single_column_indexes(table_id)?;
                state.persisted.single_indexes.clear();
            } else {
                // Convert the in-memory entries into the catalog's record type field by field.
                let mut entries: Vec<SingleColumnIndexEntryMeta> = state
                    .current
                    .single_indexes
                    .values()
                    .cloned()
                    .map(|entry| SingleColumnIndexEntryMeta {
                        index_name: entry.index_name,
                        canonical_name: entry.canonical_name,
                        column_id: entry.column_id,
                        column_name: entry.column_name,
                        unique: entry.unique,
                        ascending: entry.ascending,
                        nulls_first: entry.nulls_first,
                    })
                    .collect();
                entries.sort_by(|a, b| a.canonical_name.cmp(&b.canonical_name));
                catalog.put_single_column_indexes(table_id, &entries)?;
                state.persisted.single_indexes = state.current.single_indexes.clone();
            }
        }

        // --- Triggers -------------------------------------------------------------------
        if state.current.triggers != state.persisted.triggers {
            if state.current.triggers.is_empty() {
                // Only issue the delete when something was actually persisted.
                if !state.persisted.triggers.is_empty() {
                    catalog.delete_triggers(table_id)?;
                }
                state.persisted.triggers.clear();
            } else {
                let mut entries: Vec<TriggerEntryMeta> =
                    state.current.triggers.values().cloned().collect();
                entries.sort_by(|a, b| a.canonical_name.cmp(&b.canonical_name));
                catalog.put_triggers(table_id, &entries)?;
                state.persisted.triggers = state.current.triggers.clone();
            }
        }

        // --- Sort indexes ---------------------------------------------------------------
        // Fields to register (in `current` only) and to unregister (in `persisted` only).
        let sort_adds: Vec<FieldId> = state
            .current
            .sort_indexes
            .iter()
            .copied()
            .filter(|field_id| !state.persisted.sort_indexes.contains(field_id))
            .collect();
        let sort_removes: Vec<FieldId> = state
            .persisted
            .sort_indexes
            .iter()
            .copied()
            .filter(|field_id| !state.current.sort_indexes.contains(field_id))
            .collect();
        if !sort_adds.is_empty() || !sort_removes.is_empty() {
            // The table handle is only created when there is sort-index work to do.
            let table = Table::from_id_and_store(table_id, Arc::clone(&self.store))?;
            for field_id in &sort_adds {
                table.register_sort_index(*field_id)?;
                // Tracked per field, so a failure part-way keeps the earlier successes.
                state.persisted.sort_indexes.insert(*field_id);
            }
            for field_id in &sort_removes {
                table.unregister_sort_index(*field_id)?;
                state.persisted.sort_indexes.remove(field_id);
            }
        }

        Ok(())
    }
985
986    /// Persist changes for all tracked tables.
987    pub fn flush_all(&self) -> LlkvResult<()> {
988        let table_ids: Vec<TableId> = {
989            let tables = self.tables.read().unwrap();
990            tables.keys().copied().collect()
991        };
992        for table_id in table_ids {
993            self.flush_table(table_id)?;
994        }
995        Ok(())
996    }
997
998    /// Return all persisted table metadata.
999    pub fn all_table_metas(&self) -> LlkvResult<Vec<(TableId, TableMeta)>> {
1000        let catalog = SysCatalog::new(&self.store);
1001        catalog.all_table_metas()
1002    }
1003
1004    /// Return all persisted multi-column unique metadata.
1005    pub fn all_multi_column_unique_metas(&self) -> LlkvResult<Vec<TableMultiColumnIndexMeta>> {
1006        let catalog = SysCatalog::new(&self.store);
1007        let all = catalog.all_multi_column_index_metas()?;
1008        // Filter to unique indexes only
1009        Ok(all
1010            .into_iter()
1011            .map(|mut meta| {
1012                meta.indexes.retain(|idx| idx.unique);
1013                meta
1014            })
1015            .filter(|meta| !meta.indexes.is_empty())
1016            .collect())
1017    }
1018
1019    /// Assemble foreign key descriptors for the table using cached metadata.
1020    pub fn foreign_key_descriptors(
1021        &self,
1022        table_id: TableId,
1023    ) -> LlkvResult<Vec<ForeignKeyDescriptor>> {
1024        let records = self.constraint_records(table_id)?;
1025        let mut descriptors = Vec::new();
1026
1027        for record in records {
1028            if !record.is_active() {
1029                continue;
1030            }
1031
1032            let ConstraintKind::ForeignKey(fk) = record.kind else {
1033                continue;
1034            };
1035
1036            descriptors.push(ForeignKeyDescriptor {
1037                constraint_id: record.constraint_id,
1038                referencing_table_id: table_id,
1039                referencing_field_ids: fk.referencing_field_ids.clone(),
1040                referenced_table_id: fk.referenced_table,
1041                referenced_field_ids: fk.referenced_field_ids.clone(),
1042                on_delete: fk.on_delete,
1043                on_update: fk.on_update,
1044            });
1045        }
1046
1047        Ok(descriptors)
1048    }
1049
1050    /// Resolve foreign key descriptors into names suitable for runtime consumers.
1051    pub fn foreign_key_views(
1052        &self,
1053        catalog: &TableCatalog,
1054        table_id: TableId,
1055    ) -> LlkvResult<Vec<ForeignKeyView>> {
1056        let descriptors = self.foreign_key_descriptors(table_id)?;
1057
1058        if descriptors.is_empty() {
1059            return Ok(Vec::new());
1060        }
1061
1062        let (referencing_display, referencing_canonical) =
1063            resolve_table_name(catalog, self, table_id)?;
1064
1065        let mut details = Vec::with_capacity(descriptors.len());
1066        for descriptor in descriptors {
1067            let referenced_table_id = descriptor.referenced_table_id;
1068            let (referenced_display, referenced_canonical) =
1069                resolve_table_name(catalog, self, referenced_table_id)?;
1070
1071            let referencing_column_names =
1072                self.column_names(table_id, &descriptor.referencing_field_ids)?;
1073            let referenced_column_names =
1074                self.column_names(referenced_table_id, &descriptor.referenced_field_ids)?;
1075            let constraint_name = self.constraint_name_for(table_id, descriptor.constraint_id)?;
1076
1077            details.push(ForeignKeyView {
1078                constraint_id: descriptor.constraint_id,
1079                constraint_name,
1080                referencing_table_id: descriptor.referencing_table_id,
1081                referencing_table_display: referencing_display.clone(),
1082                referencing_table_canonical: referencing_canonical.clone(),
1083                referencing_field_ids: descriptor.referencing_field_ids.clone(),
1084                referencing_column_names,
1085                referenced_table_id,
1086                referenced_table_display: referenced_display.clone(),
1087                referenced_table_canonical: referenced_canonical.clone(),
1088                referenced_field_ids: descriptor.referenced_field_ids.clone(),
1089                referenced_column_names,
1090                on_delete: descriptor.on_delete,
1091                on_update: descriptor.on_update,
1092            });
1093        }
1094
1095        Ok(details)
1096    }
1097
1098    /// Assemble a consolidated read-only view of table metadata.
1099    pub fn table_view(
1100        &self,
1101        catalog: &TableCatalog,
1102        table_id: TableId,
1103        field_ids: &[FieldId],
1104    ) -> LlkvResult<TableView> {
1105        let table_meta = self.table_meta(table_id)?;
1106        let column_metas = self.column_metas(table_id, field_ids)?;
1107        let constraint_records = self.constraint_records(table_id)?;
1108        let multi_column_uniques = self.multi_column_uniques(table_id)?;
1109        let foreign_keys = self.foreign_key_views(catalog, table_id)?;
1110
1111        Ok(TableView {
1112            table_meta,
1113            column_metas,
1114            constraint_records,
1115            multi_column_uniques,
1116            foreign_keys,
1117        })
1118    }
1119
1120    /// Validate foreign key specifications and persist them for the referencing table.
1121    pub fn validate_and_register_foreign_keys<F>(
1122        &self,
1123        referencing_table: &ForeignKeyTableInfo,
1124        specs: &[ForeignKeySpec],
1125        lookup_table: F,
1126        timestamp_micros: u64,
1127    ) -> LlkvResult<Vec<ValidatedForeignKey>>
1128    where
1129        F: FnMut(&str) -> LlkvResult<ForeignKeyTableInfo>,
1130    {
1131        let validated = validate_foreign_keys(referencing_table, specs, lookup_table)?;
1132        self.register_foreign_keys(referencing_table.table_id, &validated, timestamp_micros)?;
1133        Ok(validated)
1134    }
1135
1136    /// Register validated foreign key definitions for a table.
1137    pub fn register_foreign_keys(
1138        &self,
1139        table_id: TableId,
1140        foreign_keys: &[ValidatedForeignKey],
1141        timestamp_micros: u64,
1142    ) -> LlkvResult<()> {
1143        if foreign_keys.is_empty() {
1144            return Ok(());
1145        }
1146
1147        let existing_constraints = self.constraint_record_map(table_id)?;
1148        let mut next_constraint_id = existing_constraints
1149            .keys()
1150            .copied()
1151            .max()
1152            .unwrap_or(0)
1153            .saturating_add(1);
1154
1155        let mut constraint_records = Vec::with_capacity(foreign_keys.len());
1156        let mut constraint_names: Vec<(ConstraintId, Option<String>)> =
1157            Vec::with_capacity(foreign_keys.len());
1158
1159        for fk in foreign_keys {
1160            let constraint_id = next_constraint_id;
1161            constraint_records.push(ConstraintRecord {
1162                constraint_id,
1163                kind: ConstraintKind::ForeignKey(ForeignKeyConstraint {
1164                    referencing_field_ids: fk.referencing_field_ids.clone(),
1165                    referenced_table: fk.referenced_table_id,
1166                    referenced_field_ids: fk.referenced_field_ids.clone(),
1167                    on_delete: fk.on_delete,
1168                    on_update: fk.on_update,
1169                }),
1170                state: ConstraintState::Active,
1171                revision: 1,
1172                last_modified_micros: timestamp_micros,
1173            });
1174            constraint_names.push((constraint_id, fk.name.clone()));
1175            next_constraint_id = next_constraint_id.saturating_add(1);
1176        }
1177
1178        self.put_constraint_records(table_id, &constraint_records)?;
1179        self.put_constraint_names(table_id, &constraint_names)?;
1180        self.flush_table(table_id)?;
1181
1182        Ok(())
1183    }
1184
1185    /// Register column metadata, physical storage columns, and primary/unique constraints.
1186    pub fn apply_column_definitions(
1187        &self,
1188        table_id: TableId,
1189        columns: &[TableColumn],
1190        timestamp_micros: u64,
1191    ) -> LlkvResult<()> {
1192        if columns.is_empty() {
1193            return Ok(());
1194        }
1195
1196        self.ensure_table_state(table_id)?;
1197
1198        for column in columns {
1199            let column_meta = ColMeta {
1200                col_id: column.field_id,
1201                name: Some(column.name.clone()),
1202                flags: 0,
1203                default: None,
1204            };
1205            self.set_column_meta(table_id, column_meta)?;
1206        }
1207
1208        let table = Table::from_id_and_store(table_id, Arc::clone(&self.store))?;
1209        let store = table.store();
1210
1211        for column in columns {
1212            let logical_field_id = LogicalFieldId::for_user(table_id, column.field_id);
1213            store.ensure_column_registered(logical_field_id, &column.data_type)?;
1214            store.data_type(logical_field_id)?;
1215        }
1216
1217        let created_by_lfid = LogicalFieldId::for_mvcc_created_by(table_id);
1218        store.ensure_column_registered(created_by_lfid, &DataType::UInt64)?;
1219
1220        let deleted_by_lfid = LogicalFieldId::for_mvcc_deleted_by(table_id);
1221        store.ensure_column_registered(deleted_by_lfid, &DataType::UInt64)?;
1222
1223        let existing = self.constraint_record_map(table_id)?;
1224        let mut next_constraint_id = existing
1225            .keys()
1226            .copied()
1227            .max()
1228            .unwrap_or(0)
1229            .saturating_add(1);
1230
1231        let mut constraints = Vec::new();
1232
1233        let primary_key_fields: Vec<FieldId> = columns
1234            .iter()
1235            .filter(|col| col.primary_key)
1236            .map(|col| col.field_id)
1237            .collect();
1238        if !primary_key_fields.is_empty() {
1239            constraints.push(ConstraintRecord {
1240                constraint_id: next_constraint_id,
1241                kind: ConstraintKind::PrimaryKey(PrimaryKeyConstraint {
1242                    field_ids: primary_key_fields,
1243                }),
1244                state: ConstraintState::Active,
1245                revision: 1,
1246                last_modified_micros: timestamp_micros,
1247            });
1248            next_constraint_id = next_constraint_id.saturating_add(1);
1249        }
1250
1251        for column in columns.iter().filter(|col| col.unique && !col.primary_key) {
1252            constraints.push(ConstraintRecord {
1253                constraint_id: next_constraint_id,
1254                kind: ConstraintKind::Unique(UniqueConstraint {
1255                    field_ids: vec![column.field_id],
1256                }),
1257                state: ConstraintState::Active,
1258                revision: 1,
1259                last_modified_micros: timestamp_micros,
1260            });
1261            next_constraint_id = next_constraint_id.saturating_add(1);
1262        }
1263
1264        if !constraints.is_empty() {
1265            self.put_constraint_records(table_id, &constraints)?;
1266        }
1267
1268        Ok(())
1269    }
1270
1271    pub fn column_data_type(&self, table_id: TableId, field_id: FieldId) -> LlkvResult<DataType> {
1272        let table = Table::from_id_and_store(table_id, Arc::clone(&self.store))?;
1273        let store = table.store();
1274        let logical_field_id = LogicalFieldId::for_user(table_id, field_id);
1275        store.data_type(logical_field_id)
1276    }
1277
1278    /// Register a multi-column UNIQUE definition for a table.
1279    pub fn register_multi_column_unique(
1280        &self,
1281        table_id: TableId,
1282        column_ids: &[FieldId],
1283        index_name: Option<String>,
1284    ) -> LlkvResult<MultiColumnUniqueRegistration> {
1285        let mut created = false;
1286        let mut existing_name: Option<Option<String>> = None;
1287        let column_vec: Vec<FieldId> = column_ids.to_vec();
1288
1289        // Generate canonical name from column IDs
1290        let canonical_name = format!(
1291            "__unique_{}_{}",
1292            table_id,
1293            column_vec
1294                .iter()
1295                .map(|id| id.to_string())
1296                .collect::<Vec<_>>()
1297                .join("_")
1298        );
1299
1300        self.update_multi_column_uniques(table_id, |entries| {
1301            if let Some(existing) = entries.iter().find(|entry| entry.column_ids == column_vec) {
1302                existing_name = Some(existing.index_name.clone());
1303            } else {
1304                entries.push(MultiColumnIndexEntryMeta {
1305                    index_name: index_name.clone(),
1306                    canonical_name: canonical_name.clone(),
1307                    column_ids: column_vec.clone(),
1308                    unique: true,
1309                });
1310                created = true;
1311            }
1312        })?;
1313
1314        if created {
1315            Ok(MultiColumnUniqueRegistration::Created)
1316        } else {
1317            Ok(MultiColumnUniqueRegistration::AlreadyExists {
1318                index_name: existing_name.unwrap_or(None),
1319            })
1320        }
1321    }
1322
1323    fn column_names(&self, table_id: TableId, field_ids: &[FieldId]) -> LlkvResult<Vec<String>> {
1324        if field_ids.is_empty() {
1325            return Ok(Vec::new());
1326        }
1327
1328        let metas = self.column_metas(table_id, field_ids)?;
1329        let mut names = Vec::with_capacity(field_ids.len());
1330        for (idx, field_id) in field_ids.iter().enumerate() {
1331            let name = metas
1332                .get(idx)
1333                .and_then(|meta| meta.as_ref())
1334                .and_then(|meta| meta.name.clone())
1335                .unwrap_or_else(|| format!("col_{}", field_id));
1336            names.push(name);
1337        }
1338        Ok(names)
1339    }
1340
1341    /// Reserve and return the next available table id.
1342    pub fn reserve_table_id(&self) -> LlkvResult<TableId> {
1343        let catalog = SysCatalog::new(&self.store);
1344
1345        let mut next = match catalog.get_next_table_id()? {
1346            Some(value) => value,
1347            None => {
1348                let seed = catalog
1349                    .max_table_id()?
1350                    .unwrap_or(reserved::CATALOG_TABLE_ID);
1351                let initial = seed.checked_add(1).ok_or_else(|| {
1352                    Error::InvalidArgumentError("exhausted available table ids".into())
1353                })?;
1354                catalog.put_next_table_id(initial)?;
1355                initial
1356            }
1357        };
1358
1359        while reserved::is_reserved_table_id(next) {
1360            next = next.checked_add(1).ok_or_else(|| {
1361                Error::InvalidArgumentError("exhausted available table ids".into())
1362            })?;
1363        }
1364
1365        let mut following = next
1366            .checked_add(1)
1367            .ok_or_else(|| Error::InvalidArgumentError("exhausted available table ids".into()))?;
1368
1369        while reserved::is_reserved_table_id(following) {
1370            following = following.checked_add(1).ok_or_else(|| {
1371                Error::InvalidArgumentError("exhausted available table ids".into())
1372            })?;
1373        }
1374
1375        catalog.put_next_table_id(following)?;
1376        Ok(next)
1377    }
1378
1379    /// Ensure the catalog's next_table_id counter is at least `minimum`.
1380    ///
1381    /// This is primarily used by in-memory namespaces (e.g., temporary tables)
1382    /// that share a catalog handle with persistent storage. By seeding their
1383    /// own metadata store with a higher starting point we avoid collisions on
1384    /// table IDs already allocated by the persistent catalog.
1385    pub fn ensure_next_table_id_at_least(&self, minimum: TableId) -> LlkvResult<()> {
1386        if reserved::is_reserved_table_id(minimum) {
1387            return Err(Error::InvalidArgumentError(
1388                reserved::reserved_table_id_message(minimum),
1389            ));
1390        }
1391
1392        let catalog = SysCatalog::new(&self.store);
1393        match catalog.get_next_table_id()? {
1394            Some(current) if current >= minimum => Ok(()),
1395            _ => catalog.put_next_table_id(minimum),
1396        }
1397    }
1398
1399    /// Check if a field has a sort index in the underlying store.
1400    ///
1401    /// Note: Creates a temporary Table instance to access index metadata.
1402    /// This is acceptable since Table::from_id_and_store is lightweight (just wraps
1403    /// table_id + `Arc<ColumnStore>`) and this method is only called during index
1404    /// registration/unregistration, not in query hot paths.
1405    fn field_has_sort_index(&self, table_id: TableId, field_id: FieldId) -> LlkvResult<bool> {
1406        let table = Table::from_id_and_store(table_id, Arc::clone(&self.store))?;
1407        let indexes = table.list_registered_indexes(field_id)?;
1408        Ok(indexes.contains(&IndexKind::Sort))
1409    }
1410}
1411
#[cfg(test)]
mod tests {
    use super::*;
    use crate::constraints::{ConstraintKind, ConstraintState, PrimaryKeyConstraint};
    use crate::{MultiColumnIndexEntryMeta, Table};
    use llkv_column_map::ColumnStore;
    use llkv_column_map::store::IndexKind;
    use llkv_storage::pager::MemPager;
    use std::sync::Arc;

    /// Column metadata with only an id and a name set.
    fn named_column(col_id: FieldId, name: &str) -> ColMeta {
        ColMeta {
            col_id,
            name: Some(name.into()),
            flags: 0,
            default: None,
        }
    }

    /// Active, revision-1 primary-key record over a single field.
    fn primary_key_record(
        constraint_id: ConstraintId,
        field_id: FieldId,
        last_modified_micros: u64,
    ) -> ConstraintRecord {
        ConstraintRecord {
            constraint_id,
            kind: ConstraintKind::PrimaryKey(PrimaryKeyConstraint {
                field_ids: vec![field_id],
            }),
            state: ConstraintState::Active,
            revision: 1,
            last_modified_micros,
        }
    }

    /// Unique index entry over a single field; the canonical name is the lowercased name.
    fn unique_index_entry(name: &str, field_id: FieldId) -> MultiColumnIndexEntryMeta {
        MultiColumnIndexEntryMeta {
            index_name: Some(name.into()),
            canonical_name: name.to_lowercase(),
            column_ids: vec![field_id],
            unique: true,
        }
    }

    /// Metadata staged through the manager must reach the catalog on flush and stay
    /// readable from the manager afterwards.
    #[test]
    fn metadata_manager_persists_and_loads() {
        let pager = Arc::new(MemPager::default());
        let store = Arc::new(ColumnStore::open(Arc::clone(&pager)).unwrap());
        let manager = MetadataManager::new(Arc::clone(&store));

        let table_id: TableId = 42;
        let expected_table = TableMeta {
            table_id,
            name: Some("users".into()),
            created_at_micros: 123,
            flags: 0,
            epoch: 1,
            view_definition: None,
        };
        manager
            .set_table_meta(table_id, expected_table.clone())
            .unwrap();

        // The table meta must be staged in the in-memory snapshot right away.
        {
            let guard = manager.tables.read().unwrap();
            assert!(guard.get(&table_id).unwrap().current.table_meta.is_some());
        }

        let id_column = named_column(1, "id");
        manager
            .set_column_meta(table_id, id_column.clone())
            .unwrap();

        // The physical column has to exist before a sort index can be registered on it.
        let id_lfid = LogicalFieldId::for_user(table_id, id_column.col_id);
        store
            .ensure_column_registered(id_lfid, &DataType::Utf8)
            .unwrap();
        manager
            .register_sort_index(table_id, id_column.col_id)
            .unwrap();

        let pk = primary_key_record(7, id_column.col_id, 456);
        manager
            .put_constraint_records(table_id, std::slice::from_ref(&pk))
            .unwrap();

        let unique_entry = unique_index_entry("uniq_users_name", id_column.col_id);
        manager
            .set_multi_column_uniques(table_id, vec![unique_entry.clone()])
            .unwrap();

        assert_eq!(
            manager.table_meta(table_id).unwrap(),
            Some(expected_table.clone())
        );

        manager.flush_table(table_id).unwrap();

        // The sort index must now be registered on the table itself.
        let table = Table::from_id_and_store(table_id, Arc::clone(&store)).unwrap();
        let registered = table.list_registered_indexes(id_column.col_id).unwrap();
        assert!(registered.contains(&IndexKind::Sort));

        // Read back through a fresh catalog handle to check what was persisted.
        let persisted = SysCatalog::new(&store);
        let stored_columns = persisted.get_cols_meta(table_id, &[id_column.col_id]);
        assert_eq!(stored_columns[0].as_ref(), Some(&id_column));
        let stored_constraints = persisted.constraint_records_for_table(table_id).unwrap();
        assert_eq!(stored_constraints, vec![pk.clone()]);
        let stored_uniques = persisted.get_multi_column_indexes(table_id).unwrap();
        assert_eq!(stored_uniques, vec![unique_entry.clone()]);

        // The manager must still answer with the same values after the flush.
        assert_eq!(
            manager.table_meta(table_id).unwrap(),
            Some(expected_table.clone())
        );

        let cached_columns = manager
            .column_metas(table_id, &[id_column.col_id])
            .unwrap();
        assert_eq!(cached_columns[0].as_ref(), Some(&id_column));

        let cached_constraints = manager.constraint_records(table_id).unwrap();
        assert_eq!(cached_constraints, vec![pk.clone()]);

        let cached_uniques = manager.multi_column_uniques(table_id).unwrap();
        assert_eq!(cached_uniques, vec![unique_entry]);

        // No additional writes should occur on subsequent flushes without modifications.
        manager.flush_table(table_id).unwrap();
    }

    /// Metadata written straight to the catalog must be visible through a manager that
    /// has not touched the table before.
    #[test]
    fn metadata_manager_lazy_loads_columns_and_constraints() {
        let pager = Arc::new(MemPager::default());
        let store = Arc::new(ColumnStore::open(Arc::clone(&pager)).unwrap());
        let manager = MetadataManager::new(Arc::clone(&store));

        let table_id: TableId = 99;
        let value_column = named_column(3, "value");
        let pk = primary_key_record(15, value_column.col_id, 0);
        let unique_entry = unique_index_entry("uniq_value", value_column.col_id);

        // Seed the catalog directly, bypassing the manager.
        let seed_catalog = SysCatalog::new(&store);
        seed_catalog.put_col_meta(table_id, &value_column);
        seed_catalog
            .put_constraint_records(table_id, std::slice::from_ref(&pk))
            .unwrap();
        seed_catalog
            .put_multi_column_indexes(table_id, std::slice::from_ref(&unique_entry))
            .unwrap();

        let loaded_columns = manager
            .column_metas(table_id, &[value_column.col_id])
            .unwrap();
        assert_eq!(loaded_columns[0].as_ref(), Some(&value_column));

        let loaded_constraints = manager.constraint_records(table_id).unwrap();
        assert_eq!(loaded_constraints, vec![pk]);

        let loaded_uniques = manager.multi_column_uniques(table_id).unwrap();
        assert_eq!(loaded_uniques, vec![unique_entry]);
    }
}
1578
/// Descriptor describing a foreign key constraint scoped to field identifiers.
///
/// Unlike `ForeignKeyView`, a descriptor carries only ids and actions; table and column
/// names are not resolved.
#[derive(Clone, Debug)]
pub struct ForeignKeyDescriptor {
    /// Id of the constraint record this descriptor was built from.
    pub constraint_id: ConstraintId,
    /// Table whose constraint records contain the foreign key.
    pub referencing_table_id: TableId,
    /// Field ids on the referencing side, as stored in the constraint.
    pub referencing_field_ids: Vec<FieldId>,
    /// Table the foreign key points at.
    pub referenced_table_id: TableId,
    /// Field ids on the referenced side, as stored in the constraint.
    pub referenced_field_ids: Vec<FieldId>,
    /// `on_delete` action carried over from the constraint.
    pub on_delete: ForeignKeyAction,
    /// `on_update` action carried over from the constraint.
    pub on_update: ForeignKeyAction,
}
1590
/// Result of attempting to register a multi-column unique definition.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum MultiColumnUniqueRegistration {
    /// No entry with the same column ids existed; a new one was added.
    Created,
    /// An entry with the same column ids was already present; `index_name` is the name
    /// stored on that existing entry (which may be absent).
    AlreadyExists { index_name: Option<String> },
}