1use std::collections::{HashMap, HashSet};
10use std::sync::Arc;
11
12use alopex_core::kv::{KVStore, KVTransaction};
13use alopex_core::types::TxnMode;
14use serde::{Deserialize, Serialize};
15use thiserror::Error;
16
17use crate::ast::ddl::{IndexMethod, VectorMetric};
18use crate::catalog::{
19 Catalog, ColumnMetadata, Compression, IndexMetadata, MemoryCatalog, RowIdMode,
20};
21use crate::catalog::{StorageOptions, StorageType, TableMetadata};
22use crate::planner::PlannerError;
23use crate::planner::types::ResolvedType;
24
/// Key prefix shared by every catalog-related entry in the KV store.
pub const CATALOG_PREFIX: &[u8] = b"__catalog__/";
/// Key prefix for catalog records; `catalog_key` appends the catalog name.
pub const CATALOGS_PREFIX: &[u8] = b"__catalog__/catalogs/";
/// Key prefix for namespace records; `namespace_key` appends `<catalog>/<namespace>`.
pub const NAMESPACES_PREFIX: &[u8] = b"__catalog__/namespaces/";
/// Key prefix for table records; `table_key` appends `<catalog>/<namespace>/<table>`.
pub const TABLES_PREFIX: &[u8] = b"__catalog__/tables/";
/// Key prefix for index records; `index_key` appends `<catalog>/<namespace>/<table>/<index>`.
pub const INDEXES_PREFIX: &[u8] = b"__catalog__/indexes/";
/// Key under which the serialized `CatalogState` (version and ID counters) is stored.
pub const META_KEY: &[u8] = b"__catalog__/meta";

/// Catalog layout version written by this build; `PersistentCatalog::load` rejects
/// stored versions greater than this and migrates stored versions lower than this.
const CATALOG_VERSION: u32 = 2;
34
/// Errors raised while loading, migrating, or decoding persisted catalog data.
#[derive(Debug, Error)]
pub enum CatalogError {
    /// Failure reported by the underlying KV store.
    #[error("kv error: {0}")]
    Kv(#[from] alopex_core::Error),

    /// bincode failed to encode or decode a catalog record.
    #[error("serialize error: {0}")]
    Serialize(#[from] bincode::Error),

    /// A catalog key (or key suffix) did not have the expected shape.
    /// `PersistentCatalog::load` also uses this variant to report an unsupported
    /// catalog version.
    #[error("invalid catalog key: {0}")]
    InvalidKey(String),
}
46
/// Record stored under `META_KEY`: the catalog layout version plus the ID counters.
///
/// Serialized with bincode, so the field order is presumably part of the on-disk
/// format and should not be rearranged.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
struct CatalogState {
    /// Layout version; compared against `CATALOG_VERSION` when loading.
    version: u32,
    /// Table ID counter persisted alongside the version.
    table_id_counter: u32,
    /// Index ID counter persisted alongside the version.
    index_id_counter: u32,
}
53
/// Catalog-level metadata as stored under `CATALOGS_PREFIX`.
///
/// Serialized with bincode, so the field order is presumably part of the on-disk
/// format and should not be rearranged.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct PersistedCatalogMeta {
    /// Catalog name; on load it is overwritten by the name embedded in the key
    /// when the two differ.
    pub name: String,
    /// Optional free-form comment.
    pub comment: Option<String>,
    /// Optional storage root for the catalog.
    pub storage_root: Option<String>,
}
60
/// Namespace-level metadata as stored under `NAMESPACES_PREFIX`.
///
/// Serialized with bincode, so the field order is presumably part of the on-disk
/// format and should not be rearranged.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct PersistedNamespaceMeta {
    /// Namespace name; on load the name embedded in the key takes precedence.
    pub name: String,
    /// Name of the owning catalog; on load the name embedded in the key takes precedence.
    pub catalog_name: String,
    /// Optional free-form comment.
    pub comment: Option<String>,
    /// Optional storage root for the namespace.
    pub storage_root: Option<String>,
}
68
/// In-memory alias for the persisted catalog metadata record.
pub type CatalogMeta = PersistedCatalogMeta;
/// In-memory alias for the persisted namespace metadata record.
pub type NamespaceMeta = PersistedNamespaceMeta;
71
/// Fully qualified table name: catalog, namespace, and table.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct TableFqn {
    pub catalog: String,
    pub namespace: String,
    pub table: String,
}

/// Fully qualified index name: catalog, namespace, owning table, and index.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct IndexFqn {
    pub catalog: String,
    pub namespace: String,
    pub table: String,
    pub index: String,
}

impl TableFqn {
    /// Builds a table FQN by copying the three name components into owned strings.
    pub fn new(catalog: &str, namespace: &str, table: &str) -> Self {
        TableFqn {
            catalog: catalog.to_owned(),
            namespace: namespace.to_owned(),
            table: table.to_owned(),
        }
    }
}

