1use std::collections::HashMap;
27use std::mem;
28use std::sync::Arc;
29
30use arrow::array::{Array, BinaryArray, BinaryBuilder, UInt64Array};
31use arrow::datatypes::{DataType, Field, Schema};
32use arrow::record_batch::RecordBatch;
33use bitcode::{Decode, Encode};
34
35use crate::constants::CONSTRAINT_SCAN_CHUNK_SIZE;
36use crate::constraints::{
37 ConstraintId, ConstraintRecord, decode_constraint_row_id, encode_constraint_row_id,
38};
39use crate::types::{FieldId, ROW_ID_FIELD_ID, RowId, TableId};
40use llkv_column_map::store::scan::{
41 PrimitiveSortedVisitor, PrimitiveSortedWithRowIdsVisitor, PrimitiveVisitor,
42 PrimitiveWithRowIdsVisitor, ScanBuilder, ScanOptions,
43};
44
45use llkv_column_map::types::LogicalFieldId;
46use llkv_column_map::{
47 ColumnStore,
48 store::{GatherNullPolicy, ROW_ID_COLUMN_NAME, rowid_fid},
49};
50use llkv_result::{self, Result as LlkvResult};
51use llkv_storage::pager::{MemPager, Pager};
52use simd_r_drive_entry_handle::EntryHandle;
53
54use crate::reserved::*;
56
57#[inline]
61fn lfid(table_id: TableId, col_id: u32) -> LogicalFieldId {
62 LogicalFieldId::for_user(table_id, col_id)
63}
64
65#[inline]
67fn rid_table(table_id: TableId) -> u64 {
68 LogicalFieldId::for_user(table_id, ROW_ID_FIELD_ID).into()
69}
70
71#[inline]
73fn rid_col(table_id: TableId, col_id: u32) -> u64 {
74 rowid_fid(lfid(table_id, col_id)).into()
75}
76
#[inline]
// Catalog column that stores the encoded constraint payloads.
fn constraint_meta_lfid() -> LogicalFieldId {
    lfid(CATALOG_TABLE_ID, CATALOG_FIELD_CONSTRAINT_META_ID)
}
81
#[inline]
// Catalog column that stores the encoded constraint-name records.
fn constraint_name_lfid() -> LogicalFieldId {
    lfid(CATALOG_TABLE_ID, CATALOG_FIELD_CONSTRAINT_NAME_ID)
}
86
#[inline]
// Row-id shadow column paired with the constraint payload column; scanning it
// enumerates which constraint rows exist.
fn constraint_row_lfid() -> LogicalFieldId {
    rowid_fid(constraint_meta_lfid())
}
91
/// Pairs a constraint id with its optional user-assigned name.
///
/// Persisted via `bitcode`; field order is part of the wire format.
#[derive(Clone, Debug, Encode, Decode)]
pub struct ConstraintNameRecord {
    /// Constraint this name belongs to.
    pub constraint_id: ConstraintId,
    /// User-assigned name, or `None` for anonymous constraints.
    pub name: Option<String>,
}
97
98fn decode_constraint_record(bytes: &[u8]) -> LlkvResult<ConstraintRecord> {
99 bitcode::decode(bytes).map_err(|err| {
100 llkv_result::Error::Internal(format!("failed to decode constraint metadata: {err}"))
101 })
102}
103
/// Scan visitor that batches constraint row ids belonging to one table and
/// streams the decoded `ConstraintRecord`s to a callback in chunks.
struct ConstraintRowCollector<'a, P, F>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
    F: FnMut(Vec<ConstraintRecord>),
{
    // Store used to gather payloads for the buffered row ids.
    store: &'a ColumnStore<P>,
    // Field holding the encoded constraint payloads.
    lfid: LogicalFieldId,
    // Only rows whose encoded table id matches this are collected.
    table_id: TableId,
    // Receives each decoded chunk of records.
    on_batch: &'a mut F,
    // Row ids pending a flush (flushed at CONSTRAINT_SCAN_CHUNK_SIZE).
    buffer: Vec<RowId>,
    // First error raised inside the visitor callbacks; surfaced by `finish`.
    error: Option<llkv_result::Error>,
}
116
117impl<'a, P, F> ConstraintRowCollector<'a, P, F>
118where
119 P: Pager<Blob = EntryHandle> + Send + Sync,
120 F: FnMut(Vec<ConstraintRecord>),
121{
122 fn flush_buffer(&mut self) -> LlkvResult<()> {
123 if self.buffer.is_empty() {
124 return Ok(());
125 }
126
127 let row_ids = mem::take(&mut self.buffer);
128 let batch =
129 self.store
130 .gather_rows(&[self.lfid], &row_ids, GatherNullPolicy::IncludeNulls)?;
131
132 if batch.num_columns() == 0 {
133 return Ok(());
134 }
135
136 let array = batch
137 .column(0)
138 .as_any()
139 .downcast_ref::<BinaryArray>()
140 .ok_or_else(|| {
141 llkv_result::Error::Internal(
142 "constraint metadata column stored unexpected type".into(),
143 )
144 })?;
145
146 let mut records = Vec::with_capacity(row_ids.len());
147 for (idx, row_id) in row_ids.into_iter().enumerate() {
148 if array.is_null(idx) {
149 continue;
150 }
151
152 let record = decode_constraint_record(array.value(idx))?;
153 let (table_from_id, constraint_id) = decode_constraint_row_id(row_id);
154 if table_from_id != self.table_id {
155 continue;
156 }
157 if record.constraint_id != constraint_id {
158 return Err(llkv_result::Error::Internal(
159 "constraint metadata id mismatch".into(),
160 ));
161 }
162 records.push(record);
163 }
164
165 if !records.is_empty() {
166 (self.on_batch)(records);
167 }
168
169 Ok(())
170 }
171
172 fn finish(&mut self) -> LlkvResult<()> {
173 if let Some(err) = self.error.take() {
174 return Err(err);
175 }
176 self.flush_buffer()
177 }
178}
179
180impl<'a, P, F> PrimitiveVisitor for ConstraintRowCollector<'a, P, F>
181where
182 P: Pager<Blob = EntryHandle> + Send + Sync,
183 F: FnMut(Vec<ConstraintRecord>),
184{
185 fn u64_chunk(&mut self, values: &UInt64Array) {
186 if self.error.is_some() {
187 return;
188 }
189
190 for idx in 0..values.len() {
191 let row_id = values.value(idx);
192 let (table_id, _) = decode_constraint_row_id(row_id);
193 if table_id != self.table_id {
194 continue;
195 }
196 self.buffer.push(row_id);
197 if self.buffer.len() >= CONSTRAINT_SCAN_CHUNK_SIZE
198 && let Err(err) = self.flush_buffer()
199 {
200 self.error = Some(err);
201 return;
202 }
203 }
204 }
205}
206
// No overrides needed: the scan only feeds `PrimitiveVisitor::u64_chunk`;
// this trait keeps its default implementations.
impl<'a, P, F> PrimitiveWithRowIdsVisitor for ConstraintRowCollector<'a, P, F>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
    F: FnMut(Vec<ConstraintRecord>),
{
}
213
// Default (no-op) implementations; this collector consumes unsorted chunks.
impl<'a, P, F> PrimitiveSortedVisitor for ConstraintRowCollector<'a, P, F>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
    F: FnMut(Vec<ConstraintRecord>),
{
}
220
// Default (no-op) implementations; this collector consumes unsorted chunks.
impl<'a, P, F> PrimitiveSortedWithRowIdsVisitor for ConstraintRowCollector<'a, P, F>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
    F: FnMut(Vec<ConstraintRecord>),
{
}
227
/// Catalog record describing one table.
///
/// Persisted via `bitcode`; field order is part of the wire format.
#[derive(Encode, Decode, Clone, Debug, PartialEq, Eq)]
pub struct TableMeta {
    /// Id of the table this record describes.
    pub table_id: TableId,
    /// Human-readable table name, if one was assigned.
    pub name: Option<String>,
    /// Creation timestamp in microseconds (presumably since the Unix epoch —
    /// not verifiable from this file).
    pub created_at_micros: u64,
    /// Bit flags; semantics are defined by callers, not visible here.
    pub flags: u32,
    /// Version/epoch counter; semantics defined by callers.
    pub epoch: u64,
    /// Defining SQL text when this table is a view, else `None`.
    pub view_definition: Option<String>,
}
249
/// Catalog record describing one column of a table.
///
/// Persisted via `bitcode`; field order is part of the wire format.
#[derive(Encode, Decode, Clone, Debug, PartialEq, Eq)]
pub struct ColMeta {
    /// Column id within the owning table.
    pub col_id: u32,
    /// Column name, if one was assigned.
    pub name: Option<String>,
    /// Bit flags; semantics are defined by callers, not visible here.
    pub flags: u32,
    /// Encoded default value, if any (encoding defined by callers).
    pub default: Option<Vec<u8>>,
}
264
/// Catalog record describing a named schema.
///
/// Persisted via `bitcode`; field order is part of the wire format.
#[derive(Encode, Decode, Clone, Debug, PartialEq, Eq)]
pub struct SchemaMeta {
    /// Schema name.
    pub name: String,
    /// Creation timestamp in microseconds (epoch base not shown here).
    pub created_at_micros: u64,
    /// Bit flags; semantics are defined by callers, not visible here.
    pub flags: u32,
}
277
/// Catalog record for a user-defined type.
///
/// Persisted via `bitcode`; field order is part of the wire format.
#[derive(Encode, Decode, Clone, Debug, PartialEq, Eq)]
pub struct CustomTypeMeta {
    /// Type name.
    pub name: String,
    /// SQL text of the underlying base type.
    pub base_type_sql: String,
    /// Creation timestamp in microseconds (epoch base not shown here).
    pub created_at_micros: u64,
}
292
/// Descriptor for one multi-column index on a table.
///
/// Persisted via `bitcode`; field order is part of the wire format.
#[derive(Encode, Decode, Clone, Debug, PartialEq, Eq)]
pub struct MultiColumnIndexEntryMeta {
    /// User-visible index name, if any.
    pub index_name: Option<String>,
    /// Canonical (normalized) name used for lookups.
    pub canonical_name: String,
    /// Ids of the indexed columns, in index order.
    pub column_ids: Vec<FieldId>,
    /// Whether the index enforces uniqueness.
    pub unique: bool,
}
307
/// Descriptor for one single-column index on a table.
///
/// Persisted via `bitcode`; field order is part of the wire format.
#[derive(Encode, Decode, Clone, Debug, PartialEq, Eq)]
pub struct SingleColumnIndexEntryMeta {
    /// User-visible index name.
    pub index_name: String,
    /// Canonical (normalized) name used for lookups.
    pub canonical_name: String,
    /// Id of the indexed column.
    pub column_id: FieldId,
    /// Name of the indexed column.
    pub column_name: String,
    /// Whether the index enforces uniqueness.
    pub unique: bool,
    /// Sort direction of the index.
    pub ascending: bool,
    /// Whether NULLs order before non-NULL values.
    pub nulls_first: bool,
}
326
/// All single-column index descriptors persisted for one table, stored as a
/// single encoded payload at the table's catalog row.
#[derive(Encode, Decode, Clone, Debug, PartialEq, Eq)]
pub struct TableSingleColumnIndexMeta {
    /// Table these indexes belong to.
    pub table_id: TableId,
    /// The full index list (replaced wholesale on update).
    pub indexes: Vec<SingleColumnIndexEntryMeta>,
}
335
/// All multi-column index descriptors persisted for one table, stored as a
/// single encoded payload at the table's catalog row.
#[derive(Encode, Decode, Clone, Debug, PartialEq, Eq)]
pub struct TableMultiColumnIndexMeta {
    /// Table these indexes belong to.
    pub table_id: TableId,
    /// The full index list (replaced wholesale on update).
    pub indexes: Vec<MultiColumnIndexEntryMeta>,
}
344
/// Typed accessor over the reserved catalog table stored in a `ColumnStore`.
///
/// Catalog state (table/column metadata, constraints, index descriptors, id
/// counters) lives in columns of `CATALOG_TABLE_ID`. Updates are appends that
/// shadow earlier values; deletions append NULL payloads (see
/// `write_null_entries`).
pub struct SysCatalog<'a, P = MemPager>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    // Backing store; all methods read and append through this handle.
    store: &'a ColumnStore<P>,
}
363
364impl<'a, P> SysCatalog<'a, P>
365where
366 P: Pager<Blob = EntryHandle> + Send + Sync,
367{
368 fn write_null_entries(&self, meta_field: LogicalFieldId, row_ids: &[RowId]) -> LlkvResult<()> {
369 if row_ids.is_empty() {
370 return Ok(());
371 }
372
373 let lfid_val: u64 = meta_field.into();
374 let schema = Arc::new(Schema::new(vec![
375 Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
376 Field::new("meta", DataType::Binary, true).with_metadata(HashMap::from([(
377 crate::constants::FIELD_ID_META_KEY.to_string(),
378 lfid_val.to_string(),
379 )])),
380 ]));
381
382 let row_array = Arc::new(UInt64Array::from(row_ids.to_vec()));
383 let mut builder = BinaryBuilder::new();
384 for _ in row_ids {
385 builder.append_null();
386 }
387 let meta_array = Arc::new(builder.finish());
388
389 let batch = RecordBatch::try_new(schema, vec![row_array, meta_array])?;
390 self.store.append(&batch)?;
391 Ok(())
392 }
393
    /// Create a catalog view over `store`; performs no I/O.
    pub fn new(store: &'a ColumnStore<P>) -> Self {
        Self { store }
    }
398
399 pub fn put_table_meta(&self, meta: &TableMeta) {
404 let lfid_val: u64 = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_TABLE_META_ID).into();
405 let schema = Arc::new(Schema::new(vec![
406 Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
407 Field::new("meta", DataType::Binary, false).with_metadata(HashMap::from([(
408 crate::constants::FIELD_ID_META_KEY.to_string(),
409 lfid_val.to_string(),
410 )])),
411 ]));
412
413 let row_id = Arc::new(UInt64Array::from(vec![rid_table(meta.table_id)]));
414 let meta_encoded = bitcode::encode(meta);
415 let meta_bytes = Arc::new(BinaryArray::from(vec![meta_encoded.as_slice()]));
416
417 let batch = RecordBatch::try_new(schema, vec![row_id, meta_bytes]).unwrap();
418 self.store.append(&batch).unwrap();
419 }
420
421 pub fn get_table_meta(&self, table_id: TableId) -> Option<TableMeta> {
425 let row_id = rid_table(table_id);
426 let catalog_field = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_TABLE_META_ID);
427 let batch = self
428 .store
429 .gather_rows(&[catalog_field], &[row_id], GatherNullPolicy::IncludeNulls)
430 .ok()?;
431
432 if batch.num_rows() == 0 || batch.num_columns() == 0 {
433 return None;
434 }
435
436 let array = batch
437 .column(0)
438 .as_any()
439 .downcast_ref::<BinaryArray>()
440 .expect("table meta column must be BinaryArray");
441
442 if array.is_null(0) {
443 return None;
444 }
445
446 bitcode::decode(array.value(0)).ok()
447 }
448
449 pub fn put_col_meta(&self, table_id: TableId, meta: &ColMeta) {
451 let lfid_val: u64 = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_COL_META_ID).into();
452 let schema = Arc::new(Schema::new(vec![
453 Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
454 Field::new("meta", DataType::Binary, false).with_metadata(HashMap::from([(
455 crate::constants::FIELD_ID_META_KEY.to_string(),
456 lfid_val.to_string(),
457 )])),
458 ]));
459
460 let rid_value = rid_col(table_id, meta.col_id);
461 let row_id = Arc::new(UInt64Array::from(vec![rid_value]));
462 let meta_encoded = bitcode::encode(meta);
463 let meta_bytes = Arc::new(BinaryArray::from(vec![meta_encoded.as_slice()]));
464
465 let batch = RecordBatch::try_new(schema, vec![row_id, meta_bytes]).unwrap();
466 self.store.append(&batch).unwrap();
467 }
468
469 pub fn get_cols_meta(&self, table_id: TableId, col_ids: &[u32]) -> Vec<Option<ColMeta>> {
471 if col_ids.is_empty() {
472 return Vec::new();
473 }
474
475 let row_ids: Vec<RowId> = col_ids.iter().map(|&cid| rid_col(table_id, cid)).collect();
476 let catalog_field = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_COL_META_ID);
477
478 let batch =
479 match self
480 .store
481 .gather_rows(&[catalog_field], &row_ids, GatherNullPolicy::IncludeNulls)
482 {
483 Ok(batch) => batch,
484 Err(_) => return vec![None; col_ids.len()],
485 };
486
487 let meta_col = batch
488 .column(0)
489 .as_any()
490 .downcast_ref::<BinaryArray>()
491 .expect("catalog meta column should be Binary");
492
493 col_ids
494 .iter()
495 .enumerate()
496 .map(|(idx, _)| {
497 if meta_col.is_null(idx) {
498 None
499 } else {
500 bitcode::decode(meta_col.value(idx)).ok()
501 }
502 })
503 .collect()
504 }
505
506 pub fn delete_col_meta(&self, table_id: TableId, col_ids: &[FieldId]) -> LlkvResult<()> {
508 if col_ids.is_empty() {
509 return Ok(());
510 }
511
512 let meta_field = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_COL_META_ID);
513 let row_ids: Vec<RowId> = col_ids
514 .iter()
515 .map(|&col_id| rid_col(table_id, col_id))
516 .collect();
517 self.write_null_entries(meta_field, &row_ids)
518 }
519
520 pub fn delete_table_meta(&self, table_id: TableId) -> LlkvResult<()> {
522 let meta_field = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_TABLE_META_ID);
523 let row_id = rid_table(table_id);
524 self.write_null_entries(meta_field, &[row_id])
525 }
526
527 pub fn delete_constraint_records(
529 &self,
530 table_id: TableId,
531 constraint_ids: &[ConstraintId],
532 ) -> LlkvResult<()> {
533 if constraint_ids.is_empty() {
534 return Ok(());
535 }
536
537 let meta_field = constraint_meta_lfid();
538 let row_ids: Vec<RowId> = constraint_ids
539 .iter()
540 .map(|&constraint_id| encode_constraint_row_id(table_id, constraint_id))
541 .collect();
542 self.write_null_entries(meta_field, &row_ids)
543 }
544
545 pub fn delete_constraint_names(
547 &self,
548 table_id: TableId,
549 constraint_ids: &[ConstraintId],
550 ) -> LlkvResult<()> {
551 if constraint_ids.is_empty() {
552 return Ok(());
553 }
554
555 let lfid = constraint_name_lfid();
556 let row_ids: Vec<RowId> = constraint_ids
557 .iter()
558 .map(|&constraint_id| encode_constraint_row_id(table_id, constraint_id))
559 .collect();
560 self.write_null_entries(lfid, &row_ids)
561 }
562
563 pub fn delete_multi_column_indexes(&self, table_id: TableId) -> LlkvResult<()> {
565 let meta_field = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_MULTI_COLUMN_UNIQUE_META_ID);
566 let row_id = rid_table(table_id);
567 self.write_null_entries(meta_field, &[row_id])
568 }
569
570 pub fn delete_single_column_indexes(&self, table_id: TableId) -> LlkvResult<()> {
572 let meta_field = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_SINGLE_COLUMN_INDEX_META_ID);
573 let row_id = rid_table(table_id);
574 self.write_null_entries(meta_field, &[row_id])
575 }
576
577 pub fn put_multi_column_indexes(
579 &self,
580 table_id: TableId,
581 indexes: &[MultiColumnIndexEntryMeta],
582 ) -> LlkvResult<()> {
583 let lfid_val: u64 =
584 lfid(CATALOG_TABLE_ID, CATALOG_FIELD_MULTI_COLUMN_UNIQUE_META_ID).into();
585 let schema = Arc::new(Schema::new(vec![
586 Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
587 Field::new("meta", DataType::Binary, false).with_metadata(HashMap::from([(
588 crate::constants::FIELD_ID_META_KEY.to_string(),
589 lfid_val.to_string(),
590 )])),
591 ]));
592
593 let row_id = Arc::new(UInt64Array::from(vec![rid_table(table_id)]));
594 let meta = TableMultiColumnIndexMeta {
595 table_id,
596 indexes: indexes.to_vec(),
597 };
598 let encoded = bitcode::encode(&meta);
599 let meta_bytes = Arc::new(BinaryArray::from(vec![encoded.as_slice()]));
600
601 let batch = RecordBatch::try_new(schema, vec![row_id, meta_bytes])?;
602 self.store.append(&batch)?;
603 Ok(())
604 }
605
606 pub fn put_single_column_indexes(
608 &self,
609 table_id: TableId,
610 indexes: &[SingleColumnIndexEntryMeta],
611 ) -> LlkvResult<()> {
612 let lfid_val: u64 =
613 lfid(CATALOG_TABLE_ID, CATALOG_FIELD_SINGLE_COLUMN_INDEX_META_ID).into();
614 let schema = Arc::new(Schema::new(vec![
615 Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
616 Field::new("meta", DataType::Binary, false).with_metadata(HashMap::from([(
617 crate::constants::FIELD_ID_META_KEY.to_string(),
618 lfid_val.to_string(),
619 )])),
620 ]));
621
622 let row_id = Arc::new(UInt64Array::from(vec![rid_table(table_id)]));
623 let meta = TableSingleColumnIndexMeta {
624 table_id,
625 indexes: indexes.to_vec(),
626 };
627 let encoded = bitcode::encode(&meta);
628 let meta_bytes = Arc::new(BinaryArray::from(vec![encoded.as_slice()]));
629
630 let batch = RecordBatch::try_new(schema, vec![row_id, meta_bytes])?;
631 self.store.append(&batch)?;
632 Ok(())
633 }
634
635 pub fn get_multi_column_indexes(
637 &self,
638 table_id: TableId,
639 ) -> LlkvResult<Vec<MultiColumnIndexEntryMeta>> {
640 let lfid = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_MULTI_COLUMN_UNIQUE_META_ID);
641 let row_id = rid_table(table_id);
642 let batch = match self
643 .store
644 .gather_rows(&[lfid], &[row_id], GatherNullPolicy::IncludeNulls)
645 {
646 Ok(batch) => batch,
647 Err(llkv_result::Error::NotFound) => return Ok(Vec::new()),
648 Err(err) => return Err(err),
649 };
650
651 if batch.num_columns() == 0 || batch.num_rows() == 0 {
652 return Ok(Vec::new());
653 }
654
655 let array = batch
656 .column(0)
657 .as_any()
658 .downcast_ref::<BinaryArray>()
659 .ok_or_else(|| {
660 llkv_result::Error::Internal(
661 "catalog multi-column index column stored unexpected type".into(),
662 )
663 })?;
664
665 if array.is_null(0) {
666 return Ok(Vec::new());
667 }
668
669 let meta: TableMultiColumnIndexMeta = bitcode::decode(array.value(0)).map_err(|err| {
670 llkv_result::Error::Internal(format!(
671 "failed to decode multi-column index metadata: {err}"
672 ))
673 })?;
674
675 Ok(meta.indexes)
676 }
677
678 pub fn get_single_column_indexes(
680 &self,
681 table_id: TableId,
682 ) -> LlkvResult<Vec<SingleColumnIndexEntryMeta>> {
683 let lfid = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_SINGLE_COLUMN_INDEX_META_ID);
684 let row_id = rid_table(table_id);
685 let batch = match self
686 .store
687 .gather_rows(&[lfid], &[row_id], GatherNullPolicy::IncludeNulls)
688 {
689 Ok(batch) => batch,
690 Err(llkv_result::Error::NotFound) => return Ok(Vec::new()),
691 Err(err) => return Err(err),
692 };
693
694 if batch.num_columns() == 0 || batch.num_rows() == 0 {
695 return Ok(Vec::new());
696 }
697
698 let array = batch
699 .column(0)
700 .as_any()
701 .downcast_ref::<BinaryArray>()
702 .ok_or_else(|| {
703 llkv_result::Error::Internal(
704 "catalog single-column index column stored unexpected type".into(),
705 )
706 })?;
707
708 if array.is_null(0) {
709 return Ok(Vec::new());
710 }
711
712 let meta: TableSingleColumnIndexMeta = bitcode::decode(array.value(0)).map_err(|err| {
713 llkv_result::Error::Internal(format!(
714 "failed to decode single-column index metadata: {err}"
715 ))
716 })?;
717
718 Ok(meta.indexes)
719 }
720
    /// Load every persisted multi-column index descriptor across all tables.
    ///
    /// Scans the row-id shadow column of the multi-column-index metadata field
    /// to discover which rows exist, then gathers and decodes each payload.
    /// Tombstoned (NULL) payloads are skipped.
    pub fn all_multi_column_index_metas(&self) -> LlkvResult<Vec<TableMultiColumnIndexMeta>> {
        let meta_field = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_MULTI_COLUMN_UNIQUE_META_ID);
        let row_field = rowid_fid(meta_field);

        // Minimal visitor that records every row id seen during the scan.
        struct RowIdCollector {
            row_ids: Vec<RowId>,
        }

        impl PrimitiveVisitor for RowIdCollector {
            fn u64_chunk(&mut self, values: &UInt64Array) {
                for i in 0..values.len() {
                    self.row_ids.push(values.value(i));
                }
            }
        }
        // Remaining visitor traits use their default implementations.
        impl PrimitiveWithRowIdsVisitor for RowIdCollector {}
        impl PrimitiveSortedVisitor for RowIdCollector {}
        impl PrimitiveSortedWithRowIdsVisitor for RowIdCollector {}

        let mut collector = RowIdCollector {
            row_ids: Vec::new(),
        };
        match ScanBuilder::new(self.store, row_field)
            .options(ScanOptions::default())
            .run(&mut collector)
        {
            Ok(()) => {}
            // Column absent: no index metadata has ever been written.
            Err(llkv_result::Error::NotFound) => return Ok(Vec::new()),
            Err(err) => return Err(err),
        }

        if collector.row_ids.is_empty() {
            return Ok(Vec::new());
        }

        let batch = match self.store.gather_rows(
            &[meta_field],
            &collector.row_ids,
            GatherNullPolicy::IncludeNulls,
        ) {
            Ok(batch) => batch,
            Err(llkv_result::Error::NotFound) => return Ok(Vec::new()),
            Err(err) => return Err(err),
        };

        if batch.num_columns() == 0 {
            return Ok(Vec::new());
        }

        let array = batch
            .column(0)
            .as_any()
            .downcast_ref::<BinaryArray>()
            .ok_or_else(|| {
                llkv_result::Error::Internal(
                    "catalog multi-column index column stored unexpected type".into(),
                )
            })?;

        let mut metas = Vec::with_capacity(batch.num_rows());
        for idx in 0..batch.num_rows() {
            // NULL payload: tombstoned entry; skip it.
            if array.is_null(idx) {
                continue;
            }
            let meta: TableMultiColumnIndexMeta =
                bitcode::decode(array.value(idx)).map_err(|err| {
                    llkv_result::Error::Internal(format!(
                        "failed to decode multi-column index metadata: {err}"
                    ))
                })?;
            metas.push(meta);
        }

        Ok(metas)
    }
