1use std::collections::{HashMap, HashSet};
10use std::sync::Arc;
11
12use alopex_core::kv::{KVStore, KVTransaction};
13use alopex_core::types::TxnMode;
14use serde::{Deserialize, Serialize};
15use thiserror::Error;
16
17use crate::ast::ddl::{IndexMethod, VectorMetric};
18use crate::catalog::{
19 Catalog, ColumnMetadata, Compression, IndexMetadata, MemoryCatalog, RowIdMode,
20};
21use crate::catalog::{StorageOptions, StorageType, TableMetadata};
22use crate::planner::PlannerError;
23use crate::planner::types::ResolvedType;
24
/// Common key prefix shared by every catalog record.
pub const CATALOG_PREFIX: &[u8] = b"__catalog__/";
/// Key prefix for persisted table metadata; the table name is appended to it.
pub const TABLES_PREFIX: &[u8] = b"__catalog__/tables/";
/// Key prefix for persisted index metadata; the index name is appended to it.
pub const INDEXES_PREFIX: &[u8] = b"__catalog__/indexes/";
/// Key of the single record holding the catalog version and the ID counters.
pub const META_KEY: &[u8] = b"__catalog__/meta";

/// Format version stored in the meta record. When loading, counters from a
/// meta record are only honoured if it carries this exact version.
const CATALOG_VERSION: u32 = 1;
32
/// Errors raised while reading or writing the persisted catalog.
#[derive(Debug, Error)]
pub enum CatalogError {
    /// The underlying key-value store reported an error.
    #[error("kv error: {0}")]
    Kv(#[from] alopex_core::Error),

    /// A catalog record could not be encoded or decoded with bincode.
    #[error("serialize error: {0}")]
    Serialize(#[from] bincode::Error),

    /// A key lacked the expected prefix, or its suffix was not valid UTF-8.
    /// The payload is the debug rendering of the offending key bytes.
    #[error("invalid catalog key: {0}")]
    InvalidKey(String),
}
44
/// Record stored under `META_KEY`: the format version plus the table and
/// index ID counters captured when the record was written.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
struct CatalogMeta {
    /// Catalog format version (compared against `CATALOG_VERSION` on load).
    version: u32,
    /// Table ID counter value at write time.
    table_id_counter: u32,
    /// Index ID counter value at write time.
    index_id_counter: u32,
}
51
52#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
53pub enum PersistedVectorMetric {
54 Cosine,
55 L2,
56 Inner,
57}
58
59impl From<VectorMetric> for PersistedVectorMetric {
60 fn from(value: VectorMetric) -> Self {
61 match value {
62 VectorMetric::Cosine => Self::Cosine,
63 VectorMetric::L2 => Self::L2,
64 VectorMetric::Inner => Self::Inner,
65 }
66 }
67}
68
69impl From<PersistedVectorMetric> for VectorMetric {
70 fn from(value: PersistedVectorMetric) -> Self {
71 match value {
72 PersistedVectorMetric::Cosine => Self::Cosine,
73 PersistedVectorMetric::L2 => Self::L2,
74 PersistedVectorMetric::Inner => Self::Inner,
75 }
76 }
77}
78
79#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
80pub enum PersistedType {
81 Integer,
82 BigInt,
83 Float,
84 Double,
85 Text,
86 Blob,
87 Boolean,
88 Timestamp,
89 Vector {
90 dimension: u32,
91 metric: PersistedVectorMetric,
92 },
93 Null,
94}
95
96impl From<ResolvedType> for PersistedType {
97 fn from(value: ResolvedType) -> Self {
98 match value {
99 ResolvedType::Integer => Self::Integer,
100 ResolvedType::BigInt => Self::BigInt,
101 ResolvedType::Float => Self::Float,
102 ResolvedType::Double => Self::Double,
103 ResolvedType::Text => Self::Text,
104 ResolvedType::Blob => Self::Blob,
105 ResolvedType::Boolean => Self::Boolean,
106 ResolvedType::Timestamp => Self::Timestamp,
107 ResolvedType::Vector { dimension, metric } => Self::Vector {
108 dimension,
109 metric: metric.into(),
110 },
111 ResolvedType::Null => Self::Null,
112 }
113 }
114}
115
116impl From<PersistedType> for ResolvedType {
117 fn from(value: PersistedType) -> Self {
118 match value {
119 PersistedType::Integer => Self::Integer,
120 PersistedType::BigInt => Self::BigInt,
121 PersistedType::Float => Self::Float,
122 PersistedType::Double => Self::Double,
123 PersistedType::Text => Self::Text,
124 PersistedType::Blob => Self::Blob,
125 PersistedType::Boolean => Self::Boolean,
126 PersistedType::Timestamp => Self::Timestamp,
127 PersistedType::Vector { dimension, metric } => Self::Vector {
128 dimension,
129 metric: metric.into(),
130 },
131 PersistedType::Null => Self::Null,
132 }
133 }
134}
135
136#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
137pub enum PersistedIndexType {
138 BTree,
139 Hnsw,
140}
141
142impl From<PersistedIndexType> for IndexMethod {
143 fn from(value: PersistedIndexType) -> Self {
144 match value {
145 PersistedIndexType::BTree => IndexMethod::BTree,
146 PersistedIndexType::Hnsw => IndexMethod::Hnsw,
147 }
148 }
149}
150
151impl TryFrom<IndexMethod> for PersistedIndexType {
152 type Error = ();
153
154 fn try_from(value: IndexMethod) -> Result<Self, Self::Error> {
155 match value {
156 IndexMethod::BTree => Ok(Self::BTree),
157 IndexMethod::Hnsw => Ok(Self::Hnsw),
158 }
159 }
160}
161
162#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
163pub enum PersistedStorageType {
164 Row,
165 Columnar,
166}
167
168impl From<PersistedStorageType> for StorageType {
169 fn from(value: PersistedStorageType) -> Self {
170 match value {
171 PersistedStorageType::Row => Self::Row,
172 PersistedStorageType::Columnar => Self::Columnar,
173 }
174 }
175}
176
177impl From<StorageType> for PersistedStorageType {
178 fn from(value: StorageType) -> Self {
179 match value {
180 StorageType::Row => Self::Row,
181 StorageType::Columnar => Self::Columnar,
182 }
183 }
184}
185
186#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
187pub enum PersistedCompression {
188 None,
189 Lz4,
190 Zstd,
191}
192
193impl From<PersistedCompression> for Compression {
194 fn from(value: PersistedCompression) -> Self {
195 match value {
196 PersistedCompression::None => Self::None,
197 PersistedCompression::Lz4 => Self::Lz4,
198 PersistedCompression::Zstd => Self::Zstd,
199 }
200 }
201}
202
203impl From<Compression> for PersistedCompression {
204 fn from(value: Compression) -> Self {
205 match value {
206 Compression::None => Self::None,
207 Compression::Lz4 => Self::Lz4,
208 Compression::Zstd => Self::Zstd,
209 }
210 }
211}
212
213#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
214pub enum PersistedRowIdMode {
215 None,
216 Direct,
217}
218
219impl From<PersistedRowIdMode> for RowIdMode {
220 fn from(value: PersistedRowIdMode) -> Self {
221 match value {
222 PersistedRowIdMode::None => Self::None,
223 PersistedRowIdMode::Direct => Self::Direct,
224 }
225 }
226}
227
228impl From<RowIdMode> for PersistedRowIdMode {
229 fn from(value: RowIdMode) -> Self {
230 match value {
231 RowIdMode::None => Self::None,
232 RowIdMode::Direct => Self::Direct,
233 }
234 }
235}
236
237#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
238pub struct PersistedStorageOptions {
239 pub storage_type: PersistedStorageType,
240 pub compression: PersistedCompression,
241 pub row_group_size: u32,
242 pub row_id_mode: PersistedRowIdMode,
243}
244
245impl From<StorageOptions> for PersistedStorageOptions {
246 fn from(value: StorageOptions) -> Self {
247 Self {
248 storage_type: value.storage_type.into(),
249 compression: value.compression.into(),
250 row_group_size: value.row_group_size,
251 row_id_mode: value.row_id_mode.into(),
252 }
253 }
254}
255
256impl From<PersistedStorageOptions> for StorageOptions {
257 fn from(value: PersistedStorageOptions) -> Self {
258 Self {
259 storage_type: value.storage_type.into(),
260 compression: value.compression.into(),
261 row_group_size: value.row_group_size,
262 row_id_mode: value.row_id_mode.into(),
263 }
264 }
265}
266
267#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
268pub struct PersistedColumnMeta {
269 pub name: String,
270 pub data_type: PersistedType,
271 pub not_null: bool,
272 pub primary_key: bool,
273 pub unique: bool,
274}
275
276impl From<&ColumnMetadata> for PersistedColumnMeta {
277 fn from(value: &ColumnMetadata) -> Self {
278 Self {
279 name: value.name.clone(),
280 data_type: value.data_type.clone().into(),
281 not_null: value.not_null,
282 primary_key: value.primary_key,
283 unique: value.unique,
284 }
285 }
286}
287
288impl From<PersistedColumnMeta> for ColumnMetadata {
289 fn from(value: PersistedColumnMeta) -> Self {
290 ColumnMetadata::new(value.name, value.data_type.into())
291 .with_not_null(value.not_null)
292 .with_primary_key(value.primary_key)
293 .with_unique(value.unique)
294 }
295}
296
297#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
298pub struct PersistedTableMeta {
299 pub table_id: u32,
300 pub name: String,
301 pub columns: Vec<PersistedColumnMeta>,
302 pub primary_key: Option<Vec<String>>,
303 pub storage_options: PersistedStorageOptions,
304}
305
306impl From<&TableMetadata> for PersistedTableMeta {
307 fn from(value: &TableMetadata) -> Self {
308 Self {
309 table_id: value.table_id,
310 name: value.name.clone(),
311 columns: value
312 .columns
313 .iter()
314 .map(PersistedColumnMeta::from)
315 .collect(),
316 primary_key: value.primary_key.clone(),
317 storage_options: value.storage_options.clone().into(),
318 }
319 }
320}
321
322impl From<PersistedTableMeta> for TableMetadata {
323 fn from(value: PersistedTableMeta) -> Self {
324 let mut table = TableMetadata::new(
325 value.name,
326 value
327 .columns
328 .into_iter()
329 .map(ColumnMetadata::from)
330 .collect(),
331 )
332 .with_table_id(value.table_id);
333 table.primary_key = value.primary_key;
334 table.storage_options = value.storage_options.into();
335 table
336 }
337}
338
339#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
340pub struct PersistedIndexMeta {
341 pub index_id: u32,
342 pub name: String,
343 pub table: String,
344 pub columns: Vec<String>,
345 pub column_indices: Vec<usize>,
346 pub unique: bool,
347 pub method: Option<PersistedIndexType>,
348 pub options: Vec<(String, String)>,
349}
350
351impl From<&IndexMetadata> for PersistedIndexMeta {
352 fn from(value: &IndexMetadata) -> Self {
353 Self {
354 index_id: value.index_id,
355 name: value.name.clone(),
356 table: value.table.clone(),
357 columns: value.columns.clone(),
358 column_indices: value.column_indices.clone(),
359 unique: value.unique,
360 method: value
361 .method
362 .and_then(|m| PersistedIndexType::try_from(m).ok()),
363 options: value.options.clone(),
364 }
365 }
366}
367
368impl From<PersistedIndexMeta> for IndexMetadata {
369 fn from(value: PersistedIndexMeta) -> Self {
370 let mut index = IndexMetadata::new(value.index_id, value.name, value.table, value.columns)
371 .with_column_indices(value.column_indices)
372 .with_unique(value.unique)
373 .with_options(value.options);
374 if let Some(method) = value.method {
375 index = index.with_method(method.into());
376 }
377 index
378 }
379}
380
381fn table_key(name: &str) -> Vec<u8> {
382 let mut key = TABLES_PREFIX.to_vec();
383 key.extend_from_slice(name.as_bytes());
384 key
385}
386
387fn index_key(name: &str) -> Vec<u8> {
388 let mut key = INDEXES_PREFIX.to_vec();
389 key.extend_from_slice(name.as_bytes());
390 key
391}
392
393fn key_suffix(prefix: &[u8], key: &[u8]) -> Result<String, CatalogError> {
394 let suffix = key
395 .strip_prefix(prefix)
396 .ok_or_else(|| CatalogError::InvalidKey(format!("{key:?}")))?;
397 std::str::from_utf8(suffix)
398 .map(|s| s.to_string())
399 .map_err(|_| CatalogError::InvalidKey(format!("{key:?}")))
400}
401
402#[derive(Debug, Clone, Default)]
403pub struct CatalogOverlay {
404 added_tables: HashMap<String, TableMetadata>,
405 dropped_tables: HashSet<String>,
406 added_indexes: HashMap<String, IndexMetadata>,
407 dropped_indexes: HashSet<String>,
408}
409
410impl CatalogOverlay {
411 pub fn new() -> Self {
412 Self::default()
413 }
414
415 pub fn add_table(&mut self, table: TableMetadata) {
416 self.dropped_tables.remove(&table.name);
417 self.added_tables.insert(table.name.clone(), table);
418 }
419
420 pub fn drop_table(&mut self, name: &str) {
421 self.added_tables.remove(name);
422 self.dropped_tables.insert(name.to_string());
423 self.added_indexes.retain(|_, idx| idx.table != name);
424 }
425
426 pub fn add_index(&mut self, index: IndexMetadata) {
427 self.dropped_indexes.remove(&index.name);
428 self.added_indexes.insert(index.name.clone(), index);
429 }
430
431 pub fn drop_index(&mut self, name: &str) {
432 self.added_indexes.remove(name);
433 self.dropped_indexes.insert(name.to_string());
434 }
435}
436
437pub struct TxnCatalogView<'a, S: KVStore> {
442 catalog: &'a PersistentCatalog<S>,
443 overlay: &'a CatalogOverlay,
444}
445
446impl<'a, S: KVStore> TxnCatalogView<'a, S> {
447 pub fn new(catalog: &'a PersistentCatalog<S>, overlay: &'a CatalogOverlay) -> Self {
448 Self { catalog, overlay }
449 }
450}
451
452impl<'a, S: KVStore> Catalog for TxnCatalogView<'a, S> {
453 fn create_table(&mut self, _table: TableMetadata) -> Result<(), PlannerError> {
454 unreachable!("TxnCatalogView は参照専用です")
455 }
456
457 fn get_table(&self, name: &str) -> Option<&TableMetadata> {
458 self.catalog.get_table_in_txn(name, self.overlay)
459 }
460
461 fn drop_table(&mut self, _name: &str) -> Result<(), PlannerError> {
462 unreachable!("TxnCatalogView は参照専用です")
463 }
464
465 fn create_index(&mut self, _index: IndexMetadata) -> Result<(), PlannerError> {
466 unreachable!("TxnCatalogView は参照専用です")
467 }
468
469 fn get_index(&self, name: &str) -> Option<&IndexMetadata> {
470 self.catalog.get_index_in_txn(name, self.overlay)
471 }
472
473 fn get_indexes_for_table(&self, table: &str) -> Vec<&IndexMetadata> {
474 if self.overlay.dropped_tables.contains(table) {
475 return Vec::new();
476 }
477
478 let mut indexes: Vec<&IndexMetadata> = self
479 .catalog
480 .inner
481 .get_indexes_for_table(table)
482 .into_iter()
483 .filter(|idx| !self.overlay.dropped_indexes.contains(&idx.name))
484 .collect();
485
486 for idx in self.overlay.added_indexes.values() {
487 if idx.table == table && !self.overlay.dropped_indexes.contains(&idx.name) {
488 indexes.push(idx);
489 }
490 }
491
492 indexes
493 }
494
495 fn drop_index(&mut self, _name: &str) -> Result<(), PlannerError> {
496 unreachable!("TxnCatalogView は参照専用です")
497 }
498
499 fn table_exists(&self, name: &str) -> bool {
500 self.catalog.table_exists_in_txn(name, self.overlay)
501 }
502
503 fn index_exists(&self, name: &str) -> bool {
504 self.catalog.index_exists_in_txn(name, self.overlay)
505 }
506
507 fn next_table_id(&mut self) -> u32 {
508 unreachable!("TxnCatalogView は参照専用です")
509 }
510
511 fn next_index_id(&mut self) -> u32 {
512 unreachable!("TxnCatalogView は参照専用です")
513 }
514}
515
516#[derive(Debug)]
518pub struct PersistentCatalog<S: KVStore> {
533 inner: MemoryCatalog,
534 store: Arc<S>,
535}
536
537impl<S: KVStore> PersistentCatalog<S> {
538 pub fn load(store: Arc<S>) -> Result<Self, CatalogError> {
539 let mut txn = store.begin(TxnMode::ReadOnly)?;
540
541 let mut inner = MemoryCatalog::new();
542
543 let mut max_table_id = 0u32;
544 let mut max_index_id = 0u32;
545
546 for (key, value) in txn.scan_prefix(TABLES_PREFIX)? {
548 let _table_name = key_suffix(TABLES_PREFIX, &key)?;
549 let persisted: PersistedTableMeta = bincode::deserialize(&value)?;
550 max_table_id = max_table_id.max(persisted.table_id);
551 let table: TableMetadata = persisted.into();
552 inner.insert_table_unchecked(table);
553 }
554
555 for (key, value) in txn.scan_prefix(INDEXES_PREFIX)? {
556 let _index_name = key_suffix(INDEXES_PREFIX, &key)?;
557 let persisted: PersistedIndexMeta = bincode::deserialize(&value)?;
558 max_index_id = max_index_id.max(persisted.index_id);
559 let index: IndexMetadata = persisted.into();
560 if inner.table_exists(&index.table) {
562 inner.insert_index_unchecked(index);
563 }
564 }
565
566 let (mut table_id_counter, mut index_id_counter) = (max_table_id, max_index_id);
567 let meta_key = META_KEY.to_vec();
568 if let Some(meta_bytes) = txn.get(&meta_key)? {
569 let meta: CatalogMeta = bincode::deserialize(&meta_bytes)?;
570 if meta.version == CATALOG_VERSION {
571 table_id_counter = table_id_counter.max(meta.table_id_counter);
572 index_id_counter = index_id_counter.max(meta.index_id_counter);
573 }
574 }
575 inner.set_counters(table_id_counter, index_id_counter);
576
577 txn.rollback_self()?;
578
579 Ok(Self { inner, store })
580 }
581
582 pub fn new(store: Arc<S>) -> Self {
583 Self {
584 inner: MemoryCatalog::new(),
585 store,
586 }
587 }
588
589 pub fn store(&self) -> &Arc<S> {
590 &self.store
591 }
592
593 fn write_meta(&self, txn: &mut S::Transaction<'_>) -> Result<(), CatalogError> {
594 let (table_id_counter, index_id_counter) = self.inner.counters();
595 let meta = CatalogMeta {
596 version: CATALOG_VERSION,
597 table_id_counter,
598 index_id_counter,
599 };
600 let meta_bytes = bincode::serialize(&meta)?;
601 txn.put(META_KEY.to_vec(), meta_bytes)?;
602 Ok(())
603 }
604
605 pub fn persist_create_table(
606 &mut self,
607 txn: &mut S::Transaction<'_>,
608 table: &TableMetadata,
609 ) -> Result<(), CatalogError> {
610 let persisted = PersistedTableMeta::from(table);
611 let value = bincode::serialize(&persisted)?;
612 txn.put(table_key(&table.name), value)?;
613 self.write_meta(txn)?;
614 Ok(())
615 }
616
617 pub fn persist_drop_table(
618 &mut self,
619 txn: &mut S::Transaction<'_>,
620 name: &str,
621 ) -> Result<(), CatalogError> {
622 txn.delete(table_key(name))?;
623
624 let mut to_delete: Vec<String> = Vec::new();
626 for (key, value) in txn.scan_prefix(INDEXES_PREFIX)? {
627 let persisted: PersistedIndexMeta = bincode::deserialize(&value)?;
628 if persisted.table == name {
629 let index_name = key_suffix(INDEXES_PREFIX, &key)?;
630 to_delete.push(index_name);
631 }
632 }
633 for index_name in to_delete {
634 txn.delete(index_key(&index_name))?;
635 }
636
637 Ok(())
638 }
639
640 pub fn persist_create_index(
641 &mut self,
642 txn: &mut S::Transaction<'_>,
643 index: &IndexMetadata,
644 ) -> Result<(), CatalogError> {
645 let persisted = PersistedIndexMeta::from(index);
646 let value = bincode::serialize(&persisted)?;
647 txn.put(index_key(&index.name), value)?;
648 self.write_meta(txn)?;
649 Ok(())
650 }
651
652 pub fn persist_drop_index(
653 &mut self,
654 txn: &mut S::Transaction<'_>,
655 name: &str,
656 ) -> Result<(), CatalogError> {
657 txn.delete(index_key(name))?;
658 Ok(())
659 }
660
661 pub fn table_exists_in_txn(&self, name: &str, overlay: &CatalogOverlay) -> bool {
662 if overlay.dropped_tables.contains(name) {
663 return false;
664 }
665 if overlay.added_tables.contains_key(name) {
666 return true;
667 }
668 self.inner.table_exists(name)
669 }
670
671 pub fn get_table_in_txn<'a>(
672 &'a self,
673 name: &str,
674 overlay: &'a CatalogOverlay,
675 ) -> Option<&'a TableMetadata> {
676 if overlay.dropped_tables.contains(name) {
677 return None;
678 }
679 if let Some(table) = overlay.added_tables.get(name) {
680 return Some(table);
681 }
682 self.inner.get_table(name)
683 }
684
685 pub fn index_exists_in_txn(&self, name: &str, overlay: &CatalogOverlay) -> bool {
686 if overlay.dropped_indexes.contains(name) {
687 return false;
688 }
689 if let Some(index) = overlay.added_indexes.get(name) {
690 if overlay.dropped_tables.contains(&index.table) {
691 return false;
692 }
693 return true;
694 }
695 match self.inner.get_index(name) {
696 Some(index) if overlay.dropped_tables.contains(&index.table) => false,
697 Some(_) => true,
698 None => false,
699 }
700 }
701
702 pub fn get_index_in_txn<'a>(
703 &'a self,
704 name: &str,
705 overlay: &'a CatalogOverlay,
706 ) -> Option<&'a IndexMetadata> {
707 if overlay.dropped_indexes.contains(name) {
708 return None;
709 }
710 if let Some(index) = overlay.added_indexes.get(name) {
711 if overlay.dropped_tables.contains(&index.table) {
712 return None;
713 }
714 return Some(index);
715 }
716 match self.inner.get_index(name) {
717 Some(index) if overlay.dropped_tables.contains(&index.table) => None,
718 other => other,
719 }
720 }
721
722 pub fn apply_overlay(&mut self, overlay: CatalogOverlay) {
723 for (_, table) in overlay.added_tables {
724 self.inner.insert_table_unchecked(table);
725 }
726 for name in overlay.dropped_tables {
727 self.inner.remove_table_unchecked(&name);
728 }
729 for (_, index) in overlay.added_indexes {
730 self.inner.insert_index_unchecked(index);
731 }
732 for name in overlay.dropped_indexes {
733 self.inner.remove_index_unchecked(&name);
734 }
735 }
736
737 pub fn discard_overlay(_overlay: CatalogOverlay) {}
738}
739
/// [`Catalog`] implementation that forwards every call to the in-memory
/// catalog (`inner`). None of these methods write to the key-value store.
impl<S: KVStore> Catalog for PersistentCatalog<S> {
    /// Forwards to the in-memory catalog.
    fn create_table(&mut self, table: TableMetadata) -> Result<(), PlannerError> {
        self.inner.create_table(table)
    }

