1use std::collections::HashMap;
27use std::mem;
28use std::sync::Arc;
29
30use arrow::array::{Array, BinaryArray, BinaryBuilder, UInt64Array};
31use arrow::datatypes::{DataType, Field, Schema};
32use arrow::record_batch::RecordBatch;
33use bitcode::{Decode, Encode};
34
35use crate::constants::CONSTRAINT_SCAN_CHUNK_SIZE;
36use crate::constraints::{
37 ConstraintId, ConstraintRecord, decode_constraint_row_id, encode_constraint_row_id,
38};
39use crate::types::{FieldId, ROW_ID_FIELD_ID, RowId, TableId};
40use llkv_column_map::store::scan::{
41 PrimitiveSortedVisitor, PrimitiveSortedWithRowIdsVisitor, PrimitiveVisitor,
42 PrimitiveWithRowIdsVisitor, ScanBuilder, ScanOptions,
43};
44
45use llkv_column_map::types::LogicalFieldId;
46use llkv_column_map::{
47 ColumnStore,
48 store::{GatherNullPolicy, ROW_ID_COLUMN_NAME, rowid_fid},
49};
50use llkv_result::{self, Result as LlkvResult};
51use llkv_storage::pager::{MemPager, Pager};
52use simd_r_drive_entry_handle::EntryHandle;
53
54use crate::reserved::*;
56
/// Builds the user-namespace logical field id for column `col_id` of
/// `table_id`.
#[inline]
fn lfid(table_id: TableId, col_id: u32) -> LogicalFieldId {
    LogicalFieldId::for_user(table_id, col_id)
}
64
/// Catalog row id under which table-level metadata for `table_id` is stored:
/// the `u64` form of the logical field id of the table's row-id column.
#[inline]
fn rid_table(table_id: TableId) -> u64 {
    LogicalFieldId::for_user(table_id, ROW_ID_FIELD_ID).into()
}
70
/// Catalog row id under which metadata for column `col_id` of `table_id` is
/// stored: the `u64` form of the column's row-id shadow field id.
#[inline]
fn rid_col(table_id: TableId, col_id: u32) -> u64 {
    rowid_fid(lfid(table_id, col_id)).into()
}
76
/// Logical field id of the catalog column holding encoded constraint records.
#[inline]
fn constraint_meta_lfid() -> LogicalFieldId {
    lfid(CATALOG_TABLE_ID, CATALOG_FIELD_CONSTRAINT_META_ID)
}
81
/// Logical field id of the catalog column holding constraint name records.
#[inline]
fn constraint_name_lfid() -> LogicalFieldId {
    lfid(CATALOG_TABLE_ID, CATALOG_FIELD_CONSTRAINT_NAME_ID)
}
86
/// Logical field id of the row-id shadow column paired with the constraint
/// metadata column; scanned to enumerate persisted constraint row ids.
#[inline]
fn constraint_row_lfid() -> LogicalFieldId {
    rowid_fid(constraint_meta_lfid())
}
91
/// Bitcode-encoded payload mapping a constraint id to its optional
/// user-supplied name; stored in the constraint-name catalog column.
#[derive(Clone, Debug, Encode, Decode)]
pub struct ConstraintNameRecord {
    /// Constraint this name belongs to; also encoded into the row id.
    pub constraint_id: ConstraintId,
    /// `None` when the constraint has no explicit name.
    pub name: Option<String>,
}
97
98fn decode_constraint_record(bytes: &[u8]) -> LlkvResult<ConstraintRecord> {
99 bitcode::decode(bytes).map_err(|err| {
100 llkv_result::Error::Internal(format!("failed to decode constraint metadata: {err}"))
101 })
102}
103
/// Streaming visitor state used by constraint scans: collects matching row
/// ids from a row-id column scan, then gathers and decodes the corresponding
/// constraint records in chunks.
struct ConstraintRowCollector<'a, P, F>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
    F: FnMut(Vec<ConstraintRecord>),
{
    /// Store the constraint payloads are gathered from.
    store: &'a ColumnStore<P>,
    /// Logical field id of the constraint metadata column.
    lfid: LogicalFieldId,
    /// Only rows whose encoded table id matches this are collected.
    table_id: TableId,
    /// Callback invoked with each decoded batch of records.
    on_batch: &'a mut F,
    /// Pending row ids, flushed once `CONSTRAINT_SCAN_CHUNK_SIZE` is reached.
    buffer: Vec<RowId>,
    /// First error hit inside the infallible-signature visitor callbacks;
    /// surfaced later by `finish`.
    error: Option<llkv_result::Error>,
}
116
impl<'a, P, F> ConstraintRowCollector<'a, P, F>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
    F: FnMut(Vec<ConstraintRecord>),
{
    /// Gathers and decodes the buffered row ids, invoking `on_batch` with the
    /// decoded records. No-op when the buffer is empty.
    fn flush_buffer(&mut self) -> LlkvResult<()> {
        if self.buffer.is_empty() {
            return Ok(());
        }

        // Take ownership of the pending ids so the buffer can be refilled.
        let row_ids = mem::take(&mut self.buffer);
        let batch =
            self.store
                .gather_rows(&[self.lfid], &row_ids, GatherNullPolicy::IncludeNulls)?;

        if batch.num_columns() == 0 {
            return Ok(());
        }

        let array = batch
            .column(0)
            .as_any()
            .downcast_ref::<BinaryArray>()
            .ok_or_else(|| {
                llkv_result::Error::Internal(
                    "constraint metadata column stored unexpected type".into(),
                )
            })?;

        let mut records = Vec::with_capacity(row_ids.len());
        // Gathered rows are positionally aligned with `row_ids`.
        for (idx, row_id) in row_ids.into_iter().enumerate() {
            if array.is_null(idx) {
                // Null payloads are tombstones written by the delete_* paths.
                continue;
            }

            let record = decode_constraint_record(array.value(idx))?;
            let (table_from_id, constraint_id) = decode_constraint_row_id(row_id);
            if table_from_id != self.table_id {
                continue;
            }
            // The id embedded in the row key must agree with the payload.
            if record.constraint_id != constraint_id {
                return Err(llkv_result::Error::Internal(
                    "constraint metadata id mismatch".into(),
                ));
            }
            records.push(record);
        }

        if !records.is_empty() {
            (self.on_batch)(records);
        }

        Ok(())
    }

    /// Surfaces any error recorded during visitation, then flushes whatever
    /// is still buffered.
    fn finish(&mut self) -> LlkvResult<()> {
        if let Some(err) = self.error.take() {
            return Err(err);
        }
        self.flush_buffer()
    }
}
179
180impl<'a, P, F> PrimitiveVisitor for ConstraintRowCollector<'a, P, F>
181where
182 P: Pager<Blob = EntryHandle> + Send + Sync,
183 F: FnMut(Vec<ConstraintRecord>),
184{
185 fn u64_chunk(&mut self, values: &UInt64Array) {
186 if self.error.is_some() {
187 return;
188 }
189
190 for idx in 0..values.len() {
191 let row_id = values.value(idx);
192 let (table_id, _) = decode_constraint_row_id(row_id);
193 if table_id != self.table_id {
194 continue;
195 }
196 self.buffer.push(row_id);
197 if self.buffer.len() >= CONSTRAINT_SCAN_CHUNK_SIZE
198 && let Err(err) = self.flush_buffer()
199 {
200 self.error = Some(err);
201 return;
202 }
203 }
204 }
205}
206
// Marker impl required by `ScanBuilder::run`; this collector only consumes
// the plain `u64_chunk` callback, so no overrides are needed here.
impl<'a, P, F> PrimitiveWithRowIdsVisitor for ConstraintRowCollector<'a, P, F>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
    F: FnMut(Vec<ConstraintRecord>),
{
}
213
// Marker impl required by `ScanBuilder::run`; sorted callbacks unused.
impl<'a, P, F> PrimitiveSortedVisitor for ConstraintRowCollector<'a, P, F>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
    F: FnMut(Vec<ConstraintRecord>),
{
}
220
// Marker impl required by `ScanBuilder::run`; sorted-with-row-ids callbacks
// unused.
impl<'a, P, F> PrimitiveSortedWithRowIdsVisitor for ConstraintRowCollector<'a, P, F>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
    F: FnMut(Vec<ConstraintRecord>),
{
}
227
/// Bitcode-encoded, table-level catalog record, stored under [`rid_table`]'s
/// row id in the table-meta catalog column.
#[derive(Encode, Decode, Clone, Debug, PartialEq, Eq)]
pub struct TableMeta {
    pub table_id: TableId,
    /// Human-readable table name, if one was assigned.
    pub name: Option<String>,
    /// Creation timestamp in microseconds (epoch origin set by writers —
    /// not interpreted here).
    pub created_at_micros: u64,
    /// Bit flags; semantics defined by callers.
    pub flags: u32,
    /// Epoch counter (maintained by writers — semantics not defined here).
    pub epoch: u64,
    /// SQL text of the view definition when this entry describes a view.
    pub view_definition: Option<String>,
}
249
/// Bitcode-encoded, per-column catalog record, stored under [`rid_col`]'s
/// row id in the column-meta catalog column.
#[derive(Encode, Decode, Clone, Debug, PartialEq, Eq)]
pub struct ColMeta {
    /// Column id within its table.
    pub col_id: u32,
    /// Column name, if one was recorded.
    pub name: Option<String>,
    /// Bit flags; semantics defined by callers.
    pub flags: u32,
    /// Encoded default value, when present (encoding defined by writers —
    /// not interpreted here).
    pub default: Option<Vec<u8>>,
}
264
/// Bitcode-encoded catalog record describing a named schema (namespace).
#[derive(Encode, Decode, Clone, Debug, PartialEq, Eq)]
pub struct SchemaMeta {
    pub name: String,
    /// Creation timestamp in microseconds.
    pub created_at_micros: u64,
    /// Bit flags; semantics defined by callers.
    pub flags: u32,
}
277
/// Bitcode-encoded catalog record for a user-defined type alias.
#[derive(Encode, Decode, Clone, Debug, PartialEq, Eq)]
pub struct CustomTypeMeta {
    pub name: String,
    /// SQL text of the underlying base type.
    pub base_type_sql: String,
    /// Creation timestamp in microseconds.
    pub created_at_micros: u64,
}
292
/// One multi-column UNIQUE constraint: the participating column ids plus an
/// optional index name.
#[derive(Encode, Decode, Clone, Debug, PartialEq, Eq)]
pub struct MultiColumnUniqueEntryMeta {
    /// Optional name of the backing index.
    pub index_name: Option<String>,
    /// Ids of the columns covered by the unique constraint.
    pub column_ids: Vec<u32>,
}
301
/// One single-column index descriptor.
#[derive(Encode, Decode, Clone, Debug, PartialEq, Eq)]
pub struct SingleColumnIndexEntryMeta {
    /// Index name as supplied by the user.
    pub index_name: String,
    /// Normalized lookup name (normalization rules defined by writers).
    pub canonical_name: String,
    /// Field id of the indexed column.
    pub column_id: FieldId,
    /// Name of the indexed column.
    pub column_name: String,
    /// Whether the index enforces uniqueness.
    pub unique: bool,
}
316
/// All single-column index descriptors for one table, persisted as a single
/// bitcode payload under the table's catalog row id.
#[derive(Encode, Decode, Clone, Debug, PartialEq, Eq)]
pub struct TableSingleColumnIndexMeta {
    pub table_id: TableId,
    pub indexes: Vec<SingleColumnIndexEntryMeta>,
}
325
/// All multi-column UNIQUE descriptors for one table, persisted as a single
/// bitcode payload under the table's catalog row id.
#[derive(Encode, Decode, Clone, Debug, PartialEq, Eq)]
pub struct TableMultiColumnUniqueMeta {
    pub table_id: TableId,
    pub uniques: Vec<MultiColumnUniqueEntryMeta>,
}
334
/// Typed accessor over the reserved catalog table: persists table/column
/// metadata, constraints, index descriptors, and id counters as bitcode
/// payloads inside a [`ColumnStore`].
pub struct SysCatalog<'a, P = MemPager>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    /// Backing column store; all reads and appends go through it.
    store: &'a ColumnStore<P>,
}
353
354impl<'a, P> SysCatalog<'a, P>
355where
356 P: Pager<Blob = EntryHandle> + Send + Sync,
357{
358 fn write_null_entries(&self, meta_field: LogicalFieldId, row_ids: &[RowId]) -> LlkvResult<()> {
359 if row_ids.is_empty() {
360 return Ok(());
361 }
362
363 let lfid_val: u64 = meta_field.into();
364 let schema = Arc::new(Schema::new(vec![
365 Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
366 Field::new("meta", DataType::Binary, true).with_metadata(HashMap::from([(
367 crate::constants::FIELD_ID_META_KEY.to_string(),
368 lfid_val.to_string(),
369 )])),
370 ]));
371
372 let row_array = Arc::new(UInt64Array::from(row_ids.to_vec()));
373 let mut builder = BinaryBuilder::new();
374 for _ in row_ids {
375 builder.append_null();
376 }
377 let meta_array = Arc::new(builder.finish());
378
379 let batch = RecordBatch::try_new(schema, vec![row_array, meta_array])?;
380 self.store.append(&batch)?;
381 Ok(())
382 }
383
    /// Wraps `store` without taking ownership; the catalog holds no state of
    /// its own.
    pub fn new(store: &'a ColumnStore<P>) -> Self {
        Self { store }
    }
388
389 pub fn put_table_meta(&self, meta: &TableMeta) {
394 let lfid_val: u64 = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_TABLE_META_ID).into();
395 let schema = Arc::new(Schema::new(vec![
396 Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
397 Field::new("meta", DataType::Binary, false).with_metadata(HashMap::from([(
398 crate::constants::FIELD_ID_META_KEY.to_string(),
399 lfid_val.to_string(),
400 )])),
401 ]));
402
403 let row_id = Arc::new(UInt64Array::from(vec![rid_table(meta.table_id)]));
404 let meta_encoded = bitcode::encode(meta);
405 let meta_bytes = Arc::new(BinaryArray::from(vec![meta_encoded.as_slice()]));
406
407 let batch = RecordBatch::try_new(schema, vec![row_id, meta_bytes]).unwrap();
408 self.store.append(&batch).unwrap();
409 }
410
411 pub fn get_table_meta(&self, table_id: TableId) -> Option<TableMeta> {
415 let row_id = rid_table(table_id);
416 let catalog_field = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_TABLE_META_ID);
417 let batch = self
418 .store
419 .gather_rows(&[catalog_field], &[row_id], GatherNullPolicy::IncludeNulls)
420 .ok()?;
421
422 if batch.num_rows() == 0 || batch.num_columns() == 0 {
423 return None;
424 }
425
426 let array = batch
427 .column(0)
428 .as_any()
429 .downcast_ref::<BinaryArray>()
430 .expect("table meta column must be BinaryArray");
431
432 if array.is_null(0) {
433 return None;
434 }
435
436 bitcode::decode(array.value(0)).ok()
437 }
438
439 pub fn put_col_meta(&self, table_id: TableId, meta: &ColMeta) {
441 let lfid_val: u64 = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_COL_META_ID).into();
442 let schema = Arc::new(Schema::new(vec![
443 Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
444 Field::new("meta", DataType::Binary, false).with_metadata(HashMap::from([(
445 crate::constants::FIELD_ID_META_KEY.to_string(),
446 lfid_val.to_string(),
447 )])),
448 ]));
449
450 let rid_value = rid_col(table_id, meta.col_id);
451 let row_id = Arc::new(UInt64Array::from(vec![rid_value]));
452 let meta_encoded = bitcode::encode(meta);
453 let meta_bytes = Arc::new(BinaryArray::from(vec![meta_encoded.as_slice()]));
454
455 let batch = RecordBatch::try_new(schema, vec![row_id, meta_bytes]).unwrap();
456 self.store.append(&batch).unwrap();
457 }
458
459 pub fn get_cols_meta(&self, table_id: TableId, col_ids: &[u32]) -> Vec<Option<ColMeta>> {
461 if col_ids.is_empty() {
462 return Vec::new();
463 }
464
465 let row_ids: Vec<RowId> = col_ids.iter().map(|&cid| rid_col(table_id, cid)).collect();
466 let catalog_field = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_COL_META_ID);
467
468 let batch =
469 match self
470 .store
471 .gather_rows(&[catalog_field], &row_ids, GatherNullPolicy::IncludeNulls)
472 {
473 Ok(batch) => batch,
474 Err(_) => return vec![None; col_ids.len()],
475 };
476
477 let meta_col = batch
478 .column(0)
479 .as_any()
480 .downcast_ref::<BinaryArray>()
481 .expect("catalog meta column should be Binary");
482
483 col_ids
484 .iter()
485 .enumerate()
486 .map(|(idx, _)| {
487 if meta_col.is_null(idx) {
488 None
489 } else {
490 bitcode::decode(meta_col.value(idx)).ok()
491 }
492 })
493 .collect()
494 }
495
496 pub fn delete_col_meta(&self, table_id: TableId, col_ids: &[FieldId]) -> LlkvResult<()> {
498 if col_ids.is_empty() {
499 return Ok(());
500 }
501
502 let meta_field = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_COL_META_ID);
503 let row_ids: Vec<RowId> = col_ids
504 .iter()
505 .map(|&col_id| rid_col(table_id, col_id))
506 .collect();
507 self.write_null_entries(meta_field, &row_ids)
508 }
509
510 pub fn delete_table_meta(&self, table_id: TableId) -> LlkvResult<()> {
512 let meta_field = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_TABLE_META_ID);
513 let row_id = rid_table(table_id);
514 self.write_null_entries(meta_field, &[row_id])
515 }
516
517 pub fn delete_constraint_records(
519 &self,
520 table_id: TableId,
521 constraint_ids: &[ConstraintId],
522 ) -> LlkvResult<()> {
523 if constraint_ids.is_empty() {
524 return Ok(());
525 }
526
527 let meta_field = constraint_meta_lfid();
528 let row_ids: Vec<RowId> = constraint_ids
529 .iter()
530 .map(|&constraint_id| encode_constraint_row_id(table_id, constraint_id))
531 .collect();
532 self.write_null_entries(meta_field, &row_ids)
533 }
534
535 pub fn delete_constraint_names(
537 &self,
538 table_id: TableId,
539 constraint_ids: &[ConstraintId],
540 ) -> LlkvResult<()> {
541 if constraint_ids.is_empty() {
542 return Ok(());
543 }
544
545 let lfid = constraint_name_lfid();
546 let row_ids: Vec<RowId> = constraint_ids
547 .iter()
548 .map(|&constraint_id| encode_constraint_row_id(table_id, constraint_id))
549 .collect();
550 self.write_null_entries(lfid, &row_ids)
551 }
552
553 pub fn delete_multi_column_uniques(&self, table_id: TableId) -> LlkvResult<()> {
555 let meta_field = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_MULTI_COLUMN_UNIQUE_META_ID);
556 let row_id = rid_table(table_id);
557 self.write_null_entries(meta_field, &[row_id])
558 }
559
560 pub fn delete_single_column_indexes(&self, table_id: TableId) -> LlkvResult<()> {
562 let meta_field = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_SINGLE_COLUMN_INDEX_META_ID);
563 let row_id = rid_table(table_id);
564 self.write_null_entries(meta_field, &[row_id])
565 }
566
567 pub fn put_multi_column_uniques(
569 &self,
570 table_id: TableId,
571 uniques: &[MultiColumnUniqueEntryMeta],
572 ) -> LlkvResult<()> {
573 let lfid_val: u64 =
574 lfid(CATALOG_TABLE_ID, CATALOG_FIELD_MULTI_COLUMN_UNIQUE_META_ID).into();
575 let schema = Arc::new(Schema::new(vec![
576 Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
577 Field::new("meta", DataType::Binary, false).with_metadata(HashMap::from([(
578 crate::constants::FIELD_ID_META_KEY.to_string(),
579 lfid_val.to_string(),
580 )])),
581 ]));
582
583 let row_id = Arc::new(UInt64Array::from(vec![rid_table(table_id)]));
584 let meta = TableMultiColumnUniqueMeta {
585 table_id,
586 uniques: uniques.to_vec(),
587 };
588 let encoded = bitcode::encode(&meta);
589 let meta_bytes = Arc::new(BinaryArray::from(vec![encoded.as_slice()]));
590
591 let batch = RecordBatch::try_new(schema, vec![row_id, meta_bytes])?;
592 self.store.append(&batch)?;
593 Ok(())
594 }
595
596 pub fn put_single_column_indexes(
598 &self,
599 table_id: TableId,
600 indexes: &[SingleColumnIndexEntryMeta],
601 ) -> LlkvResult<()> {
602 let lfid_val: u64 =
603 lfid(CATALOG_TABLE_ID, CATALOG_FIELD_SINGLE_COLUMN_INDEX_META_ID).into();
604 let schema = Arc::new(Schema::new(vec![
605 Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
606 Field::new("meta", DataType::Binary, false).with_metadata(HashMap::from([(
607 crate::constants::FIELD_ID_META_KEY.to_string(),
608 lfid_val.to_string(),
609 )])),
610 ]));
611
612 let row_id = Arc::new(UInt64Array::from(vec![rid_table(table_id)]));
613 let meta = TableSingleColumnIndexMeta {
614 table_id,
615 indexes: indexes.to_vec(),
616 };
617 let encoded = bitcode::encode(&meta);
618 let meta_bytes = Arc::new(BinaryArray::from(vec![encoded.as_slice()]));
619
620 let batch = RecordBatch::try_new(schema, vec![row_id, meta_bytes])?;
621 self.store.append(&batch)?;
622 Ok(())
623 }
624
625 pub fn get_multi_column_uniques(
627 &self,
628 table_id: TableId,
629 ) -> LlkvResult<Vec<MultiColumnUniqueEntryMeta>> {
630 let lfid = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_MULTI_COLUMN_UNIQUE_META_ID);
631 let row_id = rid_table(table_id);
632 let batch = match self
633 .store
634 .gather_rows(&[lfid], &[row_id], GatherNullPolicy::IncludeNulls)
635 {
636 Ok(batch) => batch,
637 Err(llkv_result::Error::NotFound) => return Ok(Vec::new()),
638 Err(err) => return Err(err),
639 };
640
641 if batch.num_columns() == 0 || batch.num_rows() == 0 {
642 return Ok(Vec::new());
643 }
644
645 let array = batch
646 .column(0)
647 .as_any()
648 .downcast_ref::<BinaryArray>()
649 .ok_or_else(|| {
650 llkv_result::Error::Internal(
651 "catalog multi-column unique column stored unexpected type".into(),
652 )
653 })?;
654
655 if array.is_null(0) {
656 return Ok(Vec::new());
657 }
658
659 let meta: TableMultiColumnUniqueMeta = bitcode::decode(array.value(0)).map_err(|err| {
660 llkv_result::Error::Internal(format!(
661 "failed to decode multi-column unique metadata: {err}"
662 ))
663 })?;
664
665 Ok(meta.uniques)
666 }
667
668 pub fn get_single_column_indexes(
670 &self,
671 table_id: TableId,
672 ) -> LlkvResult<Vec<SingleColumnIndexEntryMeta>> {
673 let lfid = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_SINGLE_COLUMN_INDEX_META_ID);
674 let row_id = rid_table(table_id);
675 let batch = match self
676 .store
677 .gather_rows(&[lfid], &[row_id], GatherNullPolicy::IncludeNulls)
678 {
679 Ok(batch) => batch,
680 Err(llkv_result::Error::NotFound) => return Ok(Vec::new()),
681 Err(err) => return Err(err),
682 };
683
684 if batch.num_columns() == 0 || batch.num_rows() == 0 {
685 return Ok(Vec::new());
686 }
687
688 let array = batch
689 .column(0)
690 .as_any()
691 .downcast_ref::<BinaryArray>()
692 .ok_or_else(|| {
693 llkv_result::Error::Internal(
694 "catalog single-column index column stored unexpected type".into(),
695 )
696 })?;
697
698 if array.is_null(0) {
699 return Ok(Vec::new());
700 }
701
702 let meta: TableSingleColumnIndexMeta = bitcode::decode(array.value(0)).map_err(|err| {
703 llkv_result::Error::Internal(format!(
704 "failed to decode single-column index metadata: {err}"
705 ))
706 })?;
707
708 Ok(meta.indexes)
709 }
710
    /// Scans the whole catalog and returns every stored
    /// [`TableMultiColumnUniqueMeta`], skipping tombstoned (null) entries.
    ///
    /// # Errors
    /// Internal error when the column has an unexpected type or a payload
    /// fails to decode; `NotFound` from scan/gather is mapped to an empty
    /// result.
    pub fn all_multi_column_unique_metas(&self) -> LlkvResult<Vec<TableMultiColumnUniqueMeta>> {
        let meta_field = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_MULTI_COLUMN_UNIQUE_META_ID);
        let row_field = rowid_fid(meta_field);

        // Minimal visitor that records every row id present in the column.
        struct RowIdCollector {
            row_ids: Vec<RowId>,
        }

        impl PrimitiveVisitor for RowIdCollector {
            fn u64_chunk(&mut self, values: &UInt64Array) {
                for i in 0..values.len() {
                    self.row_ids.push(values.value(i));
                }
            }
        }
        // Marker impls required by `ScanBuilder::run`; unused callbacks.
        impl PrimitiveWithRowIdsVisitor for RowIdCollector {}
        impl PrimitiveSortedVisitor for RowIdCollector {}
        impl PrimitiveSortedWithRowIdsVisitor for RowIdCollector {}

        let mut collector = RowIdCollector {
            row_ids: Vec::new(),
        };
        match ScanBuilder::new(self.store, row_field)
            .options(ScanOptions::default())
            .run(&mut collector)
        {
            Ok(()) => {}
            // Column never written: no metas exist.
            Err(llkv_result::Error::NotFound) => return Ok(Vec::new()),
            Err(err) => return Err(err),
        }

        if collector.row_ids.is_empty() {
            return Ok(Vec::new());
        }

        let batch = match self.store.gather_rows(
            &[meta_field],
            &collector.row_ids,
            GatherNullPolicy::IncludeNulls,
        ) {
            Ok(batch) => batch,
            Err(llkv_result::Error::NotFound) => return Ok(Vec::new()),
            Err(err) => return Err(err),
        };

        if batch.num_columns() == 0 {
            return Ok(Vec::new());
        }

        let array = batch
            .column(0)
            .as_any()
            .downcast_ref::<BinaryArray>()
            .ok_or_else(|| {
                llkv_result::Error::Internal(
                    "catalog multi-column unique column stored unexpected type".into(),
                )
            })?;

        let mut metas = Vec::with_capacity(batch.num_rows());
        for idx in 0..batch.num_rows() {
            if array.is_null(idx) {
                // Tombstoned entry.
                continue;
            }
            let meta: TableMultiColumnUniqueMeta =
                bitcode::decode(array.value(idx)).map_err(|err| {
                    llkv_result::Error::Internal(format!(
                        "failed to decode multi-column unique metadata: {err}"
                    ))
                })?;
            metas.push(meta);
        }

        Ok(metas)
    }
