1#![forbid(unsafe_code)]
8
9use crate::catalog::TableCatalog;
10use crate::constraints::{
11 ConstraintId, ConstraintKind, ConstraintRecord, ConstraintState, ForeignKeyAction,
12 ForeignKeyConstraint, PrimaryKeyConstraint, UniqueConstraint,
13};
14use crate::constraints::{ForeignKeyTableInfo, ValidatedForeignKey, validate_foreign_keys};
15use crate::reserved;
16use crate::resolvers::resolve_table_name;
17use crate::sys_catalog::{ConstraintNameRecord, SysCatalog};
18use crate::table::Table;
19use crate::types::{FieldId, TableColumn, TableId};
20use crate::view::{ForeignKeyView, TableView};
21use crate::{ColMeta, MultiColumnUniqueEntryMeta, TableMeta, TableMultiColumnUniqueMeta};
22use arrow::datatypes::DataType;
23use llkv_column_map::ColumnStore;
24use llkv_column_map::store::IndexKind;
25use llkv_column_map::types::LogicalFieldId;
26use llkv_plan::ForeignKeySpec;
27use llkv_result::{Error, Result as LlkvResult};
28use llkv_storage::pager::Pager;
29use rustc_hash::{FxHashMap, FxHashSet};
30use simd_r_drive_entry_handle::EntryHandle;
31use std::sync::{Arc, RwLock};
32
33#[derive(Clone, Debug, Default)]
34struct TableSnapshot {
35 table_meta: Option<TableMeta>,
36 column_metas: FxHashMap<FieldId, ColMeta>,
37 constraints: FxHashMap<ConstraintId, ConstraintRecord>,
38 multi_uniques: Vec<MultiColumnUniqueEntryMeta>,
39 constraint_names: FxHashMap<ConstraintId, String>,
40 sort_indexes: FxHashSet<FieldId>,
41}
42
43impl TableSnapshot {
44 fn new(
45 table_meta: Option<TableMeta>,
46 column_metas: FxHashMap<FieldId, ColMeta>,
47 constraints: FxHashMap<ConstraintId, ConstraintRecord>,
48 multi_uniques: Vec<MultiColumnUniqueEntryMeta>,
49 constraint_names: FxHashMap<ConstraintId, String>,
50 ) -> Self {
51 Self {
52 table_meta,
53 column_metas,
54 constraints,
55 multi_uniques,
56 constraint_names,
57 sort_indexes: FxHashSet::default(),
58 }
59 }
60}
61
62#[derive(Clone, Debug)]
63struct TableState {
64 current: TableSnapshot,
65 persisted: TableSnapshot,
66}
67
68impl TableState {
69 fn from_snapshot(snapshot: TableSnapshot) -> Self {
70 Self {
71 current: snapshot.clone(),
72 persisted: snapshot,
73 }
74 }
75}
76
77#[derive(Default)]
78struct ReferencingIndex {
79 parent_to_children: FxHashMap<TableId, FxHashSet<(TableId, ConstraintId)>>,
80 child_to_parents: FxHashMap<TableId, FxHashSet<TableId>>,
81 initialized: bool,
82}
83
84impl ReferencingIndex {
85 fn remove_child(&mut self, child_id: TableId) {
86 if let Some(parents) = self.child_to_parents.remove(&child_id) {
87 for parent_id in parents {
88 if let Some(children) = self.parent_to_children.get_mut(&parent_id) {
89 children.retain(|(entry_child, _)| *entry_child != child_id);
90 if children.is_empty() {
91 self.parent_to_children.remove(&parent_id);
92 }
93 }
94 }
95 }
96 }
97
98 fn insert(&mut self, parent_id: TableId, child_id: TableId, constraint_id: ConstraintId) {
99 self.parent_to_children
100 .entry(parent_id)
101 .or_default()
102 .insert((child_id, constraint_id));
103 self.child_to_parents
104 .entry(child_id)
105 .or_default()
106 .insert(parent_id);
107 self.initialized = true;
108 }
109
110 fn children(&self, parent_id: TableId) -> Vec<(TableId, ConstraintId)> {
111 self.parent_to_children
112 .get(&parent_id)
113 .map(|set| set.iter().cloned().collect())
114 .unwrap_or_default()
115 }
116
117 fn mark_initialized(&mut self) {
118 self.initialized = true;
119 }
120
121 fn is_initialized(&self) -> bool {
122 self.initialized
123 }
124}
125
/// Caches per-table metadata on top of the system catalog and tracks which
/// tables reference which through foreign keys.
///
/// Each cached table keeps a working snapshot and a persisted snapshot;
/// `flush_table` writes the differences between the two to the catalog.
pub struct MetadataManager<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    /// Column store that backs the system catalog and table handles.
    store: Arc<ColumnStore<P>>,
    /// Cached metadata state, keyed by table id.
    tables: RwLock<FxHashMap<TableId, TableState>>,
    /// Foreign-key reverse index (referenced table -> referencing tables).
    referencing_index: RwLock<ReferencingIndex>,
}
135
136impl<P> MetadataManager<P>
137where
138 P: Pager<Blob = EntryHandle> + Send + Sync,
139{
140 pub fn new(store: Arc<ColumnStore<P>>) -> Self {
142 Self {
143 store,
144 tables: RwLock::new(FxHashMap::default()),
145 referencing_index: RwLock::new(ReferencingIndex::default()),
146 }
147 }
148
149 fn ensure_table_state(&self, table_id: TableId) -> LlkvResult<()> {
151 if self.tables.read().unwrap().contains_key(&table_id) {
152 return Ok(());
153 }
154 let state = self.load_table_state(table_id)?;
155 {
156 let mut tables = self.tables.write().unwrap();
157 tables.entry(table_id).or_insert(state);
158 }
159 self.refresh_referencing_index_for_table(table_id);
160 Ok(())
161 }
162
163 fn load_table_state(&self, table_id: TableId) -> LlkvResult<TableState> {
164 let catalog = SysCatalog::new(&self.store);
165 let table_meta = catalog.get_table_meta(table_id);
166 let constraint_records = catalog.constraint_records_for_table(table_id)?;
167 let constraint_ids: Vec<ConstraintId> = constraint_records
168 .iter()
169 .map(|record| record.constraint_id)
170 .collect();
171 let constraint_name_entries = if constraint_ids.is_empty() {
172 Vec::new()
173 } else {
174 catalog.get_constraint_names(table_id, &constraint_ids)?
175 };
176 let multi_uniques = catalog.get_multi_column_uniques(table_id)?;
177 let mut constraints = FxHashMap::default();
178 let mut constraint_names = FxHashMap::default();
179 for (record, name) in constraint_records
180 .into_iter()
181 .zip(constraint_name_entries.into_iter())
182 {
183 if let Some(name) = name {
184 constraint_names.insert(record.constraint_id, name);
185 }
186 constraints.insert(record.constraint_id, record);
187 }
188 let snapshot = TableSnapshot::new(
189 table_meta,
190 FxHashMap::default(),
191 constraints,
192 multi_uniques,
193 constraint_names,
194 );
195 Ok(TableState::from_snapshot(snapshot))
196 }
197
198 fn refresh_referencing_index_for_table(&self, table_id: TableId) {
199 let foreign_keys: Vec<(TableId, ConstraintId)> = {
200 let tables = self.tables.read().unwrap();
201 match tables.get(&table_id) {
202 Some(state) => state
203 .current
204 .constraints
205 .iter()
206 .filter(|(_, record)| record.is_active())
207 .filter_map(|(constraint_id, record)| {
208 if let ConstraintKind::ForeignKey(fk) = &record.kind {
209 Some((fk.referenced_table, *constraint_id))
210 } else {
211 None
212 }
213 })
214 .collect(),
215 None => Vec::new(),
216 }
217 };
218
219 let mut index = self.referencing_index.write().unwrap();
220 index.remove_child(table_id);
221 for (parent_table, constraint_id) in foreign_keys {
222 index.insert(parent_table, table_id, constraint_id);
223 }
224 }
225
226 fn constraint_name_for(
227 &self,
228 table_id: TableId,
229 constraint_id: ConstraintId,
230 ) -> LlkvResult<Option<String>> {
231 self.ensure_table_state(table_id)?;
232 let tables = self.tables.read().unwrap();
233 let state = tables.get(&table_id).unwrap();
234 Ok(state.current.constraint_names.get(&constraint_id).cloned())
235 }
236
237 fn ensure_referencing_index_initialized(&self) -> LlkvResult<()> {
238 let needs_init = {
239 let index = self.referencing_index.read().unwrap();
240 !index.is_initialized()
241 };
242
243 if !needs_init {
244 return Ok(());
245 }
246
247 let metas = self.all_table_metas()?;
248 for (table_id, _) in metas {
249 self.ensure_table_state(table_id)?;
250 self.refresh_referencing_index_for_table(table_id);
251 }
252
253 let mut index = self.referencing_index.write().unwrap();
254 index.mark_initialized();
255 Ok(())
256 }
257
258 pub fn table_meta(&self, table_id: TableId) -> LlkvResult<Option<TableMeta>> {
260 self.ensure_table_state(table_id)?;
261 let tables = self.tables.read().unwrap();
262 Ok(tables
263 .get(&table_id)
264 .and_then(|state| state.current.table_meta.clone()))
265 }
266
267 pub fn foreign_keys_referencing(
269 &self,
270 referenced_table: TableId,
271 ) -> LlkvResult<Vec<(TableId, ConstraintId)>> {
272 self.ensure_referencing_index_initialized()?;
273 let index = self.referencing_index.read().unwrap();
274 Ok(index.children(referenced_table))
275 }
276
277 pub fn set_table_meta(&self, table_id: TableId, meta: TableMeta) -> LlkvResult<()> {
279 self.ensure_table_state(table_id)?;
280 let mut tables = self.tables.write().unwrap();
281 let state = tables.get_mut(&table_id).unwrap();
282 state.current.table_meta = Some(meta);
283 Ok(())
284 }
285
286 pub fn column_metas(
288 &self,
289 table_id: TableId,
290 field_ids: &[FieldId],
291 ) -> LlkvResult<Vec<Option<ColMeta>>> {
292 self.ensure_table_state(table_id)?;
293
294 let missing_ids = {
296 let tables = self.tables.read().unwrap();
297 let state = tables.get(&table_id).unwrap();
298 field_ids
299 .iter()
300 .copied()
301 .filter(|field_id| !state.current.column_metas.contains_key(field_id))
302 .collect::<Vec<_>>()
303 };
304
305 if !missing_ids.is_empty() {
306 let catalog = SysCatalog::new(&self.store);
307 let fetched = catalog.get_cols_meta(table_id, &missing_ids);
308 let mut tables = self.tables.write().unwrap();
309 let state = tables.get_mut(&table_id).unwrap();
310 for (idx, field_id) in missing_ids.iter().enumerate() {
311 if let Some(meta) = fetched[idx].clone() {
312 state.current.column_metas.insert(*field_id, meta.clone());
313 state
314 .persisted
315 .column_metas
316 .entry(*field_id)
317 .or_insert(meta);
318 }
319 }
320 }
321
322 let tables = self.tables.read().unwrap();
323 let state = tables.get(&table_id).unwrap();
324 Ok(field_ids
325 .iter()
326 .map(|field_id| state.current.column_metas.get(field_id).cloned())
327 .collect())
328 }
329
330 pub fn set_column_meta(&self, table_id: TableId, meta: ColMeta) -> LlkvResult<()> {
332 self.ensure_table_state(table_id)?;
333 let mut tables = self.tables.write().unwrap();
334 let state = tables.get_mut(&table_id).unwrap();
335 state.current.column_metas.insert(meta.col_id, meta);
336 Ok(())
337 }
338
339 pub fn multi_column_uniques(
341 &self,
342 table_id: TableId,
343 ) -> LlkvResult<Vec<MultiColumnUniqueEntryMeta>> {
344 self.ensure_table_state(table_id)?;
345 let tables = self.tables.read().unwrap();
346 let state = tables.get(&table_id).unwrap();
347 Ok(state.current.multi_uniques.clone())
348 }
349
350 pub fn set_multi_column_uniques(
352 &self,
353 table_id: TableId,
354 uniques: Vec<MultiColumnUniqueEntryMeta>,
355 ) -> LlkvResult<()> {
356 self.ensure_table_state(table_id)?;
357 let mut tables = self.tables.write().unwrap();
358 let state = tables.get_mut(&table_id).unwrap();
359 state.current.multi_uniques = uniques;
360 Ok(())
361 }
362
363 pub fn register_sort_index(&self, table_id: TableId, field_id: FieldId) -> LlkvResult<()> {
365 self.ensure_table_state(table_id)?;
366
367 {
368 let mut tables = self.tables.write().unwrap();
369 let state = tables.get_mut(&table_id).unwrap();
370 if state.persisted.sort_indexes.contains(&field_id)
371 || state.current.sort_indexes.contains(&field_id)
372 {
373 state.current.sort_indexes.insert(field_id);
374 return Ok(());
375 }
376 }
377
378 if self.field_has_sort_index(table_id, field_id)? {
379 let mut tables = self.tables.write().unwrap();
380 let state = tables.get_mut(&table_id).unwrap();
381 state.persisted.sort_indexes.insert(field_id);
382 state.current.sort_indexes.insert(field_id);
383 return Ok(());
384 }
385
386 let mut tables = self.tables.write().unwrap();
387 let state = tables.get_mut(&table_id).unwrap();
388 state.current.sort_indexes.insert(field_id);
389 Ok(())
390 }
391
392 pub fn unregister_sort_index(&self, table_id: TableId, field_id: FieldId) -> LlkvResult<()> {
394 self.ensure_table_state(table_id)?;
395
396 let mut tables = self.tables.write().unwrap();
397 let state = tables.get_mut(&table_id).unwrap();
398 state.current.sort_indexes.remove(&field_id);
399
400 if !state.persisted.sort_indexes.contains(&field_id) {
401 drop(tables);
402 if self.field_has_sort_index(table_id, field_id)? {
403 let mut tables = self.tables.write().unwrap();
404 let state = tables.get_mut(&table_id).unwrap();
405 state.persisted.sort_indexes.insert(field_id);
406 }
407 }
408
409 Ok(())
410 }
411
412 pub fn update_multi_column_uniques<F, T>(&self, table_id: TableId, f: F) -> LlkvResult<T>
414 where
415 F: FnOnce(&mut Vec<MultiColumnUniqueEntryMeta>) -> T,
416 {
417 self.ensure_table_state(table_id)?;
418 let mut tables = self.tables.write().unwrap();
419 let state = tables.get_mut(&table_id).unwrap();
420 let result = f(&mut state.current.multi_uniques);
421 Ok(result)
422 }
423
424 pub fn prepare_table_drop(&self, table_id: TableId, column_ids: &[FieldId]) -> LlkvResult<()> {
429 if !column_ids.is_empty() {
430 let _ = self.column_metas(table_id, column_ids)?;
431 } else {
432 self.ensure_table_state(table_id)?;
433 }
434
435 let mut tables = self.tables.write().unwrap();
436 if let Some(state) = tables.get_mut(&table_id) {
437 state.current.table_meta = None;
438 state.current.column_metas.clear();
439 state.current.constraints.clear();
440 state.current.multi_uniques.clear();
441 state.current.constraint_names.clear();
442 state.current.sort_indexes.clear();
443 }
444 drop(tables);
445 self.refresh_referencing_index_for_table(table_id);
446 Ok(())
447 }
448
449 pub fn remove_table_state(&self, table_id: TableId) {
451 self.tables.write().unwrap().remove(&table_id);
452 self.referencing_index
453 .write()
454 .unwrap()
455 .remove_child(table_id);
456 }
457
458 pub fn constraint_records(&self, table_id: TableId) -> LlkvResult<Vec<ConstraintRecord>> {
460 self.ensure_table_state(table_id)?;
461 let tables = self.tables.read().unwrap();
462 let state = tables.get(&table_id).unwrap();
463 Ok(state.current.constraints.values().cloned().collect())
464 }
465
466 pub fn constraint_records_by_id(
468 &self,
469 table_id: TableId,
470 constraint_ids: &[ConstraintId],
471 ) -> LlkvResult<Vec<Option<ConstraintRecord>>> {
472 self.ensure_table_state(table_id)?;
473 let tables = self.tables.read().unwrap();
474 let state = tables.get(&table_id).unwrap();
475 Ok(constraint_ids
476 .iter()
477 .map(|constraint_id| state.current.constraints.get(constraint_id).cloned())
478 .collect())
479 }
480
481 pub fn put_constraint_records(
483 &self,
484 table_id: TableId,
485 records: &[ConstraintRecord],
486 ) -> LlkvResult<()> {
487 self.ensure_table_state(table_id)?;
488 let mut tables = self.tables.write().unwrap();
489 let state = tables.get_mut(&table_id).unwrap();
490 for record in records {
491 state
492 .current
493 .constraints
494 .insert(record.constraint_id, record.clone());
495 }
496 drop(tables);
497 self.refresh_referencing_index_for_table(table_id);
498 Ok(())
499 }
500
501 pub fn put_constraint_names(
503 &self,
504 table_id: TableId,
505 names: &[(ConstraintId, Option<String>)],
506 ) -> LlkvResult<()> {
507 if names.is_empty() {
508 return Ok(());
509 }
510 self.ensure_table_state(table_id)?;
511 let mut tables = self.tables.write().unwrap();
512 if let Some(state) = tables.get_mut(&table_id) {
513 for (constraint_id, name) in names {
514 if let Some(name) = name {
515 state
516 .current
517 .constraint_names
518 .insert(*constraint_id, name.clone());
519 } else {
520 state.current.constraint_names.remove(constraint_id);
521 }
522 }
523 }
524 Ok(())
525 }
526
527 pub fn constraint_record_map(
529 &self,
530 table_id: TableId,
531 ) -> LlkvResult<FxHashMap<ConstraintId, ConstraintRecord>> {
532 self.ensure_table_state(table_id)?;
533 let tables = self.tables.read().unwrap();
534 let state = tables.get(&table_id).unwrap();
535 Ok(state.current.constraints.clone())
536 }
537
    /// Writes the differences between the working and persisted snapshots of
    /// `table_id` to the system catalog and the table's sort indexes.
    ///
    /// Each category (table meta, column metas, constraint records,
    /// constraint names, multi-column uniques, sort indexes) is compared in
    /// turn. The persisted snapshot is updated right after the corresponding
    /// write, so an error partway through leaves the earlier categories
    /// marked as persisted.
    ///
    /// The write lock on the table map is held for the whole call.
    ///
    /// # Errors
    /// Propagates errors from loading the table state, from the catalog
    /// calls that return a result, and from the sort-index operations.
    pub fn flush_table(&self, table_id: TableId) -> LlkvResult<()> {
        self.ensure_table_state(table_id)?;
        let mut tables = self.tables.write().unwrap();
        let state = tables.get_mut(&table_id).unwrap();

        let catalog = SysCatalog::new(&self.store);

        // Table meta: write when changed or new, delete when removed.
        match (
            state.current.table_meta.as_ref(),
            state.persisted.table_meta.as_ref(),
        ) {
            (Some(meta), Some(existing)) if meta != existing => {
                catalog.put_table_meta(meta);
                state.persisted.table_meta = Some(meta.clone());
            }
            (Some(meta), None) => {
                catalog.put_table_meta(meta);
                state.persisted.table_meta = Some(meta.clone());
            }
            (None, Some(_)) => {
                catalog.delete_table_meta(table_id)?;
                state.persisted.table_meta = None;
            }
            _ => {}
        }

        // Column metas that are new or differ from the persisted copy.
        let mut dirty_columns: Vec<(FieldId, ColMeta)> = Vec::new();
        for (field_id, meta) in &state.current.column_metas {
            match state.persisted.column_metas.get(field_id) {
                Some(existing) if existing == meta => {}
                _ => dirty_columns.push((*field_id, meta.clone())),
            }
        }
        for (field_id, meta) in dirty_columns.iter() {
            catalog.put_col_meta(table_id, meta);
            state.persisted.column_metas.insert(*field_id, meta.clone());
        }

        // Column metas that are persisted but no longer in the working copy.
        let removed_columns: Vec<FieldId> = state
            .persisted
            .column_metas
            .keys()
            .copied()
            .filter(|field_id| !state.current.column_metas.contains_key(field_id))
            .collect();
        if !removed_columns.is_empty() {
            catalog.delete_col_meta(table_id, &removed_columns)?;
            for field_id in removed_columns {
                state.persisted.column_metas.remove(&field_id);
            }
        }

        // Constraint records that are new or changed.
        let mut dirty_constraints: Vec<ConstraintRecord> = Vec::new();
        for (constraint_id, record) in &state.current.constraints {
            match state.persisted.constraints.get(constraint_id) {
                Some(existing) if existing == record => {}
                _ => dirty_constraints.push(record.clone()),
            }
        }
        if !dirty_constraints.is_empty() {
            catalog.put_constraint_records(table_id, &dirty_constraints)?;
            for record in dirty_constraints {
                state
                    .persisted
                    .constraints
                    .insert(record.constraint_id, record);
            }
        }

        // Constraint records that were removed from the working copy.
        let removed_constraints: Vec<ConstraintId> = state
            .persisted
            .constraints
            .keys()
            .copied()
            .filter(|constraint_id| !state.current.constraints.contains_key(constraint_id))
            .collect();
        if !removed_constraints.is_empty() {
            catalog.delete_constraint_records(table_id, &removed_constraints)?;
            for constraint_id in removed_constraints {
                state.persisted.constraints.remove(&constraint_id);
            }
        }

        // Constraint names that are new or changed.
        let mut dirty_constraint_names: Vec<(ConstraintId, String)> = Vec::new();
        for (constraint_id, name) in &state.current.constraint_names {
            match state.persisted.constraint_names.get(constraint_id) {
                Some(existing) if existing == name => {}
                _ => dirty_constraint_names.push((*constraint_id, name.clone())),
            }
        }
        if !dirty_constraint_names.is_empty() {
            let records: Vec<ConstraintNameRecord> = dirty_constraint_names
                .iter()
                .map(|(constraint_id, name)| ConstraintNameRecord {
                    constraint_id: *constraint_id,
                    name: Some(name.clone()),
                })
                .collect();
            catalog.put_constraint_names(table_id, &records)?;
            for (constraint_id, name) in dirty_constraint_names {
                state.persisted.constraint_names.insert(constraint_id, name);
            }
        }

        // Constraint names that were removed from the working copy.
        let removed_constraint_names: Vec<ConstraintId> = state
            .persisted
            .constraint_names
            .keys()
            .copied()
            .filter(|constraint_id| !state.current.constraint_names.contains_key(constraint_id))
            .collect();
        if !removed_constraint_names.is_empty() {
            catalog.delete_constraint_names(table_id, &removed_constraint_names)?;
            for constraint_id in removed_constraint_names {
                state.persisted.constraint_names.remove(&constraint_id);
            }
        }

        // Multi-column uniques are compared and written as a whole list; an
        // empty working list deletes the stored entry instead.
        if state.current.multi_uniques != state.persisted.multi_uniques {
            if state.current.multi_uniques.is_empty() {
                catalog.delete_multi_column_uniques(table_id)?;
                state.persisted.multi_uniques.clear();
            } else {
                catalog.put_multi_column_uniques(table_id, &state.current.multi_uniques)?;
                state.persisted.multi_uniques = state.current.multi_uniques.clone();
            }
        }

        // Sort indexes: set difference in both directions.
        let sort_adds: Vec<FieldId> = state
            .current
            .sort_indexes
            .iter()
            .copied()
            .filter(|field_id| !state.persisted.sort_indexes.contains(field_id))
            .collect();
        let sort_removes: Vec<FieldId> = state
            .persisted
            .sort_indexes
            .iter()
            .copied()
            .filter(|field_id| !state.current.sort_indexes.contains(field_id))
            .collect();
        // Only open a table handle when there is index work to do.
        if !sort_adds.is_empty() || !sort_removes.is_empty() {
            let table = Table::from_id_and_store(table_id, Arc::clone(&self.store))?;
            for field_id in &sort_adds {
                table.register_sort_index(*field_id)?;
                state.persisted.sort_indexes.insert(*field_id);
            }
            for field_id in &sort_removes {
                table.unregister_sort_index(*field_id)?;
                state.persisted.sort_indexes.remove(field_id);
            }
        }

        Ok(())
    }
695
696 pub fn flush_all(&self) -> LlkvResult<()> {
698 let table_ids: Vec<TableId> = {
699 let tables = self.tables.read().unwrap();
700 tables.keys().copied().collect()
701 };
702 for table_id in table_ids {
703 self.flush_table(table_id)?;
704 }
705 Ok(())
706 }
707
708 pub fn all_table_metas(&self) -> LlkvResult<Vec<(TableId, TableMeta)>> {
710 let catalog = SysCatalog::new(&self.store);
711 catalog.all_table_metas()
712 }
713
714 pub fn all_multi_column_unique_metas(&self) -> LlkvResult<Vec<TableMultiColumnUniqueMeta>> {
716 let catalog = SysCatalog::new(&self.store);
717 catalog.all_multi_column_unique_metas()
718 }
719
720 pub fn foreign_key_descriptors(
722 &self,
723 table_id: TableId,
724 ) -> LlkvResult<Vec<ForeignKeyDescriptor>> {
725 let records = self.constraint_records(table_id)?;
726 let mut descriptors = Vec::new();
727
728 for record in records {
729 if !record.is_active() {
730 continue;
731 }
732
733 let ConstraintKind::ForeignKey(fk) = record.kind else {
734 continue;
735 };
736
737 descriptors.push(ForeignKeyDescriptor {
738 constraint_id: record.constraint_id,
739 referencing_table_id: table_id,
740 referencing_field_ids: fk.referencing_field_ids.clone(),
741 referenced_table_id: fk.referenced_table,
742 referenced_field_ids: fk.referenced_field_ids.clone(),
743 on_delete: fk.on_delete,
744 on_update: fk.on_update,
745 });
746 }
747
748 Ok(descriptors)
749 }
750
751 pub fn foreign_key_views(
753 &self,
754 catalog: &TableCatalog,
755 table_id: TableId,
756 ) -> LlkvResult<Vec<ForeignKeyView>> {
757 let descriptors = self.foreign_key_descriptors(table_id)?;
758
759 if descriptors.is_empty() {
760 return Ok(Vec::new());
761 }
762
763 let (referencing_display, referencing_canonical) =
764 resolve_table_name(catalog, self, table_id)?;
765
766 let mut details = Vec::with_capacity(descriptors.len());
767 for descriptor in descriptors {
768 let referenced_table_id = descriptor.referenced_table_id;
769 let (referenced_display, referenced_canonical) =
770 resolve_table_name(catalog, self, referenced_table_id)?;
771
772 let referencing_column_names =
773 self.column_names(table_id, &descriptor.referencing_field_ids)?;
774 let referenced_column_names =
775 self.column_names(referenced_table_id, &descriptor.referenced_field_ids)?;
776 let constraint_name = self.constraint_name_for(table_id, descriptor.constraint_id)?;
777
778 details.push(ForeignKeyView {
779 constraint_id: descriptor.constraint_id,
780 constraint_name,
781 referencing_table_id: descriptor.referencing_table_id,
782 referencing_table_display: referencing_display.clone(),
783 referencing_table_canonical: referencing_canonical.clone(),
784 referencing_field_ids: descriptor.referencing_field_ids.clone(),
785 referencing_column_names,
786 referenced_table_id,
787 referenced_table_display: referenced_display.clone(),
788 referenced_table_canonical: referenced_canonical.clone(),
789 referenced_field_ids: descriptor.referenced_field_ids.clone(),
790 referenced_column_names,
791 on_delete: descriptor.on_delete,
792 on_update: descriptor.on_update,
793 });
794 }
795
796 Ok(details)
797 }
798
799 pub fn table_view(
801 &self,
802 catalog: &TableCatalog,
803 table_id: TableId,
804 field_ids: &[FieldId],
805 ) -> LlkvResult<TableView> {
806 let table_meta = self.table_meta(table_id)?;
807 let column_metas = self.column_metas(table_id, field_ids)?;
808 let constraint_records = self.constraint_records(table_id)?;
809 let multi_column_uniques = self.multi_column_uniques(table_id)?;
810 let foreign_keys = self.foreign_key_views(catalog, table_id)?;
811
812 Ok(TableView {
813 table_meta,
814 column_metas,
815 constraint_records,
816 multi_column_uniques,
817 foreign_keys,
818 })
819 }
820
821 pub fn validate_and_register_foreign_keys<F>(
823 &self,
824 referencing_table: &ForeignKeyTableInfo,
825 specs: &[ForeignKeySpec],
826 lookup_table: F,
827 timestamp_micros: u64,
828 ) -> LlkvResult<Vec<ValidatedForeignKey>>
829 where
830 F: FnMut(&str) -> LlkvResult<ForeignKeyTableInfo>,
831 {
832 let validated = validate_foreign_keys(referencing_table, specs, lookup_table)?;
833 self.register_foreign_keys(referencing_table.table_id, &validated, timestamp_micros)?;
834 Ok(validated)
835 }
836
837 pub fn register_foreign_keys(
839 &self,
840 table_id: TableId,
841 foreign_keys: &[ValidatedForeignKey],
842 timestamp_micros: u64,
843 ) -> LlkvResult<()> {
844 if foreign_keys.is_empty() {
845 return Ok(());
846 }
847
848 let existing_constraints = self.constraint_record_map(table_id)?;
849 let mut next_constraint_id = existing_constraints
850 .keys()
851 .copied()
852 .max()
853 .unwrap_or(0)
854 .saturating_add(1);
855
856 let mut constraint_records = Vec::with_capacity(foreign_keys.len());
857 let mut constraint_names: Vec<(ConstraintId, Option<String>)> =
858 Vec::with_capacity(foreign_keys.len());
859
860 for fk in foreign_keys {
861 let constraint_id = next_constraint_id;
862 constraint_records.push(ConstraintRecord {
863 constraint_id,
864 kind: ConstraintKind::ForeignKey(ForeignKeyConstraint {
865 referencing_field_ids: fk.referencing_field_ids.clone(),
866 referenced_table: fk.referenced_table_id,
867 referenced_field_ids: fk.referenced_field_ids.clone(),
868 on_delete: fk.on_delete,
869 on_update: fk.on_update,
870 }),
871 state: ConstraintState::Active,
872 revision: 1,
873 last_modified_micros: timestamp_micros,
874 });
875 constraint_names.push((constraint_id, fk.name.clone()));
876 next_constraint_id = next_constraint_id.saturating_add(1);
877 }
878
879 self.put_constraint_records(table_id, &constraint_records)?;
880 self.put_constraint_names(table_id, &constraint_names)?;
881 self.flush_table(table_id)?;
882
883 Ok(())
884 }
885
886 pub fn apply_column_definitions(
888 &self,
889 table_id: TableId,
890 columns: &[TableColumn],
891 timestamp_micros: u64,
892 ) -> LlkvResult<()> {
893 if columns.is_empty() {
894 return Ok(());
895 }
896
897 self.ensure_table_state(table_id)?;
898
899 for column in columns {
900 let column_meta = ColMeta {
901 col_id: column.field_id,
902 name: Some(column.name.clone()),
903 flags: 0,
904 default: None,
905 };
906 self.set_column_meta(table_id, column_meta)?;
907 }
908
909 let table = Table::from_id_and_store(table_id, Arc::clone(&self.store))?;
910 let store = table.store();
911
912 for column in columns {
913 let logical_field_id = LogicalFieldId::for_user(table_id, column.field_id);
914 store.ensure_column_registered(logical_field_id, &column.data_type)?;
915 store.data_type(logical_field_id)?;
916 }
917
918 let created_by_lfid = LogicalFieldId::for_mvcc_created_by(table_id);
919 store.ensure_column_registered(created_by_lfid, &DataType::UInt64)?;
920
921 let deleted_by_lfid = LogicalFieldId::for_mvcc_deleted_by(table_id);
922 store.ensure_column_registered(deleted_by_lfid, &DataType::UInt64)?;
923
924 let existing = self.constraint_record_map(table_id)?;
925 let mut next_constraint_id = existing
926 .keys()
927 .copied()
928 .max()
929 .unwrap_or(0)
930 .saturating_add(1);
931
932 let mut constraints = Vec::new();
933
934 let primary_key_fields: Vec<FieldId> = columns
935 .iter()
936 .filter(|col| col.primary_key)
937 .map(|col| col.field_id)
938 .collect();
939 if !primary_key_fields.is_empty() {
940 constraints.push(ConstraintRecord {
941 constraint_id: next_constraint_id,
942 kind: ConstraintKind::PrimaryKey(PrimaryKeyConstraint {
943 field_ids: primary_key_fields,
944 }),
945 state: ConstraintState::Active,
946 revision: 1,
947 last_modified_micros: timestamp_micros,
948 });
949 next_constraint_id = next_constraint_id.saturating_add(1);
950 }
951
952 for column in columns.iter().filter(|col| col.unique && !col.primary_key) {
953 constraints.push(ConstraintRecord {
954 constraint_id: next_constraint_id,
955 kind: ConstraintKind::Unique(UniqueConstraint {
956 field_ids: vec![column.field_id],
957 }),
958 state: ConstraintState::Active,
959 revision: 1,
960 last_modified_micros: timestamp_micros,
961 });
962 next_constraint_id = next_constraint_id.saturating_add(1);
963 }
964
965 if !constraints.is_empty() {
966 self.put_constraint_records(table_id, &constraints)?;
967 }
968
969 Ok(())
970 }
971
972 pub fn column_data_type(&self, table_id: TableId, field_id: FieldId) -> LlkvResult<DataType> {
973 let table = Table::from_id_and_store(table_id, Arc::clone(&self.store))?;
974 let store = table.store();
975 let logical_field_id = LogicalFieldId::for_user(table_id, field_id);
976 store.data_type(logical_field_id)
977 }
978
979 pub fn register_multi_column_unique(
981 &self,
982 table_id: TableId,
983 column_ids: &[FieldId],
984 index_name: Option<String>,
985 ) -> LlkvResult<MultiColumnUniqueRegistration> {
986 let mut created = false;
987 let mut existing_name: Option<Option<String>> = None;
988 let column_vec: Vec<FieldId> = column_ids.to_vec();
989
990 self.update_multi_column_uniques(table_id, |entries| {
991 if let Some(existing) = entries.iter().find(|entry| entry.column_ids == column_vec) {
992 existing_name = Some(existing.index_name.clone());
993 } else {
994 entries.push(MultiColumnUniqueEntryMeta {
995 index_name: index_name.clone(),
996 column_ids: column_vec.clone(),
997 });
998 created = true;
999 }
1000 })?;
1001
1002 if created {
1003 Ok(MultiColumnUniqueRegistration::Created)
1004 } else {
1005 Ok(MultiColumnUniqueRegistration::AlreadyExists {
1006 index_name: existing_name.unwrap_or(None),
1007 })
1008 }
1009 }
1010
1011 fn column_names(&self, table_id: TableId, field_ids: &[FieldId]) -> LlkvResult<Vec<String>> {
1012 if field_ids.is_empty() {
1013 return Ok(Vec::new());
1014 }
1015
1016 let metas = self.column_metas(table_id, field_ids)?;
1017 let mut names = Vec::with_capacity(field_ids.len());
1018 for (idx, field_id) in field_ids.iter().enumerate() {
1019 let name = metas
1020 .get(idx)
1021 .and_then(|meta| meta.as_ref())
1022 .and_then(|meta| meta.name.clone())
1023 .unwrap_or_else(|| format!("col_{}", field_id));
1024 names.push(name);
1025 }
1026 Ok(names)
1027 }
1028
1029 pub fn reserve_table_id(&self) -> LlkvResult<TableId> {
1031 let catalog = SysCatalog::new(&self.store);
1032
1033 let mut next = match catalog.get_next_table_id()? {
1034 Some(value) => value,
1035 None => {
1036 let seed = catalog
1037 .max_table_id()?
1038 .unwrap_or(reserved::CATALOG_TABLE_ID);
1039 let initial = seed.checked_add(1).ok_or_else(|| {
1040 Error::InvalidArgumentError("exhausted available table ids".into())
1041 })?;
1042 catalog.put_next_table_id(initial)?;
1043 initial
1044 }
1045 };
1046
1047 while reserved::is_reserved_table_id(next) {
1048 next = next.checked_add(1).ok_or_else(|| {
1049 Error::InvalidArgumentError("exhausted available table ids".into())
1050 })?;
1051 }
1052
1053 let mut following = next
1054 .checked_add(1)
1055 .ok_or_else(|| Error::InvalidArgumentError("exhausted available table ids".into()))?;
1056
1057 while reserved::is_reserved_table_id(following) {
1058 following = following.checked_add(1).ok_or_else(|| {
1059 Error::InvalidArgumentError("exhausted available table ids".into())
1060 })?;
1061 }
1062
1063 catalog.put_next_table_id(following)?;
1064 Ok(next)
1065 }
1066
1067 fn field_has_sort_index(&self, table_id: TableId, field_id: FieldId) -> LlkvResult<bool> {
1074 let table = Table::from_id_and_store(table_id, Arc::clone(&self.store))?;
1075 let indexes = table.list_registered_indexes(field_id)?;
1076 Ok(indexes.contains(&IndexKind::Sort))
1077 }
1078}
1079
#[cfg(test)]
mod tests {
    use super::*;
    use crate::constraints::{ConstraintKind, ConstraintState, PrimaryKeyConstraint};
    use crate::{MultiColumnUniqueEntryMeta, Table};
    use llkv_column_map::ColumnStore;
    use llkv_column_map::store::IndexKind;
    use llkv_storage::pager::MemPager;
    use std::sync::Arc;

