1use std::collections::HashMap;
27use std::mem;
28use std::sync::Arc;
29
30use arrow::array::{Array, BinaryArray, BinaryBuilder, UInt64Array};
31use arrow::datatypes::{DataType, Field, Schema};
32use arrow::record_batch::RecordBatch;
33use bitcode::{Decode, Encode};
34
35use crate::constants::CONSTRAINT_SCAN_CHUNK_SIZE;
36use crate::constraints::{
37 ConstraintId, ConstraintRecord, decode_constraint_row_id, encode_constraint_row_id,
38};
39use crate::types::{FieldId, ROW_ID_FIELD_ID, RowId, TableId};
40use llkv_column_map::store::scan::{
41 PrimitiveSortedVisitor, PrimitiveSortedWithRowIdsVisitor, PrimitiveVisitor,
42 PrimitiveWithRowIdsVisitor, ScanBuilder, ScanOptions,
43};
44
45use llkv_column_map::types::LogicalFieldId;
46use llkv_column_map::{
47 ColumnStore,
48 store::{GatherNullPolicy, ROW_ID_COLUMN_NAME, rowid_fid},
49};
50use llkv_result::{self, Result as LlkvResult};
51use llkv_storage::pager::{MemPager, Pager};
52use simd_r_drive_entry_handle::EntryHandle;
53
54use crate::reserved::*;
56
/// Builds the [`LogicalFieldId`] for user column `col_id` of `table_id`.
#[inline]
fn lfid(table_id: TableId, col_id: u32) -> LogicalFieldId {
    LogicalFieldId::for_user(table_id, col_id)
}
64
/// Row id used to store table-level metadata: the encoded logical field id
/// of (`table_id`, `ROW_ID_FIELD_ID`) converted to `u64`.
#[inline]
fn rid_table(table_id: TableId) -> u64 {
    LogicalFieldId::for_user(table_id, ROW_ID_FIELD_ID).into()
}
70
/// Row id used to store per-column metadata: the row-id companion field of
/// the column's logical field id, converted to `u64`.
#[inline]
fn rid_col(table_id: TableId, col_id: u32) -> u64 {
    rowid_fid(lfid(table_id, col_id)).into()
}
76
/// Logical field id of the catalog column holding constraint metadata blobs.
#[inline]
fn constraint_meta_lfid() -> LogicalFieldId {
    lfid(CATALOG_TABLE_ID, CATALOG_FIELD_CONSTRAINT_META_ID)
}
81
/// Logical field id of the catalog column holding constraint name records.
#[inline]
fn constraint_name_lfid() -> LogicalFieldId {
    lfid(CATALOG_TABLE_ID, CATALOG_FIELD_CONSTRAINT_NAME_ID)
}
86
/// Row-id companion field of the constraint-metadata column; scanned to
/// enumerate which constraint rows exist.
#[inline]
fn constraint_row_lfid() -> LogicalFieldId {
    rowid_fid(constraint_meta_lfid())
}
91
/// Bitcode-serialized record pairing a constraint with its optional
/// user-assigned name.
#[derive(Clone, Debug, Encode, Decode)]
pub struct ConstraintNameRecord {
    /// Id of the constraint this name belongs to.
    pub constraint_id: ConstraintId,
    /// The constraint's name, or `None` if it was never named.
    pub name: Option<String>,
}
97
98fn decode_constraint_record(bytes: &[u8]) -> LlkvResult<ConstraintRecord> {
99 bitcode::decode(bytes).map_err(|err| {
100 llkv_result::Error::Internal(format!("failed to decode constraint metadata: {err}"))
101 })
102}
103
/// Scan visitor that buffers constraint row ids for one table and decodes
/// their payloads in chunks, forwarding each decoded batch to a callback.
struct ConstraintRowCollector<'a, P, F>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
    F: FnMut(Vec<ConstraintRecord>),
{
    // Store the payloads are gathered from.
    store: &'a ColumnStore<P>,
    // Logical field holding the constraint metadata blobs.
    lfid: LogicalFieldId,
    // Only rows whose encoded table id matches are collected.
    table_id: TableId,
    // Receives each decoded batch of records.
    on_batch: &'a mut F,
    // Row ids accumulated since the last flush.
    buffer: Vec<RowId>,
    // First error hit inside a visitor callback; surfaced by `finish`.
    error: Option<llkv_result::Error>,
}
116
impl<'a, P, F> ConstraintRowCollector<'a, P, F>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
    F: FnMut(Vec<ConstraintRecord>),
{
    /// Gathers payloads for all buffered row ids, decodes the non-null ones,
    /// and hands the resulting records to `on_batch`. Leaves the buffer empty.
    fn flush_buffer(&mut self) -> LlkvResult<()> {
        if self.buffer.is_empty() {
            return Ok(());
        }

        // Take ownership so the buffer is reset even if gathering fails.
        let row_ids = mem::take(&mut self.buffer);
        let batch =
            self.store
                .gather_rows(&[self.lfid], &row_ids, GatherNullPolicy::IncludeNulls)?;

        if batch.num_columns() == 0 {
            return Ok(());
        }

        let array = batch
            .column(0)
            .as_any()
            .downcast_ref::<BinaryArray>()
            .ok_or_else(|| {
                llkv_result::Error::Internal(
                    "constraint metadata column stored unexpected type".into(),
                )
            })?;

        let mut records = Vec::with_capacity(row_ids.len());
        for (idx, row_id) in row_ids.into_iter().enumerate() {
            // Null payload means the entry was tombstoned; skip it.
            if array.is_null(idx) {
                continue;
            }

            let record = decode_constraint_record(array.value(idx))?;
            let (table_from_id, constraint_id) = decode_constraint_row_id(row_id);
            // Defensive re-check: ids were already filtered by table when
            // buffered in `u64_chunk`.
            if table_from_id != self.table_id {
                continue;
            }
            // The payload's embedded id must agree with the row-id encoding.
            if record.constraint_id != constraint_id {
                return Err(llkv_result::Error::Internal(
                    "constraint metadata id mismatch".into(),
                ));
            }
            records.push(record);
        }

        if !records.is_empty() {
            (self.on_batch)(records);
        }

        Ok(())
    }

    /// Surfaces any error captured during visiting, then flushes whatever
    /// remains in the buffer.
    fn finish(&mut self) -> LlkvResult<()> {
        if let Some(err) = self.error.take() {
            return Err(err);
        }
        self.flush_buffer()
    }
}
179
180impl<'a, P, F> PrimitiveVisitor for ConstraintRowCollector<'a, P, F>
181where
182 P: Pager<Blob = EntryHandle> + Send + Sync,
183 F: FnMut(Vec<ConstraintRecord>),
184{
185 fn u64_chunk(&mut self, values: &UInt64Array) {
186 if self.error.is_some() {
187 return;
188 }
189
190 for idx in 0..values.len() {
191 let row_id = values.value(idx);
192 let (table_id, _) = decode_constraint_row_id(row_id);
193 if table_id != self.table_id {
194 continue;
195 }
196 self.buffer.push(row_id);
197 if self.buffer.len() >= CONSTRAINT_SCAN_CHUNK_SIZE
198 && let Err(err) = self.flush_buffer()
199 {
200 self.error = Some(err);
201 return;
202 }
203 }
204 }
205}
206
// Marker impl only: the collector consumes the plain unsorted u64 path
// (`PrimitiveVisitor::u64_chunk`); this trait's callbacks keep their
// defaults.
impl<'a, P, F> PrimitiveWithRowIdsVisitor for ConstraintRowCollector<'a, P, F>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
    F: FnMut(Vec<ConstraintRecord>),
{
}
213
// Marker impl only: sorted scans are not used for constraint collection.
impl<'a, P, F> PrimitiveSortedVisitor for ConstraintRowCollector<'a, P, F>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
    F: FnMut(Vec<ConstraintRecord>),
{
}
220
// Marker impl only: sorted-with-row-ids scans are not used either.
impl<'a, P, F> PrimitiveSortedWithRowIdsVisitor for ConstraintRowCollector<'a, P, F>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
    F: FnMut(Vec<ConstraintRecord>),
{
}
227
/// Persisted per-table catalog metadata, serialized with `bitcode`.
#[derive(Encode, Decode, Clone, Debug, PartialEq, Eq)]
pub struct TableMeta {
    /// Id of the table this metadata describes.
    pub table_id: TableId,
    /// Human-readable table name, if one was assigned.
    pub name: Option<String>,
    /// Creation timestamp in microseconds (presumably since the Unix epoch —
    /// not verifiable from this file).
    pub created_at_micros: u64,
    /// Opaque flag bits; interpretation is not visible in this file.
    pub flags: u32,
    /// Epoch counter; semantics are defined by callers, not visible here.
    pub epoch: u64,
}
246
/// Persisted per-column catalog metadata, serialized with `bitcode`.
#[derive(Encode, Decode, Clone, Debug, PartialEq, Eq)]
pub struct ColMeta {
    /// Id of the column within its table.
    pub col_id: u32,
    /// Human-readable column name, if one was assigned.
    pub name: Option<String>,
    /// Opaque flag bits; interpretation is not visible in this file.
    pub flags: u32,
    /// Serialized default value, if any; encoding is decided by callers.
    pub default: Option<Vec<u8>>,
}
261
/// Persisted schema (namespace) metadata, serialized with `bitcode`.
#[derive(Encode, Decode, Clone, Debug, PartialEq, Eq)]
pub struct SchemaMeta {
    /// Schema name; stored keyed by its ASCII-lowercased form.
    pub name: String,
    /// Creation timestamp in microseconds (presumably since the Unix epoch —
    /// not verifiable from this file).
    pub created_at_micros: u64,
    /// Opaque flag bits; interpretation is not visible in this file.
    pub flags: u32,
}
274
/// One multi-column UNIQUE definition on a table.
#[derive(Encode, Decode, Clone, Debug, PartialEq, Eq)]
pub struct MultiColumnUniqueEntryMeta {
    /// Optional name of the backing index, if one was given.
    pub index_name: Option<String>,
    /// Ids of the columns participating in the unique constraint.
    pub column_ids: Vec<u32>,
}
283
/// All multi-column UNIQUE definitions for a single table, stored as one
/// catalog entry at the table's row id.
#[derive(Encode, Decode, Clone, Debug, PartialEq, Eq)]
pub struct TableMultiColumnUniqueMeta {
    /// Table these unique definitions belong to.
    pub table_id: TableId,
    /// The table's full set of multi-column uniques.
    pub uniques: Vec<MultiColumnUniqueEntryMeta>,
}
292
/// Thin accessor over a [`ColumnStore`] that reads and writes system-catalog
/// entries (table/column/schema/constraint metadata and counters).
pub struct SysCatalog<'a, P = MemPager>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    // Borrowed store; the catalog itself owns no state.
    store: &'a ColumnStore<P>,
}
311
312impl<'a, P> SysCatalog<'a, P>
313where
314 P: Pager<Blob = EntryHandle> + Send + Sync,
315{
316 fn write_null_entries(&self, meta_field: LogicalFieldId, row_ids: &[RowId]) -> LlkvResult<()> {
317 if row_ids.is_empty() {
318 return Ok(());
319 }
320
321 let lfid_val: u64 = meta_field.into();
322 let schema = Arc::new(Schema::new(vec![
323 Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
324 Field::new("meta", DataType::Binary, true).with_metadata(HashMap::from([(
325 crate::constants::FIELD_ID_META_KEY.to_string(),
326 lfid_val.to_string(),
327 )])),
328 ]));
329
330 let row_array = Arc::new(UInt64Array::from(row_ids.to_vec()));
331 let mut builder = BinaryBuilder::new();
332 for _ in row_ids {
333 builder.append_null();
334 }
335 let meta_array = Arc::new(builder.finish());
336
337 let batch = RecordBatch::try_new(schema, vec![row_array, meta_array])?;
338 self.store.append(&batch)?;
339 Ok(())
340 }
341
    /// Creates a catalog view over `store`.
    pub fn new(store: &'a ColumnStore<P>) -> Self {
        Self { store }
    }
346
347 pub fn put_table_meta(&self, meta: &TableMeta) {
352 let lfid_val: u64 = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_TABLE_META_ID).into();
353 let schema = Arc::new(Schema::new(vec![
354 Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
355 Field::new("meta", DataType::Binary, false).with_metadata(HashMap::from([(
356 crate::constants::FIELD_ID_META_KEY.to_string(),
357 lfid_val.to_string(),
358 )])),
359 ]));
360
361 let row_id = Arc::new(UInt64Array::from(vec![rid_table(meta.table_id)]));
362 let meta_encoded = bitcode::encode(meta);
363 let meta_bytes = Arc::new(BinaryArray::from(vec![meta_encoded.as_slice()]));
364
365 let batch = RecordBatch::try_new(schema, vec![row_id, meta_bytes]).unwrap();
366 self.store.append(&batch).unwrap();
367 }
368
369 pub fn get_table_meta(&self, table_id: TableId) -> Option<TableMeta> {
373 let row_id = rid_table(table_id);
374 let catalog_field = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_TABLE_META_ID);
375 let batch = self
376 .store
377 .gather_rows(&[catalog_field], &[row_id], GatherNullPolicy::IncludeNulls)
378 .ok()?;
379
380 if batch.num_rows() == 0 || batch.num_columns() == 0 {
381 return None;
382 }
383
384 let array = batch
385 .column(0)
386 .as_any()
387 .downcast_ref::<BinaryArray>()
388 .expect("table meta column must be BinaryArray");
389
390 if array.is_null(0) {
391 return None;
392 }
393
394 bitcode::decode(array.value(0)).ok()
395 }
396
397 pub fn put_col_meta(&self, table_id: TableId, meta: &ColMeta) {
399 let lfid_val: u64 = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_COL_META_ID).into();
400 let schema = Arc::new(Schema::new(vec![
401 Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
402 Field::new("meta", DataType::Binary, false).with_metadata(HashMap::from([(
403 crate::constants::FIELD_ID_META_KEY.to_string(),
404 lfid_val.to_string(),
405 )])),
406 ]));
407
408 let rid_value = rid_col(table_id, meta.col_id);
409 let row_id = Arc::new(UInt64Array::from(vec![rid_value]));
410 let meta_encoded = bitcode::encode(meta);
411 let meta_bytes = Arc::new(BinaryArray::from(vec![meta_encoded.as_slice()]));
412
413 let batch = RecordBatch::try_new(schema, vec![row_id, meta_bytes]).unwrap();
414 self.store.append(&batch).unwrap();
415 }
416
417 pub fn get_cols_meta(&self, table_id: TableId, col_ids: &[u32]) -> Vec<Option<ColMeta>> {
419 if col_ids.is_empty() {
420 return Vec::new();
421 }
422
423 let row_ids: Vec<RowId> = col_ids.iter().map(|&cid| rid_col(table_id, cid)).collect();
424 let catalog_field = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_COL_META_ID);
425
426 let batch =
427 match self
428 .store
429 .gather_rows(&[catalog_field], &row_ids, GatherNullPolicy::IncludeNulls)
430 {
431 Ok(batch) => batch,
432 Err(_) => return vec![None; col_ids.len()],
433 };
434
435 let meta_col = batch
436 .column(0)
437 .as_any()
438 .downcast_ref::<BinaryArray>()
439 .expect("catalog meta column should be Binary");
440
441 col_ids
442 .iter()
443 .enumerate()
444 .map(|(idx, _)| {
445 if meta_col.is_null(idx) {
446 None
447 } else {
448 bitcode::decode(meta_col.value(idx)).ok()
449 }
450 })
451 .collect()
452 }
453
454 pub fn delete_col_meta(&self, table_id: TableId, col_ids: &[FieldId]) -> LlkvResult<()> {
456 if col_ids.is_empty() {
457 return Ok(());
458 }
459
460 let meta_field = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_COL_META_ID);
461 let row_ids: Vec<RowId> = col_ids
462 .iter()
463 .map(|&col_id| rid_col(table_id, col_id))
464 .collect();
465 self.write_null_entries(meta_field, &row_ids)
466 }
467
    /// Tombstones the table-level metadata entry for `table_id`.
    pub fn delete_table_meta(&self, table_id: TableId) -> LlkvResult<()> {
        let meta_field = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_TABLE_META_ID);
        let row_id = rid_table(table_id);
        self.write_null_entries(meta_field, &[row_id])
    }
