1use std::collections::{HashMap, HashSet};
10use std::sync::Arc;
11
12use alopex_core::kv::{KVStore, KVTransaction};
13use alopex_core::types::TxnMode;
14use serde::{Deserialize, Serialize};
15use thiserror::Error;
16
17use crate::ast::ddl::{IndexMethod, VectorMetric};
18use crate::catalog::{
19 Catalog, ColumnMetadata, Compression, IndexMetadata, MemoryCatalog, RowIdMode,
20};
21use crate::catalog::{StorageOptions, StorageType, TableMetadata};
22use crate::planner::PlannerError;
23use crate::planner::types::ResolvedType;
24
/// Root key prefix shared by every catalog key constant below.
pub const CATALOG_PREFIX: &[u8] = b"__catalog__/";
/// Key prefix for catalog records; `catalog_key` appends the catalog name.
pub const CATALOGS_PREFIX: &[u8] = b"__catalog__/catalogs/";
/// Key prefix for namespace records; `namespace_key` appends `<catalog>/<namespace>`.
pub const NAMESPACES_PREFIX: &[u8] = b"__catalog__/namespaces/";
/// Key prefix for table records; `table_key` appends `<catalog>/<namespace>/<table>`.
pub const TABLES_PREFIX: &[u8] = b"__catalog__/tables/";
/// Key prefix for index records; `index_key` appends
/// `<catalog>/<namespace>/<table>/<index>`.
pub const INDEXES_PREFIX: &[u8] = b"__catalog__/indexes/";
/// Key of the single record holding the serialized `CatalogState`.
pub const META_KEY: &[u8] = b"__catalog__/meta";

/// Catalog layout version written by `migrate_v1_to_v2` and compared against
/// the stored `CatalogState::version` in `PersistentCatalog::load`.
const CATALOG_VERSION: u32 = 2;
34
/// Errors raised while loading, migrating, or decoding the persisted catalog.
#[derive(Debug, Error)]
pub enum CatalogError {
    /// Error propagated from the underlying key-value store.
    #[error("kv error: {0}")]
    Kv(#[from] alopex_core::Error),

    /// bincode failed to encode or decode a catalog record.
    #[error("serialize error: {0}")]
    Serialize(#[from] bincode::Error),

    /// A catalog key (or key suffix) did not have the expected shape.
    /// `PersistentCatalog::load` also reports an unsupported catalog version
    /// through this variant.
    #[error("invalid catalog key: {0}")]
    InvalidKey(String),
}
46
/// Catalog-wide state stored under `META_KEY`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
struct CatalogState {
    /// Layout version of the persisted catalog; compared with `CATALOG_VERSION`.
    version: u32,
    /// Persisted table id counter.
    table_id_counter: u32,
    /// Persisted index id counter.
    index_id_counter: u32,
}
53
/// Serialized form of a catalog record (stored under `CATALOGS_PREFIX`).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct PersistedCatalogMeta {
    /// Catalog name.
    pub name: String,
    /// Optional free-form comment.
    pub comment: Option<String>,
    /// Optional storage root associated with the catalog.
    pub storage_root: Option<String>,
}
60
/// Serialized form of a namespace record (stored under `NAMESPACES_PREFIX`).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct PersistedNamespaceMeta {
    /// Namespace name.
    pub name: String,
    /// Name of the catalog that owns this namespace.
    pub catalog_name: String,
    /// Optional free-form comment.
    pub comment: Option<String>,
    /// Optional storage root associated with the namespace.
    pub storage_root: Option<String>,
}
68
/// In-memory alias for the persisted catalog record.
pub type CatalogMeta = PersistedCatalogMeta;
/// In-memory alias for the persisted namespace record.
pub type NamespaceMeta = PersistedNamespaceMeta;
71
/// Fully qualified table name: catalog, namespace, and table.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct TableFqn {
    pub catalog: String,
    pub namespace: String,
    pub table: String,
}

/// Fully qualified index name: catalog, namespace, owning table, and index.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct IndexFqn {
    pub catalog: String,
    pub namespace: String,
    pub table: String,
    pub index: String,
}

impl TableFqn {
    /// Builds a table FQN by copying each borrowed name part into an owned `String`.
    pub fn new(catalog: &str, namespace: &str, table: &str) -> Self {
        let catalog = String::from(catalog);
        let namespace = String::from(namespace);
        let table = String::from(table);
        TableFqn {
            catalog,
            namespace,
            table,
        }
    }
}