787
788 pub fn put_constraint_records(
790 &self,
791 table_id: TableId,
792 records: &[ConstraintRecord],
793 ) -> LlkvResult<()> {
794 if records.is_empty() {
795 return Ok(());
796 }
797
798 let lfid_val: u64 = constraint_meta_lfid().into();
799 let schema = Arc::new(Schema::new(vec![
800 Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
801 Field::new("constraint", DataType::Binary, false).with_metadata(HashMap::from([(
802 crate::constants::FIELD_ID_META_KEY.to_string(),
803 lfid_val.to_string(),
804 )])),
805 ]));
806
807 let row_ids: Vec<RowId> = records
808 .iter()
809 .map(|record| encode_constraint_row_id(table_id, record.constraint_id))
810 .collect();
811
812 let row_ids_array = Arc::new(UInt64Array::from(row_ids));
813 let payload_array = Arc::new(BinaryArray::from_iter_values(
814 records.iter().map(bitcode::encode),
815 ));
816
817 let batch = RecordBatch::try_new(schema, vec![row_ids_array, payload_array])?;
818 self.store.append(&batch)?;
819 Ok(())
820 }
821
822 pub fn put_constraint_names(
824 &self,
825 table_id: TableId,
826 names: &[ConstraintNameRecord],
827 ) -> LlkvResult<()> {
828 if names.is_empty() {
829 return Ok(());
830 }
831
832 let lfid_val: u64 = constraint_name_lfid().into();
833 let schema = Arc::new(Schema::new(vec![
834 Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
835 Field::new("constraint_name", DataType::Binary, false).with_metadata(HashMap::from([
836 (
837 crate::constants::FIELD_ID_META_KEY.to_string(),
838 lfid_val.to_string(),
839 ),
840 ])),
841 ]));
842
843 let row_ids: Vec<RowId> = names
844 .iter()
845 .map(|record| encode_constraint_row_id(table_id, record.constraint_id))
846 .collect();
847 let row_ids_array = Arc::new(UInt64Array::from(row_ids));
848 let payload_array = Arc::new(BinaryArray::from_iter_values(
849 names.iter().map(bitcode::encode),
850 ));
851
852 let batch = RecordBatch::try_new(schema, vec![row_ids_array, payload_array])?;
853 self.store.append(&batch)?;
854 Ok(())
855 }
856
    /// Fetches the records for `constraint_ids` on `table_id`, positionally;
    /// `None` marks missing or tombstoned entries.
    ///
    /// # Errors
    /// Internal error when a payload fails to decode or its embedded
    /// constraint id disagrees with the requested one.
    pub fn get_constraint_records(
        &self,
        table_id: TableId,
        constraint_ids: &[ConstraintId],
    ) -> LlkvResult<Vec<Option<ConstraintRecord>>> {
        if constraint_ids.is_empty() {
            return Ok(Vec::new());
        }

        let lfid = constraint_meta_lfid();
        let row_ids: Vec<RowId> = constraint_ids
            .iter()
            .map(|&constraint_id| encode_constraint_row_id(table_id, constraint_id))
            .collect();

        let batch = match self
            .store
            .gather_rows(&[lfid], &row_ids, GatherNullPolicy::IncludeNulls)
        {
            Ok(batch) => batch,
            // Column never written: every requested id is absent.
            Err(llkv_result::Error::NotFound) => {
                return Ok(vec![None; constraint_ids.len()]);
            }
            Err(err) => return Err(err),
        };

        if batch.num_columns() == 0 || batch.num_rows() == 0 {
            return Ok(vec![None; constraint_ids.len()]);
        }

        let array = batch
            .column(0)
            .as_any()
            .downcast_ref::<BinaryArray>()
            .ok_or_else(|| {
                llkv_result::Error::Internal(
                    "constraint metadata column stored unexpected type".into(),
                )
            })?;

        let mut results = Vec::with_capacity(constraint_ids.len());
        // Gathered rows are positionally aligned with `constraint_ids`.
        for (idx, &constraint_id) in constraint_ids.iter().enumerate() {
            if array.is_null(idx) {
                results.push(None);
                continue;
            }
            let record = decode_constraint_record(array.value(idx))?;
            // Sanity check: the payload must describe the requested id.
            if record.constraint_id != constraint_id {
                return Err(llkv_result::Error::Internal(
                    "constraint metadata id mismatch".into(),
                ));
            }
            results.push(Some(record));
        }

        Ok(results)
    }