474
475 pub fn delete_constraint_records(
477 &self,
478 table_id: TableId,
479 constraint_ids: &[ConstraintId],
480 ) -> LlkvResult<()> {
481 if constraint_ids.is_empty() {
482 return Ok(());
483 }
484
485 let meta_field = constraint_meta_lfid();
486 let row_ids: Vec<RowId> = constraint_ids
487 .iter()
488 .map(|&constraint_id| encode_constraint_row_id(table_id, constraint_id))
489 .collect();
490 self.write_null_entries(meta_field, &row_ids)
491 }
492
493 pub fn delete_constraint_names(
495 &self,
496 table_id: TableId,
497 constraint_ids: &[ConstraintId],
498 ) -> LlkvResult<()> {
499 if constraint_ids.is_empty() {
500 return Ok(());
501 }
502
503 let lfid = constraint_name_lfid();
504 let row_ids: Vec<RowId> = constraint_ids
505 .iter()
506 .map(|&constraint_id| encode_constraint_row_id(table_id, constraint_id))
507 .collect();
508 self.write_null_entries(lfid, &row_ids)
509 }
510
    /// Tombstones the multi-column unique entry stored at the table's row.
    pub fn delete_multi_column_uniques(&self, table_id: TableId) -> LlkvResult<()> {
        let meta_field = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_MULTI_COLUMN_UNIQUE_META_ID);
        let row_id = rid_table(table_id);
        self.write_null_entries(meta_field, &[row_id])
    }
