llkv_table/sys_catalog.rs

//! System catalog stored inside ColumnStore (table id 0).
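//!
//! Layout: the catalog itself is table id 0. Serialized [`TableMeta`] values
//! are stored in catalog column `F_TABLE_META` (1) and [`ColMeta`] values in
//! `F_COL_META` (10), keyed by row ids derived from the target table's or
//! column's `LogicalFieldId`.
//!
//! Minimal usage sketch (not compiled; assumes a `ColumnStore` has already
//! been opened elsewhere):
//!
//! ```ignore
//! let catalog = SysCatalog::new(&store);
//! catalog.put_table_meta(&TableMeta {
//!     table_id: 1,
//!     name: Some("users".into()),
//!     created_at_micros: 0,
//!     flags: 0,
//!     epoch: 0,
//! });
//! let meta = catalog.get_table_meta(1);
//! ```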

use std::collections::HashMap;
use std::sync::Arc;

use arrow::array::{Array, BinaryArray, UInt64Array};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use bitcode::{Decode, Encode};

use crate::types::TableId;

use llkv_column_map::types::LogicalFieldId;
use llkv_column_map::{
    ColumnStore,
    store::{GatherNullPolicy, ROW_ID_COLUMN_NAME},
    types::Namespace,
};
use llkv_storage::pager::{MemPager, Pager};
use simd_r_drive_entry_handle::EntryHandle;

// ----- Catalog constants -----

/// Reserved catalog table id.
pub const CATALOG_TID: TableId = 0;

/// Catalog column ids (within table id 0).
const F_TABLE_META: u32 = 1; // bytes: bitcode(TableMeta)
const F_COL_META: u32 = 10; // bytes: bitcode(ColMeta)

// ----- Namespacing helpers -----
// TODO: Dedupe with llkv_column_map::types::lfid()
#[inline]
fn lfid(table_id: TableId, col_id: u32) -> LogicalFieldId {
    LogicalFieldId::new()
        .with_namespace(Namespace::UserData)
        .with_table_id(table_id)
        .with_field_id(col_id)
}

// TODO: Migrate to llkv_column_map::types::rid_table()
#[inline]
fn rid_table(table_id: TableId) -> u64 {
    let fid = LogicalFieldId::new()
        .with_namespace(Namespace::UserData)
        .with_table_id(table_id)
        .with_field_id(0);
    fid.into()
}

// TODO: Migrate to llkv_column_map::types::rid_col()
#[inline]
fn rid_col(table_id: TableId, col_id: u32) -> u64 {
    lfid(table_id, col_id).into()
}

// ----- Public catalog types -----

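/// Per-table metadata, stored bitcode-encoded in catalog column `F_TABLE_META`.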
#[derive(Encode, Decode, Clone, Debug, PartialEq, Eq)]
pub struct TableMeta {
    pub table_id: TableId,
    pub name: Option<String>,
    pub created_at_micros: u64,
    pub flags: u32,
    pub epoch: u64,
}

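/// Per-column metadata, stored bitcode-encoded in catalog column `F_COL_META`.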
#[derive(Encode, Decode, Clone, Debug, PartialEq, Eq)]
pub struct ColMeta {
    pub col_id: u32,
    pub name: Option<String>,
    pub flags: u32,
    pub default: Option<Vec<u8>>,
}

// ----- SysCatalog -----

pub struct SysCatalog<'a, P = MemPager>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    store: &'a ColumnStore<P>,
}

impl<'a, P> SysCatalog<'a, P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    pub fn new(store: &'a ColumnStore<P>) -> Self {
        Self { store }
    }

    /// Upsert table metadata.
    pub fn put_table_meta(&self, meta: &TableMeta) {
        let lfid_val: u64 = lfid(CATALOG_TID, F_TABLE_META).into();
        let schema = Arc::new(Schema::new(vec![
            Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
            Field::new("meta", DataType::Binary, false).with_metadata(HashMap::from([(
                "field_id".to_string(),
                lfid_val.to_string(),
            )])),
        ]));

        let row_id = Arc::new(UInt64Array::from(vec![rid_table(meta.table_id)]));
        let meta_encoded = bitcode::encode(meta);
        let meta_bytes = Arc::new(BinaryArray::from(vec![meta_encoded.as_slice()]));

        let batch = RecordBatch::try_new(schema, vec![row_id, meta_bytes]).unwrap();
        self.store.append(&batch).unwrap();
    }

    /// Fetch table metadata by table_id.
    pub fn get_table_meta(&self, table_id: TableId) -> Option<TableMeta> {
        // The serialized meta column is Binary, so the primitive scan visitors
        // cannot decode it; reuse the gather-based lookup that `get_cols_meta`
        // uses below, keyed by the table's derived row id.
        let row_ids: Vec<u64> = vec![rid_table(table_id)];
        let catalog_field = lfid(CATALOG_TID, F_TABLE_META);

        let batch = self
            .store
            .gather_rows(&[catalog_field], &row_ids, GatherNullPolicy::IncludeNulls)
            .ok()?;

        let meta_col = batch
            .column(0)
            .as_any()
            .downcast_ref::<BinaryArray>()
            .expect("catalog meta column should be Binary");

        if batch.num_rows() == 0 || meta_col.is_null(0) {
            None
        } else {
            bitcode::decode(meta_col.value(0)).ok()
        }
    }

    /// Upsert a single column’s metadata.
    pub fn put_col_meta(&self, table_id: TableId, meta: &ColMeta) {
        let lfid_val: u64 = lfid(CATALOG_TID, F_COL_META).into();
        let schema = Arc::new(Schema::new(vec![
            Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
            Field::new("meta", DataType::Binary, false).with_metadata(HashMap::from([(
                "field_id".to_string(),
                lfid_val.to_string(),
            )])),
        ]));

        let rid_value = rid_col(table_id, meta.col_id);
        let row_id = Arc::new(UInt64Array::from(vec![rid_value]));
        let meta_encoded = bitcode::encode(meta);
        let meta_bytes = Arc::new(BinaryArray::from(vec![meta_encoded.as_slice()]));

        let batch = RecordBatch::try_new(schema, vec![row_id, meta_bytes]).unwrap();
        self.store.append(&batch).unwrap();
    }

    /// Batch fetch specific column metas by col_id using a shared keyset.
    pub fn get_cols_meta(&self, table_id: TableId, col_ids: &[u32]) -> Vec<Option<ColMeta>> {
        if col_ids.is_empty() {
            return Vec::new();
        }

        let row_ids: Vec<u64> = col_ids.iter().map(|&cid| rid_col(table_id, cid)).collect();
        let catalog_field = lfid(CATALOG_TID, F_COL_META);

        let batch =
            match self
                .store
                .gather_rows(&[catalog_field], &row_ids, GatherNullPolicy::IncludeNulls)
            {
                Ok(batch) => batch,
                Err(_) => return vec![None; col_ids.len()],
            };

        let meta_col = batch
            .column(0)
            .as_any()
            .downcast_ref::<BinaryArray>()
            .expect("catalog meta column should be Binary");

        col_ids
            .iter()
            .enumerate()
            .map(|(idx, _)| {
                if meta_col.is_null(idx) {
                    None
                } else {
                    bitcode::decode(meta_col.value(idx)).ok()
                }
            })
            .collect()
    }
}
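
// A minimal test sketch: round-trips the catalog types through the same
// bitcode encoding the put/get paths rely on. Field values are illustrative
// only; storage-level behaviour (append/gather through a pager) is not
// exercised here.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn meta_bitcode_round_trip() {
        let table = TableMeta {
            table_id: 42,
            name: Some("events".to_string()),
            created_at_micros: 1,
            flags: 0,
            epoch: 7,
        };
        let col = ColMeta {
            col_id: 3,
            name: Some("payload".to_string()),
            flags: 0,
            default: None,
        };

        let table_back: TableMeta = bitcode::decode(&bitcode::encode(&table)).unwrap();
        let col_back: ColMeta = bitcode::decode(&bitcode::encode(&col)).unwrap();
        assert_eq!(table_back, table);
        assert_eq!(col_back, col);
    }
}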