llkv_table/
sys_catalog.rs

1//! System catalog for storing table and column metadata.
2//!
3//! The system catalog uses table 0 (reserved) to store metadata about all tables
4//! and columns in the database. This metadata includes:
5//!
6//! - **Table metadata** ([`TableMeta`]): Table ID, name, creation time, flags
7//! - **Column metadata** ([`ColMeta`]): Column ID, name, flags, default values
8//! - **Multi-column index metadata** ([`TableMultiColumnIndexMeta`])
9//!
10//! # Storage Format
11//!
12//! The catalog stores metadata as serialized [`bitcode`] blobs in special catalog
13//! columns within table 0. See [`CATALOG_TABLE_ID`] and related constants in the
14//! [`reserved`](crate::reserved) module.
15//!
16//! # Usage
17//!
18//! The [`SysCatalog`] provides methods to:
19//! - Insert/update table metadata ([`put_table_meta`](SysCatalog::put_table_meta))
20//! - Query table metadata ([`get_table_meta`](SysCatalog::get_table_meta))
21//! - Manage column metadata similarly
22//!
23//! This metadata is used by higher-level components to validate schemas, assign
24//! field IDs, and enforce table constraints.
25
26use std::collections::HashMap;
27use std::mem;
28use std::sync::Arc;
29
30use arrow::array::{Array, BinaryArray, BinaryBuilder, UInt64Array};
31use arrow::datatypes::{DataType, Field, Schema};
32use arrow::record_batch::RecordBatch;
33use bitcode::{Decode, Encode};
34
35use crate::constants::CONSTRAINT_SCAN_CHUNK_SIZE;
36use crate::constraints::{
37    ConstraintId, ConstraintRecord, decode_constraint_row_id, encode_constraint_row_id,
38};
39use crate::types::{FieldId, RowId, TableId};
40use llkv_column_map::store::scan::{
41    PrimitiveSortedVisitor, PrimitiveSortedWithRowIdsVisitor, PrimitiveVisitor,
42    PrimitiveWithRowIdsVisitor, ScanBuilder, ScanOptions,
43};
44
45use llkv_column_map::{
46    ColumnStore,
47    store::{GatherNullPolicy, ROW_ID_COLUMN_NAME, rowid_fid},
48};
49use llkv_result::{self, Result as LlkvResult};
50use llkv_storage::pager::{MemPager, Pager};
51use llkv_types::{LogicalFieldId, lfid, rid_col, rid_table};
52use simd_r_drive_entry_handle::EntryHandle;
53
54// Import all reserved constants and validation functions
55use crate::reserved::*;
56
57#[inline]
58fn constraint_meta_lfid() -> LogicalFieldId {
59    lfid(CATALOG_TABLE_ID, CATALOG_FIELD_CONSTRAINT_META_ID)
60}
61
62#[inline]
63fn constraint_name_lfid() -> LogicalFieldId {
64    lfid(CATALOG_TABLE_ID, CATALOG_FIELD_CONSTRAINT_NAME_ID)
65}
66
67#[inline]
68fn constraint_row_lfid() -> LogicalFieldId {
69    rowid_fid(constraint_meta_lfid())
70}
71
/// Mapping from a constraint id to its optional user-supplied name, as
/// persisted in the catalog's constraint-name column.
#[derive(Clone, Debug, Encode, Decode)]
pub struct ConstraintNameRecord {
    /// Identifier of the constraint this name belongs to.
    pub constraint_id: ConstraintId,
    /// Optional human-readable constraint name (`None` when unnamed).
    pub name: Option<String>,
}
77
78fn decode_constraint_record(bytes: &[u8]) -> LlkvResult<ConstraintRecord> {
79    bitcode::decode(bytes).map_err(|err| {
80        llkv_result::Error::Internal(format!("failed to decode constraint metadata: {err}"))
81    })
82}
83
/// Streaming visitor state used while scanning the constraint row-id column.
///
/// Row ids whose encoded table component matches `table_id` are buffered and
/// periodically materialized into decoded [`ConstraintRecord`] batches that are
/// handed to `on_batch`.
struct ConstraintRowCollector<'a, P, F>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
    F: FnMut(Vec<ConstraintRecord>),
{
    /// Backing column store used to gather buffered rows.
    store: &'a ColumnStore<P>,
    /// Logical field holding the serialized constraint blobs.
    lfid: LogicalFieldId,
    /// Only rows belonging to this table are collected.
    table_id: TableId,
    /// Callback invoked with each non-empty batch of decoded records.
    on_batch: &'a mut F,
    /// Pending row ids awaiting the next flush.
    buffer: Vec<RowId>,
    /// First error hit during the scan; surfaced later by `finish`.
    error: Option<llkv_result::Error>,
}
96
97impl<'a, P, F> ConstraintRowCollector<'a, P, F>
98where
99    P: Pager<Blob = EntryHandle> + Send + Sync,
100    F: FnMut(Vec<ConstraintRecord>),
101{
102    fn flush_buffer(&mut self) -> LlkvResult<()> {
103        if self.buffer.is_empty() {
104            return Ok(());
105        }
106
107        let row_ids = mem::take(&mut self.buffer);
108        let batch =
109            self.store
110                .gather_rows(&[self.lfid], &row_ids, GatherNullPolicy::IncludeNulls)?;
111
112        if batch.num_columns() == 0 {
113            return Ok(());
114        }
115
116        let array = batch
117            .column(0)
118            .as_any()
119            .downcast_ref::<BinaryArray>()
120            .ok_or_else(|| {
121                llkv_result::Error::Internal(
122                    "constraint metadata column stored unexpected type".into(),
123                )
124            })?;
125
126        let mut records = Vec::with_capacity(row_ids.len());
127        for (idx, row_id) in row_ids.into_iter().enumerate() {
128            if array.is_null(idx) {
129                continue;
130            }
131
132            let record = decode_constraint_record(array.value(idx))?;
133            let (table_from_id, constraint_id) = decode_constraint_row_id(row_id);
134            if table_from_id != self.table_id {
135                continue;
136            }
137            if record.constraint_id != constraint_id {
138                return Err(llkv_result::Error::Internal(
139                    "constraint metadata id mismatch".into(),
140                ));
141            }
142            records.push(record);
143        }
144
145        if !records.is_empty() {
146            (self.on_batch)(records);
147        }
148
149        Ok(())
150    }
151
152    fn finish(&mut self) -> LlkvResult<()> {
153        if let Some(err) = self.error.take() {
154            return Err(err);
155        }
156        self.flush_buffer()
157    }
158}
159
160impl<'a, P, F> PrimitiveVisitor for ConstraintRowCollector<'a, P, F>
161where
162    P: Pager<Blob = EntryHandle> + Send + Sync,
163    F: FnMut(Vec<ConstraintRecord>),
164{
165    fn u64_chunk(&mut self, values: &UInt64Array) {
166        if self.error.is_some() {
167            return;
168        }
169
170        for idx in 0..values.len() {
171            let row_id = values.value(idx);
172            let (table_id, _) = decode_constraint_row_id(row_id);
173            if table_id != self.table_id {
174                continue;
175            }
176            self.buffer.push(row_id);
177            if self.buffer.len() >= CONSTRAINT_SCAN_CHUNK_SIZE
178                && let Err(err) = self.flush_buffer()
179            {
180                self.error = Some(err);
181                return;
182            }
183        }
184    }
185}
186
// The scan API requires all four visitor traits to be implemented, but this
// collector only consumes plain u64 chunks via `PrimitiveVisitor::u64_chunk`
// above; the remaining traits rely entirely on their default no-op methods.
impl<'a, P, F> PrimitiveWithRowIdsVisitor for ConstraintRowCollector<'a, P, F>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
    F: FnMut(Vec<ConstraintRecord>),
{
}

impl<'a, P, F> PrimitiveSortedVisitor for ConstraintRowCollector<'a, P, F>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
    F: FnMut(Vec<ConstraintRecord>),
{
}