915
916 pub fn get_constraint_names(
918 &self,
919 table_id: TableId,
920 constraint_ids: &[ConstraintId],
921 ) -> LlkvResult<Vec<Option<String>>> {
922 if constraint_ids.is_empty() {
923 return Ok(Vec::new());
924 }
925
926 let lfid = constraint_name_lfid();
927 let row_ids: Vec<RowId> = constraint_ids
928 .iter()
929 .map(|&constraint_id| encode_constraint_row_id(table_id, constraint_id))
930 .collect();
931
932 let batch = match self
933 .store
934 .gather_rows(&[lfid], &row_ids, GatherNullPolicy::IncludeNulls)
935 {
936 Ok(batch) => batch,
937 Err(llkv_result::Error::NotFound) => {
938 return Ok(vec![None; constraint_ids.len()]);
939 }
940 Err(err) => return Err(err),
941 };
942
943 if batch.num_columns() == 0 {
944 return Ok(vec![None; constraint_ids.len()]);
945 }
946
947 let array = batch
948 .column(0)
949 .as_any()
950 .downcast_ref::<BinaryArray>()
951 .ok_or_else(|| {
952 llkv_result::Error::Internal(
953 "constraint name metadata column stored unexpected type".into(),
954 )
955 })?;
956
957 let mut results = Vec::with_capacity(row_ids.len());
958 for idx in 0..row_ids.len() {
959 if array.is_null(idx) {
960 results.push(None);
961 } else {
962 let record: ConstraintNameRecord =
963 bitcode::decode(array.value(idx)).map_err(|err| {
964 llkv_result::Error::Internal(format!(
965 "failed to decode constraint name metadata: {err}"
966 ))
967 })?;
968 results.push(record.name);
969 }
970 }
971
972 Ok(results)
973 }
974
    /// Streams every persisted constraint record for `table_id` to
    /// `on_batch`, in chunks of up to `CONSTRAINT_SCAN_CHUNK_SIZE` rows.
    ///
    /// A `NotFound` scan (constraint column never written) is treated as an
    /// empty table rather than an error.
    pub fn scan_constraint_records_for_table<F>(
        &self,
        table_id: TableId,
        mut on_batch: F,
    ) -> LlkvResult<()>
    where
        F: FnMut(Vec<ConstraintRecord>),
    {
        let row_field = constraint_row_lfid();
        let mut visitor = ConstraintRowCollector {
            store: self.store,
            lfid: constraint_meta_lfid(),
            table_id,
            on_batch: &mut on_batch,
            buffer: Vec::with_capacity(CONSTRAINT_SCAN_CHUNK_SIZE),
            error: None,
        };

        match ScanBuilder::new(self.store, row_field)
            .options(ScanOptions::default())
            .run(&mut visitor)
        {
            Ok(()) => {}
            Err(llkv_result::Error::NotFound) => return Ok(()),
            Err(err) => return Err(err),
        }

        // Surface deferred errors and flush the final partial chunk.
        visitor.finish()
    }
