1use std::collections::HashMap;
4use std::sync::Arc;
5
6use arrow::array::{Array, BinaryArray, UInt64Array};
7use arrow::datatypes::{DataType, Field, Schema};
8use arrow::record_batch::RecordBatch;
9use bitcode::{Decode, Encode};
10
11use crate::types::TableId;
12use llkv_column_map::store::scan::{
13 PrimitiveSortedVisitor, PrimitiveSortedWithRowIdsVisitor, PrimitiveVisitor,
14 PrimitiveWithRowIdsVisitor,
15};
16
17use llkv_column_map::types::LogicalFieldId;
18use llkv_column_map::{
19 ColumnStore,
20 store::{GatherNullPolicy, ROW_ID_COLUMN_NAME},
21 types::Namespace,
22};
23use llkv_storage::pager::{MemPager, Pager};
24use simd_r_drive_entry_handle::EntryHandle;
25
/// Table id under which the catalog stores its own metadata columns.
///
/// Every catalog read and write in this file addresses a field of this table
/// (via `lfid(CATALOG_TID, ...)`).
pub const CATALOG_TID: TableId = 0;
30
31const F_TABLE_META: u32 = 1; const F_COL_META: u32 = 10; #[inline]
39fn lfid(table_id: TableId, col_id: u32) -> LogicalFieldId {
40 LogicalFieldId::new()
41 .with_namespace(Namespace::UserData)
42 .with_table_id(table_id)
43 .with_field_id(col_id)
44}
45
46#[inline]
48fn rid_table(table_id: TableId) -> u64 {
49 let fid = LogicalFieldId::new()
50 .with_namespace(Namespace::UserData)
51 .with_table_id(table_id)
52 .with_field_id(0);
53 fid.into()
54}
55
56#[inline]
58fn rid_col(table_id: TableId, col_id: u32) -> u64 {
59 lfid(table_id, col_id).into()
60}
61
/// Table-level metadata record.
///
/// Serialized with `bitcode` and stored as one binary value per table in the
/// catalog (see `SysCatalog::put_table_meta`).
// NOTE(review): the derived bitcode encoding is presumably tied to field order
// and field types — confirm before reordering or retyping fields, since
// already-persisted rows may stop decoding.
#[derive(Encode, Decode, Clone, Debug, PartialEq, Eq)]
pub struct TableMeta {
    /// Id of the table this record describes; also selects the catalog row
    /// the record is stored under.
    pub table_id: TableId,
    /// Optional table name.
    pub name: Option<String>,
    /// Creation time — presumably microseconds since the Unix epoch; TODO
    /// confirm against the code that fills this in.
    pub created_at_micros: u64,
    /// Bit flags; their meanings are not defined in this file.
    pub flags: u32,
    /// Epoch value; its semantics are defined by callers and not visible here.
    pub epoch: u64,
}
72
/// Column-level metadata record.
///
/// Serialized with `bitcode` and stored as one binary value per
/// `(table, column)` pair in the catalog (see `SysCatalog::put_col_meta`).
// NOTE(review): the derived bitcode encoding is presumably tied to field order
// and field types — confirm before reordering or retyping fields.
#[derive(Encode, Decode, Clone, Debug, PartialEq, Eq)]
pub struct ColMeta {
    /// Id of the column within its table; combined with the table id to form
    /// the catalog row id.
    pub col_id: u32,
    /// Optional column name.
    pub name: Option<String>,
    /// Bit flags; their meanings are not defined in this file.
    pub flags: u32,
    /// Optional default value as raw bytes; the encoding of these bytes is not
    /// specified in this file.
    pub default: Option<Vec<u8>>,
}
80
/// Accessor for the system catalog stored inside a [`ColumnStore`].
///
/// Borrows the store for `'a`; all reads and writes go through that borrowed
/// store, and the catalog holds no other state.
pub struct SysCatalog<'a, P = MemPager>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    /// Column store that holds the catalog's metadata columns.
    store: &'a ColumnStore<P>,
}
89
90impl<'a, P> SysCatalog<'a, P>
91where
92 P: Pager<Blob = EntryHandle> + Send + Sync,
93{
94 pub fn new(store: &'a ColumnStore<P>) -> Self {
95 Self { store }
96 }
97
98 pub fn put_table_meta(&self, meta: &TableMeta) {
100 let lfid_val: u64 = lfid(CATALOG_TID, F_TABLE_META).into();
101 let schema = Arc::new(Schema::new(vec![
102 Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
103 Field::new("meta", DataType::Binary, false).with_metadata(HashMap::from([(
104 "field_id".to_string(),
105 lfid_val.to_string(),
106 )])),
107 ]));
108
109 let row_id = Arc::new(UInt64Array::from(vec![rid_table(meta.table_id)]));
110 let meta_encoded = bitcode::encode(meta);
111 let meta_bytes = Arc::new(BinaryArray::from(vec![meta_encoded.as_slice()]));
112
113 let batch = RecordBatch::try_new(schema, vec![row_id, meta_bytes]).unwrap();
114 self.store.append(&batch).unwrap();
115 }
116
117 pub fn get_table_meta(&self, table_id: TableId) -> Option<TableMeta> {
119 struct MetaVisitor {
120 target_rid: u64,
121 meta: Option<TableMeta>,
122 }
123 impl PrimitiveVisitor for MetaVisitor {}
124 impl PrimitiveWithRowIdsVisitor for MetaVisitor {
125 fn u64_chunk_with_rids(&mut self, v: &UInt64Array, r: &UInt64Array) {
126 for i in 0..r.len() {
127 if r.value(i) == self.target_rid {
128 let _bytes = v.value(i);
133 break;
135 }
136 }
137 }
138 }
139 impl PrimitiveSortedVisitor for MetaVisitor {}
140 impl PrimitiveSortedWithRowIdsVisitor for MetaVisitor {}
141
142 let mut visitor = MetaVisitor {
143 target_rid: rid_table(table_id),
144 meta: None,
145 };
146 let scan_opts = llkv_column_map::store::scan::ScanOptions {
149 with_row_ids: true,
150 ..Default::default()
151 };
152
153 let _ = self
154 .store
155 .scan(lfid(CATALOG_TID, F_TABLE_META), scan_opts, &mut visitor);
156 visitor.meta
157 }
158
159 pub fn put_col_meta(&self, table_id: TableId, meta: &ColMeta) {
161 let lfid_val: u64 = lfid(CATALOG_TID, F_COL_META).into();
162 let schema = Arc::new(Schema::new(vec![
163 Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
164 Field::new("meta", DataType::Binary, false).with_metadata(HashMap::from([(
165 "field_id".to_string(),
166 lfid_val.to_string(),
167 )])),
168 ]));
169
170 let rid_value = rid_col(table_id, meta.col_id);
171 let row_id = Arc::new(UInt64Array::from(vec![rid_value]));
172 let meta_encoded = bitcode::encode(meta);
173 let meta_bytes = Arc::new(BinaryArray::from(vec![meta_encoded.as_slice()]));
174
175 let batch = RecordBatch::try_new(schema, vec![row_id, meta_bytes]).unwrap();
176 self.store.append(&batch).unwrap();
177 }
178
179 pub fn get_cols_meta(&self, table_id: TableId, col_ids: &[u32]) -> Vec<Option<ColMeta>> {
181 if col_ids.is_empty() {
182 return Vec::new();
183 }
184
185 let row_ids: Vec<u64> = col_ids.iter().map(|&cid| rid_col(table_id, cid)).collect();
186 let catalog_field = lfid(CATALOG_TID, F_COL_META);
187
188 let batch =
189 match self
190 .store
191 .gather_rows(&[catalog_field], &row_ids, GatherNullPolicy::IncludeNulls)
192 {
193 Ok(batch) => batch,
194 Err(_) => return vec![None; col_ids.len()],
195 };
196
197 let meta_col = batch
198 .column(0)
199 .as_any()
200 .downcast_ref::<BinaryArray>()
201 .expect("catalog meta column should be Binary");
202
203 col_ids
204 .iter()
205 .enumerate()
206 .map(|(idx, _)| {
207 if meta_col.is_null(idx) {
208 None
209 } else {
210 bitcode::decode(meta_col.value(idx)).ok()
211 }
212 })
213 .collect()
214 }
215}