alopex_sql/catalog/
persistent.rs

1//! 永続化対応カタログ実装。
2//!
3//! 既存の `TableMetadata` / `IndexMetadata` は `Expr` を含むため、そのままシリアライズして
4//! 永続化することができない。そこで、本モジュールでは永続化用 DTO を定義し、KV ストアへ
5//! bincode で保存する。
6//!
7//! 注意: 現状は `ColumnMetadata.default`(DEFAULT 式)を永続化しない。復元時は `None` となる。
8
9use std::collections::{HashMap, HashSet};
10use std::sync::Arc;
11
12use alopex_core::kv::{KVStore, KVTransaction};
13use alopex_core::types::TxnMode;
14use serde::{Deserialize, Serialize};
15use thiserror::Error;
16
17use crate::ast::ddl::{IndexMethod, VectorMetric};
18use crate::catalog::{
19    Catalog, ColumnMetadata, Compression, IndexMetadata, MemoryCatalog, RowIdMode,
20};
21use crate::catalog::{StorageOptions, StorageType, TableMetadata};
22use crate::planner::PlannerError;
23use crate::planner::types::ResolvedType;
24
/// Root key prefix under which every catalog-related KV entry is stored.
pub const CATALOG_PREFIX: &[u8] = b"__catalog__/";
/// Key prefix for catalog records; the catalog name follows the prefix.
pub const CATALOGS_PREFIX: &[u8] = b"__catalog__/catalogs/";
/// Key prefix for namespace records; `<catalog>/<namespace>` follows the prefix.
pub const NAMESPACES_PREFIX: &[u8] = b"__catalog__/namespaces/";
/// Key prefix for table records; `<catalog>/<namespace>/<table>` follows the prefix.
pub const TABLES_PREFIX: &[u8] = b"__catalog__/tables/";
/// Key prefix for index records; `<catalog>/<namespace>/<table>/<index>` follows the prefix.
pub const INDEXES_PREFIX: &[u8] = b"__catalog__/indexes/";
/// Key of the single catalog state record (format version and id counters).
pub const META_KEY: &[u8] = b"__catalog__/meta";