impl IndexFqn {
    /// Builds an index FQN by copying each borrowed name part into an owned `String`.
    pub fn new(catalog: &str, namespace: &str, table: &str, index: &str) -> Self {
        let catalog = String::from(catalog);
        let namespace = String::from(namespace);
        let table = String::from(table);
        let index = String::from(index);
        IndexFqn {
            catalog,
            namespace,
            table,
            index,
        }
    }
}
107
108impl From<&TableMetadata> for TableFqn {
109 fn from(value: &TableMetadata) -> Self {
110 Self::new(&value.catalog_name, &value.namespace_name, &value.name)
111 }
112}
113
114impl From<&IndexMetadata> for IndexFqn {
115 fn from(value: &IndexMetadata) -> Self {
116 Self::new(
117 &value.catalog_name,
118 &value.namespace_name,
119 &value.table,
120 &value.name,
121 )
122 }
123}
124
/// Kind of table recorded in the catalog.
///
/// Variant names are serialized in SCREAMING_SNAKE_CASE by serde.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
pub enum TableType {
    /// Managed table (the value assigned to migrated legacy tables).
    Managed,
    /// External table.
    External,
}
131
/// Data source format recorded for a table; defaults to `Alopex`.
///
/// Variant names are serialized in SCREAMING_SNAKE_CASE by serde.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
pub enum DataSourceFormat {
    /// Default format (also assigned to migrated legacy tables).
    #[default]
    Alopex,
    /// Parquet format.
    Parquet,
    /// Delta format.
    Delta,
}
140
/// Serializable mirror of `VectorMetric`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum PersistedVectorMetric {
    /// Mirrors `VectorMetric::Cosine`.
    Cosine,
    /// Mirrors `VectorMetric::L2`.
    L2,
    /// Mirrors `VectorMetric::Inner`.
    Inner,
}
147
148impl From<VectorMetric> for PersistedVectorMetric {
149 fn from(value: VectorMetric) -> Self {
150 match value {
151 VectorMetric::Cosine => Self::Cosine,
152 VectorMetric::L2 => Self::L2,
153 VectorMetric::Inner => Self::Inner,
154 }
155 }
156}
157
158impl From<PersistedVectorMetric> for VectorMetric {
159 fn from(value: PersistedVectorMetric) -> Self {
160 match value {
161 PersistedVectorMetric::Cosine => Self::Cosine,
162 PersistedVectorMetric::L2 => Self::L2,
163 PersistedVectorMetric::Inner => Self::Inner,
164 }
165 }
166}
167
/// Serializable mirror of `ResolvedType` used for column data types.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum PersistedType {
    Integer,
    BigInt,
    Float,
    Double,
    Text,
    Blob,
    Boolean,
    Timestamp,
    /// Vector type carrying its dimension and distance metric.
    Vector {
        dimension: u32,
        metric: PersistedVectorMetric,
    },
    Null,
}
184
185impl From<ResolvedType> for PersistedType {
186 fn from(value: ResolvedType) -> Self {
187 match value {
188 ResolvedType::Integer => Self::Integer,
189 ResolvedType::BigInt => Self::BigInt,
190 ResolvedType::Float => Self::Float,
191 ResolvedType::Double => Self::Double,
192 ResolvedType::Text => Self::Text,
193 ResolvedType::Blob => Self::Blob,
194 ResolvedType::Boolean => Self::Boolean,
195 ResolvedType::Timestamp => Self::Timestamp,
196 ResolvedType::Vector { dimension, metric } => Self::Vector {
197 dimension,
198 metric: metric.into(),
199 },
200 ResolvedType::Null => Self::Null,
201 }
202 }
203}
204
205impl From<PersistedType> for ResolvedType {
206 fn from(value: PersistedType) -> Self {
207 match value {
208 PersistedType::Integer => Self::Integer,
209 PersistedType::BigInt => Self::BigInt,
210 PersistedType::Float => Self::Float,
211 PersistedType::Double => Self::Double,
212 PersistedType::Text => Self::Text,
213 PersistedType::Blob => Self::Blob,
214 PersistedType::Boolean => Self::Boolean,
215 PersistedType::Timestamp => Self::Timestamp,
216 PersistedType::Vector { dimension, metric } => Self::Vector {
217 dimension,
218 metric: metric.into(),
219 },
220 PersistedType::Null => Self::Null,
221 }
222 }
223}
224
/// Serializable mirror of `IndexMethod`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum PersistedIndexType {
    /// Mirrors `IndexMethod::BTree`.
    BTree,
    /// Mirrors `IndexMethod::Hnsw`.
    Hnsw,
}
230
231impl From<PersistedIndexType> for IndexMethod {
232 fn from(value: PersistedIndexType) -> Self {
233 match value {
234 PersistedIndexType::BTree => IndexMethod::BTree,
235 PersistedIndexType::Hnsw => IndexMethod::Hnsw,
236 }
237 }
238}
239
240impl TryFrom<IndexMethod> for PersistedIndexType {
241 type Error = ();
242
243 fn try_from(value: IndexMethod) -> Result<Self, Self::Error> {
244 match value {
245 IndexMethod::BTree => Ok(Self::BTree),
246 IndexMethod::Hnsw => Ok(Self::Hnsw),
247 }
248 }
249}
250
/// Serializable mirror of `StorageType`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum PersistedStorageType {
    /// Mirrors `StorageType::Row`.
    Row,
    /// Mirrors `StorageType::Columnar`.
    Columnar,
}
256
257impl From<PersistedStorageType> for StorageType {
258 fn from(value: PersistedStorageType) -> Self {
259 match value {
260 PersistedStorageType::Row => Self::Row,
261 PersistedStorageType::Columnar => Self::Columnar,
262 }
263 }
264}
265
266impl From<StorageType> for PersistedStorageType {
267 fn from(value: StorageType) -> Self {
268 match value {
269 StorageType::Row => Self::Row,
270 StorageType::Columnar => Self::Columnar,
271 }
272 }
273}
274
/// Serializable mirror of `Compression`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum PersistedCompression {
    /// Mirrors `Compression::None`.
    None,
    /// Mirrors `Compression::Lz4`.
    Lz4,
    /// Mirrors `Compression::Zstd`.
    Zstd,
}
281
282impl From<PersistedCompression> for Compression {
283 fn from(value: PersistedCompression) -> Self {
284 match value {
285 PersistedCompression::None => Self::None,
286 PersistedCompression::Lz4 => Self::Lz4,
287 PersistedCompression::Zstd => Self::Zstd,
288 }
289 }
290}
291
292impl From<Compression> for PersistedCompression {
293 fn from(value: Compression) -> Self {
294 match value {
295 Compression::None => Self::None,
296 Compression::Lz4 => Self::Lz4,
297 Compression::Zstd => Self::Zstd,
298 }
299 }
300}
301
/// Serializable mirror of `RowIdMode`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum PersistedRowIdMode {
    /// Mirrors `RowIdMode::None`.
    None,
    /// Mirrors `RowIdMode::Direct`.
    Direct,
}
307
308impl From<PersistedRowIdMode> for RowIdMode {
309 fn from(value: PersistedRowIdMode) -> Self {
310 match value {
311 PersistedRowIdMode::None => Self::None,
312 PersistedRowIdMode::Direct => Self::Direct,
313 }
314 }
315}
316
317impl From<RowIdMode> for PersistedRowIdMode {
318 fn from(value: RowIdMode) -> Self {
319 match value {
320 RowIdMode::None => Self::None,
321 RowIdMode::Direct => Self::Direct,
322 }
323 }
324}
325
/// Serializable mirror of `StorageOptions`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct PersistedStorageOptions {
    /// Storage layout of the table.
    pub storage_type: PersistedStorageType,
    /// Compression setting.
    pub compression: PersistedCompression,
    /// Row group size, copied verbatim from `StorageOptions::row_group_size`.
    pub row_group_size: u32,
    /// Row-id mode.
    pub row_id_mode: PersistedRowIdMode,
}
333
334impl From<StorageOptions> for PersistedStorageOptions {
335 fn from(value: StorageOptions) -> Self {
336 Self {
337 storage_type: value.storage_type.into(),
338 compression: value.compression.into(),
339 row_group_size: value.row_group_size,
340 row_id_mode: value.row_id_mode.into(),
341 }
342 }
343}
344
345impl From<PersistedStorageOptions> for StorageOptions {
346 fn from(value: PersistedStorageOptions) -> Self {
347 Self {
348 storage_type: value.storage_type.into(),
349 compression: value.compression.into(),
350 row_group_size: value.row_group_size,
351 row_id_mode: value.row_id_mode.into(),
352 }
353 }
354}
355
/// Serializable form of a column definition.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct PersistedColumnMeta {
    /// Column name.
    pub name: String,
    /// Column data type.
    pub data_type: PersistedType,
    /// Copied from `ColumnMetadata::not_null`.
    pub not_null: bool,
    /// Copied from `ColumnMetadata::primary_key`.
    pub primary_key: bool,
    /// Copied from `ColumnMetadata::unique`.
    pub unique: bool,
}
364
365impl From<&ColumnMetadata> for PersistedColumnMeta {
366 fn from(value: &ColumnMetadata) -> Self {
367 Self {
368 name: value.name.clone(),
369 data_type: value.data_type.clone().into(),
370 not_null: value.not_null,
371 primary_key: value.primary_key,
372 unique: value.unique,
373 }
374 }
375}
376
377impl From<PersistedColumnMeta> for ColumnMetadata {
378 fn from(value: PersistedColumnMeta) -> Self {
379 ColumnMetadata::new(value.name, value.data_type.into())
380 .with_not_null(value.not_null)
381 .with_primary_key(value.primary_key)
382 .with_unique(value.unique)
383 }
384}
385
/// Current serialized layout of a table record (stored under `TABLES_PREFIX`).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct PersistedTableMeta {
    /// Numeric table id.
    pub table_id: u32,
    /// Table name.
    pub name: String,
    /// Owning catalog name.
    pub catalog_name: String,
    /// Owning namespace name.
    pub namespace_name: String,
    /// Managed or external.
    pub table_type: TableType,
    /// Data source format of the table.
    pub data_source_format: DataSourceFormat,
    /// Column definitions.
    pub columns: Vec<PersistedColumnMeta>,
    /// Optional list of primary-key column names.
    pub primary_key: Option<Vec<String>>,
    /// Storage options.
    pub storage_options: PersistedStorageOptions,
    /// Optional storage location.
    pub storage_location: Option<String>,
    /// Optional free-form comment.
    pub comment: Option<String>,
    /// Arbitrary string key/value properties.
    pub properties: HashMap<String, String>,
}
401
/// Legacy (version 1) table record layout. `deserialize_table_meta` falls back
/// to it when the current layout cannot be fully read from the bytes.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
struct PersistedTableMetaV1 {
    table_id: u32,
    name: String,
    columns: Vec<PersistedColumnMeta>,
    primary_key: Option<Vec<String>>,
    storage_options: PersistedStorageOptions,
}
410
411impl From<&TableMetadata> for PersistedTableMeta {
412 fn from(value: &TableMetadata) -> Self {
413 Self {
414 table_id: value.table_id,
415 name: value.name.clone(),
416 catalog_name: value.catalog_name.clone(),
417 namespace_name: value.namespace_name.clone(),
418 table_type: value.table_type,
419 data_source_format: value.data_source_format,
420 columns: value
421 .columns
422 .iter()
423 .map(PersistedColumnMeta::from)
424 .collect(),
425 primary_key: value.primary_key.clone(),
426 storage_options: value.storage_options.clone().into(),
427 storage_location: value.storage_location.clone(),
428 comment: value.comment.clone(),
429 properties: value.properties.clone(),
430 }
431 }
432}
433
434impl From<PersistedTableMeta> for TableMetadata {
435 fn from(value: PersistedTableMeta) -> Self {
436 let mut table = TableMetadata::new(
437 value.name,
438 value
439 .columns
440 .into_iter()
441 .map(ColumnMetadata::from)
442 .collect(),
443 )
444 .with_table_id(value.table_id);
445 table.primary_key = value.primary_key;
446 table.storage_options = value.storage_options.into();
447 table.catalog_name = value.catalog_name;
448 table.namespace_name = value.namespace_name;
449 table.table_type = value.table_type;
450 table.data_source_format = value.data_source_format;
451 table.storage_location = value.storage_location;
452 table.comment = value.comment;
453 table.properties = value.properties;
454 table
455 }
456}
457
/// Current serialized layout of an index record (stored under `INDEXES_PREFIX`).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct PersistedIndexMeta {
    /// Numeric index id.
    pub index_id: u32,
    /// Index name.
    pub name: String,
    /// Name of the indexed table.
    pub table: String,
    /// Names of the indexed columns.
    pub columns: Vec<String>,
    /// Copied from `IndexMetadata::column_indices`.
    // NOTE(review): presumably positions of `columns` within the table — confirm.
    pub column_indices: Vec<usize>,
    /// Copied from `IndexMetadata::unique`.
    pub unique: bool,
    /// Index method, if one was recorded.
    pub method: Option<PersistedIndexType>,
    /// Index options as string key/value pairs.
    pub options: Vec<(String, String)>,
    /// Owning catalog name.
    pub catalog_name: String,
    /// Owning namespace name.
    pub namespace_name: String,
}
471
/// Legacy (version 1) index record layout, without catalog/namespace names.
/// `deserialize_index_meta` falls back to it when the current layout cannot be
/// fully read from the bytes.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
struct PersistedIndexMetaV1 {
    index_id: u32,
    name: String,
    table: String,
    columns: Vec<String>,
    column_indices: Vec<usize>,
    unique: bool,
    method: Option<PersistedIndexType>,
    options: Vec<(String, String)>,
}
483
484impl From<&IndexMetadata> for PersistedIndexMeta {
485 fn from(value: &IndexMetadata) -> Self {
486 Self {
487 index_id: value.index_id,
488 name: value.name.clone(),
489 table: value.table.clone(),
490 columns: value.columns.clone(),
491 column_indices: value.column_indices.clone(),
492 unique: value.unique,
493 method: value
494 .method
495 .and_then(|m| PersistedIndexType::try_from(m).ok()),
496 options: value.options.clone(),
497 catalog_name: value.catalog_name.clone(),
498 namespace_name: value.namespace_name.clone(),
499 }
500 }
501}
502
503impl From<PersistedIndexMeta> for IndexMetadata {
504 fn from(value: PersistedIndexMeta) -> Self {
505 let mut index = IndexMetadata::new(value.index_id, value.name, value.table, value.columns)
506 .with_column_indices(value.column_indices)
507 .with_unique(value.unique)
508 .with_options(value.options);
509 index.catalog_name = value.catalog_name;
510 index.namespace_name = value.namespace_name;
511 if let Some(method) = value.method {
512 index = index.with_method(method.into());
513 }
514 index
515 }
516}
517
518fn deserialize_table_meta(bytes: &[u8]) -> Result<PersistedTableMeta, CatalogError> {
519 match bincode::deserialize::<PersistedTableMeta>(bytes) {
520 Ok(meta) => Ok(meta),
521 Err(err) => {
522 let is_legacy = matches!(
523 err.as_ref(),
524 bincode::ErrorKind::Io(io)
525 if io.kind() == std::io::ErrorKind::UnexpectedEof
526 );
527 if !is_legacy {
528 return Err(err.into());
529 }
530 let legacy: PersistedTableMetaV1 = bincode::deserialize(bytes)?;
531 Ok(PersistedTableMeta {
532 table_id: legacy.table_id,
533 name: legacy.name,
534 catalog_name: "default".to_string(),
535 namespace_name: "default".to_string(),
536 table_type: TableType::Managed,
537 data_source_format: DataSourceFormat::Alopex,
538 columns: legacy.columns,
539 primary_key: legacy.primary_key,
540 storage_options: legacy.storage_options,
541 storage_location: None,
542 comment: None,
543 properties: HashMap::new(),
544 })
545 }
546 }
547}
548
549fn deserialize_index_meta(bytes: &[u8]) -> Result<PersistedIndexMeta, CatalogError> {
550 match bincode::deserialize::<PersistedIndexMeta>(bytes) {
551 Ok(meta) => Ok(meta),
552 Err(err) => {
553 let is_legacy = matches!(
554 err.as_ref(),
555 bincode::ErrorKind::Io(io)
556 if io.kind() == std::io::ErrorKind::UnexpectedEof
557 );
558 if !is_legacy {
559 return Err(err.into());
560 }
561 let legacy: PersistedIndexMetaV1 = bincode::deserialize(bytes)?;
562 Ok(PersistedIndexMeta {
563 index_id: legacy.index_id,
564 name: legacy.name,
565 table: legacy.table,
566 columns: legacy.columns,
567 column_indices: legacy.column_indices,
568 unique: legacy.unique,
569 method: legacy.method,
570 options: legacy.options,
571 catalog_name: "default".to_string(),
572 namespace_name: "default".to_string(),
573 })
574 }
575 }
576}
577
578fn table_key(catalog_name: &str, namespace_name: &str, table_name: &str) -> Vec<u8> {
579 let mut key = TABLES_PREFIX.to_vec();
580 key.extend_from_slice(catalog_name.as_bytes());
581 key.push(b'/');
582 key.extend_from_slice(namespace_name.as_bytes());
583 key.push(b'/');
584 key.extend_from_slice(table_name.as_bytes());
585 key
586}
587
588fn catalog_key(name: &str) -> Vec<u8> {
589 let mut key = CATALOGS_PREFIX.to_vec();
590 key.extend_from_slice(name.as_bytes());
591 key
592}
593
594fn namespace_key(catalog_name: &str, namespace_name: &str) -> Vec<u8> {
595 let mut key = NAMESPACES_PREFIX.to_vec();
596 key.extend_from_slice(catalog_name.as_bytes());
597 key.push(b'/');
598 key.extend_from_slice(namespace_name.as_bytes());
599 key
600}
601
602fn index_key(
603 catalog_name: &str,
604 namespace_name: &str,
605 table_name: &str,
606 index_name: &str,
607) -> Vec<u8> {
608 let mut key = INDEXES_PREFIX.to_vec();
609 key.extend_from_slice(catalog_name.as_bytes());
610 key.push(b'/');
611 key.extend_from_slice(namespace_name.as_bytes());
612 key.push(b'/');
613 key.extend_from_slice(table_name.as_bytes());
614 key.push(b'/');
615 key.extend_from_slice(index_name.as_bytes());
616 key
617}
618
619fn index_prefix(catalog_name: &str, namespace_name: &str, table_name: &str) -> Vec<u8> {
620 let mut key = INDEXES_PREFIX.to_vec();
621 key.extend_from_slice(catalog_name.as_bytes());
622 key.push(b'/');
623 key.extend_from_slice(namespace_name.as_bytes());
624 key.push(b'/');
625 key.extend_from_slice(table_name.as_bytes());
626 key.push(b'/');
627 key
628}
629
630fn key_suffix(prefix: &[u8], key: &[u8]) -> Result<String, CatalogError> {
631 let suffix = key
632 .strip_prefix(prefix)
633 .ok_or_else(|| CatalogError::InvalidKey(format!("{key:?}")))?;
634 std::str::from_utf8(suffix)
635 .map(|s| s.to_string())
636 .map_err(|_| CatalogError::InvalidKey(format!("{key:?}")))
637}
638
639fn parse_table_key_suffix(suffix: &str) -> Result<TableFqn, CatalogError> {
640 let mut parts = suffix.splitn(3, '/');
641 let catalog = parts
642 .next()
643 .filter(|part| !part.is_empty())
644 .ok_or_else(|| CatalogError::InvalidKey(suffix.to_string()))?;
645 let namespace = parts
646 .next()
647 .filter(|part| !part.is_empty())
648 .ok_or_else(|| CatalogError::InvalidKey(suffix.to_string()))?;
649 let table = parts
650 .next()
651 .filter(|part| !part.is_empty())
652 .ok_or_else(|| CatalogError::InvalidKey(suffix.to_string()))?;
653 Ok(TableFqn::new(catalog, namespace, table))
654}
655
656fn parse_index_key_suffix(suffix: &str) -> Result<IndexFqn, CatalogError> {
657 let mut parts = suffix.splitn(4, '/');
658 let catalog = parts
659 .next()
660 .filter(|part| !part.is_empty())
661 .ok_or_else(|| CatalogError::InvalidKey(suffix.to_string()))?;
662 let namespace = parts
663 .next()
664 .filter(|part| !part.is_empty())
665 .ok_or_else(|| CatalogError::InvalidKey(suffix.to_string()))?;
666 let table = parts
667 .next()
668 .filter(|part| !part.is_empty())
669 .ok_or_else(|| CatalogError::InvalidKey(suffix.to_string()))?;
670 let index = parts
671 .next()
672 .filter(|part| !part.is_empty())
673 .ok_or_else(|| CatalogError::InvalidKey(suffix.to_string()))?;
674 Ok(IndexFqn::new(catalog, namespace, table, index))
675}
676
/// Set of pending catalog changes (additions and drops) layered over a base
/// catalog; `TxnCatalogView` reads through it.
///
/// For each object kind the `add_*`/`drop_*` methods keep the "added" and
/// "dropped" collections mutually exclusive per key.
#[derive(Debug, Clone, Default)]
pub struct CatalogOverlay {
    /// Catalogs added in this overlay, keyed by name.
    added_catalogs: HashMap<String, CatalogMeta>,
    /// Names of catalogs dropped in this overlay.
    dropped_catalogs: HashSet<String>,
    /// Namespaces added in this overlay, keyed by `(catalog, namespace)`.
    added_namespaces: HashMap<(String, String), NamespaceMeta>,
    /// `(catalog, namespace)` pairs dropped in this overlay.
    dropped_namespaces: HashSet<(String, String)>,
    /// Tables added in this overlay.
    added_tables: HashMap<TableFqn, TableMetadata>,
    /// Tables dropped in this overlay.
    dropped_tables: HashSet<TableFqn>,
    /// Indexes added in this overlay.
    added_indexes: HashMap<IndexFqn, IndexMetadata>,
    /// Indexes dropped in this overlay.
    dropped_indexes: HashSet<IndexFqn>,
}
688
689impl CatalogOverlay {
690 pub fn new() -> Self {
691 Self::default()
692 }
693
694 pub fn add_catalog(&mut self, meta: CatalogMeta) {
695 self.dropped_catalogs.remove(&meta.name);
696 self.added_catalogs.insert(meta.name.clone(), meta);
697 }
698
699 pub fn drop_catalog(&mut self, name: &str) {
700 self.added_catalogs.remove(name);
701 self.dropped_catalogs.insert(name.to_string());
702 }
703
704 pub fn add_namespace(&mut self, meta: NamespaceMeta) {
705 let key = (meta.catalog_name.clone(), meta.name.clone());
706 self.dropped_namespaces.remove(&key);
707 self.added_namespaces.insert(key, meta);
708 }
709
710 pub fn drop_namespace(&mut self, catalog_name: &str, namespace_name: &str) {
711 let key = (catalog_name.to_string(), namespace_name.to_string());
712 self.added_namespaces.remove(&key);
713 self.dropped_namespaces.insert(key);
714 }
715
716 pub fn add_table(&mut self, fqn: TableFqn, table: TableMetadata) {
717 self.dropped_tables.remove(&fqn);
718 self.added_tables.insert(fqn, table);
719 }
720
721 pub fn drop_table(&mut self, fqn: &TableFqn) {
722 self.added_tables.remove(fqn);
723 self.dropped_tables.insert(fqn.clone());
724 self.added_indexes.retain(|key, _| {
725 key.catalog != fqn.catalog || key.namespace != fqn.namespace || key.table != fqn.table
726 });
727 }
728
729 pub fn add_index(&mut self, fqn: IndexFqn, index: IndexMetadata) {
730 self.dropped_indexes.remove(&fqn);
731 self.added_indexes.insert(fqn, index);
732 }
733
734 pub fn drop_index(&mut self, fqn: &IndexFqn) {
735 self.added_indexes.remove(fqn);
736 self.dropped_indexes.insert(fqn.clone());
737 }
738
739 pub fn drop_cascade_catalog(&mut self, catalog: &str) {
740 self.drop_catalog(catalog);
741
742 let namespace_keys: Vec<(String, String)> = self
743 .added_namespaces
744 .keys()
745 .filter(|(cat, _)| cat == catalog)
746 .cloned()
747 .collect();
748 for (catalog_name, namespace_name) in namespace_keys {
749 self.drop_namespace(&catalog_name, &namespace_name);
750 }
751
752 let index_keys: Vec<IndexFqn> = self
753 .added_indexes
754 .keys()
755 .filter(|fqn| fqn.catalog == catalog)
756 .cloned()
757 .collect();
758 for fqn in index_keys {
759 self.drop_index(&fqn);
760 }
761
762 let table_keys: Vec<TableFqn> = self
763 .added_tables
764 .keys()
765 .filter(|fqn| fqn.catalog == catalog)
766 .cloned()
767 .collect();
768 for fqn in table_keys {
769 self.drop_table(&fqn);
770 }
771 }
772
773 pub fn drop_cascade_namespace(&mut self, catalog: &str, namespace: &str) {
774 self.drop_namespace(catalog, namespace);
775
776 let index_keys: Vec<IndexFqn> = self
777 .added_indexes
778 .keys()
779 .filter(|fqn| fqn.catalog == catalog && fqn.namespace == namespace)
780 .cloned()
781 .collect();
782 for fqn in index_keys {
783 self.drop_index(&fqn);
784 }
785
786 let table_keys: Vec<TableFqn> = self
787 .added_tables
788 .keys()
789 .filter(|fqn| fqn.catalog == catalog && fqn.namespace == namespace)
790 .cloned()
791 .collect();
792 for fqn in table_keys {
793 self.drop_table(&fqn);
794 }
795 }
796}
797
/// Read-only `Catalog` view that answers lookups from a `PersistentCatalog`
/// combined with a transaction's `CatalogOverlay`.
///
/// All mutating `Catalog` methods on this type panic via `unreachable!`.
pub struct TxnCatalogView<'a, S: KVStore> {
    /// Base catalog the view reads from.
    catalog: &'a PersistentCatalog<S>,
    /// Pending changes applied on top of the base catalog.
    overlay: &'a CatalogOverlay,
}
806
807impl<'a, S: KVStore> TxnCatalogView<'a, S> {
808 pub fn new(catalog: &'a PersistentCatalog<S>, overlay: &'a CatalogOverlay) -> Self {
809 Self { catalog, overlay }
810 }
811}
812
813impl<'a, S: KVStore> Catalog for TxnCatalogView<'a, S> {
814 fn create_table(&mut self, _table: TableMetadata) -> Result<(), PlannerError> {
815 unreachable!("TxnCatalogView は参照専用です")
816 }
817
818 fn get_table(&self, name: &str) -> Option<&TableMetadata> {
819 self.catalog.get_table_in_txn(name, self.overlay)
820 }
821
822 fn drop_table(&mut self, _name: &str) -> Result<(), PlannerError> {
823 unreachable!("TxnCatalogView は参照専用です")
824 }
825
826 fn create_index(&mut self, _index: IndexMetadata) -> Result<(), PlannerError> {
827 unreachable!("TxnCatalogView は参照専用です")
828 }
829
830 fn get_index(&self, name: &str) -> Option<&IndexMetadata> {
831 self.catalog.get_index_in_txn(name, self.overlay)
832 }
833
834 fn get_indexes_for_table(&self, table: &str) -> Vec<&IndexMetadata> {
835 let Some(table_meta) = self.catalog.get_table_in_txn(table, self.overlay) else {
836 return Vec::new();
837 };
838
839 let mut indexes: Vec<&IndexMetadata> = self
840 .catalog
841 .inner
842 .get_indexes_for_table(table)
843 .into_iter()
844 .filter(|idx| {
845 idx.catalog_name == table_meta.catalog_name
846 && idx.namespace_name == table_meta.namespace_name
847 && !self.catalog.index_hidden_by_overlay(idx, self.overlay)
848 })
849 .collect();
850
851 for idx in self.overlay.added_indexes.values() {
852 if idx.table == table
853 && idx.catalog_name == table_meta.catalog_name
854 && idx.namespace_name == table_meta.namespace_name
855 && !self.catalog.index_hidden_by_overlay(idx, self.overlay)
856 {
857 indexes.push(idx);
858 }
859 }
860
861 indexes
862 }
863
864 fn drop_index(&mut self, _name: &str) -> Result<(), PlannerError> {
865 unreachable!("TxnCatalogView は参照専用です")
866 }
867
868 fn table_exists(&self, name: &str) -> bool {
869 self.catalog.table_exists_in_txn(name, self.overlay)
870 }
871
872 fn index_exists(&self, name: &str) -> bool {
873 self.catalog.index_exists_in_txn(name, self.overlay)
874 }
875
876 fn next_table_id(&mut self) -> u32 {
877 unreachable!("TxnCatalogView は参照専用です")
878 }
879
880 fn next_index_id(&mut self) -> u32 {
881 unreachable!("TxnCatalogView は参照専用です")
882 }
883}
884
/// Catalog backed by a key-value store, with tables and indexes cached in a
/// `MemoryCatalog` and catalog/namespace records cached in maps.
#[derive(Debug)]
pub struct PersistentCatalog<S: KVStore> {
    /// In-memory table and index metadata.
    inner: MemoryCatalog,
    /// Shared handle to the backing key-value store.
    store: Arc<S>,
    /// Catalog records keyed by catalog name.
    catalogs: HashMap<String, CatalogMeta>,
    /// Namespace records keyed by `(catalog, namespace)`.
    namespaces: HashMap<(String, String), NamespaceMeta>,
}
907
908impl<S: KVStore> PersistentCatalog<S> {
    /// Loads the catalog from `store`, migrating a legacy layout first if needed.
    ///
    /// Steps:
    /// 1. Read the `CatalogState` under `META_KEY` (if any) and reject versions
    ///    newer than `CATALOG_VERSION`.
    /// 2. Decide whether migration is needed: either the stored version is
    ///    older, or there is no meta record and some table/index key suffix
    ///    contains no `/` (legacy key shape). If so, roll back the read
    ///    transaction, run `migrate_v1_to_v2`, and reload from scratch.
    /// 3. Otherwise read catalogs, namespaces, tables, and indexes; names
    ///    parsed from the key override the names stored in the value.
    /// 4. Set the id counters to the maximum of the ids seen and, when the
    ///    stored state is at the current version, the stored counters.
    ///
    /// # Errors
    /// Returns `CatalogError::Kv` for store failures, `CatalogError::Serialize`
    /// for undecodable records, and `CatalogError::InvalidKey` for malformed
    /// keys or an unsupported catalog version.
    pub fn load(store: Arc<S>) -> Result<Self, CatalogError> {
        let mut txn = store.begin(TxnMode::ReadOnly)?;
        let meta_key = META_KEY.to_vec();
        let mut meta_state: Option<CatalogState> = None;

        if let Some(meta_bytes) = txn.get(&meta_key)? {
            let meta: CatalogState = bincode::deserialize(&meta_bytes)?;
            // A catalog written by a newer version cannot be read safely.
            if meta.version > CATALOG_VERSION {
                return Err(CatalogError::InvalidKey(format!(
                    "unsupported catalog version: {}",
                    meta.version
                )));
            }
            meta_state = Some(meta);
        }

        let mut needs_migration = meta_state
            .as_ref()
            .is_some_and(|meta| meta.version < CATALOG_VERSION);
        // Without a meta record, detect a legacy store by key shape: legacy
        // table/index keys have no '/' after the prefix.
        if !needs_migration && meta_state.is_none() {
            for (key, _) in txn.scan_prefix(TABLES_PREFIX)? {
                let suffix = key_suffix(TABLES_PREFIX, &key)?;
                if !suffix.contains('/') {
                    needs_migration = true;
                    break;
                }
            }
            if !needs_migration {
                for (key, _) in txn.scan_prefix(INDEXES_PREFIX)? {
                    let suffix = key_suffix(INDEXES_PREFIX, &key)?;
                    if !suffix.contains('/') {
                        needs_migration = true;
                        break;
                    }
                }
            }
        }

        if needs_migration {
            // End the read-only transaction before the migration opens its
            // own read-write one, then reload the migrated data.
            txn.rollback_self()?;
            Self::migrate_v1_to_v2(&store)?;
            return Self::load(store);
        }

        let mut inner = MemoryCatalog::new();
        let mut catalogs = HashMap::new();
        let mut namespaces = HashMap::new();

        let mut max_table_id = 0u32;
        let mut max_index_id = 0u32;

        for (key, value) in txn.scan_prefix(CATALOGS_PREFIX)? {
            let catalog_name = key_suffix(CATALOGS_PREFIX, &key)?;
            let mut meta: CatalogMeta = bincode::deserialize(&value)?;
            // The key is authoritative for the catalog name.
            if meta.name != catalog_name {
                meta.name = catalog_name.clone();
            }
            catalogs.insert(catalog_name, meta);
        }

        for (key, value) in txn.scan_prefix(NAMESPACES_PREFIX)? {
            let suffix = key_suffix(NAMESPACES_PREFIX, &key)?;
            // Suffix shape: `<catalog>/<namespace>`; both parts must be non-empty.
            let mut parts = suffix.splitn(2, '/');
            let catalog_name = parts
                .next()
                .filter(|part| !part.is_empty())
                .ok_or_else(|| CatalogError::InvalidKey(suffix.clone()))?;
            let namespace_name = parts
                .next()
                .filter(|part| !part.is_empty())
                .ok_or_else(|| CatalogError::InvalidKey(suffix.clone()))?;
            let mut meta: NamespaceMeta = bincode::deserialize(&value)?;
            // The key is authoritative for both names.
            if meta.catalog_name != catalog_name {
                meta.catalog_name = catalog_name.to_string();
            }
            if meta.name != namespace_name {
                meta.name = namespace_name.to_string();
            }
            namespaces.insert((meta.catalog_name.clone(), meta.name.clone()), meta);
        }

        for (key, value) in txn.scan_prefix(TABLES_PREFIX)? {
            let suffix = key_suffix(TABLES_PREFIX, &key)?;
            let fqn = parse_table_key_suffix(&suffix)?;
            let mut persisted = deserialize_table_meta(&value)?;
            // The key is authoritative for catalog, namespace, and table name.
            if persisted.catalog_name != fqn.catalog {
                persisted.catalog_name = fqn.catalog.clone();
            }
            if persisted.namespace_name != fqn.namespace {
                persisted.namespace_name = fqn.namespace.clone();
            }
            if persisted.name != fqn.table {
                persisted.name = fqn.table.clone();
            }
            max_table_id = max_table_id.max(persisted.table_id);
            let table: TableMetadata = persisted.into();
            inner.insert_table_unchecked(table);
        }

        for (key, value) in txn.scan_prefix(INDEXES_PREFIX)? {
            let suffix = key_suffix(INDEXES_PREFIX, &key)?;
            let fqn = parse_index_key_suffix(&suffix)?;
            let mut persisted = deserialize_index_meta(&value)?;
            // The key is authoritative for every name component.
            if persisted.catalog_name != fqn.catalog {
                persisted.catalog_name = fqn.catalog.clone();
            }
            if persisted.namespace_name != fqn.namespace {
                persisted.namespace_name = fqn.namespace.clone();
            }
            if persisted.table != fqn.table {
                persisted.table = fqn.table.clone();
            }
            if persisted.name != fqn.index {
                persisted.name = fqn.index.clone();
            }
            max_index_id = max_index_id.max(persisted.index_id);
            let mut index: IndexMetadata = persisted.into();
            // An index whose table is not in `inner` is not inserted (its id
            // still counts towards `max_index_id`). When the table is found,
            // the index takes the table's catalog/namespace.
            if let Some(table) = inner.get_table(&index.table) {
                if index.catalog_name != table.catalog_name
                    || index.namespace_name != table.namespace_name
                {
                    index.catalog_name = table.catalog_name.clone();
                    index.namespace_name = table.namespace_name.clone();
                }
                inner.insert_index_unchecked(index);
            }
        }

        // Counters never go below the largest id observed; stored counters are
        // honoured only when the stored state is at the current version.
        let (mut table_id_counter, mut index_id_counter) = (max_table_id, max_index_id);
        if let Some(meta) = meta_state
            .as_ref()
            .filter(|meta| meta.version == CATALOG_VERSION)
        {
            table_id_counter = table_id_counter.max(meta.table_id_counter);
            index_id_counter = index_id_counter.max(meta.index_id_counter);
        }
        inner.set_counters(table_id_counter, index_id_counter);

        txn.rollback_self()?;

        Ok(Self {
            inner,
            store,
            catalogs,
            namespaces,
        })
    }
1058
    /// Rewrites a legacy catalog into the version-2 key layout in one
    /// read-write transaction.
    ///
    /// - Creates the `"default"` catalog and `"default"/"default"` namespace
    ///   records if they are missing.
    /// - For every table/index key whose suffix has no `/` (legacy shape),
    ///   decodes the record, fills empty catalog/namespace names with
    ///   `"default"`, writes it under the fully qualified key, and deletes the
    ///   old key. Keys that already contain `/` are left untouched.
    /// - Writes a `CatalogState` at `CATALOG_VERSION` whose counters are the
    ///   maximum of the migrated ids and any previously stored counters.
    ///
    /// # Errors
    /// Returns `CatalogError::Kv`, `CatalogError::Serialize`, or
    /// `CatalogError::InvalidKey` from the store, codec, or key parsing.
    fn migrate_v1_to_v2(store: &Arc<S>) -> Result<(), CatalogError> {
        let mut txn = store.begin(TxnMode::ReadWrite)?;

        if txn.get(&catalog_key("default"))?.is_none() {
            let meta = CatalogMeta {
                name: "default".to_string(),
                comment: None,
                storage_root: None,
            };
            let value = bincode::serialize(&meta)?;
            txn.put(catalog_key("default"), value)?;
        }

        if txn.get(&namespace_key("default", "default"))?.is_none() {
            let meta = NamespaceMeta {
                name: "default".to_string(),
                catalog_name: "default".to_string(),
                comment: None,
                storage_root: None,
            };
            let value = bincode::serialize(&meta)?;
            txn.put(namespace_key("default", "default"), value)?;
        }

        // Writes and deletes are buffered and applied after the scan loop.
        let mut table_updates = Vec::new();
        let mut table_keys_to_delete = Vec::new();
        let mut max_table_id = 0u32;
        for (key, value) in txn.scan_prefix(TABLES_PREFIX)? {
            let suffix = key_suffix(TABLES_PREFIX, &key)?;
            // Keys that already contain '/' are in the new layout.
            if suffix.contains('/') {
                continue;
            }
            let mut persisted = deserialize_table_meta(&value)?;
            if persisted.catalog_name.is_empty() {
                persisted.catalog_name = "default".to_string();
            }
            if persisted.namespace_name.is_empty() {
                persisted.namespace_name = "default".to_string();
            }
            // Migrated legacy tables are always recorded as managed Alopex tables.
            persisted.table_type = TableType::Managed;
            persisted.data_source_format = DataSourceFormat::Alopex;
            max_table_id = max_table_id.max(persisted.table_id);

            let new_key = table_key(
                &persisted.catalog_name,
                &persisted.namespace_name,
                &persisted.name,
            );
            let bytes = bincode::serialize(&persisted)?;
            table_updates.push((new_key, bytes));
            table_keys_to_delete.push(key);
        }

        for (key, value) in table_updates {
            txn.put(key, value)?;
        }
        for key in table_keys_to_delete {
            txn.delete(key)?;
        }

        let mut index_updates = Vec::new();
        let mut index_keys_to_delete = Vec::new();
        let mut max_index_id = 0u32;
        for (key, value) in txn.scan_prefix(INDEXES_PREFIX)? {
            let suffix = key_suffix(INDEXES_PREFIX, &key)?;
            // Keys that already contain '/' are in the new layout.
            if suffix.contains('/') {
                continue;
            }
            let mut persisted = deserialize_index_meta(&value)?;
            if persisted.catalog_name.is_empty() {
                persisted.catalog_name = "default".to_string();
            }
            if persisted.namespace_name.is_empty() {
                persisted.namespace_name = "default".to_string();
            }
            max_index_id = max_index_id.max(persisted.index_id);

            let new_key = index_key(
                &persisted.catalog_name,
                &persisted.namespace_name,
                &persisted.table,
                &persisted.name,
            );
            let bytes = bincode::serialize(&persisted)?;
            index_updates.push((new_key, bytes));
            index_keys_to_delete.push(key);
        }

        for (key, value) in index_updates {
            txn.put(key, value)?;
        }
        for key in index_keys_to_delete {
            txn.delete(key)?;
        }

        // Counters: never lower than previously stored values nor than the
        // largest migrated id.
        let mut table_id_counter = max_table_id;
        let mut index_id_counter = max_index_id;
        if let Some(meta_bytes) = txn.get(&META_KEY.to_vec())? {
            let meta: CatalogState = bincode::deserialize(&meta_bytes)?;
            table_id_counter = table_id_counter.max(meta.table_id_counter);
            index_id_counter = index_id_counter.max(meta.index_id_counter);
        }
        let meta = CatalogState {
            version: CATALOG_VERSION,
            table_id_counter,
            index_id_counter,
        };
        let meta_bytes = bincode::serialize(&meta)?;
        txn.put(META_KEY.to_vec(), meta_bytes)?;
        txn.commit_self()?;

        Ok(())
    }
1172
1173 pub fn new(store: Arc<S>) -> Self {
1174 Self {
1175 inner: MemoryCatalog::new(),
1176 store,
1177 catalogs: HashMap::new(),
1178 namespaces: HashMap::new(),
1179 }
1180 }
1181
1182 pub fn store(&self) -> &Arc<S> {
1183 &self.store
1184 }
1185
1186 pub fn list_catalogs(&self) -> Vec<CatalogMeta> {
1187 let mut catalogs: Vec<CatalogMeta> = self.catalogs.values().cloned().collect();
1188 catalogs.sort_by(|a, b| a.name.cmp(&b.name));
1189 catalogs
1190 }
1191
1192 pub fn get_catalog(&self, name: &str) -> Option<CatalogMeta> {
1193 self.catalogs.get(name).cloned()
1194 }
1195
1196 pub fn create_catalog(&mut self, meta: CatalogMeta) -> Result<(), CatalogError> {
1197 let mut txn = self.store.begin(TxnMode::ReadWrite)?;
1198 let value = bincode::serialize(&meta)?;
1199 txn.put(catalog_key(&meta.name), value)?;
1200 txn.commit_self()?;
1201 self.catalogs.insert(meta.name.clone(), meta);
1202 Ok(())
1203 }
1204
1205 pub fn delete_catalog(&mut self, name: &str) -> Result<(), CatalogError> {
1206 let mut txn = self.store.begin(TxnMode::ReadWrite)?;
1207 txn.delete(catalog_key(name))?;
1208 let mut namespace_prefix = NAMESPACES_PREFIX.to_vec();
1209 namespace_prefix.extend_from_slice(name.as_bytes());
1210 namespace_prefix.push(b'/');
1211 let mut namespace_keys = Vec::new();
1212 for (key, _) in txn.scan_prefix(&namespace_prefix)? {
1213 namespace_keys.push(key);
1214 }
1215 for key in namespace_keys {
1216 txn.delete(key)?;
1217 }
1218 let mut table_keys = Vec::new();
1219 let mut table_fqns = Vec::new();
1220 for (key, value) in txn.scan_prefix(TABLES_PREFIX)? {
1221 let persisted = deserialize_table_meta(&value)?;
1222 if persisted.catalog_name == name {
1223 table_fqns.push(TableFqn::new(
1224 &persisted.catalog_name,
1225 &persisted.namespace_name,
1226 &persisted.name,
1227 ));
1228 table_keys.push(key);
1229 }
1230 }
1231 let table_set: HashSet<TableFqn> = table_fqns.iter().cloned().collect();
1232 for key in table_keys {
1233 txn.delete(key)?;
1234 }
1235 if !table_set.is_empty() {
1236 let mut index_keys = Vec::new();
1237 for (key, value) in txn.scan_prefix(INDEXES_PREFIX)? {
1238 let persisted = deserialize_index_meta(&value)?;
1239 let fqn = TableFqn::new(
1240 &persisted.catalog_name,
1241 &persisted.namespace_name,
1242 &persisted.table,
1243 );
1244 if table_set.contains(&fqn) {
1245 index_keys.push(key);
1246 }
1247 }
1248 for key in index_keys {
1249 txn.delete(key)?;
1250 }
1251 }
1252 txn.commit_self()?;
1253 self.catalogs.remove(name);
1254 self.namespaces.retain(|(catalog, _), _| catalog != name);
1255 for fqn in table_fqns {
1256 self.inner.remove_table_unchecked(&fqn.table);
1257 }
1258 Ok(())
1259 }
1260
1261 pub fn list_namespaces(&self, catalog_name: &str) -> Vec<NamespaceMeta> {
1262 let mut namespaces: Vec<NamespaceMeta> = self
1263 .namespaces
1264 .values()
1265 .filter(|meta| meta.catalog_name == catalog_name)
1266 .cloned()
1267 .collect();
1268 namespaces.sort_by(|a, b| a.name.cmp(&b.name));
1269 namespaces
1270 }
1271
1272 pub fn get_namespace(&self, catalog_name: &str, namespace_name: &str) -> Option<NamespaceMeta> {
1273 self.namespaces
1274 .get(&(catalog_name.to_string(), namespace_name.to_string()))
1275 .cloned()
1276 }
1277
1278 pub fn create_namespace(&mut self, meta: NamespaceMeta) -> Result<(), CatalogError> {
1279 if !self.catalogs.contains_key(&meta.catalog_name) {
1280 return Err(CatalogError::InvalidKey(format!(
1281 "catalog not found: {}",
1282 meta.catalog_name
1283 )));
1284 }
1285
1286 let mut txn = self.store.begin(TxnMode::ReadWrite)?;
1287 let value = bincode::serialize(&meta)?;
1288 txn.put(namespace_key(&meta.catalog_name, &meta.name), value)?;
1289 txn.commit_self()?;
1290 self.namespaces
1291 .insert((meta.catalog_name.clone(), meta.name.clone()), meta);
1292 Ok(())
1293 }
1294
1295 pub fn delete_namespace(
1296 &mut self,
1297 catalog_name: &str,
1298 namespace_name: &str,
1299 ) -> Result<(), CatalogError> {
1300 if !self.catalogs.contains_key(catalog_name) {
1301 return Err(CatalogError::InvalidKey(format!(
1302 "catalog not found: {}",
1303 catalog_name
1304 )));
1305 }
1306
1307 let mut txn = self.store.begin(TxnMode::ReadWrite)?;
1308 txn.delete(namespace_key(catalog_name, namespace_name))?;
1309 txn.commit_self()?;
1310 self.namespaces
1311 .remove(&(catalog_name.to_string(), namespace_name.to_string()));
1312 Ok(())
1313 }
1314
1315 fn persist_create_catalog(
1316 &mut self,
1317 txn: &mut S::Transaction<'_>,
1318 meta: &CatalogMeta,
1319 ) -> Result<(), CatalogError> {
1320 let value = bincode::serialize(meta)?;
1321 txn.put(catalog_key(&meta.name), value)?;
1322 Ok(())
1323 }
1324
1325 fn persist_drop_catalog(
1326 &mut self,
1327 txn: &mut S::Transaction<'_>,
1328 name: &str,
1329 ) -> Result<(), CatalogError> {
1330 txn.delete(catalog_key(name))?;
1331
1332 let mut namespace_prefix = NAMESPACES_PREFIX.to_vec();
1333 namespace_prefix.extend_from_slice(name.as_bytes());
1334 namespace_prefix.push(b'/');
1335 let mut namespace_keys = Vec::new();
1336 for (key, _) in txn.scan_prefix(&namespace_prefix)? {
1337 namespace_keys.push(key);
1338 }
1339 for key in namespace_keys {
1340 txn.delete(key)?;
1341 }
1342
1343 let mut table_keys = Vec::new();
1344 let mut table_fqns = Vec::new();
1345 for (key, value) in txn.scan_prefix(TABLES_PREFIX)? {
1346 let persisted = deserialize_table_meta(&value)?;
1347 if persisted.catalog_name == name {
1348 table_fqns.push(TableFqn::new(
1349 &persisted.catalog_name,
1350 &persisted.namespace_name,
1351 &persisted.name,
1352 ));
1353 table_keys.push(key);
1354 }
1355 }
1356 let table_set: HashSet<TableFqn> = table_fqns.iter().cloned().collect();
1357 for key in table_keys {
1358 txn.delete(key)?;
1359 }
1360 if !table_set.is_empty() {
1361 let mut index_keys = Vec::new();
1362 for (key, value) in txn.scan_prefix(INDEXES_PREFIX)? {
1363 let persisted = deserialize_index_meta(&value)?;
1364 let fqn = TableFqn::new(
1365 &persisted.catalog_name,
1366 &persisted.namespace_name,
1367 &persisted.table,
1368 );
1369 if table_set.contains(&fqn) {
1370 index_keys.push(key);
1371 }
1372 }
1373 for key in index_keys {
1374 txn.delete(key)?;
1375 }
1376 }
1377 Ok(())
1378 }
1379
1380 fn persist_create_namespace(
1381 &mut self,
1382 txn: &mut S::Transaction<'_>,
1383 meta: &NamespaceMeta,
1384 ) -> Result<(), CatalogError> {
1385 let value = bincode::serialize(meta)?;
1386 txn.put(namespace_key(&meta.catalog_name, &meta.name), value)?;
1387 Ok(())
1388 }
1389
1390 fn persist_drop_namespace(
1391 &mut self,
1392 txn: &mut S::Transaction<'_>,
1393 catalog_name: &str,
1394 namespace_name: &str,
1395 ) -> Result<(), CatalogError> {
1396 txn.delete(namespace_key(catalog_name, namespace_name))?;
1397
1398 let mut table_keys = Vec::new();
1399 let mut table_fqns = Vec::new();
1400 for (key, value) in txn.scan_prefix(TABLES_PREFIX)? {
1401 let persisted = deserialize_table_meta(&value)?;
1402 if persisted.catalog_name == catalog_name && persisted.namespace_name == namespace_name
1403 {
1404 table_fqns.push(TableFqn::new(
1405 &persisted.catalog_name,
1406 &persisted.namespace_name,
1407 &persisted.name,
1408 ));
1409 table_keys.push(key);
1410 }
1411 }
1412 let table_set: HashSet<TableFqn> = table_fqns.iter().cloned().collect();
1413 for key in table_keys {
1414 txn.delete(key)?;
1415 }
1416 if !table_set.is_empty() {
1417 let mut index_keys = Vec::new();
1418 for (key, value) in txn.scan_prefix(INDEXES_PREFIX)? {
1419 let persisted = deserialize_index_meta(&value)?;
1420 let fqn = TableFqn::new(
1421 &persisted.catalog_name,
1422 &persisted.namespace_name,
1423 &persisted.table,
1424 );
1425 if table_set.contains(&fqn) {
1426 index_keys.push(key);
1427 }
1428 }
1429 for key in index_keys {
1430 txn.delete(key)?;
1431 }
1432 }
1433 Ok(())
1434 }
1435
1436 fn write_meta(&self, txn: &mut S::Transaction<'_>) -> Result<(), CatalogError> {
1437 let (table_id_counter, index_id_counter) = self.inner.counters();
1438 let meta = CatalogState {
1439 version: CATALOG_VERSION,
1440 table_id_counter,
1441 index_id_counter,
1442 };
1443 let meta_bytes = bincode::serialize(&meta)?;
1444 txn.put(META_KEY.to_vec(), meta_bytes)?;
1445 Ok(())
1446 }
1447
1448 pub fn persist_create_table(
1449 &mut self,
1450 txn: &mut S::Transaction<'_>,
1451 table: &TableMetadata,
1452 ) -> Result<(), CatalogError> {
1453 let persisted = PersistedTableMeta::from(table);
1454 let value = bincode::serialize(&persisted)?;
1455 txn.put(
1456 table_key(&table.catalog_name, &table.namespace_name, &table.name),
1457 value,
1458 )?;
1459 self.write_meta(txn)?;
1460 Ok(())
1461 }
1462
1463 pub fn persist_drop_table(
1464 &mut self,
1465 txn: &mut S::Transaction<'_>,
1466 fqn: &TableFqn,
1467 ) -> Result<(), CatalogError> {
1468 txn.delete(table_key(&fqn.catalog, &fqn.namespace, &fqn.table))?;
1469
1470 let mut to_delete: Vec<String> = Vec::new();
1472 let prefix = index_prefix(&fqn.catalog, &fqn.namespace, &fqn.table);
1473 for (key, _) in txn.scan_prefix(&prefix)? {
1474 let index_name = key_suffix(&prefix, &key)?;
1475 to_delete.push(index_name);
1476 }
1477 for index_name in to_delete {
1478 txn.delete(index_key(
1479 &fqn.catalog,
1480 &fqn.namespace,
1481 &fqn.table,
1482 &index_name,
1483 ))?;
1484 }
1485
1486 Ok(())
1487 }
1488
1489 pub fn persist_create_index(
1490 &mut self,
1491 txn: &mut S::Transaction<'_>,
1492 index: &IndexMetadata,
1493 ) -> Result<(), CatalogError> {
1494 let persisted = PersistedIndexMeta::from(index);
1495 let value = bincode::serialize(&persisted)?;
1496 txn.put(
1497 index_key(
1498 &index.catalog_name,
1499 &index.namespace_name,
1500 &index.table,
1501 &index.name,
1502 ),
1503 value,
1504 )?;
1505 self.write_meta(txn)?;
1506 Ok(())
1507 }
1508
1509 pub fn persist_drop_index(
1510 &mut self,
1511 txn: &mut S::Transaction<'_>,
1512 fqn: &IndexFqn,
1513 ) -> Result<(), CatalogError> {
1514 txn.delete(index_key(
1515 &fqn.catalog,
1516 &fqn.namespace,
1517 &fqn.table,
1518 &fqn.index,
1519 ))?;
1520 Ok(())
1521 }
1522
1523 pub fn persist_overlay(
1524 &mut self,
1525 txn: &mut S::Transaction<'_>,
1526 overlay: &CatalogOverlay,
1527 ) -> Result<(), CatalogError> {
1528 self.ensure_overlay_name_uniqueness(overlay)?;
1529
1530 for catalog in overlay.dropped_catalogs.iter() {
1531 self.persist_drop_catalog(txn, catalog)?;
1532 }
1533
1534 for (catalog, namespace) in overlay.dropped_namespaces.iter() {
1535 if overlay.dropped_catalogs.contains(catalog) {
1536 continue;
1537 }
1538 self.persist_drop_namespace(txn, catalog, namespace)?;
1539 }
1540
1541 for fqn in overlay.dropped_tables.iter() {
1542 if overlay.dropped_catalogs.contains(&fqn.catalog)
1543 || overlay
1544 .dropped_namespaces
1545 .contains(&(fqn.catalog.clone(), fqn.namespace.clone()))
1546 {
1547 continue;
1548 }
1549 self.persist_drop_table(txn, fqn)?;
1550 }
1551
1552 for fqn in overlay.dropped_indexes.iter() {
1553 if overlay.dropped_catalogs.contains(&fqn.catalog)
1554 || overlay
1555 .dropped_namespaces
1556 .contains(&(fqn.catalog.clone(), fqn.namespace.clone()))
1557 {
1558 continue;
1559 }
1560 self.persist_drop_index(txn, fqn)?;
1561 }
1562
1563 for meta in overlay.added_catalogs.values() {
1564 if overlay.dropped_catalogs.contains(&meta.name) {
1565 continue;
1566 }
1567 self.persist_create_catalog(txn, meta)?;
1568 }
1569
1570 for meta in overlay.added_namespaces.values() {
1571 if overlay.dropped_catalogs.contains(&meta.catalog_name)
1572 || overlay
1573 .dropped_namespaces
1574 .contains(&(meta.catalog_name.clone(), meta.name.clone()))
1575 {
1576 continue;
1577 }
1578 self.persist_create_namespace(txn, meta)?;
1579 }
1580
1581 for (fqn, table) in overlay.added_tables.iter() {
1582 if overlay.dropped_catalogs.contains(&fqn.catalog)
1583 || overlay
1584 .dropped_namespaces
1585 .contains(&(fqn.catalog.clone(), fqn.namespace.clone()))
1586 || overlay.dropped_tables.contains(fqn)
1587 {
1588 continue;
1589 }
1590 self.persist_create_table(txn, table)?;
1591 }
1592
1593 for (fqn, index) in overlay.added_indexes.iter() {
1594 if overlay.dropped_catalogs.contains(&fqn.catalog)
1595 || overlay
1596 .dropped_namespaces
1597 .contains(&(fqn.catalog.clone(), fqn.namespace.clone()))
1598 || overlay.dropped_indexes.contains(fqn)
1599 {
1600 continue;
1601 }
1602 self.persist_create_index(txn, index)?;
1603 }
1604
1605 Ok(())
1606 }
1607
1608 fn ensure_overlay_name_uniqueness(&self, overlay: &CatalogOverlay) -> Result<(), CatalogError> {
1609 let mut table_names: HashMap<String, TableFqn> = HashMap::new();
1610 for name in self.inner.table_names() {
1611 let Some(table) = self.inner.get_table(name) else {
1612 continue;
1613 };
1614 let fqn = TableFqn::from(table);
1615 if overlay.dropped_catalogs.contains(&fqn.catalog)
1616 || overlay
1617 .dropped_namespaces
1618 .contains(&(fqn.catalog.clone(), fqn.namespace.clone()))
1619 || overlay.dropped_tables.contains(&fqn)
1620 {
1621 continue;
1622 }
1623 table_names.insert(table.name.clone(), fqn);
1624 }
1625
1626 for (fqn, table) in overlay.added_tables.iter() {
1627 if overlay.dropped_catalogs.contains(&fqn.catalog)
1628 || overlay
1629 .dropped_namespaces
1630 .contains(&(fqn.catalog.clone(), fqn.namespace.clone()))
1631 || overlay.dropped_tables.contains(fqn)
1632 {
1633 continue;
1634 }
1635 if let Some(existing) = table_names.get(&table.name)
1636 && existing != fqn
1637 {
1638 return Err(CatalogError::InvalidKey(format!(
1639 "table name '{}' conflicts across namespaces: {}.{} vs {}.{}",
1640 table.name, existing.catalog, existing.namespace, fqn.catalog, fqn.namespace
1641 )));
1642 }
1643 table_names.insert(table.name.clone(), fqn.clone());
1644 }
1645
1646 let mut index_names: HashMap<String, IndexFqn> = HashMap::new();
1647 for name in self.inner.index_names() {
1648 let Some(index) = self.inner.get_index(name) else {
1649 continue;
1650 };
1651 let fqn = IndexFqn::from(index);
1652 if overlay.dropped_catalogs.contains(&fqn.catalog)
1653 || overlay
1654 .dropped_namespaces
1655 .contains(&(fqn.catalog.clone(), fqn.namespace.clone()))
1656 || overlay.dropped_indexes.contains(&fqn)
1657 || overlay.dropped_tables.contains(&TableFqn::new(
1658 &fqn.catalog,
1659 &fqn.namespace,
1660 &fqn.table,
1661 ))
1662 {
1663 continue;
1664 }
1665 index_names.insert(index.name.clone(), fqn);
1666 }
1667
1668 for (fqn, index) in overlay.added_indexes.iter() {
1669 if overlay.dropped_catalogs.contains(&fqn.catalog)
1670 || overlay
1671 .dropped_namespaces
1672 .contains(&(fqn.catalog.clone(), fqn.namespace.clone()))
1673 || overlay.dropped_indexes.contains(fqn)
1674 || overlay.dropped_tables.contains(&TableFqn::new(
1675 &fqn.catalog,
1676 &fqn.namespace,
1677 &fqn.table,
1678 ))
1679 {
1680 continue;
1681 }
1682 if let Some(existing) = index_names.get(&index.name)
1683 && existing != fqn
1684 {
1685 return Err(CatalogError::InvalidKey(format!(
1686 "index name '{}' conflicts across namespaces: {}.{} vs {}.{}",
1687 index.name, existing.catalog, existing.namespace, fqn.catalog, fqn.namespace
1688 )));
1689 }
1690 index_names.insert(index.name.clone(), fqn.clone());
1691 }
1692
1693 Ok(())
1694 }
1695
1696 fn namespace_dropped(overlay: &CatalogOverlay, catalog: &str, namespace: &str) -> bool {
1697 overlay.dropped_catalogs.contains(catalog)
1698 || overlay
1699 .dropped_namespaces
1700 .contains(&(catalog.to_string(), namespace.to_string()))
1701 }
1702
1703 fn overlay_added_table_by_name<'a>(
1704 overlay: &'a CatalogOverlay,
1705 name: &str,
1706 ) -> Option<&'a TableMetadata> {
1707 let mut iter = overlay
1708 .added_tables
1709 .values()
1710 .filter(|table| table.name == name);
1711 let first = iter.next()?;
1712 if iter.next().is_some() {
1713 return None;
1714 }
1715 Some(first)
1716 }
1717
1718 fn overlay_added_index_by_name<'a>(
1719 overlay: &'a CatalogOverlay,
1720 name: &str,
1721 ) -> Option<&'a IndexMetadata> {
1722 let mut iter = overlay
1723 .added_indexes
1724 .values()
1725 .filter(|index| index.name == name);
1726 let first = iter.next()?;
1727 if iter.next().is_some() {
1728 return None;
1729 }
1730 Some(first)
1731 }
1732
1733 fn base_table_conflicts_with_overlay(
1734 &self,
1735 overlay: &CatalogOverlay,
1736 table: &TableMetadata,
1737 ) -> bool {
1738 let Some(base) = self.inner.get_table(&table.name) else {
1739 return false;
1740 };
1741 if self.table_hidden_by_overlay(base, overlay) {
1742 return false;
1743 }
1744 if overlay.dropped_tables.contains(&TableFqn::from(base)) {
1745 return false;
1746 }
1747 TableFqn::from(base) != TableFqn::from(table)
1748 }
1749
1750 fn base_index_conflicts_with_overlay(
1751 &self,
1752 overlay: &CatalogOverlay,
1753 index: &IndexMetadata,
1754 ) -> bool {
1755 let Some(base) = self.inner.get_index(&index.name) else {
1756 return false;
1757 };
1758 if Self::namespace_dropped(overlay, &base.catalog_name, &base.namespace_name) {
1759 return false;
1760 }
1761 if overlay.dropped_indexes.contains(&IndexFqn::from(base)) {
1762 return false;
1763 }
1764 if self.dropped_table_matches_fqn(
1765 &base.table,
1766 &base.catalog_name,
1767 &base.namespace_name,
1768 overlay,
1769 ) {
1770 return false;
1771 }
1772 IndexFqn::from(base) != IndexFqn::from(index)
1773 }
1774
1775 fn table_hidden_by_overlay(&self, table: &TableMetadata, overlay: &CatalogOverlay) -> bool {
1776 Self::namespace_dropped(overlay, &table.catalog_name, &table.namespace_name)
1777 }
1778
1779 fn dropped_table_matches_fqn(
1780 &self,
1781 table_name: &str,
1782 catalog: &str,
1783 namespace: &str,
1784 overlay: &CatalogOverlay,
1785 ) -> bool {
1786 let fqn = TableFqn::new(catalog, namespace, table_name);
1787 overlay.dropped_tables.contains(&fqn)
1788 }
1789
1790 fn index_hidden_by_overlay(&self, index: &IndexMetadata, overlay: &CatalogOverlay) -> bool {
1791 let index_fqn = IndexFqn::from(index);
1792 if overlay.dropped_indexes.contains(&index_fqn) {
1793 return true;
1794 }
1795 if Self::namespace_dropped(overlay, &index.catalog_name, &index.namespace_name) {
1796 return true;
1797 }
1798 if self.dropped_table_matches_fqn(
1799 &index.table,
1800 &index.catalog_name,
1801 &index.namespace_name,
1802 overlay,
1803 ) {
1804 return true;
1805 }
1806 match self.get_table_in_txn(&index.table, overlay) {
1807 Some(table) => {
1808 table.catalog_name != index.catalog_name
1809 || table.namespace_name != index.namespace_name
1810 }
1811 None => true,
1812 }
1813 }
1814
1815 pub fn get_catalog_in_txn<'a>(
1816 &'a self,
1817 name: &str,
1818 overlay: &'a CatalogOverlay,
1819 ) -> Option<&'a CatalogMeta> {
1820 if overlay.dropped_catalogs.contains(name) {
1821 return None;
1822 }
1823 if let Some(catalog) = overlay.added_catalogs.get(name) {
1824 return Some(catalog);
1825 }
1826 self.catalogs.get(name)
1827 }
1828
1829 pub fn get_namespace_in_txn<'a>(
1830 &'a self,
1831 catalog_name: &str,
1832 namespace_name: &str,
1833 overlay: &'a CatalogOverlay,
1834 ) -> Option<&'a NamespaceMeta> {
1835 if overlay.dropped_catalogs.contains(catalog_name) {
1836 return None;
1837 }
1838 let key = (catalog_name.to_string(), namespace_name.to_string());
1839 if overlay.dropped_namespaces.contains(&key) {
1840 return None;
1841 }
1842 if let Some(namespace) = overlay.added_namespaces.get(&key) {
1843 return Some(namespace);
1844 }
1845 self.namespaces.get(&key)
1846 }
1847
1848 pub fn list_catalogs_in_txn(&self, overlay: &CatalogOverlay) -> Vec<CatalogMeta> {
1849 let mut catalogs: HashMap<String, CatalogMeta> = HashMap::new();
1850 for (name, meta) in &self.catalogs {
1851 if !overlay.dropped_catalogs.contains(name) {
1852 catalogs.insert(name.clone(), meta.clone());
1853 }
1854 }
1855 for (name, meta) in &overlay.added_catalogs {
1856 if !overlay.dropped_catalogs.contains(name) {
1857 catalogs.insert(name.clone(), meta.clone());
1858 }
1859 }
1860 let mut values: Vec<CatalogMeta> = catalogs.into_values().collect();
1861 values.sort_by(|a, b| a.name.cmp(&b.name));
1862 values
1863 }
1864
1865 pub fn list_namespaces_in_txn(
1866 &self,
1867 catalog_name: &str,
1868 overlay: &CatalogOverlay,
1869 ) -> Vec<NamespaceMeta> {
1870 if overlay.dropped_catalogs.contains(catalog_name) {
1871 return Vec::new();
1872 }
1873 let mut namespaces: HashMap<(String, String), NamespaceMeta> = HashMap::new();
1874 for ((catalog, namespace), meta) in &self.namespaces {
1875 if catalog != catalog_name {
1876 continue;
1877 }
1878 let key = (catalog.clone(), namespace.clone());
1879 if overlay.dropped_namespaces.contains(&key) {
1880 continue;
1881 }
1882 namespaces.insert(key, meta.clone());
1883 }
1884 for ((catalog, namespace), meta) in &overlay.added_namespaces {
1885 if catalog != catalog_name {
1886 continue;
1887 }
1888 let key = (catalog.clone(), namespace.clone());
1889 if overlay.dropped_namespaces.contains(&key) {
1890 continue;
1891 }
1892 namespaces.insert(key, meta.clone());
1893 }
1894 let mut values: Vec<NamespaceMeta> = namespaces.into_values().collect();
1895 values.sort_by(|a, b| a.name.cmp(&b.name));
1896 values
1897 }
1898
1899 pub fn table_exists_in_txn(&self, name: &str, overlay: &CatalogOverlay) -> bool {
1900 if let Some(table) = Self::overlay_added_table_by_name(overlay, name) {
1901 if self.table_hidden_by_overlay(table, overlay) {
1902 return false;
1903 }
1904 if self.base_table_conflicts_with_overlay(overlay, table) {
1905 return false;
1906 }
1907 return true;
1908 }
1909 match self.inner.get_table(name) {
1910 Some(table) => {
1911 !self.table_hidden_by_overlay(table, overlay)
1912 && !overlay.dropped_tables.contains(&TableFqn::from(table))
1913 }
1914 None => false,
1915 }
1916 }
1917
1918 pub fn get_table_in_txn<'a>(
1919 &'a self,
1920 name: &str,
1921 overlay: &'a CatalogOverlay,
1922 ) -> Option<&'a TableMetadata> {
1923 if let Some(table) = Self::overlay_added_table_by_name(overlay, name) {
1924 if self.table_hidden_by_overlay(table, overlay) {
1925 return None;
1926 }
1927 if self.base_table_conflicts_with_overlay(overlay, table) {
1928 return None;
1929 }
1930 return Some(table);
1931 }
1932 self.inner.get_table(name).filter(|table| {
1933 !self.table_hidden_by_overlay(table, overlay)
1934 && !overlay.dropped_tables.contains(&TableFqn::from(*table))
1935 })
1936 }
1937
1938 pub fn index_exists_in_txn(&self, name: &str, overlay: &CatalogOverlay) -> bool {
1939 if let Some(index) = Self::overlay_added_index_by_name(overlay, name) {
1940 if self.index_hidden_by_overlay(index, overlay) {
1941 return false;
1942 }
1943 if self.base_index_conflicts_with_overlay(overlay, index) {
1944 return false;
1945 }
1946 return true;
1947 }
1948 match self.inner.get_index(name) {
1949 Some(index) => !self.index_hidden_by_overlay(index, overlay),
1950 None => false,
1951 }
1952 }
1953
1954 pub fn get_index_in_txn<'a>(
1955 &'a self,
1956 name: &str,
1957 overlay: &'a CatalogOverlay,
1958 ) -> Option<&'a IndexMetadata> {
1959 if let Some(index) = Self::overlay_added_index_by_name(overlay, name) {
1960 if self.index_hidden_by_overlay(index, overlay) {
1961 return None;
1962 }
1963 if self.base_index_conflicts_with_overlay(overlay, index) {
1964 return None;
1965 }
1966 return Some(index);
1967 }
1968 match self.inner.get_index(name) {
1969 Some(index) if self.index_hidden_by_overlay(index, overlay) => None,
1970 other => other,
1971 }
1972 }
1973
1974 pub fn list_tables_in_txn(
1975 &self,
1976 catalog_name: &str,
1977 namespace_name: &str,
1978 overlay: &CatalogOverlay,
1979 ) -> Vec<TableMetadata> {
1980 if overlay.dropped_catalogs.contains(catalog_name) {
1981 return Vec::new();
1982 }
1983 if overlay
1984 .dropped_namespaces
1985 .contains(&(catalog_name.to_string(), namespace_name.to_string()))
1986 {
1987 return Vec::new();
1988 }
1989 let mut tables: HashMap<TableFqn, TableMetadata> = HashMap::new();
1990 for name in self.inner.table_names() {
1991 if let Some(table) = self.inner.get_table(name)
1992 && table.catalog_name == catalog_name
1993 && table.namespace_name == namespace_name
1994 {
1995 let fqn = TableFqn::from(table);
1996 if !overlay.dropped_tables.contains(&fqn) {
1997 tables.insert(fqn, table.clone());
1998 }
1999 }
2000 }
2001 for table in overlay.added_tables.values() {
2002 if table.catalog_name == catalog_name && table.namespace_name == namespace_name {
2003 tables.insert(TableFqn::from(table), table.clone());
2004 }
2005 }
2006 let mut values: Vec<TableMetadata> = tables.into_values().collect();
2007 values.sort_by(|a, b| a.name.cmp(&b.name));
2008 values
2009 }
2010
2011 pub fn list_indexes_in_txn(
2012 &self,
2013 fqn: &TableFqn,
2014 overlay: &CatalogOverlay,
2015 ) -> Vec<IndexMetadata> {
2016 if overlay.dropped_catalogs.contains(&fqn.catalog) {
2017 return Vec::new();
2018 }
2019 if overlay
2020 .dropped_namespaces
2021 .contains(&(fqn.catalog.clone(), fqn.namespace.clone()))
2022 {
2023 return Vec::new();
2024 }
2025 if overlay
2026 .dropped_tables
2027 .contains(&TableFqn::new(&fqn.catalog, &fqn.namespace, &fqn.table))
2028 {
2029 return Vec::new();
2030 }
2031
2032 let mut indexes: HashMap<IndexFqn, IndexMetadata> = HashMap::new();
2033 for index in self.inner.get_indexes_for_table(&fqn.table) {
2034 if index.catalog_name == fqn.catalog && index.namespace_name == fqn.namespace {
2035 let index_fqn = IndexFqn::from(index);
2036 if !overlay.dropped_indexes.contains(&index_fqn) {
2037 indexes.insert(index_fqn, index.clone());
2038 }
2039 }
2040 }
2041 for index in overlay.added_indexes.values() {
2042 if index.table == fqn.table
2043 && index.catalog_name == fqn.catalog
2044 && index.namespace_name == fqn.namespace
2045 {
2046 indexes.insert(IndexFqn::from(index), index.clone());
2047 }
2048 }
2049 let mut values: Vec<IndexMetadata> = indexes.into_values().collect();
2050 values.sort_by(|a, b| a.name.cmp(&b.name));
2051 values
2052 }
2053
2054 pub fn apply_overlay(&mut self, overlay: CatalogOverlay) {
2055 let CatalogOverlay {
2056 added_catalogs,
2057 dropped_catalogs,
2058 added_namespaces,
2059 dropped_namespaces,
2060 added_tables,
2061 dropped_tables,
2062 added_indexes,
2063 dropped_indexes,
2064 } = overlay;
2065
2066 for (name, meta) in added_catalogs {
2067 self.catalogs.insert(name, meta);
2068 }
2069 for (catalog_name, namespace_name) in dropped_namespaces.iter() {
2070 self.namespaces
2071 .remove(&(catalog_name.clone(), namespace_name.clone()));
2072 }
2073 for ((catalog_name, namespace_name), meta) in added_namespaces {
2074 self.namespaces.insert((catalog_name, namespace_name), meta);
2075 }
2076 for name in dropped_catalogs {
2077 self.catalogs.remove(&name);
2078 self.namespaces.retain(|(catalog, _), _| catalog != &name);
2079 }
2080 for (_, table) in added_tables {
2081 self.inner.insert_table_unchecked(table);
2082 }
2083 for fqn in dropped_tables {
2084 self.inner.remove_table_unchecked(&fqn.table);
2085 }
2086 for (_, index) in added_indexes {
2087 self.inner.insert_index_unchecked(index);
2088 }
2089 for fqn in dropped_indexes {
2090 self.inner.remove_index_unchecked(&fqn.index);
2091 }
2092 }
2093
2094 pub fn discard_overlay(_overlay: CatalogOverlay) {}
2095}
2096
/// `Catalog` implementation that forwards every operation to the in-memory
/// catalog held in `self.inner`.
///
/// None of these methods accesses `self.store`; persisting a change is done
/// separately through the `persist_*` methods.
impl<S: KVStore> Catalog for PersistentCatalog<S> {
    /// Registers `table` in the in-memory catalog.
    fn create_table(&mut self, table: TableMetadata) -> Result<(), PlannerError> {
        self.inner.create_table(table)
    }

