// llkv_table/sys_catalog.rs

1//! System catalog for storing table and column metadata.
2//!
3//! The system catalog uses table 0 (reserved) to store metadata about all tables
4//! and columns in the database. This metadata includes:
5//!
6//! - **Table metadata** ([`TableMeta`]): Table ID, name, creation time, flags
7//! - **Column metadata** ([`ColMeta`]): Column ID, name, flags, default values
8//! - **Multi-column unique metadata** ([`TableMultiColumnUniqueMeta`]): Unique index definitions per table
9//!
10//! # Storage Format
11//!
12//! The catalog stores metadata as serialized [`bitcode`] blobs in special catalog
13//! columns within table 0. See [`CATALOG_TABLE_ID`] and related constants in the
14//! [`reserved`](crate::reserved) module.
15//!
16//! # Usage
17//!
18//! The [`SysCatalog`] provides methods to:
19//! - Insert/update table metadata ([`put_table_meta`](SysCatalog::put_table_meta))
20//! - Query table metadata ([`get_table_meta`](SysCatalog::get_table_meta))
21//! - Manage column metadata similarly
22//!
23//! This metadata is used by higher-level components to validate schemas, assign
24//! field IDs, and enforce table constraints.
25
26use std::collections::HashMap;
27use std::mem;
28use std::sync::Arc;
29
30use arrow::array::{Array, BinaryArray, BinaryBuilder, UInt64Array};
31use arrow::datatypes::{DataType, Field, Schema};
32use arrow::record_batch::RecordBatch;
33use bitcode::{Decode, Encode};
34
35use crate::constants::CONSTRAINT_SCAN_CHUNK_SIZE;
36use crate::constraints::{
37    ConstraintId, ConstraintRecord, decode_constraint_row_id, encode_constraint_row_id,
38};
39use crate::types::{FieldId, ROW_ID_FIELD_ID, RowId, TableId};
40use llkv_column_map::store::scan::{
41    PrimitiveSortedVisitor, PrimitiveSortedWithRowIdsVisitor, PrimitiveVisitor,
42    PrimitiveWithRowIdsVisitor, ScanBuilder, ScanOptions,
43};
44
45use llkv_column_map::types::LogicalFieldId;
46use llkv_column_map::{
47    ColumnStore,
48    store::{GatherNullPolicy, ROW_ID_COLUMN_NAME, rowid_fid},
49};
50use llkv_result::{self, Result as LlkvResult};
51use llkv_storage::pager::{MemPager, Pager};
52use simd_r_drive_entry_handle::EntryHandle;
53
54// Import all reserved constants and validation functions
55use crate::reserved::*;
56
57// ----- Namespacing helpers -----
58
// TODO: Dedupe with llkv_column_map::types::lfid()
/// Build the [`LogicalFieldId`] addressing user column `col_id` of `table_id`.
#[inline]
fn lfid(table_id: TableId, col_id: u32) -> LogicalFieldId {
    LogicalFieldId::for_user(table_id, col_id)
}
64
// TODO: Migrate to llkv_column_map::types::rid_table()
/// Catalog row id under which a table's metadata record is stored: the
/// numeric value of the table's row-id logical field.
#[inline]
fn rid_table(table_id: TableId) -> u64 {
    LogicalFieldId::for_user(table_id, ROW_ID_FIELD_ID).into()
}
70
// TODO: Migrate to llkv_column_map::types::rid_col()
/// Catalog row id under which a column's metadata record is stored: the
/// row-id shadow field derived from the column's logical field id.
#[inline]
fn rid_col(table_id: TableId, col_id: u32) -> u64 {
    rowid_fid(lfid(table_id, col_id)).into()
}
76
/// Logical field holding serialized constraint records in the catalog table.
#[inline]
fn constraint_meta_lfid() -> LogicalFieldId {
    lfid(CATALOG_TABLE_ID, CATALOG_FIELD_CONSTRAINT_META_ID)
}
81
/// Logical field holding serialized constraint-name records in the catalog table.
#[inline]
fn constraint_name_lfid() -> LogicalFieldId {
    lfid(CATALOG_TABLE_ID, CATALOG_FIELD_CONSTRAINT_NAME_ID)
}
86
/// Row-id shadow field backing the constraint metadata column.
#[inline]
fn constraint_row_lfid() -> LogicalFieldId {
    rowid_fid(constraint_meta_lfid())
}
91
/// Pairing of a constraint identifier with its optional user-supplied name.
///
/// Persisted in the catalog's constraint-name column, keyed by the same
/// encoded row id as the constraint record itself (see
/// [`SysCatalog::put_constraint_names`]).
#[derive(Clone, Debug, Encode, Decode)]
pub struct ConstraintNameRecord {
    /// Identifier of the constraint this name belongs to.
    pub constraint_id: ConstraintId,
    /// Optional human-readable constraint name.
    pub name: Option<String>,
}
97
98fn decode_constraint_record(bytes: &[u8]) -> LlkvResult<ConstraintRecord> {
99    bitcode::decode(bytes).map_err(|err| {
100        llkv_result::Error::Internal(format!("failed to decode constraint metadata: {err}"))
101    })
102}
103
/// Scan visitor that buffers catalog row ids belonging to one table's
/// constraints and emits decoded [`ConstraintRecord`] batches via `on_batch`.
struct ConstraintRowCollector<'a, P, F>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
    F: FnMut(Vec<ConstraintRecord>),
{
    // Store used to gather the payload bytes for buffered row ids.
    store: &'a ColumnStore<P>,
    // Logical field holding the serialized constraint records.
    lfid: LogicalFieldId,
    // Only rows whose encoded table id matches are collected.
    table_id: TableId,
    // Callback invoked with each decoded batch of records.
    on_batch: &'a mut F,
    // Pending row ids awaiting the next flush.
    buffer: Vec<RowId>,
    // First error raised during a flush; surfaced later by `finish`.
    error: Option<llkv_result::Error>,
}
116
impl<'a, P, F> ConstraintRowCollector<'a, P, F>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
    F: FnMut(Vec<ConstraintRecord>),
{
    /// Gather, decode, and emit the currently buffered row ids.
    fn flush_buffer(&mut self) -> LlkvResult<()> {
        if self.buffer.is_empty() {
            return Ok(());
        }

        // Take ownership of the pending ids so the buffer is reset even if
        // gathering or decoding below fails.
        let row_ids = mem::take(&mut self.buffer);
        let batch =
            self.store
                .gather_rows(&[self.lfid], &row_ids, GatherNullPolicy::IncludeNulls)?;

        if batch.num_columns() == 0 {
            return Ok(());
        }

        let array = batch
            .column(0)
            .as_any()
            .downcast_ref::<BinaryArray>()
            .ok_or_else(|| {
                llkv_result::Error::Internal(
                    "constraint metadata column stored unexpected type".into(),
                )
            })?;

        let mut records = Vec::with_capacity(row_ids.len());
        for (idx, row_id) in row_ids.into_iter().enumerate() {
            // Null payloads correspond to deleted constraints (written by
            // `SysCatalog::delete_constraint_records`); skip them.
            if array.is_null(idx) {
                continue;
            }

            let record = decode_constraint_record(array.value(idx))?;
            let (table_from_id, constraint_id) = decode_constraint_row_id(row_id);
            if table_from_id != self.table_id {
                continue;
            }
            // The id encoded in the row key must agree with the decoded record.
            if record.constraint_id != constraint_id {
                return Err(llkv_result::Error::Internal(
                    "constraint metadata id mismatch".into(),
                ));
            }
            records.push(record);
        }

        if !records.is_empty() {
            (self.on_batch)(records);
        }

        Ok(())
    }

    /// Surface any error deferred during scanning, then flush any remainder.
    fn finish(&mut self) -> LlkvResult<()> {
        if let Some(err) = self.error.take() {
            return Err(err);
        }
        self.flush_buffer()
    }
}
179
180impl<'a, P, F> PrimitiveVisitor for ConstraintRowCollector<'a, P, F>
181where
182    P: Pager<Blob = EntryHandle> + Send + Sync,
183    F: FnMut(Vec<ConstraintRecord>),
184{
185    fn u64_chunk(&mut self, values: &UInt64Array) {
186        if self.error.is_some() {
187            return;
188        }
189
190        for idx in 0..values.len() {
191            let row_id = values.value(idx);
192            let (table_id, _) = decode_constraint_row_id(row_id);
193            if table_id != self.table_id {
194                continue;
195            }
196            self.buffer.push(row_id);
197            if self.buffer.len() >= CONSTRAINT_SCAN_CHUNK_SIZE
198                && let Err(err) = self.flush_buffer()
199            {
200                self.error = Some(err);
201                return;
202            }
203        }
204    }
205}
206
// The collector only consumes plain (unsorted, id-less) u64 chunks via
// `PrimitiveVisitor::u64_chunk`; the remaining visitor traits are satisfied
// by their default no-op methods.
impl<'a, P, F> PrimitiveWithRowIdsVisitor for ConstraintRowCollector<'a, P, F>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
    F: FnMut(Vec<ConstraintRecord>),
{
}