    /// Forwards to the in-memory catalog.
    fn get_table(&self, name: &str) -> Option<&TableMetadata> {
        self.inner.get_table(name)
    }

    /// Forwards to the in-memory catalog.
    fn drop_table(&mut self, name: &str) -> Result<(), PlannerError> {
        self.inner.drop_table(name)
    }

    /// Forwards to the in-memory catalog.
    fn create_index(&mut self, index: IndexMetadata) -> Result<(), PlannerError> {
        self.inner.create_index(index)
    }

    /// Forwards to the in-memory catalog.
    fn get_index(&self, name: &str) -> Option<&IndexMetadata> {
        self.inner.get_index(name)
    }

    /// Forwards to the in-memory catalog.
    fn get_indexes_for_table(&self, table: &str) -> Vec<&IndexMetadata> {
        self.inner.get_indexes_for_table(table)
    }

    /// Forwards to the in-memory catalog.
    fn drop_index(&mut self, name: &str) -> Result<(), PlannerError> {
        self.inner.drop_index(name)
    }

    /// Forwards to the in-memory catalog.
    fn table_exists(&self, name: &str) -> bool {
        self.inner.table_exists(name)
    }

    /// Forwards to the in-memory catalog.
    fn index_exists(&self, name: &str) -> bool {
        self.inner.index_exists(name)
    }