impl<'a, P, F> PrimitiveSortedWithRowIdsVisitor for ConstraintRowCollector<'a, P, F>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
    F: FnMut(Vec<ConstraintRecord>),
{
}
207
208// ----- Public catalog types -----
209
/// Metadata about a table.
///
/// Stored in the system catalog (table 0) and serialized using [`bitcode`].
/// Persisted and loaded via [`SysCatalog::put_table_meta`] and
/// [`SysCatalog::get_table_meta`].
#[derive(Encode, Decode, Clone, Debug, PartialEq, Eq)]
pub struct TableMeta {
    /// Unique identifier for this table.
    pub table_id: TableId,
    /// Optional human-readable name for the table.
    pub name: Option<String>,
    /// When the table was created (microseconds since epoch).
    pub created_at_micros: u64,
    /// Bitflags for table properties (e.g., temporary, system).
    pub flags: u32,
    /// Schema version or modification counter.
    pub epoch: u64,
    /// If this is a view, contains the SQL definition (SELECT statement).
    /// If `None`, this is a regular table.
    pub view_definition: Option<String>,
}
229
/// Metadata about a column.
///
/// Stored in the system catalog (table 0) and serialized using [`bitcode`].
/// Persisted and loaded via [`SysCatalog::put_col_meta`] and
/// [`SysCatalog::get_cols_meta`].
#[derive(Encode, Decode, Clone, Debug, PartialEq, Eq)]
pub struct ColMeta {
    /// Unique identifier for this column within its table.
    pub col_id: u32,
    /// Optional human-readable name for the column.
    pub name: Option<String>,
    /// Bitflags for column properties (e.g., nullable, indexed).
    pub flags: u32,
    /// Optional serialized default value for the column.
    pub default: Option<Vec<u8>>,
}
244
/// Metadata about a schema (namespace of tables).
///
/// Stored in the system catalog (table 0) and serialized using [`bitcode`].
#[derive(Encode, Decode, Clone, Debug, PartialEq, Eq)]
pub struct SchemaMeta {
    /// Human-readable schema name (case-preserved).
    pub name: String,
    /// When the schema was created (microseconds since epoch).
    pub created_at_micros: u64,
    /// Bitflags for schema properties (reserved for future use).
    pub flags: u32,
}
257
/// Metadata about a custom type (CREATE TYPE/DOMAIN).
///
/// Stored in the system catalog (table 0) and serialized using [`bitcode`].
/// Represents type aliases like `CREATE TYPE custom_type AS integer`.
#[derive(Encode, Decode, Clone, Debug, PartialEq, Eq)]
pub struct CustomTypeMeta {
    /// Human-readable type name (case-preserved, stored lowercase in catalog).
    pub name: String,
    /// SQL representation of the base data type (e.g., "INTEGER", "VARCHAR(100)").
    /// Stored as a string to avoid Arrow `DataType` serialization complexity.
    pub base_type_sql: String,
    /// When the type was created (microseconds since epoch).
    pub created_at_micros: u64,
}
272
/// Metadata describing a single multi-column index (unique or non-unique).
///
/// Used to track both named CREATE INDEX statements and UNIQUE constraints
/// spanning multiple columns.
#[derive(Encode, Decode, Clone, Debug, PartialEq, Eq)]
pub struct MultiColumnIndexEntryMeta {
    /// Optional human-readable index name (`None` for unnamed UNIQUE constraints).
    pub index_name: Option<String>,
    /// Normalized lowercase name used as the map key for lookups.
    pub canonical_name: String,
    /// Field IDs participating in this index, in declaration order.
    pub column_ids: Vec<FieldId>,
    /// Whether this index enforces uniqueness.
    pub unique: bool,
}
287
/// Metadata describing a single named single-column index.
#[derive(Encode, Decode, Clone, Debug, PartialEq, Eq)]
pub struct SingleColumnIndexEntryMeta {
    /// Human-readable index name (case preserved).
    pub index_name: String,
    /// Lower-cased canonical index name for case-insensitive lookups.
    pub canonical_name: String,
    /// Identifier of the column the index covers.
    pub column_id: FieldId,
    /// Human-readable column name (case preserved).
    pub column_name: String,
    /// Whether this index enforces uniqueness for the column.
    pub unique: bool,
    /// Whether the index is sorted in ascending order (`true`) or descending (`false`).
    pub ascending: bool,
    /// Whether NULL values appear first (`true`) or last (`false`) in the index.
    pub nulls_first: bool,
}
306
/// Metadata describing all single-column indexes registered for a table.
///
/// Persisted as one record per table; see
/// [`SysCatalog::put_single_column_indexes`].
#[derive(Encode, Decode, Clone, Debug, PartialEq, Eq)]
pub struct TableSingleColumnIndexMeta {
    /// Owning table identifier.
    pub table_id: TableId,
    /// Definitions for each named single-column index on the table.
    pub indexes: Vec<SingleColumnIndexEntryMeta>,
}
315
/// Metadata describing all multi-column indexes (unique and non-unique) for a table.
///
/// Persisted as one record per table; see
/// [`SysCatalog::put_multi_column_indexes`].
#[derive(Encode, Decode, Clone, Debug, PartialEq, Eq)]
pub struct TableMultiColumnIndexMeta {
    /// Table identifier these indexes belong to.
    pub table_id: TableId,
    /// Definitions of each persisted multi-column index.
    pub indexes: Vec<MultiColumnIndexEntryMeta>,
}
324
/// Timing information for a trigger (BEFORE, AFTER, INSTEAD OF).
#[derive(Encode, Decode, Clone, Copy, Debug, PartialEq, Eq)]
pub enum TriggerTimingMeta {
    /// Trigger runs before the triggering mutation.
    Before,
    /// Trigger runs after the triggering mutation.
    After,
    /// Trigger runs in place of the triggering mutation.
    InsteadOf,
}
332
/// Trigger event metadata describing which operation fires the trigger.
#[derive(Encode, Decode, Clone, Debug, PartialEq, Eq)]
pub enum TriggerEventMeta {
    /// Fires on INSERT.
    Insert,
    /// Fires on UPDATE; `columns` restricts the trigger to the listed columns
    /// (empty means any column — TODO confirm against the trigger executor).
    Update { columns: Vec<String> },
    /// Fires on DELETE.
    Delete,
}
340
/// Persisted trigger definition stored alongside table metadata.
#[derive(Encode, Decode, Clone, Debug, PartialEq, Eq)]
pub struct TriggerEntryMeta {
    /// Display name preserving original casing.
    pub name: String,
    /// Canonical lowercase trigger name for case-insensitive lookups.
    pub canonical_name: String,
    /// Timing phase indicating when the trigger executes relative to the mutation.
    pub timing: TriggerTimingMeta,
    /// Mutation event that fires the trigger.
    pub event: TriggerEventMeta,
    /// Whether the trigger fires per affected row (`true`) or per statement (`false`).
    pub for_each_row: bool,
    /// Optional SQL expression from the WHEN clause.
    pub condition: Option<String>,
    /// Trigger body stored as a SQL string (BEGIN/END block or single statement).
    pub body_sql: String,
}
359
/// Collection of triggers registered for a table.
///
/// Persisted as one record per table; see [`SysCatalog::put_triggers`].
#[derive(Encode, Decode, Clone, Debug, PartialEq, Eq)]
pub struct TableTriggerMeta {
    /// Owning table identifier.
    pub table_id: TableId,
    /// Trigger definitions associated with the table.
    pub triggers: Vec<TriggerEntryMeta>,
}
368
369// ----- SysCatalog -----
370
/// Interface to the system catalog (table 0).
///
/// The system catalog stores metadata about all tables and columns in the database.
/// It uses special reserved columns within table 0 to persist [`TableMeta`] and
/// [`ColMeta`] structures.
///
/// # Lifetime
///
/// `SysCatalog` borrows a reference to the [`ColumnStore`] and does not own it.
/// This allows multiple catalog instances to coexist with the same storage.
pub struct SysCatalog<'a, P = MemPager>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    /// Borrowed column store holding table 0's catalog columns.
    store: &'a ColumnStore<P>,
}
387
388impl<'a, P> SysCatalog<'a, P>
389where
390    P: Pager<Blob = EntryHandle> + Send + Sync,
391{
392    fn write_null_entries(&self, meta_field: LogicalFieldId, row_ids: &[RowId]) -> LlkvResult<()> {
393        if row_ids.is_empty() {
394            return Ok(());
395        }
396
397        let lfid_val: u64 = meta_field.into();
398        let schema = Arc::new(Schema::new(vec![
399            Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
400            Field::new("meta", DataType::Binary, true).with_metadata(HashMap::from([(
401                crate::constants::FIELD_ID_META_KEY.to_string(),
402                lfid_val.to_string(),
403            )])),
404        ]));
405
406        let row_array = Arc::new(UInt64Array::from(row_ids.to_vec()));
407        let mut builder = BinaryBuilder::new();
408        for _ in row_ids {
409            builder.append_null();
410        }
411        let meta_array = Arc::new(builder.finish());
412
413        let batch = RecordBatch::try_new(schema, vec![row_array, meta_array])?;
414        self.store.append(&batch)?;
415        Ok(())
416    }
417
418    /// Create a new system catalog interface using the provided column store.
419    pub fn new(store: &'a ColumnStore<P>) -> Self {
420        Self { store }
421    }
422
423    /// Insert or update table metadata.
424    ///
425    /// This persists the table's metadata to the system catalog. If metadata for
426    /// this table ID already exists, it is overwritten (last-write-wins).
427    pub fn put_table_meta(&self, meta: &TableMeta) {
428        let lfid_val: u64 = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_TABLE_META_ID).into();
429        let schema = Arc::new(Schema::new(vec![
430            Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
431            Field::new("meta", DataType::Binary, false).with_metadata(HashMap::from([(
432                crate::constants::FIELD_ID_META_KEY.to_string(),
433                lfid_val.to_string(),
434            )])),
435        ]));
436
437        let row_id = Arc::new(UInt64Array::from(vec![rid_table(meta.table_id)]));
438        let meta_encoded = bitcode::encode(meta);
439        let meta_bytes = Arc::new(BinaryArray::from(vec![meta_encoded.as_slice()]));
440
441        let batch = RecordBatch::try_new(schema, vec![row_id, meta_bytes]).unwrap();
442        self.store.append(&batch).unwrap();
443    }
444
445    /// Retrieve table metadata by table ID.
446    ///
447    /// Returns `None` if no metadata exists for the given table ID.
448    pub fn get_table_meta(&self, table_id: TableId) -> Option<TableMeta> {
449        let row_id = rid_table(table_id);
450        let catalog_field = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_TABLE_META_ID);
451        let batch = self
452            .store
453            .gather_rows(&[catalog_field], &[row_id], GatherNullPolicy::IncludeNulls)
454            .ok()?;
455
456        if batch.num_rows() == 0 || batch.num_columns() == 0 {
457            return None;
458        }
459
460        let array = batch
461            .column(0)
462            .as_any()
463            .downcast_ref::<BinaryArray>()
464            .expect("table meta column must be BinaryArray");
465
466        if array.is_null(0) {
467            return None;
468        }
469
470        bitcode::decode(array.value(0)).ok()
471    }
472
473    /// Upsert a single column’s metadata.
474    pub fn put_col_meta(&self, table_id: TableId, meta: &ColMeta) {
475        let lfid_val: u64 = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_COL_META_ID).into();
476        let schema = Arc::new(Schema::new(vec![
477            Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
478            Field::new("meta", DataType::Binary, false).with_metadata(HashMap::from([(
479                crate::constants::FIELD_ID_META_KEY.to_string(),
480                lfid_val.to_string(),
481            )])),
482        ]));
483
484        let rid_value = rid_col(table_id, meta.col_id);
485        let row_id = Arc::new(UInt64Array::from(vec![rid_value]));
486        let meta_encoded = bitcode::encode(meta);
487        let meta_bytes = Arc::new(BinaryArray::from(vec![meta_encoded.as_slice()]));
488
489        let batch = RecordBatch::try_new(schema, vec![row_id, meta_bytes]).unwrap();
490        self.store.append(&batch).unwrap();
491    }
492
493    /// Batch fetch specific column metas by col_id using a shared keyset.
494    pub fn get_cols_meta(&self, table_id: TableId, col_ids: &[u32]) -> Vec<Option<ColMeta>> {
495        if col_ids.is_empty() {
496            return Vec::new();
497        }
498
499        let row_ids: Vec<RowId> = col_ids.iter().map(|&cid| rid_col(table_id, cid)).collect();
500        let catalog_field = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_COL_META_ID);
501
502        let batch =
503            match self
504                .store
505                .gather_rows(&[catalog_field], &row_ids, GatherNullPolicy::IncludeNulls)
506            {
507                Ok(batch) => batch,
508                Err(_) => return vec![None; col_ids.len()],
509            };
510
511        let meta_col = batch
512            .column(0)
513            .as_any()
514            .downcast_ref::<BinaryArray>()
515            .expect("catalog meta column should be Binary");
516
517        col_ids
518            .iter()
519            .enumerate()
520            .map(|(idx, _)| {
521                if meta_col.is_null(idx) {
522                    None
523                } else {
524                    bitcode::decode(meta_col.value(idx)).ok()
525                }
526            })
527            .collect()
528    }
529
530    /// Delete metadata rows for the specified column identifiers.
531    pub fn delete_col_meta(&self, table_id: TableId, col_ids: &[FieldId]) -> LlkvResult<()> {
532        if col_ids.is_empty() {
533            return Ok(());
534        }
535
536        let meta_field = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_COL_META_ID);
537        let row_ids: Vec<RowId> = col_ids
538            .iter()
539            .map(|&col_id| rid_col(table_id, col_id))
540            .collect();
541        self.write_null_entries(meta_field, &row_ids)
542    }
543
544    /// Remove the persisted table metadata record, if present.
545    pub fn delete_table_meta(&self, table_id: TableId) -> LlkvResult<()> {
546        let meta_field = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_TABLE_META_ID);
547        let row_id = rid_table(table_id);
548        self.write_null_entries(meta_field, &[row_id])
549    }
550
551    /// Delete constraint records for the provided identifiers.
552    pub fn delete_constraint_records(
553        &self,
554        table_id: TableId,
555        constraint_ids: &[ConstraintId],
556    ) -> LlkvResult<()> {
557        if constraint_ids.is_empty() {
558            return Ok(());
559        }
560
561        let meta_field = constraint_meta_lfid();
562        let row_ids: Vec<RowId> = constraint_ids
563            .iter()
564            .map(|&constraint_id| encode_constraint_row_id(table_id, constraint_id))
565            .collect();
566        self.write_null_entries(meta_field, &row_ids)
567    }
568
569    /// Delete persisted constraint names for the provided identifiers.
570    pub fn delete_constraint_names(
571        &self,
572        table_id: TableId,
573        constraint_ids: &[ConstraintId],
574    ) -> LlkvResult<()> {
575        if constraint_ids.is_empty() {
576            return Ok(());
577        }
578
579        let lfid = constraint_name_lfid();
580        let row_ids: Vec<RowId> = constraint_ids
581            .iter()
582            .map(|&constraint_id| encode_constraint_row_id(table_id, constraint_id))
583            .collect();
584        self.write_null_entries(lfid, &row_ids)
585    }
586
587    /// Delete the multi-column index metadata record for a table, if any.
588    pub fn delete_multi_column_indexes(&self, table_id: TableId) -> LlkvResult<()> {
589        let meta_field = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_MULTI_COLUMN_UNIQUE_META_ID);
590        let row_id = rid_table(table_id);
591        self.write_null_entries(meta_field, &[row_id])
592    }
593
594    /// Delete the single-column index metadata record for a table, if any exists.
595    pub fn delete_single_column_indexes(&self, table_id: TableId) -> LlkvResult<()> {
596        let meta_field = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_SINGLE_COLUMN_INDEX_META_ID);
597        let row_id = rid_table(table_id);
598        self.write_null_entries(meta_field, &[row_id])
599    }
600
601    /// Persist the complete set of multi-column index definitions for a table.
602    pub fn put_multi_column_indexes(
603        &self,
604        table_id: TableId,
605        indexes: &[MultiColumnIndexEntryMeta],
606    ) -> LlkvResult<()> {
607        let lfid_val: u64 =
608            lfid(CATALOG_TABLE_ID, CATALOG_FIELD_MULTI_COLUMN_UNIQUE_META_ID).into();
609        let schema = Arc::new(Schema::new(vec![
610            Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
611            Field::new("meta", DataType::Binary, false).with_metadata(HashMap::from([(
612                crate::constants::FIELD_ID_META_KEY.to_string(),
613                lfid_val.to_string(),
614            )])),
615        ]));
616
617        let row_id = Arc::new(UInt64Array::from(vec![rid_table(table_id)]));
618        let meta = TableMultiColumnIndexMeta {
619            table_id,
620            indexes: indexes.to_vec(),
621        };
622        let encoded = bitcode::encode(&meta);
623        let meta_bytes = Arc::new(BinaryArray::from(vec![encoded.as_slice()]));
624
625        let batch = RecordBatch::try_new(schema, vec![row_id, meta_bytes])?;
626        self.store.append(&batch)?;
627        Ok(())
628    }
629
630    /// Persist the complete set of single-column index definitions for a table.
631    pub fn put_single_column_indexes(
632        &self,
633        table_id: TableId,
634        indexes: &[SingleColumnIndexEntryMeta],
635    ) -> LlkvResult<()> {
636        let lfid_val: u64 =
637            lfid(CATALOG_TABLE_ID, CATALOG_FIELD_SINGLE_COLUMN_INDEX_META_ID).into();
638        let schema = Arc::new(Schema::new(vec![
639            Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
640            Field::new("meta", DataType::Binary, false).with_metadata(HashMap::from([(
641                crate::constants::FIELD_ID_META_KEY.to_string(),
642                lfid_val.to_string(),
643            )])),
644        ]));
645
646        let row_id = Arc::new(UInt64Array::from(vec![rid_table(table_id)]));
647        let meta = TableSingleColumnIndexMeta {
648            table_id,
649            indexes: indexes.to_vec(),
650        };
651        let encoded = bitcode::encode(&meta);
652        let meta_bytes = Arc::new(BinaryArray::from(vec![encoded.as_slice()]));
653
654        let batch = RecordBatch::try_new(schema, vec![row_id, meta_bytes])?;
655        self.store.append(&batch)?;
656        Ok(())
657    }
658
659    /// Retrieve all persisted multi-column index definitions for a table.
660    pub fn get_multi_column_indexes(
661        &self,
662        table_id: TableId,
663    ) -> LlkvResult<Vec<MultiColumnIndexEntryMeta>> {
664        let lfid = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_MULTI_COLUMN_UNIQUE_META_ID);
665        let row_id = rid_table(table_id);
666        let batch = match self
667            .store
668            .gather_rows(&[lfid], &[row_id], GatherNullPolicy::IncludeNulls)
669        {
670            Ok(batch) => batch,
671            Err(llkv_result::Error::NotFound) => return Ok(Vec::new()),
672            Err(err) => return Err(err),
673        };
674
675        if batch.num_columns() == 0 || batch.num_rows() == 0 {
676            return Ok(Vec::new());
677        }
678
679        let array = batch
680            .column(0)
681            .as_any()
682            .downcast_ref::<BinaryArray>()
683            .ok_or_else(|| {
684                llkv_result::Error::Internal(
685                    "catalog multi-column index column stored unexpected type".into(),
686                )
687            })?;
688
689        if array.is_null(0) {
690            return Ok(Vec::new());
691        }
692
693        let meta: TableMultiColumnIndexMeta = bitcode::decode(array.value(0)).map_err(|err| {
694            llkv_result::Error::Internal(format!(
695                "failed to decode multi-column index metadata: {err}"
696            ))
697        })?;
698
699        Ok(meta.indexes)
700    }
701
702    /// Retrieve all persisted single-column index definitions for a table.
703    pub fn get_single_column_indexes(
704        &self,
705        table_id: TableId,
706    ) -> LlkvResult<Vec<SingleColumnIndexEntryMeta>> {
707        let lfid = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_SINGLE_COLUMN_INDEX_META_ID);
708        let row_id = rid_table(table_id);
709        let batch = match self
710            .store
711            .gather_rows(&[lfid], &[row_id], GatherNullPolicy::IncludeNulls)
712        {
713            Ok(batch) => batch,
714            Err(llkv_result::Error::NotFound) => return Ok(Vec::new()),
715            Err(err) => return Err(err),
716        };
717
718        if batch.num_columns() == 0 || batch.num_rows() == 0 {
719            return Ok(Vec::new());
720        }
721
722        let array = batch
723            .column(0)
724            .as_any()
725            .downcast_ref::<BinaryArray>()
726            .ok_or_else(|| {
727                llkv_result::Error::Internal(
728                    "catalog single-column index column stored unexpected type".into(),
729                )
730            })?;
731
732        if array.is_null(0) {
733            return Ok(Vec::new());
734        }
735
736        let meta: TableSingleColumnIndexMeta = bitcode::decode(array.value(0)).map_err(|err| {
737            llkv_result::Error::Internal(format!(
738                "failed to decode single-column index metadata: {err}"
739            ))
740        })?;
741
742        Ok(meta.indexes)
743    }
744
745    /// Persist the trigger definitions for a table.
746    pub fn put_triggers(&self, table_id: TableId, triggers: &[TriggerEntryMeta]) -> LlkvResult<()> {
747        let lfid_val: u64 = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_TRIGGER_META_ID).into();
748        let schema = Arc::new(Schema::new(vec![
749            Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
750            Field::new("meta", DataType::Binary, false).with_metadata(HashMap::from([(
751                crate::constants::FIELD_ID_META_KEY.to_string(),
752                lfid_val.to_string(),
753            )])),
754        ]));
755
756        let row_id = Arc::new(UInt64Array::from(vec![rid_table(table_id)]));
757        let meta = TableTriggerMeta {
758            table_id,
759            triggers: triggers.to_vec(),
760        };
761        let encoded = bitcode::encode(&meta);
762        let meta_bytes = Arc::new(BinaryArray::from(vec![encoded.as_slice()]));
763
764        let batch = RecordBatch::try_new(schema, vec![row_id, meta_bytes])?;
765        self.store.append(&batch)?;
766        Ok(())
767    }
768
769    /// Remove all trigger definitions for the provided table.
770    pub fn delete_triggers(&self, table_id: TableId) -> LlkvResult<()> {
771        let meta_field = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_TRIGGER_META_ID);
772        let row_id = rid_table(table_id);
773        self.write_null_entries(meta_field, &[row_id])
774    }
775
776    /// Retrieve persisted trigger definitions for a table.
777    pub fn get_triggers(&self, table_id: TableId) -> LlkvResult<Vec<TriggerEntryMeta>> {
778        let lfid = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_TRIGGER_META_ID);
779        let row_id = rid_table(table_id);
780        let batch = match self
781            .store
782            .gather_rows(&[lfid], &[row_id], GatherNullPolicy::IncludeNulls)
783        {
784            Ok(batch) => batch,
785            Err(llkv_result::Error::NotFound) => return Ok(Vec::new()),
786            Err(err) => return Err(err),
787        };
788
789        if batch.num_columns() == 0 || batch.num_rows() == 0 {
790            return Ok(Vec::new());
791        }
792
793        let array = batch
794            .column(0)
795            .as_any()
796            .downcast_ref::<BinaryArray>()
797            .ok_or_else(|| {
798                llkv_result::Error::Internal(
799                    "catalog trigger metadata column stored unexpected type".into(),
800                )
801            })?;
802
803        if array.is_null(0) {
804            return Ok(Vec::new());
805        }
806
807        let meta: TableTriggerMeta = bitcode::decode(array.value(0)).map_err(|err| {
808            llkv_result::Error::Internal(format!("failed to decode trigger metadata: {err}"))
809        })?;
810
811        Ok(meta.triggers)
812    }
813
814    /// Retrieve all persisted multi-column index definitions across tables.
815    pub fn all_multi_column_index_metas(&self) -> LlkvResult<Vec<TableMultiColumnIndexMeta>> {
816        let meta_field = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_MULTI_COLUMN_UNIQUE_META_ID);
817        let row_field = rowid_fid(meta_field);
818
819        struct RowIdCollector {
820            row_ids: Vec<RowId>,
821        }
822
823        impl PrimitiveVisitor for RowIdCollector {
824            fn u64_chunk(&mut self, values: &UInt64Array) {
825                for i in 0..values.len() {
826                    self.row_ids.push(values.value(i));
827                }
828            }
829        }
830        impl PrimitiveWithRowIdsVisitor for RowIdCollector {}
831        impl PrimitiveSortedVisitor for RowIdCollector {}
832        impl PrimitiveSortedWithRowIdsVisitor for RowIdCollector {}
833
834        let mut collector = RowIdCollector {
835            row_ids: Vec::new(),
836        };
837        match ScanBuilder::new(self.store, row_field)
838            .options(ScanOptions::default())
839            .run(&mut collector)
840        {
841            Ok(()) => {}
842            Err(llkv_result::Error::NotFound) => return Ok(Vec::new()),
843            Err(err) => return Err(err),
844        }
845
846        if collector.row_ids.is_empty() {
847            return Ok(Vec::new());
848        }
849
850        let batch = match self.store.gather_rows(
851            &[meta_field],
852            &collector.row_ids,
853            GatherNullPolicy::IncludeNulls,
854        ) {
855            Ok(batch) => batch,
856            Err(llkv_result::Error::NotFound) => return Ok(Vec::new()),
857            Err(err) => return Err(err),
858        };
859
860        if batch.num_columns() == 0 {
861            return Ok(Vec::new());
862        }
863
864        let array = batch
865            .column(0)
866            .as_any()
867            .downcast_ref::<BinaryArray>()
868            .ok_or_else(|| {
869                llkv_result::Error::Internal(
870                    "catalog multi-column index column stored unexpected type".into(),
871                )
872            })?;
873
874        let mut metas = Vec::with_capacity(batch.num_rows());
875        for idx in 0..batch.num_rows() {
876            if array.is_null(idx) {
877                continue;
878            }
879            let meta: TableMultiColumnIndexMeta =
880                bitcode::decode(array.value(idx)).map_err(|err| {
881                    llkv_result::Error::Internal(format!(
882                        "failed to decode multi-column index metadata: {err}"
883                    ))
884                })?;
885            metas.push(meta);
886        }
887
888        Ok(metas)
889    }
890
891    /// Persist or update multiple constraint records for a table in a single batch.
892    pub fn put_constraint_records(
893        &self,
894        table_id: TableId,
895        records: &[ConstraintRecord],
896    ) -> LlkvResult<()> {
897        if records.is_empty() {
898            return Ok(());
899        }
900
901        let lfid_val: u64 = constraint_meta_lfid().into();
902        let schema = Arc::new(Schema::new(vec![
903            Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
904            Field::new("constraint", DataType::Binary, false).with_metadata(HashMap::from([(
905                crate::constants::FIELD_ID_META_KEY.to_string(),
906                lfid_val.to_string(),
907            )])),
908        ]));
909
910        let row_ids: Vec<RowId> = records
911            .iter()
912            .map(|record| encode_constraint_row_id(table_id, record.constraint_id))
913            .collect();
914
915        let row_ids_array = Arc::new(UInt64Array::from(row_ids));
916        let payload_array = Arc::new(BinaryArray::from_iter_values(
917            records.iter().map(bitcode::encode),
918        ));
919
920        let batch = RecordBatch::try_new(schema, vec![row_ids_array, payload_array])?;
921        self.store.append(&batch)?;
922        Ok(())
923    }
924
925    /// Persist or update constraint names for the specified identifiers.
926    pub fn put_constraint_names(
927        &self,
928        table_id: TableId,
929        names: &[ConstraintNameRecord],
930    ) -> LlkvResult<()> {
931        if names.is_empty() {
932            return Ok(());
933        }
934
935        let lfid_val: u64 = constraint_name_lfid().into();
936        let schema = Arc::new(Schema::new(vec![
937            Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
938            Field::new("constraint_name", DataType::Binary, false).with_metadata(HashMap::from([
939                (
940                    crate::constants::FIELD_ID_META_KEY.to_string(),
941                    lfid_val.to_string(),
942                ),
943            ])),
944        ]));
945
946        let row_ids: Vec<RowId> = names
947            .iter()
948            .map(|record| encode_constraint_row_id(table_id, record.constraint_id))
949            .collect();
950        let row_ids_array = Arc::new(UInt64Array::from(row_ids));
951        let payload_array = Arc::new(BinaryArray::from_iter_values(
952            names.iter().map(bitcode::encode),
953        ));
954
955        let batch = RecordBatch::try_new(schema, vec![row_ids_array, payload_array])?;
956        self.store.append(&batch)?;
957        Ok(())
958    }
959
960    /// Fetch multiple constraint records for a table in a single batch.
961    pub fn get_constraint_records(
962        &self,
963        table_id: TableId,
964        constraint_ids: &[ConstraintId],
965    ) -> LlkvResult<Vec<Option<ConstraintRecord>>> {
966        if constraint_ids.is_empty() {
967            return Ok(Vec::new());
968        }
969
970        let lfid = constraint_meta_lfid();
971        let row_ids: Vec<RowId> = constraint_ids
972            .iter()
973            .map(|&constraint_id| encode_constraint_row_id(table_id, constraint_id))
974            .collect();
975
976        let batch = match self
977            .store
978            .gather_rows(&[lfid], &row_ids, GatherNullPolicy::IncludeNulls)
979        {
980            Ok(batch) => batch,
981            Err(llkv_result::Error::NotFound) => {
982                return Ok(vec![None; constraint_ids.len()]);
983            }
984            Err(err) => return Err(err),
985        };
986
987        if batch.num_columns() == 0 || batch.num_rows() == 0 {
988            return Ok(vec![None; constraint_ids.len()]);
989        }
990
991        let array = batch
992            .column(0)
993            .as_any()
994            .downcast_ref::<BinaryArray>()
995            .ok_or_else(|| {
996                llkv_result::Error::Internal(
997                    "constraint metadata column stored unexpected type".into(),
998                )
999            })?;
1000
1001        let mut results = Vec::with_capacity(constraint_ids.len());
1002        for (idx, &constraint_id) in constraint_ids.iter().enumerate() {
1003            if array.is_null(idx) {
1004                results.push(None);
1005                continue;
1006            }
1007            let record = decode_constraint_record(array.value(idx))?;
1008            if record.constraint_id != constraint_id {
1009                return Err(llkv_result::Error::Internal(
1010                    "constraint metadata id mismatch".into(),
1011                ));
1012            }
1013            results.push(Some(record));
1014        }
1015
1016        Ok(results)
1017    }
1018
1019    /// Fetch constraint names for a table in a single batch.
1020    pub fn get_constraint_names(
1021        &self,
1022        table_id: TableId,
1023        constraint_ids: &[ConstraintId],
1024    ) -> LlkvResult<Vec<Option<String>>> {
1025        if constraint_ids.is_empty() {
1026            return Ok(Vec::new());
1027        }
1028
1029        let lfid = constraint_name_lfid();
1030        let row_ids: Vec<RowId> = constraint_ids
1031            .iter()
1032            .map(|&constraint_id| encode_constraint_row_id(table_id, constraint_id))
1033            .collect();
1034
1035        let batch = match self
1036            .store
1037            .gather_rows(&[lfid], &row_ids, GatherNullPolicy::IncludeNulls)
1038        {
1039            Ok(batch) => batch,
1040            Err(llkv_result::Error::NotFound) => {
1041                return Ok(vec![None; constraint_ids.len()]);
1042            }
1043            Err(err) => return Err(err),
1044        };
1045
1046        if batch.num_columns() == 0 {
1047            return Ok(vec![None; constraint_ids.len()]);
1048        }
1049
1050        let array = batch
1051            .column(0)
1052            .as_any()
1053            .downcast_ref::<BinaryArray>()
1054            .ok_or_else(|| {
1055                llkv_result::Error::Internal(
1056                    "constraint name metadata column stored unexpected type".into(),
1057                )
1058            })?;
1059
1060        let mut results = Vec::with_capacity(row_ids.len());
1061        for idx in 0..row_ids.len() {
1062            if array.is_null(idx) {
1063                results.push(None);
1064            } else {
1065                let record: ConstraintNameRecord =
1066                    bitcode::decode(array.value(idx)).map_err(|err| {
1067                        llkv_result::Error::Internal(format!(
1068                            "failed to decode constraint name metadata: {err}"
1069                        ))
1070                    })?;
1071                results.push(record.name);
1072            }
1073        }
1074
1075        Ok(results)
1076    }
1077
1078    /// Stream constraint records for a table in batches.
1079    pub fn scan_constraint_records_for_table<F>(
1080        &self,
1081        table_id: TableId,
1082        mut on_batch: F,
1083    ) -> LlkvResult<()>
1084    where
1085        F: FnMut(Vec<ConstraintRecord>),
1086    {
1087        let row_field = constraint_row_lfid();
1088        let mut visitor = ConstraintRowCollector {
1089            store: self.store,
1090            lfid: constraint_meta_lfid(),
1091            table_id,
1092            on_batch: &mut on_batch,
1093            buffer: Vec::with_capacity(CONSTRAINT_SCAN_CHUNK_SIZE),
1094            error: None,
1095        };
1096
1097        match ScanBuilder::new(self.store, row_field)
1098            .options(ScanOptions::default())
1099            .run(&mut visitor)
1100        {
1101            Ok(()) => {}
1102            Err(llkv_result::Error::NotFound) => return Ok(()),
1103            Err(err) => return Err(err),
1104        }
1105
1106        visitor.finish()
1107    }
1108
1109    /// Load all constraint records for a table into a vector.
1110    pub fn constraint_records_for_table(
1111        &self,
1112        table_id: TableId,
1113    ) -> LlkvResult<Vec<ConstraintRecord>> {
1114        let mut all = Vec::new();
1115        self.scan_constraint_records_for_table(table_id, |mut chunk| {
1116            all.append(&mut chunk);
1117        })?;
1118        Ok(all)
1119    }
1120
1121    pub fn put_next_table_id(&self, next_id: TableId) -> LlkvResult<()> {
1122        let lfid_val: u64 = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_NEXT_TABLE_ID).into();
1123        let schema = Arc::new(Schema::new(vec![
1124            Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
1125            Field::new("next_table_id", DataType::UInt64, false).with_metadata(HashMap::from([(
1126                crate::constants::FIELD_ID_META_KEY.to_string(),
1127                lfid_val.to_string(),
1128            )])),
1129        ]));
1130
1131        let row_id = Arc::new(UInt64Array::from(vec![CATALOG_NEXT_TABLE_ROW_ID]));
1132        let value_array = Arc::new(UInt64Array::from(vec![next_id as u64]));
1133        let batch = RecordBatch::try_new(schema, vec![row_id, value_array])?;
1134        self.store.append(&batch)?;
1135        Ok(())
1136    }
1137
1138    pub fn get_next_table_id(&self) -> LlkvResult<Option<TableId>> {
1139        let lfid = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_NEXT_TABLE_ID);
1140        let batch = match self.store.gather_rows(
1141            &[lfid],
1142            &[CATALOG_NEXT_TABLE_ROW_ID],
1143            GatherNullPolicy::IncludeNulls,
1144        ) {
1145            Ok(batch) => batch,
1146            Err(llkv_result::Error::NotFound) => return Ok(None),
1147            Err(err) => return Err(err),
1148        };
1149
1150        if batch.num_columns() == 0 || batch.num_rows() == 0 {
1151            return Ok(None);
1152        }
1153
1154        let array = batch
1155            .column(0)
1156            .as_any()
1157            .downcast_ref::<UInt64Array>()
1158            .ok_or_else(|| {
1159                llkv_result::Error::Internal(
1160                    "catalog next_table_id column stored unexpected type".into(),
1161                )
1162            })?;
1163        if array.is_empty() || array.is_null(0) {
1164            return Ok(None);
1165        }
1166
1167        let value = array.value(0);
1168        if value > TableId::MAX as u64 {
1169            return Err(llkv_result::Error::InvalidArgumentError(
1170                "persisted next_table_id exceeds TableId range".into(),
1171            ));
1172        }
1173
1174        Ok(Some(value as TableId))
1175    }
1176
1177    pub fn max_table_id(&self) -> LlkvResult<Option<TableId>> {
1178        let meta_field = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_TABLE_META_ID);
1179        let row_field = rowid_fid(meta_field);
1180
1181        let mut collector = MaxRowIdCollector { max: None };
1182        match ScanBuilder::new(self.store, row_field)
1183            .options(ScanOptions::default())
1184            .run(&mut collector)
1185        {
1186            Ok(()) => {}
1187            Err(llkv_result::Error::NotFound) => return Ok(None),
1188            Err(err) => return Err(err),
1189        }
1190
1191        let max_value = match collector.max {
1192            Some(value) => value,
1193            None => return Ok(None),
1194        };
1195
1196        let logical: LogicalFieldId = max_value.into();
1197        Ok(Some(logical.table_id()))
1198    }
1199
1200    /// Scan all table metadata entries from the catalog.
1201    /// Returns a vector of (table_id, TableMeta) pairs for all persisted tables.
1202    ///
1203    /// This method first scans for all row IDs in the table metadata column,
1204    /// then uses gather_rows to retrieve the actual metadata.
1205    pub fn all_table_metas(&self) -> LlkvResult<Vec<(TableId, TableMeta)>> {
1206        let meta_field = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_TABLE_META_ID);
1207        let row_field = rowid_fid(meta_field);
1208
1209        // Collect all row IDs that have table metadata
1210        struct RowIdCollector {
1211            row_ids: Vec<RowId>,
1212        }
1213
1214        impl PrimitiveVisitor for RowIdCollector {
1215            fn u64_chunk(&mut self, values: &UInt64Array) {
1216                for i in 0..values.len() {
1217                    self.row_ids.push(values.value(i));
1218                }
1219            }
1220        }
1221        impl PrimitiveWithRowIdsVisitor for RowIdCollector {}
1222        impl PrimitiveSortedVisitor for RowIdCollector {}
1223        impl PrimitiveSortedWithRowIdsVisitor for RowIdCollector {}
1224
1225        let mut collector = RowIdCollector {
1226            row_ids: Vec::new(),
1227        };
1228        match ScanBuilder::new(self.store, row_field)
1229            .options(ScanOptions::default())
1230            .run(&mut collector)
1231        {
1232            Ok(()) => {}
1233            Err(llkv_result::Error::NotFound) => return Ok(Vec::new()),
1234            Err(err) => return Err(err),
1235        }
1236
1237        if collector.row_ids.is_empty() {
1238            return Ok(Vec::new());
1239        }
1240
1241        // Gather all table metadata using the collected row IDs
1242        let batch = self.store.gather_rows(
1243            &[meta_field],
1244            &collector.row_ids,
1245            GatherNullPolicy::IncludeNulls,
1246        )?;
1247
1248        let meta_col = batch
1249            .column(0)
1250            .as_any()
1251            .downcast_ref::<BinaryArray>()
1252            .ok_or_else(|| {
1253                llkv_result::Error::Internal("catalog table_meta column should be Binary".into())
1254            })?;
1255
1256        let mut result = Vec::new();
1257        for (idx, &row_id) in collector.row_ids.iter().enumerate() {
1258            if !meta_col.is_null(idx) {
1259                let bytes = meta_col.value(idx);
1260                if let Ok(meta) = bitcode::decode::<TableMeta>(bytes) {
1261                    let logical: LogicalFieldId = row_id.into();
1262                    let table_id = logical.table_id();
1263                    result.push((table_id, meta));
1264                }
1265            }
1266        }
1267
1268        Ok(result)
1269    }
1270
1271    /// Persist the next transaction id to the catalog.
1272    pub fn put_next_txn_id(&self, next_txn_id: u64) -> LlkvResult<()> {
1273        let lfid_val: u64 = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_NEXT_TXN_ID).into();
1274        let schema = Arc::new(Schema::new(vec![
1275            Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
1276            Field::new("next_txn_id", DataType::UInt64, false).with_metadata(HashMap::from([(
1277                crate::constants::FIELD_ID_META_KEY.to_string(),
1278                lfid_val.to_string(),
1279            )])),
1280        ]));
1281
1282        let row_id = Arc::new(UInt64Array::from(vec![CATALOG_NEXT_TXN_ROW_ID]));
1283        let value_array = Arc::new(UInt64Array::from(vec![next_txn_id]));
1284        let batch = RecordBatch::try_new(schema, vec![row_id, value_array])?;
1285        self.store.append(&batch)?;
1286        Ok(())
1287    }
1288
1289    /// Load the next transaction id from the catalog.
1290    pub fn get_next_txn_id(&self) -> LlkvResult<Option<u64>> {
1291        let lfid = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_NEXT_TXN_ID);
1292        let batch = match self.store.gather_rows(
1293            &[lfid],
1294            &[CATALOG_NEXT_TXN_ROW_ID],
1295            GatherNullPolicy::IncludeNulls,
1296        ) {
1297            Ok(batch) => batch,
1298            Err(llkv_result::Error::NotFound) => return Ok(None),
1299            Err(err) => return Err(err),
1300        };
1301
1302        if batch.num_columns() == 0 || batch.num_rows() == 0 {
1303            return Ok(None);
1304        }
1305
1306        let array = batch
1307            .column(0)
1308            .as_any()
1309            .downcast_ref::<UInt64Array>()
1310            .ok_or_else(|| {
1311                llkv_result::Error::Internal(
1312                    "catalog next_txn_id column stored unexpected type".into(),
1313                )
1314            })?;
1315        if array.is_empty() || array.is_null(0) {
1316            return Ok(None);
1317        }
1318
1319        let value = array.value(0);
1320        Ok(Some(value))
1321    }
1322
1323    /// Persist the last committed transaction id to the catalog.
1324    pub fn put_last_committed_txn_id(&self, last_committed: u64) -> LlkvResult<()> {
1325        let lfid_val: u64 = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_LAST_COMMITTED_TXN_ID).into();
1326        let schema = Arc::new(Schema::new(vec![
1327            Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
1328            Field::new("last_committed_txn_id", DataType::UInt64, false).with_metadata(
1329                HashMap::from([(
1330                    crate::constants::FIELD_ID_META_KEY.to_string(),
1331                    lfid_val.to_string(),
1332                )]),
1333            ),
1334        ]));
1335
1336        let row_id = Arc::new(UInt64Array::from(vec![CATALOG_LAST_COMMITTED_TXN_ROW_ID]));
1337        let value_array = Arc::new(UInt64Array::from(vec![last_committed]));
1338        let batch = RecordBatch::try_new(schema, vec![row_id, value_array])?;
1339        self.store.append(&batch)?;
1340        Ok(())
1341    }
1342
1343    /// Load the last committed transaction id from the catalog.
1344    pub fn get_last_committed_txn_id(&self) -> LlkvResult<Option<u64>> {
1345        let lfid = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_LAST_COMMITTED_TXN_ID);
1346        let batch = match self.store.gather_rows(
1347            &[lfid],
1348            &[CATALOG_LAST_COMMITTED_TXN_ROW_ID],
1349            GatherNullPolicy::IncludeNulls,
1350        ) {
1351            Ok(batch) => batch,
1352            Err(llkv_result::Error::NotFound) => return Ok(None),
1353            Err(err) => return Err(err),
1354        };
1355
1356        if batch.num_columns() == 0 || batch.num_rows() == 0 {
1357            return Ok(None);
1358        }
1359
1360        let array = batch
1361            .column(0)
1362            .as_any()
1363            .downcast_ref::<UInt64Array>()
1364            .ok_or_else(|| {
1365                llkv_result::Error::Internal(
1366                    "catalog last_committed_txn_id column stored unexpected type".into(),
1367                )
1368            })?;
1369        if array.is_empty() || array.is_null(0) {
1370            return Ok(None);
1371        }
1372
1373        let value = array.value(0);
1374        Ok(Some(value))
1375    }
1376
1377    /// Persist the catalog state to the system catalog.
1378    ///
1379    /// Stores the complete catalog state (all tables and fields) as a binary blob
1380    /// using bitcode serialization.
1381    pub fn put_catalog_state(&self, state: &crate::catalog::TableCatalogState) -> LlkvResult<()> {
1382        let lfid_val: u64 = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_CATALOG_STATE).into();
1383        let schema = Arc::new(Schema::new(vec![
1384            Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
1385            Field::new("catalog_state", DataType::Binary, false).with_metadata(HashMap::from([(
1386                crate::constants::FIELD_ID_META_KEY.to_string(),
1387                lfid_val.to_string(),
1388            )])),
1389        ]));
1390
1391        let row_id = Arc::new(UInt64Array::from(vec![CATALOG_STATE_ROW_ID]));
1392        let encoded = bitcode::encode(state);
1393        let state_bytes = Arc::new(BinaryArray::from(vec![encoded.as_slice()]));
1394
1395        let batch = RecordBatch::try_new(schema, vec![row_id, state_bytes])?;
1396        self.store.append(&batch)?;
1397        Ok(())
1398    }
1399
1400    /// Load the catalog state from the system catalog.
1401    ///
1402    /// Retrieves the complete catalog state including all table and field mappings.
1403    pub fn get_catalog_state(&self) -> LlkvResult<Option<crate::catalog::TableCatalogState>> {
1404        let lfid = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_CATALOG_STATE);
1405        let batch = match self.store.gather_rows(
1406            &[lfid],
1407            &[CATALOG_STATE_ROW_ID],
1408            GatherNullPolicy::IncludeNulls,
1409        ) {
1410            Ok(batch) => batch,
1411            Err(llkv_result::Error::NotFound) => return Ok(None),
1412            Err(err) => return Err(err),
1413        };
1414
1415        if batch.num_columns() == 0 || batch.num_rows() == 0 {
1416            return Ok(None);
1417        }
1418
1419        let array = batch
1420            .column(0)
1421            .as_any()
1422            .downcast_ref::<BinaryArray>()
1423            .ok_or_else(|| {
1424                llkv_result::Error::Internal("catalog state column stored unexpected type".into())
1425            })?;
1426        if array.is_empty() || array.is_null(0) {
1427            return Ok(None);
1428        }
1429
1430        let bytes = array.value(0);
1431        let state = bitcode::decode(bytes).map_err(|e| {
1432            llkv_result::Error::Internal(format!("Failed to decode catalog state: {}", e))
1433        })?;
1434        Ok(Some(state))
1435    }
1436
1437    /// Persist schema metadata to the catalog.
1438    ///
1439    /// Stores schema metadata at a row ID derived from the schema name hash.
1440    /// This allows efficient lookup and prevents collisions.
1441    pub fn put_schema_meta(&self, meta: &SchemaMeta) -> LlkvResult<()> {
1442        let lfid_val: u64 = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_SCHEMA_META_ID).into();
1443        let schema = Arc::new(Schema::new(vec![
1444            Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
1445            Field::new("meta", DataType::Binary, false).with_metadata(HashMap::from([(
1446                crate::constants::FIELD_ID_META_KEY.to_string(),
1447                lfid_val.to_string(),
1448            )])),
1449        ]));
1450
1451        // Use hash of canonical (lowercase) schema name as row ID
1452        let canonical = meta.name.to_ascii_lowercase();
1453        let row_id_val = schema_name_to_row_id(&canonical);
1454        let row_id = Arc::new(UInt64Array::from(vec![row_id_val]));
1455        let meta_encoded = bitcode::encode(meta);
1456        let meta_bytes = Arc::new(BinaryArray::from(vec![meta_encoded.as_slice()]));
1457
1458        let batch = RecordBatch::try_new(schema, vec![row_id, meta_bytes])?;
1459        self.store.append(&batch)?;
1460        Ok(())
1461    }
1462
1463    /// Retrieve schema metadata by name.
1464    ///
1465    /// Returns `None` if the schema does not exist.
1466    pub fn get_schema_meta(&self, schema_name: &str) -> LlkvResult<Option<SchemaMeta>> {
1467        let canonical = schema_name.to_ascii_lowercase();
1468        let row_id = schema_name_to_row_id(&canonical);
1469        let lfid = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_SCHEMA_META_ID);
1470
1471        let batch = match self
1472            .store
1473            .gather_rows(&[lfid], &[row_id], GatherNullPolicy::IncludeNulls)
1474        {
1475            Ok(batch) => batch,
1476            Err(llkv_result::Error::NotFound) => return Ok(None),
1477            Err(err) => return Err(err),
1478        };
1479
1480        if batch.num_columns() == 0 || batch.num_rows() == 0 {
1481            return Ok(None);
1482        }
1483
1484        let array = batch
1485            .column(0)
1486            .as_any()
1487            .downcast_ref::<BinaryArray>()
1488            .ok_or_else(|| {
1489                llkv_result::Error::Internal("catalog schema_meta column should be Binary".into())
1490            })?;
1491
1492        if array.is_empty() || array.is_null(0) {
1493            return Ok(None);
1494        }
1495
1496        let bytes = array.value(0);
1497        let meta = bitcode::decode(bytes).map_err(|e| {
1498            llkv_result::Error::Internal(format!("Failed to decode schema metadata: {}", e))
1499        })?;
1500        Ok(Some(meta))
1501    }
1502
1503    /// Scan all schema metadata entries from the catalog.
1504    ///
1505    /// Returns a vector of all persisted schemas.
1506    pub fn all_schema_metas(&self) -> LlkvResult<Vec<SchemaMeta>> {
1507        let meta_field = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_SCHEMA_META_ID);
1508        let row_field = rowid_fid(meta_field);
1509
1510        // Collect all row IDs that have schema metadata
1511        struct RowIdCollector {
1512            row_ids: Vec<RowId>,
1513        }
1514
1515        impl PrimitiveVisitor for RowIdCollector {
1516            fn u64_chunk(&mut self, values: &UInt64Array) {
1517                for i in 0..values.len() {
1518                    self.row_ids.push(values.value(i));
1519                }
1520            }
1521        }
1522        impl PrimitiveWithRowIdsVisitor for RowIdCollector {}
1523        impl PrimitiveSortedVisitor for RowIdCollector {}
1524        impl PrimitiveSortedWithRowIdsVisitor for RowIdCollector {}
1525
1526        let mut collector = RowIdCollector {
1527            row_ids: Vec::new(),
1528        };
1529        match ScanBuilder::new(self.store, row_field)
1530            .options(ScanOptions::default())
1531            .run(&mut collector)
1532        {
1533            Ok(()) => {}
1534            Err(llkv_result::Error::NotFound) => return Ok(Vec::new()),
1535            Err(err) => return Err(err),
1536        }
1537
1538        if collector.row_ids.is_empty() {
1539            return Ok(Vec::new());
1540        }
1541
1542        // Gather all schema metadata using the collected row IDs
1543        let batch = self.store.gather_rows(
1544            &[meta_field],
1545            &collector.row_ids,
1546            GatherNullPolicy::IncludeNulls,
1547        )?;
1548
1549        let meta_col = batch
1550            .column(0)
1551            .as_any()
1552            .downcast_ref::<BinaryArray>()
1553            .ok_or_else(|| {
1554                llkv_result::Error::Internal("catalog schema_meta column should be Binary".into())
1555            })?;
1556
1557        let mut result = Vec::new();
1558        for idx in 0..collector.row_ids.len() {
1559            if !meta_col.is_null(idx) {
1560                let bytes = meta_col.value(idx);
1561                if let Ok(meta) = bitcode::decode::<SchemaMeta>(bytes) {
1562                    result.push(meta);
1563                }
1564            }
1565        }
1566
1567        Ok(result)
1568    }
1569
1570    /// Persist custom type metadata to the catalog.
1571    ///
1572    /// Stores custom type metadata at a row ID derived from the type name hash.
1573    pub fn put_custom_type_meta(&self, meta: &CustomTypeMeta) -> LlkvResult<()> {
1574        let lfid_val: u64 = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_CUSTOM_TYPE_META_ID).into();
1575        let schema = Arc::new(Schema::new(vec![
1576            Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
1577            Field::new("meta", DataType::Binary, false).with_metadata(HashMap::from([(
1578                crate::constants::FIELD_ID_META_KEY.to_string(),
1579                lfid_val.to_string(),
1580            )])),
1581        ]));
1582
1583        // Use hash of canonical (lowercase) type name as row ID
1584        let canonical = meta.name.to_ascii_lowercase();
1585        let row_id_val = schema_name_to_row_id(&canonical); // Reuse same hash function
1586        let row_id = Arc::new(UInt64Array::from(vec![row_id_val]));
1587        let meta_encoded = bitcode::encode(meta);
1588        let meta_bytes = Arc::new(BinaryArray::from(vec![meta_encoded.as_slice()]));
1589
1590        let batch = RecordBatch::try_new(schema, vec![row_id, meta_bytes])?;
1591        self.store.append(&batch)?;
1592        Ok(())
1593    }
1594
1595    /// Retrieve custom type metadata by name.
1596    ///
1597    /// Returns `None` if the type does not exist.
1598    pub fn get_custom_type_meta(&self, type_name: &str) -> LlkvResult<Option<CustomTypeMeta>> {
1599        let canonical = type_name.to_ascii_lowercase();
1600        let row_id = schema_name_to_row_id(&canonical);
1601        let lfid = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_CUSTOM_TYPE_META_ID);
1602
1603        let batch = match self
1604            .store
1605            .gather_rows(&[lfid], &[row_id], GatherNullPolicy::IncludeNulls)
1606        {
1607            Ok(batch) => batch,
1608            Err(llkv_result::Error::NotFound) => return Ok(None),
1609            Err(err) => return Err(err),
1610        };
1611
1612        let meta_col = batch
1613            .column(0)
1614            .as_any()
1615            .downcast_ref::<BinaryArray>()
1616            .ok_or_else(|| {
1617                llkv_result::Error::Internal(
1618                    "catalog custom_type_meta column should be Binary".into(),
1619                )
1620            })?;
1621
1622        if meta_col.is_null(0) {
1623            return Ok(None);
1624        }
1625
1626        let bytes = meta_col.value(0);
1627        let meta = bitcode::decode(bytes).map_err(|err| {
1628            llkv_result::Error::Internal(format!("failed to decode custom type metadata: {err}"))
1629        })?;
1630        Ok(Some(meta))
1631    }
1632
1633    /// Delete custom type metadata by name.
1634    ///
1635    /// Returns `Ok(())` regardless of whether the type existed.
1636    pub fn delete_custom_type_meta(&self, type_name: &str) -> LlkvResult<()> {
1637        let canonical = type_name.to_ascii_lowercase();
1638        let row_id = schema_name_to_row_id(&canonical);
1639        let lfid = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_CUSTOM_TYPE_META_ID);
1640
1641        // Delete by writing null value
1642        let lfid_val: u64 = lfid.into();
1643        let schema = Arc::new(Schema::new(vec![
1644            Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
1645            Field::new("meta", DataType::Binary, true).with_metadata(HashMap::from([(
1646                crate::constants::FIELD_ID_META_KEY.to_string(),
1647                lfid_val.to_string(),
1648            )])),
1649        ]));
1650
1651        let row_id_arr = Arc::new(UInt64Array::from(vec![row_id]));
1652        let mut meta_builder = BinaryBuilder::new();
1653        meta_builder.append_null();
1654        let meta_arr = Arc::new(meta_builder.finish());
1655
1656        let batch = RecordBatch::try_new(schema, vec![row_id_arr, meta_arr])?;
1657        self.store.append(&batch)?;
1658        Ok(())
1659    }
1660
1661    /// Retrieve all custom type metadata.
1662    ///
1663    /// Returns all persisted custom types.
1664    pub fn all_custom_type_metas(&self) -> LlkvResult<Vec<CustomTypeMeta>> {
1665        let meta_field = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_CUSTOM_TYPE_META_ID);
1666        let row_field = rowid_fid(meta_field);
1667
1668        // Collect all row IDs that have custom type metadata
1669        struct RowIdCollector {
1670            row_ids: Vec<RowId>,
1671        }
1672
1673        impl PrimitiveVisitor for RowIdCollector {
1674            fn u64_chunk(&mut self, values: &UInt64Array) {
1675                for i in 0..values.len() {
1676                    self.row_ids.push(values.value(i));
1677                }
1678            }
1679        }
1680        impl PrimitiveWithRowIdsVisitor for RowIdCollector {}
1681        impl PrimitiveSortedVisitor for RowIdCollector {}
1682        impl PrimitiveSortedWithRowIdsVisitor for RowIdCollector {}
1683
1684        let mut collector = RowIdCollector {
1685            row_ids: Vec::new(),
1686        };
1687        match ScanBuilder::new(self.store, row_field)
1688            .options(ScanOptions::default())
1689            .run(&mut collector)
1690        {
1691            Ok(()) => {}
1692            Err(llkv_result::Error::NotFound) => return Ok(Vec::new()),
1693            Err(err) => return Err(err),
1694        }
1695
1696        if collector.row_ids.is_empty() {
1697            return Ok(Vec::new());
1698        }
1699
1700        // Gather all custom type metadata using the collected row IDs
1701        let batch = self.store.gather_rows(
1702            &[meta_field],
1703            &collector.row_ids,
1704            GatherNullPolicy::IncludeNulls,
1705        )?;
1706
1707        let meta_col = batch
1708            .column(0)
1709            .as_any()
1710            .downcast_ref::<BinaryArray>()
1711            .ok_or_else(|| {
1712                llkv_result::Error::Internal(
1713                    "catalog custom_type_meta column should be Binary".into(),
1714                )
1715            })?;
1716
1717        let mut result = Vec::new();
1718        for idx in 0..collector.row_ids.len() {
1719            if !meta_col.is_null(idx) {
1720                let bytes = meta_col.value(idx);
1721                if let Ok(meta) = bitcode::decode::<CustomTypeMeta>(bytes) {
1722                    result.push(meta);
1723                }
1724            }
1725        }
1726
1727        Ok(result)
1728    }
1729}
1730
1731/// Generate a row ID for schema metadata based on schema name.
1732///
1733/// Uses a simple hash to map schema names to row IDs. This is deterministic
1734/// and allows direct lookup without scanning.
1735fn schema_name_to_row_id(canonical_name: &str) -> RowId {
1736    // Use a simple 64-bit FNV-1a hash for deterministic IDs across platforms and releases
1737    const FNV_OFFSET: u64 = 0xcbf2_9ce4_8422_2325;
1738    const FNV_PRIME: u64 = 0x1000_0000_01b3;
1739
1740    let mut hash = FNV_OFFSET;
1741    for byte in canonical_name.as_bytes() {
1742        hash ^= u64::from(*byte);
1743        hash = hash.wrapping_mul(FNV_PRIME);
1744    }
1745
1746    // Use high bit to avoid collision with reserved catalog row IDs (0-3) and table metadata rows
1747    hash | (1u64 << 63)
1748}
1749
/// Scan visitor that tracks the largest row id seen across all visited
/// chunks.
struct MaxRowIdCollector {
    // Running maximum; `None` until the first value is visited.
    max: Option<RowId>,
}
1753
1754impl PrimitiveVisitor for MaxRowIdCollector {
1755    fn u64_chunk(&mut self, values: &UInt64Array) {
1756        for i in 0..values.len() {
1757            let value = values.value(i);
1758            self.max = match self.max {
1759                Some(curr) if curr >= value => Some(curr),
1760                _ => Some(value),
1761            };
1762        }
1763    }
1764}
1765
// The remaining visitor traits keep their default (no-op) method bodies:
// this collector only consumes plain unsorted u64 chunks.
impl PrimitiveWithRowIdsVisitor for MaxRowIdCollector {}
impl PrimitiveSortedVisitor for MaxRowIdCollector {}
impl PrimitiveSortedWithRowIdsVisitor for MaxRowIdCollector {}
1769
#[cfg(test)]
mod tests {
    use super::*;
    use crate::constraints::{
        ConstraintKind, ConstraintState, PrimaryKeyConstraint, UniqueConstraint,
    };
    use llkv_column_map::ColumnStore;
    use std::sync::Arc;

