llkv_table/
sys_catalog.rs

1//! System catalog for storing table and column metadata.
2//!
3//! The system catalog uses table 0 (reserved) to store metadata about all tables
4//! and columns in the database. This metadata includes:
5//!
6//! - **Table metadata** ([`TableMeta`]): Table ID, name, creation time, flags
7//! - **Column metadata** ([`ColMeta`]): Column ID, name, flags, default values
8//! - **Multi-column unique metadata** ([`TableMultiColumnUniqueMeta`]): Unique index definitions per table
9//!
10//! # Storage Format
11//!
12//! The catalog stores metadata as serialized [`bitcode`] blobs in special catalog
13//! columns within table 0. See [`CATALOG_TABLE_ID`] and related constants in the
14//! [`reserved`](crate::reserved) module.
15//!
16//! # Usage
17//!
18//! The [`SysCatalog`] provides methods to:
19//! - Insert/update table metadata ([`put_table_meta`](SysCatalog::put_table_meta))
20//! - Query table metadata ([`get_table_meta`](SysCatalog::get_table_meta))
21//! - Manage column metadata similarly
22//!
23//! This metadata is used by higher-level components to validate schemas, assign
24//! field IDs, and enforce table constraints.
25
26use std::collections::HashMap;
27use std::mem;
28use std::sync::Arc;
29
30use arrow::array::{Array, BinaryArray, BinaryBuilder, UInt64Array};
31use arrow::datatypes::{DataType, Field, Schema};
32use arrow::record_batch::RecordBatch;
33use bitcode::{Decode, Encode};
34
35use crate::constants::CONSTRAINT_SCAN_CHUNK_SIZE;
36use crate::constraints::{
37    ConstraintId, ConstraintRecord, decode_constraint_row_id, encode_constraint_row_id,
38};
39use crate::types::{FieldId, RowId, TableId};
40use llkv_column_map::store::scan::{
41    PrimitiveSortedVisitor, PrimitiveSortedWithRowIdsVisitor, PrimitiveVisitor,
42    PrimitiveWithRowIdsVisitor, ScanBuilder, ScanOptions,
43};
44
45use llkv_column_map::types::LogicalFieldId;
46use llkv_column_map::{
47    ColumnStore,
48    store::{GatherNullPolicy, ROW_ID_COLUMN_NAME, rowid_fid},
49    types::Namespace,
50};
51use llkv_result::{self, Result as LlkvResult};
52use llkv_storage::pager::{MemPager, Pager};
53use simd_r_drive_entry_handle::EntryHandle;
54
55// Import all reserved constants and validation functions
56use crate::reserved::*;
57
58// ----- Namespacing helpers -----
59
60// TODO: Dedupe with llkv_column_map::types::lfid()
61#[inline]
62fn lfid(table_id: TableId, col_id: u32) -> LogicalFieldId {
63    LogicalFieldId::new()
64        .with_namespace(Namespace::UserData)
65        .with_table_id(table_id)
66        .with_field_id(col_id)
67}
68
69// TODO: Migrate to llkv_column_map::types::rid_table()
70#[inline]
71fn rid_table(table_id: TableId) -> u64 {
72    let fid = LogicalFieldId::new()
73        .with_namespace(Namespace::UserData)
74        .with_table_id(table_id)
75        .with_field_id(0);
76    fid.into()
77}
78
79// TODO: Migrate to llkv_column_map::types::rid_col()
80#[inline]
81fn rid_col(table_id: TableId, col_id: u32) -> u64 {
82    lfid(table_id, col_id).into()
83}
84
/// Logical field in table 0 holding serialized [`ConstraintRecord`] blobs.
#[inline]
fn constraint_meta_lfid() -> LogicalFieldId {
    lfid(CATALOG_TABLE_ID, CATALOG_FIELD_CONSTRAINT_META_ID)
}

/// Logical field in table 0 holding serialized [`ConstraintNameRecord`] blobs.
#[inline]
fn constraint_name_lfid() -> LogicalFieldId {
    lfid(CATALOG_TABLE_ID, CATALOG_FIELD_CONSTRAINT_NAME_ID)
}

/// Row-id shadow column paired with the constraint metadata field; scanned to
/// discover which constraint rows exist.
#[inline]
fn constraint_row_lfid() -> LogicalFieldId {
    rowid_fid(constraint_meta_lfid())
}
99
/// Persisted mapping from a constraint id to its optional user-supplied name.
///
/// Stored as a [`bitcode`] blob in the constraint-name catalog column.
#[derive(Clone, Debug, Encode, Decode)]
pub struct ConstraintNameRecord {
    /// Identifier of the constraint this name belongs to.
    pub constraint_id: ConstraintId,
    /// Human-readable constraint name; `None` for unnamed constraints.
    pub name: Option<String>,
}
105
106fn decode_constraint_record(bytes: &[u8]) -> LlkvResult<ConstraintRecord> {
107    bitcode::decode(bytes).map_err(|err| {
108        llkv_result::Error::Internal(format!("failed to decode constraint metadata: {err}"))
109    })
110}
111
/// Scan visitor that collects row ids belonging to one table and gathers the
/// matching constraint records in fixed-size batches.
struct ConstraintRowCollector<'a, P, F>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
    F: FnMut(Vec<ConstraintRecord>),
{
    /// Store used to gather the constraint payload column for buffered rows.
    store: &'a ColumnStore<P>,
    /// Logical field holding the serialized constraint records.
    lfid: LogicalFieldId,
    /// Only rows whose encoded row id matches this table are collected.
    table_id: TableId,
    /// Callback invoked with each non-empty batch of decoded records.
    on_batch: &'a mut F,
    /// Row ids accumulated until a flush; bounded by `CONSTRAINT_SCAN_CHUNK_SIZE`.
    buffer: Vec<RowId>,
    /// First error raised inside the visitor callbacks; surfaced by `finish`.
    error: Option<llkv_result::Error>,
}
124
impl<'a, P, F> ConstraintRowCollector<'a, P, F>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
    F: FnMut(Vec<ConstraintRecord>),
{
    /// Gather and decode the buffered row ids, then hand the decoded records
    /// to `on_batch`. The buffer is cleared even when nothing is emitted.
    fn flush_buffer(&mut self) -> LlkvResult<()> {
        if self.buffer.is_empty() {
            return Ok(());
        }

        // Take the buffer so it is reset for the next chunk of the scan.
        let row_ids = mem::take(&mut self.buffer);
        let batch =
            self.store
                .gather_rows(&[self.lfid], &row_ids, GatherNullPolicy::IncludeNulls)?;

        if batch.num_columns() == 0 {
            return Ok(());
        }

        let array = batch
            .column(0)
            .as_any()
            .downcast_ref::<BinaryArray>()
            .ok_or_else(|| {
                llkv_result::Error::Internal(
                    "constraint metadata column stored unexpected type".into(),
                )
            })?;

        let mut records = Vec::with_capacity(row_ids.len());
        for (idx, row_id) in row_ids.into_iter().enumerate() {
            // Null payloads are tombstones written by the delete_* helpers;
            // skip them.
            if array.is_null(idx) {
                continue;
            }

            let record = decode_constraint_record(array.value(idx))?;
            let (table_from_id, constraint_id) = decode_constraint_row_id(row_id);
            if table_from_id != self.table_id {
                continue;
            }
            // The decoded payload must agree with the id encoded in its row key.
            if record.constraint_id != constraint_id {
                return Err(llkv_result::Error::Internal(
                    "constraint metadata id mismatch".into(),
                ));
            }
            records.push(record);
        }

        if !records.is_empty() {
            (self.on_batch)(records);
        }

        Ok(())
    }

    /// Surface any error captured during the scan, then flush remaining rows.
    fn finish(&mut self) -> LlkvResult<()> {
        if let Some(err) = self.error.take() {
            return Err(err);
        }
        self.flush_buffer()
    }
}
187
impl<'a, P, F> PrimitiveVisitor for ConstraintRowCollector<'a, P, F>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
    F: FnMut(Vec<ConstraintRecord>),
{
    /// Receive a chunk of row ids from the scan; buffer those belonging to the
    /// target table and flush whenever the buffer reaches the chunk size.
    fn u64_chunk(&mut self, values: &UInt64Array) {
        // Once an error is latched, ignore the remainder of the scan.
        if self.error.is_some() {
            return;
        }

        for idx in 0..values.len() {
            let row_id = values.value(idx);
            let (table_id, _) = decode_constraint_row_id(row_id);
            if table_id != self.table_id {
                continue;
            }
            self.buffer.push(row_id);
            if self.buffer.len() >= CONSTRAINT_SCAN_CHUNK_SIZE
                && let Err(err) = self.flush_buffer()
            {
                // The visitor API cannot return errors; stash it for `finish`.
                self.error = Some(err);
                return;
            }
        }
    }
}
214
// The scan drives only `u64_chunk` on the row-id column; these remaining
// visitor traits appear to be bounds required by the scan API (see the same
// pattern in `all_multi_column_unique_metas`), so they are implemented with
// no callbacks. NOTE(review): confirm against `ScanBuilder::run`'s bounds.
impl<'a, P, F> PrimitiveWithRowIdsVisitor for ConstraintRowCollector<'a, P, F>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
    F: FnMut(Vec<ConstraintRecord>),
{
}

