1use std::collections::HashMap;
27use std::mem;
28use std::sync::Arc;
29
30use arrow::array::{Array, BinaryArray, BinaryBuilder, UInt64Array};
31use arrow::datatypes::{DataType, Field, Schema};
32use arrow::record_batch::RecordBatch;
33use bitcode::{Decode, Encode};
34
35use crate::constants::CONSTRAINT_SCAN_CHUNK_SIZE;
36use crate::constraints::{
37 ConstraintId, ConstraintRecord, decode_constraint_row_id, encode_constraint_row_id,
38};
39use crate::types::{FieldId, RowId, TableId};
40use llkv_column_map::store::scan::{
41 PrimitiveSortedVisitor, PrimitiveSortedWithRowIdsVisitor, PrimitiveVisitor,
42 PrimitiveWithRowIdsVisitor, ScanBuilder, ScanOptions,
43};
44
45use llkv_column_map::types::LogicalFieldId;
46use llkv_column_map::{
47 ColumnStore,
48 store::{GatherNullPolicy, ROW_ID_COLUMN_NAME, rowid_fid},
49 types::Namespace,
50};
51use llkv_result::{self, Result as LlkvResult};
52use llkv_storage::pager::{MemPager, Pager};
53use simd_r_drive_entry_handle::EntryHandle;
54
55use crate::reserved::*;
57
58#[inline]
62fn lfid(table_id: TableId, col_id: u32) -> LogicalFieldId {
63 LogicalFieldId::new()
64 .with_namespace(Namespace::UserData)
65 .with_table_id(table_id)
66 .with_field_id(col_id)
67}
68
69#[inline]
71fn rid_table(table_id: TableId) -> u64 {
72 let fid = LogicalFieldId::new()
73 .with_namespace(Namespace::UserData)
74 .with_table_id(table_id)
75 .with_field_id(0);
76 fid.into()
77}
78
79#[inline]
81fn rid_col(table_id: TableId, col_id: u32) -> u64 {
82 lfid(table_id, col_id).into()
83}
84
/// Logical field id of the catalog column holding constraint metadata payloads.
#[inline]
fn constraint_meta_lfid() -> LogicalFieldId {
    lfid(CATALOG_TABLE_ID, CATALOG_FIELD_CONSTRAINT_META_ID)
}
89
/// Logical field id of the catalog column holding constraint display names.
#[inline]
fn constraint_name_lfid() -> LogicalFieldId {
    lfid(CATALOG_TABLE_ID, CATALOG_FIELD_CONSTRAINT_NAME_ID)
}
94
/// Row-id shadow column paired with the constraint metadata column; scanning
/// it enumerates which constraint rows exist.
#[inline]
fn constraint_row_lfid() -> LogicalFieldId {
    rowid_fid(constraint_meta_lfid())
}
99
/// Bitcode-serialized record pairing a constraint id with its optional
/// user-visible name; stored under the constraint-name catalog column.
#[derive(Clone, Debug, Encode, Decode)]
pub struct ConstraintNameRecord {
    // Id of the constraint this name belongs to.
    pub constraint_id: ConstraintId,
    // Display name, if one was assigned.
    pub name: Option<String>,
}
105
106fn decode_constraint_record(bytes: &[u8]) -> LlkvResult<ConstraintRecord> {
107 bitcode::decode(bytes).map_err(|err| {
108 llkv_result::Error::Internal(format!("failed to decode constraint metadata: {err}"))
109 })
110}
111
/// Scan visitor that collects constraint row ids for one table, gathers their
/// payloads in chunks, and forwards decoded records to a callback.
struct ConstraintRowCollector<'a, P, F>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
    F: FnMut(Vec<ConstraintRecord>),
{
    // Store used to gather payloads for buffered row ids.
    store: &'a ColumnStore<P>,
    // Column holding the constraint metadata payloads.
    lfid: LogicalFieldId,
    // Only rows whose encoded table id matches are kept.
    table_id: TableId,
    // Receives each decoded, non-empty chunk of records.
    on_batch: &'a mut F,
    // Pending row ids awaiting a flush.
    buffer: Vec<RowId>,
    // First error hit during visitation; surfaced by `finish`.
    error: Option<llkv_result::Error>,
}
124
impl<'a, P, F> ConstraintRowCollector<'a, P, F>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
    F: FnMut(Vec<ConstraintRecord>),
{
    /// Gathers payloads for the buffered row ids, decodes them, and hands the
    /// resulting records to `on_batch`. No-op when the buffer is empty.
    fn flush_buffer(&mut self) -> LlkvResult<()> {
        if self.buffer.is_empty() {
            return Ok(());
        }

        // Take ownership of the pending ids, leaving the buffer empty so the
        // visitor can keep accumulating on subsequent chunks.
        let row_ids = mem::take(&mut self.buffer);
        let batch =
            self.store
                .gather_rows(&[self.lfid], &row_ids, GatherNullPolicy::IncludeNulls)?;

        if batch.num_columns() == 0 {
            return Ok(());
        }

        let array = batch
            .column(0)
            .as_any()
            .downcast_ref::<BinaryArray>()
            .ok_or_else(|| {
                llkv_result::Error::Internal(
                    "constraint metadata column stored unexpected type".into(),
                )
            })?;

        // NOTE(review): the index pairing assumes gather_rows returns one row
        // per requested id, in request order — confirm against the store's
        // contract.
        let mut records = Vec::with_capacity(row_ids.len());
        for (idx, row_id) in row_ids.into_iter().enumerate() {
            // Null payloads are tombstoned (deleted) entries; skip them.
            if array.is_null(idx) {
                continue;
            }

            let record = decode_constraint_record(array.value(idx))?;
            let (table_from_id, constraint_id) = decode_constraint_row_id(row_id);
            if table_from_id != self.table_id {
                continue;
            }
            // The payload's own id must agree with the id encoded in the row id.
            if record.constraint_id != constraint_id {
                return Err(llkv_result::Error::Internal(
                    "constraint metadata id mismatch".into(),
                ));
            }
            records.push(record);
        }

        if !records.is_empty() {
            (self.on_batch)(records);
        }

        Ok(())
    }

    /// Surfaces any error recorded during visitation, then flushes the tail
    /// of the buffer.
    fn finish(&mut self) -> LlkvResult<()> {
        if let Some(err) = self.error.take() {
            return Err(err);
        }
        self.flush_buffer()
    }
}
187
188impl<'a, P, F> PrimitiveVisitor for ConstraintRowCollector<'a, P, F>
189where
190 P: Pager<Blob = EntryHandle> + Send + Sync,
191 F: FnMut(Vec<ConstraintRecord>),
192{
193 fn u64_chunk(&mut self, values: &UInt64Array) {
194 if self.error.is_some() {
195 return;
196 }
197
198 for idx in 0..values.len() {
199 let row_id = values.value(idx);
200 let (table_id, _) = decode_constraint_row_id(row_id);
201 if table_id != self.table_id {
202 continue;
203 }
204 self.buffer.push(row_id);
205 if self.buffer.len() >= CONSTRAINT_SCAN_CHUNK_SIZE
206 && let Err(err) = self.flush_buffer()
207 {
208 self.error = Some(err);
209 return;
210 }
211 }
212 }
213}
214
// Marker impl: only `u64_chunk` above is overridden; the remaining visitor
// hooks fall back to the trait's default methods.
impl<'a, P, F> PrimitiveWithRowIdsVisitor for ConstraintRowCollector<'a, P, F>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
    F: FnMut(Vec<ConstraintRecord>),
{
}
221
// Marker impl: sorted scans are not used by this collector; trait defaults apply.
impl<'a, P, F> PrimitiveSortedVisitor for ConstraintRowCollector<'a, P, F>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
    F: FnMut(Vec<ConstraintRecord>),
{
}
228
// Marker impl: sorted-with-row-ids scans are not used either; trait defaults apply.
impl<'a, P, F> PrimitiveSortedWithRowIdsVisitor for ConstraintRowCollector<'a, P, F>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
    F: FnMut(Vec<ConstraintRecord>),
{
}
235
/// Persisted per-table metadata record stored in the system catalog.
#[derive(Encode, Decode, Clone, Debug, PartialEq, Eq)]
pub struct TableMeta {
    // Identifier of the table this record describes.
    pub table_id: TableId,
    // Human-readable table name, if one was assigned.
    pub name: Option<String>,
    // Creation timestamp in microseconds (epoch base not visible here — confirm).
    pub created_at_micros: u64,
    // Opaque flag bits; semantics defined by callers.
    pub flags: u32,
    // Table epoch counter (semantics not visible in this file — confirm).
    pub epoch: u64,
}
254
/// Persisted per-column metadata record stored in the system catalog.
#[derive(Encode, Decode, Clone, Debug, PartialEq, Eq)]
pub struct ColMeta {
    // Identifier of the column within its table.
    pub col_id: u32,
    // Human-readable column name, if one was assigned.
    pub name: Option<String>,
    // Opaque flag bits; semantics defined by callers.
    pub flags: u32,
    // Serialized default value, if any (encoding not visible here — confirm).
    pub default: Option<Vec<u8>>,
}
269
/// Persisted schema (namespace) metadata record stored in the system catalog.
#[derive(Encode, Decode, Clone, Debug, PartialEq, Eq)]
pub struct SchemaMeta {
    // Schema name; keyed case-insensitively (see `put_schema_meta`).
    pub name: String,
    // Creation timestamp in microseconds.
    pub created_at_micros: u64,
    // Opaque flag bits; semantics defined by callers.
    pub flags: u32,
}
282
/// One multi-column UNIQUE definition: the participating columns plus an
/// optional index name.
#[derive(Encode, Decode, Clone, Debug, PartialEq, Eq)]
pub struct MultiColumnUniqueEntryMeta {
    // Optional name of the backing index.
    pub index_name: Option<String>,
    // Ids of the columns covered by the unique constraint.
    pub column_ids: Vec<u32>,
}
291
/// All multi-column UNIQUE definitions for one table, persisted as a single
/// catalog row keyed by the table's row id.
#[derive(Encode, Decode, Clone, Debug, PartialEq, Eq)]
pub struct TableMultiColumnUniqueMeta {
    // Owning table.
    pub table_id: TableId,
    // Every multi-column unique declared on the table.
    pub uniques: Vec<MultiColumnUniqueEntryMeta>,
}
300
/// Accessor for system-catalog metadata persisted in a [`ColumnStore`].
///
/// The pager type defaults to [`MemPager`]; all reads and writes go through
/// the borrowed store.
pub struct SysCatalog<'a, P = MemPager>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    // Shared column store backing every catalog operation.
    store: &'a ColumnStore<P>,
}
319
320impl<'a, P> SysCatalog<'a, P>
321where
322 P: Pager<Blob = EntryHandle> + Send + Sync,
323{
324 fn write_null_entries(&self, meta_field: LogicalFieldId, row_ids: &[RowId]) -> LlkvResult<()> {
325 if row_ids.is_empty() {
326 return Ok(());
327 }
328
329 let lfid_val: u64 = meta_field.into();
330 let schema = Arc::new(Schema::new(vec![
331 Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
332 Field::new("meta", DataType::Binary, true).with_metadata(HashMap::from([(
333 crate::constants::FIELD_ID_META_KEY.to_string(),
334 lfid_val.to_string(),
335 )])),
336 ]));
337
338 let row_array = Arc::new(UInt64Array::from(row_ids.to_vec()));
339 let mut builder = BinaryBuilder::new();
340 for _ in row_ids {
341 builder.append_null();
342 }
343 let meta_array = Arc::new(builder.finish());
344
345 let batch = RecordBatch::try_new(schema, vec![row_array, meta_array])?;
346 self.store.append(&batch)?;
347 Ok(())
348 }
349
350 pub fn new(store: &'a ColumnStore<P>) -> Self {
352 Self { store }
353 }
354
355 pub fn put_table_meta(&self, meta: &TableMeta) {
360 let lfid_val: u64 = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_TABLE_META_ID).into();
361 let schema = Arc::new(Schema::new(vec![
362 Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
363 Field::new("meta", DataType::Binary, false).with_metadata(HashMap::from([(
364 crate::constants::FIELD_ID_META_KEY.to_string(),
365 lfid_val.to_string(),
366 )])),
367 ]));
368
369 let row_id = Arc::new(UInt64Array::from(vec![rid_table(meta.table_id)]));
370 let meta_encoded = bitcode::encode(meta);
371 let meta_bytes = Arc::new(BinaryArray::from(vec![meta_encoded.as_slice()]));
372
373 let batch = RecordBatch::try_new(schema, vec![row_id, meta_bytes]).unwrap();
374 self.store.append(&batch).unwrap();
375 }
376
    /// Fetches the catalog metadata for `table_id`, or `None` if absent.
    ///
    /// NOTE(review): storage errors are collapsed into `None` via `.ok()?`,
    /// so callers cannot distinguish "missing" from "failed to read".
    pub fn get_table_meta(&self, table_id: TableId) -> Option<TableMeta> {
        let row_id = rid_table(table_id);
        let catalog_field = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_TABLE_META_ID);
        let batch = self
            .store
            .gather_rows(&[catalog_field], &[row_id], GatherNullPolicy::IncludeNulls)
            .ok()?;

        if batch.num_rows() == 0 || batch.num_columns() == 0 {
            return None;
        }

        let array = batch
            .column(0)
            .as_any()
            .downcast_ref::<BinaryArray>()
            .expect("table meta column must be BinaryArray");

        // A null payload is a tombstoned (deleted) entry.
        if array.is_null(0) {
            return None;
        }

        // Decode failures are also reported as `None`.
        bitcode::decode(array.value(0)).ok()
    }