impl<'a, P, F> PrimitiveSortedVisitor for ConstraintRowCollector<'a, P, F>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
    F: FnMut(Vec<ConstraintRecord>),
{
}

impl<'a, P, F> PrimitiveSortedWithRowIdsVisitor for ConstraintRowCollector<'a, P, F>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
    F: FnMut(Vec<ConstraintRecord>),
{
}
227
228// ----- Public catalog types -----
229
/// Metadata about a table.
///
/// Stored in the system catalog (table 0) and serialized using [`bitcode`].
///
/// NOTE(review): bitcode derives are presumed field-order sensitive, so
/// reordering or removing fields would change the persisted format — confirm
/// a migration strategy before touching the layout.
#[derive(Encode, Decode, Clone, Debug, PartialEq, Eq)]
pub struct TableMeta {
    /// Unique identifier for this table.
    pub table_id: TableId,
    /// Optional human-readable name for the table.
    pub name: Option<String>,
    /// When the table was created (microseconds since epoch).
    pub created_at_micros: u64,
    /// Bitflags for table properties (e.g., temporary, system).
    pub flags: u32,
    /// Schema version or modification counter.
    pub epoch: u64,
    /// If this is a view, contains the SQL definition (SELECT statement).
    /// If None, this is a regular table.
    pub view_definition: Option<String>,
}
249
/// Metadata about a column.
///
/// Stored in the system catalog (table 0) and serialized using [`bitcode`].
#[derive(Encode, Decode, Clone, Debug, PartialEq, Eq)]
pub struct ColMeta {
    /// Unique identifier for this column within its table.
    pub col_id: u32,
    /// Optional human-readable name for the column.
    pub name: Option<String>,
    /// Bitflags for column properties (e.g., nullable, indexed).
    pub flags: u32,
    /// Optional serialized default value for the column. The bytes are
    /// treated as opaque by this module; encoding is owned by the writer.
    pub default: Option<Vec<u8>>,
}
264
/// Metadata about a schema.
///
/// Stored in the system catalog (table 0) and serialized using [`bitcode`].
///
/// NOTE(review): no numeric schema id is stored here, so schemas appear to be
/// addressed by name — confirm against the catalog's write sites.
#[derive(Encode, Decode, Clone, Debug, PartialEq, Eq)]
pub struct SchemaMeta {
    /// Human-readable schema name (case-preserved).
    pub name: String,
    /// When the schema was created (microseconds since epoch).
    pub created_at_micros: u64,
    /// Bitflags for schema properties (reserved for future use).
    pub flags: u32,
}
277
/// Metadata about a custom type (CREATE TYPE/DOMAIN).
///
/// Stored in the system catalog (table 0) and serialized using [`bitcode`].
/// Represents type aliases like `CREATE TYPE custom_type AS integer`.
#[derive(Encode, Decode, Clone, Debug, PartialEq, Eq)]
pub struct CustomTypeMeta {
    /// Human-readable type name (case-preserved, stored lowercase in catalog).
    pub name: String,
    /// SQL representation of the base data type (e.g., "INTEGER", "VARCHAR(100)").
    /// Stored as string to avoid Arrow DataType serialization complexity.
    /// NOTE(review): presumably re-parsed by consumers; it is not interpreted
    /// anywhere in this module.
    pub base_type_sql: String,
    /// When the type was created (microseconds since epoch).
    pub created_at_micros: u64,
}
292
/// Metadata describing a single multi-column UNIQUE index.
#[derive(Encode, Decode, Clone, Debug, PartialEq, Eq)]
pub struct MultiColumnUniqueEntryMeta {
    /// Optional human-readable index name.
    pub index_name: Option<String>,
    /// Field IDs participating in this UNIQUE constraint.
    /// NOTE(review): ordering looks significant (declaration order of the
    /// constraint) — confirm at the write sites.
    pub column_ids: Vec<u32>,
}
301
/// Metadata describing a single named single-column index.
///
/// NOTE(review): `canonical_name` is expected to be the lowercase form of
/// `index_name`; the invariant is maintained by the writers, not enforced here.
#[derive(Encode, Decode, Clone, Debug, PartialEq, Eq)]
pub struct SingleColumnIndexEntryMeta {
    /// Human-readable index name (case preserved).
    pub index_name: String,
    /// Lower-cased canonical index name for case-insensitive lookups.
    pub canonical_name: String,
    /// Identifier of the column the index covers.
    pub column_id: FieldId,
    /// Human-readable column name (case preserved).
    pub column_name: String,
    /// Whether this index enforces uniqueness for the column.
    pub unique: bool,
}
316
/// Metadata describing all single-column indexes registered for a table.
///
/// The record holds the complete set for the table; writers replace it
/// wholesale (see [`SysCatalog::put_single_column_indexes`]).
#[derive(Encode, Decode, Clone, Debug, PartialEq, Eq)]
pub struct TableSingleColumnIndexMeta {
    /// Owning table identifier.
    pub table_id: TableId,
    /// Definitions for each named single-column index on the table.
    pub indexes: Vec<SingleColumnIndexEntryMeta>,
}
325
/// Metadata describing all multi-column UNIQUE indexes for a table.
///
/// The record holds the complete set for the table; writers replace it
/// wholesale (see [`SysCatalog::put_multi_column_uniques`]).
#[derive(Encode, Decode, Clone, Debug, PartialEq, Eq)]
pub struct TableMultiColumnUniqueMeta {
    /// Table identifier these UNIQUE entries belong to.
    pub table_id: TableId,
    /// Definitions of each persisted multi-column UNIQUE.
    pub uniques: Vec<MultiColumnUniqueEntryMeta>,
}
334
335// ----- SysCatalog -----
336
/// Interface to the system catalog (table 0).
///
/// The system catalog stores metadata about all tables and columns in the database.
/// It uses special reserved columns within table 0 to persist [`TableMeta`] and
/// [`ColMeta`] structures.
///
/// The pager type parameter `P` defaults to [`MemPager`] for in-memory use.
///
/// # Lifetime
///
/// `SysCatalog` borrows a reference to the [`ColumnStore`] and does not own it.
/// This allows multiple catalog instances to coexist with the same storage.
pub struct SysCatalog<'a, P = MemPager>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    // Borrowed column store that physically holds catalog table 0.
    store: &'a ColumnStore<P>,
}
353
354impl<'a, P> SysCatalog<'a, P>
355where
356    P: Pager<Blob = EntryHandle> + Send + Sync,
357{
    /// Append explicit null payloads for `row_ids` in `meta_field`.
    ///
    /// This is how the catalog expresses deletion: readers in this module skip
    /// null entries (`is_null`), so overwriting a record with a null payload
    /// retires it without removing the underlying row.
    fn write_null_entries(&self, meta_field: LogicalFieldId, row_ids: &[RowId]) -> LlkvResult<()> {
        if row_ids.is_empty() {
            return Ok(());
        }

        let lfid_val: u64 = meta_field.into();
        // The meta column is declared nullable here (unlike the put_* paths)
        // because every value appended below is null.
        let schema = Arc::new(Schema::new(vec![
            Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
            Field::new("meta", DataType::Binary, true).with_metadata(HashMap::from([(
                crate::constants::FIELD_ID_META_KEY.to_string(),
                lfid_val.to_string(),
            )])),
        ]));

        let row_array = Arc::new(UInt64Array::from(row_ids.to_vec()));
        let mut builder = BinaryBuilder::new();
        for _ in row_ids {
            builder.append_null();
        }
        let meta_array = Arc::new(builder.finish());

        let batch = RecordBatch::try_new(schema, vec![row_array, meta_array])?;
        self.store.append(&batch)?;
        Ok(())
    }
383
    /// Create a new system catalog interface using the provided column store.
    ///
    /// The catalog does not take ownership of the store; it borrows it for the
    /// lifetime of this handle.
    pub fn new(store: &'a ColumnStore<P>) -> Self {
        Self { store }
    }
388
389    /// Insert or update table metadata.
390    ///
391    /// This persists the table's metadata to the system catalog. If metadata for
392    /// this table ID already exists, it is overwritten (last-write-wins).
393    pub fn put_table_meta(&self, meta: &TableMeta) {
394        let lfid_val: u64 = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_TABLE_META_ID).into();
395        let schema = Arc::new(Schema::new(vec![
396            Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
397            Field::new("meta", DataType::Binary, false).with_metadata(HashMap::from([(
398                crate::constants::FIELD_ID_META_KEY.to_string(),
399                lfid_val.to_string(),
400            )])),
401        ]));
402
403        let row_id = Arc::new(UInt64Array::from(vec![rid_table(meta.table_id)]));
404        let meta_encoded = bitcode::encode(meta);
405        let meta_bytes = Arc::new(BinaryArray::from(vec![meta_encoded.as_slice()]));
406
407        let batch = RecordBatch::try_new(schema, vec![row_id, meta_bytes]).unwrap();
408        self.store.append(&batch).unwrap();
409    }
410
411    /// Retrieve table metadata by table ID.
412    ///
413    /// Returns `None` if no metadata exists for the given table ID.
414    pub fn get_table_meta(&self, table_id: TableId) -> Option<TableMeta> {
415        let row_id = rid_table(table_id);
416        let catalog_field = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_TABLE_META_ID);
417        let batch = self
418            .store
419            .gather_rows(&[catalog_field], &[row_id], GatherNullPolicy::IncludeNulls)
420            .ok()?;
421
422        if batch.num_rows() == 0 || batch.num_columns() == 0 {
423            return None;
424        }
425
426        let array = batch
427            .column(0)
428            .as_any()
429            .downcast_ref::<BinaryArray>()
430            .expect("table meta column must be BinaryArray");
431
432        if array.is_null(0) {
433            return None;
434        }
435
436        bitcode::decode(array.value(0)).ok()
437    }
438
439    /// Upsert a single column’s metadata.
440    pub fn put_col_meta(&self, table_id: TableId, meta: &ColMeta) {
441        let lfid_val: u64 = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_COL_META_ID).into();
442        let schema = Arc::new(Schema::new(vec![
443            Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
444            Field::new("meta", DataType::Binary, false).with_metadata(HashMap::from([(
445                crate::constants::FIELD_ID_META_KEY.to_string(),
446                lfid_val.to_string(),
447            )])),
448        ]));
449
450        let rid_value = rid_col(table_id, meta.col_id);
451        let row_id = Arc::new(UInt64Array::from(vec![rid_value]));
452        let meta_encoded = bitcode::encode(meta);
453        let meta_bytes = Arc::new(BinaryArray::from(vec![meta_encoded.as_slice()]));
454
455        let batch = RecordBatch::try_new(schema, vec![row_id, meta_bytes]).unwrap();
456        self.store.append(&batch).unwrap();
457    }
458
459    /// Batch fetch specific column metas by col_id using a shared keyset.
460    pub fn get_cols_meta(&self, table_id: TableId, col_ids: &[u32]) -> Vec<Option<ColMeta>> {
461        if col_ids.is_empty() {
462            return Vec::new();
463        }
464
465        let row_ids: Vec<RowId> = col_ids.iter().map(|&cid| rid_col(table_id, cid)).collect();
466        let catalog_field = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_COL_META_ID);
467
468        let batch =
469            match self
470                .store
471                .gather_rows(&[catalog_field], &row_ids, GatherNullPolicy::IncludeNulls)
472            {
473                Ok(batch) => batch,
474                Err(_) => return vec![None; col_ids.len()],
475            };
476
477        let meta_col = batch
478            .column(0)
479            .as_any()
480            .downcast_ref::<BinaryArray>()
481            .expect("catalog meta column should be Binary");
482
483        col_ids
484            .iter()
485            .enumerate()
486            .map(|(idx, _)| {
487                if meta_col.is_null(idx) {
488                    None
489                } else {
490                    bitcode::decode(meta_col.value(idx)).ok()
491                }
492            })
493            .collect()
494    }
495
496    /// Delete metadata rows for the specified column identifiers.
497    pub fn delete_col_meta(&self, table_id: TableId, col_ids: &[FieldId]) -> LlkvResult<()> {
498        if col_ids.is_empty() {
499            return Ok(());
500        }
501
502        let meta_field = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_COL_META_ID);
503        let row_ids: Vec<RowId> = col_ids
504            .iter()
505            .map(|&col_id| rid_col(table_id, col_id))
506            .collect();
507        self.write_null_entries(meta_field, &row_ids)
508    }
509
510    /// Remove the persisted table metadata record, if present.
511    pub fn delete_table_meta(&self, table_id: TableId) -> LlkvResult<()> {
512        let meta_field = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_TABLE_META_ID);
513        let row_id = rid_table(table_id);
514        self.write_null_entries(meta_field, &[row_id])
515    }
516
517    /// Delete constraint records for the provided identifiers.
518    pub fn delete_constraint_records(
519        &self,
520        table_id: TableId,
521        constraint_ids: &[ConstraintId],
522    ) -> LlkvResult<()> {
523        if constraint_ids.is_empty() {
524            return Ok(());
525        }
526
527        let meta_field = constraint_meta_lfid();
528        let row_ids: Vec<RowId> = constraint_ids
529            .iter()
530            .map(|&constraint_id| encode_constraint_row_id(table_id, constraint_id))
531            .collect();
532        self.write_null_entries(meta_field, &row_ids)
533    }
534
535    /// Delete persisted constraint names for the provided identifiers.
536    pub fn delete_constraint_names(
537        &self,
538        table_id: TableId,
539        constraint_ids: &[ConstraintId],
540    ) -> LlkvResult<()> {
541        if constraint_ids.is_empty() {
542            return Ok(());
543        }
544
545        let lfid = constraint_name_lfid();
546        let row_ids: Vec<RowId> = constraint_ids
547            .iter()
548            .map(|&constraint_id| encode_constraint_row_id(table_id, constraint_id))
549            .collect();
550        self.write_null_entries(lfid, &row_ids)
551    }
552
553    /// Delete the multi-column UNIQUE metadata record for a table, if any.
554    pub fn delete_multi_column_uniques(&self, table_id: TableId) -> LlkvResult<()> {
555        let meta_field = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_MULTI_COLUMN_UNIQUE_META_ID);
556        let row_id = rid_table(table_id);
557        self.write_null_entries(meta_field, &[row_id])
558    }
559
560    /// Delete the single-column index metadata record for a table, if any exists.
561    pub fn delete_single_column_indexes(&self, table_id: TableId) -> LlkvResult<()> {
562        let meta_field = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_SINGLE_COLUMN_INDEX_META_ID);
563        let row_id = rid_table(table_id);
564        self.write_null_entries(meta_field, &[row_id])
565    }
566
567    /// Persist the complete set of multi-column UNIQUE definitions for a table.
568    pub fn put_multi_column_uniques(
569        &self,
570        table_id: TableId,
571        uniques: &[MultiColumnUniqueEntryMeta],
572    ) -> LlkvResult<()> {
573        let lfid_val: u64 =
574            lfid(CATALOG_TABLE_ID, CATALOG_FIELD_MULTI_COLUMN_UNIQUE_META_ID).into();
575        let schema = Arc::new(Schema::new(vec![
576            Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
577            Field::new("meta", DataType::Binary, false).with_metadata(HashMap::from([(
578                crate::constants::FIELD_ID_META_KEY.to_string(),
579                lfid_val.to_string(),
580            )])),
581        ]));
582
583        let row_id = Arc::new(UInt64Array::from(vec![rid_table(table_id)]));
584        let meta = TableMultiColumnUniqueMeta {
585            table_id,
586            uniques: uniques.to_vec(),
587        };
588        let encoded = bitcode::encode(&meta);
589        let meta_bytes = Arc::new(BinaryArray::from(vec![encoded.as_slice()]));
590
591        let batch = RecordBatch::try_new(schema, vec![row_id, meta_bytes])?;
592        self.store.append(&batch)?;
593        Ok(())
594    }
595
596    /// Persist the complete set of single-column index definitions for a table.
597    pub fn put_single_column_indexes(
598        &self,
599        table_id: TableId,
600        indexes: &[SingleColumnIndexEntryMeta],
601    ) -> LlkvResult<()> {
602        let lfid_val: u64 =
603            lfid(CATALOG_TABLE_ID, CATALOG_FIELD_SINGLE_COLUMN_INDEX_META_ID).into();
604        let schema = Arc::new(Schema::new(vec![
605            Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
606            Field::new("meta", DataType::Binary, false).with_metadata(HashMap::from([(
607                crate::constants::FIELD_ID_META_KEY.to_string(),
608                lfid_val.to_string(),
609            )])),
610        ]));
611
612        let row_id = Arc::new(UInt64Array::from(vec![rid_table(table_id)]));
613        let meta = TableSingleColumnIndexMeta {
614            table_id,
615            indexes: indexes.to_vec(),
616        };
617        let encoded = bitcode::encode(&meta);
618        let meta_bytes = Arc::new(BinaryArray::from(vec![encoded.as_slice()]));
619
620        let batch = RecordBatch::try_new(schema, vec![row_id, meta_bytes])?;
621        self.store.append(&batch)?;
622        Ok(())
623    }
624
625    /// Retrieve all persisted multi-column UNIQUE definitions for a table.
626    pub fn get_multi_column_uniques(
627        &self,
628        table_id: TableId,
629    ) -> LlkvResult<Vec<MultiColumnUniqueEntryMeta>> {
630        let lfid = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_MULTI_COLUMN_UNIQUE_META_ID);
631        let row_id = rid_table(table_id);
632        let batch = match self
633            .store
634            .gather_rows(&[lfid], &[row_id], GatherNullPolicy::IncludeNulls)
635        {
636            Ok(batch) => batch,
637            Err(llkv_result::Error::NotFound) => return Ok(Vec::new()),
638            Err(err) => return Err(err),
639        };
640
641        if batch.num_columns() == 0 || batch.num_rows() == 0 {
642            return Ok(Vec::new());
643        }
644
645        let array = batch
646            .column(0)
647            .as_any()
648            .downcast_ref::<BinaryArray>()
649            .ok_or_else(|| {
650                llkv_result::Error::Internal(
651                    "catalog multi-column unique column stored unexpected type".into(),
652                )
653            })?;
654
655        if array.is_null(0) {
656            return Ok(Vec::new());
657        }
658
659        let meta: TableMultiColumnUniqueMeta = bitcode::decode(array.value(0)).map_err(|err| {
660            llkv_result::Error::Internal(format!(
661                "failed to decode multi-column unique metadata: {err}"
662            ))
663        })?;
664
665        Ok(meta.uniques)
666    }
667
668    /// Retrieve all persisted single-column index definitions for a table.
669    pub fn get_single_column_indexes(
670        &self,
671        table_id: TableId,
672    ) -> LlkvResult<Vec<SingleColumnIndexEntryMeta>> {
673        let lfid = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_SINGLE_COLUMN_INDEX_META_ID);
674        let row_id = rid_table(table_id);
675        let batch = match self
676            .store
677            .gather_rows(&[lfid], &[row_id], GatherNullPolicy::IncludeNulls)
678        {
679            Ok(batch) => batch,
680            Err(llkv_result::Error::NotFound) => return Ok(Vec::new()),
681            Err(err) => return Err(err),
682        };
683
684        if batch.num_columns() == 0 || batch.num_rows() == 0 {
685            return Ok(Vec::new());
686        }
687
688        let array = batch
689            .column(0)
690            .as_any()
691            .downcast_ref::<BinaryArray>()
692            .ok_or_else(|| {
693                llkv_result::Error::Internal(
694                    "catalog single-column index column stored unexpected type".into(),
695                )
696            })?;
697
698        if array.is_null(0) {
699            return Ok(Vec::new());
700        }
701
702        let meta: TableSingleColumnIndexMeta = bitcode::decode(array.value(0)).map_err(|err| {
703            llkv_result::Error::Internal(format!(
704                "failed to decode single-column index metadata: {err}"
705            ))
706        })?;
707
708        Ok(meta.indexes)
709    }
710
    /// Retrieve all persisted multi-column UNIQUE definitions across tables.
    ///
    /// Scans the metadata column's row-id shadow field to enumerate every
    /// stored record, then gathers and decodes the payloads. Missing columns
    /// are treated as "no metadata" rather than an error.
    pub fn all_multi_column_unique_metas(&self) -> LlkvResult<Vec<TableMultiColumnUniqueMeta>> {
        let meta_field = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_MULTI_COLUMN_UNIQUE_META_ID);
        // Scan the shadow row-id column to discover which rows exist.
        let row_field = rowid_fid(meta_field);

        // Minimal visitor that records every row id the scan produces.
        struct RowIdCollector {
            row_ids: Vec<RowId>,
        }

        impl PrimitiveVisitor for RowIdCollector {
            fn u64_chunk(&mut self, values: &UInt64Array) {
                for i in 0..values.len() {
                    self.row_ids.push(values.value(i));
                }
            }
        }
        // Default no-op methods suffice for the other visitor shapes.
        impl PrimitiveWithRowIdsVisitor for RowIdCollector {}
        impl PrimitiveSortedVisitor for RowIdCollector {}
        impl PrimitiveSortedWithRowIdsVisitor for RowIdCollector {}

        let mut collector = RowIdCollector {
            row_ids: Vec::new(),
        };
        // A missing column simply means no UNIQUE metadata was ever written.
        match ScanBuilder::new(self.store, row_field)
            .options(ScanOptions::default())
            .run(&mut collector)
        {
            Ok(()) => {}
            Err(llkv_result::Error::NotFound) => return Ok(Vec::new()),
            Err(err) => return Err(err),
        }

        if collector.row_ids.is_empty() {
            return Ok(Vec::new());
        }

        let batch = match self.store.gather_rows(
            &[meta_field],
            &collector.row_ids,
            GatherNullPolicy::IncludeNulls,
        ) {
            Ok(batch) => batch,
            Err(llkv_result::Error::NotFound) => return Ok(Vec::new()),
            Err(err) => return Err(err),
        };

        if batch.num_columns() == 0 {
            return Ok(Vec::new());
        }

        let array = batch
            .column(0)
            .as_any()
            .downcast_ref::<BinaryArray>()
            .ok_or_else(|| {
                llkv_result::Error::Internal(
                    "catalog multi-column unique column stored unexpected type".into(),
                )
            })?;

        let mut metas = Vec::with_capacity(batch.num_rows());
        for idx in 0..batch.num_rows() {
            // Null payloads mark deleted records; skip them.
            if array.is_null(idx) {
                continue;
            }
            let meta: TableMultiColumnUniqueMeta =
                bitcode::decode(array.value(idx)).map_err(|err| {
                    llkv_result::Error::Internal(format!(
                        "failed to decode multi-column unique metadata: {err}"
                    ))
                })?;
            metas.push(meta);
        }

        Ok(metas)
    }