impl<'a, P, F> PrimitiveSortedVisitor for ConstraintRowCollector<'a, P, F>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
    F: FnMut(Vec<ConstraintRecord>),
{
}

impl<'a, P, F> PrimitiveSortedWithRowIdsVisitor for ConstraintRowCollector<'a, P, F>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
    F: FnMut(Vec<ConstraintRecord>),
{
}
235
// ----- Public catalog types -----

/// Metadata about a table.
///
/// Stored in the system catalog (table 0) and serialized using [`bitcode`].
#[derive(Encode, Decode, Clone, Debug, PartialEq, Eq)]
pub struct TableMeta {
    /// Unique identifier for this table.
    pub table_id: TableId,
    /// Optional human-readable name for the table.
    pub name: Option<String>,
    /// When the table was created (microseconds since epoch).
    pub created_at_micros: u64,
    /// Bitflags for table properties (e.g., temporary, system).
    pub flags: u32,
    /// Schema version or modification counter.
    pub epoch: u64,
}

/// Metadata about a column.
///
/// Stored in the system catalog (table 0) and serialized using [`bitcode`].
#[derive(Encode, Decode, Clone, Debug, PartialEq, Eq)]
pub struct ColMeta {
    /// Unique identifier for this column within its table.
    pub col_id: u32,
    /// Optional human-readable name for the column.
    pub name: Option<String>,
    /// Bitflags for column properties (e.g., nullable, indexed).
    pub flags: u32,
    /// Optional serialized default value for the column.
    pub default: Option<Vec<u8>>,
}

/// Metadata about a schema.
///
/// Stored in the system catalog (table 0) and serialized using [`bitcode`].
#[derive(Encode, Decode, Clone, Debug, PartialEq, Eq)]
pub struct SchemaMeta {
    /// Human-readable schema name (case-preserved).
    pub name: String,
    /// When the schema was created (microseconds since epoch).
    pub created_at_micros: u64,
    /// Bitflags for schema properties (reserved for future use).
    pub flags: u32,
}

/// Metadata describing a single multi-column UNIQUE index.
#[derive(Encode, Decode, Clone, Debug, PartialEq, Eq)]
pub struct MultiColumnUniqueEntryMeta {
    /// Optional human-readable index name.
    pub index_name: Option<String>,
    /// Field IDs participating in this UNIQUE constraint.
    pub column_ids: Vec<u32>,
}

/// Metadata describing all multi-column UNIQUE indexes for a table.
///
/// Persisted as a single blob per table, so writers replace the full set.
#[derive(Encode, Decode, Clone, Debug, PartialEq, Eq)]
pub struct TableMultiColumnUniqueMeta {
    /// Table identifier these UNIQUE entries belong to.
    pub table_id: TableId,
    /// Definitions of each persisted multi-column UNIQUE.
    pub uniques: Vec<MultiColumnUniqueEntryMeta>,
}
300
// ----- SysCatalog -----