    /// Forwards to the in-memory catalog.
    fn next_table_id(&mut self) -> u32 {
        self.inner.next_table_id()
    }

    /// Forwards to the in-memory catalog.
    fn next_index_id(&mut self) -> u32 {
        self.inner.next_index_id()
    }
}
785
#[cfg(test)]
mod tests {
    use super::*;
    use crate::planner::types::ResolvedType;

    /// Builds a one-column table (`id INTEGER PRIMARY KEY`) with the given name and ID.
    fn test_table(name: &str, id: u32) -> TableMetadata {
        TableMetadata::new(
            name,
            vec![ColumnMetadata::new("id", ResolvedType::Integer).with_primary_key(true)],
        )
        .with_table_id(id)
        .with_primary_key(vec!["id".to_string()])
    }

    /// Loading from an empty store yields a catalog with no tables or indexes.
    #[test]
    fn load_empty_store() {
        let store = Arc::new(alopex_core::kv::memory::MemoryKV::new());
        let catalog = PersistentCatalog::load(store).unwrap();
        assert_eq!(catalog.inner.table_count(), 0);
        assert_eq!(catalog.inner.index_count(), 0);
    }

    /// A table written with `persist_create_table` is visible after `load`.
    #[test]
    fn create_table_persists() {
        let store = Arc::new(alopex_core::kv::memory::MemoryKV::new());
        let mut catalog = PersistentCatalog::new(store.clone());

        // Set the counters that `persist_create_table` writes into the meta record.
        catalog.inner.set_counters(1, 0);

        let table = test_table("users", 1);
        let mut txn = store.begin(TxnMode::ReadWrite).unwrap();
        catalog.persist_create_table(&mut txn, &table).unwrap();
        txn.commit_self().unwrap();

        let reloaded = PersistentCatalog::load(store).unwrap();
        assert!(reloaded.table_exists("users"));
        assert_eq!(reloaded.get_table("users").unwrap().table_id, 1);
    }