517
518 pub fn put_multi_column_uniques(
520 &self,
521 table_id: TableId,
522 uniques: &[MultiColumnUniqueEntryMeta],
523 ) -> LlkvResult<()> {
524 let lfid_val: u64 =
525 lfid(CATALOG_TABLE_ID, CATALOG_FIELD_MULTI_COLUMN_UNIQUE_META_ID).into();
526 let schema = Arc::new(Schema::new(vec![
527 Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
528 Field::new("meta", DataType::Binary, false).with_metadata(HashMap::from([(
529 crate::constants::FIELD_ID_META_KEY.to_string(),
530 lfid_val.to_string(),
531 )])),
532 ]));
533
534 let row_id = Arc::new(UInt64Array::from(vec![rid_table(table_id)]));
535 let meta = TableMultiColumnUniqueMeta {
536 table_id,
537 uniques: uniques.to_vec(),
538 };
539 let encoded = bitcode::encode(&meta);
540 let meta_bytes = Arc::new(BinaryArray::from(vec![encoded.as_slice()]));
541
542 let batch = RecordBatch::try_new(schema, vec![row_id, meta_bytes])?;
543 self.store.append(&batch)?;
544 Ok(())
545 }
546
547 pub fn get_multi_column_uniques(
549 &self,
550 table_id: TableId,
551 ) -> LlkvResult<Vec<MultiColumnUniqueEntryMeta>> {
552 let lfid = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_MULTI_COLUMN_UNIQUE_META_ID);
553 let row_id = rid_table(table_id);
554 let batch = match self
555 .store
556 .gather_rows(&[lfid], &[row_id], GatherNullPolicy::IncludeNulls)
557 {
558 Ok(batch) => batch,
559 Err(llkv_result::Error::NotFound) => return Ok(Vec::new()),
560 Err(err) => return Err(err),
561 };
562
563 if batch.num_columns() == 0 || batch.num_rows() == 0 {
564 return Ok(Vec::new());
565 }
566
567 let array = batch
568 .column(0)
569 .as_any()
570 .downcast_ref::<BinaryArray>()
571 .ok_or_else(|| {
572 llkv_result::Error::Internal(
573 "catalog multi-column unique column stored unexpected type".into(),
574 )
575 })?;
576
577 if array.is_null(0) {
578 return Ok(Vec::new());
579 }
580
581 let meta: TableMultiColumnUniqueMeta = bitcode::decode(array.value(0)).map_err(|err| {
582 llkv_result::Error::Internal(format!(
583 "failed to decode multi-column unique metadata: {err}"
584 ))
585 })?;
586
587 Ok(meta.uniques)
588 }
589
    /// Scans the catalog and returns every table's persisted multi-column
    /// unique metadata. Tombstoned (null) entries are skipped; decode
    /// failures are surfaced as internal errors.
    pub fn all_multi_column_unique_metas(&self) -> LlkvResult<Vec<TableMultiColumnUniqueMeta>> {
        let meta_field = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_MULTI_COLUMN_UNIQUE_META_ID);
        let row_field = rowid_fid(meta_field);

        // Local visitor that just records every row id in the column.
        struct RowIdCollector {
            row_ids: Vec<RowId>,
        }

        impl PrimitiveVisitor for RowIdCollector {
            fn u64_chunk(&mut self, values: &UInt64Array) {
                for i in 0..values.len() {
                    self.row_ids.push(values.value(i));
                }
            }
        }
        impl PrimitiveWithRowIdsVisitor for RowIdCollector {}
        impl PrimitiveSortedVisitor for RowIdCollector {}
        impl PrimitiveSortedWithRowIdsVisitor for RowIdCollector {}

        let mut collector = RowIdCollector {
            row_ids: Vec::new(),
        };
        match ScanBuilder::new(self.store, row_field)
            .options(ScanOptions::default())
            .run(&mut collector)
        {
            Ok(()) => {}
            // Column absent: no uniques have ever been stored.
            Err(llkv_result::Error::NotFound) => return Ok(Vec::new()),
            Err(err) => return Err(err),
        }

        if collector.row_ids.is_empty() {
            return Ok(Vec::new());
        }

        let batch = match self.store.gather_rows(
            &[meta_field],
            &collector.row_ids,
            GatherNullPolicy::IncludeNulls,
        ) {
            Ok(batch) => batch,
            Err(llkv_result::Error::NotFound) => return Ok(Vec::new()),
            Err(err) => return Err(err),
        };

        if batch.num_columns() == 0 {
            return Ok(Vec::new());
        }

        let array = batch
            .column(0)
            .as_any()
            .downcast_ref::<BinaryArray>()
            .ok_or_else(|| {
                llkv_result::Error::Internal(
                    "catalog multi-column unique column stored unexpected type".into(),
                )
            })?;

        let mut metas = Vec::with_capacity(batch.num_rows());
        for idx in 0..batch.num_rows() {
            // Nulls are tombstoned entries.
            if array.is_null(idx) {
                continue;
            }
            let meta: TableMultiColumnUniqueMeta =
                bitcode::decode(array.value(idx)).map_err(|err| {
                    llkv_result::Error::Internal(format!(
                        "failed to decode multi-column unique metadata: {err}"
                    ))
                })?;
            metas.push(meta);
        }

        Ok(metas)
    }