404
405 pub fn put_col_meta(&self, table_id: TableId, meta: &ColMeta) {
407 let lfid_val: u64 = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_COL_META_ID).into();
408 let schema = Arc::new(Schema::new(vec![
409 Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
410 Field::new("meta", DataType::Binary, false).with_metadata(HashMap::from([(
411 crate::constants::FIELD_ID_META_KEY.to_string(),
412 lfid_val.to_string(),
413 )])),
414 ]));
415
416 let rid_value = rid_col(table_id, meta.col_id);
417 let row_id = Arc::new(UInt64Array::from(vec![rid_value]));
418 let meta_encoded = bitcode::encode(meta);
419 let meta_bytes = Arc::new(BinaryArray::from(vec![meta_encoded.as_slice()]));
420
421 let batch = RecordBatch::try_new(schema, vec![row_id, meta_bytes]).unwrap();
422 self.store.append(&batch).unwrap();
423 }
424
425 pub fn get_cols_meta(&self, table_id: TableId, col_ids: &[u32]) -> Vec<Option<ColMeta>> {
427 if col_ids.is_empty() {
428 return Vec::new();
429 }
430
431 let row_ids: Vec<RowId> = col_ids.iter().map(|&cid| rid_col(table_id, cid)).collect();
432 let catalog_field = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_COL_META_ID);
433
434 let batch =
435 match self
436 .store
437 .gather_rows(&[catalog_field], &row_ids, GatherNullPolicy::IncludeNulls)
438 {
439 Ok(batch) => batch,
440 Err(_) => return vec![None; col_ids.len()],
441 };
442
443 let meta_col = batch
444 .column(0)
445 .as_any()
446 .downcast_ref::<BinaryArray>()
447 .expect("catalog meta column should be Binary");
448
449 col_ids
450 .iter()
451 .enumerate()
452 .map(|(idx, _)| {
453 if meta_col.is_null(idx) {
454 None
455 } else {
456 bitcode::decode(meta_col.value(idx)).ok()
457 }
458 })
459 .collect()
460 }
461
462 pub fn delete_col_meta(&self, table_id: TableId, col_ids: &[FieldId]) -> LlkvResult<()> {
464 if col_ids.is_empty() {
465 return Ok(());
466 }
467
468 let meta_field = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_COL_META_ID);
469 let row_ids: Vec<RowId> = col_ids
470 .iter()
471 .map(|&col_id| rid_col(table_id, col_id))
472 .collect();
473 self.write_null_entries(meta_field, &row_ids)
474 }
475
476 pub fn delete_table_meta(&self, table_id: TableId) -> LlkvResult<()> {
478 let meta_field = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_TABLE_META_ID);
479 let row_id = rid_table(table_id);
480 self.write_null_entries(meta_field, &[row_id])
481 }
482
483 pub fn delete_constraint_records(
485 &self,
486 table_id: TableId,
487 constraint_ids: &[ConstraintId],
488 ) -> LlkvResult<()> {
489 if constraint_ids.is_empty() {
490 return Ok(());
491 }
492
493 let meta_field = constraint_meta_lfid();
494 let row_ids: Vec<RowId> = constraint_ids
495 .iter()
496 .map(|&constraint_id| encode_constraint_row_id(table_id, constraint_id))
497 .collect();
498 self.write_null_entries(meta_field, &row_ids)
499 }
500
501 pub fn delete_constraint_names(
503 &self,
504 table_id: TableId,
505 constraint_ids: &[ConstraintId],
506 ) -> LlkvResult<()> {
507 if constraint_ids.is_empty() {
508 return Ok(());
509 }
510
511 let lfid = constraint_name_lfid();
512 let row_ids: Vec<RowId> = constraint_ids
513 .iter()
514 .map(|&constraint_id| encode_constraint_row_id(table_id, constraint_id))
515 .collect();
516 self.write_null_entries(lfid, &row_ids)
517 }
518
519 pub fn delete_multi_column_uniques(&self, table_id: TableId) -> LlkvResult<()> {
521 let meta_field = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_MULTI_COLUMN_UNIQUE_META_ID);
522 let row_id = rid_table(table_id);
523 self.write_null_entries(meta_field, &[row_id])
524 }
525
    /// Persists the full set of multi-column uniques for `table_id` as a
    /// single catalog row, replacing any previous set.
    pub fn put_multi_column_uniques(
        &self,
        table_id: TableId,
        uniques: &[MultiColumnUniqueEntryMeta],
    ) -> LlkvResult<()> {
        let lfid_val: u64 =
            lfid(CATALOG_TABLE_ID, CATALOG_FIELD_MULTI_COLUMN_UNIQUE_META_ID).into();
        let schema = Arc::new(Schema::new(vec![
            Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
            Field::new("meta", DataType::Binary, false).with_metadata(HashMap::from([(
                crate::constants::FIELD_ID_META_KEY.to_string(),
                lfid_val.to_string(),
            )])),
        ]));

        // One row per table, keyed by the table's reserved row id.
        let row_id = Arc::new(UInt64Array::from(vec![rid_table(table_id)]));
        let meta = TableMultiColumnUniqueMeta {
            table_id,
            uniques: uniques.to_vec(),
        };
        let encoded = bitcode::encode(&meta);
        let meta_bytes = Arc::new(BinaryArray::from(vec![encoded.as_slice()]));

        let batch = RecordBatch::try_new(schema, vec![row_id, meta_bytes])?;
        self.store.append(&batch)?;
        Ok(())
    }