impl IndexFqn {
    /// Builds an index FQN by copying the four name components into owned strings.
    pub fn new(catalog: &str, namespace: &str, table: &str, index: &str) -> Self {
        IndexFqn {
            catalog: catalog.to_owned(),
            namespace: namespace.to_owned(),
            table: table.to_owned(),
            index: index.to_owned(),
        }
    }
}
107
108impl From<&TableMetadata> for TableFqn {
109 fn from(value: &TableMetadata) -> Self {
110 Self::new(&value.catalog_name, &value.namespace_name, &value.name)
111 }
112}
113
114impl From<&IndexMetadata> for IndexFqn {
115 fn from(value: &IndexMetadata) -> Self {
116 Self::new(
117 &value.catalog_name,
118 &value.namespace_name,
119 &value.table,
120 &value.name,
121 )
122 }
123}
124
/// Kind of table recorded in the catalog.
///
/// Stored inside `PersistedTableMeta`; the variant order is presumably part of
/// the bincode on-disk format and should not be rearranged.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
pub enum TableType {
    /// Assigned to every table produced by the v1-to-v2 migration.
    Managed,
    /// NOTE(review): presumably a table whose data lives outside the engine; confirm.
    External,
}
131
/// Data format backing a table; defaults to `Alopex`.
///
/// Stored inside `PersistedTableMeta`; the variant order is presumably part of
/// the bincode on-disk format and should not be rearranged.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
pub enum DataSourceFormat {
    /// Default format; also assigned to tables upgraded from the legacy layout.
    #[default]
    Alopex,
    Parquet,
    Delta,
}
140
141#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
142pub enum PersistedVectorMetric {
143 Cosine,
144 L2,
145 Inner,
146}
147
148impl From<VectorMetric> for PersistedVectorMetric {
149 fn from(value: VectorMetric) -> Self {
150 match value {
151 VectorMetric::Cosine => Self::Cosine,
152 VectorMetric::L2 => Self::L2,
153 VectorMetric::Inner => Self::Inner,
154 }
155 }
156}
157
158impl From<PersistedVectorMetric> for VectorMetric {
159 fn from(value: PersistedVectorMetric) -> Self {
160 match value {
161 PersistedVectorMetric::Cosine => Self::Cosine,
162 PersistedVectorMetric::L2 => Self::L2,
163 PersistedVectorMetric::Inner => Self::Inner,
164 }
165 }
166}
167
168#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
169pub enum PersistedType {
170 Integer,
171 BigInt,
172 Float,
173 Double,
174 Text,
175 Blob,
176 Boolean,
177 Timestamp,
178 Vector {
179 dimension: u32,
180 metric: PersistedVectorMetric,
181 },
182 Null,
183}
184
185impl From<ResolvedType> for PersistedType {
186 fn from(value: ResolvedType) -> Self {
187 match value {
188 ResolvedType::Integer => Self::Integer,
189 ResolvedType::BigInt => Self::BigInt,
190 ResolvedType::Float => Self::Float,
191 ResolvedType::Double => Self::Double,
192 ResolvedType::Text => Self::Text,
193 ResolvedType::Blob => Self::Blob,
194 ResolvedType::Boolean => Self::Boolean,
195 ResolvedType::Timestamp => Self::Timestamp,
196 ResolvedType::Vector { dimension, metric } => Self::Vector {
197 dimension,
198 metric: metric.into(),
199 },
200 ResolvedType::Null => Self::Null,
201 }
202 }
203}
204
205impl From<PersistedType> for ResolvedType {
206 fn from(value: PersistedType) -> Self {
207 match value {
208 PersistedType::Integer => Self::Integer,
209 PersistedType::BigInt => Self::BigInt,
210 PersistedType::Float => Self::Float,
211 PersistedType::Double => Self::Double,
212 PersistedType::Text => Self::Text,
213 PersistedType::Blob => Self::Blob,
214 PersistedType::Boolean => Self::Boolean,
215 PersistedType::Timestamp => Self::Timestamp,
216 PersistedType::Vector { dimension, metric } => Self::Vector {
217 dimension,
218 metric: metric.into(),
219 },
220 PersistedType::Null => Self::Null,
221 }
222 }
223}
224
225#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
226pub enum PersistedIndexType {
227 BTree,
228 Hnsw,
229}
230
231impl From<PersistedIndexType> for IndexMethod {
232 fn from(value: PersistedIndexType) -> Self {
233 match value {
234 PersistedIndexType::BTree => IndexMethod::BTree,
235 PersistedIndexType::Hnsw => IndexMethod::Hnsw,
236 }
237 }
238}
239
240impl TryFrom<IndexMethod> for PersistedIndexType {
241 type Error = ();
242
243 fn try_from(value: IndexMethod) -> Result<Self, Self::Error> {
244 match value {
245 IndexMethod::BTree => Ok(Self::BTree),
246 IndexMethod::Hnsw => Ok(Self::Hnsw),
247 }
248 }
249}
250
251#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
252pub enum PersistedStorageType {
253 Row,
254 Columnar,
255}
256
257impl From<PersistedStorageType> for StorageType {
258 fn from(value: PersistedStorageType) -> Self {
259 match value {
260 PersistedStorageType::Row => Self::Row,
261 PersistedStorageType::Columnar => Self::Columnar,
262 }
263 }
264}
265
266impl From<StorageType> for PersistedStorageType {
267 fn from(value: StorageType) -> Self {
268 match value {
269 StorageType::Row => Self::Row,
270 StorageType::Columnar => Self::Columnar,
271 }
272 }
273}
274
275#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
276pub enum PersistedCompression {
277 None,
278 Lz4,
279 Zstd,
280}
281
282impl From<PersistedCompression> for Compression {
283 fn from(value: PersistedCompression) -> Self {
284 match value {
285 PersistedCompression::None => Self::None,
286 PersistedCompression::Lz4 => Self::Lz4,
287 PersistedCompression::Zstd => Self::Zstd,
288 }
289 }
290}
291
292impl From<Compression> for PersistedCompression {
293 fn from(value: Compression) -> Self {
294 match value {
295 Compression::None => Self::None,
296 Compression::Lz4 => Self::Lz4,
297 Compression::Zstd => Self::Zstd,
298 }
299 }
300}
301
302#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
303pub enum PersistedRowIdMode {
304 None,
305 Direct,
306}
307
308impl From<PersistedRowIdMode> for RowIdMode {
309 fn from(value: PersistedRowIdMode) -> Self {
310 match value {
311 PersistedRowIdMode::None => Self::None,
312 PersistedRowIdMode::Direct => Self::Direct,
313 }
314 }
315}
316
317impl From<RowIdMode> for PersistedRowIdMode {
318 fn from(value: RowIdMode) -> Self {
319 match value {
320 RowIdMode::None => Self::None,
321 RowIdMode::Direct => Self::Direct,
322 }
323 }
324}
325
326#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
327pub struct PersistedStorageOptions {
328 pub storage_type: PersistedStorageType,
329 pub compression: PersistedCompression,
330 pub row_group_size: u32,
331 pub row_id_mode: PersistedRowIdMode,
332}
333
334impl From<StorageOptions> for PersistedStorageOptions {
335 fn from(value: StorageOptions) -> Self {
336 Self {
337 storage_type: value.storage_type.into(),
338 compression: value.compression.into(),
339 row_group_size: value.row_group_size,
340 row_id_mode: value.row_id_mode.into(),
341 }
342 }
343}
344
345impl From<PersistedStorageOptions> for StorageOptions {
346 fn from(value: PersistedStorageOptions) -> Self {
347 Self {
348 storage_type: value.storage_type.into(),
349 compression: value.compression.into(),
350 row_group_size: value.row_group_size,
351 row_id_mode: value.row_id_mode.into(),
352 }
353 }
354}
355
356#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
357pub struct PersistedColumnMeta {
358 pub name: String,
359 pub data_type: PersistedType,
360 pub not_null: bool,
361 pub primary_key: bool,
362 pub unique: bool,
363}
364
365impl From<&ColumnMetadata> for PersistedColumnMeta {
366 fn from(value: &ColumnMetadata) -> Self {
367 Self {
368 name: value.name.clone(),
369 data_type: value.data_type.clone().into(),
370 not_null: value.not_null,
371 primary_key: value.primary_key,
372 unique: value.unique,
373 }
374 }
375}
376
377impl From<PersistedColumnMeta> for ColumnMetadata {
378 fn from(value: PersistedColumnMeta) -> Self {
379 ColumnMetadata::new(value.name, value.data_type.into())
380 .with_not_null(value.not_null)
381 .with_primary_key(value.primary_key)
382 .with_unique(value.unique)
383 }
384}
385
/// Current (v2) persisted form of a table's metadata, stored under `TABLES_PREFIX`.
///
/// Serialized with bincode, so the field order is presumably part of the on-disk
/// format and should not be rearranged.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct PersistedTableMeta {
    pub table_id: u32,
    /// Table name; on load the name embedded in the key takes precedence.
    pub name: String,
    /// Owning catalog; set to `"default"` for records upgraded from the legacy layout.
    pub catalog_name: String,
    /// Owning namespace; set to `"default"` for records upgraded from the legacy layout.
    pub namespace_name: String,
    pub table_type: TableType,
    pub data_source_format: DataSourceFormat,
    pub columns: Vec<PersistedColumnMeta>,
    pub primary_key: Option<Vec<String>>,
    pub storage_options: PersistedStorageOptions,
    pub storage_location: Option<String>,
    pub comment: Option<String>,
    pub properties: HashMap<String, String>,
}
401
/// Legacy (v1) persisted table layout, used only as a decoding fallback by
/// `deserialize_table_meta`.
///
/// It lacks the catalog/namespace names, table type, data source format, storage
/// location, comment, and properties carried by `PersistedTableMeta`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
struct PersistedTableMetaV1 {
    table_id: u32,
    name: String,
    columns: Vec<PersistedColumnMeta>,
    primary_key: Option<Vec<String>>,
    storage_options: PersistedStorageOptions,
}
410
411impl From<&TableMetadata> for PersistedTableMeta {
412 fn from(value: &TableMetadata) -> Self {
413 Self {
414 table_id: value.table_id,
415 name: value.name.clone(),
416 catalog_name: value.catalog_name.clone(),
417 namespace_name: value.namespace_name.clone(),
418 table_type: value.table_type,
419 data_source_format: value.data_source_format,
420 columns: value
421 .columns
422 .iter()
423 .map(PersistedColumnMeta::from)
424 .collect(),
425 primary_key: value.primary_key.clone(),
426 storage_options: value.storage_options.clone().into(),
427 storage_location: value.storage_location.clone(),
428 comment: value.comment.clone(),
429 properties: value.properties.clone(),
430 }
431 }
432}
433
434impl From<PersistedTableMeta> for TableMetadata {
435 fn from(value: PersistedTableMeta) -> Self {
436 let mut table = TableMetadata::new(
437 value.name,
438 value
439 .columns
440 .into_iter()
441 .map(ColumnMetadata::from)
442 .collect(),
443 )
444 .with_table_id(value.table_id);
445 table.primary_key = value.primary_key;
446 table.storage_options = value.storage_options.into();
447 table.catalog_name = value.catalog_name;
448 table.namespace_name = value.namespace_name;
449 table.table_type = value.table_type;
450 table.data_source_format = value.data_source_format;
451 table.storage_location = value.storage_location;
452 table.comment = value.comment;
453 table.properties = value.properties;
454 table
455 }
456}
457
/// Current (v2) persisted form of an index's metadata, stored under `INDEXES_PREFIX`.
///
/// Serialized with bincode, so the field order is presumably part of the on-disk
/// format and should not be rearranged. The first eight fields match
/// `PersistedIndexMetaV1`; `catalog_name` and `namespace_name` follow them.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct PersistedIndexMeta {
    pub index_id: u32,
    pub name: String,
    /// Name of the table the index belongs to.
    pub table: String,
    pub columns: Vec<String>,
    pub column_indices: Vec<usize>,
    pub unique: bool,
    /// Index method, if one was recorded.
    pub method: Option<PersistedIndexType>,
    pub options: Vec<(String, String)>,
    /// Owning catalog; set to `"default"` for records upgraded from the legacy layout.
    pub catalog_name: String,
    /// Owning namespace; set to `"default"` for records upgraded from the legacy layout.
    pub namespace_name: String,
}
471
/// Legacy (v1) persisted index layout, used only as a decoding fallback by
/// `deserialize_index_meta`.
///
/// Same fields, in the same order, as the leading fields of `PersistedIndexMeta`,
/// without the trailing catalog and namespace names.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
struct PersistedIndexMetaV1 {
    index_id: u32,
    name: String,
    table: String,
    columns: Vec<String>,
    column_indices: Vec<usize>,
    unique: bool,
    method: Option<PersistedIndexType>,
    options: Vec<(String, String)>,
}
483
484impl From<&IndexMetadata> for PersistedIndexMeta {
485 fn from(value: &IndexMetadata) -> Self {
486 Self {
487 index_id: value.index_id,
488 name: value.name.clone(),
489 table: value.table.clone(),
490 columns: value.columns.clone(),
491 column_indices: value.column_indices.clone(),
492 unique: value.unique,
493 method: value
494 .method
495 .and_then(|m| PersistedIndexType::try_from(m).ok()),
496 options: value.options.clone(),
497 catalog_name: value.catalog_name.clone(),
498 namespace_name: value.namespace_name.clone(),
499 }
500 }
501}
502
503impl From<PersistedIndexMeta> for IndexMetadata {
504 fn from(value: PersistedIndexMeta) -> Self {
505 let mut index = IndexMetadata::new(value.index_id, value.name, value.table, value.columns)
506 .with_column_indices(value.column_indices)
507 .with_unique(value.unique)
508 .with_options(value.options);
509 index.catalog_name = value.catalog_name;
510 index.namespace_name = value.namespace_name;
511 if let Some(method) = value.method {
512 index = index.with_method(method.into());
513 }
514 index
515 }
516}
517
518fn deserialize_table_meta(bytes: &[u8]) -> Result<PersistedTableMeta, CatalogError> {
519 match bincode::deserialize::<PersistedTableMeta>(bytes) {
520 Ok(meta) => Ok(meta),
521 Err(err) => {
522 let is_legacy = matches!(
523 err.as_ref(),
524 bincode::ErrorKind::Io(io)
525 if io.kind() == std::io::ErrorKind::UnexpectedEof
526 );
527 if !is_legacy {
528 return Err(err.into());
529 }
530 let legacy: PersistedTableMetaV1 = bincode::deserialize(bytes)?;
531 Ok(PersistedTableMeta {
532 table_id: legacy.table_id,
533 name: legacy.name,
534 catalog_name: "default".to_string(),
535 namespace_name: "default".to_string(),
536 table_type: TableType::Managed,
537 data_source_format: DataSourceFormat::Alopex,
538 columns: legacy.columns,
539 primary_key: legacy.primary_key,
540 storage_options: legacy.storage_options,
541 storage_location: None,
542 comment: None,
543 properties: HashMap::new(),
544 })
545 }
546 }
547}
548
549fn deserialize_index_meta(bytes: &[u8]) -> Result<PersistedIndexMeta, CatalogError> {
550 match bincode::deserialize::<PersistedIndexMeta>(bytes) {
551 Ok(meta) => Ok(meta),
552 Err(err) => {
553 let is_legacy = matches!(
554 err.as_ref(),
555 bincode::ErrorKind::Io(io)
556 if io.kind() == std::io::ErrorKind::UnexpectedEof
557 );
558 if !is_legacy {
559 return Err(err.into());
560 }
561 let legacy: PersistedIndexMetaV1 = bincode::deserialize(bytes)?;
562 Ok(PersistedIndexMeta {
563 index_id: legacy.index_id,
564 name: legacy.name,
565 table: legacy.table,
566 columns: legacy.columns,
567 column_indices: legacy.column_indices,
568 unique: legacy.unique,
569 method: legacy.method,
570 options: legacy.options,
571 catalog_name: "default".to_string(),
572 namespace_name: "default".to_string(),
573 })
574 }
575 }
576}
577
578fn table_key(catalog_name: &str, namespace_name: &str, table_name: &str) -> Vec<u8> {
579 let mut key = TABLES_PREFIX.to_vec();
580 key.extend_from_slice(catalog_name.as_bytes());
581 key.push(b'/');
582 key.extend_from_slice(namespace_name.as_bytes());
583 key.push(b'/');
584 key.extend_from_slice(table_name.as_bytes());
585 key
586}
587
588fn catalog_key(name: &str) -> Vec<u8> {
589 let mut key = CATALOGS_PREFIX.to_vec();
590 key.extend_from_slice(name.as_bytes());
591 key
592}
593
594fn namespace_key(catalog_name: &str, namespace_name: &str) -> Vec<u8> {
595 let mut key = NAMESPACES_PREFIX.to_vec();
596 key.extend_from_slice(catalog_name.as_bytes());
597 key.push(b'/');
598 key.extend_from_slice(namespace_name.as_bytes());
599 key
600}
601
602fn index_key(
603 catalog_name: &str,
604 namespace_name: &str,
605 table_name: &str,
606 index_name: &str,
607) -> Vec<u8> {
608 let mut key = INDEXES_PREFIX.to_vec();
609 key.extend_from_slice(catalog_name.as_bytes());
610 key.push(b'/');
611 key.extend_from_slice(namespace_name.as_bytes());
612 key.push(b'/');
613 key.extend_from_slice(table_name.as_bytes());
614 key.push(b'/');
615 key.extend_from_slice(index_name.as_bytes());
616 key
617}
618
619fn index_prefix(catalog_name: &str, namespace_name: &str, table_name: &str) -> Vec<u8> {
620 let mut key = INDEXES_PREFIX.to_vec();
621 key.extend_from_slice(catalog_name.as_bytes());
622 key.push(b'/');
623 key.extend_from_slice(namespace_name.as_bytes());
624 key.push(b'/');
625 key.extend_from_slice(table_name.as_bytes());
626 key.push(b'/');
627 key
628}
629
630fn key_suffix(prefix: &[u8], key: &[u8]) -> Result<String, CatalogError> {
631 let suffix = key
632 .strip_prefix(prefix)
633 .ok_or_else(|| CatalogError::InvalidKey(format!("{key:?}")))?;
634 std::str::from_utf8(suffix)
635 .map(|s| s.to_string())
636 .map_err(|_| CatalogError::InvalidKey(format!("{key:?}")))
637}
638
639fn parse_table_key_suffix(suffix: &str) -> Result<TableFqn, CatalogError> {
640 let mut parts = suffix.splitn(3, '/');
641 let catalog = parts
642 .next()
643 .filter(|part| !part.is_empty())
644 .ok_or_else(|| CatalogError::InvalidKey(suffix.to_string()))?;
645 let namespace = parts
646 .next()
647 .filter(|part| !part.is_empty())
648 .ok_or_else(|| CatalogError::InvalidKey(suffix.to_string()))?;
649 let table = parts
650 .next()
651 .filter(|part| !part.is_empty())
652 .ok_or_else(|| CatalogError::InvalidKey(suffix.to_string()))?;
653 Ok(TableFqn::new(catalog, namespace, table))
654}
655
656fn parse_index_key_suffix(suffix: &str) -> Result<IndexFqn, CatalogError> {
657 let mut parts = suffix.splitn(4, '/');
658 let catalog = parts
659 .next()
660 .filter(|part| !part.is_empty())
661 .ok_or_else(|| CatalogError::InvalidKey(suffix.to_string()))?;
662 let namespace = parts
663 .next()
664 .filter(|part| !part.is_empty())
665 .ok_or_else(|| CatalogError::InvalidKey(suffix.to_string()))?;
666 let table = parts
667 .next()
668 .filter(|part| !part.is_empty())
669 .ok_or_else(|| CatalogError::InvalidKey(suffix.to_string()))?;
670 let index = parts
671 .next()
672 .filter(|part| !part.is_empty())
673 .ok_or_else(|| CatalogError::InvalidKey(suffix.to_string()))?;
674 Ok(IndexFqn::new(catalog, namespace, table, index))
675}
676
/// Set of pending catalog changes: objects added and objects dropped, tracked
/// separately for catalogs, namespaces, tables, and indexes.
///
/// For each object kind, adding an entry clears its "dropped" mark and dropping an
/// entry removes it from the "added" map (see the methods on this type).
#[derive(Debug, Clone, Default)]
pub struct CatalogOverlay {
    /// Catalogs added in this overlay, keyed by catalog name.
    added_catalogs: HashMap<String, CatalogMeta>,
    /// Names of catalogs dropped in this overlay.
    dropped_catalogs: HashSet<String>,
    /// Namespaces added in this overlay, keyed by `(catalog, namespace)`.
    added_namespaces: HashMap<(String, String), NamespaceMeta>,
    /// `(catalog, namespace)` pairs dropped in this overlay.
    dropped_namespaces: HashSet<(String, String)>,
    /// Tables added in this overlay, keyed by fully qualified name.
    added_tables: HashMap<TableFqn, TableMetadata>,
    /// Fully qualified names of tables dropped in this overlay.
    dropped_tables: HashSet<TableFqn>,
    /// Indexes added in this overlay, keyed by fully qualified name.
    added_indexes: HashMap<IndexFqn, IndexMetadata>,
    /// Fully qualified names of indexes dropped in this overlay.
    dropped_indexes: HashSet<IndexFqn>,
}
688
689impl CatalogOverlay {
690 pub fn new() -> Self {
691 Self::default()
692 }
693
694 pub fn add_catalog(&mut self, meta: CatalogMeta) {
695 self.dropped_catalogs.remove(&meta.name);
696 self.added_catalogs.insert(meta.name.clone(), meta);
697 }
698
699 pub fn drop_catalog(&mut self, name: &str) {
700 self.added_catalogs.remove(name);
701 self.dropped_catalogs.insert(name.to_string());
702 }
703
704 pub fn add_namespace(&mut self, meta: NamespaceMeta) {
705 let key = (meta.catalog_name.clone(), meta.name.clone());
706 self.dropped_namespaces.remove(&key);
707 self.added_namespaces.insert(key, meta);
708 }
709
710 pub fn drop_namespace(&mut self, catalog_name: &str, namespace_name: &str) {
711 let key = (catalog_name.to_string(), namespace_name.to_string());
712 self.added_namespaces.remove(&key);
713 self.dropped_namespaces.insert(key);
714 }
715
716 pub fn add_table(&mut self, fqn: TableFqn, table: TableMetadata) {
717 self.dropped_tables.remove(&fqn);
718 self.added_tables.insert(fqn, table);
719 }
720
721 pub fn drop_table(&mut self, fqn: &TableFqn) {
722 self.added_tables.remove(fqn);
723 self.dropped_tables.insert(fqn.clone());
724 self.added_indexes.retain(|key, _| {
725 key.catalog != fqn.catalog || key.namespace != fqn.namespace || key.table != fqn.table
726 });
727 }
728
729 pub fn add_index(&mut self, fqn: IndexFqn, index: IndexMetadata) {
730 self.dropped_indexes.remove(&fqn);
731 self.added_indexes.insert(fqn, index);
732 }
733
734 pub fn drop_index(&mut self, fqn: &IndexFqn) {
735 self.added_indexes.remove(fqn);
736 self.dropped_indexes.insert(fqn.clone());
737 }
738
739 pub fn drop_cascade_catalog(&mut self, catalog: &str) {
740 self.drop_catalog(catalog);
741
742 let namespace_keys: Vec<(String, String)> = self
743 .added_namespaces
744 .keys()
745 .filter(|(cat, _)| cat == catalog)
746 .cloned()
747 .collect();
748 for (catalog_name, namespace_name) in namespace_keys {
749 self.drop_namespace(&catalog_name, &namespace_name);
750 }
751
752 let index_keys: Vec<IndexFqn> = self
753 .added_indexes
754 .keys()
755 .filter(|fqn| fqn.catalog == catalog)
756 .cloned()
757 .collect();
758 for fqn in index_keys {
759 self.drop_index(&fqn);
760 }
761
762 let table_keys: Vec<TableFqn> = self
763 .added_tables
764 .keys()
765 .filter(|fqn| fqn.catalog == catalog)
766 .cloned()
767 .collect();
768 for fqn in table_keys {
769 self.drop_table(&fqn);
770 }
771 }
772
773 pub fn drop_cascade_namespace(&mut self, catalog: &str, namespace: &str) {
774 self.drop_namespace(catalog, namespace);
775
776 let index_keys: Vec<IndexFqn> = self
777 .added_indexes
778 .keys()
779 .filter(|fqn| fqn.catalog == catalog && fqn.namespace == namespace)
780 .cloned()
781 .collect();
782 for fqn in index_keys {
783 self.drop_index(&fqn);
784 }
785
786 let table_keys: Vec<TableFqn> = self
787 .added_tables
788 .keys()
789 .filter(|fqn| fqn.catalog == catalog && fqn.namespace == namespace)
790 .cloned()
791 .collect();
792 for fqn in table_keys {
793 self.drop_table(&fqn);
794 }
795 }
796}
797
/// Read-only `Catalog` view that resolves lookups against a `PersistentCatalog`
/// combined with a `CatalogOverlay` of pending changes.
///
/// The mutating `Catalog` methods are not supported on this view and panic.
pub struct TxnCatalogView<'a, S: KVStore> {
    /// Underlying persistent catalog.
    catalog: &'a PersistentCatalog<S>,
    /// Pending changes applied on top of `catalog` when answering lookups.
    overlay: &'a CatalogOverlay,
}
806
807impl<'a, S: KVStore> TxnCatalogView<'a, S> {
808 pub fn new(catalog: &'a PersistentCatalog<S>, overlay: &'a CatalogOverlay) -> Self {
809 Self { catalog, overlay }
810 }
811}
812
813impl<'a, S: KVStore> Catalog for TxnCatalogView<'a, S> {
814 fn create_table(&mut self, _table: TableMetadata) -> Result<(), PlannerError> {
815 unreachable!("TxnCatalogView は参照専用です")
816 }
817
818 fn get_table(&self, name: &str) -> Option<&TableMetadata> {
819 self.catalog.get_table_in_txn(name, self.overlay)
820 }
821
822 fn drop_table(&mut self, _name: &str) -> Result<(), PlannerError> {
823 unreachable!("TxnCatalogView は参照専用です")
824 }
825
826 fn create_index(&mut self, _index: IndexMetadata) -> Result<(), PlannerError> {
827 unreachable!("TxnCatalogView は参照専用です")
828 }
829
830 fn get_index(&self, name: &str) -> Option<&IndexMetadata> {
831 self.catalog.get_index_in_txn(name, self.overlay)
832 }
833
834 fn get_indexes_for_table(&self, table: &str) -> Vec<&IndexMetadata> {
835 let Some(table_meta) = self.catalog.get_table_in_txn(table, self.overlay) else {
836 return Vec::new();
837 };
838
839 let mut indexes: Vec<&IndexMetadata> = self
840 .catalog
841 .inner
842 .get_indexes_for_table(table)
843 .into_iter()
844 .filter(|idx| {
845 idx.catalog_name == table_meta.catalog_name
846 && idx.namespace_name == table_meta.namespace_name
847 && !self.catalog.index_hidden_by_overlay(idx, self.overlay)
848 })
849 .collect();
850
851 for idx in self.overlay.added_indexes.values() {
852 if idx.table == table
853 && idx.catalog_name == table_meta.catalog_name
854 && idx.namespace_name == table_meta.namespace_name
855 && !self.catalog.index_hidden_by_overlay(idx, self.overlay)
856 {
857 indexes.push(idx);
858 }
859 }
860
861 indexes
862 }
863
864 fn drop_index(&mut self, _name: &str) -> Result<(), PlannerError> {
865 unreachable!("TxnCatalogView は参照専用です")
866 }
867
868 fn table_exists(&self, name: &str) -> bool {
869 self.catalog.table_exists_in_txn(name, self.overlay)
870 }
871
872 fn index_exists(&self, name: &str) -> bool {
873 self.catalog.index_exists_in_txn(name, self.overlay)
874 }
875
876 fn next_table_id(&mut self) -> u32 {
877 unreachable!("TxnCatalogView は参照専用です")
878 }
879
880 fn next_index_id(&mut self) -> u32 {
881 unreachable!("TxnCatalogView は参照専用です")
882 }
883
884 fn list_tables(&self) -> Vec<TableMetadata> {
885 let mut names = HashSet::new();
886 for name in self.catalog.inner.table_names() {
887 names.insert(name.to_string());
888 }
889 for fqn in self.overlay.added_tables.keys() {
890 names.insert(fqn.table.clone());
891 }
892
893 let mut tables = Vec::new();
894 for name in names {
895 if let Some(table) = self.catalog.get_table_in_txn(&name, self.overlay) {
896 tables.push(table.clone());
897 }
898 }
899 tables
900 }
901}
902
/// Catalog backed by a KV store, with the loaded metadata cached in memory.
#[derive(Debug)]
pub struct PersistentCatalog<S: KVStore> {
    /// In-memory catalog holding the loaded tables, indexes, and ID counters.
    inner: MemoryCatalog,
    /// KV store the catalog is loaded from.
    store: Arc<S>,
    /// Loaded catalog records, keyed by catalog name.
    catalogs: HashMap<String, CatalogMeta>,
    /// Loaded namespace records, keyed by `(catalog, namespace)`.
    namespaces: HashMap<(String, String), NamespaceMeta>,
}
925
926impl<S: KVStore> PersistentCatalog<S> {
    /// Loads the catalog from `store`, migrating a legacy (v1) layout first if needed.
    ///
    /// Steps:
    /// 1. Read the `CatalogState` under `META_KEY`, rejecting versions newer than
    ///    `CATALOG_VERSION`.
    /// 2. Decide whether a migration is needed: either the stored version is older
    ///    than `CATALOG_VERSION`, or there is no stored state and some table/index
    ///    key suffix has no `/` (a legacy, unqualified key). If so, run
    ///    `migrate_v1_to_v2` and load again from scratch.
    /// 3. Read all catalog, namespace, table, and index records. Names embedded in
    ///    the key take precedence over names stored in the value.
    /// 4. Set the ID counters to the maximum of the IDs seen and, when the stored
    ///    state is at the current version, the stored counters.
    ///
    /// # Errors
    ///
    /// Returns `CatalogError::Kv` for store failures, `CatalogError::Serialize` for
    /// undecodable records, and `CatalogError::InvalidKey` for malformed keys or an
    /// unsupported (too new) catalog version.
    pub fn load(store: Arc<S>) -> Result<Self, CatalogError> {
        let mut txn = store.begin(TxnMode::ReadOnly)?;
        let meta_key = META_KEY.to_vec();
        let mut meta_state: Option<CatalogState> = None;

        // Read the persisted state, refusing layouts written by a newer version.
        if let Some(meta_bytes) = txn.get(&meta_key)? {
            let meta: CatalogState = bincode::deserialize(&meta_bytes)?;
            if meta.version > CATALOG_VERSION {
                return Err(CatalogError::InvalidKey(format!(
                    "unsupported catalog version: {}",
                    meta.version
                )));
            }
            meta_state = Some(meta);
        }

        // An older stored version always triggers a migration. With no stored state
        // at all, look for legacy keys: their suffix has no '/' separator.
        let mut needs_migration = meta_state
            .as_ref()
            .is_some_and(|meta| meta.version < CATALOG_VERSION);
        if !needs_migration && meta_state.is_none() {
            for (key, _) in txn.scan_prefix(TABLES_PREFIX)? {
                let suffix = key_suffix(TABLES_PREFIX, &key)?;
                if !suffix.contains('/') {
                    needs_migration = true;
                    break;
                }
            }
            if !needs_migration {
                for (key, _) in txn.scan_prefix(INDEXES_PREFIX)? {
                    let suffix = key_suffix(INDEXES_PREFIX, &key)?;
                    if !suffix.contains('/') {
                        needs_migration = true;
                        break;
                    }
                }
            }
        }

        // Close the read-only transaction before migrating, then reload from scratch.
        if needs_migration {
            txn.rollback_self()?;
            Self::migrate_v1_to_v2(&store)?;
            return Self::load(store);
        }

        let mut inner = MemoryCatalog::new();
        let mut catalogs = HashMap::new();
        let mut namespaces = HashMap::new();

        // Highest IDs seen among the loaded records; used to seed the counters.
        let mut max_table_id = 0u32;
        let mut max_index_id = 0u32;

        // Catalog records: the key suffix is the catalog name and wins over the value.
        for (key, value) in txn.scan_prefix(CATALOGS_PREFIX)? {
            let catalog_name = key_suffix(CATALOGS_PREFIX, &key)?;
            let mut meta: CatalogMeta = bincode::deserialize(&value)?;
            if meta.name != catalog_name {
                meta.name = catalog_name.clone();
            }
            catalogs.insert(catalog_name, meta);
        }

        // Namespace records: the key suffix is `<catalog>/<namespace>` and wins over
        // the value.
        for (key, value) in txn.scan_prefix(NAMESPACES_PREFIX)? {
            let suffix = key_suffix(NAMESPACES_PREFIX, &key)?;
            let mut parts = suffix.splitn(2, '/');
            let catalog_name = parts
                .next()
                .filter(|part| !part.is_empty())
                .ok_or_else(|| CatalogError::InvalidKey(suffix.clone()))?;
            let namespace_name = parts
                .next()
                .filter(|part| !part.is_empty())
                .ok_or_else(|| CatalogError::InvalidKey(suffix.clone()))?;
            let mut meta: NamespaceMeta = bincode::deserialize(&value)?;
            if meta.catalog_name != catalog_name {
                meta.catalog_name = catalog_name.to_string();
            }
            if meta.name != namespace_name {
                meta.name = namespace_name.to_string();
            }
            namespaces.insert((meta.catalog_name.clone(), meta.name.clone()), meta);
        }

        // Table records: the names parsed from the key win over the value.
        for (key, value) in txn.scan_prefix(TABLES_PREFIX)? {
            let suffix = key_suffix(TABLES_PREFIX, &key)?;
            let fqn = parse_table_key_suffix(&suffix)?;
            let mut persisted = deserialize_table_meta(&value)?;
            if persisted.catalog_name != fqn.catalog {
                persisted.catalog_name = fqn.catalog.clone();
            }
            if persisted.namespace_name != fqn.namespace {
                persisted.namespace_name = fqn.namespace.clone();
            }
            if persisted.name != fqn.table {
                persisted.name = fqn.table.clone();
            }
            max_table_id = max_table_id.max(persisted.table_id);
            let table: TableMetadata = persisted.into();
            inner.insert_table_unchecked(table);
        }

        // Index records: the names parsed from the key win over the value.
        for (key, value) in txn.scan_prefix(INDEXES_PREFIX)? {
            let suffix = key_suffix(INDEXES_PREFIX, &key)?;
            let fqn = parse_index_key_suffix(&suffix)?;
            let mut persisted = deserialize_index_meta(&value)?;
            if persisted.catalog_name != fqn.catalog {
                persisted.catalog_name = fqn.catalog.clone();
            }
            if persisted.namespace_name != fqn.namespace {
                persisted.namespace_name = fqn.namespace.clone();
            }
            if persisted.table != fqn.table {
                persisted.table = fqn.table.clone();
            }
            if persisted.name != fqn.index {
                persisted.name = fqn.index.clone();
            }
            max_index_id = max_index_id.max(persisted.index_id);
            let mut index: IndexMetadata = persisted.into();
            // An index is only kept when its table was loaded; it then adopts the
            // table's catalog and namespace. Indexes without a loaded table are skipped.
            if let Some(table) = inner.get_table(&index.table) {
                if index.catalog_name != table.catalog_name
                    || index.namespace_name != table.namespace_name
                {
                    index.catalog_name = table.catalog_name.clone();
                    index.namespace_name = table.namespace_name.clone();
                }
                inner.insert_index_unchecked(index);
            }
        }

        // Counters start from the highest IDs seen; stored counters are only
        // trusted when the stored state is at the current version.
        let (mut table_id_counter, mut index_id_counter) = (max_table_id, max_index_id);
        if let Some(meta) = meta_state
            .as_ref()
            .filter(|meta| meta.version == CATALOG_VERSION)
        {
            table_id_counter = table_id_counter.max(meta.table_id_counter);
            index_id_counter = index_id_counter.max(meta.index_id_counter);
        }
        inner.set_counters(table_id_counter, index_id_counter);

        // Nothing was written; end the read-only transaction.
        txn.rollback_self()?;

        Ok(Self {
            inner,
            store,
            catalogs,
            namespaces,
        })
    }
1076
    /// Rewrites a legacy (v1) catalog into the current layout within a single
    /// read-write transaction.
    ///
    /// The migration:
    /// - creates the `"default"` catalog and the `"default"/"default"` namespace
    ///   records when they are absent;
    /// - re-keys every table and index record whose key suffix has no `/` under its
    ///   catalog/namespace-qualified key (falling back to `"default"` for empty
    ///   names) and deletes the old key;
    /// - marks migrated tables as `TableType::Managed` with
    ///   `DataSourceFormat::Alopex`;
    /// - writes a `CatalogState` at `CATALOG_VERSION` whose counters are the maximum
    ///   of the previously stored counters and the IDs of the migrated records.
    ///
    /// Records whose key suffix already contains `/` are left untouched.
    ///
    /// # Errors
    ///
    /// Returns `CatalogError::Kv` for store failures, `CatalogError::Serialize` for
    /// records that cannot be decoded or encoded, and `CatalogError::InvalidKey`
    /// for keys that cannot be parsed.
    fn migrate_v1_to_v2(store: &Arc<S>) -> Result<(), CatalogError> {
        let mut txn = store.begin(TxnMode::ReadWrite)?;

        // Ensure the default catalog exists.
        if txn.get(&catalog_key("default"))?.is_none() {
            let meta = CatalogMeta {
                name: "default".to_string(),
                comment: None,
                storage_root: None,
            };
            let value = bincode::serialize(&meta)?;
            txn.put(catalog_key("default"), value)?;
        }

        // Ensure the default namespace exists inside the default catalog.
        if txn.get(&namespace_key("default", "default"))?.is_none() {
            let meta = NamespaceMeta {
                name: "default".to_string(),
                catalog_name: "default".to_string(),
                comment: None,
                storage_root: None,
            };
            let value = bincode::serialize(&meta)?;
            txn.put(namespace_key("default", "default"), value)?;
        }

        // Collect the table rewrites first and apply them after the scan.
        let mut table_updates = Vec::new();
        let mut table_keys_to_delete = Vec::new();
        let mut max_table_id = 0u32;
        for (key, value) in txn.scan_prefix(TABLES_PREFIX)? {
            let suffix = key_suffix(TABLES_PREFIX, &key)?;
            // A '/' in the suffix means the key is already qualified.
            if suffix.contains('/') {
                continue;
            }
            let mut persisted = deserialize_table_meta(&value)?;
            if persisted.catalog_name.is_empty() {
                persisted.catalog_name = "default".to_string();
            }
            if persisted.namespace_name.is_empty() {
                persisted.namespace_name = "default".to_string();
            }
            persisted.table_type = TableType::Managed;
            persisted.data_source_format = DataSourceFormat::Alopex;
            max_table_id = max_table_id.max(persisted.table_id);

            let new_key = table_key(
                &persisted.catalog_name,
                &persisted.namespace_name,
                &persisted.name,
            );
            let bytes = bincode::serialize(&persisted)?;
            table_updates.push((new_key, bytes));
            table_keys_to_delete.push(key);
        }

        for (key, value) in table_updates {
            txn.put(key, value)?;
        }
        for key in table_keys_to_delete {
            txn.delete(key)?;
        }

        // Same procedure for index records.
        let mut index_updates = Vec::new();
        let mut index_keys_to_delete = Vec::new();
        let mut max_index_id = 0u32;
        for (key, value) in txn.scan_prefix(INDEXES_PREFIX)? {
            let suffix = key_suffix(INDEXES_PREFIX, &key)?;
            // A '/' in the suffix means the key is already qualified.
            if suffix.contains('/') {
                continue;
            }
            let mut persisted = deserialize_index_meta(&value)?;
            if persisted.catalog_name.is_empty() {
                persisted.catalog_name = "default".to_string();
            }
            if persisted.namespace_name.is_empty() {
                persisted.namespace_name = "default".to_string();
            }
            max_index_id = max_index_id.max(persisted.index_id);

            let new_key = index_key(
                &persisted.catalog_name,
                &persisted.namespace_name,
                &persisted.table,
                &persisted.name,
            );
            let bytes = bincode::serialize(&persisted)?;
            index_updates.push((new_key, bytes));
            index_keys_to_delete.push(key);
        }

        for (key, value) in index_updates {
            txn.put(key, value)?;
        }
        for key in index_keys_to_delete {
            txn.delete(key)?;
        }

        // Never lower the counters: keep the larger of the stored counters and the
        // IDs seen among the migrated records.
        let mut table_id_counter = max_table_id;
        let mut index_id_counter = max_index_id;
        if let Some(meta_bytes) = txn.get(&META_KEY.to_vec())? {
            let meta: CatalogState = bincode::deserialize(&meta_bytes)?;
            table_id_counter = table_id_counter.max(meta.table_id_counter);
            index_id_counter = index_id_counter.max(meta.index_id_counter);
        }
        // Stamp the catalog with the current version.
        let meta = CatalogState {
            version: CATALOG_VERSION,
            table_id_counter,
            index_id_counter,
        };
        let meta_bytes = bincode::serialize(&meta)?;
        txn.put(META_KEY.to_vec(), meta_bytes)?;
        txn.commit_self()?;

        Ok(())
    }
1190
1191 pub fn new(store: Arc<S>) -> Self {
1192 Self {
1193 inner: MemoryCatalog::new(),
1194 store,
1195 catalogs: HashMap::new(),
1196 namespaces: HashMap::new(),
1197 }
1198 }
1199
1200 pub fn store(&self) -> &Arc<S> {
1201 &self.store
1202 }
1203
1204 pub fn list_catalogs(&self) -> Vec<CatalogMeta> {
1205 let mut catalogs: Vec<CatalogMeta> = self.catalogs.values().cloned().collect();
1206 catalogs.sort_by(|a, b| a.name.cmp(&b.name));
1207 catalogs
1208 }
1209
1210 pub fn get_catalog(&self, name: &str) -> Option<CatalogMeta> {
1211 self.catalogs.get(name).cloned()
1212 }
1213
1214 pub fn create_catalog(&mut self, meta: CatalogMeta) -> Result<(), CatalogError> {
1215 let mut txn = self.store.begin(TxnMode::ReadWrite)?;
1216 let value = bincode::serialize(&meta)?;
1217 txn.put(catalog_key(&meta.name), value)?;
1218 txn.commit_self()?;
1219 self.catalogs.insert(meta.name.clone(), meta);
1220 Ok(())
1221 }
1222
1223 pub fn delete_catalog(&mut self, name: &str) -> Result<(), CatalogError> {
1224 let mut txn = self.store.begin(TxnMode::ReadWrite)?;
1225 txn.delete(catalog_key(name))?;
1226 let mut namespace_prefix = NAMESPACES_PREFIX.to_vec();
1227 namespace_prefix.extend_from_slice(name.as_bytes());
1228 namespace_prefix.push(b'/');
1229 let mut namespace_keys = Vec::new();
1230 for (key, _) in txn.scan_prefix(&namespace_prefix)? {
1231 namespace_keys.push(key);
1232 }
1233 for key in namespace_keys {
1234 txn.delete(key)?;
1235 }
1236 let mut table_keys = Vec::new();
1237 let mut table_fqns = Vec::new();
1238 for (key, value) in txn.scan_prefix(TABLES_PREFIX)? {
1239 let persisted = deserialize_table_meta(&value)?;
1240 if persisted.catalog_name == name {
1241 table_fqns.push(TableFqn::new(
1242 &persisted.catalog_name,
1243 &persisted.namespace_name,
1244 &persisted.name,
1245 ));
1246 table_keys.push(key);
1247 }
1248 }
1249 let table_set: HashSet<TableFqn> = table_fqns.iter().cloned().collect();
1250 for key in table_keys {
1251 txn.delete(key)?;
1252 }
1253 if !table_set.is_empty() {
1254 let mut index_keys = Vec::new();
1255 for (key, value) in txn.scan_prefix(INDEXES_PREFIX)? {
1256 let persisted = deserialize_index_meta(&value)?;
1257 let fqn = TableFqn::new(
1258 &persisted.catalog_name,
1259 &persisted.namespace_name,
1260 &persisted.table,
1261 );
1262 if table_set.contains(&fqn) {
1263 index_keys.push(key);
1264 }
1265 }
1266 for key in index_keys {
1267 txn.delete(key)?;
1268 }
1269 }
1270 txn.commit_self()?;
1271 self.catalogs.remove(name);
1272 self.namespaces.retain(|(catalog, _), _| catalog != name);
1273 for fqn in table_fqns {
1274 self.inner.remove_table_unchecked(&fqn.table);
1275 }
1276 Ok(())
1277 }
1278
1279 pub fn list_namespaces(&self, catalog_name: &str) -> Vec<NamespaceMeta> {
1280 let mut namespaces: Vec<NamespaceMeta> = self
1281 .namespaces
1282 .values()
1283 .filter(|meta| meta.catalog_name == catalog_name)
1284 .cloned()
1285 .collect();
1286 namespaces.sort_by(|a, b| a.name.cmp(&b.name));
1287 namespaces
1288 }
1289
1290 pub fn get_namespace(&self, catalog_name: &str, namespace_name: &str) -> Option<NamespaceMeta> {
1291 self.namespaces
1292 .get(&(catalog_name.to_string(), namespace_name.to_string()))
1293 .cloned()
1294 }
1295
1296 pub fn create_namespace(&mut self, meta: NamespaceMeta) -> Result<(), CatalogError> {
1297 if !self.catalogs.contains_key(&meta.catalog_name) {
1298 return Err(CatalogError::InvalidKey(format!(
1299 "catalog not found: {}",
1300 meta.catalog_name
1301 )));
1302 }
1303
1304 let mut txn = self.store.begin(TxnMode::ReadWrite)?;
1305 let value = bincode::serialize(&meta)?;
1306 txn.put(namespace_key(&meta.catalog_name, &meta.name), value)?;
1307 txn.commit_self()?;
1308 self.namespaces
1309 .insert((meta.catalog_name.clone(), meta.name.clone()), meta);
1310 Ok(())
1311 }
1312
1313 pub fn delete_namespace(
1314 &mut self,
1315 catalog_name: &str,
1316 namespace_name: &str,
1317 ) -> Result<(), CatalogError> {
1318 if !self.catalogs.contains_key(catalog_name) {
1319 return Err(CatalogError::InvalidKey(format!(
1320 "catalog not found: {}",
1321 catalog_name
1322 )));
1323 }
1324
1325 let mut txn = self.store.begin(TxnMode::ReadWrite)?;
1326 txn.delete(namespace_key(catalog_name, namespace_name))?;
1327 txn.commit_self()?;
1328 self.namespaces
1329 .remove(&(catalog_name.to_string(), namespace_name.to_string()));
1330 Ok(())
1331 }
1332
1333 fn persist_create_catalog(
1334 &mut self,
1335 txn: &mut S::Transaction<'_>,
1336 meta: &CatalogMeta,
1337 ) -> Result<(), CatalogError> {
1338 let value = bincode::serialize(meta)?;
1339 txn.put(catalog_key(&meta.name), value)?;
1340 Ok(())
1341 }
1342
1343 fn persist_drop_catalog(
1344 &mut self,
1345 txn: &mut S::Transaction<'_>,
1346 name: &str,
1347 ) -> Result<(), CatalogError> {
1348 txn.delete(catalog_key(name))?;
1349
1350 let mut namespace_prefix = NAMESPACES_PREFIX.to_vec();
1351 namespace_prefix.extend_from_slice(name.as_bytes());
1352 namespace_prefix.push(b'/');
1353 let mut namespace_keys = Vec::new();
1354 for (key, _) in txn.scan_prefix(&namespace_prefix)? {
1355 namespace_keys.push(key);
1356 }
1357 for key in namespace_keys {
1358 txn.delete(key)?;
1359 }
1360
1361 let mut table_keys = Vec::new();
1362 let mut table_fqns = Vec::new();
1363 for (key, value) in txn.scan_prefix(TABLES_PREFIX)? {
1364 let persisted = deserialize_table_meta(&value)?;
1365 if persisted.catalog_name == name {
1366 table_fqns.push(TableFqn::new(
1367 &persisted.catalog_name,
1368 &persisted.namespace_name,
1369 &persisted.name,
1370 ));
1371 table_keys.push(key);
1372 }
1373 }
1374 let table_set: HashSet<TableFqn> = table_fqns.iter().cloned().collect();
1375 for key in table_keys {
1376 txn.delete(key)?;
1377 }
1378 if !table_set.is_empty() {
1379 let mut index_keys = Vec::new();
1380 for (key, value) in txn.scan_prefix(INDEXES_PREFIX)? {
1381 let persisted = deserialize_index_meta(&value)?;
1382 let fqn = TableFqn::new(
1383 &persisted.catalog_name,
1384 &persisted.namespace_name,
1385 &persisted.table,
1386 );
1387 if table_set.contains(&fqn) {
1388 index_keys.push(key);
1389 }
1390 }
1391 for key in index_keys {
1392 txn.delete(key)?;
1393 }
1394 }
1395 Ok(())
1396 }
1397
1398 fn persist_create_namespace(
1399 &mut self,
1400 txn: &mut S::Transaction<'_>,
1401 meta: &NamespaceMeta,
1402 ) -> Result<(), CatalogError> {
1403 let value = bincode::serialize(meta)?;
1404 txn.put(namespace_key(&meta.catalog_name, &meta.name), value)?;
1405 Ok(())
1406 }
1407
1408 fn persist_drop_namespace(
1409 &mut self,
1410 txn: &mut S::Transaction<'_>,
1411 catalog_name: &str,
1412 namespace_name: &str,
1413 ) -> Result<(), CatalogError> {
1414 txn.delete(namespace_key(catalog_name, namespace_name))?;
1415
1416 let mut table_keys = Vec::new();
1417 let mut table_fqns = Vec::new();
1418 for (key, value) in txn.scan_prefix(TABLES_PREFIX)? {
1419 let persisted = deserialize_table_meta(&value)?;
1420 if persisted.catalog_name == catalog_name && persisted.namespace_name == namespace_name
1421 {
1422 table_fqns.push(TableFqn::new(
1423 &persisted.catalog_name,
1424 &persisted.namespace_name,
1425 &persisted.name,
1426 ));
1427 table_keys.push(key);
1428 }
1429 }
1430 let table_set: HashSet<TableFqn> = table_fqns.iter().cloned().collect();
1431 for key in table_keys {
1432 txn.delete(key)?;
1433 }
1434 if !table_set.is_empty() {
1435 let mut index_keys = Vec::new();
1436 for (key, value) in txn.scan_prefix(INDEXES_PREFIX)? {
1437 let persisted = deserialize_index_meta(&value)?;
1438 let fqn = TableFqn::new(
1439 &persisted.catalog_name,
1440 &persisted.namespace_name,
1441 &persisted.table,
1442 );
1443 if table_set.contains(&fqn) {
1444 index_keys.push(key);
1445 }
1446 }
1447 for key in index_keys {
1448 txn.delete(key)?;
1449 }
1450 }
1451 Ok(())
1452 }
1453
1454 fn write_meta(&self, txn: &mut S::Transaction<'_>) -> Result<(), CatalogError> {
1455 let (table_id_counter, index_id_counter) = self.inner.counters();
1456 let meta = CatalogState {
1457 version: CATALOG_VERSION,
1458 table_id_counter,
1459 index_id_counter,
1460 };
1461 let meta_bytes = bincode::serialize(&meta)?;
1462 txn.put(META_KEY.to_vec(), meta_bytes)?;
1463 Ok(())
1464 }
1465
1466 pub fn persist_create_table(
1467 &mut self,
1468 txn: &mut S::Transaction<'_>,
1469 table: &TableMetadata,
1470 ) -> Result<(), CatalogError> {
1471 let persisted = PersistedTableMeta::from(table);
1472 let value = bincode::serialize(&persisted)?;
1473 txn.put(
1474 table_key(&table.catalog_name, &table.namespace_name, &table.name),
1475 value,
1476 )?;
1477 self.write_meta(txn)?;
1478 Ok(())
1479 }
1480
1481 pub fn persist_drop_table(
1482 &mut self,
1483 txn: &mut S::Transaction<'_>,
1484 fqn: &TableFqn,
1485 ) -> Result<(), CatalogError> {
1486 txn.delete(table_key(&fqn.catalog, &fqn.namespace, &fqn.table))?;
1487
1488 let mut to_delete: Vec<String> = Vec::new();
1490 let prefix = index_prefix(&fqn.catalog, &fqn.namespace, &fqn.table);
1491 for (key, _) in txn.scan_prefix(&prefix)? {
1492 let index_name = key_suffix(&prefix, &key)?;
1493 to_delete.push(index_name);
1494 }
1495 for index_name in to_delete {
1496 txn.delete(index_key(
1497 &fqn.catalog,
1498 &fqn.namespace,
1499 &fqn.table,
1500 &index_name,
1501 ))?;
1502 }
1503
1504 Ok(())
1505 }
1506
1507 pub fn persist_create_index(
1508 &mut self,
1509 txn: &mut S::Transaction<'_>,
1510 index: &IndexMetadata,
1511 ) -> Result<(), CatalogError> {
1512 let persisted = PersistedIndexMeta::from(index);
1513 let value = bincode::serialize(&persisted)?;
1514 txn.put(
1515 index_key(
1516 &index.catalog_name,
1517 &index.namespace_name,
1518 &index.table,
1519 &index.name,
1520 ),
1521 value,
1522 )?;
1523 self.write_meta(txn)?;
1524 Ok(())
1525 }
1526
1527 pub fn persist_drop_index(
1528 &mut self,
1529 txn: &mut S::Transaction<'_>,
1530 fqn: &IndexFqn,
1531 ) -> Result<(), CatalogError> {
1532 txn.delete(index_key(
1533 &fqn.catalog,
1534 &fqn.namespace,
1535 &fqn.table,
1536 &fqn.index,
1537 ))?;
1538 Ok(())
1539 }
1540
1541 pub fn persist_overlay(
1542 &mut self,
1543 txn: &mut S::Transaction<'_>,
1544 overlay: &CatalogOverlay,
1545 ) -> Result<(), CatalogError> {
1546 self.ensure_overlay_name_uniqueness(overlay)?;
1547
1548 for catalog in overlay.dropped_catalogs.iter() {
1549 self.persist_drop_catalog(txn, catalog)?;
1550 }
1551
1552 for (catalog, namespace) in overlay.dropped_namespaces.iter() {
1553 if overlay.dropped_catalogs.contains(catalog) {
1554 continue;
1555 }
1556 self.persist_drop_namespace(txn, catalog, namespace)?;
1557 }
1558
1559 for fqn in overlay.dropped_tables.iter() {
1560 if overlay.dropped_catalogs.contains(&fqn.catalog)
1561 || overlay
1562 .dropped_namespaces
1563 .contains(&(fqn.catalog.clone(), fqn.namespace.clone()))
1564 {
1565 continue;
1566 }
1567 self.persist_drop_table(txn, fqn)?;
1568 }
1569
1570 for fqn in overlay.dropped_indexes.iter() {
1571 if overlay.dropped_catalogs.contains(&fqn.catalog)
1572 || overlay
1573 .dropped_namespaces
1574 .contains(&(fqn.catalog.clone(), fqn.namespace.clone()))
1575 {
1576 continue;
1577 }
1578 self.persist_drop_index(txn, fqn)?;
1579 }
1580
1581 for meta in overlay.added_catalogs.values() {
1582 if overlay.dropped_catalogs.contains(&meta.name) {
1583 continue;
1584 }
1585 self.persist_create_catalog(txn, meta)?;
1586 }
1587
1588 for meta in overlay.added_namespaces.values() {
1589 if overlay.dropped_catalogs.contains(&meta.catalog_name)
1590 || overlay
1591 .dropped_namespaces
1592 .contains(&(meta.catalog_name.clone(), meta.name.clone()))
1593 {
1594 continue;
1595 }
1596 self.persist_create_namespace(txn, meta)?;
1597 }
1598
1599 for (fqn, table) in overlay.added_tables.iter() {
1600 if overlay.dropped_catalogs.contains(&fqn.catalog)
1601 || overlay
1602 .dropped_namespaces
1603 .contains(&(fqn.catalog.clone(), fqn.namespace.clone()))
1604 || overlay.dropped_tables.contains(fqn)
1605 {
1606 continue;
1607 }
1608 self.persist_create_table(txn, table)?;
1609 }
1610
1611 for (fqn, index) in overlay.added_indexes.iter() {
1612 if overlay.dropped_catalogs.contains(&fqn.catalog)
1613 || overlay
1614 .dropped_namespaces
1615 .contains(&(fqn.catalog.clone(), fqn.namespace.clone()))
1616 || overlay.dropped_indexes.contains(fqn)
1617 {
1618 continue;
1619 }
1620 self.persist_create_index(txn, index)?;
1621 }
1622
1623 Ok(())
1624 }
1625
1626 fn ensure_overlay_name_uniqueness(&self, overlay: &CatalogOverlay) -> Result<(), CatalogError> {
1627 let mut table_names: HashMap<String, TableFqn> = HashMap::new();
1628 for name in self.inner.table_names() {
1629 let Some(table) = self.inner.get_table(name) else {
1630 continue;
1631 };
1632 let fqn = TableFqn::from(table);
1633 if overlay.dropped_catalogs.contains(&fqn.catalog)
1634 || overlay
1635 .dropped_namespaces
1636 .contains(&(fqn.catalog.clone(), fqn.namespace.clone()))
1637 || overlay.dropped_tables.contains(&fqn)
1638 {
1639 continue;
1640 }
1641 table_names.insert(table.name.clone(), fqn);
1642 }
1643
1644 for (fqn, table) in overlay.added_tables.iter() {
1645 if overlay.dropped_catalogs.contains(&fqn.catalog)
1646 || overlay
1647 .dropped_namespaces
1648 .contains(&(fqn.catalog.clone(), fqn.namespace.clone()))
1649 || overlay.dropped_tables.contains(fqn)
1650 {
1651 continue;
1652 }
1653 if let Some(existing) = table_names.get(&table.name)
1654 && existing != fqn
1655 {
1656 return Err(CatalogError::InvalidKey(format!(
1657 "table name '{}' conflicts across namespaces: {}.{} vs {}.{}",
1658 table.name, existing.catalog, existing.namespace, fqn.catalog, fqn.namespace
1659 )));
1660 }
1661 table_names.insert(table.name.clone(), fqn.clone());
1662 }
1663
1664 let mut index_names: HashMap<String, IndexFqn> = HashMap::new();
1665 for name in self.inner.index_names() {
1666 let Some(index) = self.inner.get_index(name) else {
1667 continue;
1668 };
1669 let fqn = IndexFqn::from(index);
1670 if overlay.dropped_catalogs.contains(&fqn.catalog)
1671 || overlay
1672 .dropped_namespaces
1673 .contains(&(fqn.catalog.clone(), fqn.namespace.clone()))
1674 || overlay.dropped_indexes.contains(&fqn)
1675 || overlay.dropped_tables.contains(&TableFqn::new(
1676 &fqn.catalog,
1677 &fqn.namespace,
1678 &fqn.table,
1679 ))
1680 {
1681 continue;
1682 }
1683 index_names.insert(index.name.clone(), fqn);
1684 }
1685
1686 for (fqn, index) in overlay.added_indexes.iter() {
1687 if overlay.dropped_catalogs.contains(&fqn.catalog)
1688 || overlay
1689 .dropped_namespaces
1690 .contains(&(fqn.catalog.clone(), fqn.namespace.clone()))
1691 || overlay.dropped_indexes.contains(fqn)
1692 || overlay.dropped_tables.contains(&TableFqn::new(
1693 &fqn.catalog,
1694 &fqn.namespace,
1695 &fqn.table,
1696 ))
1697 {
1698 continue;
1699 }
1700 if let Some(existing) = index_names.get(&index.name)
1701 && existing != fqn
1702 {
1703 return Err(CatalogError::InvalidKey(format!(
1704 "index name '{}' conflicts across namespaces: {}.{} vs {}.{}",
1705 index.name, existing.catalog, existing.namespace, fqn.catalog, fqn.namespace
1706 )));
1707 }
1708 index_names.insert(index.name.clone(), fqn.clone());
1709 }
1710
1711 Ok(())
1712 }
1713
1714 fn namespace_dropped(overlay: &CatalogOverlay, catalog: &str, namespace: &str) -> bool {
1715 overlay.dropped_catalogs.contains(catalog)
1716 || overlay
1717 .dropped_namespaces
1718 .contains(&(catalog.to_string(), namespace.to_string()))
1719 }
1720
1721 fn overlay_added_table_by_name<'a>(
1722 overlay: &'a CatalogOverlay,
1723 name: &str,
1724 ) -> Option<&'a TableMetadata> {
1725 let mut iter = overlay
1726 .added_tables
1727 .values()
1728 .filter(|table| table.name == name);
1729 let first = iter.next()?;
1730 if iter.next().is_some() {
1731 return None;
1732 }
1733 Some(first)
1734 }
1735
1736 fn overlay_added_index_by_name<'a>(
1737 overlay: &'a CatalogOverlay,
1738 name: &str,
1739 ) -> Option<&'a IndexMetadata> {
1740 let mut iter = overlay
1741 .added_indexes
1742 .values()
1743 .filter(|index| index.name == name);
1744 let first = iter.next()?;
1745 if iter.next().is_some() {
1746 return None;
1747 }
1748 Some(first)
1749 }
1750
1751 fn base_table_conflicts_with_overlay(
1752 &self,
1753 overlay: &CatalogOverlay,
1754 table: &TableMetadata,
1755 ) -> bool {
1756 let Some(base) = self.inner.get_table(&table.name) else {
1757 return false;
1758 };
1759 if self.table_hidden_by_overlay(base, overlay) {
1760 return false;
1761 }
1762 if overlay.dropped_tables.contains(&TableFqn::from(base)) {
1763 return false;
1764 }
1765 TableFqn::from(base) != TableFqn::from(table)
1766 }
1767
1768 fn base_index_conflicts_with_overlay(
1769 &self,
1770 overlay: &CatalogOverlay,
1771 index: &IndexMetadata,
1772 ) -> bool {
1773 let Some(base) = self.inner.get_index(&index.name) else {
1774 return false;
1775 };
1776 if Self::namespace_dropped(overlay, &base.catalog_name, &base.namespace_name) {
1777 return false;
1778 }
1779 if overlay.dropped_indexes.contains(&IndexFqn::from(base)) {
1780 return false;
1781 }
1782 if self.dropped_table_matches_fqn(
1783 &base.table,
1784 &base.catalog_name,
1785 &base.namespace_name,
1786 overlay,
1787 ) {
1788 return false;
1789 }
1790 IndexFqn::from(base) != IndexFqn::from(index)
1791 }
1792
1793 fn table_hidden_by_overlay(&self, table: &TableMetadata, overlay: &CatalogOverlay) -> bool {
1794 Self::namespace_dropped(overlay, &table.catalog_name, &table.namespace_name)
1795 }
1796
1797 fn dropped_table_matches_fqn(
1798 &self,
1799 table_name: &str,
1800 catalog: &str,
1801 namespace: &str,
1802 overlay: &CatalogOverlay,
1803 ) -> bool {
1804 let fqn = TableFqn::new(catalog, namespace, table_name);
1805 overlay.dropped_tables.contains(&fqn)
1806 }
1807
1808 fn index_hidden_by_overlay(&self, index: &IndexMetadata, overlay: &CatalogOverlay) -> bool {
1809 let index_fqn = IndexFqn::from(index);
1810 if overlay.dropped_indexes.contains(&index_fqn) {
1811 return true;
1812 }
1813 if Self::namespace_dropped(overlay, &index.catalog_name, &index.namespace_name) {
1814 return true;
1815 }
1816 if self.dropped_table_matches_fqn(
1817 &index.table,
1818 &index.catalog_name,
1819 &index.namespace_name,
1820 overlay,
1821 ) {
1822 return true;
1823 }
1824 match self.get_table_in_txn(&index.table, overlay) {
1825 Some(table) => {
1826 table.catalog_name != index.catalog_name
1827 || table.namespace_name != index.namespace_name
1828 }
1829 None => true,
1830 }
1831 }
1832
1833 pub fn get_catalog_in_txn<'a>(
1834 &'a self,
1835 name: &str,
1836 overlay: &'a CatalogOverlay,
1837 ) -> Option<&'a CatalogMeta> {
1838 if overlay.dropped_catalogs.contains(name) {
1839 return None;
1840 }
1841 if let Some(catalog) = overlay.added_catalogs.get(name) {
1842 return Some(catalog);
1843 }
1844 self.catalogs.get(name)
1845 }
1846
1847 pub fn get_namespace_in_txn<'a>(
1848 &'a self,
1849 catalog_name: &str,
1850 namespace_name: &str,
1851 overlay: &'a CatalogOverlay,
1852 ) -> Option<&'a NamespaceMeta> {
1853 if overlay.dropped_catalogs.contains(catalog_name) {
1854 return None;
1855 }
1856 let key = (catalog_name.to_string(), namespace_name.to_string());
1857 if overlay.dropped_namespaces.contains(&key) {
1858 return None;
1859 }
1860 if let Some(namespace) = overlay.added_namespaces.get(&key) {
1861 return Some(namespace);
1862 }
1863 self.namespaces.get(&key)
1864 }
1865
1866 pub fn list_catalogs_in_txn(&self, overlay: &CatalogOverlay) -> Vec<CatalogMeta> {
1867 let mut catalogs: HashMap<String, CatalogMeta> = HashMap::new();
1868 for (name, meta) in &self.catalogs {
1869 if !overlay.dropped_catalogs.contains(name) {
1870 catalogs.insert(name.clone(), meta.clone());
1871 }
1872 }
1873 for (name, meta) in &overlay.added_catalogs {
1874 if !overlay.dropped_catalogs.contains(name) {
1875 catalogs.insert(name.clone(), meta.clone());
1876 }
1877 }
1878 let mut values: Vec<CatalogMeta> = catalogs.into_values().collect();
1879 values.sort_by(|a, b| a.name.cmp(&b.name));
1880 values
1881 }
1882
1883 pub fn list_namespaces_in_txn(
1884 &self,
1885 catalog_name: &str,
1886 overlay: &CatalogOverlay,
1887 ) -> Vec<NamespaceMeta> {
1888 if overlay.dropped_catalogs.contains(catalog_name) {
1889 return Vec::new();
1890 }
1891 let mut namespaces: HashMap<(String, String), NamespaceMeta> = HashMap::new();
1892 for ((catalog, namespace), meta) in &self.namespaces {
1893 if catalog != catalog_name {
1894 continue;
1895 }
1896 let key = (catalog.clone(), namespace.clone());
1897 if overlay.dropped_namespaces.contains(&key) {
1898 continue;
1899 }
1900 namespaces.insert(key, meta.clone());
1901 }
1902 for ((catalog, namespace), meta) in &overlay.added_namespaces {
1903 if catalog != catalog_name {
1904 continue;
1905 }
1906 let key = (catalog.clone(), namespace.clone());
1907 if overlay.dropped_namespaces.contains(&key) {
1908 continue;
1909 }
1910 namespaces.insert(key, meta.clone());
1911 }
1912 let mut values: Vec<NamespaceMeta> = namespaces.into_values().collect();
1913 values.sort_by(|a, b| a.name.cmp(&b.name));
1914 values
1915 }
1916
1917 pub fn table_exists_in_txn(&self, name: &str, overlay: &CatalogOverlay) -> bool {
1918 if let Some(table) = Self::overlay_added_table_by_name(overlay, name) {
1919 if self.table_hidden_by_overlay(table, overlay) {
1920 return false;
1921 }
1922 if self.base_table_conflicts_with_overlay(overlay, table) {
1923 return false;
1924 }
1925 return true;
1926 }
1927 match self.inner.get_table(name) {
1928 Some(table) => {
1929 !self.table_hidden_by_overlay(table, overlay)
1930 && !overlay.dropped_tables.contains(&TableFqn::from(table))
1931 }
1932 None => false,
1933 }
1934 }
1935
1936 pub fn get_table_in_txn<'a>(
1937 &'a self,
1938 name: &str,
1939 overlay: &'a CatalogOverlay,
1940 ) -> Option<&'a TableMetadata> {
1941 if let Some(table) = Self::overlay_added_table_by_name(overlay, name) {
1942 if self.table_hidden_by_overlay(table, overlay) {
1943 return None;
1944 }
1945 if self.base_table_conflicts_with_overlay(overlay, table) {
1946 return None;
1947 }
1948 return Some(table);
1949 }
1950 self.inner.get_table(name).filter(|table| {
1951 !self.table_hidden_by_overlay(table, overlay)
1952 && !overlay.dropped_tables.contains(&TableFqn::from(*table))
1953 })
1954 }
1955
1956 pub fn index_exists_in_txn(&self, name: &str, overlay: &CatalogOverlay) -> bool {
1957 if let Some(index) = Self::overlay_added_index_by_name(overlay, name) {
1958 if self.index_hidden_by_overlay(index, overlay) {
1959 return false;
1960 }
1961 if self.base_index_conflicts_with_overlay(overlay, index) {
1962 return false;
1963 }
1964 return true;
1965 }
1966 match self.inner.get_index(name) {
1967 Some(index) => !self.index_hidden_by_overlay(index, overlay),
1968 None => false,
1969 }
1970 }
1971
1972 pub fn get_index_in_txn<'a>(
1973 &'a self,
1974 name: &str,
1975 overlay: &'a CatalogOverlay,
1976 ) -> Option<&'a IndexMetadata> {
1977 if let Some(index) = Self::overlay_added_index_by_name(overlay, name) {
1978 if self.index_hidden_by_overlay(index, overlay) {
1979 return None;
1980 }
1981 if self.base_index_conflicts_with_overlay(overlay, index) {
1982 return None;
1983 }
1984 return Some(index);
1985 }
1986 match self.inner.get_index(name) {
1987 Some(index) if self.index_hidden_by_overlay(index, overlay) => None,
1988 other => other,
1989 }
1990 }
1991
1992 pub fn list_tables_in_txn(
1993 &self,
1994 catalog_name: &str,
1995 namespace_name: &str,
1996 overlay: &CatalogOverlay,
1997 ) -> Vec<TableMetadata> {
1998 if overlay.dropped_catalogs.contains(catalog_name) {
1999 return Vec::new();
2000 }
2001 if overlay
2002 .dropped_namespaces
2003 .contains(&(catalog_name.to_string(), namespace_name.to_string()))
2004 {
2005 return Vec::new();
2006 }
2007 let mut tables: HashMap<TableFqn, TableMetadata> = HashMap::new();
2008 for name in self.inner.table_names() {
2009 if let Some(table) = self.inner.get_table(name)
2010 && table.catalog_name == catalog_name
2011 && table.namespace_name == namespace_name
2012 {
2013 let fqn = TableFqn::from(table);
2014 if !overlay.dropped_tables.contains(&fqn) {
2015 tables.insert(fqn, table.clone());
2016 }
2017 }
2018 }
2019 for table in overlay.added_tables.values() {
2020 if table.catalog_name == catalog_name && table.namespace_name == namespace_name {
2021 tables.insert(TableFqn::from(table), table.clone());
2022 }
2023 }
2024 let mut values: Vec<TableMetadata> = tables.into_values().collect();
2025 values.sort_by(|a, b| a.name.cmp(&b.name));
2026 values
2027 }
2028
2029 pub fn list_indexes_in_txn(
2030 &self,
2031 fqn: &TableFqn,
2032 overlay: &CatalogOverlay,
2033 ) -> Vec<IndexMetadata> {
2034 if overlay.dropped_catalogs.contains(&fqn.catalog) {
2035 return Vec::new();
2036 }
2037 if overlay
2038 .dropped_namespaces
2039 .contains(&(fqn.catalog.clone(), fqn.namespace.clone()))
2040 {
2041 return Vec::new();
2042 }
2043 if overlay
2044 .dropped_tables
2045 .contains(&TableFqn::new(&fqn.catalog, &fqn.namespace, &fqn.table))
2046 {
2047 return Vec::new();
2048 }
2049
2050 let mut indexes: HashMap<IndexFqn, IndexMetadata> = HashMap::new();
2051 for index in self.inner.get_indexes_for_table(&fqn.table) {
2052 if index.catalog_name == fqn.catalog && index.namespace_name == fqn.namespace {
2053 let index_fqn = IndexFqn::from(index);
2054 if !overlay.dropped_indexes.contains(&index_fqn) {
2055 indexes.insert(index_fqn, index.clone());
2056 }
2057 }
2058 }
2059 for index in overlay.added_indexes.values() {
2060 if index.table == fqn.table
2061 && index.catalog_name == fqn.catalog
2062 && index.namespace_name == fqn.namespace
2063 {
2064 indexes.insert(IndexFqn::from(index), index.clone());
2065 }
2066 }
2067 let mut values: Vec<IndexMetadata> = indexes.into_values().collect();
2068 values.sort_by(|a, b| a.name.cmp(&b.name));
2069 values
2070 }
2071
2072 pub fn apply_overlay(&mut self, overlay: CatalogOverlay) {
2073 let CatalogOverlay {
2074 added_catalogs,
2075 dropped_catalogs,
2076 added_namespaces,
2077 dropped_namespaces,
2078 added_tables,
2079 dropped_tables,
2080 added_indexes,
2081 dropped_indexes,
2082 } = overlay;
2083
2084 for (name, meta) in added_catalogs {
2085 self.catalogs.insert(name, meta);
2086 }
2087 for (catalog_name, namespace_name) in dropped_namespaces.iter() {
2088 self.namespaces
2089 .remove(&(catalog_name.clone(), namespace_name.clone()));
2090 }
2091 for ((catalog_name, namespace_name), meta) in added_namespaces {
2092 self.namespaces.insert((catalog_name, namespace_name), meta);
2093 }
2094 for name in dropped_catalogs {
2095 self.catalogs.remove(&name);
2096 self.namespaces.retain(|(catalog, _), _| catalog != &name);
2097 }
2098 for (_, table) in added_tables {
2099 self.inner.insert_table_unchecked(table);
2100 }
2101 for fqn in dropped_tables {
2102 self.inner.remove_table_unchecked(&fqn.table);
2103 }
2104 for (_, index) in added_indexes {
2105 self.inner.insert_index_unchecked(index);
2106 }
2107 for fqn in dropped_indexes {
2108 self.inner.remove_index_unchecked(&fqn.index);
2109 }
2110 }
2111
2112 pub fn discard_overlay(_overlay: CatalogOverlay) {}
2113}
2114
/// [`Catalog`] implementation backed by the in-memory `inner` catalog.
///
/// Every method below only reads or updates `inner`; none of them touches the
/// key-value store.
impl<S: KVStore> Catalog for PersistentCatalog<S> {
    /// Registers `table` in the inner in-memory catalog.
    fn create_table(&mut self, table: TableMetadata) -> Result<(), PlannerError> {
        self.inner.create_table(table)
    }