666
667 pub fn put_constraint_records(
669 &self,
670 table_id: TableId,
671 records: &[ConstraintRecord],
672 ) -> LlkvResult<()> {
673 if records.is_empty() {
674 return Ok(());
675 }
676
677 let lfid_val: u64 = constraint_meta_lfid().into();
678 let schema = Arc::new(Schema::new(vec![
679 Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
680 Field::new("constraint", DataType::Binary, false).with_metadata(HashMap::from([(
681 crate::constants::FIELD_ID_META_KEY.to_string(),
682 lfid_val.to_string(),
683 )])),
684 ]));
685
686 let row_ids: Vec<RowId> = records
687 .iter()
688 .map(|record| encode_constraint_row_id(table_id, record.constraint_id))
689 .collect();
690
691 let row_ids_array = Arc::new(UInt64Array::from(row_ids));
692 let payload_array = Arc::new(BinaryArray::from_iter_values(
693 records.iter().map(bitcode::encode),
694 ));
695
696 let batch = RecordBatch::try_new(schema, vec![row_ids_array, payload_array])?;
697 self.store.append(&batch)?;
698 Ok(())
699 }
700
701 pub fn put_constraint_names(
703 &self,
704 table_id: TableId,
705 names: &[ConstraintNameRecord],
706 ) -> LlkvResult<()> {
707 if names.is_empty() {
708 return Ok(());
709 }
710
711 let lfid_val: u64 = constraint_name_lfid().into();
712 let schema = Arc::new(Schema::new(vec![
713 Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
714 Field::new("constraint_name", DataType::Binary, false).with_metadata(HashMap::from([
715 (
716 crate::constants::FIELD_ID_META_KEY.to_string(),
717 lfid_val.to_string(),
718 ),
719 ])),
720 ]));
721
722 let row_ids: Vec<RowId> = names
723 .iter()
724 .map(|record| encode_constraint_row_id(table_id, record.constraint_id))
725 .collect();
726 let row_ids_array = Arc::new(UInt64Array::from(row_ids));
727 let payload_array = Arc::new(BinaryArray::from_iter_values(
728 names.iter().map(bitcode::encode),
729 ));
730
731 let batch = RecordBatch::try_new(schema, vec![row_ids_array, payload_array])?;
732 self.store.append(&batch)?;
733 Ok(())
734 }
735
    /// Fetches the metadata record for each id in `constraint_ids`,
    /// preserving order. Missing or tombstoned entries yield `None`; an
    /// embedded-id mismatch is an internal error.
    pub fn get_constraint_records(
        &self,
        table_id: TableId,
        constraint_ids: &[ConstraintId],
    ) -> LlkvResult<Vec<Option<ConstraintRecord>>> {
        if constraint_ids.is_empty() {
            return Ok(Vec::new());
        }

        let lfid = constraint_meta_lfid();
        let row_ids: Vec<RowId> = constraint_ids
            .iter()
            .map(|&constraint_id| encode_constraint_row_id(table_id, constraint_id))
            .collect();

        let batch = match self
            .store
            .gather_rows(&[lfid], &row_ids, GatherNullPolicy::IncludeNulls)
        {
            Ok(batch) => batch,
            // Column absent: no constraints have ever been stored.
            Err(llkv_result::Error::NotFound) => {
                return Ok(vec![None; constraint_ids.len()]);
            }
            Err(err) => return Err(err),
        };

        if batch.num_columns() == 0 || batch.num_rows() == 0 {
            return Ok(vec![None; constraint_ids.len()]);
        }

        let array = batch
            .column(0)
            .as_any()
            .downcast_ref::<BinaryArray>()
            .ok_or_else(|| {
                llkv_result::Error::Internal(
                    "constraint metadata column stored unexpected type".into(),
                )
            })?;

        let mut results = Vec::with_capacity(constraint_ids.len());
        for (idx, &constraint_id) in constraint_ids.iter().enumerate() {
            // Null payload means tombstoned or never written.
            if array.is_null(idx) {
                results.push(None);
                continue;
            }
            let record = decode_constraint_record(array.value(idx))?;
            // The payload's embedded id must match the id we asked for.
            if record.constraint_id != constraint_id {
                return Err(llkv_result::Error::Internal(
                    "constraint metadata id mismatch".into(),
                ));
            }
            results.push(Some(record));
        }

        Ok(results)
    }