554
    /// Loads the multi-column unique definitions for `table_id`.
    ///
    /// Missing rows, tombstoned (null) payloads, and `NotFound` lookups all
    /// yield an empty vector.
    pub fn get_multi_column_uniques(
        &self,
        table_id: TableId,
    ) -> LlkvResult<Vec<MultiColumnUniqueEntryMeta>> {
        let lfid = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_MULTI_COLUMN_UNIQUE_META_ID);
        let row_id = rid_table(table_id);
        let batch = match self
            .store
            .gather_rows(&[lfid], &[row_id], GatherNullPolicy::IncludeNulls)
        {
            Ok(batch) => batch,
            Err(llkv_result::Error::NotFound) => return Ok(Vec::new()),
            Err(err) => return Err(err),
        };

        if batch.num_columns() == 0 || batch.num_rows() == 0 {
            return Ok(Vec::new());
        }

        let array = batch
            .column(0)
            .as_any()
            .downcast_ref::<BinaryArray>()
            .ok_or_else(|| {
                llkv_result::Error::Internal(
                    "catalog multi-column unique column stored unexpected type".into(),
                )
            })?;

        // Null payload means the set was deleted.
        if array.is_null(0) {
            return Ok(Vec::new());
        }

        let meta: TableMultiColumnUniqueMeta = bitcode::decode(array.value(0)).map_err(|err| {
            llkv_result::Error::Internal(format!(
                "failed to decode multi-column unique metadata: {err}"
            ))
        })?;

        Ok(meta.uniques)
    }