    /// Looks up a table by bare name in the in-memory catalog.
    fn get_table(&self, name: &str) -> Option<&TableMetadata> {
        self.inner.get_table(name)
    }

    /// Removes the table named `name` from the in-memory catalog.
    fn drop_table(&mut self, name: &str) -> Result<(), PlannerError> {
        self.inner.drop_table(name)
    }

    /// Registers `index` in the in-memory catalog.
    fn create_index(&mut self, index: IndexMetadata) -> Result<(), PlannerError> {
        self.inner.create_index(index)
    }

    /// Looks up an index by bare name in the in-memory catalog.
    fn get_index(&self, name: &str) -> Option<&IndexMetadata> {
        self.inner.get_index(name)
    }

    /// Returns the in-memory indexes registered for the table named `table`.
    fn get_indexes_for_table(&self, table: &str) -> Vec<&IndexMetadata> {
        self.inner.get_indexes_for_table(table)
    }

    /// Removes the index named `name` from the in-memory catalog.
    fn drop_index(&mut self, name: &str) -> Result<(), PlannerError> {
        self.inner.drop_index(name)
    }

    /// Reports whether the in-memory catalog has a table named `name`.
    fn table_exists(&self, name: &str) -> bool {
        self.inner.table_exists(name)
    }

    /// Reports whether the in-memory catalog has an index named `name`.
    fn index_exists(&self, name: &str) -> bool {
        self.inner.index_exists(name)
    }