787
788    /// Persist or update multiple constraint records for a table in a single batch.
789    pub fn put_constraint_records(
790        &self,
791        table_id: TableId,
792        records: &[ConstraintRecord],
793    ) -> LlkvResult<()> {
794        if records.is_empty() {
795            return Ok(());
796        }
797
798        let lfid_val: u64 = constraint_meta_lfid().into();
799        let schema = Arc::new(Schema::new(vec![
800            Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
801            Field::new("constraint", DataType::Binary, false).with_metadata(HashMap::from([(
802                crate::constants::FIELD_ID_META_KEY.to_string(),
803                lfid_val.to_string(),
804            )])),
805        ]));
806
807        let row_ids: Vec<RowId> = records
808            .iter()
809            .map(|record| encode_constraint_row_id(table_id, record.constraint_id))
810            .collect();
811
812        let row_ids_array = Arc::new(UInt64Array::from(row_ids));
813        let payload_array = Arc::new(BinaryArray::from_iter_values(
814            records.iter().map(bitcode::encode),
815        ));
816
817        let batch = RecordBatch::try_new(schema, vec![row_ids_array, payload_array])?;
818        self.store.append(&batch)?;
819        Ok(())
820    }
821
822    /// Persist or update constraint names for the specified identifiers.
823    pub fn put_constraint_names(
824        &self,
825        table_id: TableId,
826        names: &[ConstraintNameRecord],
827    ) -> LlkvResult<()> {
828        if names.is_empty() {
829            return Ok(());
830        }
831
832        let lfid_val: u64 = constraint_name_lfid().into();
833        let schema = Arc::new(Schema::new(vec![
834            Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
835            Field::new("constraint_name", DataType::Binary, false).with_metadata(HashMap::from([
836                (
837                    crate::constants::FIELD_ID_META_KEY.to_string(),
838                    lfid_val.to_string(),
839                ),
840            ])),
841        ]));
842
843        let row_ids: Vec<RowId> = names
844            .iter()
845            .map(|record| encode_constraint_row_id(table_id, record.constraint_id))
846            .collect();
847        let row_ids_array = Arc::new(UInt64Array::from(row_ids));
848        let payload_array = Arc::new(BinaryArray::from_iter_values(
849            names.iter().map(bitcode::encode),
850        ));
851
852        let batch = RecordBatch::try_new(schema, vec![row_ids_array, payload_array])?;
853        self.store.append(&batch)?;
854        Ok(())
855    }
856
857    /// Fetch multiple constraint records for a table in a single batch.
858    pub fn get_constraint_records(
859        &self,
860        table_id: TableId,
861        constraint_ids: &[ConstraintId],
862    ) -> LlkvResult<Vec<Option<ConstraintRecord>>> {
863        if constraint_ids.is_empty() {
864            return Ok(Vec::new());
865        }
866
867        let lfid = constraint_meta_lfid();
868        let row_ids: Vec<RowId> = constraint_ids
869            .iter()
870            .map(|&constraint_id| encode_constraint_row_id(table_id, constraint_id))
871            .collect();
872
873        let batch = match self
874            .store
875            .gather_rows(&[lfid], &row_ids, GatherNullPolicy::IncludeNulls)
876        {
877            Ok(batch) => batch,
878            Err(llkv_result::Error::NotFound) => {
879                return Ok(vec![None; constraint_ids.len()]);
880            }
881            Err(err) => return Err(err),
882        };
883
884        if batch.num_columns() == 0 || batch.num_rows() == 0 {
885            return Ok(vec![None; constraint_ids.len()]);
886        }
887
888        let array = batch
889            .column(0)
890            .as_any()
891            .downcast_ref::<BinaryArray>()
892            .ok_or_else(|| {
893                llkv_result::Error::Internal(
894                    "constraint metadata column stored unexpected type".into(),
895                )
896            })?;
897
898        let mut results = Vec::with_capacity(constraint_ids.len());
899        for (idx, &constraint_id) in constraint_ids.iter().enumerate() {
900            if array.is_null(idx) {
901                results.push(None);
902                continue;
903            }
904            let record = decode_constraint_record(array.value(idx))?;
905            if record.constraint_id != constraint_id {
906                return Err(llkv_result::Error::Internal(
907                    "constraint metadata id mismatch".into(),
908                ));
909            }
910            results.push(Some(record));
911        }
912
913        Ok(results)
914    }
915
916    /// Fetch constraint names for a table in a single batch.
917    pub fn get_constraint_names(
918        &self,
919        table_id: TableId,
920        constraint_ids: &[ConstraintId],
921    ) -> LlkvResult<Vec<Option<String>>> {
922        if constraint_ids.is_empty() {
923            return Ok(Vec::new());
924        }
925
926        let lfid = constraint_name_lfid();
927        let row_ids: Vec<RowId> = constraint_ids
928            .iter()
929            .map(|&constraint_id| encode_constraint_row_id(table_id, constraint_id))
930            .collect();
931
932        let batch = match self
933            .store
934            .gather_rows(&[lfid], &row_ids, GatherNullPolicy::IncludeNulls)
935        {
936            Ok(batch) => batch,
937            Err(llkv_result::Error::NotFound) => {
938                return Ok(vec![None; constraint_ids.len()]);
939            }
940            Err(err) => return Err(err),
941        };
942
943        if batch.num_columns() == 0 {
944            return Ok(vec![None; constraint_ids.len()]);
945        }
946
947        let array = batch
948            .column(0)
949            .as_any()
950            .downcast_ref::<BinaryArray>()
951            .ok_or_else(|| {
952                llkv_result::Error::Internal(
953                    "constraint name metadata column stored unexpected type".into(),
954                )
955            })?;
956
957        let mut results = Vec::with_capacity(row_ids.len());
958        for idx in 0..row_ids.len() {
959            if array.is_null(idx) {
960                results.push(None);
961            } else {
962                let record: ConstraintNameRecord =
963                    bitcode::decode(array.value(idx)).map_err(|err| {
964                        llkv_result::Error::Internal(format!(
965                            "failed to decode constraint name metadata: {err}"
966                        ))
967                    })?;
968                results.push(record.name);
969            }
970        }
971
972        Ok(results)
973    }
974
975    /// Stream constraint records for a table in batches.
976    pub fn scan_constraint_records_for_table<F>(
977        &self,
978        table_id: TableId,
979        mut on_batch: F,
980    ) -> LlkvResult<()>
981    where
982        F: FnMut(Vec<ConstraintRecord>),
983    {
984        let row_field = constraint_row_lfid();
985        let mut visitor = ConstraintRowCollector {
986            store: self.store,
987            lfid: constraint_meta_lfid(),
988            table_id,
989            on_batch: &mut on_batch,
990            buffer: Vec::with_capacity(CONSTRAINT_SCAN_CHUNK_SIZE),
991            error: None,
992        };
993
994        match ScanBuilder::new(self.store, row_field)
995            .options(ScanOptions::default())
996            .run(&mut visitor)
997        {
998            Ok(()) => {}
999            Err(llkv_result::Error::NotFound) => return Ok(()),
1000            Err(err) => return Err(err),
1001        }
1002
1003        visitor.finish()
1004    }
1005
1006    /// Load all constraint records for a table into a vector.
1007    pub fn constraint_records_for_table(
1008        &self,
1009        table_id: TableId,
1010    ) -> LlkvResult<Vec<ConstraintRecord>> {
1011        let mut all = Vec::new();
1012        self.scan_constraint_records_for_table(table_id, |mut chunk| {
1013            all.append(&mut chunk);
1014        })?;
1015        Ok(all)
1016    }
1017
1018    pub fn put_next_table_id(&self, next_id: TableId) -> LlkvResult<()> {
1019        let lfid_val: u64 = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_NEXT_TABLE_ID).into();
1020        let schema = Arc::new(Schema::new(vec![
1021            Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
1022            Field::new("next_table_id", DataType::UInt64, false).with_metadata(HashMap::from([(
1023                crate::constants::FIELD_ID_META_KEY.to_string(),
1024                lfid_val.to_string(),
1025            )])),
1026        ]));
1027
1028        let row_id = Arc::new(UInt64Array::from(vec![CATALOG_NEXT_TABLE_ROW_ID]));
1029        let value_array = Arc::new(UInt64Array::from(vec![next_id as u64]));
1030        let batch = RecordBatch::try_new(schema, vec![row_id, value_array])?;
1031        self.store.append(&batch)?;
1032        Ok(())
1033    }
1034
1035    pub fn get_next_table_id(&self) -> LlkvResult<Option<TableId>> {
1036        let lfid = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_NEXT_TABLE_ID);
1037        let batch = match self.store.gather_rows(
1038            &[lfid],
1039            &[CATALOG_NEXT_TABLE_ROW_ID],
1040            GatherNullPolicy::IncludeNulls,
1041        ) {
1042            Ok(batch) => batch,
1043            Err(llkv_result::Error::NotFound) => return Ok(None),
1044            Err(err) => return Err(err),
1045        };
1046
1047        if batch.num_columns() == 0 || batch.num_rows() == 0 {
1048            return Ok(None);
1049        }
1050
1051        let array = batch
1052            .column(0)
1053            .as_any()
1054            .downcast_ref::<UInt64Array>()
1055            .ok_or_else(|| {
1056                llkv_result::Error::Internal(
1057                    "catalog next_table_id column stored unexpected type".into(),
1058                )
1059            })?;
1060        if array.is_empty() || array.is_null(0) {
1061            return Ok(None);
1062        }
1063
1064        let value = array.value(0);
1065        if value > TableId::MAX as u64 {
1066            return Err(llkv_result::Error::InvalidArgumentError(
1067                "persisted next_table_id exceeds TableId range".into(),
1068            ));
1069        }
1070
1071        Ok(Some(value as TableId))
1072    }
1073
1074    pub fn max_table_id(&self) -> LlkvResult<Option<TableId>> {
1075        let meta_field = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_TABLE_META_ID);
1076        let row_field = rowid_fid(meta_field);
1077
1078        let mut collector = MaxRowIdCollector { max: None };
1079        match ScanBuilder::new(self.store, row_field)
1080            .options(ScanOptions::default())
1081            .run(&mut collector)
1082        {
1083            Ok(()) => {}
1084            Err(llkv_result::Error::NotFound) => return Ok(None),
1085            Err(err) => return Err(err),
1086        }
1087
1088        let max_value = match collector.max {
1089            Some(value) => value,
1090            None => return Ok(None),
1091        };
1092
1093        let logical: LogicalFieldId = max_value.into();
1094        Ok(Some(logical.table_id()))
1095    }
1096
1097    /// Scan all table metadata entries from the catalog.
1098    /// Returns a vector of (table_id, TableMeta) pairs for all persisted tables.
1099    ///
1100    /// This method first scans for all row IDs in the table metadata column,
1101    /// then uses gather_rows to retrieve the actual metadata.
1102    pub fn all_table_metas(&self) -> LlkvResult<Vec<(TableId, TableMeta)>> {
1103        let meta_field = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_TABLE_META_ID);
1104        let row_field = rowid_fid(meta_field);
1105
1106        // Collect all row IDs that have table metadata
1107        struct RowIdCollector {
1108            row_ids: Vec<RowId>,
1109        }
1110
1111        impl PrimitiveVisitor for RowIdCollector {
1112            fn u64_chunk(&mut self, values: &UInt64Array) {
1113                for i in 0..values.len() {
1114                    self.row_ids.push(values.value(i));
1115                }
1116            }
1117        }
1118        impl PrimitiveWithRowIdsVisitor for RowIdCollector {}
1119        impl PrimitiveSortedVisitor for RowIdCollector {}
1120        impl PrimitiveSortedWithRowIdsVisitor for RowIdCollector {}
1121
1122        let mut collector = RowIdCollector {
1123            row_ids: Vec::new(),
1124        };
1125        match ScanBuilder::new(self.store, row_field)
1126            .options(ScanOptions::default())
1127            .run(&mut collector)
1128        {
1129            Ok(()) => {}
1130            Err(llkv_result::Error::NotFound) => return Ok(Vec::new()),
1131            Err(err) => return Err(err),
1132        }
1133
1134        if collector.row_ids.is_empty() {
1135            return Ok(Vec::new());
1136        }
1137
1138        // Gather all table metadata using the collected row IDs
1139        let batch = self.store.gather_rows(
1140            &[meta_field],
1141            &collector.row_ids,
1142            GatherNullPolicy::IncludeNulls,
1143        )?;
1144
1145        let meta_col = batch
1146            .column(0)
1147            .as_any()
1148            .downcast_ref::<BinaryArray>()
1149            .ok_or_else(|| {
1150                llkv_result::Error::Internal("catalog table_meta column should be Binary".into())
1151            })?;
1152
1153        let mut result = Vec::new();
1154        for (idx, &row_id) in collector.row_ids.iter().enumerate() {
1155            if !meta_col.is_null(idx) {
1156                let bytes = meta_col.value(idx);
1157                if let Ok(meta) = bitcode::decode::<TableMeta>(bytes) {
1158                    let logical: LogicalFieldId = row_id.into();
1159                    let table_id = logical.table_id();
1160                    result.push((table_id, meta));
1161                }
1162            }
1163        }
1164
1165        Ok(result)
1166    }
1167
1168    /// Persist the next transaction id to the catalog.
1169    pub fn put_next_txn_id(&self, next_txn_id: u64) -> LlkvResult<()> {
1170        let lfid_val: u64 = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_NEXT_TXN_ID).into();
1171        let schema = Arc::new(Schema::new(vec![
1172            Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
1173            Field::new("next_txn_id", DataType::UInt64, false).with_metadata(HashMap::from([(
1174                crate::constants::FIELD_ID_META_KEY.to_string(),
1175                lfid_val.to_string(),
1176            )])),
1177        ]));
1178
1179        let row_id = Arc::new(UInt64Array::from(vec![CATALOG_NEXT_TXN_ROW_ID]));
1180        let value_array = Arc::new(UInt64Array::from(vec![next_txn_id]));
1181        let batch = RecordBatch::try_new(schema, vec![row_id, value_array])?;
1182        self.store.append(&batch)?;
1183        Ok(())
1184    }
1185
1186    /// Load the next transaction id from the catalog.
1187    pub fn get_next_txn_id(&self) -> LlkvResult<Option<u64>> {
1188        let lfid = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_NEXT_TXN_ID);
1189        let batch = match self.store.gather_rows(
1190            &[lfid],
1191            &[CATALOG_NEXT_TXN_ROW_ID],
1192            GatherNullPolicy::IncludeNulls,
1193        ) {
1194            Ok(batch) => batch,
1195            Err(llkv_result::Error::NotFound) => return Ok(None),
1196            Err(err) => return Err(err),
1197        };
1198
1199        if batch.num_columns() == 0 || batch.num_rows() == 0 {
1200            return Ok(None);
1201        }
1202
1203        let array = batch
1204            .column(0)
1205            .as_any()
1206            .downcast_ref::<UInt64Array>()
1207            .ok_or_else(|| {
1208                llkv_result::Error::Internal(
1209                    "catalog next_txn_id column stored unexpected type".into(),
1210                )
1211            })?;
1212        if array.is_empty() || array.is_null(0) {
1213            return Ok(None);
1214        }
1215
1216        let value = array.value(0);
1217        Ok(Some(value))
1218    }
1219
1220    /// Persist the last committed transaction id to the catalog.
1221    pub fn put_last_committed_txn_id(&self, last_committed: u64) -> LlkvResult<()> {
1222        let lfid_val: u64 = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_LAST_COMMITTED_TXN_ID).into();
1223        let schema = Arc::new(Schema::new(vec![
1224            Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
1225            Field::new("last_committed_txn_id", DataType::UInt64, false).with_metadata(
1226                HashMap::from([(
1227                    crate::constants::FIELD_ID_META_KEY.to_string(),
1228                    lfid_val.to_string(),
1229                )]),
1230            ),
1231        ]));
1232
1233        let row_id = Arc::new(UInt64Array::from(vec![CATALOG_LAST_COMMITTED_TXN_ROW_ID]));
1234        let value_array = Arc::new(UInt64Array::from(vec![last_committed]));
1235        let batch = RecordBatch::try_new(schema, vec![row_id, value_array])?;
1236        self.store.append(&batch)?;
1237        Ok(())
1238    }
1239
1240    /// Load the last committed transaction id from the catalog.
1241    pub fn get_last_committed_txn_id(&self) -> LlkvResult<Option<u64>> {
1242        let lfid = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_LAST_COMMITTED_TXN_ID);
1243        let batch = match self.store.gather_rows(
1244            &[lfid],
1245            &[CATALOG_LAST_COMMITTED_TXN_ROW_ID],
1246            GatherNullPolicy::IncludeNulls,
1247        ) {
1248            Ok(batch) => batch,
1249            Err(llkv_result::Error::NotFound) => return Ok(None),
1250            Err(err) => return Err(err),
1251        };
1252
1253        if batch.num_columns() == 0 || batch.num_rows() == 0 {
1254            return Ok(None);
1255        }
1256
1257        let array = batch
1258            .column(0)
1259            .as_any()
1260            .downcast_ref::<UInt64Array>()
1261            .ok_or_else(|| {
1262                llkv_result::Error::Internal(
1263                    "catalog last_committed_txn_id column stored unexpected type".into(),
1264                )
1265            })?;
1266        if array.is_empty() || array.is_null(0) {
1267            return Ok(None);
1268        }
1269
1270        let value = array.value(0);
1271        Ok(Some(value))
1272    }
1273
1274    /// Persist the catalog state to the system catalog.
1275    ///
1276    /// Stores the complete catalog state (all tables and fields) as a binary blob
1277    /// using bitcode serialization.
1278    pub fn put_catalog_state(&self, state: &crate::catalog::TableCatalogState) -> LlkvResult<()> {
1279        let lfid_val: u64 = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_CATALOG_STATE).into();
1280        let schema = Arc::new(Schema::new(vec![
1281            Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
1282            Field::new("catalog_state", DataType::Binary, false).with_metadata(HashMap::from([(
1283                crate::constants::FIELD_ID_META_KEY.to_string(),
1284                lfid_val.to_string(),
1285            )])),
1286        ]));
1287
1288        let row_id = Arc::new(UInt64Array::from(vec![CATALOG_STATE_ROW_ID]));
1289        let encoded = bitcode::encode(state);
1290        let state_bytes = Arc::new(BinaryArray::from(vec![encoded.as_slice()]));
1291
1292        let batch = RecordBatch::try_new(schema, vec![row_id, state_bytes])?;
1293        self.store.append(&batch)?;
1294        Ok(())
1295    }
1296
1297    /// Load the catalog state from the system catalog.
1298    ///
1299    /// Retrieves the complete catalog state including all table and field mappings.
1300    pub fn get_catalog_state(&self) -> LlkvResult<Option<crate::catalog::TableCatalogState>> {
1301        let lfid = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_CATALOG_STATE);
1302        let batch = match self.store.gather_rows(
1303            &[lfid],
1304            &[CATALOG_STATE_ROW_ID],
1305            GatherNullPolicy::IncludeNulls,
1306        ) {
1307            Ok(batch) => batch,
1308            Err(llkv_result::Error::NotFound) => return Ok(None),
1309            Err(err) => return Err(err),
1310        };
1311
1312        if batch.num_columns() == 0 || batch.num_rows() == 0 {
1313            return Ok(None);
1314        }
1315
1316        let array = batch
1317            .column(0)
1318            .as_any()
1319            .downcast_ref::<BinaryArray>()
1320            .ok_or_else(|| {
1321                llkv_result::Error::Internal("catalog state column stored unexpected type".into())
1322            })?;
1323        if array.is_empty() || array.is_null(0) {
1324            return Ok(None);
1325        }
1326
1327        let bytes = array.value(0);
1328        let state = bitcode::decode(bytes).map_err(|e| {
1329            llkv_result::Error::Internal(format!("Failed to decode catalog state: {}", e))
1330        })?;
1331        Ok(Some(state))
1332    }
1333
1334    /// Persist schema metadata to the catalog.
1335    ///
1336    /// Stores schema metadata at a row ID derived from the schema name hash.
1337    /// This allows efficient lookup and prevents collisions.
1338    pub fn put_schema_meta(&self, meta: &SchemaMeta) -> LlkvResult<()> {
1339        let lfid_val: u64 = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_SCHEMA_META_ID).into();
1340        let schema = Arc::new(Schema::new(vec![
1341            Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
1342            Field::new("meta", DataType::Binary, false).with_metadata(HashMap::from([(
1343                crate::constants::FIELD_ID_META_KEY.to_string(),
1344                lfid_val.to_string(),
1345            )])),
1346        ]));
1347
1348        // Use hash of canonical (lowercase) schema name as row ID
1349        let canonical = meta.name.to_ascii_lowercase();
1350        let row_id_val = schema_name_to_row_id(&canonical);
1351        let row_id = Arc::new(UInt64Array::from(vec![row_id_val]));
1352        let meta_encoded = bitcode::encode(meta);
1353        let meta_bytes = Arc::new(BinaryArray::from(vec![meta_encoded.as_slice()]));
1354
1355        let batch = RecordBatch::try_new(schema, vec![row_id, meta_bytes])?;
1356        self.store.append(&batch)?;
1357        Ok(())
1358    }
1359
1360    /// Retrieve schema metadata by name.
1361    ///
1362    /// Returns `None` if the schema does not exist.
1363    pub fn get_schema_meta(&self, schema_name: &str) -> LlkvResult<Option<SchemaMeta>> {
1364        let canonical = schema_name.to_ascii_lowercase();
1365        let row_id = schema_name_to_row_id(&canonical);
1366        let lfid = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_SCHEMA_META_ID);
1367
1368        let batch = match self
1369            .store
1370            .gather_rows(&[lfid], &[row_id], GatherNullPolicy::IncludeNulls)
1371        {
1372            Ok(batch) => batch,
1373            Err(llkv_result::Error::NotFound) => return Ok(None),
1374            Err(err) => return Err(err),
1375        };
1376
1377        if batch.num_columns() == 0 || batch.num_rows() == 0 {
1378            return Ok(None);
1379        }
1380
1381        let array = batch
1382            .column(0)
1383            .as_any()
1384            .downcast_ref::<BinaryArray>()
1385            .ok_or_else(|| {
1386                llkv_result::Error::Internal("catalog schema_meta column should be Binary".into())
1387            })?;
1388
1389        if array.is_empty() || array.is_null(0) {
1390            return Ok(None);
1391        }
1392
1393        let bytes = array.value(0);
1394        let meta = bitcode::decode(bytes).map_err(|e| {
1395            llkv_result::Error::Internal(format!("Failed to decode schema metadata: {}", e))
1396        })?;
1397        Ok(Some(meta))
1398    }
1399
1400    /// Scan all schema metadata entries from the catalog.
1401    ///
1402    /// Returns a vector of all persisted schemas.
1403    pub fn all_schema_metas(&self) -> LlkvResult<Vec<SchemaMeta>> {
1404        let meta_field = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_SCHEMA_META_ID);
1405        let row_field = rowid_fid(meta_field);
1406
1407        // Collect all row IDs that have schema metadata
1408        struct RowIdCollector {
1409            row_ids: Vec<RowId>,
1410        }
1411
1412        impl PrimitiveVisitor for RowIdCollector {
1413            fn u64_chunk(&mut self, values: &UInt64Array) {
1414                for i in 0..values.len() {
1415                    self.row_ids.push(values.value(i));
1416                }
1417            }
1418        }
1419        impl PrimitiveWithRowIdsVisitor for RowIdCollector {}
1420        impl PrimitiveSortedVisitor for RowIdCollector {}
1421        impl PrimitiveSortedWithRowIdsVisitor for RowIdCollector {}
1422
1423        let mut collector = RowIdCollector {
1424            row_ids: Vec::new(),
1425        };
1426        match ScanBuilder::new(self.store, row_field)
1427            .options(ScanOptions::default())
1428            .run(&mut collector)
1429        {
1430            Ok(()) => {}
1431            Err(llkv_result::Error::NotFound) => return Ok(Vec::new()),
1432            Err(err) => return Err(err),
1433        }
1434
1435        if collector.row_ids.is_empty() {
1436            return Ok(Vec::new());
1437        }
1438
1439        // Gather all schema metadata using the collected row IDs
1440        let batch = self.store.gather_rows(
1441            &[meta_field],
1442            &collector.row_ids,
1443            GatherNullPolicy::IncludeNulls,
1444        )?;
1445
1446        let meta_col = batch
1447            .column(0)
1448            .as_any()
1449            .downcast_ref::<BinaryArray>()
1450            .ok_or_else(|| {
1451                llkv_result::Error::Internal("catalog schema_meta column should be Binary".into())
1452            })?;
1453
1454        let mut result = Vec::new();
1455        for idx in 0..collector.row_ids.len() {
1456            if !meta_col.is_null(idx) {
1457                let bytes = meta_col.value(idx);
1458                if let Ok(meta) = bitcode::decode::<SchemaMeta>(bytes) {
1459                    result.push(meta);
1460                }
1461            }
1462        }
1463
1464        Ok(result)
1465    }
1466
1467    /// Persist custom type metadata to the catalog.
1468    ///
1469    /// Stores custom type metadata at a row ID derived from the type name hash.
1470    pub fn put_custom_type_meta(&self, meta: &CustomTypeMeta) -> LlkvResult<()> {
1471        let lfid_val: u64 = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_CUSTOM_TYPE_META_ID).into();
1472        let schema = Arc::new(Schema::new(vec![
1473            Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
1474            Field::new("meta", DataType::Binary, false).with_metadata(HashMap::from([(
1475                crate::constants::FIELD_ID_META_KEY.to_string(),
1476                lfid_val.to_string(),
1477            )])),
1478        ]));
1479
1480        // Use hash of canonical (lowercase) type name as row ID
1481        let canonical = meta.name.to_ascii_lowercase();
1482        let row_id_val = schema_name_to_row_id(&canonical); // Reuse same hash function
1483        let row_id = Arc::new(UInt64Array::from(vec![row_id_val]));
1484        let meta_encoded = bitcode::encode(meta);
1485        let meta_bytes = Arc::new(BinaryArray::from(vec![meta_encoded.as_slice()]));
1486
1487        let batch = RecordBatch::try_new(schema, vec![row_id, meta_bytes])?;
1488        self.store.append(&batch)?;
1489        Ok(())
1490    }
1491
1492    /// Retrieve custom type metadata by name.
1493    ///
1494    /// Returns `None` if the type does not exist.
1495    pub fn get_custom_type_meta(&self, type_name: &str) -> LlkvResult<Option<CustomTypeMeta>> {
1496        let canonical = type_name.to_ascii_lowercase();
1497        let row_id = schema_name_to_row_id(&canonical);
1498        let lfid = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_CUSTOM_TYPE_META_ID);
1499
1500        let batch = match self
1501            .store
1502            .gather_rows(&[lfid], &[row_id], GatherNullPolicy::IncludeNulls)
1503        {
1504            Ok(batch) => batch,
1505            Err(llkv_result::Error::NotFound) => return Ok(None),
1506            Err(err) => return Err(err),
1507        };
1508
1509        let meta_col = batch
1510            .column(0)
1511            .as_any()
1512            .downcast_ref::<BinaryArray>()
1513            .ok_or_else(|| {
1514                llkv_result::Error::Internal(
1515                    "catalog custom_type_meta column should be Binary".into(),
1516                )
1517            })?;
1518
1519        if meta_col.is_null(0) {
1520            return Ok(None);
1521        }
1522
1523        let bytes = meta_col.value(0);
1524        let meta = bitcode::decode(bytes).map_err(|err| {
1525            llkv_result::Error::Internal(format!("failed to decode custom type metadata: {err}"))
1526        })?;
1527        Ok(Some(meta))
1528    }
1529
1530    /// Delete custom type metadata by name.
1531    ///
1532    /// Returns `Ok(())` regardless of whether the type existed.
1533    pub fn delete_custom_type_meta(&self, type_name: &str) -> LlkvResult<()> {
1534        let canonical = type_name.to_ascii_lowercase();
1535        let row_id = schema_name_to_row_id(&canonical);
1536        let lfid = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_CUSTOM_TYPE_META_ID);
1537
1538        // Delete by writing null value
1539        let lfid_val: u64 = lfid.into();
1540        let schema = Arc::new(Schema::new(vec![
1541            Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
1542            Field::new("meta", DataType::Binary, true).with_metadata(HashMap::from([(
1543                crate::constants::FIELD_ID_META_KEY.to_string(),
1544                lfid_val.to_string(),
1545            )])),
1546        ]));
1547
1548        let row_id_arr = Arc::new(UInt64Array::from(vec![row_id]));
1549        let mut meta_builder = BinaryBuilder::new();
1550        meta_builder.append_null();
1551        let meta_arr = Arc::new(meta_builder.finish());
1552
1553        let batch = RecordBatch::try_new(schema, vec![row_id_arr, meta_arr])?;
1554        self.store.append(&batch)?;
1555        Ok(())
1556    }
1557
1558    /// Retrieve all custom type metadata.
1559    ///
1560    /// Returns all persisted custom types.
1561    pub fn all_custom_type_metas(&self) -> LlkvResult<Vec<CustomTypeMeta>> {
1562        let meta_field = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_CUSTOM_TYPE_META_ID);
1563        let row_field = rowid_fid(meta_field);
1564
1565        // Collect all row IDs that have custom type metadata
1566        struct RowIdCollector {
1567            row_ids: Vec<RowId>,
1568        }
1569
1570        impl PrimitiveVisitor for RowIdCollector {
1571            fn u64_chunk(&mut self, values: &UInt64Array) {
1572                for i in 0..values.len() {
1573                    self.row_ids.push(values.value(i));
1574                }
1575            }
1576        }
1577        impl PrimitiveWithRowIdsVisitor for RowIdCollector {}
1578        impl PrimitiveSortedVisitor for RowIdCollector {}
1579        impl PrimitiveSortedWithRowIdsVisitor for RowIdCollector {}
1580
1581        let mut collector = RowIdCollector {
1582            row_ids: Vec::new(),
1583        };
1584        match ScanBuilder::new(self.store, row_field)
1585            .options(ScanOptions::default())
1586            .run(&mut collector)
1587        {
1588            Ok(()) => {}
1589            Err(llkv_result::Error::NotFound) => return Ok(Vec::new()),
1590            Err(err) => return Err(err),
1591        }
1592
1593        if collector.row_ids.is_empty() {
1594            return Ok(Vec::new());
1595        }
1596
1597        // Gather all custom type metadata using the collected row IDs
1598        let batch = self.store.gather_rows(
1599            &[meta_field],
1600            &collector.row_ids,
1601            GatherNullPolicy::IncludeNulls,
1602        )?;
1603
1604        let meta_col = batch
1605            .column(0)
1606            .as_any()
1607            .downcast_ref::<BinaryArray>()
1608            .ok_or_else(|| {
1609                llkv_result::Error::Internal(
1610                    "catalog custom_type_meta column should be Binary".into(),
1611                )
1612            })?;
1613
1614        let mut result = Vec::new();
1615        for idx in 0..collector.row_ids.len() {
1616            if !meta_col.is_null(idx) {
1617                let bytes = meta_col.value(idx);
1618                if let Ok(meta) = bitcode::decode::<CustomTypeMeta>(bytes) {
1619                    result.push(meta);
1620                }
1621            }
1622        }
1623
1624        Ok(result)
1625    }
1626}
1627
1628/// Generate a row ID for schema metadata based on schema name.
1629///
1630/// Uses a simple hash to map schema names to row IDs. This is deterministic
1631/// and allows direct lookup without scanning.
1632fn schema_name_to_row_id(canonical_name: &str) -> RowId {
1633    // Use a simple 64-bit FNV-1a hash for deterministic IDs across platforms and releases
1634    const FNV_OFFSET: u64 = 0xcbf2_9ce4_8422_2325;
1635    const FNV_PRIME: u64 = 0x1000_0000_01b3;
1636
1637    let mut hash = FNV_OFFSET;
1638    for byte in canonical_name.as_bytes() {
1639        hash ^= u64::from(*byte);
1640        hash = hash.wrapping_mul(FNV_PRIME);
1641    }
1642
1643    // Use high bit to avoid collision with reserved catalog row IDs (0-3) and table metadata rows
1644    hash | (1u64 << 63)
1645}
1646
1647struct MaxRowIdCollector {
1648    max: Option<RowId>,
1649}
1650
1651impl PrimitiveVisitor for MaxRowIdCollector {
1652    fn u64_chunk(&mut self, values: &UInt64Array) {
1653        for i in 0..values.len() {
1654            let value = values.value(i);
1655            self.max = match self.max {
1656                Some(curr) if curr >= value => Some(curr),
1657                _ => Some(value),
1658            };
1659        }
1660    }
1661}
1662
1663impl PrimitiveWithRowIdsVisitor for MaxRowIdCollector {}
1664impl PrimitiveSortedVisitor for MaxRowIdCollector {}
1665impl PrimitiveSortedWithRowIdsVisitor for MaxRowIdCollector {}
1666
1667#[cfg(test)]
1668mod tests {
1669    use super::*;
1670    use crate::constraints::{
1671        ConstraintKind, ConstraintState, PrimaryKeyConstraint, UniqueConstraint,
1672    };
1673    use llkv_column_map::ColumnStore;
1674    use std::sync::Arc;
1675
1676    #[test]
1677    fn constraint_records_roundtrip() {
1678        let pager = Arc::new(MemPager::default());
1679        let store = ColumnStore::open(Arc::clone(&pager)).unwrap();
1680        let catalog = SysCatalog::new(&store);
1681
1682        let table_id: TableId = 42;
1683        let record1 = ConstraintRecord {
1684            constraint_id: 1,
1685            kind: ConstraintKind::PrimaryKey(PrimaryKeyConstraint {
1686                field_ids: vec![1, 2],
1687            }),
1688            state: ConstraintState::Active,
1689            revision: 1,
1690            last_modified_micros: 100,
1691        };
1692        let record2 = ConstraintRecord {
1693            constraint_id: 2,
1694            kind: ConstraintKind::Unique(UniqueConstraint { field_ids: vec![3] }),
1695            state: ConstraintState::Active,
1696            revision: 2,
1697            last_modified_micros: 200,
1698        };
1699        catalog
1700            .put_constraint_records(table_id, &[record1.clone(), record2.clone()])
1701            .unwrap();
1702
1703        let other_table_record = ConstraintRecord {
1704            constraint_id: 1,
1705            kind: ConstraintKind::Unique(UniqueConstraint { field_ids: vec![5] }),
1706            state: ConstraintState::Active,
1707            revision: 1,
1708            last_modified_micros: 150,
1709        };
1710        catalog
1711            .put_constraint_records(7, &[other_table_record])
1712            .unwrap();
1713
1714        let mut fetched = catalog.constraint_records_for_table(table_id).unwrap();
1715        fetched.sort_by_key(|record| record.constraint_id);
1716
1717        assert_eq!(fetched.len(), 2);
1718        assert_eq!(fetched[0], record1);
1719        assert_eq!(fetched[1], record2);
1720
1721        let single = catalog
1722            .get_constraint_records(table_id, &[record1.constraint_id])
1723            .unwrap();
1724        assert_eq!(single.len(), 1);
1725        assert_eq!(single[0].as_ref(), Some(&record1));
1726
1727        let missing = catalog.get_constraint_records(table_id, &[999]).unwrap();
1728        assert_eq!(missing.len(), 1);
1729        assert!(missing[0].is_none());
1730    }
1731}