597
    /// Enumerates every table's multi-column unique metadata by scanning the
    /// row-id shadow column and gathering the payloads in one pass.
    pub fn all_multi_column_unique_metas(&self) -> LlkvResult<Vec<TableMultiColumnUniqueMeta>> {
        let meta_field = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_MULTI_COLUMN_UNIQUE_META_ID);
        let row_field = rowid_fid(meta_field);

        // Minimal visitor that just records every row id the scan produces.
        struct RowIdCollector {
            row_ids: Vec<RowId>,
        }

        impl PrimitiveVisitor for RowIdCollector {
            fn u64_chunk(&mut self, values: &UInt64Array) {
                for i in 0..values.len() {
                    self.row_ids.push(values.value(i));
                }
            }
        }
        impl PrimitiveWithRowIdsVisitor for RowIdCollector {}
        impl PrimitiveSortedVisitor for RowIdCollector {}
        impl PrimitiveSortedWithRowIdsVisitor for RowIdCollector {}

        let mut collector = RowIdCollector {
            row_ids: Vec::new(),
        };
        // A missing column (no table ever stored uniques) is not an error.
        match ScanBuilder::new(self.store, row_field)
            .options(ScanOptions::default())
            .run(&mut collector)
        {
            Ok(()) => {}
            Err(llkv_result::Error::NotFound) => return Ok(Vec::new()),
            Err(err) => return Err(err),
        }

        if collector.row_ids.is_empty() {
            return Ok(Vec::new());
        }

        let batch = match self.store.gather_rows(
            &[meta_field],
            &collector.row_ids,
            GatherNullPolicy::IncludeNulls,
        ) {
            Ok(batch) => batch,
            Err(llkv_result::Error::NotFound) => return Ok(Vec::new()),
            Err(err) => return Err(err),
        };

        if batch.num_columns() == 0 {
            return Ok(Vec::new());
        }

        let array = batch
            .column(0)
            .as_any()
            .downcast_ref::<BinaryArray>()
            .ok_or_else(|| {
                llkv_result::Error::Internal(
                    "catalog multi-column unique column stored unexpected type".into(),
                )
            })?;

        let mut metas = Vec::with_capacity(batch.num_rows());
        for idx in 0..batch.num_rows() {
            // Skip tombstoned (null) entries.
            if array.is_null(idx) {
                continue;
            }
            let meta: TableMultiColumnUniqueMeta =
                bitcode::decode(array.value(idx)).map_err(|err| {
                    llkv_result::Error::Internal(format!(
                        "failed to decode multi-column unique metadata: {err}"
                    ))
                })?;
            metas.push(meta);
        }

        Ok(metas)
    }
674
    /// Persists constraint metadata records for `table_id`; each record's row
    /// id encodes the (table, constraint) pair.
    pub fn put_constraint_records(
        &self,
        table_id: TableId,
        records: &[ConstraintRecord],
    ) -> LlkvResult<()> {
        if records.is_empty() {
            return Ok(());
        }

        let lfid_val: u64 = constraint_meta_lfid().into();
        let schema = Arc::new(Schema::new(vec![
            Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
            Field::new("constraint", DataType::Binary, false).with_metadata(HashMap::from([(
                crate::constants::FIELD_ID_META_KEY.to_string(),
                lfid_val.to_string(),
            )])),
        ]));

        // Row ids pair positionally with the encoded payloads below.
        let row_ids: Vec<RowId> = records
            .iter()
            .map(|record| encode_constraint_row_id(table_id, record.constraint_id))
            .collect();

        let row_ids_array = Arc::new(UInt64Array::from(row_ids));
        let payload_array = Arc::new(BinaryArray::from_iter_values(
            records.iter().map(bitcode::encode),
        ));

        let batch = RecordBatch::try_new(schema, vec![row_ids_array, payload_array])?;
        self.store.append(&batch)?;
        Ok(())
    }
709
    /// Persists constraint name records for `table_id`, keyed the same way as
    /// the constraint metadata rows.
    pub fn put_constraint_names(
        &self,
        table_id: TableId,
        names: &[ConstraintNameRecord],
    ) -> LlkvResult<()> {
        if names.is_empty() {
            return Ok(());
        }

        let lfid_val: u64 = constraint_name_lfid().into();
        let schema = Arc::new(Schema::new(vec![
            Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
            Field::new("constraint_name", DataType::Binary, false).with_metadata(HashMap::from([
                (
                    crate::constants::FIELD_ID_META_KEY.to_string(),
                    lfid_val.to_string(),
                ),
            ])),
        ]));

        // Row ids pair positionally with the encoded payloads below.
        let row_ids: Vec<RowId> = names
            .iter()
            .map(|record| encode_constraint_row_id(table_id, record.constraint_id))
            .collect();
        let row_ids_array = Arc::new(UInt64Array::from(row_ids));
        let payload_array = Arc::new(BinaryArray::from_iter_values(
            names.iter().map(bitcode::encode),
        ));

        let batch = RecordBatch::try_new(schema, vec![row_ids_array, payload_array])?;
        self.store.append(&batch)?;
        Ok(())
    }