794
795 pub fn get_constraint_names(
797 &self,
798 table_id: TableId,
799 constraint_ids: &[ConstraintId],
800 ) -> LlkvResult<Vec<Option<String>>> {
801 if constraint_ids.is_empty() {
802 return Ok(Vec::new());
803 }
804
805 let lfid = constraint_name_lfid();
806 let row_ids: Vec<RowId> = constraint_ids
807 .iter()
808 .map(|&constraint_id| encode_constraint_row_id(table_id, constraint_id))
809 .collect();
810
811 let batch = match self
812 .store
813 .gather_rows(&[lfid], &row_ids, GatherNullPolicy::IncludeNulls)
814 {
815 Ok(batch) => batch,
816 Err(llkv_result::Error::NotFound) => {
817 return Ok(vec![None; constraint_ids.len()]);
818 }
819 Err(err) => return Err(err),
820 };
821
822 if batch.num_columns() == 0 {
823 return Ok(vec![None; constraint_ids.len()]);
824 }
825
826 let array = batch
827 .column(0)
828 .as_any()
829 .downcast_ref::<BinaryArray>()
830 .ok_or_else(|| {
831 llkv_result::Error::Internal(
832 "constraint name metadata column stored unexpected type".into(),
833 )
834 })?;
835
836 let mut results = Vec::with_capacity(row_ids.len());
837 for idx in 0..row_ids.len() {
838 if array.is_null(idx) {
839 results.push(None);
840 } else {
841 let record: ConstraintNameRecord =
842 bitcode::decode(array.value(idx)).map_err(|err| {
843 llkv_result::Error::Internal(format!(
844 "failed to decode constraint name metadata: {err}"
845 ))
846 })?;
847 results.push(record.name);
848 }
849 }
850
851 Ok(results)
852 }
853
    /// Streams all constraint records belonging to `table_id` to `on_batch`
    /// in chunks of up to `CONSTRAINT_SCAN_CHUNK_SIZE`, using a row-id scan
    /// plus batched gathers. Missing column data is treated as "no
    /// constraints" rather than an error.
    pub fn scan_constraint_records_for_table<F>(
        &self,
        table_id: TableId,
        mut on_batch: F,
    ) -> LlkvResult<()>
    where
        F: FnMut(Vec<ConstraintRecord>),
    {
        let row_field = constraint_row_lfid();
        let mut visitor = ConstraintRowCollector {
            store: self.store,
            lfid: constraint_meta_lfid(),
            table_id,
            on_batch: &mut on_batch,
            buffer: Vec::with_capacity(CONSTRAINT_SCAN_CHUNK_SIZE),
            error: None,
        };

        match ScanBuilder::new(self.store, row_field)
            .options(ScanOptions::default())
            .run(&mut visitor)
        {
            Ok(()) => {}
            // Column absent: nothing to scan.
            Err(llkv_result::Error::NotFound) => return Ok(()),
            Err(err) => return Err(err),
        }

        // Surfaces any deferred visitor error and flushes the tail chunk.
        visitor.finish()
    }
884
885 pub fn constraint_records_for_table(
887 &self,
888 table_id: TableId,
889 ) -> LlkvResult<Vec<ConstraintRecord>> {
890 let mut all = Vec::new();
891 self.scan_constraint_records_for_table(table_id, |mut chunk| {
892 all.append(&mut chunk);
893 })?;
894 Ok(all)
895 }
896
897 pub fn put_next_table_id(&self, next_id: TableId) -> LlkvResult<()> {
898 let lfid_val: u64 = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_NEXT_TABLE_ID).into();
899 let schema = Arc::new(Schema::new(vec![
900 Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
901 Field::new("next_table_id", DataType::UInt64, false).with_metadata(HashMap::from([(
902 crate::constants::FIELD_ID_META_KEY.to_string(),
903 lfid_val.to_string(),
904 )])),
905 ]));
906
907 let row_id = Arc::new(UInt64Array::from(vec![CATALOG_NEXT_TABLE_ROW_ID]));
908 let value_array = Arc::new(UInt64Array::from(vec![next_id as u64]));
909 let batch = RecordBatch::try_new(schema, vec![row_id, value_array])?;
910 self.store.append(&batch)?;
911 Ok(())
912 }
913
    /// Reads the persisted next-table-id counter. Returns `Ok(None)` if it
    /// was never written; errors if the persisted value does not fit in
    /// `TableId`.
    pub fn get_next_table_id(&self) -> LlkvResult<Option<TableId>> {
        let lfid = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_NEXT_TABLE_ID);
        let batch = match self.store.gather_rows(
            &[lfid],
            &[CATALOG_NEXT_TABLE_ROW_ID],
            GatherNullPolicy::IncludeNulls,
        ) {
            Ok(batch) => batch,
            // Column absent: counter never persisted.
            Err(llkv_result::Error::NotFound) => return Ok(None),
            Err(err) => return Err(err),
        };

        if batch.num_columns() == 0 || batch.num_rows() == 0 {
            return Ok(None);
        }

        let array = batch
            .column(0)
            .as_any()
            .downcast_ref::<UInt64Array>()
            .ok_or_else(|| {
                llkv_result::Error::Internal(
                    "catalog next_table_id column stored unexpected type".into(),
                )
            })?;
        if array.is_empty() || array.is_null(0) {
            return Ok(None);
        }

        let value = array.value(0);
        // Stored as u64; reject values that would truncate on the narrowing
        // cast below.
        if value > TableId::MAX as u64 {
            return Err(llkv_result::Error::InvalidArgumentError(
                "persisted next_table_id exceeds TableId range".into(),
            ));
        }

        Ok(Some(value as TableId))
    }