/// Interface to the system catalog (table 0).
///
/// The system catalog stores metadata about all tables and columns in the database.
/// It uses special reserved columns within table 0 to persist [`TableMeta`] and
/// [`ColMeta`] structures.
///
/// # Lifetime
///
/// `SysCatalog` borrows a reference to the [`ColumnStore`] and does not own it.
/// This allows multiple catalog instances to coexist with the same storage.
pub struct SysCatalog<'a, P = MemPager>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    /// Borrowed column store holding the reserved catalog table.
    store: &'a ColumnStore<P>,
}
319
320impl<'a, P> SysCatalog<'a, P>
321where
322    P: Pager<Blob = EntryHandle> + Send + Sync,
323{
    /// Append null payloads for `row_ids` in `meta_field`.
    ///
    /// This is the shared deletion primitive for the `delete_*` methods:
    /// writing a null over an existing row acts as a tombstone, and the
    /// read paths in this module skip null entries.
    fn write_null_entries(&self, meta_field: LogicalFieldId, row_ids: &[RowId]) -> LlkvResult<()> {
        if row_ids.is_empty() {
            return Ok(());
        }

        let lfid_val: u64 = meta_field.into();
        // The "meta" column must be declared nullable so null markers encode.
        let schema = Arc::new(Schema::new(vec![
            Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
            Field::new("meta", DataType::Binary, true).with_metadata(HashMap::from([(
                crate::constants::FIELD_ID_META_KEY.to_string(),
                lfid_val.to_string(),
            )])),
        ]));

        let row_array = Arc::new(UInt64Array::from(row_ids.to_vec()));
        let mut builder = BinaryBuilder::new();
        for _ in row_ids {
            builder.append_null();
        }
        let meta_array = Arc::new(builder.finish());

        let batch = RecordBatch::try_new(schema, vec![row_array, meta_array])?;
        self.store.append(&batch)?;
        Ok(())
    }
349
350    /// Create a new system catalog interface using the provided column store.
351    pub fn new(store: &'a ColumnStore<P>) -> Self {
352        Self { store }
353    }
354
355    /// Insert or update table metadata.
356    ///
357    /// This persists the table's metadata to the system catalog. If metadata for
358    /// this table ID already exists, it is overwritten (last-write-wins).
359    pub fn put_table_meta(&self, meta: &TableMeta) {
360        let lfid_val: u64 = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_TABLE_META_ID).into();
361        let schema = Arc::new(Schema::new(vec![
362            Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
363            Field::new("meta", DataType::Binary, false).with_metadata(HashMap::from([(
364                crate::constants::FIELD_ID_META_KEY.to_string(),
365                lfid_val.to_string(),
366            )])),
367        ]));
368
369        let row_id = Arc::new(UInt64Array::from(vec![rid_table(meta.table_id)]));
370        let meta_encoded = bitcode::encode(meta);
371        let meta_bytes = Arc::new(BinaryArray::from(vec![meta_encoded.as_slice()]));
372
373        let batch = RecordBatch::try_new(schema, vec![row_id, meta_bytes]).unwrap();
374        self.store.append(&batch).unwrap();
375    }
376
377    /// Retrieve table metadata by table ID.
378    ///
379    /// Returns `None` if no metadata exists for the given table ID.
380    pub fn get_table_meta(&self, table_id: TableId) -> Option<TableMeta> {
381        let row_id = rid_table(table_id);
382        let catalog_field = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_TABLE_META_ID);
383        let batch = self
384            .store
385            .gather_rows(&[catalog_field], &[row_id], GatherNullPolicy::IncludeNulls)
386            .ok()?;
387
388        if batch.num_rows() == 0 || batch.num_columns() == 0 {
389            return None;
390        }
391
392        let array = batch
393            .column(0)
394            .as_any()
395            .downcast_ref::<BinaryArray>()
396            .expect("table meta column must be BinaryArray");
397
398        if array.is_null(0) {
399            return None;
400        }
401
402        bitcode::decode(array.value(0)).ok()
403    }
404
405    /// Upsert a single column’s metadata.
406    pub fn put_col_meta(&self, table_id: TableId, meta: &ColMeta) {
407        let lfid_val: u64 = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_COL_META_ID).into();
408        let schema = Arc::new(Schema::new(vec![
409            Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
410            Field::new("meta", DataType::Binary, false).with_metadata(HashMap::from([(
411                crate::constants::FIELD_ID_META_KEY.to_string(),
412                lfid_val.to_string(),
413            )])),
414        ]));
415
416        let rid_value = rid_col(table_id, meta.col_id);
417        let row_id = Arc::new(UInt64Array::from(vec![rid_value]));
418        let meta_encoded = bitcode::encode(meta);
419        let meta_bytes = Arc::new(BinaryArray::from(vec![meta_encoded.as_slice()]));
420
421        let batch = RecordBatch::try_new(schema, vec![row_id, meta_bytes]).unwrap();
422        self.store.append(&batch).unwrap();
423    }
424
    /// Batch fetch specific column metas by col_id using a shared keyset.
    ///
    /// Returns one entry per requested `col_id`, in the same order. `None`
    /// marks ids with no stored record, a null tombstone, or an undecodable
    /// payload. Any gather error degrades to an all-`None` result rather
    /// than propagating.
    pub fn get_cols_meta(&self, table_id: TableId, col_ids: &[u32]) -> Vec<Option<ColMeta>> {
        if col_ids.is_empty() {
            return Vec::new();
        }

        let row_ids: Vec<RowId> = col_ids.iter().map(|&cid| rid_col(table_id, cid)).collect();
        let catalog_field = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_COL_META_ID);

        let batch =
            match self
                .store
                .gather_rows(&[catalog_field], &row_ids, GatherNullPolicy::IncludeNulls)
            {
                Ok(batch) => batch,
                Err(_) => return vec![None; col_ids.len()],
            };

        let meta_col = batch
            .column(0)
            .as_any()
            .downcast_ref::<BinaryArray>()
            .expect("catalog meta column should be Binary");

        // NOTE(review): assumes gather_rows yields exactly one output row per
        // requested row id, in request order — confirm against the store API.
        col_ids
            .iter()
            .enumerate()
            .map(|(idx, _)| {
                if meta_col.is_null(idx) {
                    None
                } else {
                    bitcode::decode(meta_col.value(idx)).ok()
                }
            })
            .collect()
    }
