1use std::collections::HashMap;
27use std::mem;
28use std::sync::Arc;
29
30use arrow::array::{Array, BinaryArray, BinaryBuilder, UInt64Array};
31use arrow::datatypes::{DataType, Field, Schema};
32use arrow::record_batch::RecordBatch;
33use bitcode::{Decode, Encode};
34
35use crate::constants::CONSTRAINT_SCAN_CHUNK_SIZE;
36use crate::constraints::{
37 ConstraintId, ConstraintRecord, decode_constraint_row_id, encode_constraint_row_id,
38};
39use crate::types::{FieldId, RowId, TableId};
40use llkv_column_map::store::scan::{
41 PrimitiveSortedVisitor, PrimitiveSortedWithRowIdsVisitor, PrimitiveVisitor,
42 PrimitiveWithRowIdsVisitor, ScanBuilder, ScanOptions,
43};
44
45use llkv_column_map::{
46 ColumnStore,
47 store::{GatherNullPolicy, ROW_ID_COLUMN_NAME, rowid_fid},
48};
49use llkv_result::{self, Result as LlkvResult};
50use llkv_storage::pager::{MemPager, Pager};
51use llkv_types::{LogicalFieldId, lfid, rid_col, rid_table};
52use simd_r_drive_entry_handle::EntryHandle;
53
54use crate::reserved::*;
56
/// Logical field id of the catalog column that stores encoded [`ConstraintRecord`] payloads.
#[inline]
fn constraint_meta_lfid() -> LogicalFieldId {
    lfid(CATALOG_TABLE_ID, CATALOG_FIELD_CONSTRAINT_META_ID)
}
61
/// Logical field id of the catalog column that stores encoded [`ConstraintNameRecord`] payloads.
#[inline]
fn constraint_name_lfid() -> LogicalFieldId {
    lfid(CATALOG_TABLE_ID, CATALOG_FIELD_CONSTRAINT_NAME_ID)
}
66
/// Logical field id of the row-id shadow column paired with the constraint
/// metadata column; scanned to enumerate which constraint rows exist.
#[inline]
fn constraint_row_lfid() -> LogicalFieldId {
    rowid_fid(constraint_meta_lfid())
}
71
/// Catalog payload mapping a constraint to its (optional) user-assigned name.
/// Serialized with `bitcode` into the constraint-name catalog column.
#[derive(Clone, Debug, Encode, Decode)]
pub struct ConstraintNameRecord {
    /// Id of the constraint this name belongs to.
    pub constraint_id: ConstraintId,
    /// User-visible constraint name; `None` when the constraint is unnamed.
    pub name: Option<String>,
}
77
78fn decode_constraint_record(bytes: &[u8]) -> LlkvResult<ConstraintRecord> {
79 bitcode::decode(bytes).map_err(|err| {
80 llkv_result::Error::Internal(format!("failed to decode constraint metadata: {err}"))
81 })
82}
83
/// Scan visitor that buffers matching constraint row ids and periodically
/// gathers + decodes their payloads, forwarding decoded records to `on_batch`.
struct ConstraintRowCollector<'a, P, F>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
    F: FnMut(Vec<ConstraintRecord>),
{
    // Store used to gather payload rows for buffered row ids.
    store: &'a ColumnStore<P>,
    // Logical field id of the constraint payload column being gathered.
    lfid: LogicalFieldId,
    // Only rows whose encoded row id belongs to this table are collected.
    table_id: TableId,
    // Callback receiving each non-empty batch of decoded records.
    on_batch: &'a mut F,
    // Pending row ids awaiting a flush (bounded by CONSTRAINT_SCAN_CHUNK_SIZE).
    buffer: Vec<RowId>,
    // First error seen during visitation; visitor callbacks cannot return
    // errors, so it is stashed here and surfaced by `finish`.
    error: Option<llkv_result::Error>,
}
96
97impl<'a, P, F> ConstraintRowCollector<'a, P, F>
98where
99 P: Pager<Blob = EntryHandle> + Send + Sync,
100 F: FnMut(Vec<ConstraintRecord>),
101{
102 fn flush_buffer(&mut self) -> LlkvResult<()> {
103 if self.buffer.is_empty() {
104 return Ok(());
105 }
106
107 let row_ids = mem::take(&mut self.buffer);
108 let batch =
109 self.store
110 .gather_rows(&[self.lfid], &row_ids, GatherNullPolicy::IncludeNulls)?;
111
112 if batch.num_columns() == 0 {
113 return Ok(());
114 }
115
116 let array = batch
117 .column(0)
118 .as_any()
119 .downcast_ref::<BinaryArray>()
120 .ok_or_else(|| {
121 llkv_result::Error::Internal(
122 "constraint metadata column stored unexpected type".into(),
123 )
124 })?;
125
126 let mut records = Vec::with_capacity(row_ids.len());
127 for (idx, row_id) in row_ids.into_iter().enumerate() {
128 if array.is_null(idx) {
129 continue;
130 }
131
132 let record = decode_constraint_record(array.value(idx))?;
133 let (table_from_id, constraint_id) = decode_constraint_row_id(row_id);
134 if table_from_id != self.table_id {
135 continue;
136 }
137 if record.constraint_id != constraint_id {
138 return Err(llkv_result::Error::Internal(
139 "constraint metadata id mismatch".into(),
140 ));
141 }
142 records.push(record);
143 }
144
145 if !records.is_empty() {
146 (self.on_batch)(records);
147 }
148
149 Ok(())
150 }
151
152 fn finish(&mut self) -> LlkvResult<()> {
153 if let Some(err) = self.error.take() {
154 return Err(err);
155 }
156 self.flush_buffer()
157 }
158}
159
160impl<'a, P, F> PrimitiveVisitor for ConstraintRowCollector<'a, P, F>
161where
162 P: Pager<Blob = EntryHandle> + Send + Sync,
163 F: FnMut(Vec<ConstraintRecord>),
164{
165 fn u64_chunk(&mut self, values: &UInt64Array) {
166 if self.error.is_some() {
167 return;
168 }
169
170 for idx in 0..values.len() {
171 let row_id = values.value(idx);
172 let (table_id, _) = decode_constraint_row_id(row_id);
173 if table_id != self.table_id {
174 continue;
175 }
176 self.buffer.push(row_id);
177 if self.buffer.len() >= CONSTRAINT_SCAN_CHUNK_SIZE
178 && let Err(err) = self.flush_buffer()
179 {
180 self.error = Some(err);
181 return;
182 }
183 }
184 }
185}
186
// The remaining visitor traits are presumably required as bounds by
// `ScanBuilder::run` (TODO confirm against the scan API); only `u64_chunk`
// above does real work, so these are empty marker impls using the traits'
// default no-op behavior.
impl<'a, P, F> PrimitiveWithRowIdsVisitor for ConstraintRowCollector<'a, P, F>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
    F: FnMut(Vec<ConstraintRecord>),
{
}

impl<'a, P, F> PrimitiveSortedVisitor for ConstraintRowCollector<'a, P, F>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
    F: FnMut(Vec<ConstraintRecord>),
{
}