744
    /// Fetches constraint metadata for each requested id, returning one entry
    /// per request (`None` for missing or tombstoned constraints).
    ///
    /// Fails if a stored payload's constraint id disagrees with the id
    /// encoded in its row id.
    pub fn get_constraint_records(
        &self,
        table_id: TableId,
        constraint_ids: &[ConstraintId],
    ) -> LlkvResult<Vec<Option<ConstraintRecord>>> {
        if constraint_ids.is_empty() {
            return Ok(Vec::new());
        }

        let lfid = constraint_meta_lfid();
        let row_ids: Vec<RowId> = constraint_ids
            .iter()
            .map(|&constraint_id| encode_constraint_row_id(table_id, constraint_id))
            .collect();

        let batch = match self
            .store
            .gather_rows(&[lfid], &row_ids, GatherNullPolicy::IncludeNulls)
        {
            Ok(batch) => batch,
            Err(llkv_result::Error::NotFound) => {
                return Ok(vec![None; constraint_ids.len()]);
            }
            Err(err) => return Err(err),
        };

        if batch.num_columns() == 0 || batch.num_rows() == 0 {
            return Ok(vec![None; constraint_ids.len()]);
        }

        let array = batch
            .column(0)
            .as_any()
            .downcast_ref::<BinaryArray>()
            .ok_or_else(|| {
                llkv_result::Error::Internal(
                    "constraint metadata column stored unexpected type".into(),
                )
            })?;

        let mut results = Vec::with_capacity(constraint_ids.len());
        for (idx, &constraint_id) in constraint_ids.iter().enumerate() {
            // Null payloads are tombstoned entries.
            if array.is_null(idx) {
                results.push(None);
                continue;
            }
            let record = decode_constraint_record(array.value(idx))?;
            // Sanity check: payload id must match the requested id.
            if record.constraint_id != constraint_id {
                return Err(llkv_result::Error::Internal(
                    "constraint metadata id mismatch".into(),
                ));
            }
            results.push(Some(record));
        }

        Ok(results)
    }
803
804 pub fn get_constraint_names(
806 &self,
807 table_id: TableId,
808 constraint_ids: &[ConstraintId],
809 ) -> LlkvResult<Vec<Option<String>>> {
810 if constraint_ids.is_empty() {
811 return Ok(Vec::new());
812 }
813
814 let lfid = constraint_name_lfid();
815 let row_ids: Vec<RowId> = constraint_ids
816 .iter()
817 .map(|&constraint_id| encode_constraint_row_id(table_id, constraint_id))
818 .collect();
819
820 let batch = match self
821 .store
822 .gather_rows(&[lfid], &row_ids, GatherNullPolicy::IncludeNulls)
823 {
824 Ok(batch) => batch,
825 Err(llkv_result::Error::NotFound) => {
826 return Ok(vec![None; constraint_ids.len()]);
827 }
828 Err(err) => return Err(err),
829 };
830
831 if batch.num_columns() == 0 {
832 return Ok(vec![None; constraint_ids.len()]);
833 }
834
835 let array = batch
836 .column(0)
837 .as_any()
838 .downcast_ref::<BinaryArray>()
839 .ok_or_else(|| {
840 llkv_result::Error::Internal(
841 "constraint name metadata column stored unexpected type".into(),
842 )
843 })?;
844
845 let mut results = Vec::with_capacity(row_ids.len());
846 for idx in 0..row_ids.len() {
847 if array.is_null(idx) {
848 results.push(None);
849 } else {
850 let record: ConstraintNameRecord =
851 bitcode::decode(array.value(idx)).map_err(|err| {
852 llkv_result::Error::Internal(format!(
853 "failed to decode constraint name metadata: {err}"
854 ))
855 })?;
856 results.push(record.name);
857 }
858 }
859
860 Ok(results)
861 }
862
    /// Streams every constraint record for `table_id` to `on_batch` in
    /// chunks, using [`ConstraintRowCollector`] over the row-id shadow column.
    pub fn scan_constraint_records_for_table<F>(
        &self,
        table_id: TableId,
        mut on_batch: F,
    ) -> LlkvResult<()>
    where
        F: FnMut(Vec<ConstraintRecord>),
    {
        let row_field = constraint_row_lfid();
        let mut visitor = ConstraintRowCollector {
            store: self.store,
            lfid: constraint_meta_lfid(),
            table_id,
            on_batch: &mut on_batch,
            buffer: Vec::with_capacity(CONSTRAINT_SCAN_CHUNK_SIZE),
            error: None,
        };

        // A missing column (no constraints ever stored) is not an error.
        match ScanBuilder::new(self.store, row_field)
            .options(ScanOptions::default())
            .run(&mut visitor)
        {
            Ok(()) => {}
            Err(llkv_result::Error::NotFound) => return Ok(()),
            Err(err) => return Err(err),
        }

        // Surface any deferred error and flush the final partial chunk.
        visitor.finish()
    }
893
894 pub fn constraint_records_for_table(
896 &self,
897 table_id: TableId,
898 ) -> LlkvResult<Vec<ConstraintRecord>> {
899 let mut all = Vec::new();
900 self.scan_constraint_records_for_table(table_id, |mut chunk| {
901 all.append(&mut chunk);
902 })?;
903 Ok(all)
904 }
905
    /// Persists the next table id to allocate, stored as a single u64 row at
    /// the reserved `CATALOG_NEXT_TABLE_ROW_ID`.
    pub fn put_next_table_id(&self, next_id: TableId) -> LlkvResult<()> {
        let lfid_val: u64 = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_NEXT_TABLE_ID).into();
        let schema = Arc::new(Schema::new(vec![
            Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
            Field::new("next_table_id", DataType::UInt64, false).with_metadata(HashMap::from([(
                crate::constants::FIELD_ID_META_KEY.to_string(),
                lfid_val.to_string(),
            )])),
        ]));

        let row_id = Arc::new(UInt64Array::from(vec![CATALOG_NEXT_TABLE_ROW_ID]));
        // Widening cast: TableId fits in u64 (see the range check in the getter).
        let value_array = Arc::new(UInt64Array::from(vec![next_id as u64]));
        let batch = RecordBatch::try_new(schema, vec![row_id, value_array])?;
        self.store.append(&batch)?;
        Ok(())
    }
922
    /// Reads the persisted next table id, or `None` if never written.
    ///
    /// Fails if the stored u64 does not fit in `TableId`.
    pub fn get_next_table_id(&self) -> LlkvResult<Option<TableId>> {
        let lfid = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_NEXT_TABLE_ID);
        let batch = match self.store.gather_rows(
            &[lfid],
            &[CATALOG_NEXT_TABLE_ROW_ID],
            GatherNullPolicy::IncludeNulls,
        ) {
            Ok(batch) => batch,
            Err(llkv_result::Error::NotFound) => return Ok(None),
            Err(err) => return Err(err),
        };

        if batch.num_columns() == 0 || batch.num_rows() == 0 {
            return Ok(None);
        }

        let array = batch
            .column(0)
            .as_any()
            .downcast_ref::<UInt64Array>()
            .ok_or_else(|| {
                llkv_result::Error::Internal(
                    "catalog next_table_id column stored unexpected type".into(),
                )
            })?;
        if array.is_empty() || array.is_null(0) {
            return Ok(None);
        }

        let value = array.value(0);
        // The stored value is a u64; reject anything outside TableId's range
        // before narrowing.
        if value > TableId::MAX as u64 {
            return Err(llkv_result::Error::InvalidArgumentError(
                "persisted next_table_id exceeds TableId range".into(),
            ));
        }

        Ok(Some(value as TableId))
    }