    /// Looks up a table by bare name in the inner catalog.
    fn get_table(&self, name: &str) -> Option<&TableMetadata> {
        self.inner.get_table(name)
    }

    /// Removes a table by bare name from the inner catalog.
    fn drop_table(&mut self, name: &str) -> Result<(), PlannerError> {
        self.inner.drop_table(name)
    }

    /// Registers `index` in the inner in-memory catalog.
    fn create_index(&mut self, index: IndexMetadata) -> Result<(), PlannerError> {
        self.inner.create_index(index)
    }

    /// Looks up an index by bare name in the inner catalog.
    fn get_index(&self, name: &str) -> Option<&IndexMetadata> {
        self.inner.get_index(name)
    }

    /// Returns the inner catalog's indexes for the table named `table`.
    fn get_indexes_for_table(&self, table: &str) -> Vec<&IndexMetadata> {
        self.inner.get_indexes_for_table(table)
    }

    /// Removes an index by bare name from the inner catalog.
    fn drop_index(&mut self, name: &str) -> Result<(), PlannerError> {
        self.inner.drop_index(name)
    }

    /// Reports whether the inner catalog knows a table named `name`.
    fn table_exists(&self, name: &str) -> bool {
        self.inner.table_exists(name)
    }

    /// Reports whether the inner catalog knows an index named `name`.
    fn index_exists(&self, name: &str) -> bool {
        self.inner.index_exists(name)
    }

