1use std::collections::HashMap;
26use std::sync::Arc;
27
28use arrow::array::{Array, BinaryArray, UInt64Array};
29use arrow::datatypes::{DataType, Field, Schema};
30use arrow::record_batch::RecordBatch;
31use bitcode::{Decode, Encode};
32
33use crate::types::TableId;
34use llkv_column_map::store::scan::{
35 PrimitiveSortedVisitor, PrimitiveSortedWithRowIdsVisitor, PrimitiveVisitor,
36 PrimitiveWithRowIdsVisitor, ScanBuilder, ScanOptions,
37};
38
39use llkv_column_map::types::LogicalFieldId;
40use llkv_column_map::{
41 ColumnStore,
42 store::{GatherNullPolicy, ROW_ID_COLUMN_NAME, rowid_fid},
43 types::Namespace,
44};
45use llkv_result::{self, Result as LlkvResult};
46use llkv_storage::pager::{MemPager, Pager};
47use simd_r_drive_entry_handle::EntryHandle;
48
49use crate::reserved::*;
51
52#[inline]
56fn lfid(table_id: TableId, col_id: u32) -> LogicalFieldId {
57 LogicalFieldId::new()
58 .with_namespace(Namespace::UserData)
59 .with_table_id(table_id)
60 .with_field_id(col_id)
61}
62
63#[inline]
65fn rid_table(table_id: TableId) -> u64 {
66 let fid = LogicalFieldId::new()
67 .with_namespace(Namespace::UserData)
68 .with_table_id(table_id)
69 .with_field_id(0);
70 fid.into()
71}
72
73#[inline]
75fn rid_col(table_id: TableId, col_id: u32) -> u64 {
76 lfid(table_id, col_id).into()
77}
78
79#[derive(Encode, Decode, Clone, Debug, PartialEq, Eq)]
85pub struct TableMeta {
86 pub table_id: TableId,
88 pub name: Option<String>,
90 pub created_at_micros: u64,
92 pub flags: u32,
94 pub epoch: u64,
96}
97
98#[derive(Encode, Decode, Clone, Debug, PartialEq, Eq)]
102pub struct ColMeta {
103 pub col_id: u32,
105 pub name: Option<String>,
107 pub flags: u32,
109 pub default: Option<Vec<u8>>,
111}
112
113pub struct SysCatalog<'a, P = MemPager>
126where
127 P: Pager<Blob = EntryHandle> + Send + Sync,
128{
129 store: &'a ColumnStore<P>,
130}
131
132impl<'a, P> SysCatalog<'a, P>
133where
134 P: Pager<Blob = EntryHandle> + Send + Sync,
135{
136 pub fn new(store: &'a ColumnStore<P>) -> Self {
138 Self { store }
139 }
140
141 pub fn put_table_meta(&self, meta: &TableMeta) {
146 let lfid_val: u64 = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_TABLE_META_ID).into();
147 let schema = Arc::new(Schema::new(vec![
148 Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
149 Field::new("meta", DataType::Binary, false).with_metadata(HashMap::from([(
150 crate::constants::FIELD_ID_META_KEY.to_string(),
151 lfid_val.to_string(),
152 )])),
153 ]));
154
155 let row_id = Arc::new(UInt64Array::from(vec![rid_table(meta.table_id)]));
156 let meta_encoded = bitcode::encode(meta);
157 let meta_bytes = Arc::new(BinaryArray::from(vec![meta_encoded.as_slice()]));
158
159 let batch = RecordBatch::try_new(schema, vec![row_id, meta_bytes]).unwrap();
160 self.store.append(&batch).unwrap();
161 }
162
163 pub fn get_table_meta(&self, table_id: TableId) -> Option<TableMeta> {
167 struct MetaVisitor {
168 target_rid: u64,
169 meta: Option<TableMeta>,
170 }
171 impl PrimitiveVisitor for MetaVisitor {}
172 impl PrimitiveWithRowIdsVisitor for MetaVisitor {
173 fn u64_chunk_with_rids(&mut self, v: &UInt64Array, r: &UInt64Array) {
174 for i in 0..r.len() {
175 if r.value(i) == self.target_rid {
176 let _bytes = v.value(i);
181 break;
183 }
184 }
185 }
186 }
187 impl PrimitiveSortedVisitor for MetaVisitor {}
188 impl PrimitiveSortedWithRowIdsVisitor for MetaVisitor {}
189
190 let mut visitor = MetaVisitor {
191 target_rid: rid_table(table_id),
192 meta: None,
193 };
194 let scan_opts = llkv_column_map::store::scan::ScanOptions {
197 with_row_ids: true,
198 ..Default::default()
199 };
200
201 let _ = self.store.scan(
202 lfid(CATALOG_TABLE_ID, CATALOG_FIELD_TABLE_META_ID),
203 scan_opts,
204 &mut visitor,
205 );
206 visitor.meta
207 }
208
209 pub fn put_col_meta(&self, table_id: TableId, meta: &ColMeta) {
211 let lfid_val: u64 = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_COL_META_ID).into();
212 let schema = Arc::new(Schema::new(vec![
213 Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
214 Field::new("meta", DataType::Binary, false).with_metadata(HashMap::from([(
215 crate::constants::FIELD_ID_META_KEY.to_string(),
216 lfid_val.to_string(),
217 )])),
218 ]));
219
220 let rid_value = rid_col(table_id, meta.col_id);
221 let row_id = Arc::new(UInt64Array::from(vec![rid_value]));
222 let meta_encoded = bitcode::encode(meta);
223 let meta_bytes = Arc::new(BinaryArray::from(vec![meta_encoded.as_slice()]));
224
225 let batch = RecordBatch::try_new(schema, vec![row_id, meta_bytes]).unwrap();
226 self.store.append(&batch).unwrap();
227 }
228
229 pub fn get_cols_meta(&self, table_id: TableId, col_ids: &[u32]) -> Vec<Option<ColMeta>> {
231 if col_ids.is_empty() {
232 return Vec::new();
233 }
234
235 let row_ids: Vec<u64> = col_ids.iter().map(|&cid| rid_col(table_id, cid)).collect();
236 let catalog_field = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_COL_META_ID);
237
238 let batch =
239 match self
240 .store
241 .gather_rows(&[catalog_field], &row_ids, GatherNullPolicy::IncludeNulls)
242 {
243 Ok(batch) => batch,
244 Err(_) => return vec![None; col_ids.len()],
245 };
246
247 let meta_col = batch
248 .column(0)
249 .as_any()
250 .downcast_ref::<BinaryArray>()
251 .expect("catalog meta column should be Binary");
252
253 col_ids
254 .iter()
255 .enumerate()
256 .map(|(idx, _)| {
257 if meta_col.is_null(idx) {
258 None
259 } else {
260 bitcode::decode(meta_col.value(idx)).ok()
261 }
262 })
263 .collect()
264 }
265
266 pub fn put_next_table_id(&self, next_id: TableId) -> LlkvResult<()> {
267 let lfid_val: u64 = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_NEXT_TABLE_ID).into();
268 let schema = Arc::new(Schema::new(vec![
269 Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
270 Field::new("next_table_id", DataType::UInt64, false).with_metadata(HashMap::from([(
271 crate::constants::FIELD_ID_META_KEY.to_string(),
272 lfid_val.to_string(),
273 )])),
274 ]));
275
276 let row_id = Arc::new(UInt64Array::from(vec![CATALOG_NEXT_TABLE_ROW_ID]));
277 let value_array = Arc::new(UInt64Array::from(vec![next_id as u64]));
278 let batch = RecordBatch::try_new(schema, vec![row_id, value_array])?;
279 self.store.append(&batch)?;
280 Ok(())
281 }
282
283 pub fn get_next_table_id(&self) -> LlkvResult<Option<TableId>> {
284 let lfid = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_NEXT_TABLE_ID);
285 let batch = match self.store.gather_rows(
286 &[lfid],
287 &[CATALOG_NEXT_TABLE_ROW_ID],
288 GatherNullPolicy::IncludeNulls,
289 ) {
290 Ok(batch) => batch,
291 Err(llkv_result::Error::NotFound) => return Ok(None),
292 Err(err) => return Err(err),
293 };
294
295 if batch.num_columns() == 0 || batch.num_rows() == 0 {
296 return Ok(None);
297 }
298
299 let array = batch
300 .column(0)
301 .as_any()
302 .downcast_ref::<UInt64Array>()
303 .ok_or_else(|| {
304 llkv_result::Error::Internal(
305 "catalog next_table_id column stored unexpected type".into(),
306 )
307 })?;
308 if array.is_empty() || array.is_null(0) {
309 return Ok(None);
310 }
311
312 let value = array.value(0);
313 if value > TableId::MAX as u64 {
314 return Err(llkv_result::Error::InvalidArgumentError(
315 "persisted next_table_id exceeds TableId range".into(),
316 ));
317 }
318
319 Ok(Some(value as TableId))
320 }
321
322 pub fn max_table_id(&self) -> LlkvResult<Option<TableId>> {
323 let meta_field = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_TABLE_META_ID);
324 let row_field = rowid_fid(meta_field);
325
326 let mut collector = MaxRowIdCollector { max: None };
327 match ScanBuilder::new(self.store, row_field)
328 .options(ScanOptions::default())
329 .run(&mut collector)
330 {
331 Ok(()) => {}
332 Err(llkv_result::Error::NotFound) => return Ok(None),
333 Err(err) => return Err(err),
334 }
335
336 let max_value = match collector.max {
337 Some(value) => value,
338 None => return Ok(None),
339 };
340
341 let logical: LogicalFieldId = max_value.into();
342 Ok(Some(logical.table_id()))
343 }
344
345 pub fn all_table_metas(&self) -> LlkvResult<Vec<(TableId, TableMeta)>> {
351 let meta_field = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_TABLE_META_ID);
352 let row_field = rowid_fid(meta_field);
353
354 struct RowIdCollector {
356 row_ids: Vec<u64>,
357 }
358
359 impl PrimitiveVisitor for RowIdCollector {
360 fn u64_chunk(&mut self, values: &UInt64Array) {
361 for i in 0..values.len() {
362 self.row_ids.push(values.value(i));
363 }
364 }
365 }
366 impl PrimitiveWithRowIdsVisitor for RowIdCollector {}
367 impl PrimitiveSortedVisitor for RowIdCollector {}
368 impl PrimitiveSortedWithRowIdsVisitor for RowIdCollector {}
369
370 let mut collector = RowIdCollector {
371 row_ids: Vec::new(),
372 };
373 match ScanBuilder::new(self.store, row_field)
374 .options(ScanOptions::default())
375 .run(&mut collector)
376 {
377 Ok(()) => {}
378 Err(llkv_result::Error::NotFound) => return Ok(Vec::new()),
379 Err(err) => return Err(err),
380 }
381
382 if collector.row_ids.is_empty() {
383 return Ok(Vec::new());
384 }
385
386 let batch = self.store.gather_rows(
388 &[meta_field],
389 &collector.row_ids,
390 GatherNullPolicy::IncludeNulls,
391 )?;
392
393 let meta_col = batch
394 .column(0)
395 .as_any()
396 .downcast_ref::<BinaryArray>()
397 .ok_or_else(|| {
398 llkv_result::Error::Internal("catalog table_meta column should be Binary".into())
399 })?;
400
401 let mut result = Vec::new();
402 for (idx, &row_id) in collector.row_ids.iter().enumerate() {
403 if !meta_col.is_null(idx) {
404 let bytes = meta_col.value(idx);
405 if let Ok(meta) = bitcode::decode::<TableMeta>(bytes) {
406 let logical: LogicalFieldId = row_id.into();
407 let table_id = logical.table_id();
408 result.push((table_id, meta));
409 }
410 }
411 }
412
413 Ok(result)
414 }
415
416 pub fn put_next_txn_id(&self, next_txn_id: u64) -> LlkvResult<()> {
418 let lfid_val: u64 = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_NEXT_TXN_ID).into();
419 let schema = Arc::new(Schema::new(vec![
420 Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
421 Field::new("next_txn_id", DataType::UInt64, false).with_metadata(HashMap::from([(
422 crate::constants::FIELD_ID_META_KEY.to_string(),
423 lfid_val.to_string(),
424 )])),
425 ]));
426
427 let row_id = Arc::new(UInt64Array::from(vec![CATALOG_NEXT_TXN_ROW_ID]));
428 let value_array = Arc::new(UInt64Array::from(vec![next_txn_id]));
429 let batch = RecordBatch::try_new(schema, vec![row_id, value_array])?;
430 self.store.append(&batch)?;
431 Ok(())
432 }
433
434 pub fn get_next_txn_id(&self) -> LlkvResult<Option<u64>> {
436 let lfid = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_NEXT_TXN_ID);
437 let batch = match self.store.gather_rows(
438 &[lfid],
439 &[CATALOG_NEXT_TXN_ROW_ID],
440 GatherNullPolicy::IncludeNulls,
441 ) {
442 Ok(batch) => batch,
443 Err(llkv_result::Error::NotFound) => return Ok(None),
444 Err(err) => return Err(err),
445 };
446
447 if batch.num_columns() == 0 || batch.num_rows() == 0 {
448 return Ok(None);
449 }
450
451 let array = batch
452 .column(0)
453 .as_any()
454 .downcast_ref::<UInt64Array>()
455 .ok_or_else(|| {
456 llkv_result::Error::Internal(
457 "catalog next_txn_id column stored unexpected type".into(),
458 )
459 })?;
460 if array.is_empty() || array.is_null(0) {
461 return Ok(None);
462 }
463
464 let value = array.value(0);
465 Ok(Some(value))
466 }
467
468 pub fn put_last_committed_txn_id(&self, last_committed: u64) -> LlkvResult<()> {
470 let lfid_val: u64 = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_LAST_COMMITTED_TXN_ID).into();
471 let schema = Arc::new(Schema::new(vec![
472 Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
473 Field::new("last_committed_txn_id", DataType::UInt64, false).with_metadata(
474 HashMap::from([(
475 crate::constants::FIELD_ID_META_KEY.to_string(),
476 lfid_val.to_string(),
477 )]),
478 ),
479 ]));
480
481 let row_id = Arc::new(UInt64Array::from(vec![CATALOG_LAST_COMMITTED_TXN_ROW_ID]));
482 let value_array = Arc::new(UInt64Array::from(vec![last_committed]));
483 let batch = RecordBatch::try_new(schema, vec![row_id, value_array])?;
484 self.store.append(&batch)?;
485 Ok(())
486 }
487
488 pub fn get_last_committed_txn_id(&self) -> LlkvResult<Option<u64>> {
490 let lfid = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_LAST_COMMITTED_TXN_ID);
491 let batch = match self.store.gather_rows(
492 &[lfid],
493 &[CATALOG_LAST_COMMITTED_TXN_ROW_ID],
494 GatherNullPolicy::IncludeNulls,
495 ) {
496 Ok(batch) => batch,
497 Err(llkv_result::Error::NotFound) => return Ok(None),
498 Err(err) => return Err(err),
499 };
500
501 if batch.num_columns() == 0 || batch.num_rows() == 0 {
502 return Ok(None);
503 }
504
505 let array = batch
506 .column(0)
507 .as_any()
508 .downcast_ref::<UInt64Array>()
509 .ok_or_else(|| {
510 llkv_result::Error::Internal(
511 "catalog last_committed_txn_id column stored unexpected type".into(),
512 )
513 })?;
514 if array.is_empty() || array.is_null(0) {
515 return Ok(None);
516 }
517
518 let value = array.value(0);
519 Ok(Some(value))
520 }
521
522 pub fn put_catalog_state(&self, state: &crate::catalog::TableCatalogState) -> LlkvResult<()> {
527 let lfid_val: u64 = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_CATALOG_STATE).into();
528 let schema = Arc::new(Schema::new(vec![
529 Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
530 Field::new("catalog_state", DataType::Binary, false).with_metadata(HashMap::from([(
531 crate::constants::FIELD_ID_META_KEY.to_string(),
532 lfid_val.to_string(),
533 )])),
534 ]));
535
536 let row_id = Arc::new(UInt64Array::from(vec![CATALOG_STATE_ROW_ID]));
537 let encoded = bitcode::encode(state);
538 let state_bytes = Arc::new(BinaryArray::from(vec![encoded.as_slice()]));
539
540 let batch = RecordBatch::try_new(schema, vec![row_id, state_bytes])?;
541 self.store.append(&batch)?;
542 Ok(())
543 }
544
545 pub fn get_catalog_state(&self) -> LlkvResult<Option<crate::catalog::TableCatalogState>> {
549 let lfid = lfid(CATALOG_TABLE_ID, CATALOG_FIELD_CATALOG_STATE);
550 let batch = match self.store.gather_rows(
551 &[lfid],
552 &[CATALOG_STATE_ROW_ID],
553 GatherNullPolicy::IncludeNulls,
554 ) {
555 Ok(batch) => batch,
556 Err(llkv_result::Error::NotFound) => return Ok(None),
557 Err(err) => return Err(err),
558 };
559
560 if batch.num_columns() == 0 || batch.num_rows() == 0 {
561 return Ok(None);
562 }
563
564 let array = batch
565 .column(0)
566 .as_any()
567 .downcast_ref::<BinaryArray>()
568 .ok_or_else(|| {
569 llkv_result::Error::Internal("catalog state column stored unexpected type".into())
570 })?;
571 if array.is_empty() || array.is_null(0) {
572 return Ok(None);
573 }
574
575 let bytes = array.value(0);
576 let state = bitcode::decode(bytes).map_err(|e| {
577 llkv_result::Error::Internal(format!("Failed to decode catalog state: {}", e))
578 })?;
579 Ok(Some(state))
580 }
581}
582
583struct MaxRowIdCollector {
584 max: Option<u64>,
585}
586
587impl PrimitiveVisitor for MaxRowIdCollector {
588 fn u64_chunk(&mut self, values: &UInt64Array) {
589 for i in 0..values.len() {
590 let value = values.value(i);
591 self.max = match self.max {
592 Some(curr) if curr >= value => Some(curr),
593 _ => Some(value),
594 };
595 }
596 }
597}
598
599impl PrimitiveWithRowIdsVisitor for MaxRowIdCollector {}
600impl PrimitiveSortedVisitor for MaxRowIdCollector {}
601impl PrimitiveSortedWithRowIdsVisitor for MaxRowIdCollector {}