    /// Obtains the next table id from the in-memory catalog.
    fn next_table_id(&mut self) -> u32 {
        self.inner.next_table_id()
    }

    /// Obtains the next index id from the in-memory catalog.
    fn next_index_id(&mut self) -> u32 {
        self.inner.next_index_id()
    }
}
2142
2143#[cfg(test)]
2144mod tests {
2145 use super::*;
2146 use crate::planner::types::ResolvedType;
2147 use std::collections::HashSet;
2148
2149 fn test_table(name: &str, id: u32) -> TableMetadata {
2150 TableMetadata::new(
2151 name,
2152 vec![ColumnMetadata::new("id", ResolvedType::Integer).with_primary_key(true)],
2153 )
2154 .with_table_id(id)
2155 .with_primary_key(vec!["id".to_string()])
2156 }
2157
2158 fn legacy_table_key(table_name: &str) -> Vec<u8> {
2159 let mut key = TABLES_PREFIX.to_vec();
2160 key.extend_from_slice(table_name.as_bytes());
2161 key
2162 }
2163
2164 fn legacy_index_key(index_name: &str) -> Vec<u8> {
2165 let mut key = INDEXES_PREFIX.to_vec();
2166 key.extend_from_slice(index_name.as_bytes());
2167 key
2168 }
2169
2170 fn seed_legacy_store(store: &Arc<alopex_core::kv::memory::MemoryKV>) {
2171 let mut txn = store.begin(TxnMode::ReadWrite).unwrap();
2172 let table = test_table("users", 7);
2173 let legacy_table = PersistedTableMetaV1 {
2174 table_id: table.table_id,
2175 name: table.name.clone(),
2176 columns: table
2177 .columns
2178 .iter()
2179 .map(PersistedColumnMeta::from)
2180 .collect(),
2181 primary_key: table.primary_key.clone(),
2182 storage_options: table.storage_options.clone().into(),
2183 };
2184 let table_bytes = bincode::serialize(&legacy_table).unwrap();
2185 txn.put(legacy_table_key("users"), table_bytes).unwrap();
2186
2187 let legacy_index = PersistedIndexMetaV1 {
2188 index_id: 3,
2189 name: "idx_users_id".to_string(),
2190 table: "users".to_string(),
2191 columns: vec!["id".to_string()],
2192 column_indices: vec![0],
2193 unique: false,
2194 method: Some(PersistedIndexType::BTree),
2195 options: Vec::new(),
2196 };
2197 let index_bytes = bincode::serialize(&legacy_index).unwrap();
2198 txn.put(legacy_index_key("idx_users_id"), index_bytes)
2199 .unwrap();
2200
2201 let meta = CatalogState {
2202 version: 1,
2203 table_id_counter: 7,
2204 index_id_counter: 3,
2205 };
2206 let meta_bytes = bincode::serialize(&meta).unwrap();
2207 txn.put(META_KEY.to_vec(), meta_bytes).unwrap();
2208 txn.commit_self().unwrap();
2209 }
2210
/// Loading from a freshly created in-memory store succeeds and yields a
/// catalog whose inner table and index counts are both zero.
#[test]
fn load_empty_store() {
    use alopex_core::kv::memory::MemoryKV;

    let catalog = PersistentCatalog::load(Arc::new(MemoryKV::new())).unwrap();
    let inner = &catalog.inner;
    assert_eq!(inner.table_count(), 0);
    assert_eq!(inner.index_count(), 0);
}
2218
/// Loading a store seeded with the version-1 layout rewrites it in place.
///
/// After `PersistentCatalog::load` the test checks that:
/// - the `default` catalog and the `default`/`default` namespace exist;
/// - table `users` and index `idx_users_id` report `default`/`default`;
/// - the legacy table and index keys are absent from the store while the
///   fully qualified keys are present;
/// - the stored `CatalogState` carries `CATALOG_VERSION`.
#[test]
fn load_migrates_v1_keys_and_meta() {
    use alopex_core::kv::memory::MemoryKV;

    let store = Arc::new(MemoryKV::new());
    seed_legacy_store(&store);

    let migrated = PersistentCatalog::load(Arc::clone(&store)).unwrap();
    assert!(migrated.get_catalog("default").is_some());
    assert!(migrated.get_namespace("default", "default").is_some());

    let users = migrated.get_table("users").unwrap();
    assert_eq!(users.catalog_name, "default");
    assert_eq!(users.namespace_name, "default");

    let users_index = migrated.get_index("idx_users_id").unwrap();
    assert_eq!(users_index.catalog_name, "default");
    assert_eq!(users_index.namespace_name, "default");
    assert_eq!(users_index.table, "users");

    // Keys inspected directly in the underlying store.
    let v1_table_key = legacy_table_key("users");
    let v2_table_key = table_key("default", "default", "users");
    let v1_index_key = legacy_index_key("idx_users_id");
    let v2_index_key = index_key("default", "default", "users", "idx_users_id");
    let meta_key = META_KEY.to_vec();

    let mut txn = store.begin(TxnMode::ReadOnly).unwrap();
    assert!(txn.get(&v1_table_key).unwrap().is_none());
    assert!(txn.get(&v2_table_key).unwrap().is_some());
    assert!(txn.get(&v1_index_key).unwrap().is_none());
    assert!(txn.get(&v2_index_key).unwrap().is_some());

    let meta_bytes = txn.get(&meta_key).unwrap().unwrap();
    let state = bincode::deserialize::<CatalogState>(&meta_bytes).unwrap();
    assert_eq!(state.version, CATALOG_VERSION);
    txn.rollback_self().unwrap();
}
2259
/// Loading the same legacy-seeded store twice leaves the migrated layout
/// intact: after the second load, `users` still resolves to
/// `default`/`default`, the legacy table key is absent, the fully qualified
/// table key is present, and the stored version equals `CATALOG_VERSION`.
#[test]
fn load_after_migration_keeps_v2_keys() {
    use alopex_core::kv::memory::MemoryKV;

    let store = Arc::new(MemoryKV::new());
    seed_legacy_store(&store);

    // The catalog from the first load is discarded right away; only its
    // effect on the store matters for the second load.
    let _ = PersistentCatalog::load(Arc::clone(&store)).unwrap();
    let second_load = PersistentCatalog::load(Arc::clone(&store)).unwrap();

    let users = second_load.get_table("users").unwrap();
    assert_eq!(users.catalog_name, "default");
    assert_eq!(users.namespace_name, "default");

    let v1_table_key = legacy_table_key("users");
    let v2_table_key = table_key("default", "default", "users");
    let meta_key = META_KEY.to_vec();

    let mut txn = store.begin(TxnMode::ReadOnly).unwrap();
    assert!(txn.get(&v1_table_key).unwrap().is_none());
    assert!(txn.get(&v2_table_key).unwrap().is_some());

    let meta_bytes = txn.get(&meta_key).unwrap().unwrap();
    let state = bincode::deserialize::<CatalogState>(&meta_bytes).unwrap();
    assert_eq!(state.version, CATALOG_VERSION);
    txn.rollback_self().unwrap();
}
2284
/// A table written with `persist_create_table` and committed is visible,
/// with its `table_id`, to a catalog freshly loaded from the same store.
#[test]
fn create_table_persists() {
    use alopex_core::kv::memory::MemoryKV;

    let store = Arc::new(MemoryKV::new());
    let mut catalog = PersistentCatalog::new(Arc::clone(&store));
    catalog.inner.set_counters(1, 0);

    let users = test_table("users", 1);
    let mut write_txn = store.begin(TxnMode::ReadWrite).unwrap();
    catalog.persist_create_table(&mut write_txn, &users).unwrap();
    write_txn.commit_self().unwrap();

    let reloaded = PersistentCatalog::load(store).unwrap();
    assert!(reloaded.table_exists("users"));
    let reloaded_users = reloaded.get_table("users").unwrap();
    assert_eq!(reloaded_users.table_id, 1);
}
2302
/// A table that was persisted and then dropped with `persist_drop_table`
/// (each in its own committed transaction) is absent after reloading the
/// catalog from the store.
#[test]
fn drop_table_removes() {
    use alopex_core::kv::memory::MemoryKV;

    let store = Arc::new(MemoryKV::new());
    let mut catalog = PersistentCatalog::new(Arc::clone(&store));
    catalog.inner.set_counters(1, 0);

    // Transaction 1: create the table.
    let users = test_table("users", 1);
    let mut create_txn = store.begin(TxnMode::ReadWrite).unwrap();
    catalog.persist_create_table(&mut create_txn, &users).unwrap();
    create_txn.commit_self().unwrap();

    // Transaction 2: drop it again by fully qualified name.
    let mut drop_txn = store.begin(TxnMode::ReadWrite).unwrap();
    let users_fqn = TableFqn::new("default", "default", "users");
    catalog.persist_drop_table(&mut drop_txn, &users_fqn).unwrap();
    drop_txn.commit_self().unwrap();

    let reloaded = PersistentCatalog::load(store).unwrap();
    assert!(!reloaded.table_exists("users"));
}
2322
/// A table persisted through a WAL-backed `MemoryKV` is still present after
/// the store and catalog are dropped and the store is reopened from the same
/// WAL path.
#[test]
fn reload_preserves_state() {
    use alopex_core::kv::memory::MemoryKV;

    let temp_dir = tempfile::tempdir().unwrap();
    let wal_path = temp_dir.path().join("catalog.wal");

    let store = Arc::new(MemoryKV::open(&wal_path).unwrap());
    let mut catalog = PersistentCatalog::new(Arc::clone(&store));
    catalog.inner.set_counters(1, 0);

    let users = test_table("users", 1);
    let mut write_txn = store.begin(TxnMode::ReadWrite).unwrap();
    catalog.persist_create_table(&mut write_txn, &users).unwrap();
    write_txn.commit_self().unwrap();
    store.flush().unwrap();

    // Release the first session completely before reopening the WAL.
    drop(catalog);
    drop(store);

    let reopened = Arc::new(MemoryKV::open(&wal_path).unwrap());
    let reloaded = PersistentCatalog::load(reopened).unwrap();
    assert!(reloaded.table_exists("users"));
}
2344
/// An overlay that drops `users` and adds `orders` is honoured by
/// `table_exists_in_txn` before it is applied, and by `table_exists` after
/// `apply_overlay`.
#[test]
fn overlay_applied_on_commit() {
    use alopex_core::kv::memory::MemoryKV;

    let mut catalog = PersistentCatalog::new(Arc::new(MemoryKV::new()));

    let users = test_table("users", 1);
    let users_fqn = TableFqn::from(&users);
    catalog.inner.insert_table_unchecked(users);

    let orders = test_table("orders", 2);
    let orders_fqn = TableFqn::from(&orders);

    let mut overlay = CatalogOverlay::new();
    overlay.drop_table(&users_fqn);
    overlay.add_table(orders_fqn, orders);

    // Pending view: the overlay already masks `users` and exposes `orders`.
    assert!(!catalog.table_exists_in_txn("users", &overlay));
    assert!(catalog.table_exists_in_txn("orders", &overlay));

    catalog.apply_overlay(overlay);

    assert!(!catalog.table_exists("users"));
    assert!(catalog.table_exists("orders"));
}
2365
/// Passing an overlay that drops `users` to `discard_overlay` leaves the
/// base catalog untouched: `users` still exists afterwards.
#[test]
fn overlay_discarded_on_rollback() {
    use alopex_core::kv::memory::MemoryKV;

    let mut catalog = PersistentCatalog::new(Arc::new(MemoryKV::new()));

    let users = test_table("users", 1);
    let users_fqn = TableFqn::from(&users);
    catalog.inner.insert_table_unchecked(users);

    let mut overlay = CatalogOverlay::new();
    overlay.drop_table(&users_fqn);

    PersistentCatalog::<MemoryKV>::discard_overlay(overlay);

    assert!(catalog.table_exists("users"));
}
2380
/// `create_catalog` makes the catalog visible through `get_catalog` and
/// stores a bincode-encoded `CatalogMeta` under `catalog_key("main")` that
/// decodes to the original value; `delete_catalog` makes `get_catalog`
/// return `None` again.
#[test]
fn catalog_crud_persists() {
    use alopex_core::kv::memory::MemoryKV;

    let store = Arc::new(MemoryKV::new());
    let mut catalog = PersistentCatalog::new(Arc::clone(&store));

    let meta = CatalogMeta {
        name: String::from("main"),
        comment: Some(String::from("primary")),
        storage_root: Some(String::from("/tmp/alopex")),
    };

    catalog.create_catalog(meta.clone()).unwrap();
    assert!(catalog.get_catalog("main").is_some());

    // Read the raw entry back from the store and decode it.
    let persisted = {
        let mut txn = store.begin(TxnMode::ReadOnly).unwrap();
        let raw = txn.get(&catalog_key("main")).unwrap().unwrap();
        let decoded = bincode::deserialize::<CatalogMeta>(&raw).unwrap();
        txn.rollback_self().unwrap();
        decoded
    };
    assert_eq!(persisted, meta);

    catalog.delete_catalog("main").unwrap();
    assert!(catalog.get_catalog("main").is_none());
}
2404
/// Namespace create/delete round-trip, including parent validation.
///
/// - Creating a namespace whose catalog does not exist fails with
///   `CatalogError::InvalidKey`.
/// - After the catalog exists, `create_namespace` makes the namespace visible
///   and stores a `NamespaceMeta` under `namespace_key("main", "analytics")`
///   that decodes to the original value.
/// - `delete_namespace` makes `get_namespace` return `None` again.
#[test]
fn namespace_crud_persists_and_validates_catalog() {
    use alopex_core::kv::memory::MemoryKV;

    let store = Arc::new(MemoryKV::new());
    let mut catalog = PersistentCatalog::new(Arc::clone(&store));

    // No catalog named "missing" has been created at this point.
    let orphan = NamespaceMeta {
        name: String::from("analytics"),
        catalog_name: String::from("missing"),
        comment: None,
        storage_root: None,
    };
    let err = catalog.create_namespace(orphan).unwrap_err();
    assert!(matches!(err, CatalogError::InvalidKey(_)));

    let main_catalog = CatalogMeta {
        name: String::from("main"),
        comment: None,
        storage_root: None,
    };
    catalog.create_catalog(main_catalog).unwrap();

    let analytics = NamespaceMeta {
        name: String::from("analytics"),
        catalog_name: String::from("main"),
        comment: Some(String::from("warehouse")),
        storage_root: None,
    };

    catalog.create_namespace(analytics.clone()).unwrap();
    assert!(catalog.get_namespace("main", "analytics").is_some());

    let persisted = {
        let mut txn = store.begin(TxnMode::ReadOnly).unwrap();
        let raw = txn
            .get(&namespace_key("main", "analytics"))
            .unwrap()
            .unwrap();
        let decoded = bincode::deserialize::<NamespaceMeta>(&raw).unwrap();
        txn.rollback_self().unwrap();
        decoded
    };
    assert_eq!(persisted, analytics);

    catalog.delete_namespace("main", "analytics").unwrap();
    assert!(catalog.get_namespace("main", "analytics").is_none());
}
2449
/// After `delete_catalog("main")`, no key with the prefix
/// `NAMESPACES_PREFIX + "main/"` remains in the store and
/// `list_namespaces("main")` is empty.
#[test]
fn delete_catalog_removes_namespaces_from_store() {
    use alopex_core::kv::memory::MemoryKV;

    let store = Arc::new(MemoryKV::new());
    let mut catalog = PersistentCatalog::new(Arc::clone(&store));

    let main_catalog = CatalogMeta {
        name: String::from("main"),
        comment: None,
        storage_root: None,
    };
    catalog.create_catalog(main_catalog).unwrap();

    let analytics = NamespaceMeta {
        name: String::from("analytics"),
        catalog_name: String::from("main"),
        comment: None,
        storage_root: None,
    };
    catalog.create_namespace(analytics).unwrap();

    catalog.delete_catalog("main").unwrap();

    // Scan the store for anything still filed under the deleted catalog.
    let prefix = [NAMESPACES_PREFIX, &b"main/"[..]].concat();
    let mut txn = store.begin(TxnMode::ReadOnly).unwrap();
    let remaining: Vec<_> = txn.scan_prefix(&prefix).unwrap().collect();
    txn.rollback_self().unwrap();

    assert!(remaining.is_empty());
    assert!(catalog.list_namespaces("main").is_empty());
}
2483
/// `delete_catalog("main")` removes a table and index belonging to that
/// catalog from both the in-memory catalog and the store: the inner lookups
/// return `None` and the fully qualified table and index keys are absent.
#[test]
fn delete_catalog_removes_tables_and_indexes_from_store() {
    use alopex_core::kv::memory::MemoryKV;

    let store = Arc::new(MemoryKV::new());
    let mut catalog = PersistentCatalog::new(Arc::clone(&store));

    let main_catalog = CatalogMeta {
        name: String::from("main"),
        comment: None,
        storage_root: None,
    };
    catalog.create_catalog(main_catalog).unwrap();

    let users = {
        let mut meta = test_table("users", 1);
        meta.catalog_name = String::from("main");
        meta.namespace_name = String::from("default");
        meta
    };
    let users_index = {
        let mut meta = IndexMetadata::new(1, "idx_users_id", "users", vec![String::from("id")])
            .with_column_indices(vec![0]);
        meta.catalog_name = String::from("main");
        meta.namespace_name = String::from("default");
        meta
    };

    // Persist both objects, then mirror them into the in-memory catalog.
    let mut write_txn = store.begin(TxnMode::ReadWrite).unwrap();
    catalog.persist_create_table(&mut write_txn, &users).unwrap();
    catalog
        .persist_create_index(&mut write_txn, &users_index)
        .unwrap();
    write_txn.commit_self().unwrap();

    catalog.inner.insert_table_unchecked(users);
    catalog.inner.insert_index_unchecked(users_index);

    catalog.delete_catalog("main").unwrap();

    assert!(catalog.inner.get_table("users").is_none());
    assert!(catalog.inner.get_index("idx_users_id").is_none());

    let stored_table_key = table_key("main", "default", "users");
    let stored_index_key = index_key("main", "default", "users", "idx_users_id");

    let mut txn = store.begin(TxnMode::ReadOnly).unwrap();
    assert!(txn.get(&stored_table_key).unwrap().is_none());
    assert!(txn.get(&stored_index_key).unwrap().is_none());
    txn.rollback_self().unwrap();
}
2532
/// An index persisted with `persist_create_index` alongside its table in
/// `main`/`analytics` reports that catalog and namespace after the catalog is
/// reloaded from the store.
#[test]
fn index_meta_loads_catalog_and_namespace_from_table() {
    use alopex_core::kv::memory::MemoryKV;

    let store = Arc::new(MemoryKV::new());
    let mut catalog = PersistentCatalog::new(Arc::clone(&store));
    catalog.inner.set_counters(1, 1);

    let users = {
        let mut meta = test_table("users", 1);
        meta.catalog_name = String::from("main");
        meta.namespace_name = String::from("analytics");
        meta
    };

    let mut write_txn = store.begin(TxnMode::ReadWrite).unwrap();
    catalog.persist_create_table(&mut write_txn, &users).unwrap();

    let users_index = {
        let mut meta = IndexMetadata::new(1, "idx_users_id", "users", vec![String::from("id")])
            .with_column_indices(vec![0])
            .with_method(IndexMethod::BTree);
        meta.catalog_name = String::from("main");
        meta.namespace_name = String::from("analytics");
        meta
    };
    catalog
        .persist_create_index(&mut write_txn, &users_index)
        .unwrap();
    write_txn.commit_self().unwrap();

    let reloaded = PersistentCatalog::load(store).unwrap();
    let loaded_index = reloaded.get_index("idx_users_id").unwrap();
    assert_eq!(loaded_index.catalog_name, "main");
    assert_eq!(loaded_index.namespace_name, "analytics");
}
2558
/// An index entry serialized as `PersistedIndexMetaV1` (which has no catalog
/// or namespace fields set here) but stored under the fully qualified
/// `main`/`analytics` index key reports `main`/`analytics` after the catalog
/// is reloaded from the store.
#[test]
fn legacy_index_meta_loads_catalog_and_namespace_from_table() {
    use alopex_core::kv::memory::MemoryKV;

    let store = Arc::new(MemoryKV::new());
    let mut catalog = PersistentCatalog::new(Arc::clone(&store));
    catalog.inner.set_counters(1, 1);

    let users = {
        let mut meta = test_table("users", 1);
        meta.catalog_name = String::from("main");
        meta.namespace_name = String::from("analytics");
        meta
    };

    let mut write_txn = store.begin(TxnMode::ReadWrite).unwrap();
    catalog.persist_create_table(&mut write_txn, &users).unwrap();

    // Write the index in the v1 wire format directly, bypassing the catalog.
    let v1_index = PersistedIndexMetaV1 {
        index_id: 1,
        name: String::from("idx_users_id"),
        table: String::from("users"),
        columns: vec![String::from("id")],
        column_indices: vec![0],
        unique: false,
        method: Some(PersistedIndexType::BTree),
        options: Vec::new(),
    };
    let v1_bytes = bincode::serialize(&v1_index).unwrap();
    let qualified_key = index_key("main", "analytics", "users", "idx_users_id");
    write_txn.put(qualified_key, v1_bytes).unwrap();
    write_txn.commit_self().unwrap();

    let reloaded = PersistentCatalog::load(store).unwrap();
    let loaded_index = reloaded.get_index("idx_users_id").unwrap();
    assert_eq!(loaded_index.catalog_name, "main");
    assert_eq!(loaded_index.namespace_name, "analytics");
}
2594
/// With an overlay that adds catalog `temp` and drops the existing catalog
/// `main`, `get_catalog_in_txn` hides `main` and exposes `temp`, and
/// `list_catalogs_in_txn` returns exactly `["temp"]`.
#[test]
fn overlay_catalog_get_and_list() {
    use alopex_core::kv::memory::MemoryKV;

    // Catalog metadata with the given name and no comment or storage root.
    let bare_catalog = |name: &str| CatalogMeta {
        name: name.to_owned(),
        comment: None,
        storage_root: None,
    };

    let mut catalog = PersistentCatalog::new(Arc::new(MemoryKV::new()));
    catalog.create_catalog(bare_catalog("main")).unwrap();

    let mut overlay = CatalogOverlay::new();
    overlay.add_catalog(bare_catalog("temp"));
    overlay.drop_catalog("main");

    assert!(catalog.get_catalog_in_txn("main", &overlay).is_none());
    assert!(catalog.get_catalog_in_txn("temp", &overlay).is_some());

    let visible: Vec<String> = catalog
        .list_catalogs_in_txn(&overlay)
        .into_iter()
        .map(|entry| entry.name)
        .collect();
    assert_eq!(visible, vec![String::from("temp")]);
}
2625
/// With an overlay that adds namespace `main.analytics` and drops the
/// existing `main.default`, `get_namespace_in_txn` hides `default` and
/// exposes `analytics`, and `list_namespaces_in_txn("main", ..)` returns
/// exactly `["analytics"]`.
#[test]
fn overlay_namespace_get_and_list() {
    use alopex_core::kv::memory::MemoryKV;

    // Namespace metadata under catalog `main` with no comment or storage root.
    let main_namespace = |name: &str| NamespaceMeta {
        name: name.to_owned(),
        catalog_name: String::from("main"),
        comment: None,
        storage_root: None,
    };

    let mut catalog = PersistentCatalog::new(Arc::new(MemoryKV::new()));
    catalog
        .create_catalog(CatalogMeta {
            name: String::from("main"),
            comment: None,
            storage_root: None,
        })
        .unwrap();
    catalog.create_namespace(main_namespace("default")).unwrap();

    let mut overlay = CatalogOverlay::new();
    overlay.add_namespace(main_namespace("analytics"));
    overlay.drop_namespace("main", "default");

    let dropped = catalog.get_namespace_in_txn("main", "default", &overlay);
    assert!(dropped.is_none());
    let added = catalog.get_namespace_in_txn("main", "analytics", &overlay);
    assert!(added.is_some());

    let visible: Vec<String> = catalog
        .list_namespaces_in_txn("main", &overlay)
        .into_iter()
        .map(|entry| entry.name)
        .collect();
    assert_eq!(visible, vec![String::from("analytics")]);
}
2673
/// Listing through an overlay combines base and pending state.
///
/// The base catalog holds `main.default.users` with `idx_users_id`; the
/// overlay adds `orders` with `idx_orders_id` and drops `users`. The test
/// expects `list_tables_in_txn` to return only `orders`,
/// `list_indexes_in_txn` for `users` to be empty, and for `orders` to return
/// only `idx_orders_id`.
#[test]
fn overlay_table_and_index_list() {
    use alopex_core::kv::memory::MemoryKV;

    let mut catalog = PersistentCatalog::new(Arc::new(MemoryKV::new()));

    // Base state: `users` and its index live directly in the inner catalog.
    let users = {
        let mut meta = test_table("users", 1);
        meta.catalog_name = String::from("main");
        meta.namespace_name = String::from("default");
        meta
    };
    let dropped_users_fqn = TableFqn::from(&users);
    catalog.inner.insert_table_unchecked(users);

    let users_index = {
        let mut meta = IndexMetadata::new(1, "idx_users_id", "users", vec![String::from("id")])
            .with_column_indices(vec![0]);
        meta.catalog_name = String::from("main");
        meta.namespace_name = String::from("default");
        meta
    };
    catalog.inner.insert_index_unchecked(users_index);

    // Pending state: `orders` and its index are added, `users` is dropped.
    let mut overlay = CatalogOverlay::new();

    let orders = {
        let mut meta = test_table("orders", 2);
        meta.catalog_name = String::from("main");
        meta.namespace_name = String::from("default");
        meta
    };
    let added_orders_fqn = TableFqn::from(&orders);
    overlay.add_table(added_orders_fqn, orders);

    let orders_index = {
        let mut meta = IndexMetadata::new(2, "idx_orders_id", "orders", vec![String::from("id")])
            .with_column_indices(vec![0]);
        meta.catalog_name = String::from("main");
        meta.namespace_name = String::from("default");
        meta
    };
    let orders_index_fqn = IndexFqn::from(&orders_index);
    overlay.add_index(orders_index_fqn, orders_index);

    overlay.drop_table(&dropped_users_fqn);

    let table_names: Vec<String> = catalog
        .list_tables_in_txn("main", "default", &overlay)
        .into_iter()
        .map(|entry| entry.name)
        .collect();
    assert_eq!(table_names, vec![String::from("orders")]);

    let users_fqn = TableFqn::new("main", "default", "users");
    assert!(catalog.list_indexes_in_txn(&users_fqn, &overlay).is_empty());

    let orders_fqn = TableFqn::new("main", "default", "orders");
    let index_names: Vec<String> = catalog
        .list_indexes_in_txn(&orders_fqn, &overlay)
        .into_iter()
        .map(|entry| entry.name)
        .collect();
    assert_eq!(index_names, vec![String::from("idx_orders_id")]);
}
2725
/// When the overlay contains two tables (or two indexes) with the same bare
/// name in different namespaces, lookups by bare name return `None` and the
/// corresponding `*_exists_in_txn` checks return `false`.
#[test]
fn overlay_name_lookup_ambiguous_returns_none() {
    use alopex_core::kv::memory::MemoryKV;

    let catalog = PersistentCatalog::new(Arc::new(MemoryKV::new()));
    let mut overlay = CatalogOverlay::new();

    // Two `users` tables: main.default and main.analytics.
    let users_default = {
        let mut meta = test_table("users", 1);
        meta.catalog_name = String::from("main");
        meta.namespace_name = String::from("default");
        meta
    };
    let users_default_fqn = TableFqn::from(&users_default);
    overlay.add_table(users_default_fqn, users_default);

    let users_analytics = {
        let mut meta = test_table("users", 2);
        meta.catalog_name = String::from("main");
        meta.namespace_name = String::from("analytics");
        meta
    };
    let users_analytics_fqn = TableFqn::from(&users_analytics);
    overlay.add_table(users_analytics_fqn, users_analytics);

    assert!(catalog.get_table_in_txn("users", &overlay).is_none());
    assert!(!catalog.table_exists_in_txn("users", &overlay));

    // Two `idx_users_id` indexes, one per namespace.
    let idx_default = {
        let mut meta = IndexMetadata::new(1, "idx_users_id", "users", vec![String::from("id")])
            .with_column_indices(vec![0]);
        meta.catalog_name = String::from("main");
        meta.namespace_name = String::from("default");
        meta
    };
    let idx_default_fqn = IndexFqn::from(&idx_default);
    overlay.add_index(idx_default_fqn, idx_default);

    let idx_analytics = {
        let mut meta = IndexMetadata::new(2, "idx_users_id", "users", vec![String::from("id")])
            .with_column_indices(vec![0]);
        meta.catalog_name = String::from("main");
        meta.namespace_name = String::from("analytics");
        meta
    };
    let idx_analytics_fqn = IndexFqn::from(&idx_analytics);
    overlay.add_index(idx_analytics_fqn, idx_analytics);

    assert!(catalog.get_index_in_txn("idx_users_id", &overlay).is_none());
    assert!(!catalog.index_exists_in_txn("idx_users_id", &overlay));
}
2763
/// When one table (or index) lives in the base catalog and another with the
/// same bare name in a different namespace lives in the overlay, lookups by
/// bare name return `None` and the `*_exists_in_txn` checks return `false`.
#[test]
fn overlay_name_lookup_ambiguous_with_base_returns_none() {
    use alopex_core::kv::memory::MemoryKV;

    let mut catalog = PersistentCatalog::new(Arc::new(MemoryKV::new()));
    let mut overlay = CatalogOverlay::new();

    // Base: main.default.users; overlay: main.analytics.users.
    let base_users = {
        let mut meta = test_table("users", 1);
        meta.catalog_name = String::from("main");
        meta.namespace_name = String::from("default");
        meta
    };
    catalog.inner.insert_table_unchecked(base_users);

    let overlay_users = {
        let mut meta = test_table("users", 2);
        meta.catalog_name = String::from("main");
        meta.namespace_name = String::from("analytics");
        meta
    };
    let overlay_users_fqn = TableFqn::from(&overlay_users);
    overlay.add_table(overlay_users_fqn, overlay_users);

    assert!(catalog.get_table_in_txn("users", &overlay).is_none());
    assert!(!catalog.table_exists_in_txn("users", &overlay));

    // Same arrangement for the `idx_users_id` index.
    let base_index = {
        let mut meta = IndexMetadata::new(1, "idx_users_id", "users", vec![String::from("id")])
            .with_column_indices(vec![0]);
        meta.catalog_name = String::from("main");
        meta.namespace_name = String::from("default");
        meta
    };
    catalog.inner.insert_index_unchecked(base_index);

    let overlay_index = {
        let mut meta = IndexMetadata::new(2, "idx_users_id", "users", vec![String::from("id")])
            .with_column_indices(vec![0]);
        meta.catalog_name = String::from("main");
        meta.namespace_name = String::from("analytics");
        meta
    };
    let overlay_index_fqn = IndexFqn::from(&overlay_index);
    overlay.add_index(overlay_index_fqn, overlay_index);

    assert!(catalog.get_index_in_txn("idx_users_id", &overlay).is_none());
    assert!(!catalog.index_exists_in_txn("idx_users_id", &overlay));
}
2800
/// Two overlay tables named `users` in `main.default` and `main.analytics`
/// are listed independently per namespace, and dropping the `default` one
/// empties that namespace's listing while leaving `analytics` unchanged.
#[test]
fn overlay_fqn_tables_separate_namespaces() {
    use alopex_core::kv::memory::MemoryKV;

    let catalog = PersistentCatalog::new(Arc::new(MemoryKV::new()));
    let mut overlay = CatalogOverlay::new();

    let users_default = {
        let mut meta = test_table("users", 1);
        meta.catalog_name = String::from("main");
        meta.namespace_name = String::from("default");
        meta
    };
    let default_fqn = TableFqn::from(&users_default);
    overlay.add_table(default_fqn.clone(), users_default);

    let users_analytics = {
        let mut meta = test_table("users", 2);
        meta.catalog_name = String::from("main");
        meta.namespace_name = String::from("analytics");
        meta
    };
    let analytics_fqn = TableFqn::from(&users_analytics);
    overlay.add_table(analytics_fqn, users_analytics);

    // Names of the tables visible in `main.<namespace>` through `overlay`.
    // The overlay is a parameter (not a capture) because it is mutated below.
    let visible_tables = |namespace: &str, overlay: &CatalogOverlay| -> Vec<String> {
        catalog
            .list_tables_in_txn("main", namespace, overlay)
            .into_iter()
            .map(|entry| entry.name)
            .collect()
    };

    let default_tables = visible_tables("default", &overlay);
    assert_eq!(default_tables, vec![String::from("users")]);

    let analytics_tables = visible_tables("analytics", &overlay);
    assert_eq!(analytics_tables, vec![String::from("users")]);

    overlay.drop_table(&default_fqn);

    let default_tables_after = visible_tables("default", &overlay);
    assert!(default_tables_after.is_empty());

    let analytics_tables_after = visible_tables("analytics", &overlay);
    assert_eq!(analytics_tables_after, vec![String::from("users")]);
}
2848
/// Two overlay indexes named `idx_users_id`, one on `main.default.users` and
/// one on `main.analytics.users`, are each listed by `list_indexes_in_txn`
/// for their own table only.
#[test]
fn overlay_fqn_indexes_separate_namespaces() {
    use alopex_core::kv::memory::MemoryKV;

    let catalog = PersistentCatalog::new(Arc::new(MemoryKV::new()));
    let mut overlay = CatalogOverlay::new();

    let users_default = {
        let mut meta = test_table("users", 1);
        meta.catalog_name = String::from("main");
        meta.namespace_name = String::from("default");
        meta
    };
    let users_default_fqn = TableFqn::from(&users_default);
    overlay.add_table(users_default_fqn, users_default);

    let users_analytics = {
        let mut meta = test_table("users", 2);
        meta.catalog_name = String::from("main");
        meta.namespace_name = String::from("analytics");
        meta
    };
    let users_analytics_fqn = TableFqn::from(&users_analytics);
    overlay.add_table(users_analytics_fqn, users_analytics);

    let idx_default = {
        let mut meta = IndexMetadata::new(1, "idx_users_id", "users", vec![String::from("id")])
            .with_column_indices(vec![0]);
        meta.catalog_name = String::from("main");
        meta.namespace_name = String::from("default");
        meta
    };
    let idx_default_fqn = IndexFqn::from(&idx_default);
    overlay.add_index(idx_default_fqn, idx_default);

    let idx_analytics = {
        let mut meta = IndexMetadata::new(2, "idx_users_id", "users", vec![String::from("id")])
            .with_column_indices(vec![0]);
        meta.catalog_name = String::from("main");
        meta.namespace_name = String::from("analytics");
        meta
    };
    let idx_analytics_fqn = IndexFqn::from(&idx_analytics);
    overlay.add_index(idx_analytics_fqn, idx_analytics);

    // Names of the indexes visible for `table` through the finished overlay.
    let visible_indexes = |table: &TableFqn| -> Vec<String> {
        catalog
            .list_indexes_in_txn(table, &overlay)
            .into_iter()
            .map(|entry| entry.name)
            .collect()
    };

    let default_fqn = TableFqn::new("main", "default", "users");
    let analytics_fqn = TableFqn::new("main", "analytics", "users");

    let default_indexes = visible_indexes(&default_fqn);
    assert_eq!(default_indexes, vec![String::from("idx_users_id")]);

    let analytics_indexes = visible_indexes(&analytics_fqn);
    assert_eq!(analytics_indexes, vec![String::from("idx_users_id")]);
}
2897
/// `persist_overlay` fails with `CatalogError::InvalidKey` when the overlay
/// adds two tables that share the bare name `users` in different namespaces.
#[test]
fn persist_overlay_rejects_duplicate_table_names() {
    use alopex_core::kv::memory::MemoryKV;

    let store = Arc::new(MemoryKV::new());
    let mut catalog = PersistentCatalog::new(Arc::clone(&store));
    let mut overlay = CatalogOverlay::new();

    let users_default = {
        let mut meta = test_table("users", 1);
        meta.catalog_name = String::from("main");
        meta.namespace_name = String::from("default");
        meta
    };
    let users_default_fqn = TableFqn::from(&users_default);
    overlay.add_table(users_default_fqn, users_default);

    let users_analytics = {
        let mut meta = test_table("users", 2);
        meta.catalog_name = String::from("main");
        meta.namespace_name = String::from("analytics");
        meta
    };
    let users_analytics_fqn = TableFqn::from(&users_analytics);
    overlay.add_table(users_analytics_fqn, users_analytics);

    let mut txn = store.begin(TxnMode::ReadWrite).unwrap();
    let rejection = catalog.persist_overlay(&mut txn, &overlay).unwrap_err();
    assert!(matches!(rejection, CatalogError::InvalidKey(_)));
    txn.rollback_self().unwrap();
}
2920
/// `persist_overlay` fails with `CatalogError::InvalidKey` when the overlay
/// adds two indexes that share the bare name `idx_users_id` in different
/// namespaces.
#[test]
fn persist_overlay_rejects_duplicate_index_names() {
    use alopex_core::kv::memory::MemoryKV;

    let store = Arc::new(MemoryKV::new());
    let mut catalog = PersistentCatalog::new(Arc::clone(&store));
    let mut overlay = CatalogOverlay::new();

    let idx_default = {
        let mut meta = IndexMetadata::new(1, "idx_users_id", "users", vec![String::from("id")])
            .with_column_indices(vec![0]);
        meta.catalog_name = String::from("main");
        meta.namespace_name = String::from("default");
        meta
    };
    let idx_default_fqn = IndexFqn::from(&idx_default);
    overlay.add_index(idx_default_fqn, idx_default);

    let idx_analytics = {
        let mut meta = IndexMetadata::new(2, "idx_users_id", "users", vec![String::from("id")])
            .with_column_indices(vec![0]);
        meta.catalog_name = String::from("main");
        meta.namespace_name = String::from("analytics");
        meta
    };
    let idx_analytics_fqn = IndexFqn::from(&idx_analytics);
    overlay.add_index(idx_analytics_fqn, idx_analytics);

    let mut txn = store.begin(TxnMode::ReadWrite).unwrap();
    let rejection = catalog.persist_overlay(&mut txn, &overlay).unwrap_err();
    assert!(matches!(rejection, CatalogError::InvalidKey(_)));
    txn.rollback_self().unwrap();
}
2947
/// `drop_cascade_namespace("main", "default")` on an overlay that added the
/// namespace, a table, and an index records all three as dropped and removes
/// the table and index from the overlay's added sets.
#[test]
fn overlay_drop_cascade_namespace_removes_children() {
    let mut overlay = CatalogOverlay::new();

    overlay.add_namespace(NamespaceMeta {
        name: String::from("default"),
        catalog_name: String::from("main"),
        comment: None,
        storage_root: None,
    });

    let users = {
        let mut meta = test_table("users", 1);
        meta.catalog_name = String::from("main");
        meta.namespace_name = String::from("default");
        meta
    };
    let users_fqn = TableFqn::from(&users);
    overlay.add_table(users_fqn.clone(), users);

    let users_index = {
        let mut meta = IndexMetadata::new(1, "idx_users_id", "users", vec![String::from("id")])
            .with_column_indices(vec![0]);
        meta.catalog_name = String::from("main");
        meta.namespace_name = String::from("default");
        meta
    };
    let users_index_fqn = IndexFqn::from(&users_index);
    overlay.add_index(users_index_fqn.clone(), users_index);

    overlay.drop_cascade_namespace("main", "default");

    let dropped_namespace = (String::from("main"), String::from("default"));
    assert!(overlay.dropped_namespaces.contains(&dropped_namespace));
    assert!(overlay.dropped_tables.contains(&users_fqn));
    assert!(overlay.dropped_indexes.contains(&users_index_fqn));
    assert!(!overlay.added_tables.contains_key(&users_fqn));
    assert!(!overlay.added_indexes.contains_key(&users_index_fqn));
}
2984
/// `drop_cascade_catalog("main")` on an overlay that added the catalog, a
/// namespace, a table, and an index records the catalog, table, and index as
/// dropped and removes the table and index from the overlay's added sets.
#[test]
fn overlay_drop_cascade_catalog_removes_children() {
    let mut overlay = CatalogOverlay::new();

    overlay.add_catalog(CatalogMeta {
        name: String::from("main"),
        comment: None,
        storage_root: None,
    });
    overlay.add_namespace(NamespaceMeta {
        name: String::from("default"),
        catalog_name: String::from("main"),
        comment: None,
        storage_root: None,
    });

    let users = {
        let mut meta = test_table("users", 1);
        meta.catalog_name = String::from("main");
        meta.namespace_name = String::from("default");
        meta
    };
    let users_fqn = TableFqn::from(&users);
    overlay.add_table(users_fqn.clone(), users);

    let users_index = {
        let mut meta = IndexMetadata::new(1, "idx_users_id", "users", vec![String::from("id")])
            .with_column_indices(vec![0]);
        meta.catalog_name = String::from("main");
        meta.namespace_name = String::from("default");
        meta
    };
    let users_index_fqn = IndexFqn::from(&users_index);
    overlay.add_index(users_index_fqn.clone(), users_index);

    overlay.drop_cascade_catalog("main");

    assert!(overlay.dropped_catalogs.contains("main"));
    assert!(overlay.dropped_tables.contains(&users_fqn));
    assert!(overlay.dropped_indexes.contains(&users_index_fqn));
    assert!(!overlay.added_tables.contains_key(&users_fqn));
    assert!(!overlay.added_indexes.contains_key(&users_index_fqn));
}
3022
/// An overlay that only records `drop_namespace("main", "default")` hides
/// the base catalog's table and index in that namespace from
/// `list_tables_in_txn` and `list_indexes_in_txn`.
#[test]
fn dropped_namespace_hides_tables_and_indexes() {
    use alopex_core::kv::memory::MemoryKV;

    let mut catalog = PersistentCatalog::new(Arc::new(MemoryKV::new()));

    let users = {
        let mut meta = test_table("users", 1);
        meta.catalog_name = String::from("main");
        meta.namespace_name = String::from("default");
        meta
    };
    catalog.inner.insert_table_unchecked(users);

    let users_index = {
        let mut meta = IndexMetadata::new(1, "idx_users_id", "users", vec![String::from("id")])
            .with_column_indices(vec![0]);
        meta.catalog_name = String::from("main");
        meta.namespace_name = String::from("default");
        meta
    };
    catalog.inner.insert_index_unchecked(users_index);

    let mut overlay = CatalogOverlay::new();
    overlay.drop_namespace("main", "default");

    let visible_tables: Vec<String> = catalog
        .list_tables_in_txn("main", "default", &overlay)
        .into_iter()
        .map(|entry| entry.name)
        .collect();
    assert!(visible_tables.is_empty());

    let users_fqn = TableFqn {
        catalog: String::from("main"),
        namespace: String::from("default"),
        table: String::from("users"),
    };
    let visible_indexes: Vec<String> = catalog
        .list_indexes_in_txn(&users_fqn, &overlay)
        .into_iter()
        .map(|entry| entry.name)
        .collect();
    assert!(visible_indexes.is_empty());
}
3062
/// An overlay that only records `drop_namespace("main", "default")` hides
/// the base catalog's table and index in that namespace from the by-name
/// `get_*_in_txn` / `*_exists_in_txn` lookups and from
/// `TxnCatalogView::get_indexes_for_table`.
#[test]
fn dropped_namespace_hides_get_and_exists() {
    use alopex_core::kv::memory::MemoryKV;

    let mut catalog = PersistentCatalog::new(Arc::new(MemoryKV::new()));

    let users = {
        let mut meta = test_table("users", 1);
        meta.catalog_name = String::from("main");
        meta.namespace_name = String::from("default");
        meta
    };
    catalog.inner.insert_table_unchecked(users);

    let users_index = {
        let mut meta = IndexMetadata::new(1, "idx_users_id", "users", vec![String::from("id")])
            .with_column_indices(vec![0]);
        meta.catalog_name = String::from("main");
        meta.namespace_name = String::from("default");
        meta
    };
    catalog.inner.insert_index_unchecked(users_index);

    let mut overlay = CatalogOverlay::new();
    overlay.drop_namespace("main", "default");

    // Table lookups by bare name.
    assert!(!catalog.table_exists_in_txn("users", &overlay));
    assert!(catalog.get_table_in_txn("users", &overlay).is_none());

    // Index lookups by bare name.
    assert!(!catalog.index_exists_in_txn("idx_users_id", &overlay));
    assert!(catalog.get_index_in_txn("idx_users_id", &overlay).is_none());

    let txn_view = TxnCatalogView::new(&catalog, &overlay);
    assert!(txn_view.get_indexes_for_table("users").is_empty());
}
3091
/// A `PersistedCatalogMeta` with every optional field populated survives a
/// bincode serialize/deserialize round trip unchanged.
#[test]
fn persisted_catalog_meta_roundtrip() {
    let original = PersistedCatalogMeta {
        name: String::from("main"),
        comment: Some(String::from("primary catalog")),
        storage_root: Some(String::from("/tmp/alopex")),
    };

    let encoded = bincode::serialize(&original).unwrap();
    let roundtripped = bincode::deserialize::<PersistedCatalogMeta>(&encoded).unwrap();

    assert_eq!(original, roundtripped);
}
3103
/// A `PersistedNamespaceMeta` with every optional field populated must come
/// back unchanged after a bincode serialize/deserialize cycle.
#[test]
fn persisted_namespace_meta_roundtrip() {
    let original = PersistedNamespaceMeta {
        name: String::from("analytics"),
        catalog_name: String::from("main"),
        comment: Some(String::from("warehouse")),
        storage_root: Some(String::from("s3://bucket/ns")),
    };

    let encoded = bincode::serialize(&original).unwrap();
    let restored = bincode::deserialize::<PersistedNamespaceMeta>(&encoded).unwrap();

    assert_eq!(original, restored);
}
3116
/// Two `TableFqn` values with identical components must hash and compare as
/// the same set member, while one differing only in the table name must not.
#[test]
fn table_fqn_hash_and_eq() {
    // Builds an FQN in `main.default` for the given table name.
    let fqn_for = |table: &str| TableFqn {
        catalog: "main".to_string(),
        namespace: "default".to_string(),
        table: table.to_string(),
    };

    let seen: HashSet<TableFqn> = std::iter::once(fqn_for("users")).collect();

    assert!(seen.contains(&fqn_for("users")));
    assert!(!seen.contains(&fqn_for("orders")));
}
3140
/// Two `IndexFqn` values with identical components must hash and compare as
/// the same set member, while one differing only in the index name must not.
#[test]
fn index_fqn_hash_and_eq() {
    // Builds an FQN on `main.default.users` for the given index name.
    let fqn_for = |index: &str| IndexFqn {
        catalog: "main".to_string(),
        namespace: "default".to_string(),
        table: "users".to_string(),
        index: index.to_string(),
    };

    let seen: HashSet<IndexFqn> = std::iter::once(fqn_for("idx_users_id")).collect();

    assert!(seen.contains(&fqn_for("idx_users_id")));
    assert!(!seen.contains(&fqn_for("idx_users_email")));
}
3167
/// Pins the JSON encoding of the `TableType` and `DataSourceFormat` variants
/// exercised here: each must serialize to its upper-case quoted name.
#[test]
fn table_type_and_data_source_format_serde() {
    // Serialize everything up front (tuple fields evaluate left to right),
    // then compare, so a serialization failure surfaces before any mismatch.
    let (managed, external) = (
        serde_json::to_string(&TableType::Managed).unwrap(),
        serde_json::to_string(&TableType::External).unwrap(),
    );
    let (alopex, parquet, delta) = (
        serde_json::to_string(&DataSourceFormat::Alopex).unwrap(),
        serde_json::to_string(&DataSourceFormat::Parquet).unwrap(),
        serde_json::to_string(&DataSourceFormat::Delta).unwrap(),
    );

    let expectations = vec![
        (managed, "\"MANAGED\""),
        (external, "\"EXTERNAL\""),
        (alopex, "\"ALOPEX\""),
        (parquet, "\"PARQUET\""),
        (delta, "\"DELTA\""),
    ];
    for (actual, expected) in expectations {
        assert_eq!(actual, expected);
    }
}
3182}