impl<'a, P, F> PrimitiveSortedWithRowIdsVisitor for ConstraintRowCollector<'a, P, F>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
    F: FnMut(Vec<ConstraintRecord>),
{
}
207
/// Per-table catalog record, serialized with `bitcode` into the table-meta
/// catalog column (see `put_table_meta` / `get_table_meta`).
#[derive(Encode, Decode, Clone, Debug, PartialEq, Eq)]
pub struct TableMeta {
    /// Identifier of the table this record describes.
    pub table_id: TableId,
    /// Human-readable table name, if one was assigned.
    pub name: Option<String>,
    /// Creation timestamp in microseconds (epoch base assumed UTC — TODO confirm).
    pub created_at_micros: u64,
    /// Flag bits; semantics are defined by catalog consumers, not in this file.
    pub flags: u32,
    /// Version counter — presumably bumped on schema changes; verify against callers.
    pub epoch: u64,
    /// SQL text of the view definition when this entry describes a view.
    pub view_definition: Option<String>,
}
229
/// Per-column catalog record, keyed by `rid_col(table_id, col_id)` and
/// serialized with `bitcode` (see `put_col_meta` / `get_cols_meta`).
#[derive(Encode, Decode, Clone, Debug, PartialEq, Eq)]
pub struct ColMeta {
    /// Column identifier within its table.
    pub col_id: u32,
    /// Human-readable column name, if one was assigned.
    pub name: Option<String>,
    /// Flag bits; semantics are defined by catalog consumers, not in this file.
    pub flags: u32,
    /// Opaque encoded default value, if the column has one — encoding owned by callers.
    pub default: Option<Vec<u8>>,
}
244
/// Catalog record describing a schema (namespace).
#[derive(Encode, Decode, Clone, Debug, PartialEq, Eq)]
pub struct SchemaMeta {
    /// Schema name.
    pub name: String,
    /// Creation timestamp in microseconds (epoch base assumed UTC — TODO confirm).
    pub created_at_micros: u64,
    /// Flag bits; semantics are defined by catalog consumers, not in this file.
    pub flags: u32,
}
257
/// Catalog record for a user-defined type alias.
#[derive(Encode, Decode, Clone, Debug, PartialEq, Eq)]
pub struct CustomTypeMeta {
    /// Name of the custom type.
    pub name: String,
    /// SQL text of the underlying base type the alias resolves to.
    pub base_type_sql: String,
    /// Creation timestamp in microseconds (epoch base assumed UTC — TODO confirm).
    pub created_at_micros: u64,
}
272
/// One multi-column index definition, stored as part of
/// [`TableMultiColumnIndexMeta`].
#[derive(Encode, Decode, Clone, Debug, PartialEq, Eq)]
pub struct MultiColumnIndexEntryMeta {
    /// User-supplied index name; `None` for unnamed (implicit) indexes.
    pub index_name: Option<String>,
    /// Canonical (normalized) name used for lookups — normalization rules owned by callers.
    pub canonical_name: String,
    /// Field ids of the indexed columns, in index key order.
    pub column_ids: Vec<FieldId>,
    /// Whether the index enforces uniqueness.
    pub unique: bool,
}
287
/// One single-column index definition, stored as part of
/// [`TableSingleColumnIndexMeta`].
#[derive(Encode, Decode, Clone, Debug, PartialEq, Eq)]
pub struct SingleColumnIndexEntryMeta {
    /// User-supplied index name.
    pub index_name: String,
    /// Canonical (normalized) name used for lookups — normalization rules owned by callers.
    pub canonical_name: String,
    /// Field id of the indexed column.
    pub column_id: FieldId,
    /// Name of the indexed column at index-creation time.
    pub column_name: String,
    /// Whether the index enforces uniqueness.
    pub unique: bool,
    /// Sort direction of the index keys.
    pub ascending: bool,
    /// Whether NULL keys sort before non-NULL keys.
    pub nulls_first: bool,
}
306
/// Catalog payload bundling all single-column indexes of one table into a
/// single binary cell (see `put_single_column_indexes`).
#[derive(Encode, Decode, Clone, Debug, PartialEq, Eq)]
pub struct TableSingleColumnIndexMeta {
    /// Table the indexes belong to.
    pub table_id: TableId,
    /// Complete list of the table's single-column indexes.
    pub indexes: Vec<SingleColumnIndexEntryMeta>,
}
315
/// Catalog payload bundling all multi-column indexes of one table into a
/// single binary cell (see `put_multi_column_indexes`).
#[derive(Encode, Decode, Clone, Debug, PartialEq, Eq)]
pub struct TableMultiColumnIndexMeta {
    /// Table the indexes belong to.
    pub table_id: TableId,
    /// Complete list of the table's multi-column indexes.
    pub indexes: Vec<MultiColumnIndexEntryMeta>,
}
324
/// When a trigger fires relative to its triggering statement.
#[derive(Encode, Decode, Clone, Copy, Debug, PartialEq, Eq)]
pub enum TriggerTimingMeta {
    Before,
    After,
    InsteadOf,
}
332
/// The DML event a trigger reacts to. `Update` optionally restricts the
/// trigger to a set of column names (empty means any column — TODO confirm
/// against trigger evaluation code).
#[derive(Encode, Decode, Clone, Debug, PartialEq, Eq)]
pub enum TriggerEventMeta {
    Insert,
    Update { columns: Vec<String> },
    Delete,
}
340
/// One trigger definition, stored as part of [`TableTriggerMeta`].
#[derive(Encode, Decode, Clone, Debug, PartialEq, Eq)]
pub struct TriggerEntryMeta {
    /// User-supplied trigger name.
    pub name: String,
    /// Canonical (normalized) name used for lookups — normalization rules owned by callers.
    pub canonical_name: String,
    /// When the trigger fires (before/after/instead-of).
    pub timing: TriggerTimingMeta,
    /// The DML event that fires the trigger.
    pub event: TriggerEventMeta,
    /// True for FOR EACH ROW triggers, false for statement-level triggers.
    pub for_each_row: bool,
    /// Optional SQL text of the WHEN condition.
    pub condition: Option<String>,
    /// SQL text of the trigger body.
    pub body_sql: String,
}
359
/// Catalog payload bundling all triggers of one table into a single binary
/// cell (see `put_triggers` / `get_triggers`).
#[derive(Encode, Decode, Clone, Debug, PartialEq, Eq)]
pub struct TableTriggerMeta {
    /// Table the triggers belong to.
    pub table_id: TableId,
    /// Complete list of the table's triggers.
    pub triggers: Vec<TriggerEntryMeta>,
}
368
/// System catalog facade: reads and writes table/column/constraint/index/
/// trigger metadata as Arrow batches in a reserved catalog table of the
/// underlying [`ColumnStore`]. Borrowed, not owned — cheap to construct.
pub struct SysCatalog<'a, P = MemPager>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    // Backing column store holding the catalog table's columns.
    store: &'a ColumnStore<P>,
}
387
388impl<'a, P> SysCatalog<'a, P>
389where
390 P: Pager<Blob = EntryHandle> + Send + Sync,
391{
392 fn write_null_entries(&self, meta_field: LogicalFieldId, row_ids: &[RowId]) -> LlkvResult<()> {
393 if row_ids.is_empty() {
394 return Ok(());
395 }
396
397 let lfid_val: u64 = meta_field.into();
398 let schema = Arc::new(Schema::new(vec![
399 Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
400 Field::new("meta", DataType::Binary, true).with_metadata(HashMap::from([(
401 crate::constants::FIELD_ID_META_KEY.to_string(),
402 lfid_val.to_string(),
403 )])),
404 ]));
405
406 let row_array = Arc::new(UInt64Array::from(row_ids.to_vec()));
407 let mut builder = BinaryBuilder::new();
408 for _ in row_ids {
409 builder.append_null();
410 }
411 let meta_array = Arc::new(builder.finish());
412
413 let batch = RecordBatch::try_new(schema, vec![row_array, meta_array])?;
414 self.store.append(&batch)?;
415 Ok(())
416 }
417
    /// Creates a catalog view over `store`. No I/O is performed here.
    pub fn new(store: &'a ColumnStore<P>) -> Self {
        Self { store }
    }
422
423 pub fn put_table_meta(&self, meta: &TableMeta) {
428 let lfid_val: u64 = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_TABLE_META_ID).into();
429 let schema = Arc::new(Schema::new(vec![
430 Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
431 Field::new("meta", DataType::Binary, false).with_metadata(HashMap::from([(
432 crate::constants::FIELD_ID_META_KEY.to_string(),
433 lfid_val.to_string(),
434 )])),
435 ]));
436
437 let row_id = Arc::new(UInt64Array::from(vec![rid_table(meta.table_id)]));
438 let meta_encoded = bitcode::encode(meta);
439 let meta_bytes = Arc::new(BinaryArray::from(vec![meta_encoded.as_slice()]));
440
441 let batch = RecordBatch::try_new(schema, vec![row_id, meta_bytes]).unwrap();
442 self.store.append(&batch).unwrap();
443 }
444
445 pub fn get_table_meta(&self, table_id: TableId) -> Option<TableMeta> {
449 let row_id = rid_table(table_id);
450 let catalog_field = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_TABLE_META_ID);
451 let batch = self
452 .store
453 .gather_rows(&[catalog_field], &[row_id], GatherNullPolicy::IncludeNulls)
454 .ok()?;
455
456 if batch.num_rows() == 0 || batch.num_columns() == 0 {
457 return None;
458 }
459
460 let array = batch
461 .column(0)
462 .as_any()
463 .downcast_ref::<BinaryArray>()
464 .expect("table meta column must be BinaryArray");
465
466 if array.is_null(0) {
467 return None;
468 }
469
470 bitcode::decode(array.value(0)).ok()
471 }
472
473 pub fn put_col_meta(&self, table_id: TableId, meta: &ColMeta) {
475 let lfid_val: u64 = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_COL_META_ID).into();
476 let schema = Arc::new(Schema::new(vec![
477 Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
478 Field::new("meta", DataType::Binary, false).with_metadata(HashMap::from([(
479 crate::constants::FIELD_ID_META_KEY.to_string(),
480 lfid_val.to_string(),
481 )])),
482 ]));
483
484 let rid_value = rid_col(table_id, meta.col_id);
485 let row_id = Arc::new(UInt64Array::from(vec![rid_value]));
486 let meta_encoded = bitcode::encode(meta);
487 let meta_bytes = Arc::new(BinaryArray::from(vec![meta_encoded.as_slice()]));
488
489 let batch = RecordBatch::try_new(schema, vec![row_id, meta_bytes]).unwrap();
490 self.store.append(&batch).unwrap();
491 }
492
493 pub fn get_cols_meta(&self, table_id: TableId, col_ids: &[u32]) -> Vec<Option<ColMeta>> {
495 if col_ids.is_empty() {
496 return Vec::new();
497 }
498
499 let row_ids: Vec<RowId> = col_ids.iter().map(|&cid| rid_col(table_id, cid)).collect();
500 let catalog_field = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_COL_META_ID);
501
502 let batch =
503 match self
504 .store
505 .gather_rows(&[catalog_field], &row_ids, GatherNullPolicy::IncludeNulls)
506 {
507 Ok(batch) => batch,
508 Err(_) => return vec![None; col_ids.len()],
509 };
510
511 let meta_col = batch
512 .column(0)
513 .as_any()
514 .downcast_ref::<BinaryArray>()
515 .expect("catalog meta column should be Binary");
516
517 col_ids
518 .iter()
519 .enumerate()
520 .map(|(idx, _)| {
521 if meta_col.is_null(idx) {
522 None
523 } else {
524 bitcode::decode(meta_col.value(idx)).ok()
525 }
526 })
527 .collect()
528 }
529
530 pub fn delete_col_meta(&self, table_id: TableId, col_ids: &[FieldId]) -> LlkvResult<()> {
532 if col_ids.is_empty() {
533 return Ok(());
534 }
535
536 let meta_field = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_COL_META_ID);
537 let row_ids: Vec<RowId> = col_ids
538 .iter()
539 .map(|&col_id| rid_col(table_id, col_id))
540 .collect();
541 self.write_null_entries(meta_field, &row_ids)
542 }
543
    /// Tombstones the table-meta row for `table_id` by overwriting it with null.
    pub fn delete_table_meta(&self, table_id: TableId) -> LlkvResult<()> {
        let meta_field = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_TABLE_META_ID);
        let row_id = rid_table(table_id);
        self.write_null_entries(meta_field, &[row_id])
    }