952
    /// Returns the highest table id that has a table-meta row, or `None` if
    /// the catalog holds no tables. Row ids in this column encode
    /// `LogicalFieldId` values, so the max row id maps back to a table id.
    pub fn max_table_id(&self) -> LlkvResult<Option<TableId>> {
        let meta_field = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_TABLE_META_ID);
        let row_field = rowid_fid(meta_field);

        // MaxRowIdCollector is defined elsewhere in this crate; presumably it
        // tracks the maximum row id seen during the scan — TODO confirm.
        let mut collector = MaxRowIdCollector { max: None };
        match ScanBuilder::new(self.store, row_field)
            .options(ScanOptions::default())
            .run(&mut collector)
        {
            Ok(()) => {}
            // Column absent: no tables persisted yet.
            Err(llkv_result::Error::NotFound) => return Ok(None),
            Err(err) => return Err(err),
        }

        let max_value = match collector.max {
            Some(value) => value,
            None => return Ok(None),
        };

        let logical: LogicalFieldId = max_value.into();
        Ok(Some(logical.table_id()))
    }
975
976 pub fn all_table_metas(&self) -> LlkvResult<Vec<(TableId, TableMeta)>> {
982 let meta_field = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_TABLE_META_ID);
983 let row_field = rowid_fid(meta_field);
984
985 struct RowIdCollector {
987 row_ids: Vec<RowId>,
988 }
989
990 impl PrimitiveVisitor for RowIdCollector {
991 fn u64_chunk(&mut self, values: &UInt64Array) {
992 for i in 0..values.len() {
993 self.row_ids.push(values.value(i));
994 }
995 }
996 }
997 impl PrimitiveWithRowIdsVisitor for RowIdCollector {}
998 impl PrimitiveSortedVisitor for RowIdCollector {}
999 impl PrimitiveSortedWithRowIdsVisitor for RowIdCollector {}
1000
1001 let mut collector = RowIdCollector {
1002 row_ids: Vec::new(),
1003 };
1004 match ScanBuilder::new(self.store, row_field)
1005 .options(ScanOptions::default())
1006 .run(&mut collector)
1007 {
1008 Ok(()) => {}
1009 Err(llkv_result::Error::NotFound) => return Ok(Vec::new()),
1010 Err(err) => return Err(err),
1011 }
1012
1013 if collector.row_ids.is_empty() {
1014 return Ok(Vec::new());
1015 }
1016
1017 let batch = self.store.gather_rows(
1019 &[meta_field],
1020 &collector.row_ids,
1021 GatherNullPolicy::IncludeNulls,
1022 )?;
1023
1024 let meta_col = batch
1025 .column(0)
1026 .as_any()
1027 .downcast_ref::<BinaryArray>()
1028 .ok_or_else(|| {
1029 llkv_result::Error::Internal("catalog table_meta column should be Binary".into())
1030 })?;
1031
1032 let mut result = Vec::new();
1033 for (idx, &row_id) in collector.row_ids.iter().enumerate() {
1034 if !meta_col.is_null(idx) {
1035 let bytes = meta_col.value(idx);
1036 if let Ok(meta) = bitcode::decode::<TableMeta>(bytes) {
1037 let logical: LogicalFieldId = row_id.into();
1038 let table_id = logical.table_id();
1039 result.push((table_id, meta));
1040 }
1041 }
1042 }
1043
1044 Ok(result)
1045 }
1046
1047 pub fn put_next_txn_id(&self, next_txn_id: u64) -> LlkvResult<()> {
1049 let lfid_val: u64 = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_NEXT_TXN_ID).into();
1050 let schema = Arc::new(Schema::new(vec![
1051 Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
1052 Field::new("next_txn_id", DataType::UInt64, false).with_metadata(HashMap::from([(
1053 crate::constants::FIELD_ID_META_KEY.to_string(),
1054 lfid_val.to_string(),
1055 )])),
1056 ]));
1057
1058 let row_id = Arc::new(UInt64Array::from(vec![CATALOG_NEXT_TXN_ROW_ID]));
1059 let value_array = Arc::new(UInt64Array::from(vec![next_txn_id]));
1060 let batch = RecordBatch::try_new(schema, vec![row_id, value_array])?;
1061 self.store.append(&batch)?;
1062 Ok(())
1063 }
1064
1065 pub fn get_next_txn_id(&self) -> LlkvResult<Option<u64>> {
1067 let lfid = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_NEXT_TXN_ID);
1068 let batch = match self.store.gather_rows(
1069 &[lfid],
1070 &[CATALOG_NEXT_TXN_ROW_ID],
1071 GatherNullPolicy::IncludeNulls,
1072 ) {
1073 Ok(batch) => batch,
1074 Err(llkv_result::Error::NotFound) => return Ok(None),
1075 Err(err) => return Err(err),
1076 };
1077
1078 if batch.num_columns() == 0 || batch.num_rows() == 0 {
1079 return Ok(None);
1080 }
1081
1082 let array = batch
1083 .column(0)
1084 .as_any()
1085 .downcast_ref::<UInt64Array>()
1086 .ok_or_else(|| {
1087 llkv_result::Error::Internal(
1088 "catalog next_txn_id column stored unexpected type".into(),
1089 )
1090 })?;
1091 if array.is_empty() || array.is_null(0) {
1092 return Ok(None);
1093 }
1094
1095 let value = array.value(0);
1096 Ok(Some(value))
1097 }
1098
1099 pub fn put_last_committed_txn_id(&self, last_committed: u64) -> LlkvResult<()> {
1101 let lfid_val: u64 = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_LAST_COMMITTED_TXN_ID).into();
1102 let schema = Arc::new(Schema::new(vec![
1103 Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
1104 Field::new("last_committed_txn_id", DataType::UInt64, false).with_metadata(
1105 HashMap::from([(
1106 crate::constants::FIELD_ID_META_KEY.to_string(),
1107 lfid_val.to_string(),
1108 )]),
1109 ),
1110 ]));
1111
1112 let row_id = Arc::new(UInt64Array::from(vec![CATALOG_LAST_COMMITTED_TXN_ROW_ID]));
1113 let value_array = Arc::new(UInt64Array::from(vec![last_committed]));
1114 let batch = RecordBatch::try_new(schema, vec![row_id, value_array])?;
1115 self.store.append(&batch)?;
1116 Ok(())
1117 }
1118
    /// Reads the persisted last-committed-transaction-id marker; `Ok(None)`
    /// when it was never written.
    pub fn get_last_committed_txn_id(&self) -> LlkvResult<Option<u64>> {
        let lfid = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_LAST_COMMITTED_TXN_ID);
        let batch = match self.store.gather_rows(
            &[lfid],
            &[CATALOG_LAST_COMMITTED_TXN_ROW_ID],
            GatherNullPolicy::IncludeNulls,
        ) {
            Ok(batch) => batch,
            // Column absent: marker never persisted.
            Err(llkv_result::Error::NotFound) => return Ok(None),
            Err(err) => return Err(err),
        };

        if batch.num_columns() == 0 || batch.num_rows() == 0 {
            return Ok(None);
        }

        let array = batch
            .column(0)
            .as_any()
            .downcast_ref::<UInt64Array>()
            .ok_or_else(|| {
                llkv_result::Error::Internal(
                    "catalog last_committed_txn_id column stored unexpected type".into(),
                )
            })?;
        if array.is_empty() || array.is_null(0) {
            return Ok(None);
        }

        let value = array.value(0);
        Ok(Some(value))
    }