/// Catalog format version this build understands; `load` rejects newer
/// versions and migrates older ones.
const CATALOG_VERSION: u32 = 2;
34
35#[derive(Debug, Error)]
36pub enum CatalogError {
37    #[error("kv error: {0}")]
38    Kv(#[from] alopex_core::Error),
39
40    #[error("serialize error: {0}")]
41    Serialize(#[from] bincode::Error),
42
43    #[error("invalid catalog key: {0}")]
44    InvalidKey(String),
45}
46
47#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
48struct CatalogState {
49    version: u32,
50    table_id_counter: u32,
51    index_id_counter: u32,
52}
53
54#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
55pub struct PersistedCatalogMeta {
56    pub name: String,
57    pub comment: Option<String>,
58    pub storage_root: Option<String>,
59}
60
61#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
62pub struct PersistedNamespaceMeta {
63    pub name: String,
64    pub catalog_name: String,
65    pub comment: Option<String>,
66    pub storage_root: Option<String>,
67}
68
69pub type CatalogMeta = PersistedCatalogMeta;
70pub type NamespaceMeta = PersistedNamespaceMeta;
71
/// Fully qualified table name: catalog, namespace and table.
///
/// Hashable so it can key the overlay's table maps and sets.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct TableFqn {
    /// Catalog component.
    pub catalog: String,
    /// Namespace component.
    pub namespace: String,
    /// Table component.
    pub table: String,
}
78
/// Fully qualified index name: catalog, namespace, owning table and index.
///
/// Hashable so it can key the overlay's index maps and sets.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct IndexFqn {
    /// Catalog component.
    pub catalog: String,
    /// Namespace component.
    pub namespace: String,
    /// Name of the table the index belongs to.
    pub table: String,
    /// Index component.
    pub index: String,
}
86
87impl TableFqn {
88    pub fn new(catalog: &str, namespace: &str, table: &str) -> Self {
89        Self {
90            catalog: catalog.to_string(),
91            namespace: namespace.to_string(),
92            table: table.to_string(),
93        }
94    }
95}
96
97impl IndexFqn {
98    pub fn new(catalog: &str, namespace: &str, table: &str, index: &str) -> Self {
99        Self {
100            catalog: catalog.to_string(),
101            namespace: namespace.to_string(),
102            table: table.to_string(),
103            index: index.to_string(),
104        }
105    }
106}
107
108impl From<&TableMetadata> for TableFqn {
109    fn from(value: &TableMetadata) -> Self {
110        Self::new(&value.catalog_name, &value.namespace_name, &value.name)
111    }
112}
113
114impl From<&IndexMetadata> for IndexFqn {
115    fn from(value: &IndexMetadata) -> Self {
116        Self::new(
117            &value.catalog_name,
118            &value.namespace_name,
119            &value.table,
120            &value.name,
121        )
122    }
123}
124
125#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
126#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
127pub enum TableType {
128    Managed,
129    External,
130}
131
132#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
133#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
134pub enum DataSourceFormat {
135    #[default]
136    Alopex,
137    Parquet,
138    Delta,
139}
140
141#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
142pub enum PersistedVectorMetric {
143    Cosine,
144    L2,
145    Inner,
146}
147
148impl From<VectorMetric> for PersistedVectorMetric {
149    fn from(value: VectorMetric) -> Self {
150        match value {
151            VectorMetric::Cosine => Self::Cosine,
152            VectorMetric::L2 => Self::L2,
153            VectorMetric::Inner => Self::Inner,
154        }
155    }
156}
157
158impl From<PersistedVectorMetric> for VectorMetric {
159    fn from(value: PersistedVectorMetric) -> Self {
160        match value {
161            PersistedVectorMetric::Cosine => Self::Cosine,
162            PersistedVectorMetric::L2 => Self::L2,
163            PersistedVectorMetric::Inner => Self::Inner,
164        }
165    }
166}
167
168#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
169pub enum PersistedType {
170    Integer,
171    BigInt,
172    Float,
173    Double,
174    Text,
175    Blob,
176    Boolean,
177    Timestamp,
178    Vector {
179        dimension: u32,
180        metric: PersistedVectorMetric,
181    },
182    Null,
183}
184
185impl From<ResolvedType> for PersistedType {
186    fn from(value: ResolvedType) -> Self {
187        match value {
188            ResolvedType::Integer => Self::Integer,
189            ResolvedType::BigInt => Self::BigInt,
190            ResolvedType::Float => Self::Float,
191            ResolvedType::Double => Self::Double,
192            ResolvedType::Text => Self::Text,
193            ResolvedType::Blob => Self::Blob,
194            ResolvedType::Boolean => Self::Boolean,
195            ResolvedType::Timestamp => Self::Timestamp,
196            ResolvedType::Vector { dimension, metric } => Self::Vector {
197                dimension,
198                metric: metric.into(),
199            },
200            ResolvedType::Null => Self::Null,
201        }
202    }
203}
204
205impl From<PersistedType> for ResolvedType {
206    fn from(value: PersistedType) -> Self {
207        match value {
208            PersistedType::Integer => Self::Integer,
209            PersistedType::BigInt => Self::BigInt,
210            PersistedType::Float => Self::Float,
211            PersistedType::Double => Self::Double,
212            PersistedType::Text => Self::Text,
213            PersistedType::Blob => Self::Blob,
214            PersistedType::Boolean => Self::Boolean,
215            PersistedType::Timestamp => Self::Timestamp,
216            PersistedType::Vector { dimension, metric } => Self::Vector {
217                dimension,
218                metric: metric.into(),
219            },
220            PersistedType::Null => Self::Null,
221        }
222    }
223}
224
225#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
226pub enum PersistedIndexType {
227    BTree,
228    Hnsw,
229}
230
231impl From<PersistedIndexType> for IndexMethod {
232    fn from(value: PersistedIndexType) -> Self {
233        match value {
234            PersistedIndexType::BTree => IndexMethod::BTree,
235            PersistedIndexType::Hnsw => IndexMethod::Hnsw,
236        }
237    }
238}
239
240impl TryFrom<IndexMethod> for PersistedIndexType {
241    type Error = ();
242
243    fn try_from(value: IndexMethod) -> Result<Self, Self::Error> {
244        match value {
245            IndexMethod::BTree => Ok(Self::BTree),
246            IndexMethod::Hnsw => Ok(Self::Hnsw),
247        }
248    }
249}
250
251#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
252pub enum PersistedStorageType {
253    Row,
254    Columnar,
255}
256
257impl From<PersistedStorageType> for StorageType {
258    fn from(value: PersistedStorageType) -> Self {
259        match value {
260            PersistedStorageType::Row => Self::Row,
261            PersistedStorageType::Columnar => Self::Columnar,
262        }
263    }
264}
265
266impl From<StorageType> for PersistedStorageType {
267    fn from(value: StorageType) -> Self {
268        match value {
269            StorageType::Row => Self::Row,
270            StorageType::Columnar => Self::Columnar,
271        }
272    }
273}
274
275#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
276pub enum PersistedCompression {
277    None,
278    Lz4,
279    Zstd,
280}
281
282impl From<PersistedCompression> for Compression {
283    fn from(value: PersistedCompression) -> Self {
284        match value {
285            PersistedCompression::None => Self::None,
286            PersistedCompression::Lz4 => Self::Lz4,
287            PersistedCompression::Zstd => Self::Zstd,
288        }
289    }
290}
291
292impl From<Compression> for PersistedCompression {
293    fn from(value: Compression) -> Self {
294        match value {
295            Compression::None => Self::None,
296            Compression::Lz4 => Self::Lz4,
297            Compression::Zstd => Self::Zstd,
298        }
299    }
300}
301
302#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
303pub enum PersistedRowIdMode {
304    None,
305    Direct,
306}
307
308impl From<PersistedRowIdMode> for RowIdMode {
309    fn from(value: PersistedRowIdMode) -> Self {
310        match value {
311            PersistedRowIdMode::None => Self::None,
312            PersistedRowIdMode::Direct => Self::Direct,
313        }
314    }
315}
316
317impl From<RowIdMode> for PersistedRowIdMode {
318    fn from(value: RowIdMode) -> Self {
319        match value {
320            RowIdMode::None => Self::None,
321            RowIdMode::Direct => Self::Direct,
322        }
323    }
324}
325
326#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
327pub struct PersistedStorageOptions {
328    pub storage_type: PersistedStorageType,
329    pub compression: PersistedCompression,
330    pub row_group_size: u32,
331    pub row_id_mode: PersistedRowIdMode,
332}
333
334impl From<StorageOptions> for PersistedStorageOptions {
335    fn from(value: StorageOptions) -> Self {
336        Self {
337            storage_type: value.storage_type.into(),
338            compression: value.compression.into(),
339            row_group_size: value.row_group_size,
340            row_id_mode: value.row_id_mode.into(),
341        }
342    }
343}
344
345impl From<PersistedStorageOptions> for StorageOptions {
346    fn from(value: PersistedStorageOptions) -> Self {
347        Self {
348            storage_type: value.storage_type.into(),
349            compression: value.compression.into(),
350            row_group_size: value.row_group_size,
351            row_id_mode: value.row_id_mode.into(),
352        }
353    }
354}
355
356#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
357pub struct PersistedColumnMeta {
358    pub name: String,
359    pub data_type: PersistedType,
360    pub not_null: bool,
361    pub primary_key: bool,
362    pub unique: bool,
363}
364
365impl From<&ColumnMetadata> for PersistedColumnMeta {
366    fn from(value: &ColumnMetadata) -> Self {
367        Self {
368            name: value.name.clone(),
369            data_type: value.data_type.clone().into(),
370            not_null: value.not_null,
371            primary_key: value.primary_key,
372            unique: value.unique,
373        }
374    }
375}
376
377impl From<PersistedColumnMeta> for ColumnMetadata {
378    fn from(value: PersistedColumnMeta) -> Self {
379        ColumnMetadata::new(value.name, value.data_type.into())
380            .with_not_null(value.not_null)
381            .with_primary_key(value.primary_key)
382            .with_unique(value.unique)
383    }
384}
385
386#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
387pub struct PersistedTableMeta {
388    pub table_id: u32,
389    pub name: String,
390    pub catalog_name: String,
391    pub namespace_name: String,
392    pub table_type: TableType,
393    pub data_source_format: DataSourceFormat,
394    pub columns: Vec<PersistedColumnMeta>,
395    pub primary_key: Option<Vec<String>>,
396    pub storage_options: PersistedStorageOptions,
397    pub storage_location: Option<String>,
398    pub comment: Option<String>,
399    pub properties: HashMap<String, String>,
400}
401
402#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
403struct PersistedTableMetaV1 {
404    table_id: u32,
405    name: String,
406    columns: Vec<PersistedColumnMeta>,
407    primary_key: Option<Vec<String>>,
408    storage_options: PersistedStorageOptions,
409}
410
411impl From<&TableMetadata> for PersistedTableMeta {
412    fn from(value: &TableMetadata) -> Self {
413        Self {
414            table_id: value.table_id,
415            name: value.name.clone(),
416            catalog_name: value.catalog_name.clone(),
417            namespace_name: value.namespace_name.clone(),
418            table_type: value.table_type,
419            data_source_format: value.data_source_format,
420            columns: value
421                .columns
422                .iter()
423                .map(PersistedColumnMeta::from)
424                .collect(),
425            primary_key: value.primary_key.clone(),
426            storage_options: value.storage_options.clone().into(),
427            storage_location: value.storage_location.clone(),
428            comment: value.comment.clone(),
429            properties: value.properties.clone(),
430        }
431    }
432}
433
434impl From<PersistedTableMeta> for TableMetadata {
435    fn from(value: PersistedTableMeta) -> Self {
436        let mut table = TableMetadata::new(
437            value.name,
438            value
439                .columns
440                .into_iter()
441                .map(ColumnMetadata::from)
442                .collect(),
443        )
444        .with_table_id(value.table_id);
445        table.primary_key = value.primary_key;
446        table.storage_options = value.storage_options.into();
447        table.catalog_name = value.catalog_name;
448        table.namespace_name = value.namespace_name;
449        table.table_type = value.table_type;
450        table.data_source_format = value.data_source_format;
451        table.storage_location = value.storage_location;
452        table.comment = value.comment;
453        table.properties = value.properties;
454        table
455    }
456}
457
458#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
459pub struct PersistedIndexMeta {
460    pub index_id: u32,
461    pub name: String,
462    pub table: String,
463    pub columns: Vec<String>,
464    pub column_indices: Vec<usize>,
465    pub unique: bool,
466    pub method: Option<PersistedIndexType>,
467    pub options: Vec<(String, String)>,
468    pub catalog_name: String,
469    pub namespace_name: String,
470}
471
472#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
473struct PersistedIndexMetaV1 {
474    index_id: u32,
475    name: String,
476    table: String,
477    columns: Vec<String>,
478    column_indices: Vec<usize>,
479    unique: bool,
480    method: Option<PersistedIndexType>,
481    options: Vec<(String, String)>,
482}
483
484impl From<&IndexMetadata> for PersistedIndexMeta {
485    fn from(value: &IndexMetadata) -> Self {
486        Self {
487            index_id: value.index_id,
488            name: value.name.clone(),
489            table: value.table.clone(),
490            columns: value.columns.clone(),
491            column_indices: value.column_indices.clone(),
492            unique: value.unique,
493            method: value
494                .method
495                .and_then(|m| PersistedIndexType::try_from(m).ok()),
496            options: value.options.clone(),
497            catalog_name: value.catalog_name.clone(),
498            namespace_name: value.namespace_name.clone(),
499        }
500    }
501}
502
503impl From<PersistedIndexMeta> for IndexMetadata {
504    fn from(value: PersistedIndexMeta) -> Self {
505        let mut index = IndexMetadata::new(value.index_id, value.name, value.table, value.columns)
506            .with_column_indices(value.column_indices)
507            .with_unique(value.unique)
508            .with_options(value.options);
509        index.catalog_name = value.catalog_name;
510        index.namespace_name = value.namespace_name;
511        if let Some(method) = value.method {
512            index = index.with_method(method.into());
513        }
514        index
515    }
516}
517
518fn deserialize_table_meta(bytes: &[u8]) -> Result<PersistedTableMeta, CatalogError> {
519    match bincode::deserialize::<PersistedTableMeta>(bytes) {
520        Ok(meta) => Ok(meta),
521        Err(err) => {
522            let is_legacy = matches!(
523                err.as_ref(),
524                bincode::ErrorKind::Io(io)
525                    if io.kind() == std::io::ErrorKind::UnexpectedEof
526            );
527            if !is_legacy {
528                return Err(err.into());
529            }
530            let legacy: PersistedTableMetaV1 = bincode::deserialize(bytes)?;
531            Ok(PersistedTableMeta {
532                table_id: legacy.table_id,
533                name: legacy.name,
534                catalog_name: "default".to_string(),
535                namespace_name: "default".to_string(),
536                table_type: TableType::Managed,
537                data_source_format: DataSourceFormat::Alopex,
538                columns: legacy.columns,
539                primary_key: legacy.primary_key,
540                storage_options: legacy.storage_options,
541                storage_location: None,
542                comment: None,
543                properties: HashMap::new(),
544            })
545        }
546    }
547}
548
549fn deserialize_index_meta(bytes: &[u8]) -> Result<PersistedIndexMeta, CatalogError> {
550    match bincode::deserialize::<PersistedIndexMeta>(bytes) {
551        Ok(meta) => Ok(meta),
552        Err(err) => {
553            let is_legacy = matches!(
554                err.as_ref(),
555                bincode::ErrorKind::Io(io)
556                    if io.kind() == std::io::ErrorKind::UnexpectedEof
557            );
558            if !is_legacy {
559                return Err(err.into());
560            }
561            let legacy: PersistedIndexMetaV1 = bincode::deserialize(bytes)?;
562            Ok(PersistedIndexMeta {
563                index_id: legacy.index_id,
564                name: legacy.name,
565                table: legacy.table,
566                columns: legacy.columns,
567                column_indices: legacy.column_indices,
568                unique: legacy.unique,
569                method: legacy.method,
570                options: legacy.options,
571                catalog_name: "default".to_string(),
572                namespace_name: "default".to_string(),
573            })
574        }
575    }
576}
577
578fn table_key(catalog_name: &str, namespace_name: &str, table_name: &str) -> Vec<u8> {
579    let mut key = TABLES_PREFIX.to_vec();
580    key.extend_from_slice(catalog_name.as_bytes());
581    key.push(b'/');
582    key.extend_from_slice(namespace_name.as_bytes());
583    key.push(b'/');
584    key.extend_from_slice(table_name.as_bytes());
585    key
586}
587
588fn catalog_key(name: &str) -> Vec<u8> {
589    let mut key = CATALOGS_PREFIX.to_vec();
590    key.extend_from_slice(name.as_bytes());
591    key
592}
593
594fn namespace_key(catalog_name: &str, namespace_name: &str) -> Vec<u8> {
595    let mut key = NAMESPACES_PREFIX.to_vec();
596    key.extend_from_slice(catalog_name.as_bytes());
597    key.push(b'/');
598    key.extend_from_slice(namespace_name.as_bytes());
599    key
600}
601
602fn index_key(
603    catalog_name: &str,
604    namespace_name: &str,
605    table_name: &str,
606    index_name: &str,
607) -> Vec<u8> {
608    let mut key = INDEXES_PREFIX.to_vec();
609    key.extend_from_slice(catalog_name.as_bytes());
610    key.push(b'/');
611    key.extend_from_slice(namespace_name.as_bytes());
612    key.push(b'/');
613    key.extend_from_slice(table_name.as_bytes());
614    key.push(b'/');
615    key.extend_from_slice(index_name.as_bytes());
616    key
617}
618
619fn index_prefix(catalog_name: &str, namespace_name: &str, table_name: &str) -> Vec<u8> {
620    let mut key = INDEXES_PREFIX.to_vec();
621    key.extend_from_slice(catalog_name.as_bytes());
622    key.push(b'/');
623    key.extend_from_slice(namespace_name.as_bytes());
624    key.push(b'/');
625    key.extend_from_slice(table_name.as_bytes());
626    key.push(b'/');
627    key
628}
629
630fn key_suffix(prefix: &[u8], key: &[u8]) -> Result<String, CatalogError> {
631    let suffix = key
632        .strip_prefix(prefix)
633        .ok_or_else(|| CatalogError::InvalidKey(format!("{key:?}")))?;
634    std::str::from_utf8(suffix)
635        .map(|s| s.to_string())
636        .map_err(|_| CatalogError::InvalidKey(format!("{key:?}")))
637}
638
639fn parse_table_key_suffix(suffix: &str) -> Result<TableFqn, CatalogError> {
640    let mut parts = suffix.splitn(3, '/');
641    let catalog = parts
642        .next()
643        .filter(|part| !part.is_empty())
644        .ok_or_else(|| CatalogError::InvalidKey(suffix.to_string()))?;
645    let namespace = parts
646        .next()
647        .filter(|part| !part.is_empty())
648        .ok_or_else(|| CatalogError::InvalidKey(suffix.to_string()))?;
649    let table = parts
650        .next()
651        .filter(|part| !part.is_empty())
652        .ok_or_else(|| CatalogError::InvalidKey(suffix.to_string()))?;
653    Ok(TableFqn::new(catalog, namespace, table))
654}
655
656fn parse_index_key_suffix(suffix: &str) -> Result<IndexFqn, CatalogError> {
657    let mut parts = suffix.splitn(4, '/');
658    let catalog = parts
659        .next()
660        .filter(|part| !part.is_empty())
661        .ok_or_else(|| CatalogError::InvalidKey(suffix.to_string()))?;
662    let namespace = parts
663        .next()
664        .filter(|part| !part.is_empty())
665        .ok_or_else(|| CatalogError::InvalidKey(suffix.to_string()))?;
666    let table = parts
667        .next()
668        .filter(|part| !part.is_empty())
669        .ok_or_else(|| CatalogError::InvalidKey(suffix.to_string()))?;
670    let index = parts
671        .next()
672        .filter(|part| !part.is_empty())
673        .ok_or_else(|| CatalogError::InvalidKey(suffix.to_string()))?;
674    Ok(IndexFqn::new(catalog, namespace, table, index))
675}
676
677#[derive(Debug, Clone, Default)]
678pub struct CatalogOverlay {
679    added_catalogs: HashMap<String, CatalogMeta>,
680    dropped_catalogs: HashSet<String>,
681    added_namespaces: HashMap<(String, String), NamespaceMeta>,
682    dropped_namespaces: HashSet<(String, String)>,
683    added_tables: HashMap<TableFqn, TableMetadata>,
684    dropped_tables: HashSet<TableFqn>,
685    added_indexes: HashMap<IndexFqn, IndexMetadata>,
686    dropped_indexes: HashSet<IndexFqn>,
687}
688
689impl CatalogOverlay {
690    pub fn new() -> Self {
691        Self::default()
692    }
693
694    pub fn add_catalog(&mut self, meta: CatalogMeta) {
695        self.dropped_catalogs.remove(&meta.name);
696        self.added_catalogs.insert(meta.name.clone(), meta);
697    }
698
699    pub fn drop_catalog(&mut self, name: &str) {
700        self.added_catalogs.remove(name);
701        self.dropped_catalogs.insert(name.to_string());
702    }
703
704    pub fn add_namespace(&mut self, meta: NamespaceMeta) {
705        let key = (meta.catalog_name.clone(), meta.name.clone());
706        self.dropped_namespaces.remove(&key);
707        self.added_namespaces.insert(key, meta);
708    }
709
710    pub fn drop_namespace(&mut self, catalog_name: &str, namespace_name: &str) {
711        let key = (catalog_name.to_string(), namespace_name.to_string());
712        self.added_namespaces.remove(&key);
713        self.dropped_namespaces.insert(key);
714    }
715
716    pub fn add_table(&mut self, fqn: TableFqn, table: TableMetadata) {
717        self.dropped_tables.remove(&fqn);
718        self.added_tables.insert(fqn, table);
719    }
720
721    pub fn drop_table(&mut self, fqn: &TableFqn) {
722        self.added_tables.remove(fqn);
723        self.dropped_tables.insert(fqn.clone());
724        self.added_indexes.retain(|key, _| {
725            key.catalog != fqn.catalog || key.namespace != fqn.namespace || key.table != fqn.table
726        });
727    }
728
729    pub fn add_index(&mut self, fqn: IndexFqn, index: IndexMetadata) {
730        self.dropped_indexes.remove(&fqn);
731        self.added_indexes.insert(fqn, index);
732    }
733
734    pub fn drop_index(&mut self, fqn: &IndexFqn) {
735        self.added_indexes.remove(fqn);
736        self.dropped_indexes.insert(fqn.clone());
737    }
738
739    pub fn drop_cascade_catalog(&mut self, catalog: &str) {
740        self.drop_catalog(catalog);
741
742        let namespace_keys: Vec<(String, String)> = self
743            .added_namespaces
744            .keys()
745            .filter(|(cat, _)| cat == catalog)
746            .cloned()
747            .collect();
748        for (catalog_name, namespace_name) in namespace_keys {
749            self.drop_namespace(&catalog_name, &namespace_name);
750        }
751
752        let index_keys: Vec<IndexFqn> = self
753            .added_indexes
754            .keys()
755            .filter(|fqn| fqn.catalog == catalog)
756            .cloned()
757            .collect();
758        for fqn in index_keys {
759            self.drop_index(&fqn);
760        }
761
762        let table_keys: Vec<TableFqn> = self
763            .added_tables
764            .keys()
765            .filter(|fqn| fqn.catalog == catalog)
766            .cloned()
767            .collect();
768        for fqn in table_keys {
769            self.drop_table(&fqn);
770        }
771    }
772
773    pub fn drop_cascade_namespace(&mut self, catalog: &str, namespace: &str) {
774        self.drop_namespace(catalog, namespace);
775
776        let index_keys: Vec<IndexFqn> = self
777            .added_indexes
778            .keys()
779            .filter(|fqn| fqn.catalog == catalog && fqn.namespace == namespace)
780            .cloned()
781            .collect();
782        for fqn in index_keys {
783            self.drop_index(&fqn);
784        }
785
786        let table_keys: Vec<TableFqn> = self
787            .added_tables
788            .keys()
789            .filter(|fqn| fqn.catalog == catalog && fqn.namespace == namespace)
790            .cloned()
791            .collect();
792        for fqn in table_keys {
793            self.drop_table(&fqn);
794        }
795    }
796}
797
798/// トランザクション内(オーバーレイ込み)で参照するための Catalog ビュー。
799///
800/// DML/SELECT の実行や Planner の参照用途に使う。書き込み系 API は利用しない前提のため、
801/// `Catalog` trait の書き込みメソッドは `unreachable!()` とする。
802pub struct TxnCatalogView<'a, S: KVStore> {
803    catalog: &'a PersistentCatalog<S>,
804    overlay: &'a CatalogOverlay,
805}
806
807impl<'a, S: KVStore> TxnCatalogView<'a, S> {
808    pub fn new(catalog: &'a PersistentCatalog<S>, overlay: &'a CatalogOverlay) -> Self {
809        Self { catalog, overlay }
810    }
811}
812
813impl<'a, S: KVStore> Catalog for TxnCatalogView<'a, S> {
814    fn create_table(&mut self, _table: TableMetadata) -> Result<(), PlannerError> {
815        unreachable!("TxnCatalogView は参照専用です")
816    }
817
818    fn get_table(&self, name: &str) -> Option<&TableMetadata> {
819        self.catalog.get_table_in_txn(name, self.overlay)
820    }
821
822    fn drop_table(&mut self, _name: &str) -> Result<(), PlannerError> {
823        unreachable!("TxnCatalogView は参照専用です")
824    }
825
826    fn create_index(&mut self, _index: IndexMetadata) -> Result<(), PlannerError> {
827        unreachable!("TxnCatalogView は参照専用です")
828    }
829
830    fn get_index(&self, name: &str) -> Option<&IndexMetadata> {
831        self.catalog.get_index_in_txn(name, self.overlay)
832    }
833
834    fn get_indexes_for_table(&self, table: &str) -> Vec<&IndexMetadata> {
835        let Some(table_meta) = self.catalog.get_table_in_txn(table, self.overlay) else {
836            return Vec::new();
837        };
838
839        let mut indexes: Vec<&IndexMetadata> = self
840            .catalog
841            .inner
842            .get_indexes_for_table(table)
843            .into_iter()
844            .filter(|idx| {
845                idx.catalog_name == table_meta.catalog_name
846                    && idx.namespace_name == table_meta.namespace_name
847                    && !self.catalog.index_hidden_by_overlay(idx, self.overlay)
848            })
849            .collect();
850
851        for idx in self.overlay.added_indexes.values() {
852            if idx.table == table
853                && idx.catalog_name == table_meta.catalog_name
854                && idx.namespace_name == table_meta.namespace_name
855                && !self.catalog.index_hidden_by_overlay(idx, self.overlay)
856            {
857                indexes.push(idx);
858            }
859        }
860
861        indexes
862    }
863
864    fn drop_index(&mut self, _name: &str) -> Result<(), PlannerError> {
865        unreachable!("TxnCatalogView は参照専用です")
866    }
867
868    fn table_exists(&self, name: &str) -> bool {
869        self.catalog.table_exists_in_txn(name, self.overlay)
870    }
871
872    fn index_exists(&self, name: &str) -> bool {
873        self.catalog.index_exists_in_txn(name, self.overlay)
874    }
875
876    fn next_table_id(&mut self) -> u32 {
877        unreachable!("TxnCatalogView は参照専用です")
878    }
879
880    fn next_index_id(&mut self) -> u32 {
881        unreachable!("TxnCatalogView は参照専用です")
882    }
883}
884
885/// 永続カタログ実装。
886#[derive(Debug)]
887/// 永続化対応のカタログ実装。
888///
889/// # Examples
890///
891/// ```
892/// use std::sync::Arc;
893/// use alopex_core::kv::memory::MemoryKV;
894/// use alopex_sql::Catalog;
895/// use alopex_sql::catalog::PersistentCatalog;
896///
897/// let store = Arc::new(MemoryKV::new());
898/// let catalog = PersistentCatalog::new(store);
899/// assert!(catalog.table_exists("users") == false);
900/// ```
901pub struct PersistentCatalog<S: KVStore> {
902    inner: MemoryCatalog,
903    store: Arc<S>,
904    catalogs: HashMap<String, CatalogMeta>,
905    namespaces: HashMap<(String, String), NamespaceMeta>,
906}
907
908impl<S: KVStore> PersistentCatalog<S> {
909    pub fn load(store: Arc<S>) -> Result<Self, CatalogError> {
910        let mut txn = store.begin(TxnMode::ReadOnly)?;
911        let meta_key = META_KEY.to_vec();
912        let mut meta_state: Option<CatalogState> = None;
913
914        if let Some(meta_bytes) = txn.get(&meta_key)? {
915            let meta: CatalogState = bincode::deserialize(&meta_bytes)?;
916            if meta.version > CATALOG_VERSION {
917                return Err(CatalogError::InvalidKey(format!(
918                    "unsupported catalog version: {}",
919                    meta.version
920                )));
921            }
922            meta_state = Some(meta);
923        }
924
925        let mut needs_migration = meta_state
926            .as_ref()
927            .is_some_and(|meta| meta.version < CATALOG_VERSION);
928        if !needs_migration && meta_state.is_none() {
929            for (key, _) in txn.scan_prefix(TABLES_PREFIX)? {
930                let suffix = key_suffix(TABLES_PREFIX, &key)?;
931                if !suffix.contains('/') {
932                    needs_migration = true;
933                    break;
934                }
935            }
936            if !needs_migration {
937                for (key, _) in txn.scan_prefix(INDEXES_PREFIX)? {
938                    let suffix = key_suffix(INDEXES_PREFIX, &key)?;
939                    if !suffix.contains('/') {
940                        needs_migration = true;
941                        break;
942                    }
943                }
944            }
945        }
946
947        if needs_migration {
948            txn.rollback_self()?;
949            Self::migrate_v1_to_v2(&store)?;
950            return Self::load(store);
951        }
952
953        let mut inner = MemoryCatalog::new();
954        let mut catalogs = HashMap::new();
955        let mut namespaces = HashMap::new();
956
957        let mut max_table_id = 0u32;
958        let mut max_index_id = 0u32;
959
960        for (key, value) in txn.scan_prefix(CATALOGS_PREFIX)? {
961            let catalog_name = key_suffix(CATALOGS_PREFIX, &key)?;
962            let mut meta: CatalogMeta = bincode::deserialize(&value)?;
963            if meta.name != catalog_name {
964                meta.name = catalog_name.clone();
965            }
966            catalogs.insert(catalog_name, meta);
967        }
968
969        for (key, value) in txn.scan_prefix(NAMESPACES_PREFIX)? {
970            let suffix = key_suffix(NAMESPACES_PREFIX, &key)?;
971            let mut parts = suffix.splitn(2, '/');
972            let catalog_name = parts
973                .next()
974                .filter(|part| !part.is_empty())
975                .ok_or_else(|| CatalogError::InvalidKey(suffix.clone()))?;
976            let namespace_name = parts
977                .next()
978                .filter(|part| !part.is_empty())
979                .ok_or_else(|| CatalogError::InvalidKey(suffix.clone()))?;
980            let mut meta: NamespaceMeta = bincode::deserialize(&value)?;
981            if meta.catalog_name != catalog_name {
982                meta.catalog_name = catalog_name.to_string();
983            }
984            if meta.name != namespace_name {
985                meta.name = namespace_name.to_string();
986            }
987            namespaces.insert((meta.catalog_name.clone(), meta.name.clone()), meta);
988        }
989
990        // テーブルをロード(まずテーブルを入れてからインデックスを入れる)
991        for (key, value) in txn.scan_prefix(TABLES_PREFIX)? {
992            let suffix = key_suffix(TABLES_PREFIX, &key)?;
993            let fqn = parse_table_key_suffix(&suffix)?;
994            let mut persisted = deserialize_table_meta(&value)?;
995            if persisted.catalog_name != fqn.catalog {
996                persisted.catalog_name = fqn.catalog.clone();
997            }
998            if persisted.namespace_name != fqn.namespace {
999                persisted.namespace_name = fqn.namespace.clone();
1000            }
1001            if persisted.name != fqn.table {
1002                persisted.name = fqn.table.clone();
1003            }
1004            max_table_id = max_table_id.max(persisted.table_id);
1005            let table: TableMetadata = persisted.into();
1006            inner.insert_table_unchecked(table);
1007        }
1008
1009        for (key, value) in txn.scan_prefix(INDEXES_PREFIX)? {
1010            let suffix = key_suffix(INDEXES_PREFIX, &key)?;
1011            let fqn = parse_index_key_suffix(&suffix)?;
1012            let mut persisted = deserialize_index_meta(&value)?;
1013            if persisted.catalog_name != fqn.catalog {
1014                persisted.catalog_name = fqn.catalog.clone();
1015            }
1016            if persisted.namespace_name != fqn.namespace {
1017                persisted.namespace_name = fqn.namespace.clone();
1018            }
1019            if persisted.table != fqn.table {
1020                persisted.table = fqn.table.clone();
1021            }
1022            if persisted.name != fqn.index {
1023                persisted.name = fqn.index.clone();
1024            }
1025            max_index_id = max_index_id.max(persisted.index_id);
1026            let mut index: IndexMetadata = persisted.into();
1027            // 参照先テーブルがない場合はスキップ(破損対策)
1028            if let Some(table) = inner.get_table(&index.table) {
1029                if index.catalog_name != table.catalog_name
1030                    || index.namespace_name != table.namespace_name
1031                {
1032                    index.catalog_name = table.catalog_name.clone();
1033                    index.namespace_name = table.namespace_name.clone();
1034                }
1035                inner.insert_index_unchecked(index);
1036            }
1037        }
1038
1039        let (mut table_id_counter, mut index_id_counter) = (max_table_id, max_index_id);
1040        if let Some(meta) = meta_state
1041            .as_ref()
1042            .filter(|meta| meta.version == CATALOG_VERSION)
1043        {
1044            table_id_counter = table_id_counter.max(meta.table_id_counter);
1045            index_id_counter = index_id_counter.max(meta.index_id_counter);
1046        }
1047        inner.set_counters(table_id_counter, index_id_counter);
1048
1049        txn.rollback_self()?;
1050
1051        Ok(Self {
1052            inner,
1053            store,
1054            catalogs,
1055            namespaces,
1056        })
1057    }
1058
1059    fn migrate_v1_to_v2(store: &Arc<S>) -> Result<(), CatalogError> {
1060        let mut txn = store.begin(TxnMode::ReadWrite)?;
1061
1062        if txn.get(&catalog_key("default"))?.is_none() {
1063            let meta = CatalogMeta {
1064                name: "default".to_string(),
1065                comment: None,
1066                storage_root: None,
1067            };
1068            let value = bincode::serialize(&meta)?;
1069            txn.put(catalog_key("default"), value)?;
1070        }
1071
1072        if txn.get(&namespace_key("default", "default"))?.is_none() {
1073            let meta = NamespaceMeta {
1074                name: "default".to_string(),
1075                catalog_name: "default".to_string(),
1076                comment: None,
1077                storage_root: None,
1078            };
1079            let value = bincode::serialize(&meta)?;
1080            txn.put(namespace_key("default", "default"), value)?;
1081        }
1082
1083        let mut table_updates = Vec::new();
1084        let mut table_keys_to_delete = Vec::new();
1085        let mut max_table_id = 0u32;
1086        for (key, value) in txn.scan_prefix(TABLES_PREFIX)? {
1087            let suffix = key_suffix(TABLES_PREFIX, &key)?;
1088            if suffix.contains('/') {
1089                continue;
1090            }
1091            let mut persisted = deserialize_table_meta(&value)?;
1092            if persisted.catalog_name.is_empty() {
1093                persisted.catalog_name = "default".to_string();
1094            }
1095            if persisted.namespace_name.is_empty() {
1096                persisted.namespace_name = "default".to_string();
1097            }
1098            persisted.table_type = TableType::Managed;
1099            persisted.data_source_format = DataSourceFormat::Alopex;
1100            max_table_id = max_table_id.max(persisted.table_id);
1101
1102            let new_key = table_key(
1103                &persisted.catalog_name,
1104                &persisted.namespace_name,
1105                &persisted.name,
1106            );
1107            let bytes = bincode::serialize(&persisted)?;
1108            table_updates.push((new_key, bytes));
1109            table_keys_to_delete.push(key);
1110        }
1111
1112        for (key, value) in table_updates {
1113            txn.put(key, value)?;
1114        }
1115        for key in table_keys_to_delete {
1116            txn.delete(key)?;
1117        }
1118
1119        let mut index_updates = Vec::new();
1120        let mut index_keys_to_delete = Vec::new();
1121        let mut max_index_id = 0u32;
1122        for (key, value) in txn.scan_prefix(INDEXES_PREFIX)? {
1123            let suffix = key_suffix(INDEXES_PREFIX, &key)?;
1124            if suffix.contains('/') {
1125                continue;
1126            }
1127            let mut persisted = deserialize_index_meta(&value)?;
1128            if persisted.catalog_name.is_empty() {
1129                persisted.catalog_name = "default".to_string();
1130            }
1131            if persisted.namespace_name.is_empty() {
1132                persisted.namespace_name = "default".to_string();
1133            }
1134            max_index_id = max_index_id.max(persisted.index_id);
1135
1136            let new_key = index_key(
1137                &persisted.catalog_name,
1138                &persisted.namespace_name,
1139                &persisted.table,
1140                &persisted.name,
1141            );
1142            let bytes = bincode::serialize(&persisted)?;
1143            index_updates.push((new_key, bytes));
1144            index_keys_to_delete.push(key);
1145        }
1146
1147        for (key, value) in index_updates {
1148            txn.put(key, value)?;
1149        }
1150        for key in index_keys_to_delete {
1151            txn.delete(key)?;
1152        }
1153
1154        let mut table_id_counter = max_table_id;
1155        let mut index_id_counter = max_index_id;
1156        if let Some(meta_bytes) = txn.get(&META_KEY.to_vec())? {
1157            let meta: CatalogState = bincode::deserialize(&meta_bytes)?;
1158            table_id_counter = table_id_counter.max(meta.table_id_counter);
1159            index_id_counter = index_id_counter.max(meta.index_id_counter);
1160        }
1161        let meta = CatalogState {
1162            version: CATALOG_VERSION,
1163            table_id_counter,
1164            index_id_counter,
1165        };
1166        let meta_bytes = bincode::serialize(&meta)?;
1167        txn.put(META_KEY.to_vec(), meta_bytes)?;
1168        txn.commit_self()?;
1169
1170        Ok(())
1171    }
1172
1173    pub fn new(store: Arc<S>) -> Self {
1174        Self {
1175            inner: MemoryCatalog::new(),
1176            store,
1177            catalogs: HashMap::new(),
1178            namespaces: HashMap::new(),
1179        }
1180    }
1181
1182    pub fn store(&self) -> &Arc<S> {
1183        &self.store
1184    }
1185
1186    pub fn list_catalogs(&self) -> Vec<CatalogMeta> {
1187        let mut catalogs: Vec<CatalogMeta> = self.catalogs.values().cloned().collect();
1188        catalogs.sort_by(|a, b| a.name.cmp(&b.name));
1189        catalogs
1190    }
1191
1192    pub fn get_catalog(&self, name: &str) -> Option<CatalogMeta> {
1193        self.catalogs.get(name).cloned()
1194    }
1195
1196    pub fn create_catalog(&mut self, meta: CatalogMeta) -> Result<(), CatalogError> {
1197        let mut txn = self.store.begin(TxnMode::ReadWrite)?;
1198        let value = bincode::serialize(&meta)?;
1199        txn.put(catalog_key(&meta.name), value)?;
1200        txn.commit_self()?;
1201        self.catalogs.insert(meta.name.clone(), meta);
1202        Ok(())
1203    }
1204
1205    pub fn delete_catalog(&mut self, name: &str) -> Result<(), CatalogError> {
1206        let mut txn = self.store.begin(TxnMode::ReadWrite)?;
1207        txn.delete(catalog_key(name))?;
1208        let mut namespace_prefix = NAMESPACES_PREFIX.to_vec();
1209        namespace_prefix.extend_from_slice(name.as_bytes());
1210        namespace_prefix.push(b'/');
1211        let mut namespace_keys = Vec::new();
1212        for (key, _) in txn.scan_prefix(&namespace_prefix)? {
1213            namespace_keys.push(key);
1214        }
1215        for key in namespace_keys {
1216            txn.delete(key)?;
1217        }
1218        let mut table_keys = Vec::new();
1219        let mut table_fqns = Vec::new();
1220        for (key, value) in txn.scan_prefix(TABLES_PREFIX)? {
1221            let persisted = deserialize_table_meta(&value)?;
1222            if persisted.catalog_name == name {
1223                table_fqns.push(TableFqn::new(
1224                    &persisted.catalog_name,
1225                    &persisted.namespace_name,
1226                    &persisted.name,
1227                ));
1228                table_keys.push(key);
1229            }
1230        }
1231        let table_set: HashSet<TableFqn> = table_fqns.iter().cloned().collect();
1232        for key in table_keys {
1233            txn.delete(key)?;
1234        }
1235        if !table_set.is_empty() {
1236            let mut index_keys = Vec::new();
1237            for (key, value) in txn.scan_prefix(INDEXES_PREFIX)? {
1238                let persisted = deserialize_index_meta(&value)?;
1239                let fqn = TableFqn::new(
1240                    &persisted.catalog_name,
1241                    &persisted.namespace_name,
1242                    &persisted.table,
1243                );
1244                if table_set.contains(&fqn) {
1245                    index_keys.push(key);
1246                }
1247            }
1248            for key in index_keys {
1249                txn.delete(key)?;
1250            }
1251        }
1252        txn.commit_self()?;
1253        self.catalogs.remove(name);
1254        self.namespaces.retain(|(catalog, _), _| catalog != name);
1255        for fqn in table_fqns {
1256            self.inner.remove_table_unchecked(&fqn.table);
1257        }
1258        Ok(())
1259    }
1260
1261    pub fn list_namespaces(&self, catalog_name: &str) -> Vec<NamespaceMeta> {
1262        let mut namespaces: Vec<NamespaceMeta> = self
1263            .namespaces
1264            .values()
1265            .filter(|meta| meta.catalog_name == catalog_name)
1266            .cloned()
1267            .collect();
1268        namespaces.sort_by(|a, b| a.name.cmp(&b.name));
1269        namespaces
1270    }
1271
1272    pub fn get_namespace(&self, catalog_name: &str, namespace_name: &str) -> Option<NamespaceMeta> {
1273        self.namespaces
1274            .get(&(catalog_name.to_string(), namespace_name.to_string()))
1275            .cloned()
1276    }
1277
1278    pub fn create_namespace(&mut self, meta: NamespaceMeta) -> Result<(), CatalogError> {
1279        if !self.catalogs.contains_key(&meta.catalog_name) {
1280            return Err(CatalogError::InvalidKey(format!(
1281                "catalog not found: {}",
1282                meta.catalog_name
1283            )));
1284        }
1285
1286        let mut txn = self.store.begin(TxnMode::ReadWrite)?;
1287        let value = bincode::serialize(&meta)?;
1288        txn.put(namespace_key(&meta.catalog_name, &meta.name), value)?;
1289        txn.commit_self()?;
1290        self.namespaces
1291            .insert((meta.catalog_name.clone(), meta.name.clone()), meta);
1292        Ok(())
1293    }
1294
1295    pub fn delete_namespace(
1296        &mut self,
1297        catalog_name: &str,
1298        namespace_name: &str,
1299    ) -> Result<(), CatalogError> {
1300        if !self.catalogs.contains_key(catalog_name) {
1301            return Err(CatalogError::InvalidKey(format!(
1302                "catalog not found: {}",
1303                catalog_name
1304            )));
1305        }
1306
1307        let mut txn = self.store.begin(TxnMode::ReadWrite)?;
1308        txn.delete(namespace_key(catalog_name, namespace_name))?;
1309        txn.commit_self()?;
1310        self.namespaces
1311            .remove(&(catalog_name.to_string(), namespace_name.to_string()));
1312        Ok(())
1313    }
1314
1315    fn persist_create_catalog(
1316        &mut self,
1317        txn: &mut S::Transaction<'_>,
1318        meta: &CatalogMeta,
1319    ) -> Result<(), CatalogError> {
1320        let value = bincode::serialize(meta)?;
1321        txn.put(catalog_key(&meta.name), value)?;
1322        Ok(())
1323    }
1324
1325    fn persist_drop_catalog(
1326        &mut self,
1327        txn: &mut S::Transaction<'_>,
1328        name: &str,
1329    ) -> Result<(), CatalogError> {
1330        txn.delete(catalog_key(name))?;
1331
1332        let mut namespace_prefix = NAMESPACES_PREFIX.to_vec();
1333        namespace_prefix.extend_from_slice(name.as_bytes());
1334        namespace_prefix.push(b'/');
1335        let mut namespace_keys = Vec::new();
1336        for (key, _) in txn.scan_prefix(&namespace_prefix)? {
1337            namespace_keys.push(key);
1338        }
1339        for key in namespace_keys {
1340            txn.delete(key)?;
1341        }
1342
1343        let mut table_keys = Vec::new();
1344        let mut table_fqns = Vec::new();
1345        for (key, value) in txn.scan_prefix(TABLES_PREFIX)? {
1346            let persisted = deserialize_table_meta(&value)?;
1347            if persisted.catalog_name == name {
1348                table_fqns.push(TableFqn::new(
1349                    &persisted.catalog_name,
1350                    &persisted.namespace_name,
1351                    &persisted.name,
1352                ));
1353                table_keys.push(key);
1354            }
1355        }
1356        let table_set: HashSet<TableFqn> = table_fqns.iter().cloned().collect();
1357        for key in table_keys {
1358            txn.delete(key)?;
1359        }
1360        if !table_set.is_empty() {
1361            let mut index_keys = Vec::new();
1362            for (key, value) in txn.scan_prefix(INDEXES_PREFIX)? {
1363                let persisted = deserialize_index_meta(&value)?;
1364                let fqn = TableFqn::new(
1365                    &persisted.catalog_name,
1366                    &persisted.namespace_name,
1367                    &persisted.table,
1368                );
1369                if table_set.contains(&fqn) {
1370                    index_keys.push(key);
1371                }
1372            }
1373            for key in index_keys {
1374                txn.delete(key)?;
1375            }
1376        }
1377        Ok(())
1378    }
1379
1380    fn persist_create_namespace(
1381        &mut self,
1382        txn: &mut S::Transaction<'_>,
1383        meta: &NamespaceMeta,
1384    ) -> Result<(), CatalogError> {
1385        let value = bincode::serialize(meta)?;
1386        txn.put(namespace_key(&meta.catalog_name, &meta.name), value)?;
1387        Ok(())
1388    }
1389
1390    fn persist_drop_namespace(
1391        &mut self,
1392        txn: &mut S::Transaction<'_>,
1393        catalog_name: &str,
1394        namespace_name: &str,
1395    ) -> Result<(), CatalogError> {
1396        txn.delete(namespace_key(catalog_name, namespace_name))?;
1397
1398        let mut table_keys = Vec::new();
1399        let mut table_fqns = Vec::new();
1400        for (key, value) in txn.scan_prefix(TABLES_PREFIX)? {
1401            let persisted = deserialize_table_meta(&value)?;
1402            if persisted.catalog_name == catalog_name && persisted.namespace_name == namespace_name
1403            {
1404                table_fqns.push(TableFqn::new(
1405                    &persisted.catalog_name,
1406                    &persisted.namespace_name,
1407                    &persisted.name,
1408                ));
1409                table_keys.push(key);
1410            }
1411        }
1412        let table_set: HashSet<TableFqn> = table_fqns.iter().cloned().collect();
1413        for key in table_keys {
1414            txn.delete(key)?;
1415        }
1416        if !table_set.is_empty() {
1417            let mut index_keys = Vec::new();
1418            for (key, value) in txn.scan_prefix(INDEXES_PREFIX)? {
1419                let persisted = deserialize_index_meta(&value)?;
1420                let fqn = TableFqn::new(
1421                    &persisted.catalog_name,
1422                    &persisted.namespace_name,
1423                    &persisted.table,
1424                );
1425                if table_set.contains(&fqn) {
1426                    index_keys.push(key);
1427                }
1428            }
1429            for key in index_keys {
1430                txn.delete(key)?;
1431            }
1432        }
1433        Ok(())
1434    }
1435
1436    fn write_meta(&self, txn: &mut S::Transaction<'_>) -> Result<(), CatalogError> {
1437        let (table_id_counter, index_id_counter) = self.inner.counters();
1438        let meta = CatalogState {
1439            version: CATALOG_VERSION,
1440            table_id_counter,
1441            index_id_counter,
1442        };
1443        let meta_bytes = bincode::serialize(&meta)?;
1444        txn.put(META_KEY.to_vec(), meta_bytes)?;
1445        Ok(())
1446    }
1447
1448    pub fn persist_create_table(
1449        &mut self,
1450        txn: &mut S::Transaction<'_>,
1451        table: &TableMetadata,
1452    ) -> Result<(), CatalogError> {
1453        let persisted = PersistedTableMeta::from(table);
1454        let value = bincode::serialize(&persisted)?;
1455        txn.put(
1456            table_key(&table.catalog_name, &table.namespace_name, &table.name),
1457            value,
1458        )?;
1459        self.write_meta(txn)?;
1460        Ok(())
1461    }
1462
1463    pub fn persist_drop_table(
1464        &mut self,
1465        txn: &mut S::Transaction<'_>,
1466        fqn: &TableFqn,
1467    ) -> Result<(), CatalogError> {
1468        txn.delete(table_key(&fqn.catalog, &fqn.namespace, &fqn.table))?;
1469
1470        // テーブルに紐づくインデックスも削除する。
1471        let mut to_delete: Vec<String> = Vec::new();
1472        let prefix = index_prefix(&fqn.catalog, &fqn.namespace, &fqn.table);
1473        for (key, _) in txn.scan_prefix(&prefix)? {
1474            let index_name = key_suffix(&prefix, &key)?;
1475            to_delete.push(index_name);
1476        }
1477        for index_name in to_delete {
1478            txn.delete(index_key(
1479                &fqn.catalog,
1480                &fqn.namespace,
1481                &fqn.table,
1482                &index_name,
1483            ))?;
1484        }
1485
1486        Ok(())
1487    }
1488
1489    pub fn persist_create_index(
1490        &mut self,
1491        txn: &mut S::Transaction<'_>,
1492        index: &IndexMetadata,
1493    ) -> Result<(), CatalogError> {
1494        let persisted = PersistedIndexMeta::from(index);
1495        let value = bincode::serialize(&persisted)?;
1496        txn.put(
1497            index_key(
1498                &index.catalog_name,
1499                &index.namespace_name,
1500                &index.table,
1501                &index.name,
1502            ),
1503            value,
1504        )?;
1505        self.write_meta(txn)?;
1506        Ok(())
1507    }
1508
1509    pub fn persist_drop_index(
1510        &mut self,
1511        txn: &mut S::Transaction<'_>,
1512        fqn: &IndexFqn,
1513    ) -> Result<(), CatalogError> {
1514        txn.delete(index_key(
1515            &fqn.catalog,
1516            &fqn.namespace,
1517            &fqn.table,
1518            &fqn.index,
1519        ))?;
1520        Ok(())
1521    }
1522
1523    pub fn persist_overlay(
1524        &mut self,
1525        txn: &mut S::Transaction<'_>,
1526        overlay: &CatalogOverlay,
1527    ) -> Result<(), CatalogError> {
1528        self.ensure_overlay_name_uniqueness(overlay)?;
1529
1530        for catalog in overlay.dropped_catalogs.iter() {
1531            self.persist_drop_catalog(txn, catalog)?;
1532        }
1533
1534        for (catalog, namespace) in overlay.dropped_namespaces.iter() {
1535            if overlay.dropped_catalogs.contains(catalog) {
1536                continue;
1537            }
1538            self.persist_drop_namespace(txn, catalog, namespace)?;
1539        }
1540
1541        for fqn in overlay.dropped_tables.iter() {
1542            if overlay.dropped_catalogs.contains(&fqn.catalog)
1543                || overlay
1544                    .dropped_namespaces
1545                    .contains(&(fqn.catalog.clone(), fqn.namespace.clone()))
1546            {
1547                continue;
1548            }
1549            self.persist_drop_table(txn, fqn)?;
1550        }
1551
1552        for fqn in overlay.dropped_indexes.iter() {
1553            if overlay.dropped_catalogs.contains(&fqn.catalog)
1554                || overlay
1555                    .dropped_namespaces
1556                    .contains(&(fqn.catalog.clone(), fqn.namespace.clone()))
1557            {
1558                continue;
1559            }
1560            self.persist_drop_index(txn, fqn)?;
1561        }
1562
1563        for meta in overlay.added_catalogs.values() {
1564            if overlay.dropped_catalogs.contains(&meta.name) {
1565                continue;
1566            }
1567            self.persist_create_catalog(txn, meta)?;
1568        }
1569
1570        for meta in overlay.added_namespaces.values() {
1571            if overlay.dropped_catalogs.contains(&meta.catalog_name)
1572                || overlay
1573                    .dropped_namespaces
1574                    .contains(&(meta.catalog_name.clone(), meta.name.clone()))
1575            {
1576                continue;
1577            }
1578            self.persist_create_namespace(txn, meta)?;
1579        }
1580
1581        for (fqn, table) in overlay.added_tables.iter() {
1582            if overlay.dropped_catalogs.contains(&fqn.catalog)
1583                || overlay
1584                    .dropped_namespaces
1585                    .contains(&(fqn.catalog.clone(), fqn.namespace.clone()))
1586                || overlay.dropped_tables.contains(fqn)
1587            {
1588                continue;
1589            }
1590            self.persist_create_table(txn, table)?;
1591        }
1592
1593        for (fqn, index) in overlay.added_indexes.iter() {
1594            if overlay.dropped_catalogs.contains(&fqn.catalog)
1595                || overlay
1596                    .dropped_namespaces
1597                    .contains(&(fqn.catalog.clone(), fqn.namespace.clone()))
1598                || overlay.dropped_indexes.contains(fqn)
1599            {
1600                continue;
1601            }
1602            self.persist_create_index(txn, index)?;
1603        }
1604
1605        Ok(())
1606    }
1607
1608    fn ensure_overlay_name_uniqueness(&self, overlay: &CatalogOverlay) -> Result<(), CatalogError> {
1609        let mut table_names: HashMap<String, TableFqn> = HashMap::new();
1610        for name in self.inner.table_names() {
1611            let Some(table) = self.inner.get_table(name) else {
1612                continue;
1613            };
1614            let fqn = TableFqn::from(table);
1615            if overlay.dropped_catalogs.contains(&fqn.catalog)
1616                || overlay
1617                    .dropped_namespaces
1618                    .contains(&(fqn.catalog.clone(), fqn.namespace.clone()))
1619                || overlay.dropped_tables.contains(&fqn)
1620            {
1621                continue;
1622            }
1623            table_names.insert(table.name.clone(), fqn);
1624        }
1625
1626        for (fqn, table) in overlay.added_tables.iter() {
1627            if overlay.dropped_catalogs.contains(&fqn.catalog)
1628                || overlay
1629                    .dropped_namespaces
1630                    .contains(&(fqn.catalog.clone(), fqn.namespace.clone()))
1631                || overlay.dropped_tables.contains(fqn)
1632            {
1633                continue;
1634            }
1635            if let Some(existing) = table_names.get(&table.name)
1636                && existing != fqn
1637            {
1638                return Err(CatalogError::InvalidKey(format!(
1639                    "table name '{}' conflicts across namespaces: {}.{} vs {}.{}",
1640                    table.name, existing.catalog, existing.namespace, fqn.catalog, fqn.namespace
1641                )));
1642            }
1643            table_names.insert(table.name.clone(), fqn.clone());
1644        }
1645
1646        let mut index_names: HashMap<String, IndexFqn> = HashMap::new();
1647        for name in self.inner.index_names() {
1648            let Some(index) = self.inner.get_index(name) else {
1649                continue;
1650            };
1651            let fqn = IndexFqn::from(index);
1652            if overlay.dropped_catalogs.contains(&fqn.catalog)
1653                || overlay
1654                    .dropped_namespaces
1655                    .contains(&(fqn.catalog.clone(), fqn.namespace.clone()))
1656                || overlay.dropped_indexes.contains(&fqn)
1657                || overlay.dropped_tables.contains(&TableFqn::new(
1658                    &fqn.catalog,
1659                    &fqn.namespace,
1660                    &fqn.table,
1661                ))
1662            {
1663                continue;
1664            }
1665            index_names.insert(index.name.clone(), fqn);
1666        }
1667
1668        for (fqn, index) in overlay.added_indexes.iter() {
1669            if overlay.dropped_catalogs.contains(&fqn.catalog)
1670                || overlay
1671                    .dropped_namespaces
1672                    .contains(&(fqn.catalog.clone(), fqn.namespace.clone()))
1673                || overlay.dropped_indexes.contains(fqn)
1674                || overlay.dropped_tables.contains(&TableFqn::new(
1675                    &fqn.catalog,
1676                    &fqn.namespace,
1677                    &fqn.table,
1678                ))
1679            {
1680                continue;
1681            }
1682            if let Some(existing) = index_names.get(&index.name)
1683                && existing != fqn
1684            {
1685                return Err(CatalogError::InvalidKey(format!(
1686                    "index name '{}' conflicts across namespaces: {}.{} vs {}.{}",
1687                    index.name, existing.catalog, existing.namespace, fqn.catalog, fqn.namespace
1688                )));
1689            }
1690            index_names.insert(index.name.clone(), fqn.clone());
1691        }
1692
1693        Ok(())
1694    }
1695
1696    fn namespace_dropped(overlay: &CatalogOverlay, catalog: &str, namespace: &str) -> bool {
1697        overlay.dropped_catalogs.contains(catalog)
1698            || overlay
1699                .dropped_namespaces
1700                .contains(&(catalog.to_string(), namespace.to_string()))
1701    }
1702
1703    fn overlay_added_table_by_name<'a>(
1704        overlay: &'a CatalogOverlay,
1705        name: &str,
1706    ) -> Option<&'a TableMetadata> {
1707        let mut iter = overlay
1708            .added_tables
1709            .values()
1710            .filter(|table| table.name == name);
1711        let first = iter.next()?;
1712        if iter.next().is_some() {
1713            return None;
1714        }
1715        Some(first)
1716    }
1717
1718    fn overlay_added_index_by_name<'a>(
1719        overlay: &'a CatalogOverlay,
1720        name: &str,
1721    ) -> Option<&'a IndexMetadata> {
1722        let mut iter = overlay
1723            .added_indexes
1724            .values()
1725            .filter(|index| index.name == name);
1726        let first = iter.next()?;
1727        if iter.next().is_some() {
1728            return None;
1729        }
1730        Some(first)
1731    }
1732
1733    fn base_table_conflicts_with_overlay(
1734        &self,
1735        overlay: &CatalogOverlay,
1736        table: &TableMetadata,
1737    ) -> bool {
1738        let Some(base) = self.inner.get_table(&table.name) else {
1739            return false;
1740        };
1741        if self.table_hidden_by_overlay(base, overlay) {
1742            return false;
1743        }
1744        if overlay.dropped_tables.contains(&TableFqn::from(base)) {
1745            return false;
1746        }
1747        TableFqn::from(base) != TableFqn::from(table)
1748    }
1749
1750    fn base_index_conflicts_with_overlay(
1751        &self,
1752        overlay: &CatalogOverlay,
1753        index: &IndexMetadata,
1754    ) -> bool {
1755        let Some(base) = self.inner.get_index(&index.name) else {
1756            return false;
1757        };
1758        if Self::namespace_dropped(overlay, &base.catalog_name, &base.namespace_name) {
1759            return false;
1760        }
1761        if overlay.dropped_indexes.contains(&IndexFqn::from(base)) {
1762            return false;
1763        }
1764        if self.dropped_table_matches_fqn(
1765            &base.table,
1766            &base.catalog_name,
1767            &base.namespace_name,
1768            overlay,
1769        ) {
1770            return false;
1771        }
1772        IndexFqn::from(base) != IndexFqn::from(index)
1773    }
1774
1775    fn table_hidden_by_overlay(&self, table: &TableMetadata, overlay: &CatalogOverlay) -> bool {
1776        Self::namespace_dropped(overlay, &table.catalog_name, &table.namespace_name)
1777    }
1778
1779    fn dropped_table_matches_fqn(
1780        &self,
1781        table_name: &str,
1782        catalog: &str,
1783        namespace: &str,
1784        overlay: &CatalogOverlay,
1785    ) -> bool {
1786        let fqn = TableFqn::new(catalog, namespace, table_name);
1787        overlay.dropped_tables.contains(&fqn)
1788    }
1789
1790    fn index_hidden_by_overlay(&self, index: &IndexMetadata, overlay: &CatalogOverlay) -> bool {
1791        let index_fqn = IndexFqn::from(index);
1792        if overlay.dropped_indexes.contains(&index_fqn) {
1793            return true;
1794        }
1795        if Self::namespace_dropped(overlay, &index.catalog_name, &index.namespace_name) {
1796            return true;
1797        }
1798        if self.dropped_table_matches_fqn(
1799            &index.table,
1800            &index.catalog_name,
1801            &index.namespace_name,
1802            overlay,
1803        ) {
1804            return true;
1805        }
1806        match self.get_table_in_txn(&index.table, overlay) {
1807            Some(table) => {
1808                table.catalog_name != index.catalog_name
1809                    || table.namespace_name != index.namespace_name
1810            }
1811            None => true,
1812        }
1813    }
1814
1815    pub fn get_catalog_in_txn<'a>(
1816        &'a self,
1817        name: &str,
1818        overlay: &'a CatalogOverlay,
1819    ) -> Option<&'a CatalogMeta> {
1820        if overlay.dropped_catalogs.contains(name) {
1821            return None;
1822        }
1823        if let Some(catalog) = overlay.added_catalogs.get(name) {
1824            return Some(catalog);
1825        }
1826        self.catalogs.get(name)
1827    }
1828
1829    pub fn get_namespace_in_txn<'a>(
1830        &'a self,
1831        catalog_name: &str,
1832        namespace_name: &str,
1833        overlay: &'a CatalogOverlay,
1834    ) -> Option<&'a NamespaceMeta> {
1835        if overlay.dropped_catalogs.contains(catalog_name) {
1836            return None;
1837        }
1838        let key = (catalog_name.to_string(), namespace_name.to_string());
1839        if overlay.dropped_namespaces.contains(&key) {
1840            return None;
1841        }
1842        if let Some(namespace) = overlay.added_namespaces.get(&key) {
1843            return Some(namespace);
1844        }
1845        self.namespaces.get(&key)
1846    }
1847
1848    pub fn list_catalogs_in_txn(&self, overlay: &CatalogOverlay) -> Vec<CatalogMeta> {
1849        let mut catalogs: HashMap<String, CatalogMeta> = HashMap::new();
1850        for (name, meta) in &self.catalogs {
1851            if !overlay.dropped_catalogs.contains(name) {
1852                catalogs.insert(name.clone(), meta.clone());
1853            }
1854        }
1855        for (name, meta) in &overlay.added_catalogs {
1856            if !overlay.dropped_catalogs.contains(name) {
1857                catalogs.insert(name.clone(), meta.clone());
1858            }
1859        }
1860        let mut values: Vec<CatalogMeta> = catalogs.into_values().collect();
1861        values.sort_by(|a, b| a.name.cmp(&b.name));
1862        values
1863    }
1864
1865    pub fn list_namespaces_in_txn(
1866        &self,
1867        catalog_name: &str,
1868        overlay: &CatalogOverlay,
1869    ) -> Vec<NamespaceMeta> {
1870        if overlay.dropped_catalogs.contains(catalog_name) {
1871            return Vec::new();
1872        }
1873        let mut namespaces: HashMap<(String, String), NamespaceMeta> = HashMap::new();
1874        for ((catalog, namespace), meta) in &self.namespaces {
1875            if catalog != catalog_name {
1876                continue;
1877            }
1878            let key = (catalog.clone(), namespace.clone());
1879            if overlay.dropped_namespaces.contains(&key) {
1880                continue;
1881            }
1882            namespaces.insert(key, meta.clone());
1883        }
1884        for ((catalog, namespace), meta) in &overlay.added_namespaces {
1885            if catalog != catalog_name {
1886                continue;
1887            }
1888            let key = (catalog.clone(), namespace.clone());
1889            if overlay.dropped_namespaces.contains(&key) {
1890                continue;
1891            }
1892            namespaces.insert(key, meta.clone());
1893        }
1894        let mut values: Vec<NamespaceMeta> = namespaces.into_values().collect();
1895        values.sort_by(|a, b| a.name.cmp(&b.name));
1896        values
1897    }
1898
1899    pub fn table_exists_in_txn(&self, name: &str, overlay: &CatalogOverlay) -> bool {
1900        if let Some(table) = Self::overlay_added_table_by_name(overlay, name) {
1901            if self.table_hidden_by_overlay(table, overlay) {
1902                return false;
1903            }
1904            if self.base_table_conflicts_with_overlay(overlay, table) {
1905                return false;
1906            }
1907            return true;
1908        }
1909        match self.inner.get_table(name) {
1910            Some(table) => {
1911                !self.table_hidden_by_overlay(table, overlay)
1912                    && !overlay.dropped_tables.contains(&TableFqn::from(table))
1913            }
1914            None => false,
1915        }
1916    }
1917
1918    pub fn get_table_in_txn<'a>(
1919        &'a self,
1920        name: &str,
1921        overlay: &'a CatalogOverlay,
1922    ) -> Option<&'a TableMetadata> {
1923        if let Some(table) = Self::overlay_added_table_by_name(overlay, name) {
1924            if self.table_hidden_by_overlay(table, overlay) {
1925                return None;
1926            }
1927            if self.base_table_conflicts_with_overlay(overlay, table) {
1928                return None;
1929            }
1930            return Some(table);
1931        }
1932        self.inner.get_table(name).filter(|table| {
1933            !self.table_hidden_by_overlay(table, overlay)
1934                && !overlay.dropped_tables.contains(&TableFqn::from(*table))
1935        })
1936    }
1937
1938    pub fn index_exists_in_txn(&self, name: &str, overlay: &CatalogOverlay) -> bool {
1939        if let Some(index) = Self::overlay_added_index_by_name(overlay, name) {
1940            if self.index_hidden_by_overlay(index, overlay) {
1941                return false;
1942            }
1943            if self.base_index_conflicts_with_overlay(overlay, index) {
1944                return false;
1945            }
1946            return true;
1947        }
1948        match self.inner.get_index(name) {
1949            Some(index) => !self.index_hidden_by_overlay(index, overlay),
1950            None => false,
1951        }
1952    }
1953
1954    pub fn get_index_in_txn<'a>(
1955        &'a self,
1956        name: &str,
1957        overlay: &'a CatalogOverlay,
1958    ) -> Option<&'a IndexMetadata> {
1959        if let Some(index) = Self::overlay_added_index_by_name(overlay, name) {
1960            if self.index_hidden_by_overlay(index, overlay) {
1961                return None;
1962            }
1963            if self.base_index_conflicts_with_overlay(overlay, index) {
1964                return None;
1965            }
1966            return Some(index);
1967        }
1968        match self.inner.get_index(name) {
1969            Some(index) if self.index_hidden_by_overlay(index, overlay) => None,
1970            other => other,
1971        }
1972    }
1973
1974    pub fn list_tables_in_txn(
1975        &self,
1976        catalog_name: &str,
1977        namespace_name: &str,
1978        overlay: &CatalogOverlay,
1979    ) -> Vec<TableMetadata> {
1980        if overlay.dropped_catalogs.contains(catalog_name) {
1981            return Vec::new();
1982        }
1983        if overlay
1984            .dropped_namespaces
1985            .contains(&(catalog_name.to_string(), namespace_name.to_string()))
1986        {
1987            return Vec::new();
1988        }
1989        let mut tables: HashMap<TableFqn, TableMetadata> = HashMap::new();
1990        for name in self.inner.table_names() {
1991            if let Some(table) = self.inner.get_table(name)
1992                && table.catalog_name == catalog_name
1993                && table.namespace_name == namespace_name
1994            {
1995                let fqn = TableFqn::from(table);
1996                if !overlay.dropped_tables.contains(&fqn) {
1997                    tables.insert(fqn, table.clone());
1998                }
1999            }
2000        }
2001        for table in overlay.added_tables.values() {
2002            if table.catalog_name == catalog_name && table.namespace_name == namespace_name {
2003                tables.insert(TableFqn::from(table), table.clone());
2004            }
2005        }
2006        let mut values: Vec<TableMetadata> = tables.into_values().collect();
2007        values.sort_by(|a, b| a.name.cmp(&b.name));
2008        values
2009    }
2010
2011    pub fn list_indexes_in_txn(
2012        &self,
2013        fqn: &TableFqn,
2014        overlay: &CatalogOverlay,
2015    ) -> Vec<IndexMetadata> {
2016        if overlay.dropped_catalogs.contains(&fqn.catalog) {
2017            return Vec::new();
2018        }
2019        if overlay
2020            .dropped_namespaces
2021            .contains(&(fqn.catalog.clone(), fqn.namespace.clone()))
2022        {
2023            return Vec::new();
2024        }
2025        if overlay
2026            .dropped_tables
2027            .contains(&TableFqn::new(&fqn.catalog, &fqn.namespace, &fqn.table))
2028        {
2029            return Vec::new();
2030        }
2031
2032        let mut indexes: HashMap<IndexFqn, IndexMetadata> = HashMap::new();
2033        for index in self.inner.get_indexes_for_table(&fqn.table) {
2034            if index.catalog_name == fqn.catalog && index.namespace_name == fqn.namespace {
2035                let index_fqn = IndexFqn::from(index);
2036                if !overlay.dropped_indexes.contains(&index_fqn) {
2037                    indexes.insert(index_fqn, index.clone());
2038                }
2039            }
2040        }
2041        for index in overlay.added_indexes.values() {
2042            if index.table == fqn.table
2043                && index.catalog_name == fqn.catalog
2044                && index.namespace_name == fqn.namespace
2045            {
2046                indexes.insert(IndexFqn::from(index), index.clone());
2047            }
2048        }
2049        let mut values: Vec<IndexMetadata> = indexes.into_values().collect();
2050        values.sort_by(|a, b| a.name.cmp(&b.name));
2051        values
2052    }
2053
2054    pub fn apply_overlay(&mut self, overlay: CatalogOverlay) {
2055        let CatalogOverlay {
2056            added_catalogs,
2057            dropped_catalogs,
2058            added_namespaces,
2059            dropped_namespaces,
2060            added_tables,
2061            dropped_tables,
2062            added_indexes,
2063            dropped_indexes,
2064        } = overlay;
2065
2066        for (name, meta) in added_catalogs {
2067            self.catalogs.insert(name, meta);
2068        }
2069        for (catalog_name, namespace_name) in dropped_namespaces.iter() {
2070            self.namespaces
2071                .remove(&(catalog_name.clone(), namespace_name.clone()));
2072        }
2073        for ((catalog_name, namespace_name), meta) in added_namespaces {
2074            self.namespaces.insert((catalog_name, namespace_name), meta);
2075        }
2076        for name in dropped_catalogs {
2077            self.catalogs.remove(&name);
2078            self.namespaces.retain(|(catalog, _), _| catalog != &name);
2079        }
2080        for (_, table) in added_tables {
2081            self.inner.insert_table_unchecked(table);
2082        }
2083        for fqn in dropped_tables {
2084            self.inner.remove_table_unchecked(&fqn.table);
2085        }
2086        for (_, index) in added_indexes {
2087            self.inner.insert_index_unchecked(index);
2088        }
2089        for fqn in dropped_indexes {
2090            self.inner.remove_index_unchecked(&fqn.index);
2091        }
2092    }
2093
    /// Consumes `_overlay` and does nothing else: the overlay is dropped when
    /// this function returns, so none of its staged changes are applied.
    pub fn discard_overlay(_overlay: CatalogOverlay) {}