461
462    /// Delete metadata rows for the specified column identifiers.
463    pub fn delete_col_meta(&self, table_id: TableId, col_ids: &[FieldId]) -> LlkvResult<()> {
464        if col_ids.is_empty() {
465            return Ok(());
466        }
467
468        let meta_field = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_COL_META_ID);
469        let row_ids: Vec<RowId> = col_ids
470            .iter()
471            .map(|&col_id| rid_col(table_id, col_id))
472            .collect();
473        self.write_null_entries(meta_field, &row_ids)
474    }
475
476    /// Remove the persisted table metadata record, if present.
477    pub fn delete_table_meta(&self, table_id: TableId) -> LlkvResult<()> {
478        let meta_field = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_TABLE_META_ID);
479        let row_id = rid_table(table_id);
480        self.write_null_entries(meta_field, &[row_id])
481    }
482
483    /// Delete constraint records for the provided identifiers.
484    pub fn delete_constraint_records(
485        &self,
486        table_id: TableId,
487        constraint_ids: &[ConstraintId],
488    ) -> LlkvResult<()> {
489        if constraint_ids.is_empty() {
490            return Ok(());
491        }
492
493        let meta_field = constraint_meta_lfid();
494        let row_ids: Vec<RowId> = constraint_ids
495            .iter()
496            .map(|&constraint_id| encode_constraint_row_id(table_id, constraint_id))
497            .collect();
498        self.write_null_entries(meta_field, &row_ids)
499    }
500
501    /// Delete persisted constraint names for the provided identifiers.
502    pub fn delete_constraint_names(
503        &self,
504        table_id: TableId,
505        constraint_ids: &[ConstraintId],
506    ) -> LlkvResult<()> {
507        if constraint_ids.is_empty() {
508            return Ok(());
509        }
510
511        let lfid = constraint_name_lfid();
512        let row_ids: Vec<RowId> = constraint_ids
513            .iter()
514            .map(|&constraint_id| encode_constraint_row_id(table_id, constraint_id))
515            .collect();
516        self.write_null_entries(lfid, &row_ids)
517    }
518
519    /// Delete the multi-column UNIQUE metadata record for a table, if any.
520    pub fn delete_multi_column_uniques(&self, table_id: TableId) -> LlkvResult<()> {
521        let meta_field = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_MULTI_COLUMN_UNIQUE_META_ID);
522        let row_id = rid_table(table_id);
523        self.write_null_entries(meta_field, &[row_id])
524    }
525
526    /// Persist the complete set of multi-column UNIQUE definitions for a table.
527    pub fn put_multi_column_uniques(
528        &self,
529        table_id: TableId,
530        uniques: &[MultiColumnUniqueEntryMeta],
531    ) -> LlkvResult<()> {
532        let lfid_val: u64 =
533            lfid(CATALOG_TABLE_ID, CATALOG_FIELD_MULTI_COLUMN_UNIQUE_META_ID).into();
534        let schema = Arc::new(Schema::new(vec![
535            Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
536            Field::new("meta", DataType::Binary, false).with_metadata(HashMap::from([(
537                crate::constants::FIELD_ID_META_KEY.to_string(),
538                lfid_val.to_string(),
539            )])),
540        ]));
541
542        let row_id = Arc::new(UInt64Array::from(vec![rid_table(table_id)]));
543        let meta = TableMultiColumnUniqueMeta {
544            table_id,
545            uniques: uniques.to_vec(),
546        };
547        let encoded = bitcode::encode(&meta);
548        let meta_bytes = Arc::new(BinaryArray::from(vec![encoded.as_slice()]));
549
550        let batch = RecordBatch::try_new(schema, vec![row_id, meta_bytes])?;
551        self.store.append(&batch)?;
552        Ok(())
553    }
554
555    /// Retrieve all persisted multi-column UNIQUE definitions for a table.
556    pub fn get_multi_column_uniques(
557        &self,
558        table_id: TableId,
559    ) -> LlkvResult<Vec<MultiColumnUniqueEntryMeta>> {
560        let lfid = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_MULTI_COLUMN_UNIQUE_META_ID);
561        let row_id = rid_table(table_id);
562        let batch = match self
563            .store
564            .gather_rows(&[lfid], &[row_id], GatherNullPolicy::IncludeNulls)
565        {
566            Ok(batch) => batch,
567            Err(llkv_result::Error::NotFound) => return Ok(Vec::new()),
568            Err(err) => return Err(err),
569        };
570
571        if batch.num_columns() == 0 || batch.num_rows() == 0 {
572            return Ok(Vec::new());
573        }
574
575        let array = batch
576            .column(0)
577            .as_any()
578            .downcast_ref::<BinaryArray>()
579            .ok_or_else(|| {
580                llkv_result::Error::Internal(
581                    "catalog multi-column unique column stored unexpected type".into(),
582                )
583            })?;
584
585        if array.is_null(0) {
586            return Ok(Vec::new());
587        }
588
589        let meta: TableMultiColumnUniqueMeta = bitcode::decode(array.value(0)).map_err(|err| {
590            llkv_result::Error::Internal(format!(
591                "failed to decode multi-column unique metadata: {err}"
592            ))
593        })?;
594
595        Ok(meta.uniques)
596    }
597
    /// Retrieve all persisted multi-column UNIQUE definitions across tables.
    ///
    /// First scans the row-id shadow column of the multi-column-unique field
    /// to discover every stored record, then gathers and decodes the payloads.
    /// Null (deleted) payloads are skipped; a missing catalog column yields an
    /// empty result.
    pub fn all_multi_column_unique_metas(&self) -> LlkvResult<Vec<TableMultiColumnUniqueMeta>> {
        let meta_field = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_MULTI_COLUMN_UNIQUE_META_ID);
        let row_field = rowid_fid(meta_field);

        // Minimal visitor that records every row id surfaced by the scan.
        struct RowIdCollector {
            row_ids: Vec<RowId>,
        }

        impl PrimitiveVisitor for RowIdCollector {
            fn u64_chunk(&mut self, values: &UInt64Array) {
                for i in 0..values.len() {
                    self.row_ids.push(values.value(i));
                }
            }
        }
        // Remaining visitor traits are needed to satisfy the scan's bounds;
        // this collector has no behavior for them.
        impl PrimitiveWithRowIdsVisitor for RowIdCollector {}
        impl PrimitiveSortedVisitor for RowIdCollector {}
        impl PrimitiveSortedWithRowIdsVisitor for RowIdCollector {}

        let mut collector = RowIdCollector {
            row_ids: Vec::new(),
        };
        match ScanBuilder::new(self.store, row_field)
            .options(ScanOptions::default())
            .run(&mut collector)
        {
            Ok(()) => {}
            // A missing column means nothing was ever stored.
            Err(llkv_result::Error::NotFound) => return Ok(Vec::new()),
            Err(err) => return Err(err),
        }

        if collector.row_ids.is_empty() {
            return Ok(Vec::new());
        }

        let batch = match self.store.gather_rows(
            &[meta_field],
            &collector.row_ids,
            GatherNullPolicy::IncludeNulls,
        ) {
            Ok(batch) => batch,
            Err(llkv_result::Error::NotFound) => return Ok(Vec::new()),
            Err(err) => return Err(err),
        };

        if batch.num_columns() == 0 {
            return Ok(Vec::new());
        }

        let array = batch
            .column(0)
            .as_any()
            .downcast_ref::<BinaryArray>()
            .ok_or_else(|| {
                llkv_result::Error::Internal(
                    "catalog multi-column unique column stored unexpected type".into(),
                )
            })?;

        let mut metas = Vec::with_capacity(batch.num_rows());
        for idx in 0..batch.num_rows() {
            // Null payloads are deletion tombstones; skip them.
            if array.is_null(idx) {
                continue;
            }
            let meta: TableMultiColumnUniqueMeta =
                bitcode::decode(array.value(idx)).map_err(|err| {
                    llkv_result::Error::Internal(format!(
                        "failed to decode multi-column unique metadata: {err}"
                    ))
                })?;
            metas.push(meta);
        }

        Ok(metas)
    }
674
    // TODO: Use batch APIs for better performance.
    /// Persist or update multiple constraint records for a table in a single batch.
    ///
    /// Each record is keyed by a row id encoding `(table_id, constraint_id)`,
    /// so re-inserting an existing id overwrites the prior record.
    pub fn put_constraint_records(
        &self,
        table_id: TableId,
        records: &[ConstraintRecord],
    ) -> LlkvResult<()> {
        if records.is_empty() {
            return Ok(());
        }

        // Tag the payload column with the constraint-meta logical field id so
        // the store routes it into the reserved catalog column.
        let lfid_val: u64 = constraint_meta_lfid().into();
        let schema = Arc::new(Schema::new(vec![
            Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
            Field::new("constraint", DataType::Binary, false).with_metadata(HashMap::from([(
                crate::constants::FIELD_ID_META_KEY.to_string(),
                lfid_val.to_string(),
            )])),
        ]));

        let row_ids: Vec<RowId> = records
            .iter()
            .map(|record| encode_constraint_row_id(table_id, record.constraint_id))
            .collect();

        let row_ids_array = Arc::new(UInt64Array::from(row_ids));
        let payload_array = Arc::new(BinaryArray::from_iter_values(
            records.iter().map(bitcode::encode),
        ));

        let batch = RecordBatch::try_new(schema, vec![row_ids_array, payload_array])?;
        self.store.append(&batch)?;
        Ok(())
    }