1005
1006 pub fn constraint_records_for_table(
1008 &self,
1009 table_id: TableId,
1010 ) -> LlkvResult<Vec<ConstraintRecord>> {
1011 let mut all = Vec::new();
1012 self.scan_constraint_records_for_table(table_id, |mut chunk| {
1013 all.append(&mut chunk);
1014 })?;
1015 Ok(all)
1016 }
1017
1018 pub fn put_next_table_id(&self, next_id: TableId) -> LlkvResult<()> {
1019 let lfid_val: u64 = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_NEXT_TABLE_ID).into();
1020 let schema = Arc::new(Schema::new(vec![
1021 Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
1022 Field::new("next_table_id", DataType::UInt64, false).with_metadata(HashMap::from([(
1023 crate::constants::FIELD_ID_META_KEY.to_string(),
1024 lfid_val.to_string(),
1025 )])),
1026 ]));
1027
1028 let row_id = Arc::new(UInt64Array::from(vec![CATALOG_NEXT_TABLE_ROW_ID]));
1029 let value_array = Arc::new(UInt64Array::from(vec![next_id as u64]));
1030 let batch = RecordBatch::try_new(schema, vec![row_id, value_array])?;
1031 self.store.append(&batch)?;
1032 Ok(())
1033 }
1034
    /// Reads the persisted next-table-id counter; `None` when never written.
    ///
    /// # Errors
    /// `InvalidArgumentError` when the stored value does not fit in
    /// `TableId`; internal error on an unexpected column type.
    pub fn get_next_table_id(&self) -> LlkvResult<Option<TableId>> {
        let lfid = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_NEXT_TABLE_ID);
        let batch = match self.store.gather_rows(
            &[lfid],
            &[CATALOG_NEXT_TABLE_ROW_ID],
            GatherNullPolicy::IncludeNulls,
        ) {
            Ok(batch) => batch,
            Err(llkv_result::Error::NotFound) => return Ok(None),
            Err(err) => return Err(err),
        };

        if batch.num_columns() == 0 || batch.num_rows() == 0 {
            return Ok(None);
        }

        let array = batch
            .column(0)
            .as_any()
            .downcast_ref::<UInt64Array>()
            .ok_or_else(|| {
                llkv_result::Error::Internal(
                    "catalog next_table_id column stored unexpected type".into(),
                )
            })?;
        if array.is_empty() || array.is_null(0) {
            return Ok(None);
        }

        let value = array.value(0);
        // The counter is stored widened as u64; reject values that would
        // silently truncate when narrowed back to TableId.
        if value > TableId::MAX as u64 {
            return Err(llkv_result::Error::InvalidArgumentError(
                "persisted next_table_id exceeds TableId range".into(),
            ));
        }

        Ok(Some(value as TableId))
    }
