llkv_table/
metadata.rs

1//! Shared metadata manager that consolidates catalog I/O for tables, columns, and constraints.
2//!
3//! This module offers a single entry point for querying and mutating persisted metadata. It keeps
4//! an in-memory snapshot per table, performs diff-aware persistence, and always uses batch catalog
5//! operations to minimise I/O.
6
7#![forbid(unsafe_code)]
8
9use crate::catalog::TableCatalog;
10use crate::constraints::{
11    ConstraintId, ConstraintKind, ConstraintRecord, ConstraintState, ForeignKeyAction,
12    ForeignKeyConstraint, PrimaryKeyConstraint, UniqueConstraint,
13};
14use crate::constraints::{ForeignKeyTableInfo, ValidatedForeignKey, validate_foreign_keys};
15use crate::reserved;
16use crate::resolvers::resolve_table_name;
17use crate::sys_catalog::{ConstraintNameRecord, SysCatalog};
18use crate::table::Table;
19use crate::types::{FieldId, TableColumn, TableId};
20use crate::view::{ForeignKeyView, TableView};
21use crate::{ColMeta, MultiColumnUniqueEntryMeta, TableMeta, TableMultiColumnUniqueMeta};
22use arrow::datatypes::DataType;
23use llkv_column_map::ColumnStore;
24use llkv_column_map::store::IndexKind;
25use llkv_column_map::types::LogicalFieldId;
26use llkv_plan::ForeignKeySpec;
27use llkv_result::{Error, Result as LlkvResult};
28use llkv_storage::pager::Pager;
29use rustc_hash::{FxHashMap, FxHashSet};
30use simd_r_drive_entry_handle::EntryHandle;
31use std::sync::{Arc, RwLock};
32
33#[derive(Clone, Debug, Default)]
34struct TableSnapshot {
35    table_meta: Option<TableMeta>,
36    column_metas: FxHashMap<FieldId, ColMeta>,
37    constraints: FxHashMap<ConstraintId, ConstraintRecord>,
38    multi_uniques: Vec<MultiColumnUniqueEntryMeta>,
39    constraint_names: FxHashMap<ConstraintId, String>,
40    sort_indexes: FxHashSet<FieldId>,
41}
42
43impl TableSnapshot {
44    fn new(
45        table_meta: Option<TableMeta>,
46        column_metas: FxHashMap<FieldId, ColMeta>,
47        constraints: FxHashMap<ConstraintId, ConstraintRecord>,
48        multi_uniques: Vec<MultiColumnUniqueEntryMeta>,
49        constraint_names: FxHashMap<ConstraintId, String>,
50    ) -> Self {
51        Self {
52            table_meta,
53            column_metas,
54            constraints,
55            multi_uniques,
56            constraint_names,
57            sort_indexes: FxHashSet::default(),
58        }
59    }
60}
61
62#[derive(Clone, Debug)]
63struct TableState {
64    current: TableSnapshot,
65    persisted: TableSnapshot,
66}
67
68impl TableState {
69    fn from_snapshot(snapshot: TableSnapshot) -> Self {
70        Self {
71            current: snapshot.clone(),
72            persisted: snapshot,
73        }
74    }
75}
76
77#[derive(Default)]
78struct ReferencingIndex {
79    parent_to_children: FxHashMap<TableId, FxHashSet<(TableId, ConstraintId)>>,
80    child_to_parents: FxHashMap<TableId, FxHashSet<TableId>>,
81    initialized: bool,
82}
83
84impl ReferencingIndex {
85    fn remove_child(&mut self, child_id: TableId) {
86        if let Some(parents) = self.child_to_parents.remove(&child_id) {
87            for parent_id in parents {
88                if let Some(children) = self.parent_to_children.get_mut(&parent_id) {
89                    children.retain(|(entry_child, _)| *entry_child != child_id);
90                    if children.is_empty() {
91                        self.parent_to_children.remove(&parent_id);
92                    }
93                }
94            }
95        }
96    }
97
98    fn insert(&mut self, parent_id: TableId, child_id: TableId, constraint_id: ConstraintId) {
99        self.parent_to_children
100            .entry(parent_id)
101            .or_default()
102            .insert((child_id, constraint_id));
103        self.child_to_parents
104            .entry(child_id)
105            .or_default()
106            .insert(parent_id);
107        self.initialized = true;
108    }
109
110    fn children(&self, parent_id: TableId) -> Vec<(TableId, ConstraintId)> {
111        self.parent_to_children
112            .get(&parent_id)
113            .map(|set| set.iter().cloned().collect())
114            .unwrap_or_default()
115    }
116
117    fn mark_initialized(&mut self) {
118        self.initialized = true;
119    }
120
121    fn is_initialized(&self) -> bool {
122        self.initialized
123    }
124}
125
126/// Central metadata facade that hides the raw catalog implementation details.
127pub struct MetadataManager<P>
128where
129    P: Pager<Blob = EntryHandle> + Send + Sync,
130{
131    store: Arc<ColumnStore<P>>,
132    tables: RwLock<FxHashMap<TableId, TableState>>,
133    referencing_index: RwLock<ReferencingIndex>,
134}
135
136impl<P> MetadataManager<P>
137where
138    P: Pager<Blob = EntryHandle> + Send + Sync,
139{
140    /// Create a new metadata manager backed by the provided column store.
141    pub fn new(store: Arc<ColumnStore<P>>) -> Self {
142        Self {
143            store,
144            tables: RwLock::new(FxHashMap::default()),
145            referencing_index: RwLock::new(ReferencingIndex::default()),
146        }
147    }
148
149    /// Load metadata for a table from the catalog if not already cached.
150    fn ensure_table_state(&self, table_id: TableId) -> LlkvResult<()> {
151        if self.tables.read().unwrap().contains_key(&table_id) {
152            return Ok(());
153        }
154        let state = self.load_table_state(table_id)?;
155        {
156            let mut tables = self.tables.write().unwrap();
157            tables.entry(table_id).or_insert(state);
158        }
159        self.refresh_referencing_index_for_table(table_id);
160        Ok(())
161    }
162
163    fn load_table_state(&self, table_id: TableId) -> LlkvResult<TableState> {
164        let catalog = SysCatalog::new(&self.store);
165        let table_meta = catalog.get_table_meta(table_id);
166        let constraint_records = catalog.constraint_records_for_table(table_id)?;
167        let constraint_ids: Vec<ConstraintId> = constraint_records
168            .iter()
169            .map(|record| record.constraint_id)
170            .collect();
171        let constraint_name_entries = if constraint_ids.is_empty() {
172            Vec::new()
173        } else {
174            catalog.get_constraint_names(table_id, &constraint_ids)?
175        };
176        let multi_uniques = catalog.get_multi_column_uniques(table_id)?;
177        let mut constraints = FxHashMap::default();
178        let mut constraint_names = FxHashMap::default();
179        for (record, name) in constraint_records
180            .into_iter()
181            .zip(constraint_name_entries.into_iter())
182        {
183            if let Some(name) = name {
184                constraint_names.insert(record.constraint_id, name);
185            }
186            constraints.insert(record.constraint_id, record);
187        }
188        let snapshot = TableSnapshot::new(
189            table_meta,
190            FxHashMap::default(),
191            constraints,
192            multi_uniques,
193            constraint_names,
194        );
195        Ok(TableState::from_snapshot(snapshot))
196    }
197
198    fn refresh_referencing_index_for_table(&self, table_id: TableId) {
199        let foreign_keys: Vec<(TableId, ConstraintId)> = {
200            let tables = self.tables.read().unwrap();
201            match tables.get(&table_id) {
202                Some(state) => state
203                    .current
204                    .constraints
205                    .iter()
206                    .filter(|(_, record)| record.is_active())
207                    .filter_map(|(constraint_id, record)| {
208                        if let ConstraintKind::ForeignKey(fk) = &record.kind {
209                            Some((fk.referenced_table, *constraint_id))
210                        } else {
211                            None
212                        }
213                    })
214                    .collect(),
215                None => Vec::new(),
216            }
217        };
218
219        let mut index = self.referencing_index.write().unwrap();
220        index.remove_child(table_id);
221        for (parent_table, constraint_id) in foreign_keys {
222            index.insert(parent_table, table_id, constraint_id);
223        }
224    }
225
226    fn constraint_name_for(
227        &self,
228        table_id: TableId,
229        constraint_id: ConstraintId,
230    ) -> LlkvResult<Option<String>> {
231        self.ensure_table_state(table_id)?;
232        let tables = self.tables.read().unwrap();
233        let state = tables.get(&table_id).unwrap();
234        Ok(state.current.constraint_names.get(&constraint_id).cloned())
235    }
236
237    fn ensure_referencing_index_initialized(&self) -> LlkvResult<()> {
238        let needs_init = {
239            let index = self.referencing_index.read().unwrap();
240            !index.is_initialized()
241        };
242
243        if !needs_init {
244            return Ok(());
245        }
246
247        let metas = self.all_table_metas()?;
248        for (table_id, _) in metas {
249            self.ensure_table_state(table_id)?;
250            self.refresh_referencing_index_for_table(table_id);
251        }
252
253        let mut index = self.referencing_index.write().unwrap();
254        index.mark_initialized();
255        Ok(())
256    }
257
258    /// Retrieve the current table metadata snapshot (loaded lazily if required).
259    pub fn table_meta(&self, table_id: TableId) -> LlkvResult<Option<TableMeta>> {
260        self.ensure_table_state(table_id)?;
261        let tables = self.tables.read().unwrap();
262        Ok(tables
263            .get(&table_id)
264            .and_then(|state| state.current.table_meta.clone()))
265    }
266
267    /// Return the list of child table + constraint identifiers that reference the provided table.
268    pub fn foreign_keys_referencing(
269        &self,
270        referenced_table: TableId,
271    ) -> LlkvResult<Vec<(TableId, ConstraintId)>> {
272        self.ensure_referencing_index_initialized()?;
273        let index = self.referencing_index.read().unwrap();
274        Ok(index.children(referenced_table))
275    }
276
277    /// Update the in-memory table metadata. Changes are flushed on demand.
278    pub fn set_table_meta(&self, table_id: TableId, meta: TableMeta) -> LlkvResult<()> {
279        self.ensure_table_state(table_id)?;
280        let mut tables = self.tables.write().unwrap();
281        let state = tables.get_mut(&table_id).unwrap();
282        state.current.table_meta = Some(meta);
283        Ok(())
284    }
285
286    /// Fetch column metadata for the requested field identifiers, loading missing entries lazily.
287    pub fn column_metas(
288        &self,
289        table_id: TableId,
290        field_ids: &[FieldId],
291    ) -> LlkvResult<Vec<Option<ColMeta>>> {
292        self.ensure_table_state(table_id)?;
293
294        // Determine which columns still need to be loaded from the catalog.
295        let missing_ids = {
296            let tables = self.tables.read().unwrap();
297            let state = tables.get(&table_id).unwrap();
298            field_ids
299                .iter()
300                .copied()
301                .filter(|field_id| !state.current.column_metas.contains_key(field_id))
302                .collect::<Vec<_>>()
303        };
304
305        if !missing_ids.is_empty() {
306            let catalog = SysCatalog::new(&self.store);
307            let fetched = catalog.get_cols_meta(table_id, &missing_ids);
308            let mut tables = self.tables.write().unwrap();
309            let state = tables.get_mut(&table_id).unwrap();
310            for (idx, field_id) in missing_ids.iter().enumerate() {
311                if let Some(meta) = fetched[idx].clone() {
312                    state.current.column_metas.insert(*field_id, meta.clone());
313                    state
314                        .persisted
315                        .column_metas
316                        .entry(*field_id)
317                        .or_insert(meta);
318                }
319            }
320        }
321
322        let tables = self.tables.read().unwrap();
323        let state = tables.get(&table_id).unwrap();
324        Ok(field_ids
325            .iter()
326            .map(|field_id| state.current.column_metas.get(field_id).cloned())
327            .collect())
328    }
329
330    /// Upsert a single column metadata record in the in-memory snapshot.
331    pub fn set_column_meta(&self, table_id: TableId, meta: ColMeta) -> LlkvResult<()> {
332        self.ensure_table_state(table_id)?;
333        let mut tables = self.tables.write().unwrap();
334        let state = tables.get_mut(&table_id).unwrap();
335        state.current.column_metas.insert(meta.col_id, meta);
336        Ok(())
337    }
338
339    /// Return the multi-column UNIQUE definitions cached for the table.
340    pub fn multi_column_uniques(
341        &self,
342        table_id: TableId,
343    ) -> LlkvResult<Vec<MultiColumnUniqueEntryMeta>> {
344        self.ensure_table_state(table_id)?;
345        let tables = self.tables.read().unwrap();
346        let state = tables.get(&table_id).unwrap();
347        Ok(state.current.multi_uniques.clone())
348    }
349
350    /// Replace the cached multi-column UNIQUE definitions for the table.
351    pub fn set_multi_column_uniques(
352        &self,
353        table_id: TableId,
354        uniques: Vec<MultiColumnUniqueEntryMeta>,
355    ) -> LlkvResult<()> {
356        self.ensure_table_state(table_id)?;
357        let mut tables = self.tables.write().unwrap();
358        let state = tables.get_mut(&table_id).unwrap();
359        state.current.multi_uniques = uniques;
360        Ok(())
361    }
362
363    /// Register a sort index for a column at the metadata level, staging the change for the next flush.
364    pub fn register_sort_index(&self, table_id: TableId, field_id: FieldId) -> LlkvResult<()> {
365        self.ensure_table_state(table_id)?;
366
367        {
368            let mut tables = self.tables.write().unwrap();
369            let state = tables.get_mut(&table_id).unwrap();
370            if state.persisted.sort_indexes.contains(&field_id)
371                || state.current.sort_indexes.contains(&field_id)
372            {
373                state.current.sort_indexes.insert(field_id);
374                return Ok(());
375            }
376        }
377
378        if self.field_has_sort_index(table_id, field_id)? {
379            let mut tables = self.tables.write().unwrap();
380            let state = tables.get_mut(&table_id).unwrap();
381            state.persisted.sort_indexes.insert(field_id);
382            state.current.sort_indexes.insert(field_id);
383            return Ok(());
384        }
385
386        let mut tables = self.tables.write().unwrap();
387        let state = tables.get_mut(&table_id).unwrap();
388        state.current.sort_indexes.insert(field_id);
389        Ok(())
390    }
391
392    /// Unregister a sort index for a column, staging removal for the next flush.
393    pub fn unregister_sort_index(&self, table_id: TableId, field_id: FieldId) -> LlkvResult<()> {
394        self.ensure_table_state(table_id)?;
395
396        let mut tables = self.tables.write().unwrap();
397        let state = tables.get_mut(&table_id).unwrap();
398        state.current.sort_indexes.remove(&field_id);
399
400        if !state.persisted.sort_indexes.contains(&field_id) {
401            drop(tables);
402            if self.field_has_sort_index(table_id, field_id)? {
403                let mut tables = self.tables.write().unwrap();
404                let state = tables.get_mut(&table_id).unwrap();
405                state.persisted.sort_indexes.insert(field_id);
406            }
407        }
408
409        Ok(())
410    }
411
412    /// Mutate the cached multi-column UNIQUE definitions for a table in-place.
413    pub fn update_multi_column_uniques<F, T>(&self, table_id: TableId, f: F) -> LlkvResult<T>
414    where
415        F: FnOnce(&mut Vec<MultiColumnUniqueEntryMeta>) -> T,
416    {
417        self.ensure_table_state(table_id)?;
418        let mut tables = self.tables.write().unwrap();
419        let state = tables.get_mut(&table_id).unwrap();
420        let result = f(&mut state.current.multi_uniques);
421        Ok(result)
422    }
423
424    /// Prepare the metadata state for dropping a table by clearing cached entries.
425    ///
426    /// Column metadata is loaded eagerly for the provided field identifiers so deletions
427    /// are persisted on the next flush.
428    pub fn prepare_table_drop(&self, table_id: TableId, column_ids: &[FieldId]) -> LlkvResult<()> {
429        if !column_ids.is_empty() {
430            let _ = self.column_metas(table_id, column_ids)?;
431        } else {
432            self.ensure_table_state(table_id)?;
433        }
434
435        let mut tables = self.tables.write().unwrap();
436        if let Some(state) = tables.get_mut(&table_id) {
437            state.current.table_meta = None;
438            state.current.column_metas.clear();
439            state.current.constraints.clear();
440            state.current.multi_uniques.clear();
441            state.current.constraint_names.clear();
442            state.current.sort_indexes.clear();
443        }
444        drop(tables);
445        self.refresh_referencing_index_for_table(table_id);
446        Ok(())
447    }
448
449    /// Remove any cached snapshots for the specified table.
450    pub fn remove_table_state(&self, table_id: TableId) {
451        self.tables.write().unwrap().remove(&table_id);
452        self.referencing_index
453            .write()
454            .unwrap()
455            .remove_child(table_id);
456    }
457
458    /// Return all constraint records currently cached for the table.
459    pub fn constraint_records(&self, table_id: TableId) -> LlkvResult<Vec<ConstraintRecord>> {
460        self.ensure_table_state(table_id)?;
461        let tables = self.tables.read().unwrap();
462        let state = tables.get(&table_id).unwrap();
463        Ok(state.current.constraints.values().cloned().collect())
464    }
465
466    /// Fetch a subset of constraint records by their identifiers.
467    pub fn constraint_records_by_id(
468        &self,
469        table_id: TableId,
470        constraint_ids: &[ConstraintId],
471    ) -> LlkvResult<Vec<Option<ConstraintRecord>>> {
472        self.ensure_table_state(table_id)?;
473        let tables = self.tables.read().unwrap();
474        let state = tables.get(&table_id).unwrap();
475        Ok(constraint_ids
476            .iter()
477            .map(|constraint_id| state.current.constraints.get(constraint_id).cloned())
478            .collect())
479    }
480
481    /// Upsert constraint records in the in-memory snapshot.
482    pub fn put_constraint_records(
483        &self,
484        table_id: TableId,
485        records: &[ConstraintRecord],
486    ) -> LlkvResult<()> {
487        self.ensure_table_state(table_id)?;
488        let mut tables = self.tables.write().unwrap();
489        let state = tables.get_mut(&table_id).unwrap();
490        for record in records {
491            state
492                .current
493                .constraints
494                .insert(record.constraint_id, record.clone());
495        }
496        drop(tables);
497        self.refresh_referencing_index_for_table(table_id);
498        Ok(())
499    }
500
501    /// Upsert constraint names in the in-memory snapshot.
502    pub fn put_constraint_names(
503        &self,
504        table_id: TableId,
505        names: &[(ConstraintId, Option<String>)],
506    ) -> LlkvResult<()> {
507        if names.is_empty() {
508            return Ok(());
509        }
510        self.ensure_table_state(table_id)?;
511        let mut tables = self.tables.write().unwrap();
512        if let Some(state) = tables.get_mut(&table_id) {
513            for (constraint_id, name) in names {
514                if let Some(name) = name {
515                    state
516                        .current
517                        .constraint_names
518                        .insert(*constraint_id, name.clone());
519                } else {
520                    state.current.constraint_names.remove(constraint_id);
521                }
522            }
523        }
524        Ok(())
525    }
526
527    /// Produce a map of constraint records keyed by identifier.
528    pub fn constraint_record_map(
529        &self,
530        table_id: TableId,
531    ) -> LlkvResult<FxHashMap<ConstraintId, ConstraintRecord>> {
532        self.ensure_table_state(table_id)?;
533        let tables = self.tables.read().unwrap();
534        let state = tables.get(&table_id).unwrap();
535        Ok(state.current.constraints.clone())
536    }
537
538    /// Persist changes for a single table to the underlying catalog, writing only the diffs.
539    pub fn flush_table(&self, table_id: TableId) -> LlkvResult<()> {
540        self.ensure_table_state(table_id)?;
541        let mut tables = self.tables.write().unwrap();
542        let state = tables.get_mut(&table_id).unwrap();
543
544        let catalog = SysCatalog::new(&self.store);
545
546        match (
547            state.current.table_meta.as_ref(),
548            state.persisted.table_meta.as_ref(),
549        ) {
550            (Some(meta), Some(existing)) if meta != existing => {
551                catalog.put_table_meta(meta);
552                state.persisted.table_meta = Some(meta.clone());
553            }
554            (Some(meta), None) => {
555                catalog.put_table_meta(meta);
556                state.persisted.table_meta = Some(meta.clone());
557            }
558            (None, Some(_)) => {
559                catalog.delete_table_meta(table_id)?;
560                state.persisted.table_meta = None;
561            }
562            _ => {}
563        }
564
565        let mut dirty_columns: Vec<(FieldId, ColMeta)> = Vec::new();
566        for (field_id, meta) in &state.current.column_metas {
567            match state.persisted.column_metas.get(field_id) {
568                Some(existing) if existing == meta => {}
569                _ => dirty_columns.push((*field_id, meta.clone())),
570            }
571        }
572        for (field_id, meta) in dirty_columns.iter() {
573            catalog.put_col_meta(table_id, meta);
574            state.persisted.column_metas.insert(*field_id, meta.clone());
575        }
576
577        let removed_columns: Vec<FieldId> = state
578            .persisted
579            .column_metas
580            .keys()
581            .copied()
582            .filter(|field_id| !state.current.column_metas.contains_key(field_id))
583            .collect();
584        if !removed_columns.is_empty() {
585            catalog.delete_col_meta(table_id, &removed_columns)?;
586            for field_id in removed_columns {
587                state.persisted.column_metas.remove(&field_id);
588            }
589        }
590
591        let mut dirty_constraints: Vec<ConstraintRecord> = Vec::new();
592        for (constraint_id, record) in &state.current.constraints {
593            match state.persisted.constraints.get(constraint_id) {
594                Some(existing) if existing == record => {}
595                _ => dirty_constraints.push(record.clone()),
596            }
597        }
598        if !dirty_constraints.is_empty() {
599            catalog.put_constraint_records(table_id, &dirty_constraints)?;
600            for record in dirty_constraints {
601                state
602                    .persisted
603                    .constraints
604                    .insert(record.constraint_id, record);
605            }
606        }
607
608        let removed_constraints: Vec<ConstraintId> = state
609            .persisted
610            .constraints
611            .keys()
612            .copied()
613            .filter(|constraint_id| !state.current.constraints.contains_key(constraint_id))
614            .collect();
615        if !removed_constraints.is_empty() {
616            catalog.delete_constraint_records(table_id, &removed_constraints)?;
617            for constraint_id in removed_constraints {
618                state.persisted.constraints.remove(&constraint_id);
619            }
620        }
621
622        let mut dirty_constraint_names: Vec<(ConstraintId, String)> = Vec::new();
623        for (constraint_id, name) in &state.current.constraint_names {
624            match state.persisted.constraint_names.get(constraint_id) {
625                Some(existing) if existing == name => {}
626                _ => dirty_constraint_names.push((*constraint_id, name.clone())),
627            }
628        }
629        if !dirty_constraint_names.is_empty() {
630            let records: Vec<ConstraintNameRecord> = dirty_constraint_names
631                .iter()
632                .map(|(constraint_id, name)| ConstraintNameRecord {
633                    constraint_id: *constraint_id,
634                    name: Some(name.clone()),
635                })
636                .collect();
637            catalog.put_constraint_names(table_id, &records)?;
638            for (constraint_id, name) in dirty_constraint_names {
639                state.persisted.constraint_names.insert(constraint_id, name);
640            }
641        }
642
643        let removed_constraint_names: Vec<ConstraintId> = state
644            .persisted
645            .constraint_names
646            .keys()
647            .copied()
648            .filter(|constraint_id| !state.current.constraint_names.contains_key(constraint_id))
649            .collect();
650        if !removed_constraint_names.is_empty() {
651            catalog.delete_constraint_names(table_id, &removed_constraint_names)?;
652            for constraint_id in removed_constraint_names {
653                state.persisted.constraint_names.remove(&constraint_id);
654            }
655        }
656
657        if state.current.multi_uniques != state.persisted.multi_uniques {
658            if state.current.multi_uniques.is_empty() {
659                catalog.delete_multi_column_uniques(table_id)?;
660                state.persisted.multi_uniques.clear();
661            } else {
662                catalog.put_multi_column_uniques(table_id, &state.current.multi_uniques)?;
663                state.persisted.multi_uniques = state.current.multi_uniques.clone();
664            }
665        }
666
667        let sort_adds: Vec<FieldId> = state
668            .current
669            .sort_indexes
670            .iter()
671            .copied()
672            .filter(|field_id| !state.persisted.sort_indexes.contains(field_id))
673            .collect();
674        let sort_removes: Vec<FieldId> = state
675            .persisted
676            .sort_indexes
677            .iter()
678            .copied()
679            .filter(|field_id| !state.current.sort_indexes.contains(field_id))
680            .collect();
681        if !sort_adds.is_empty() || !sort_removes.is_empty() {
682            let table = Table::from_id_and_store(table_id, Arc::clone(&self.store))?;
683            for field_id in &sort_adds {
684                table.register_sort_index(*field_id)?;
685                state.persisted.sort_indexes.insert(*field_id);
686            }
687            for field_id in &sort_removes {
688                table.unregister_sort_index(*field_id)?;
689                state.persisted.sort_indexes.remove(field_id);
690            }
691        }
692
693        Ok(())
694    }
695
696    /// Persist changes for all tracked tables.
697    pub fn flush_all(&self) -> LlkvResult<()> {
698        let table_ids: Vec<TableId> = {
699            let tables = self.tables.read().unwrap();
700            tables.keys().copied().collect()
701        };
702        for table_id in table_ids {
703            self.flush_table(table_id)?;
704        }
705        Ok(())
706    }
707
708    /// Return all persisted table metadata.
709    pub fn all_table_metas(&self) -> LlkvResult<Vec<(TableId, TableMeta)>> {
710        let catalog = SysCatalog::new(&self.store);
711        catalog.all_table_metas()
712    }
713
714    /// Return all persisted multi-column unique metadata.
715    pub fn all_multi_column_unique_metas(&self) -> LlkvResult<Vec<TableMultiColumnUniqueMeta>> {
716        let catalog = SysCatalog::new(&self.store);
717        catalog.all_multi_column_unique_metas()
718    }
719
720    /// Assemble foreign key descriptors for the table using cached metadata.
721    pub fn foreign_key_descriptors(
722        &self,
723        table_id: TableId,
724    ) -> LlkvResult<Vec<ForeignKeyDescriptor>> {
725        let records = self.constraint_records(table_id)?;
726        let mut descriptors = Vec::new();
727
728        for record in records {
729            if !record.is_active() {
730                continue;
731            }
732
733            let ConstraintKind::ForeignKey(fk) = record.kind else {
734                continue;
735            };
736
737            descriptors.push(ForeignKeyDescriptor {
738                constraint_id: record.constraint_id,
739                referencing_table_id: table_id,
740                referencing_field_ids: fk.referencing_field_ids.clone(),
741                referenced_table_id: fk.referenced_table,
742                referenced_field_ids: fk.referenced_field_ids.clone(),
743                on_delete: fk.on_delete,
744                on_update: fk.on_update,
745            });
746        }
747
748        Ok(descriptors)
749    }
750
751    /// Resolve foreign key descriptors into names suitable for runtime consumers.
752    pub fn foreign_key_views(
753        &self,
754        catalog: &TableCatalog,
755        table_id: TableId,
756    ) -> LlkvResult<Vec<ForeignKeyView>> {
757        let descriptors = self.foreign_key_descriptors(table_id)?;
758
759        if descriptors.is_empty() {
760            return Ok(Vec::new());
761        }
762
763        let (referencing_display, referencing_canonical) =
764            resolve_table_name(catalog, self, table_id)?;
765
766        let mut details = Vec::with_capacity(descriptors.len());
767        for descriptor in descriptors {
768            let referenced_table_id = descriptor.referenced_table_id;
769            let (referenced_display, referenced_canonical) =
770                resolve_table_name(catalog, self, referenced_table_id)?;
771
772            let referencing_column_names =
773                self.column_names(table_id, &descriptor.referencing_field_ids)?;
774            let referenced_column_names =
775                self.column_names(referenced_table_id, &descriptor.referenced_field_ids)?;
776            let constraint_name = self.constraint_name_for(table_id, descriptor.constraint_id)?;
777
778            details.push(ForeignKeyView {
779                constraint_id: descriptor.constraint_id,
780                constraint_name,
781                referencing_table_id: descriptor.referencing_table_id,
782                referencing_table_display: referencing_display.clone(),
783                referencing_table_canonical: referencing_canonical.clone(),
784                referencing_field_ids: descriptor.referencing_field_ids.clone(),
785                referencing_column_names,
786                referenced_table_id,
787                referenced_table_display: referenced_display.clone(),
788                referenced_table_canonical: referenced_canonical.clone(),
789                referenced_field_ids: descriptor.referenced_field_ids.clone(),
790                referenced_column_names,
791                on_delete: descriptor.on_delete,
792                on_update: descriptor.on_update,
793            });
794        }
795
796        Ok(details)
797    }
798
799    /// Assemble a consolidated read-only view of table metadata.
800    pub fn table_view(
801        &self,
802        catalog: &TableCatalog,
803        table_id: TableId,
804        field_ids: &[FieldId],
805    ) -> LlkvResult<TableView> {
806        let table_meta = self.table_meta(table_id)?;
807        let column_metas = self.column_metas(table_id, field_ids)?;
808        let constraint_records = self.constraint_records(table_id)?;
809        let multi_column_uniques = self.multi_column_uniques(table_id)?;
810        let foreign_keys = self.foreign_key_views(catalog, table_id)?;
811
812        Ok(TableView {
813            table_meta,
814            column_metas,
815            constraint_records,
816            multi_column_uniques,
817            foreign_keys,
818        })
819    }
820
821    /// Validate foreign key specifications and persist them for the referencing table.
822    pub fn validate_and_register_foreign_keys<F>(
823        &self,
824        referencing_table: &ForeignKeyTableInfo,
825        specs: &[ForeignKeySpec],
826        lookup_table: F,
827        timestamp_micros: u64,
828    ) -> LlkvResult<Vec<ValidatedForeignKey>>
829    where
830        F: FnMut(&str) -> LlkvResult<ForeignKeyTableInfo>,
831    {
832        let validated = validate_foreign_keys(referencing_table, specs, lookup_table)?;
833        self.register_foreign_keys(referencing_table.table_id, &validated, timestamp_micros)?;
834        Ok(validated)
835    }
836
837    /// Register validated foreign key definitions for a table.
838    pub fn register_foreign_keys(
839        &self,
840        table_id: TableId,
841        foreign_keys: &[ValidatedForeignKey],
842        timestamp_micros: u64,
843    ) -> LlkvResult<()> {
844        if foreign_keys.is_empty() {
845            return Ok(());
846        }
847
848        let existing_constraints = self.constraint_record_map(table_id)?;
849        let mut next_constraint_id = existing_constraints
850            .keys()
851            .copied()
852            .max()
853            .unwrap_or(0)
854            .saturating_add(1);
855
856        let mut constraint_records = Vec::with_capacity(foreign_keys.len());
857        let mut constraint_names: Vec<(ConstraintId, Option<String>)> =
858            Vec::with_capacity(foreign_keys.len());
859
860        for fk in foreign_keys {
861            let constraint_id = next_constraint_id;
862            constraint_records.push(ConstraintRecord {
863                constraint_id,
864                kind: ConstraintKind::ForeignKey(ForeignKeyConstraint {
865                    referencing_field_ids: fk.referencing_field_ids.clone(),
866                    referenced_table: fk.referenced_table_id,
867                    referenced_field_ids: fk.referenced_field_ids.clone(),
868                    on_delete: fk.on_delete,
869                    on_update: fk.on_update,
870                }),
871                state: ConstraintState::Active,
872                revision: 1,
873                last_modified_micros: timestamp_micros,
874            });
875            constraint_names.push((constraint_id, fk.name.clone()));
876            next_constraint_id = next_constraint_id.saturating_add(1);
877        }
878
879        self.put_constraint_records(table_id, &constraint_records)?;
880        self.put_constraint_names(table_id, &constraint_names)?;
881        self.flush_table(table_id)?;
882
883        Ok(())
884    }
885
886    /// Register column metadata, physical storage columns, and primary/unique constraints.
887    pub fn apply_column_definitions(
888        &self,
889        table_id: TableId,
890        columns: &[TableColumn],
891        timestamp_micros: u64,
892    ) -> LlkvResult<()> {
893        if columns.is_empty() {
894            return Ok(());
895        }
896
897        self.ensure_table_state(table_id)?;
898
899        for column in columns {
900            let column_meta = ColMeta {
901                col_id: column.field_id,
902                name: Some(column.name.clone()),
903                flags: 0,
904                default: None,
905            };
906            self.set_column_meta(table_id, column_meta)?;
907        }
908
909        let table = Table::from_id_and_store(table_id, Arc::clone(&self.store))?;
910        let store = table.store();
911
912        for column in columns {
913            let logical_field_id = LogicalFieldId::for_user(table_id, column.field_id);
914            store.ensure_column_registered(logical_field_id, &column.data_type)?;
915            store.data_type(logical_field_id)?;
916        }
917
918        let created_by_lfid = LogicalFieldId::for_mvcc_created_by(table_id);
919        store.ensure_column_registered(created_by_lfid, &DataType::UInt64)?;
920
921        let deleted_by_lfid = LogicalFieldId::for_mvcc_deleted_by(table_id);
922        store.ensure_column_registered(deleted_by_lfid, &DataType::UInt64)?;
923
924        let existing = self.constraint_record_map(table_id)?;
925        let mut next_constraint_id = existing
926            .keys()
927            .copied()
928            .max()
929            .unwrap_or(0)
930            .saturating_add(1);
931
932        let mut constraints = Vec::new();
933
934        let primary_key_fields: Vec<FieldId> = columns
935            .iter()
936            .filter(|col| col.primary_key)
937            .map(|col| col.field_id)
938            .collect();
939        if !primary_key_fields.is_empty() {
940            constraints.push(ConstraintRecord {
941                constraint_id: next_constraint_id,
942                kind: ConstraintKind::PrimaryKey(PrimaryKeyConstraint {
943                    field_ids: primary_key_fields,
944                }),
945                state: ConstraintState::Active,
946                revision: 1,
947                last_modified_micros: timestamp_micros,
948            });
949            next_constraint_id = next_constraint_id.saturating_add(1);
950        }
951
952        for column in columns.iter().filter(|col| col.unique && !col.primary_key) {
953            constraints.push(ConstraintRecord {
954                constraint_id: next_constraint_id,
955                kind: ConstraintKind::Unique(UniqueConstraint {
956                    field_ids: vec![column.field_id],
957                }),
958                state: ConstraintState::Active,
959                revision: 1,
960                last_modified_micros: timestamp_micros,
961            });
962            next_constraint_id = next_constraint_id.saturating_add(1);
963        }
964
965        if !constraints.is_empty() {
966            self.put_constraint_records(table_id, &constraints)?;
967        }
968
969        Ok(())
970    }
971
972    pub fn column_data_type(&self, table_id: TableId, field_id: FieldId) -> LlkvResult<DataType> {
973        let table = Table::from_id_and_store(table_id, Arc::clone(&self.store))?;
974        let store = table.store();
975        let logical_field_id = LogicalFieldId::for_user(table_id, field_id);
976        store.data_type(logical_field_id)
977    }
978
979    /// Register a multi-column UNIQUE definition for a table.
980    pub fn register_multi_column_unique(
981        &self,
982        table_id: TableId,
983        column_ids: &[FieldId],
984        index_name: Option<String>,
985    ) -> LlkvResult<MultiColumnUniqueRegistration> {
986        let mut created = false;
987        let mut existing_name: Option<Option<String>> = None;
988        let column_vec: Vec<FieldId> = column_ids.to_vec();
989
990        self.update_multi_column_uniques(table_id, |entries| {
991            if let Some(existing) = entries.iter().find(|entry| entry.column_ids == column_vec) {
992                existing_name = Some(existing.index_name.clone());
993            } else {
994                entries.push(MultiColumnUniqueEntryMeta {
995                    index_name: index_name.clone(),
996                    column_ids: column_vec.clone(),
997                });
998                created = true;
999            }
1000        })?;
1001
1002        if created {
1003            Ok(MultiColumnUniqueRegistration::Created)
1004        } else {
1005            Ok(MultiColumnUniqueRegistration::AlreadyExists {
1006                index_name: existing_name.unwrap_or(None),
1007            })
1008        }
1009    }
1010
1011    fn column_names(&self, table_id: TableId, field_ids: &[FieldId]) -> LlkvResult<Vec<String>> {
1012        if field_ids.is_empty() {
1013            return Ok(Vec::new());
1014        }
1015
1016        let metas = self.column_metas(table_id, field_ids)?;
1017        let mut names = Vec::with_capacity(field_ids.len());
1018        for (idx, field_id) in field_ids.iter().enumerate() {
1019            let name = metas
1020                .get(idx)
1021                .and_then(|meta| meta.as_ref())
1022                .and_then(|meta| meta.name.clone())
1023                .unwrap_or_else(|| format!("col_{}", field_id));
1024            names.push(name);
1025        }
1026        Ok(names)
1027    }
1028
1029    /// Reserve and return the next available table id.
1030    pub fn reserve_table_id(&self) -> LlkvResult<TableId> {
1031        let catalog = SysCatalog::new(&self.store);
1032
1033        let mut next = match catalog.get_next_table_id()? {
1034            Some(value) => value,
1035            None => {
1036                let seed = catalog
1037                    .max_table_id()?
1038                    .unwrap_or(reserved::CATALOG_TABLE_ID);
1039                let initial = seed.checked_add(1).ok_or_else(|| {
1040                    Error::InvalidArgumentError("exhausted available table ids".into())
1041                })?;
1042                catalog.put_next_table_id(initial)?;
1043                initial
1044            }
1045        };
1046
1047        while reserved::is_reserved_table_id(next) {
1048            next = next.checked_add(1).ok_or_else(|| {
1049                Error::InvalidArgumentError("exhausted available table ids".into())
1050            })?;
1051        }
1052
1053        let mut following = next
1054            .checked_add(1)
1055            .ok_or_else(|| Error::InvalidArgumentError("exhausted available table ids".into()))?;
1056
1057        while reserved::is_reserved_table_id(following) {
1058            following = following.checked_add(1).ok_or_else(|| {
1059                Error::InvalidArgumentError("exhausted available table ids".into())
1060            })?;
1061        }
1062
1063        catalog.put_next_table_id(following)?;
1064        Ok(next)
1065    }
1066
1067    /// Check if a field has a sort index in the underlying store.
1068    ///
1069    /// Note: Creates a temporary Table instance to access index metadata.
1070    /// This is acceptable since Table::from_id_and_store is lightweight (just wraps
1071    /// table_id + Arc<ColumnStore>) and this method is only called during index
1072    /// registration/unregistration, not in query hot paths.
1073    fn field_has_sort_index(&self, table_id: TableId, field_id: FieldId) -> LlkvResult<bool> {
1074        let table = Table::from_id_and_store(table_id, Arc::clone(&self.store))?;
1075        let indexes = table.list_registered_indexes(field_id)?;
1076        Ok(indexes.contains(&IndexKind::Sort))
1077    }
1078}
1079
1080#[cfg(test)]
1081mod tests {
1082    use super::*;
1083    use crate::constraints::{ConstraintKind, ConstraintState, PrimaryKeyConstraint};
1084    use crate::{MultiColumnUniqueEntryMeta, Table};
1085    use llkv_column_map::ColumnStore;
1086    use llkv_column_map::store::IndexKind;
1087    use llkv_storage::pager::MemPager;
1088    use std::sync::Arc;
1089
1090    #[test]
1091    fn metadata_manager_persists_and_loads() {
1092        let pager = Arc::new(MemPager::default());
1093        let store = Arc::new(ColumnStore::open(Arc::clone(&pager)).unwrap());
1094        let manager = MetadataManager::new(Arc::clone(&store));
1095
1096        let table_id: TableId = 42;
1097        let table_meta = TableMeta {
1098            table_id,
1099            name: Some("users".into()),
1100            created_at_micros: 123,
1101            flags: 0,
1102            epoch: 1,
1103        };
1104        manager
1105            .set_table_meta(table_id, table_meta.clone())
1106            .unwrap();
1107
1108        {
1109            let tables = manager.tables.read().unwrap();
1110            let state = tables.get(&table_id).unwrap();
1111            assert!(state.current.table_meta.is_some());
1112        }
1113
1114        let column_meta = ColMeta {
1115            col_id: 1,
1116            name: Some("id".into()),
1117            flags: 0,
1118            default: None,
1119        };
1120        manager
1121            .set_column_meta(table_id, column_meta.clone())
1122            .unwrap();
1123
1124        let logical_field_id =
1125            llkv_column_map::types::LogicalFieldId::for_user(table_id, column_meta.col_id);
1126        store
1127            .ensure_column_registered(logical_field_id, &arrow::datatypes::DataType::Utf8)
1128            .unwrap();
1129
1130        manager
1131            .register_sort_index(table_id, column_meta.col_id)
1132            .unwrap();
1133
1134        let constraint = ConstraintRecord {
1135            constraint_id: 7,
1136            kind: ConstraintKind::PrimaryKey(PrimaryKeyConstraint {
1137                field_ids: vec![column_meta.col_id],
1138            }),
1139            state: ConstraintState::Active,
1140            revision: 1,
1141            last_modified_micros: 456,
1142        };
1143        manager
1144            .put_constraint_records(table_id, std::slice::from_ref(&constraint))
1145            .unwrap();
1146
1147        let multi_unique = MultiColumnUniqueEntryMeta {
1148            index_name: Some("uniq_users_name".into()),
1149            column_ids: vec![column_meta.col_id],
1150        };
1151        manager
1152            .set_multi_column_uniques(table_id, vec![multi_unique.clone()])
1153            .unwrap();
1154
1155        assert_eq!(
1156            manager.table_meta(table_id).unwrap(),
1157            Some(table_meta.clone())
1158        );
1159
1160        manager.flush_table(table_id).unwrap();
1161
1162        let table = Table::from_id_and_store(table_id, Arc::clone(&store)).unwrap();
1163        let indexes = table.list_registered_indexes(column_meta.col_id).unwrap();
1164        assert!(indexes.contains(&IndexKind::Sort));
1165
1166        let verify_catalog = SysCatalog::new(&store);
1167        let column_roundtrip = verify_catalog.get_cols_meta(table_id, &[column_meta.col_id]);
1168        assert_eq!(column_roundtrip[0].as_ref(), Some(&column_meta));
1169        let constraints = verify_catalog
1170            .constraint_records_for_table(table_id)
1171            .unwrap();
1172        assert_eq!(constraints, vec![constraint.clone()]);
1173        let unique_roundtrip = verify_catalog.get_multi_column_uniques(table_id).unwrap();
1174        assert_eq!(unique_roundtrip, vec![multi_unique.clone()]);
1175
1176        let meta_from_cache = manager.table_meta(table_id).unwrap();
1177        assert_eq!(meta_from_cache, Some(table_meta.clone()));
1178
1179        let columns_from_cache = manager
1180            .column_metas(table_id, &[column_meta.col_id])
1181            .unwrap();
1182        assert_eq!(columns_from_cache[0].as_ref(), Some(&column_meta));
1183
1184        let constraints_from_cache = manager.constraint_records(table_id).unwrap();
1185        assert_eq!(constraints_from_cache, vec![constraint.clone()]);
1186
1187        let uniques_from_cache = manager.multi_column_uniques(table_id).unwrap();
1188        assert_eq!(uniques_from_cache, vec![multi_unique]);
1189
1190        // No additional writes should occur on subsequent flushes without modifications.
1191        manager.flush_table(table_id).unwrap();
1192    }
1193
1194    #[test]
1195    fn metadata_manager_lazy_loads_columns_and_constraints() {
1196        let pager = Arc::new(MemPager::default());
1197        let store = Arc::new(ColumnStore::open(Arc::clone(&pager)).unwrap());
1198        let manager = MetadataManager::new(Arc::clone(&store));
1199
1200        let table_id: TableId = 99;
1201        let column_meta = ColMeta {
1202            col_id: 3,
1203            name: Some("value".into()),
1204            flags: 0,
1205            default: None,
1206        };
1207        let initial_catalog = SysCatalog::new(&store);
1208        initial_catalog.put_col_meta(table_id, &column_meta);
1209
1210        let constraint = ConstraintRecord {
1211            constraint_id: 15,
1212            kind: ConstraintKind::PrimaryKey(PrimaryKeyConstraint {
1213                field_ids: vec![column_meta.col_id],
1214            }),
1215            state: ConstraintState::Active,
1216            revision: 1,
1217            last_modified_micros: 0,
1218        };
1219        initial_catalog
1220            .put_constraint_records(table_id, std::slice::from_ref(&constraint))
1221            .unwrap();
1222        let multi_unique = MultiColumnUniqueEntryMeta {
1223            index_name: Some("uniq_value".into()),
1224            column_ids: vec![column_meta.col_id],
1225        };
1226        initial_catalog
1227            .put_multi_column_uniques(table_id, std::slice::from_ref(&multi_unique))
1228            .unwrap();
1229
1230        let columns = manager
1231            .column_metas(table_id, &[column_meta.col_id])
1232            .unwrap();
1233        assert_eq!(columns[0].as_ref(), Some(&column_meta));
1234
1235        let constraints = manager.constraint_records(table_id).unwrap();
1236        assert_eq!(constraints, vec![constraint]);
1237
1238        let uniques = manager.multi_column_uniques(table_id).unwrap();
1239        assert_eq!(uniques, vec![multi_unique]);
1240    }
1241}
1242
1243/// Descriptor describing a foreign key constraint scoped to field identifiers.
1244#[derive(Clone, Debug)]
1245pub struct ForeignKeyDescriptor {
1246    pub constraint_id: ConstraintId,
1247    pub referencing_table_id: TableId,
1248    pub referencing_field_ids: Vec<FieldId>,
1249    pub referenced_table_id: TableId,
1250    pub referenced_field_ids: Vec<FieldId>,
1251    pub on_delete: ForeignKeyAction,
1252    pub on_update: ForeignKeyAction,
1253}
1254
/// Outcome of a request to register a multi-column unique definition.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum MultiColumnUniqueRegistration {
    /// No matching definition existed, so a new one was added.
    Created,
    /// A definition over the same columns was already present.
    AlreadyExists {
        /// Index name stored on the existing definition, if it has one.
        index_name: Option<String>,
    },
}