llkv_table/catalog/
table_catalog.rs

1//! Centralized table catalog for table and field name resolution.
2//!
3//! This module provides thread-safe, performant bidirectional mappings between:
4//! - Table names ↔ TableId (case-insensitive)
5//! - Field names ↔ FieldId per table (case-insensitive)
6//!
7//! # Purpose
8//!
9//! The catalog is an in-memory name resolution layer used during query execution.
10//! It does NOT create tables or generate IDs - use `CatalogService` for table creation.
11//!
12//! # Architecture
13//!
14//! - **CatalogService**: Creates tables (coordinates metadata + catalog + storage)
15//! - **TableCatalog** (this module): Name lookups during query execution
16//! - **MetadataManager**: Persistent storage, generates table IDs (source of truth)
17//! - **Table**: Data operations (append, scan, update, delete)
18//!
19//! # Design Goals
20//!
21//! 1. **Single source of truth**: MetadataManager generates IDs, catalog registers them
22//! 2. **Thread-safe**: Concurrent reads via `Arc<RwLock<_>>`
23//! 3. **Performance**: Fast lookups with FxHashMap
24//! 4. **Transaction isolation**: Immutable snapshots for MVCC
25//! 5. **Case-insensitive**: SQL standard compliance, preserves display names
26
27use crate::resolvers::TableNameKey;
28use crate::types::TableId;
29use llkv_result::{Error, Result};
30use llkv_types::FieldId;
31use rustc_hash::FxHashMap;
32use std::sync::{Arc, RwLock};
33
34pub use crate::resolvers::{
35    ColumnResolution, FieldConstraints, FieldDefinition, FieldInfo, FieldResolver,
36    FieldResolverState, FieldState, IdentifierContext, IdentifierResolver, QualifiedTableName,
37    QualifiedTableNameRef,
38};
39
40// ============================================================================
41// TableCatalog - Table-level resolver
42// ============================================================================
43
44/// In-memory table name resolution for query execution.
45///
46/// **To create tables, use `CatalogService`, not this struct directly.**
47///
48/// The catalog maps table names to TableIds for fast lookups during query planning
49/// and execution. It does not create tables, generate IDs, or persist metadata.
50///
51/// Table IDs are provided by `MetadataManager` (the persistent layer and source of truth).
52/// The catalog simply registers these IDs for in-memory name resolution.
53#[derive(Debug, Clone)]
54pub struct TableCatalog {
55    inner: Arc<RwLock<TableCatalogInner>>,
56}
57
58#[derive(Debug)]
59struct TableCatalogInner {
60    /// Canonical table name (lowercase components) -> TableId
61    /// Table names may be schema-qualified (e.g., "test.tbl")
62    table_name_to_id: FxHashMap<TableNameKey, TableId>,
63    /// TableId -> TableMetadata
64    table_id_to_meta: FxHashMap<TableId, TableMetadata>,
65    /// Next table ID to assign (monotonically increasing)
66    next_table_id: TableId,
67    /// Set of registered schema names (canonical lowercase)
68    schemas: rustc_hash::FxHashSet<String>,
69}
70
71/// Metadata for a registered table
72#[derive(Debug, Clone)]
73struct TableMetadata {
74    /// Display name (preserves original case)
75    display_name: QualifiedTableName,
76    /// Canonical name (lowercase for case-insensitive lookups)
77    canonical_name: TableNameKey,
78    /// Field resolver for this table's columns
79    field_resolver: FieldResolver,
80}
81
82#[derive(Debug, Clone)]
83pub(crate) struct TableMetadataView {
84    canonical_name: TableNameKey,
85}
86
87impl TableMetadata {
88    fn to_view(&self) -> TableMetadataView {
89        TableMetadataView {
90            canonical_name: self.canonical_name.clone(),
91        }
92    }
93}
94
95impl TableMetadataView {
96    pub(crate) fn canonical_table(&self) -> &str {
97        self.canonical_name.table()
98    }
99
100    pub(crate) fn canonical_schema(&self) -> Option<&str> {
101        self.canonical_name.schema()
102    }
103}
104
105impl TableCatalog {
106    /// Create a new empty catalog.
107    ///
108    /// The catalog starts with no registered tables. The first assigned TableId will be 1.
109    /// TableId 0 is reserved for special purposes (e.g., system tables).
110    pub fn new() -> Self {
111        Self {
112            inner: Arc::new(RwLock::new(TableCatalogInner {
113                table_name_to_id: FxHashMap::default(),
114                table_id_to_meta: FxHashMap::default(),
115                next_table_id: 1, // Start at 1, reserve 0 for system
116                schemas: rustc_hash::FxHashSet::default(),
117            })),
118        }
119    }
120
121    /// Register a table in the catalog with the specified table ID.
122    ///
123    /// **This does NOT create tables.** It registers an existing table ID from
124    /// `MetadataManager` for name lookups. Use `CatalogService` to create tables.
125    ///
126    /// The table_id must come from metadata (source of truth):
127    /// - New tables: `MetadataManager::reserve_table_id()`
128    /// - Database restart: `MetadataManager::all_table_metas()`
129    ///
130    /// # Example
131    ///
132    /// ```
133    /// use llkv_table::catalog::TableCatalog;
134    ///
135    /// let catalog = TableCatalog::new();
136    /// catalog.register_table("Users", 42).unwrap();
137    ///
138    /// assert_eq!(catalog.table_id("users"), Some(42));
139    /// ```
140    ///
141    /// # Errors
142    ///
143    /// Returns error if the name or ID is already registered.
144    pub fn register_table(
145        &self,
146        name: impl Into<QualifiedTableName>,
147        table_id: TableId,
148    ) -> Result<()> {
149        let display_name: QualifiedTableName = name.into();
150        let canonical_name = display_name.canonical_key();
151
152        let mut inner = self
153            .inner
154            .write()
155            .map_err(|_| Error::Internal("Failed to acquire catalog write lock".to_string()))?;
156
157        // Check for duplicate name (case-insensitive)
158        if inner.table_name_to_id.contains_key(&canonical_name) {
159            return Err(Error::CatalogError(format!(
160                "Table '{}' already exists in catalog",
161                display_name
162            )));
163        }
164
165        // Check for duplicate ID
166        if inner.table_id_to_meta.contains_key(&table_id) {
167            return Err(Error::CatalogError(format!(
168                "Table ID {} already exists in catalog",
169                table_id
170            )));
171        }
172
173        // Update next_table_id if necessary to avoid conflicts with future allocations
174        if table_id >= inner.next_table_id {
175            inner.next_table_id = table_id
176                .checked_add(1)
177                .ok_or_else(|| Error::Internal("TableId overflow".to_string()))?;
178        }
179
180        // Create field resolver for this table
181        let field_resolver = FieldResolver::new();
182
183        // Store metadata
184        let metadata = TableMetadata {
185            display_name: display_name.clone(),
186            canonical_name: canonical_name.clone(),
187            field_resolver,
188        };
189
190        inner.table_name_to_id.insert(canonical_name, table_id);
191        inner.table_id_to_meta.insert(table_id, metadata);
192
193        Ok(())
194    }
195
196    /// Update the catalog entry for a table to use a new display/canonical name.
197    ///
198    /// This helper is scoped to crate consumers (CatalogManager) so higher layers have a single
199    /// rename entry point. Schema changes are not allowed; the table must remain in the same
200    /// schema.
201    pub(crate) fn rename_registered_table(
202        &self,
203        old_name: impl Into<QualifiedTableName>,
204        new_name: impl Into<QualifiedTableName>,
205    ) -> Result<()> {
206        let old_name: QualifiedTableName = old_name.into();
207        let new_name: QualifiedTableName = new_name.into();
208
209        let old_display = old_name.to_display_string();
210        let new_display = new_name.to_display_string();
211
212        // Prevent moving tables across schemas for now.
213        let old_schema = old_name.schema().map(str::to_ascii_lowercase);
214        let new_schema = new_name.schema().map(str::to_ascii_lowercase);
215        if old_schema != new_schema {
216            return Err(Error::InvalidArgumentError(
217                "ALTER TABLE RENAME cannot change table schema".into(),
218            ));
219        }
220
221        let old_key = old_name.canonical_key();
222        let new_key = new_name.canonical_key();
223
224        let mut inner = self
225            .inner
226            .write()
227            .map_err(|_| Error::Internal("Failed to acquire catalog write lock".into()))?;
228
229        let table_id = inner
230            .table_name_to_id
231            .get(&old_key)
232            .copied()
233            .ok_or_else(|| {
234                Error::CatalogError(format!("Table '{}' does not exist in catalog", old_display))
235            })?;
236
237        if old_key != new_key
238            && inner.table_name_to_id.contains_key(&new_key)
239            && !old_display.eq_ignore_ascii_case(&new_display)
240        {
241            return Err(Error::CatalogError(format!(
242                "Table '{}' already exists in catalog",
243                new_display
244            )));
245        }
246
247        if let Some(meta) = inner.table_id_to_meta.get_mut(&table_id) {
248            meta.display_name = new_name.clone();
249            meta.canonical_name = new_key.clone();
250        }
251
252        if old_key != new_key {
253            inner.table_name_to_id.remove(&old_key);
254            inner.table_name_to_id.insert(new_key, table_id);
255        }
256
257        Ok(())
258    }
259
260    /// Unregister a table from the catalog.
261    ///
262    /// Removes the table and its associated field resolver from the catalog.
263    /// This is typically called when a table is dropped.
264    ///
265    /// # Arguments
266    ///
267    /// * `table_id` - The unique table identifier
268    ///
269    /// # Returns
270    ///
271    /// `true` if the table was found and removed, `false` if it didn't exist.
272    ///
273    /// # Example
274    ///
275    /// ```
276    /// use llkv_table::catalog::TableCatalog;
277    ///
278    /// let catalog = TableCatalog::new();
279    /// catalog.register_table("Users", 1).unwrap();
280    /// assert!(catalog.unregister_table(1));
281    /// assert_eq!(catalog.table_id("users"), None);
282    /// ```
283    pub fn unregister_table(&self, table_id: TableId) -> bool {
284        let mut inner = match self.inner.write() {
285            Ok(guard) => guard,
286            Err(_) => return false,
287        };
288
289        // Remove from id → metadata map and get the canonical name
290        if let Some(meta) = inner.table_id_to_meta.remove(&table_id) {
291            // Remove from name → id map using the canonical key
292            let canonical_key = meta.display_name.canonical_key();
293            inner.table_name_to_id.remove(&canonical_key);
294            true
295        } else {
296            false
297        }
298    }
299
300    /// Get TableId by table name (case-insensitive lookup).
301    ///
302    /// # Arguments
303    ///
304    /// * `name` - Table name (any casing)
305    ///
306    /// # Returns
307    ///
308    /// `Some(TableId)` if table exists, `None` otherwise.
309    ///
310    /// # Example
311    ///
312    /// ```
313    /// use llkv_table::catalog::TableCatalog;
314    ///
315    /// let catalog = TableCatalog::new();
316    /// catalog.register_table("Users", 42).unwrap();
317    ///
318    /// assert_eq!(catalog.table_id("users"), Some(42));
319    /// assert_eq!(catalog.table_id("USERS"), Some(42));
320    /// assert_eq!(catalog.table_id("Users"), Some(42));
321    /// ```
322    pub fn table_id(&self, name: &str) -> Option<TableId> {
323        let canonical = QualifiedTableNameRef::from(name).canonical_key();
324        let inner = self.inner.read().ok()?;
325        inner.table_name_to_id.get(&canonical).copied()
326    }
327
328    /// Get display name for a TableId.
329    ///
330    /// Returns the original table name with preserved casing.
331    ///
332    /// # Arguments
333    ///
334    /// * `id` - The TableId to look up
335    ///
336    /// # Returns
337    ///
338    /// `Some(String)` with the display name if table exists, `None` otherwise.
339    pub fn table_name(&self, id: TableId) -> Option<QualifiedTableName> {
340        let inner = self.inner.read().ok()?;
341        inner
342            .table_id_to_meta
343            .get(&id)
344            .map(|meta| meta.display_name.clone())
345    }
346
347    pub(crate) fn table_metadata_view(&self, id: TableId) -> Option<TableMetadataView> {
348        let inner = self.inner.read().ok()?;
349        inner.table_id_to_meta.get(&id).map(TableMetadata::to_view)
350    }
351
352    /// Get field resolver for a table.
353    ///
354    /// # Arguments
355    ///
356    /// * `table_id` - The TableId to get the resolver for
357    ///
358    /// # Returns
359    ///
360    /// `Some(FieldResolver)` if table exists, `None` otherwise.
361    pub fn field_resolver(&self, table_id: TableId) -> Option<FieldResolver> {
362        let inner = self.inner.read().ok()?;
363        inner
364            .table_id_to_meta
365            .get(&table_id)
366            .map(|meta| meta.field_resolver.clone())
367    }
368
369    /// List all registered table names.
370    ///
371    /// Returns display names (with original casing) for all tables.
372    /// Useful for `SHOW TABLES` queries.
373    ///
374    /// # Returns
375    ///
376    /// Vector of table display names in insertion order (not guaranteed sorted).
377    pub fn table_names(&self) -> Vec<String> {
378        let inner = match self.inner.read() {
379            Ok(inner) => inner,
380            Err(_) => return Vec::new(),
381        };
382
383        inner
384            .table_id_to_meta
385            .values()
386            .map(|meta| meta.display_name.to_display_string())
387            .collect()
388    }
389
390    /// Create an immutable snapshot of the catalog for transaction isolation.
391    ///
392    /// Returns a `TableCatalogSnapshot` containing all current table name→ID mappings.
393    /// Snapshots are cheap to create (uses Arc internally).
394    ///
395    /// # Example
396    ///
397    /// ```
398    /// use llkv_table::catalog::TableCatalog;
399    ///
400    /// let catalog = TableCatalog::new();
401    /// catalog.register_table("Users", 7).unwrap();
402    ///
403    /// let snapshot = catalog.snapshot();
404    /// catalog.unregister_table(7);
405    ///
406    /// assert_eq!(snapshot.table_id("users"), Some(7));
407    /// assert_eq!(catalog.table_id("users"), None);
408    /// ```
409    pub fn snapshot(&self) -> TableCatalogSnapshot {
410        let inner = match self.inner.read() {
411            Ok(inner) => inner,
412            Err(_) => {
413                return TableCatalogSnapshot {
414                    table_ids: Arc::new(FxHashMap::default()),
415                };
416            }
417        };
418
419        // Clone the mapping into an Arc for cheap snapshot sharing
420        let table_ids = Arc::new(inner.table_name_to_id.clone());
421
422        TableCatalogSnapshot { table_ids }
423    }
424
425    /// Check if a table exists (case-insensitive).
426    ///
427    /// # Arguments
428    ///
429    /// * `name` - Table name to check
430    ///
431    /// # Returns
432    ///
433    /// `true` if table exists, `false` otherwise.
434    pub fn table_exists(&self, name: &str) -> bool {
435        self.table_id(name).is_some()
436    }
437
438    /// Get the number of registered tables.
439    pub fn table_count(&self) -> usize {
440        match self.inner.read() {
441            Ok(inner) => inner.table_id_to_meta.len(),
442            Err(_) => 0,
443        }
444    }
445
446    /// Register a new schema in the catalog.
447    ///
448    /// # Arguments
449    ///
450    /// * `name` - The schema name (case will be preserved in lookups but stored canonically)
451    ///
452    /// # Returns
453    ///
454    /// `Ok(())` if the schema was registered successfully.
455    ///
456    /// # Errors
457    ///
458    /// Returns `Error::CatalogError` if a schema with this name already exists (case-insensitive).
459    pub fn register_schema(&self, name: impl Into<String>) -> Result<()> {
460        let name = name.into();
461        let canonical = name.to_ascii_lowercase();
462
463        let mut inner = self
464            .inner
465            .write()
466            .map_err(|_| Error::Internal("Failed to acquire catalog write lock".to_string()))?;
467
468        if inner.schemas.contains(&canonical) {
469            return Err(Error::CatalogError(format!(
470                "Schema '{}' already exists",
471                name
472            )));
473        }
474
475        inner.schemas.insert(canonical);
476        Ok(())
477    }
478
479    /// Check if a schema exists (case-insensitive).
480    ///
481    /// # Arguments
482    ///
483    /// * `name` - Schema name to check
484    ///
485    /// # Returns
486    ///
487    /// `true` if schema exists, `false` otherwise.
488    pub fn schema_exists(&self, name: &str) -> bool {
489        let canonical = name.to_ascii_lowercase();
490        match self.inner.read() {
491            Ok(inner) => inner.schemas.contains(&canonical),
492            Err(_) => false,
493        }
494    }
495
496    /// List all registered schema names.
497    ///
498    /// Returns all schema names in canonical (lowercase) form.
499    pub fn schema_names(&self) -> Vec<String> {
500        match self.inner.read() {
501            Ok(inner) => inner.schemas.iter().cloned().collect(),
502            Err(_) => Vec::new(),
503        }
504    }
505
506    /// Drop (unregister) a schema from the catalog.
507    ///
508    /// This does NOT cascade to tables - caller must handle that separately.
509    ///
510    /// # Returns
511    ///
512    /// `true` if the schema was found and removed, `false` if it didn't exist.
513    pub fn unregister_schema(&self, name: &str) -> bool {
514        let canonical = name.to_ascii_lowercase();
515        let mut inner = match self.inner.write() {
516            Ok(guard) => guard,
517            Err(_) => return false,
518        };
519
520        inner.schemas.remove(&canonical)
521    }
522
523    /// Export catalog state for persistence.
524    ///
525    /// Returns a serializable representation of the entire catalog state,
526    /// including all table and field mappings.
527    ///
528    /// # Returns
529    ///
530    /// `TableCatalogState` containing all tables, fields, and next ID counters.
531    ///
532    /// # Example
533    ///
534    /// ```
535    /// use llkv_table::catalog::TableCatalog;
536    ///
537    /// let catalog = TableCatalog::new();
538    /// catalog.register_table("Users", 9).unwrap();
539    ///
540    /// let state = catalog.export_state();
541    /// let restored = TableCatalog::from_state(state).unwrap();
542    ///
543    /// assert_eq!(restored.table_id("users"), Some(9));
544    /// ```
545    pub fn export_state(&self) -> TableCatalogState {
546        let inner = match self.inner.read() {
547            Ok(inner) => inner,
548            Err(_) => {
549                return TableCatalogState {
550                    tables: Vec::new(),
551                    next_table_id: 1,
552                    schemas: Vec::new(),
553                };
554            }
555        };
556
557        let mut tables = Vec::new();
558
559        for (&table_id, meta) in &inner.table_id_to_meta {
560            let field_state = meta.field_resolver.export_state();
561            let display_schema = meta.display_name.schema().map(|s| s.to_string());
562            let display_table = meta.display_name.table().to_string();
563            let canonical_schema = meta.canonical_name.schema().map(|s| s.to_string());
564            let canonical_table = meta.canonical_name.table().to_string();
565            tables.push(TableState {
566                table_id,
567                display_schema,
568                display_table,
569                canonical_schema,
570                canonical_table,
571                fields: field_state.fields,
572                next_field_id: field_state.next_field_id,
573            });
574        }
575
576        let schemas: Vec<String> = inner.schemas.iter().cloned().collect();
577
578        TableCatalogState {
579            tables,
580            next_table_id: inner.next_table_id,
581            schemas,
582        }
583    }
584
585    /// Restore catalog from persisted state.
586    ///
587    /// Creates a new catalog instance with all table and field mappings
588    /// restored from the provided state.
589    ///
590    /// # Arguments
591    ///
592    /// * `state` - Previously exported catalog state
593    ///
594    /// # Returns
595    ///
596    /// A fully restored `TableCatalog` instance.
597    ///
598    /// # Errors
599    ///
600    /// Returns `Error::CatalogError` if state is invalid (e.g., duplicate names/IDs).
601    pub fn from_state(state: TableCatalogState) -> Result<Self> {
602        let mut table_name_to_id = FxHashMap::default();
603        let mut table_id_to_meta = FxHashMap::default();
604
605        for table_state in state.tables {
606            // Check for duplicate table IDs
607            if table_id_to_meta.contains_key(&table_state.table_id) {
608                return Err(Error::CatalogError(format!(
609                    "Duplicate table_id {} in catalog state",
610                    table_state.table_id
611                )));
612            }
613
614            // Check for duplicate table names
615            let canonical_name = TableNameKey::new(
616                table_state.canonical_schema.as_deref(),
617                &table_state.canonical_table,
618            );
619
620            if table_name_to_id.contains_key(&canonical_name) {
621                return Err(Error::CatalogError(format!(
622                    "Duplicate table name '{}' in catalog state",
623                    QualifiedTableName::new(
624                        table_state.display_schema.clone(),
625                        table_state.display_table.clone(),
626                    )
627                )));
628            }
629
630            // Restore field resolver
631            let field_resolver = FieldResolver::from_state(FieldResolverState {
632                fields: table_state.fields,
633                next_field_id: table_state.next_field_id,
634            })?;
635
636            let metadata = TableMetadata {
637                display_name: QualifiedTableName::new(
638                    table_state.display_schema.clone(),
639                    table_state.display_table.clone(),
640                ),
641                canonical_name: canonical_name.clone(),
642                field_resolver,
643            };
644
645            table_name_to_id.insert(canonical_name, table_state.table_id);
646            table_id_to_meta.insert(table_state.table_id, metadata);
647        }
648
649        Ok(Self {
650            inner: Arc::new(RwLock::new(TableCatalogInner {
651                table_name_to_id,
652                table_id_to_meta,
653                next_table_id: state.next_table_id,
654                schemas: state.schemas.into_iter().collect(),
655            })),
656        })
657    }
658}
659
660impl Default for TableCatalog {
661    fn default() -> Self {
662        Self::new()
663    }
664}
665
666// ============================================================================
667// TableCatalogSnapshot - Immutable view for transaction isolation
668// ============================================================================
669
670/// Immutable snapshot of table catalog state for transaction isolation.
671///
672/// Snapshots capture the table name→ID mappings at a point in time.
673/// Transactions use snapshots to ensure consistent view of the catalog
674/// throughout their execution, even if new tables are created concurrently.
675#[derive(Debug, Clone)]
676pub struct TableCatalogSnapshot {
677    /// Canonical table name -> TableId (immutable)
678    table_ids: Arc<FxHashMap<TableNameKey, TableId>>,
679}
680
681impl TableCatalogSnapshot {
682    /// Get TableId by name from this snapshot (case-insensitive).
683    ///
684    /// # Arguments
685    ///
686    /// * `name` - Table name (any casing)
687    ///
688    /// # Returns
689    ///
690    /// `Some(TableId)` if table existed in snapshot, `None` otherwise.
691    pub fn table_id(&self, name: &str) -> Option<TableId> {
692        let canonical = QualifiedTableNameRef::from(name).canonical_key();
693        self.table_ids.get(&canonical).copied()
694    }
695
696    /// Check if table exists in this snapshot (case-insensitive).
697    pub fn table_exists(&self, name: &str) -> bool {
698        self.table_id(name).is_some()
699    }
700
701    /// Get all table names in this snapshot.
702    pub fn table_names(&self) -> Vec<String> {
703        self.table_ids.keys().map(TableNameKey::to_string).collect()
704    }
705
706    /// Get the number of tables in this snapshot.
707    pub fn table_count(&self) -> usize {
708        self.table_ids.len()
709    }
710}
711
712// ============================================================================
713// Persistence Types
714// ============================================================================
715
716/// Serializable table catalog state for persistence.
717///
718/// This structure contains all information needed to restore a table catalog
719/// from disk, including all table and field mappings.
720#[derive(Debug, Clone, bitcode::Encode, bitcode::Decode)]
721pub struct TableCatalogState {
722    /// All registered tables with their field resolvers
723    pub tables: Vec<TableState>,
724    /// Next table ID to assign
725    pub next_table_id: TableId,
726    /// All registered schema names (canonical lowercase)
727    pub schemas: Vec<String>,
728}
729
730/// Serializable table state.
731#[derive(Debug, Clone, bitcode::Encode, bitcode::Decode)]
732pub struct TableState {
733    /// The table's unique ID
734    pub table_id: TableId,
735    /// Display schema (preserves original case)
736    pub display_schema: Option<String>,
737    /// Display table name (preserves original case)
738    pub display_table: String,
739    /// Canonical schema (lowercase)
740    pub canonical_schema: Option<String>,
741    /// Canonical table name (lowercase)
742    pub canonical_table: String,
743    /// All fields in this table
744    pub fields: Vec<FieldState>,
745    /// Next field ID to assign
746    pub next_field_id: FieldId,
747}
748
749#[cfg(test)]
750mod tests {
751    use super::*;
752
753    #[test]
754    fn test_catalog_basic_operations() {
755        let catalog = TableCatalog::new();
756
757        // Register tables with explicit IDs (simulating metadata layer)
758        let users_id = 1;
759        let orders_id = 2;
760        catalog.register_table("Users", users_id).unwrap();
761        catalog.register_table("Orders", orders_id).unwrap();
762
763        assert_ne!(users_id, orders_id);
764
765        // Lookup by name (case-insensitive)
766        assert_eq!(catalog.table_id("users"), Some(users_id));
767        assert_eq!(catalog.table_id("USERS"), Some(users_id));
768        assert_eq!(catalog.table_id("Users"), Some(users_id));
769
770        // Reverse lookup preserves display name
771        assert_eq!(
772            catalog.table_name(users_id),
773            Some(QualifiedTableName::from("Users"))
774        );
775        assert_eq!(
776            catalog.table_name(orders_id),
777            Some(QualifiedTableName::from("Orders"))
778        );
779
780        // Non-existent table
781        assert_eq!(catalog.table_id("Products"), None);
782        assert_eq!(catalog.table_name(999), None);
783    }
784
785    #[test]
786    fn test_catalog_duplicate_detection() {
787        let catalog = TableCatalog::new();
788
789        catalog.register_table("Users", 1).unwrap();
790
791        // Case-insensitive duplicate detection
792        assert!(catalog.register_table("users", 2).is_err());
793        assert!(catalog.register_table("USERS", 3).is_err());
794        assert!(catalog.register_table("UsErS", 4).is_err());
795    }
796
797    #[test]
798    fn test_catalog_table_names() {
799        let catalog = TableCatalog::new();
800
801        catalog.register_table("Users", 1).unwrap();
802        catalog.register_table("Orders", 2).unwrap();
803        catalog.register_table("Products", 3).unwrap();
804
805        let names = catalog.table_names();
806        assert_eq!(names.len(), 3);
807        assert!(names.contains(&"Users".to_string()));
808        assert!(names.contains(&"Orders".to_string()));
809        assert!(names.contains(&"Products".to_string()));
810    }
811
812    #[test]
813    fn test_catalog_snapshot() {
814        let catalog = TableCatalog::new();
815
816        let users_id = 1;
817        catalog.register_table("Users", users_id).unwrap();
818        let snapshot = catalog.snapshot();
819
820        // Snapshot sees existing table
821        assert_eq!(snapshot.table_id("users"), Some(users_id));
822
823        // Add new table after snapshot
824        catalog.register_table("Orders", 2).unwrap();
825
826        // Snapshot doesn't see new table (isolation)
827        assert_eq!(snapshot.table_id("orders"), None);
828        assert_eq!(snapshot.table_count(), 1);
829
830        // But new snapshot does
831        let snapshot2 = catalog.snapshot();
832        assert!(snapshot2.table_id("orders").is_some());
833        assert_eq!(snapshot2.table_count(), 2);
834    }
835
836    #[test]
837    fn test_field_resolver_basic_operations() {
838        let resolver = FieldResolver::new();
839
840        // Register fields
841        let name_fid = resolver.register_field("name").unwrap();
842        let email_fid = resolver.register_field("Email").unwrap();
843        let age_fid = resolver.register_field("AGE").unwrap();
844
845        assert_ne!(name_fid, email_fid);
846        assert_ne!(email_fid, age_fid);
847
848        // All should be >= FIRST_USER_FIELD_ID (1)
849        // System columns use 0 for row_id, sentinels for MVCC
850        assert!(name_fid >= crate::reserved::FIRST_USER_FIELD_ID);
851        assert!(email_fid >= crate::reserved::FIRST_USER_FIELD_ID);
852        assert!(age_fid >= crate::reserved::FIRST_USER_FIELD_ID);
853
854        // Lookup by name (case-insensitive)
855        assert_eq!(resolver.field_id("name"), Some(name_fid));
856        assert_eq!(resolver.field_id("NAME"), Some(name_fid));
857        assert_eq!(resolver.field_id("Name"), Some(name_fid));
858
859        assert_eq!(resolver.field_id("email"), Some(email_fid));
860        assert_eq!(resolver.field_id("EMAIL"), Some(email_fid));
861
862        // Reverse lookup preserves display name
863        assert_eq!(resolver.field_name(name_fid), Some("name".to_string()));
864        assert_eq!(resolver.field_name(email_fid), Some("Email".to_string()));
865        assert_eq!(resolver.field_name(age_fid), Some("AGE".to_string()));
866
867        // Non-existent field
868        assert_eq!(resolver.field_id("salary"), None);
869        assert_eq!(resolver.field_name(999), None);
870    }
871
872    #[test]
873    fn test_field_resolver_duplicate_detection() {
874        let resolver = FieldResolver::new();
875
876        resolver.register_field("name").unwrap();
877
878        // Case-insensitive duplicate detection
879        assert!(resolver.register_field("name").is_err());
880        assert!(resolver.register_field("NAME").is_err());
881        assert!(resolver.register_field("NaMe").is_err());
882    }
883
884    #[test]
885    fn test_field_resolver_field_names() {
886        let resolver = FieldResolver::new();
887
888        resolver.register_field("name").unwrap();
889        resolver.register_field("Email").unwrap();
890        resolver.register_field("AGE").unwrap();
891
892        let names = resolver.field_names();
893        assert_eq!(names.len(), 3);
894        assert!(names.contains(&"name".to_string()));
895        assert!(names.contains(&"Email".to_string()));
896        assert!(names.contains(&"AGE".to_string()));
897    }
898
899    #[test]
900    fn test_catalog_with_field_resolver() {
901        let catalog = TableCatalog::new();
902
903        // Register table
904        let table_id = 1;
905        catalog.register_table("Users", table_id).unwrap();
906
907        // Get field resolver
908        let resolver = catalog.field_resolver(table_id).unwrap();
909
910        // Register fields
911        let name_fid = resolver.register_field("name").unwrap();
912        let email_fid = resolver.register_field("email").unwrap();
913
914        // Verify fields are registered
915        assert_eq!(resolver.field_id("name"), Some(name_fid));
916        assert_eq!(resolver.field_id("email"), Some(email_fid));
917
918        // Get resolver again (should be same instance via Arc)
919        let resolver2 = catalog.field_resolver(table_id).unwrap();
920        assert_eq!(resolver2.field_id("name"), Some(name_fid));
921        assert_eq!(resolver2.field_id("email"), Some(email_fid));
922    }
923
924    #[test]
925    fn test_catalog_exists_helpers() {
926        let catalog = TableCatalog::new();
927        catalog.register_table("Users", 1).unwrap();
928
929        assert!(catalog.table_exists("users"));
930        assert!(catalog.table_exists("USERS"));
931        assert!(!catalog.table_exists("orders"));
932
933        assert_eq!(catalog.table_count(), 1);
934    }
935
936    #[test]
937    fn test_field_resolver_exists_helpers() {
938        let resolver = FieldResolver::new();
939        resolver.register_field("name").unwrap();
940
941        assert!(resolver.field_exists("name"));
942        assert!(resolver.field_exists("NAME"));
943        assert!(!resolver.field_exists("email"));
944
945        assert_eq!(resolver.field_count(), 1);
946    }
947
948    #[test]
949    fn test_catalog_persistence_export_import() {
950        let catalog = TableCatalog::new();
951
952        // Register tables with fields
953        let users_id = 1;
954        let orders_id = 2;
955        catalog.register_table("Users", users_id).unwrap();
956        catalog.register_table("Orders", orders_id).unwrap();
957
958        let users_resolver = catalog.field_resolver(users_id).unwrap();
959        let name_fid = users_resolver.register_field("name").unwrap();
960        let email_fid = users_resolver.register_field("Email").unwrap();
961
962        let orders_resolver = catalog.field_resolver(orders_id).unwrap();
963        let product_fid = orders_resolver.register_field("product").unwrap();
964        let qty_fid = orders_resolver.register_field("quantity").unwrap();
965
966        // Export state
967        let state = catalog.export_state();
968        assert_eq!(state.tables.len(), 2);
969        assert!(state.next_table_id > orders_id);
970
971        // Restore from state
972        let restored_catalog = TableCatalog::from_state(state).unwrap();
973
974        // Verify tables
975        assert_eq!(restored_catalog.table_id("users"), Some(users_id));
976        assert_eq!(restored_catalog.table_id("orders"), Some(orders_id));
977        assert_eq!(
978            restored_catalog.table_name(users_id),
979            Some(QualifiedTableName::from("Users"))
980        );
981        assert_eq!(
982            restored_catalog.table_name(orders_id),
983            Some(QualifiedTableName::from("Orders"))
984        );
985
986        // Verify fields
987        let restored_users_resolver = restored_catalog.field_resolver(users_id).unwrap();
988        assert_eq!(restored_users_resolver.field_id("name"), Some(name_fid));
989        assert_eq!(restored_users_resolver.field_id("email"), Some(email_fid));
990        assert_eq!(
991            restored_users_resolver.field_name(name_fid),
992            Some("name".to_string())
993        );
994        assert_eq!(
995            restored_users_resolver.field_name(email_fid),
996            Some("Email".to_string())
997        );
998
999        let restored_orders_resolver = restored_catalog.field_resolver(orders_id).unwrap();
1000        assert_eq!(
1001            restored_orders_resolver.field_id("product"),
1002            Some(product_fid)
1003        );
1004        assert_eq!(restored_orders_resolver.field_id("quantity"), Some(qty_fid));
1005    }
1006
1007    #[test]
1008    fn test_catalog_persistence_id_stability() {
1009        let catalog = TableCatalog::new();
1010
1011        // Register tables
1012        let table1_id = 1;
1013        let table2_id = 2;
1014        catalog.register_table("Table1", table1_id).unwrap();
1015        catalog.register_table("Table2", table2_id).unwrap();
1016
1017        // Export and restore
1018        let state = catalog.export_state();
1019        let restored = TableCatalog::from_state(state).unwrap();
1020
1021        // IDs should be stable
1022        assert_eq!(restored.table_id("table1"), Some(table1_id));
1023        assert_eq!(restored.table_id("table2"), Some(table2_id));
1024
1025        // New registrations should continue from saved counter
1026        let table3_id = 3;
1027        restored.register_table("Table3", table3_id).unwrap();
1028        assert!(table3_id > table2_id);
1029    }
1030
1031    #[test]
1032    fn test_field_resolver_persistence() {
1033        let resolver = FieldResolver::new();
1034
1035        let fid1 = resolver.register_field("field1").unwrap();
1036        let fid2 = resolver.register_field("Field2").unwrap();
1037        let fid3 = resolver.register_field("FIELD3").unwrap();
1038
1039        // Export state
1040        let state = resolver.export_state();
1041        assert_eq!(state.fields.len(), 3);
1042
1043        // Restore from state
1044        let restored = FieldResolver::from_state(state).unwrap();
1045
1046        // Verify case-insensitive lookups work
1047        assert_eq!(restored.field_id("field1"), Some(fid1));
1048        assert_eq!(restored.field_id("FIELD1"), Some(fid1));
1049        assert_eq!(restored.field_id("field2"), Some(fid2));
1050        assert_eq!(restored.field_id("field3"), Some(fid3));
1051
1052        // Verify display names preserved
1053        assert_eq!(restored.field_name(fid1), Some("field1".to_string()));
1054        assert_eq!(restored.field_name(fid2), Some("Field2".to_string()));
1055        assert_eq!(restored.field_name(fid3), Some("FIELD3".to_string()));
1056
1057        // New registrations should continue from saved counter
1058        let fid4 = restored.register_field("field4").unwrap();
1059        assert!(fid4 > fid3);
1060    }
1061
1062    #[test]
1063    fn test_field_constraints_roundtrip() {
1064        let resolver = FieldResolver::new();
1065
1066        let fid = resolver
1067            .register_field(
1068                FieldDefinition::new("id")
1069                    .with_primary_key(true)
1070                    .with_unique(true)
1071                    .with_check_expr(Some("id > 0".to_string())),
1072            )
1073            .unwrap();
1074
1075        let constraints = resolver.field_constraints(fid).unwrap();
1076        assert!(constraints.primary_key);
1077        assert!(constraints.unique);
1078        assert_eq!(constraints.check_expr.as_deref(), Some("id > 0"));
1079
1080        let by_name = resolver.field_constraints_by_name("ID").unwrap();
1081        assert_eq!(by_name, constraints);
1082
1083        let info = resolver.field_info(fid).unwrap();
1084        assert_eq!(info.field_id, fid);
1085        assert_eq!(info.display_name, "id");
1086        assert!(info.constraints.primary_key);
1087        assert!(info.constraints.unique);
1088
1089        let state = resolver.export_state();
1090        let restored = FieldResolver::from_state(state).unwrap();
1091        let restored_constraints = restored.field_constraints(fid).unwrap();
1092        assert_eq!(restored_constraints, constraints);
1093        let restored_info = restored.field_info_by_name("id").unwrap();
1094        assert_eq!(restored_info.constraints, constraints);
1095    }
1096
1097    #[test]
1098    fn test_catalog_persistence_error_duplicate_table_id() {
1099        let state = TableCatalogState {
1100            tables: vec![
1101                TableState {
1102                    table_id: 1,
1103                    display_schema: None,
1104                    display_table: "Table1".to_string(),
1105                    canonical_schema: None,
1106                    canonical_table: "table1".to_string(),
1107                    fields: Vec::new(),
1108                    next_field_id: 3,
1109                },
1110                TableState {
1111                    table_id: 1, // Duplicate!
1112                    display_schema: None,
1113                    display_table: "Table2".to_string(),
1114                    canonical_schema: None,
1115                    canonical_table: "table2".to_string(),
1116                    fields: Vec::new(),
1117                    next_field_id: 3,
1118                },
1119            ],
1120            next_table_id: 3,
1121            schemas: Vec::new(),
1122        };
1123
1124        assert!(TableCatalog::from_state(state).is_err());
1125    }
1126
1127    #[test]
1128    fn test_catalog_persistence_error_duplicate_table_name() {
1129        let state = TableCatalogState {
1130            tables: vec![
1131                TableState {
1132                    table_id: 1,
1133                    display_schema: None,
1134                    display_table: "Table1".to_string(),
1135                    canonical_schema: None,
1136                    canonical_table: "table1".to_string(),
1137                    fields: Vec::new(),
1138                    next_field_id: 3,
1139                },
1140                TableState {
1141                    table_id: 2,
1142                    display_schema: None,
1143                    display_table: "TABLE1".to_string(), // Duplicate (case-insensitive)
1144                    canonical_schema: None,
1145                    canonical_table: "table1".to_string(),
1146                    fields: Vec::new(),
1147                    next_field_id: 3,
1148                },
1149            ],
1150            next_table_id: 3,
1151            schemas: Vec::new(),
1152        };
1153
1154        assert!(TableCatalog::from_state(state).is_err());
1155    }
1156}