llkv_table/
sys_catalog.rs

1//! System catalog for storing table and column metadata.
2//!
3//! The system catalog uses table 0 (reserved) to store metadata about all tables
4//! and columns in the database. This metadata includes:
5//!
6//! - **Table metadata** ([`TableMeta`]): Table ID, name, creation time, flags
7//! - **Column metadata** ([`ColMeta`]): Column ID, name, flags, default values
8//! - **Multi-column index metadata** ([`TableMultiColumnIndexMeta`])
9//!
10//! # Storage Format
11//!
12//! The catalog stores metadata as serialized [`bitcode`] blobs in special catalog
13//! columns within table 0. See [`CATALOG_TABLE_ID`] and related constants in the
14//! [`reserved`](crate::reserved) module.
15//!
16//! # Usage
17//!
18//! The [`SysCatalog`] provides methods to:
19//! - Insert/update table metadata ([`put_table_meta`](SysCatalog::put_table_meta))
20//! - Query table metadata ([`get_table_meta`](SysCatalog::get_table_meta))
21//! - Manage column metadata similarly
22//!
23//! This metadata is used by higher-level components to validate schemas, assign
24//! field IDs, and enforce table constraints.
25
26use std::collections::HashMap;
27use std::mem;
28use std::sync::Arc;
29
30use arrow::array::{Array, BinaryArray, BinaryBuilder, UInt64Array};
31use arrow::datatypes::{DataType, Field, Schema};
32use arrow::record_batch::RecordBatch;
33use bitcode::{Decode, Encode};
34
35use crate::constants::CONSTRAINT_SCAN_CHUNK_SIZE;
36use crate::constraints::{
37    ConstraintId, ConstraintRecord, decode_constraint_row_id, encode_constraint_row_id,
38};
39use crate::types::{FieldId, ROW_ID_FIELD_ID, RowId, TableId};
40use llkv_column_map::store::scan::{
41    PrimitiveSortedVisitor, PrimitiveSortedWithRowIdsVisitor, PrimitiveVisitor,
42    PrimitiveWithRowIdsVisitor, ScanBuilder, ScanOptions,
43};
44
45use llkv_column_map::types::LogicalFieldId;
46use llkv_column_map::{
47    ColumnStore,
48    store::{GatherNullPolicy, ROW_ID_COLUMN_NAME, rowid_fid},
49};
50use llkv_result::{self, Result as LlkvResult};
51use llkv_storage::pager::{MemPager, Pager};
52use simd_r_drive_entry_handle::EntryHandle;
53
54// Import all reserved constants and validation functions
55use crate::reserved::*;
56
57// ----- Namespacing helpers -----
58
// TODO: Dedupe with llkv_column_map::types::lfid()
/// Build the [`LogicalFieldId`] addressing user column `col_id` of `table_id`.
#[inline]
fn lfid(table_id: TableId, col_id: u32) -> LogicalFieldId {
    LogicalFieldId::for_user(table_id, col_id)
}

// TODO: Migrate to llkv_column_map::types::rid_table()
/// Row id under which a table's catalog metadata is stored: the table's
/// row-id field id reinterpreted as a `u64` key into the catalog columns.
#[inline]
fn rid_table(table_id: TableId) -> u64 {
    LogicalFieldId::for_user(table_id, ROW_ID_FIELD_ID).into()
}

// TODO: Migrate to llkv_column_map::types::rid_col()
/// Row id under which a column's catalog metadata is stored, derived from
/// the column's row-id shadow field.
#[inline]
fn rid_col(table_id: TableId, col_id: u32) -> u64 {
    rowid_fid(lfid(table_id, col_id)).into()
}

/// Catalog field holding serialized [`ConstraintRecord`] blobs.
#[inline]
fn constraint_meta_lfid() -> LogicalFieldId {
    lfid(CATALOG_TABLE_ID, CATALOG_FIELD_CONSTRAINT_META_ID)
}

/// Catalog field holding serialized [`ConstraintNameRecord`] blobs.
#[inline]
fn constraint_name_lfid() -> LogicalFieldId {
    lfid(CATALOG_TABLE_ID, CATALOG_FIELD_CONSTRAINT_NAME_ID)
}

/// Row-id shadow field paired with [`constraint_meta_lfid`].
#[inline]
fn constraint_row_lfid() -> LogicalFieldId {
    rowid_fid(constraint_meta_lfid())
}
91
/// Associates an optional human-readable name with a constraint.
///
/// Serialized with [`bitcode`] into the catalog's constraint-name column,
/// keyed by the same encoded `(table_id, constraint_id)` row id as the
/// constraint record itself (see `put_constraint_names`).
// NOTE(review): bitcode derives make field order part of the on-disk format —
// confirm compatibility before reordering or removing fields.
#[derive(Clone, Debug, Encode, Decode)]
pub struct ConstraintNameRecord {
    /// Identifier of the constraint this name belongs to.
    pub constraint_id: ConstraintId,
    /// Optional display name (`None` for unnamed constraints).
    pub name: Option<String>,
}
97
98fn decode_constraint_record(bytes: &[u8]) -> LlkvResult<ConstraintRecord> {
99    bitcode::decode(bytes).map_err(|err| {
100        llkv_result::Error::Internal(format!("failed to decode constraint metadata: {err}"))
101    })
102}
103
/// Streaming scan visitor that collects constraint row ids belonging to one
/// table and decodes their payloads into batches of [`ConstraintRecord`]s.
struct ConstraintRowCollector<'a, P, F>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
    F: FnMut(Vec<ConstraintRecord>),
{
    // Store used to gather the serialized constraint payloads.
    store: &'a ColumnStore<P>,
    // Field holding the constraint metadata blobs.
    lfid: LogicalFieldId,
    // Only rows whose encoded row id decodes to this table are kept.
    table_id: TableId,
    // Callback invoked with each decoded batch of records.
    on_batch: &'a mut F,
    // Row ids accumulated until CONSTRAINT_SCAN_CHUNK_SIZE triggers a flush.
    buffer: Vec<RowId>,
    // First error hit inside a chunk callback; surfaced later by `finish`
    // because the visitor methods cannot return errors.
    error: Option<llkv_result::Error>,
}
116
117impl<'a, P, F> ConstraintRowCollector<'a, P, F>
118where
119    P: Pager<Blob = EntryHandle> + Send + Sync,
120    F: FnMut(Vec<ConstraintRecord>),
121{
122    fn flush_buffer(&mut self) -> LlkvResult<()> {
123        if self.buffer.is_empty() {
124            return Ok(());
125        }
126
127        let row_ids = mem::take(&mut self.buffer);
128        let batch =
129            self.store
130                .gather_rows(&[self.lfid], &row_ids, GatherNullPolicy::IncludeNulls)?;
131
132        if batch.num_columns() == 0 {
133            return Ok(());
134        }
135
136        let array = batch
137            .column(0)
138            .as_any()
139            .downcast_ref::<BinaryArray>()
140            .ok_or_else(|| {
141                llkv_result::Error::Internal(
142                    "constraint metadata column stored unexpected type".into(),
143                )
144            })?;
145
146        let mut records = Vec::with_capacity(row_ids.len());
147        for (idx, row_id) in row_ids.into_iter().enumerate() {
148            if array.is_null(idx) {
149                continue;
150            }
151
152            let record = decode_constraint_record(array.value(idx))?;
153            let (table_from_id, constraint_id) = decode_constraint_row_id(row_id);
154            if table_from_id != self.table_id {
155                continue;
156            }
157            if record.constraint_id != constraint_id {
158                return Err(llkv_result::Error::Internal(
159                    "constraint metadata id mismatch".into(),
160                ));
161            }
162            records.push(record);
163        }
164
165        if !records.is_empty() {
166            (self.on_batch)(records);
167        }
168
169        Ok(())
170    }
171
172    fn finish(&mut self) -> LlkvResult<()> {
173        if let Some(err) = self.error.take() {
174            return Err(err);
175        }
176        self.flush_buffer()
177    }
178}
179
180impl<'a, P, F> PrimitiveVisitor for ConstraintRowCollector<'a, P, F>
181where
182    P: Pager<Blob = EntryHandle> + Send + Sync,
183    F: FnMut(Vec<ConstraintRecord>),
184{
185    fn u64_chunk(&mut self, values: &UInt64Array) {
186        if self.error.is_some() {
187            return;
188        }
189
190        for idx in 0..values.len() {
191            let row_id = values.value(idx);
192            let (table_id, _) = decode_constraint_row_id(row_id);
193            if table_id != self.table_id {
194                continue;
195            }
196            self.buffer.push(row_id);
197            if self.buffer.len() >= CONSTRAINT_SCAN_CHUNK_SIZE
198                && let Err(err) = self.flush_buffer()
199            {
200                self.error = Some(err);
201                return;
202            }
203        }
204    }
205}
206
// The scan machinery is bounded over all four primitive-visitor traits (see
// the `ScanBuilder` usage in `all_multi_column_index_metas` below); this
// collector only does real work in `u64_chunk`, so the remaining traits are
// satisfied with their default no-op methods.
impl<'a, P, F> PrimitiveWithRowIdsVisitor for ConstraintRowCollector<'a, P, F>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
    F: FnMut(Vec<ConstraintRecord>),
{
}

