Skip to main content

alopex_embedded/
catalog_api.rs

1//! Catalog API 向けの公開型定義。
2
3use std::collections::HashMap;
4
5use alopex_core::{KVStore, KVTransaction};
6use alopex_sql::ast::ddl::{DataType, IndexMethod, VectorMetric};
7use alopex_sql::catalog::persistent::{CatalogMeta, NamespaceMeta, TableFqn};
8use alopex_sql::catalog::{
9    Catalog, CatalogOverlay, ColumnMetadata, Compression, IndexMetadata, StorageOptions,
10    StorageType, TableMetadata,
11};
12use alopex_sql::planner::types::ResolvedType;
13use alopex_sql::{DataSourceFormat, TableType};
14
15use crate::{Database, Error, Result, Transaction, TxnMode};
16
17/// Catalog 情報(公開 API 返却用)。
18#[derive(Debug, Clone, PartialEq, Eq)]
19pub struct CatalogInfo {
20    /// Catalog 名。
21    pub name: String,
22    /// コメント。
23    pub comment: Option<String>,
24    /// ストレージルート。
25    pub storage_root: Option<String>,
26}
27
28impl From<CatalogMeta> for CatalogInfo {
29    fn from(value: CatalogMeta) -> Self {
30        Self {
31            name: value.name,
32            comment: value.comment,
33            storage_root: value.storage_root,
34        }
35    }
36}
37
38/// Namespace 情報(公開 API 返却用)。
39#[derive(Debug, Clone, PartialEq, Eq)]
40pub struct NamespaceInfo {
41    /// Namespace 名。
42    pub name: String,
43    /// 所属 Catalog 名。
44    pub catalog_name: String,
45    /// コメント。
46    pub comment: Option<String>,
47    /// ストレージルート。
48    pub storage_root: Option<String>,
49}
50
51impl From<NamespaceMeta> for NamespaceInfo {
52    fn from(value: NamespaceMeta) -> Self {
53        Self {
54            name: value.name,
55            catalog_name: value.catalog_name,
56            comment: value.comment,
57            storage_root: value.storage_root,
58        }
59    }
60}
61
/// Column description returned by the public API.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ColumnInfo {
    /// Column name.
    pub name: String,
    /// Data type rendered as text (e.g. "INTEGER", "TEXT", "VECTOR(128, COSINE)").
    pub data_type: String,
    /// Whether the column accepts NULL.
    pub nullable: bool,
    /// Whether the column is part of the primary key.
    pub is_primary_key: bool,
    /// Optional free-form comment.
    pub comment: Option<String>,
}
76
/// Storage settings as reported inside `TableInfo`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StorageInfo {
    /// Storage kind ("row" | "columnar").
    pub storage_type: String,
    /// Compression scheme ("none" | "lz4" | "zstd").
    pub compression: String,
}