797
798 pub fn put_constraint_records(
800 &self,
801 table_id: TableId,
802 records: &[ConstraintRecord],
803 ) -> LlkvResult<()> {
804 if records.is_empty() {
805 return Ok(());
806 }
807
808 let lfid_val: u64 = constraint_meta_lfid().into();
809 let schema = Arc::new(Schema::new(vec![
810 Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
811 Field::new("constraint", DataType::Binary, false).with_metadata(HashMap::from([(
812 crate::constants::FIELD_ID_META_KEY.to_string(),
813 lfid_val.to_string(),
814 )])),
815 ]));
816
817 let row_ids: Vec<RowId> = records
818 .iter()
819 .map(|record| encode_constraint_row_id(table_id, record.constraint_id))
820 .collect();
821
822 let row_ids_array = Arc::new(UInt64Array::from(row_ids));
823 let payload_array = Arc::new(BinaryArray::from_iter_values(
824 records.iter().map(bitcode::encode),
825 ));
826
827 let batch = RecordBatch::try_new(schema, vec![row_ids_array, payload_array])?;
828 self.store.append(&batch)?;
829 Ok(())
830 }
831
832 pub fn put_constraint_names(
834 &self,
835 table_id: TableId,
836 names: &[ConstraintNameRecord],
837 ) -> LlkvResult<()> {
838 if names.is_empty() {
839 return Ok(());
840 }
841
842 let lfid_val: u64 = constraint_name_lfid().into();
843 let schema = Arc::new(Schema::new(vec![
844 Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
845 Field::new("constraint_name", DataType::Binary, false).with_metadata(HashMap::from([
846 (
847 crate::constants::FIELD_ID_META_KEY.to_string(),
848 lfid_val.to_string(),
849 ),
850 ])),
851 ]));
852
853 let row_ids: Vec<RowId> = names
854 .iter()
855 .map(|record| encode_constraint_row_id(table_id, record.constraint_id))
856 .collect();
857 let row_ids_array = Arc::new(UInt64Array::from(row_ids));
858 let payload_array = Arc::new(BinaryArray::from_iter_values(
859 names.iter().map(bitcode::encode),
860 ));
861
862 let batch = RecordBatch::try_new(schema, vec![row_ids_array, payload_array])?;
863 self.store.append(&batch)?;
864 Ok(())
865 }
866
    /// Fetch the constraint payloads for `constraint_ids` on `table_id`.
    ///
    /// The result is positionally aligned with `constraint_ids`; missing or
    /// tombstoned (NULL) entries yield `None`.
    ///
    /// # Errors
    /// Returns `Error::Internal` if a payload fails to decode or a decoded
    /// record's id does not match the id encoded in its row id.
    pub fn get_constraint_records(
        &self,
        table_id: TableId,
        constraint_ids: &[ConstraintId],
    ) -> LlkvResult<Vec<Option<ConstraintRecord>>> {
        if constraint_ids.is_empty() {
            return Ok(Vec::new());
        }

        let lfid = constraint_meta_lfid();
        let row_ids: Vec<RowId> = constraint_ids
            .iter()
            .map(|&constraint_id| encode_constraint_row_id(table_id, constraint_id))
            .collect();

        let batch = match self
            .store
            .gather_rows(&[lfid], &row_ids, GatherNullPolicy::IncludeNulls)
        {
            Ok(batch) => batch,
            // Column absent: no constraint has ever been stored.
            Err(llkv_result::Error::NotFound) => {
                return Ok(vec![None; constraint_ids.len()]);
            }
            Err(err) => return Err(err),
        };

        if batch.num_columns() == 0 || batch.num_rows() == 0 {
            return Ok(vec![None; constraint_ids.len()]);
        }

        let array = batch
            .column(0)
            .as_any()
            .downcast_ref::<BinaryArray>()
            .ok_or_else(|| {
                llkv_result::Error::Internal(
                    "constraint metadata column stored unexpected type".into(),
                )
            })?;

        let mut results = Vec::with_capacity(constraint_ids.len());
        for (idx, &constraint_id) in constraint_ids.iter().enumerate() {
            if array.is_null(idx) {
                results.push(None);
                continue;
            }
            let record = decode_constraint_record(array.value(idx))?;
            // Sanity check: the stored record must agree with the id that was
            // encoded into its row id.
            if record.constraint_id != constraint_id {
                return Err(llkv_result::Error::Internal(
                    "constraint metadata id mismatch".into(),
                ));
            }
            results.push(Some(record));
        }

        Ok(results)
    }