550
551 pub fn delete_constraint_records(
553 &self,
554 table_id: TableId,
555 constraint_ids: &[ConstraintId],
556 ) -> LlkvResult<()> {
557 if constraint_ids.is_empty() {
558 return Ok(());
559 }
560
561 let meta_field = constraint_meta_lfid();
562 let row_ids: Vec<RowId> = constraint_ids
563 .iter()
564 .map(|&constraint_id| encode_constraint_row_id(table_id, constraint_id))
565 .collect();
566 self.write_null_entries(meta_field, &row_ids)
567 }
568
569 pub fn delete_constraint_names(
571 &self,
572 table_id: TableId,
573 constraint_ids: &[ConstraintId],
574 ) -> LlkvResult<()> {
575 if constraint_ids.is_empty() {
576 return Ok(());
577 }
578
579 let lfid = constraint_name_lfid();
580 let row_ids: Vec<RowId> = constraint_ids
581 .iter()
582 .map(|&constraint_id| encode_constraint_row_id(table_id, constraint_id))
583 .collect();
584 self.write_null_entries(lfid, &row_ids)
585 }
586
    /// Tombstones the multi-column index metadata cell for `table_id`.
    pub fn delete_multi_column_indexes(&self, table_id: TableId) -> LlkvResult<()> {
        let meta_field = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_MULTI_COLUMN_UNIQUE_META_ID);
        let row_id = rid_table(table_id);
        self.write_null_entries(meta_field, &[row_id])
    }
593
    /// Tombstones the single-column index metadata cell for `table_id`.
    pub fn delete_single_column_indexes(&self, table_id: TableId) -> LlkvResult<()> {
        let meta_field = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_SINGLE_COLUMN_INDEX_META_ID);
        let row_id = rid_table(table_id);
        self.write_null_entries(meta_field, &[row_id])
    }
600
601 pub fn put_multi_column_indexes(
603 &self,
604 table_id: TableId,
605 indexes: &[MultiColumnIndexEntryMeta],
606 ) -> LlkvResult<()> {
607 let lfid_val: u64 =
608 lfid(CATALOG_TABLE_ID, CATALOG_FIELD_MULTI_COLUMN_UNIQUE_META_ID).into();
609 let schema = Arc::new(Schema::new(vec![
610 Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
611 Field::new("meta", DataType::Binary, false).with_metadata(HashMap::from([(
612 crate::constants::FIELD_ID_META_KEY.to_string(),
613 lfid_val.to_string(),
614 )])),
615 ]));
616
617 let row_id = Arc::new(UInt64Array::from(vec![rid_table(table_id)]));
618 let meta = TableMultiColumnIndexMeta {
619 table_id,
620 indexes: indexes.to_vec(),
621 };
622 let encoded = bitcode::encode(&meta);
623 let meta_bytes = Arc::new(BinaryArray::from(vec![encoded.as_slice()]));
624
625 let batch = RecordBatch::try_new(schema, vec![row_id, meta_bytes])?;
626 self.store.append(&batch)?;
627 Ok(())
628 }
629
630 pub fn put_single_column_indexes(
632 &self,
633 table_id: TableId,
634 indexes: &[SingleColumnIndexEntryMeta],
635 ) -> LlkvResult<()> {
636 let lfid_val: u64 =
637 lfid(CATALOG_TABLE_ID, CATALOG_FIELD_SINGLE_COLUMN_INDEX_META_ID).into();
638 let schema = Arc::new(Schema::new(vec![
639 Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
640 Field::new("meta", DataType::Binary, false).with_metadata(HashMap::from([(
641 crate::constants::FIELD_ID_META_KEY.to_string(),
642 lfid_val.to_string(),
643 )])),
644 ]));
645
646 let row_id = Arc::new(UInt64Array::from(vec![rid_table(table_id)]));
647 let meta = TableSingleColumnIndexMeta {
648 table_id,
649 indexes: indexes.to_vec(),
650 };
651 let encoded = bitcode::encode(&meta);
652 let meta_bytes = Arc::new(BinaryArray::from(vec![encoded.as_slice()]));
653
654 let batch = RecordBatch::try_new(schema, vec![row_id, meta_bytes])?;
655 self.store.append(&batch)?;
656 Ok(())
657 }
658
659 pub fn get_multi_column_indexes(
661 &self,
662 table_id: TableId,
663 ) -> LlkvResult<Vec<MultiColumnIndexEntryMeta>> {
664 let lfid = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_MULTI_COLUMN_UNIQUE_META_ID);
665 let row_id = rid_table(table_id);
666 let batch = match self
667 .store
668 .gather_rows(&[lfid], &[row_id], GatherNullPolicy::IncludeNulls)
669 {
670 Ok(batch) => batch,
671 Err(llkv_result::Error::NotFound) => return Ok(Vec::new()),
672 Err(err) => return Err(err),
673 };
674
675 if batch.num_columns() == 0 || batch.num_rows() == 0 {
676 return Ok(Vec::new());
677 }
678
679 let array = batch
680 .column(0)
681 .as_any()
682 .downcast_ref::<BinaryArray>()
683 .ok_or_else(|| {
684 llkv_result::Error::Internal(
685 "catalog multi-column index column stored unexpected type".into(),
686 )
687 })?;
688
689 if array.is_null(0) {
690 return Ok(Vec::new());
691 }
692
693 let meta: TableMultiColumnIndexMeta = bitcode::decode(array.value(0)).map_err(|err| {
694 llkv_result::Error::Internal(format!(
695 "failed to decode multi-column index metadata: {err}"
696 ))
697 })?;
698
699 Ok(meta.indexes)
700 }
701
    /// Fetches the single-column index definitions recorded for `table_id`.
    /// A missing catalog column, absent row, or tombstoned cell all yield an
    /// empty list rather than an error.
    pub fn get_single_column_indexes(
        &self,
        table_id: TableId,
    ) -> LlkvResult<Vec<SingleColumnIndexEntryMeta>> {
        let lfid = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_SINGLE_COLUMN_INDEX_META_ID);
        let row_id = rid_table(table_id);
        let batch = match self
            .store
            .gather_rows(&[lfid], &[row_id], GatherNullPolicy::IncludeNulls)
        {
            Ok(batch) => batch,
            // NotFound means the catalog column was never created — no indexes.
            Err(llkv_result::Error::NotFound) => return Ok(Vec::new()),
            Err(err) => return Err(err),
        };

        if batch.num_columns() == 0 || batch.num_rows() == 0 {
            return Ok(Vec::new());
        }

        let array = batch
            .column(0)
            .as_any()
            .downcast_ref::<BinaryArray>()
            .ok_or_else(|| {
                llkv_result::Error::Internal(
                    "catalog single-column index column stored unexpected type".into(),
                )
            })?;

        // Null cell == tombstoned entry (see delete_single_column_indexes).
        if array.is_null(0) {
            return Ok(Vec::new());
        }

        let meta: TableSingleColumnIndexMeta = bitcode::decode(array.value(0)).map_err(|err| {
            llkv_result::Error::Internal(format!(
                "failed to decode single-column index metadata: {err}"
            ))
        })?;

        Ok(meta.indexes)
    }