impl<'a, P, F> PrimitiveSortedVisitor for ConstraintRowCollector<'a, P, F>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
    F: FnMut(Vec<ConstraintRecord>),
{
}

impl<'a, P, F> PrimitiveSortedWithRowIdsVisitor for ConstraintRowCollector<'a, P, F>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
    F: FnMut(Vec<ConstraintRecord>),
{
}
227
228// ----- Public catalog types -----
229
// NOTE(review): the structs below are persisted with `bitcode` derives, so the
// field set and declaration order are effectively part of the on-disk format —
// confirm backward compatibility before reordering, removing, or retyping fields.

/// Metadata about a table.
///
/// Stored in the system catalog (table 0) and serialized using [`bitcode`].
#[derive(Encode, Decode, Clone, Debug, PartialEq, Eq)]
pub struct TableMeta {
    /// Unique identifier for this table.
    pub table_id: TableId,
    /// Optional human-readable name for the table.
    pub name: Option<String>,
    /// When the table was created (microseconds since epoch).
    pub created_at_micros: u64,
    /// Bitflags for table properties (e.g., temporary, system).
    pub flags: u32,
    /// Schema version or modification counter.
    pub epoch: u64,
    /// If this is a view, contains the SQL definition (SELECT statement).
    /// If None, this is a regular table.
    pub view_definition: Option<String>,
}

/// Metadata about a column.
///
/// Stored in the system catalog (table 0) and serialized using [`bitcode`].
#[derive(Encode, Decode, Clone, Debug, PartialEq, Eq)]
pub struct ColMeta {
    /// Unique identifier for this column within its table.
    pub col_id: u32,
    /// Optional human-readable name for the column.
    pub name: Option<String>,
    /// Bitflags for column properties (e.g., nullable, indexed).
    pub flags: u32,
    /// Optional serialized default value for the column.
    /// NOTE(review): encoding of the default is not defined here — see callers.
    pub default: Option<Vec<u8>>,
}

/// Metadata about a schema.
///
/// Stored in the system catalog (table 0) and serialized using [`bitcode`].
#[derive(Encode, Decode, Clone, Debug, PartialEq, Eq)]
pub struct SchemaMeta {
    /// Human-readable schema name (case-preserved).
    pub name: String,
    /// When the schema was created (microseconds since epoch).
    pub created_at_micros: u64,
    /// Bitflags for schema properties (reserved for future use).
    pub flags: u32,
}

/// Metadata about a custom type (CREATE TYPE/DOMAIN).
///
/// Stored in the system catalog (table 0) and serialized using [`bitcode`].
/// Represents type aliases like `CREATE TYPE custom_type AS integer`.
#[derive(Encode, Decode, Clone, Debug, PartialEq, Eq)]
pub struct CustomTypeMeta {
    /// Human-readable type name (case-preserved, stored lowercase in catalog).
    pub name: String,
    /// SQL representation of the base data type (e.g., "INTEGER", "VARCHAR(100)").
    /// Stored as string to avoid Arrow DataType serialization complexity.
    pub base_type_sql: String,
    /// When the type was created (microseconds since epoch).
    pub created_at_micros: u64,
}
292
// NOTE(review): like the table/column metas above, these index metas are
// bitcode-encoded; field order is part of the persisted format.

/// Metadata describing a single multi-column index (unique or non-unique).
///
/// Used to track both named CREATE INDEX statements and UNIQUE constraints over multiple columns.
#[derive(Encode, Decode, Clone, Debug, PartialEq, Eq)]
pub struct MultiColumnIndexEntryMeta {
    /// Optional human-readable index name (None for unnamed UNIQUE constraints).
    pub index_name: Option<String>,
    /// Normalized lowercase name used as map key.
    pub canonical_name: String,
    /// Field IDs participating in this index.
    pub column_ids: Vec<FieldId>,
    /// Whether this index enforces uniqueness.
    pub unique: bool,
}

/// Metadata describing a single named single-column index.
#[derive(Encode, Decode, Clone, Debug, PartialEq, Eq)]
pub struct SingleColumnIndexEntryMeta {
    /// Human-readable index name (case preserved).
    pub index_name: String,
    /// Lower-cased canonical index name for case-insensitive lookups.
    pub canonical_name: String,
    /// Identifier of the column the index covers.
    pub column_id: FieldId,
    /// Human-readable column name (case preserved).
    pub column_name: String,
    /// Whether this index enforces uniqueness for the column.
    pub unique: bool,
    /// Whether the index is sorted in ascending order (true) or descending (false).
    pub ascending: bool,
    /// Whether NULL values appear first (true) or last (false) in the index.
    pub nulls_first: bool,
}

/// Metadata describing all single-column indexes registered for a table.
/// Persisted as one blob per table, keyed by the table's row id.
#[derive(Encode, Decode, Clone, Debug, PartialEq, Eq)]
pub struct TableSingleColumnIndexMeta {
    /// Owning table identifier.
    pub table_id: TableId,
    /// Definitions for each named single-column index on the table.
    pub indexes: Vec<SingleColumnIndexEntryMeta>,
}