    /// Hands out the next table id from the inner catalog's counter.
    fn next_table_id(&mut self) -> u32 {
        self.inner.next_table_id()
    }

    /// Hands out the next index id from the inner catalog's counter.
    fn next_index_id(&mut self) -> u32 {
        self.inner.next_index_id()
    }

    /// Returns a copy of every table in the inner catalog, in the order
    /// `table_names()` yields them (no sorting is applied here).
    fn list_tables(&self) -> Vec<TableMetadata> {
        let mut tables = Vec::new();
        for name in self.inner.table_names() {
            if let Some(table) = self.inner.get_table(name) {
                tables.push(table.clone());
            }
        }
        tables
    }

    /// Always `true`: this catalog is backed by a key-value store.
    fn persistence_enabled(&self) -> bool {
        true
    }
}
2174
2175#[cfg(test)]
2176mod tests {
2177 use super::*;
2178 use crate::planner::types::ResolvedType;
2179 use std::collections::HashSet;
2180
2181 fn test_table(name: &str, id: u32) -> TableMetadata {
2182 TableMetadata::new(
2183 name,
2184 vec![ColumnMetadata::new("id", ResolvedType::Integer).with_primary_key(true)],
2185 )
2186 .with_table_id(id)
2187 .with_primary_key(vec!["id".to_string()])
2188 }
2189
2190 fn legacy_table_key(table_name: &str) -> Vec<u8> {
2191 let mut key = TABLES_PREFIX.to_vec();
2192 key.extend_from_slice(table_name.as_bytes());
2193 key
2194 }
2195
2196 fn legacy_index_key(index_name: &str) -> Vec<u8> {
2197 let mut key = INDEXES_PREFIX.to_vec();
2198 key.extend_from_slice(index_name.as_bytes());
2199 key
2200 }
2201
2202 fn seed_legacy_store(store: &Arc<alopex_core::kv::memory::MemoryKV>) {
2203 let mut txn = store.begin(TxnMode::ReadWrite).unwrap();
2204 let table = test_table("users", 7);
2205 let legacy_table = PersistedTableMetaV1 {
2206 table_id: table.table_id,
2207 name: table.name.clone(),
2208 columns: table
2209 .columns
2210 .iter()
2211 .map(PersistedColumnMeta::from)
2212 .collect(),
2213 primary_key: table.primary_key.clone(),
2214 storage_options: table.storage_options.clone().into(),
2215 };
2216 let table_bytes = bincode::serialize(&legacy_table).unwrap();
2217 txn.put(legacy_table_key("users"), table_bytes).unwrap();
2218
2219 let legacy_index = PersistedIndexMetaV1 {
2220 index_id: 3,
2221 name: "idx_users_id".to_string(),
2222 table: "users".to_string(),
2223 columns: vec!["id".to_string()],
2224 column_indices: vec![0],
2225 unique: false,
2226 method: Some(PersistedIndexType::BTree),
2227 options: Vec::new(),
2228 };
2229 let index_bytes = bincode::serialize(&legacy_index).unwrap();
2230 txn.put(legacy_index_key("idx_users_id"), index_bytes)
2231 .unwrap();
2232
2233 let meta = CatalogState {
2234 version: 1,
2235 table_id_counter: 7,
2236 index_id_counter: 3,
2237 };
2238 let meta_bytes = bincode::serialize(&meta).unwrap();
2239 txn.put(META_KEY.to_vec(), meta_bytes).unwrap();
2240 txn.commit_self().unwrap();
2241 }
2242
2243 #[test]
2244 fn load_empty_store() {
2245 let store = Arc::new(alopex_core::kv::memory::MemoryKV::new());
2246 let catalog = PersistentCatalog::load(store).unwrap();
2247 assert_eq!(catalog.inner.table_count(), 0);
2248 assert_eq!(catalog.inner.index_count(), 0);
2249 }
2250
2251 #[test]
2252 fn load_migrates_v1_keys_and_meta() {
2253 let store = Arc::new(alopex_core::kv::memory::MemoryKV::new());
2254 seed_legacy_store(&store);
2255
2256 let reloaded = PersistentCatalog::load(store.clone()).unwrap();
2257 assert!(reloaded.get_catalog("default").is_some());
2258 assert!(reloaded.get_namespace("default", "default").is_some());
2259
2260 let table = reloaded.get_table("users").unwrap();
2261 assert_eq!(table.catalog_name, "default");
2262 assert_eq!(table.namespace_name, "default");
2263
2264 let index = reloaded.get_index("idx_users_id").unwrap();
2265 assert_eq!(index.catalog_name, "default");
2266 assert_eq!(index.namespace_name, "default");
2267 assert_eq!(index.table, "users");
2268
2269 let mut txn = store.begin(TxnMode::ReadOnly).unwrap();
2270 assert!(txn.get(&legacy_table_key("users")).unwrap().is_none());
2271 assert!(
2272 txn.get(&table_key("default", "default", "users"))
2273 .unwrap()
2274 .is_some()
2275 );
2276 assert!(
2277 txn.get(&legacy_index_key("idx_users_id"))
2278 .unwrap()
2279 .is_none()
2280 );
2281 assert!(
2282 txn.get(&index_key("default", "default", "users", "idx_users_id"))
2283 .unwrap()
2284 .is_some()
2285 );
2286 let meta_bytes = txn.get(&META_KEY.to_vec()).unwrap().unwrap();
2287 let meta: CatalogState = bincode::deserialize(&meta_bytes).unwrap();
2288 assert_eq!(meta.version, CATALOG_VERSION);
2289 txn.rollback_self().unwrap();
2290 }
2291
2292 #[test]
2293 fn load_after_migration_keeps_v2_keys() {
2294 let store = Arc::new(alopex_core::kv::memory::MemoryKV::new());
2295 seed_legacy_store(&store);
2296
2297 let _ = PersistentCatalog::load(store.clone()).unwrap();
2298 let reloaded = PersistentCatalog::load(store.clone()).unwrap();
2299
2300 let table = reloaded.get_table("users").unwrap();
2301 assert_eq!(table.catalog_name, "default");
2302 assert_eq!(table.namespace_name, "default");
2303
2304 let mut txn = store.begin(TxnMode::ReadOnly).unwrap();
2305 assert!(txn.get(&legacy_table_key("users")).unwrap().is_none());
2306 assert!(
2307 txn.get(&table_key("default", "default", "users"))
2308 .unwrap()
2309 .is_some()
2310 );
2311 let meta_bytes = txn.get(&META_KEY.to_vec()).unwrap().unwrap();
2312 let meta: CatalogState = bincode::deserialize(&meta_bytes).unwrap();
2313 assert_eq!(meta.version, CATALOG_VERSION);
2314 txn.rollback_self().unwrap();
2315 }
2316
2317 #[test]
2318 fn create_table_persists() {
2319 let store = Arc::new(alopex_core::kv::memory::MemoryKV::new());
2320 let mut catalog = PersistentCatalog::new(store.clone());
2321
2322 catalog.inner.set_counters(1, 0);
2324
2325 let table = test_table("users", 1);
2326 let mut txn = store.begin(TxnMode::ReadWrite).unwrap();
2327 catalog.persist_create_table(&mut txn, &table).unwrap();
2328 txn.commit_self().unwrap();
2329
2330 let reloaded = PersistentCatalog::load(store).unwrap();
2331 assert!(reloaded.table_exists("users"));
2332 assert_eq!(reloaded.get_table("users").unwrap().table_id, 1);
2333 }
2334
2335 #[test]
2336 fn drop_table_removes() {
2337 let store = Arc::new(alopex_core::kv::memory::MemoryKV::new());
2338 let mut catalog = PersistentCatalog::new(store.clone());
2339 catalog.inner.set_counters(1, 0);
2340
2341 let table = test_table("users", 1);
2342 let mut txn = store.begin(TxnMode::ReadWrite).unwrap();
2343 catalog.persist_create_table(&mut txn, &table).unwrap();
2344 txn.commit_self().unwrap();
2345
2346 let mut txn = store.begin(TxnMode::ReadWrite).unwrap();
2347 let fqn = TableFqn::new("default", "default", "users");
2348 catalog.persist_drop_table(&mut txn, &fqn).unwrap();
2349 txn.commit_self().unwrap();
2350
2351 let reloaded = PersistentCatalog::load(store).unwrap();
2352 assert!(!reloaded.table_exists("users"));
2353 }
2354
2355 #[test]
2356 fn reload_preserves_state() {
2357 let temp_dir = tempfile::tempdir().unwrap();
2358 let wal_path = temp_dir.path().join("catalog.wal");
2359 let store = Arc::new(alopex_core::kv::memory::MemoryKV::open(&wal_path).unwrap());
2360 let mut catalog = PersistentCatalog::new(store.clone());
2361 catalog.inner.set_counters(1, 0);
2362
2363 let table = test_table("users", 1);
2364 let mut txn = store.begin(TxnMode::ReadWrite).unwrap();
2365 catalog.persist_create_table(&mut txn, &table).unwrap();
2366 txn.commit_self().unwrap();
2367 store.flush().unwrap();
2368
2369 drop(catalog);
2370 drop(store);
2371
2372 let store = Arc::new(alopex_core::kv::memory::MemoryKV::open(&wal_path).unwrap());
2373 let reloaded = PersistentCatalog::load(store).unwrap();
2374 assert!(reloaded.table_exists("users"));
2375 }
2376
2377 #[test]
2378 fn overlay_applied_on_commit() {
2379 let store = Arc::new(alopex_core::kv::memory::MemoryKV::new());
2380 let mut catalog = PersistentCatalog::new(store);
2381 let users = test_table("users", 1);
2382 catalog.inner.insert_table_unchecked(users.clone());
2383
2384 let mut overlay = CatalogOverlay::new();
2385 overlay.drop_table(&TableFqn::from(&users));
2386 let orders = test_table("orders", 2);
2387 overlay.add_table(TableFqn::from(&orders), orders);
2388
2389 assert!(!catalog.table_exists_in_txn("users", &overlay));
2390 assert!(catalog.table_exists_in_txn("orders", &overlay));
2391
2392 catalog.apply_overlay(overlay);
2393
2394 assert!(!catalog.table_exists("users"));
2395 assert!(catalog.table_exists("orders"));
2396 }
2397
2398 #[test]
2399 fn overlay_discarded_on_rollback() {
2400 let store = Arc::new(alopex_core::kv::memory::MemoryKV::new());
2401 let mut catalog = PersistentCatalog::new(store);
2402 let users = test_table("users", 1);
2403 catalog.inner.insert_table_unchecked(users.clone());
2404
2405 let mut overlay = CatalogOverlay::new();
2406 overlay.drop_table(&TableFqn::from(&users));
2407
2408 PersistentCatalog::<alopex_core::kv::memory::MemoryKV>::discard_overlay(overlay);
2409
2410 assert!(catalog.table_exists("users"));
2411 }
2412
2413 #[test]
2414 fn catalog_crud_persists() {
2415 let store = Arc::new(alopex_core::kv::memory::MemoryKV::new());
2416 let mut catalog = PersistentCatalog::new(store.clone());
2417
2418 let meta = CatalogMeta {
2419 name: "main".to_string(),
2420 comment: Some("primary".to_string()),
2421 storage_root: Some("/tmp/alopex".to_string()),
2422 };
2423
2424 catalog.create_catalog(meta.clone()).unwrap();
2425 assert!(catalog.get_catalog("main").is_some());
2426
2427 let mut txn = store.begin(TxnMode::ReadOnly).unwrap();
2428 let stored = txn.get(&catalog_key("main")).unwrap().unwrap();
2429 let decoded: CatalogMeta = bincode::deserialize(&stored).unwrap();
2430 txn.rollback_self().unwrap();
2431 assert_eq!(decoded, meta);
2432
2433 catalog.delete_catalog("main").unwrap();
2434 assert!(catalog.get_catalog("main").is_none());
2435 }
2436
2437 #[test]
2438 fn namespace_crud_persists_and_validates_catalog() {
2439 let store = Arc::new(alopex_core::kv::memory::MemoryKV::new());
2440 let mut catalog = PersistentCatalog::new(store.clone());
2441
2442 let missing_catalog = NamespaceMeta {
2443 name: "analytics".to_string(),
2444 catalog_name: "missing".to_string(),
2445 comment: None,
2446 storage_root: None,
2447 };
2448 let err = catalog.create_namespace(missing_catalog).unwrap_err();
2449 assert!(matches!(err, CatalogError::InvalidKey(_)));
2450
2451 catalog
2452 .create_catalog(CatalogMeta {
2453 name: "main".to_string(),
2454 comment: None,
2455 storage_root: None,
2456 })
2457 .unwrap();
2458
2459 let namespace = NamespaceMeta {
2460 name: "analytics".to_string(),
2461 catalog_name: "main".to_string(),
2462 comment: Some("warehouse".to_string()),
2463 storage_root: None,
2464 };
2465
2466 catalog.create_namespace(namespace.clone()).unwrap();
2467 assert!(catalog.get_namespace("main", "analytics").is_some());
2468
2469 let mut txn = store.begin(TxnMode::ReadOnly).unwrap();
2470 let stored = txn
2471 .get(&namespace_key("main", "analytics"))
2472 .unwrap()
2473 .unwrap();
2474 let decoded: NamespaceMeta = bincode::deserialize(&stored).unwrap();
2475 txn.rollback_self().unwrap();
2476 assert_eq!(decoded, namespace);
2477
2478 catalog.delete_namespace("main", "analytics").unwrap();
2479 assert!(catalog.get_namespace("main", "analytics").is_none());
2480 }
2481
2482 #[test]
2483 fn delete_catalog_removes_namespaces_from_store() {
2484 let store = Arc::new(alopex_core::kv::memory::MemoryKV::new());
2485 let mut catalog = PersistentCatalog::new(store.clone());
2486
2487 catalog
2488 .create_catalog(CatalogMeta {
2489 name: "main".to_string(),
2490 comment: None,
2491 storage_root: None,
2492 })
2493 .unwrap();
2494 catalog
2495 .create_namespace(NamespaceMeta {
2496 name: "analytics".to_string(),
2497 catalog_name: "main".to_string(),
2498 comment: None,
2499 storage_root: None,
2500 })
2501 .unwrap();
2502
2503 catalog.delete_catalog("main").unwrap();
2504
2505 let mut txn = store.begin(TxnMode::ReadOnly).unwrap();
2506 let mut prefix = NAMESPACES_PREFIX.to_vec();
2507 prefix.extend_from_slice(b"main");
2508 prefix.push(b'/');
2509 let remaining: Vec<_> = txn.scan_prefix(&prefix).unwrap().collect();
2510 txn.rollback_self().unwrap();
2511
2512 assert!(remaining.is_empty());
2513 assert!(catalog.list_namespaces("main").is_empty());
2514 }
2515
2516 #[test]
2517 fn delete_catalog_removes_tables_and_indexes_from_store() {
2518 let store = Arc::new(alopex_core::kv::memory::MemoryKV::new());
2519 let mut catalog = PersistentCatalog::new(store.clone());
2520
2521 catalog
2522 .create_catalog(CatalogMeta {
2523 name: "main".to_string(),
2524 comment: None,
2525 storage_root: None,
2526 })
2527 .unwrap();
2528
2529 let mut table = test_table("users", 1);
2530 table.catalog_name = "main".to_string();
2531 table.namespace_name = "default".to_string();
2532
2533 let mut index = IndexMetadata::new(1, "idx_users_id", "users", vec!["id".to_string()])
2534 .with_column_indices(vec![0]);
2535 index.catalog_name = "main".to_string();
2536 index.namespace_name = "default".to_string();
2537
2538 let mut txn = store.begin(TxnMode::ReadWrite).unwrap();
2539 catalog.persist_create_table(&mut txn, &table).unwrap();
2540 catalog.persist_create_index(&mut txn, &index).unwrap();
2541 txn.commit_self().unwrap();
2542
2543 catalog.inner.insert_table_unchecked(table);
2544 catalog.inner.insert_index_unchecked(index);
2545
2546 catalog.delete_catalog("main").unwrap();
2547
2548 assert!(catalog.inner.get_table("users").is_none());
2549 assert!(catalog.inner.get_index("idx_users_id").is_none());
2550
2551 let mut txn = store.begin(TxnMode::ReadOnly).unwrap();
2552 assert!(
2553 txn.get(&table_key("main", "default", "users"))
2554 .unwrap()
2555 .is_none()
2556 );
2557 assert!(
2558 txn.get(&index_key("main", "default", "users", "idx_users_id"))
2559 .unwrap()
2560 .is_none()
2561 );
2562 txn.rollback_self().unwrap();
2563 }
2564
2565 #[test]
2566 fn index_meta_loads_catalog_and_namespace_from_table() {
2567 let store = Arc::new(alopex_core::kv::memory::MemoryKV::new());
2568 let mut catalog = PersistentCatalog::new(store.clone());
2569 catalog.inner.set_counters(1, 1);
2570
2571 let mut table = test_table("users", 1);
2572 table.catalog_name = "main".to_string();
2573 table.namespace_name = "analytics".to_string();
2574
2575 let mut txn = store.begin(TxnMode::ReadWrite).unwrap();
2576 catalog.persist_create_table(&mut txn, &table).unwrap();
2577 let mut index = IndexMetadata::new(1, "idx_users_id", "users", vec!["id".to_string()])
2578 .with_column_indices(vec![0])
2579 .with_method(IndexMethod::BTree);
2580 index.catalog_name = "main".to_string();
2581 index.namespace_name = "analytics".to_string();
2582 catalog.persist_create_index(&mut txn, &index).unwrap();
2583 txn.commit_self().unwrap();
2584
2585 let reloaded = PersistentCatalog::load(store).unwrap();
2586 let index = reloaded.get_index("idx_users_id").unwrap();
2587 assert_eq!(index.catalog_name, "main");
2588 assert_eq!(index.namespace_name, "analytics");
2589 }
2590
2591 #[test]
2592 fn legacy_index_meta_loads_catalog_and_namespace_from_table() {
2593 let store = Arc::new(alopex_core::kv::memory::MemoryKV::new());
2594 let mut catalog = PersistentCatalog::new(store.clone());
2595 catalog.inner.set_counters(1, 1);
2596
2597 let mut table = test_table("users", 1);
2598 table.catalog_name = "main".to_string();
2599 table.namespace_name = "analytics".to_string();
2600
2601 let mut txn = store.begin(TxnMode::ReadWrite).unwrap();
2602 catalog.persist_create_table(&mut txn, &table).unwrap();
2603 let legacy = PersistedIndexMetaV1 {
2604 index_id: 1,
2605 name: "idx_users_id".to_string(),
2606 table: "users".to_string(),
2607 columns: vec!["id".to_string()],
2608 column_indices: vec![0],
2609 unique: false,
2610 method: Some(PersistedIndexType::BTree),
2611 options: Vec::new(),
2612 };
2613 let bytes = bincode::serialize(&legacy).unwrap();
2614 txn.put(
2615 index_key("main", "analytics", "users", "idx_users_id"),
2616 bytes,
2617 )
2618 .unwrap();
2619 txn.commit_self().unwrap();
2620
2621 let reloaded = PersistentCatalog::load(store).unwrap();
2622 let index = reloaded.get_index("idx_users_id").unwrap();
2623 assert_eq!(index.catalog_name, "main");
2624 assert_eq!(index.namespace_name, "analytics");
2625 }
2626
2627 #[test]
2628 fn overlay_catalog_get_and_list() {
2629 let store = Arc::new(alopex_core::kv::memory::MemoryKV::new());
2630 let mut catalog = PersistentCatalog::new(store);
2631 catalog
2632 .create_catalog(CatalogMeta {
2633 name: "main".to_string(),
2634 comment: None,
2635 storage_root: None,
2636 })
2637 .unwrap();
2638
2639 let mut overlay = CatalogOverlay::new();
2640 overlay.add_catalog(CatalogMeta {
2641 name: "temp".to_string(),
2642 comment: None,
2643 storage_root: None,
2644 });
2645 overlay.drop_catalog("main");
2646
2647 assert!(catalog.get_catalog_in_txn("main", &overlay).is_none());
2648 assert!(catalog.get_catalog_in_txn("temp", &overlay).is_some());
2649
2650 let names: Vec<String> = catalog
2651 .list_catalogs_in_txn(&overlay)
2652 .into_iter()
2653 .map(|meta| meta.name)
2654 .collect();
2655 assert_eq!(names, vec!["temp".to_string()]);
2656 }
2657
2658 #[test]
2659 fn overlay_namespace_get_and_list() {
2660 let store = Arc::new(alopex_core::kv::memory::MemoryKV::new());
2661 let mut catalog = PersistentCatalog::new(store);
2662 catalog
2663 .create_catalog(CatalogMeta {
2664 name: "main".to_string(),
2665 comment: None,
2666 storage_root: None,
2667 })
2668 .unwrap();
2669 catalog
2670 .create_namespace(NamespaceMeta {
2671 name: "default".to_string(),
2672 catalog_name: "main".to_string(),
2673 comment: None,
2674 storage_root: None,
2675 })
2676 .unwrap();
2677
2678 let mut overlay = CatalogOverlay::new();
2679 overlay.add_namespace(NamespaceMeta {
2680 name: "analytics".to_string(),
2681 catalog_name: "main".to_string(),
2682 comment: None,
2683 storage_root: None,
2684 });
2685 overlay.drop_namespace("main", "default");
2686
2687 assert!(
2688 catalog
2689 .get_namespace_in_txn("main", "default", &overlay)
2690 .is_none()
2691 );
2692 assert!(
2693 catalog
2694 .get_namespace_in_txn("main", "analytics", &overlay)
2695 .is_some()
2696 );
2697
2698 let names: Vec<String> = catalog
2699 .list_namespaces_in_txn("main", &overlay)
2700 .into_iter()
2701 .map(|meta| meta.name)
2702 .collect();
2703 assert_eq!(names, vec!["analytics".to_string()]);
2704 }
2705
2706 #[test]
2707 fn overlay_table_and_index_list() {
2708 let store = Arc::new(alopex_core::kv::memory::MemoryKV::new());
2709 let mut catalog = PersistentCatalog::new(store);
2710
2711 let mut users = test_table("users", 1);
2712 users.catalog_name = "main".to_string();
2713 users.namespace_name = "default".to_string();
2714 catalog.inner.insert_table_unchecked(users.clone());
2715
2716 let mut users_index =
2717 IndexMetadata::new(1, "idx_users_id", "users", vec!["id".to_string()])
2718 .with_column_indices(vec![0]);
2719 users_index.catalog_name = "main".to_string();
2720 users_index.namespace_name = "default".to_string();
2721 catalog.inner.insert_index_unchecked(users_index);
2722
2723 let mut overlay = CatalogOverlay::new();
2724
2725 let mut orders = test_table("orders", 2);
2726 orders.catalog_name = "main".to_string();
2727 orders.namespace_name = "default".to_string();
2728 overlay.add_table(TableFqn::from(&orders), orders.clone());
2729
2730 let mut orders_index =
2731 IndexMetadata::new(2, "idx_orders_id", "orders", vec!["id".to_string()])
2732 .with_column_indices(vec![0]);
2733 orders_index.catalog_name = "main".to_string();
2734 orders_index.namespace_name = "default".to_string();
2735 overlay.add_index(IndexFqn::from(&orders_index), orders_index);
2736
2737 overlay.drop_table(&TableFqn::from(&users));
2738
2739 let table_names: Vec<String> = catalog
2740 .list_tables_in_txn("main", "default", &overlay)
2741 .into_iter()
2742 .map(|table| table.name)
2743 .collect();
2744 assert_eq!(table_names, vec!["orders".to_string()]);
2745
2746 let users_fqn = TableFqn::new("main", "default", "users");
2747 assert!(catalog.list_indexes_in_txn(&users_fqn, &overlay).is_empty());
2748
2749 let orders_fqn = TableFqn::new("main", "default", "orders");
2750 let index_names: Vec<String> = catalog
2751 .list_indexes_in_txn(&orders_fqn, &overlay)
2752 .into_iter()
2753 .map(|index| index.name)
2754 .collect();
2755 assert_eq!(index_names, vec!["idx_orders_id".to_string()]);
2756 }
2757
2758 #[test]
2759 fn overlay_name_lookup_ambiguous_returns_none() {
2760 let store = Arc::new(alopex_core::kv::memory::MemoryKV::new());
2761 let catalog = PersistentCatalog::new(store);
2762
2763 let mut overlay = CatalogOverlay::new();
2764
2765 let mut users_default = test_table("users", 1);
2766 users_default.catalog_name = "main".to_string();
2767 users_default.namespace_name = "default".to_string();
2768 overlay.add_table(TableFqn::from(&users_default), users_default);
2769
2770 let mut users_analytics = test_table("users", 2);
2771 users_analytics.catalog_name = "main".to_string();
2772 users_analytics.namespace_name = "analytics".to_string();
2773 overlay.add_table(TableFqn::from(&users_analytics), users_analytics);
2774
2775 assert!(catalog.get_table_in_txn("users", &overlay).is_none());
2776 assert!(!catalog.table_exists_in_txn("users", &overlay));
2777
2778 let mut idx_default =
2779 IndexMetadata::new(1, "idx_users_id", "users", vec!["id".to_string()])
2780 .with_column_indices(vec![0]);
2781 idx_default.catalog_name = "main".to_string();
2782 idx_default.namespace_name = "default".to_string();
2783 overlay.add_index(IndexFqn::from(&idx_default), idx_default);
2784
2785 let mut idx_analytics =
2786 IndexMetadata::new(2, "idx_users_id", "users", vec!["id".to_string()])
2787 .with_column_indices(vec![0]);
2788 idx_analytics.catalog_name = "main".to_string();
2789 idx_analytics.namespace_name = "analytics".to_string();
2790 overlay.add_index(IndexFqn::from(&idx_analytics), idx_analytics);
2791
2792 assert!(catalog.get_index_in_txn("idx_users_id", &overlay).is_none());
2793 assert!(!catalog.index_exists_in_txn("idx_users_id", &overlay));
2794 }
2795
2796 #[test]
2797 fn overlay_name_lookup_ambiguous_with_base_returns_none() {
2798 let store = Arc::new(alopex_core::kv::memory::MemoryKV::new());
2799 let mut catalog = PersistentCatalog::new(store);
2800
2801 let mut overlay = CatalogOverlay::new();
2802
2803 let mut base_users = test_table("users", 1);
2804 base_users.catalog_name = "main".to_string();
2805 base_users.namespace_name = "default".to_string();
2806 catalog.inner.insert_table_unchecked(base_users);
2807
2808 let mut overlay_users = test_table("users", 2);
2809 overlay_users.catalog_name = "main".to_string();
2810 overlay_users.namespace_name = "analytics".to_string();
2811 overlay.add_table(TableFqn::from(&overlay_users), overlay_users);
2812
2813 assert!(catalog.get_table_in_txn("users", &overlay).is_none());
2814 assert!(!catalog.table_exists_in_txn("users", &overlay));
2815
2816 let mut base_index = IndexMetadata::new(1, "idx_users_id", "users", vec!["id".to_string()])
2817 .with_column_indices(vec![0]);
2818 base_index.catalog_name = "main".to_string();
2819 base_index.namespace_name = "default".to_string();
2820 catalog.inner.insert_index_unchecked(base_index);
2821
2822 let mut overlay_index =
2823 IndexMetadata::new(2, "idx_users_id", "users", vec!["id".to_string()])
2824 .with_column_indices(vec![0]);
2825 overlay_index.catalog_name = "main".to_string();
2826 overlay_index.namespace_name = "analytics".to_string();
2827 overlay.add_index(IndexFqn::from(&overlay_index), overlay_index);
2828
2829 assert!(catalog.get_index_in_txn("idx_users_id", &overlay).is_none());
2830 assert!(!catalog.index_exists_in_txn("idx_users_id", &overlay));
2831 }
2832
2833 #[test]
2834 fn overlay_fqn_tables_separate_namespaces() {
2835 let store = Arc::new(alopex_core::kv::memory::MemoryKV::new());
2836 let catalog = PersistentCatalog::new(store);
2837
2838 let mut overlay = CatalogOverlay::new();
2839
2840 let mut users_default = test_table("users", 1);
2841 users_default.catalog_name = "main".to_string();
2842 users_default.namespace_name = "default".to_string();
2843 overlay.add_table(TableFqn::from(&users_default), users_default.clone());
2844
2845 let mut users_analytics = test_table("users", 2);
2846 users_analytics.catalog_name = "main".to_string();
2847 users_analytics.namespace_name = "analytics".to_string();
2848 overlay.add_table(TableFqn::from(&users_analytics), users_analytics.clone());
2849
2850 let default_tables: Vec<String> = catalog
2851 .list_tables_in_txn("main", "default", &overlay)
2852 .into_iter()
2853 .map(|table| table.name)
2854 .collect();
2855 assert_eq!(default_tables, vec!["users".to_string()]);
2856
2857 let analytics_tables: Vec<String> = catalog
2858 .list_tables_in_txn("main", "analytics", &overlay)
2859 .into_iter()
2860 .map(|table| table.name)
2861 .collect();
2862 assert_eq!(analytics_tables, vec!["users".to_string()]);
2863
2864 overlay.drop_table(&TableFqn::from(&users_default));
2865
2866 let default_tables_after: Vec<String> = catalog
2867 .list_tables_in_txn("main", "default", &overlay)
2868 .into_iter()
2869 .map(|table| table.name)
2870 .collect();
2871 assert!(default_tables_after.is_empty());
2872
2873 let analytics_tables_after: Vec<String> = catalog
2874 .list_tables_in_txn("main", "analytics", &overlay)
2875 .into_iter()
2876 .map(|table| table.name)
2877 .collect();
2878 assert_eq!(analytics_tables_after, vec!["users".to_string()]);
2879 }
2880
2881 #[test]
2882 fn overlay_fqn_indexes_separate_namespaces() {
2883 let store = Arc::new(alopex_core::kv::memory::MemoryKV::new());
2884 let catalog = PersistentCatalog::new(store);
2885
2886 let mut overlay = CatalogOverlay::new();
2887
2888 let mut users_default = test_table("users", 1);
2889 users_default.catalog_name = "main".to_string();
2890 users_default.namespace_name = "default".to_string();
2891 overlay.add_table(TableFqn::from(&users_default), users_default.clone());
2892
2893 let mut users_analytics = test_table("users", 2);
2894 users_analytics.catalog_name = "main".to_string();
2895 users_analytics.namespace_name = "analytics".to_string();
2896 overlay.add_table(TableFqn::from(&users_analytics), users_analytics.clone());
2897
2898 let mut idx_default =
2899 IndexMetadata::new(1, "idx_users_id", "users", vec!["id".to_string()])
2900 .with_column_indices(vec![0]);
2901 idx_default.catalog_name = "main".to_string();
2902 idx_default.namespace_name = "default".to_string();
2903 overlay.add_index(IndexFqn::from(&idx_default), idx_default);
2904
2905 let mut idx_analytics =
2906 IndexMetadata::new(2, "idx_users_id", "users", vec!["id".to_string()])
2907 .with_column_indices(vec![0]);
2908 idx_analytics.catalog_name = "main".to_string();
2909 idx_analytics.namespace_name = "analytics".to_string();
2910 overlay.add_index(IndexFqn::from(&idx_analytics), idx_analytics);
2911
2912 let default_fqn = TableFqn::new("main", "default", "users");
2913 let analytics_fqn = TableFqn::new("main", "analytics", "users");
2914
2915 let default_indexes: Vec<String> = catalog
2916 .list_indexes_in_txn(&default_fqn, &overlay)
2917 .into_iter()
2918 .map(|index| index.name)
2919 .collect();
2920 assert_eq!(default_indexes, vec!["idx_users_id".to_string()]);
2921
2922 let analytics_indexes: Vec<String> = catalog
2923 .list_indexes_in_txn(&analytics_fqn, &overlay)
2924 .into_iter()
2925 .map(|index| index.name)
2926 .collect();
2927 assert_eq!(analytics_indexes, vec!["idx_users_id".to_string()]);
2928 }
2929
2930 #[test]
2931 fn persist_overlay_rejects_duplicate_table_names() {
2932 let store = Arc::new(alopex_core::kv::memory::MemoryKV::new());
2933 let mut catalog = PersistentCatalog::new(store.clone());
2934
2935 let mut overlay = CatalogOverlay::new();
2936
2937 let mut users_default = test_table("users", 1);
2938 users_default.catalog_name = "main".to_string();
2939 users_default.namespace_name = "default".to_string();
2940 overlay.add_table(TableFqn::from(&users_default), users_default);
2941
2942 let mut users_analytics = test_table("users", 2);
2943 users_analytics.catalog_name = "main".to_string();
2944 users_analytics.namespace_name = "analytics".to_string();
2945 overlay.add_table(TableFqn::from(&users_analytics), users_analytics);
2946
2947 let mut txn = store.begin(TxnMode::ReadWrite).unwrap();
2948 let err = catalog.persist_overlay(&mut txn, &overlay).unwrap_err();
2949 assert!(matches!(err, CatalogError::InvalidKey(_)));
2950 txn.rollback_self().unwrap();
2951 }
2952
2953 #[test]
2954 fn persist_overlay_rejects_duplicate_index_names() {
2955 let store = Arc::new(alopex_core::kv::memory::MemoryKV::new());
2956 let mut catalog = PersistentCatalog::new(store.clone());
2957
2958 let mut overlay = CatalogOverlay::new();
2959
2960 let mut idx_default =
2961 IndexMetadata::new(1, "idx_users_id", "users", vec!["id".to_string()])
2962 .with_column_indices(vec![0]);
2963 idx_default.catalog_name = "main".to_string();
2964 idx_default.namespace_name = "default".to_string();
2965 overlay.add_index(IndexFqn::from(&idx_default), idx_default);
2966
2967 let mut idx_analytics =
2968 IndexMetadata::new(2, "idx_users_id", "users", vec!["id".to_string()])
2969 .with_column_indices(vec![0]);
2970 idx_analytics.catalog_name = "main".to_string();
2971 idx_analytics.namespace_name = "analytics".to_string();
2972 overlay.add_index(IndexFqn::from(&idx_analytics), idx_analytics);
2973
2974 let mut txn = store.begin(TxnMode::ReadWrite).unwrap();
2975 let err = catalog.persist_overlay(&mut txn, &overlay).unwrap_err();
2976 assert!(matches!(err, CatalogError::InvalidKey(_)));
2977 txn.rollback_self().unwrap();
2978 }
2979
2980 #[test]
2981 fn overlay_drop_cascade_namespace_removes_children() {
2982 let mut overlay = CatalogOverlay::new();
2983
2984 overlay.add_namespace(NamespaceMeta {
2985 name: "default".to_string(),
2986 catalog_name: "main".to_string(),
2987 comment: None,
2988 storage_root: None,
2989 });
2990
2991 let mut users = test_table("users", 1);
2992 users.catalog_name = "main".to_string();
2993 users.namespace_name = "default".to_string();
2994 overlay.add_table(TableFqn::from(&users), users.clone());
2995
2996 let mut users_index =
2997 IndexMetadata::new(1, "idx_users_id", "users", vec!["id".to_string()])
2998 .with_column_indices(vec![0]);
2999 users_index.catalog_name = "main".to_string();
3000 users_index.namespace_name = "default".to_string();
3001 let users_index_fqn = IndexFqn::from(&users_index);
3002 overlay.add_index(users_index_fqn.clone(), users_index);
3003
3004 overlay.drop_cascade_namespace("main", "default");
3005
3006 assert!(
3007 overlay
3008 .dropped_namespaces
3009 .contains(&("main".to_string(), "default".to_string()))
3010 );
3011 assert!(overlay.dropped_tables.contains(&TableFqn::from(&users)));
3012 assert!(overlay.dropped_indexes.contains(&users_index_fqn));
3013 assert!(!overlay.added_tables.contains_key(&TableFqn::from(&users)));
3014 assert!(!overlay.added_indexes.contains_key(&users_index_fqn));
3015 }
3016
3017 #[test]
3018 fn overlay_drop_cascade_catalog_removes_children() {
3019 let mut overlay = CatalogOverlay::new();
3020
3021 overlay.add_catalog(CatalogMeta {
3022 name: "main".to_string(),
3023 comment: None,
3024 storage_root: None,
3025 });
3026 overlay.add_namespace(NamespaceMeta {
3027 name: "default".to_string(),
3028 catalog_name: "main".to_string(),
3029 comment: None,
3030 storage_root: None,
3031 });
3032
3033 let mut users = test_table("users", 1);
3034 users.catalog_name = "main".to_string();
3035 users.namespace_name = "default".to_string();
3036 overlay.add_table(TableFqn::from(&users), users.clone());
3037
3038 let mut users_index =
3039 IndexMetadata::new(1, "idx_users_id", "users", vec!["id".to_string()])
3040 .with_column_indices(vec![0]);
3041 users_index.catalog_name = "main".to_string();
3042 users_index.namespace_name = "default".to_string();
3043 let users_index_fqn = IndexFqn::from(&users_index);
3044 overlay.add_index(users_index_fqn.clone(), users_index);
3045
3046 overlay.drop_cascade_catalog("main");
3047
3048 assert!(overlay.dropped_catalogs.contains("main"));
3049 assert!(overlay.dropped_tables.contains(&TableFqn::from(&users)));
3050 assert!(overlay.dropped_indexes.contains(&users_index_fqn));
3051 assert!(!overlay.added_tables.contains_key(&TableFqn::from(&users)));
3052 assert!(!overlay.added_indexes.contains_key(&users_index_fqn));
3053 }
3054
3055 #[test]
3056 fn dropped_namespace_hides_tables_and_indexes() {
3057 let store = Arc::new(alopex_core::kv::memory::MemoryKV::new());
3058 let mut catalog = PersistentCatalog::new(store);
3059
3060 let mut users = test_table("users", 1);
3061 users.catalog_name = "main".to_string();
3062 users.namespace_name = "default".to_string();
3063 catalog.inner.insert_table_unchecked(users);
3064
3065 let mut users_index =
3066 IndexMetadata::new(1, "idx_users_id", "users", vec!["id".to_string()])
3067 .with_column_indices(vec![0]);
3068 users_index.catalog_name = "main".to_string();
3069 users_index.namespace_name = "default".to_string();
3070 catalog.inner.insert_index_unchecked(users_index);
3071
3072 let mut overlay = CatalogOverlay::new();
3073 overlay.drop_namespace("main", "default");
3074
3075 let table_names: Vec<String> = catalog
3076 .list_tables_in_txn("main", "default", &overlay)
3077 .into_iter()
3078 .map(|table| table.name)
3079 .collect();
3080 assert!(table_names.is_empty());
3081
3082 let fqn = TableFqn {
3083 catalog: "main".to_string(),
3084 namespace: "default".to_string(),
3085 table: "users".to_string(),
3086 };
3087 let index_names: Vec<String> = catalog
3088 .list_indexes_in_txn(&fqn, &overlay)
3089 .into_iter()
3090 .map(|index| index.name)
3091 .collect();
3092 assert!(index_names.is_empty());
3093 }
3094
3095 #[test]
3096 fn dropped_namespace_hides_get_and_exists() {
3097 let store = Arc::new(alopex_core::kv::memory::MemoryKV::new());
3098 let mut catalog = PersistentCatalog::new(store);
3099
3100 let mut users = test_table("users", 1);
3101 users.catalog_name = "main".to_string();
3102 users.namespace_name = "default".to_string();
3103 catalog.inner.insert_table_unchecked(users);
3104
3105 let mut users_index =
3106 IndexMetadata::new(1, "idx_users_id", "users", vec!["id".to_string()])
3107 .with_column_indices(vec![0]);
3108 users_index.catalog_name = "main".to_string();
3109 users_index.namespace_name = "default".to_string();
3110 catalog.inner.insert_index_unchecked(users_index);
3111
3112 let mut overlay = CatalogOverlay::new();
3113 overlay.drop_namespace("main", "default");
3114
3115 assert!(!catalog.table_exists_in_txn("users", &overlay));
3116 assert!(catalog.get_table_in_txn("users", &overlay).is_none());
3117 assert!(!catalog.index_exists_in_txn("idx_users_id", &overlay));
3118 assert!(catalog.get_index_in_txn("idx_users_id", &overlay).is_none());
3119
3120 let view = TxnCatalogView::new(&catalog, &overlay);
3121 assert!(view.get_indexes_for_table("users").is_empty());
3122 }
3123
3124 #[test]
3125 fn persisted_catalog_meta_roundtrip() {
3126 let meta = PersistedCatalogMeta {
3127 name: "main".to_string(),
3128 comment: Some("primary catalog".to_string()),
3129 storage_root: Some("/tmp/alopex".to_string()),
3130 };
3131 let bytes = bincode::serialize(&meta).unwrap();
3132 let decoded: PersistedCatalogMeta = bincode::deserialize(&bytes).unwrap();
3133 assert_eq!(meta, decoded);
3134 }
3135
3136 #[test]
3137 fn persisted_namespace_meta_roundtrip() {
3138 let meta = PersistedNamespaceMeta {
3139 name: "analytics".to_string(),
3140 catalog_name: "main".to_string(),
3141 comment: Some("warehouse".to_string()),
3142 storage_root: Some("s3://bucket/ns".to_string()),
3143 };
3144 let bytes = bincode::serialize(&meta).unwrap();
3145 let decoded: PersistedNamespaceMeta = bincode::deserialize(&bytes).unwrap();
3146 assert_eq!(meta, decoded);
3147 }
3148
3149 #[test]
3150 fn table_fqn_hash_and_eq() {
3151 let first = TableFqn {
3152 catalog: "main".to_string(),
3153 namespace: "default".to_string(),
3154 table: "users".to_string(),
3155 };
3156 let same = TableFqn {
3157 catalog: "main".to_string(),
3158 namespace: "default".to_string(),
3159 table: "users".to_string(),
3160 };
3161 let different = TableFqn {
3162 catalog: "main".to_string(),
3163 namespace: "default".to_string(),
3164 table: "orders".to_string(),
3165 };
3166
3167 let mut set = HashSet::new();
3168 set.insert(first);
3169 assert!(set.contains(&same));
3170 assert!(!set.contains(&different));
3171 }
3172
3173 #[test]
3174 fn index_fqn_hash_and_eq() {
3175 let first = IndexFqn {
3176 catalog: "main".to_string(),
3177 namespace: "default".to_string(),
3178 table: "users".to_string(),
3179 index: "idx_users_id".to_string(),
3180 };
3181 let same = IndexFqn {
3182 catalog: "main".to_string(),
3183 namespace: "default".to_string(),
3184 table: "users".to_string(),
3185 index: "idx_users_id".to_string(),
3186 };
3187 let different = IndexFqn {
3188 catalog: "main".to_string(),
3189 namespace: "default".to_string(),
3190 table: "users".to_string(),
3191 index: "idx_users_email".to_string(),
3192 };
3193
3194 let mut set = HashSet::new();
3195 set.insert(first);
3196 assert!(set.contains(&same));
3197 assert!(!set.contains(&different));
3198 }
3199
3200 #[test]
3201 fn table_type_and_data_source_format_serde() {
3202 let managed = serde_json::to_string(&TableType::Managed).unwrap();
3203 let external = serde_json::to_string(&TableType::External).unwrap();
3204 let alopex = serde_json::to_string(&DataSourceFormat::Alopex).unwrap();
3205 let parquet = serde_json::to_string(&DataSourceFormat::Parquet).unwrap();
3206 let delta = serde_json::to_string(&DataSourceFormat::Delta).unwrap();
3207
3208 assert_eq!(managed, "\"MANAGED\"");
3209 assert_eq!(external, "\"EXTERNAL\"");
3210 assert_eq!(alopex, "\"ALOPEX\"");
3211 assert_eq!(parquet, "\"PARQUET\"");
3212 assert_eq!(delta, "\"DELTA\"");
3213 }
3214}