    /// Persists constraint records for two tables and verifies that reads
    /// are scoped to the requested table and constraint ids.
    #[test]
    fn constraint_records_roundtrip() {
        let pager = Arc::new(MemPager::default());
        let store = ColumnStore::open(Arc::clone(&pager)).unwrap();
        let catalog = SysCatalog::new(&store);

        let table_id: TableId = 42;

        // Two records for the table under test.
        let pk_record = ConstraintRecord {
            constraint_id: 1,
            kind: ConstraintKind::PrimaryKey(PrimaryKeyConstraint {
                field_ids: vec![1, 2],
            }),
            state: ConstraintState::Active,
            revision: 1,
            last_modified_micros: 100,
        };
        let unique_record = ConstraintRecord {
            constraint_id: 2,
            kind: ConstraintKind::Unique(UniqueConstraint { field_ids: vec![3] }),
            state: ConstraintState::Active,
            revision: 2,
            last_modified_micros: 200,
        };
        catalog
            .put_constraint_records(table_id, &[pk_record.clone(), unique_record.clone()])
            .unwrap();

        // A record for an unrelated table that reuses constraint id 1; it
        // must not leak into queries scoped to `table_id`.
        let unrelated_record = ConstraintRecord {
            constraint_id: 1,
            kind: ConstraintKind::Unique(UniqueConstraint { field_ids: vec![5] }),
            state: ConstraintState::Active,
            revision: 1,
            last_modified_micros: 150,
        };
        catalog
            .put_constraint_records(7, &[unrelated_record])
            .unwrap();

        // Table-scoped fetch returns exactly the two records written above.
        let mut by_table = catalog.constraint_records_for_table(table_id).unwrap();
        by_table.sort_by_key(|record| record.constraint_id);
        assert_eq!(by_table.len(), 2);
        assert_eq!(by_table[0], pk_record);
        assert_eq!(by_table[1], unique_record);

        // Point lookup by a known constraint id.
        let by_id = catalog
            .get_constraint_records(table_id, &[pk_record.constraint_id])
            .unwrap();
        assert_eq!(by_id.len(), 1);
        assert_eq!(by_id[0].as_ref(), Some(&pk_record));

        // Unknown constraint ids yield a `None` slot rather than an error.
        let absent = catalog.get_constraint_records(table_id, &[999]).unwrap();
        assert_eq!(absent.len(), 1);
        assert!(absent[0].is_none());
    }
}