/// Metadata describing all multi-column indexes (unique and non-unique) for a table.
/// Persisted as one blob per table, keyed by the table's row id.
#[derive(Encode, Decode, Clone, Debug, PartialEq, Eq)]
pub struct TableMultiColumnIndexMeta {
    /// Table identifier these indexes belong to.
    pub table_id: TableId,
    /// Definitions of each persisted multi-column index.
    pub indexes: Vec<MultiColumnIndexEntryMeta>,
}
344
345// ----- SysCatalog -----
346
/// Interface to the system catalog (table 0).
///
/// The system catalog stores metadata about all tables and columns in the database.
/// It uses special reserved columns within table 0 to persist [`TableMeta`] and
/// [`ColMeta`] structures.
///
/// The pager type parameter `P` defaults to [`MemPager`].
///
/// # Lifetime
///
/// `SysCatalog` borrows a reference to the [`ColumnStore`] and does not own it.
/// This allows multiple catalog instances to coexist with the same storage.
pub struct SysCatalog<'a, P = MemPager>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    // Backing column store that holds table 0's catalog columns.
    store: &'a ColumnStore<P>,
}
363
364impl<'a, P> SysCatalog<'a, P>
365where
366    P: Pager<Blob = EntryHandle> + Send + Sync,
367{
368    fn write_null_entries(&self, meta_field: LogicalFieldId, row_ids: &[RowId]) -> LlkvResult<()> {
369        if row_ids.is_empty() {
370            return Ok(());
371        }
372
373        let lfid_val: u64 = meta_field.into();
374        let schema = Arc::new(Schema::new(vec![
375            Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
376            Field::new("meta", DataType::Binary, true).with_metadata(HashMap::from([(
377                crate::constants::FIELD_ID_META_KEY.to_string(),
378                lfid_val.to_string(),
379            )])),
380        ]));
381
382        let row_array = Arc::new(UInt64Array::from(row_ids.to_vec()));
383        let mut builder = BinaryBuilder::new();
384        for _ in row_ids {
385            builder.append_null();
386        }
387        let meta_array = Arc::new(builder.finish());
388
389        let batch = RecordBatch::try_new(schema, vec![row_array, meta_array])?;
390        self.store.append(&batch)?;
391        Ok(())
392    }
393
    /// Create a new system catalog interface using the provided column store.
    ///
    /// Cheap: the catalog holds only the borrowed store reference, no state.
    pub fn new(store: &'a ColumnStore<P>) -> Self {
        Self { store }
    }
398
399    /// Insert or update table metadata.
400    ///
401    /// This persists the table's metadata to the system catalog. If metadata for
402    /// this table ID already exists, it is overwritten (last-write-wins).
403    pub fn put_table_meta(&self, meta: &TableMeta) {
404        let lfid_val: u64 = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_TABLE_META_ID).into();
405        let schema = Arc::new(Schema::new(vec![
406            Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
407            Field::new("meta", DataType::Binary, false).with_metadata(HashMap::from([(
408                crate::constants::FIELD_ID_META_KEY.to_string(),
409                lfid_val.to_string(),
410            )])),
411        ]));
412
413        let row_id = Arc::new(UInt64Array::from(vec![rid_table(meta.table_id)]));
414        let meta_encoded = bitcode::encode(meta);
415        let meta_bytes = Arc::new(BinaryArray::from(vec![meta_encoded.as_slice()]));
416
417        let batch = RecordBatch::try_new(schema, vec![row_id, meta_bytes]).unwrap();
418        self.store.append(&batch).unwrap();
419    }
420
421    /// Retrieve table metadata by table ID.
422    ///
423    /// Returns `None` if no metadata exists for the given table ID.
424    pub fn get_table_meta(&self, table_id: TableId) -> Option<TableMeta> {
425        let row_id = rid_table(table_id);
426        let catalog_field = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_TABLE_META_ID);
427        let batch = self
428            .store
429            .gather_rows(&[catalog_field], &[row_id], GatherNullPolicy::IncludeNulls)
430            .ok()?;
431
432        if batch.num_rows() == 0 || batch.num_columns() == 0 {
433            return None;
434        }
435
436        let array = batch
437            .column(0)
438            .as_any()
439            .downcast_ref::<BinaryArray>()
440            .expect("table meta column must be BinaryArray");
441
442        if array.is_null(0) {
443            return None;
444        }
445
446        bitcode::decode(array.value(0)).ok()
447    }
448
449    /// Upsert a single column’s metadata.
450    pub fn put_col_meta(&self, table_id: TableId, meta: &ColMeta) {
451        let lfid_val: u64 = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_COL_META_ID).into();
452        let schema = Arc::new(Schema::new(vec![
453            Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
454            Field::new("meta", DataType::Binary, false).with_metadata(HashMap::from([(
455                crate::constants::FIELD_ID_META_KEY.to_string(),
456                lfid_val.to_string(),
457            )])),
458        ]));
459
460        let rid_value = rid_col(table_id, meta.col_id);
461        let row_id = Arc::new(UInt64Array::from(vec![rid_value]));
462        let meta_encoded = bitcode::encode(meta);
463        let meta_bytes = Arc::new(BinaryArray::from(vec![meta_encoded.as_slice()]));
464
465        let batch = RecordBatch::try_new(schema, vec![row_id, meta_bytes]).unwrap();
466        self.store.append(&batch).unwrap();
467    }
468
469    /// Batch fetch specific column metas by col_id using a shared keyset.
470    pub fn get_cols_meta(&self, table_id: TableId, col_ids: &[u32]) -> Vec<Option<ColMeta>> {
471        if col_ids.is_empty() {
472            return Vec::new();
473        }
474
475        let row_ids: Vec<RowId> = col_ids.iter().map(|&cid| rid_col(table_id, cid)).collect();
476        let catalog_field = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_COL_META_ID);
477
478        let batch =
479            match self
480                .store
481                .gather_rows(&[catalog_field], &row_ids, GatherNullPolicy::IncludeNulls)
482            {
483                Ok(batch) => batch,
484                Err(_) => return vec![None; col_ids.len()],
485            };
486
487        let meta_col = batch
488            .column(0)
489            .as_any()
490            .downcast_ref::<BinaryArray>()
491            .expect("catalog meta column should be Binary");
492
493        col_ids
494            .iter()
495            .enumerate()
496            .map(|(idx, _)| {
497                if meta_col.is_null(idx) {
498                    None
499                } else {
500                    bitcode::decode(meta_col.value(idx)).ok()
501                }
502            })
503            .collect()
504    }
505
506    /// Delete metadata rows for the specified column identifiers.
507    pub fn delete_col_meta(&self, table_id: TableId, col_ids: &[FieldId]) -> LlkvResult<()> {
508        if col_ids.is_empty() {
509            return Ok(());
510        }
511
512        let meta_field = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_COL_META_ID);
513        let row_ids: Vec<RowId> = col_ids
514            .iter()
515            .map(|&col_id| rid_col(table_id, col_id))
516            .collect();
517        self.write_null_entries(meta_field, &row_ids)
518    }
519
520    /// Remove the persisted table metadata record, if present.
521    pub fn delete_table_meta(&self, table_id: TableId) -> LlkvResult<()> {
522        let meta_field = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_TABLE_META_ID);
523        let row_id = rid_table(table_id);
524        self.write_null_entries(meta_field, &[row_id])
525    }
526
527    /// Delete constraint records for the provided identifiers.
528    pub fn delete_constraint_records(
529        &self,
530        table_id: TableId,
531        constraint_ids: &[ConstraintId],
532    ) -> LlkvResult<()> {
533        if constraint_ids.is_empty() {
534            return Ok(());
535        }
536
537        let meta_field = constraint_meta_lfid();
538        let row_ids: Vec<RowId> = constraint_ids
539            .iter()
540            .map(|&constraint_id| encode_constraint_row_id(table_id, constraint_id))
541            .collect();
542        self.write_null_entries(meta_field, &row_ids)
543    }
544
545    /// Delete persisted constraint names for the provided identifiers.
546    pub fn delete_constraint_names(
547        &self,
548        table_id: TableId,
549        constraint_ids: &[ConstraintId],
550    ) -> LlkvResult<()> {
551        if constraint_ids.is_empty() {
552            return Ok(());
553        }
554
555        let lfid = constraint_name_lfid();
556        let row_ids: Vec<RowId> = constraint_ids
557            .iter()
558            .map(|&constraint_id| encode_constraint_row_id(table_id, constraint_id))
559            .collect();
560        self.write_null_entries(lfid, &row_ids)
561    }
562
563    /// Delete the multi-column index metadata record for a table, if any.
564    pub fn delete_multi_column_indexes(&self, table_id: TableId) -> LlkvResult<()> {
565        let meta_field = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_MULTI_COLUMN_UNIQUE_META_ID);
566        let row_id = rid_table(table_id);
567        self.write_null_entries(meta_field, &[row_id])
568    }
569
570    /// Delete the single-column index metadata record for a table, if any exists.
571    pub fn delete_single_column_indexes(&self, table_id: TableId) -> LlkvResult<()> {
572        let meta_field = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_SINGLE_COLUMN_INDEX_META_ID);
573        let row_id = rid_table(table_id);
574        self.write_null_entries(meta_field, &[row_id])
575    }
576
577    /// Persist the complete set of multi-column index definitions for a table.
578    pub fn put_multi_column_indexes(
579        &self,
580        table_id: TableId,
581        indexes: &[MultiColumnIndexEntryMeta],
582    ) -> LlkvResult<()> {
583        let lfid_val: u64 =
584            lfid(CATALOG_TABLE_ID, CATALOG_FIELD_MULTI_COLUMN_UNIQUE_META_ID).into();
585        let schema = Arc::new(Schema::new(vec![
586            Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
587            Field::new("meta", DataType::Binary, false).with_metadata(HashMap::from([(
588                crate::constants::FIELD_ID_META_KEY.to_string(),
589                lfid_val.to_string(),
590            )])),
591        ]));
592
593        let row_id = Arc::new(UInt64Array::from(vec![rid_table(table_id)]));
594        let meta = TableMultiColumnIndexMeta {
595            table_id,
596            indexes: indexes.to_vec(),
597        };
598        let encoded = bitcode::encode(&meta);
599        let meta_bytes = Arc::new(BinaryArray::from(vec![encoded.as_slice()]));
600
601        let batch = RecordBatch::try_new(schema, vec![row_id, meta_bytes])?;
602        self.store.append(&batch)?;
603        Ok(())
604    }
605
606    /// Persist the complete set of single-column index definitions for a table.
607    pub fn put_single_column_indexes(
608        &self,
609        table_id: TableId,
610        indexes: &[SingleColumnIndexEntryMeta],
611    ) -> LlkvResult<()> {
612        let lfid_val: u64 =
613            lfid(CATALOG_TABLE_ID, CATALOG_FIELD_SINGLE_COLUMN_INDEX_META_ID).into();
614        let schema = Arc::new(Schema::new(vec![
615            Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
616            Field::new("meta", DataType::Binary, false).with_metadata(HashMap::from([(
617                crate::constants::FIELD_ID_META_KEY.to_string(),
618                lfid_val.to_string(),
619            )])),
620        ]));
621
622        let row_id = Arc::new(UInt64Array::from(vec![rid_table(table_id)]));
623        let meta = TableSingleColumnIndexMeta {
624            table_id,
625            indexes: indexes.to_vec(),
626        };
627        let encoded = bitcode::encode(&meta);
628        let meta_bytes = Arc::new(BinaryArray::from(vec![encoded.as_slice()]));
629
630        let batch = RecordBatch::try_new(schema, vec![row_id, meta_bytes])?;
631        self.store.append(&batch)?;
632        Ok(())
633    }
634
635    /// Retrieve all persisted multi-column index definitions for a table.
636    pub fn get_multi_column_indexes(
637        &self,
638        table_id: TableId,
639    ) -> LlkvResult<Vec<MultiColumnIndexEntryMeta>> {
640        let lfid = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_MULTI_COLUMN_UNIQUE_META_ID);
641        let row_id = rid_table(table_id);
642        let batch = match self
643            .store
644            .gather_rows(&[lfid], &[row_id], GatherNullPolicy::IncludeNulls)
645        {
646            Ok(batch) => batch,
647            Err(llkv_result::Error::NotFound) => return Ok(Vec::new()),
648            Err(err) => return Err(err),
649        };
650
651        if batch.num_columns() == 0 || batch.num_rows() == 0 {
652            return Ok(Vec::new());
653        }
654
655        let array = batch
656            .column(0)
657            .as_any()
658            .downcast_ref::<BinaryArray>()
659            .ok_or_else(|| {
660                llkv_result::Error::Internal(
661                    "catalog multi-column index column stored unexpected type".into(),
662                )
663            })?;
664
665        if array.is_null(0) {
666            return Ok(Vec::new());
667        }
668
669        let meta: TableMultiColumnIndexMeta = bitcode::decode(array.value(0)).map_err(|err| {
670            llkv_result::Error::Internal(format!(
671                "failed to decode multi-column index metadata: {err}"
672            ))
673        })?;
674
675        Ok(meta.indexes)
676    }
677
678    /// Retrieve all persisted single-column index definitions for a table.
679    pub fn get_single_column_indexes(
680        &self,
681        table_id: TableId,
682    ) -> LlkvResult<Vec<SingleColumnIndexEntryMeta>> {
683        let lfid = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_SINGLE_COLUMN_INDEX_META_ID);
684        let row_id = rid_table(table_id);
685        let batch = match self
686            .store
687            .gather_rows(&[lfid], &[row_id], GatherNullPolicy::IncludeNulls)
688        {
689            Ok(batch) => batch,
690            Err(llkv_result::Error::NotFound) => return Ok(Vec::new()),
691            Err(err) => return Err(err),
692        };
693
694        if batch.num_columns() == 0 || batch.num_rows() == 0 {
695            return Ok(Vec::new());
696        }
697
698        let array = batch
699            .column(0)
700            .as_any()
701            .downcast_ref::<BinaryArray>()
702            .ok_or_else(|| {
703                llkv_result::Error::Internal(
704                    "catalog single-column index column stored unexpected type".into(),
705                )
706            })?;
707
708        if array.is_null(0) {
709            return Ok(Vec::new());
710        }
711
712        let meta: TableSingleColumnIndexMeta = bitcode::decode(array.value(0)).map_err(|err| {
713            llkv_result::Error::Internal(format!(
714                "failed to decode single-column index metadata: {err}"
715            ))
716        })?;
717
718        Ok(meta.indexes)
719    }
720
    /// Retrieve all persisted multi-column index definitions across tables.
    ///
    /// Works in two phases: first scan the row-id shadow column of the
    /// multi-column index field to enumerate every row that holds a blob,
    /// then gather and decode those blobs. Tombstoned (null) rows are skipped.
    ///
    /// # Errors
    ///
    /// Returns an internal error if a stored blob has an unexpected Arrow
    /// type or fails to decode; propagates storage errors other than
    /// `NotFound` (which yields an empty result instead).
    pub fn all_multi_column_index_metas(&self) -> LlkvResult<Vec<TableMultiColumnIndexMeta>> {
        let meta_field = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_MULTI_COLUMN_UNIQUE_META_ID);
        let row_field = rowid_fid(meta_field);

        // Minimal visitor that records every row id seen by the scan.
        struct RowIdCollector {
            row_ids: Vec<RowId>,
        }

        impl PrimitiveVisitor for RowIdCollector {
            fn u64_chunk(&mut self, values: &UInt64Array) {
                for i in 0..values.len() {
                    self.row_ids.push(values.value(i));
                }
            }
        }
        // Required by the scan's trait bounds; defaults are no-ops.
        impl PrimitiveWithRowIdsVisitor for RowIdCollector {}
        impl PrimitiveSortedVisitor for RowIdCollector {}
        impl PrimitiveSortedWithRowIdsVisitor for RowIdCollector {}

        let mut collector = RowIdCollector {
            row_ids: Vec::new(),
        };
        // NotFound here means the catalog column was never created: no indexes.
        match ScanBuilder::new(self.store, row_field)
            .options(ScanOptions::default())
            .run(&mut collector)
        {
            Ok(()) => {}
            Err(llkv_result::Error::NotFound) => return Ok(Vec::new()),
            Err(err) => return Err(err),
        }

        if collector.row_ids.is_empty() {
            return Ok(Vec::new());
        }

        let batch = match self.store.gather_rows(
            &[meta_field],
            &collector.row_ids,
            GatherNullPolicy::IncludeNulls,
        ) {
            Ok(batch) => batch,
            Err(llkv_result::Error::NotFound) => return Ok(Vec::new()),
            Err(err) => return Err(err),
        };

        if batch.num_columns() == 0 {
            return Ok(Vec::new());
        }

        let array = batch
            .column(0)
            .as_any()
            .downcast_ref::<BinaryArray>()
            .ok_or_else(|| {
                llkv_result::Error::Internal(
                    "catalog multi-column index column stored unexpected type".into(),
                )
            })?;

        let mut metas = Vec::with_capacity(batch.num_rows());
        for idx in 0..batch.num_rows() {
            // Null payloads are tombstones left by delete_multi_column_indexes.
            if array.is_null(idx) {
                continue;
            }
            let meta: TableMultiColumnIndexMeta =
                bitcode::decode(array.value(idx)).map_err(|err| {
                    llkv_result::Error::Internal(format!(
                        "failed to decode multi-column index metadata: {err}"
                    ))
                })?;
            metas.push(meta);
        }

        Ok(metas)
    }