744
745 pub fn put_triggers(&self, table_id: TableId, triggers: &[TriggerEntryMeta]) -> LlkvResult<()> {
747 let lfid_val: u64 = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_TRIGGER_META_ID).into();
748 let schema = Arc::new(Schema::new(vec![
749 Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
750 Field::new("meta", DataType::Binary, false).with_metadata(HashMap::from([(
751 crate::constants::FIELD_ID_META_KEY.to_string(),
752 lfid_val.to_string(),
753 )])),
754 ]));
755
756 let row_id = Arc::new(UInt64Array::from(vec![rid_table(table_id)]));
757 let meta = TableTriggerMeta {
758 table_id,
759 triggers: triggers.to_vec(),
760 };
761 let encoded = bitcode::encode(&meta);
762 let meta_bytes = Arc::new(BinaryArray::from(vec![encoded.as_slice()]));
763
764 let batch = RecordBatch::try_new(schema, vec![row_id, meta_bytes])?;
765 self.store.append(&batch)?;
766 Ok(())
767 }
768
    /// Tombstones the trigger metadata cell for `table_id`.
    pub fn delete_triggers(&self, table_id: TableId) -> LlkvResult<()> {
        let meta_field = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_TRIGGER_META_ID);
        let row_id = rid_table(table_id);
        self.write_null_entries(meta_field, &[row_id])
    }
775
    /// Fetches the triggers recorded for `table_id`. A missing catalog column,
    /// absent row, or tombstoned cell all yield an empty list.
    pub fn get_triggers(&self, table_id: TableId) -> LlkvResult<Vec<TriggerEntryMeta>> {
        let lfid = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_TRIGGER_META_ID);
        let row_id = rid_table(table_id);
        let batch = match self
            .store
            .gather_rows(&[lfid], &[row_id], GatherNullPolicy::IncludeNulls)
        {
            Ok(batch) => batch,
            // NotFound means the catalog column was never created — no triggers.
            Err(llkv_result::Error::NotFound) => return Ok(Vec::new()),
            Err(err) => return Err(err),
        };

        if batch.num_columns() == 0 || batch.num_rows() == 0 {
            return Ok(Vec::new());
        }

        let array = batch
            .column(0)
            .as_any()
            .downcast_ref::<BinaryArray>()
            .ok_or_else(|| {
                llkv_result::Error::Internal(
                    "catalog trigger metadata column stored unexpected type".into(),
                )
            })?;

        // Null cell == tombstoned entry (see delete_triggers).
        if array.is_null(0) {
            return Ok(Vec::new());
        }

        let meta: TableTriggerMeta = bitcode::decode(array.value(0)).map_err(|err| {
            llkv_result::Error::Internal(format!("failed to decode trigger metadata: {err}"))
        })?;

        Ok(meta.triggers)
    }
813
    /// Returns every table's multi-column index metadata by scanning the
    /// row-id shadow column and then gathering the payload cells.
    /// A missing catalog column yields an empty list; tombstoned (null) cells
    /// are skipped.
    pub fn all_multi_column_index_metas(&self) -> LlkvResult<Vec<TableMultiColumnIndexMeta>> {
        let meta_field = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_MULTI_COLUMN_UNIQUE_META_ID);
        let row_field = rowid_fid(meta_field);

        // Local visitor: records every row id seen during the scan.
        struct RowIdCollector {
            row_ids: Vec<RowId>,
        }

        impl PrimitiveVisitor for RowIdCollector {
            fn u64_chunk(&mut self, values: &UInt64Array) {
                for i in 0..values.len() {
                    self.row_ids.push(values.value(i));
                }
            }
        }
        // Marker impls: only u64_chunk is needed; defaults are no-ops.
        impl PrimitiveWithRowIdsVisitor for RowIdCollector {}
        impl PrimitiveSortedVisitor for RowIdCollector {}
        impl PrimitiveSortedWithRowIdsVisitor for RowIdCollector {}

        let mut collector = RowIdCollector {
            row_ids: Vec::new(),
        };
        match ScanBuilder::new(self.store, row_field)
            .options(ScanOptions::default())
            .run(&mut collector)
        {
            Ok(()) => {}
            // NotFound means the catalog column was never created.
            Err(llkv_result::Error::NotFound) => return Ok(Vec::new()),
            Err(err) => return Err(err),
        }

        if collector.row_ids.is_empty() {
            return Ok(Vec::new());
        }

        let batch = match self.store.gather_rows(
            &[meta_field],
            &collector.row_ids,
            GatherNullPolicy::IncludeNulls,
        ) {
            Ok(batch) => batch,
            Err(llkv_result::Error::NotFound) => return Ok(Vec::new()),
            Err(err) => return Err(err),
        };

        if batch.num_columns() == 0 {
            return Ok(Vec::new());
        }

        let array = batch
            .column(0)
            .as_any()
            .downcast_ref::<BinaryArray>()
            .ok_or_else(|| {
                llkv_result::Error::Internal(
                    "catalog multi-column index column stored unexpected type".into(),
                )
            })?;

        let mut metas = Vec::with_capacity(batch.num_rows());
        for idx in 0..batch.num_rows() {
            // Null cells are tombstoned tables — skip them.
            if array.is_null(idx) {
                continue;
            }
            let meta: TableMultiColumnIndexMeta =
                bitcode::decode(array.value(idx)).map_err(|err| {
                    llkv_result::Error::Internal(format!(
                        "failed to decode multi-column index metadata: {err}"
                    ))
                })?;
            metas.push(meta);
        }

        Ok(metas)
    }
890
891 pub fn put_constraint_records(
893 &self,
894 table_id: TableId,
895 records: &[ConstraintRecord],
896 ) -> LlkvResult<()> {
897 if records.is_empty() {
898 return Ok(());
899 }
900
901 let lfid_val: u64 = constraint_meta_lfid().into();
902 let schema = Arc::new(Schema::new(vec![
903 Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
904 Field::new("constraint", DataType::Binary, false).with_metadata(HashMap::from([(
905 crate::constants::FIELD_ID_META_KEY.to_string(),
906 lfid_val.to_string(),
907 )])),
908 ]));
909
910 let row_ids: Vec<RowId> = records
911 .iter()
912 .map(|record| encode_constraint_row_id(table_id, record.constraint_id))
913 .collect();
914
915 let row_ids_array = Arc::new(UInt64Array::from(row_ids));
916 let payload_array = Arc::new(BinaryArray::from_iter_values(
917 records.iter().map(bitcode::encode),
918 ));
919
920 let batch = RecordBatch::try_new(schema, vec![row_ids_array, payload_array])?;
921 self.store.append(&batch)?;
922 Ok(())
923 }
924
    /// Writes one catalog row per constraint-name record, keyed by
    /// `encode_constraint_row_id(table_id, record.constraint_id)`.
    pub fn put_constraint_names(
        &self,
        table_id: TableId,
        names: &[ConstraintNameRecord],
    ) -> LlkvResult<()> {
        if names.is_empty() {
            return Ok(());
        }

        let lfid_val: u64 = constraint_name_lfid().into();
        let schema = Arc::new(Schema::new(vec![
            Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
            Field::new("constraint_name", DataType::Binary, false).with_metadata(HashMap::from([
                (
                    crate::constants::FIELD_ID_META_KEY.to_string(),
                    lfid_val.to_string(),
                ),
            ])),
        ]));

        let row_ids: Vec<RowId> = names
            .iter()
            .map(|record| encode_constraint_row_id(table_id, record.constraint_id))
            .collect();
        let row_ids_array = Arc::new(UInt64Array::from(row_ids));
        // Each record is bitcode-encoded into one binary cell.
        let payload_array = Arc::new(BinaryArray::from_iter_values(
            names.iter().map(bitcode::encode),
        ));

        let batch = RecordBatch::try_new(schema, vec![row_ids_array, payload_array])?;
        self.store.append(&batch)?;
        Ok(())
    }