    /// A table removed with `persist_drop_table` is gone after `load`.
    #[test]
    fn drop_table_removes() {
        let store = Arc::new(alopex_core::kv::memory::MemoryKV::new());
        let mut catalog = PersistentCatalog::new(store.clone());
        catalog.inner.set_counters(1, 0);

        let table = test_table("users", 1);
        let mut txn = store.begin(TxnMode::ReadWrite).unwrap();
        catalog.persist_create_table(&mut txn, &table).unwrap();
        txn.commit_self().unwrap();

        let mut txn = store.begin(TxnMode::ReadWrite).unwrap();
        catalog.persist_drop_table(&mut txn, "users").unwrap();
        txn.commit_self().unwrap();

        let reloaded = PersistentCatalog::load(store).unwrap();
        assert!(!reloaded.table_exists("users"));
    }

    /// Catalog contents survive closing the store and reopening it from the same path.
    #[test]
    fn reload_preserves_state() {
        let temp_dir = tempfile::tempdir().unwrap();
        let wal_path = temp_dir.path().join("catalog.wal");
        let store = Arc::new(alopex_core::kv::memory::MemoryKV::open(&wal_path).unwrap());
        let mut catalog = PersistentCatalog::new(store.clone());
        catalog.inner.set_counters(1, 0);

        let table = test_table("users", 1);
        let mut txn = store.begin(TxnMode::ReadWrite).unwrap();
        catalog.persist_create_table(&mut txn, &table).unwrap();
        txn.commit_self().unwrap();
        store.flush().unwrap();

        // Release every handle to the store before reopening it.
        drop(catalog);
        drop(store);

        let store = Arc::new(alopex_core::kv::memory::MemoryKV::open(&wal_path).unwrap());
        let reloaded = PersistentCatalog::load(store).unwrap();
        assert!(reloaded.table_exists("users"));
    }