/// The default is row storage without compression.
impl Default for StorageInfo {
    fn default() -> Self {
        let storage_type = String::from("row");
        let compression = String::from("none");
        Self {
            storage_type,
            compression,
        }
    }
}
94
95impl From<&StorageOptions> for StorageInfo {
96    fn from(value: &StorageOptions) -> Self {
97        let storage_type = match value.storage_type {
98            StorageType::Row => "row",
99            StorageType::Columnar => "columnar",
100        };
101        let compression = match value.compression {
102            Compression::None => "none",
103            Compression::Lz4 => "lz4",
104            Compression::Zstd => "zstd",
105        };
106        Self {
107            storage_type: storage_type.to_string(),
108            compression: compression.to_string(),
109        }
110    }
111}
112
113/// テーブル情報(公開 API 返却用)。
114#[derive(Debug, Clone, PartialEq, Eq)]
115pub struct TableInfo {
116    /// テーブル名。
117    pub name: String,
118    /// 所属 Catalog 名。
119    pub catalog_name: String,
120    /// 所属 Namespace 名。
121    pub namespace_name: String,
122    /// テーブル ID。
123    pub table_id: u32,
124    /// テーブル種別。
125    pub table_type: TableType,
126    /// カラム情報。
127    pub columns: Vec<ColumnInfo>,
128    /// 主キー。
129    pub primary_key: Option<Vec<String>>,
130    /// ストレージロケーション。
131    pub storage_location: Option<String>,
132    /// データソース形式。
133    pub data_source_format: DataSourceFormat,
134    /// ストレージ設定。
135    pub storage_options: StorageInfo,
136    /// コメント。
137    pub comment: Option<String>,
138    /// カスタムプロパティ。
139    pub properties: HashMap<String, String>,
140}
141
142impl From<&TableMetadata> for TableInfo {
143    fn from(value: &TableMetadata) -> Self {
144        let primary_key = value.primary_key.clone();
145        let columns = value
146            .columns
147            .iter()
148            .map(|column| ColumnInfo {
149                name: column.name.clone(),
150                data_type: resolved_type_to_string(&column.data_type),
151                nullable: !column.not_null,
152                is_primary_key: column.primary_key
153                    || primary_key
154                        .as_ref()
155                        .map(|keys| keys.iter().any(|name| name == &column.name))
156                        .unwrap_or(false),
157                comment: None,
158            })
159            .collect();
160        let storage_options = if value.storage_options == StorageOptions::default() {
161            StorageInfo::default()
162        } else {
163            StorageInfo::from(&value.storage_options)
164        };
165
166        Self {
167            name: value.name.clone(),
168            catalog_name: value.catalog_name.clone(),
169            namespace_name: value.namespace_name.clone(),
170            table_id: value.table_id,
171            table_type: value.table_type,
172            columns,
173            primary_key,
174            storage_location: value.storage_location.clone(),
175            data_source_format: value.data_source_format,
176            storage_options,
177            comment: value.comment.clone(),
178            properties: value.properties.clone(),
179        }
180    }
181}
182
183impl From<TableMetadata> for TableInfo {
184    fn from(value: TableMetadata) -> Self {
185        Self::from(&value)
186    }
187}
188
189/// インデックス情報(公開 API 返却用)。
190#[derive(Debug, Clone, PartialEq, Eq)]
191pub struct IndexInfo {
192    /// インデックス名。
193    pub name: String,
194    /// インデックス ID。
195    pub index_id: u32,
196    /// 所属 Catalog 名。
197    pub catalog_name: String,
198    /// 所属 Namespace 名。
199    pub namespace_name: String,
200    /// 対象テーブル名。
201    pub table_name: String,
202    /// 対象カラム名。
203    pub columns: Vec<String>,
204    /// インデックス方式("btree" | "hnsw")。
205    pub method: String,
206    /// ユニーク制約。
207    pub is_unique: bool,
208}
209
210impl From<&IndexMetadata> for IndexInfo {
211    fn from(value: &IndexMetadata) -> Self {
212        let method = match value.method {
213            Some(IndexMethod::BTree) | None => "btree",
214            Some(IndexMethod::Hnsw) => "hnsw",
215        };
216        Self {
217            name: value.name.clone(),
218            index_id: value.index_id,
219            catalog_name: value.catalog_name.clone(),
220            namespace_name: value.namespace_name.clone(),
221            table_name: value.table.clone(),
222            columns: value.columns.clone(),
223            method: method.to_string(),
224            is_unique: value.unique,
225        }
226    }
227}
228
229impl From<IndexMetadata> for IndexInfo {
230    fn from(value: IndexMetadata) -> Self {
231        Self::from(&value)
232    }
233}
234
235/// Catalog 作成リクエスト。
236#[derive(Debug, Clone, PartialEq, Eq)]
237pub struct CreateCatalogRequest {
238    /// Catalog 名。
239    pub name: String,
240    /// コメント。
241    pub comment: Option<String>,
242    /// ストレージルート。
243    pub storage_root: Option<String>,
244}
245
246impl CreateCatalogRequest {
247    /// 必須フィールドを指定して作成する。
248    pub fn new(name: impl Into<String>) -> Self {
249        Self {
250            name: name.into(),
251            comment: None,
252            storage_root: None,
253        }
254    }
255
256    /// コメントを指定する。
257    pub fn with_comment(mut self, comment: impl Into<String>) -> Self {
258        self.comment = Some(comment.into());
259        self
260    }
261
262    /// ストレージルートを指定する。
263    pub fn with_storage_root(mut self, storage_root: impl Into<String>) -> Self {
264        self.storage_root = Some(storage_root.into());
265        self
266    }
267
268    /// 必須フィールドを検証して返す。
269    pub fn build(self) -> Result<Self> {
270        validate_required(&self.name, "catalog 名")?;
271        Ok(self)
272    }
273}
274
275/// Namespace 作成リクエスト。
276#[derive(Debug, Clone, PartialEq, Eq)]
277pub struct CreateNamespaceRequest {
278    /// 所属 Catalog 名。
279    pub catalog_name: String,
280    /// Namespace 名。
281    pub name: String,
282    /// コメント。
283    pub comment: Option<String>,
284    /// ストレージルート。
285    pub storage_root: Option<String>,
286}
287
288impl CreateNamespaceRequest {
289    /// 必須フィールドを指定して作成する。
290    pub fn new(catalog_name: impl Into<String>, name: impl Into<String>) -> Self {
291        Self {
292            catalog_name: catalog_name.into(),
293            name: name.into(),
294            comment: None,
295            storage_root: None,
296        }
297    }
298
299    /// コメントを指定する。
300    pub fn with_comment(mut self, comment: impl Into<String>) -> Self {
301        self.comment = Some(comment.into());
302        self
303    }
304
305    /// ストレージルートを指定する。
306    pub fn with_storage_root(mut self, storage_root: impl Into<String>) -> Self {
307        self.storage_root = Some(storage_root.into());
308        self
309    }
310
311    /// 必須フィールドを検証して返す。
312    pub fn build(self) -> Result<Self> {
313        validate_required(&self.catalog_name, "catalog 名")?;
314        validate_required(&self.name, "namespace 名")?;
315        Ok(self)
316    }
317}
318
319/// テーブル作成リクエスト。
320#[derive(Debug, Clone)]
321pub struct CreateTableRequest {
322    /// Catalog 名(既定: "default")。
323    pub catalog_name: String,
324    /// Namespace 名(既定: "default")。
325    pub namespace_name: String,
326    /// テーブル名。
327    pub name: String,
328    /// スキーマ。
329    pub schema: Option<Vec<ColumnDefinition>>,
330    /// テーブル種別(既定: Managed)。
331    pub table_type: TableType,
332    /// データソース形式(None の場合は Alopex)。
333    pub data_source_format: Option<DataSourceFormat>,
334    /// 主キー。
335    pub primary_key: Option<Vec<String>>,
336    /// ストレージルート。
337    pub storage_root: Option<String>,
338    /// ストレージオプション。
339    pub storage_options: Option<StorageOptions>,
340    /// コメント。
341    pub comment: Option<String>,
342    /// カスタムプロパティ(None の場合は空の HashMap)。
343    pub properties: Option<HashMap<String, String>>,
344}
345
346impl CreateTableRequest {
347    /// 必須フィールドを指定して作成する。
348    pub fn new(name: impl Into<String>) -> Self {
349        Self {
350            catalog_name: "default".to_string(),
351            namespace_name: "default".to_string(),
352            name: name.into(),
353            schema: None,
354            table_type: TableType::Managed,
355            data_source_format: None,
356            primary_key: None,
357            storage_root: None,
358            storage_options: None,
359            comment: None,
360            properties: None,
361        }
362    }
363
364    /// Catalog 名を指定する。
365    pub fn with_catalog_name(mut self, catalog_name: impl Into<String>) -> Self {
366        self.catalog_name = catalog_name.into();
367        self
368    }
369
370    /// Namespace 名を指定する。
371    pub fn with_namespace_name(mut self, namespace_name: impl Into<String>) -> Self {
372        self.namespace_name = namespace_name.into();
373        self
374    }
375
376    /// スキーマを指定する。
377    pub fn with_schema(mut self, schema: Vec<ColumnDefinition>) -> Self {
378        self.schema = Some(schema);
379        self
380    }
381
382    /// テーブル種別を指定する。
383    pub fn with_table_type(mut self, table_type: TableType) -> Self {
384        self.table_type = table_type;
385        self
386    }
387
388    /// データソース形式を指定する。
389    pub fn with_data_source_format(mut self, data_source_format: DataSourceFormat) -> Self {
390        self.data_source_format = Some(data_source_format);
391        self
392    }
393
394    /// 主キーを指定する。
395    pub fn with_primary_key(mut self, primary_key: Vec<String>) -> Self {
396        self.primary_key = Some(primary_key);
397        self
398    }
399
400    /// ストレージルートを指定する。
401    pub fn with_storage_root(mut self, storage_root: impl Into<String>) -> Self {
402        self.storage_root = Some(storage_root.into());
403        self
404    }
405
406    /// ストレージオプションを指定する。
407    pub fn with_storage_options(mut self, storage_options: StorageOptions) -> Self {
408        self.storage_options = Some(storage_options);
409        self
410    }
411
412    /// コメントを指定する。
413    pub fn with_comment(mut self, comment: impl Into<String>) -> Self {
414        self.comment = Some(comment.into());
415        self
416    }
417
418    /// カスタムプロパティを指定する。
419    pub fn with_properties(mut self, properties: HashMap<String, String>) -> Self {
420        self.properties = Some(properties);
421        self
422    }
423
424    /// 必須フィールドを検証して返す。
425    pub fn build(mut self) -> Result<Self> {
426        validate_required(&self.catalog_name, "catalog 名")?;
427        validate_required(&self.namespace_name, "namespace 名")?;
428        validate_required(&self.name, "table 名")?;
429
430        if self.table_type == TableType::Managed && self.schema.is_none() {
431            return Err(Error::SchemaRequired);
432        }
433        if self.table_type == TableType::External && self.storage_root.is_none() {
434            return Err(Error::StorageRootRequired);
435        }
436
437        if self.data_source_format.is_none() {
438            self.data_source_format = Some(DataSourceFormat::Alopex);
439        }
440        if self.properties.is_none() {
441            self.properties = Some(HashMap::new());
442        }
443        Ok(self)
444    }
445}
446
447/// カラム定義。
448#[derive(Debug, Clone)]
449pub struct ColumnDefinition {
450    /// カラム名。
451    pub name: String,
452    /// データ型。
453    pub data_type: DataType,
454    /// NULL 許可(既定: true)。
455    pub nullable: bool,
456    /// コメント。
457    pub comment: Option<String>,
458}
459
460impl ColumnDefinition {
461    /// 必須フィールドを指定して作成する。
462    pub fn new(name: impl Into<String>, data_type: DataType) -> Self {
463        Self {
464            name: name.into(),
465            data_type,
466            nullable: true,
467            comment: None,
468        }
469    }
470
471    /// NULL 許可を指定する。
472    pub fn with_nullable(mut self, nullable: bool) -> Self {
473        self.nullable = nullable;
474        self
475    }
476
477    /// コメントを指定する。
478    pub fn with_comment(mut self, comment: impl Into<String>) -> Self {
479        self.comment = Some(comment.into());
480        self
481    }
482}
483
484impl Database {
485    /// Catalog 一覧を取得する。
486    pub fn list_catalogs(&self) -> Result<Vec<CatalogInfo>> {
487        let catalog = self.sql_catalog.read().expect("catalog lock poisoned");
488        Ok(catalog
489            .list_catalogs()
490            .into_iter()
491            .map(CatalogInfo::from)
492            .collect())
493    }
494
495    /// Catalog を取得する。
496    pub fn get_catalog(&self, name: &str) -> Result<CatalogInfo> {
497        let catalog = self.sql_catalog.read().expect("catalog lock poisoned");
498        let meta = catalog
499            .get_catalog(name)
500            .ok_or_else(|| Error::CatalogNotFound(name.to_string()))?;
501        Ok(meta.into())
502    }
503
504    /// Namespace 一覧を取得する。
505    pub fn list_namespaces(&self, catalog_name: &str) -> Result<Vec<NamespaceInfo>> {
506        let catalog = self.sql_catalog.read().expect("catalog lock poisoned");
507        ensure_catalog_exists(&*catalog, catalog_name)?;
508        Ok(catalog
509            .list_namespaces(catalog_name)
510            .into_iter()
511            .map(NamespaceInfo::from)
512            .collect())
513    }
514
515    /// Namespace を取得する。
516    pub fn get_namespace(&self, catalog_name: &str, namespace_name: &str) -> Result<NamespaceInfo> {
517        let catalog = self.sql_catalog.read().expect("catalog lock poisoned");
518        ensure_catalog_exists(&*catalog, catalog_name)?;
519        let meta = catalog
520            .get_namespace(catalog_name, namespace_name)
521            .ok_or_else(|| {
522                Error::NamespaceNotFound(catalog_name.to_string(), namespace_name.to_string())
523            })?;
524        Ok(meta.into())
525    }
526
527    /// テーブル一覧を取得する。
528    pub fn list_tables(&self, catalog_name: &str, namespace_name: &str) -> Result<Vec<TableInfo>> {
529        let catalog = self.sql_catalog.read().expect("catalog lock poisoned");
530        ensure_namespace_exists(&*catalog, catalog_name, namespace_name)?;
531        let namespace = catalog
532            .get_namespace(catalog_name, namespace_name)
533            .ok_or_else(|| {
534                Error::NamespaceNotFound(catalog_name.to_string(), namespace_name.to_string())
535            })?;
536
537        let overlay = CatalogOverlay::new();
538        let tables = catalog.list_tables_in_txn(catalog_name, namespace_name, &overlay);
539        Ok(tables
540            .into_iter()
541            .map(|table| {
542                let info = TableInfo::from(table);
543                apply_storage_location(info, namespace.storage_root.as_deref())
544            })
545            .collect())
546    }
547
548    /// デフォルト catalog/namespace のテーブル一覧を取得する。
549    pub fn list_tables_simple(&self) -> Result<Vec<TableInfo>> {
550        self.list_tables("default", "default")
551    }
552
553    /// テーブル情報を取得する。
554    pub fn get_table_info(
555        &self,
556        catalog_name: &str,
557        namespace_name: &str,
558        table_name: &str,
559    ) -> Result<TableInfo> {
560        let catalog = self.sql_catalog.read().expect("catalog lock poisoned");
561        ensure_namespace_exists(&*catalog, catalog_name, namespace_name)?;
562        let namespace = catalog
563            .get_namespace(catalog_name, namespace_name)
564            .ok_or_else(|| {
565                Error::NamespaceNotFound(catalog_name.to_string(), namespace_name.to_string())
566            })?;
567
568        let overlay = CatalogOverlay::new();
569        let tables = catalog.list_tables_in_txn(catalog_name, namespace_name, &overlay);
570        let table = tables
571            .into_iter()
572            .find(|table| table.name == table_name)
573            .ok_or_else(|| {
574                Error::TableNotFound(table_full_name(catalog_name, namespace_name, table_name))
575            })?;
576
577        let info = TableInfo::from(table);
578        Ok(apply_storage_location(
579            info,
580            namespace.storage_root.as_deref(),
581        ))
582    }
583
584    /// デフォルト catalog/namespace のテーブル情報を取得する。
585    pub fn get_table_info_simple(&self, table_name: &str) -> Result<TableInfo> {
586        self.get_table_info("default", "default", table_name)
587    }
588
589    /// テーブル情報をキャッシュから取得する(キャッシュミス時は DB ルックアップ)。
590    ///
591    /// パフォーマンス最適化のため、キャッシュを使用してテーブルメタデータを取得する。
592    /// DDL 操作(テーブル作成/削除など)が発生するとキャッシュは自動的に無効化される。
593    pub fn get_table_info_cached(
594        &self,
595        catalog_name: &str,
596        namespace_name: &str,
597        table_name: &str,
598    ) -> Result<crate::CachedTableInfo> {
599        // Check cache first
600        if let Some(cached) = self.get_cached_table_info(catalog_name, namespace_name, table_name) {
601            return Ok(cached);
602        }
603
604        // Cache miss: fetch from database and cache the result
605        let info = self.get_table_info(catalog_name, namespace_name, table_name)?;
606        let cached = crate::CachedTableInfo {
607            storage_location: info.storage_location.clone(),
608            format: format!("{:?}", info.data_source_format).to_uppercase(),
609        };
610        self.cache_table_info(catalog_name, namespace_name, table_name, cached.clone());
611        Ok(cached)
612    }
613
614    /// インデックス一覧を取得する。
615    pub fn list_indexes(
616        &self,
617        catalog_name: &str,
618        namespace_name: &str,
619        table_name: &str,
620    ) -> Result<Vec<IndexInfo>> {
621        let catalog = self.sql_catalog.read().expect("catalog lock poisoned");
622        ensure_namespace_exists(&*catalog, catalog_name, namespace_name)?;
623        ensure_table_exists(&*catalog, catalog_name, namespace_name, table_name)?;
624
625        let overlay = CatalogOverlay::new();
626        let fqn = TableFqn::new(catalog_name, namespace_name, table_name);
627        let indexes = catalog.list_indexes_in_txn(&fqn, &overlay);
628        Ok(indexes.into_iter().map(IndexInfo::from).collect())
629    }
630
631    /// デフォルト catalog/namespace のインデックス一覧を取得する。
632    pub fn list_indexes_simple(&self, table_name: &str) -> Result<Vec<IndexInfo>> {
633        self.list_indexes("default", "default", table_name)
634    }
635
636    /// インデックス情報を取得する。
637    pub fn get_index_info(
638        &self,
639        catalog_name: &str,
640        namespace_name: &str,
641        table_name: &str,
642        index_name: &str,
643    ) -> Result<IndexInfo> {
644        let indexes = self.list_indexes(catalog_name, namespace_name, table_name)?;
645        indexes
646            .into_iter()
647            .find(|index| index.name == index_name)
648            .ok_or_else(|| {
649                Error::IndexNotFound(index_full_name(
650                    catalog_name,
651                    namespace_name,
652                    table_name,
653                    index_name,
654                ))
655            })
656    }
657
658    /// デフォルト catalog/namespace のインデックス情報を取得する。
659    pub fn get_index_info_simple(&self, table_name: &str, index_name: &str) -> Result<IndexInfo> {
660        self.get_index_info("default", "default", table_name, index_name)
661    }
662
663    /// Catalog を作成する。
664    ///
665    /// # Examples
666    ///
667    /// ```
668    /// use alopex_embedded::{CreateCatalogRequest, Database};
669    ///
670    /// let db = Database::new();
671    /// let catalog = db.create_catalog(CreateCatalogRequest::new("main")).unwrap();
672    /// assert_eq!(catalog.name, "main");
673    /// ```
674    pub fn create_catalog(&self, request: CreateCatalogRequest) -> Result<CatalogInfo> {
675        let request = request.build()?;
676        let mut catalog = self.sql_catalog.write().expect("catalog lock poisoned");
677        if catalog.get_catalog(&request.name).is_some() {
678            return Err(Error::CatalogAlreadyExists(request.name));
679        }
680        let meta = CatalogMeta {
681            name: request.name,
682            comment: request.comment,
683            storage_root: request.storage_root,
684        };
685        catalog
686            .create_catalog(meta.clone())
687            .map_err(|err| Error::Sql(err.into()))?;
688        self.invalidate_table_info_cache();
689        Ok(meta.into())
690    }
691
692    /// Catalog を削除する。
693    pub fn delete_catalog(&self, name: &str, force: bool) -> Result<()> {
694        if name == "default" {
695            return Err(Error::CannotDeleteDefault("catalog".to_string()));
696        }
697        let mut catalog = self.sql_catalog.write().expect("catalog lock poisoned");
698        ensure_catalog_exists(&*catalog, name)?;
699
700        if !force {
701            let namespaces = catalog.list_namespaces(name);
702            let has_non_default = namespaces.iter().any(|ns| ns.name != "default");
703            let has_tables = namespaces.iter().any(|ns| {
704                let overlay = CatalogOverlay::new();
705                !catalog
706                    .list_tables_in_txn(name, &ns.name, &overlay)
707                    .is_empty()
708            });
709            if has_non_default || has_tables {
710                return Err(Error::CatalogNotEmpty(name.to_string()));
711            }
712        }
713
714        catalog
715            .delete_catalog(name)
716            .map_err(|err| Error::Sql(err.into()))?;
717        self.invalidate_table_info_cache();
718        Ok(())
719    }
720
721    /// Namespace を作成する。
722    ///
723    /// # Examples
724    ///
725    /// ```
726    /// use alopex_embedded::{CreateCatalogRequest, CreateNamespaceRequest, Database};
727    ///
728    /// let db = Database::new();
729    /// db.create_catalog(CreateCatalogRequest::new("main")).unwrap();
730    /// let namespace = db
731    ///     .create_namespace(CreateNamespaceRequest::new("main", "analytics"))
732    ///     .unwrap();
733    /// assert_eq!(namespace.catalog_name, "main");
734    /// assert_eq!(namespace.name, "analytics");
735    /// ```
736    pub fn create_namespace(&self, request: CreateNamespaceRequest) -> Result<NamespaceInfo> {
737        let request = request.build()?;
738        let mut catalog = self.sql_catalog.write().expect("catalog lock poisoned");
739        let catalog_meta = catalog
740            .get_catalog(&request.catalog_name)
741            .ok_or_else(|| Error::CatalogNotFound(request.catalog_name.clone()))?;
742        if catalog
743            .get_namespace(&request.catalog_name, &request.name)
744            .is_some()
745        {
746            return Err(Error::NamespaceAlreadyExists(
747                request.catalog_name,
748                request.name,
749            ));
750        }
751
752        let storage_root = request
753            .storage_root
754            .or_else(|| catalog_meta.storage_root.clone());
755        let meta = NamespaceMeta {
756            name: request.name,
757            catalog_name: request.catalog_name,
758            comment: request.comment,
759            storage_root,
760        };
761        catalog
762            .create_namespace(meta.clone())
763            .map_err(|err| Error::Sql(err.into()))?;
764        self.invalidate_table_info_cache();
765        Ok(meta.into())
766    }
767
768    /// Namespace を削除する。
769    pub fn delete_namespace(
770        &self,
771        catalog_name: &str,
772        namespace_name: &str,
773        force: bool,
774    ) -> Result<()> {
775        if namespace_name == "default" {
776            return Err(Error::CannotDeleteDefault("namespace".to_string()));
777        }
778        let mut catalog = self.sql_catalog.write().expect("catalog lock poisoned");
779        ensure_namespace_exists(&*catalog, catalog_name, namespace_name)?;
780
781        let overlay = CatalogOverlay::new();
782        let tables = catalog.list_tables_in_txn(catalog_name, namespace_name, &overlay);
783        if !force && !tables.is_empty() {
784            return Err(Error::NamespaceNotEmpty(
785                catalog_name.to_string(),
786                namespace_name.to_string(),
787            ));
788        }
789
790        if force {
791            let store = catalog.store().clone();
792            let mut txn = store.begin(TxnMode::ReadWrite).map_err(Error::Core)?;
793            for table in &tables {
794                catalog
795                    .persist_drop_table(&mut txn, &TableFqn::from(table))
796                    .map_err(|err| Error::Sql(err.into()))?;
797            }
798            txn.commit_self().map_err(Error::Core)?;
799
800            let mut overlay = CatalogOverlay::new();
801            for table in tables {
802                overlay.drop_table(&TableFqn::from(&table));
803            }
804            catalog.apply_overlay(overlay);
805        }
806
807        catalog
808            .delete_namespace(catalog_name, namespace_name)
809            .map_err(|err| Error::Sql(err.into()))?;
810        self.invalidate_table_info_cache();
811        Ok(())
812    }
813
814    /// テーブルを作成する。
815    ///
816    /// # Examples
817    ///
818    /// ```
819    /// use alopex_embedded::{
820    ///     ColumnDefinition, CreateCatalogRequest, CreateNamespaceRequest, CreateTableRequest,
821    ///     Database,
822    /// };
823    /// use alopex_sql::ast::ddl::DataType;
824    ///
825    /// let db = Database::new();
826    /// db.create_catalog(CreateCatalogRequest::new("default")).unwrap();
827    /// db.create_namespace(CreateNamespaceRequest::new("default", "default"))
828    ///     .unwrap();
829    ///
830    /// let schema = vec![ColumnDefinition::new("id", DataType::Integer)];
831    /// let table = db
832    ///     .create_table(CreateTableRequest::new("users").with_schema(schema))
833    ///     .unwrap();
834    /// assert_eq!(table.name, "users");
835    /// ```
836    pub fn create_table(&self, request: CreateTableRequest) -> Result<TableInfo> {
837        let request = request.build()?;
838        let mut catalog = self.sql_catalog.write().expect("catalog lock poisoned");
839
840        ensure_namespace_exists(&*catalog, &request.catalog_name, &request.namespace_name)?;
841        ensure_table_absent(
842            &*catalog,
843            &request.catalog_name,
844            &request.namespace_name,
845            &request.name,
846        )?;
847
848        if request.table_type == TableType::Managed && request.storage_root.is_some() {
849            eprintln!("警告: managed テーブルの storage_root は無視されます");
850        }
851
852        let table_id = catalog.next_table_id();
853        let primary_key = request.primary_key.clone();
854        let columns = build_columns(request.schema.clone(), primary_key.as_ref())?;
855
856        let storage_options = request.storage_options.unwrap_or_else(|| StorageOptions {
857            compression: Compression::None,
858            ..StorageOptions::default()
859        });
860
861        let namespace = catalog.get_namespace(&request.catalog_name, &request.namespace_name);
862        let storage_location = resolve_storage_location(
863            &request.table_type,
864            request.storage_root.as_deref(),
865            namespace.as_ref(),
866            &request.name,
867        )?;
868
869        let mut table = TableMetadata::new(&request.name, columns).with_table_id(table_id);
870        table.catalog_name = request.catalog_name.clone();
871        table.namespace_name = request.namespace_name.clone();
872        table.primary_key = primary_key;
873        table.storage_options = storage_options;
874        table.table_type = request.table_type;
875        table.data_source_format = request
876            .data_source_format
877            .unwrap_or(DataSourceFormat::Alopex);
878        table.storage_location = storage_location;
879        table.comment = request.comment;
880        table.properties = request.properties.unwrap_or_default();
881
882        let store = catalog.store().clone();
883        let mut txn = store.begin(TxnMode::ReadWrite).map_err(Error::Core)?;
884        catalog
885            .persist_create_table(&mut txn, &table)
886            .map_err(|err| Error::Sql(err.into()))?;
887        txn.commit_self().map_err(Error::Core)?;
888
889        let mut overlay = CatalogOverlay::new();
890        overlay.add_table(TableFqn::from(&table), table.clone());
891        catalog.apply_overlay(overlay);
892        drop(catalog); // Release lock before invalidating cache
893        self.invalidate_table_info_cache();
894
895        let info = TableInfo::from(table);
896        let namespace_root = namespace.and_then(|ns| ns.storage_root);
897        Ok(apply_storage_location(info, namespace_root.as_deref()))
898    }
899
900    /// デフォルト catalog/namespace のテーブルを作成する。
901    pub fn create_table_simple(
902        &self,
903        name: &str,
904        schema: Vec<ColumnDefinition>,
905    ) -> Result<TableInfo> {
906        self.create_table(CreateTableRequest::new(name).with_schema(schema))
907    }
908
909    /// テーブルを削除する。
910    pub fn delete_table(
911        &self,
912        catalog_name: &str,
913        namespace_name: &str,
914        table_name: &str,
915    ) -> Result<()> {
916        let mut catalog = self.sql_catalog.write().expect("catalog lock poisoned");
917        ensure_namespace_exists(&*catalog, catalog_name, namespace_name)?;
918        let table = find_table_metadata(&*catalog, catalog_name, namespace_name, table_name)?
919            .ok_or_else(|| {
920                Error::TableNotFound(table_full_name(catalog_name, namespace_name, table_name))
921            })?;
922
923        let store = catalog.store().clone();
924        let mut txn = store.begin(TxnMode::ReadWrite).map_err(Error::Core)?;
925        catalog
926            .persist_drop_table(&mut txn, &TableFqn::from(&table))
927            .map_err(|err| Error::Sql(err.into()))?;
928        txn.commit_self().map_err(Error::Core)?;
929
930        let mut overlay = CatalogOverlay::new();
931        overlay.drop_table(&TableFqn::from(&table));
932        catalog.apply_overlay(overlay);
933        drop(catalog); // Release lock before invalidating cache
934        self.invalidate_table_info_cache();
935        Ok(())
936    }
937
938    /// デフォルト catalog/namespace のテーブルを削除する。
939    pub fn delete_table_simple(&self, name: &str) -> Result<()> {
940        self.delete_table("default", "default", name)
941    }
942}
943
944impl<'a> Transaction<'a> {
945    /// Catalog 一覧を取得する(オーバーレイ反映)。
946    pub fn list_catalogs(&self) -> Result<Vec<CatalogInfo>> {
947        let catalog = self.db.sql_catalog.read().expect("catalog lock poisoned");
948        Ok(catalog
949            .list_catalogs_in_txn(self.catalog_overlay())
950            .into_iter()
951            .map(CatalogInfo::from)
952            .collect())
953    }
954
955    /// Catalog を取得する(オーバーレイ反映)。
956    pub fn get_catalog(&self, name: &str) -> Result<CatalogInfo> {
957        let catalog = self.db.sql_catalog.read().expect("catalog lock poisoned");
958        let meta = catalog
959            .get_catalog_in_txn(name, self.catalog_overlay())
960            .ok_or_else(|| Error::CatalogNotFound(name.to_string()))?;
961        Ok(meta.clone().into())
962    }
963
964    /// Namespace 一覧を取得する(オーバーレイ反映)。
965    pub fn list_namespaces(&self, catalog_name: &str) -> Result<Vec<NamespaceInfo>> {
966        let catalog = self.db.sql_catalog.read().expect("catalog lock poisoned");
967        ensure_catalog_exists_in_txn(&*catalog, self.catalog_overlay(), catalog_name)?;
968        Ok(catalog
969            .list_namespaces_in_txn(catalog_name, self.catalog_overlay())
970            .into_iter()
971            .map(NamespaceInfo::from)
972            .collect())
973    }
974
975    /// Namespace を取得する(オーバーレイ反映)。
976    pub fn get_namespace(&self, catalog_name: &str, namespace_name: &str) -> Result<NamespaceInfo> {
977        let catalog = self.db.sql_catalog.read().expect("catalog lock poisoned");
978        ensure_catalog_exists_in_txn(&*catalog, self.catalog_overlay(), catalog_name)?;
979        let meta = catalog
980            .get_namespace_in_txn(catalog_name, namespace_name, self.catalog_overlay())
981            .ok_or_else(|| {
982                Error::NamespaceNotFound(catalog_name.to_string(), namespace_name.to_string())
983            })?;
984        Ok(meta.clone().into())
985    }
986
987    /// テーブル一覧を取得する(オーバーレイ反映)。
988    pub fn list_tables(&self, catalog_name: &str, namespace_name: &str) -> Result<Vec<TableInfo>> {
989        let catalog = self.db.sql_catalog.read().expect("catalog lock poisoned");
990        ensure_namespace_exists_in_txn(
991            &*catalog,
992            self.catalog_overlay(),
993            catalog_name,
994            namespace_name,
995        )?;
996        let namespace = catalog
997            .get_namespace_in_txn(catalog_name, namespace_name, self.catalog_overlay())
998            .cloned()
999            .ok_or_else(|| {
1000                Error::NamespaceNotFound(catalog_name.to_string(), namespace_name.to_string())
1001            })?;
1002        let tables =
1003            catalog.list_tables_in_txn(catalog_name, namespace_name, self.catalog_overlay());
1004        Ok(tables
1005            .into_iter()
1006            .map(|table| {
1007                let info = TableInfo::from(table);
1008                apply_storage_location(info, namespace.storage_root.as_deref())
1009            })
1010            .collect())
1011    }
1012
1013    /// テーブル情報を取得する(オーバーレイ反映)。
1014    pub fn get_table_info(
1015        &self,
1016        catalog_name: &str,
1017        namespace_name: &str,
1018        table_name: &str,
1019    ) -> Result<TableInfo> {
1020        let catalog = self.db.sql_catalog.read().expect("catalog lock poisoned");
1021        ensure_namespace_exists_in_txn(
1022            &*catalog,
1023            self.catalog_overlay(),
1024            catalog_name,
1025            namespace_name,
1026        )?;
1027        let namespace = catalog
1028            .get_namespace_in_txn(catalog_name, namespace_name, self.catalog_overlay())
1029            .cloned()
1030            .ok_or_else(|| {
1031                Error::NamespaceNotFound(catalog_name.to_string(), namespace_name.to_string())
1032            })?;
1033
1034        let tables =
1035            catalog.list_tables_in_txn(catalog_name, namespace_name, self.catalog_overlay());
1036        let table = tables
1037            .into_iter()
1038            .find(|table| table.name == table_name)
1039            .ok_or_else(|| {
1040                Error::TableNotFound(table_full_name(catalog_name, namespace_name, table_name))
1041            })?;
1042        let info = TableInfo::from(table);
1043        Ok(apply_storage_location(
1044            info,
1045            namespace.storage_root.as_deref(),
1046        ))
1047    }
1048
1049    /// Catalog を作成する(オーバーレイ反映)。
1050    ///
1051    /// # Examples
1052    ///
1053    /// ```
1054    /// use alopex_embedded::{CreateCatalogRequest, Database, TxnMode};
1055    ///
1056    /// let db = Database::new();
1057    /// let mut txn = db.begin(TxnMode::ReadWrite).unwrap();
1058    /// let catalog = txn.create_catalog(CreateCatalogRequest::new("main")).unwrap();
1059    /// assert_eq!(catalog.name, "main");
1060    /// ```
1061    pub fn create_catalog(&mut self, request: CreateCatalogRequest) -> Result<CatalogInfo> {
1062        ensure_write_mode(self)?;
1063        let request = request.build()?;
1064        let catalog = self.db.sql_catalog.read().expect("catalog lock poisoned");
1065        if catalog
1066            .get_catalog_in_txn(&request.name, self.catalog_overlay())
1067            .is_some()
1068        {
1069            return Err(Error::CatalogAlreadyExists(request.name));
1070        }
1071
1072        let meta = CatalogMeta {
1073            name: request.name,
1074            comment: request.comment,
1075            storage_root: request.storage_root,
1076        };
1077        self.catalog_overlay_mut().add_catalog(meta.clone());
1078        self.catalog_modified = true;
1079        Ok(meta.into())
1080    }
1081
1082    /// Catalog を削除する(オーバーレイ反映)。
1083    pub fn delete_catalog(&mut self, name: &str, force: bool) -> Result<()> {
1084        ensure_write_mode(self)?;
1085        if name == "default" {
1086            return Err(Error::CannotDeleteDefault("catalog".to_string()));
1087        }
1088        let catalog = self.db.sql_catalog.read().expect("catalog lock poisoned");
1089        ensure_catalog_exists_in_txn(&*catalog, self.catalog_overlay(), name)?;
1090
1091        if !force {
1092            let namespaces = catalog.list_namespaces_in_txn(name, self.catalog_overlay());
1093            let has_non_default = namespaces.iter().any(|ns| ns.name != "default");
1094            let has_tables = namespaces.iter().any(|ns| {
1095                !catalog
1096                    .list_tables_in_txn(name, &ns.name, self.catalog_overlay())
1097                    .is_empty()
1098            });
1099            if has_non_default || has_tables {
1100                return Err(Error::CatalogNotEmpty(name.to_string()));
1101            }
1102        }
1103
1104        if force {
1105            self.catalog_overlay_mut().drop_cascade_catalog(name);
1106        } else {
1107            self.catalog_overlay_mut().drop_catalog(name);
1108        }
1109        self.catalog_modified = true;
1110        Ok(())
1111    }
1112
1113    /// Namespace を作成する(オーバーレイ反映)。
1114    pub fn create_namespace(&mut self, request: CreateNamespaceRequest) -> Result<NamespaceInfo> {
1115        ensure_write_mode(self)?;
1116        let request = request.build()?;
1117        let catalog = self.db.sql_catalog.read().expect("catalog lock poisoned");
1118        let catalog_meta = catalog
1119            .get_catalog_in_txn(&request.catalog_name, self.catalog_overlay())
1120            .ok_or_else(|| Error::CatalogNotFound(request.catalog_name.clone()))?;
1121        if catalog
1122            .get_namespace_in_txn(&request.catalog_name, &request.name, self.catalog_overlay())
1123            .is_some()
1124        {
1125            return Err(Error::NamespaceAlreadyExists(
1126                request.catalog_name,
1127                request.name,
1128            ));
1129        }
1130
1131        let storage_root = request
1132            .storage_root
1133            .or_else(|| catalog_meta.storage_root.clone());
1134        let meta = NamespaceMeta {
1135            name: request.name,
1136            catalog_name: request.catalog_name,
1137            comment: request.comment,
1138            storage_root,
1139        };
1140        self.catalog_overlay_mut().add_namespace(meta.clone());
1141        self.catalog_modified = true;
1142        Ok(meta.into())
1143    }
1144
1145    /// Namespace を削除する(オーバーレイ反映)。
1146    pub fn delete_namespace(
1147        &mut self,
1148        catalog_name: &str,
1149        namespace_name: &str,
1150        force: bool,
1151    ) -> Result<()> {
1152        ensure_write_mode(self)?;
1153        if namespace_name == "default" {
1154            return Err(Error::CannotDeleteDefault("namespace".to_string()));
1155        }
1156        let catalog = self.db.sql_catalog.read().expect("catalog lock poisoned");
1157        ensure_namespace_exists_in_txn(
1158            &*catalog,
1159            self.catalog_overlay(),
1160            catalog_name,
1161            namespace_name,
1162        )?;
1163
1164        let tables =
1165            catalog.list_tables_in_txn(catalog_name, namespace_name, self.catalog_overlay());
1166        if !force && !tables.is_empty() {
1167            return Err(Error::NamespaceNotEmpty(
1168                catalog_name.to_string(),
1169                namespace_name.to_string(),
1170            ));
1171        }
1172
1173        if force {
1174            self.catalog_overlay_mut()
1175                .drop_cascade_namespace(catalog_name, namespace_name);
1176        } else {
1177            self.catalog_overlay_mut()
1178                .drop_namespace(catalog_name, namespace_name);
1179        }
1180        self.catalog_modified = true;
1181        Ok(())
1182    }
1183
1184    /// テーブルを作成する(オーバーレイ反映)。
1185    ///
1186    /// # Examples
1187    ///
1188    /// ```
1189    /// use alopex_embedded::{
1190    ///     ColumnDefinition, CreateCatalogRequest, CreateNamespaceRequest, CreateTableRequest,
1191    ///     Database, TxnMode,
1192    /// };
1193    /// use alopex_sql::ast::ddl::DataType;
1194    ///
1195    /// let db = Database::new();
1196    /// let mut txn = db.begin(TxnMode::ReadWrite).unwrap();
1197    /// txn.create_catalog(CreateCatalogRequest::new("main")).unwrap();
1198    /// txn.create_namespace(CreateNamespaceRequest::new("main", "default"))
1199    ///     .unwrap();
1200    ///
1201    /// let schema = vec![ColumnDefinition::new("id", DataType::Integer)];
1202    /// let table = txn
1203    ///     .create_table(
1204    ///         CreateTableRequest::new("events")
1205    ///             .with_catalog_name("main")
1206    ///             .with_namespace_name("default")
1207    ///             .with_schema(schema),
1208    ///     )
1209    ///     .unwrap();
1210    /// assert_eq!(table.name, "events");
1211    /// ```
1212    pub fn create_table(&mut self, request: CreateTableRequest) -> Result<TableInfo> {
1213        ensure_write_mode(self)?;
1214        let request = request.build()?;
1215
1216        let mut catalog = self.db.sql_catalog.write().expect("catalog lock poisoned");
1217        ensure_namespace_exists_in_txn(
1218            &*catalog,
1219            self.catalog_overlay(),
1220            &request.catalog_name,
1221            &request.namespace_name,
1222        )?;
1223        ensure_table_absent_in_txn(
1224            &*catalog,
1225            self.catalog_overlay(),
1226            &request.catalog_name,
1227            &request.namespace_name,
1228            &request.name,
1229        )?;
1230
1231        if request.table_type == TableType::Managed && request.storage_root.is_some() {
1232            eprintln!("警告: managed テーブルの storage_root は無視されます");
1233        }
1234
1235        let table_id = catalog.next_table_id();
1236        let primary_key = request.primary_key.clone();
1237        let columns = build_columns(request.schema.clone(), primary_key.as_ref())?;
1238
1239        let storage_options = request.storage_options.unwrap_or_else(|| StorageOptions {
1240            compression: Compression::None,
1241            ..StorageOptions::default()
1242        });
1243
1244        let namespace = catalog
1245            .get_namespace_in_txn(
1246                &request.catalog_name,
1247                &request.namespace_name,
1248                self.catalog_overlay(),
1249            )
1250            .cloned();
1251        let storage_location = resolve_storage_location(
1252            &request.table_type,
1253            request.storage_root.as_deref(),
1254            namespace.as_ref(),
1255            &request.name,
1256        )?;
1257
1258        let mut table = TableMetadata::new(&request.name, columns).with_table_id(table_id);
1259        table.catalog_name = request.catalog_name.clone();
1260        table.namespace_name = request.namespace_name.clone();
1261        table.primary_key = primary_key;
1262        table.storage_options = storage_options;
1263        table.table_type = request.table_type;
1264        table.data_source_format = request
1265            .data_source_format
1266            .unwrap_or(DataSourceFormat::Alopex);
1267        table.storage_location = storage_location;
1268        table.comment = request.comment;
1269        table.properties = request.properties.unwrap_or_default();
1270
1271        self.catalog_overlay_mut()
1272            .add_table(TableFqn::from(&table), table.clone());
1273        self.catalog_modified = true;
1274        let info = TableInfo::from(table);
1275        let namespace_root = namespace.and_then(|ns| ns.storage_root);
1276        Ok(apply_storage_location(info, namespace_root.as_deref()))
1277    }
1278
1279    /// テーブルを削除する(オーバーレイ反映)。
1280    pub fn delete_table(
1281        &mut self,
1282        catalog_name: &str,
1283        namespace_name: &str,
1284        table_name: &str,
1285    ) -> Result<()> {
1286        ensure_write_mode(self)?;
1287        let catalog = self.db.sql_catalog.read().expect("catalog lock poisoned");
1288        ensure_namespace_exists_in_txn(
1289            &*catalog,
1290            self.catalog_overlay(),
1291            catalog_name,
1292            namespace_name,
1293        )?;
1294        let table = find_table_metadata_in_txn(
1295            &*catalog,
1296            self.catalog_overlay(),
1297            catalog_name,
1298            namespace_name,
1299            table_name,
1300        )?
1301        .ok_or_else(|| {
1302            Error::TableNotFound(table_full_name(catalog_name, namespace_name, table_name))
1303        })?;
1304
1305        self.catalog_overlay_mut()
1306            .drop_table(&TableFqn::from(&table));
1307        self.catalog_modified = true;
1308        Ok(())
1309    }
1310}
1311
1312fn validate_required(value: &str, label: &str) -> Result<()> {
1313    if value.trim().is_empty() {
1314        return Err(Error::Core(alopex_core::Error::InvalidFormat(format!(
1315            "{label}が未指定です"
1316        ))));
1317    }
1318    Ok(())
1319}
1320
1321fn ensure_catalog_exists<S: alopex_core::kv::KVStore>(
1322    catalog: &alopex_sql::catalog::PersistentCatalog<S>,
1323    name: &str,
1324) -> Result<()> {
1325    if catalog.get_catalog(name).is_none() {
1326        return Err(Error::CatalogNotFound(name.to_string()));
1327    }
1328    Ok(())
1329}
1330
1331fn ensure_catalog_exists_in_txn<S: alopex_core::kv::KVStore>(
1332    catalog: &alopex_sql::catalog::PersistentCatalog<S>,
1333    overlay: &CatalogOverlay,
1334    name: &str,
1335) -> Result<()> {
1336    if catalog.get_catalog_in_txn(name, overlay).is_none() {
1337        return Err(Error::CatalogNotFound(name.to_string()));
1338    }
1339    Ok(())
1340}
1341
1342fn ensure_namespace_exists<S: alopex_core::kv::KVStore>(
1343    catalog: &alopex_sql::catalog::PersistentCatalog<S>,
1344    catalog_name: &str,
1345    namespace_name: &str,
1346) -> Result<()> {
1347    ensure_catalog_exists(catalog, catalog_name)?;
1348    if catalog
1349        .get_namespace(catalog_name, namespace_name)
1350        .is_none()
1351    {
1352        return Err(Error::NamespaceNotFound(
1353            catalog_name.to_string(),
1354            namespace_name.to_string(),
1355        ));
1356    }
1357    Ok(())
1358}
1359
1360fn ensure_namespace_exists_in_txn<S: alopex_core::kv::KVStore>(
1361    catalog: &alopex_sql::catalog::PersistentCatalog<S>,
1362    overlay: &CatalogOverlay,
1363    catalog_name: &str,
1364    namespace_name: &str,
1365) -> Result<()> {
1366    ensure_catalog_exists_in_txn(catalog, overlay, catalog_name)?;
1367    if catalog
1368        .get_namespace_in_txn(catalog_name, namespace_name, overlay)
1369        .is_none()
1370    {
1371        return Err(Error::NamespaceNotFound(
1372            catalog_name.to_string(),
1373            namespace_name.to_string(),
1374        ));
1375    }
1376    Ok(())
1377}
1378
1379fn ensure_table_exists<S: alopex_core::kv::KVStore>(
1380    catalog: &alopex_sql::catalog::PersistentCatalog<S>,
1381    catalog_name: &str,
1382    namespace_name: &str,
1383    table_name: &str,
1384) -> Result<()> {
1385    let Some(table) = find_table_metadata(catalog, catalog_name, namespace_name, table_name)?
1386    else {
1387        return Err(Error::TableNotFound(table_full_name(
1388            catalog_name,
1389            namespace_name,
1390            table_name,
1391        )));
1392    };
1393    let _ = table;
1394    Ok(())
1395}
1396
1397fn ensure_table_absent<S: alopex_core::kv::KVStore>(
1398    catalog: &alopex_sql::catalog::PersistentCatalog<S>,
1399    catalog_name: &str,
1400    namespace_name: &str,
1401    table_name: &str,
1402) -> Result<()> {
1403    if find_table_metadata(catalog, catalog_name, namespace_name, table_name)?.is_some() {
1404        return Err(Error::TableAlreadyExists(table_full_name(
1405            catalog_name,
1406            namespace_name,
1407            table_name,
1408        )));
1409    }
1410    Ok(())
1411}
1412
1413fn ensure_table_absent_in_txn<S: alopex_core::kv::KVStore>(
1414    catalog: &alopex_sql::catalog::PersistentCatalog<S>,
1415    overlay: &CatalogOverlay,
1416    catalog_name: &str,
1417    namespace_name: &str,
1418    table_name: &str,
1419) -> Result<()> {
1420    if find_table_metadata_in_txn(catalog, overlay, catalog_name, namespace_name, table_name)?
1421        .is_some()
1422    {
1423        return Err(Error::TableAlreadyExists(table_full_name(
1424            catalog_name,
1425            namespace_name,
1426            table_name,
1427        )));
1428    }
1429    Ok(())
1430}
1431
1432fn find_table_metadata<S: alopex_core::kv::KVStore>(
1433    catalog: &alopex_sql::catalog::PersistentCatalog<S>,
1434    catalog_name: &str,
1435    namespace_name: &str,
1436    table_name: &str,
1437) -> Result<Option<TableMetadata>> {
1438    let overlay = CatalogOverlay::new();
1439    let tables = catalog.list_tables_in_txn(catalog_name, namespace_name, &overlay);
1440    Ok(tables.into_iter().find(|table| table.name == table_name))
1441}
1442
1443fn find_table_metadata_in_txn<S: alopex_core::kv::KVStore>(
1444    catalog: &alopex_sql::catalog::PersistentCatalog<S>,
1445    overlay: &CatalogOverlay,
1446    catalog_name: &str,
1447    namespace_name: &str,
1448    table_name: &str,
1449) -> Result<Option<TableMetadata>> {
1450    let tables = catalog.list_tables_in_txn(catalog_name, namespace_name, overlay);
1451    Ok(tables.into_iter().find(|table| table.name == table_name))
1452}
1453
/// Builds the dot-separated, fully qualified table name `catalog.namespace.table`.
fn table_full_name(catalog_name: &str, namespace_name: &str, table_name: &str) -> String {
    [catalog_name, namespace_name, table_name].join(".")
}
1457
/// Builds the dot-separated, fully qualified index name
/// `catalog.namespace.table.index`.
fn index_full_name(
    catalog_name: &str,
    namespace_name: &str,
    table_name: &str,
    index_name: &str,
) -> String {
    [catalog_name, namespace_name, table_name, index_name].join(".")
}
1466
1467fn apply_storage_location(mut info: TableInfo, namespace_root: Option<&str>) -> TableInfo {
1468    if info.storage_location.is_none() && info.table_type == TableType::Managed {
1469        if let Some(root) = namespace_root {
1470            info.storage_location = Some(format!("{root}/{}", info.name));
1471        }
1472    }
1473    info
1474}
1475
1476fn resolve_storage_location(
1477    table_type: &TableType,
1478    request_storage_root: Option<&str>,
1479    namespace: Option<&NamespaceMeta>,
1480    table_name: &str,
1481) -> Result<Option<String>> {
1482    match table_type {
1483        TableType::Managed => Ok(namespace
1484            .and_then(|ns| ns.storage_root.as_deref())
1485            .map(|root| format!("{root}/{table_name}"))),
1486        TableType::External => {
1487            let storage_root = request_storage_root
1488                .map(|root| root.to_string())
1489                .ok_or(Error::StorageRootRequired)?;
1490            Ok(Some(storage_root))
1491        }
1492    }
1493}
1494
1495fn build_columns(
1496    schema: Option<Vec<ColumnDefinition>>,
1497    primary_key: Option<&Vec<String>>,
1498) -> Result<Vec<ColumnMetadata>> {
1499    let Some(schema) = schema else {
1500        return Ok(Vec::new());
1501    };
1502
1503    let mut columns = Vec::with_capacity(schema.len());
1504    for definition in schema {
1505        validate_required(&definition.name, "column 名")?;
1506        let mut column = ColumnMetadata::new(
1507            definition.name.clone(),
1508            ResolvedType::from_ast(&definition.data_type),
1509        )
1510        .with_not_null(!definition.nullable);
1511        if primary_key
1512            .map(|keys| keys.iter().any(|key| key == &definition.name))
1513            .unwrap_or(false)
1514        {
1515            column = column.with_primary_key(true).with_not_null(true);
1516        }
1517        columns.push(column);
1518    }
1519
1520    if let Some(keys) = primary_key {
1521        let missing: Vec<String> = keys
1522            .iter()
1523            .filter(|key| !columns.iter().any(|col| col.name == **key))
1524            .cloned()
1525            .collect();
1526        if !missing.is_empty() {
1527            return Err(Error::Core(alopex_core::Error::InvalidFormat(format!(
1528                "主キーが見つかりません: {}",
1529                missing.join(", ")
1530            ))));
1531        }
1532    }
1533
1534    Ok(columns)
1535}
1536
1537fn ensure_write_mode(txn: &Transaction<'_>) -> Result<()> {
1538    let mode = txn.txn_mode()?;
1539    if mode != TxnMode::ReadWrite {
1540        return Err(Error::TxnReadOnly);
1541    }
1542    Ok(())
1543}
1544
1545fn resolved_type_to_string(resolved_type: &ResolvedType) -> String {
1546    match resolved_type {
1547        ResolvedType::Integer => "INTEGER".to_string(),
1548        ResolvedType::BigInt => "BIGINT".to_string(),
1549        ResolvedType::Float => "FLOAT".to_string(),
1550        ResolvedType::Double => "DOUBLE".to_string(),
1551        ResolvedType::Text => "TEXT".to_string(),
1552        ResolvedType::Blob => "BLOB".to_string(),
1553        ResolvedType::Boolean => "BOOLEAN".to_string(),
1554        ResolvedType::Timestamp => "TIMESTAMP".to_string(),
1555        ResolvedType::Vector { dimension, metric } => {
1556            let metric = match metric {
1557                VectorMetric::Cosine => "COSINE",
1558                VectorMetric::L2 => "L2",
1559                VectorMetric::Inner => "INNER",
1560            };
1561            format!("VECTOR({dimension}, {metric})")
1562        }
1563        ResolvedType::Null => "NULL".to_string(),
1564    }
1565}
1566
1567#[cfg(test)]
1568mod tests {
1569    use super::*;
1570    use crate::{Database, TxnMode};
1571    use alopex_sql::catalog::{ColumnMetadata, RowIdMode};
1572    use alopex_sql::ExecutionResult;
1573
1574    #[test]
1575    fn storage_info_default_is_row_none() {
1576        let info = StorageInfo::default();
1577        assert_eq!(info.storage_type, "row");
1578        assert_eq!(info.compression, "none");
1579    }
1580
1581    #[test]
1582    fn column_definition_defaults_to_nullable() {
1583        let column = ColumnDefinition::new("id", DataType::Integer);
1584        assert!(column.nullable);
1585        assert!(column.comment.is_none());
1586
1587        let column = column.with_nullable(false).with_comment("ID");
1588        assert!(!column.nullable);
1589        assert_eq!(column.comment.as_deref(), Some("ID"));
1590    }
1591
1592    #[test]
1593    fn create_catalog_request_builder_validates_name() {
1594        let err = CreateCatalogRequest::new("").build().unwrap_err();
1595        assert!(matches!(err, Error::Core(_)));
1596
1597        let request = CreateCatalogRequest::new("main")
1598            .with_comment("メイン")
1599            .with_storage_root("/data")
1600            .build()
1601            .unwrap();
1602        assert_eq!(request.name, "main");
1603        assert_eq!(request.comment.as_deref(), Some("メイン"));
1604        assert_eq!(request.storage_root.as_deref(), Some("/data"));
1605    }
1606
1607    #[test]
1608    fn create_namespace_request_builder_validates_fields() {
1609        let err = CreateNamespaceRequest::new("", "default")
1610            .build()
1611            .unwrap_err();
1612        assert!(matches!(err, Error::Core(_)));
1613
1614        let request = CreateNamespaceRequest::new("main", "analytics")
1615            .with_comment("分析")
1616            .build()
1617            .unwrap();
1618        assert_eq!(request.catalog_name, "main");
1619        assert_eq!(request.name, "analytics");
1620        assert_eq!(request.comment.as_deref(), Some("分析"));
1621    }
1622
1623    #[test]
1624    fn create_table_request_defaults_and_validation() {
1625        let schema = vec![ColumnDefinition::new("id", DataType::Integer)];
1626
1627        let request = CreateTableRequest::new("users")
1628            .with_schema(schema.clone())
1629            .build()
1630            .unwrap();
1631        assert_eq!(request.catalog_name, "default");
1632        assert_eq!(request.namespace_name, "default");
1633        assert_eq!(request.table_type, TableType::Managed);
1634        assert_eq!(request.data_source_format, Some(DataSourceFormat::Alopex));
1635        assert_eq!(request.properties.as_ref().unwrap().len(), 0);
1636
1637        let err = CreateTableRequest::new("users").build().unwrap_err();
1638        assert!(matches!(err, Error::SchemaRequired));
1639
1640        let err = CreateTableRequest::new("ext")
1641            .with_table_type(TableType::External)
1642            .build()
1643            .unwrap_err();
1644        assert!(matches!(err, Error::StorageRootRequired));
1645
1646        let request = CreateTableRequest::new("ext")
1647            .with_table_type(TableType::External)
1648            .with_storage_root("/external")
1649            .build()
1650            .unwrap();
1651        assert_eq!(request.storage_root.as_deref(), Some("/external"));
1652        assert_eq!(request.data_source_format, Some(DataSourceFormat::Alopex));
1653        assert!(request.properties.as_ref().unwrap().is_empty());
1654    }
1655
1656    #[test]
1657    fn table_info_converts_from_metadata() {
1658        let mut table = TableMetadata::new(
1659            "users",
1660            vec![
1661                ColumnMetadata::new("id", ResolvedType::Integer).with_primary_key(true),
1662                ColumnMetadata::new("name", ResolvedType::Text),
1663            ],
1664        )
1665        .with_table_id(42);
1666        table.catalog_name = "main".to_string();
1667        table.namespace_name = "default".to_string();
1668        table.primary_key = Some(vec!["id".to_string()]);
1669        table.storage_options = StorageOptions {
1670            storage_type: StorageType::Columnar,
1671            compression: Compression::Zstd,
1672            row_group_size: 1024,
1673            row_id_mode: RowIdMode::Direct,
1674        };
1675
1676        let info = TableInfo::from(table);
1677        assert_eq!(info.name, "users");
1678        assert_eq!(info.table_id, 42);
1679        assert_eq!(info.catalog_name, "main");
1680        assert_eq!(info.namespace_name, "default");
1681        assert_eq!(info.columns.len(), 2);
1682        assert_eq!(info.columns[0].data_type, "INTEGER");
1683        assert!(info.columns[0].is_primary_key);
1684        assert_eq!(info.storage_options.storage_type, "columnar");
1685        assert_eq!(info.storage_options.compression, "zstd");
1686    }
1687
1688    #[test]
1689    fn table_info_defaults_storage_options_to_row_none() {
1690        let table = TableMetadata::new(
1691            "logs",
1692            vec![ColumnMetadata::new("id", ResolvedType::Integer)],
1693        );
1694        let info = TableInfo::from(table);
1695        assert_eq!(info.storage_options.storage_type, "row");
1696        assert_eq!(info.storage_options.compression, "none");
1697    }
1698
1699    #[test]
1700    fn index_info_converts_from_metadata() {
1701        let mut index = IndexMetadata::new(1, "idx_users_id", "users", vec!["id".to_string()])
1702            .with_unique(true)
1703            .with_method(IndexMethod::Hnsw);
1704        index.catalog_name = "main".to_string();
1705        index.namespace_name = "default".to_string();
1706
1707        let info = IndexInfo::from(index);
1708        assert_eq!(info.name, "idx_users_id");
1709        assert_eq!(info.table_name, "users");
1710        assert_eq!(info.method, "hnsw");
1711        assert!(info.is_unique);
1712    }
1713
1714    fn ensure_default_catalog_and_namespace(db: &Database) {
1715        let _ = db.create_catalog(CreateCatalogRequest::new("default"));
1716        let _ = db.create_namespace(CreateNamespaceRequest::new("default", "default"));
1717    }
1718
1719    #[test]
1720    fn database_catalog_and_namespace_crud() {
1721        let db = Database::new();
1722
1723        let catalog = db
1724            .create_catalog(CreateCatalogRequest::new("main"))
1725            .unwrap();
1726        assert_eq!(catalog.name, "main");
1727
1728        let namespace = db
1729            .create_namespace(CreateNamespaceRequest::new("main", "analytics"))
1730            .unwrap();
1731        assert_eq!(namespace.catalog_name, "main");
1732        assert_eq!(namespace.name, "analytics");
1733
1734        let list = db.list_namespaces("main").unwrap();
1735        assert_eq!(list.len(), 1);
1736
1737        let err = db.delete_catalog("main", false).unwrap_err();
1738        assert!(matches!(err, Error::CatalogNotEmpty(_)));
1739
1740        db.delete_catalog("main", true).unwrap();
1741
1742        let err = db.get_catalog("main").unwrap_err();
1743        assert!(matches!(err, Error::CatalogNotFound(_)));
1744    }
1745
1746    #[test]
1747    fn cannot_delete_default_catalog_or_namespace() {
1748        let db = Database::new();
1749        ensure_default_catalog_and_namespace(&db);
1750
1751        let err = db.delete_catalog("default", true).unwrap_err();
1752        assert!(matches!(err, Error::CannotDeleteDefault(_)));
1753
1754        let err = db.delete_namespace("default", "default", true).unwrap_err();
1755        assert!(matches!(err, Error::CannotDeleteDefault(_)));
1756
1757        let mut txn = db.begin(TxnMode::ReadWrite).unwrap();
1758        let err = txn.delete_catalog("default", true).unwrap_err();
1759        assert!(matches!(err, Error::CannotDeleteDefault(_)));
1760
1761        let err = txn
1762            .delete_namespace("default", "default", true)
1763            .unwrap_err();
1764        assert!(matches!(err, Error::CannotDeleteDefault(_)));
1765    }
1766
1767    #[test]
1768    fn database_table_crud_and_simple_helpers() {
1769        let db = Database::new();
1770        ensure_default_catalog_and_namespace(&db);
1771
1772        let schema = vec![ColumnDefinition::new("id", DataType::Integer)];
1773        let info = db.create_table_simple("users", schema).unwrap();
1774        assert_eq!(info.catalog_name, "default");
1775        assert_eq!(info.namespace_name, "default");
1776        assert_eq!(info.table_type, TableType::Managed);
1777        assert_eq!(info.data_source_format, DataSourceFormat::Alopex);
1778        assert_eq!(info.storage_options.storage_type, "row");
1779        assert_eq!(info.storage_options.compression, "none");
1780
1781        let tables = db.list_tables_simple().unwrap();
1782        assert_eq!(tables.len(), 1);
1783
1784        let info = db.get_table_info_simple("users").unwrap();
1785        assert_eq!(info.name, "users");
1786
1787        let err = db
1788            .create_table_simple(
1789                "users",
1790                vec![ColumnDefinition::new("id", DataType::Integer)],
1791            )
1792            .unwrap_err();
1793        assert!(matches!(err, Error::TableAlreadyExists(_)));
1794
1795        db.delete_table_simple("users").unwrap();
1796        assert!(db.list_tables_simple().unwrap().is_empty());
1797    }
1798
1799    #[test]
1800    fn database_index_read_helpers() {
1801        let db = Database::new();
1802        ensure_default_catalog_and_namespace(&db);
1803
1804        let schema = vec![ColumnDefinition::new("id", DataType::Integer)];
1805        db.create_table_simple("users", schema).unwrap();
1806
1807        let result = db
1808            .execute_sql("CREATE INDEX idx_users_id ON users (id);")
1809            .unwrap();
1810        assert!(matches!(result, ExecutionResult::Success));
1811
1812        let indexes = db.list_indexes_simple("users").unwrap();
1813        assert_eq!(indexes.len(), 1);
1814        assert_eq!(indexes[0].name, "idx_users_id");
1815        assert_eq!(indexes[0].method, "btree");
1816
1817        let index = db.get_index_info_simple("users", "idx_users_id").unwrap();
1818        assert_eq!(index.table_name, "users");
1819    }
1820
1821    #[test]
1822    fn transaction_overlay_visibility_and_commit() {
1823        let db = Database::new();
1824        let mut txn = db.begin(TxnMode::ReadWrite).unwrap();
1825
1826        txn.create_catalog(CreateCatalogRequest::new("main"))
1827            .unwrap();
1828        txn.create_namespace(CreateNamespaceRequest::new("main", "default"))
1829            .unwrap();
1830
1831        let schema = vec![ColumnDefinition::new("id", DataType::Integer)];
1832        txn.create_table(
1833            CreateTableRequest::new("events")
1834                .with_catalog_name("main")
1835                .with_namespace_name("default")
1836                .with_schema(schema),
1837        )
1838        .unwrap();
1839
1840        let tables = txn.list_tables("main", "default").unwrap();
1841        assert_eq!(tables.len(), 1);
1842
1843        txn.commit().unwrap();
1844
1845        let info = db.get_table_info("main", "default", "events").unwrap();
1846        assert_eq!(info.name, "events");
1847    }
1848
1849    #[test]
1850    fn transaction_commit_persists_overlay_to_store() {
1851        let db = Database::new();
1852        let mut txn = db.begin(TxnMode::ReadWrite).unwrap();
1853
1854        txn.create_catalog(CreateCatalogRequest::new("main"))
1855            .unwrap();
1856        txn.create_namespace(CreateNamespaceRequest::new("main", "default"))
1857            .unwrap();
1858
1859        let schema = vec![ColumnDefinition::new("id", DataType::Integer)];
1860        txn.create_table(
1861            CreateTableRequest::new("events")
1862                .with_catalog_name("main")
1863                .with_namespace_name("default")
1864                .with_schema(schema),
1865        )
1866        .unwrap();
1867
1868        txn.commit().unwrap();
1869
1870        let reloaded = alopex_sql::catalog::PersistentCatalog::load(db.store.clone()).unwrap();
1871        assert!(reloaded.get_catalog("main").is_some());
1872        assert!(reloaded.get_namespace("main", "default").is_some());
1873        assert!(reloaded.table_exists("events"));
1874    }
1875
1876    #[test]
1877    fn transaction_rollback_discards_overlay() {
1878        let db = Database::new();
1879        let mut txn = db.begin(TxnMode::ReadWrite).unwrap();
1880
1881        txn.create_catalog(CreateCatalogRequest::new("main"))
1882            .unwrap();
1883        txn.create_namespace(CreateNamespaceRequest::new("main", "default"))
1884            .unwrap();
1885
1886        let schema = vec![ColumnDefinition::new("id", DataType::Integer)];
1887        txn.create_table(
1888            CreateTableRequest::new("staging")
1889                .with_catalog_name("main")
1890                .with_namespace_name("default")
1891                .with_schema(schema),
1892        )
1893        .unwrap();
1894
1895        txn.rollback().unwrap();
1896
1897        let err = db.get_table_info("main", "default", "staging").unwrap_err();
1898        assert!(matches!(err, Error::CatalogNotFound(_)));
1899    }
1900
1901    #[test]
1902    fn transaction_readonly_rejects_ddl() {
1903        let db = Database::new();
1904        let mut txn = db.begin(TxnMode::ReadOnly).unwrap();
1905        let err = txn
1906            .create_catalog(CreateCatalogRequest::new("main"))
1907            .unwrap_err();
1908        assert!(matches!(err, Error::TxnReadOnly));
1909    }
1910}