959
    /// Loads the [`ConstraintRecord`] for each id in `constraint_ids`,
    /// preserving order. Missing or tombstoned entries come back as `None`;
    /// a record whose decoded id disagrees with the requested id yields an
    /// `Internal` error.
    pub fn get_constraint_records(
        &self,
        table_id: TableId,
        constraint_ids: &[ConstraintId],
    ) -> LlkvResult<Vec<Option<ConstraintRecord>>> {
        if constraint_ids.is_empty() {
            return Ok(Vec::new());
        }

        let lfid = constraint_meta_lfid();
        let row_ids: Vec<RowId> = constraint_ids
            .iter()
            .map(|&constraint_id| encode_constraint_row_id(table_id, constraint_id))
            .collect();

        let batch = match self
            .store
            .gather_rows(&[lfid], &row_ids, GatherNullPolicy::IncludeNulls)
        {
            Ok(batch) => batch,
            // NotFound means the catalog column was never created.
            Err(llkv_result::Error::NotFound) => {
                return Ok(vec![None; constraint_ids.len()]);
            }
            Err(err) => return Err(err),
        };

        if batch.num_columns() == 0 || batch.num_rows() == 0 {
            return Ok(vec![None; constraint_ids.len()]);
        }

        let array = batch
            .column(0)
            .as_any()
            .downcast_ref::<BinaryArray>()
            .ok_or_else(|| {
                llkv_result::Error::Internal(
                    "constraint metadata column stored unexpected type".into(),
                )
            })?;

        let mut results = Vec::with_capacity(constraint_ids.len());
        for (idx, &constraint_id) in constraint_ids.iter().enumerate() {
            if array.is_null(idx) {
                results.push(None);
                continue;
            }
            let record = decode_constraint_record(array.value(idx))?;
            // Stored payload must agree with the id encoded in the row key.
            if record.constraint_id != constraint_id {
                return Err(llkv_result::Error::Internal(
                    "constraint metadata id mismatch".into(),
                ));
            }
            results.push(Some(record));
        }

        Ok(results)
    }
1018
1019 pub fn get_constraint_names(
1021 &self,
1022 table_id: TableId,
1023 constraint_ids: &[ConstraintId],
1024 ) -> LlkvResult<Vec<Option<String>>> {
1025 if constraint_ids.is_empty() {
1026 return Ok(Vec::new());
1027 }
1028
1029 let lfid = constraint_name_lfid();
1030 let row_ids: Vec<RowId> = constraint_ids
1031 .iter()
1032 .map(|&constraint_id| encode_constraint_row_id(table_id, constraint_id))
1033 .collect();
1034
1035 let batch = match self
1036 .store
1037 .gather_rows(&[lfid], &row_ids, GatherNullPolicy::IncludeNulls)
1038 {
1039 Ok(batch) => batch,
1040 Err(llkv_result::Error::NotFound) => {
1041 return Ok(vec![None; constraint_ids.len()]);
1042 }
1043 Err(err) => return Err(err),
1044 };
1045
1046 if batch.num_columns() == 0 {
1047 return Ok(vec![None; constraint_ids.len()]);
1048 }
1049
1050 let array = batch
1051 .column(0)
1052 .as_any()
1053 .downcast_ref::<BinaryArray>()
1054 .ok_or_else(|| {
1055 llkv_result::Error::Internal(
1056 "constraint name metadata column stored unexpected type".into(),
1057 )
1058 })?;
1059
1060 let mut results = Vec::with_capacity(row_ids.len());
1061 for idx in 0..row_ids.len() {
1062 if array.is_null(idx) {
1063 results.push(None);
1064 } else {
1065 let record: ConstraintNameRecord =
1066 bitcode::decode(array.value(idx)).map_err(|err| {
1067 llkv_result::Error::Internal(format!(
1068 "failed to decode constraint name metadata: {err}"
1069 ))
1070 })?;
1071 results.push(record.name);
1072 }
1073 }
1074
1075 Ok(results)
1076 }
1077
    /// Streams every constraint record belonging to `table_id` to `on_batch`
    /// in chunks of at most `CONSTRAINT_SCAN_CHUNK_SIZE`, by scanning the
    /// constraint row-id shadow column with a [`ConstraintRowCollector`].
    /// A missing catalog column yields `Ok(())` with no callbacks.
    pub fn scan_constraint_records_for_table<F>(
        &self,
        table_id: TableId,
        mut on_batch: F,
    ) -> LlkvResult<()>
    where
        F: FnMut(Vec<ConstraintRecord>),
    {
        let row_field = constraint_row_lfid();
        let mut visitor = ConstraintRowCollector {
            store: self.store,
            lfid: constraint_meta_lfid(),
            table_id,
            on_batch: &mut on_batch,
            buffer: Vec::with_capacity(CONSTRAINT_SCAN_CHUNK_SIZE),
            error: None,
        };

        match ScanBuilder::new(self.store, row_field)
            .options(ScanOptions::default())
            .run(&mut visitor)
        {
            Ok(()) => {}
            // NotFound means no constraint column exists yet — nothing to scan.
            Err(llkv_result::Error::NotFound) => return Ok(()),
            Err(err) => return Err(err),
        }

        // finish() surfaces any error stashed mid-scan and flushes the tail.
        visitor.finish()
    }
1108
1109 pub fn constraint_records_for_table(
1111 &self,
1112 table_id: TableId,
1113 ) -> LlkvResult<Vec<ConstraintRecord>> {
1114 let mut all = Vec::new();
1115 self.scan_constraint_records_for_table(table_id, |mut chunk| {
1116 all.append(&mut chunk);
1117 })?;
1118 Ok(all)
1119 }
1120
    /// Persists the next table id to allocate, stored in a fixed well-known
    /// catalog row (`CATALOG_NEXT_TABLE_ROW_ID`).
    pub fn put_next_table_id(&self, next_id: TableId) -> LlkvResult<()> {
        let lfid_val: u64 = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_NEXT_TABLE_ID).into();
        let schema = Arc::new(Schema::new(vec![
            Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
            Field::new("next_table_id", DataType::UInt64, false).with_metadata(HashMap::from([(
                crate::constants::FIELD_ID_META_KEY.to_string(),
                lfid_val.to_string(),
            )])),
        ]));

        let row_id = Arc::new(UInt64Array::from(vec![CATALOG_NEXT_TABLE_ROW_ID]));
        // Widening cast — get_next_table_id's range check implies TableId is
        // no wider than u64, so this cannot truncate.
        let value_array = Arc::new(UInt64Array::from(vec![next_id as u64]));
        let batch = RecordBatch::try_new(schema, vec![row_id, value_array])?;
        self.store.append(&batch)?;
        Ok(())
    }