709
710    /// Persist or update constraint names for the specified identifiers.
711    pub fn put_constraint_names(
712        &self,
713        table_id: TableId,
714        names: &[ConstraintNameRecord],
715    ) -> LlkvResult<()> {
716        if names.is_empty() {
717            return Ok(());
718        }
719
720        let lfid_val: u64 = constraint_name_lfid().into();
721        let schema = Arc::new(Schema::new(vec![
722            Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
723            Field::new("constraint_name", DataType::Binary, false).with_metadata(HashMap::from([
724                (
725                    crate::constants::FIELD_ID_META_KEY.to_string(),
726                    lfid_val.to_string(),
727                ),
728            ])),
729        ]));
730
731        let row_ids: Vec<RowId> = names
732            .iter()
733            .map(|record| encode_constraint_row_id(table_id, record.constraint_id))
734            .collect();
735        let row_ids_array = Arc::new(UInt64Array::from(row_ids));
736        let payload_array = Arc::new(BinaryArray::from_iter_values(
737            names.iter().map(bitcode::encode),
738        ));
739
740        let batch = RecordBatch::try_new(schema, vec![row_ids_array, payload_array])?;
741        self.store.append(&batch)?;
742        Ok(())
743    }
744
    /// Fetch multiple constraint records for a table in a single batch.
    ///
    /// Returns one entry per requested constraint id, in the same order;
    /// `None` marks ids with no stored record or a null tombstone. A missing
    /// catalog column (`NotFound`) yields all `None`. Fails with an internal
    /// error if a stored payload decodes to a different constraint id than
    /// its row key.
    pub fn get_constraint_records(
        &self,
        table_id: TableId,
        constraint_ids: &[ConstraintId],
    ) -> LlkvResult<Vec<Option<ConstraintRecord>>> {
        if constraint_ids.is_empty() {
            return Ok(Vec::new());
        }

        let lfid = constraint_meta_lfid();
        let row_ids: Vec<RowId> = constraint_ids
            .iter()
            .map(|&constraint_id| encode_constraint_row_id(table_id, constraint_id))
            .collect();

        let batch = match self
            .store
            .gather_rows(&[lfid], &row_ids, GatherNullPolicy::IncludeNulls)
        {
            Ok(batch) => batch,
            Err(llkv_result::Error::NotFound) => {
                return Ok(vec![None; constraint_ids.len()]);
            }
            Err(err) => return Err(err),
        };

        if batch.num_columns() == 0 || batch.num_rows() == 0 {
            return Ok(vec![None; constraint_ids.len()]);
        }

        let array = batch
            .column(0)
            .as_any()
            .downcast_ref::<BinaryArray>()
            .ok_or_else(|| {
                llkv_result::Error::Internal(
                    "constraint metadata column stored unexpected type".into(),
                )
            })?;

        // NOTE(review): the loop assumes one output row per requested row id,
        // in request order — confirm against gather_rows' contract.
        let mut results = Vec::with_capacity(constraint_ids.len());
        for (idx, &constraint_id) in constraint_ids.iter().enumerate() {
            if array.is_null(idx) {
                results.push(None);
                continue;
            }
            let record = decode_constraint_record(array.value(idx))?;
            if record.constraint_id != constraint_id {
                return Err(llkv_result::Error::Internal(
                    "constraint metadata id mismatch".into(),
                ));
            }
            results.push(Some(record));
        }

        Ok(results)
    }
803
804    /// Fetch constraint names for a table in a single batch.
805    pub fn get_constraint_names(
806        &self,
807        table_id: TableId,
808        constraint_ids: &[ConstraintId],
809    ) -> LlkvResult<Vec<Option<String>>> {
810        if constraint_ids.is_empty() {
811            return Ok(Vec::new());
812        }
813
814        let lfid = constraint_name_lfid();
815        let row_ids: Vec<RowId> = constraint_ids
816            .iter()
817            .map(|&constraint_id| encode_constraint_row_id(table_id, constraint_id))
818            .collect();
819
820        let batch = match self
821            .store
822            .gather_rows(&[lfid], &row_ids, GatherNullPolicy::IncludeNulls)
823        {
824            Ok(batch) => batch,
825            Err(llkv_result::Error::NotFound) => {
826                return Ok(vec![None; constraint_ids.len()]);
827            }
828            Err(err) => return Err(err),
829        };
830
831        if batch.num_columns() == 0 {
832            return Ok(vec![None; constraint_ids.len()]);
833        }
834
835        let array = batch
836            .column(0)
837            .as_any()
838            .downcast_ref::<BinaryArray>()
839            .ok_or_else(|| {
840                llkv_result::Error::Internal(
841                    "constraint name metadata column stored unexpected type".into(),
842                )
843            })?;
844
845        let mut results = Vec::with_capacity(row_ids.len());
846        for idx in 0..row_ids.len() {
847            if array.is_null(idx) {
848                results.push(None);
849            } else {
850                let record: ConstraintNameRecord =
851                    bitcode::decode(array.value(idx)).map_err(|err| {
852                        llkv_result::Error::Internal(format!(
853                            "failed to decode constraint name metadata: {err}"
854                        ))
855                    })?;
856                results.push(record.name);
857            }
858        }
859
860        Ok(results)
861    }
862
    /// Stream constraint records for a table in batches.
    ///
    /// Scans the constraint row-id column, filters rows belonging to
    /// `table_id`, and invokes `on_batch` with chunks of decoded records of
    /// at most `CONSTRAINT_SCAN_CHUNK_SIZE` entries. A missing catalog
    /// column (`NotFound`) is treated as "no constraints" and succeeds.
    pub fn scan_constraint_records_for_table<F>(
        &self,
        table_id: TableId,
        mut on_batch: F,
    ) -> LlkvResult<()>
    where
        F: FnMut(Vec<ConstraintRecord>),
    {
        let row_field = constraint_row_lfid();
        let mut visitor = ConstraintRowCollector {
            store: self.store,
            lfid: constraint_meta_lfid(),
            table_id,
            on_batch: &mut on_batch,
            buffer: Vec::with_capacity(CONSTRAINT_SCAN_CHUNK_SIZE),
            error: None,
        };

        match ScanBuilder::new(self.store, row_field)
            .options(ScanOptions::default())
            .run(&mut visitor)
        {
            Ok(()) => {}
            Err(llkv_result::Error::NotFound) => return Ok(()),
            Err(err) => return Err(err),
        }

        // Surfaces any error latched during the scan and flushes the tail.
        visitor.finish()
    }