    /// Writes table, column, sort-index, constraint and multi-column unique metadata through
    /// the manager, flushes it, and checks that both the system catalog and the manager's own
    /// getters hand back the same values.
    #[test]
    fn metadata_manager_persists_and_loads() {
        // Expected values; building them does not touch the store.
        let table_id: TableId = 42;
        let expected_table = TableMeta {
            table_id,
            name: Some("users".into()),
            created_at_micros: 123,
            flags: 0,
            epoch: 1,
        };
        let expected_column = ColMeta {
            col_id: 1,
            name: Some("id".into()),
            flags: 0,
            default: None,
        };
        let field_id = expected_column.col_id;
        let expected_pk = ConstraintRecord {
            constraint_id: 7,
            kind: ConstraintKind::PrimaryKey(PrimaryKeyConstraint {
                field_ids: vec![field_id],
            }),
            state: ConstraintState::Active,
            revision: 1,
            last_modified_micros: 456,
        };
        let expected_unique = MultiColumnUniqueEntryMeta {
            index_name: Some("uniq_users_name".into()),
            column_ids: vec![field_id],
        };

        let store = Arc::new(ColumnStore::open(Arc::new(MemPager::default())).unwrap());
        let manager = MetadataManager::new(Arc::clone(&store));

        // Table metadata must be visible in the in-memory state right after it is set.
        manager
            .set_table_meta(table_id, expected_table.clone())
            .unwrap();
        let meta_is_cached = manager
            .tables
            .read()
            .unwrap()
            .get(&table_id)
            .unwrap()
            .current
            .table_meta
            .is_some();
        assert!(meta_is_cached);

        manager
            .set_column_meta(table_id, expected_column.clone())
            .unwrap();

        // The column is registered in the store before a sort index is requested for it.
        let logical_field_id =
            llkv_column_map::types::LogicalFieldId::for_user(table_id, field_id);
        store
            .ensure_column_registered(logical_field_id, &arrow::datatypes::DataType::Utf8)
            .unwrap();
        manager.register_sort_index(table_id, field_id).unwrap();

        manager
            .put_constraint_records(table_id, std::slice::from_ref(&expected_pk))
            .unwrap();
        manager
            .set_multi_column_uniques(table_id, vec![expected_unique.clone()])
            .unwrap();

        assert_eq!(
            manager.table_meta(table_id).unwrap(),
            Some(expected_table.clone())
        );

        manager.flush_table(table_id).unwrap();

        // After the flush the sort index is listed by the table itself.
        let flushed_table = Table::from_id_and_store(table_id, Arc::clone(&store)).unwrap();
        let registered = flushed_table.list_registered_indexes(field_id).unwrap();
        assert!(registered.contains(&IndexKind::Sort));

        // Read everything back through a fresh catalog handle.
        let verify_catalog = SysCatalog::new(&store);
        let stored_columns = verify_catalog.get_cols_meta(table_id, &[field_id]);
        assert_eq!(stored_columns[0].as_ref(), Some(&expected_column));
        let stored_constraints = verify_catalog
            .constraint_records_for_table(table_id)
            .unwrap();
        assert_eq!(stored_constraints, vec![expected_pk.clone()]);
        let stored_uniques = verify_catalog.get_multi_column_uniques(table_id).unwrap();
        assert_eq!(stored_uniques, vec![expected_unique.clone()]);

        // The manager's getters must agree with what was written.
        let cached_table = manager.table_meta(table_id).unwrap();
        assert_eq!(cached_table, Some(expected_table.clone()));

        let cached_columns = manager.column_metas(table_id, &[field_id]).unwrap();
        assert_eq!(cached_columns[0].as_ref(), Some(&expected_column));

        let cached_constraints = manager.constraint_records(table_id).unwrap();
        assert_eq!(cached_constraints, vec![expected_pk.clone()]);

        let cached_uniques = manager.multi_column_uniques(table_id).unwrap();
        assert_eq!(cached_uniques, vec![expected_unique]);

        // A second flush with nothing changed in between must succeed as well.
        manager.flush_table(table_id).unwrap();
    }