797
798    /// Persist or update multiple constraint records for a table in a single batch.
799    pub fn put_constraint_records(
800        &self,
801        table_id: TableId,
802        records: &[ConstraintRecord],
803    ) -> LlkvResult<()> {
804        if records.is_empty() {
805            return Ok(());
806        }
807
808        let lfid_val: u64 = constraint_meta_lfid().into();
809        let schema = Arc::new(Schema::new(vec![
810            Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
811            Field::new("constraint", DataType::Binary, false).with_metadata(HashMap::from([(
812                crate::constants::FIELD_ID_META_KEY.to_string(),
813                lfid_val.to_string(),
814            )])),
815        ]));
816
817        let row_ids: Vec<RowId> = records
818            .iter()
819            .map(|record| encode_constraint_row_id(table_id, record.constraint_id))
820            .collect();
821
822        let row_ids_array = Arc::new(UInt64Array::from(row_ids));
823        let payload_array = Arc::new(BinaryArray::from_iter_values(
824            records.iter().map(bitcode::encode),
825        ));
826
827        let batch = RecordBatch::try_new(schema, vec![row_ids_array, payload_array])?;
828        self.store.append(&batch)?;
829        Ok(())
830    }
831
832    /// Persist or update constraint names for the specified identifiers.
833    pub fn put_constraint_names(
834        &self,
835        table_id: TableId,
836        names: &[ConstraintNameRecord],
837    ) -> LlkvResult<()> {
838        if names.is_empty() {
839            return Ok(());
840        }
841
842        let lfid_val: u64 = constraint_name_lfid().into();
843        let schema = Arc::new(Schema::new(vec![
844            Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
845            Field::new("constraint_name", DataType::Binary, false).with_metadata(HashMap::from([
846                (
847                    crate::constants::FIELD_ID_META_KEY.to_string(),
848                    lfid_val.to_string(),
849                ),
850            ])),
851        ]));
852
853        let row_ids: Vec<RowId> = names
854            .iter()
855            .map(|record| encode_constraint_row_id(table_id, record.constraint_id))
856            .collect();
857        let row_ids_array = Arc::new(UInt64Array::from(row_ids));
858        let payload_array = Arc::new(BinaryArray::from_iter_values(
859            names.iter().map(bitcode::encode),
860        ));
861
862        let batch = RecordBatch::try_new(schema, vec![row_ids_array, payload_array])?;
863        self.store.append(&batch)?;
864        Ok(())
865    }
866
867    /// Fetch multiple constraint records for a table in a single batch.
868    pub fn get_constraint_records(
869        &self,
870        table_id: TableId,
871        constraint_ids: &[ConstraintId],
872    ) -> LlkvResult<Vec<Option<ConstraintRecord>>> {
873        if constraint_ids.is_empty() {
874            return Ok(Vec::new());
875        }
876
877        let lfid = constraint_meta_lfid();
878        let row_ids: Vec<RowId> = constraint_ids
879            .iter()
880            .map(|&constraint_id| encode_constraint_row_id(table_id, constraint_id))
881            .collect();
882
883        let batch = match self
884            .store
885            .gather_rows(&[lfid], &row_ids, GatherNullPolicy::IncludeNulls)
886        {
887            Ok(batch) => batch,
888            Err(llkv_result::Error::NotFound) => {
889                return Ok(vec![None; constraint_ids.len()]);
890            }
891            Err(err) => return Err(err),
892        };
893
894        if batch.num_columns() == 0 || batch.num_rows() == 0 {
895            return Ok(vec![None; constraint_ids.len()]);
896        }
897
898        let array = batch
899            .column(0)
900            .as_any()
901            .downcast_ref::<BinaryArray>()
902            .ok_or_else(|| {
903                llkv_result::Error::Internal(
904                    "constraint metadata column stored unexpected type".into(),
905                )
906            })?;
907
908        let mut results = Vec::with_capacity(constraint_ids.len());
909        for (idx, &constraint_id) in constraint_ids.iter().enumerate() {
910            if array.is_null(idx) {
911                results.push(None);
912                continue;
913            }
914            let record = decode_constraint_record(array.value(idx))?;
915            if record.constraint_id != constraint_id {
916                return Err(llkv_result::Error::Internal(
917                    "constraint metadata id mismatch".into(),
918                ));
919            }
920            results.push(Some(record));
921        }
922
923        Ok(results)
924    }
925
926    /// Fetch constraint names for a table in a single batch.
927    pub fn get_constraint_names(
928        &self,
929        table_id: TableId,
930        constraint_ids: &[ConstraintId],
931    ) -> LlkvResult<Vec<Option<String>>> {
932        if constraint_ids.is_empty() {
933            return Ok(Vec::new());
934        }
935
936        let lfid = constraint_name_lfid();
937        let row_ids: Vec<RowId> = constraint_ids
938            .iter()
939            .map(|&constraint_id| encode_constraint_row_id(table_id, constraint_id))
940            .collect();
941
942        let batch = match self
943            .store
944            .gather_rows(&[lfid], &row_ids, GatherNullPolicy::IncludeNulls)
945        {
946            Ok(batch) => batch,
947            Err(llkv_result::Error::NotFound) => {
948                return Ok(vec![None; constraint_ids.len()]);
949            }
950            Err(err) => return Err(err),
951        };
952
953        if batch.num_columns() == 0 {
954            return Ok(vec![None; constraint_ids.len()]);
955        }
956
957        let array = batch
958            .column(0)
959            .as_any()
960            .downcast_ref::<BinaryArray>()
961            .ok_or_else(|| {
962                llkv_result::Error::Internal(
963                    "constraint name metadata column stored unexpected type".into(),
964                )
965            })?;
966
967        let mut results = Vec::with_capacity(row_ids.len());
968        for idx in 0..row_ids.len() {
969            if array.is_null(idx) {
970                results.push(None);
971            } else {
972                let record: ConstraintNameRecord =
973                    bitcode::decode(array.value(idx)).map_err(|err| {
974                        llkv_result::Error::Internal(format!(
975                            "failed to decode constraint name metadata: {err}"
976                        ))
977                    })?;
978                results.push(record.name);
979            }
980        }
981
982        Ok(results)
983    }
984
    /// Stream constraint records for a table in batches.
    ///
    /// Scans the row-id column for the constraint metadata field and drives
    /// a `ConstraintRowCollector`, which invokes `on_batch` with chunks of
    /// decoded [`ConstraintRecord`]s (buffer pre-sized to
    /// `CONSTRAINT_SCAN_CHUNK_SIZE`, which presumably bounds chunk size —
    /// confirm against the collector's implementation).
    ///
    /// A `NotFound` scan error means the column has never been written and
    /// is treated as "no constraints" rather than an error.
    pub fn scan_constraint_records_for_table<F>(
        &self,
        table_id: TableId,
        mut on_batch: F,
    ) -> LlkvResult<()>
    where
        F: FnMut(Vec<ConstraintRecord>),
    {
        let row_field = constraint_row_lfid();
        // The collector needs the store and payload lfid so it can gather
        // the record blobs for the row ids it sees during the scan.
        let mut visitor = ConstraintRowCollector {
            store: self.store,
            lfid: constraint_meta_lfid(),
            table_id,
            on_batch: &mut on_batch,
            buffer: Vec::with_capacity(CONSTRAINT_SCAN_CHUNK_SIZE),
            error: None,
        };

        match ScanBuilder::new(self.store, row_field)
            .options(ScanOptions::default())
            .run(&mut visitor)
        {
            Ok(()) => {}
            Err(llkv_result::Error::NotFound) => return Ok(()),
            Err(err) => return Err(err),
        }

        // Flush any remaining buffered records and surface a deferred error.
        visitor.finish()
    }