925
926 pub fn get_constraint_names(
928 &self,
929 table_id: TableId,
930 constraint_ids: &[ConstraintId],
931 ) -> LlkvResult<Vec<Option<String>>> {
932 if constraint_ids.is_empty() {
933 return Ok(Vec::new());
934 }
935
936 let lfid = constraint_name_lfid();
937 let row_ids: Vec<RowId> = constraint_ids
938 .iter()
939 .map(|&constraint_id| encode_constraint_row_id(table_id, constraint_id))
940 .collect();
941
942 let batch = match self
943 .store
944 .gather_rows(&[lfid], &row_ids, GatherNullPolicy::IncludeNulls)
945 {
946 Ok(batch) => batch,
947 Err(llkv_result::Error::NotFound) => {
948 return Ok(vec![None; constraint_ids.len()]);
949 }
950 Err(err) => return Err(err),
951 };
952
953 if batch.num_columns() == 0 {
954 return Ok(vec![None; constraint_ids.len()]);
955 }
956
957 let array = batch
958 .column(0)
959 .as_any()
960 .downcast_ref::<BinaryArray>()
961 .ok_or_else(|| {
962 llkv_result::Error::Internal(
963 "constraint name metadata column stored unexpected type".into(),
964 )
965 })?;
966
967 let mut results = Vec::with_capacity(row_ids.len());
968 for idx in 0..row_ids.len() {
969 if array.is_null(idx) {
970 results.push(None);
971 } else {
972 let record: ConstraintNameRecord =
973 bitcode::decode(array.value(idx)).map_err(|err| {
974 llkv_result::Error::Internal(format!(
975 "failed to decode constraint name metadata: {err}"
976 ))
977 })?;
978 results.push(record.name);
979 }
980 }
981
982 Ok(results)
983 }
984
    /// Stream all constraint records belonging to `table_id` in chunks.
    ///
    /// Scans the constraint row-id column, buffers ids whose encoded table id
    /// matches, and invokes `on_batch` with up to `CONSTRAINT_SCAN_CHUNK_SIZE`
    /// decoded records at a time.
    pub fn scan_constraint_records_for_table<F>(
        &self,
        table_id: TableId,
        mut on_batch: F,
    ) -> LlkvResult<()>
    where
        F: FnMut(Vec<ConstraintRecord>),
    {
        let row_field = constraint_row_lfid();
        let mut visitor = ConstraintRowCollector {
            store: self.store,
            lfid: constraint_meta_lfid(),
            table_id,
            on_batch: &mut on_batch,
            buffer: Vec::with_capacity(CONSTRAINT_SCAN_CHUNK_SIZE),
            error: None,
        };

        match ScanBuilder::new(self.store, row_field)
            .options(ScanOptions::default())
            .run(&mut visitor)
        {
            Ok(()) => {}
            // Column absent: no constraints were ever stored; nothing to emit.
            Err(llkv_result::Error::NotFound) => return Ok(()),
            Err(err) => return Err(err),
        }

        // Propagate any deferred visitor error and flush the final chunk.
        visitor.finish()
    }