1137
    /// Reads back the persisted next-table-id counter, or `None` when it was
    /// never written. Fails with `InvalidArgumentError` if the stored value
    /// no longer fits in `TableId`.
    pub fn get_next_table_id(&self) -> LlkvResult<Option<TableId>> {
        let lfid = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_NEXT_TABLE_ID);
        let batch = match self.store.gather_rows(
            &[lfid],
            &[CATALOG_NEXT_TABLE_ROW_ID],
            GatherNullPolicy::IncludeNulls,
        ) {
            Ok(batch) => batch,
            // NotFound means the counter column was never created.
            Err(llkv_result::Error::NotFound) => return Ok(None),
            Err(err) => return Err(err),
        };

        if batch.num_columns() == 0 || batch.num_rows() == 0 {
            return Ok(None);
        }

        let array = batch
            .column(0)
            .as_any()
            .downcast_ref::<UInt64Array>()
            .ok_or_else(|| {
                llkv_result::Error::Internal(
                    "catalog next_table_id column stored unexpected type".into(),
                )
            })?;
        if array.is_empty() || array.is_null(0) {
            return Ok(None);
        }

        let value = array.value(0);
        // Guard the u64 -> TableId narrowing before the cast below.
        if value > TableId::MAX as u64 {
            return Err(llkv_result::Error::InvalidArgumentError(
                "persisted next_table_id exceeds TableId range".into(),
            ));
        }

        Ok(Some(value as TableId))
    }
1176
    /// Returns the highest table id that has a table-meta row, or `None` when
    /// the catalog column does not exist yet or holds no rows.
    pub fn max_table_id(&self) -> LlkvResult<Option<TableId>> {
        let meta_field = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_TABLE_META_ID);
        // Scan the row-id shadow column rather than the binary payloads.
        let row_field = rowid_fid(meta_field);

        // MaxRowIdCollector is declared elsewhere in this file; it tracks the
        // maximum row id observed during the scan.
        let mut collector = MaxRowIdCollector { max: None };
        match ScanBuilder::new(self.store, row_field)
            .options(ScanOptions::default())
            .run(&mut collector)
        {
            Ok(()) => {}
            Err(llkv_result::Error::NotFound) => return Ok(None),
            Err(err) => return Err(err),
        }

        let max_value = match collector.max {
            Some(value) => value,
            None => return Ok(None),
        };

        // Table-meta row ids come from `rid_table`, which presumably embeds
        // the table id in a LogicalFieldId-compatible encoding (the same
        // round-trip is used in all_table_metas) — TODO confirm against
        // rid_table's definition.
        let logical: LogicalFieldId = max_value.into();
        Ok(Some(logical.table_id()))
    }
1199
    /// Returns every (table id, [`TableMeta`]) pair in the catalog by
    /// scanning the table-meta row-id shadow column and gathering payloads.
    /// Tombstoned (null) cells and undecodable payloads are silently skipped.
    pub fn all_table_metas(&self) -> LlkvResult<Vec<(TableId, TableMeta)>> {
        let meta_field = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_TABLE_META_ID);
        let row_field = rowid_fid(meta_field);

        // Local visitor: records every row id seen during the scan.
        struct RowIdCollector {
            row_ids: Vec<RowId>,
        }

        impl PrimitiveVisitor for RowIdCollector {
            fn u64_chunk(&mut self, values: &UInt64Array) {
                for i in 0..values.len() {
                    self.row_ids.push(values.value(i));
                }
            }
        }
        // Marker impls: only u64_chunk is needed; defaults are no-ops.
        impl PrimitiveWithRowIdsVisitor for RowIdCollector {}
        impl PrimitiveSortedVisitor for RowIdCollector {}
        impl PrimitiveSortedWithRowIdsVisitor for RowIdCollector {}

        let mut collector = RowIdCollector {
            row_ids: Vec::new(),
        };
        match ScanBuilder::new(self.store, row_field)
            .options(ScanOptions::default())
            .run(&mut collector)
        {
            Ok(()) => {}
            // NotFound means the catalog column was never created.
            Err(llkv_result::Error::NotFound) => return Ok(Vec::new()),
            Err(err) => return Err(err),
        }

        if collector.row_ids.is_empty() {
            return Ok(Vec::new());
        }

        let batch = self.store.gather_rows(
            &[meta_field],
            &collector.row_ids,
            GatherNullPolicy::IncludeNulls,
        )?;

        let meta_col = batch
            .column(0)
            .as_any()
            .downcast_ref::<BinaryArray>()
            .ok_or_else(|| {
                llkv_result::Error::Internal("catalog table_meta column should be Binary".into())
            })?;

        let mut result = Vec::new();
        for (idx, &row_id) in collector.row_ids.iter().enumerate() {
            if !meta_col.is_null(idx) {
                let bytes = meta_col.value(idx);
                // Decode failures are skipped, not surfaced — best effort.
                if let Ok(meta) = bitcode::decode::<TableMeta>(bytes) {
                    // Recover the table id from the row key's encoding.
                    let logical: LogicalFieldId = row_id.into();
                    let table_id = logical.table_id();
                    result.push((table_id, meta));
                }
            }
        }

        Ok(result)
    }