1073
    /// Scans the table-meta row-id column and returns the largest table id
    /// present, or `None` when the catalog has no table entries.
    pub fn max_table_id(&self) -> LlkvResult<Option<TableId>> {
        let meta_field = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_TABLE_META_ID);
        let row_field = rowid_fid(meta_field);

        // MaxRowIdCollector is defined elsewhere in this module — presumably
        // it tracks the maximum u64 seen across chunks; verify at definition.
        let mut collector = MaxRowIdCollector { max: None };
        match ScanBuilder::new(self.store, row_field)
            .options(ScanOptions::default())
            .run(&mut collector)
        {
            Ok(()) => {}
            // Column never written: no tables recorded yet.
            Err(llkv_result::Error::NotFound) => return Ok(None),
            Err(err) => return Err(err),
        }

        let max_value = match collector.max {
            Some(value) => value,
            None => return Ok(None),
        };

        // Catalog row ids are LogicalFieldId encodings; recover the table id.
        let logical: LogicalFieldId = max_value.into();
        Ok(Some(logical.table_id()))
    }
1096
1097 pub fn all_table_metas(&self) -> LlkvResult<Vec<(TableId, TableMeta)>> {
1103 let meta_field = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_TABLE_META_ID);
1104 let row_field = rowid_fid(meta_field);
1105
1106 struct RowIdCollector {
1108 row_ids: Vec<RowId>,
1109 }
1110
1111 impl PrimitiveVisitor for RowIdCollector {
1112 fn u64_chunk(&mut self, values: &UInt64Array) {
1113 for i in 0..values.len() {
1114 self.row_ids.push(values.value(i));
1115 }
1116 }
1117 }
1118 impl PrimitiveWithRowIdsVisitor for RowIdCollector {}
1119 impl PrimitiveSortedVisitor for RowIdCollector {}
1120 impl PrimitiveSortedWithRowIdsVisitor for RowIdCollector {}
1121
1122 let mut collector = RowIdCollector {
1123 row_ids: Vec::new(),
1124 };
1125 match ScanBuilder::new(self.store, row_field)
1126 .options(ScanOptions::default())
1127 .run(&mut collector)
1128 {
1129 Ok(()) => {}
1130 Err(llkv_result::Error::NotFound) => return Ok(Vec::new()),
1131 Err(err) => return Err(err),
1132 }
1133
1134 if collector.row_ids.is_empty() {
1135 return Ok(Vec::new());
1136 }
1137
1138 let batch = self.store.gather_rows(
1140 &[meta_field],
1141 &collector.row_ids,
1142 GatherNullPolicy::IncludeNulls,
1143 )?;
1144
1145 let meta_col = batch
1146 .column(0)
1147 .as_any()
1148 .downcast_ref::<BinaryArray>()
1149 .ok_or_else(|| {
1150 llkv_result::Error::Internal("catalog table_meta column should be Binary".into())
1151 })?;
1152
1153 let mut result = Vec::new();
1154 for (idx, &row_id) in collector.row_ids.iter().enumerate() {
1155 if !meta_col.is_null(idx) {
1156 let bytes = meta_col.value(idx);
1157 if let Ok(meta) = bitcode::decode::<TableMeta>(bytes) {
1158 let logical: LogicalFieldId = row_id.into();
1159 let table_id = logical.table_id();
1160 result.push((table_id, meta));
1161 }
1162 }
1163 }
1164
1165 Ok(result)
1166 }
1167
1168 pub fn put_next_txn_id(&self, next_txn_id: u64) -> LlkvResult<()> {
1170 let lfid_val: u64 = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_NEXT_TXN_ID).into();
1171 let schema = Arc::new(Schema::new(vec![
1172 Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
1173 Field::new("next_txn_id", DataType::UInt64, false).with_metadata(HashMap::from([(
1174 crate::constants::FIELD_ID_META_KEY.to_string(),
1175 lfid_val.to_string(),
1176 )])),
1177 ]));
1178
1179 let row_id = Arc::new(UInt64Array::from(vec![CATALOG_NEXT_TXN_ROW_ID]));
1180 let value_array = Arc::new(UInt64Array::from(vec![next_txn_id]));
1181 let batch = RecordBatch::try_new(schema, vec![row_id, value_array])?;
1182 self.store.append(&batch)?;
1183 Ok(())
1184 }
1185
    /// Reads the persisted next-transaction-id counter; `None` when never
    /// written or tombstoned.
    pub fn get_next_txn_id(&self) -> LlkvResult<Option<u64>> {
        let lfid = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_NEXT_TXN_ID);
        let batch = match self.store.gather_rows(
            &[lfid],
            &[CATALOG_NEXT_TXN_ROW_ID],
            GatherNullPolicy::IncludeNulls,
        ) {
            Ok(batch) => batch,
            // Column never written: counter does not exist yet.
            Err(llkv_result::Error::NotFound) => return Ok(None),
            Err(err) => return Err(err),
        };

        if batch.num_columns() == 0 || batch.num_rows() == 0 {
            return Ok(None);
        }

        let array = batch
            .column(0)
            .as_any()
            .downcast_ref::<UInt64Array>()
            .ok_or_else(|| {
                llkv_result::Error::Internal(
                    "catalog next_txn_id column stored unexpected type".into(),
                )
            })?;
        if array.is_empty() || array.is_null(0) {
            return Ok(None);
        }

        let value = array.value(0);
        Ok(Some(value))
    }