1015
1016 pub fn constraint_records_for_table(
1018 &self,
1019 table_id: TableId,
1020 ) -> LlkvResult<Vec<ConstraintRecord>> {
1021 let mut all = Vec::new();
1022 self.scan_constraint_records_for_table(table_id, |mut chunk| {
1023 all.append(&mut chunk);
1024 })?;
1025 Ok(all)
1026 }
1027
1028 pub fn put_next_table_id(&self, next_id: TableId) -> LlkvResult<()> {
1029 let lfid_val: u64 = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_NEXT_TABLE_ID).into();
1030 let schema = Arc::new(Schema::new(vec![
1031 Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
1032 Field::new("next_table_id", DataType::UInt64, false).with_metadata(HashMap::from([(
1033 crate::constants::FIELD_ID_META_KEY.to_string(),
1034 lfid_val.to_string(),
1035 )])),
1036 ]));
1037
1038 let row_id = Arc::new(UInt64Array::from(vec![CATALOG_NEXT_TABLE_ROW_ID]));
1039 let value_array = Arc::new(UInt64Array::from(vec![next_id as u64]));
1040 let batch = RecordBatch::try_new(schema, vec![row_id, value_array])?;
1041 self.store.append(&batch)?;
1042 Ok(())
1043 }
1044
1045 pub fn get_next_table_id(&self) -> LlkvResult<Option<TableId>> {
1046 let lfid = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_NEXT_TABLE_ID);
1047 let batch = match self.store.gather_rows(
1048 &[lfid],
1049 &[CATALOG_NEXT_TABLE_ROW_ID],
1050 GatherNullPolicy::IncludeNulls,
1051 ) {
1052 Ok(batch) => batch,
1053 Err(llkv_result::Error::NotFound) => return Ok(None),
1054 Err(err) => return Err(err),
1055 };
1056
1057 if batch.num_columns() == 0 || batch.num_rows() == 0 {
1058 return Ok(None);
1059 }
1060
1061 let array = batch
1062 .column(0)
1063 .as_any()
1064 .downcast_ref::<UInt64Array>()
1065 .ok_or_else(|| {
1066 llkv_result::Error::Internal(
1067 "catalog next_table_id column stored unexpected type".into(),
1068 )
1069 })?;
1070 if array.is_empty() || array.is_null(0) {
1071 return Ok(None);
1072 }
1073
1074 let value = array.value(0);
1075 if value > TableId::MAX as u64 {
1076 return Err(llkv_result::Error::InvalidArgumentError(
1077 "persisted next_table_id exceeds TableId range".into(),
1078 ));
1079 }
1080
1081 Ok(Some(value as TableId))
1082 }
1083
    /// Return the highest table id that has a table-meta row, or `None` when
    /// the catalog holds no table metadata at all.
    pub fn max_table_id(&self) -> LlkvResult<Option<TableId>> {
        let meta_field = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_TABLE_META_ID);
        let row_field = rowid_fid(meta_field);

        // MaxRowIdCollector (defined elsewhere in this file) tracks the
        // maximum row id observed during the scan.
        let mut collector = MaxRowIdCollector { max: None };
        match ScanBuilder::new(self.store, row_field)
            .options(ScanOptions::default())
            .run(&mut collector)
        {
            Ok(()) => {}
            // Column absent: no table meta has ever been written.
            Err(llkv_result::Error::NotFound) => return Ok(None),
            Err(err) => return Err(err),
        }

        let max_value = match collector.max {
            Some(value) => value,
            None => return Ok(None),
        };

        // Table-meta row ids are logical field ids; extract the table part.
        let logical: LogicalFieldId = max_value.into();
        Ok(Some(logical.table_id()))
    }