893
894    /// Load all constraint records for a table into a vector.
895    pub fn constraint_records_for_table(
896        &self,
897        table_id: TableId,
898    ) -> LlkvResult<Vec<ConstraintRecord>> {
899        let mut all = Vec::new();
900        self.scan_constraint_records_for_table(table_id, |mut chunk| {
901            all.append(&mut chunk);
902        })?;
903        Ok(all)
904    }
905
906    pub fn put_next_table_id(&self, next_id: TableId) -> LlkvResult<()> {
907        let lfid_val: u64 = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_NEXT_TABLE_ID).into();
908        let schema = Arc::new(Schema::new(vec![
909            Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
910            Field::new("next_table_id", DataType::UInt64, false).with_metadata(HashMap::from([(
911                crate::constants::FIELD_ID_META_KEY.to_string(),
912                lfid_val.to_string(),
913            )])),
914        ]));
915
916        let row_id = Arc::new(UInt64Array::from(vec![CATALOG_NEXT_TABLE_ROW_ID]));
917        let value_array = Arc::new(UInt64Array::from(vec![next_id as u64]));
918        let batch = RecordBatch::try_new(schema, vec![row_id, value_array])?;
919        self.store.append(&batch)?;
920        Ok(())
921    }
922
923    pub fn get_next_table_id(&self) -> LlkvResult<Option<TableId>> {
924        let lfid = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_NEXT_TABLE_ID);
925        let batch = match self.store.gather_rows(
926            &[lfid],
927            &[CATALOG_NEXT_TABLE_ROW_ID],
928            GatherNullPolicy::IncludeNulls,
929        ) {
930            Ok(batch) => batch,
931            Err(llkv_result::Error::NotFound) => return Ok(None),
932            Err(err) => return Err(err),
933        };
934
935        if batch.num_columns() == 0 || batch.num_rows() == 0 {
936            return Ok(None);
937        }
938
939        let array = batch
940            .column(0)
941            .as_any()
942            .downcast_ref::<UInt64Array>()
943            .ok_or_else(|| {
944                llkv_result::Error::Internal(
945                    "catalog next_table_id column stored unexpected type".into(),
946                )
947            })?;
948        if array.is_empty() || array.is_null(0) {
949            return Ok(None);
950        }
951
952        let value = array.value(0);
953        if value > TableId::MAX as u64 {
954            return Err(llkv_result::Error::InvalidArgumentError(
955                "persisted next_table_id exceeds TableId range".into(),
956            ));
957        }
958
959        Ok(Some(value as TableId))
960    }
961
962    pub fn max_table_id(&self) -> LlkvResult<Option<TableId>> {
963        let meta_field = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_TABLE_META_ID);
964        let row_field = rowid_fid(meta_field);
965
966        let mut collector = MaxRowIdCollector { max: None };
967        match ScanBuilder::new(self.store, row_field)
968            .options(ScanOptions::default())
969            .run(&mut collector)
970        {
971            Ok(()) => {}
972            Err(llkv_result::Error::NotFound) => return Ok(None),
973            Err(err) => return Err(err),
974        }
975
976        let max_value = match collector.max {
977            Some(value) => value,
978            None => return Ok(None),
979        };
980
981        let logical: LogicalFieldId = max_value.into();
982        Ok(Some(logical.table_id()))
983    }
984
985    /// Scan all table metadata entries from the catalog.
986    /// Returns a vector of (table_id, TableMeta) pairs for all persisted tables.
987    ///
988    /// This method first scans for all row IDs in the table metadata column,
989    /// then uses gather_rows to retrieve the actual metadata.
990    pub fn all_table_metas(&self) -> LlkvResult<Vec<(TableId, TableMeta)>> {
991        let meta_field = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_TABLE_META_ID);
992        let row_field = rowid_fid(meta_field);
993
994        // Collect all row IDs that have table metadata
995        struct RowIdCollector {
996            row_ids: Vec<RowId>,
997        }
998
999        impl PrimitiveVisitor for RowIdCollector {
1000            fn u64_chunk(&mut self, values: &UInt64Array) {
1001                for i in 0..values.len() {
1002                    self.row_ids.push(values.value(i));
1003                }
1004            }
1005        }
1006        impl PrimitiveWithRowIdsVisitor for RowIdCollector {}
1007        impl PrimitiveSortedVisitor for RowIdCollector {}
1008        impl PrimitiveSortedWithRowIdsVisitor for RowIdCollector {}
1009
1010        let mut collector = RowIdCollector {
1011            row_ids: Vec::new(),
1012        };
1013        match ScanBuilder::new(self.store, row_field)
1014            .options(ScanOptions::default())
1015            .run(&mut collector)
1016        {
1017            Ok(()) => {}
1018            Err(llkv_result::Error::NotFound) => return Ok(Vec::new()),
1019            Err(err) => return Err(err),
1020        }
1021
1022        if collector.row_ids.is_empty() {
1023            return Ok(Vec::new());
1024        }
1025
1026        // Gather all table metadata using the collected row IDs
1027        let batch = self.store.gather_rows(
1028            &[meta_field],
1029            &collector.row_ids,
1030            GatherNullPolicy::IncludeNulls,
1031        )?;
1032
1033        let meta_col = batch
1034            .column(0)
1035            .as_any()
1036            .downcast_ref::<BinaryArray>()
1037            .ok_or_else(|| {
1038                llkv_result::Error::Internal("catalog table_meta column should be Binary".into())
1039            })?;
1040
1041        let mut result = Vec::new();
1042        for (idx, &row_id) in collector.row_ids.iter().enumerate() {
1043            if !meta_col.is_null(idx) {
1044                let bytes = meta_col.value(idx);
1045                if let Ok(meta) = bitcode::decode::<TableMeta>(bytes) {
1046                    let logical: LogicalFieldId = row_id.into();
1047                    let table_id = logical.table_id();
1048                    result.push((table_id, meta));
1049                }
1050            }
1051        }
1052
1053        Ok(result)
1054    }
1055
1056    /// Persist the next transaction id to the catalog.
1057    pub fn put_next_txn_id(&self, next_txn_id: u64) -> LlkvResult<()> {
1058        let lfid_val: u64 = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_NEXT_TXN_ID).into();
1059        let schema = Arc::new(Schema::new(vec![
1060            Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
1061            Field::new("next_txn_id", DataType::UInt64, false).with_metadata(HashMap::from([(
1062                crate::constants::FIELD_ID_META_KEY.to_string(),
1063                lfid_val.to_string(),
1064            )])),
1065        ]));
1066
1067        let row_id = Arc::new(UInt64Array::from(vec![CATALOG_NEXT_TXN_ROW_ID]));
1068        let value_array = Arc::new(UInt64Array::from(vec![next_txn_id]));
1069        let batch = RecordBatch::try_new(schema, vec![row_id, value_array])?;
1070        self.store.append(&batch)?;
1071        Ok(())
1072    }
1073
1074    /// Load the next transaction id from the catalog.
1075    pub fn get_next_txn_id(&self) -> LlkvResult<Option<u64>> {
1076        let lfid = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_NEXT_TXN_ID);
1077        let batch = match self.store.gather_rows(
1078            &[lfid],
1079            &[CATALOG_NEXT_TXN_ROW_ID],
1080            GatherNullPolicy::IncludeNulls,
1081        ) {
1082            Ok(batch) => batch,
1083            Err(llkv_result::Error::NotFound) => return Ok(None),
1084            Err(err) => return Err(err),
1085        };
1086
1087        if batch.num_columns() == 0 || batch.num_rows() == 0 {
1088            return Ok(None);
1089        }
1090
1091        let array = batch
1092            .column(0)
1093            .as_any()
1094            .downcast_ref::<UInt64Array>()
1095            .ok_or_else(|| {
1096                llkv_result::Error::Internal(
1097                    "catalog next_txn_id column stored unexpected type".into(),
1098                )
1099            })?;
1100        if array.is_empty() || array.is_null(0) {
1101            return Ok(None);
1102        }
1103
1104        let value = array.value(0);
1105        Ok(Some(value))
1106    }
1107
1108    /// Persist the last committed transaction id to the catalog.
1109    pub fn put_last_committed_txn_id(&self, last_committed: u64) -> LlkvResult<()> {
1110        let lfid_val: u64 = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_LAST_COMMITTED_TXN_ID).into();
1111        let schema = Arc::new(Schema::new(vec![
1112            Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
1113            Field::new("last_committed_txn_id", DataType::UInt64, false).with_metadata(
1114                HashMap::from([(
1115                    crate::constants::FIELD_ID_META_KEY.to_string(),
1116                    lfid_val.to_string(),
1117                )]),
1118            ),
1119        ]));
1120
1121        let row_id = Arc::new(UInt64Array::from(vec![CATALOG_LAST_COMMITTED_TXN_ROW_ID]));
1122        let value_array = Arc::new(UInt64Array::from(vec![last_committed]));
1123        let batch = RecordBatch::try_new(schema, vec![row_id, value_array])?;
1124        self.store.append(&batch)?;
1125        Ok(())
1126    }
1127
1128    /// Load the last committed transaction id from the catalog.
1129    pub fn get_last_committed_txn_id(&self) -> LlkvResult<Option<u64>> {
1130        let lfid = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_LAST_COMMITTED_TXN_ID);
1131        let batch = match self.store.gather_rows(
1132            &[lfid],
1133            &[CATALOG_LAST_COMMITTED_TXN_ROW_ID],
1134            GatherNullPolicy::IncludeNulls,
1135        ) {
1136            Ok(batch) => batch,
1137            Err(llkv_result::Error::NotFound) => return Ok(None),
1138            Err(err) => return Err(err),
1139        };
1140
1141        if batch.num_columns() == 0 || batch.num_rows() == 0 {
1142            return Ok(None);
1143        }
1144
1145        let array = batch
1146            .column(0)
1147            .as_any()
1148            .downcast_ref::<UInt64Array>()
1149            .ok_or_else(|| {
1150                llkv_result::Error::Internal(
1151                    "catalog last_committed_txn_id column stored unexpected type".into(),
1152                )
1153            })?;
1154        if array.is_empty() || array.is_null(0) {
1155            return Ok(None);
1156        }
1157
1158        let value = array.value(0);
1159        Ok(Some(value))
1160    }
1161
1162    /// Persist the catalog state to the system catalog.
1163    ///
1164    /// Stores the complete catalog state (all tables and fields) as a binary blob
1165    /// using bitcode serialization.
1166    pub fn put_catalog_state(&self, state: &crate::catalog::TableCatalogState) -> LlkvResult<()> {
1167        let lfid_val: u64 = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_CATALOG_STATE).into();
1168        let schema = Arc::new(Schema::new(vec![
1169            Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
1170            Field::new("catalog_state", DataType::Binary, false).with_metadata(HashMap::from([(
1171                crate::constants::FIELD_ID_META_KEY.to_string(),
1172                lfid_val.to_string(),
1173            )])),
1174        ]));
1175
1176        let row_id = Arc::new(UInt64Array::from(vec![CATALOG_STATE_ROW_ID]));
1177        let encoded = bitcode::encode(state);
1178        let state_bytes = Arc::new(BinaryArray::from(vec![encoded.as_slice()]));
1179
1180        let batch = RecordBatch::try_new(schema, vec![row_id, state_bytes])?;
1181        self.store.append(&batch)?;
1182        Ok(())
1183    }
1184
1185    /// Load the catalog state from the system catalog.
1186    ///
1187    /// Retrieves the complete catalog state including all table and field mappings.
1188    pub fn get_catalog_state(&self) -> LlkvResult<Option<crate::catalog::TableCatalogState>> {
1189        let lfid = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_CATALOG_STATE);
1190        let batch = match self.store.gather_rows(
1191            &[lfid],
1192            &[CATALOG_STATE_ROW_ID],
1193            GatherNullPolicy::IncludeNulls,
1194        ) {
1195            Ok(batch) => batch,
1196            Err(llkv_result::Error::NotFound) => return Ok(None),
1197            Err(err) => return Err(err),
1198        };
1199
1200        if batch.num_columns() == 0 || batch.num_rows() == 0 {
1201            return Ok(None);
1202        }
1203
1204        let array = batch
1205            .column(0)
1206            .as_any()
1207            .downcast_ref::<BinaryArray>()
1208            .ok_or_else(|| {
1209                llkv_result::Error::Internal("catalog state column stored unexpected type".into())
1210            })?;
1211        if array.is_empty() || array.is_null(0) {
1212            return Ok(None);
1213        }
1214
1215        let bytes = array.value(0);
1216        let state = bitcode::decode(bytes).map_err(|e| {
1217            llkv_result::Error::Internal(format!("Failed to decode catalog state: {}", e))
1218        })?;
1219        Ok(Some(state))
1220    }
1221
1222    /// Persist schema metadata to the catalog.
1223    ///
1224    /// Stores schema metadata at a row ID derived from the schema name hash.
1225    /// This allows efficient lookup and prevents collisions.
1226    pub fn put_schema_meta(&self, meta: &SchemaMeta) -> LlkvResult<()> {
1227        let lfid_val: u64 = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_SCHEMA_META_ID).into();
1228        let schema = Arc::new(Schema::new(vec![
1229            Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
1230            Field::new("meta", DataType::Binary, false).with_metadata(HashMap::from([(
1231                crate::constants::FIELD_ID_META_KEY.to_string(),
1232                lfid_val.to_string(),
1233            )])),
1234        ]));
1235
1236        // Use hash of canonical (lowercase) schema name as row ID
1237        let canonical = meta.name.to_ascii_lowercase();
1238        let row_id_val = schema_name_to_row_id(&canonical);
1239        let row_id = Arc::new(UInt64Array::from(vec![row_id_val]));
1240        let meta_encoded = bitcode::encode(meta);
1241        let meta_bytes = Arc::new(BinaryArray::from(vec![meta_encoded.as_slice()]));
1242
1243        let batch = RecordBatch::try_new(schema, vec![row_id, meta_bytes])?;
1244        self.store.append(&batch)?;
1245        Ok(())
1246    }
1247
1248    /// Retrieve schema metadata by name.
1249    ///
1250    /// Returns `None` if the schema does not exist.
1251    pub fn get_schema_meta(&self, schema_name: &str) -> LlkvResult<Option<SchemaMeta>> {
1252        let canonical = schema_name.to_ascii_lowercase();
1253        let row_id = schema_name_to_row_id(&canonical);
1254        let lfid = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_SCHEMA_META_ID);
1255
1256        let batch = match self
1257            .store
1258            .gather_rows(&[lfid], &[row_id], GatherNullPolicy::IncludeNulls)
1259        {
1260            Ok(batch) => batch,
1261            Err(llkv_result::Error::NotFound) => return Ok(None),
1262            Err(err) => return Err(err),
1263        };
1264
1265        if batch.num_columns() == 0 || batch.num_rows() == 0 {
1266            return Ok(None);
1267        }
1268
1269        let array = batch
1270            .column(0)
1271            .as_any()
1272            .downcast_ref::<BinaryArray>()
1273            .ok_or_else(|| {
1274                llkv_result::Error::Internal("catalog schema_meta column should be Binary".into())
1275            })?;
1276
1277        if array.is_empty() || array.is_null(0) {
1278            return Ok(None);
1279        }
1280
1281        let bytes = array.value(0);
1282        let meta = bitcode::decode(bytes).map_err(|e| {
1283            llkv_result::Error::Internal(format!("Failed to decode schema metadata: {}", e))
1284        })?;
1285        Ok(Some(meta))
1286    }
1287
1288    /// Scan all schema metadata entries from the catalog.
1289    ///
1290    /// Returns a vector of all persisted schemas.
1291    pub fn all_schema_metas(&self) -> LlkvResult<Vec<SchemaMeta>> {
1292        let meta_field = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_SCHEMA_META_ID);
1293        let row_field = rowid_fid(meta_field);
1294
1295        // Collect all row IDs that have schema metadata
1296        struct RowIdCollector {
1297            row_ids: Vec<RowId>,
1298        }
1299
1300        impl PrimitiveVisitor for RowIdCollector {
1301            fn u64_chunk(&mut self, values: &UInt64Array) {
1302                for i in 0..values.len() {
1303                    self.row_ids.push(values.value(i));
1304                }
1305            }
1306        }
1307        impl PrimitiveWithRowIdsVisitor for RowIdCollector {}
1308        impl PrimitiveSortedVisitor for RowIdCollector {}
1309        impl PrimitiveSortedWithRowIdsVisitor for RowIdCollector {}
1310
1311        let mut collector = RowIdCollector {
1312            row_ids: Vec::new(),
1313        };
1314        match ScanBuilder::new(self.store, row_field)
1315            .options(ScanOptions::default())
1316            .run(&mut collector)
1317        {
1318            Ok(()) => {}
1319            Err(llkv_result::Error::NotFound) => return Ok(Vec::new()),
1320            Err(err) => return Err(err),
1321        }
1322
1323        if collector.row_ids.is_empty() {
1324            return Ok(Vec::new());
1325        }
1326
1327        // Gather all schema metadata using the collected row IDs
1328        let batch = self.store.gather_rows(
1329            &[meta_field],
1330            &collector.row_ids,
1331            GatherNullPolicy::IncludeNulls,
1332        )?;
1333
1334        let meta_col = batch
1335            .column(0)
1336            .as_any()
1337            .downcast_ref::<BinaryArray>()
1338            .ok_or_else(|| {
1339                llkv_result::Error::Internal("catalog schema_meta column should be Binary".into())
1340            })?;
1341
1342        let mut result = Vec::new();
1343        for idx in 0..collector.row_ids.len() {
1344            if !meta_col.is_null(idx) {
1345                let bytes = meta_col.value(idx);
1346                if let Ok(meta) = bitcode::decode::<SchemaMeta>(bytes) {
1347                    result.push(meta);
1348                }
1349            }
1350        }
1351
1352        Ok(result)
1353    }
1354}
1355
1356/// Generate a row ID for schema metadata based on schema name.
1357///
1358/// Uses a simple hash to map schema names to row IDs. This is deterministic
1359/// and allows direct lookup without scanning.
1360fn schema_name_to_row_id(canonical_name: &str) -> RowId {
1361    // Use a simple 64-bit FNV-1a hash for deterministic IDs across platforms and releases
1362    const FNV_OFFSET: u64 = 0xcbf2_9ce4_8422_2325;
1363    const FNV_PRIME: u64 = 0x1000_0000_01b3;
1364
1365    let mut hash = FNV_OFFSET;
1366    for byte in canonical_name.as_bytes() {
1367        hash ^= u64::from(*byte);
1368        hash = hash.wrapping_mul(FNV_PRIME);
1369    }
1370
1371    // Use high bit to avoid collision with reserved catalog row IDs (0-3) and table metadata rows
1372    hash | (1u64 << 63)
1373}
1374
1375struct MaxRowIdCollector {
1376    max: Option<RowId>,
1377}
1378
1379impl PrimitiveVisitor for MaxRowIdCollector {
1380    fn u64_chunk(&mut self, values: &UInt64Array) {
1381        for i in 0..values.len() {
1382            let value = values.value(i);
1383            self.max = match self.max {
1384                Some(curr) if curr >= value => Some(curr),
1385                _ => Some(value),
1386            };
1387        }
1388    }
1389}
1390
1391impl PrimitiveWithRowIdsVisitor for MaxRowIdCollector {}
1392impl PrimitiveSortedVisitor for MaxRowIdCollector {}
1393impl PrimitiveSortedWithRowIdsVisitor for MaxRowIdCollector {}
1394
#[cfg(test)]
mod tests {
    use super::*;
    use crate::constraints::{
        ConstraintKind, ConstraintState, PrimaryKeyConstraint, UniqueConstraint,
    };
    use llkv_column_map::ColumnStore;
    use std::sync::Arc;