1015
1016    /// Load all constraint records for a table into a vector.
1017    pub fn constraint_records_for_table(
1018        &self,
1019        table_id: TableId,
1020    ) -> LlkvResult<Vec<ConstraintRecord>> {
1021        let mut all = Vec::new();
1022        self.scan_constraint_records_for_table(table_id, |mut chunk| {
1023            all.append(&mut chunk);
1024        })?;
1025        Ok(all)
1026    }
1027
1028    pub fn put_next_table_id(&self, next_id: TableId) -> LlkvResult<()> {
1029        let lfid_val: u64 = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_NEXT_TABLE_ID).into();
1030        let schema = Arc::new(Schema::new(vec![
1031            Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
1032            Field::new("next_table_id", DataType::UInt64, false).with_metadata(HashMap::from([(
1033                crate::constants::FIELD_ID_META_KEY.to_string(),
1034                lfid_val.to_string(),
1035            )])),
1036        ]));
1037
1038        let row_id = Arc::new(UInt64Array::from(vec![CATALOG_NEXT_TABLE_ROW_ID]));
1039        let value_array = Arc::new(UInt64Array::from(vec![next_id as u64]));
1040        let batch = RecordBatch::try_new(schema, vec![row_id, value_array])?;
1041        self.store.append(&batch)?;
1042        Ok(())
1043    }
1044
1045    pub fn get_next_table_id(&self) -> LlkvResult<Option<TableId>> {
1046        let lfid = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_NEXT_TABLE_ID);
1047        let batch = match self.store.gather_rows(
1048            &[lfid],
1049            &[CATALOG_NEXT_TABLE_ROW_ID],
1050            GatherNullPolicy::IncludeNulls,
1051        ) {
1052            Ok(batch) => batch,
1053            Err(llkv_result::Error::NotFound) => return Ok(None),
1054            Err(err) => return Err(err),
1055        };
1056
1057        if batch.num_columns() == 0 || batch.num_rows() == 0 {
1058            return Ok(None);
1059        }
1060
1061        let array = batch
1062            .column(0)
1063            .as_any()
1064            .downcast_ref::<UInt64Array>()
1065            .ok_or_else(|| {
1066                llkv_result::Error::Internal(
1067                    "catalog next_table_id column stored unexpected type".into(),
1068                )
1069            })?;
1070        if array.is_empty() || array.is_null(0) {
1071            return Ok(None);
1072        }
1073
1074        let value = array.value(0);
1075        if value > TableId::MAX as u64 {
1076            return Err(llkv_result::Error::InvalidArgumentError(
1077                "persisted next_table_id exceeds TableId range".into(),
1078            ));
1079        }
1080
1081        Ok(Some(value as TableId))
1082    }
1083
1084    pub fn max_table_id(&self) -> LlkvResult<Option<TableId>> {
1085        let meta_field = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_TABLE_META_ID);
1086        let row_field = rowid_fid(meta_field);
1087
1088        let mut collector = MaxRowIdCollector { max: None };
1089        match ScanBuilder::new(self.store, row_field)
1090            .options(ScanOptions::default())
1091            .run(&mut collector)
1092        {
1093            Ok(()) => {}
1094            Err(llkv_result::Error::NotFound) => return Ok(None),
1095            Err(err) => return Err(err),
1096        }
1097
1098        let max_value = match collector.max {
1099            Some(value) => value,
1100            None => return Ok(None),
1101        };
1102
1103        let logical: LogicalFieldId = max_value.into();
1104        Ok(Some(logical.table_id()))
1105    }
1106
    /// Scan all table metadata entries from the catalog.
    /// Returns a vector of (table_id, TableMeta) pairs for all persisted tables.
    ///
    /// This method first scans for all row IDs in the table metadata column,
    /// then uses gather_rows to retrieve the actual metadata.
    ///
    /// Rows whose payload is null or fails to decode are silently skipped,
    /// so the result contains only cleanly decodable entries.
    pub fn all_table_metas(&self) -> LlkvResult<Vec<(TableId, TableMeta)>> {
        let meta_field = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_TABLE_META_ID);
        // Companion column holding the row ids for `meta_field`.
        let row_field = rowid_fid(meta_field);

        // Collect all row IDs that have table metadata
        struct RowIdCollector {
            row_ids: Vec<RowId>,
        }

        // Only plain u64 chunks carry the data we need; the other visitor
        // traits keep their default (no-op) implementations.
        impl PrimitiveVisitor for RowIdCollector {
            fn u64_chunk(&mut self, values: &UInt64Array) {
                for i in 0..values.len() {
                    self.row_ids.push(values.value(i));
                }
            }
        }
        impl PrimitiveWithRowIdsVisitor for RowIdCollector {}
        impl PrimitiveSortedVisitor for RowIdCollector {}
        impl PrimitiveSortedWithRowIdsVisitor for RowIdCollector {}

        let mut collector = RowIdCollector {
            row_ids: Vec::new(),
        };
        match ScanBuilder::new(self.store, row_field)
            .options(ScanOptions::default())
            .run(&mut collector)
        {
            Ok(()) => {}
            // A missing column simply means no tables have been persisted.
            Err(llkv_result::Error::NotFound) => return Ok(Vec::new()),
            Err(err) => return Err(err),
        }

        if collector.row_ids.is_empty() {
            return Ok(Vec::new());
        }

        // Gather all table metadata using the collected row IDs
        let batch = self.store.gather_rows(
            &[meta_field],
            &collector.row_ids,
            GatherNullPolicy::IncludeNulls,
        )?;

        let meta_col = batch
            .column(0)
            .as_any()
            .downcast_ref::<BinaryArray>()
            .ok_or_else(|| {
                llkv_result::Error::Internal("catalog table_meta column should be Binary".into())
            })?;

        let mut result = Vec::new();
        for (idx, &row_id) in collector.row_ids.iter().enumerate() {
            if !meta_col.is_null(idx) {
                let bytes = meta_col.value(idx);
                if let Ok(meta) = bitcode::decode::<TableMeta>(bytes) {
                    // The table id is recoverable from the row id itself:
                    // metadata rows are keyed by the table's logical field id.
                    let logical: LogicalFieldId = row_id.into();
                    let table_id = logical.table_id();
                    result.push((table_id, meta));
                }
            }
        }

        Ok(result)
    }