1152
1153 pub fn put_catalog_state(&self, state: &crate::catalog::TableCatalogState) -> LlkvResult<()> {
1158 let lfid_val: u64 = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_CATALOG_STATE).into();
1159 let schema = Arc::new(Schema::new(vec![
1160 Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
1161 Field::new("catalog_state", DataType::Binary, false).with_metadata(HashMap::from([(
1162 crate::constants::FIELD_ID_META_KEY.to_string(),
1163 lfid_val.to_string(),
1164 )])),
1165 ]));
1166
1167 let row_id = Arc::new(UInt64Array::from(vec![CATALOG_STATE_ROW_ID]));
1168 let encoded = bitcode::encode(state);
1169 let state_bytes = Arc::new(BinaryArray::from(vec![encoded.as_slice()]));
1170
1171 let batch = RecordBatch::try_new(schema, vec![row_id, state_bytes])?;
1172 self.store.append(&batch)?;
1173 Ok(())
1174 }
1175
    /// Loads and decodes the persisted catalog state blob; `Ok(None)` when
    /// it was never written or was tombstoned.
    pub fn get_catalog_state(&self) -> LlkvResult<Option<crate::catalog::TableCatalogState>> {
        let lfid = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_CATALOG_STATE);
        let batch = match self.store.gather_rows(
            &[lfid],
            &[CATALOG_STATE_ROW_ID],
            GatherNullPolicy::IncludeNulls,
        ) {
            Ok(batch) => batch,
            // Column absent: state never persisted.
            Err(llkv_result::Error::NotFound) => return Ok(None),
            Err(err) => return Err(err),
        };

        if batch.num_columns() == 0 || batch.num_rows() == 0 {
            return Ok(None);
        }

        let array = batch
            .column(0)
            .as_any()
            .downcast_ref::<BinaryArray>()
            .ok_or_else(|| {
                llkv_result::Error::Internal("catalog state column stored unexpected type".into())
            })?;
        if array.is_empty() || array.is_null(0) {
            return Ok(None);
        }

        let bytes = array.value(0);
        let state = bitcode::decode(bytes).map_err(|e| {
            llkv_result::Error::Internal(format!("Failed to decode catalog state: {}", e))
        })?;
        Ok(Some(state))
    }
