alopex_sql/catalog/
persistent.rs

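//! Persistent catalog implementation.
//!
//! The existing `TableMetadata` / `IndexMetadata` contain `Expr`, so they cannot be serialized
//! and persisted as-is. This module therefore defines persistence DTOs and stores them in the
//! KV store using bincode.
//!
//! Note: `ColumnMetadata.default` (the DEFAULT expression) is currently not persisted; it is
//! restored as `None`.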
8
9use std::collections::{HashMap, HashSet};
10use std::sync::Arc;
11
12use alopex_core::kv::{KVStore, KVTransaction};
13use alopex_core::types::TxnMode;
14use serde::{Deserialize, Serialize};
15use thiserror::Error;
16
17use crate::ast::ddl::{IndexMethod, VectorMetric};
18use crate::catalog::{
19    Catalog, ColumnMetadata, Compression, IndexMetadata, MemoryCatalog, RowIdMode,
20};
21use crate::catalog::{StorageOptions, StorageType, TableMetadata};
22use crate::planner::PlannerError;
23use crate::planner::types::ResolvedType;
24
/// Key prefix shared by every catalog entry in the KV store.
pub const CATALOG_PREFIX: &[u8] = b"__catalog__/";
/// Key prefix for persisted table metadata; the table name is appended as the suffix.
pub const TABLES_PREFIX: &[u8] = b"__catalog__/tables/";
/// Key prefix for persisted index metadata; the index name is appended as the suffix.
pub const INDEXES_PREFIX: &[u8] = b"__catalog__/indexes/";
/// Key under which the catalog meta record (`CatalogMeta`) is stored.
pub const META_KEY: &[u8] = b"__catalog__/meta";

/// Format version written into `CatalogMeta`. `PersistentCatalog::load` only uses the persisted
/// counters when the stored version equals this value.
const CATALOG_VERSION: u32 = 1;
32
/// Errors raised while loading or persisting the catalog.
#[derive(Debug, Error)]
pub enum CatalogError {
    /// Failure reported by the underlying KV store or transaction.
    #[error("kv error: {0}")]
    Kv(#[from] alopex_core::Error),

    /// bincode failed to encode or decode a persisted record.
    #[error("serialize error: {0}")]
    Serialize(#[from] bincode::Error),

    /// A key did not start with the expected prefix, or its suffix was not valid UTF-8.
    /// The payload is the `Debug` rendering of the offending key bytes.
    #[error("invalid catalog key: {0}")]
    InvalidKey(String),
}
44
/// Meta record stored under `META_KEY`: the format version plus the ID counters.
///
/// NOTE(review): the record is bincode-encoded, so field order presumably is part of the
/// on-disk layout — confirm before reordering fields.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
struct CatalogMeta {
    /// Format version; compared against `CATALOG_VERSION` on load.
    version: u32,
    /// Table ID counter taken from the in-memory catalog when the record is written.
    table_id_counter: u32,
    /// Index ID counter taken from the in-memory catalog when the record is written.
    index_id_counter: u32,
}
51
/// Serializable mirror of `VectorMetric`.
///
/// NOTE(review): persisted via serde/bincode; reordering variants presumably changes the
/// encoding of already-stored catalogs — confirm before editing.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum PersistedVectorMetric {
    /// Mirrors `VectorMetric::Cosine`.
    Cosine,
    /// Mirrors `VectorMetric::L2`.
    L2,
    /// Mirrors `VectorMetric::Inner`.
    Inner,
}
58
59impl From<VectorMetric> for PersistedVectorMetric {
60    fn from(value: VectorMetric) -> Self {
61        match value {
62            VectorMetric::Cosine => Self::Cosine,
63            VectorMetric::L2 => Self::L2,
64            VectorMetric::Inner => Self::Inner,
65        }
66    }
67}
68
69impl From<PersistedVectorMetric> for VectorMetric {
70    fn from(value: PersistedVectorMetric) -> Self {
71        match value {
72            PersistedVectorMetric::Cosine => Self::Cosine,
73            PersistedVectorMetric::L2 => Self::L2,
74            PersistedVectorMetric::Inner => Self::Inner,
75        }
76    }
77}
78
/// Serializable mirror of `ResolvedType`, used for persisted column types.
///
/// NOTE(review): persisted via serde/bincode; reordering variants presumably changes the
/// encoding of already-stored catalogs — confirm before editing.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum PersistedType {
    Integer,
    BigInt,
    Float,
    Double,
    Text,
    Blob,
    Boolean,
    Timestamp,
    /// Vector column; both fields are copied from `ResolvedType::Vector`.
    Vector {
        dimension: u32,
        metric: PersistedVectorMetric,
    },
    Null,
}
95
96impl From<ResolvedType> for PersistedType {
97    fn from(value: ResolvedType) -> Self {
98        match value {
99            ResolvedType::Integer => Self::Integer,
100            ResolvedType::BigInt => Self::BigInt,
101            ResolvedType::Float => Self::Float,
102            ResolvedType::Double => Self::Double,
103            ResolvedType::Text => Self::Text,
104            ResolvedType::Blob => Self::Blob,
105            ResolvedType::Boolean => Self::Boolean,
106            ResolvedType::Timestamp => Self::Timestamp,
107            ResolvedType::Vector { dimension, metric } => Self::Vector {
108                dimension,
109                metric: metric.into(),
110            },
111            ResolvedType::Null => Self::Null,
112        }
113    }
114}
115
116impl From<PersistedType> for ResolvedType {
117    fn from(value: PersistedType) -> Self {
118        match value {
119            PersistedType::Integer => Self::Integer,
120            PersistedType::BigInt => Self::BigInt,
121            PersistedType::Float => Self::Float,
122            PersistedType::Double => Self::Double,
123            PersistedType::Text => Self::Text,
124            PersistedType::Blob => Self::Blob,
125            PersistedType::Boolean => Self::Boolean,
126            PersistedType::Timestamp => Self::Timestamp,
127            PersistedType::Vector { dimension, metric } => Self::Vector {
128                dimension,
129                metric: metric.into(),
130            },
131            PersistedType::Null => Self::Null,
132        }
133    }
134}
135
/// Serializable mirror of `IndexMethod`.
///
/// NOTE(review): persisted via serde/bincode; reordering variants presumably changes the
/// encoding of already-stored catalogs — confirm before editing.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum PersistedIndexType {
    /// Mirrors `IndexMethod::BTree`.
    BTree,
    /// Mirrors `IndexMethod::Hnsw`.
    Hnsw,
}
141
142impl From<PersistedIndexType> for IndexMethod {
143    fn from(value: PersistedIndexType) -> Self {
144        match value {
145            PersistedIndexType::BTree => IndexMethod::BTree,
146            PersistedIndexType::Hnsw => IndexMethod::Hnsw,
147        }
148    }
149}
150
151impl TryFrom<IndexMethod> for PersistedIndexType {
152    type Error = ();
153
154    fn try_from(value: IndexMethod) -> Result<Self, Self::Error> {
155        match value {
156            IndexMethod::BTree => Ok(Self::BTree),
157            IndexMethod::Hnsw => Ok(Self::Hnsw),
158        }
159    }
160}
161
/// Serializable mirror of `StorageType`.
///
/// NOTE(review): persisted via serde/bincode; reordering variants presumably changes the
/// encoding of already-stored catalogs — confirm before editing.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum PersistedStorageType {
    /// Mirrors `StorageType::Row`.
    Row,
    /// Mirrors `StorageType::Columnar`.
    Columnar,
}
167
168impl From<PersistedStorageType> for StorageType {
169    fn from(value: PersistedStorageType) -> Self {
170        match value {
171            PersistedStorageType::Row => Self::Row,
172            PersistedStorageType::Columnar => Self::Columnar,
173        }
174    }
175}
176
177impl From<StorageType> for PersistedStorageType {
178    fn from(value: StorageType) -> Self {
179        match value {
180            StorageType::Row => Self::Row,
181            StorageType::Columnar => Self::Columnar,
182        }
183    }
184}
185
/// Serializable mirror of `Compression`.
///
/// NOTE(review): persisted via serde/bincode; reordering variants presumably changes the
/// encoding of already-stored catalogs — confirm before editing.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum PersistedCompression {
    /// Mirrors `Compression::None`.
    None,
    /// Mirrors `Compression::Lz4`.
    Lz4,
    /// Mirrors `Compression::Zstd`.
    Zstd,
}
192
193impl From<PersistedCompression> for Compression {
194    fn from(value: PersistedCompression) -> Self {
195        match value {
196            PersistedCompression::None => Self::None,
197            PersistedCompression::Lz4 => Self::Lz4,
198            PersistedCompression::Zstd => Self::Zstd,
199        }
200    }
201}
202
203impl From<Compression> for PersistedCompression {
204    fn from(value: Compression) -> Self {
205        match value {
206            Compression::None => Self::None,
207            Compression::Lz4 => Self::Lz4,
208            Compression::Zstd => Self::Zstd,
209        }
210    }
211}
212
/// Serializable mirror of `RowIdMode`.
///
/// NOTE(review): persisted via serde/bincode; reordering variants presumably changes the
/// encoding of already-stored catalogs — confirm before editing.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum PersistedRowIdMode {
    /// Mirrors `RowIdMode::None`.
    None,
    /// Mirrors `RowIdMode::Direct`.
    Direct,
}
218
219impl From<PersistedRowIdMode> for RowIdMode {
220    fn from(value: PersistedRowIdMode) -> Self {
221        match value {
222            PersistedRowIdMode::None => Self::None,
223            PersistedRowIdMode::Direct => Self::Direct,
224        }
225    }
226}
227
228impl From<RowIdMode> for PersistedRowIdMode {
229    fn from(value: RowIdMode) -> Self {
230        match value {
231            RowIdMode::None => Self::None,
232            RowIdMode::Direct => Self::Direct,
233        }
234    }
235}
236
/// Serializable mirror of `StorageOptions`; each field mirrors the field of the same name.
///
/// NOTE(review): persisted via serde/bincode; field order presumably is part of the stored
/// layout — confirm before reordering.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct PersistedStorageOptions {
    pub storage_type: PersistedStorageType,
    pub compression: PersistedCompression,
    pub row_group_size: u32,
    pub row_id_mode: PersistedRowIdMode,
}
244
245impl From<StorageOptions> for PersistedStorageOptions {
246    fn from(value: StorageOptions) -> Self {
247        Self {
248            storage_type: value.storage_type.into(),
249            compression: value.compression.into(),
250            row_group_size: value.row_group_size,
251            row_id_mode: value.row_id_mode.into(),
252        }
253    }
254}
255
256impl From<PersistedStorageOptions> for StorageOptions {
257    fn from(value: PersistedStorageOptions) -> Self {
258        Self {
259            storage_type: value.storage_type.into(),
260            compression: value.compression.into(),
261            row_group_size: value.row_group_size,
262            row_id_mode: value.row_id_mode.into(),
263        }
264    }
265}
266
/// Serializable subset of `ColumnMetadata`.
///
/// Only the fields listed here are persisted; the conversion from `ColumnMetadata` copies
/// nothing else (in particular no DEFAULT expression).
///
/// NOTE(review): persisted via serde/bincode; field order presumably is part of the stored
/// layout — confirm before reordering.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct PersistedColumnMeta {
    pub name: String,
    pub data_type: PersistedType,
    pub not_null: bool,
    pub primary_key: bool,
    pub unique: bool,
}
275
276impl From<&ColumnMetadata> for PersistedColumnMeta {
277    fn from(value: &ColumnMetadata) -> Self {
278        Self {
279            name: value.name.clone(),
280            data_type: value.data_type.clone().into(),
281            not_null: value.not_null,
282            primary_key: value.primary_key,
283            unique: value.unique,
284        }
285    }
286}
287
288impl From<PersistedColumnMeta> for ColumnMetadata {
289    fn from(value: PersistedColumnMeta) -> Self {
290        ColumnMetadata::new(value.name, value.data_type.into())
291            .with_not_null(value.not_null)
292            .with_primary_key(value.primary_key)
293            .with_unique(value.unique)
294    }
295}
296
/// Serializable form of `TableMetadata`, stored under `TABLES_PREFIX` + table name.
///
/// NOTE(review): persisted via serde/bincode; field order presumably is part of the stored
/// layout — confirm before reordering.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct PersistedTableMeta {
    pub table_id: u32,
    pub name: String,
    pub columns: Vec<PersistedColumnMeta>,
    /// Copied verbatim from `TableMetadata::primary_key`.
    pub primary_key: Option<Vec<String>>,
    pub storage_options: PersistedStorageOptions,
}
305
306impl From<&TableMetadata> for PersistedTableMeta {
307    fn from(value: &TableMetadata) -> Self {
308        Self {
309            table_id: value.table_id,
310            name: value.name.clone(),
311            columns: value
312                .columns
313                .iter()
314                .map(PersistedColumnMeta::from)
315                .collect(),
316            primary_key: value.primary_key.clone(),
317            storage_options: value.storage_options.clone().into(),
318        }
319    }
320}
321
322impl From<PersistedTableMeta> for TableMetadata {
323    fn from(value: PersistedTableMeta) -> Self {
324        let mut table = TableMetadata::new(
325            value.name,
326            value
327                .columns
328                .into_iter()
329                .map(ColumnMetadata::from)
330                .collect(),
331        )
332        .with_table_id(value.table_id);
333        table.primary_key = value.primary_key;
334        table.storage_options = value.storage_options.into();
335        table
336    }
337}
338
/// Serializable form of `IndexMetadata`, stored under `INDEXES_PREFIX` + index name.
///
/// NOTE(review): persisted via serde/bincode; field order presumably is part of the stored
/// layout — confirm before reordering.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct PersistedIndexMeta {
    pub index_id: u32,
    pub name: String,
    /// Name of the table the index belongs to.
    pub table: String,
    pub columns: Vec<String>,
    /// Copied verbatim from `IndexMetadata::column_indices`.
    pub column_indices: Vec<usize>,
    pub unique: bool,
    /// `None` when the source index has no method set.
    pub method: Option<PersistedIndexType>,
    /// Copied verbatim from `IndexMetadata::options`.
    pub options: Vec<(String, String)>,
}
350
351impl From<&IndexMetadata> for PersistedIndexMeta {
352    fn from(value: &IndexMetadata) -> Self {
353        Self {
354            index_id: value.index_id,
355            name: value.name.clone(),
356            table: value.table.clone(),
357            columns: value.columns.clone(),
358            column_indices: value.column_indices.clone(),
359            unique: value.unique,
360            method: value
361                .method
362                .and_then(|m| PersistedIndexType::try_from(m).ok()),
363            options: value.options.clone(),
364        }
365    }
366}
367
368impl From<PersistedIndexMeta> for IndexMetadata {
369    fn from(value: PersistedIndexMeta) -> Self {
370        let mut index = IndexMetadata::new(value.index_id, value.name, value.table, value.columns)
371            .with_column_indices(value.column_indices)
372            .with_unique(value.unique)
373            .with_options(value.options);
374        if let Some(method) = value.method {
375            index = index.with_method(method.into());
376        }
377        index
378    }
379}
380
381fn table_key(name: &str) -> Vec<u8> {
382    let mut key = TABLES_PREFIX.to_vec();
383    key.extend_from_slice(name.as_bytes());
384    key
385}
386
387fn index_key(name: &str) -> Vec<u8> {
388    let mut key = INDEXES_PREFIX.to_vec();
389    key.extend_from_slice(name.as_bytes());
390    key
391}
392
393fn key_suffix(prefix: &[u8], key: &[u8]) -> Result<String, CatalogError> {
394    let suffix = key
395        .strip_prefix(prefix)
396        .ok_or_else(|| CatalogError::InvalidKey(format!("{key:?}")))?;
397    std::str::from_utf8(suffix)
398        .map(|s| s.to_string())
399        .map_err(|_| CatalogError::InvalidKey(format!("{key:?}")))
400}
401
/// Set of catalog changes (added and dropped tables and indexes) kept apart from the
/// catalog itself until it is applied with `PersistentCatalog::apply_overlay` or discarded.
#[derive(Debug, Clone, Default)]
pub struct CatalogOverlay {
    /// Tables registered through `add_table`, keyed by table name.
    added_tables: HashMap<String, TableMetadata>,
    /// Names of tables removed through `drop_table`.
    dropped_tables: HashSet<String>,
    /// Indexes registered through `add_index`, keyed by index name.
    added_indexes: HashMap<String, IndexMetadata>,
    /// Names of indexes removed through `drop_index`.
    dropped_indexes: HashSet<String>,
}
409
410impl CatalogOverlay {
411    pub fn new() -> Self {
412        Self::default()
413    }
414
415    pub fn add_table(&mut self, table: TableMetadata) {
416        self.dropped_tables.remove(&table.name);
417        self.added_tables.insert(table.name.clone(), table);
418    }
419
420    pub fn drop_table(&mut self, name: &str) {
421        self.added_tables.remove(name);
422        self.dropped_tables.insert(name.to_string());
423        self.added_indexes.retain(|_, idx| idx.table != name);
424    }
425
426    pub fn add_index(&mut self, index: IndexMetadata) {
427        self.dropped_indexes.remove(&index.name);
428        self.added_indexes.insert(index.name.clone(), index);
429    }
430
431    pub fn drop_index(&mut self, name: &str) {
432        self.added_indexes.remove(name);
433        self.dropped_indexes.insert(name.to_string());
434    }
435}
436
/// `Catalog` view for lookups inside a transaction, with the overlay taken into account.
///
/// Intended for DML/SELECT execution and planner lookups. Write APIs are assumed never to
/// be used through this view, so the mutating `Catalog` methods are `unreachable!()`.
pub struct TxnCatalogView<'a, S: KVStore> {
    /// The underlying persistent catalog.
    catalog: &'a PersistentCatalog<S>,
    /// Overlay whose additions and drops are layered on top of `catalog`.
    overlay: &'a CatalogOverlay,
}
445
446impl<'a, S: KVStore> TxnCatalogView<'a, S> {
447    pub fn new(catalog: &'a PersistentCatalog<S>, overlay: &'a CatalogOverlay) -> Self {
448        Self { catalog, overlay }
449    }
450}
451
452impl<'a, S: KVStore> Catalog for TxnCatalogView<'a, S> {
453    fn create_table(&mut self, _table: TableMetadata) -> Result<(), PlannerError> {
454        unreachable!("TxnCatalogView は参照専用です")
455    }
456
457    fn get_table(&self, name: &str) -> Option<&TableMetadata> {
458        self.catalog.get_table_in_txn(name, self.overlay)
459    }
460
461    fn drop_table(&mut self, _name: &str) -> Result<(), PlannerError> {
462        unreachable!("TxnCatalogView は参照専用です")
463    }
464
465    fn create_index(&mut self, _index: IndexMetadata) -> Result<(), PlannerError> {
466        unreachable!("TxnCatalogView は参照専用です")
467    }
468
469    fn get_index(&self, name: &str) -> Option<&IndexMetadata> {
470        self.catalog.get_index_in_txn(name, self.overlay)
471    }
472
473    fn get_indexes_for_table(&self, table: &str) -> Vec<&IndexMetadata> {
474        if self.overlay.dropped_tables.contains(table) {
475            return Vec::new();
476        }
477
478        let mut indexes: Vec<&IndexMetadata> = self
479            .catalog
480            .inner
481            .get_indexes_for_table(table)
482            .into_iter()
483            .filter(|idx| !self.overlay.dropped_indexes.contains(&idx.name))
484            .collect();
485
486        for idx in self.overlay.added_indexes.values() {
487            if idx.table == table && !self.overlay.dropped_indexes.contains(&idx.name) {
488                indexes.push(idx);
489            }
490        }
491
492        indexes
493    }
494
495    fn drop_index(&mut self, _name: &str) -> Result<(), PlannerError> {
496        unreachable!("TxnCatalogView は参照専用です")
497    }
498
499    fn table_exists(&self, name: &str) -> bool {
500        self.catalog.table_exists_in_txn(name, self.overlay)
501    }
502
503    fn index_exists(&self, name: &str) -> bool {
504        self.catalog.index_exists_in_txn(name, self.overlay)
505    }
506
507    fn next_table_id(&mut self) -> u32 {
508        unreachable!("TxnCatalogView は参照専用です")
509    }
510
511    fn next_index_id(&mut self) -> u32 {
512        unreachable!("TxnCatalogView は参照専用です")
513    }
514}
515
/// Catalog implementation with persistence support.
///
/// Pairs an in-memory `MemoryCatalog` with a handle to the KV store that holds the
/// persisted catalog records.
///
/// # Examples
///
/// ```
/// use std::sync::Arc;
/// use alopex_core::kv::memory::MemoryKV;
/// use alopex_sql::Catalog;
/// use alopex_sql::catalog::PersistentCatalog;
///
/// let store = Arc::new(MemoryKV::new());
/// let catalog = PersistentCatalog::new(store);
/// assert!(!catalog.table_exists("users"));
/// ```
#[derive(Debug)]
pub struct PersistentCatalog<S: KVStore> {
    /// In-memory catalog state that answers all lookups.
    inner: MemoryCatalog,
    /// KV store the catalog records are loaded from.
    store: Arc<S>,
}
536
537impl<S: KVStore> PersistentCatalog<S> {
538    pub fn load(store: Arc<S>) -> Result<Self, CatalogError> {
539        let mut txn = store.begin(TxnMode::ReadOnly)?;
540
541        let mut inner = MemoryCatalog::new();
542
543        let mut max_table_id = 0u32;
544        let mut max_index_id = 0u32;
545
546        // テーブルをロード(まずテーブルを入れてからインデックスを入れる)
547        for (key, value) in txn.scan_prefix(TABLES_PREFIX)? {
548            let _table_name = key_suffix(TABLES_PREFIX, &key)?;
549            let persisted: PersistedTableMeta = bincode::deserialize(&value)?;
550            max_table_id = max_table_id.max(persisted.table_id);
551            let table: TableMetadata = persisted.into();
552            inner.insert_table_unchecked(table);
553        }
554
555        for (key, value) in txn.scan_prefix(INDEXES_PREFIX)? {
556            let _index_name = key_suffix(INDEXES_PREFIX, &key)?;
557            let persisted: PersistedIndexMeta = bincode::deserialize(&value)?;
558            max_index_id = max_index_id.max(persisted.index_id);
559            let index: IndexMetadata = persisted.into();
560            // 参照先テーブルがない場合はスキップ(破損対策)
561            if inner.table_exists(&index.table) {
562                inner.insert_index_unchecked(index);
563            }
564        }
565
566        let (mut table_id_counter, mut index_id_counter) = (max_table_id, max_index_id);
567        let meta_key = META_KEY.to_vec();
568        if let Some(meta_bytes) = txn.get(&meta_key)? {
569            let meta: CatalogMeta = bincode::deserialize(&meta_bytes)?;
570            if meta.version == CATALOG_VERSION {
571                table_id_counter = table_id_counter.max(meta.table_id_counter);
572                index_id_counter = index_id_counter.max(meta.index_id_counter);
573            }
574        }
575        inner.set_counters(table_id_counter, index_id_counter);
576
577        txn.rollback_self()?;
578
579        Ok(Self { inner, store })
580    }
581
582    pub fn new(store: Arc<S>) -> Self {
583        Self {
584            inner: MemoryCatalog::new(),
585            store,
586        }
587    }
588
589    pub fn store(&self) -> &Arc<S> {
590        &self.store
591    }
592
593    fn write_meta(&self, txn: &mut S::Transaction<'_>) -> Result<(), CatalogError> {
594        let (table_id_counter, index_id_counter) = self.inner.counters();
595        let meta = CatalogMeta {
596            version: CATALOG_VERSION,
597            table_id_counter,
598            index_id_counter,
599        };
600        let meta_bytes = bincode::serialize(&meta)?;
601        txn.put(META_KEY.to_vec(), meta_bytes)?;
602        Ok(())
603    }
604
605    pub fn persist_create_table(
606        &mut self,
607        txn: &mut S::Transaction<'_>,
608        table: &TableMetadata,
609    ) -> Result<(), CatalogError> {
610        let persisted = PersistedTableMeta::from(table);
611        let value = bincode::serialize(&persisted)?;
612        txn.put(table_key(&table.name), value)?;
613        self.write_meta(txn)?;
614        Ok(())
615    }
616
617    pub fn persist_drop_table(
618        &mut self,
619        txn: &mut S::Transaction<'_>,
620        name: &str,
621    ) -> Result<(), CatalogError> {
622        txn.delete(table_key(name))?;
623
624        // テーブルに紐づくインデックスも削除する。
625        let mut to_delete: Vec<String> = Vec::new();
626        for (key, value) in txn.scan_prefix(INDEXES_PREFIX)? {
627            let persisted: PersistedIndexMeta = bincode::deserialize(&value)?;
628            if persisted.table == name {
629                let index_name = key_suffix(INDEXES_PREFIX, &key)?;
630                to_delete.push(index_name);
631            }
632        }
633        for index_name in to_delete {
634            txn.delete(index_key(&index_name))?;
635        }
636
637        Ok(())
638    }
639
640    pub fn persist_create_index(
641        &mut self,
642        txn: &mut S::Transaction<'_>,
643        index: &IndexMetadata,
644    ) -> Result<(), CatalogError> {
645        let persisted = PersistedIndexMeta::from(index);
646        let value = bincode::serialize(&persisted)?;
647        txn.put(index_key(&index.name), value)?;
648        self.write_meta(txn)?;
649        Ok(())
650    }
651
652    pub fn persist_drop_index(
653        &mut self,
654        txn: &mut S::Transaction<'_>,
655        name: &str,
656    ) -> Result<(), CatalogError> {
657        txn.delete(index_key(name))?;
658        Ok(())
659    }
660
661    pub fn table_exists_in_txn(&self, name: &str, overlay: &CatalogOverlay) -> bool {
662        if overlay.dropped_tables.contains(name) {
663            return false;
664        }
665        if overlay.added_tables.contains_key(name) {
666            return true;
667        }
668        self.inner.table_exists(name)
669    }
670
671    pub fn get_table_in_txn<'a>(
672        &'a self,
673        name: &str,
674        overlay: &'a CatalogOverlay,
675    ) -> Option<&'a TableMetadata> {
676        if overlay.dropped_tables.contains(name) {
677            return None;
678        }
679        if let Some(table) = overlay.added_tables.get(name) {
680            return Some(table);
681        }
682        self.inner.get_table(name)
683    }
684
685    pub fn index_exists_in_txn(&self, name: &str, overlay: &CatalogOverlay) -> bool {
686        if overlay.dropped_indexes.contains(name) {
687            return false;
688        }
689        if let Some(index) = overlay.added_indexes.get(name) {
690            if overlay.dropped_tables.contains(&index.table) {
691                return false;
692            }
693            return true;
694        }
695        match self.inner.get_index(name) {
696            Some(index) if overlay.dropped_tables.contains(&index.table) => false,
697            Some(_) => true,
698            None => false,
699        }
700    }
701
702    pub fn get_index_in_txn<'a>(
703        &'a self,
704        name: &str,
705        overlay: &'a CatalogOverlay,
706    ) -> Option<&'a IndexMetadata> {
707        if overlay.dropped_indexes.contains(name) {
708            return None;
709        }
710        if let Some(index) = overlay.added_indexes.get(name) {
711            if overlay.dropped_tables.contains(&index.table) {
712                return None;
713            }
714            return Some(index);
715        }
716        match self.inner.get_index(name) {
717            Some(index) if overlay.dropped_tables.contains(&index.table) => None,
718            other => other,
719        }
720    }
721
722    pub fn apply_overlay(&mut self, overlay: CatalogOverlay) {
723        for (_, table) in overlay.added_tables {
724            self.inner.insert_table_unchecked(table);
725        }
726        for name in overlay.dropped_tables {
727            self.inner.remove_table_unchecked(&name);
728        }
729        for (_, index) in overlay.added_indexes {
730            self.inner.insert_index_unchecked(index);
731        }
732        for name in overlay.dropped_indexes {
733            self.inner.remove_index_unchecked(&name);
734        }
735    }
736
737    pub fn discard_overlay(_overlay: CatalogOverlay) {}
738}
739
/// `Catalog` implementation that forwards every call to the in-memory catalog.
///
/// None of these methods touch the KV store; persistence goes through the `persist_*`
/// methods instead.
impl<S: KVStore> Catalog for PersistentCatalog<S> {
    /// Forwards to the in-memory catalog's `create_table`.
    fn create_table(&mut self, table: TableMetadata) -> Result<(), PlannerError> {
        self.inner.create_table(table)
    }

    /// Forwards to the in-memory catalog's `get_table`.
    fn get_table(&self, name: &str) -> Option<&TableMetadata> {
        self.inner.get_table(name)
    }

    /// Forwards to the in-memory catalog's `drop_table`.
    fn drop_table(&mut self, name: &str) -> Result<(), PlannerError> {
        self.inner.drop_table(name)
    }

    /// Forwards to the in-memory catalog's `create_index`.
    fn create_index(&mut self, index: IndexMetadata) -> Result<(), PlannerError> {
        self.inner.create_index(index)
    }

    /// Forwards to the in-memory catalog's `get_index`.
    fn get_index(&self, name: &str) -> Option<&IndexMetadata> {
        self.inner.get_index(name)
    }

    /// Forwards to the in-memory catalog's `get_indexes_for_table`.
    fn get_indexes_for_table(&self, table: &str) -> Vec<&IndexMetadata> {
        self.inner.get_indexes_for_table(table)
    }

    /// Forwards to the in-memory catalog's `drop_index`.
    fn drop_index(&mut self, name: &str) -> Result<(), PlannerError> {
        self.inner.drop_index(name)
    }

    /// Forwards to the in-memory catalog's `table_exists`.
    fn table_exists(&self, name: &str) -> bool {
        self.inner.table_exists(name)
    }

    /// Forwards to the in-memory catalog's `index_exists`.
    fn index_exists(&self, name: &str) -> bool {
        self.inner.index_exists(name)
    }

    /// Forwards to the in-memory catalog's `next_table_id`.
    fn next_table_id(&mut self) -> u32 {
        self.inner.next_table_id()
    }

    /// Forwards to the in-memory catalog's `next_index_id`.
    fn next_index_id(&mut self) -> u32 {
        self.inner.next_index_id()
    }
}
785
#[cfg(test)]
mod tests {
    use super::*;
    use crate::planner::types::ResolvedType;

    /// Builds a one-column table (`id` INTEGER primary key) with the given name and ID.
    fn test_table(name: &str, id: u32) -> TableMetadata {
        TableMetadata::new(
            name,
            vec![ColumnMetadata::new("id", ResolvedType::Integer).with_primary_key(true)],
        )
        .with_table_id(id)
        .with_primary_key(vec!["id".to_string()])
    }

    /// Loading from an empty store yields a catalog with no tables and no indexes.
    #[test]
    fn load_empty_store() {
        let store = Arc::new(alopex_core::kv::memory::MemoryKV::new());
        let catalog = PersistentCatalog::load(store).unwrap();
        assert_eq!(catalog.inner.table_count(), 0);
        assert_eq!(catalog.inner.index_count(), 0);
    }

    /// A table persisted in a committed transaction is visible after `load`.
    #[test]
    fn create_table_persists() {
        let store = Arc::new(alopex_core::kv::memory::MemoryKV::new());
        let mut catalog = PersistentCatalog::new(store.clone());

        // Bump the inner counters so that a meaningful meta record gets written.
        catalog.inner.set_counters(1, 0);

        let table = test_table("users", 1);
        let mut txn = store.begin(TxnMode::ReadWrite).unwrap();
        catalog.persist_create_table(&mut txn, &table).unwrap();
        txn.commit_self().unwrap();

        let reloaded = PersistentCatalog::load(store).unwrap();
        assert!(reloaded.table_exists("users"));
        assert_eq!(reloaded.get_table("users").unwrap().table_id, 1);
    }

    /// A table dropped in a committed transaction is gone after `load`.
    #[test]
    fn drop_table_removes() {
        let store = Arc::new(alopex_core::kv::memory::MemoryKV::new());
        let mut catalog = PersistentCatalog::new(store.clone());
        catalog.inner.set_counters(1, 0);

        let table = test_table("users", 1);
        let mut txn = store.begin(TxnMode::ReadWrite).unwrap();
        catalog.persist_create_table(&mut txn, &table).unwrap();
        txn.commit_self().unwrap();

        let mut txn = store.begin(TxnMode::ReadWrite).unwrap();
        catalog.persist_drop_table(&mut txn, "users").unwrap();
        txn.commit_self().unwrap();

        let reloaded = PersistentCatalog::load(store).unwrap();
        assert!(!reloaded.table_exists("users"));
    }

    /// Catalog state survives closing the store and reopening it from the same WAL path.
    #[test]
    fn reload_preserves_state() {
        let temp_dir = tempfile::tempdir().unwrap();
        let wal_path = temp_dir.path().join("catalog.wal");
        let store = Arc::new(alopex_core::kv::memory::MemoryKV::open(&wal_path).unwrap());
        let mut catalog = PersistentCatalog::new(store.clone());
        catalog.inner.set_counters(1, 0);

        let table = test_table("users", 1);
        let mut txn = store.begin(TxnMode::ReadWrite).unwrap();
        catalog.persist_create_table(&mut txn, &table).unwrap();
        txn.commit_self().unwrap();
        store.flush().unwrap();

        // Release every handle to the first store before reopening the same path.
        drop(catalog);
        drop(store);

        let store = Arc::new(alopex_core::kv::memory::MemoryKV::open(&wal_path).unwrap());
        let reloaded = PersistentCatalog::load(store).unwrap();
        assert!(reloaded.table_exists("users"));
    }

    /// Overlay changes are visible through the `*_in_txn` lookups and reach the catalog
    /// itself once the overlay is applied.
    #[test]
    fn overlay_applied_on_commit() {
        let store = Arc::new(alopex_core::kv::memory::MemoryKV::new());
        let mut catalog = PersistentCatalog::new(store);
        catalog.inner.insert_table_unchecked(test_table("users", 1));

        let mut overlay = CatalogOverlay::new();
        overlay.drop_table("users");
        overlay.add_table(test_table("orders", 2));

        assert!(!catalog.table_exists_in_txn("users", &overlay));
        assert!(catalog.table_exists_in_txn("orders", &overlay));

        catalog.apply_overlay(overlay);

        assert!(!catalog.table_exists("users"));
        assert!(catalog.table_exists("orders"));
    }

    /// Discarding an overlay leaves the catalog unchanged.
    #[test]
    fn overlay_discarded_on_rollback() {
        let store = Arc::new(alopex_core::kv::memory::MemoryKV::new());
        let mut catalog = PersistentCatalog::new(store);
        catalog.inner.insert_table_unchecked(test_table("users", 1));

        let mut overlay = CatalogOverlay::new();
        overlay.drop_table("users");

        PersistentCatalog::<alopex_core::kv::memory::MemoryKV>::discard_overlay(overlay);

        assert!(catalog.table_exists("users"));
    }
}