1270
1271 pub fn put_next_txn_id(&self, next_txn_id: u64) -> LlkvResult<()> {
1273 let lfid_val: u64 = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_NEXT_TXN_ID).into();
1274 let schema = Arc::new(Schema::new(vec![
1275 Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
1276 Field::new("next_txn_id", DataType::UInt64, false).with_metadata(HashMap::from([(
1277 crate::constants::FIELD_ID_META_KEY.to_string(),
1278 lfid_val.to_string(),
1279 )])),
1280 ]));
1281
1282 let row_id = Arc::new(UInt64Array::from(vec![CATALOG_NEXT_TXN_ROW_ID]));
1283 let value_array = Arc::new(UInt64Array::from(vec![next_txn_id]));
1284 let batch = RecordBatch::try_new(schema, vec![row_id, value_array])?;
1285 self.store.append(&batch)?;
1286 Ok(())
1287 }
1288
1289 pub fn get_next_txn_id(&self) -> LlkvResult<Option<u64>> {
1291 let lfid = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_NEXT_TXN_ID);
1292 let batch = match self.store.gather_rows(
1293 &[lfid],
1294 &[CATALOG_NEXT_TXN_ROW_ID],
1295 GatherNullPolicy::IncludeNulls,
1296 ) {
1297 Ok(batch) => batch,
1298 Err(llkv_result::Error::NotFound) => return Ok(None),
1299 Err(err) => return Err(err),
1300 };
1301
1302 if batch.num_columns() == 0 || batch.num_rows() == 0 {
1303 return Ok(None);
1304 }
1305
1306 let array = batch
1307 .column(0)
1308 .as_any()
1309 .downcast_ref::<UInt64Array>()
1310 .ok_or_else(|| {
1311 llkv_result::Error::Internal(
1312 "catalog next_txn_id column stored unexpected type".into(),
1313 )
1314 })?;
1315 if array.is_empty() || array.is_null(0) {
1316 return Ok(None);
1317 }
1318
1319 let value = array.value(0);
1320 Ok(Some(value))
1321 }
1322
1323 pub fn put_last_committed_txn_id(&self, last_committed: u64) -> LlkvResult<()> {
1325 let lfid_val: u64 = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_LAST_COMMITTED_TXN_ID).into();
1326 let schema = Arc::new(Schema::new(vec![
1327 Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
1328 Field::new("last_committed_txn_id", DataType::UInt64, false).with_metadata(
1329 HashMap::from([(
1330 crate::constants::FIELD_ID_META_KEY.to_string(),
1331 lfid_val.to_string(),
1332 )]),
1333 ),
1334 ]));
1335
1336 let row_id = Arc::new(UInt64Array::from(vec![CATALOG_LAST_COMMITTED_TXN_ROW_ID]));
1337 let value_array = Arc::new(UInt64Array::from(vec![last_committed]));
1338 let batch = RecordBatch::try_new(schema, vec![row_id, value_array])?;
1339 self.store.append(&batch)?;
1340 Ok(())
1341 }
1342
1343 pub fn get_last_committed_txn_id(&self) -> LlkvResult<Option<u64>> {
1345 let lfid = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_LAST_COMMITTED_TXN_ID);
1346 let batch = match self.store.gather_rows(
1347 &[lfid],
1348 &[CATALOG_LAST_COMMITTED_TXN_ROW_ID],
1349 GatherNullPolicy::IncludeNulls,
1350 ) {
1351 Ok(batch) => batch,
1352 Err(llkv_result::Error::NotFound) => return Ok(None),
1353 Err(err) => return Err(err),
1354 };
1355
1356 if batch.num_columns() == 0 || batch.num_rows() == 0 {
1357 return Ok(None);
1358 }
1359
1360 let array = batch
1361 .column(0)
1362 .as_any()
1363 .downcast_ref::<UInt64Array>()
1364 .ok_or_else(|| {
1365 llkv_result::Error::Internal(
1366 "catalog last_committed_txn_id column stored unexpected type".into(),
1367 )
1368 })?;
1369 if array.is_empty() || array.is_null(0) {
1370 return Ok(None);
1371 }
1372
1373 let value = array.value(0);
1374 Ok(Some(value))
1375 }
1376
1377 pub fn put_catalog_state(&self, state: &crate::catalog::TableCatalogState) -> LlkvResult<()> {
1382 let lfid_val: u64 = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_CATALOG_STATE).into();
1383 let schema = Arc::new(Schema::new(vec![
1384 Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
1385 Field::new("catalog_state", DataType::Binary, false).with_metadata(HashMap::from([(
1386 crate::constants::FIELD_ID_META_KEY.to_string(),
1387 lfid_val.to_string(),
1388 )])),
1389 ]));
1390
1391 let row_id = Arc::new(UInt64Array::from(vec![CATALOG_STATE_ROW_ID]));
1392 let encoded = bitcode::encode(state);
1393 let state_bytes = Arc::new(BinaryArray::from(vec![encoded.as_slice()]));
1394
1395 let batch = RecordBatch::try_new(schema, vec![row_id, state_bytes])?;
1396 self.store.append(&batch)?;
1397 Ok(())
1398 }
1399
1400 pub fn get_catalog_state(&self) -> LlkvResult<Option<crate::catalog::TableCatalogState>> {
1404 let lfid = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_CATALOG_STATE);
1405 let batch = match self.store.gather_rows(
1406 &[lfid],
1407 &[CATALOG_STATE_ROW_ID],
1408 GatherNullPolicy::IncludeNulls,
1409 ) {
1410 Ok(batch) => batch,
1411 Err(llkv_result::Error::NotFound) => return Ok(None),
1412 Err(err) => return Err(err),
1413 };
1414
1415 if batch.num_columns() == 0 || batch.num_rows() == 0 {
1416 return Ok(None);
1417 }
1418
1419 let array = batch
1420 .column(0)
1421 .as_any()
1422 .downcast_ref::<BinaryArray>()
1423 .ok_or_else(|| {
1424 llkv_result::Error::Internal("catalog state column stored unexpected type".into())
1425 })?;
1426 if array.is_empty() || array.is_null(0) {
1427 return Ok(None);
1428 }
1429
1430 let bytes = array.value(0);
1431 let state = bitcode::decode(bytes).map_err(|e| {
1432 llkv_result::Error::Internal(format!("Failed to decode catalog state: {}", e))
1433 })?;
1434 Ok(Some(state))
1435 }
1436
1437 pub fn put_schema_meta(&self, meta: &SchemaMeta) -> LlkvResult<()> {
1442 let lfid_val: u64 = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_SCHEMA_META_ID).into();
1443 let schema = Arc::new(Schema::new(vec![
1444 Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
1445 Field::new("meta", DataType::Binary, false).with_metadata(HashMap::from([(
1446 crate::constants::FIELD_ID_META_KEY.to_string(),
1447 lfid_val.to_string(),
1448 )])),
1449 ]));
1450
1451 let canonical = meta.name.to_ascii_lowercase();
1453 let row_id_val = schema_name_to_row_id(&canonical);
1454 let row_id = Arc::new(UInt64Array::from(vec![row_id_val]));
1455 let meta_encoded = bitcode::encode(meta);
1456 let meta_bytes = Arc::new(BinaryArray::from(vec![meta_encoded.as_slice()]));
1457
1458 let batch = RecordBatch::try_new(schema, vec![row_id, meta_bytes])?;
1459 self.store.append(&batch)?;
1460 Ok(())
1461 }
1462
1463 pub fn get_schema_meta(&self, schema_name: &str) -> LlkvResult<Option<SchemaMeta>> {
1467 let canonical = schema_name.to_ascii_lowercase();
1468 let row_id = schema_name_to_row_id(&canonical);
1469 let lfid = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_SCHEMA_META_ID);
1470
1471 let batch = match self
1472 .store
1473 .gather_rows(&[lfid], &[row_id], GatherNullPolicy::IncludeNulls)
1474 {
1475 Ok(batch) => batch,
1476 Err(llkv_result::Error::NotFound) => return Ok(None),
1477 Err(err) => return Err(err),
1478 };
1479
1480 if batch.num_columns() == 0 || batch.num_rows() == 0 {
1481 return Ok(None);
1482 }
1483
1484 let array = batch
1485 .column(0)
1486 .as_any()
1487 .downcast_ref::<BinaryArray>()
1488 .ok_or_else(|| {
1489 llkv_result::Error::Internal("catalog schema_meta column should be Binary".into())
1490 })?;
1491
1492 if array.is_empty() || array.is_null(0) {
1493 return Ok(None);
1494 }
1495
1496 let bytes = array.value(0);
1497 let meta = bitcode::decode(bytes).map_err(|e| {
1498 llkv_result::Error::Internal(format!("Failed to decode schema metadata: {}", e))
1499 })?;
1500 Ok(Some(meta))
1501 }
1502
1503 pub fn all_schema_metas(&self) -> LlkvResult<Vec<SchemaMeta>> {
1507 let meta_field = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_SCHEMA_META_ID);
1508 let row_field = rowid_fid(meta_field);
1509
1510 struct RowIdCollector {
1512 row_ids: Vec<RowId>,
1513 }
1514
1515 impl PrimitiveVisitor for RowIdCollector {
1516 fn u64_chunk(&mut self, values: &UInt64Array) {
1517 for i in 0..values.len() {
1518 self.row_ids.push(values.value(i));
1519 }
1520 }
1521 }
1522 impl PrimitiveWithRowIdsVisitor for RowIdCollector {}
1523 impl PrimitiveSortedVisitor for RowIdCollector {}
1524 impl PrimitiveSortedWithRowIdsVisitor for RowIdCollector {}
1525
1526 let mut collector = RowIdCollector {
1527 row_ids: Vec::new(),
1528 };
1529 match ScanBuilder::new(self.store, row_field)
1530 .options(ScanOptions::default())
1531 .run(&mut collector)
1532 {
1533 Ok(()) => {}
1534 Err(llkv_result::Error::NotFound) => return Ok(Vec::new()),
1535 Err(err) => return Err(err),
1536 }
1537
1538 if collector.row_ids.is_empty() {
1539 return Ok(Vec::new());
1540 }
1541
1542 let batch = self.store.gather_rows(
1544 &[meta_field],
1545 &collector.row_ids,
1546 GatherNullPolicy::IncludeNulls,
1547 )?;
1548
1549 let meta_col = batch
1550 .column(0)
1551 .as_any()
1552 .downcast_ref::<BinaryArray>()
1553 .ok_or_else(|| {
1554 llkv_result::Error::Internal("catalog schema_meta column should be Binary".into())
1555 })?;
1556
1557 let mut result = Vec::new();
1558 for idx in 0..collector.row_ids.len() {
1559 if !meta_col.is_null(idx) {
1560 let bytes = meta_col.value(idx);
1561 if let Ok(meta) = bitcode::decode::<SchemaMeta>(bytes) {
1562 result.push(meta);
1563 }
1564 }
1565 }
1566
1567 Ok(result)
1568 }
1569
1570 pub fn put_custom_type_meta(&self, meta: &CustomTypeMeta) -> LlkvResult<()> {
1574 let lfid_val: u64 = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_CUSTOM_TYPE_META_ID).into();
1575 let schema = Arc::new(Schema::new(vec![
1576 Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
1577 Field::new("meta", DataType::Binary, false).with_metadata(HashMap::from([(
1578 crate::constants::FIELD_ID_META_KEY.to_string(),
1579 lfid_val.to_string(),
1580 )])),
1581 ]));
1582
1583 let canonical = meta.name.to_ascii_lowercase();
1585 let row_id_val = schema_name_to_row_id(&canonical); let row_id = Arc::new(UInt64Array::from(vec![row_id_val]));
1587 let meta_encoded = bitcode::encode(meta);
1588 let meta_bytes = Arc::new(BinaryArray::from(vec![meta_encoded.as_slice()]));
1589
1590 let batch = RecordBatch::try_new(schema, vec![row_id, meta_bytes])?;
1591 self.store.append(&batch)?;
1592 Ok(())
1593 }
1594
1595 pub fn get_custom_type_meta(&self, type_name: &str) -> LlkvResult<Option<CustomTypeMeta>> {
1599 let canonical = type_name.to_ascii_lowercase();
1600 let row_id = schema_name_to_row_id(&canonical);
1601 let lfid = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_CUSTOM_TYPE_META_ID);
1602
1603 let batch = match self
1604 .store
1605 .gather_rows(&[lfid], &[row_id], GatherNullPolicy::IncludeNulls)
1606 {
1607 Ok(batch) => batch,
1608 Err(llkv_result::Error::NotFound) => return Ok(None),
1609 Err(err) => return Err(err),
1610 };
1611
1612 let meta_col = batch
1613 .column(0)
1614 .as_any()
1615 .downcast_ref::<BinaryArray>()
1616 .ok_or_else(|| {
1617 llkv_result::Error::Internal(
1618 "catalog custom_type_meta column should be Binary".into(),
1619 )
1620 })?;
1621
1622 if meta_col.is_null(0) {
1623 return Ok(None);
1624 }
1625
1626 let bytes = meta_col.value(0);
1627 let meta = bitcode::decode(bytes).map_err(|err| {
1628 llkv_result::Error::Internal(format!("failed to decode custom type metadata: {err}"))
1629 })?;
1630 Ok(Some(meta))
1631 }
1632
1633 pub fn delete_custom_type_meta(&self, type_name: &str) -> LlkvResult<()> {
1637 let canonical = type_name.to_ascii_lowercase();
1638 let row_id = schema_name_to_row_id(&canonical);
1639 let lfid = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_CUSTOM_TYPE_META_ID);
1640
1641 let lfid_val: u64 = lfid.into();
1643 let schema = Arc::new(Schema::new(vec![
1644 Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
1645 Field::new("meta", DataType::Binary, true).with_metadata(HashMap::from([(
1646 crate::constants::FIELD_ID_META_KEY.to_string(),
1647 lfid_val.to_string(),
1648 )])),
1649 ]));
1650
1651 let row_id_arr = Arc::new(UInt64Array::from(vec![row_id]));
1652 let mut meta_builder = BinaryBuilder::new();
1653 meta_builder.append_null();
1654 let meta_arr = Arc::new(meta_builder.finish());
1655
1656 let batch = RecordBatch::try_new(schema, vec![row_id_arr, meta_arr])?;
1657 self.store.append(&batch)?;
1658 Ok(())
1659 }
1660
1661 pub fn all_custom_type_metas(&self) -> LlkvResult<Vec<CustomTypeMeta>> {
1665 let meta_field = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_CUSTOM_TYPE_META_ID);
1666 let row_field = rowid_fid(meta_field);
1667
1668 struct RowIdCollector {
1670 row_ids: Vec<RowId>,
1671 }
1672
1673 impl PrimitiveVisitor for RowIdCollector {
1674 fn u64_chunk(&mut self, values: &UInt64Array) {
1675 for i in 0..values.len() {
1676 self.row_ids.push(values.value(i));
1677 }
1678 }
1679 }
1680 impl PrimitiveWithRowIdsVisitor for RowIdCollector {}
1681 impl PrimitiveSortedVisitor for RowIdCollector {}
1682 impl PrimitiveSortedWithRowIdsVisitor for RowIdCollector {}
1683
1684 let mut collector = RowIdCollector {
1685 row_ids: Vec::new(),
1686 };
1687 match ScanBuilder::new(self.store, row_field)
1688 .options(ScanOptions::default())
1689 .run(&mut collector)
1690 {
1691 Ok(()) => {}
1692 Err(llkv_result::Error::NotFound) => return Ok(Vec::new()),
1693 Err(err) => return Err(err),
1694 }
1695
1696 if collector.row_ids.is_empty() {
1697 return Ok(Vec::new());
1698 }
1699
1700 let batch = self.store.gather_rows(
1702 &[meta_field],
1703 &collector.row_ids,
1704 GatherNullPolicy::IncludeNulls,
1705 )?;
1706
1707 let meta_col = batch
1708 .column(0)
1709 .as_any()
1710 .downcast_ref::<BinaryArray>()
1711 .ok_or_else(|| {
1712 llkv_result::Error::Internal(
1713 "catalog custom_type_meta column should be Binary".into(),
1714 )
1715 })?;
1716
1717 let mut result = Vec::new();
1718 for idx in 0..collector.row_ids.len() {
1719 if !meta_col.is_null(idx) {
1720 let bytes = meta_col.value(idx);
1721 if let Ok(meta) = bitcode::decode::<CustomTypeMeta>(bytes) {
1722 result.push(meta);
1723 }
1724 }
1725 }
1726
1727 Ok(result)
1728 }
1729}
1730
1731fn schema_name_to_row_id(canonical_name: &str) -> RowId {
1736 const FNV_OFFSET: u64 = 0xcbf2_9ce4_8422_2325;
1738 const FNV_PRIME: u64 = 0x1000_0000_01b3;
1739
1740 let mut hash = FNV_OFFSET;
1741 for byte in canonical_name.as_bytes() {
1742 hash ^= u64::from(*byte);
1743 hash = hash.wrapping_mul(FNV_PRIME);
1744 }
1745
1746 hash | (1u64 << 63)
1748}
1749
1750struct MaxRowIdCollector {
1751 max: Option<RowId>,
1752}
1753
1754impl PrimitiveVisitor for MaxRowIdCollector {
1755 fn u64_chunk(&mut self, values: &UInt64Array) {
1756 for i in 0..values.len() {
1757 let value = values.value(i);
1758 self.max = match self.max {
1759 Some(curr) if curr >= value => Some(curr),
1760 _ => Some(value),
1761 };
1762 }
1763 }
1764}
1765
1766impl PrimitiveWithRowIdsVisitor for MaxRowIdCollector {}
1767impl PrimitiveSortedVisitor for MaxRowIdCollector {}
1768impl PrimitiveSortedWithRowIdsVisitor for MaxRowIdCollector {}
1769
#[cfg(test)]
mod tests {
    use super::*;
    use crate::constraints::{
        ConstraintKind, ConstraintState, PrimaryKeyConstraint, UniqueConstraint,
    };
    use llkv_column_map::ColumnStore;
    use std::sync::Arc;