2095}
2096
/// `Catalog` trait implementation that forwards every call to the in-memory
/// catalog held in `self.inner`.
///
/// Nothing in this block touches the KV store. NOTE(review): persistence is
/// presumably handled separately through the `persist_*` methods — confirm
/// against callers.
impl<S: KVStore> Catalog for PersistentCatalog<S> {
    /// Forwards to `self.inner.create_table`.
    fn create_table(&mut self, table: TableMetadata) -> Result<(), PlannerError> {
        self.inner.create_table(table)
    }

    /// Forwards to `self.inner.get_table`; the overlay of an open transaction
    /// is not consulted here.
    fn get_table(&self, name: &str) -> Option<&TableMetadata> {
        self.inner.get_table(name)
    }

    /// Forwards to `self.inner.drop_table`.
    fn drop_table(&mut self, name: &str) -> Result<(), PlannerError> {
        self.inner.drop_table(name)
    }

    /// Forwards to `self.inner.create_index`.
    fn create_index(&mut self, index: IndexMetadata) -> Result<(), PlannerError> {
        self.inner.create_index(index)
    }

    /// Forwards to `self.inner.get_index`; the overlay of an open transaction
    /// is not consulted here.
    fn get_index(&self, name: &str) -> Option<&IndexMetadata> {
        self.inner.get_index(name)
    }