961
    /// Returns the highest table id that has a table-meta row, or `None` when
    /// the catalog is empty.
    ///
    /// Scans the row-id shadow column with `MaxRowIdCollector` (defined
    /// elsewhere in this file) and decodes the table id from the max row id.
    pub fn max_table_id(&self) -> LlkvResult<Option<TableId>> {
        let meta_field = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_TABLE_META_ID);
        let row_field = rowid_fid(meta_field);

        let mut collector = MaxRowIdCollector { max: None };
        // A missing column (no tables ever created) is not an error.
        match ScanBuilder::new(self.store, row_field)
            .options(ScanOptions::default())
            .run(&mut collector)
        {
            Ok(()) => {}
            Err(llkv_result::Error::NotFound) => return Ok(None),
            Err(err) => return Err(err),
        }

        let max_value = match collector.max {
            Some(value) => value,
            None => return Ok(None),
        };

        // Table-meta row ids are encoded logical field ids; recover the table id.
        let logical: LogicalFieldId = max_value.into();
        Ok(Some(logical.table_id()))
    }
984
    /// Enumerates every table's metadata by scanning the row-id shadow column
    /// and gathering payloads; undecodable or tombstoned rows are skipped.
    pub fn all_table_metas(&self) -> LlkvResult<Vec<(TableId, TableMeta)>> {
        let meta_field = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_TABLE_META_ID);
        let row_field = rowid_fid(meta_field);

        // Minimal visitor that just records every row id the scan produces.
        struct RowIdCollector {
            row_ids: Vec<RowId>,
        }

        impl PrimitiveVisitor for RowIdCollector {
            fn u64_chunk(&mut self, values: &UInt64Array) {
                for i in 0..values.len() {
                    self.row_ids.push(values.value(i));
                }
            }
        }
        impl PrimitiveWithRowIdsVisitor for RowIdCollector {}
        impl PrimitiveSortedVisitor for RowIdCollector {}
        impl PrimitiveSortedWithRowIdsVisitor for RowIdCollector {}

        let mut collector = RowIdCollector {
            row_ids: Vec::new(),
        };
        // A missing column (no tables ever created) is not an error.
        match ScanBuilder::new(self.store, row_field)
            .options(ScanOptions::default())
            .run(&mut collector)
        {
            Ok(()) => {}
            Err(llkv_result::Error::NotFound) => return Ok(Vec::new()),
            Err(err) => return Err(err),
        }

        if collector.row_ids.is_empty() {
            return Ok(Vec::new());
        }

        let batch = self.store.gather_rows(
            &[meta_field],
            &collector.row_ids,
            GatherNullPolicy::IncludeNulls,
        )?;

        let meta_col = batch
            .column(0)
            .as_any()
            .downcast_ref::<BinaryArray>()
            .ok_or_else(|| {
                llkv_result::Error::Internal("catalog table_meta column should be Binary".into())
            })?;

        let mut result = Vec::new();
        for (idx, &row_id) in collector.row_ids.iter().enumerate() {
            if !meta_col.is_null(idx) {
                let bytes = meta_col.value(idx);
                // Decode failures are silently skipped rather than aborting
                // the whole enumeration.
                if let Ok(meta) = bitcode::decode::<TableMeta>(bytes) {
                    // Row ids are encoded logical field ids; recover the table id.
                    let logical: LogicalFieldId = row_id.into();
                    let table_id = logical.table_id();
                    result.push((table_id, meta));
                }
            }
        }

        Ok(result)
    }
1055
    /// Persists the next transaction id to allocate, stored as a single u64
    /// row at the reserved `CATALOG_NEXT_TXN_ROW_ID`.
    pub fn put_next_txn_id(&self, next_txn_id: u64) -> LlkvResult<()> {
        let lfid_val: u64 = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_NEXT_TXN_ID).into();
        let schema = Arc::new(Schema::new(vec![
            Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
            Field::new("next_txn_id", DataType::UInt64, false).with_metadata(HashMap::from([(
                crate::constants::FIELD_ID_META_KEY.to_string(),
                lfid_val.to_string(),
            )])),
        ]));

        let row_id = Arc::new(UInt64Array::from(vec![CATALOG_NEXT_TXN_ROW_ID]));
        let value_array = Arc::new(UInt64Array::from(vec![next_txn_id]));
        let batch = RecordBatch::try_new(schema, vec![row_id, value_array])?;
        self.store.append(&batch)?;
        Ok(())
    }
1073
1074 pub fn get_next_txn_id(&self) -> LlkvResult<Option<u64>> {
1076 let lfid = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_NEXT_TXN_ID);
1077 let batch = match self.store.gather_rows(
1078 &[lfid],
1079 &[CATALOG_NEXT_TXN_ROW_ID],
1080 GatherNullPolicy::IncludeNulls,
1081 ) {
1082 Ok(batch) => batch,
1083 Err(llkv_result::Error::NotFound) => return Ok(None),
1084 Err(err) => return Err(err),
1085 };
1086
1087 if batch.num_columns() == 0 || batch.num_rows() == 0 {
1088 return Ok(None);
1089 }
1090
1091 let array = batch
1092 .column(0)
1093 .as_any()
1094 .downcast_ref::<UInt64Array>()
1095 .ok_or_else(|| {
1096 llkv_result::Error::Internal(
1097 "catalog next_txn_id column stored unexpected type".into(),
1098 )
1099 })?;
1100 if array.is_empty() || array.is_null(0) {
1101 return Ok(None);
1102 }
1103
1104 let value = array.value(0);
1105 Ok(Some(value))
1106 }
1107
    /// Persists the id of the most recently committed transaction, stored as
    /// a single u64 row at the reserved `CATALOG_LAST_COMMITTED_TXN_ROW_ID`.
    pub fn put_last_committed_txn_id(&self, last_committed: u64) -> LlkvResult<()> {
        let lfid_val: u64 = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_LAST_COMMITTED_TXN_ID).into();
        let schema = Arc::new(Schema::new(vec![
            Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
            Field::new("last_committed_txn_id", DataType::UInt64, false).with_metadata(
                HashMap::from([(
                    crate::constants::FIELD_ID_META_KEY.to_string(),
                    lfid_val.to_string(),
                )]),
            ),
        ]));

        let row_id = Arc::new(UInt64Array::from(vec![CATALOG_LAST_COMMITTED_TXN_ROW_ID]));
        let value_array = Arc::new(UInt64Array::from(vec![last_committed]));
        let batch = RecordBatch::try_new(schema, vec![row_id, value_array])?;
        self.store.append(&batch)?;
        Ok(())
    }