    /// Overlay changes are visible through the `*_in_txn` lookups before
    /// `apply_overlay`, and in the catalog itself afterwards.
    #[test]
    fn overlay_applied_on_commit() {
        let store = Arc::new(alopex_core::kv::memory::MemoryKV::new());
        let mut catalog = PersistentCatalog::new(store);
        catalog.inner.insert_table_unchecked(test_table("users", 1));

        let mut overlay = CatalogOverlay::new();
        overlay.drop_table("users");
        overlay.add_table(test_table("orders", 2));

        assert!(!catalog.table_exists_in_txn("users", &overlay));
        assert!(catalog.table_exists_in_txn("orders", &overlay));

        catalog.apply_overlay(overlay);

        assert!(!catalog.table_exists("users"));
        assert!(catalog.table_exists("orders"));
    }

    /// Discarding an overlay leaves the catalog unchanged.
    #[test]
    fn overlay_discarded_on_rollback() {
        let store = Arc::new(alopex_core::kv::memory::MemoryKV::new());
        let mut catalog = PersistentCatalog::new(store);
        catalog.inner.insert_table_unchecked(test_table("users", 1));

        let mut overlay = CatalogOverlay::new();
        overlay.drop_table("users");

        PersistentCatalog::<alopex_core::kv::memory::MemoryKV>::discard_overlay(overlay);

        assert!(catalog.table_exists("users"));
    }
}