1use std::collections::HashMap;
4
5use alopex_core::{KVStore, KVTransaction};
6use alopex_sql::ast::ddl::{DataType, IndexMethod, VectorMetric};
7use alopex_sql::catalog::persistent::{CatalogMeta, NamespaceMeta, TableFqn};
8use alopex_sql::catalog::{
9 Catalog, CatalogOverlay, ColumnMetadata, Compression, IndexMetadata, StorageOptions,
10 StorageType, TableMetadata,
11};
12use alopex_sql::planner::types::ResolvedType;
13use alopex_sql::{DataSourceFormat, TableType};
14
15use crate::{Database, Error, Result, Transaction, TxnMode};
16
/// Public description of a catalog.
///
/// The `From<CatalogMeta>` conversion in this file fills every field by
/// moving the identically named field of the persisted metadata.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct CatalogInfo {
    /// Catalog name.
    pub name: String,
    /// Optional comment attached to the catalog.
    pub comment: Option<String>,
    /// Optional storage root recorded for the catalog.
    pub storage_root: Option<String>,
}
27
28impl From<CatalogMeta> for CatalogInfo {
29 fn from(value: CatalogMeta) -> Self {
30 Self {
31 name: value.name,
32 comment: value.comment,
33 storage_root: value.storage_root,
34 }
35 }
36}
37
/// Public description of a namespace inside a catalog.
///
/// The `From<NamespaceMeta>` conversion in this file fills every field by
/// moving the identically named field of the persisted metadata.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct NamespaceInfo {
    /// Namespace name.
    pub name: String,
    /// Name of the catalog that owns the namespace.
    pub catalog_name: String,
    /// Optional comment attached to the namespace.
    pub comment: Option<String>,
    /// Optional storage root recorded for the namespace.
    pub storage_root: Option<String>,
}
50
51impl From<NamespaceMeta> for NamespaceInfo {
52 fn from(value: NamespaceMeta) -> Self {
53 Self {
54 name: value.name,
55 catalog_name: value.catalog_name,
56 comment: value.comment,
57 storage_root: value.storage_root,
58 }
59 }
60}
61
/// Public description of a single table column.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct ColumnInfo {
    /// Column name.
    pub name: String,
    /// Textual rendering of the column's data type.
    pub data_type: String,
    /// `true` when the column accepts NULL values.
    pub nullable: bool,
    /// `true` when the column is part of the table's primary key.
    pub is_primary_key: bool,
    /// Optional comment attached to the column.
    pub comment: Option<String>,
}
76
/// Textual summary of a table's storage options.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct StorageInfo {
    /// Storage layout name; the conversions in this file produce `"row"` or `"columnar"`.
    pub storage_type: String,
    /// Compression name; the conversions in this file produce `"none"`, `"lz4"` or `"zstd"`.
    pub compression: String,
}
85
86impl Default for StorageInfo {
87 fn default() -> Self {
88 Self {
89 storage_type: "row".to_string(),
90 compression: "none".to_string(),
91 }
92 }
93}
94
95impl From<&StorageOptions> for StorageInfo {
96 fn from(value: &StorageOptions) -> Self {
97 let storage_type = match value.storage_type {
98 StorageType::Row => "row",
99 StorageType::Columnar => "columnar",
100 };
101 let compression = match value.compression {
102 Compression::None => "none",
103 Compression::Lz4 => "lz4",
104 Compression::Zstd => "zstd",
105 };
106 Self {
107 storage_type: storage_type.to_string(),
108 compression: compression.to_string(),
109 }
110 }
111}
112
113#[derive(Debug, Clone, PartialEq, Eq)]
115pub struct TableInfo {
116 pub name: String,
118 pub catalog_name: String,
120 pub namespace_name: String,
122 pub table_id: u32,
124 pub table_type: TableType,
126 pub columns: Vec<ColumnInfo>,
128 pub primary_key: Option<Vec<String>>,
130 pub storage_location: Option<String>,
132 pub data_source_format: DataSourceFormat,
134 pub storage_options: StorageInfo,
136 pub comment: Option<String>,
138 pub properties: HashMap<String, String>,
140}
141
142impl From<&TableMetadata> for TableInfo {
143 fn from(value: &TableMetadata) -> Self {
144 let primary_key = value.primary_key.clone();
145 let columns = value
146 .columns
147 .iter()
148 .map(|column| ColumnInfo {
149 name: column.name.clone(),
150 data_type: resolved_type_to_string(&column.data_type),
151 nullable: !column.not_null,
152 is_primary_key: column.primary_key
153 || primary_key
154 .as_ref()
155 .map(|keys| keys.iter().any(|name| name == &column.name))
156 .unwrap_or(false),
157 comment: None,
158 })
159 .collect();
160 let storage_options = if value.storage_options == StorageOptions::default() {
161 StorageInfo::default()
162 } else {
163 StorageInfo::from(&value.storage_options)
164 };
165
166 Self {
167 name: value.name.clone(),
168 catalog_name: value.catalog_name.clone(),
169 namespace_name: value.namespace_name.clone(),
170 table_id: value.table_id,
171 table_type: value.table_type,
172 columns,
173 primary_key,
174 storage_location: value.storage_location.clone(),
175 data_source_format: value.data_source_format,
176 storage_options,
177 comment: value.comment.clone(),
178 properties: value.properties.clone(),
179 }
180 }
181}
182
183impl From<TableMetadata> for TableInfo {
184 fn from(value: TableMetadata) -> Self {
185 Self::from(&value)
186 }
187}
188
/// Public description of an index.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct IndexInfo {
    /// Index name.
    pub name: String,
    /// Numeric index identifier.
    pub index_id: u32,
    /// Name of the catalog that owns the indexed table.
    pub catalog_name: String,
    /// Name of the namespace that owns the indexed table.
    pub namespace_name: String,
    /// Name of the indexed table.
    pub table_name: String,
    /// Names of the indexed columns.
    pub columns: Vec<String>,
    /// Index method label; the conversion in this file produces `"btree"` or `"hnsw"`.
    pub method: String,
    /// `true` when the index enforces uniqueness.
    pub is_unique: bool,
}
209
210impl From<&IndexMetadata> for IndexInfo {
211 fn from(value: &IndexMetadata) -> Self {
212 let method = match value.method {
213 Some(IndexMethod::BTree) | None => "btree",
214 Some(IndexMethod::Hnsw) => "hnsw",
215 };
216 Self {
217 name: value.name.clone(),
218 index_id: value.index_id,
219 catalog_name: value.catalog_name.clone(),
220 namespace_name: value.namespace_name.clone(),
221 table_name: value.table.clone(),
222 columns: value.columns.clone(),
223 method: method.to_string(),
224 is_unique: value.unique,
225 }
226 }
227}
228
229impl From<IndexMetadata> for IndexInfo {
230 fn from(value: IndexMetadata) -> Self {
231 Self::from(&value)
232 }
233}
234
/// Parameters for creating a catalog.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct CreateCatalogRequest {
    /// Name of the catalog to create.
    pub name: String,
    /// Optional comment to attach to the catalog.
    pub comment: Option<String>,
    /// Optional storage root to record for the catalog.
    pub storage_root: Option<String>,
}
245
246impl CreateCatalogRequest {
247 pub fn new(name: impl Into<String>) -> Self {
249 Self {
250 name: name.into(),
251 comment: None,
252 storage_root: None,
253 }
254 }
255
256 pub fn with_comment(mut self, comment: impl Into<String>) -> Self {
258 self.comment = Some(comment.into());
259 self
260 }
261
262 pub fn with_storage_root(mut self, storage_root: impl Into<String>) -> Self {
264 self.storage_root = Some(storage_root.into());
265 self
266 }
267
268 pub fn build(self) -> Result<Self> {
270 validate_required(&self.name, "catalog 名")?;
271 Ok(self)
272 }
273}
274
/// Parameters for creating a namespace.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct CreateNamespaceRequest {
    /// Name of the catalog that will own the namespace.
    pub catalog_name: String,
    /// Name of the namespace to create.
    pub name: String,
    /// Optional comment to attach to the namespace.
    pub comment: Option<String>,
    /// Optional storage root to record for the namespace.
    pub storage_root: Option<String>,
}
287
288impl CreateNamespaceRequest {
289 pub fn new(catalog_name: impl Into<String>, name: impl Into<String>) -> Self {
291 Self {
292 catalog_name: catalog_name.into(),
293 name: name.into(),
294 comment: None,
295 storage_root: None,
296 }
297 }
298
299 pub fn with_comment(mut self, comment: impl Into<String>) -> Self {
301 self.comment = Some(comment.into());
302 self
303 }
304
305 pub fn with_storage_root(mut self, storage_root: impl Into<String>) -> Self {
307 self.storage_root = Some(storage_root.into());
308 self
309 }
310
311 pub fn build(self) -> Result<Self> {
313 validate_required(&self.catalog_name, "catalog 名")?;
314 validate_required(&self.name, "namespace 名")?;
315 Ok(self)
316 }
317}
318
319#[derive(Debug, Clone)]
321pub struct CreateTableRequest {
322 pub catalog_name: String,
324 pub namespace_name: String,
326 pub name: String,
328 pub schema: Option<Vec<ColumnDefinition>>,
330 pub table_type: TableType,
332 pub data_source_format: Option<DataSourceFormat>,
334 pub primary_key: Option<Vec<String>>,
336 pub storage_root: Option<String>,
338 pub storage_options: Option<StorageOptions>,
340 pub comment: Option<String>,
342 pub properties: Option<HashMap<String, String>>,
344}
345
346impl CreateTableRequest {
347 pub fn new(name: impl Into<String>) -> Self {
349 Self {
350 catalog_name: "default".to_string(),
351 namespace_name: "default".to_string(),
352 name: name.into(),
353 schema: None,
354 table_type: TableType::Managed,
355 data_source_format: None,
356 primary_key: None,
357 storage_root: None,
358 storage_options: None,
359 comment: None,
360 properties: None,
361 }
362 }
363
364 pub fn with_catalog_name(mut self, catalog_name: impl Into<String>) -> Self {
366 self.catalog_name = catalog_name.into();
367 self
368 }
369
370 pub fn with_namespace_name(mut self, namespace_name: impl Into<String>) -> Self {
372 self.namespace_name = namespace_name.into();
373 self
374 }
375
376 pub fn with_schema(mut self, schema: Vec<ColumnDefinition>) -> Self {
378 self.schema = Some(schema);
379 self
380 }
381
382 pub fn with_table_type(mut self, table_type: TableType) -> Self {
384 self.table_type = table_type;
385 self
386 }
387
388 pub fn with_data_source_format(mut self, data_source_format: DataSourceFormat) -> Self {
390 self.data_source_format = Some(data_source_format);
391 self
392 }
393
394 pub fn with_primary_key(mut self, primary_key: Vec<String>) -> Self {
396 self.primary_key = Some(primary_key);
397 self
398 }
399
400 pub fn with_storage_root(mut self, storage_root: impl Into<String>) -> Self {
402 self.storage_root = Some(storage_root.into());
403 self
404 }
405
406 pub fn with_storage_options(mut self, storage_options: StorageOptions) -> Self {
408 self.storage_options = Some(storage_options);
409 self
410 }
411
412 pub fn with_comment(mut self, comment: impl Into<String>) -> Self {
414 self.comment = Some(comment.into());
415 self
416 }
417
418 pub fn with_properties(mut self, properties: HashMap<String, String>) -> Self {
420 self.properties = Some(properties);
421 self
422 }
423
424 pub fn build(mut self) -> Result<Self> {
426 validate_required(&self.catalog_name, "catalog 名")?;
427 validate_required(&self.namespace_name, "namespace 名")?;
428 validate_required(&self.name, "table 名")?;
429
430 if self.table_type == TableType::Managed && self.schema.is_none() {
431 return Err(Error::SchemaRequired);
432 }
433 if self.table_type == TableType::External && self.storage_root.is_none() {
434 return Err(Error::StorageRootRequired);
435 }
436
437 if self.data_source_format.is_none() {
438 self.data_source_format = Some(DataSourceFormat::Alopex);
439 }
440 if self.properties.is_none() {
441 self.properties = Some(HashMap::new());
442 }
443 Ok(self)
444 }
445}
446
447#[derive(Debug, Clone)]
449pub struct ColumnDefinition {
450 pub name: String,
452 pub data_type: DataType,
454 pub nullable: bool,
456 pub comment: Option<String>,
458}
459
460impl ColumnDefinition {
461 pub fn new(name: impl Into<String>, data_type: DataType) -> Self {
463 Self {
464 name: name.into(),
465 data_type,
466 nullable: true,
467 comment: None,
468 }
469 }
470
471 pub fn with_nullable(mut self, nullable: bool) -> Self {
473 self.nullable = nullable;
474 self
475 }
476
477 pub fn with_comment(mut self, comment: impl Into<String>) -> Self {
479 self.comment = Some(comment.into());
480 self
481 }
482}
483
484impl Database {
485 pub fn list_catalogs(&self) -> Result<Vec<CatalogInfo>> {
487 let catalog = self.sql_catalog.read().expect("catalog lock poisoned");
488 Ok(catalog
489 .list_catalogs()
490 .into_iter()
491 .map(CatalogInfo::from)
492 .collect())
493 }
494
495 pub fn get_catalog(&self, name: &str) -> Result<CatalogInfo> {
497 let catalog = self.sql_catalog.read().expect("catalog lock poisoned");
498 let meta = catalog
499 .get_catalog(name)
500 .ok_or_else(|| Error::CatalogNotFound(name.to_string()))?;
501 Ok(meta.into())
502 }
503
504 pub fn list_namespaces(&self, catalog_name: &str) -> Result<Vec<NamespaceInfo>> {
506 let catalog = self.sql_catalog.read().expect("catalog lock poisoned");
507 ensure_catalog_exists(&*catalog, catalog_name)?;
508 Ok(catalog
509 .list_namespaces(catalog_name)
510 .into_iter()
511 .map(NamespaceInfo::from)
512 .collect())
513 }
514
515 pub fn get_namespace(&self, catalog_name: &str, namespace_name: &str) -> Result<NamespaceInfo> {
517 let catalog = self.sql_catalog.read().expect("catalog lock poisoned");
518 ensure_catalog_exists(&*catalog, catalog_name)?;
519 let meta = catalog
520 .get_namespace(catalog_name, namespace_name)
521 .ok_or_else(|| {
522 Error::NamespaceNotFound(catalog_name.to_string(), namespace_name.to_string())
523 })?;
524 Ok(meta.into())
525 }
526
527 pub fn list_tables(&self, catalog_name: &str, namespace_name: &str) -> Result<Vec<TableInfo>> {
529 let catalog = self.sql_catalog.read().expect("catalog lock poisoned");
530 ensure_namespace_exists(&*catalog, catalog_name, namespace_name)?;
531 let namespace = catalog
532 .get_namespace(catalog_name, namespace_name)
533 .ok_or_else(|| {
534 Error::NamespaceNotFound(catalog_name.to_string(), namespace_name.to_string())
535 })?;
536
537 let overlay = CatalogOverlay::new();
538 let tables = catalog.list_tables_in_txn(catalog_name, namespace_name, &overlay);
539 Ok(tables
540 .into_iter()
541 .map(|table| {
542 let info = TableInfo::from(table);
543 apply_storage_location(info, namespace.storage_root.as_deref())
544 })
545 .collect())
546 }
547
548 pub fn list_tables_simple(&self) -> Result<Vec<TableInfo>> {
550 self.list_tables("default", "default")
551 }
552
553 pub fn get_table_info(
555 &self,
556 catalog_name: &str,
557 namespace_name: &str,
558 table_name: &str,
559 ) -> Result<TableInfo> {
560 let catalog = self.sql_catalog.read().expect("catalog lock poisoned");
561 ensure_namespace_exists(&*catalog, catalog_name, namespace_name)?;
562 let namespace = catalog
563 .get_namespace(catalog_name, namespace_name)
564 .ok_or_else(|| {
565 Error::NamespaceNotFound(catalog_name.to_string(), namespace_name.to_string())
566 })?;
567
568 let overlay = CatalogOverlay::new();
569 let tables = catalog.list_tables_in_txn(catalog_name, namespace_name, &overlay);
570 let table = tables
571 .into_iter()
572 .find(|table| table.name == table_name)
573 .ok_or_else(|| {
574 Error::TableNotFound(table_full_name(catalog_name, namespace_name, table_name))
575 })?;
576
577 let info = TableInfo::from(table);
578 Ok(apply_storage_location(
579 info,
580 namespace.storage_root.as_deref(),
581 ))
582 }
583
584 pub fn get_table_info_simple(&self, table_name: &str) -> Result<TableInfo> {
586 self.get_table_info("default", "default", table_name)
587 }
588
589 pub fn get_table_info_cached(
594 &self,
595 catalog_name: &str,
596 namespace_name: &str,
597 table_name: &str,
598 ) -> Result<crate::CachedTableInfo> {
599 if let Some(cached) = self.get_cached_table_info(catalog_name, namespace_name, table_name) {
601 return Ok(cached);
602 }
603
604 let info = self.get_table_info(catalog_name, namespace_name, table_name)?;
606 let cached = crate::CachedTableInfo {
607 storage_location: info.storage_location.clone(),
608 format: format!("{:?}", info.data_source_format).to_uppercase(),
609 };
610 self.cache_table_info(catalog_name, namespace_name, table_name, cached.clone());
611 Ok(cached)
612 }
613
614 pub fn list_indexes(
616 &self,
617 catalog_name: &str,
618 namespace_name: &str,
619 table_name: &str,
620 ) -> Result<Vec<IndexInfo>> {
621 let catalog = self.sql_catalog.read().expect("catalog lock poisoned");
622 ensure_namespace_exists(&*catalog, catalog_name, namespace_name)?;
623 ensure_table_exists(&*catalog, catalog_name, namespace_name, table_name)?;
624
625 let overlay = CatalogOverlay::new();
626 let fqn = TableFqn::new(catalog_name, namespace_name, table_name);
627 let indexes = catalog.list_indexes_in_txn(&fqn, &overlay);
628 Ok(indexes.into_iter().map(IndexInfo::from).collect())
629 }
630
631 pub fn list_indexes_simple(&self, table_name: &str) -> Result<Vec<IndexInfo>> {
633 self.list_indexes("default", "default", table_name)
634 }
635
636 pub fn get_index_info(
638 &self,
639 catalog_name: &str,
640 namespace_name: &str,
641 table_name: &str,
642 index_name: &str,
643 ) -> Result<IndexInfo> {
644 let indexes = self.list_indexes(catalog_name, namespace_name, table_name)?;
645 indexes
646 .into_iter()
647 .find(|index| index.name == index_name)
648 .ok_or_else(|| {
649 Error::IndexNotFound(index_full_name(
650 catalog_name,
651 namespace_name,
652 table_name,
653 index_name,
654 ))
655 })
656 }
657
658 pub fn get_index_info_simple(&self, table_name: &str, index_name: &str) -> Result<IndexInfo> {
660 self.get_index_info("default", "default", table_name, index_name)
661 }
662
663 pub fn create_catalog(&self, request: CreateCatalogRequest) -> Result<CatalogInfo> {
675 let request = request.build()?;
676 let mut catalog = self.sql_catalog.write().expect("catalog lock poisoned");
677 if catalog.get_catalog(&request.name).is_some() {
678 return Err(Error::CatalogAlreadyExists(request.name));
679 }
680 let meta = CatalogMeta {
681 name: request.name,
682 comment: request.comment,
683 storage_root: request.storage_root,
684 };
685 catalog
686 .create_catalog(meta.clone())
687 .map_err(|err| Error::Sql(err.into()))?;
688 self.invalidate_table_info_cache();
689 Ok(meta.into())
690 }
691
692 pub fn delete_catalog(&self, name: &str, force: bool) -> Result<()> {
694 if name == "default" {
695 return Err(Error::CannotDeleteDefault("catalog".to_string()));
696 }
697 let mut catalog = self.sql_catalog.write().expect("catalog lock poisoned");
698 ensure_catalog_exists(&*catalog, name)?;
699
700 if !force {
701 let namespaces = catalog.list_namespaces(name);
702 let has_non_default = namespaces.iter().any(|ns| ns.name != "default");
703 let has_tables = namespaces.iter().any(|ns| {
704 let overlay = CatalogOverlay::new();
705 !catalog
706 .list_tables_in_txn(name, &ns.name, &overlay)
707 .is_empty()
708 });
709 if has_non_default || has_tables {
710 return Err(Error::CatalogNotEmpty(name.to_string()));
711 }
712 }
713
714 catalog
715 .delete_catalog(name)
716 .map_err(|err| Error::Sql(err.into()))?;
717 self.invalidate_table_info_cache();
718 Ok(())
719 }
720
721 pub fn create_namespace(&self, request: CreateNamespaceRequest) -> Result<NamespaceInfo> {
737 let request = request.build()?;
738 let mut catalog = self.sql_catalog.write().expect("catalog lock poisoned");
739 let catalog_meta = catalog
740 .get_catalog(&request.catalog_name)
741 .ok_or_else(|| Error::CatalogNotFound(request.catalog_name.clone()))?;
742 if catalog
743 .get_namespace(&request.catalog_name, &request.name)
744 .is_some()
745 {
746 return Err(Error::NamespaceAlreadyExists(
747 request.catalog_name,
748 request.name,
749 ));
750 }
751
752 let storage_root = request
753 .storage_root
754 .or_else(|| catalog_meta.storage_root.clone());
755 let meta = NamespaceMeta {
756 name: request.name,
757 catalog_name: request.catalog_name,
758 comment: request.comment,
759 storage_root,
760 };
761 catalog
762 .create_namespace(meta.clone())
763 .map_err(|err| Error::Sql(err.into()))?;
764 self.invalidate_table_info_cache();
765 Ok(meta.into())
766 }
767
768 pub fn delete_namespace(
770 &self,
771 catalog_name: &str,
772 namespace_name: &str,
773 force: bool,
774 ) -> Result<()> {
775 if namespace_name == "default" {
776 return Err(Error::CannotDeleteDefault("namespace".to_string()));
777 }
778 let mut catalog = self.sql_catalog.write().expect("catalog lock poisoned");
779 ensure_namespace_exists(&*catalog, catalog_name, namespace_name)?;
780
781 let overlay = CatalogOverlay::new();
782 let tables = catalog.list_tables_in_txn(catalog_name, namespace_name, &overlay);
783 if !force && !tables.is_empty() {
784 return Err(Error::NamespaceNotEmpty(
785 catalog_name.to_string(),
786 namespace_name.to_string(),
787 ));
788 }
789
790 if force {
791 let store = catalog.store().clone();
792 let mut txn = store.begin(TxnMode::ReadWrite).map_err(Error::Core)?;
793 for table in &tables {
794 catalog
795 .persist_drop_table(&mut txn, &TableFqn::from(table))
796 .map_err(|err| Error::Sql(err.into()))?;
797 }
798 txn.commit_self().map_err(Error::Core)?;
799
800 let mut overlay = CatalogOverlay::new();
801 for table in tables {
802 overlay.drop_table(&TableFqn::from(&table));
803 }
804 catalog.apply_overlay(overlay);
805 }
806
807 catalog
808 .delete_namespace(catalog_name, namespace_name)
809 .map_err(|err| Error::Sql(err.into()))?;
810 self.invalidate_table_info_cache();
811 Ok(())
812 }
813
814 pub fn create_table(&self, request: CreateTableRequest) -> Result<TableInfo> {
837 let request = request.build()?;
838 let mut catalog = self.sql_catalog.write().expect("catalog lock poisoned");
839
840 ensure_namespace_exists(&*catalog, &request.catalog_name, &request.namespace_name)?;
841 ensure_table_absent(
842 &*catalog,
843 &request.catalog_name,
844 &request.namespace_name,
845 &request.name,
846 )?;
847
848 if request.table_type == TableType::Managed && request.storage_root.is_some() {
849 eprintln!("警告: managed テーブルの storage_root は無視されます");
850 }
851
852 let table_id = catalog.next_table_id();
853 let primary_key = request.primary_key.clone();
854 let columns = build_columns(request.schema.clone(), primary_key.as_ref())?;
855
856 let storage_options = request.storage_options.unwrap_or_else(|| StorageOptions {
857 compression: Compression::None,
858 ..StorageOptions::default()
859 });
860
861 let namespace = catalog.get_namespace(&request.catalog_name, &request.namespace_name);
862 let storage_location = resolve_storage_location(
863 &request.table_type,
864 request.storage_root.as_deref(),
865 namespace.as_ref(),
866 &request.name,
867 )?;
868
869 let mut table = TableMetadata::new(&request.name, columns).with_table_id(table_id);
870 table.catalog_name = request.catalog_name.clone();
871 table.namespace_name = request.namespace_name.clone();
872 table.primary_key = primary_key;
873 table.storage_options = storage_options;
874 table.table_type = request.table_type;
875 table.data_source_format = request
876 .data_source_format
877 .unwrap_or(DataSourceFormat::Alopex);
878 table.storage_location = storage_location;
879 table.comment = request.comment;
880 table.properties = request.properties.unwrap_or_default();
881
882 let store = catalog.store().clone();
883 let mut txn = store.begin(TxnMode::ReadWrite).map_err(Error::Core)?;
884 catalog
885 .persist_create_table(&mut txn, &table)
886 .map_err(|err| Error::Sql(err.into()))?;
887 txn.commit_self().map_err(Error::Core)?;
888
889 let mut overlay = CatalogOverlay::new();
890 overlay.add_table(TableFqn::from(&table), table.clone());
891 catalog.apply_overlay(overlay);
892 drop(catalog); self.invalidate_table_info_cache();
894
895 let info = TableInfo::from(table);
896 let namespace_root = namespace.and_then(|ns| ns.storage_root);
897 Ok(apply_storage_location(info, namespace_root.as_deref()))
898 }
899
900 pub fn create_table_simple(
902 &self,
903 name: &str,
904 schema: Vec<ColumnDefinition>,
905 ) -> Result<TableInfo> {
906 self.create_table(CreateTableRequest::new(name).with_schema(schema))
907 }
908
909 pub fn delete_table(
911 &self,
912 catalog_name: &str,
913 namespace_name: &str,
914 table_name: &str,
915 ) -> Result<()> {
916 let mut catalog = self.sql_catalog.write().expect("catalog lock poisoned");
917 ensure_namespace_exists(&*catalog, catalog_name, namespace_name)?;
918 let table = find_table_metadata(&*catalog, catalog_name, namespace_name, table_name)?
919 .ok_or_else(|| {
920 Error::TableNotFound(table_full_name(catalog_name, namespace_name, table_name))
921 })?;
922
923 let store = catalog.store().clone();
924 let mut txn = store.begin(TxnMode::ReadWrite).map_err(Error::Core)?;
925 catalog
926 .persist_drop_table(&mut txn, &TableFqn::from(&table))
927 .map_err(|err| Error::Sql(err.into()))?;
928 txn.commit_self().map_err(Error::Core)?;
929
930 let mut overlay = CatalogOverlay::new();
931 overlay.drop_table(&TableFqn::from(&table));
932 catalog.apply_overlay(overlay);
933 drop(catalog); self.invalidate_table_info_cache();
935 Ok(())
936 }
937
938 pub fn delete_table_simple(&self, name: &str) -> Result<()> {
940 self.delete_table("default", "default", name)
941 }
942}
943
944impl<'a> Transaction<'a> {
945 pub fn list_catalogs(&self) -> Result<Vec<CatalogInfo>> {
947 let catalog = self.db.sql_catalog.read().expect("catalog lock poisoned");
948 Ok(catalog
949 .list_catalogs_in_txn(self.catalog_overlay())
950 .into_iter()
951 .map(CatalogInfo::from)
952 .collect())
953 }
954
955 pub fn get_catalog(&self, name: &str) -> Result<CatalogInfo> {
957 let catalog = self.db.sql_catalog.read().expect("catalog lock poisoned");
958 let meta = catalog
959 .get_catalog_in_txn(name, self.catalog_overlay())
960 .ok_or_else(|| Error::CatalogNotFound(name.to_string()))?;
961 Ok(meta.clone().into())
962 }
963
964 pub fn list_namespaces(&self, catalog_name: &str) -> Result<Vec<NamespaceInfo>> {
966 let catalog = self.db.sql_catalog.read().expect("catalog lock poisoned");
967 ensure_catalog_exists_in_txn(&*catalog, self.catalog_overlay(), catalog_name)?;
968 Ok(catalog
969 .list_namespaces_in_txn(catalog_name, self.catalog_overlay())
970 .into_iter()
971 .map(NamespaceInfo::from)
972 .collect())
973 }
974
975 pub fn get_namespace(&self, catalog_name: &str, namespace_name: &str) -> Result<NamespaceInfo> {
977 let catalog = self.db.sql_catalog.read().expect("catalog lock poisoned");
978 ensure_catalog_exists_in_txn(&*catalog, self.catalog_overlay(), catalog_name)?;
979 let meta = catalog
980 .get_namespace_in_txn(catalog_name, namespace_name, self.catalog_overlay())
981 .ok_or_else(|| {
982 Error::NamespaceNotFound(catalog_name.to_string(), namespace_name.to_string())
983 })?;
984 Ok(meta.clone().into())
985 }
986
987 pub fn list_tables(&self, catalog_name: &str, namespace_name: &str) -> Result<Vec<TableInfo>> {
989 let catalog = self.db.sql_catalog.read().expect("catalog lock poisoned");
990 ensure_namespace_exists_in_txn(
991 &*catalog,
992 self.catalog_overlay(),
993 catalog_name,
994 namespace_name,
995 )?;
996 let namespace = catalog
997 .get_namespace_in_txn(catalog_name, namespace_name, self.catalog_overlay())
998 .cloned()
999 .ok_or_else(|| {
1000 Error::NamespaceNotFound(catalog_name.to_string(), namespace_name.to_string())
1001 })?;
1002 let tables =
1003 catalog.list_tables_in_txn(catalog_name, namespace_name, self.catalog_overlay());
1004 Ok(tables
1005 .into_iter()
1006 .map(|table| {
1007 let info = TableInfo::from(table);
1008 apply_storage_location(info, namespace.storage_root.as_deref())
1009 })
1010 .collect())
1011 }
1012
1013 pub fn get_table_info(
1015 &self,
1016 catalog_name: &str,
1017 namespace_name: &str,
1018 table_name: &str,
1019 ) -> Result<TableInfo> {
1020 let catalog = self.db.sql_catalog.read().expect("catalog lock poisoned");
1021 ensure_namespace_exists_in_txn(
1022 &*catalog,
1023 self.catalog_overlay(),
1024 catalog_name,
1025 namespace_name,
1026 )?;
1027 let namespace = catalog
1028 .get_namespace_in_txn(catalog_name, namespace_name, self.catalog_overlay())
1029 .cloned()
1030 .ok_or_else(|| {
1031 Error::NamespaceNotFound(catalog_name.to_string(), namespace_name.to_string())
1032 })?;
1033
1034 let tables =
1035 catalog.list_tables_in_txn(catalog_name, namespace_name, self.catalog_overlay());
1036 let table = tables
1037 .into_iter()
1038 .find(|table| table.name == table_name)
1039 .ok_or_else(|| {
1040 Error::TableNotFound(table_full_name(catalog_name, namespace_name, table_name))
1041 })?;
1042 let info = TableInfo::from(table);
1043 Ok(apply_storage_location(
1044 info,
1045 namespace.storage_root.as_deref(),
1046 ))
1047 }
1048
1049 pub fn create_catalog(&mut self, request: CreateCatalogRequest) -> Result<CatalogInfo> {
1062 ensure_write_mode(self)?;
1063 let request = request.build()?;
1064 let catalog = self.db.sql_catalog.read().expect("catalog lock poisoned");
1065 if catalog
1066 .get_catalog_in_txn(&request.name, self.catalog_overlay())
1067 .is_some()
1068 {
1069 return Err(Error::CatalogAlreadyExists(request.name));
1070 }
1071
1072 let meta = CatalogMeta {
1073 name: request.name,
1074 comment: request.comment,
1075 storage_root: request.storage_root,
1076 };
1077 self.catalog_overlay_mut().add_catalog(meta.clone());
1078 self.catalog_modified = true;
1079 Ok(meta.into())
1080 }
1081
1082 pub fn delete_catalog(&mut self, name: &str, force: bool) -> Result<()> {
1084 ensure_write_mode(self)?;
1085 if name == "default" {
1086 return Err(Error::CannotDeleteDefault("catalog".to_string()));
1087 }
1088 let catalog = self.db.sql_catalog.read().expect("catalog lock poisoned");
1089 ensure_catalog_exists_in_txn(&*catalog, self.catalog_overlay(), name)?;
1090
1091 if !force {
1092 let namespaces = catalog.list_namespaces_in_txn(name, self.catalog_overlay());
1093 let has_non_default = namespaces.iter().any(|ns| ns.name != "default");
1094 let has_tables = namespaces.iter().any(|ns| {
1095 !catalog
1096 .list_tables_in_txn(name, &ns.name, self.catalog_overlay())
1097 .is_empty()
1098 });
1099 if has_non_default || has_tables {
1100 return Err(Error::CatalogNotEmpty(name.to_string()));
1101 }
1102 }
1103
1104 if force {
1105 self.catalog_overlay_mut().drop_cascade_catalog(name);
1106 } else {
1107 self.catalog_overlay_mut().drop_catalog(name);
1108 }
1109 self.catalog_modified = true;
1110 Ok(())
1111 }
1112
1113 pub fn create_namespace(&mut self, request: CreateNamespaceRequest) -> Result<NamespaceInfo> {
1115 ensure_write_mode(self)?;
1116 let request = request.build()?;
1117 let catalog = self.db.sql_catalog.read().expect("catalog lock poisoned");
1118 let catalog_meta = catalog
1119 .get_catalog_in_txn(&request.catalog_name, self.catalog_overlay())
1120 .ok_or_else(|| Error::CatalogNotFound(request.catalog_name.clone()))?;
1121 if catalog
1122 .get_namespace_in_txn(&request.catalog_name, &request.name, self.catalog_overlay())
1123 .is_some()
1124 {
1125 return Err(Error::NamespaceAlreadyExists(
1126 request.catalog_name,
1127 request.name,
1128 ));
1129 }
1130
1131 let storage_root = request
1132 .storage_root
1133 .or_else(|| catalog_meta.storage_root.clone());
1134 let meta = NamespaceMeta {
1135 name: request.name,
1136 catalog_name: request.catalog_name,
1137 comment: request.comment,
1138 storage_root,
1139 };
1140 self.catalog_overlay_mut().add_namespace(meta.clone());
1141 self.catalog_modified = true;
1142 Ok(meta.into())
1143 }
1144
1145 pub fn delete_namespace(
1147 &mut self,
1148 catalog_name: &str,
1149 namespace_name: &str,
1150 force: bool,
1151 ) -> Result<()> {
1152 ensure_write_mode(self)?;
1153 if namespace_name == "default" {
1154 return Err(Error::CannotDeleteDefault("namespace".to_string()));
1155 }
1156 let catalog = self.db.sql_catalog.read().expect("catalog lock poisoned");
1157 ensure_namespace_exists_in_txn(
1158 &*catalog,
1159 self.catalog_overlay(),
1160 catalog_name,
1161 namespace_name,
1162 )?;
1163
1164 let tables =
1165 catalog.list_tables_in_txn(catalog_name, namespace_name, self.catalog_overlay());
1166 if !force && !tables.is_empty() {
1167 return Err(Error::NamespaceNotEmpty(
1168 catalog_name.to_string(),
1169 namespace_name.to_string(),
1170 ));
1171 }
1172
1173 if force {
1174 self.catalog_overlay_mut()
1175 .drop_cascade_namespace(catalog_name, namespace_name);
1176 } else {
1177 self.catalog_overlay_mut()
1178 .drop_namespace(catalog_name, namespace_name);
1179 }
1180 self.catalog_modified = true;
1181 Ok(())
1182 }
1183
1184 pub fn create_table(&mut self, request: CreateTableRequest) -> Result<TableInfo> {
1213 ensure_write_mode(self)?;
1214 let request = request.build()?;
1215
1216 let mut catalog = self.db.sql_catalog.write().expect("catalog lock poisoned");
1217 ensure_namespace_exists_in_txn(
1218 &*catalog,
1219 self.catalog_overlay(),
1220 &request.catalog_name,
1221 &request.namespace_name,
1222 )?;
1223 ensure_table_absent_in_txn(
1224 &*catalog,
1225 self.catalog_overlay(),
1226 &request.catalog_name,
1227 &request.namespace_name,
1228 &request.name,
1229 )?;
1230
1231 if request.table_type == TableType::Managed && request.storage_root.is_some() {
1232 eprintln!("警告: managed テーブルの storage_root は無視されます");
1233 }
1234
1235 let table_id = catalog.next_table_id();
1236 let primary_key = request.primary_key.clone();
1237 let columns = build_columns(request.schema.clone(), primary_key.as_ref())?;
1238
1239 let storage_options = request.storage_options.unwrap_or_else(|| StorageOptions {
1240 compression: Compression::None,
1241 ..StorageOptions::default()
1242 });
1243
1244 let namespace = catalog
1245 .get_namespace_in_txn(
1246 &request.catalog_name,
1247 &request.namespace_name,
1248 self.catalog_overlay(),
1249 )
1250 .cloned();
1251 let storage_location = resolve_storage_location(
1252 &request.table_type,
1253 request.storage_root.as_deref(),
1254 namespace.as_ref(),
1255 &request.name,
1256 )?;
1257
1258 let mut table = TableMetadata::new(&request.name, columns).with_table_id(table_id);
1259 table.catalog_name = request.catalog_name.clone();
1260 table.namespace_name = request.namespace_name.clone();
1261 table.primary_key = primary_key;
1262 table.storage_options = storage_options;
1263 table.table_type = request.table_type;
1264 table.data_source_format = request
1265 .data_source_format
1266 .unwrap_or(DataSourceFormat::Alopex);
1267 table.storage_location = storage_location;
1268 table.comment = request.comment;
1269 table.properties = request.properties.unwrap_or_default();
1270
1271 self.catalog_overlay_mut()
1272 .add_table(TableFqn::from(&table), table.clone());
1273 self.catalog_modified = true;
1274 let info = TableInfo::from(table);
1275 let namespace_root = namespace.and_then(|ns| ns.storage_root);
1276 Ok(apply_storage_location(info, namespace_root.as_deref()))
1277 }
1278
1279 pub fn delete_table(
1281 &mut self,
1282 catalog_name: &str,
1283 namespace_name: &str,
1284 table_name: &str,
1285 ) -> Result<()> {
1286 ensure_write_mode(self)?;
1287 let catalog = self.db.sql_catalog.read().expect("catalog lock poisoned");
1288 ensure_namespace_exists_in_txn(
1289 &*catalog,
1290 self.catalog_overlay(),
1291 catalog_name,
1292 namespace_name,
1293 )?;
1294 let table = find_table_metadata_in_txn(
1295 &*catalog,
1296 self.catalog_overlay(),
1297 catalog_name,
1298 namespace_name,
1299 table_name,
1300 )?
1301 .ok_or_else(|| {
1302 Error::TableNotFound(table_full_name(catalog_name, namespace_name, table_name))
1303 })?;
1304
1305 self.catalog_overlay_mut()
1306 .drop_table(&TableFqn::from(&table));
1307 self.catalog_modified = true;
1308 Ok(())
1309 }
1310}
1311
1312fn validate_required(value: &str, label: &str) -> Result<()> {
1313 if value.trim().is_empty() {
1314 return Err(Error::Core(alopex_core::Error::InvalidFormat(format!(
1315 "{label}が未指定です"
1316 ))));
1317 }
1318 Ok(())
1319}
1320
1321fn ensure_catalog_exists<S: alopex_core::kv::KVStore>(
1322 catalog: &alopex_sql::catalog::PersistentCatalog<S>,
1323 name: &str,
1324) -> Result<()> {
1325 if catalog.get_catalog(name).is_none() {
1326 return Err(Error::CatalogNotFound(name.to_string()));
1327 }
1328 Ok(())
1329}
1330
1331fn ensure_catalog_exists_in_txn<S: alopex_core::kv::KVStore>(
1332 catalog: &alopex_sql::catalog::PersistentCatalog<S>,
1333 overlay: &CatalogOverlay,
1334 name: &str,
1335) -> Result<()> {
1336 if catalog.get_catalog_in_txn(name, overlay).is_none() {
1337 return Err(Error::CatalogNotFound(name.to_string()));
1338 }
1339 Ok(())
1340}
1341
1342fn ensure_namespace_exists<S: alopex_core::kv::KVStore>(
1343 catalog: &alopex_sql::catalog::PersistentCatalog<S>,
1344 catalog_name: &str,
1345 namespace_name: &str,
1346) -> Result<()> {
1347 ensure_catalog_exists(catalog, catalog_name)?;
1348 if catalog
1349 .get_namespace(catalog_name, namespace_name)
1350 .is_none()
1351 {
1352 return Err(Error::NamespaceNotFound(
1353 catalog_name.to_string(),
1354 namespace_name.to_string(),
1355 ));
1356 }
1357 Ok(())
1358}
1359
1360fn ensure_namespace_exists_in_txn<S: alopex_core::kv::KVStore>(
1361 catalog: &alopex_sql::catalog::PersistentCatalog<S>,
1362 overlay: &CatalogOverlay,
1363 catalog_name: &str,
1364 namespace_name: &str,
1365) -> Result<()> {
1366 ensure_catalog_exists_in_txn(catalog, overlay, catalog_name)?;
1367 if catalog
1368 .get_namespace_in_txn(catalog_name, namespace_name, overlay)
1369 .is_none()
1370 {
1371 return Err(Error::NamespaceNotFound(
1372 catalog_name.to_string(),
1373 namespace_name.to_string(),
1374 ));
1375 }
1376 Ok(())
1377}
1378
1379fn ensure_table_exists<S: alopex_core::kv::KVStore>(
1380 catalog: &alopex_sql::catalog::PersistentCatalog<S>,
1381 catalog_name: &str,
1382 namespace_name: &str,
1383 table_name: &str,
1384) -> Result<()> {
1385 let Some(table) = find_table_metadata(catalog, catalog_name, namespace_name, table_name)?
1386 else {
1387 return Err(Error::TableNotFound(table_full_name(
1388 catalog_name,
1389 namespace_name,
1390 table_name,
1391 )));
1392 };
1393 let _ = table;
1394 Ok(())
1395}
1396
1397fn ensure_table_absent<S: alopex_core::kv::KVStore>(
1398 catalog: &alopex_sql::catalog::PersistentCatalog<S>,
1399 catalog_name: &str,
1400 namespace_name: &str,
1401 table_name: &str,
1402) -> Result<()> {
1403 if find_table_metadata(catalog, catalog_name, namespace_name, table_name)?.is_some() {
1404 return Err(Error::TableAlreadyExists(table_full_name(
1405 catalog_name,
1406 namespace_name,
1407 table_name,
1408 )));
1409 }
1410 Ok(())
1411}
1412
1413fn ensure_table_absent_in_txn<S: alopex_core::kv::KVStore>(
1414 catalog: &alopex_sql::catalog::PersistentCatalog<S>,
1415 overlay: &CatalogOverlay,
1416 catalog_name: &str,
1417 namespace_name: &str,
1418 table_name: &str,
1419) -> Result<()> {
1420 if find_table_metadata_in_txn(catalog, overlay, catalog_name, namespace_name, table_name)?
1421 .is_some()
1422 {
1423 return Err(Error::TableAlreadyExists(table_full_name(
1424 catalog_name,
1425 namespace_name,
1426 table_name,
1427 )));
1428 }
1429 Ok(())
1430}
1431
1432fn find_table_metadata<S: alopex_core::kv::KVStore>(
1433 catalog: &alopex_sql::catalog::PersistentCatalog<S>,
1434 catalog_name: &str,
1435 namespace_name: &str,
1436 table_name: &str,
1437) -> Result<Option<TableMetadata>> {
1438 let overlay = CatalogOverlay::new();
1439 let tables = catalog.list_tables_in_txn(catalog_name, namespace_name, &overlay);
1440 Ok(tables.into_iter().find(|table| table.name == table_name))
1441}
1442
1443fn find_table_metadata_in_txn<S: alopex_core::kv::KVStore>(
1444 catalog: &alopex_sql::catalog::PersistentCatalog<S>,
1445 overlay: &CatalogOverlay,
1446 catalog_name: &str,
1447 namespace_name: &str,
1448 table_name: &str,
1449) -> Result<Option<TableMetadata>> {
1450 let tables = catalog.list_tables_in_txn(catalog_name, namespace_name, overlay);
1451 Ok(tables.into_iter().find(|table| table.name == table_name))
1452}
1453
/// Builds the dot-separated name `catalog.namespace.table`.
fn table_full_name(catalog_name: &str, namespace_name: &str, table_name: &str) -> String {
    [catalog_name, namespace_name, table_name].join(".")
}
1457
/// Builds the dot-separated name `catalog.namespace.table.index`.
fn index_full_name(
    catalog_name: &str,
    namespace_name: &str,
    table_name: &str,
    index_name: &str,
) -> String {
    [
        catalog_name,
        ".",
        namespace_name,
        ".",
        table_name,
        ".",
        index_name,
    ]
    .concat()
}
1466
1467fn apply_storage_location(mut info: TableInfo, namespace_root: Option<&str>) -> TableInfo {
1468 if info.storage_location.is_none() && info.table_type == TableType::Managed {
1469 if let Some(root) = namespace_root {
1470 info.storage_location = Some(format!("{root}/{}", info.name));
1471 }
1472 }
1473 info
1474}
1475
1476fn resolve_storage_location(
1477 table_type: &TableType,
1478 request_storage_root: Option<&str>,
1479 namespace: Option<&NamespaceMeta>,
1480 table_name: &str,
1481) -> Result<Option<String>> {
1482 match table_type {
1483 TableType::Managed => Ok(namespace
1484 .and_then(|ns| ns.storage_root.as_deref())
1485 .map(|root| format!("{root}/{table_name}"))),
1486 TableType::External => {
1487 let storage_root = request_storage_root
1488 .map(|root| root.to_string())
1489 .ok_or(Error::StorageRootRequired)?;
1490 Ok(Some(storage_root))
1491 }
1492 }
1493}
1494
1495fn build_columns(
1496 schema: Option<Vec<ColumnDefinition>>,
1497 primary_key: Option<&Vec<String>>,
1498) -> Result<Vec<ColumnMetadata>> {
1499 let Some(schema) = schema else {
1500 return Ok(Vec::new());
1501 };
1502
1503 let mut columns = Vec::with_capacity(schema.len());
1504 for definition in schema {
1505 validate_required(&definition.name, "column 名")?;
1506 let mut column = ColumnMetadata::new(
1507 definition.name.clone(),
1508 ResolvedType::from_ast(&definition.data_type),
1509 )
1510 .with_not_null(!definition.nullable);
1511 if primary_key
1512 .map(|keys| keys.iter().any(|key| key == &definition.name))
1513 .unwrap_or(false)
1514 {
1515 column = column.with_primary_key(true).with_not_null(true);
1516 }
1517 columns.push(column);
1518 }
1519
1520 if let Some(keys) = primary_key {
1521 let missing: Vec<String> = keys
1522 .iter()
1523 .filter(|key| !columns.iter().any(|col| col.name == **key))
1524 .cloned()
1525 .collect();
1526 if !missing.is_empty() {
1527 return Err(Error::Core(alopex_core::Error::InvalidFormat(format!(
1528 "主キーが見つかりません: {}",
1529 missing.join(", ")
1530 ))));
1531 }
1532 }
1533
1534 Ok(columns)
1535}
1536
1537fn ensure_write_mode(txn: &Transaction<'_>) -> Result<()> {
1538 let mode = txn.txn_mode()?;
1539 if mode != TxnMode::ReadWrite {
1540 return Err(Error::TxnReadOnly);
1541 }
1542 Ok(())
1543}
1544
1545fn resolved_type_to_string(resolved_type: &ResolvedType) -> String {
1546 match resolved_type {
1547 ResolvedType::Integer => "INTEGER".to_string(),
1548 ResolvedType::BigInt => "BIGINT".to_string(),
1549 ResolvedType::Float => "FLOAT".to_string(),
1550 ResolvedType::Double => "DOUBLE".to_string(),
1551 ResolvedType::Text => "TEXT".to_string(),
1552 ResolvedType::Blob => "BLOB".to_string(),
1553 ResolvedType::Boolean => "BOOLEAN".to_string(),
1554 ResolvedType::Timestamp => "TIMESTAMP".to_string(),
1555 ResolvedType::Vector { dimension, metric } => {
1556 let metric = match metric {
1557 VectorMetric::Cosine => "COSINE",
1558 VectorMetric::L2 => "L2",
1559 VectorMetric::Inner => "INNER",
1560 };
1561 format!("VECTOR({dimension}, {metric})")
1562 }
1563 ResolvedType::Null => "NULL".to_string(),
1564 }
1565}
1566
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{Database, Transaction, TxnMode};
    use alopex_sql::catalog::{ColumnMetadata, RowIdMode};
    use alopex_sql::ExecutionResult;

    /// Single-column schema (`id INTEGER`) shared by the table tests.
    fn id_only_schema() -> Vec<ColumnDefinition> {
        vec![ColumnDefinition::new("id", DataType::Integer)]
    }

    /// Best-effort creation of the `default` catalog and namespace; failures
    /// are deliberately ignored.
    fn ensure_default_catalog_and_namespace(db: &Database) {
        let _ = db.create_catalog(CreateCatalogRequest::new("default"));
        let _ = db.create_namespace(CreateNamespaceRequest::new("default", "default"));
    }

    /// Creates catalog `main`, namespace `main.default`, and one table with
    /// the id-only schema inside `txn`.
    fn stage_table_in_main(txn: &mut Transaction<'_>, table_name: &str) {
        txn.create_catalog(CreateCatalogRequest::new("main"))
            .unwrap();
        txn.create_namespace(CreateNamespaceRequest::new("main", "default"))
            .unwrap();
        txn.create_table(
            CreateTableRequest::new(table_name)
                .with_catalog_name("main")
                .with_namespace_name("default")
                .with_schema(id_only_schema()),
        )
        .unwrap();
    }

    /// `StorageInfo::default()` reports row storage without compression.
    #[test]
    fn storage_info_default_is_row_none() {
        let StorageInfo {
            storage_type,
            compression,
        } = StorageInfo::default();
        assert_eq!(storage_type, "row");
        assert_eq!(compression, "none");
    }

    /// New column definitions are nullable and uncommented until overridden.
    #[test]
    fn column_definition_defaults_to_nullable() {
        let fresh = ColumnDefinition::new("id", DataType::Integer);
        assert!(fresh.nullable);
        assert!(fresh.comment.is_none());

        let customised = fresh.with_nullable(false).with_comment("ID");
        assert!(!customised.nullable);
        assert_eq!(customised.comment.as_deref(), Some("ID"));
    }

    /// An empty catalog name is rejected; a full request keeps its fields.
    #[test]
    fn create_catalog_request_builder_validates_name() {
        let rejected = CreateCatalogRequest::new("").build().unwrap_err();
        assert!(matches!(rejected, Error::Core(_)));

        let built = CreateCatalogRequest::new("main")
            .with_comment("メイン")
            .with_storage_root("/data")
            .build()
            .unwrap();
        assert_eq!(built.name, "main");
        assert_eq!(built.comment.as_deref(), Some("メイン"));
        assert_eq!(built.storage_root.as_deref(), Some("/data"));
    }

    /// An empty catalog name is rejected; a full request keeps its fields.
    #[test]
    fn create_namespace_request_builder_validates_fields() {
        let rejected = CreateNamespaceRequest::new("", "default")
            .build()
            .unwrap_err();
        assert!(matches!(rejected, Error::Core(_)));

        let built = CreateNamespaceRequest::new("main", "analytics")
            .with_comment("分析")
            .build()
            .unwrap();
        assert_eq!(built.catalog_name, "main");
        assert_eq!(built.name, "analytics");
        assert_eq!(built.comment.as_deref(), Some("分析"));
    }

    /// Covers builder defaults plus the schema / storage-root requirements.
    #[test]
    fn create_table_request_defaults_and_validation() {
        // Defaults applied to a managed table with a schema.
        let managed = CreateTableRequest::new("users")
            .with_schema(id_only_schema())
            .build()
            .unwrap();
        assert_eq!(managed.catalog_name, "default");
        assert_eq!(managed.namespace_name, "default");
        assert_eq!(managed.table_type, TableType::Managed);
        assert_eq!(managed.data_source_format, Some(DataSourceFormat::Alopex));
        assert_eq!(managed.properties.as_ref().unwrap().len(), 0);

        // A managed table without a schema is rejected.
        let no_schema = CreateTableRequest::new("users").build().unwrap_err();
        assert!(matches!(no_schema, Error::SchemaRequired));

        // An external table without a storage root is rejected.
        let no_root = CreateTableRequest::new("ext")
            .with_table_type(TableType::External)
            .build()
            .unwrap_err();
        assert!(matches!(no_root, Error::StorageRootRequired));

        // An external table with a storage root builds without a schema.
        let external = CreateTableRequest::new("ext")
            .with_table_type(TableType::External)
            .with_storage_root("/external")
            .build()
            .unwrap();
        assert_eq!(external.storage_root.as_deref(), Some("/external"));
        assert_eq!(external.data_source_format, Some(DataSourceFormat::Alopex));
        assert!(external.properties.as_ref().unwrap().is_empty());
    }

    /// `TableInfo::from` carries over identity, columns and storage options.
    #[test]
    fn table_info_converts_from_metadata() {
        let columns = vec![
            ColumnMetadata::new("id", ResolvedType::Integer).with_primary_key(true),
            ColumnMetadata::new("name", ResolvedType::Text),
        ];
        let mut metadata = TableMetadata::new("users", columns).with_table_id(42);
        metadata.catalog_name = "main".to_string();
        metadata.namespace_name = "default".to_string();
        metadata.primary_key = Some(vec!["id".to_string()]);
        metadata.storage_options = StorageOptions {
            storage_type: StorageType::Columnar,
            compression: Compression::Zstd,
            row_group_size: 1024,
            row_id_mode: RowIdMode::Direct,
        };

        let converted = TableInfo::from(metadata);
        assert_eq!(converted.name, "users");
        assert_eq!(converted.table_id, 42);
        assert_eq!(converted.catalog_name, "main");
        assert_eq!(converted.namespace_name, "default");
        assert_eq!(converted.columns.len(), 2);
        assert_eq!(converted.columns[0].data_type, "INTEGER");
        assert!(converted.columns[0].is_primary_key);
        assert_eq!(converted.storage_options.storage_type, "columnar");
        assert_eq!(converted.storage_options.compression, "zstd");
    }

    /// Metadata with untouched storage options converts to row / none.
    #[test]
    fn table_info_defaults_storage_options_to_row_none() {
        let columns = vec![ColumnMetadata::new("id", ResolvedType::Integer)];
        let converted = TableInfo::from(TableMetadata::new("logs", columns));
        assert_eq!(converted.storage_options.storage_type, "row");
        assert_eq!(converted.storage_options.compression, "none");
    }

    /// `IndexInfo::from` carries over name, table, method and uniqueness.
    #[test]
    fn index_info_converts_from_metadata() {
        let mut metadata =
            IndexMetadata::new(1, "idx_users_id", "users", vec!["id".to_string()])
                .with_unique(true)
                .with_method(IndexMethod::Hnsw);
        metadata.catalog_name = "main".to_string();
        metadata.namespace_name = "default".to_string();

        let converted = IndexInfo::from(metadata);
        assert_eq!(converted.name, "idx_users_id");
        assert_eq!(converted.table_name, "users");
        assert_eq!(converted.method, "hnsw");
        assert!(converted.is_unique);
    }

    /// Catalog and namespace create / list / delete through `Database`.
    #[test]
    fn database_catalog_and_namespace_crud() {
        let db = Database::new();

        let created_catalog = db
            .create_catalog(CreateCatalogRequest::new("main"))
            .unwrap();
        assert_eq!(created_catalog.name, "main");

        let created_namespace = db
            .create_namespace(CreateNamespaceRequest::new("main", "analytics"))
            .unwrap();
        assert_eq!(created_namespace.catalog_name, "main");
        assert_eq!(created_namespace.name, "analytics");

        let namespaces = db.list_namespaces("main").unwrap();
        assert_eq!(namespaces.len(), 1);

        // A non-forced delete is refused while the catalog has content.
        let refused = db.delete_catalog("main", false).unwrap_err();
        assert!(matches!(refused, Error::CatalogNotEmpty(_)));

        db.delete_catalog("main", true).unwrap();

        let missing = db.get_catalog("main").unwrap_err();
        assert!(matches!(missing, Error::CatalogNotFound(_)));
    }

    /// The default catalog and namespace cannot be deleted, even with force,
    /// neither through `Database` nor through a transaction.
    #[test]
    fn cannot_delete_default_catalog_or_namespace() {
        let db = Database::new();
        ensure_default_catalog_and_namespace(&db);

        let via_db_catalog = db.delete_catalog("default", true).unwrap_err();
        assert!(matches!(via_db_catalog, Error::CannotDeleteDefault(_)));

        let via_db_namespace = db.delete_namespace("default", "default", true).unwrap_err();
        assert!(matches!(via_db_namespace, Error::CannotDeleteDefault(_)));

        let mut txn = db.begin(TxnMode::ReadWrite).unwrap();
        let via_txn_catalog = txn.delete_catalog("default", true).unwrap_err();
        assert!(matches!(via_txn_catalog, Error::CannotDeleteDefault(_)));

        let via_txn_namespace = txn
            .delete_namespace("default", "default", true)
            .unwrap_err();
        assert!(matches!(via_txn_namespace, Error::CannotDeleteDefault(_)));
    }

    /// Table create / list / get / duplicate / delete via the `_simple` API.
    #[test]
    fn database_table_crud_and_simple_helpers() {
        let db = Database::new();
        ensure_default_catalog_and_namespace(&db);

        let created = db.create_table_simple("users", id_only_schema()).unwrap();
        assert_eq!(created.catalog_name, "default");
        assert_eq!(created.namespace_name, "default");
        assert_eq!(created.table_type, TableType::Managed);
        assert_eq!(created.data_source_format, DataSourceFormat::Alopex);
        assert_eq!(created.storage_options.storage_type, "row");
        assert_eq!(created.storage_options.compression, "none");

        let listed = db.list_tables_simple().unwrap();
        assert_eq!(listed.len(), 1);

        let fetched = db.get_table_info_simple("users").unwrap();
        assert_eq!(fetched.name, "users");

        let duplicate = db
            .create_table_simple("users", id_only_schema())
            .unwrap_err();
        assert!(matches!(duplicate, Error::TableAlreadyExists(_)));

        db.delete_table_simple("users").unwrap();
        assert!(db.list_tables_simple().unwrap().is_empty());
    }

    /// An index created through SQL is visible via the index read helpers.
    #[test]
    fn database_index_read_helpers() {
        let db = Database::new();
        ensure_default_catalog_and_namespace(&db);

        db.create_table_simple("users", id_only_schema()).unwrap();

        let outcome = db
            .execute_sql("CREATE INDEX idx_users_id ON users (id);")
            .unwrap();
        assert!(matches!(outcome, ExecutionResult::Success));

        let listed = db.list_indexes_simple("users").unwrap();
        assert_eq!(listed.len(), 1);
        assert_eq!(listed[0].name, "idx_users_id");
        assert_eq!(listed[0].method, "btree");

        let fetched = db.get_index_info_simple("users", "idx_users_id").unwrap();
        assert_eq!(fetched.table_name, "users");
    }

    /// Objects created in a transaction are visible inside it before commit
    /// and through the database after commit.
    #[test]
    fn transaction_overlay_visibility_and_commit() {
        let db = Database::new();
        let mut txn = db.begin(TxnMode::ReadWrite).unwrap();
        stage_table_in_main(&mut txn, "events");

        let visible_in_txn = txn.list_tables("main", "default").unwrap();
        assert_eq!(visible_in_txn.len(), 1);

        txn.commit().unwrap();

        let committed = db.get_table_info("main", "default", "events").unwrap();
        assert_eq!(committed.name, "events");
    }

    /// After commit, a catalog reloaded from the store contains the new
    /// catalog, namespace and table.
    #[test]
    fn transaction_commit_persists_overlay_to_store() {
        let db = Database::new();
        let mut txn = db.begin(TxnMode::ReadWrite).unwrap();
        stage_table_in_main(&mut txn, "events");

        txn.commit().unwrap();

        let reloaded = alopex_sql::catalog::PersistentCatalog::load(db.store.clone()).unwrap();
        assert!(reloaded.get_catalog("main").is_some());
        assert!(reloaded.get_namespace("main", "default").is_some());
        assert!(reloaded.table_exists("events"));
    }

    /// After rollback, nothing staged in the transaction is visible.
    #[test]
    fn transaction_rollback_discards_overlay() {
        let db = Database::new();
        let mut txn = db.begin(TxnMode::ReadWrite).unwrap();
        stage_table_in_main(&mut txn, "staging");

        txn.rollback().unwrap();

        // The catalog itself was only staged, so the lookup fails at the
        // catalog level.
        let missing = db.get_table_info("main", "default", "staging").unwrap_err();
        assert!(matches!(missing, Error::CatalogNotFound(_)));
    }

    /// DDL on a read-only transaction is rejected.
    #[test]
    fn transaction_readonly_rejects_ddl() {
        let db = Database::new();
        let mut txn = db.begin(TxnMode::ReadOnly).unwrap();
        let rejected = txn
            .create_catalog(CreateCatalogRequest::new("main"))
            .unwrap_err();
        assert!(matches!(rejected, Error::TxnReadOnly));
    }
}