Skip to main content

alopex_sql/catalog/
persistent.rs

1//! 永続化対応カタログ実装。
2//!
3//! 既存の `TableMetadata` / `IndexMetadata` は `Expr` を含むため、そのままシリアライズして
4//! 永続化することができない。そこで、本モジュールでは永続化用 DTO を定義し、KV ストアへ
5//! bincode で保存する。
6//!
7//! 注意: 現状は `ColumnMetadata.default`(DEFAULT 式)を永続化しない。復元時は `None` となる。
8
9use std::collections::{HashMap, HashSet};
10use std::sync::Arc;
11
12use alopex_core::kv::{KVStore, KVTransaction};
13use alopex_core::types::TxnMode;
14use serde::{Deserialize, Serialize};
15use thiserror::Error;
16
17use crate::ast::ddl::{IndexMethod, VectorMetric};
18use crate::catalog::{
19    Catalog, ColumnMetadata, Compression, IndexMetadata, MemoryCatalog, RowIdMode,
20};
21use crate::catalog::{StorageOptions, StorageType, TableMetadata};
22use crate::planner::PlannerError;
23use crate::planner::types::ResolvedType;
24
25/// カタログ用キープレフィックス。
26pub const CATALOG_PREFIX: &[u8] = b"__catalog__/";
27pub const CATALOGS_PREFIX: &[u8] = b"__catalog__/catalogs/";
28pub const NAMESPACES_PREFIX: &[u8] = b"__catalog__/namespaces/";
29pub const TABLES_PREFIX: &[u8] = b"__catalog__/tables/";
30pub const INDEXES_PREFIX: &[u8] = b"__catalog__/indexes/";
31pub const META_KEY: &[u8] = b"__catalog__/meta";
32
33const CATALOG_VERSION: u32 = 2;
34
35#[derive(Debug, Error)]
36pub enum CatalogError {
37    #[error("kv error: {0}")]
38    Kv(#[from] alopex_core::Error),
39
40    #[error("serialize error: {0}")]
41    Serialize(#[from] bincode::Error),
42
43    #[error("invalid catalog key: {0}")]
44    InvalidKey(String),
45}
46
47#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
48struct CatalogState {
49    version: u32,
50    table_id_counter: u32,
51    index_id_counter: u32,
52}
53
54#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
55pub struct PersistedCatalogMeta {
56    pub name: String,
57    pub comment: Option<String>,
58    pub storage_root: Option<String>,
59}
60
61#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
62pub struct PersistedNamespaceMeta {
63    pub name: String,
64    pub catalog_name: String,
65    pub comment: Option<String>,
66    pub storage_root: Option<String>,
67}
68
69pub type CatalogMeta = PersistedCatalogMeta;
70pub type NamespaceMeta = PersistedNamespaceMeta;
71
72#[derive(Debug, Clone, PartialEq, Eq, Hash)]
73pub struct TableFqn {
74    pub catalog: String,
75    pub namespace: String,
76    pub table: String,
77}
78
79#[derive(Debug, Clone, PartialEq, Eq, Hash)]
80pub struct IndexFqn {
81    pub catalog: String,
82    pub namespace: String,
83    pub table: String,
84    pub index: String,
85}
86
87impl TableFqn {
88    pub fn new(catalog: &str, namespace: &str, table: &str) -> Self {
89        Self {
90            catalog: catalog.to_string(),
91            namespace: namespace.to_string(),
92            table: table.to_string(),
93        }
94    }
95}
96
97impl IndexFqn {
98    pub fn new(catalog: &str, namespace: &str, table: &str, index: &str) -> Self {
99        Self {
100            catalog: catalog.to_string(),
101            namespace: namespace.to_string(),
102            table: table.to_string(),
103            index: index.to_string(),
104        }
105    }
106}
107
108impl From<&TableMetadata> for TableFqn {
109    fn from(value: &TableMetadata) -> Self {
110        Self::new(&value.catalog_name, &value.namespace_name, &value.name)
111    }
112}
113
114impl From<&IndexMetadata> for IndexFqn {
115    fn from(value: &IndexMetadata) -> Self {
116        Self::new(
117            &value.catalog_name,
118            &value.namespace_name,
119            &value.table,
120            &value.name,
121        )
122    }
123}
124
125#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
126#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
127pub enum TableType {
128    Managed,
129    External,
130}
131
132#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
133#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
134pub enum DataSourceFormat {
135    #[default]
136    Alopex,
137    Parquet,
138    Delta,
139}
140
141#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
142pub enum PersistedVectorMetric {
143    Cosine,
144    L2,
145    Inner,
146}
147
148impl From<VectorMetric> for PersistedVectorMetric {
149    fn from(value: VectorMetric) -> Self {
150        match value {
151            VectorMetric::Cosine => Self::Cosine,
152            VectorMetric::L2 => Self::L2,
153            VectorMetric::Inner => Self::Inner,
154        }
155    }
156}
157
158impl From<PersistedVectorMetric> for VectorMetric {
159    fn from(value: PersistedVectorMetric) -> Self {
160        match value {
161            PersistedVectorMetric::Cosine => Self::Cosine,
162            PersistedVectorMetric::L2 => Self::L2,
163            PersistedVectorMetric::Inner => Self::Inner,
164        }
165    }
166}
167
168#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
169pub enum PersistedType {
170    Integer,
171    BigInt,
172    Float,
173    Double,
174    Text,
175    Blob,
176    Boolean,
177    Timestamp,
178    Vector {
179        dimension: u32,
180        metric: PersistedVectorMetric,
181    },
182    Null,
183}
184
185impl From<ResolvedType> for PersistedType {
186    fn from(value: ResolvedType) -> Self {
187        match value {
188            ResolvedType::Integer => Self::Integer,
189            ResolvedType::BigInt => Self::BigInt,
190            ResolvedType::Float => Self::Float,
191            ResolvedType::Double => Self::Double,
192            ResolvedType::Text => Self::Text,
193            ResolvedType::Blob => Self::Blob,
194            ResolvedType::Boolean => Self::Boolean,
195            ResolvedType::Timestamp => Self::Timestamp,
196            ResolvedType::Vector { dimension, metric } => Self::Vector {
197                dimension,
198                metric: metric.into(),
199            },
200            ResolvedType::Null => Self::Null,
201        }
202    }
203}
204
205impl From<PersistedType> for ResolvedType {
206    fn from(value: PersistedType) -> Self {
207        match value {
208            PersistedType::Integer => Self::Integer,
209            PersistedType::BigInt => Self::BigInt,
210            PersistedType::Float => Self::Float,
211            PersistedType::Double => Self::Double,
212            PersistedType::Text => Self::Text,
213            PersistedType::Blob => Self::Blob,
214            PersistedType::Boolean => Self::Boolean,
215            PersistedType::Timestamp => Self::Timestamp,
216            PersistedType::Vector { dimension, metric } => Self::Vector {
217                dimension,
218                metric: metric.into(),
219            },
220            PersistedType::Null => Self::Null,
221        }
222    }
223}
224
225#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
226pub enum PersistedIndexType {
227    BTree,
228    Hnsw,
229}
230
231impl From<PersistedIndexType> for IndexMethod {
232    fn from(value: PersistedIndexType) -> Self {
233        match value {
234            PersistedIndexType::BTree => IndexMethod::BTree,
235            PersistedIndexType::Hnsw => IndexMethod::Hnsw,
236        }
237    }
238}
239
240impl TryFrom<IndexMethod> for PersistedIndexType {
241    type Error = ();
242
243    fn try_from(value: IndexMethod) -> Result<Self, Self::Error> {
244        match value {
245            IndexMethod::BTree => Ok(Self::BTree),
246            IndexMethod::Hnsw => Ok(Self::Hnsw),
247        }
248    }
249}
250
251#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
252pub enum PersistedStorageType {
253    Row,
254    Columnar,
255}
256
257impl From<PersistedStorageType> for StorageType {
258    fn from(value: PersistedStorageType) -> Self {
259        match value {
260            PersistedStorageType::Row => Self::Row,
261            PersistedStorageType::Columnar => Self::Columnar,
262        }
263    }
264}
265
266impl From<StorageType> for PersistedStorageType {
267    fn from(value: StorageType) -> Self {
268        match value {
269            StorageType::Row => Self::Row,
270            StorageType::Columnar => Self::Columnar,
271        }
272    }
273}
274
275#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
276pub enum PersistedCompression {
277    None,
278    Lz4,
279    Zstd,
280}
281
282impl From<PersistedCompression> for Compression {
283    fn from(value: PersistedCompression) -> Self {
284        match value {
285            PersistedCompression::None => Self::None,
286            PersistedCompression::Lz4 => Self::Lz4,
287            PersistedCompression::Zstd => Self::Zstd,
288        }
289    }
290}
291
292impl From<Compression> for PersistedCompression {
293    fn from(value: Compression) -> Self {
294        match value {
295            Compression::None => Self::None,
296            Compression::Lz4 => Self::Lz4,
297            Compression::Zstd => Self::Zstd,
298        }
299    }
300}
301
302#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
303pub enum PersistedRowIdMode {
304    None,
305    Direct,
306}
307
308impl From<PersistedRowIdMode> for RowIdMode {
309    fn from(value: PersistedRowIdMode) -> Self {
310        match value {
311            PersistedRowIdMode::None => Self::None,
312            PersistedRowIdMode::Direct => Self::Direct,
313        }
314    }
315}
316
317impl From<RowIdMode> for PersistedRowIdMode {
318    fn from(value: RowIdMode) -> Self {
319        match value {
320            RowIdMode::None => Self::None,
321            RowIdMode::Direct => Self::Direct,
322        }
323    }
324}
325
326#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
327pub struct PersistedStorageOptions {
328    pub storage_type: PersistedStorageType,
329    pub compression: PersistedCompression,
330    pub row_group_size: u32,
331    pub row_id_mode: PersistedRowIdMode,
332}
333
334impl From<StorageOptions> for PersistedStorageOptions {
335    fn from(value: StorageOptions) -> Self {
336        Self {
337            storage_type: value.storage_type.into(),
338            compression: value.compression.into(),
339            row_group_size: value.row_group_size,
340            row_id_mode: value.row_id_mode.into(),
341        }
342    }
343}
344
345impl From<PersistedStorageOptions> for StorageOptions {
346    fn from(value: PersistedStorageOptions) -> Self {
347        Self {
348            storage_type: value.storage_type.into(),
349            compression: value.compression.into(),
350            row_group_size: value.row_group_size,
351            row_id_mode: value.row_id_mode.into(),
352        }
353    }
354}
355
356#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
357pub struct PersistedColumnMeta {
358    pub name: String,
359    pub data_type: PersistedType,
360    pub not_null: bool,
361    pub primary_key: bool,
362    pub unique: bool,
363}
364
365impl From<&ColumnMetadata> for PersistedColumnMeta {
366    fn from(value: &ColumnMetadata) -> Self {
367        Self {
368            name: value.name.clone(),
369            data_type: value.data_type.clone().into(),
370            not_null: value.not_null,
371            primary_key: value.primary_key,
372            unique: value.unique,
373        }
374    }
375}
376
377impl From<PersistedColumnMeta> for ColumnMetadata {
378    fn from(value: PersistedColumnMeta) -> Self {
379        ColumnMetadata::new(value.name, value.data_type.into())
380            .with_not_null(value.not_null)
381            .with_primary_key(value.primary_key)
382            .with_unique(value.unique)
383    }
384}
385
386#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
387pub struct PersistedTableMeta {
388    pub table_id: u32,
389    pub name: String,
390    pub catalog_name: String,
391    pub namespace_name: String,
392    pub table_type: TableType,
393    pub data_source_format: DataSourceFormat,
394    pub columns: Vec<PersistedColumnMeta>,
395    pub primary_key: Option<Vec<String>>,
396    pub storage_options: PersistedStorageOptions,
397    pub storage_location: Option<String>,
398    pub comment: Option<String>,
399    pub properties: HashMap<String, String>,
400}
401
402#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
403struct PersistedTableMetaV1 {
404    table_id: u32,
405    name: String,
406    columns: Vec<PersistedColumnMeta>,
407    primary_key: Option<Vec<String>>,
408    storage_options: PersistedStorageOptions,
409}
410
411impl From<&TableMetadata> for PersistedTableMeta {
412    fn from(value: &TableMetadata) -> Self {
413        Self {
414            table_id: value.table_id,
415            name: value.name.clone(),
416            catalog_name: value.catalog_name.clone(),
417            namespace_name: value.namespace_name.clone(),
418            table_type: value.table_type,
419            data_source_format: value.data_source_format,
420            columns: value
421                .columns
422                .iter()
423                .map(PersistedColumnMeta::from)
424                .collect(),
425            primary_key: value.primary_key.clone(),
426            storage_options: value.storage_options.clone().into(),
427            storage_location: value.storage_location.clone(),
428            comment: value.comment.clone(),
429            properties: value.properties.clone(),
430        }
431    }
432}
433
434impl From<PersistedTableMeta> for TableMetadata {
435    fn from(value: PersistedTableMeta) -> Self {
436        let mut table = TableMetadata::new(
437            value.name,
438            value
439                .columns
440                .into_iter()
441                .map(ColumnMetadata::from)
442                .collect(),
443        )
444        .with_table_id(value.table_id);
445        table.primary_key = value.primary_key;
446        table.storage_options = value.storage_options.into();
447        table.catalog_name = value.catalog_name;
448        table.namespace_name = value.namespace_name;
449        table.table_type = value.table_type;
450        table.data_source_format = value.data_source_format;
451        table.storage_location = value.storage_location;
452        table.comment = value.comment;
453        table.properties = value.properties;
454        table
455    }
456}
457
458#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
459pub struct PersistedIndexMeta {
460    pub index_id: u32,
461    pub name: String,
462    pub table: String,
463    pub columns: Vec<String>,
464    pub column_indices: Vec<usize>,
465    pub unique: bool,
466    pub method: Option<PersistedIndexType>,
467    pub options: Vec<(String, String)>,
468    pub catalog_name: String,
469    pub namespace_name: String,
470}
471
472#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
473struct PersistedIndexMetaV1 {
474    index_id: u32,
475    name: String,
476    table: String,
477    columns: Vec<String>,
478    column_indices: Vec<usize>,
479    unique: bool,
480    method: Option<PersistedIndexType>,
481    options: Vec<(String, String)>,
482}
483
484impl From<&IndexMetadata> for PersistedIndexMeta {
485    fn from(value: &IndexMetadata) -> Self {
486        Self {
487            index_id: value.index_id,
488            name: value.name.clone(),
489            table: value.table.clone(),
490            columns: value.columns.clone(),
491            column_indices: value.column_indices.clone(),
492            unique: value.unique,
493            method: value
494                .method
495                .and_then(|m| PersistedIndexType::try_from(m).ok()),
496            options: value.options.clone(),
497            catalog_name: value.catalog_name.clone(),
498            namespace_name: value.namespace_name.clone(),
499        }
500    }
501}
502
503impl From<PersistedIndexMeta> for IndexMetadata {
504    fn from(value: PersistedIndexMeta) -> Self {
505        let mut index = IndexMetadata::new(value.index_id, value.name, value.table, value.columns)
506            .with_column_indices(value.column_indices)
507            .with_unique(value.unique)
508            .with_options(value.options);
509        index.catalog_name = value.catalog_name;
510        index.namespace_name = value.namespace_name;
511        if let Some(method) = value.method {
512            index = index.with_method(method.into());
513        }
514        index
515    }
516}
517
518fn deserialize_table_meta(bytes: &[u8]) -> Result<PersistedTableMeta, CatalogError> {
519    match bincode::deserialize::<PersistedTableMeta>(bytes) {
520        Ok(meta) => Ok(meta),
521        Err(err) => {
522            let is_legacy = matches!(
523                err.as_ref(),
524                bincode::ErrorKind::Io(io)
525                    if io.kind() == std::io::ErrorKind::UnexpectedEof
526            );
527            if !is_legacy {
528                return Err(err.into());
529            }
530            let legacy: PersistedTableMetaV1 = bincode::deserialize(bytes)?;
531            Ok(PersistedTableMeta {
532                table_id: legacy.table_id,
533                name: legacy.name,
534                catalog_name: "default".to_string(),
535                namespace_name: "default".to_string(),
536                table_type: TableType::Managed,
537                data_source_format: DataSourceFormat::Alopex,
538                columns: legacy.columns,
539                primary_key: legacy.primary_key,
540                storage_options: legacy.storage_options,
541                storage_location: None,
542                comment: None,
543                properties: HashMap::new(),
544            })
545        }
546    }
547}
548
549fn deserialize_index_meta(bytes: &[u8]) -> Result<PersistedIndexMeta, CatalogError> {
550    match bincode::deserialize::<PersistedIndexMeta>(bytes) {
551        Ok(meta) => Ok(meta),
552        Err(err) => {
553            let is_legacy = matches!(
554                err.as_ref(),
555                bincode::ErrorKind::Io(io)
556                    if io.kind() == std::io::ErrorKind::UnexpectedEof
557            );
558            if !is_legacy {
559                return Err(err.into());
560            }
561            let legacy: PersistedIndexMetaV1 = bincode::deserialize(bytes)?;
562            Ok(PersistedIndexMeta {
563                index_id: legacy.index_id,
564                name: legacy.name,
565                table: legacy.table,
566                columns: legacy.columns,
567                column_indices: legacy.column_indices,
568                unique: legacy.unique,
569                method: legacy.method,
570                options: legacy.options,
571                catalog_name: "default".to_string(),
572                namespace_name: "default".to_string(),
573            })
574        }
575    }
576}
577
578fn table_key(catalog_name: &str, namespace_name: &str, table_name: &str) -> Vec<u8> {
579    let mut key = TABLES_PREFIX.to_vec();
580    key.extend_from_slice(catalog_name.as_bytes());
581    key.push(b'/');
582    key.extend_from_slice(namespace_name.as_bytes());
583    key.push(b'/');
584    key.extend_from_slice(table_name.as_bytes());
585    key
586}
587
588fn catalog_key(name: &str) -> Vec<u8> {
589    let mut key = CATALOGS_PREFIX.to_vec();
590    key.extend_from_slice(name.as_bytes());
591    key
592}
593
594fn namespace_key(catalog_name: &str, namespace_name: &str) -> Vec<u8> {
595    let mut key = NAMESPACES_PREFIX.to_vec();
596    key.extend_from_slice(catalog_name.as_bytes());
597    key.push(b'/');
598    key.extend_from_slice(namespace_name.as_bytes());
599    key
600}
601
602fn index_key(
603    catalog_name: &str,
604    namespace_name: &str,
605    table_name: &str,
606    index_name: &str,
607) -> Vec<u8> {
608    let mut key = INDEXES_PREFIX.to_vec();
609    key.extend_from_slice(catalog_name.as_bytes());
610    key.push(b'/');
611    key.extend_from_slice(namespace_name.as_bytes());
612    key.push(b'/');
613    key.extend_from_slice(table_name.as_bytes());
614    key.push(b'/');
615    key.extend_from_slice(index_name.as_bytes());
616    key
617}
618
619fn index_prefix(catalog_name: &str, namespace_name: &str, table_name: &str) -> Vec<u8> {
620    let mut key = INDEXES_PREFIX.to_vec();
621    key.extend_from_slice(catalog_name.as_bytes());
622    key.push(b'/');
623    key.extend_from_slice(namespace_name.as_bytes());
624    key.push(b'/');
625    key.extend_from_slice(table_name.as_bytes());
626    key.push(b'/');
627    key
628}
629
630fn key_suffix(prefix: &[u8], key: &[u8]) -> Result<String, CatalogError> {
631    let suffix = key
632        .strip_prefix(prefix)
633        .ok_or_else(|| CatalogError::InvalidKey(format!("{key:?}")))?;
634    std::str::from_utf8(suffix)
635        .map(|s| s.to_string())
636        .map_err(|_| CatalogError::InvalidKey(format!("{key:?}")))
637}
638
639fn parse_table_key_suffix(suffix: &str) -> Result<TableFqn, CatalogError> {
640    let mut parts = suffix.splitn(3, '/');
641    let catalog = parts
642        .next()
643        .filter(|part| !part.is_empty())
644        .ok_or_else(|| CatalogError::InvalidKey(suffix.to_string()))?;
645    let namespace = parts
646        .next()
647        .filter(|part| !part.is_empty())
648        .ok_or_else(|| CatalogError::InvalidKey(suffix.to_string()))?;
649    let table = parts
650        .next()
651        .filter(|part| !part.is_empty())
652        .ok_or_else(|| CatalogError::InvalidKey(suffix.to_string()))?;
653    Ok(TableFqn::new(catalog, namespace, table))
654}
655
656fn parse_index_key_suffix(suffix: &str) -> Result<IndexFqn, CatalogError> {
657    let mut parts = suffix.splitn(4, '/');
658    let catalog = parts
659        .next()
660        .filter(|part| !part.is_empty())
661        .ok_or_else(|| CatalogError::InvalidKey(suffix.to_string()))?;
662    let namespace = parts
663        .next()
664        .filter(|part| !part.is_empty())
665        .ok_or_else(|| CatalogError::InvalidKey(suffix.to_string()))?;
666    let table = parts
667        .next()
668        .filter(|part| !part.is_empty())
669        .ok_or_else(|| CatalogError::InvalidKey(suffix.to_string()))?;
670    let index = parts
671        .next()
672        .filter(|part| !part.is_empty())
673        .ok_or_else(|| CatalogError::InvalidKey(suffix.to_string()))?;
674    Ok(IndexFqn::new(catalog, namespace, table, index))
675}
676
677#[derive(Debug, Clone, Default)]
678pub struct CatalogOverlay {
679    added_catalogs: HashMap<String, CatalogMeta>,
680    dropped_catalogs: HashSet<String>,
681    added_namespaces: HashMap<(String, String), NamespaceMeta>,
682    dropped_namespaces: HashSet<(String, String)>,
683    added_tables: HashMap<TableFqn, TableMetadata>,
684    dropped_tables: HashSet<TableFqn>,
685    added_indexes: HashMap<IndexFqn, IndexMetadata>,
686    dropped_indexes: HashSet<IndexFqn>,
687}
688
689impl CatalogOverlay {
690    pub fn new() -> Self {
691        Self::default()
692    }
693
694    pub fn add_catalog(&mut self, meta: CatalogMeta) {
695        self.dropped_catalogs.remove(&meta.name);
696        self.added_catalogs.insert(meta.name.clone(), meta);
697    }
698
699    pub fn drop_catalog(&mut self, name: &str) {
700        self.added_catalogs.remove(name);
701        self.dropped_catalogs.insert(name.to_string());
702    }
703
704    pub fn add_namespace(&mut self, meta: NamespaceMeta) {
705        let key = (meta.catalog_name.clone(), meta.name.clone());
706        self.dropped_namespaces.remove(&key);
707        self.added_namespaces.insert(key, meta);
708    }
709
710    pub fn drop_namespace(&mut self, catalog_name: &str, namespace_name: &str) {
711        let key = (catalog_name.to_string(), namespace_name.to_string());
712        self.added_namespaces.remove(&key);
713        self.dropped_namespaces.insert(key);
714    }
715
716    pub fn add_table(&mut self, fqn: TableFqn, table: TableMetadata) {
717        self.dropped_tables.remove(&fqn);
718        self.added_tables.insert(fqn, table);
719    }
720
721    pub fn drop_table(&mut self, fqn: &TableFqn) {
722        self.added_tables.remove(fqn);
723        self.dropped_tables.insert(fqn.clone());
724        self.added_indexes.retain(|key, _| {
725            key.catalog != fqn.catalog || key.namespace != fqn.namespace || key.table != fqn.table
726        });
727    }
728
729    pub fn add_index(&mut self, fqn: IndexFqn, index: IndexMetadata) {
730        self.dropped_indexes.remove(&fqn);
731        self.added_indexes.insert(fqn, index);
732    }
733
734    pub fn drop_index(&mut self, fqn: &IndexFqn) {
735        self.added_indexes.remove(fqn);
736        self.dropped_indexes.insert(fqn.clone());
737    }
738
739    pub fn drop_cascade_catalog(&mut self, catalog: &str) {
740        self.drop_catalog(catalog);
741
742        let namespace_keys: Vec<(String, String)> = self
743            .added_namespaces
744            .keys()
745            .filter(|(cat, _)| cat == catalog)
746            .cloned()
747            .collect();
748        for (catalog_name, namespace_name) in namespace_keys {
749            self.drop_namespace(&catalog_name, &namespace_name);
750        }
751
752        let index_keys: Vec<IndexFqn> = self
753            .added_indexes
754            .keys()
755            .filter(|fqn| fqn.catalog == catalog)
756            .cloned()
757            .collect();
758        for fqn in index_keys {
759            self.drop_index(&fqn);
760        }
761
762        let table_keys: Vec<TableFqn> = self
763            .added_tables
764            .keys()
765            .filter(|fqn| fqn.catalog == catalog)
766            .cloned()
767            .collect();
768        for fqn in table_keys {
769            self.drop_table(&fqn);
770        }
771    }
772
773    pub fn drop_cascade_namespace(&mut self, catalog: &str, namespace: &str) {
774        self.drop_namespace(catalog, namespace);
775
776        let index_keys: Vec<IndexFqn> = self
777            .added_indexes
778            .keys()
779            .filter(|fqn| fqn.catalog == catalog && fqn.namespace == namespace)
780            .cloned()
781            .collect();
782        for fqn in index_keys {
783            self.drop_index(&fqn);
784        }
785
786        let table_keys: Vec<TableFqn> = self
787            .added_tables
788            .keys()
789            .filter(|fqn| fqn.catalog == catalog && fqn.namespace == namespace)
790            .cloned()
791            .collect();
792        for fqn in table_keys {
793            self.drop_table(&fqn);
794        }
795    }
796}
797
798/// トランザクション内(オーバーレイ込み)で参照するための Catalog ビュー。
799///
800/// DML/SELECT の実行や Planner の参照用途に使う。書き込み系 API は利用しない前提のため、
801/// `Catalog` trait の書き込みメソッドは `unreachable!()` とする。
802pub struct TxnCatalogView<'a, S: KVStore> {
803    catalog: &'a PersistentCatalog<S>,
804    overlay: &'a CatalogOverlay,
805}
806
807impl<'a, S: KVStore> TxnCatalogView<'a, S> {
808    pub fn new(catalog: &'a PersistentCatalog<S>, overlay: &'a CatalogOverlay) -> Self {
809        Self { catalog, overlay }
810    }
811}
812
813impl<'a, S: KVStore> Catalog for TxnCatalogView<'a, S> {
814    fn create_table(&mut self, _table: TableMetadata) -> Result<(), PlannerError> {
815        unreachable!("TxnCatalogView は参照専用です")
816    }
817
818    fn get_table(&self, name: &str) -> Option<&TableMetadata> {
819        self.catalog.get_table_in_txn(name, self.overlay)
820    }
821
822    fn drop_table(&mut self, _name: &str) -> Result<(), PlannerError> {
823        unreachable!("TxnCatalogView は参照専用です")
824    }
825
826    fn create_index(&mut self, _index: IndexMetadata) -> Result<(), PlannerError> {
827        unreachable!("TxnCatalogView は参照専用です")
828    }
829
830    fn get_index(&self, name: &str) -> Option<&IndexMetadata> {
831        self.catalog.get_index_in_txn(name, self.overlay)
832    }
833
834    fn get_indexes_for_table(&self, table: &str) -> Vec<&IndexMetadata> {
835        let Some(table_meta) = self.catalog.get_table_in_txn(table, self.overlay) else {
836            return Vec::new();
837        };
838
839        let mut indexes: Vec<&IndexMetadata> = self
840            .catalog
841            .inner
842            .get_indexes_for_table(table)
843            .into_iter()
844            .filter(|idx| {
845                idx.catalog_name == table_meta.catalog_name
846                    && idx.namespace_name == table_meta.namespace_name
847                    && !self.catalog.index_hidden_by_overlay(idx, self.overlay)
848            })
849            .collect();
850
851        for idx in self.overlay.added_indexes.values() {
852            if idx.table == table
853                && idx.catalog_name == table_meta.catalog_name
854                && idx.namespace_name == table_meta.namespace_name
855                && !self.catalog.index_hidden_by_overlay(idx, self.overlay)
856            {
857                indexes.push(idx);
858            }
859        }
860
861        indexes
862    }
863
864    fn drop_index(&mut self, _name: &str) -> Result<(), PlannerError> {
865        unreachable!("TxnCatalogView は参照専用です")
866    }
867
868    fn table_exists(&self, name: &str) -> bool {
869        self.catalog.table_exists_in_txn(name, self.overlay)
870    }
871
872    fn index_exists(&self, name: &str) -> bool {
873        self.catalog.index_exists_in_txn(name, self.overlay)
874    }
875
876    fn next_table_id(&mut self) -> u32 {
877        unreachable!("TxnCatalogView は参照専用です")
878    }
879
880    fn next_index_id(&mut self) -> u32 {
881        unreachable!("TxnCatalogView は参照専用です")
882    }
883
884    fn list_tables(&self) -> Vec<TableMetadata> {
885        let mut names = HashSet::new();
886        for name in self.catalog.inner.table_names() {
887            names.insert(name.to_string());
888        }
889        for fqn in self.overlay.added_tables.keys() {
890            names.insert(fqn.table.clone());
891        }
892
893        let mut tables = Vec::new();
894        for name in names {
895            if let Some(table) = self.catalog.get_table_in_txn(&name, self.overlay) {
896                tables.push(table.clone());
897            }
898        }
899        tables
900    }
901}
902
903/// 永続カタログ実装。
904#[derive(Debug)]
905/// 永続化対応のカタログ実装。
906///
907/// # Examples
908///
909/// ```
910/// use std::sync::Arc;
911/// use alopex_core::kv::memory::MemoryKV;
912/// use alopex_sql::Catalog;
913/// use alopex_sql::catalog::PersistentCatalog;
914///
915/// let store = Arc::new(MemoryKV::new());
916/// let catalog = PersistentCatalog::new(store);
917/// assert!(catalog.table_exists("users") == false);
918/// ```
919pub struct PersistentCatalog<S: KVStore> {
920    inner: MemoryCatalog,
921    store: Arc<S>,
922    catalogs: HashMap<String, CatalogMeta>,
923    namespaces: HashMap<(String, String), NamespaceMeta>,
924}
925
926impl<S: KVStore> PersistentCatalog<S> {
927    pub fn load(store: Arc<S>) -> Result<Self, CatalogError> {
928        let mut txn = store.begin(TxnMode::ReadOnly)?;
929        let meta_key = META_KEY.to_vec();
930        let mut meta_state: Option<CatalogState> = None;
931
932        if let Some(meta_bytes) = txn.get(&meta_key)? {
933            let meta: CatalogState = bincode::deserialize(&meta_bytes)?;
934            if meta.version > CATALOG_VERSION {
935                return Err(CatalogError::InvalidKey(format!(
936                    "unsupported catalog version: {}",
937                    meta.version
938                )));
939            }
940            meta_state = Some(meta);
941        }
942
943        let mut needs_migration = meta_state
944            .as_ref()
945            .is_some_and(|meta| meta.version < CATALOG_VERSION);
946        if !needs_migration && meta_state.is_none() {
947            for (key, _) in txn.scan_prefix(TABLES_PREFIX)? {
948                let suffix = key_suffix(TABLES_PREFIX, &key)?;
949                if !suffix.contains('/') {
950                    needs_migration = true;
951                    break;
952                }
953            }
954            if !needs_migration {
955                for (key, _) in txn.scan_prefix(INDEXES_PREFIX)? {
956                    let suffix = key_suffix(INDEXES_PREFIX, &key)?;
957                    if !suffix.contains('/') {
958                        needs_migration = true;
959                        break;
960                    }
961                }
962            }
963        }
964
965        if needs_migration {
966            txn.rollback_self()?;
967            Self::migrate_v1_to_v2(&store)?;
968            return Self::load(store);
969        }
970
971        let mut inner = MemoryCatalog::new();
972        let mut catalogs = HashMap::new();
973        let mut namespaces = HashMap::new();
974
975        let mut max_table_id = 0u32;
976        let mut max_index_id = 0u32;
977
978        for (key, value) in txn.scan_prefix(CATALOGS_PREFIX)? {
979            let catalog_name = key_suffix(CATALOGS_PREFIX, &key)?;
980            let mut meta: CatalogMeta = bincode::deserialize(&value)?;
981            if meta.name != catalog_name {
982                meta.name = catalog_name.clone();
983            }
984            catalogs.insert(catalog_name, meta);
985        }
986
987        for (key, value) in txn.scan_prefix(NAMESPACES_PREFIX)? {
988            let suffix = key_suffix(NAMESPACES_PREFIX, &key)?;
989            let mut parts = suffix.splitn(2, '/');
990            let catalog_name = parts
991                .next()
992                .filter(|part| !part.is_empty())
993                .ok_or_else(|| CatalogError::InvalidKey(suffix.clone()))?;
994            let namespace_name = parts
995                .next()
996                .filter(|part| !part.is_empty())
997                .ok_or_else(|| CatalogError::InvalidKey(suffix.clone()))?;
998            let mut meta: NamespaceMeta = bincode::deserialize(&value)?;
999            if meta.catalog_name != catalog_name {
1000                meta.catalog_name = catalog_name.to_string();
1001            }
1002            if meta.name != namespace_name {
1003                meta.name = namespace_name.to_string();
1004            }
1005            namespaces.insert((meta.catalog_name.clone(), meta.name.clone()), meta);
1006        }
1007
1008        // テーブルをロード(まずテーブルを入れてからインデックスを入れる)
1009        for (key, value) in txn.scan_prefix(TABLES_PREFIX)? {
1010            let suffix = key_suffix(TABLES_PREFIX, &key)?;
1011            let fqn = parse_table_key_suffix(&suffix)?;
1012            let mut persisted = deserialize_table_meta(&value)?;
1013            if persisted.catalog_name != fqn.catalog {
1014                persisted.catalog_name = fqn.catalog.clone();
1015            }
1016            if persisted.namespace_name != fqn.namespace {
1017                persisted.namespace_name = fqn.namespace.clone();
1018            }
1019            if persisted.name != fqn.table {
1020                persisted.name = fqn.table.clone();
1021            }
1022            max_table_id = max_table_id.max(persisted.table_id);
1023            let table: TableMetadata = persisted.into();
1024            inner.insert_table_unchecked(table);
1025        }
1026
1027        for (key, value) in txn.scan_prefix(INDEXES_PREFIX)? {
1028            let suffix = key_suffix(INDEXES_PREFIX, &key)?;
1029            let fqn = parse_index_key_suffix(&suffix)?;
1030            let mut persisted = deserialize_index_meta(&value)?;
1031            if persisted.catalog_name != fqn.catalog {
1032                persisted.catalog_name = fqn.catalog.clone();
1033            }
1034            if persisted.namespace_name != fqn.namespace {
1035                persisted.namespace_name = fqn.namespace.clone();
1036            }
1037            if persisted.table != fqn.table {
1038                persisted.table = fqn.table.clone();
1039            }
1040            if persisted.name != fqn.index {
1041                persisted.name = fqn.index.clone();
1042            }
1043            max_index_id = max_index_id.max(persisted.index_id);
1044            let mut index: IndexMetadata = persisted.into();
1045            // 参照先テーブルがない場合はスキップ(破損対策)
1046            if let Some(table) = inner.get_table(&index.table) {
1047                if index.catalog_name != table.catalog_name
1048                    || index.namespace_name != table.namespace_name
1049                {
1050                    index.catalog_name = table.catalog_name.clone();
1051                    index.namespace_name = table.namespace_name.clone();
1052                }
1053                inner.insert_index_unchecked(index);
1054            }
1055        }
1056
1057        let (mut table_id_counter, mut index_id_counter) = (max_table_id, max_index_id);
1058        if let Some(meta) = meta_state
1059            .as_ref()
1060            .filter(|meta| meta.version == CATALOG_VERSION)
1061        {
1062            table_id_counter = table_id_counter.max(meta.table_id_counter);
1063            index_id_counter = index_id_counter.max(meta.index_id_counter);
1064        }
1065        inner.set_counters(table_id_counter, index_id_counter);
1066
1067        txn.rollback_self()?;
1068
1069        Ok(Self {
1070            inner,
1071            store,
1072            catalogs,
1073            namespaces,
1074        })
1075    }
1076
1077    fn migrate_v1_to_v2(store: &Arc<S>) -> Result<(), CatalogError> {
1078        let mut txn = store.begin(TxnMode::ReadWrite)?;
1079
1080        if txn.get(&catalog_key("default"))?.is_none() {
1081            let meta = CatalogMeta {
1082                name: "default".to_string(),
1083                comment: None,
1084                storage_root: None,
1085            };
1086            let value = bincode::serialize(&meta)?;
1087            txn.put(catalog_key("default"), value)?;
1088        }
1089
1090        if txn.get(&namespace_key("default", "default"))?.is_none() {
1091            let meta = NamespaceMeta {
1092                name: "default".to_string(),
1093                catalog_name: "default".to_string(),
1094                comment: None,
1095                storage_root: None,
1096            };
1097            let value = bincode::serialize(&meta)?;
1098            txn.put(namespace_key("default", "default"), value)?;
1099        }
1100
1101        let mut table_updates = Vec::new();
1102        let mut table_keys_to_delete = Vec::new();
1103        let mut max_table_id = 0u32;
1104        for (key, value) in txn.scan_prefix(TABLES_PREFIX)? {
1105            let suffix = key_suffix(TABLES_PREFIX, &key)?;
1106            if suffix.contains('/') {
1107                continue;
1108            }
1109            let mut persisted = deserialize_table_meta(&value)?;
1110            if persisted.catalog_name.is_empty() {
1111                persisted.catalog_name = "default".to_string();
1112            }
1113            if persisted.namespace_name.is_empty() {
1114                persisted.namespace_name = "default".to_string();
1115            }
1116            persisted.table_type = TableType::Managed;
1117            persisted.data_source_format = DataSourceFormat::Alopex;
1118            max_table_id = max_table_id.max(persisted.table_id);
1119
1120            let new_key = table_key(
1121                &persisted.catalog_name,
1122                &persisted.namespace_name,
1123                &persisted.name,
1124            );
1125            let bytes = bincode::serialize(&persisted)?;
1126            table_updates.push((new_key, bytes));
1127            table_keys_to_delete.push(key);
1128        }
1129
1130        for (key, value) in table_updates {
1131            txn.put(key, value)?;
1132        }
1133        for key in table_keys_to_delete {
1134            txn.delete(key)?;
1135        }
1136
1137        let mut index_updates = Vec::new();
1138        let mut index_keys_to_delete = Vec::new();
1139        let mut max_index_id = 0u32;
1140        for (key, value) in txn.scan_prefix(INDEXES_PREFIX)? {
1141            let suffix = key_suffix(INDEXES_PREFIX, &key)?;
1142            if suffix.contains('/') {
1143                continue;
1144            }
1145            let mut persisted = deserialize_index_meta(&value)?;
1146            if persisted.catalog_name.is_empty() {
1147                persisted.catalog_name = "default".to_string();
1148            }
1149            if persisted.namespace_name.is_empty() {
1150                persisted.namespace_name = "default".to_string();
1151            }
1152            max_index_id = max_index_id.max(persisted.index_id);
1153
1154            let new_key = index_key(
1155                &persisted.catalog_name,
1156                &persisted.namespace_name,
1157                &persisted.table,
1158                &persisted.name,
1159            );
1160            let bytes = bincode::serialize(&persisted)?;
1161            index_updates.push((new_key, bytes));
1162            index_keys_to_delete.push(key);
1163        }
1164
1165        for (key, value) in index_updates {
1166            txn.put(key, value)?;
1167        }
1168        for key in index_keys_to_delete {
1169            txn.delete(key)?;
1170        }
1171
1172        let mut table_id_counter = max_table_id;
1173        let mut index_id_counter = max_index_id;
1174        if let Some(meta_bytes) = txn.get(&META_KEY.to_vec())? {
1175            let meta: CatalogState = bincode::deserialize(&meta_bytes)?;
1176            table_id_counter = table_id_counter.max(meta.table_id_counter);
1177            index_id_counter = index_id_counter.max(meta.index_id_counter);
1178        }
1179        let meta = CatalogState {
1180            version: CATALOG_VERSION,
1181            table_id_counter,
1182            index_id_counter,
1183        };
1184        let meta_bytes = bincode::serialize(&meta)?;
1185        txn.put(META_KEY.to_vec(), meta_bytes)?;
1186        txn.commit_self()?;
1187
1188        Ok(())
1189    }
1190
1191    pub fn new(store: Arc<S>) -> Self {
1192        Self {
1193            inner: MemoryCatalog::new(),
1194            store,
1195            catalogs: HashMap::new(),
1196            namespaces: HashMap::new(),
1197        }
1198    }
1199
1200    pub fn store(&self) -> &Arc<S> {
1201        &self.store
1202    }
1203
1204    pub fn list_catalogs(&self) -> Vec<CatalogMeta> {
1205        let mut catalogs: Vec<CatalogMeta> = self.catalogs.values().cloned().collect();
1206        catalogs.sort_by(|a, b| a.name.cmp(&b.name));
1207        catalogs
1208    }
1209
1210    pub fn get_catalog(&self, name: &str) -> Option<CatalogMeta> {
1211        self.catalogs.get(name).cloned()
1212    }
1213
1214    pub fn create_catalog(&mut self, meta: CatalogMeta) -> Result<(), CatalogError> {
1215        let mut txn = self.store.begin(TxnMode::ReadWrite)?;
1216        let value = bincode::serialize(&meta)?;
1217        txn.put(catalog_key(&meta.name), value)?;
1218        txn.commit_self()?;
1219        self.catalogs.insert(meta.name.clone(), meta);
1220        Ok(())
1221    }
1222
1223    pub fn delete_catalog(&mut self, name: &str) -> Result<(), CatalogError> {
1224        let mut txn = self.store.begin(TxnMode::ReadWrite)?;
1225        txn.delete(catalog_key(name))?;
1226        let mut namespace_prefix = NAMESPACES_PREFIX.to_vec();
1227        namespace_prefix.extend_from_slice(name.as_bytes());
1228        namespace_prefix.push(b'/');
1229        let mut namespace_keys = Vec::new();
1230        for (key, _) in txn.scan_prefix(&namespace_prefix)? {
1231            namespace_keys.push(key);
1232        }
1233        for key in namespace_keys {
1234            txn.delete(key)?;
1235        }
1236        let mut table_keys = Vec::new();
1237        let mut table_fqns = Vec::new();
1238        for (key, value) in txn.scan_prefix(TABLES_PREFIX)? {
1239            let persisted = deserialize_table_meta(&value)?;
1240            if persisted.catalog_name == name {
1241                table_fqns.push(TableFqn::new(
1242                    &persisted.catalog_name,
1243                    &persisted.namespace_name,
1244                    &persisted.name,
1245                ));
1246                table_keys.push(key);
1247            }
1248        }
1249        let table_set: HashSet<TableFqn> = table_fqns.iter().cloned().collect();
1250        for key in table_keys {
1251            txn.delete(key)?;
1252        }
1253        if !table_set.is_empty() {
1254            let mut index_keys = Vec::new();
1255            for (key, value) in txn.scan_prefix(INDEXES_PREFIX)? {
1256                let persisted = deserialize_index_meta(&value)?;
1257                let fqn = TableFqn::new(
1258                    &persisted.catalog_name,
1259                    &persisted.namespace_name,
1260                    &persisted.table,
1261                );
1262                if table_set.contains(&fqn) {
1263                    index_keys.push(key);
1264                }
1265            }
1266            for key in index_keys {
1267                txn.delete(key)?;
1268            }
1269        }
1270        txn.commit_self()?;
1271        self.catalogs.remove(name);
1272        self.namespaces.retain(|(catalog, _), _| catalog != name);
1273        for fqn in table_fqns {
1274            self.inner.remove_table_unchecked(&fqn.table);
1275        }
1276        Ok(())
1277    }
1278
1279    pub fn list_namespaces(&self, catalog_name: &str) -> Vec<NamespaceMeta> {
1280        let mut namespaces: Vec<NamespaceMeta> = self
1281            .namespaces
1282            .values()
1283            .filter(|meta| meta.catalog_name == catalog_name)
1284            .cloned()
1285            .collect();
1286        namespaces.sort_by(|a, b| a.name.cmp(&b.name));
1287        namespaces
1288    }
1289
1290    pub fn get_namespace(&self, catalog_name: &str, namespace_name: &str) -> Option<NamespaceMeta> {
1291        self.namespaces
1292            .get(&(catalog_name.to_string(), namespace_name.to_string()))
1293            .cloned()
1294    }
1295
1296    pub fn create_namespace(&mut self, meta: NamespaceMeta) -> Result<(), CatalogError> {
1297        if !self.catalogs.contains_key(&meta.catalog_name) {
1298            return Err(CatalogError::InvalidKey(format!(
1299                "catalog not found: {}",
1300                meta.catalog_name
1301            )));
1302        }
1303
1304        let mut txn = self.store.begin(TxnMode::ReadWrite)?;
1305        let value = bincode::serialize(&meta)?;
1306        txn.put(namespace_key(&meta.catalog_name, &meta.name), value)?;
1307        txn.commit_self()?;
1308        self.namespaces
1309            .insert((meta.catalog_name.clone(), meta.name.clone()), meta);
1310        Ok(())
1311    }
1312
1313    pub fn delete_namespace(
1314        &mut self,
1315        catalog_name: &str,
1316        namespace_name: &str,
1317    ) -> Result<(), CatalogError> {
1318        if !self.catalogs.contains_key(catalog_name) {
1319            return Err(CatalogError::InvalidKey(format!(
1320                "catalog not found: {}",
1321                catalog_name
1322            )));
1323        }
1324
1325        let mut txn = self.store.begin(TxnMode::ReadWrite)?;
1326        txn.delete(namespace_key(catalog_name, namespace_name))?;
1327        txn.commit_self()?;
1328        self.namespaces
1329            .remove(&(catalog_name.to_string(), namespace_name.to_string()));
1330        Ok(())
1331    }
1332
1333    fn persist_create_catalog(
1334        &mut self,
1335        txn: &mut S::Transaction<'_>,
1336        meta: &CatalogMeta,
1337    ) -> Result<(), CatalogError> {
1338        let value = bincode::serialize(meta)?;
1339        txn.put(catalog_key(&meta.name), value)?;
1340        Ok(())
1341    }
1342
1343    fn persist_drop_catalog(
1344        &mut self,
1345        txn: &mut S::Transaction<'_>,
1346        name: &str,
1347    ) -> Result<(), CatalogError> {
1348        txn.delete(catalog_key(name))?;
1349
1350        let mut namespace_prefix = NAMESPACES_PREFIX.to_vec();
1351        namespace_prefix.extend_from_slice(name.as_bytes());
1352        namespace_prefix.push(b'/');
1353        let mut namespace_keys = Vec::new();
1354        for (key, _) in txn.scan_prefix(&namespace_prefix)? {
1355            namespace_keys.push(key);
1356        }
1357        for key in namespace_keys {
1358            txn.delete(key)?;
1359        }
1360
1361        let mut table_keys = Vec::new();
1362        let mut table_fqns = Vec::new();
1363        for (key, value) in txn.scan_prefix(TABLES_PREFIX)? {
1364            let persisted = deserialize_table_meta(&value)?;
1365            if persisted.catalog_name == name {
1366                table_fqns.push(TableFqn::new(
1367                    &persisted.catalog_name,
1368                    &persisted.namespace_name,
1369                    &persisted.name,
1370                ));
1371                table_keys.push(key);
1372            }
1373        }
1374        let table_set: HashSet<TableFqn> = table_fqns.iter().cloned().collect();
1375        for key in table_keys {
1376            txn.delete(key)?;
1377        }
1378        if !table_set.is_empty() {
1379            let mut index_keys = Vec::new();
1380            for (key, value) in txn.scan_prefix(INDEXES_PREFIX)? {
1381                let persisted = deserialize_index_meta(&value)?;
1382                let fqn = TableFqn::new(
1383                    &persisted.catalog_name,
1384                    &persisted.namespace_name,
1385                    &persisted.table,
1386                );
1387                if table_set.contains(&fqn) {
1388                    index_keys.push(key);
1389                }
1390            }
1391            for key in index_keys {
1392                txn.delete(key)?;
1393            }
1394        }
1395        Ok(())
1396    }
1397
1398    fn persist_create_namespace(
1399        &mut self,
1400        txn: &mut S::Transaction<'_>,
1401        meta: &NamespaceMeta,
1402    ) -> Result<(), CatalogError> {
1403        let value = bincode::serialize(meta)?;
1404        txn.put(namespace_key(&meta.catalog_name, &meta.name), value)?;
1405        Ok(())
1406    }
1407
1408    fn persist_drop_namespace(
1409        &mut self,
1410        txn: &mut S::Transaction<'_>,
1411        catalog_name: &str,
1412        namespace_name: &str,
1413    ) -> Result<(), CatalogError> {
1414        txn.delete(namespace_key(catalog_name, namespace_name))?;
1415
1416        let mut table_keys = Vec::new();
1417        let mut table_fqns = Vec::new();
1418        for (key, value) in txn.scan_prefix(TABLES_PREFIX)? {
1419            let persisted = deserialize_table_meta(&value)?;
1420            if persisted.catalog_name == catalog_name && persisted.namespace_name == namespace_name
1421            {
1422                table_fqns.push(TableFqn::new(
1423                    &persisted.catalog_name,
1424                    &persisted.namespace_name,
1425                    &persisted.name,
1426                ));
1427                table_keys.push(key);
1428            }
1429        }
1430        let table_set: HashSet<TableFqn> = table_fqns.iter().cloned().collect();
1431        for key in table_keys {
1432            txn.delete(key)?;
1433        }
1434        if !table_set.is_empty() {
1435            let mut index_keys = Vec::new();
1436            for (key, value) in txn.scan_prefix(INDEXES_PREFIX)? {
1437                let persisted = deserialize_index_meta(&value)?;
1438                let fqn = TableFqn::new(
1439                    &persisted.catalog_name,
1440                    &persisted.namespace_name,
1441                    &persisted.table,
1442                );
1443                if table_set.contains(&fqn) {
1444                    index_keys.push(key);
1445                }
1446            }
1447            for key in index_keys {
1448                txn.delete(key)?;
1449            }
1450        }
1451        Ok(())
1452    }
1453
1454    fn write_meta(&self, txn: &mut S::Transaction<'_>) -> Result<(), CatalogError> {
1455        let (table_id_counter, index_id_counter) = self.inner.counters();
1456        let meta = CatalogState {
1457            version: CATALOG_VERSION,
1458            table_id_counter,
1459            index_id_counter,
1460        };
1461        let meta_bytes = bincode::serialize(&meta)?;
1462        txn.put(META_KEY.to_vec(), meta_bytes)?;
1463        Ok(())
1464    }
1465
1466    pub fn persist_create_table(
1467        &mut self,
1468        txn: &mut S::Transaction<'_>,
1469        table: &TableMetadata,
1470    ) -> Result<(), CatalogError> {
1471        let persisted = PersistedTableMeta::from(table);
1472        let value = bincode::serialize(&persisted)?;
1473        txn.put(
1474            table_key(&table.catalog_name, &table.namespace_name, &table.name),
1475            value,
1476        )?;
1477        self.write_meta(txn)?;
1478        Ok(())
1479    }
1480
1481    pub fn persist_drop_table(
1482        &mut self,
1483        txn: &mut S::Transaction<'_>,
1484        fqn: &TableFqn,
1485    ) -> Result<(), CatalogError> {
1486        txn.delete(table_key(&fqn.catalog, &fqn.namespace, &fqn.table))?;
1487
1488        // テーブルに紐づくインデックスも削除する。
1489        let mut to_delete: Vec<String> = Vec::new();
1490        let prefix = index_prefix(&fqn.catalog, &fqn.namespace, &fqn.table);
1491        for (key, _) in txn.scan_prefix(&prefix)? {
1492            let index_name = key_suffix(&prefix, &key)?;
1493            to_delete.push(index_name);
1494        }
1495        for index_name in to_delete {
1496            txn.delete(index_key(
1497                &fqn.catalog,
1498                &fqn.namespace,
1499                &fqn.table,
1500                &index_name,
1501            ))?;
1502        }
1503
1504        Ok(())
1505    }
1506
1507    pub fn persist_create_index(
1508        &mut self,
1509        txn: &mut S::Transaction<'_>,
1510        index: &IndexMetadata,
1511    ) -> Result<(), CatalogError> {
1512        let persisted = PersistedIndexMeta::from(index);
1513        let value = bincode::serialize(&persisted)?;
1514        txn.put(
1515            index_key(
1516                &index.catalog_name,
1517                &index.namespace_name,
1518                &index.table,
1519                &index.name,
1520            ),
1521            value,
1522        )?;
1523        self.write_meta(txn)?;
1524        Ok(())
1525    }
1526
1527    pub fn persist_drop_index(
1528        &mut self,
1529        txn: &mut S::Transaction<'_>,
1530        fqn: &IndexFqn,
1531    ) -> Result<(), CatalogError> {
1532        txn.delete(index_key(
1533            &fqn.catalog,
1534            &fqn.namespace,
1535            &fqn.table,
1536            &fqn.index,
1537        ))?;
1538        Ok(())
1539    }
1540
1541    pub fn persist_overlay(
1542        &mut self,
1543        txn: &mut S::Transaction<'_>,
1544        overlay: &CatalogOverlay,
1545    ) -> Result<(), CatalogError> {
1546        self.ensure_overlay_name_uniqueness(overlay)?;
1547
1548        for catalog in overlay.dropped_catalogs.iter() {
1549            self.persist_drop_catalog(txn, catalog)?;
1550        }
1551
1552        for (catalog, namespace) in overlay.dropped_namespaces.iter() {
1553            if overlay.dropped_catalogs.contains(catalog) {
1554                continue;
1555            }
1556            self.persist_drop_namespace(txn, catalog, namespace)?;
1557        }
1558
1559        for fqn in overlay.dropped_tables.iter() {
1560            if overlay.dropped_catalogs.contains(&fqn.catalog)
1561                || overlay
1562                    .dropped_namespaces
1563                    .contains(&(fqn.catalog.clone(), fqn.namespace.clone()))
1564            {
1565                continue;
1566            }
1567            self.persist_drop_table(txn, fqn)?;
1568        }
1569
1570        for fqn in overlay.dropped_indexes.iter() {
1571            if overlay.dropped_catalogs.contains(&fqn.catalog)
1572                || overlay
1573                    .dropped_namespaces
1574                    .contains(&(fqn.catalog.clone(), fqn.namespace.clone()))
1575            {
1576                continue;
1577            }
1578            self.persist_drop_index(txn, fqn)?;
1579        }
1580
1581        for meta in overlay.added_catalogs.values() {
1582            if overlay.dropped_catalogs.contains(&meta.name) {
1583                continue;
1584            }
1585            self.persist_create_catalog(txn, meta)?;
1586        }
1587
1588        for meta in overlay.added_namespaces.values() {
1589            if overlay.dropped_catalogs.contains(&meta.catalog_name)
1590                || overlay
1591                    .dropped_namespaces
1592                    .contains(&(meta.catalog_name.clone(), meta.name.clone()))
1593            {
1594                continue;
1595            }
1596            self.persist_create_namespace(txn, meta)?;
1597        }
1598
1599        for (fqn, table) in overlay.added_tables.iter() {
1600            if overlay.dropped_catalogs.contains(&fqn.catalog)
1601                || overlay
1602                    .dropped_namespaces
1603                    .contains(&(fqn.catalog.clone(), fqn.namespace.clone()))
1604                || overlay.dropped_tables.contains(fqn)
1605            {
1606                continue;
1607            }
1608            self.persist_create_table(txn, table)?;
1609        }
1610
1611        for (fqn, index) in overlay.added_indexes.iter() {
1612            if overlay.dropped_catalogs.contains(&fqn.catalog)
1613                || overlay
1614                    .dropped_namespaces
1615                    .contains(&(fqn.catalog.clone(), fqn.namespace.clone()))
1616                || overlay.dropped_indexes.contains(fqn)
1617            {
1618                continue;
1619            }
1620            self.persist_create_index(txn, index)?;
1621        }
1622
1623        Ok(())
1624    }
1625
1626    fn ensure_overlay_name_uniqueness(&self, overlay: &CatalogOverlay) -> Result<(), CatalogError> {
1627        let mut table_names: HashMap<String, TableFqn> = HashMap::new();
1628        for name in self.inner.table_names() {
1629            let Some(table) = self.inner.get_table(name) else {
1630                continue;
1631            };
1632            let fqn = TableFqn::from(table);
1633            if overlay.dropped_catalogs.contains(&fqn.catalog)
1634                || overlay
1635                    .dropped_namespaces
1636                    .contains(&(fqn.catalog.clone(), fqn.namespace.clone()))
1637                || overlay.dropped_tables.contains(&fqn)
1638            {
1639                continue;
1640            }
1641            table_names.insert(table.name.clone(), fqn);
1642        }
1643
1644        for (fqn, table) in overlay.added_tables.iter() {
1645            if overlay.dropped_catalogs.contains(&fqn.catalog)
1646                || overlay
1647                    .dropped_namespaces
1648                    .contains(&(fqn.catalog.clone(), fqn.namespace.clone()))
1649                || overlay.dropped_tables.contains(fqn)
1650            {
1651                continue;
1652            }
1653            if let Some(existing) = table_names.get(&table.name)
1654                && existing != fqn
1655            {
1656                return Err(CatalogError::InvalidKey(format!(
1657                    "table name '{}' conflicts across namespaces: {}.{} vs {}.{}",
1658                    table.name, existing.catalog, existing.namespace, fqn.catalog, fqn.namespace
1659                )));
1660            }
1661            table_names.insert(table.name.clone(), fqn.clone());
1662        }
1663
1664        let mut index_names: HashMap<String, IndexFqn> = HashMap::new();
1665        for name in self.inner.index_names() {
1666            let Some(index) = self.inner.get_index(name) else {
1667                continue;
1668            };
1669            let fqn = IndexFqn::from(index);
1670            if overlay.dropped_catalogs.contains(&fqn.catalog)
1671                || overlay
1672                    .dropped_namespaces
1673                    .contains(&(fqn.catalog.clone(), fqn.namespace.clone()))
1674                || overlay.dropped_indexes.contains(&fqn)
1675                || overlay.dropped_tables.contains(&TableFqn::new(
1676                    &fqn.catalog,
1677                    &fqn.namespace,
1678                    &fqn.table,
1679                ))
1680            {
1681                continue;
1682            }
1683            index_names.insert(index.name.clone(), fqn);
1684        }
1685
1686        for (fqn, index) in overlay.added_indexes.iter() {
1687            if overlay.dropped_catalogs.contains(&fqn.catalog)
1688                || overlay
1689                    .dropped_namespaces
1690                    .contains(&(fqn.catalog.clone(), fqn.namespace.clone()))
1691                || overlay.dropped_indexes.contains(fqn)
1692                || overlay.dropped_tables.contains(&TableFqn::new(
1693                    &fqn.catalog,
1694                    &fqn.namespace,
1695                    &fqn.table,
1696                ))
1697            {
1698                continue;
1699            }
1700            if let Some(existing) = index_names.get(&index.name)
1701                && existing != fqn
1702            {
1703                return Err(CatalogError::InvalidKey(format!(
1704                    "index name '{}' conflicts across namespaces: {}.{} vs {}.{}",
1705                    index.name, existing.catalog, existing.namespace, fqn.catalog, fqn.namespace
1706                )));
1707            }
1708            index_names.insert(index.name.clone(), fqn.clone());
1709        }
1710
1711        Ok(())
1712    }
1713
1714    fn namespace_dropped(overlay: &CatalogOverlay, catalog: &str, namespace: &str) -> bool {
1715        overlay.dropped_catalogs.contains(catalog)
1716            || overlay
1717                .dropped_namespaces
1718                .contains(&(catalog.to_string(), namespace.to_string()))
1719    }
1720
1721    fn overlay_added_table_by_name<'a>(
1722        overlay: &'a CatalogOverlay,
1723        name: &str,
1724    ) -> Option<&'a TableMetadata> {
1725        let mut iter = overlay
1726            .added_tables
1727            .values()
1728            .filter(|table| table.name == name);
1729        let first = iter.next()?;
1730        if iter.next().is_some() {
1731            return None;
1732        }
1733        Some(first)
1734    }
1735
1736    fn overlay_added_index_by_name<'a>(
1737        overlay: &'a CatalogOverlay,
1738        name: &str,
1739    ) -> Option<&'a IndexMetadata> {
1740        let mut iter = overlay
1741            .added_indexes
1742            .values()
1743            .filter(|index| index.name == name);
1744        let first = iter.next()?;
1745        if iter.next().is_some() {
1746            return None;
1747        }
1748        Some(first)
1749    }
1750
1751    fn base_table_conflicts_with_overlay(
1752        &self,
1753        overlay: &CatalogOverlay,
1754        table: &TableMetadata,
1755    ) -> bool {
1756        let Some(base) = self.inner.get_table(&table.name) else {
1757            return false;
1758        };
1759        if self.table_hidden_by_overlay(base, overlay) {
1760            return false;
1761        }
1762        if overlay.dropped_tables.contains(&TableFqn::from(base)) {
1763            return false;
1764        }
1765        TableFqn::from(base) != TableFqn::from(table)
1766    }
1767
1768    fn base_index_conflicts_with_overlay(
1769        &self,
1770        overlay: &CatalogOverlay,
1771        index: &IndexMetadata,
1772    ) -> bool {
1773        let Some(base) = self.inner.get_index(&index.name) else {
1774            return false;
1775        };
1776        if Self::namespace_dropped(overlay, &base.catalog_name, &base.namespace_name) {
1777            return false;
1778        }
1779        if overlay.dropped_indexes.contains(&IndexFqn::from(base)) {
1780            return false;
1781        }
1782        if self.dropped_table_matches_fqn(
1783            &base.table,
1784            &base.catalog_name,
1785            &base.namespace_name,
1786            overlay,
1787        ) {
1788            return false;
1789        }
1790        IndexFqn::from(base) != IndexFqn::from(index)
1791    }
1792
1793    fn table_hidden_by_overlay(&self, table: &TableMetadata, overlay: &CatalogOverlay) -> bool {
1794        Self::namespace_dropped(overlay, &table.catalog_name, &table.namespace_name)
1795    }
1796
1797    fn dropped_table_matches_fqn(
1798        &self,
1799        table_name: &str,
1800        catalog: &str,
1801        namespace: &str,
1802        overlay: &CatalogOverlay,
1803    ) -> bool {
1804        let fqn = TableFqn::new(catalog, namespace, table_name);
1805        overlay.dropped_tables.contains(&fqn)
1806    }
1807
1808    fn index_hidden_by_overlay(&self, index: &IndexMetadata, overlay: &CatalogOverlay) -> bool {
1809        let index_fqn = IndexFqn::from(index);
1810        if overlay.dropped_indexes.contains(&index_fqn) {
1811            return true;
1812        }
1813        if Self::namespace_dropped(overlay, &index.catalog_name, &index.namespace_name) {
1814            return true;
1815        }
1816        if self.dropped_table_matches_fqn(
1817            &index.table,
1818            &index.catalog_name,
1819            &index.namespace_name,
1820            overlay,
1821        ) {
1822            return true;
1823        }
1824        match self.get_table_in_txn(&index.table, overlay) {
1825            Some(table) => {
1826                table.catalog_name != index.catalog_name
1827                    || table.namespace_name != index.namespace_name
1828            }
1829            None => true,
1830        }
1831    }
1832
1833    pub fn get_catalog_in_txn<'a>(
1834        &'a self,
1835        name: &str,
1836        overlay: &'a CatalogOverlay,
1837    ) -> Option<&'a CatalogMeta> {
1838        if overlay.dropped_catalogs.contains(name) {
1839            return None;
1840        }
1841        if let Some(catalog) = overlay.added_catalogs.get(name) {
1842            return Some(catalog);
1843        }
1844        self.catalogs.get(name)
1845    }
1846
1847    pub fn get_namespace_in_txn<'a>(
1848        &'a self,
1849        catalog_name: &str,
1850        namespace_name: &str,
1851        overlay: &'a CatalogOverlay,
1852    ) -> Option<&'a NamespaceMeta> {
1853        if overlay.dropped_catalogs.contains(catalog_name) {
1854            return None;
1855        }
1856        let key = (catalog_name.to_string(), namespace_name.to_string());
1857        if overlay.dropped_namespaces.contains(&key) {
1858            return None;
1859        }
1860        if let Some(namespace) = overlay.added_namespaces.get(&key) {
1861            return Some(namespace);
1862        }
1863        self.namespaces.get(&key)
1864    }
1865
1866    pub fn list_catalogs_in_txn(&self, overlay: &CatalogOverlay) -> Vec<CatalogMeta> {
1867        let mut catalogs: HashMap<String, CatalogMeta> = HashMap::new();
1868        for (name, meta) in &self.catalogs {
1869            if !overlay.dropped_catalogs.contains(name) {
1870                catalogs.insert(name.clone(), meta.clone());
1871            }
1872        }
1873        for (name, meta) in &overlay.added_catalogs {
1874            if !overlay.dropped_catalogs.contains(name) {
1875                catalogs.insert(name.clone(), meta.clone());
1876            }
1877        }
1878        let mut values: Vec<CatalogMeta> = catalogs.into_values().collect();
1879        values.sort_by(|a, b| a.name.cmp(&b.name));
1880        values
1881    }
1882
1883    pub fn list_namespaces_in_txn(
1884        &self,
1885        catalog_name: &str,
1886        overlay: &CatalogOverlay,
1887    ) -> Vec<NamespaceMeta> {
1888        if overlay.dropped_catalogs.contains(catalog_name) {
1889            return Vec::new();
1890        }
1891        let mut namespaces: HashMap<(String, String), NamespaceMeta> = HashMap::new();
1892        for ((catalog, namespace), meta) in &self.namespaces {
1893            if catalog != catalog_name {
1894                continue;
1895            }
1896            let key = (catalog.clone(), namespace.clone());
1897            if overlay.dropped_namespaces.contains(&key) {
1898                continue;
1899            }
1900            namespaces.insert(key, meta.clone());
1901        }
1902        for ((catalog, namespace), meta) in &overlay.added_namespaces {
1903            if catalog != catalog_name {
1904                continue;
1905            }
1906            let key = (catalog.clone(), namespace.clone());
1907            if overlay.dropped_namespaces.contains(&key) {
1908                continue;
1909            }
1910            namespaces.insert(key, meta.clone());
1911        }
1912        let mut values: Vec<NamespaceMeta> = namespaces.into_values().collect();
1913        values.sort_by(|a, b| a.name.cmp(&b.name));
1914        values
1915    }
1916
1917    pub fn table_exists_in_txn(&self, name: &str, overlay: &CatalogOverlay) -> bool {
1918        if let Some(table) = Self::overlay_added_table_by_name(overlay, name) {
1919            if self.table_hidden_by_overlay(table, overlay) {
1920                return false;
1921            }
1922            if self.base_table_conflicts_with_overlay(overlay, table) {
1923                return false;
1924            }
1925            return true;
1926        }
1927        match self.inner.get_table(name) {
1928            Some(table) => {
1929                !self.table_hidden_by_overlay(table, overlay)
1930                    && !overlay.dropped_tables.contains(&TableFqn::from(table))
1931            }
1932            None => false,
1933        }
1934    }
1935
1936    pub fn get_table_in_txn<'a>(
1937        &'a self,
1938        name: &str,
1939        overlay: &'a CatalogOverlay,
1940    ) -> Option<&'a TableMetadata> {
1941        if let Some(table) = Self::overlay_added_table_by_name(overlay, name) {
1942            if self.table_hidden_by_overlay(table, overlay) {
1943                return None;
1944            }
1945            if self.base_table_conflicts_with_overlay(overlay, table) {
1946                return None;
1947            }
1948            return Some(table);
1949        }
1950        self.inner.get_table(name).filter(|table| {
1951            !self.table_hidden_by_overlay(table, overlay)
1952                && !overlay.dropped_tables.contains(&TableFqn::from(*table))
1953        })
1954    }
1955
1956    pub fn index_exists_in_txn(&self, name: &str, overlay: &CatalogOverlay) -> bool {
1957        if let Some(index) = Self::overlay_added_index_by_name(overlay, name) {
1958            if self.index_hidden_by_overlay(index, overlay) {
1959                return false;
1960            }
1961            if self.base_index_conflicts_with_overlay(overlay, index) {
1962                return false;
1963            }
1964            return true;
1965        }
1966        match self.inner.get_index(name) {
1967            Some(index) => !self.index_hidden_by_overlay(index, overlay),
1968            None => false,
1969        }
1970    }
1971
1972    pub fn get_index_in_txn<'a>(
1973        &'a self,
1974        name: &str,
1975        overlay: &'a CatalogOverlay,
1976    ) -> Option<&'a IndexMetadata> {
1977        if let Some(index) = Self::overlay_added_index_by_name(overlay, name) {
1978            if self.index_hidden_by_overlay(index, overlay) {
1979                return None;
1980            }
1981            if self.base_index_conflicts_with_overlay(overlay, index) {
1982                return None;
1983            }
1984            return Some(index);
1985        }
1986        match self.inner.get_index(name) {
1987            Some(index) if self.index_hidden_by_overlay(index, overlay) => None,
1988            other => other,
1989        }
1990    }
1991
1992    pub fn list_tables_in_txn(
1993        &self,
1994        catalog_name: &str,
1995        namespace_name: &str,
1996        overlay: &CatalogOverlay,
1997    ) -> Vec<TableMetadata> {
1998        if overlay.dropped_catalogs.contains(catalog_name) {
1999            return Vec::new();
2000        }
2001        if overlay
2002            .dropped_namespaces
2003            .contains(&(catalog_name.to_string(), namespace_name.to_string()))
2004        {
2005            return Vec::new();
2006        }
2007        let mut tables: HashMap<TableFqn, TableMetadata> = HashMap::new();
2008        for name in self.inner.table_names() {
2009            if let Some(table) = self.inner.get_table(name)
2010                && table.catalog_name == catalog_name
2011                && table.namespace_name == namespace_name
2012            {
2013                let fqn = TableFqn::from(table);
2014                if !overlay.dropped_tables.contains(&fqn) {
2015                    tables.insert(fqn, table.clone());
2016                }
2017            }
2018        }
2019        for table in overlay.added_tables.values() {
2020            if table.catalog_name == catalog_name && table.namespace_name == namespace_name {
2021                tables.insert(TableFqn::from(table), table.clone());
2022            }
2023        }
2024        let mut values: Vec<TableMetadata> = tables.into_values().collect();
2025        values.sort_by(|a, b| a.name.cmp(&b.name));
2026        values
2027    }
2028
2029    pub fn list_indexes_in_txn(
2030        &self,
2031        fqn: &TableFqn,
2032        overlay: &CatalogOverlay,
2033    ) -> Vec<IndexMetadata> {
2034        if overlay.dropped_catalogs.contains(&fqn.catalog) {
2035            return Vec::new();
2036        }
2037        if overlay
2038            .dropped_namespaces
2039            .contains(&(fqn.catalog.clone(), fqn.namespace.clone()))
2040        {
2041            return Vec::new();
2042        }
2043        if overlay
2044            .dropped_tables
2045            .contains(&TableFqn::new(&fqn.catalog, &fqn.namespace, &fqn.table))
2046        {
2047            return Vec::new();
2048        }
2049
2050        let mut indexes: HashMap<IndexFqn, IndexMetadata> = HashMap::new();
2051        for index in self.inner.get_indexes_for_table(&fqn.table) {
2052            if index.catalog_name == fqn.catalog && index.namespace_name == fqn.namespace {
2053                let index_fqn = IndexFqn::from(index);
2054                if !overlay.dropped_indexes.contains(&index_fqn) {
2055                    indexes.insert(index_fqn, index.clone());
2056                }
2057            }
2058        }
2059        for index in overlay.added_indexes.values() {
2060            if index.table == fqn.table
2061                && index.catalog_name == fqn.catalog
2062                && index.namespace_name == fqn.namespace
2063            {
2064                indexes.insert(IndexFqn::from(index), index.clone());
2065            }
2066        }
2067        let mut values: Vec<IndexMetadata> = indexes.into_values().collect();
2068        values.sort_by(|a, b| a.name.cmp(&b.name));
2069        values
2070    }
2071
2072    pub fn apply_overlay(&mut self, overlay: CatalogOverlay) {
2073        let CatalogOverlay {
2074            added_catalogs,
2075            dropped_catalogs,
2076            added_namespaces,
2077            dropped_namespaces,
2078            added_tables,
2079            dropped_tables,
2080            added_indexes,
2081            dropped_indexes,
2082        } = overlay;
2083
2084        for (name, meta) in added_catalogs {
2085            self.catalogs.insert(name, meta);
2086        }
2087        for (catalog_name, namespace_name) in dropped_namespaces.iter() {
2088            self.namespaces
2089                .remove(&(catalog_name.clone(), namespace_name.clone()));
2090        }
2091        for ((catalog_name, namespace_name), meta) in added_namespaces {
2092            self.namespaces.insert((catalog_name, namespace_name), meta);
2093        }
2094        for name in dropped_catalogs {
2095            self.catalogs.remove(&name);
2096            self.namespaces.retain(|(catalog, _), _| catalog != &name);
2097        }
2098        for (_, table) in added_tables {
2099            self.inner.insert_table_unchecked(table);
2100        }
2101        for fqn in dropped_tables {
2102            self.inner.remove_table_unchecked(&fqn.table);
2103        }
2104        for (_, index) in added_indexes {
2105            self.inner.insert_index_unchecked(index);
2106        }
2107        for fqn in dropped_indexes {
2108            self.inner.remove_index_unchecked(&fqn.index);
2109        }
2110    }
2111
2112    pub fn discard_overlay(_overlay: CatalogOverlay) {}
2113}
2114
2115impl<S: KVStore> Catalog for PersistentCatalog<S> {
2116    fn create_table(&mut self, table: TableMetadata) -> Result<(), PlannerError> {
2117        self.inner.create_table(table)
2118    }
2119
2120    fn get_table(&self, name: &str) -> Option<&TableMetadata> {
2121        self.inner.get_table(name)
2122    }
2123
2124    fn drop_table(&mut self, name: &str) -> Result<(), PlannerError> {
2125        self.inner.drop_table(name)
2126    }
2127
2128    fn create_index(&mut self, index: IndexMetadata) -> Result<(), PlannerError> {
2129        self.inner.create_index(index)
2130    }
2131
2132    fn get_index(&self, name: &str) -> Option<&IndexMetadata> {
2133        self.inner.get_index(name)
2134    }
2135
2136    fn get_indexes_for_table(&self, table: &str) -> Vec<&IndexMetadata> {
2137        self.inner.get_indexes_for_table(table)
2138    }
2139
2140    fn drop_index(&mut self, name: &str) -> Result<(), PlannerError> {
2141        self.inner.drop_index(name)
2142    }
2143
2144    fn table_exists(&self, name: &str) -> bool {
2145        self.inner.table_exists(name)
2146    }
2147
2148    fn index_exists(&self, name: &str) -> bool {
2149        self.inner.index_exists(name)
2150    }
2151
2152    fn next_table_id(&mut self) -> u32 {
2153        self.inner.next_table_id()
2154    }
2155
2156    fn next_index_id(&mut self) -> u32 {
2157        self.inner.next_index_id()
2158    }
2159
2160    fn list_tables(&self) -> Vec<TableMetadata> {
2161        let mut tables = Vec::new();
2162        for name in self.inner.table_names() {
2163            if let Some(table) = self.inner.get_table(name) {
2164                tables.push(table.clone());
2165            }
2166        }
2167        tables
2168    }
2169
2170    fn persistence_enabled(&self) -> bool {
2171        true
2172    }
2173}
2174
2175#[cfg(test)]
2176mod tests {
2177    use super::*;
2178    use crate::planner::types::ResolvedType;
2179    use std::collections::HashSet;
2180
2181    fn test_table(name: &str, id: u32) -> TableMetadata {
2182        TableMetadata::new(
2183            name,
2184            vec![ColumnMetadata::new("id", ResolvedType::Integer).with_primary_key(true)],
2185        )
2186        .with_table_id(id)
2187        .with_primary_key(vec!["id".to_string()])
2188    }
2189
2190    fn legacy_table_key(table_name: &str) -> Vec<u8> {
2191        let mut key = TABLES_PREFIX.to_vec();
2192        key.extend_from_slice(table_name.as_bytes());
2193        key
2194    }
2195
2196    fn legacy_index_key(index_name: &str) -> Vec<u8> {
2197        let mut key = INDEXES_PREFIX.to_vec();
2198        key.extend_from_slice(index_name.as_bytes());
2199        key
2200    }
2201
2202    fn seed_legacy_store(store: &Arc<alopex_core::kv::memory::MemoryKV>) {
2203        let mut txn = store.begin(TxnMode::ReadWrite).unwrap();
2204        let table = test_table("users", 7);
2205        let legacy_table = PersistedTableMetaV1 {
2206            table_id: table.table_id,
2207            name: table.name.clone(),
2208            columns: table
2209                .columns
2210                .iter()
2211                .map(PersistedColumnMeta::from)
2212                .collect(),
2213            primary_key: table.primary_key.clone(),
2214            storage_options: table.storage_options.clone().into(),
2215        };
2216        let table_bytes = bincode::serialize(&legacy_table).unwrap();
2217        txn.put(legacy_table_key("users"), table_bytes).unwrap();
2218
2219        let legacy_index = PersistedIndexMetaV1 {
2220            index_id: 3,
2221            name: "idx_users_id".to_string(),
2222            table: "users".to_string(),
2223            columns: vec!["id".to_string()],
2224            column_indices: vec![0],
2225            unique: false,
2226            method: Some(PersistedIndexType::BTree),
2227            options: Vec::new(),
2228        };
2229        let index_bytes = bincode::serialize(&legacy_index).unwrap();
2230        txn.put(legacy_index_key("idx_users_id"), index_bytes)
2231            .unwrap();
2232
2233        let meta = CatalogState {
2234            version: 1,
2235            table_id_counter: 7,
2236            index_id_counter: 3,
2237        };
2238        let meta_bytes = bincode::serialize(&meta).unwrap();
2239        txn.put(META_KEY.to_vec(), meta_bytes).unwrap();
2240        txn.commit_self().unwrap();
2241    }
2242
2243    #[test]
2244    fn load_empty_store() {
2245        let store = Arc::new(alopex_core::kv::memory::MemoryKV::new());
2246        let catalog = PersistentCatalog::load(store).unwrap();
2247        assert_eq!(catalog.inner.table_count(), 0);
2248        assert_eq!(catalog.inner.index_count(), 0);
2249    }
2250
2251    #[test]
2252    fn load_migrates_v1_keys_and_meta() {
2253        let store = Arc::new(alopex_core::kv::memory::MemoryKV::new());
2254        seed_legacy_store(&store);
2255
2256        let reloaded = PersistentCatalog::load(store.clone()).unwrap();
2257        assert!(reloaded.get_catalog("default").is_some());
2258        assert!(reloaded.get_namespace("default", "default").is_some());
2259
2260        let table = reloaded.get_table("users").unwrap();
2261        assert_eq!(table.catalog_name, "default");
2262        assert_eq!(table.namespace_name, "default");
2263
2264        let index = reloaded.get_index("idx_users_id").unwrap();
2265        assert_eq!(index.catalog_name, "default");
2266        assert_eq!(index.namespace_name, "default");
2267        assert_eq!(index.table, "users");
2268
2269        let mut txn = store.begin(TxnMode::ReadOnly).unwrap();
2270        assert!(txn.get(&legacy_table_key("users")).unwrap().is_none());
2271        assert!(
2272            txn.get(&table_key("default", "default", "users"))
2273                .unwrap()
2274                .is_some()
2275        );
2276        assert!(
2277            txn.get(&legacy_index_key("idx_users_id"))
2278                .unwrap()
2279                .is_none()
2280        );
2281        assert!(
2282            txn.get(&index_key("default", "default", "users", "idx_users_id"))
2283                .unwrap()
2284                .is_some()
2285        );
2286        let meta_bytes = txn.get(&META_KEY.to_vec()).unwrap().unwrap();
2287        let meta: CatalogState = bincode::deserialize(&meta_bytes).unwrap();
2288        assert_eq!(meta.version, CATALOG_VERSION);
2289        txn.rollback_self().unwrap();
2290    }
2291
2292    #[test]
2293    fn load_after_migration_keeps_v2_keys() {
2294        let store = Arc::new(alopex_core::kv::memory::MemoryKV::new());
2295        seed_legacy_store(&store);
2296
2297        let _ = PersistentCatalog::load(store.clone()).unwrap();
2298        let reloaded = PersistentCatalog::load(store.clone()).unwrap();
2299
2300        let table = reloaded.get_table("users").unwrap();
2301        assert_eq!(table.catalog_name, "default");
2302        assert_eq!(table.namespace_name, "default");
2303
2304        let mut txn = store.begin(TxnMode::ReadOnly).unwrap();
2305        assert!(txn.get(&legacy_table_key("users")).unwrap().is_none());
2306        assert!(
2307            txn.get(&table_key("default", "default", "users"))
2308                .unwrap()
2309                .is_some()
2310        );
2311        let meta_bytes = txn.get(&META_KEY.to_vec()).unwrap().unwrap();
2312        let meta: CatalogState = bincode::deserialize(&meta_bytes).unwrap();
2313        assert_eq!(meta.version, CATALOG_VERSION);
2314        txn.rollback_self().unwrap();
2315    }
2316
2317    #[test]
2318    fn create_table_persists() {
2319        let store = Arc::new(alopex_core::kv::memory::MemoryKV::new());
2320        let mut catalog = PersistentCatalog::new(store.clone());
2321
2322        // inner のカウンタを更新して meta が書き込まれることを担保する
2323        catalog.inner.set_counters(1, 0);
2324
2325        let table = test_table("users", 1);
2326        let mut txn = store.begin(TxnMode::ReadWrite).unwrap();
2327        catalog.persist_create_table(&mut txn, &table).unwrap();
2328        txn.commit_self().unwrap();
2329
2330        let reloaded = PersistentCatalog::load(store).unwrap();
2331        assert!(reloaded.table_exists("users"));
2332        assert_eq!(reloaded.get_table("users").unwrap().table_id, 1);
2333    }
2334
2335    #[test]
2336    fn drop_table_removes() {
2337        let store = Arc::new(alopex_core::kv::memory::MemoryKV::new());
2338        let mut catalog = PersistentCatalog::new(store.clone());
2339        catalog.inner.set_counters(1, 0);
2340
2341        let table = test_table("users", 1);
2342        let mut txn = store.begin(TxnMode::ReadWrite).unwrap();
2343        catalog.persist_create_table(&mut txn, &table).unwrap();
2344        txn.commit_self().unwrap();
2345
2346        let mut txn = store.begin(TxnMode::ReadWrite).unwrap();
2347        let fqn = TableFqn::new("default", "default", "users");
2348        catalog.persist_drop_table(&mut txn, &fqn).unwrap();
2349        txn.commit_self().unwrap();
2350
2351        let reloaded = PersistentCatalog::load(store).unwrap();
2352        assert!(!reloaded.table_exists("users"));
2353    }
2354
2355    #[test]
2356    fn reload_preserves_state() {
2357        let temp_dir = tempfile::tempdir().unwrap();
2358        let wal_path = temp_dir.path().join("catalog.wal");
2359        let store = Arc::new(alopex_core::kv::memory::MemoryKV::open(&wal_path).unwrap());
2360        let mut catalog = PersistentCatalog::new(store.clone());
2361        catalog.inner.set_counters(1, 0);
2362
2363        let table = test_table("users", 1);
2364        let mut txn = store.begin(TxnMode::ReadWrite).unwrap();
2365        catalog.persist_create_table(&mut txn, &table).unwrap();
2366        txn.commit_self().unwrap();
2367        store.flush().unwrap();
2368
2369        drop(catalog);
2370        drop(store);
2371
2372        let store = Arc::new(alopex_core::kv::memory::MemoryKV::open(&wal_path).unwrap());
2373        let reloaded = PersistentCatalog::load(store).unwrap();
2374        assert!(reloaded.table_exists("users"));
2375    }
2376
2377    #[test]
2378    fn overlay_applied_on_commit() {
2379        let store = Arc::new(alopex_core::kv::memory::MemoryKV::new());
2380        let mut catalog = PersistentCatalog::new(store);
2381        let users = test_table("users", 1);
2382        catalog.inner.insert_table_unchecked(users.clone());
2383
2384        let mut overlay = CatalogOverlay::new();
2385        overlay.drop_table(&TableFqn::from(&users));
2386        let orders = test_table("orders", 2);
2387        overlay.add_table(TableFqn::from(&orders), orders);
2388
2389        assert!(!catalog.table_exists_in_txn("users", &overlay));
2390        assert!(catalog.table_exists_in_txn("orders", &overlay));
2391
2392        catalog.apply_overlay(overlay);
2393
2394        assert!(!catalog.table_exists("users"));
2395        assert!(catalog.table_exists("orders"));
2396    }
2397
2398    #[test]
2399    fn overlay_discarded_on_rollback() {
2400        let store = Arc::new(alopex_core::kv::memory::MemoryKV::new());
2401        let mut catalog = PersistentCatalog::new(store);
2402        let users = test_table("users", 1);
2403        catalog.inner.insert_table_unchecked(users.clone());
2404
2405        let mut overlay = CatalogOverlay::new();
2406        overlay.drop_table(&TableFqn::from(&users));
2407
2408        PersistentCatalog::<alopex_core::kv::memory::MemoryKV>::discard_overlay(overlay);
2409
2410        assert!(catalog.table_exists("users"));
2411    }
2412
2413    #[test]
2414    fn catalog_crud_persists() {
2415        let store = Arc::new(alopex_core::kv::memory::MemoryKV::new());
2416        let mut catalog = PersistentCatalog::new(store.clone());
2417
2418        let meta = CatalogMeta {
2419            name: "main".to_string(),
2420            comment: Some("primary".to_string()),
2421            storage_root: Some("/tmp/alopex".to_string()),
2422        };
2423
2424        catalog.create_catalog(meta.clone()).unwrap();
2425        assert!(catalog.get_catalog("main").is_some());
2426
2427        let mut txn = store.begin(TxnMode::ReadOnly).unwrap();
2428        let stored = txn.get(&catalog_key("main")).unwrap().unwrap();
2429        let decoded: CatalogMeta = bincode::deserialize(&stored).unwrap();
2430        txn.rollback_self().unwrap();
2431        assert_eq!(decoded, meta);
2432
2433        catalog.delete_catalog("main").unwrap();
2434        assert!(catalog.get_catalog("main").is_none());
2435    }
2436
2437    #[test]
2438    fn namespace_crud_persists_and_validates_catalog() {
2439        let store = Arc::new(alopex_core::kv::memory::MemoryKV::new());
2440        let mut catalog = PersistentCatalog::new(store.clone());
2441
2442        let missing_catalog = NamespaceMeta {
2443            name: "analytics".to_string(),
2444            catalog_name: "missing".to_string(),
2445            comment: None,
2446            storage_root: None,
2447        };
2448        let err = catalog.create_namespace(missing_catalog).unwrap_err();
2449        assert!(matches!(err, CatalogError::InvalidKey(_)));
2450
2451        catalog
2452            .create_catalog(CatalogMeta {
2453                name: "main".to_string(),
2454                comment: None,
2455                storage_root: None,
2456            })
2457            .unwrap();
2458
2459        let namespace = NamespaceMeta {
2460            name: "analytics".to_string(),
2461            catalog_name: "main".to_string(),
2462            comment: Some("warehouse".to_string()),
2463            storage_root: None,
2464        };
2465
2466        catalog.create_namespace(namespace.clone()).unwrap();
2467        assert!(catalog.get_namespace("main", "analytics").is_some());
2468
2469        let mut txn = store.begin(TxnMode::ReadOnly).unwrap();
2470        let stored = txn
2471            .get(&namespace_key("main", "analytics"))
2472            .unwrap()
2473            .unwrap();
2474        let decoded: NamespaceMeta = bincode::deserialize(&stored).unwrap();
2475        txn.rollback_self().unwrap();
2476        assert_eq!(decoded, namespace);
2477
2478        catalog.delete_namespace("main", "analytics").unwrap();
2479        assert!(catalog.get_namespace("main", "analytics").is_none());
2480    }
2481
2482    #[test]
2483    fn delete_catalog_removes_namespaces_from_store() {
2484        let store = Arc::new(alopex_core::kv::memory::MemoryKV::new());
2485        let mut catalog = PersistentCatalog::new(store.clone());
2486
2487        catalog
2488            .create_catalog(CatalogMeta {
2489                name: "main".to_string(),
2490                comment: None,
2491                storage_root: None,
2492            })
2493            .unwrap();
2494        catalog
2495            .create_namespace(NamespaceMeta {
2496                name: "analytics".to_string(),
2497                catalog_name: "main".to_string(),
2498                comment: None,
2499                storage_root: None,
2500            })
2501            .unwrap();
2502
2503        catalog.delete_catalog("main").unwrap();
2504
2505        let mut txn = store.begin(TxnMode::ReadOnly).unwrap();
2506        let mut prefix = NAMESPACES_PREFIX.to_vec();
2507        prefix.extend_from_slice(b"main");
2508        prefix.push(b'/');
2509        let remaining: Vec<_> = txn.scan_prefix(&prefix).unwrap().collect();
2510        txn.rollback_self().unwrap();
2511
2512        assert!(remaining.is_empty());
2513        assert!(catalog.list_namespaces("main").is_empty());
2514    }
2515
2516    #[test]
2517    fn delete_catalog_removes_tables_and_indexes_from_store() {
2518        let store = Arc::new(alopex_core::kv::memory::MemoryKV::new());
2519        let mut catalog = PersistentCatalog::new(store.clone());
2520
2521        catalog
2522            .create_catalog(CatalogMeta {
2523                name: "main".to_string(),
2524                comment: None,
2525                storage_root: None,
2526            })
2527            .unwrap();
2528
2529        let mut table = test_table("users", 1);
2530        table.catalog_name = "main".to_string();
2531        table.namespace_name = "default".to_string();
2532
2533        let mut index = IndexMetadata::new(1, "idx_users_id", "users", vec!["id".to_string()])
2534            .with_column_indices(vec![0]);
2535        index.catalog_name = "main".to_string();
2536        index.namespace_name = "default".to_string();
2537
2538        let mut txn = store.begin(TxnMode::ReadWrite).unwrap();
2539        catalog.persist_create_table(&mut txn, &table).unwrap();
2540        catalog.persist_create_index(&mut txn, &index).unwrap();
2541        txn.commit_self().unwrap();
2542
2543        catalog.inner.insert_table_unchecked(table);
2544        catalog.inner.insert_index_unchecked(index);
2545
2546        catalog.delete_catalog("main").unwrap();
2547
2548        assert!(catalog.inner.get_table("users").is_none());
2549        assert!(catalog.inner.get_index("idx_users_id").is_none());
2550
2551        let mut txn = store.begin(TxnMode::ReadOnly).unwrap();
2552        assert!(
2553            txn.get(&table_key("main", "default", "users"))
2554                .unwrap()
2555                .is_none()
2556        );
2557        assert!(
2558            txn.get(&index_key("main", "default", "users", "idx_users_id"))
2559                .unwrap()
2560                .is_none()
2561        );
2562        txn.rollback_self().unwrap();
2563    }
2564
2565    #[test]
2566    fn index_meta_loads_catalog_and_namespace_from_table() {
2567        let store = Arc::new(alopex_core::kv::memory::MemoryKV::new());
2568        let mut catalog = PersistentCatalog::new(store.clone());
2569        catalog.inner.set_counters(1, 1);
2570
2571        let mut table = test_table("users", 1);
2572        table.catalog_name = "main".to_string();
2573        table.namespace_name = "analytics".to_string();
2574
2575        let mut txn = store.begin(TxnMode::ReadWrite).unwrap();
2576        catalog.persist_create_table(&mut txn, &table).unwrap();
2577        let mut index = IndexMetadata::new(1, "idx_users_id", "users", vec!["id".to_string()])
2578            .with_column_indices(vec![0])
2579            .with_method(IndexMethod::BTree);
2580        index.catalog_name = "main".to_string();
2581        index.namespace_name = "analytics".to_string();
2582        catalog.persist_create_index(&mut txn, &index).unwrap();
2583        txn.commit_self().unwrap();
2584
2585        let reloaded = PersistentCatalog::load(store).unwrap();
2586        let index = reloaded.get_index("idx_users_id").unwrap();
2587        assert_eq!(index.catalog_name, "main");
2588        assert_eq!(index.namespace_name, "analytics");
2589    }
2590
2591    #[test]
2592    fn legacy_index_meta_loads_catalog_and_namespace_from_table() {
2593        let store = Arc::new(alopex_core::kv::memory::MemoryKV::new());
2594        let mut catalog = PersistentCatalog::new(store.clone());
2595        catalog.inner.set_counters(1, 1);
2596
2597        let mut table = test_table("users", 1);
2598        table.catalog_name = "main".to_string();
2599        table.namespace_name = "analytics".to_string();
2600
2601        let mut txn = store.begin(TxnMode::ReadWrite).unwrap();
2602        catalog.persist_create_table(&mut txn, &table).unwrap();
2603        let legacy = PersistedIndexMetaV1 {
2604            index_id: 1,
2605            name: "idx_users_id".to_string(),
2606            table: "users".to_string(),
2607            columns: vec!["id".to_string()],
2608            column_indices: vec![0],
2609            unique: false,
2610            method: Some(PersistedIndexType::BTree),
2611            options: Vec::new(),
2612        };
2613        let bytes = bincode::serialize(&legacy).unwrap();
2614        txn.put(
2615            index_key("main", "analytics", "users", "idx_users_id"),
2616            bytes,
2617        )
2618        .unwrap();
2619        txn.commit_self().unwrap();
2620
2621        let reloaded = PersistentCatalog::load(store).unwrap();
2622        let index = reloaded.get_index("idx_users_id").unwrap();
2623        assert_eq!(index.catalog_name, "main");
2624        assert_eq!(index.namespace_name, "analytics");
2625    }
2626
2627    #[test]
2628    fn overlay_catalog_get_and_list() {
2629        let store = Arc::new(alopex_core::kv::memory::MemoryKV::new());
2630        let mut catalog = PersistentCatalog::new(store);
2631        catalog
2632            .create_catalog(CatalogMeta {
2633                name: "main".to_string(),
2634                comment: None,
2635                storage_root: None,
2636            })
2637            .unwrap();
2638
2639        let mut overlay = CatalogOverlay::new();
2640        overlay.add_catalog(CatalogMeta {
2641            name: "temp".to_string(),
2642            comment: None,
2643            storage_root: None,
2644        });
2645        overlay.drop_catalog("main");
2646
2647        assert!(catalog.get_catalog_in_txn("main", &overlay).is_none());
2648        assert!(catalog.get_catalog_in_txn("temp", &overlay).is_some());
2649
2650        let names: Vec<String> = catalog
2651            .list_catalogs_in_txn(&overlay)
2652            .into_iter()
2653            .map(|meta| meta.name)
2654            .collect();
2655        assert_eq!(names, vec!["temp".to_string()]);
2656    }
2657
2658    #[test]
2659    fn overlay_namespace_get_and_list() {
2660        let store = Arc::new(alopex_core::kv::memory::MemoryKV::new());
2661        let mut catalog = PersistentCatalog::new(store);
2662        catalog
2663            .create_catalog(CatalogMeta {
2664                name: "main".to_string(),
2665                comment: None,
2666                storage_root: None,
2667            })
2668            .unwrap();
2669        catalog
2670            .create_namespace(NamespaceMeta {
2671                name: "default".to_string(),
2672                catalog_name: "main".to_string(),
2673                comment: None,
2674                storage_root: None,
2675            })
2676            .unwrap();
2677
2678        let mut overlay = CatalogOverlay::new();
2679        overlay.add_namespace(NamespaceMeta {
2680            name: "analytics".to_string(),
2681            catalog_name: "main".to_string(),
2682            comment: None,
2683            storage_root: None,
2684        });
2685        overlay.drop_namespace("main", "default");
2686
2687        assert!(
2688            catalog
2689                .get_namespace_in_txn("main", "default", &overlay)
2690                .is_none()
2691        );
2692        assert!(
2693            catalog
2694                .get_namespace_in_txn("main", "analytics", &overlay)
2695                .is_some()
2696        );
2697
2698        let names: Vec<String> = catalog
2699            .list_namespaces_in_txn("main", &overlay)
2700            .into_iter()
2701            .map(|meta| meta.name)
2702            .collect();
2703        assert_eq!(names, vec!["analytics".to_string()]);
2704    }
2705
2706    #[test]
2707    fn overlay_table_and_index_list() {
2708        let store = Arc::new(alopex_core::kv::memory::MemoryKV::new());
2709        let mut catalog = PersistentCatalog::new(store);
2710
2711        let mut users = test_table("users", 1);
2712        users.catalog_name = "main".to_string();
2713        users.namespace_name = "default".to_string();
2714        catalog.inner.insert_table_unchecked(users.clone());
2715
2716        let mut users_index =
2717            IndexMetadata::new(1, "idx_users_id", "users", vec!["id".to_string()])
2718                .with_column_indices(vec![0]);
2719        users_index.catalog_name = "main".to_string();
2720        users_index.namespace_name = "default".to_string();
2721        catalog.inner.insert_index_unchecked(users_index);
2722
2723        let mut overlay = CatalogOverlay::new();
2724
2725        let mut orders = test_table("orders", 2);
2726        orders.catalog_name = "main".to_string();
2727        orders.namespace_name = "default".to_string();
2728        overlay.add_table(TableFqn::from(&orders), orders.clone());
2729
2730        let mut orders_index =
2731            IndexMetadata::new(2, "idx_orders_id", "orders", vec!["id".to_string()])
2732                .with_column_indices(vec![0]);
2733        orders_index.catalog_name = "main".to_string();
2734        orders_index.namespace_name = "default".to_string();
2735        overlay.add_index(IndexFqn::from(&orders_index), orders_index);
2736
2737        overlay.drop_table(&TableFqn::from(&users));
2738
2739        let table_names: Vec<String> = catalog
2740            .list_tables_in_txn("main", "default", &overlay)
2741            .into_iter()
2742            .map(|table| table.name)
2743            .collect();
2744        assert_eq!(table_names, vec!["orders".to_string()]);
2745
2746        let users_fqn = TableFqn::new("main", "default", "users");
2747        assert!(catalog.list_indexes_in_txn(&users_fqn, &overlay).is_empty());
2748
2749        let orders_fqn = TableFqn::new("main", "default", "orders");
2750        let index_names: Vec<String> = catalog
2751            .list_indexes_in_txn(&orders_fqn, &overlay)
2752            .into_iter()
2753            .map(|index| index.name)
2754            .collect();
2755        assert_eq!(index_names, vec!["idx_orders_id".to_string()]);
2756    }
2757
2758    #[test]
2759    fn overlay_name_lookup_ambiguous_returns_none() {
2760        let store = Arc::new(alopex_core::kv::memory::MemoryKV::new());
2761        let catalog = PersistentCatalog::new(store);
2762
2763        let mut overlay = CatalogOverlay::new();
2764
2765        let mut users_default = test_table("users", 1);
2766        users_default.catalog_name = "main".to_string();
2767        users_default.namespace_name = "default".to_string();
2768        overlay.add_table(TableFqn::from(&users_default), users_default);
2769
2770        let mut users_analytics = test_table("users", 2);
2771        users_analytics.catalog_name = "main".to_string();
2772        users_analytics.namespace_name = "analytics".to_string();
2773        overlay.add_table(TableFqn::from(&users_analytics), users_analytics);
2774
2775        assert!(catalog.get_table_in_txn("users", &overlay).is_none());
2776        assert!(!catalog.table_exists_in_txn("users", &overlay));
2777
2778        let mut idx_default =
2779            IndexMetadata::new(1, "idx_users_id", "users", vec!["id".to_string()])
2780                .with_column_indices(vec![0]);
2781        idx_default.catalog_name = "main".to_string();
2782        idx_default.namespace_name = "default".to_string();
2783        overlay.add_index(IndexFqn::from(&idx_default), idx_default);
2784
2785        let mut idx_analytics =
2786            IndexMetadata::new(2, "idx_users_id", "users", vec!["id".to_string()])
2787                .with_column_indices(vec![0]);
2788        idx_analytics.catalog_name = "main".to_string();
2789        idx_analytics.namespace_name = "analytics".to_string();
2790        overlay.add_index(IndexFqn::from(&idx_analytics), idx_analytics);
2791
2792        assert!(catalog.get_index_in_txn("idx_users_id", &overlay).is_none());
2793        assert!(!catalog.index_exists_in_txn("idx_users_id", &overlay));
2794    }
2795
2796    #[test]
2797    fn overlay_name_lookup_ambiguous_with_base_returns_none() {
2798        let store = Arc::new(alopex_core::kv::memory::MemoryKV::new());
2799        let mut catalog = PersistentCatalog::new(store);
2800
2801        let mut overlay = CatalogOverlay::new();
2802
2803        let mut base_users = test_table("users", 1);
2804        base_users.catalog_name = "main".to_string();
2805        base_users.namespace_name = "default".to_string();
2806        catalog.inner.insert_table_unchecked(base_users);
2807
2808        let mut overlay_users = test_table("users", 2);
2809        overlay_users.catalog_name = "main".to_string();
2810        overlay_users.namespace_name = "analytics".to_string();
2811        overlay.add_table(TableFqn::from(&overlay_users), overlay_users);
2812
2813        assert!(catalog.get_table_in_txn("users", &overlay).is_none());
2814        assert!(!catalog.table_exists_in_txn("users", &overlay));
2815
2816        let mut base_index = IndexMetadata::new(1, "idx_users_id", "users", vec!["id".to_string()])
2817            .with_column_indices(vec![0]);
2818        base_index.catalog_name = "main".to_string();
2819        base_index.namespace_name = "default".to_string();
2820        catalog.inner.insert_index_unchecked(base_index);
2821
2822        let mut overlay_index =
2823            IndexMetadata::new(2, "idx_users_id", "users", vec!["id".to_string()])
2824                .with_column_indices(vec![0]);
2825        overlay_index.catalog_name = "main".to_string();
2826        overlay_index.namespace_name = "analytics".to_string();
2827        overlay.add_index(IndexFqn::from(&overlay_index), overlay_index);
2828
2829        assert!(catalog.get_index_in_txn("idx_users_id", &overlay).is_none());
2830        assert!(!catalog.index_exists_in_txn("idx_users_id", &overlay));
2831    }
2832
2833    #[test]
2834    fn overlay_fqn_tables_separate_namespaces() {
2835        let store = Arc::new(alopex_core::kv::memory::MemoryKV::new());
2836        let catalog = PersistentCatalog::new(store);
2837
2838        let mut overlay = CatalogOverlay::new();
2839
2840        let mut users_default = test_table("users", 1);
2841        users_default.catalog_name = "main".to_string();
2842        users_default.namespace_name = "default".to_string();
2843        overlay.add_table(TableFqn::from(&users_default), users_default.clone());
2844
2845        let mut users_analytics = test_table("users", 2);
2846        users_analytics.catalog_name = "main".to_string();
2847        users_analytics.namespace_name = "analytics".to_string();
2848        overlay.add_table(TableFqn::from(&users_analytics), users_analytics.clone());
2849
2850        let default_tables: Vec<String> = catalog
2851            .list_tables_in_txn("main", "default", &overlay)
2852            .into_iter()
2853            .map(|table| table.name)
2854            .collect();
2855        assert_eq!(default_tables, vec!["users".to_string()]);
2856
2857        let analytics_tables: Vec<String> = catalog
2858            .list_tables_in_txn("main", "analytics", &overlay)
2859            .into_iter()
2860            .map(|table| table.name)
2861            .collect();
2862        assert_eq!(analytics_tables, vec!["users".to_string()]);
2863
2864        overlay.drop_table(&TableFqn::from(&users_default));
2865
2866        let default_tables_after: Vec<String> = catalog
2867            .list_tables_in_txn("main", "default", &overlay)
2868            .into_iter()
2869            .map(|table| table.name)
2870            .collect();
2871        assert!(default_tables_after.is_empty());
2872
2873        let analytics_tables_after: Vec<String> = catalog
2874            .list_tables_in_txn("main", "analytics", &overlay)
2875            .into_iter()
2876            .map(|table| table.name)
2877            .collect();
2878        assert_eq!(analytics_tables_after, vec!["users".to_string()]);
2879    }
2880
2881    #[test]
2882    fn overlay_fqn_indexes_separate_namespaces() {
2883        let store = Arc::new(alopex_core::kv::memory::MemoryKV::new());
2884        let catalog = PersistentCatalog::new(store);
2885
2886        let mut overlay = CatalogOverlay::new();
2887
2888        let mut users_default = test_table("users", 1);
2889        users_default.catalog_name = "main".to_string();
2890        users_default.namespace_name = "default".to_string();
2891        overlay.add_table(TableFqn::from(&users_default), users_default.clone());
2892
2893        let mut users_analytics = test_table("users", 2);
2894        users_analytics.catalog_name = "main".to_string();
2895        users_analytics.namespace_name = "analytics".to_string();
2896        overlay.add_table(TableFqn::from(&users_analytics), users_analytics.clone());
2897
2898        let mut idx_default =
2899            IndexMetadata::new(1, "idx_users_id", "users", vec!["id".to_string()])
2900                .with_column_indices(vec![0]);
2901        idx_default.catalog_name = "main".to_string();
2902        idx_default.namespace_name = "default".to_string();
2903        overlay.add_index(IndexFqn::from(&idx_default), idx_default);
2904
2905        let mut idx_analytics =
2906            IndexMetadata::new(2, "idx_users_id", "users", vec!["id".to_string()])
2907                .with_column_indices(vec![0]);
2908        idx_analytics.catalog_name = "main".to_string();
2909        idx_analytics.namespace_name = "analytics".to_string();
2910        overlay.add_index(IndexFqn::from(&idx_analytics), idx_analytics);
2911
2912        let default_fqn = TableFqn::new("main", "default", "users");
2913        let analytics_fqn = TableFqn::new("main", "analytics", "users");
2914
2915        let default_indexes: Vec<String> = catalog
2916            .list_indexes_in_txn(&default_fqn, &overlay)
2917            .into_iter()
2918            .map(|index| index.name)
2919            .collect();
2920        assert_eq!(default_indexes, vec!["idx_users_id".to_string()]);
2921
2922        let analytics_indexes: Vec<String> = catalog
2923            .list_indexes_in_txn(&analytics_fqn, &overlay)
2924            .into_iter()
2925            .map(|index| index.name)
2926            .collect();
2927        assert_eq!(analytics_indexes, vec!["idx_users_id".to_string()]);
2928    }
2929
2930    #[test]
2931    fn persist_overlay_rejects_duplicate_table_names() {
2932        let store = Arc::new(alopex_core::kv::memory::MemoryKV::new());
2933        let mut catalog = PersistentCatalog::new(store.clone());
2934
2935        let mut overlay = CatalogOverlay::new();
2936
2937        let mut users_default = test_table("users", 1);
2938        users_default.catalog_name = "main".to_string();
2939        users_default.namespace_name = "default".to_string();
2940        overlay.add_table(TableFqn::from(&users_default), users_default);
2941
2942        let mut users_analytics = test_table("users", 2);
2943        users_analytics.catalog_name = "main".to_string();
2944        users_analytics.namespace_name = "analytics".to_string();
2945        overlay.add_table(TableFqn::from(&users_analytics), users_analytics);
2946
2947        let mut txn = store.begin(TxnMode::ReadWrite).unwrap();
2948        let err = catalog.persist_overlay(&mut txn, &overlay).unwrap_err();
2949        assert!(matches!(err, CatalogError::InvalidKey(_)));
2950        txn.rollback_self().unwrap();
2951    }
2952
2953    #[test]
2954    fn persist_overlay_rejects_duplicate_index_names() {
2955        let store = Arc::new(alopex_core::kv::memory::MemoryKV::new());
2956        let mut catalog = PersistentCatalog::new(store.clone());
2957
2958        let mut overlay = CatalogOverlay::new();
2959
2960        let mut idx_default =
2961            IndexMetadata::new(1, "idx_users_id", "users", vec!["id".to_string()])
2962                .with_column_indices(vec![0]);
2963        idx_default.catalog_name = "main".to_string();
2964        idx_default.namespace_name = "default".to_string();
2965        overlay.add_index(IndexFqn::from(&idx_default), idx_default);
2966
2967        let mut idx_analytics =
2968            IndexMetadata::new(2, "idx_users_id", "users", vec!["id".to_string()])
2969                .with_column_indices(vec![0]);
2970        idx_analytics.catalog_name = "main".to_string();
2971        idx_analytics.namespace_name = "analytics".to_string();
2972        overlay.add_index(IndexFqn::from(&idx_analytics), idx_analytics);
2973
2974        let mut txn = store.begin(TxnMode::ReadWrite).unwrap();
2975        let err = catalog.persist_overlay(&mut txn, &overlay).unwrap_err();
2976        assert!(matches!(err, CatalogError::InvalidKey(_)));
2977        txn.rollback_self().unwrap();
2978    }
2979
2980    #[test]
2981    fn overlay_drop_cascade_namespace_removes_children() {
2982        let mut overlay = CatalogOverlay::new();
2983
2984        overlay.add_namespace(NamespaceMeta {
2985            name: "default".to_string(),
2986            catalog_name: "main".to_string(),
2987            comment: None,
2988            storage_root: None,
2989        });
2990
2991        let mut users = test_table("users", 1);
2992        users.catalog_name = "main".to_string();
2993        users.namespace_name = "default".to_string();
2994        overlay.add_table(TableFqn::from(&users), users.clone());
2995
2996        let mut users_index =
2997            IndexMetadata::new(1, "idx_users_id", "users", vec!["id".to_string()])
2998                .with_column_indices(vec![0]);
2999        users_index.catalog_name = "main".to_string();
3000        users_index.namespace_name = "default".to_string();
3001        let users_index_fqn = IndexFqn::from(&users_index);
3002        overlay.add_index(users_index_fqn.clone(), users_index);
3003
3004        overlay.drop_cascade_namespace("main", "default");
3005
3006        assert!(
3007            overlay
3008                .dropped_namespaces
3009                .contains(&("main".to_string(), "default".to_string()))
3010        );
3011        assert!(overlay.dropped_tables.contains(&TableFqn::from(&users)));
3012        assert!(overlay.dropped_indexes.contains(&users_index_fqn));
3013        assert!(!overlay.added_tables.contains_key(&TableFqn::from(&users)));
3014        assert!(!overlay.added_indexes.contains_key(&users_index_fqn));
3015    }
3016
3017    #[test]
3018    fn overlay_drop_cascade_catalog_removes_children() {
3019        let mut overlay = CatalogOverlay::new();
3020
3021        overlay.add_catalog(CatalogMeta {
3022            name: "main".to_string(),
3023            comment: None,
3024            storage_root: None,
3025        });
3026        overlay.add_namespace(NamespaceMeta {
3027            name: "default".to_string(),
3028            catalog_name: "main".to_string(),
3029            comment: None,
3030            storage_root: None,
3031        });
3032
3033        let mut users = test_table("users", 1);
3034        users.catalog_name = "main".to_string();
3035        users.namespace_name = "default".to_string();
3036        overlay.add_table(TableFqn::from(&users), users.clone());
3037
3038        let mut users_index =
3039            IndexMetadata::new(1, "idx_users_id", "users", vec!["id".to_string()])
3040                .with_column_indices(vec![0]);
3041        users_index.catalog_name = "main".to_string();
3042        users_index.namespace_name = "default".to_string();
3043        let users_index_fqn = IndexFqn::from(&users_index);
3044        overlay.add_index(users_index_fqn.clone(), users_index);
3045
3046        overlay.drop_cascade_catalog("main");
3047
3048        assert!(overlay.dropped_catalogs.contains("main"));
3049        assert!(overlay.dropped_tables.contains(&TableFqn::from(&users)));
3050        assert!(overlay.dropped_indexes.contains(&users_index_fqn));
3051        assert!(!overlay.added_tables.contains_key(&TableFqn::from(&users)));
3052        assert!(!overlay.added_indexes.contains_key(&users_index_fqn));
3053    }
3054
3055    #[test]
3056    fn dropped_namespace_hides_tables_and_indexes() {
3057        let store = Arc::new(alopex_core::kv::memory::MemoryKV::new());
3058        let mut catalog = PersistentCatalog::new(store);
3059
3060        let mut users = test_table("users", 1);
3061        users.catalog_name = "main".to_string();
3062        users.namespace_name = "default".to_string();
3063        catalog.inner.insert_table_unchecked(users);
3064
3065        let mut users_index =
3066            IndexMetadata::new(1, "idx_users_id", "users", vec!["id".to_string()])
3067                .with_column_indices(vec![0]);
3068        users_index.catalog_name = "main".to_string();
3069        users_index.namespace_name = "default".to_string();
3070        catalog.inner.insert_index_unchecked(users_index);
3071
3072        let mut overlay = CatalogOverlay::new();
3073        overlay.drop_namespace("main", "default");
3074
3075        let table_names: Vec<String> = catalog
3076            .list_tables_in_txn("main", "default", &overlay)
3077            .into_iter()
3078            .map(|table| table.name)
3079            .collect();
3080        assert!(table_names.is_empty());
3081
3082        let fqn = TableFqn {
3083            catalog: "main".to_string(),
3084            namespace: "default".to_string(),
3085            table: "users".to_string(),
3086        };
3087        let index_names: Vec<String> = catalog
3088            .list_indexes_in_txn(&fqn, &overlay)
3089            .into_iter()
3090            .map(|index| index.name)
3091            .collect();
3092        assert!(index_names.is_empty());
3093    }
3094
3095    #[test]
3096    fn dropped_namespace_hides_get_and_exists() {
3097        let store = Arc::new(alopex_core::kv::memory::MemoryKV::new());
3098        let mut catalog = PersistentCatalog::new(store);
3099
3100        let mut users = test_table("users", 1);
3101        users.catalog_name = "main".to_string();
3102        users.namespace_name = "default".to_string();
3103        catalog.inner.insert_table_unchecked(users);
3104
3105        let mut users_index =
3106            IndexMetadata::new(1, "idx_users_id", "users", vec!["id".to_string()])
3107                .with_column_indices(vec![0]);
3108        users_index.catalog_name = "main".to_string();
3109        users_index.namespace_name = "default".to_string();
3110        catalog.inner.insert_index_unchecked(users_index);
3111
3112        let mut overlay = CatalogOverlay::new();
3113        overlay.drop_namespace("main", "default");
3114
3115        assert!(!catalog.table_exists_in_txn("users", &overlay));
3116        assert!(catalog.get_table_in_txn("users", &overlay).is_none());
3117        assert!(!catalog.index_exists_in_txn("idx_users_id", &overlay));
3118        assert!(catalog.get_index_in_txn("idx_users_id", &overlay).is_none());
3119
3120        let view = TxnCatalogView::new(&catalog, &overlay);
3121        assert!(view.get_indexes_for_table("users").is_empty());
3122    }
3123
3124    #[test]
3125    fn persisted_catalog_meta_roundtrip() {
3126        let meta = PersistedCatalogMeta {
3127            name: "main".to_string(),
3128            comment: Some("primary catalog".to_string()),
3129            storage_root: Some("/tmp/alopex".to_string()),
3130        };
3131        let bytes = bincode::serialize(&meta).unwrap();
3132        let decoded: PersistedCatalogMeta = bincode::deserialize(&bytes).unwrap();
3133        assert_eq!(meta, decoded);
3134    }
3135
3136    #[test]
3137    fn persisted_namespace_meta_roundtrip() {
3138        let meta = PersistedNamespaceMeta {
3139            name: "analytics".to_string(),
3140            catalog_name: "main".to_string(),
3141            comment: Some("warehouse".to_string()),
3142            storage_root: Some("s3://bucket/ns".to_string()),
3143        };
3144        let bytes = bincode::serialize(&meta).unwrap();
3145        let decoded: PersistedNamespaceMeta = bincode::deserialize(&bytes).unwrap();
3146        assert_eq!(meta, decoded);
3147    }
3148
3149    #[test]
3150    fn table_fqn_hash_and_eq() {
3151        let first = TableFqn {
3152            catalog: "main".to_string(),
3153            namespace: "default".to_string(),
3154            table: "users".to_string(),
3155        };
3156        let same = TableFqn {
3157            catalog: "main".to_string(),
3158            namespace: "default".to_string(),
3159            table: "users".to_string(),
3160        };
3161        let different = TableFqn {
3162            catalog: "main".to_string(),
3163            namespace: "default".to_string(),
3164            table: "orders".to_string(),
3165        };
3166
3167        let mut set = HashSet::new();
3168        set.insert(first);
3169        assert!(set.contains(&same));
3170        assert!(!set.contains(&different));
3171    }
3172
3173    #[test]
3174    fn index_fqn_hash_and_eq() {
3175        let first = IndexFqn {
3176            catalog: "main".to_string(),
3177            namespace: "default".to_string(),
3178            table: "users".to_string(),
3179            index: "idx_users_id".to_string(),
3180        };
3181        let same = IndexFqn {
3182            catalog: "main".to_string(),
3183            namespace: "default".to_string(),
3184            table: "users".to_string(),
3185            index: "idx_users_id".to_string(),
3186        };
3187        let different = IndexFqn {
3188            catalog: "main".to_string(),
3189            namespace: "default".to_string(),
3190            table: "users".to_string(),
3191            index: "idx_users_email".to_string(),
3192        };
3193
3194        let mut set = HashSet::new();
3195        set.insert(first);
3196        assert!(set.contains(&same));
3197        assert!(!set.contains(&different));
3198    }
3199
3200    #[test]
3201    fn table_type_and_data_source_format_serde() {
3202        let managed = serde_json::to_string(&TableType::Managed).unwrap();
3203        let external = serde_json::to_string(&TableType::External).unwrap();
3204        let alopex = serde_json::to_string(&DataSourceFormat::Alopex).unwrap();
3205        let parquet = serde_json::to_string(&DataSourceFormat::Parquet).unwrap();
3206        let delta = serde_json::to_string(&DataSourceFormat::Delta).unwrap();
3207
3208        assert_eq!(managed, "\"MANAGED\"");
3209        assert_eq!(external, "\"EXTERNAL\"");
3210        assert_eq!(alopex, "\"ALOPEX\"");
3211        assert_eq!(parquet, "\"PARQUET\"");
3212        assert_eq!(delta, "\"DELTA\"");
3213    }
3214}