1177
1178    /// Persist the next transaction id to the catalog.
1179    pub fn put_next_txn_id(&self, next_txn_id: u64) -> LlkvResult<()> {
1180        let lfid_val: u64 = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_NEXT_TXN_ID).into();
1181        let schema = Arc::new(Schema::new(vec![
1182            Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
1183            Field::new("next_txn_id", DataType::UInt64, false).with_metadata(HashMap::from([(
1184                crate::constants::FIELD_ID_META_KEY.to_string(),
1185                lfid_val.to_string(),
1186            )])),
1187        ]));
1188
1189        let row_id = Arc::new(UInt64Array::from(vec![CATALOG_NEXT_TXN_ROW_ID]));
1190        let value_array = Arc::new(UInt64Array::from(vec![next_txn_id]));
1191        let batch = RecordBatch::try_new(schema, vec![row_id, value_array])?;
1192        self.store.append(&batch)?;
1193        Ok(())
1194    }
1195
1196    /// Load the next transaction id from the catalog.
1197    pub fn get_next_txn_id(&self) -> LlkvResult<Option<u64>> {
1198        let lfid = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_NEXT_TXN_ID);
1199        let batch = match self.store.gather_rows(
1200            &[lfid],
1201            &[CATALOG_NEXT_TXN_ROW_ID],
1202            GatherNullPolicy::IncludeNulls,
1203        ) {
1204            Ok(batch) => batch,
1205            Err(llkv_result::Error::NotFound) => return Ok(None),
1206            Err(err) => return Err(err),
1207        };
1208
1209        if batch.num_columns() == 0 || batch.num_rows() == 0 {
1210            return Ok(None);
1211        }
1212
1213        let array = batch
1214            .column(0)
1215            .as_any()
1216            .downcast_ref::<UInt64Array>()
1217            .ok_or_else(|| {
1218                llkv_result::Error::Internal(
1219                    "catalog next_txn_id column stored unexpected type".into(),
1220                )
1221            })?;
1222        if array.is_empty() || array.is_null(0) {
1223            return Ok(None);
1224        }
1225
1226        let value = array.value(0);
1227        Ok(Some(value))
1228    }
1229
1230    /// Persist the last committed transaction id to the catalog.
1231    pub fn put_last_committed_txn_id(&self, last_committed: u64) -> LlkvResult<()> {
1232        let lfid_val: u64 = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_LAST_COMMITTED_TXN_ID).into();
1233        let schema = Arc::new(Schema::new(vec![
1234            Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
1235            Field::new("last_committed_txn_id", DataType::UInt64, false).with_metadata(
1236                HashMap::from([(
1237                    crate::constants::FIELD_ID_META_KEY.to_string(),
1238                    lfid_val.to_string(),
1239                )]),
1240            ),
1241        ]));
1242
1243        let row_id = Arc::new(UInt64Array::from(vec![CATALOG_LAST_COMMITTED_TXN_ROW_ID]));
1244        let value_array = Arc::new(UInt64Array::from(vec![last_committed]));
1245        let batch = RecordBatch::try_new(schema, vec![row_id, value_array])?;
1246        self.store.append(&batch)?;
1247        Ok(())
1248    }
1249
1250    /// Load the last committed transaction id from the catalog.
1251    pub fn get_last_committed_txn_id(&self) -> LlkvResult<Option<u64>> {
1252        let lfid = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_LAST_COMMITTED_TXN_ID);
1253        let batch = match self.store.gather_rows(
1254            &[lfid],
1255            &[CATALOG_LAST_COMMITTED_TXN_ROW_ID],
1256            GatherNullPolicy::IncludeNulls,
1257        ) {
1258            Ok(batch) => batch,
1259            Err(llkv_result::Error::NotFound) => return Ok(None),
1260            Err(err) => return Err(err),
1261        };
1262
1263        if batch.num_columns() == 0 || batch.num_rows() == 0 {
1264            return Ok(None);
1265        }
1266
1267        let array = batch
1268            .column(0)
1269            .as_any()
1270            .downcast_ref::<UInt64Array>()
1271            .ok_or_else(|| {
1272                llkv_result::Error::Internal(
1273                    "catalog last_committed_txn_id column stored unexpected type".into(),
1274                )
1275            })?;
1276        if array.is_empty() || array.is_null(0) {
1277            return Ok(None);
1278        }
1279
1280        let value = array.value(0);
1281        Ok(Some(value))
1282    }
1283
1284    /// Persist the catalog state to the system catalog.
1285    ///
1286    /// Stores the complete catalog state (all tables and fields) as a binary blob
1287    /// using bitcode serialization.
1288    pub fn put_catalog_state(&self, state: &crate::catalog::TableCatalogState) -> LlkvResult<()> {
1289        let lfid_val: u64 = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_CATALOG_STATE).into();
1290        let schema = Arc::new(Schema::new(vec![
1291            Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
1292            Field::new("catalog_state", DataType::Binary, false).with_metadata(HashMap::from([(
1293                crate::constants::FIELD_ID_META_KEY.to_string(),
1294                lfid_val.to_string(),
1295            )])),
1296        ]));
1297
1298        let row_id = Arc::new(UInt64Array::from(vec![CATALOG_STATE_ROW_ID]));
1299        let encoded = bitcode::encode(state);
1300        let state_bytes = Arc::new(BinaryArray::from(vec![encoded.as_slice()]));
1301
1302        let batch = RecordBatch::try_new(schema, vec![row_id, state_bytes])?;
1303        self.store.append(&batch)?;
1304        Ok(())
1305    }
1306
1307    /// Load the catalog state from the system catalog.
1308    ///
1309    /// Retrieves the complete catalog state including all table and field mappings.
1310    pub fn get_catalog_state(&self) -> LlkvResult<Option<crate::catalog::TableCatalogState>> {
1311        let lfid = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_CATALOG_STATE);
1312        let batch = match self.store.gather_rows(
1313            &[lfid],
1314            &[CATALOG_STATE_ROW_ID],
1315            GatherNullPolicy::IncludeNulls,
1316        ) {
1317            Ok(batch) => batch,
1318            Err(llkv_result::Error::NotFound) => return Ok(None),
1319            Err(err) => return Err(err),
1320        };
1321
1322        if batch.num_columns() == 0 || batch.num_rows() == 0 {
1323            return Ok(None);
1324        }
1325
1326        let array = batch
1327            .column(0)
1328            .as_any()
1329            .downcast_ref::<BinaryArray>()
1330            .ok_or_else(|| {
1331                llkv_result::Error::Internal("catalog state column stored unexpected type".into())
1332            })?;
1333        if array.is_empty() || array.is_null(0) {
1334            return Ok(None);
1335        }
1336
1337        let bytes = array.value(0);
1338        let state = bitcode::decode(bytes).map_err(|e| {
1339            llkv_result::Error::Internal(format!("Failed to decode catalog state: {}", e))
1340        })?;
1341        Ok(Some(state))
1342    }
1343
1344    /// Persist schema metadata to the catalog.
1345    ///
1346    /// Stores schema metadata at a row ID derived from the schema name hash.
1347    /// This allows efficient lookup and prevents collisions.
1348    pub fn put_schema_meta(&self, meta: &SchemaMeta) -> LlkvResult<()> {
1349        let lfid_val: u64 = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_SCHEMA_META_ID).into();
1350        let schema = Arc::new(Schema::new(vec![
1351            Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
1352            Field::new("meta", DataType::Binary, false).with_metadata(HashMap::from([(
1353                crate::constants::FIELD_ID_META_KEY.to_string(),
1354                lfid_val.to_string(),
1355            )])),
1356        ]));
1357
1358        // Use hash of canonical (lowercase) schema name as row ID
1359        let canonical = meta.name.to_ascii_lowercase();
1360        let row_id_val = schema_name_to_row_id(&canonical);
1361        let row_id = Arc::new(UInt64Array::from(vec![row_id_val]));
1362        let meta_encoded = bitcode::encode(meta);
1363        let meta_bytes = Arc::new(BinaryArray::from(vec![meta_encoded.as_slice()]));
1364
1365        let batch = RecordBatch::try_new(schema, vec![row_id, meta_bytes])?;
1366        self.store.append(&batch)?;
1367        Ok(())
1368    }
1369
1370    /// Retrieve schema metadata by name.
1371    ///
1372    /// Returns `None` if the schema does not exist.
1373    pub fn get_schema_meta(&self, schema_name: &str) -> LlkvResult<Option<SchemaMeta>> {
1374        let canonical = schema_name.to_ascii_lowercase();
1375        let row_id = schema_name_to_row_id(&canonical);
1376        let lfid = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_SCHEMA_META_ID);
1377
1378        let batch = match self
1379            .store
1380            .gather_rows(&[lfid], &[row_id], GatherNullPolicy::IncludeNulls)
1381        {
1382            Ok(batch) => batch,
1383            Err(llkv_result::Error::NotFound) => return Ok(None),
1384            Err(err) => return Err(err),
1385        };
1386
1387        if batch.num_columns() == 0 || batch.num_rows() == 0 {
1388            return Ok(None);
1389        }
1390
1391        let array = batch
1392            .column(0)
1393            .as_any()
1394            .downcast_ref::<BinaryArray>()
1395            .ok_or_else(|| {
1396                llkv_result::Error::Internal("catalog schema_meta column should be Binary".into())
1397            })?;
1398
1399        if array.is_empty() || array.is_null(0) {
1400            return Ok(None);
1401        }
1402
1403        let bytes = array.value(0);
1404        let meta = bitcode::decode(bytes).map_err(|e| {
1405            llkv_result::Error::Internal(format!("Failed to decode schema metadata: {}", e))
1406        })?;
1407        Ok(Some(meta))
1408    }
1409
    /// Scan all schema metadata entries from the catalog.
    ///
    /// Returns a vector of all persisted schemas. Rows whose payload is
    /// null or fails to decode are silently skipped.
    pub fn all_schema_metas(&self) -> LlkvResult<Vec<SchemaMeta>> {
        let meta_field = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_SCHEMA_META_ID);
        // Companion column holding the row ids for `meta_field`.
        let row_field = rowid_fid(meta_field);

        // Collect all row IDs that have schema metadata
        struct RowIdCollector {
            row_ids: Vec<RowId>,
        }

        // Only plain u64 chunks matter; the other visitor traits keep
        // their default (no-op) implementations.
        impl PrimitiveVisitor for RowIdCollector {
            fn u64_chunk(&mut self, values: &UInt64Array) {
                for i in 0..values.len() {
                    self.row_ids.push(values.value(i));
                }
            }
        }
        impl PrimitiveWithRowIdsVisitor for RowIdCollector {}
        impl PrimitiveSortedVisitor for RowIdCollector {}
        impl PrimitiveSortedWithRowIdsVisitor for RowIdCollector {}

        let mut collector = RowIdCollector {
            row_ids: Vec::new(),
        };
        match ScanBuilder::new(self.store, row_field)
            .options(ScanOptions::default())
            .run(&mut collector)
        {
            Ok(()) => {}
            // A missing column simply means no schemas have been persisted.
            Err(llkv_result::Error::NotFound) => return Ok(Vec::new()),
            Err(err) => return Err(err),
        }

        if collector.row_ids.is_empty() {
            return Ok(Vec::new());
        }

        // Gather all schema metadata using the collected row IDs
        let batch = self.store.gather_rows(
            &[meta_field],
            &collector.row_ids,
            GatherNullPolicy::IncludeNulls,
        )?;

        let meta_col = batch
            .column(0)
            .as_any()
            .downcast_ref::<BinaryArray>()
            .ok_or_else(|| {
                llkv_result::Error::Internal("catalog schema_meta column should be Binary".into())
            })?;

        let mut result = Vec::new();
        for idx in 0..collector.row_ids.len() {
            if !meta_col.is_null(idx) {
                let bytes = meta_col.value(idx);
                if let Ok(meta) = bitcode::decode::<SchemaMeta>(bytes) {
                    result.push(meta);
                }
            }
        }

        Ok(result)
    }