    /// Forwards to `self.inner.get_indexes_for_table`.
    fn get_indexes_for_table(&self, table: &str) -> Vec<&IndexMetadata> {
        self.inner.get_indexes_for_table(table)
    }

    /// Forwards to `self.inner.drop_index`.
    fn drop_index(&mut self, name: &str) -> Result<(), PlannerError> {
        self.inner.drop_index(name)
    }

    /// Forwards to `self.inner.table_exists`.
    fn table_exists(&self, name: &str) -> bool {
        self.inner.table_exists(name)
    }

    /// Forwards to `self.inner.index_exists`.
    fn index_exists(&self, name: &str) -> bool {
        self.inner.index_exists(name)
    }

    /// Forwards to `self.inner.next_table_id`.
    fn next_table_id(&mut self) -> u32 {
        self.inner.next_table_id()
    }

    /// Forwards to `self.inner.next_index_id`.
    fn next_index_id(&mut self) -> u32 {
        self.inner.next_index_id()
    }
}
2142
2143#[cfg(test)]
2144mod tests {
2145    use super::*;
2146    use crate::planner::types::ResolvedType;
2147    use std::collections::HashSet;
2148
2149    fn test_table(name: &str, id: u32) -> TableMetadata {
2150        TableMetadata::new(
2151            name,
2152            vec![ColumnMetadata::new("id", ResolvedType::Integer).with_primary_key(true)],
2153        )
2154        .with_table_id(id)
2155        .with_primary_key(vec!["id".to_string()])
2156    }
2157
2158    fn legacy_table_key(table_name: &str) -> Vec<u8> {
2159        let mut key = TABLES_PREFIX.to_vec();
2160        key.extend_from_slice(table_name.as_bytes());
2161        key
2162    }
2163
2164    fn legacy_index_key(index_name: &str) -> Vec<u8> {
2165        let mut key = INDEXES_PREFIX.to_vec();
2166        key.extend_from_slice(index_name.as_bytes());
2167        key
2168    }
2169
2170    fn seed_legacy_store(store: &Arc<alopex_core::kv::memory::MemoryKV>) {
2171        let mut txn = store.begin(TxnMode::ReadWrite).unwrap();
2172        let table = test_table("users", 7);
2173        let legacy_table = PersistedTableMetaV1 {
2174            table_id: table.table_id,
2175            name: table.name.clone(),
2176            columns: table
2177                .columns
2178                .iter()
2179                .map(PersistedColumnMeta::from)
2180                .collect(),
2181            primary_key: table.primary_key.clone(),
2182            storage_options: table.storage_options.clone().into(),
2183        };
2184        let table_bytes = bincode::serialize(&legacy_table).unwrap();
2185        txn.put(legacy_table_key("users"), table_bytes).unwrap();
2186
2187        let legacy_index = PersistedIndexMetaV1 {
2188            index_id: 3,
2189            name: "idx_users_id".to_string(),
2190            table: "users".to_string(),
2191            columns: vec!["id".to_string()],
2192            column_indices: vec![0],
2193            unique: false,
2194            method: Some(PersistedIndexType::BTree),
2195            options: Vec::new(),
2196        };
2197        let index_bytes = bincode::serialize(&legacy_index).unwrap();
2198        txn.put(legacy_index_key("idx_users_id"), index_bytes)
2199            .unwrap();
2200
2201        let meta = CatalogState {
2202            version: 1,
2203            table_id_counter: 7,
2204            index_id_counter: 3,
2205        };
2206        let meta_bytes = bincode::serialize(&meta).unwrap();
2207        txn.put(META_KEY.to_vec(), meta_bytes).unwrap();
2208        txn.commit_self().unwrap();
2209    }
2210
2211    #[test]
2212    fn load_empty_store() {
2213        let store = Arc::new(alopex_core::kv::memory::MemoryKV::new());
2214        let catalog = PersistentCatalog::load(store).unwrap();
2215        assert_eq!(catalog.inner.table_count(), 0);
2216        assert_eq!(catalog.inner.index_count(), 0);
2217    }
2218
2219    #[test]
2220    fn load_migrates_v1_keys_and_meta() {
2221        let store = Arc::new(alopex_core::kv::memory::MemoryKV::new());
2222        seed_legacy_store(&store);
2223
2224        let reloaded = PersistentCatalog::load(store.clone()).unwrap();
2225        assert!(reloaded.get_catalog("default").is_some());
2226        assert!(reloaded.get_namespace("default", "default").is_some());
2227
2228        let table = reloaded.get_table("users").unwrap();
2229        assert_eq!(table.catalog_name, "default");
2230        assert_eq!(table.namespace_name, "default");
2231
2232        let index = reloaded.get_index("idx_users_id").unwrap();
2233        assert_eq!(index.catalog_name, "default");
2234        assert_eq!(index.namespace_name, "default");
2235        assert_eq!(index.table, "users");
2236
2237        let mut txn = store.begin(TxnMode::ReadOnly).unwrap();
2238        assert!(txn.get(&legacy_table_key("users")).unwrap().is_none());
2239        assert!(
2240            txn.get(&table_key("default", "default", "users"))
2241                .unwrap()
2242                .is_some()
2243        );
2244        assert!(
2245            txn.get(&legacy_index_key("idx_users_id"))
2246                .unwrap()
2247                .is_none()
2248        );
2249        assert!(
2250            txn.get(&index_key("default", "default", "users", "idx_users_id"))
2251                .unwrap()
2252                .is_some()
2253        );
2254        let meta_bytes = txn.get(&META_KEY.to_vec()).unwrap().unwrap();
2255        let meta: CatalogState = bincode::deserialize(&meta_bytes).unwrap();
2256        assert_eq!(meta.version, CATALOG_VERSION);
2257        txn.rollback_self().unwrap();
2258    }
2259
2260    #[test]
2261    fn load_after_migration_keeps_v2_keys() {
2262        let store = Arc::new(alopex_core::kv::memory::MemoryKV::new());
2263        seed_legacy_store(&store);
2264
2265        let _ = PersistentCatalog::load(store.clone()).unwrap();
2266        let reloaded = PersistentCatalog::load(store.clone()).unwrap();
2267
2268        let table = reloaded.get_table("users").unwrap();
2269        assert_eq!(table.catalog_name, "default");
2270        assert_eq!(table.namespace_name, "default");
2271
2272        let mut txn = store.begin(TxnMode::ReadOnly).unwrap();
2273        assert!(txn.get(&legacy_table_key("users")).unwrap().is_none());
2274        assert!(
2275            txn.get(&table_key("default", "default", "users"))
2276                .unwrap()
2277                .is_some()
2278        );
2279        let meta_bytes = txn.get(&META_KEY.to_vec()).unwrap().unwrap();
2280        let meta: CatalogState = bincode::deserialize(&meta_bytes).unwrap();
2281        assert_eq!(meta.version, CATALOG_VERSION);
2282        txn.rollback_self().unwrap();
2283    }
2284
2285    #[test]
2286    fn create_table_persists() {
2287        let store = Arc::new(alopex_core::kv::memory::MemoryKV::new());
2288        let mut catalog = PersistentCatalog::new(store.clone());
2289
2290        // inner のカウンタを更新して meta が書き込まれることを担保する
2291        catalog.inner.set_counters(1, 0);
2292
2293        let table = test_table("users", 1);
2294        let mut txn = store.begin(TxnMode::ReadWrite).unwrap();
2295        catalog.persist_create_table(&mut txn, &table).unwrap();
2296        txn.commit_self().unwrap();
2297
2298        let reloaded = PersistentCatalog::load(store).unwrap();
2299        assert!(reloaded.table_exists("users"));
2300        assert_eq!(reloaded.get_table("users").unwrap().table_id, 1);
2301    }
2302
2303    #[test]
2304    fn drop_table_removes() {
2305        let store = Arc::new(alopex_core::kv::memory::MemoryKV::new());
2306        let mut catalog = PersistentCatalog::new(store.clone());
2307        catalog.inner.set_counters(1, 0);
2308
2309        let table = test_table("users", 1);
2310        let mut txn = store.begin(TxnMode::ReadWrite).unwrap();
2311        catalog.persist_create_table(&mut txn, &table).unwrap();
2312        txn.commit_self().unwrap();
2313
2314        let mut txn = store.begin(TxnMode::ReadWrite).unwrap();
2315        let fqn = TableFqn::new("default", "default", "users");
2316        catalog.persist_drop_table(&mut txn, &fqn).unwrap();
2317        txn.commit_self().unwrap();
2318
2319        let reloaded = PersistentCatalog::load(store).unwrap();
2320        assert!(!reloaded.table_exists("users"));
2321    }
2322
2323    #[test]
2324    fn reload_preserves_state() {
2325        let temp_dir = tempfile::tempdir().unwrap();
2326        let wal_path = temp_dir.path().join("catalog.wal");
2327        let store = Arc::new(alopex_core::kv::memory::MemoryKV::open(&wal_path).unwrap());
2328        let mut catalog = PersistentCatalog::new(store.clone());
2329        catalog.inner.set_counters(1, 0);
2330
2331        let table = test_table("users", 1);
2332        let mut txn = store.begin(TxnMode::ReadWrite).unwrap();
2333        catalog.persist_create_table(&mut txn, &table).unwrap();
2334        txn.commit_self().unwrap();
2335        store.flush().unwrap();
2336
2337        drop(catalog);
2338        drop(store);
2339
2340        let store = Arc::new(alopex_core::kv::memory::MemoryKV::open(&wal_path).unwrap());
2341        let reloaded = PersistentCatalog::load(store).unwrap();
2342        assert!(reloaded.table_exists("users"));
2343    }
2344
2345    #[test]
2346    fn overlay_applied_on_commit() {
2347        let store = Arc::new(alopex_core::kv::memory::MemoryKV::new());
2348        let mut catalog = PersistentCatalog::new(store);
2349        let users = test_table("users", 1);
2350        catalog.inner.insert_table_unchecked(users.clone());
2351
2352        let mut overlay = CatalogOverlay::new();
2353        overlay.drop_table(&TableFqn::from(&users));
2354        let orders = test_table("orders", 2);
2355        overlay.add_table(TableFqn::from(&orders), orders);
2356
2357        assert!(!catalog.table_exists_in_txn("users", &overlay));
2358        assert!(catalog.table_exists_in_txn("orders", &overlay));
2359
2360        catalog.apply_overlay(overlay);
2361
2362        assert!(!catalog.table_exists("users"));
2363        assert!(catalog.table_exists("orders"));
2364    }
2365
2366    #[test]
2367    fn overlay_discarded_on_rollback() {
2368        let store = Arc::new(alopex_core::kv::memory::MemoryKV::new());
2369        let mut catalog = PersistentCatalog::new(store);
2370        let users = test_table("users", 1);
2371        catalog.inner.insert_table_unchecked(users.clone());
2372
2373        let mut overlay = CatalogOverlay::new();
2374        overlay.drop_table(&TableFqn::from(&users));
2375
2376        PersistentCatalog::<alopex_core::kv::memory::MemoryKV>::discard_overlay(overlay);
2377
2378        assert!(catalog.table_exists("users"));
2379    }
2380
2381    #[test]
2382    fn catalog_crud_persists() {
2383        let store = Arc::new(alopex_core::kv::memory::MemoryKV::new());
2384        let mut catalog = PersistentCatalog::new(store.clone());
2385
2386        let meta = CatalogMeta {
2387            name: "main".to_string(),
2388            comment: Some("primary".to_string()),
2389            storage_root: Some("/tmp/alopex".to_string()),
2390        };
2391
2392        catalog.create_catalog(meta.clone()).unwrap();
2393        assert!(catalog.get_catalog("main").is_some());
2394
2395        let mut txn = store.begin(TxnMode::ReadOnly).unwrap();
2396        let stored = txn.get(&catalog_key("main")).unwrap().unwrap();
2397        let decoded: CatalogMeta = bincode::deserialize(&stored).unwrap();
2398        txn.rollback_self().unwrap();
2399        assert_eq!(decoded, meta);
2400
2401        catalog.delete_catalog("main").unwrap();
2402        assert!(catalog.get_catalog("main").is_none());
2403    }
2404
2405    #[test]
2406    fn namespace_crud_persists_and_validates_catalog() {
2407        let store = Arc::new(alopex_core::kv::memory::MemoryKV::new());
2408        let mut catalog = PersistentCatalog::new(store.clone());
2409
2410        let missing_catalog = NamespaceMeta {
2411            name: "analytics".to_string(),
2412            catalog_name: "missing".to_string(),
2413            comment: None,
2414            storage_root: None,
2415        };
2416        let err = catalog.create_namespace(missing_catalog).unwrap_err();
2417        assert!(matches!(err, CatalogError::InvalidKey(_)));
2418
2419        catalog
2420            .create_catalog(CatalogMeta {
2421                name: "main".to_string(),
2422                comment: None,
2423                storage_root: None,
2424            })
2425            .unwrap();
2426
2427        let namespace = NamespaceMeta {
2428            name: "analytics".to_string(),
2429            catalog_name: "main".to_string(),
2430            comment: Some("warehouse".to_string()),
2431            storage_root: None,
2432        };
2433
2434        catalog.create_namespace(namespace.clone()).unwrap();
2435        assert!(catalog.get_namespace("main", "analytics").is_some());
2436
2437        let mut txn = store.begin(TxnMode::ReadOnly).unwrap();
2438        let stored = txn
2439            .get(&namespace_key("main", "analytics"))
2440            .unwrap()
2441            .unwrap();
2442        let decoded: NamespaceMeta = bincode::deserialize(&stored).unwrap();
2443        txn.rollback_self().unwrap();
2444        assert_eq!(decoded, namespace);
2445
2446        catalog.delete_namespace("main", "analytics").unwrap();
2447        assert!(catalog.get_namespace("main", "analytics").is_none());
2448    }
2449
2450    #[test]
2451    fn delete_catalog_removes_namespaces_from_store() {
2452        let store = Arc::new(alopex_core::kv::memory::MemoryKV::new());
2453        let mut catalog = PersistentCatalog::new(store.clone());
2454
2455        catalog
2456            .create_catalog(CatalogMeta {
2457                name: "main".to_string(),
2458                comment: None,
2459                storage_root: None,
2460            })
2461            .unwrap();
2462        catalog
2463            .create_namespace(NamespaceMeta {
2464                name: "analytics".to_string(),
2465                catalog_name: "main".to_string(),
2466                comment: None,
2467                storage_root: None,
2468            })
2469            .unwrap();
2470
2471        catalog.delete_catalog("main").unwrap();
2472
2473        let mut txn = store.begin(TxnMode::ReadOnly).unwrap();
2474        let mut prefix = NAMESPACES_PREFIX.to_vec();
2475        prefix.extend_from_slice(b"main");
2476        prefix.push(b'/');
2477        let remaining: Vec<_> = txn.scan_prefix(&prefix).unwrap().collect();
2478        txn.rollback_self().unwrap();
2479
2480        assert!(remaining.is_empty());
2481        assert!(catalog.list_namespaces("main").is_empty());
2482    }
2483
2484    #[test]
2485    fn delete_catalog_removes_tables_and_indexes_from_store() {
2486        let store = Arc::new(alopex_core::kv::memory::MemoryKV::new());
2487        let mut catalog = PersistentCatalog::new(store.clone());
2488
2489        catalog
2490            .create_catalog(CatalogMeta {
2491                name: "main".to_string(),
2492                comment: None,
2493                storage_root: None,
2494            })
2495            .unwrap();
2496
2497        let mut table = test_table("users", 1);
2498        table.catalog_name = "main".to_string();
2499        table.namespace_name = "default".to_string();
2500
2501        let mut index = IndexMetadata::new(1, "idx_users_id", "users", vec!["id".to_string()])
2502            .with_column_indices(vec![0]);
2503        index.catalog_name = "main".to_string();
2504        index.namespace_name = "default".to_string();
2505
2506        let mut txn = store.begin(TxnMode::ReadWrite).unwrap();
2507        catalog.persist_create_table(&mut txn, &table).unwrap();
2508        catalog.persist_create_index(&mut txn, &index).unwrap();
2509        txn.commit_self().unwrap();
2510
2511        catalog.inner.insert_table_unchecked(table);
2512        catalog.inner.insert_index_unchecked(index);
2513
2514        catalog.delete_catalog("main").unwrap();
2515
2516        assert!(catalog.inner.get_table("users").is_none());
2517        assert!(catalog.inner.get_index("idx_users_id").is_none());
2518
2519        let mut txn = store.begin(TxnMode::ReadOnly).unwrap();
2520        assert!(
2521            txn.get(&table_key("main", "default", "users"))
2522                .unwrap()
2523                .is_none()
2524        );
2525        assert!(
2526            txn.get(&index_key("main", "default", "users", "idx_users_id"))
2527                .unwrap()
2528                .is_none()
2529        );
2530        txn.rollback_self().unwrap();
2531    }
2532
2533    #[test]
2534    fn index_meta_loads_catalog_and_namespace_from_table() {
2535        let store = Arc::new(alopex_core::kv::memory::MemoryKV::new());
2536        let mut catalog = PersistentCatalog::new(store.clone());
2537        catalog.inner.set_counters(1, 1);
2538
2539        let mut table = test_table("users", 1);
2540        table.catalog_name = "main".to_string();
2541        table.namespace_name = "analytics".to_string();
2542
2543        let mut txn = store.begin(TxnMode::ReadWrite).unwrap();
2544        catalog.persist_create_table(&mut txn, &table).unwrap();
2545        let mut index = IndexMetadata::new(1, "idx_users_id", "users", vec!["id".to_string()])
2546            .with_column_indices(vec![0])
2547            .with_method(IndexMethod::BTree);
2548        index.catalog_name = "main".to_string();
2549        index.namespace_name = "analytics".to_string();
2550        catalog.persist_create_index(&mut txn, &index).unwrap();
2551        txn.commit_self().unwrap();
2552
2553        let reloaded = PersistentCatalog::load(store).unwrap();
2554        let index = reloaded.get_index("idx_users_id").unwrap();
2555        assert_eq!(index.catalog_name, "main");
2556        assert_eq!(index.namespace_name, "analytics");
2557    }
2558
2559    #[test]
2560    fn legacy_index_meta_loads_catalog_and_namespace_from_table() {
2561        let store = Arc::new(alopex_core::kv::memory::MemoryKV::new());
2562        let mut catalog = PersistentCatalog::new(store.clone());
2563        catalog.inner.set_counters(1, 1);
2564
2565        let mut table = test_table("users", 1);
2566        table.catalog_name = "main".to_string();
2567        table.namespace_name = "analytics".to_string();
2568
2569        let mut txn = store.begin(TxnMode::ReadWrite).unwrap();
2570        catalog.persist_create_table(&mut txn, &table).unwrap();
2571        let legacy = PersistedIndexMetaV1 {
2572            index_id: 1,
2573            name: "idx_users_id".to_string(),
2574            table: "users".to_string(),
2575            columns: vec!["id".to_string()],
2576            column_indices: vec![0],
2577            unique: false,
2578            method: Some(PersistedIndexType::BTree),
2579            options: Vec::new(),
2580        };
2581        let bytes = bincode::serialize(&legacy).unwrap();
2582        txn.put(
2583            index_key("main", "analytics", "users", "idx_users_id"),
2584            bytes,
2585        )
2586        .unwrap();
2587        txn.commit_self().unwrap();
2588
2589        let reloaded = PersistentCatalog::load(store).unwrap();
2590        let index = reloaded.get_index("idx_users_id").unwrap();
2591        assert_eq!(index.catalog_name, "main");
2592        assert_eq!(index.namespace_name, "analytics");
2593    }
2594
2595    #[test]
2596    fn overlay_catalog_get_and_list() {
2597        let store = Arc::new(alopex_core::kv::memory::MemoryKV::new());
2598        let mut catalog = PersistentCatalog::new(store);
2599        catalog
2600            .create_catalog(CatalogMeta {
2601                name: "main".to_string(),
2602                comment: None,
2603                storage_root: None,
2604            })
2605            .unwrap();
2606
2607        let mut overlay = CatalogOverlay::new();
2608        overlay.add_catalog(CatalogMeta {
2609            name: "temp".to_string(),
2610            comment: None,
2611            storage_root: None,
2612        });
2613        overlay.drop_catalog("main");
2614
2615        assert!(catalog.get_catalog_in_txn("main", &overlay).is_none());
2616        assert!(catalog.get_catalog_in_txn("temp", &overlay).is_some());
2617
2618        let names: Vec<String> = catalog
2619            .list_catalogs_in_txn(&overlay)
2620            .into_iter()
2621            .map(|meta| meta.name)
2622            .collect();
2623        assert_eq!(names, vec!["temp".to_string()]);
2624    }
2625
2626    #[test]
2627    fn overlay_namespace_get_and_list() {
2628        let store = Arc::new(alopex_core::kv::memory::MemoryKV::new());
2629        let mut catalog = PersistentCatalog::new(store);
2630        catalog
2631            .create_catalog(CatalogMeta {
2632                name: "main".to_string(),
2633                comment: None,
2634                storage_root: None,
2635            })
2636            .unwrap();
2637        catalog
2638            .create_namespace(NamespaceMeta {
2639                name: "default".to_string(),
2640                catalog_name: "main".to_string(),
2641                comment: None,
2642                storage_root: None,
2643            })
2644            .unwrap();
2645
2646        let mut overlay = CatalogOverlay::new();
2647        overlay.add_namespace(NamespaceMeta {
2648            name: "analytics".to_string(),
2649            catalog_name: "main".to_string(),
2650            comment: None,
2651            storage_root: None,
2652        });
2653        overlay.drop_namespace("main", "default");
2654
2655        assert!(
2656            catalog
2657                .get_namespace_in_txn("main", "default", &overlay)
2658                .is_none()
2659        );
2660        assert!(
2661            catalog
2662                .get_namespace_in_txn("main", "analytics", &overlay)
2663                .is_some()
2664        );
2665
2666        let names: Vec<String> = catalog
2667            .list_namespaces_in_txn("main", &overlay)
2668            .into_iter()
2669            .map(|meta| meta.name)
2670            .collect();
2671        assert_eq!(names, vec!["analytics".to_string()]);
2672    }
2673
2674    #[test]
2675    fn overlay_table_and_index_list() {
2676        let store = Arc::new(alopex_core::kv::memory::MemoryKV::new());
2677        let mut catalog = PersistentCatalog::new(store);
2678
2679        let mut users = test_table("users", 1);
2680        users.catalog_name = "main".to_string();
2681        users.namespace_name = "default".to_string();
2682        catalog.inner.insert_table_unchecked(users.clone());
2683
2684        let mut users_index =
2685            IndexMetadata::new(1, "idx_users_id", "users", vec!["id".to_string()])
2686                .with_column_indices(vec![0]);
2687        users_index.catalog_name = "main".to_string();
2688        users_index.namespace_name = "default".to_string();
2689        catalog.inner.insert_index_unchecked(users_index);
2690
2691        let mut overlay = CatalogOverlay::new();
2692
2693        let mut orders = test_table("orders", 2);
2694        orders.catalog_name = "main".to_string();
2695        orders.namespace_name = "default".to_string();
2696        overlay.add_table(TableFqn::from(&orders), orders.clone());
2697
2698        let mut orders_index =
2699            IndexMetadata::new(2, "idx_orders_id", "orders", vec!["id".to_string()])
2700                .with_column_indices(vec![0]);
2701        orders_index.catalog_name = "main".to_string();
2702        orders_index.namespace_name = "default".to_string();
2703        overlay.add_index(IndexFqn::from(&orders_index), orders_index);
2704
2705        overlay.drop_table(&TableFqn::from(&users));
2706
2707        let table_names: Vec<String> = catalog
2708            .list_tables_in_txn("main", "default", &overlay)
2709            .into_iter()
2710            .map(|table| table.name)
2711            .collect();
2712        assert_eq!(table_names, vec!["orders".to_string()]);
2713
2714        let users_fqn = TableFqn::new("main", "default", "users");
2715        assert!(catalog.list_indexes_in_txn(&users_fqn, &overlay).is_empty());
2716
2717        let orders_fqn = TableFqn::new("main", "default", "orders");
2718        let index_names: Vec<String> = catalog
2719            .list_indexes_in_txn(&orders_fqn, &overlay)
2720            .into_iter()
2721            .map(|index| index.name)
2722            .collect();
2723        assert_eq!(index_names, vec!["idx_orders_id".to_string()]);
2724    }
2725
2726    #[test]
2727    fn overlay_name_lookup_ambiguous_returns_none() {
2728        let store = Arc::new(alopex_core::kv::memory::MemoryKV::new());
2729        let catalog = PersistentCatalog::new(store);
2730
2731        let mut overlay = CatalogOverlay::new();
2732
2733        let mut users_default = test_table("users", 1);
2734        users_default.catalog_name = "main".to_string();
2735        users_default.namespace_name = "default".to_string();
2736        overlay.add_table(TableFqn::from(&users_default), users_default);
2737
2738        let mut users_analytics = test_table("users", 2);
2739        users_analytics.catalog_name = "main".to_string();
2740        users_analytics.namespace_name = "analytics".to_string();
2741        overlay.add_table(TableFqn::from(&users_analytics), users_analytics);
2742
2743        assert!(catalog.get_table_in_txn("users", &overlay).is_none());
2744        assert!(!catalog.table_exists_in_txn("users", &overlay));
2745
2746        let mut idx_default =
2747            IndexMetadata::new(1, "idx_users_id", "users", vec!["id".to_string()])
2748                .with_column_indices(vec![0]);
2749        idx_default.catalog_name = "main".to_string();
2750        idx_default.namespace_name = "default".to_string();
2751        overlay.add_index(IndexFqn::from(&idx_default), idx_default);
2752
2753        let mut idx_analytics =
2754            IndexMetadata::new(2, "idx_users_id", "users", vec!["id".to_string()])
2755                .with_column_indices(vec![0]);
2756        idx_analytics.catalog_name = "main".to_string();
2757        idx_analytics.namespace_name = "analytics".to_string();
2758        overlay.add_index(IndexFqn::from(&idx_analytics), idx_analytics);
2759
2760        assert!(catalog.get_index_in_txn("idx_users_id", &overlay).is_none());
2761        assert!(!catalog.index_exists_in_txn("idx_users_id", &overlay));
2762    }
2763
2764    #[test]
2765    fn overlay_name_lookup_ambiguous_with_base_returns_none() {
2766        let store = Arc::new(alopex_core::kv::memory::MemoryKV::new());
2767        let mut catalog = PersistentCatalog::new(store);
2768
2769        let mut overlay = CatalogOverlay::new();
2770
2771        let mut base_users = test_table("users", 1);
2772        base_users.catalog_name = "main".to_string();
2773        base_users.namespace_name = "default".to_string();
2774        catalog.inner.insert_table_unchecked(base_users);
2775
2776        let mut overlay_users = test_table("users", 2);
2777        overlay_users.catalog_name = "main".to_string();
2778        overlay_users.namespace_name = "analytics".to_string();
2779        overlay.add_table(TableFqn::from(&overlay_users), overlay_users);
2780
2781        assert!(catalog.get_table_in_txn("users", &overlay).is_none());
2782        assert!(!catalog.table_exists_in_txn("users", &overlay));
2783
2784        let mut base_index = IndexMetadata::new(1, "idx_users_id", "users", vec!["id".to_string()])
2785            .with_column_indices(vec![0]);
2786        base_index.catalog_name = "main".to_string();
2787        base_index.namespace_name = "default".to_string();
2788        catalog.inner.insert_index_unchecked(base_index);
2789
2790        let mut overlay_index =
2791            IndexMetadata::new(2, "idx_users_id", "users", vec!["id".to_string()])
2792                .with_column_indices(vec![0]);
2793        overlay_index.catalog_name = "main".to_string();
2794        overlay_index.namespace_name = "analytics".to_string();
2795        overlay.add_index(IndexFqn::from(&overlay_index), overlay_index);
2796
2797        assert!(catalog.get_index_in_txn("idx_users_id", &overlay).is_none());
2798        assert!(!catalog.index_exists_in_txn("idx_users_id", &overlay));
2799    }
2800
2801    #[test]
2802    fn overlay_fqn_tables_separate_namespaces() {
2803        let store = Arc::new(alopex_core::kv::memory::MemoryKV::new());
2804        let catalog = PersistentCatalog::new(store);
2805
2806        let mut overlay = CatalogOverlay::new();
2807
2808        let mut users_default = test_table("users", 1);
2809        users_default.catalog_name = "main".to_string();
2810        users_default.namespace_name = "default".to_string();
2811        overlay.add_table(TableFqn::from(&users_default), users_default.clone());
2812
2813        let mut users_analytics = test_table("users", 2);
2814        users_analytics.catalog_name = "main".to_string();
2815        users_analytics.namespace_name = "analytics".to_string();
2816        overlay.add_table(TableFqn::from(&users_analytics), users_analytics.clone());
2817
2818        let default_tables: Vec<String> = catalog
2819            .list_tables_in_txn("main", "default", &overlay)
2820            .into_iter()
2821            .map(|table| table.name)
2822            .collect();
2823        assert_eq!(default_tables, vec!["users".to_string()]);
2824
2825        let analytics_tables: Vec<String> = catalog
2826            .list_tables_in_txn("main", "analytics", &overlay)
2827            .into_iter()
2828            .map(|table| table.name)
2829            .collect();
2830        assert_eq!(analytics_tables, vec!["users".to_string()]);
2831
2832        overlay.drop_table(&TableFqn::from(&users_default));
2833
2834        let default_tables_after: Vec<String> = catalog
2835            .list_tables_in_txn("main", "default", &overlay)
2836            .into_iter()
2837            .map(|table| table.name)
2838            .collect();
2839        assert!(default_tables_after.is_empty());
2840
2841        let analytics_tables_after: Vec<String> = catalog
2842            .list_tables_in_txn("main", "analytics", &overlay)
2843            .into_iter()
2844            .map(|table| table.name)
2845            .collect();
2846        assert_eq!(analytics_tables_after, vec!["users".to_string()]);
2847    }
2848
2849    #[test]
2850    fn overlay_fqn_indexes_separate_namespaces() {
2851        let store = Arc::new(alopex_core::kv::memory::MemoryKV::new());
2852        let catalog = PersistentCatalog::new(store);
2853
2854        let mut overlay = CatalogOverlay::new();
2855
2856        let mut users_default = test_table("users", 1);
2857        users_default.catalog_name = "main".to_string();
2858        users_default.namespace_name = "default".to_string();
2859        overlay.add_table(TableFqn::from(&users_default), users_default.clone());
2860
2861        let mut users_analytics = test_table("users", 2);
2862        users_analytics.catalog_name = "main".to_string();
2863        users_analytics.namespace_name = "analytics".to_string();
2864        overlay.add_table(TableFqn::from(&users_analytics), users_analytics.clone());
2865
2866        let mut idx_default =
2867            IndexMetadata::new(1, "idx_users_id", "users", vec!["id".to_string()])
2868                .with_column_indices(vec![0]);
2869        idx_default.catalog_name = "main".to_string();
2870        idx_default.namespace_name = "default".to_string();
2871        overlay.add_index(IndexFqn::from(&idx_default), idx_default);
2872
2873        let mut idx_analytics =
2874            IndexMetadata::new(2, "idx_users_id", "users", vec!["id".to_string()])
2875                .with_column_indices(vec![0]);
2876        idx_analytics.catalog_name = "main".to_string();
2877        idx_analytics.namespace_name = "analytics".to_string();
2878        overlay.add_index(IndexFqn::from(&idx_analytics), idx_analytics);
2879
2880        let default_fqn = TableFqn::new("main", "default", "users");
2881        let analytics_fqn = TableFqn::new("main", "analytics", "users");
2882
2883        let default_indexes: Vec<String> = catalog
2884            .list_indexes_in_txn(&default_fqn, &overlay)
2885            .into_iter()
2886            .map(|index| index.name)
2887            .collect();
2888        assert_eq!(default_indexes, vec!["idx_users_id".to_string()]);
2889
2890        let analytics_indexes: Vec<String> = catalog
2891            .list_indexes_in_txn(&analytics_fqn, &overlay)
2892            .into_iter()
2893            .map(|index| index.name)
2894            .collect();
2895        assert_eq!(analytics_indexes, vec!["idx_users_id".to_string()]);
2896    }
2897
2898    #[test]
2899    fn persist_overlay_rejects_duplicate_table_names() {
2900        let store = Arc::new(alopex_core::kv::memory::MemoryKV::new());
2901        let mut catalog = PersistentCatalog::new(store.clone());
2902
2903        let mut overlay = CatalogOverlay::new();
2904
2905        let mut users_default = test_table("users", 1);
2906        users_default.catalog_name = "main".to_string();
2907        users_default.namespace_name = "default".to_string();
2908        overlay.add_table(TableFqn::from(&users_default), users_default);
2909
2910        let mut users_analytics = test_table("users", 2);
2911        users_analytics.catalog_name = "main".to_string();
2912        users_analytics.namespace_name = "analytics".to_string();
2913        overlay.add_table(TableFqn::from(&users_analytics), users_analytics);
2914
2915        let mut txn = store.begin(TxnMode::ReadWrite).unwrap();
2916        let err = catalog.persist_overlay(&mut txn, &overlay).unwrap_err();
2917        assert!(matches!(err, CatalogError::InvalidKey(_)));
2918        txn.rollback_self().unwrap();
2919    }
2920
2921    #[test]
2922    fn persist_overlay_rejects_duplicate_index_names() {
2923        let store = Arc::new(alopex_core::kv::memory::MemoryKV::new());
2924        let mut catalog = PersistentCatalog::new(store.clone());
2925
2926        let mut overlay = CatalogOverlay::new();
2927
2928        let mut idx_default =
2929            IndexMetadata::new(1, "idx_users_id", "users", vec!["id".to_string()])
2930                .with_column_indices(vec![0]);
2931        idx_default.catalog_name = "main".to_string();
2932        idx_default.namespace_name = "default".to_string();
2933        overlay.add_index(IndexFqn::from(&idx_default), idx_default);
2934
2935        let mut idx_analytics =
2936            IndexMetadata::new(2, "idx_users_id", "users", vec!["id".to_string()])
2937                .with_column_indices(vec![0]);
2938        idx_analytics.catalog_name = "main".to_string();
2939        idx_analytics.namespace_name = "analytics".to_string();
2940        overlay.add_index(IndexFqn::from(&idx_analytics), idx_analytics);
2941
2942        let mut txn = store.begin(TxnMode::ReadWrite).unwrap();
2943        let err = catalog.persist_overlay(&mut txn, &overlay).unwrap_err();
2944        assert!(matches!(err, CatalogError::InvalidKey(_)));
2945        txn.rollback_self().unwrap();
2946    }
2947
2948    #[test]
2949    fn overlay_drop_cascade_namespace_removes_children() {
2950        let mut overlay = CatalogOverlay::new();
2951
2952        overlay.add_namespace(NamespaceMeta {
2953            name: "default".to_string(),
2954            catalog_name: "main".to_string(),
2955            comment: None,
2956            storage_root: None,
2957        });
2958
2959        let mut users = test_table("users", 1);
2960        users.catalog_name = "main".to_string();
2961        users.namespace_name = "default".to_string();
2962        overlay.add_table(TableFqn::from(&users), users.clone());
2963
2964        let mut users_index =
2965            IndexMetadata::new(1, "idx_users_id", "users", vec!["id".to_string()])
2966                .with_column_indices(vec![0]);
2967        users_index.catalog_name = "main".to_string();
2968        users_index.namespace_name = "default".to_string();
2969        let users_index_fqn = IndexFqn::from(&users_index);
2970        overlay.add_index(users_index_fqn.clone(), users_index);
2971
2972        overlay.drop_cascade_namespace("main", "default");
2973
2974        assert!(
2975            overlay
2976                .dropped_namespaces
2977                .contains(&("main".to_string(), "default".to_string()))
2978        );
2979        assert!(overlay.dropped_tables.contains(&TableFqn::from(&users)));
2980        assert!(overlay.dropped_indexes.contains(&users_index_fqn));
2981        assert!(!overlay.added_tables.contains_key(&TableFqn::from(&users)));
2982        assert!(!overlay.added_indexes.contains_key(&users_index_fqn));
2983    }
2984
2985    #[test]
2986    fn overlay_drop_cascade_catalog_removes_children() {
2987        let mut overlay = CatalogOverlay::new();
2988
2989        overlay.add_catalog(CatalogMeta {
2990            name: "main".to_string(),
2991            comment: None,
2992            storage_root: None,
2993        });
2994        overlay.add_namespace(NamespaceMeta {
2995            name: "default".to_string(),
2996            catalog_name: "main".to_string(),
2997            comment: None,
2998            storage_root: None,
2999        });
3000
3001        let mut users = test_table("users", 1);
3002        users.catalog_name = "main".to_string();
3003        users.namespace_name = "default".to_string();
3004        overlay.add_table(TableFqn::from(&users), users.clone());
3005
3006        let mut users_index =
3007            IndexMetadata::new(1, "idx_users_id", "users", vec!["id".to_string()])
3008                .with_column_indices(vec![0]);
3009        users_index.catalog_name = "main".to_string();
3010        users_index.namespace_name = "default".to_string();
3011        let users_index_fqn = IndexFqn::from(&users_index);
3012        overlay.add_index(users_index_fqn.clone(), users_index);
3013
3014        overlay.drop_cascade_catalog("main");
3015
3016        assert!(overlay.dropped_catalogs.contains("main"));
3017        assert!(overlay.dropped_tables.contains(&TableFqn::from(&users)));
3018        assert!(overlay.dropped_indexes.contains(&users_index_fqn));
3019        assert!(!overlay.added_tables.contains_key(&TableFqn::from(&users)));
3020        assert!(!overlay.added_indexes.contains_key(&users_index_fqn));
3021    }
3022
3023    #[test]
3024    fn dropped_namespace_hides_tables_and_indexes() {
3025        let store = Arc::new(alopex_core::kv::memory::MemoryKV::new());
3026        let mut catalog = PersistentCatalog::new(store);
3027
3028        let mut users = test_table("users", 1);
3029        users.catalog_name = "main".to_string();
3030        users.namespace_name = "default".to_string();
3031        catalog.inner.insert_table_unchecked(users);
3032
3033        let mut users_index =
3034            IndexMetadata::new(1, "idx_users_id", "users", vec!["id".to_string()])
3035                .with_column_indices(vec![0]);
3036        users_index.catalog_name = "main".to_string();
3037        users_index.namespace_name = "default".to_string();
3038        catalog.inner.insert_index_unchecked(users_index);
3039
3040        let mut overlay = CatalogOverlay::new();
3041        overlay.drop_namespace("main", "default");
3042
3043        let table_names: Vec<String> = catalog
3044            .list_tables_in_txn("main", "default", &overlay)
3045            .into_iter()
3046            .map(|table| table.name)
3047            .collect();
3048        assert!(table_names.is_empty());
3049
3050        let fqn = TableFqn {
3051            catalog: "main".to_string(),
3052            namespace: "default".to_string(),
3053            table: "users".to_string(),
3054        };
3055        let index_names: Vec<String> = catalog
3056            .list_indexes_in_txn(&fqn, &overlay)
3057            .into_iter()
3058            .map(|index| index.name)
3059            .collect();
3060        assert!(index_names.is_empty());
3061    }
3062
3063    #[test]
3064    fn dropped_namespace_hides_get_and_exists() {
3065        let store = Arc::new(alopex_core::kv::memory::MemoryKV::new());
3066        let mut catalog = PersistentCatalog::new(store);
3067
3068        let mut users = test_table("users", 1);
3069        users.catalog_name = "main".to_string();
3070        users.namespace_name = "default".to_string();
3071        catalog.inner.insert_table_unchecked(users);
3072
3073        let mut users_index =
3074            IndexMetadata::new(1, "idx_users_id", "users", vec!["id".to_string()])
3075                .with_column_indices(vec![0]);
3076        users_index.catalog_name = "main".to_string();
3077        users_index.namespace_name = "default".to_string();
3078        catalog.inner.insert_index_unchecked(users_index);
3079
3080        let mut overlay = CatalogOverlay::new();
3081        overlay.drop_namespace("main", "default");
3082
3083        assert!(!catalog.table_exists_in_txn("users", &overlay));
3084        assert!(catalog.get_table_in_txn("users", &overlay).is_none());
3085        assert!(!catalog.index_exists_in_txn("idx_users_id", &overlay));
3086        assert!(catalog.get_index_in_txn("idx_users_id", &overlay).is_none());
3087
3088        let view = TxnCatalogView::new(&catalog, &overlay);
3089        assert!(view.get_indexes_for_table("users").is_empty());
3090    }
3091
3092    #[test]
3093    fn persisted_catalog_meta_roundtrip() {
3094        let meta = PersistedCatalogMeta {
3095            name: "main".to_string(),
3096            comment: Some("primary catalog".to_string()),
3097            storage_root: Some("/tmp/alopex".to_string()),
3098        };
3099        let bytes = bincode::serialize(&meta).unwrap();
3100        let decoded: PersistedCatalogMeta = bincode::deserialize(&bytes).unwrap();
3101        assert_eq!(meta, decoded);
3102    }
3103
3104    #[test]
3105    fn persisted_namespace_meta_roundtrip() {
3106        let meta = PersistedNamespaceMeta {
3107            name: "analytics".to_string(),
3108            catalog_name: "main".to_string(),
3109            comment: Some("warehouse".to_string()),
3110            storage_root: Some("s3://bucket/ns".to_string()),
3111        };
3112        let bytes = bincode::serialize(&meta).unwrap();
3113        let decoded: PersistedNamespaceMeta = bincode::deserialize(&bytes).unwrap();
3114        assert_eq!(meta, decoded);
3115    }
3116
3117    #[test]
3118    fn table_fqn_hash_and_eq() {
3119        let first = TableFqn {
3120            catalog: "main".to_string(),
3121            namespace: "default".to_string(),
3122            table: "users".to_string(),
3123        };
3124        let same = TableFqn {
3125            catalog: "main".to_string(),
3126            namespace: "default".to_string(),
3127            table: "users".to_string(),
3128        };
3129        let different = TableFqn {
3130            catalog: "main".to_string(),
3131            namespace: "default".to_string(),
3132            table: "orders".to_string(),
3133        };
3134
3135        let mut set = HashSet::new();
3136        set.insert(first);
3137        assert!(set.contains(&same));
3138        assert!(!set.contains(&different));
3139    }
3140
3141    #[test]
3142    fn index_fqn_hash_and_eq() {
3143        let first = IndexFqn {
3144            catalog: "main".to_string(),
3145            namespace: "default".to_string(),
3146            table: "users".to_string(),
3147            index: "idx_users_id".to_string(),
3148        };
3149        let same = IndexFqn {
3150            catalog: "main".to_string(),
3151            namespace: "default".to_string(),
3152            table: "users".to_string(),
3153            index: "idx_users_id".to_string(),
3154        };
3155        let different = IndexFqn {
3156            catalog: "main".to_string(),
3157            namespace: "default".to_string(),
3158            table: "users".to_string(),
3159            index: "idx_users_email".to_string(),
3160        };
3161
3162        let mut set = HashSet::new();
3163        set.insert(first);
3164        assert!(set.contains(&same));
3165        assert!(!set.contains(&different));
3166    }
3167
3168    #[test]
3169    fn table_type_and_data_source_format_serde() {
3170        let managed = serde_json::to_string(&TableType::Managed).unwrap();
3171        let external = serde_json::to_string(&TableType::External).unwrap();
3172        let alopex = serde_json::to_string(&DataSourceFormat::Alopex).unwrap();
3173        let parquet = serde_json::to_string(&DataSourceFormat::Parquet).unwrap();
3174        let delta = serde_json::to_string(&DataSourceFormat::Delta).unwrap();
3175
3176        assert_eq!(managed, "\"MANAGED\"");
3177        assert_eq!(external, "\"EXTERNAL\"");
3178        assert_eq!(alopex, "\"ALOPEX\"");
3179        assert_eq!(parquet, "\"PARQUET\"");
3180        assert_eq!(delta, "\"DELTA\"");
3181    }
3182}