1212
1213 pub fn put_schema_meta(&self, meta: &SchemaMeta) -> LlkvResult<()> {
1218 let lfid_val: u64 = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_SCHEMA_META_ID).into();
1219 let schema = Arc::new(Schema::new(vec![
1220 Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
1221 Field::new("meta", DataType::Binary, false).with_metadata(HashMap::from([(
1222 crate::constants::FIELD_ID_META_KEY.to_string(),
1223 lfid_val.to_string(),
1224 )])),
1225 ]));
1226
1227 let canonical = meta.name.to_ascii_lowercase();
1229 let row_id_val = schema_name_to_row_id(&canonical);
1230 let row_id = Arc::new(UInt64Array::from(vec![row_id_val]));
1231 let meta_encoded = bitcode::encode(meta);
1232 let meta_bytes = Arc::new(BinaryArray::from(vec![meta_encoded.as_slice()]));
1233
1234 let batch = RecordBatch::try_new(schema, vec![row_id, meta_bytes])?;
1235 self.store.append(&batch)?;
1236 Ok(())
1237 }
1238
1239 pub fn get_schema_meta(&self, schema_name: &str) -> LlkvResult<Option<SchemaMeta>> {
1243 let canonical = schema_name.to_ascii_lowercase();
1244 let row_id = schema_name_to_row_id(&canonical);
1245 let lfid = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_SCHEMA_META_ID);
1246
1247 let batch = match self
1248 .store
1249 .gather_rows(&[lfid], &[row_id], GatherNullPolicy::IncludeNulls)
1250 {
1251 Ok(batch) => batch,
1252 Err(llkv_result::Error::NotFound) => return Ok(None),
1253 Err(err) => return Err(err),
1254 };
1255
1256 if batch.num_columns() == 0 || batch.num_rows() == 0 {
1257 return Ok(None);
1258 }
1259
1260 let array = batch
1261 .column(0)
1262 .as_any()
1263 .downcast_ref::<BinaryArray>()
1264 .ok_or_else(|| {
1265 llkv_result::Error::Internal("catalog schema_meta column should be Binary".into())
1266 })?;
1267
1268 if array.is_empty() || array.is_null(0) {
1269 return Ok(None);
1270 }
1271
1272 let bytes = array.value(0);
1273 let meta = bitcode::decode(bytes).map_err(|e| {
1274 llkv_result::Error::Internal(format!("Failed to decode schema metadata: {}", e))
1275 })?;
1276 Ok(Some(meta))
1277 }
1278
1279 pub fn all_schema_metas(&self) -> LlkvResult<Vec<SchemaMeta>> {
1283 let meta_field = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_SCHEMA_META_ID);
1284 let row_field = rowid_fid(meta_field);
1285
1286 struct RowIdCollector {
1288 row_ids: Vec<RowId>,
1289 }
1290
1291 impl PrimitiveVisitor for RowIdCollector {
1292 fn u64_chunk(&mut self, values: &UInt64Array) {
1293 for i in 0..values.len() {
1294 self.row_ids.push(values.value(i));
1295 }
1296 }
1297 }
1298 impl PrimitiveWithRowIdsVisitor for RowIdCollector {}
1299 impl PrimitiveSortedVisitor for RowIdCollector {}
1300 impl PrimitiveSortedWithRowIdsVisitor for RowIdCollector {}
1301
1302 let mut collector = RowIdCollector {
1303 row_ids: Vec::new(),
1304 };
1305 match ScanBuilder::new(self.store, row_field)
1306 .options(ScanOptions::default())
1307 .run(&mut collector)
1308 {
1309 Ok(()) => {}
1310 Err(llkv_result::Error::NotFound) => return Ok(Vec::new()),
1311 Err(err) => return Err(err),
1312 }
1313
1314 if collector.row_ids.is_empty() {
1315 return Ok(Vec::new());
1316 }
1317
1318 let batch = self.store.gather_rows(
1320 &[meta_field],
1321 &collector.row_ids,
1322 GatherNullPolicy::IncludeNulls,
1323 )?;
1324
1325 let meta_col = batch
1326 .column(0)
1327 .as_any()
1328 .downcast_ref::<BinaryArray>()
1329 .ok_or_else(|| {
1330 llkv_result::Error::Internal("catalog schema_meta column should be Binary".into())
1331 })?;
1332
1333 let mut result = Vec::new();
1334 for idx in 0..collector.row_ids.len() {
1335 if !meta_col.is_null(idx) {
1336 let bytes = meta_col.value(idx);
1337 if let Ok(meta) = bitcode::decode::<SchemaMeta>(bytes) {
1338 result.push(meta);
1339 }
1340 }
1341 }
1342
1343 Ok(result)
1344 }
1345}
1346
1347fn schema_name_to_row_id(canonical_name: &str) -> RowId {
1352 const FNV_OFFSET: u64 = 0xcbf2_9ce4_8422_2325;
1354 const FNV_PRIME: u64 = 0x1000_0000_01b3;
1355
1356 let mut hash = FNV_OFFSET;
1357 for byte in canonical_name.as_bytes() {
1358 hash ^= u64::from(*byte);
1359 hash = hash.wrapping_mul(FNV_PRIME);
1360 }
1361
1362 hash | (1u64 << 63)
1364}
1365
1366struct MaxRowIdCollector {
1367 max: Option<RowId>,
1368}
1369
1370impl PrimitiveVisitor for MaxRowIdCollector {
1371 fn u64_chunk(&mut self, values: &UInt64Array) {
1372 for i in 0..values.len() {
1373 let value = values.value(i);
1374 self.max = match self.max {
1375 Some(curr) if curr >= value => Some(curr),
1376 _ => Some(value),
1377 };
1378 }
1379 }
1380}
1381
1382impl PrimitiveWithRowIdsVisitor for MaxRowIdCollector {}
1383impl PrimitiveSortedVisitor for MaxRowIdCollector {}
1384impl PrimitiveSortedWithRowIdsVisitor for MaxRowIdCollector {}
1385
#[cfg(test)]
mod tests {
    use super::*;
    use crate::constraints::{
        ConstraintKind, ConstraintState, PrimaryKeyConstraint, UniqueConstraint,
    };
    use llkv_column_map::ColumnStore;
    use std::sync::Arc;

    /// Round-trips constraint records through the catalog and checks that
    /// per-table lookups neither drop nor cross-contaminate records.
    #[test]
    fn constraint_records_roundtrip() {
        let pager = Arc::new(MemPager::default());
        let store = ColumnStore::open(Arc::clone(&pager)).unwrap();
        let catalog = SysCatalog::new(&store);

        let table_id: TableId = 42;

        // Two constraints belonging to the table under test.
        let pk_record = ConstraintRecord {
            constraint_id: 1,
            kind: ConstraintKind::PrimaryKey(PrimaryKeyConstraint {
                field_ids: vec![1, 2],
            }),
            state: ConstraintState::Active,
            revision: 1,
            last_modified_micros: 100,
        };
        let unique_record = ConstraintRecord {
            constraint_id: 2,
            kind: ConstraintKind::Unique(UniqueConstraint { field_ids: vec![3] }),
            state: ConstraintState::Active,
            revision: 2,
            last_modified_micros: 200,
        };
        catalog
            .put_constraint_records(table_id, &[pk_record.clone(), unique_record.clone()])
            .unwrap();

        // A record on a *different* table reusing constraint id 1 must not
        // leak into lookups for `table_id`.
        let unrelated = ConstraintRecord {
            constraint_id: 1,
            kind: ConstraintKind::Unique(UniqueConstraint { field_ids: vec![5] }),
            state: ConstraintState::Active,
            revision: 1,
            last_modified_micros: 150,
        };
        catalog.put_constraint_records(7, &[unrelated]).unwrap();

        // Full-table fetch returns exactly the two records we wrote.
        let mut all_for_table = catalog.constraint_records_for_table(table_id).unwrap();
        all_for_table.sort_by_key(|record| record.constraint_id);
        assert_eq!(all_for_table.len(), 2);
        assert_eq!(all_for_table[0], pk_record);
        assert_eq!(all_for_table[1], unique_record);

        // Point lookup of an existing constraint.
        let found = catalog
            .get_constraint_records(table_id, &[pk_record.constraint_id])
            .unwrap();
        assert_eq!(found.len(), 1);
        assert_eq!(found[0].as_ref(), Some(&pk_record));

        // An unknown id still yields a slot — just an empty one.
        let missing = catalog.get_constraint_records(table_id, &[999]).unwrap();
        assert_eq!(missing.len(), 1);
        assert!(missing[0].is_none());
    }
}