    /// Seeds the system catalog directly, bypassing the manager, and checks that the manager's
    /// getters return the stored column, constraint and multi-column unique records.
    #[test]
    fn metadata_manager_lazy_loads_columns_and_constraints() {
        // Expected values; building them does not touch the store.
        let table_id: TableId = 99;
        let expected_column = ColMeta {
            col_id: 3,
            name: Some("value".into()),
            flags: 0,
            default: None,
        };
        let field_id = expected_column.col_id;
        let expected_pk = ConstraintRecord {
            constraint_id: 15,
            kind: ConstraintKind::PrimaryKey(PrimaryKeyConstraint {
                field_ids: vec![field_id],
            }),
            state: ConstraintState::Active,
            revision: 1,
            last_modified_micros: 0,
        };
        let expected_unique = MultiColumnUniqueEntryMeta {
            index_name: Some("uniq_value".into()),
            column_ids: vec![field_id],
        };

        let store = Arc::new(ColumnStore::open(Arc::new(MemPager::default())).unwrap());
        // The manager exists before any of the catalog writes below happen.
        let manager = MetadataManager::new(Arc::clone(&store));

        let initial_catalog = SysCatalog::new(&store);
        initial_catalog.put_col_meta(table_id, &expected_column);
        initial_catalog
            .put_constraint_records(table_id, std::slice::from_ref(&expected_pk))
            .unwrap();
        initial_catalog
            .put_multi_column_uniques(table_id, std::slice::from_ref(&expected_unique))
            .unwrap();

        let loaded_columns = manager.column_metas(table_id, &[field_id]).unwrap();
        assert_eq!(loaded_columns[0].as_ref(), Some(&expected_column));

        let loaded_constraints = manager.constraint_records(table_id).unwrap();
        assert_eq!(loaded_constraints, vec![expected_pk]);

        let loaded_uniques = manager.multi_column_uniques(table_id).unwrap();
        assert_eq!(loaded_uniques, vec![expected_unique]);
    }
}
1242
1243#[derive(Clone, Debug)]
1245pub struct ForeignKeyDescriptor {
1246 pub constraint_id: ConstraintId,
1247 pub referencing_table_id: TableId,
1248 pub referencing_field_ids: Vec<FieldId>,
1249 pub referenced_table_id: TableId,
1250 pub referenced_field_ids: Vec<FieldId>,
1251 pub on_delete: ForeignKeyAction,
1252 pub on_update: ForeignKeyAction,
1253}
1254
/// Outcome of a multi-column unique registration.
///
/// NOTE(review): the code that produces these values is outside this part of the file; the
/// variant notes below follow the variant names — confirm against the producer.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum MultiColumnUniqueRegistration {
    /// A new entry was created.
    Created,
    /// An entry was already present; `index_name` is presumably that entry's name, when it
    /// has one.
    AlreadyExists {
        index_name: Option<String>,
    },
}