llkv_table/
sys_catalog.rs

1//! System catalog for storing table and column metadata.
2//!
3//! The system catalog uses table 0 (reserved) to store metadata about all tables
4//! and columns in the database. This metadata includes:
5//!
6//! - **Table metadata** ([`TableMeta`]): Table ID, name, creation time, flags
7//! - **Column metadata** ([`ColMeta`]): Column ID, name, flags, default values
8//!
9//! # Storage Format
10//!
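//! Table metadata, column metadata, and the overall catalog state are stored as
//! serialized [`bitcode`] blobs in dedicated catalog columns of table 0, while
//! bookkeeping counters (next table id, next/last-committed transaction id) are
//! stored as plain `u64` values. See [`CATALOG_TABLE_ID`] and related constants
//! in the [`reserved`](crate::reserved) module.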
14//!
15//! # Usage
16//!
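//! The [`SysCatalog`] provides methods to:
//! - Insert/update table metadata ([`put_table_meta`](SysCatalog::put_table_meta))
//! - Query table metadata ([`get_table_meta`](SysCatalog::get_table_meta),
//!   [`all_table_metas`](SysCatalog::all_table_metas))
//! - Insert/update and batch-query column metadata
//!   ([`put_col_meta`](SysCatalog::put_col_meta),
//!   [`get_cols_meta`](SysCatalog::get_cols_meta))
//! - Persist and load the next table id, transaction counters, and catalog state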
21//!
22//! This metadata is used by higher-level components to validate schemas, assign
23//! field IDs, and enforce table constraints.
24
25use std::collections::HashMap;
26use std::sync::Arc;
27
28use arrow::array::{Array, BinaryArray, UInt64Array};
29use arrow::datatypes::{DataType, Field, Schema};
30use arrow::record_batch::RecordBatch;
31use bitcode::{Decode, Encode};
32
33use crate::types::TableId;
34use llkv_column_map::store::scan::{
35    PrimitiveSortedVisitor, PrimitiveSortedWithRowIdsVisitor, PrimitiveVisitor,
36    PrimitiveWithRowIdsVisitor, ScanBuilder, ScanOptions,
37};
38
39use llkv_column_map::types::LogicalFieldId;
40use llkv_column_map::{
41    ColumnStore,
42    store::{GatherNullPolicy, ROW_ID_COLUMN_NAME, rowid_fid},
43    types::Namespace,
44};
45use llkv_result::{self, Result as LlkvResult};
46use llkv_storage::pager::{MemPager, Pager};
47use simd_r_drive_entry_handle::EntryHandle;
48
49// Import all reserved constants and validation functions
50use crate::reserved::*;
51
52// ----- Namespacing helpers -----
53
54// TODO: Dedupe with llkv_column_map::types::lfid()
55#[inline]
56fn lfid(table_id: TableId, col_id: u32) -> LogicalFieldId {
57    LogicalFieldId::new()
58        .with_namespace(Namespace::UserData)
59        .with_table_id(table_id)
60        .with_field_id(col_id)
61}
62
63// TODO: Migrate to llkv_column_map::types::rid_table()
64#[inline]
65fn rid_table(table_id: TableId) -> u64 {
66    let fid = LogicalFieldId::new()
67        .with_namespace(Namespace::UserData)
68        .with_table_id(table_id)
69        .with_field_id(0);
70    fid.into()
71}
72
73// TODO: Migrate to llkv_column_map::types::rid_col()
74#[inline]
75fn rid_col(table_id: TableId, col_id: u32) -> u64 {
76    lfid(table_id, col_id).into()
77}
78
79// ----- Public catalog types -----
80
81/// Metadata about a table.
82///
83/// Stored in the system catalog (table 0) and serialized using [`bitcode`].
84#[derive(Encode, Decode, Clone, Debug, PartialEq, Eq)]
85pub struct TableMeta {
86    /// Unique identifier for this table.
87    pub table_id: TableId,
88    /// Optional human-readable name for the table.
89    pub name: Option<String>,
90    /// When the table was created (microseconds since epoch).
91    pub created_at_micros: u64,
92    /// Bitflags for table properties (e.g., temporary, system).
93    pub flags: u32,
94    /// Schema version or modification counter.
95    pub epoch: u64,
96}
97
98/// Metadata about a column.
99///
100/// Stored in the system catalog (table 0) and serialized using [`bitcode`].
101#[derive(Encode, Decode, Clone, Debug, PartialEq, Eq)]
102pub struct ColMeta {
103    /// Unique identifier for this column within its table.
104    pub col_id: u32,
105    /// Optional human-readable name for the column.
106    pub name: Option<String>,
107    /// Bitflags for column properties (e.g., nullable, indexed).
108    pub flags: u32,
109    /// Optional serialized default value for the column.
110    pub default: Option<Vec<u8>>,
111}
112
113// ----- SysCatalog -----
114
115/// Interface to the system catalog (table 0).
116///
117/// The system catalog stores metadata about all tables and columns in the database.
118/// It uses special reserved columns within table 0 to persist [`TableMeta`] and
119/// [`ColMeta`] structures.
120///
121/// # Lifetime
122///
123/// `SysCatalog` borrows a reference to the [`ColumnStore`] and does not own it.
124/// This allows multiple catalog instances to coexist with the same storage.
125pub struct SysCatalog<'a, P = MemPager>
126where
127    P: Pager<Blob = EntryHandle> + Send + Sync,
128{
129    store: &'a ColumnStore<P>,
130}
131
132impl<'a, P> SysCatalog<'a, P>
133where
134    P: Pager<Blob = EntryHandle> + Send + Sync,
135{
136    /// Create a new system catalog interface using the provided column store.
137    pub fn new(store: &'a ColumnStore<P>) -> Self {
138        Self { store }
139    }
140
141    /// Insert or update table metadata.
142    ///
143    /// This persists the table's metadata to the system catalog. If metadata for
144    /// this table ID already exists, it is overwritten (last-write-wins).
145    pub fn put_table_meta(&self, meta: &TableMeta) {
146        let lfid_val: u64 = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_TABLE_META_ID).into();
147        let schema = Arc::new(Schema::new(vec![
148            Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
149            Field::new("meta", DataType::Binary, false).with_metadata(HashMap::from([(
150                crate::constants::FIELD_ID_META_KEY.to_string(),
151                lfid_val.to_string(),
152            )])),
153        ]));
154
155        let row_id = Arc::new(UInt64Array::from(vec![rid_table(meta.table_id)]));
156        let meta_encoded = bitcode::encode(meta);
157        let meta_bytes = Arc::new(BinaryArray::from(vec![meta_encoded.as_slice()]));
158
159        let batch = RecordBatch::try_new(schema, vec![row_id, meta_bytes]).unwrap();
160        self.store.append(&batch).unwrap();
161    }
162
163    /// Retrieve table metadata by table ID.
164    ///
165    /// Returns `None` if no metadata exists for the given table ID.
166    pub fn get_table_meta(&self, table_id: TableId) -> Option<TableMeta> {
167        struct MetaVisitor {
168            target_rid: u64,
169            meta: Option<TableMeta>,
170        }
171        impl PrimitiveVisitor for MetaVisitor {}
172        impl PrimitiveWithRowIdsVisitor for MetaVisitor {
173            fn u64_chunk_with_rids(&mut self, v: &UInt64Array, r: &UInt64Array) {
174                for i in 0..r.len() {
175                    if r.value(i) == self.target_rid {
176                        // This logic assumes the 'meta' column is u64. It needs to be Binary.
177                        // This scan implementation is a placeholder and needs to be updated
178                        // to correctly handle BinaryArray for the metadata.
179                        // For now, this lets the code compile.
180                        let _bytes = v.value(i);
181                        // self.meta = Some(bitcode::decode(&bytes.to_be_bytes()).unwrap());
182                        break;
183                    }
184                }
185            }
186        }
187        impl PrimitiveSortedVisitor for MetaVisitor {}
188        impl PrimitiveSortedWithRowIdsVisitor for MetaVisitor {}
189
190        let mut visitor = MetaVisitor {
191            target_rid: rid_table(table_id),
192            meta: None,
193        };
194        // Note: The scan needs `with_row_ids` to be true for `u64_chunk_with_rids` to be called.
195        // A full implementation would require passing ScanOptions.
196        let scan_opts = llkv_column_map::store::scan::ScanOptions {
197            with_row_ids: true,
198            ..Default::default()
199        };
200
201        let _ = self.store.scan(
202            lfid(CATALOG_TABLE_ID, CATALOG_FIELD_TABLE_META_ID),
203            scan_opts,
204            &mut visitor,
205        );
206        visitor.meta
207    }
208
209    /// Upsert a single column’s metadata.
210    pub fn put_col_meta(&self, table_id: TableId, meta: &ColMeta) {
211        let lfid_val: u64 = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_COL_META_ID).into();
212        let schema = Arc::new(Schema::new(vec![
213            Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
214            Field::new("meta", DataType::Binary, false).with_metadata(HashMap::from([(
215                crate::constants::FIELD_ID_META_KEY.to_string(),
216                lfid_val.to_string(),
217            )])),
218        ]));
219
220        let rid_value = rid_col(table_id, meta.col_id);
221        let row_id = Arc::new(UInt64Array::from(vec![rid_value]));
222        let meta_encoded = bitcode::encode(meta);
223        let meta_bytes = Arc::new(BinaryArray::from(vec![meta_encoded.as_slice()]));
224
225        let batch = RecordBatch::try_new(schema, vec![row_id, meta_bytes]).unwrap();
226        self.store.append(&batch).unwrap();
227    }
228
229    /// Batch fetch specific column metas by col_id using a shared keyset.
230    pub fn get_cols_meta(&self, table_id: TableId, col_ids: &[u32]) -> Vec<Option<ColMeta>> {
231        if col_ids.is_empty() {
232            return Vec::new();
233        }
234
235        let row_ids: Vec<u64> = col_ids.iter().map(|&cid| rid_col(table_id, cid)).collect();
236        let catalog_field = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_COL_META_ID);
237
238        let batch =
239            match self
240                .store
241                .gather_rows(&[catalog_field], &row_ids, GatherNullPolicy::IncludeNulls)
242            {
243                Ok(batch) => batch,
244                Err(_) => return vec![None; col_ids.len()],
245            };
246
247        let meta_col = batch
248            .column(0)
249            .as_any()
250            .downcast_ref::<BinaryArray>()
251            .expect("catalog meta column should be Binary");
252
253        col_ids
254            .iter()
255            .enumerate()
256            .map(|(idx, _)| {
257                if meta_col.is_null(idx) {
258                    None
259                } else {
260                    bitcode::decode(meta_col.value(idx)).ok()
261                }
262            })
263            .collect()
264    }
265
266    pub fn put_next_table_id(&self, next_id: TableId) -> LlkvResult<()> {
267        let lfid_val: u64 = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_NEXT_TABLE_ID).into();
268        let schema = Arc::new(Schema::new(vec![
269            Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
270            Field::new("next_table_id", DataType::UInt64, false).with_metadata(HashMap::from([(
271                crate::constants::FIELD_ID_META_KEY.to_string(),
272                lfid_val.to_string(),
273            )])),
274        ]));
275
276        let row_id = Arc::new(UInt64Array::from(vec![CATALOG_NEXT_TABLE_ROW_ID]));
277        let value_array = Arc::new(UInt64Array::from(vec![next_id as u64]));
278        let batch = RecordBatch::try_new(schema, vec![row_id, value_array])?;
279        self.store.append(&batch)?;
280        Ok(())
281    }
282
283    pub fn get_next_table_id(&self) -> LlkvResult<Option<TableId>> {
284        let lfid = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_NEXT_TABLE_ID);
285        let batch = match self.store.gather_rows(
286            &[lfid],
287            &[CATALOG_NEXT_TABLE_ROW_ID],
288            GatherNullPolicy::IncludeNulls,
289        ) {
290            Ok(batch) => batch,
291            Err(llkv_result::Error::NotFound) => return Ok(None),
292            Err(err) => return Err(err),
293        };
294
295        if batch.num_columns() == 0 || batch.num_rows() == 0 {
296            return Ok(None);
297        }
298
299        let array = batch
300            .column(0)
301            .as_any()
302            .downcast_ref::<UInt64Array>()
303            .ok_or_else(|| {
304                llkv_result::Error::Internal(
305                    "catalog next_table_id column stored unexpected type".into(),
306                )
307            })?;
308        if array.is_empty() || array.is_null(0) {
309            return Ok(None);
310        }
311
312        let value = array.value(0);
313        if value > TableId::MAX as u64 {
314            return Err(llkv_result::Error::InvalidArgumentError(
315                "persisted next_table_id exceeds TableId range".into(),
316            ));
317        }
318
319        Ok(Some(value as TableId))
320    }
321
322    pub fn max_table_id(&self) -> LlkvResult<Option<TableId>> {
323        let meta_field = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_TABLE_META_ID);
324        let row_field = rowid_fid(meta_field);
325
326        let mut collector = MaxRowIdCollector { max: None };
327        match ScanBuilder::new(self.store, row_field)
328            .options(ScanOptions::default())
329            .run(&mut collector)
330        {
331            Ok(()) => {}
332            Err(llkv_result::Error::NotFound) => return Ok(None),
333            Err(err) => return Err(err),
334        }
335
336        let max_value = match collector.max {
337            Some(value) => value,
338            None => return Ok(None),
339        };
340
341        let logical: LogicalFieldId = max_value.into();
342        Ok(Some(logical.table_id()))
343    }
344
345    /// Scan all table metadata entries from the catalog.
346    /// Returns a vector of (table_id, TableMeta) pairs for all persisted tables.
347    ///
348    /// This method first scans for all row IDs in the table metadata column,
349    /// then uses gather_rows to retrieve the actual metadata.
350    pub fn all_table_metas(&self) -> LlkvResult<Vec<(TableId, TableMeta)>> {
351        let meta_field = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_TABLE_META_ID);
352        let row_field = rowid_fid(meta_field);
353
354        // Collect all row IDs that have table metadata
355        struct RowIdCollector {
356            row_ids: Vec<u64>,
357        }
358
359        impl PrimitiveVisitor for RowIdCollector {
360            fn u64_chunk(&mut self, values: &UInt64Array) {
361                for i in 0..values.len() {
362                    self.row_ids.push(values.value(i));
363                }
364            }
365        }
366        impl PrimitiveWithRowIdsVisitor for RowIdCollector {}
367        impl PrimitiveSortedVisitor for RowIdCollector {}
368        impl PrimitiveSortedWithRowIdsVisitor for RowIdCollector {}
369
370        let mut collector = RowIdCollector {
371            row_ids: Vec::new(),
372        };
373        match ScanBuilder::new(self.store, row_field)
374            .options(ScanOptions::default())
375            .run(&mut collector)
376        {
377            Ok(()) => {}
378            Err(llkv_result::Error::NotFound) => return Ok(Vec::new()),
379            Err(err) => return Err(err),
380        }
381
382        if collector.row_ids.is_empty() {
383            return Ok(Vec::new());
384        }
385
386        // Gather all table metadata using the collected row IDs
387        let batch = self.store.gather_rows(
388            &[meta_field],
389            &collector.row_ids,
390            GatherNullPolicy::IncludeNulls,
391        )?;
392
393        let meta_col = batch
394            .column(0)
395            .as_any()
396            .downcast_ref::<BinaryArray>()
397            .ok_or_else(|| {
398                llkv_result::Error::Internal("catalog table_meta column should be Binary".into())
399            })?;
400
401        let mut result = Vec::new();
402        for (idx, &row_id) in collector.row_ids.iter().enumerate() {
403            if !meta_col.is_null(idx) {
404                let bytes = meta_col.value(idx);
405                if let Ok(meta) = bitcode::decode::<TableMeta>(bytes) {
406                    let logical: LogicalFieldId = row_id.into();
407                    let table_id = logical.table_id();
408                    result.push((table_id, meta));
409                }
410            }
411        }
412
413        Ok(result)
414    }
415
416    /// Persist the next transaction id to the catalog.
417    pub fn put_next_txn_id(&self, next_txn_id: u64) -> LlkvResult<()> {
418        let lfid_val: u64 = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_NEXT_TXN_ID).into();
419        let schema = Arc::new(Schema::new(vec![
420            Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
421            Field::new("next_txn_id", DataType::UInt64, false).with_metadata(HashMap::from([(
422                crate::constants::FIELD_ID_META_KEY.to_string(),
423                lfid_val.to_string(),
424            )])),
425        ]));
426
427        let row_id = Arc::new(UInt64Array::from(vec![CATALOG_NEXT_TXN_ROW_ID]));
428        let value_array = Arc::new(UInt64Array::from(vec![next_txn_id]));
429        let batch = RecordBatch::try_new(schema, vec![row_id, value_array])?;
430        self.store.append(&batch)?;
431        Ok(())
432    }
433
434    /// Load the next transaction id from the catalog.
435    pub fn get_next_txn_id(&self) -> LlkvResult<Option<u64>> {
436        let lfid = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_NEXT_TXN_ID);
437        let batch = match self.store.gather_rows(
438            &[lfid],
439            &[CATALOG_NEXT_TXN_ROW_ID],
440            GatherNullPolicy::IncludeNulls,
441        ) {
442            Ok(batch) => batch,
443            Err(llkv_result::Error::NotFound) => return Ok(None),
444            Err(err) => return Err(err),
445        };
446
447        if batch.num_columns() == 0 || batch.num_rows() == 0 {
448            return Ok(None);
449        }
450
451        let array = batch
452            .column(0)
453            .as_any()
454            .downcast_ref::<UInt64Array>()
455            .ok_or_else(|| {
456                llkv_result::Error::Internal(
457                    "catalog next_txn_id column stored unexpected type".into(),
458                )
459            })?;
460        if array.is_empty() || array.is_null(0) {
461            return Ok(None);
462        }
463
464        let value = array.value(0);
465        Ok(Some(value))
466    }
467
468    /// Persist the last committed transaction id to the catalog.
469    pub fn put_last_committed_txn_id(&self, last_committed: u64) -> LlkvResult<()> {
470        let lfid_val: u64 = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_LAST_COMMITTED_TXN_ID).into();
471        let schema = Arc::new(Schema::new(vec![
472            Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
473            Field::new("last_committed_txn_id", DataType::UInt64, false).with_metadata(
474                HashMap::from([(
475                    crate::constants::FIELD_ID_META_KEY.to_string(),
476                    lfid_val.to_string(),
477                )]),
478            ),
479        ]));
480
481        let row_id = Arc::new(UInt64Array::from(vec![CATALOG_LAST_COMMITTED_TXN_ROW_ID]));
482        let value_array = Arc::new(UInt64Array::from(vec![last_committed]));
483        let batch = RecordBatch::try_new(schema, vec![row_id, value_array])?;
484        self.store.append(&batch)?;
485        Ok(())
486    }
487
488    /// Load the last committed transaction id from the catalog.
489    pub fn get_last_committed_txn_id(&self) -> LlkvResult<Option<u64>> {
490        let lfid = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_LAST_COMMITTED_TXN_ID);
491        let batch = match self.store.gather_rows(
492            &[lfid],
493            &[CATALOG_LAST_COMMITTED_TXN_ROW_ID],
494            GatherNullPolicy::IncludeNulls,
495        ) {
496            Ok(batch) => batch,
497            Err(llkv_result::Error::NotFound) => return Ok(None),
498            Err(err) => return Err(err),
499        };
500
501        if batch.num_columns() == 0 || batch.num_rows() == 0 {
502            return Ok(None);
503        }
504
505        let array = batch
506            .column(0)
507            .as_any()
508            .downcast_ref::<UInt64Array>()
509            .ok_or_else(|| {
510                llkv_result::Error::Internal(
511                    "catalog last_committed_txn_id column stored unexpected type".into(),
512                )
513            })?;
514        if array.is_empty() || array.is_null(0) {
515            return Ok(None);
516        }
517
518        let value = array.value(0);
519        Ok(Some(value))
520    }
521
522    /// Persist the catalog state to the system catalog.
523    ///
524    /// Stores the complete catalog state (all tables and fields) as a binary blob
525    /// using bitcode serialization.
526    pub fn put_catalog_state(&self, state: &crate::catalog::TableCatalogState) -> LlkvResult<()> {
527        let lfid_val: u64 = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_CATALOG_STATE).into();
528        let schema = Arc::new(Schema::new(vec![
529            Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
530            Field::new("catalog_state", DataType::Binary, false).with_metadata(HashMap::from([(
531                crate::constants::FIELD_ID_META_KEY.to_string(),
532                lfid_val.to_string(),
533            )])),
534        ]));
535
536        let row_id = Arc::new(UInt64Array::from(vec![CATALOG_STATE_ROW_ID]));
537        let encoded = bitcode::encode(state);
538        let state_bytes = Arc::new(BinaryArray::from(vec![encoded.as_slice()]));
539
540        let batch = RecordBatch::try_new(schema, vec![row_id, state_bytes])?;
541        self.store.append(&batch)?;
542        Ok(())
543    }
544
545    /// Load the catalog state from the system catalog.
546    ///
547    /// Retrieves the complete catalog state including all table and field mappings.
548    pub fn get_catalog_state(&self) -> LlkvResult<Option<crate::catalog::TableCatalogState>> {
549        let lfid = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_CATALOG_STATE);
550        let batch = match self.store.gather_rows(
551            &[lfid],
552            &[CATALOG_STATE_ROW_ID],
553            GatherNullPolicy::IncludeNulls,
554        ) {
555            Ok(batch) => batch,
556            Err(llkv_result::Error::NotFound) => return Ok(None),
557            Err(err) => return Err(err),
558        };
559
560        if batch.num_columns() == 0 || batch.num_rows() == 0 {
561            return Ok(None);
562        }
563
564        let array = batch
565            .column(0)
566            .as_any()
567            .downcast_ref::<BinaryArray>()
568            .ok_or_else(|| {
569                llkv_result::Error::Internal("catalog state column stored unexpected type".into())
570            })?;
571        if array.is_empty() || array.is_null(0) {
572            return Ok(None);
573        }
574
575        let bytes = array.value(0);
576        let state = bitcode::decode(bytes).map_err(|e| {
577            llkv_result::Error::Internal(format!("Failed to decode catalog state: {}", e))
578        })?;
579        Ok(Some(state))
580    }
581}
582
583struct MaxRowIdCollector {
584    max: Option<u64>,
585}
586
587impl PrimitiveVisitor for MaxRowIdCollector {
588    fn u64_chunk(&mut self, values: &UInt64Array) {
589        for i in 0..values.len() {
590            let value = values.value(i);
591            self.max = match self.max {
592                Some(curr) if curr >= value => Some(curr),
593                _ => Some(value),
594            };
595        }
596    }
597}
598
599impl PrimitiveWithRowIdsVisitor for MaxRowIdCollector {}
600impl PrimitiveSortedVisitor for MaxRowIdCollector {}
601impl PrimitiveSortedWithRowIdsVisitor for MaxRowIdCollector {}