1106
1107 pub fn all_table_metas(&self) -> LlkvResult<Vec<(TableId, TableMeta)>> {
1113 let meta_field = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_TABLE_META_ID);
1114 let row_field = rowid_fid(meta_field);
1115
1116 struct RowIdCollector {
1118 row_ids: Vec<RowId>,
1119 }
1120
1121 impl PrimitiveVisitor for RowIdCollector {
1122 fn u64_chunk(&mut self, values: &UInt64Array) {
1123 for i in 0..values.len() {
1124 self.row_ids.push(values.value(i));
1125 }
1126 }
1127 }
1128 impl PrimitiveWithRowIdsVisitor for RowIdCollector {}
1129 impl PrimitiveSortedVisitor for RowIdCollector {}
1130 impl PrimitiveSortedWithRowIdsVisitor for RowIdCollector {}
1131
1132 let mut collector = RowIdCollector {
1133 row_ids: Vec::new(),
1134 };
1135 match ScanBuilder::new(self.store, row_field)
1136 .options(ScanOptions::default())
1137 .run(&mut collector)
1138 {
1139 Ok(()) => {}
1140 Err(llkv_result::Error::NotFound) => return Ok(Vec::new()),
1141 Err(err) => return Err(err),
1142 }
1143
1144 if collector.row_ids.is_empty() {
1145 return Ok(Vec::new());
1146 }
1147
1148 let batch = self.store.gather_rows(
1150 &[meta_field],
1151 &collector.row_ids,
1152 GatherNullPolicy::IncludeNulls,
1153 )?;
1154
1155 let meta_col = batch
1156 .column(0)
1157 .as_any()
1158 .downcast_ref::<BinaryArray>()
1159 .ok_or_else(|| {
1160 llkv_result::Error::Internal("catalog table_meta column should be Binary".into())
1161 })?;
1162
1163 let mut result = Vec::new();
1164 for (idx, &row_id) in collector.row_ids.iter().enumerate() {
1165 if !meta_col.is_null(idx) {
1166 let bytes = meta_col.value(idx);
1167 if let Ok(meta) = bitcode::decode::<TableMeta>(bytes) {
1168 let logical: LogicalFieldId = row_id.into();
1169 let table_id = logical.table_id();
1170 result.push((table_id, meta));
1171 }
1172 }
1173 }
1174
1175 Ok(result)
1176 }
1177
1178 pub fn put_next_txn_id(&self, next_txn_id: u64) -> LlkvResult<()> {
1180 let lfid_val: u64 = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_NEXT_TXN_ID).into();
1181 let schema = Arc::new(Schema::new(vec![
1182 Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
1183 Field::new("next_txn_id", DataType::UInt64, false).with_metadata(HashMap::from([(
1184 crate::constants::FIELD_ID_META_KEY.to_string(),
1185 lfid_val.to_string(),
1186 )])),
1187 ]));
1188
1189 let row_id = Arc::new(UInt64Array::from(vec![CATALOG_NEXT_TXN_ROW_ID]));
1190 let value_array = Arc::new(UInt64Array::from(vec![next_txn_id]));
1191 let batch = RecordBatch::try_new(schema, vec![row_id, value_array])?;
1192 self.store.append(&batch)?;
1193 Ok(())
1194 }
1195
1196 pub fn get_next_txn_id(&self) -> LlkvResult<Option<u64>> {
1198 let lfid = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_NEXT_TXN_ID);
1199 let batch = match self.store.gather_rows(
1200 &[lfid],
1201 &[CATALOG_NEXT_TXN_ROW_ID],
1202 GatherNullPolicy::IncludeNulls,
1203 ) {
1204 Ok(batch) => batch,
1205 Err(llkv_result::Error::NotFound) => return Ok(None),
1206 Err(err) => return Err(err),
1207 };
1208
1209 if batch.num_columns() == 0 || batch.num_rows() == 0 {
1210 return Ok(None);
1211 }
1212
1213 let array = batch
1214 .column(0)
1215 .as_any()
1216 .downcast_ref::<UInt64Array>()
1217 .ok_or_else(|| {
1218 llkv_result::Error::Internal(
1219 "catalog next_txn_id column stored unexpected type".into(),
1220 )
1221 })?;
1222 if array.is_empty() || array.is_null(0) {
1223 return Ok(None);
1224 }
1225
1226 let value = array.value(0);
1227 Ok(Some(value))
1228 }
1229
1230 pub fn put_last_committed_txn_id(&self, last_committed: u64) -> LlkvResult<()> {
1232 let lfid_val: u64 = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_LAST_COMMITTED_TXN_ID).into();
1233 let schema = Arc::new(Schema::new(vec![
1234 Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
1235 Field::new("last_committed_txn_id", DataType::UInt64, false).with_metadata(
1236 HashMap::from([(
1237 crate::constants::FIELD_ID_META_KEY.to_string(),
1238 lfid_val.to_string(),
1239 )]),
1240 ),
1241 ]));
1242
1243 let row_id = Arc::new(UInt64Array::from(vec![CATALOG_LAST_COMMITTED_TXN_ROW_ID]));
1244 let value_array = Arc::new(UInt64Array::from(vec![last_committed]));
1245 let batch = RecordBatch::try_new(schema, vec![row_id, value_array])?;
1246 self.store.append(&batch)?;
1247 Ok(())
1248 }
1249
1250 pub fn get_last_committed_txn_id(&self) -> LlkvResult<Option<u64>> {
1252 let lfid = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_LAST_COMMITTED_TXN_ID);
1253 let batch = match self.store.gather_rows(
1254 &[lfid],
1255 &[CATALOG_LAST_COMMITTED_TXN_ROW_ID],
1256 GatherNullPolicy::IncludeNulls,
1257 ) {
1258 Ok(batch) => batch,
1259 Err(llkv_result::Error::NotFound) => return Ok(None),
1260 Err(err) => return Err(err),
1261 };
1262
1263 if batch.num_columns() == 0 || batch.num_rows() == 0 {
1264 return Ok(None);
1265 }
1266
1267 let array = batch
1268 .column(0)
1269 .as_any()
1270 .downcast_ref::<UInt64Array>()
1271 .ok_or_else(|| {
1272 llkv_result::Error::Internal(
1273 "catalog last_committed_txn_id column stored unexpected type".into(),
1274 )
1275 })?;
1276 if array.is_empty() || array.is_null(0) {
1277 return Ok(None);
1278 }
1279
1280 let value = array.value(0);
1281 Ok(Some(value))
1282 }
1283
1284 pub fn put_catalog_state(&self, state: &crate::catalog::TableCatalogState) -> LlkvResult<()> {
1289 let lfid_val: u64 = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_CATALOG_STATE).into();
1290 let schema = Arc::new(Schema::new(vec![
1291 Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
1292 Field::new("catalog_state", DataType::Binary, false).with_metadata(HashMap::from([(
1293 crate::constants::FIELD_ID_META_KEY.to_string(),
1294 lfid_val.to_string(),
1295 )])),
1296 ]));
1297
1298 let row_id = Arc::new(UInt64Array::from(vec![CATALOG_STATE_ROW_ID]));
1299 let encoded = bitcode::encode(state);
1300 let state_bytes = Arc::new(BinaryArray::from(vec![encoded.as_slice()]));
1301
1302 let batch = RecordBatch::try_new(schema, vec![row_id, state_bytes])?;
1303 self.store.append(&batch)?;
1304 Ok(())
1305 }
1306
1307 pub fn get_catalog_state(&self) -> LlkvResult<Option<crate::catalog::TableCatalogState>> {
1311 let lfid = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_CATALOG_STATE);
1312 let batch = match self.store.gather_rows(
1313 &[lfid],
1314 &[CATALOG_STATE_ROW_ID],
1315 GatherNullPolicy::IncludeNulls,
1316 ) {
1317 Ok(batch) => batch,
1318 Err(llkv_result::Error::NotFound) => return Ok(None),
1319 Err(err) => return Err(err),
1320 };
1321
1322 if batch.num_columns() == 0 || batch.num_rows() == 0 {
1323 return Ok(None);
1324 }
1325
1326 let array = batch
1327 .column(0)
1328 .as_any()
1329 .downcast_ref::<BinaryArray>()
1330 .ok_or_else(|| {
1331 llkv_result::Error::Internal("catalog state column stored unexpected type".into())
1332 })?;
1333 if array.is_empty() || array.is_null(0) {
1334 return Ok(None);
1335 }
1336
1337 let bytes = array.value(0);
1338 let state = bitcode::decode(bytes).map_err(|e| {
1339 llkv_result::Error::Internal(format!("Failed to decode catalog state: {}", e))
1340 })?;
1341 Ok(Some(state))
1342 }
1343
1344 pub fn put_schema_meta(&self, meta: &SchemaMeta) -> LlkvResult<()> {
1349 let lfid_val: u64 = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_SCHEMA_META_ID).into();
1350 let schema = Arc::new(Schema::new(vec![
1351 Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
1352 Field::new("meta", DataType::Binary, false).with_metadata(HashMap::from([(
1353 crate::constants::FIELD_ID_META_KEY.to_string(),
1354 lfid_val.to_string(),
1355 )])),
1356 ]));
1357
1358 let canonical = meta.name.to_ascii_lowercase();
1360 let row_id_val = schema_name_to_row_id(&canonical);
1361 let row_id = Arc::new(UInt64Array::from(vec![row_id_val]));
1362 let meta_encoded = bitcode::encode(meta);
1363 let meta_bytes = Arc::new(BinaryArray::from(vec![meta_encoded.as_slice()]));
1364
1365 let batch = RecordBatch::try_new(schema, vec![row_id, meta_bytes])?;
1366 self.store.append(&batch)?;
1367 Ok(())
1368 }
1369
1370 pub fn get_schema_meta(&self, schema_name: &str) -> LlkvResult<Option<SchemaMeta>> {
1374 let canonical = schema_name.to_ascii_lowercase();
1375 let row_id = schema_name_to_row_id(&canonical);
1376 let lfid = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_SCHEMA_META_ID);
1377
1378 let batch = match self
1379 .store
1380 .gather_rows(&[lfid], &[row_id], GatherNullPolicy::IncludeNulls)
1381 {
1382 Ok(batch) => batch,
1383 Err(llkv_result::Error::NotFound) => return Ok(None),
1384 Err(err) => return Err(err),
1385 };
1386
1387 if batch.num_columns() == 0 || batch.num_rows() == 0 {
1388 return Ok(None);
1389 }
1390
1391 let array = batch
1392 .column(0)
1393 .as_any()
1394 .downcast_ref::<BinaryArray>()
1395 .ok_or_else(|| {
1396 llkv_result::Error::Internal("catalog schema_meta column should be Binary".into())
1397 })?;
1398
1399 if array.is_empty() || array.is_null(0) {
1400 return Ok(None);
1401 }
1402
1403 let bytes = array.value(0);
1404 let meta = bitcode::decode(bytes).map_err(|e| {
1405 llkv_result::Error::Internal(format!("Failed to decode schema metadata: {}", e))
1406 })?;
1407 Ok(Some(meta))
1408 }
1409
1410 pub fn all_schema_metas(&self) -> LlkvResult<Vec<SchemaMeta>> {
1414 let meta_field = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_SCHEMA_META_ID);
1415 let row_field = rowid_fid(meta_field);
1416
1417 struct RowIdCollector {
1419 row_ids: Vec<RowId>,
1420 }
1421
1422 impl PrimitiveVisitor for RowIdCollector {
1423 fn u64_chunk(&mut self, values: &UInt64Array) {
1424 for i in 0..values.len() {
1425 self.row_ids.push(values.value(i));
1426 }
1427 }
1428 }
1429 impl PrimitiveWithRowIdsVisitor for RowIdCollector {}
1430 impl PrimitiveSortedVisitor for RowIdCollector {}
1431 impl PrimitiveSortedWithRowIdsVisitor for RowIdCollector {}
1432
1433 let mut collector = RowIdCollector {
1434 row_ids: Vec::new(),
1435 };
1436 match ScanBuilder::new(self.store, row_field)
1437 .options(ScanOptions::default())
1438 .run(&mut collector)
1439 {
1440 Ok(()) => {}
1441 Err(llkv_result::Error::NotFound) => return Ok(Vec::new()),
1442 Err(err) => return Err(err),
1443 }
1444
1445 if collector.row_ids.is_empty() {
1446 return Ok(Vec::new());
1447 }
1448
1449 let batch = self.store.gather_rows(
1451 &[meta_field],
1452 &collector.row_ids,
1453 GatherNullPolicy::IncludeNulls,
1454 )?;
1455
1456 let meta_col = batch
1457 .column(0)
1458 .as_any()
1459 .downcast_ref::<BinaryArray>()
1460 .ok_or_else(|| {
1461 llkv_result::Error::Internal("catalog schema_meta column should be Binary".into())
1462 })?;
1463
1464 let mut result = Vec::new();
1465 for idx in 0..collector.row_ids.len() {
1466 if !meta_col.is_null(idx) {
1467 let bytes = meta_col.value(idx);
1468 if let Ok(meta) = bitcode::decode::<SchemaMeta>(bytes) {
1469 result.push(meta);
1470 }
1471 }
1472 }
1473
1474 Ok(result)
1475 }
1476
1477 pub fn put_custom_type_meta(&self, meta: &CustomTypeMeta) -> LlkvResult<()> {
1481 let lfid_val: u64 = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_CUSTOM_TYPE_META_ID).into();
1482 let schema = Arc::new(Schema::new(vec![
1483 Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
1484 Field::new("meta", DataType::Binary, false).with_metadata(HashMap::from([(
1485 crate::constants::FIELD_ID_META_KEY.to_string(),
1486 lfid_val.to_string(),
1487 )])),
1488 ]));
1489
1490 let canonical = meta.name.to_ascii_lowercase();
1492 let row_id_val = schema_name_to_row_id(&canonical); let row_id = Arc::new(UInt64Array::from(vec![row_id_val]));
1494 let meta_encoded = bitcode::encode(meta);
1495 let meta_bytes = Arc::new(BinaryArray::from(vec![meta_encoded.as_slice()]));
1496
1497 let batch = RecordBatch::try_new(schema, vec![row_id, meta_bytes])?;
1498 self.store.append(&batch)?;
1499 Ok(())
1500 }
1501
1502 pub fn get_custom_type_meta(&self, type_name: &str) -> LlkvResult<Option<CustomTypeMeta>> {
1506 let canonical = type_name.to_ascii_lowercase();
1507 let row_id = schema_name_to_row_id(&canonical);
1508 let lfid = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_CUSTOM_TYPE_META_ID);
1509
1510 let batch = match self
1511 .store
1512 .gather_rows(&[lfid], &[row_id], GatherNullPolicy::IncludeNulls)
1513 {
1514 Ok(batch) => batch,
1515 Err(llkv_result::Error::NotFound) => return Ok(None),
1516 Err(err) => return Err(err),
1517 };
1518
1519 let meta_col = batch
1520 .column(0)
1521 .as_any()
1522 .downcast_ref::<BinaryArray>()
1523 .ok_or_else(|| {
1524 llkv_result::Error::Internal(
1525 "catalog custom_type_meta column should be Binary".into(),
1526 )
1527 })?;
1528
1529 if meta_col.is_null(0) {
1530 return Ok(None);
1531 }
1532
1533 let bytes = meta_col.value(0);
1534 let meta = bitcode::decode(bytes).map_err(|err| {
1535 llkv_result::Error::Internal(format!("failed to decode custom type metadata: {err}"))
1536 })?;
1537 Ok(Some(meta))
1538 }
1539
1540 pub fn delete_custom_type_meta(&self, type_name: &str) -> LlkvResult<()> {
1544 let canonical = type_name.to_ascii_lowercase();
1545 let row_id = schema_name_to_row_id(&canonical);
1546 let lfid = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_CUSTOM_TYPE_META_ID);
1547
1548 let lfid_val: u64 = lfid.into();
1550 let schema = Arc::new(Schema::new(vec![
1551 Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
1552 Field::new("meta", DataType::Binary, true).with_metadata(HashMap::from([(
1553 crate::constants::FIELD_ID_META_KEY.to_string(),
1554 lfid_val.to_string(),
1555 )])),
1556 ]));
1557
1558 let row_id_arr = Arc::new(UInt64Array::from(vec![row_id]));
1559 let mut meta_builder = BinaryBuilder::new();
1560 meta_builder.append_null();
1561 let meta_arr = Arc::new(meta_builder.finish());
1562
1563 let batch = RecordBatch::try_new(schema, vec![row_id_arr, meta_arr])?;
1564 self.store.append(&batch)?;
1565 Ok(())
1566 }
1567
1568 pub fn all_custom_type_metas(&self) -> LlkvResult<Vec<CustomTypeMeta>> {
1572 let meta_field = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_CUSTOM_TYPE_META_ID);
1573 let row_field = rowid_fid(meta_field);
1574
1575 struct RowIdCollector {
1577 row_ids: Vec<RowId>,
1578 }
1579
1580 impl PrimitiveVisitor for RowIdCollector {
1581 fn u64_chunk(&mut self, values: &UInt64Array) {
1582 for i in 0..values.len() {
1583 self.row_ids.push(values.value(i));
1584 }
1585 }
1586 }
1587 impl PrimitiveWithRowIdsVisitor for RowIdCollector {}
1588 impl PrimitiveSortedVisitor for RowIdCollector {}
1589 impl PrimitiveSortedWithRowIdsVisitor for RowIdCollector {}
1590
1591 let mut collector = RowIdCollector {
1592 row_ids: Vec::new(),
1593 };
1594 match ScanBuilder::new(self.store, row_field)
1595 .options(ScanOptions::default())
1596 .run(&mut collector)
1597 {
1598 Ok(()) => {}
1599 Err(llkv_result::Error::NotFound) => return Ok(Vec::new()),
1600 Err(err) => return Err(err),
1601 }
1602
1603 if collector.row_ids.is_empty() {
1604 return Ok(Vec::new());
1605 }
1606
1607 let batch = self.store.gather_rows(
1609 &[meta_field],
1610 &collector.row_ids,
1611 GatherNullPolicy::IncludeNulls,
1612 )?;
1613
1614 let meta_col = batch
1615 .column(0)
1616 .as_any()
1617 .downcast_ref::<BinaryArray>()
1618 .ok_or_else(|| {
1619 llkv_result::Error::Internal(
1620 "catalog custom_type_meta column should be Binary".into(),
1621 )
1622 })?;
1623
1624 let mut result = Vec::new();
1625 for idx in 0..collector.row_ids.len() {
1626 if !meta_col.is_null(idx) {
1627 let bytes = meta_col.value(idx);
1628 if let Ok(meta) = bitcode::decode::<CustomTypeMeta>(bytes) {
1629 result.push(meta);
1630 }
1631 }
1632 }
1633
1634 Ok(result)
1635 }
1636}
1637
1638fn schema_name_to_row_id(canonical_name: &str) -> RowId {
1643 const FNV_OFFSET: u64 = 0xcbf2_9ce4_8422_2325;
1645 const FNV_PRIME: u64 = 0x1000_0000_01b3;
1646
1647 let mut hash = FNV_OFFSET;
1648 for byte in canonical_name.as_bytes() {
1649 hash ^= u64::from(*byte);
1650 hash = hash.wrapping_mul(FNV_PRIME);
1651 }
1652
1653 hash | (1u64 << 63)
1655}
1656
/// Scan visitor that tracks the largest row id observed across visited
/// `u64` chunks (see the `PrimitiveVisitor` impl below).
struct MaxRowIdCollector {
    // `None` until the first value has been observed.
    max: Option<RowId>,
}
1660
1661impl PrimitiveVisitor for MaxRowIdCollector {
1662 fn u64_chunk(&mut self, values: &UInt64Array) {
1663 for i in 0..values.len() {
1664 let value = values.value(i);
1665 self.max = match self.max {
1666 Some(curr) if curr >= value => Some(curr),
1667 _ => Some(value),
1668 };
1669 }
1670 }
1671}
1672
// Marker impls relying on the traits' default methods: this collector only
// consumes unsorted u64 chunks via `PrimitiveVisitor::u64_chunk`.
impl PrimitiveWithRowIdsVisitor for MaxRowIdCollector {}
impl PrimitiveSortedVisitor for MaxRowIdCollector {}
impl PrimitiveSortedWithRowIdsVisitor for MaxRowIdCollector {}
1676
#[cfg(test)]
mod tests {
    use super::*;
    use crate::constraints::{
        ConstraintKind, ConstraintState, PrimaryKeyConstraint, UniqueConstraint,
    };
    use llkv_column_map::ColumnStore;
    use std::sync::Arc;