1476
1477    /// Persist custom type metadata to the catalog.
1478    ///
1479    /// Stores custom type metadata at a row ID derived from the type name hash.
1480    pub fn put_custom_type_meta(&self, meta: &CustomTypeMeta) -> LlkvResult<()> {
1481        let lfid_val: u64 = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_CUSTOM_TYPE_META_ID).into();
1482        let schema = Arc::new(Schema::new(vec![
1483            Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
1484            Field::new("meta", DataType::Binary, false).with_metadata(HashMap::from([(
1485                crate::constants::FIELD_ID_META_KEY.to_string(),
1486                lfid_val.to_string(),
1487            )])),
1488        ]));
1489
1490        // Use hash of canonical (lowercase) type name as row ID
1491        let canonical = meta.name.to_ascii_lowercase();
1492        let row_id_val = schema_name_to_row_id(&canonical); // Reuse same hash function
1493        let row_id = Arc::new(UInt64Array::from(vec![row_id_val]));
1494        let meta_encoded = bitcode::encode(meta);
1495        let meta_bytes = Arc::new(BinaryArray::from(vec![meta_encoded.as_slice()]));
1496
1497        let batch = RecordBatch::try_new(schema, vec![row_id, meta_bytes])?;
1498        self.store.append(&batch)?;
1499        Ok(())
1500    }
1501
1502    /// Retrieve custom type metadata by name.
1503    ///
1504    /// Returns `None` if the type does not exist.
1505    pub fn get_custom_type_meta(&self, type_name: &str) -> LlkvResult<Option<CustomTypeMeta>> {
1506        let canonical = type_name.to_ascii_lowercase();
1507        let row_id = schema_name_to_row_id(&canonical);
1508        let lfid = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_CUSTOM_TYPE_META_ID);
1509
1510        let batch = match self
1511            .store
1512            .gather_rows(&[lfid], &[row_id], GatherNullPolicy::IncludeNulls)
1513        {
1514            Ok(batch) => batch,
1515            Err(llkv_result::Error::NotFound) => return Ok(None),
1516            Err(err) => return Err(err),
1517        };
1518
1519        let meta_col = batch
1520            .column(0)
1521            .as_any()
1522            .downcast_ref::<BinaryArray>()
1523            .ok_or_else(|| {
1524                llkv_result::Error::Internal(
1525                    "catalog custom_type_meta column should be Binary".into(),
1526                )
1527            })?;
1528
1529        if meta_col.is_null(0) {
1530            return Ok(None);
1531        }
1532
1533        let bytes = meta_col.value(0);
1534        let meta = bitcode::decode(bytes).map_err(|err| {
1535            llkv_result::Error::Internal(format!("failed to decode custom type metadata: {err}"))
1536        })?;
1537        Ok(Some(meta))
1538    }
1539
1540    /// Delete custom type metadata by name.
1541    ///
1542    /// Returns `Ok(())` regardless of whether the type existed.
1543    pub fn delete_custom_type_meta(&self, type_name: &str) -> LlkvResult<()> {
1544        let canonical = type_name.to_ascii_lowercase();
1545        let row_id = schema_name_to_row_id(&canonical);
1546        let lfid = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_CUSTOM_TYPE_META_ID);
1547
1548        // Delete by writing null value
1549        let lfid_val: u64 = lfid.into();
1550        let schema = Arc::new(Schema::new(vec![
1551            Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
1552            Field::new("meta", DataType::Binary, true).with_metadata(HashMap::from([(
1553                crate::constants::FIELD_ID_META_KEY.to_string(),
1554                lfid_val.to_string(),
1555            )])),
1556        ]));
1557
1558        let row_id_arr = Arc::new(UInt64Array::from(vec![row_id]));
1559        let mut meta_builder = BinaryBuilder::new();
1560        meta_builder.append_null();
1561        let meta_arr = Arc::new(meta_builder.finish());
1562
1563        let batch = RecordBatch::try_new(schema, vec![row_id_arr, meta_arr])?;
1564        self.store.append(&batch)?;
1565        Ok(())
1566    }
1567
    /// Retrieve all custom type metadata.
    ///
    /// Returns all persisted custom types. Rows whose payload is null or
    /// fails to decode are silently skipped.
    pub fn all_custom_type_metas(&self) -> LlkvResult<Vec<CustomTypeMeta>> {
        let meta_field = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_CUSTOM_TYPE_META_ID);
        // Companion column holding the row ids for `meta_field`.
        let row_field = rowid_fid(meta_field);

        // Collect all row IDs that have custom type metadata
        struct RowIdCollector {
            row_ids: Vec<RowId>,
        }

        // Only plain u64 chunks matter; the other visitor traits keep
        // their default (no-op) implementations.
        impl PrimitiveVisitor for RowIdCollector {
            fn u64_chunk(&mut self, values: &UInt64Array) {
                for i in 0..values.len() {
                    self.row_ids.push(values.value(i));
                }
            }
        }
        impl PrimitiveWithRowIdsVisitor for RowIdCollector {}
        impl PrimitiveSortedVisitor for RowIdCollector {}
        impl PrimitiveSortedWithRowIdsVisitor for RowIdCollector {}

        let mut collector = RowIdCollector {
            row_ids: Vec::new(),
        };
        match ScanBuilder::new(self.store, row_field)
            .options(ScanOptions::default())
            .run(&mut collector)
        {
            Ok(()) => {}
            // A missing column simply means no custom types were persisted.
            Err(llkv_result::Error::NotFound) => return Ok(Vec::new()),
            Err(err) => return Err(err),
        }

        if collector.row_ids.is_empty() {
            return Ok(Vec::new());
        }

        // Gather all custom type metadata using the collected row IDs
        let batch = self.store.gather_rows(
            &[meta_field],
            &collector.row_ids,
            GatherNullPolicy::IncludeNulls,
        )?;

        let meta_col = batch
            .column(0)
            .as_any()
            .downcast_ref::<BinaryArray>()
            .ok_or_else(|| {
                llkv_result::Error::Internal(
                    "catalog custom_type_meta column should be Binary".into(),
                )
            })?;

        let mut result = Vec::new();
        for idx in 0..collector.row_ids.len() {
            if !meta_col.is_null(idx) {
                let bytes = meta_col.value(idx);
                if let Ok(meta) = bitcode::decode::<CustomTypeMeta>(bytes) {
                    result.push(meta);
                }
            }
        }

        Ok(result)
    }