1219
1220 pub fn put_last_committed_txn_id(&self, last_committed: u64) -> LlkvResult<()> {
1222 let lfid_val: u64 = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_LAST_COMMITTED_TXN_ID).into();
1223 let schema = Arc::new(Schema::new(vec![
1224 Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
1225 Field::new("last_committed_txn_id", DataType::UInt64, false).with_metadata(
1226 HashMap::from([(
1227 crate::constants::FIELD_ID_META_KEY.to_string(),
1228 lfid_val.to_string(),
1229 )]),
1230 ),
1231 ]));
1232
1233 let row_id = Arc::new(UInt64Array::from(vec![CATALOG_LAST_COMMITTED_TXN_ROW_ID]));
1234 let value_array = Arc::new(UInt64Array::from(vec![last_committed]));
1235 let batch = RecordBatch::try_new(schema, vec![row_id, value_array])?;
1236 self.store.append(&batch)?;
1237 Ok(())
1238 }
1239
1240 pub fn get_last_committed_txn_id(&self) -> LlkvResult<Option<u64>> {
1242 let lfid = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_LAST_COMMITTED_TXN_ID);
1243 let batch = match self.store.gather_rows(
1244 &[lfid],
1245 &[CATALOG_LAST_COMMITTED_TXN_ROW_ID],
1246 GatherNullPolicy::IncludeNulls,
1247 ) {
1248 Ok(batch) => batch,
1249 Err(llkv_result::Error::NotFound) => return Ok(None),
1250 Err(err) => return Err(err),
1251 };
1252
1253 if batch.num_columns() == 0 || batch.num_rows() == 0 {
1254 return Ok(None);
1255 }
1256
1257 let array = batch
1258 .column(0)
1259 .as_any()
1260 .downcast_ref::<UInt64Array>()
1261 .ok_or_else(|| {
1262 llkv_result::Error::Internal(
1263 "catalog last_committed_txn_id column stored unexpected type".into(),
1264 )
1265 })?;
1266 if array.is_empty() || array.is_null(0) {
1267 return Ok(None);
1268 }
1269
1270 let value = array.value(0);
1271 Ok(Some(value))
1272 }
1273
1274 pub fn put_catalog_state(&self, state: &crate::catalog::TableCatalogState) -> LlkvResult<()> {
1279 let lfid_val: u64 = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_CATALOG_STATE).into();
1280 let schema = Arc::new(Schema::new(vec![
1281 Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
1282 Field::new("catalog_state", DataType::Binary, false).with_metadata(HashMap::from([(
1283 crate::constants::FIELD_ID_META_KEY.to_string(),
1284 lfid_val.to_string(),
1285 )])),
1286 ]));
1287
1288 let row_id = Arc::new(UInt64Array::from(vec![CATALOG_STATE_ROW_ID]));
1289 let encoded = bitcode::encode(state);
1290 let state_bytes = Arc::new(BinaryArray::from(vec![encoded.as_slice()]));
1291
1292 let batch = RecordBatch::try_new(schema, vec![row_id, state_bytes])?;
1293 self.store.append(&batch)?;
1294 Ok(())
1295 }
1296
1297 pub fn get_catalog_state(&self) -> LlkvResult<Option<crate::catalog::TableCatalogState>> {
1301 let lfid = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_CATALOG_STATE);
1302 let batch = match self.store.gather_rows(
1303 &[lfid],
1304 &[CATALOG_STATE_ROW_ID],
1305 GatherNullPolicy::IncludeNulls,
1306 ) {
1307 Ok(batch) => batch,
1308 Err(llkv_result::Error::NotFound) => return Ok(None),
1309 Err(err) => return Err(err),
1310 };
1311
1312 if batch.num_columns() == 0 || batch.num_rows() == 0 {
1313 return Ok(None);
1314 }
1315
1316 let array = batch
1317 .column(0)
1318 .as_any()
1319 .downcast_ref::<BinaryArray>()
1320 .ok_or_else(|| {
1321 llkv_result::Error::Internal("catalog state column stored unexpected type".into())
1322 })?;
1323 if array.is_empty() || array.is_null(0) {
1324 return Ok(None);
1325 }
1326
1327 let bytes = array.value(0);
1328 let state = bitcode::decode(bytes).map_err(|e| {
1329 llkv_result::Error::Internal(format!("Failed to decode catalog state: {}", e))
1330 })?;
1331 Ok(Some(state))
1332 }
1333
1334 pub fn put_schema_meta(&self, meta: &SchemaMeta) -> LlkvResult<()> {
1339 let lfid_val: u64 = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_SCHEMA_META_ID).into();
1340 let schema = Arc::new(Schema::new(vec![
1341 Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
1342 Field::new("meta", DataType::Binary, false).with_metadata(HashMap::from([(
1343 crate::constants::FIELD_ID_META_KEY.to_string(),
1344 lfid_val.to_string(),
1345 )])),
1346 ]));
1347
1348 let canonical = meta.name.to_ascii_lowercase();
1350 let row_id_val = schema_name_to_row_id(&canonical);
1351 let row_id = Arc::new(UInt64Array::from(vec![row_id_val]));
1352 let meta_encoded = bitcode::encode(meta);
1353 let meta_bytes = Arc::new(BinaryArray::from(vec![meta_encoded.as_slice()]));
1354
1355 let batch = RecordBatch::try_new(schema, vec![row_id, meta_bytes])?;
1356 self.store.append(&batch)?;
1357 Ok(())
1358 }
1359
1360 pub fn get_schema_meta(&self, schema_name: &str) -> LlkvResult<Option<SchemaMeta>> {
1364 let canonical = schema_name.to_ascii_lowercase();
1365 let row_id = schema_name_to_row_id(&canonical);
1366 let lfid = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_SCHEMA_META_ID);
1367
1368 let batch = match self
1369 .store
1370 .gather_rows(&[lfid], &[row_id], GatherNullPolicy::IncludeNulls)
1371 {
1372 Ok(batch) => batch,
1373 Err(llkv_result::Error::NotFound) => return Ok(None),
1374 Err(err) => return Err(err),
1375 };
1376
1377 if batch.num_columns() == 0 || batch.num_rows() == 0 {
1378 return Ok(None);
1379 }
1380
1381 let array = batch
1382 .column(0)
1383 .as_any()
1384 .downcast_ref::<BinaryArray>()
1385 .ok_or_else(|| {
1386 llkv_result::Error::Internal("catalog schema_meta column should be Binary".into())
1387 })?;
1388
1389 if array.is_empty() || array.is_null(0) {
1390 return Ok(None);
1391 }
1392
1393 let bytes = array.value(0);
1394 let meta = bitcode::decode(bytes).map_err(|e| {
1395 llkv_result::Error::Internal(format!("Failed to decode schema metadata: {}", e))
1396 })?;
1397 Ok(Some(meta))
1398 }
1399
1400 pub fn all_schema_metas(&self) -> LlkvResult<Vec<SchemaMeta>> {
1404 let meta_field = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_SCHEMA_META_ID);
1405 let row_field = rowid_fid(meta_field);
1406
1407 struct RowIdCollector {
1409 row_ids: Vec<RowId>,
1410 }
1411
1412 impl PrimitiveVisitor for RowIdCollector {
1413 fn u64_chunk(&mut self, values: &UInt64Array) {
1414 for i in 0..values.len() {
1415 self.row_ids.push(values.value(i));
1416 }
1417 }
1418 }
1419 impl PrimitiveWithRowIdsVisitor for RowIdCollector {}
1420 impl PrimitiveSortedVisitor for RowIdCollector {}
1421 impl PrimitiveSortedWithRowIdsVisitor for RowIdCollector {}
1422
1423 let mut collector = RowIdCollector {
1424 row_ids: Vec::new(),
1425 };
1426 match ScanBuilder::new(self.store, row_field)
1427 .options(ScanOptions::default())
1428 .run(&mut collector)
1429 {
1430 Ok(()) => {}
1431 Err(llkv_result::Error::NotFound) => return Ok(Vec::new()),
1432 Err(err) => return Err(err),
1433 }
1434
1435 if collector.row_ids.is_empty() {
1436 return Ok(Vec::new());
1437 }
1438
1439 let batch = self.store.gather_rows(
1441 &[meta_field],
1442 &collector.row_ids,
1443 GatherNullPolicy::IncludeNulls,
1444 )?;
1445
1446 let meta_col = batch
1447 .column(0)
1448 .as_any()
1449 .downcast_ref::<BinaryArray>()
1450 .ok_or_else(|| {
1451 llkv_result::Error::Internal("catalog schema_meta column should be Binary".into())
1452 })?;
1453
1454 let mut result = Vec::new();
1455 for idx in 0..collector.row_ids.len() {
1456 if !meta_col.is_null(idx) {
1457 let bytes = meta_col.value(idx);
1458 if let Ok(meta) = bitcode::decode::<SchemaMeta>(bytes) {
1459 result.push(meta);
1460 }
1461 }
1462 }
1463
1464 Ok(result)
1465 }
1466
1467 pub fn put_custom_type_meta(&self, meta: &CustomTypeMeta) -> LlkvResult<()> {
1471 let lfid_val: u64 = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_CUSTOM_TYPE_META_ID).into();
1472 let schema = Arc::new(Schema::new(vec![
1473 Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
1474 Field::new("meta", DataType::Binary, false).with_metadata(HashMap::from([(
1475 crate::constants::FIELD_ID_META_KEY.to_string(),
1476 lfid_val.to_string(),
1477 )])),
1478 ]));
1479
1480 let canonical = meta.name.to_ascii_lowercase();
1482 let row_id_val = schema_name_to_row_id(&canonical); let row_id = Arc::new(UInt64Array::from(vec![row_id_val]));
1484 let meta_encoded = bitcode::encode(meta);
1485 let meta_bytes = Arc::new(BinaryArray::from(vec![meta_encoded.as_slice()]));
1486
1487 let batch = RecordBatch::try_new(schema, vec![row_id, meta_bytes])?;
1488 self.store.append(&batch)?;
1489 Ok(())
1490 }
1491
1492 pub fn get_custom_type_meta(&self, type_name: &str) -> LlkvResult<Option<CustomTypeMeta>> {
1496 let canonical = type_name.to_ascii_lowercase();
1497 let row_id = schema_name_to_row_id(&canonical);
1498 let lfid = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_CUSTOM_TYPE_META_ID);
1499
1500 let batch = match self
1501 .store
1502 .gather_rows(&[lfid], &[row_id], GatherNullPolicy::IncludeNulls)
1503 {
1504 Ok(batch) => batch,
1505 Err(llkv_result::Error::NotFound) => return Ok(None),
1506 Err(err) => return Err(err),
1507 };
1508
1509 let meta_col = batch
1510 .column(0)
1511 .as_any()
1512 .downcast_ref::<BinaryArray>()
1513 .ok_or_else(|| {
1514 llkv_result::Error::Internal(
1515 "catalog custom_type_meta column should be Binary".into(),
1516 )
1517 })?;
1518
1519 if meta_col.is_null(0) {
1520 return Ok(None);
1521 }
1522
1523 let bytes = meta_col.value(0);
1524 let meta = bitcode::decode(bytes).map_err(|err| {
1525 llkv_result::Error::Internal(format!("failed to decode custom type metadata: {err}"))
1526 })?;
1527 Ok(Some(meta))
1528 }
1529
1530 pub fn delete_custom_type_meta(&self, type_name: &str) -> LlkvResult<()> {
1534 let canonical = type_name.to_ascii_lowercase();
1535 let row_id = schema_name_to_row_id(&canonical);
1536 let lfid = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_CUSTOM_TYPE_META_ID);
1537
1538 let lfid_val: u64 = lfid.into();
1540 let schema = Arc::new(Schema::new(vec![
1541 Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
1542 Field::new("meta", DataType::Binary, true).with_metadata(HashMap::from([(
1543 crate::constants::FIELD_ID_META_KEY.to_string(),
1544 lfid_val.to_string(),
1545 )])),
1546 ]));
1547
1548 let row_id_arr = Arc::new(UInt64Array::from(vec![row_id]));
1549 let mut meta_builder = BinaryBuilder::new();
1550 meta_builder.append_null();
1551 let meta_arr = Arc::new(meta_builder.finish());
1552
1553 let batch = RecordBatch::try_new(schema, vec![row_id_arr, meta_arr])?;
1554 self.store.append(&batch)?;
1555 Ok(())
1556 }
1557
1558 pub fn all_custom_type_metas(&self) -> LlkvResult<Vec<CustomTypeMeta>> {
1562 let meta_field = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_CUSTOM_TYPE_META_ID);
1563 let row_field = rowid_fid(meta_field);
1564
1565 struct RowIdCollector {
1567 row_ids: Vec<RowId>,
1568 }
1569
1570 impl PrimitiveVisitor for RowIdCollector {
1571 fn u64_chunk(&mut self, values: &UInt64Array) {
1572 for i in 0..values.len() {
1573 self.row_ids.push(values.value(i));
1574 }
1575 }
1576 }
1577 impl PrimitiveWithRowIdsVisitor for RowIdCollector {}
1578 impl PrimitiveSortedVisitor for RowIdCollector {}
1579 impl PrimitiveSortedWithRowIdsVisitor for RowIdCollector {}
1580
1581 let mut collector = RowIdCollector {
1582 row_ids: Vec::new(),
1583 };
1584 match ScanBuilder::new(self.store, row_field)
1585 .options(ScanOptions::default())
1586 .run(&mut collector)
1587 {
1588 Ok(()) => {}
1589 Err(llkv_result::Error::NotFound) => return Ok(Vec::new()),
1590 Err(err) => return Err(err),
1591 }
1592
1593 if collector.row_ids.is_empty() {
1594 return Ok(Vec::new());
1595 }
1596
1597 let batch = self.store.gather_rows(
1599 &[meta_field],
1600 &collector.row_ids,
1601 GatherNullPolicy::IncludeNulls,
1602 )?;
1603
1604 let meta_col = batch
1605 .column(0)
1606 .as_any()
1607 .downcast_ref::<BinaryArray>()
1608 .ok_or_else(|| {
1609 llkv_result::Error::Internal(
1610 "catalog custom_type_meta column should be Binary".into(),
1611 )
1612 })?;
1613
1614 let mut result = Vec::new();
1615 for idx in 0..collector.row_ids.len() {
1616 if !meta_col.is_null(idx) {
1617 let bytes = meta_col.value(idx);
1618 if let Ok(meta) = bitcode::decode::<CustomTypeMeta>(bytes) {
1619 result.push(meta);
1620 }
1621 }
1622 }
1623
1624 Ok(result)
1625 }
1626}
1627
1628fn schema_name_to_row_id(canonical_name: &str) -> RowId {
1633 const FNV_OFFSET: u64 = 0xcbf2_9ce4_8422_2325;
1635 const FNV_PRIME: u64 = 0x1000_0000_01b3;
1636
1637 let mut hash = FNV_OFFSET;
1638 for byte in canonical_name.as_bytes() {
1639 hash ^= u64::from(*byte);
1640 hash = hash.wrapping_mul(FNV_PRIME);
1641 }
1642
1643 hash | (1u64 << 63)
1645}
1646
1647struct MaxRowIdCollector {
1648 max: Option<RowId>,
1649}
1650
1651impl PrimitiveVisitor for MaxRowIdCollector {
1652 fn u64_chunk(&mut self, values: &UInt64Array) {
1653 for i in 0..values.len() {
1654 let value = values.value(i);
1655 self.max = match self.max {
1656 Some(curr) if curr >= value => Some(curr),
1657 _ => Some(value),
1658 };
1659 }
1660 }
1661}
1662
1663impl PrimitiveWithRowIdsVisitor for MaxRowIdCollector {}
1664impl PrimitiveSortedVisitor for MaxRowIdCollector {}
1665impl PrimitiveSortedWithRowIdsVisitor for MaxRowIdCollector {}
1666
#[cfg(test)]
mod tests {
    use super::*;
    use crate::constraints::{
        ConstraintKind, ConstraintState, PrimaryKeyConstraint, UniqueConstraint,
    };
    use llkv_column_map::ColumnStore;
    use std::sync::Arc;

