llkv_table/
sys_catalog.rs

1//! System catalog for storing table and column metadata.
2//!
3//! The system catalog uses table 0 (reserved) to store metadata about all tables
4//! and columns in the database. This metadata includes:
5//!
6//! - **Table metadata** ([`TableMeta`]): Table ID, name, creation time, flags
7//! - **Column metadata** ([`ColMeta`]): Column ID, name, flags, default values
8//! - **Multi-column unique metadata** ([`TableMultiColumnUniqueMeta`]): Unique index definitions per table
9//!
10//! # Storage Format
11//!
12//! The catalog stores metadata as serialized [`bitcode`] blobs in special catalog
13//! columns within table 0. See [`CATALOG_TABLE_ID`] and related constants in the
14//! [`reserved`](crate::reserved) module.
15//!
16//! # Usage
17//!
18//! The [`SysCatalog`] provides methods to:
19//! - Insert/update table metadata ([`put_table_meta`](SysCatalog::put_table_meta))
20//! - Query table metadata ([`get_table_meta`](SysCatalog::get_table_meta))
21//! - Manage column metadata similarly
22//!
23//! This metadata is used by higher-level components to validate schemas, assign
24//! field IDs, and enforce table constraints.
25
26use std::collections::HashMap;
27use std::mem;
28use std::sync::Arc;
29
30use arrow::array::{Array, BinaryArray, BinaryBuilder, UInt64Array};
31use arrow::datatypes::{DataType, Field, Schema};
32use arrow::record_batch::RecordBatch;
33use bitcode::{Decode, Encode};
34
35use crate::constants::CONSTRAINT_SCAN_CHUNK_SIZE;
36use crate::constraints::{
37    ConstraintId, ConstraintRecord, decode_constraint_row_id, encode_constraint_row_id,
38};
39use crate::types::{FieldId, ROW_ID_FIELD_ID, RowId, TableId};
40use llkv_column_map::store::scan::{
41    PrimitiveSortedVisitor, PrimitiveSortedWithRowIdsVisitor, PrimitiveVisitor,
42    PrimitiveWithRowIdsVisitor, ScanBuilder, ScanOptions,
43};
44
45use llkv_column_map::types::LogicalFieldId;
46use llkv_column_map::{
47    ColumnStore,
48    store::{GatherNullPolicy, ROW_ID_COLUMN_NAME, rowid_fid},
49};
50use llkv_result::{self, Result as LlkvResult};
51use llkv_storage::pager::{MemPager, Pager};
52use simd_r_drive_entry_handle::EntryHandle;
53
54// Import all reserved constants and validation functions
55use crate::reserved::*;
56
57// ----- Namespacing helpers -----
58
59// TODO: Dedupe with llkv_column_map::types::lfid()
60#[inline]
61fn lfid(table_id: TableId, col_id: u32) -> LogicalFieldId {
62    LogicalFieldId::for_user(table_id, col_id)
63}
64
65// TODO: Migrate to llkv_column_map::types::rid_table()
66#[inline]
67fn rid_table(table_id: TableId) -> u64 {
68    LogicalFieldId::for_user(table_id, ROW_ID_FIELD_ID).into()
69}
70
71// TODO: Migrate to llkv_column_map::types::rid_col()
72#[inline]
73fn rid_col(table_id: TableId, col_id: u32) -> u64 {
74    rowid_fid(lfid(table_id, col_id)).into()
75}
76
77#[inline]
78fn constraint_meta_lfid() -> LogicalFieldId {
79    lfid(CATALOG_TABLE_ID, CATALOG_FIELD_CONSTRAINT_META_ID)
80}
81
82#[inline]
83fn constraint_name_lfid() -> LogicalFieldId {
84    lfid(CATALOG_TABLE_ID, CATALOG_FIELD_CONSTRAINT_NAME_ID)
85}
86
87#[inline]
88fn constraint_row_lfid() -> LogicalFieldId {
89    rowid_fid(constraint_meta_lfid())
90}
91
92#[derive(Clone, Debug, Encode, Decode)]
93pub struct ConstraintNameRecord {
94    pub constraint_id: ConstraintId,
95    pub name: Option<String>,
96}
97
98fn decode_constraint_record(bytes: &[u8]) -> LlkvResult<ConstraintRecord> {
99    bitcode::decode(bytes).map_err(|err| {
100        llkv_result::Error::Internal(format!("failed to decode constraint metadata: {err}"))
101    })
102}
103
104struct ConstraintRowCollector<'a, P, F>
105where
106    P: Pager<Blob = EntryHandle> + Send + Sync,
107    F: FnMut(Vec<ConstraintRecord>),
108{
109    store: &'a ColumnStore<P>,
110    lfid: LogicalFieldId,
111    table_id: TableId,
112    on_batch: &'a mut F,
113    buffer: Vec<RowId>,
114    error: Option<llkv_result::Error>,
115}
116
117impl<'a, P, F> ConstraintRowCollector<'a, P, F>
118where
119    P: Pager<Blob = EntryHandle> + Send + Sync,
120    F: FnMut(Vec<ConstraintRecord>),
121{
122    fn flush_buffer(&mut self) -> LlkvResult<()> {
123        if self.buffer.is_empty() {
124            return Ok(());
125        }
126
127        let row_ids = mem::take(&mut self.buffer);
128        let batch =
129            self.store
130                .gather_rows(&[self.lfid], &row_ids, GatherNullPolicy::IncludeNulls)?;
131
132        if batch.num_columns() == 0 {
133            return Ok(());
134        }
135
136        let array = batch
137            .column(0)
138            .as_any()
139            .downcast_ref::<BinaryArray>()
140            .ok_or_else(|| {
141                llkv_result::Error::Internal(
142                    "constraint metadata column stored unexpected type".into(),
143                )
144            })?;
145
146        let mut records = Vec::with_capacity(row_ids.len());
147        for (idx, row_id) in row_ids.into_iter().enumerate() {
148            if array.is_null(idx) {
149                continue;
150            }
151
152            let record = decode_constraint_record(array.value(idx))?;
153            let (table_from_id, constraint_id) = decode_constraint_row_id(row_id);
154            if table_from_id != self.table_id {
155                continue;
156            }
157            if record.constraint_id != constraint_id {
158                return Err(llkv_result::Error::Internal(
159                    "constraint metadata id mismatch".into(),
160                ));
161            }
162            records.push(record);
163        }
164
165        if !records.is_empty() {
166            (self.on_batch)(records);
167        }
168
169        Ok(())
170    }
171
172    fn finish(&mut self) -> LlkvResult<()> {
173        if let Some(err) = self.error.take() {
174            return Err(err);
175        }
176        self.flush_buffer()
177    }
178}
179
180impl<'a, P, F> PrimitiveVisitor for ConstraintRowCollector<'a, P, F>
181where
182    P: Pager<Blob = EntryHandle> + Send + Sync,
183    F: FnMut(Vec<ConstraintRecord>),
184{
185    fn u64_chunk(&mut self, values: &UInt64Array) {
186        if self.error.is_some() {
187            return;
188        }
189
190        for idx in 0..values.len() {
191            let row_id = values.value(idx);
192            let (table_id, _) = decode_constraint_row_id(row_id);
193            if table_id != self.table_id {
194                continue;
195            }
196            self.buffer.push(row_id);
197            if self.buffer.len() >= CONSTRAINT_SCAN_CHUNK_SIZE
198                && let Err(err) = self.flush_buffer()
199            {
200                self.error = Some(err);
201                return;
202            }
203        }
204    }
205}
206
207impl<'a, P, F> PrimitiveWithRowIdsVisitor for ConstraintRowCollector<'a, P, F>
208where
209    P: Pager<Blob = EntryHandle> + Send + Sync,
210    F: FnMut(Vec<ConstraintRecord>),
211{
212}
213
214impl<'a, P, F> PrimitiveSortedVisitor for ConstraintRowCollector<'a, P, F>
215where
216    P: Pager<Blob = EntryHandle> + Send + Sync,
217    F: FnMut(Vec<ConstraintRecord>),
218{
219}
220
221impl<'a, P, F> PrimitiveSortedWithRowIdsVisitor for ConstraintRowCollector<'a, P, F>
222where
223    P: Pager<Blob = EntryHandle> + Send + Sync,
224    F: FnMut(Vec<ConstraintRecord>),
225{
226}
227
228// ----- Public catalog types -----
229
230/// Metadata about a table.
231///
232/// Stored in the system catalog (table 0) and serialized using [`bitcode`].
233#[derive(Encode, Decode, Clone, Debug, PartialEq, Eq)]
234pub struct TableMeta {
235    /// Unique identifier for this table.
236    pub table_id: TableId,
237    /// Optional human-readable name for the table.
238    pub name: Option<String>,
239    /// When the table was created (microseconds since epoch).
240    pub created_at_micros: u64,
241    /// Bitflags for table properties (e.g., temporary, system).
242    pub flags: u32,
243    /// Schema version or modification counter.
244    pub epoch: u64,
245}
246
247/// Metadata about a column.
248///
249/// Stored in the system catalog (table 0) and serialized using [`bitcode`].
250#[derive(Encode, Decode, Clone, Debug, PartialEq, Eq)]
251pub struct ColMeta {
252    /// Unique identifier for this column within its table.
253    pub col_id: u32,
254    /// Optional human-readable name for the column.
255    pub name: Option<String>,
256    /// Bitflags for column properties (e.g., nullable, indexed).
257    pub flags: u32,
258    /// Optional serialized default value for the column.
259    pub default: Option<Vec<u8>>,
260}
261
262/// Metadata about a schema.
263///
264/// Stored in the system catalog (table 0) and serialized using [`bitcode`].
265#[derive(Encode, Decode, Clone, Debug, PartialEq, Eq)]
266pub struct SchemaMeta {
267    /// Human-readable schema name (case-preserved).
268    pub name: String,
269    /// When the schema was created (microseconds since epoch).
270    pub created_at_micros: u64,
271    /// Bitflags for schema properties (reserved for future use).
272    pub flags: u32,
273}
274
275/// Metadata describing a single multi-column UNIQUE index.
276#[derive(Encode, Decode, Clone, Debug, PartialEq, Eq)]
277pub struct MultiColumnUniqueEntryMeta {
278    /// Optional human-readable index name.
279    pub index_name: Option<String>,
280    /// Field IDs participating in this UNIQUE constraint.
281    pub column_ids: Vec<u32>,
282}
283
284/// Metadata describing all multi-column UNIQUE indexes for a table.
285#[derive(Encode, Decode, Clone, Debug, PartialEq, Eq)]
286pub struct TableMultiColumnUniqueMeta {
287    /// Table identifier these UNIQUE entries belong to.
288    pub table_id: TableId,
289    /// Definitions of each persisted multi-column UNIQUE.
290    pub uniques: Vec<MultiColumnUniqueEntryMeta>,
291}
292
293// ----- SysCatalog -----
294
295/// Interface to the system catalog (table 0).
296///
297/// The system catalog stores metadata about all tables and columns in the database.
298/// It uses special reserved columns within table 0 to persist [`TableMeta`] and
299/// [`ColMeta`] structures.
300///
301/// # Lifetime
302///
303/// `SysCatalog` borrows a reference to the [`ColumnStore`] and does not own it.
304/// This allows multiple catalog instances to coexist with the same storage.
305pub struct SysCatalog<'a, P = MemPager>
306where
307    P: Pager<Blob = EntryHandle> + Send + Sync,
308{
309    store: &'a ColumnStore<P>,
310}
311
312impl<'a, P> SysCatalog<'a, P>
313where
314    P: Pager<Blob = EntryHandle> + Send + Sync,
315{
316    fn write_null_entries(&self, meta_field: LogicalFieldId, row_ids: &[RowId]) -> LlkvResult<()> {
317        if row_ids.is_empty() {
318            return Ok(());
319        }
320
321        let lfid_val: u64 = meta_field.into();
322        let schema = Arc::new(Schema::new(vec![
323            Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
324            Field::new("meta", DataType::Binary, true).with_metadata(HashMap::from([(
325                crate::constants::FIELD_ID_META_KEY.to_string(),
326                lfid_val.to_string(),
327            )])),
328        ]));
329
330        let row_array = Arc::new(UInt64Array::from(row_ids.to_vec()));
331        let mut builder = BinaryBuilder::new();
332        for _ in row_ids {
333            builder.append_null();
334        }
335        let meta_array = Arc::new(builder.finish());
336
337        let batch = RecordBatch::try_new(schema, vec![row_array, meta_array])?;
338        self.store.append(&batch)?;
339        Ok(())
340    }
341
342    /// Create a new system catalog interface using the provided column store.
343    pub fn new(store: &'a ColumnStore<P>) -> Self {
344        Self { store }
345    }
346
347    /// Insert or update table metadata.
348    ///
349    /// This persists the table's metadata to the system catalog. If metadata for
350    /// this table ID already exists, it is overwritten (last-write-wins).
351    pub fn put_table_meta(&self, meta: &TableMeta) {
352        let lfid_val: u64 = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_TABLE_META_ID).into();
353        let schema = Arc::new(Schema::new(vec![
354            Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
355            Field::new("meta", DataType::Binary, false).with_metadata(HashMap::from([(
356                crate::constants::FIELD_ID_META_KEY.to_string(),
357                lfid_val.to_string(),
358            )])),
359        ]));
360
361        let row_id = Arc::new(UInt64Array::from(vec![rid_table(meta.table_id)]));
362        let meta_encoded = bitcode::encode(meta);
363        let meta_bytes = Arc::new(BinaryArray::from(vec![meta_encoded.as_slice()]));
364
365        let batch = RecordBatch::try_new(schema, vec![row_id, meta_bytes]).unwrap();
366        self.store.append(&batch).unwrap();
367    }
368
369    /// Retrieve table metadata by table ID.
370    ///
371    /// Returns `None` if no metadata exists for the given table ID.
372    pub fn get_table_meta(&self, table_id: TableId) -> Option<TableMeta> {
373        let row_id = rid_table(table_id);
374        let catalog_field = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_TABLE_META_ID);
375        let batch = self
376            .store
377            .gather_rows(&[catalog_field], &[row_id], GatherNullPolicy::IncludeNulls)
378            .ok()?;
379
380        if batch.num_rows() == 0 || batch.num_columns() == 0 {
381            return None;
382        }
383
384        let array = batch
385            .column(0)
386            .as_any()
387            .downcast_ref::<BinaryArray>()
388            .expect("table meta column must be BinaryArray");
389
390        if array.is_null(0) {
391            return None;
392        }
393
394        bitcode::decode(array.value(0)).ok()
395    }
396
397    /// Upsert a single column’s metadata.
398    pub fn put_col_meta(&self, table_id: TableId, meta: &ColMeta) {
399        let lfid_val: u64 = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_COL_META_ID).into();
400        let schema = Arc::new(Schema::new(vec![
401            Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
402            Field::new("meta", DataType::Binary, false).with_metadata(HashMap::from([(
403                crate::constants::FIELD_ID_META_KEY.to_string(),
404                lfid_val.to_string(),
405            )])),
406        ]));
407
408        let rid_value = rid_col(table_id, meta.col_id);
409        let row_id = Arc::new(UInt64Array::from(vec![rid_value]));
410        let meta_encoded = bitcode::encode(meta);
411        let meta_bytes = Arc::new(BinaryArray::from(vec![meta_encoded.as_slice()]));
412
413        let batch = RecordBatch::try_new(schema, vec![row_id, meta_bytes]).unwrap();
414        self.store.append(&batch).unwrap();
415    }
416
417    /// Batch fetch specific column metas by col_id using a shared keyset.
418    pub fn get_cols_meta(&self, table_id: TableId, col_ids: &[u32]) -> Vec<Option<ColMeta>> {
419        if col_ids.is_empty() {
420            return Vec::new();
421        }
422
423        let row_ids: Vec<RowId> = col_ids.iter().map(|&cid| rid_col(table_id, cid)).collect();
424        let catalog_field = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_COL_META_ID);
425
426        let batch =
427            match self
428                .store
429                .gather_rows(&[catalog_field], &row_ids, GatherNullPolicy::IncludeNulls)
430            {
431                Ok(batch) => batch,
432                Err(_) => return vec![None; col_ids.len()],
433            };
434
435        let meta_col = batch
436            .column(0)
437            .as_any()
438            .downcast_ref::<BinaryArray>()
439            .expect("catalog meta column should be Binary");
440
441        col_ids
442            .iter()
443            .enumerate()
444            .map(|(idx, _)| {
445                if meta_col.is_null(idx) {
446                    None
447                } else {
448                    bitcode::decode(meta_col.value(idx)).ok()
449                }
450            })
451            .collect()
452    }
453
454    /// Delete metadata rows for the specified column identifiers.
455    pub fn delete_col_meta(&self, table_id: TableId, col_ids: &[FieldId]) -> LlkvResult<()> {
456        if col_ids.is_empty() {
457            return Ok(());
458        }
459
460        let meta_field = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_COL_META_ID);
461        let row_ids: Vec<RowId> = col_ids
462            .iter()
463            .map(|&col_id| rid_col(table_id, col_id))
464            .collect();
465        self.write_null_entries(meta_field, &row_ids)
466    }
467
468    /// Remove the persisted table metadata record, if present.
469    pub fn delete_table_meta(&self, table_id: TableId) -> LlkvResult<()> {
470        let meta_field = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_TABLE_META_ID);
471        let row_id = rid_table(table_id);
472        self.write_null_entries(meta_field, &[row_id])
473    }
474
475    /// Delete constraint records for the provided identifiers.
476    pub fn delete_constraint_records(
477        &self,
478        table_id: TableId,
479        constraint_ids: &[ConstraintId],
480    ) -> LlkvResult<()> {
481        if constraint_ids.is_empty() {
482            return Ok(());
483        }
484
485        let meta_field = constraint_meta_lfid();
486        let row_ids: Vec<RowId> = constraint_ids
487            .iter()
488            .map(|&constraint_id| encode_constraint_row_id(table_id, constraint_id))
489            .collect();
490        self.write_null_entries(meta_field, &row_ids)
491    }
492
493    /// Delete persisted constraint names for the provided identifiers.
494    pub fn delete_constraint_names(
495        &self,
496        table_id: TableId,
497        constraint_ids: &[ConstraintId],
498    ) -> LlkvResult<()> {
499        if constraint_ids.is_empty() {
500            return Ok(());
501        }
502
503        let lfid = constraint_name_lfid();
504        let row_ids: Vec<RowId> = constraint_ids
505            .iter()
506            .map(|&constraint_id| encode_constraint_row_id(table_id, constraint_id))
507            .collect();
508        self.write_null_entries(lfid, &row_ids)
509    }
510
511    /// Delete the multi-column UNIQUE metadata record for a table, if any.
512    pub fn delete_multi_column_uniques(&self, table_id: TableId) -> LlkvResult<()> {
513        let meta_field = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_MULTI_COLUMN_UNIQUE_META_ID);
514        let row_id = rid_table(table_id);
515        self.write_null_entries(meta_field, &[row_id])
516    }
517
518    /// Persist the complete set of multi-column UNIQUE definitions for a table.
519    pub fn put_multi_column_uniques(
520        &self,
521        table_id: TableId,
522        uniques: &[MultiColumnUniqueEntryMeta],
523    ) -> LlkvResult<()> {
524        let lfid_val: u64 =
525            lfid(CATALOG_TABLE_ID, CATALOG_FIELD_MULTI_COLUMN_UNIQUE_META_ID).into();
526        let schema = Arc::new(Schema::new(vec![
527            Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
528            Field::new("meta", DataType::Binary, false).with_metadata(HashMap::from([(
529                crate::constants::FIELD_ID_META_KEY.to_string(),
530                lfid_val.to_string(),
531            )])),
532        ]));
533
534        let row_id = Arc::new(UInt64Array::from(vec![rid_table(table_id)]));
535        let meta = TableMultiColumnUniqueMeta {
536            table_id,
537            uniques: uniques.to_vec(),
538        };
539        let encoded = bitcode::encode(&meta);
540        let meta_bytes = Arc::new(BinaryArray::from(vec![encoded.as_slice()]));
541
542        let batch = RecordBatch::try_new(schema, vec![row_id, meta_bytes])?;
543        self.store.append(&batch)?;
544        Ok(())
545    }
546
547    /// Retrieve all persisted multi-column UNIQUE definitions for a table.
548    pub fn get_multi_column_uniques(
549        &self,
550        table_id: TableId,
551    ) -> LlkvResult<Vec<MultiColumnUniqueEntryMeta>> {
552        let lfid = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_MULTI_COLUMN_UNIQUE_META_ID);
553        let row_id = rid_table(table_id);
554        let batch = match self
555            .store
556            .gather_rows(&[lfid], &[row_id], GatherNullPolicy::IncludeNulls)
557        {
558            Ok(batch) => batch,
559            Err(llkv_result::Error::NotFound) => return Ok(Vec::new()),
560            Err(err) => return Err(err),
561        };
562
563        if batch.num_columns() == 0 || batch.num_rows() == 0 {
564            return Ok(Vec::new());
565        }
566
567        let array = batch
568            .column(0)
569            .as_any()
570            .downcast_ref::<BinaryArray>()
571            .ok_or_else(|| {
572                llkv_result::Error::Internal(
573                    "catalog multi-column unique column stored unexpected type".into(),
574                )
575            })?;
576
577        if array.is_null(0) {
578            return Ok(Vec::new());
579        }
580
581        let meta: TableMultiColumnUniqueMeta = bitcode::decode(array.value(0)).map_err(|err| {
582            llkv_result::Error::Internal(format!(
583                "failed to decode multi-column unique metadata: {err}"
584            ))
585        })?;
586
587        Ok(meta.uniques)
588    }
589
590    /// Retrieve all persisted multi-column UNIQUE definitions across tables.
591    pub fn all_multi_column_unique_metas(&self) -> LlkvResult<Vec<TableMultiColumnUniqueMeta>> {
592        let meta_field = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_MULTI_COLUMN_UNIQUE_META_ID);
593        let row_field = rowid_fid(meta_field);
594
595        struct RowIdCollector {
596            row_ids: Vec<RowId>,
597        }
598
599        impl PrimitiveVisitor for RowIdCollector {
600            fn u64_chunk(&mut self, values: &UInt64Array) {
601                for i in 0..values.len() {
602                    self.row_ids.push(values.value(i));
603                }
604            }
605        }
606        impl PrimitiveWithRowIdsVisitor for RowIdCollector {}
607        impl PrimitiveSortedVisitor for RowIdCollector {}
608        impl PrimitiveSortedWithRowIdsVisitor for RowIdCollector {}
609
610        let mut collector = RowIdCollector {
611            row_ids: Vec::new(),
612        };
613        match ScanBuilder::new(self.store, row_field)
614            .options(ScanOptions::default())
615            .run(&mut collector)
616        {
617            Ok(()) => {}
618            Err(llkv_result::Error::NotFound) => return Ok(Vec::new()),
619            Err(err) => return Err(err),
620        }
621
622        if collector.row_ids.is_empty() {
623            return Ok(Vec::new());
624        }
625
626        let batch = match self.store.gather_rows(
627            &[meta_field],
628            &collector.row_ids,
629            GatherNullPolicy::IncludeNulls,
630        ) {
631            Ok(batch) => batch,
632            Err(llkv_result::Error::NotFound) => return Ok(Vec::new()),
633            Err(err) => return Err(err),
634        };
635
636        if batch.num_columns() == 0 {
637            return Ok(Vec::new());
638        }
639
640        let array = batch
641            .column(0)
642            .as_any()
643            .downcast_ref::<BinaryArray>()
644            .ok_or_else(|| {
645                llkv_result::Error::Internal(
646                    "catalog multi-column unique column stored unexpected type".into(),
647                )
648            })?;
649
650        let mut metas = Vec::with_capacity(batch.num_rows());
651        for idx in 0..batch.num_rows() {
652            if array.is_null(idx) {
653                continue;
654            }
655            let meta: TableMultiColumnUniqueMeta =
656                bitcode::decode(array.value(idx)).map_err(|err| {
657                    llkv_result::Error::Internal(format!(
658                        "failed to decode multi-column unique metadata: {err}"
659                    ))
660                })?;
661            metas.push(meta);
662        }
663
664        Ok(metas)
665    }
666
667    /// Persist or update multiple constraint records for a table in a single batch.
668    pub fn put_constraint_records(
669        &self,
670        table_id: TableId,
671        records: &[ConstraintRecord],
672    ) -> LlkvResult<()> {
673        if records.is_empty() {
674            return Ok(());
675        }
676
677        let lfid_val: u64 = constraint_meta_lfid().into();
678        let schema = Arc::new(Schema::new(vec![
679            Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
680            Field::new("constraint", DataType::Binary, false).with_metadata(HashMap::from([(
681                crate::constants::FIELD_ID_META_KEY.to_string(),
682                lfid_val.to_string(),
683            )])),
684        ]));
685
686        let row_ids: Vec<RowId> = records
687            .iter()
688            .map(|record| encode_constraint_row_id(table_id, record.constraint_id))
689            .collect();
690
691        let row_ids_array = Arc::new(UInt64Array::from(row_ids));
692        let payload_array = Arc::new(BinaryArray::from_iter_values(
693            records.iter().map(bitcode::encode),
694        ));
695
696        let batch = RecordBatch::try_new(schema, vec![row_ids_array, payload_array])?;
697        self.store.append(&batch)?;
698        Ok(())
699    }
700
701    /// Persist or update constraint names for the specified identifiers.
702    pub fn put_constraint_names(
703        &self,
704        table_id: TableId,
705        names: &[ConstraintNameRecord],
706    ) -> LlkvResult<()> {
707        if names.is_empty() {
708            return Ok(());
709        }
710
711        let lfid_val: u64 = constraint_name_lfid().into();
712        let schema = Arc::new(Schema::new(vec![
713            Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
714            Field::new("constraint_name", DataType::Binary, false).with_metadata(HashMap::from([
715                (
716                    crate::constants::FIELD_ID_META_KEY.to_string(),
717                    lfid_val.to_string(),
718                ),
719            ])),
720        ]));
721
722        let row_ids: Vec<RowId> = names
723            .iter()
724            .map(|record| encode_constraint_row_id(table_id, record.constraint_id))
725            .collect();
726        let row_ids_array = Arc::new(UInt64Array::from(row_ids));
727        let payload_array = Arc::new(BinaryArray::from_iter_values(
728            names.iter().map(bitcode::encode),
729        ));
730
731        let batch = RecordBatch::try_new(schema, vec![row_ids_array, payload_array])?;
732        self.store.append(&batch)?;
733        Ok(())
734    }
735
736    /// Fetch multiple constraint records for a table in a single batch.
737    pub fn get_constraint_records(
738        &self,
739        table_id: TableId,
740        constraint_ids: &[ConstraintId],
741    ) -> LlkvResult<Vec<Option<ConstraintRecord>>> {
742        if constraint_ids.is_empty() {
743            return Ok(Vec::new());
744        }
745
746        let lfid = constraint_meta_lfid();
747        let row_ids: Vec<RowId> = constraint_ids
748            .iter()
749            .map(|&constraint_id| encode_constraint_row_id(table_id, constraint_id))
750            .collect();
751
752        let batch = match self
753            .store
754            .gather_rows(&[lfid], &row_ids, GatherNullPolicy::IncludeNulls)
755        {
756            Ok(batch) => batch,
757            Err(llkv_result::Error::NotFound) => {
758                return Ok(vec![None; constraint_ids.len()]);
759            }
760            Err(err) => return Err(err),
761        };
762
763        if batch.num_columns() == 0 || batch.num_rows() == 0 {
764            return Ok(vec![None; constraint_ids.len()]);
765        }
766
767        let array = batch
768            .column(0)
769            .as_any()
770            .downcast_ref::<BinaryArray>()
771            .ok_or_else(|| {
772                llkv_result::Error::Internal(
773                    "constraint metadata column stored unexpected type".into(),
774                )
775            })?;
776
777        let mut results = Vec::with_capacity(constraint_ids.len());
778        for (idx, &constraint_id) in constraint_ids.iter().enumerate() {
779            if array.is_null(idx) {
780                results.push(None);
781                continue;
782            }
783            let record = decode_constraint_record(array.value(idx))?;
784            if record.constraint_id != constraint_id {
785                return Err(llkv_result::Error::Internal(
786                    "constraint metadata id mismatch".into(),
787                ));
788            }
789            results.push(Some(record));
790        }
791
792        Ok(results)
793    }
794
795    /// Fetch constraint names for a table in a single batch.
796    pub fn get_constraint_names(
797        &self,
798        table_id: TableId,
799        constraint_ids: &[ConstraintId],
800    ) -> LlkvResult<Vec<Option<String>>> {
801        if constraint_ids.is_empty() {
802            return Ok(Vec::new());
803        }
804
805        let lfid = constraint_name_lfid();
806        let row_ids: Vec<RowId> = constraint_ids
807            .iter()
808            .map(|&constraint_id| encode_constraint_row_id(table_id, constraint_id))
809            .collect();
810
811        let batch = match self
812            .store
813            .gather_rows(&[lfid], &row_ids, GatherNullPolicy::IncludeNulls)
814        {
815            Ok(batch) => batch,
816            Err(llkv_result::Error::NotFound) => {
817                return Ok(vec![None; constraint_ids.len()]);
818            }
819            Err(err) => return Err(err),
820        };
821
822        if batch.num_columns() == 0 {
823            return Ok(vec![None; constraint_ids.len()]);
824        }
825
826        let array = batch
827            .column(0)
828            .as_any()
829            .downcast_ref::<BinaryArray>()
830            .ok_or_else(|| {
831                llkv_result::Error::Internal(
832                    "constraint name metadata column stored unexpected type".into(),
833                )
834            })?;
835
836        let mut results = Vec::with_capacity(row_ids.len());
837        for idx in 0..row_ids.len() {
838            if array.is_null(idx) {
839                results.push(None);
840            } else {
841                let record: ConstraintNameRecord =
842                    bitcode::decode(array.value(idx)).map_err(|err| {
843                        llkv_result::Error::Internal(format!(
844                            "failed to decode constraint name metadata: {err}"
845                        ))
846                    })?;
847                results.push(record.name);
848            }
849        }
850
851        Ok(results)
852    }
853
854    /// Stream constraint records for a table in batches.
855    pub fn scan_constraint_records_for_table<F>(
856        &self,
857        table_id: TableId,
858        mut on_batch: F,
859    ) -> LlkvResult<()>
860    where
861        F: FnMut(Vec<ConstraintRecord>),
862    {
863        let row_field = constraint_row_lfid();
864        let mut visitor = ConstraintRowCollector {
865            store: self.store,
866            lfid: constraint_meta_lfid(),
867            table_id,
868            on_batch: &mut on_batch,
869            buffer: Vec::with_capacity(CONSTRAINT_SCAN_CHUNK_SIZE),
870            error: None,
871        };
872
873        match ScanBuilder::new(self.store, row_field)
874            .options(ScanOptions::default())
875            .run(&mut visitor)
876        {
877            Ok(()) => {}
878            Err(llkv_result::Error::NotFound) => return Ok(()),
879            Err(err) => return Err(err),
880        }
881
882        visitor.finish()
883    }
884
885    /// Load all constraint records for a table into a vector.
886    pub fn constraint_records_for_table(
887        &self,
888        table_id: TableId,
889    ) -> LlkvResult<Vec<ConstraintRecord>> {
890        let mut all = Vec::new();
891        self.scan_constraint_records_for_table(table_id, |mut chunk| {
892            all.append(&mut chunk);
893        })?;
894        Ok(all)
895    }
896
897    pub fn put_next_table_id(&self, next_id: TableId) -> LlkvResult<()> {
898        let lfid_val: u64 = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_NEXT_TABLE_ID).into();
899        let schema = Arc::new(Schema::new(vec![
900            Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
901            Field::new("next_table_id", DataType::UInt64, false).with_metadata(HashMap::from([(
902                crate::constants::FIELD_ID_META_KEY.to_string(),
903                lfid_val.to_string(),
904            )])),
905        ]));
906
907        let row_id = Arc::new(UInt64Array::from(vec![CATALOG_NEXT_TABLE_ROW_ID]));
908        let value_array = Arc::new(UInt64Array::from(vec![next_id as u64]));
909        let batch = RecordBatch::try_new(schema, vec![row_id, value_array])?;
910        self.store.append(&batch)?;
911        Ok(())
912    }
913
914    pub fn get_next_table_id(&self) -> LlkvResult<Option<TableId>> {
915        let lfid = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_NEXT_TABLE_ID);
916        let batch = match self.store.gather_rows(
917            &[lfid],
918            &[CATALOG_NEXT_TABLE_ROW_ID],
919            GatherNullPolicy::IncludeNulls,
920        ) {
921            Ok(batch) => batch,
922            Err(llkv_result::Error::NotFound) => return Ok(None),
923            Err(err) => return Err(err),
924        };
925
926        if batch.num_columns() == 0 || batch.num_rows() == 0 {
927            return Ok(None);
928        }
929
930        let array = batch
931            .column(0)
932            .as_any()
933            .downcast_ref::<UInt64Array>()
934            .ok_or_else(|| {
935                llkv_result::Error::Internal(
936                    "catalog next_table_id column stored unexpected type".into(),
937                )
938            })?;
939        if array.is_empty() || array.is_null(0) {
940            return Ok(None);
941        }
942
943        let value = array.value(0);
944        if value > TableId::MAX as u64 {
945            return Err(llkv_result::Error::InvalidArgumentError(
946                "persisted next_table_id exceeds TableId range".into(),
947            ));
948        }
949
950        Ok(Some(value as TableId))
951    }
952
953    pub fn max_table_id(&self) -> LlkvResult<Option<TableId>> {
954        let meta_field = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_TABLE_META_ID);
955        let row_field = rowid_fid(meta_field);
956
957        let mut collector = MaxRowIdCollector { max: None };
958        match ScanBuilder::new(self.store, row_field)
959            .options(ScanOptions::default())
960            .run(&mut collector)
961        {
962            Ok(()) => {}
963            Err(llkv_result::Error::NotFound) => return Ok(None),
964            Err(err) => return Err(err),
965        }
966
967        let max_value = match collector.max {
968            Some(value) => value,
969            None => return Ok(None),
970        };
971
972        let logical: LogicalFieldId = max_value.into();
973        Ok(Some(logical.table_id()))
974    }
975
976    /// Scan all table metadata entries from the catalog.
977    /// Returns a vector of (table_id, TableMeta) pairs for all persisted tables.
978    ///
979    /// This method first scans for all row IDs in the table metadata column,
980    /// then uses gather_rows to retrieve the actual metadata.
981    pub fn all_table_metas(&self) -> LlkvResult<Vec<(TableId, TableMeta)>> {
982        let meta_field = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_TABLE_META_ID);
983        let row_field = rowid_fid(meta_field);
984
985        // Collect all row IDs that have table metadata
986        struct RowIdCollector {
987            row_ids: Vec<RowId>,
988        }
989
990        impl PrimitiveVisitor for RowIdCollector {
991            fn u64_chunk(&mut self, values: &UInt64Array) {
992                for i in 0..values.len() {
993                    self.row_ids.push(values.value(i));
994                }
995            }
996        }
997        impl PrimitiveWithRowIdsVisitor for RowIdCollector {}
998        impl PrimitiveSortedVisitor for RowIdCollector {}
999        impl PrimitiveSortedWithRowIdsVisitor for RowIdCollector {}
1000
1001        let mut collector = RowIdCollector {
1002            row_ids: Vec::new(),
1003        };
1004        match ScanBuilder::new(self.store, row_field)
1005            .options(ScanOptions::default())
1006            .run(&mut collector)
1007        {
1008            Ok(()) => {}
1009            Err(llkv_result::Error::NotFound) => return Ok(Vec::new()),
1010            Err(err) => return Err(err),
1011        }
1012
1013        if collector.row_ids.is_empty() {
1014            return Ok(Vec::new());
1015        }
1016
1017        // Gather all table metadata using the collected row IDs
1018        let batch = self.store.gather_rows(
1019            &[meta_field],
1020            &collector.row_ids,
1021            GatherNullPolicy::IncludeNulls,
1022        )?;
1023
1024        let meta_col = batch
1025            .column(0)
1026            .as_any()
1027            .downcast_ref::<BinaryArray>()
1028            .ok_or_else(|| {
1029                llkv_result::Error::Internal("catalog table_meta column should be Binary".into())
1030            })?;
1031
1032        let mut result = Vec::new();
1033        for (idx, &row_id) in collector.row_ids.iter().enumerate() {
1034            if !meta_col.is_null(idx) {
1035                let bytes = meta_col.value(idx);
1036                if let Ok(meta) = bitcode::decode::<TableMeta>(bytes) {
1037                    let logical: LogicalFieldId = row_id.into();
1038                    let table_id = logical.table_id();
1039                    result.push((table_id, meta));
1040                }
1041            }
1042        }
1043
1044        Ok(result)
1045    }
1046
1047    /// Persist the next transaction id to the catalog.
1048    pub fn put_next_txn_id(&self, next_txn_id: u64) -> LlkvResult<()> {
1049        let lfid_val: u64 = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_NEXT_TXN_ID).into();
1050        let schema = Arc::new(Schema::new(vec![
1051            Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
1052            Field::new("next_txn_id", DataType::UInt64, false).with_metadata(HashMap::from([(
1053                crate::constants::FIELD_ID_META_KEY.to_string(),
1054                lfid_val.to_string(),
1055            )])),
1056        ]));
1057
1058        let row_id = Arc::new(UInt64Array::from(vec![CATALOG_NEXT_TXN_ROW_ID]));
1059        let value_array = Arc::new(UInt64Array::from(vec![next_txn_id]));
1060        let batch = RecordBatch::try_new(schema, vec![row_id, value_array])?;
1061        self.store.append(&batch)?;
1062        Ok(())
1063    }
1064
1065    /// Load the next transaction id from the catalog.
1066    pub fn get_next_txn_id(&self) -> LlkvResult<Option<u64>> {
1067        let lfid = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_NEXT_TXN_ID);
1068        let batch = match self.store.gather_rows(
1069            &[lfid],
1070            &[CATALOG_NEXT_TXN_ROW_ID],
1071            GatherNullPolicy::IncludeNulls,
1072        ) {
1073            Ok(batch) => batch,
1074            Err(llkv_result::Error::NotFound) => return Ok(None),
1075            Err(err) => return Err(err),
1076        };
1077
1078        if batch.num_columns() == 0 || batch.num_rows() == 0 {
1079            return Ok(None);
1080        }
1081
1082        let array = batch
1083            .column(0)
1084            .as_any()
1085            .downcast_ref::<UInt64Array>()
1086            .ok_or_else(|| {
1087                llkv_result::Error::Internal(
1088                    "catalog next_txn_id column stored unexpected type".into(),
1089                )
1090            })?;
1091        if array.is_empty() || array.is_null(0) {
1092            return Ok(None);
1093        }
1094
1095        let value = array.value(0);
1096        Ok(Some(value))
1097    }
1098
1099    /// Persist the last committed transaction id to the catalog.
1100    pub fn put_last_committed_txn_id(&self, last_committed: u64) -> LlkvResult<()> {
1101        let lfid_val: u64 = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_LAST_COMMITTED_TXN_ID).into();
1102        let schema = Arc::new(Schema::new(vec![
1103            Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
1104            Field::new("last_committed_txn_id", DataType::UInt64, false).with_metadata(
1105                HashMap::from([(
1106                    crate::constants::FIELD_ID_META_KEY.to_string(),
1107                    lfid_val.to_string(),
1108                )]),
1109            ),
1110        ]));
1111
1112        let row_id = Arc::new(UInt64Array::from(vec![CATALOG_LAST_COMMITTED_TXN_ROW_ID]));
1113        let value_array = Arc::new(UInt64Array::from(vec![last_committed]));
1114        let batch = RecordBatch::try_new(schema, vec![row_id, value_array])?;
1115        self.store.append(&batch)?;
1116        Ok(())
1117    }
1118
1119    /// Load the last committed transaction id from the catalog.
1120    pub fn get_last_committed_txn_id(&self) -> LlkvResult<Option<u64>> {
1121        let lfid = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_LAST_COMMITTED_TXN_ID);
1122        let batch = match self.store.gather_rows(
1123            &[lfid],
1124            &[CATALOG_LAST_COMMITTED_TXN_ROW_ID],
1125            GatherNullPolicy::IncludeNulls,
1126        ) {
1127            Ok(batch) => batch,
1128            Err(llkv_result::Error::NotFound) => return Ok(None),
1129            Err(err) => return Err(err),
1130        };
1131
1132        if batch.num_columns() == 0 || batch.num_rows() == 0 {
1133            return Ok(None);
1134        }
1135
1136        let array = batch
1137            .column(0)
1138            .as_any()
1139            .downcast_ref::<UInt64Array>()
1140            .ok_or_else(|| {
1141                llkv_result::Error::Internal(
1142                    "catalog last_committed_txn_id column stored unexpected type".into(),
1143                )
1144            })?;
1145        if array.is_empty() || array.is_null(0) {
1146            return Ok(None);
1147        }
1148
1149        let value = array.value(0);
1150        Ok(Some(value))
1151    }
1152
1153    /// Persist the catalog state to the system catalog.
1154    ///
1155    /// Stores the complete catalog state (all tables and fields) as a binary blob
1156    /// using bitcode serialization.
1157    pub fn put_catalog_state(&self, state: &crate::catalog::TableCatalogState) -> LlkvResult<()> {
1158        let lfid_val: u64 = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_CATALOG_STATE).into();
1159        let schema = Arc::new(Schema::new(vec![
1160            Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
1161            Field::new("catalog_state", DataType::Binary, false).with_metadata(HashMap::from([(
1162                crate::constants::FIELD_ID_META_KEY.to_string(),
1163                lfid_val.to_string(),
1164            )])),
1165        ]));
1166
1167        let row_id = Arc::new(UInt64Array::from(vec![CATALOG_STATE_ROW_ID]));
1168        let encoded = bitcode::encode(state);
1169        let state_bytes = Arc::new(BinaryArray::from(vec![encoded.as_slice()]));
1170
1171        let batch = RecordBatch::try_new(schema, vec![row_id, state_bytes])?;
1172        self.store.append(&batch)?;
1173        Ok(())
1174    }
1175
1176    /// Load the catalog state from the system catalog.
1177    ///
1178    /// Retrieves the complete catalog state including all table and field mappings.
1179    pub fn get_catalog_state(&self) -> LlkvResult<Option<crate::catalog::TableCatalogState>> {
1180        let lfid = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_CATALOG_STATE);
1181        let batch = match self.store.gather_rows(
1182            &[lfid],
1183            &[CATALOG_STATE_ROW_ID],
1184            GatherNullPolicy::IncludeNulls,
1185        ) {
1186            Ok(batch) => batch,
1187            Err(llkv_result::Error::NotFound) => return Ok(None),
1188            Err(err) => return Err(err),
1189        };
1190
1191        if batch.num_columns() == 0 || batch.num_rows() == 0 {
1192            return Ok(None);
1193        }
1194
1195        let array = batch
1196            .column(0)
1197            .as_any()
1198            .downcast_ref::<BinaryArray>()
1199            .ok_or_else(|| {
1200                llkv_result::Error::Internal("catalog state column stored unexpected type".into())
1201            })?;
1202        if array.is_empty() || array.is_null(0) {
1203            return Ok(None);
1204        }
1205
1206        let bytes = array.value(0);
1207        let state = bitcode::decode(bytes).map_err(|e| {
1208            llkv_result::Error::Internal(format!("Failed to decode catalog state: {}", e))
1209        })?;
1210        Ok(Some(state))
1211    }
1212
1213    /// Persist schema metadata to the catalog.
1214    ///
1215    /// Stores schema metadata at a row ID derived from the schema name hash.
1216    /// This allows efficient lookup and prevents collisions.
1217    pub fn put_schema_meta(&self, meta: &SchemaMeta) -> LlkvResult<()> {
1218        let lfid_val: u64 = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_SCHEMA_META_ID).into();
1219        let schema = Arc::new(Schema::new(vec![
1220            Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
1221            Field::new("meta", DataType::Binary, false).with_metadata(HashMap::from([(
1222                crate::constants::FIELD_ID_META_KEY.to_string(),
1223                lfid_val.to_string(),
1224            )])),
1225        ]));
1226
1227        // Use hash of canonical (lowercase) schema name as row ID
1228        let canonical = meta.name.to_ascii_lowercase();
1229        let row_id_val = schema_name_to_row_id(&canonical);
1230        let row_id = Arc::new(UInt64Array::from(vec![row_id_val]));
1231        let meta_encoded = bitcode::encode(meta);
1232        let meta_bytes = Arc::new(BinaryArray::from(vec![meta_encoded.as_slice()]));
1233
1234        let batch = RecordBatch::try_new(schema, vec![row_id, meta_bytes])?;
1235        self.store.append(&batch)?;
1236        Ok(())
1237    }
1238
1239    /// Retrieve schema metadata by name.
1240    ///
1241    /// Returns `None` if the schema does not exist.
1242    pub fn get_schema_meta(&self, schema_name: &str) -> LlkvResult<Option<SchemaMeta>> {
1243        let canonical = schema_name.to_ascii_lowercase();
1244        let row_id = schema_name_to_row_id(&canonical);
1245        let lfid = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_SCHEMA_META_ID);
1246
1247        let batch = match self
1248            .store
1249            .gather_rows(&[lfid], &[row_id], GatherNullPolicy::IncludeNulls)
1250        {
1251            Ok(batch) => batch,
1252            Err(llkv_result::Error::NotFound) => return Ok(None),
1253            Err(err) => return Err(err),
1254        };
1255
1256        if batch.num_columns() == 0 || batch.num_rows() == 0 {
1257            return Ok(None);
1258        }
1259
1260        let array = batch
1261            .column(0)
1262            .as_any()
1263            .downcast_ref::<BinaryArray>()
1264            .ok_or_else(|| {
1265                llkv_result::Error::Internal("catalog schema_meta column should be Binary".into())
1266            })?;
1267
1268        if array.is_empty() || array.is_null(0) {
1269            return Ok(None);
1270        }
1271
1272        let bytes = array.value(0);
1273        let meta = bitcode::decode(bytes).map_err(|e| {
1274            llkv_result::Error::Internal(format!("Failed to decode schema metadata: {}", e))
1275        })?;
1276        Ok(Some(meta))
1277    }
1278
1279    /// Scan all schema metadata entries from the catalog.
1280    ///
1281    /// Returns a vector of all persisted schemas.
1282    pub fn all_schema_metas(&self) -> LlkvResult<Vec<SchemaMeta>> {
1283        let meta_field = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_SCHEMA_META_ID);
1284        let row_field = rowid_fid(meta_field);
1285
1286        // Collect all row IDs that have schema metadata
1287        struct RowIdCollector {
1288            row_ids: Vec<RowId>,
1289        }
1290
1291        impl PrimitiveVisitor for RowIdCollector {
1292            fn u64_chunk(&mut self, values: &UInt64Array) {
1293                for i in 0..values.len() {
1294                    self.row_ids.push(values.value(i));
1295                }
1296            }
1297        }
1298        impl PrimitiveWithRowIdsVisitor for RowIdCollector {}
1299        impl PrimitiveSortedVisitor for RowIdCollector {}
1300        impl PrimitiveSortedWithRowIdsVisitor for RowIdCollector {}
1301
1302        let mut collector = RowIdCollector {
1303            row_ids: Vec::new(),
1304        };
1305        match ScanBuilder::new(self.store, row_field)
1306            .options(ScanOptions::default())
1307            .run(&mut collector)
1308        {
1309            Ok(()) => {}
1310            Err(llkv_result::Error::NotFound) => return Ok(Vec::new()),
1311            Err(err) => return Err(err),
1312        }
1313
1314        if collector.row_ids.is_empty() {
1315            return Ok(Vec::new());
1316        }
1317
1318        // Gather all schema metadata using the collected row IDs
1319        let batch = self.store.gather_rows(
1320            &[meta_field],
1321            &collector.row_ids,
1322            GatherNullPolicy::IncludeNulls,
1323        )?;
1324
1325        let meta_col = batch
1326            .column(0)
1327            .as_any()
1328            .downcast_ref::<BinaryArray>()
1329            .ok_or_else(|| {
1330                llkv_result::Error::Internal("catalog schema_meta column should be Binary".into())
1331            })?;
1332
1333        let mut result = Vec::new();
1334        for idx in 0..collector.row_ids.len() {
1335            if !meta_col.is_null(idx) {
1336                let bytes = meta_col.value(idx);
1337                if let Ok(meta) = bitcode::decode::<SchemaMeta>(bytes) {
1338                    result.push(meta);
1339                }
1340            }
1341        }
1342
1343        Ok(result)
1344    }
1345}
1346
1347/// Generate a row ID for schema metadata based on schema name.
1348///
1349/// Uses a simple hash to map schema names to row IDs. This is deterministic
1350/// and allows direct lookup without scanning.
1351fn schema_name_to_row_id(canonical_name: &str) -> RowId {
1352    // Use a simple 64-bit FNV-1a hash for deterministic IDs across platforms and releases
1353    const FNV_OFFSET: u64 = 0xcbf2_9ce4_8422_2325;
1354    const FNV_PRIME: u64 = 0x1000_0000_01b3;
1355
1356    let mut hash = FNV_OFFSET;
1357    for byte in canonical_name.as_bytes() {
1358        hash ^= u64::from(*byte);
1359        hash = hash.wrapping_mul(FNV_PRIME);
1360    }
1361
1362    // Use high bit to avoid collision with reserved catalog row IDs (0-3) and table metadata rows
1363    hash | (1u64 << 63)
1364}
1365
1366struct MaxRowIdCollector {
1367    max: Option<RowId>,
1368}
1369
1370impl PrimitiveVisitor for MaxRowIdCollector {
1371    fn u64_chunk(&mut self, values: &UInt64Array) {
1372        for i in 0..values.len() {
1373            let value = values.value(i);
1374            self.max = match self.max {
1375                Some(curr) if curr >= value => Some(curr),
1376                _ => Some(value),
1377            };
1378        }
1379    }
1380}
1381
1382impl PrimitiveWithRowIdsVisitor for MaxRowIdCollector {}
1383impl PrimitiveSortedVisitor for MaxRowIdCollector {}
1384impl PrimitiveSortedWithRowIdsVisitor for MaxRowIdCollector {}
1385
1386#[cfg(test)]
1387mod tests {
1388    use super::*;
1389    use crate::constraints::{
1390        ConstraintKind, ConstraintState, PrimaryKeyConstraint, UniqueConstraint,
1391    };
1392    use llkv_column_map::ColumnStore;
1393    use std::sync::Arc;
1394
1395    #[test]
1396    fn constraint_records_roundtrip() {
1397        let pager = Arc::new(MemPager::default());
1398        let store = ColumnStore::open(Arc::clone(&pager)).unwrap();
1399        let catalog = SysCatalog::new(&store);
1400
1401        let table_id: TableId = 42;
1402        let record1 = ConstraintRecord {
1403            constraint_id: 1,
1404            kind: ConstraintKind::PrimaryKey(PrimaryKeyConstraint {
1405                field_ids: vec![1, 2],
1406            }),
1407            state: ConstraintState::Active,
1408            revision: 1,
1409            last_modified_micros: 100,
1410        };
1411        let record2 = ConstraintRecord {
1412            constraint_id: 2,
1413            kind: ConstraintKind::Unique(UniqueConstraint { field_ids: vec![3] }),
1414            state: ConstraintState::Active,
1415            revision: 2,
1416            last_modified_micros: 200,
1417        };
1418        catalog
1419            .put_constraint_records(table_id, &[record1.clone(), record2.clone()])
1420            .unwrap();
1421
1422        let other_table_record = ConstraintRecord {
1423            constraint_id: 1,
1424            kind: ConstraintKind::Unique(UniqueConstraint { field_ids: vec![5] }),
1425            state: ConstraintState::Active,
1426            revision: 1,
1427            last_modified_micros: 150,
1428        };
1429        catalog
1430            .put_constraint_records(7, &[other_table_record])
1431            .unwrap();
1432
1433        let mut fetched = catalog.constraint_records_for_table(table_id).unwrap();
1434        fetched.sort_by_key(|record| record.constraint_id);
1435
1436        assert_eq!(fetched.len(), 2);
1437        assert_eq!(fetched[0], record1);
1438        assert_eq!(fetched[1], record2);
1439
1440        let single = catalog
1441            .get_constraint_records(table_id, &[record1.constraint_id])
1442            .unwrap();
1443        assert_eq!(single.len(), 1);
1444        assert_eq!(single[0].as_ref(), Some(&record1));
1445
1446        let missing = catalog.get_constraint_records(table_id, &[999]).unwrap();
1447        assert_eq!(missing.len(), 1);
1448        assert!(missing[0].is_none());
1449    }
1450}