1127
    /// Reads the persisted last-committed transaction id, or `None` if never
    /// written (or tombstoned).
    pub fn get_last_committed_txn_id(&self) -> LlkvResult<Option<u64>> {
        let lfid = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_LAST_COMMITTED_TXN_ID);
        let batch = match self.store.gather_rows(
            &[lfid],
            &[CATALOG_LAST_COMMITTED_TXN_ROW_ID],
            GatherNullPolicy::IncludeNulls,
        ) {
            Ok(batch) => batch,
            Err(llkv_result::Error::NotFound) => return Ok(None),
            Err(err) => return Err(err),
        };

        if batch.num_columns() == 0 || batch.num_rows() == 0 {
            return Ok(None);
        }

        let array = batch
            .column(0)
            .as_any()
            .downcast_ref::<UInt64Array>()
            .ok_or_else(|| {
                llkv_result::Error::Internal(
                    "catalog last_committed_txn_id column stored unexpected type".into(),
                )
            })?;
        if array.is_empty() || array.is_null(0) {
            return Ok(None);
        }

        let value = array.value(0);
        Ok(Some(value))
    }
1161
    /// Persists the serialized catalog state as a single binary row at the
    /// reserved `CATALOG_STATE_ROW_ID`.
    pub fn put_catalog_state(&self, state: &crate::catalog::TableCatalogState) -> LlkvResult<()> {
        let lfid_val: u64 = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_CATALOG_STATE).into();
        let schema = Arc::new(Schema::new(vec![
            Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
            Field::new("catalog_state", DataType::Binary, false).with_metadata(HashMap::from([(
                crate::constants::FIELD_ID_META_KEY.to_string(),
                lfid_val.to_string(),
            )])),
        ]));

        let row_id = Arc::new(UInt64Array::from(vec![CATALOG_STATE_ROW_ID]));
        let encoded = bitcode::encode(state);
        let state_bytes = Arc::new(BinaryArray::from(vec![encoded.as_slice()]));

        let batch = RecordBatch::try_new(schema, vec![row_id, state_bytes])?;
        self.store.append(&batch)?;
        Ok(())
    }
1184
    /// Reads the persisted catalog state, or `None` if never written (or
    /// tombstoned). Fails if the stored payload cannot be decoded.
    pub fn get_catalog_state(&self) -> LlkvResult<Option<crate::catalog::TableCatalogState>> {
        let lfid = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_CATALOG_STATE);
        let batch = match self.store.gather_rows(
            &[lfid],
            &[CATALOG_STATE_ROW_ID],
            GatherNullPolicy::IncludeNulls,
        ) {
            Ok(batch) => batch,
            Err(llkv_result::Error::NotFound) => return Ok(None),
            Err(err) => return Err(err),
        };

        if batch.num_columns() == 0 || batch.num_rows() == 0 {
            return Ok(None);
        }

        let array = batch
            .column(0)
            .as_any()
            .downcast_ref::<BinaryArray>()
            .ok_or_else(|| {
                llkv_result::Error::Internal("catalog state column stored unexpected type".into())
            })?;
        if array.is_empty() || array.is_null(0) {
            return Ok(None);
        }

        let bytes = array.value(0);
        let state = bitcode::decode(bytes).map_err(|e| {
            llkv_result::Error::Internal(format!("Failed to decode catalog state: {}", e))
        })?;
        Ok(Some(state))
    }
1221
    /// Persists schema metadata, keyed by a row id derived from the
    /// lowercased schema name so lookups are case-insensitive.
    pub fn put_schema_meta(&self, meta: &SchemaMeta) -> LlkvResult<()> {
        let lfid_val: u64 = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_SCHEMA_META_ID).into();
        let schema = Arc::new(Schema::new(vec![
            Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
            Field::new("meta", DataType::Binary, false).with_metadata(HashMap::from([(
                crate::constants::FIELD_ID_META_KEY.to_string(),
                lfid_val.to_string(),
            )])),
        ]));

        // Canonical (lowercase) name feeds the row-id hash; the stored
        // payload keeps the original casing.
        let canonical = meta.name.to_ascii_lowercase();
        let row_id_val = schema_name_to_row_id(&canonical);
        let row_id = Arc::new(UInt64Array::from(vec![row_id_val]));
        let meta_encoded = bitcode::encode(meta);
        let meta_bytes = Arc::new(BinaryArray::from(vec![meta_encoded.as_slice()]));

        let batch = RecordBatch::try_new(schema, vec![row_id, meta_bytes])?;
        self.store.append(&batch)?;
        Ok(())
    }