    /// Round-trips constraint records through the catalog: bulk put, per-table
    /// scan, targeted lookup, missing-id lookup, and isolation between tables.
    #[test]
    fn constraint_records_roundtrip() {
        let pager = Arc::new(MemPager::default());
        let store = ColumnStore::open(Arc::clone(&pager)).unwrap();
        let catalog = SysCatalog::new(&store);

        let table_id: TableId = 42;
        let pk_record = ConstraintRecord {
            constraint_id: 1,
            kind: ConstraintKind::PrimaryKey(PrimaryKeyConstraint {
                field_ids: vec![1, 2],
            }),
            state: ConstraintState::Active,
            revision: 1,
            last_modified_micros: 100,
        };
        let unique_record = ConstraintRecord {
            constraint_id: 2,
            kind: ConstraintKind::Unique(UniqueConstraint { field_ids: vec![3] }),
            state: ConstraintState::Active,
            revision: 2,
            last_modified_micros: 200,
        };
        catalog
            .put_constraint_records(table_id, &[pk_record.clone(), unique_record.clone()])
            .unwrap();

        // A record stored under a different table id (even with the same
        // constraint id) must not leak into table 42's results.
        let unrelated = ConstraintRecord {
            constraint_id: 1,
            kind: ConstraintKind::Unique(UniqueConstraint { field_ids: vec![5] }),
            state: ConstraintState::Active,
            revision: 1,
            last_modified_micros: 150,
        };
        catalog.put_constraint_records(7, &[unrelated]).unwrap();

        let mut for_table = catalog.constraint_records_for_table(table_id).unwrap();
        for_table.sort_by_key(|record| record.constraint_id);
        assert_eq!(for_table.len(), 2);
        assert_eq!(for_table[0], pk_record);
        assert_eq!(for_table[1], unique_record);

        let single = catalog
            .get_constraint_records(table_id, &[pk_record.constraint_id])
            .unwrap();
        assert_eq!(single.len(), 1);
        assert_eq!(single[0].as_ref(), Some(&pk_record));

        // Unknown ids come back as a positional `None`, not an error.
        let missing = catalog.get_constraint_records(table_id, &[999]).unwrap();
        assert_eq!(missing.len(), 1);
        assert!(missing[0].is_none());
    }
}