    /// Round-trip check for constraint records: writes constraints for two
    /// tables, then verifies that per-table scans and keyed lookups return
    /// exactly the records stored for the requested table.
    #[test]
    fn constraint_records_roundtrip() {
        // In-memory pager keeps the test hermetic; MemPager comes in via
        // `use super::*`.
        let pager = Arc::new(MemPager::default());
        let store = ColumnStore::open(Arc::clone(&pager)).unwrap();
        let catalog = SysCatalog::new(&store);

        // Two constraints on the table under test.
        let table_id: TableId = 42;
        let record1 = ConstraintRecord {
            constraint_id: 1,
            kind: ConstraintKind::PrimaryKey(PrimaryKeyConstraint {
                field_ids: vec![1, 2],
            }),
            state: ConstraintState::Active,
            revision: 1,
            last_modified_micros: 100,
        };
        let record2 = ConstraintRecord {
            constraint_id: 2,
            kind: ConstraintKind::Unique(UniqueConstraint { field_ids: vec![3] }),
            state: ConstraintState::Active,
            revision: 2,
            last_modified_micros: 200,
        };
        catalog
            .put_constraint_records(table_id, &[record1.clone(), record2.clone()])
            .unwrap();

        // A constraint on an unrelated table (sharing constraint_id 1) to
        // ensure per-table scans do not leak records across tables.
        let other_table_record = ConstraintRecord {
            constraint_id: 1,
            kind: ConstraintKind::Unique(UniqueConstraint { field_ids: vec![5] }),
            state: ConstraintState::Active,
            revision: 1,
            last_modified_micros: 150,
        };
        catalog
            .put_constraint_records(7, &[other_table_record])
            .unwrap();

        // Sort by id for a deterministic comparison with the expected records.
        let mut fetched = catalog.constraint_records_for_table(table_id).unwrap();
        fetched.sort_by_key(|record| record.constraint_id);

        assert_eq!(fetched.len(), 2);
        assert_eq!(fetched[0], record1);
        assert_eq!(fetched[1], record2);

        // Keyed lookup of a single existing constraint.
        let single = catalog
            .get_constraint_records(table_id, &[record1.constraint_id])
            .unwrap();
        assert_eq!(single.len(), 1);
        assert_eq!(single[0].as_ref(), Some(&record1));

        // Missing ids come back as None placeholders, not errors.
        let missing = catalog.get_constraint_records(table_id, &[999]).unwrap();
        assert_eq!(missing.len(), 1);
        assert!(missing[0].is_none());
    }
}
1459}