1247
1248 pub fn get_schema_meta(&self, schema_name: &str) -> LlkvResult<Option<SchemaMeta>> {
1252 let canonical = schema_name.to_ascii_lowercase();
1253 let row_id = schema_name_to_row_id(&canonical);
1254 let lfid = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_SCHEMA_META_ID);
1255
1256 let batch = match self
1257 .store
1258 .gather_rows(&[lfid], &[row_id], GatherNullPolicy::IncludeNulls)
1259 {
1260 Ok(batch) => batch,
1261 Err(llkv_result::Error::NotFound) => return Ok(None),
1262 Err(err) => return Err(err),
1263 };
1264
1265 if batch.num_columns() == 0 || batch.num_rows() == 0 {
1266 return Ok(None);
1267 }
1268
1269 let array = batch
1270 .column(0)
1271 .as_any()
1272 .downcast_ref::<BinaryArray>()
1273 .ok_or_else(|| {
1274 llkv_result::Error::Internal("catalog schema_meta column should be Binary".into())
1275 })?;
1276
1277 if array.is_empty() || array.is_null(0) {
1278 return Ok(None);
1279 }
1280
1281 let bytes = array.value(0);
1282 let meta = bitcode::decode(bytes).map_err(|e| {
1283 llkv_result::Error::Internal(format!("Failed to decode schema metadata: {}", e))
1284 })?;
1285 Ok(Some(meta))
1286 }
1287
1288 pub fn all_schema_metas(&self) -> LlkvResult<Vec<SchemaMeta>> {
1292 let meta_field = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_SCHEMA_META_ID);
1293 let row_field = rowid_fid(meta_field);
1294
1295 struct RowIdCollector {
1297 row_ids: Vec<RowId>,
1298 }
1299
1300 impl PrimitiveVisitor for RowIdCollector {
1301 fn u64_chunk(&mut self, values: &UInt64Array) {
1302 for i in 0..values.len() {
1303 self.row_ids.push(values.value(i));
1304 }
1305 }
1306 }
1307 impl PrimitiveWithRowIdsVisitor for RowIdCollector {}
1308 impl PrimitiveSortedVisitor for RowIdCollector {}
1309 impl PrimitiveSortedWithRowIdsVisitor for RowIdCollector {}
1310
1311 let mut collector = RowIdCollector {
1312 row_ids: Vec::new(),
1313 };
1314 match ScanBuilder::new(self.store, row_field)
1315 .options(ScanOptions::default())
1316 .run(&mut collector)
1317 {
1318 Ok(()) => {}
1319 Err(llkv_result::Error::NotFound) => return Ok(Vec::new()),
1320 Err(err) => return Err(err),
1321 }
1322
1323 if collector.row_ids.is_empty() {
1324 return Ok(Vec::new());
1325 }
1326
1327 let batch = self.store.gather_rows(
1329 &[meta_field],
1330 &collector.row_ids,
1331 GatherNullPolicy::IncludeNulls,
1332 )?;
1333
1334 let meta_col = batch
1335 .column(0)
1336 .as_any()
1337 .downcast_ref::<BinaryArray>()
1338 .ok_or_else(|| {
1339 llkv_result::Error::Internal("catalog schema_meta column should be Binary".into())
1340 })?;
1341
1342 let mut result = Vec::new();
1343 for idx in 0..collector.row_ids.len() {
1344 if !meta_col.is_null(idx) {
1345 let bytes = meta_col.value(idx);
1346 if let Ok(meta) = bitcode::decode::<SchemaMeta>(bytes) {
1347 result.push(meta);
1348 }
1349 }
1350 }
1351
1352 Ok(result)
1353 }
1354}
1355
/// Derive a stable catalog row id from a pre-canonicalized (lowercased)
/// schema name using an FNV-1a-style hash. Callers are responsible for
/// lowercasing; both `put_schema_meta` and `get_schema_meta` do so, which is
/// what makes schema lookups case-insensitive.
fn schema_name_to_row_id(canonical_name: &str) -> RowId {
    const FNV_OFFSET: u64 = 0xcbf2_9ce4_8422_2325;
    // NOTE(review): the canonical 64-bit FNV prime is 0x0000_0100_0000_01b3;
    // this constant (0x0000_1000_0000_01b3) has its leading nibble shifted one
    // hex digit. The hash is still deterministic and self-consistent across
    // put/get, but do NOT "correct" the constant without a data migration —
    // it would change every persisted schema row id.
    const FNV_PRIME: u64 = 0x1000_0000_01b3;

    // FNV-1a loop: xor the byte in, then multiply by the prime.
    let mut hash = FNV_OFFSET;
    for byte in canonical_name.as_bytes() {
        hash ^= u64::from(*byte);
        hash = hash.wrapping_mul(FNV_PRIME);
    }

    // Force the top bit, presumably to keep schema row ids disjoint from
    // other catalog row-id ranges — TODO confirm against the reserved-id scheme.
    hash | (1u64 << 63)
}
1374
/// Scan visitor that tracks the largest `u64` row id seen across all chunks.
struct MaxRowIdCollector {
    // `None` until the first value arrives; then the running maximum.
    max: Option<RowId>,
}
1378
1379impl PrimitiveVisitor for MaxRowIdCollector {
1380 fn u64_chunk(&mut self, values: &UInt64Array) {
1381 for i in 0..values.len() {
1382 let value = values.value(i);
1383 self.max = match self.max {
1384 Some(curr) if curr >= value => Some(curr),
1385 _ => Some(value),
1386 };
1387 }
1388 }
1389}
1390
// MaxRowIdCollector only consumes plain unsorted u64 chunks; the remaining
// visitor traits are satisfied with their provided default methods.
impl PrimitiveWithRowIdsVisitor for MaxRowIdCollector {}
impl PrimitiveSortedVisitor for MaxRowIdCollector {}
impl PrimitiveSortedWithRowIdsVisitor for MaxRowIdCollector {}
1394
#[cfg(test)]
mod tests {
    use super::*;
    use crate::constraints::{
        ConstraintKind, ConstraintState, PrimaryKeyConstraint, UniqueConstraint,
    };
    use llkv_column_map::ColumnStore;
    use std::sync::Arc;

    /// Round-trips constraint records through the catalog and verifies that
    /// lookups are scoped per table and unknown ids come back as `None`.
    #[test]
    fn constraint_records_roundtrip() {
        let pager = Arc::new(MemPager::default());
        let store = ColumnStore::open(Arc::clone(&pager)).unwrap();
        let catalog = SysCatalog::new(&store);

        let table_id: TableId = 42;
        let pk_record = ConstraintRecord {
            constraint_id: 1,
            kind: ConstraintKind::PrimaryKey(PrimaryKeyConstraint {
                field_ids: vec![1, 2],
            }),
            state: ConstraintState::Active,
            revision: 1,
            last_modified_micros: 100,
        };
        let unique_record = ConstraintRecord {
            constraint_id: 2,
            kind: ConstraintKind::Unique(UniqueConstraint { field_ids: vec![3] }),
            state: ConstraintState::Active,
            revision: 2,
            last_modified_micros: 200,
        };
        catalog
            .put_constraint_records(table_id, &[pk_record.clone(), unique_record.clone()])
            .unwrap();

        // A record with the same constraint id on a different table must not
        // leak into lookups for `table_id`.
        let unrelated = ConstraintRecord {
            constraint_id: 1,
            kind: ConstraintKind::Unique(UniqueConstraint { field_ids: vec![5] }),
            state: ConstraintState::Active,
            revision: 1,
            last_modified_micros: 150,
        };
        catalog.put_constraint_records(7, &[unrelated]).unwrap();

        let mut fetched = catalog.constraint_records_for_table(table_id).unwrap();
        fetched.sort_by_key(|record| record.constraint_id);
        assert_eq!(fetched.len(), 2);
        assert_eq!(fetched[0], pk_record);
        assert_eq!(fetched[1], unique_record);

        let single = catalog
            .get_constraint_records(table_id, &[pk_record.constraint_id])
            .unwrap();
        assert_eq!(single.len(), 1);
        assert_eq!(single[0].as_ref(), Some(&pk_record));

        let missing = catalog.get_constraint_records(table_id, &[999]).unwrap();
        assert_eq!(missing.len(), 1);
        assert!(missing[0].is_none());
    }
}