    /// Round-trips constraint records through the catalog and verifies that
    /// lookups are scoped per table and that unknown ids come back as `None`.
    #[test]
    fn constraint_records_roundtrip() {
        let pager = Arc::new(MemPager::default());
        let store = ColumnStore::open(Arc::clone(&pager)).unwrap();
        let catalog = SysCatalog::new(&store);

        let table_id: TableId = 42;
        let pk_record = ConstraintRecord {
            constraint_id: 1,
            kind: ConstraintKind::PrimaryKey(PrimaryKeyConstraint {
                field_ids: vec![1, 2],
            }),
            state: ConstraintState::Active,
            revision: 1,
            last_modified_micros: 100,
        };
        let unique_record = ConstraintRecord {
            constraint_id: 2,
            kind: ConstraintKind::Unique(UniqueConstraint { field_ids: vec![3] }),
            state: ConstraintState::Active,
            revision: 2,
            last_modified_micros: 200,
        };
        catalog
            .put_constraint_records(table_id, &[pk_record.clone(), unique_record.clone()])
            .unwrap();

        // A record on a different table reusing constraint id 1 must not
        // leak into lookups for `table_id`.
        let unrelated = ConstraintRecord {
            constraint_id: 1,
            kind: ConstraintKind::Unique(UniqueConstraint { field_ids: vec![5] }),
            state: ConstraintState::Active,
            revision: 1,
            last_modified_micros: 150,
        };
        catalog.put_constraint_records(7, &[unrelated]).unwrap();

        let mut all_for_table = catalog.constraint_records_for_table(table_id).unwrap();
        all_for_table.sort_by_key(|record| record.constraint_id);
        assert_eq!(all_for_table, vec![pk_record.clone(), unique_record]);

        let single = catalog
            .get_constraint_records(table_id, &[pk_record.constraint_id])
            .unwrap();
        assert_eq!(single.len(), 1);
        assert_eq!(single[0].as_ref(), Some(&pk_record));

        let missing = catalog.get_constraint_records(table_id, &[999]).unwrap();
        assert_eq!(missing, vec![None]);
    }
}