    /// Round-trips constraint records through the catalog and checks that
    /// lookups are scoped to the requesting table.
    #[test]
    fn constraint_records_roundtrip() {
        let pager = Arc::new(MemPager::default());
        let store = ColumnStore::open(Arc::clone(&pager)).unwrap();
        let catalog = SysCatalog::new(&store);

        let table_id: TableId = 42;
        let pk_record = ConstraintRecord {
            constraint_id: 1,
            kind: ConstraintKind::PrimaryKey(PrimaryKeyConstraint {
                field_ids: vec![1, 2],
            }),
            state: ConstraintState::Active,
            revision: 1,
            last_modified_micros: 100,
        };
        let unique_record = ConstraintRecord {
            constraint_id: 2,
            kind: ConstraintKind::Unique(UniqueConstraint { field_ids: vec![3] }),
            state: ConstraintState::Active,
            revision: 2,
            last_modified_micros: 200,
        };
        catalog
            .put_constraint_records(table_id, &[pk_record.clone(), unique_record.clone()])
            .unwrap();

        // A record for a different table with the same constraint id must
        // never leak into lookups for `table_id`.
        let unrelated_record = ConstraintRecord {
            constraint_id: 1,
            kind: ConstraintKind::Unique(UniqueConstraint { field_ids: vec![5] }),
            state: ConstraintState::Active,
            revision: 1,
            last_modified_micros: 150,
        };
        catalog
            .put_constraint_records(7, &[unrelated_record])
            .unwrap();

        // Full-table fetch returns exactly the two records for `table_id`.
        let mut fetched = catalog.constraint_records_for_table(table_id).unwrap();
        fetched.sort_by_key(|record| record.constraint_id);
        assert_eq!(fetched.len(), 2);
        assert_eq!(fetched[0], pk_record);
        assert_eq!(fetched[1], unique_record);

        // Point lookup by constraint id returns the matching record.
        let single = catalog
            .get_constraint_records(table_id, &[pk_record.constraint_id])
            .unwrap();
        assert_eq!(single.len(), 1);
        assert_eq!(single[0].as_ref(), Some(&pk_record));

        // Unknown constraint ids come back as a `None` slot, not an error.
        let missing = catalog.get_constraint_records(table_id, &[999]).unwrap();
        assert_eq!(missing.len(), 1);
        assert!(missing[0].is_none());
    }
}