1636}
1637
/// Generate a row ID for schema metadata based on schema name.
///
/// Uses a simple hash to map schema names to row IDs. This is deterministic
/// and allows direct lookup without scanning.
///
/// NOTE(review): `FNV_PRIME` below (0x1000_0000_01b3) is not the canonical
/// 64-bit FNV prime (0x0000_0100_0000_01b3) — the digits appear shifted by
/// one hex position. The hash is only ever compared against values produced
/// by this same function, so it still works as a deterministic key, but the
/// constant must NOT be "corrected": persisted row ids in existing
/// databases depend on the current value.
fn schema_name_to_row_id(canonical_name: &str) -> RowId {
    // Use a simple 64-bit FNV-1a hash for deterministic IDs across platforms and releases
    const FNV_OFFSET: u64 = 0xcbf2_9ce4_8422_2325;
    const FNV_PRIME: u64 = 0x1000_0000_01b3;

    let mut hash = FNV_OFFSET;
    for byte in canonical_name.as_bytes() {
        hash ^= u64::from(*byte);
        hash = hash.wrapping_mul(FNV_PRIME);
    }

    // Use high bit to avoid collision with reserved catalog row IDs (0-3) and table metadata rows
    hash | (1u64 << 63)
}
1656
1657struct MaxRowIdCollector {
1658    max: Option<RowId>,
1659}
1660
1661impl PrimitiveVisitor for MaxRowIdCollector {
1662    fn u64_chunk(&mut self, values: &UInt64Array) {
1663        for i in 0..values.len() {
1664            let value = values.value(i);
1665            self.max = match self.max {
1666                Some(curr) if curr >= value => Some(curr),
1667                _ => Some(value),
1668            };
1669        }
1670    }
1671}
1672
// Empty impls satisfy the remaining visitor traits — presumably required by
// the scan API's bounds (confirm against `ScanBuilder`); the collector only
// needs plain value chunks, so the default no-op bodies suffice.
impl PrimitiveWithRowIdsVisitor for MaxRowIdCollector {}
impl PrimitiveSortedVisitor for MaxRowIdCollector {}
impl PrimitiveSortedWithRowIdsVisitor for MaxRowIdCollector {}
1676
1677#[cfg(test)]
1678mod tests {
1679    use super::*;
1680    use crate::constraints::{
1681        ConstraintKind, ConstraintState, PrimaryKeyConstraint, UniqueConstraint,
1682    };
1683    use llkv_column_map::ColumnStore;
1684    use std::sync::Arc;
1685
1686    #[test]
1687    fn constraint_records_roundtrip() {
1688        let pager = Arc::new(MemPager::default());
1689        let store = ColumnStore::open(Arc::clone(&pager)).unwrap();
1690        let catalog = SysCatalog::new(&store);
1691
1692        let table_id: TableId = 42;
1693        let record1 = ConstraintRecord {
1694            constraint_id: 1,
1695            kind: ConstraintKind::PrimaryKey(PrimaryKeyConstraint {
1696                field_ids: vec![1, 2],
1697            }),
1698            state: ConstraintState::Active,
1699            revision: 1,
1700            last_modified_micros: 100,
1701        };
1702        let record2 = ConstraintRecord {
1703            constraint_id: 2,
1704            kind: ConstraintKind::Unique(UniqueConstraint { field_ids: vec![3] }),
1705            state: ConstraintState::Active,
1706            revision: 2,
1707            last_modified_micros: 200,
1708        };
1709        catalog
1710            .put_constraint_records(table_id, &[record1.clone(), record2.clone()])
1711            .unwrap();
1712
1713        let other_table_record = ConstraintRecord {
1714            constraint_id: 1,
1715            kind: ConstraintKind::Unique(UniqueConstraint { field_ids: vec![5] }),
1716            state: ConstraintState::Active,
1717            revision: 1,
1718            last_modified_micros: 150,
1719        };
1720        catalog
1721            .put_constraint_records(7, &[other_table_record])
1722            .unwrap();
1723
1724        let mut fetched = catalog.constraint_records_for_table(table_id).unwrap();
1725        fetched.sort_by_key(|record| record.constraint_id);
1726
1727        assert_eq!(fetched.len(), 2);
1728        assert_eq!(fetched[0], record1);
1729        assert_eq!(fetched[1], record2);
1730
1731        let single = catalog
1732            .get_constraint_records(table_id, &[record1.constraint_id])
1733            .unwrap();
1734        assert_eq!(single.len(), 1);
1735        assert_eq!(single[0].as_ref(), Some(&record1));
1736
1737        let missing = catalog.get_constraint_records(table_id, &[999]).unwrap();
1738        assert_eq!(missing.len(), 1);
1739        assert!(missing[0].is_none());
1740    }
1741}