Skip to main content

cqlite_core/schema/
aggregator.rs

1//! Schema Aggregator for M2-CLI
2//!
3//! This module implements schema loading and merging from multiple sources (CQL and JSON files/directories).
4//! It handles two-pass loading (UDTs first, then tables) and implements last-wins merging strategy.
5
6use std::collections::HashMap;
7use std::path::{Path, PathBuf};
8use std::sync::Arc;
9use tokio::sync::RwLock;
10
11use crate::error::{Error, Result};
12use crate::schema::{
13    cql_parser::{classify_statement, parse_create_type, split_cql_statements, StatementType},
14    parse_cql_schema, ClusteringColumn, Column, KeyColumn, TableSchema, UdtRegistry,
15};
16use crate::types::UdtTypeDef;
17
18#[allow(unused_imports)]
19use crate::schema::cql_parser;
20
/// Switches controlling how a `SchemaAggregator` reacts to problems while loading.
#[derive(Clone, Debug)]
pub struct AggregatorConfig {
    /// Keep loading the remaining files after an error instead of stopping at the first one.
    pub graceful_degradation: bool,
    /// Register UDTs through the dependency-validating registry path.
    pub validate_udt_dependencies: bool,
}

impl Default for AggregatorConfig {
    /// Lenient defaults: tolerate errors and validate UDT dependencies.
    fn default() -> Self {
        let graceful_degradation = true;
        let validate_udt_dependencies = true;
        AggregatorConfig {
            graceful_degradation,
            validate_udt_dependencies,
        }
    }
}
38
/// Schema aggregator for loading and merging schemas from multiple sources.
///
/// Reads `.cql` and `.json` files (or directories of them) and registers the
/// resulting UDTs and table schemas into the shared registries. Errors and
/// warnings are accumulated on the aggregator while loading, and
/// `load_from_paths` clears them at the start of every call.
pub struct SchemaAggregator {
    /// Shared registry for table schemas
    registry: Arc<RwLock<crate::schema::registry::SchemaRegistry>>,
    /// Shared registry for user-defined types
    udt_registry: Arc<RwLock<UdtRegistry>>,
    /// Behaviour switches (error tolerance, UDT dependency validation)
    config: AggregatorConfig,
    /// Errors collected during the current load
    errors: Vec<SchemaLoadError>,
    /// Warnings collected during the current load
    warnings: Vec<SchemaLoadWarning>,
}
52
/// Summary of one schema loading operation.
///
/// Loading problems are reported here rather than as an `Err`, so a caller
/// should inspect `errors` even when the call itself succeeded.
#[derive(Debug, Clone)]
pub struct LoadResult {
    /// Number of table schemas successfully loaded
    pub schemas_loaded: usize,
    /// Number of UDTs successfully loaded
    pub udts_loaded: usize,
    /// Errors encountered during loading
    pub errors: Vec<SchemaLoadError>,
    /// Warnings encountered during loading (e.g. skipped files)
    pub warnings: Vec<SchemaLoadWarning>,
}
65
/// A single error recorded while loading schemas.
#[derive(Debug, Clone)]
pub struct SchemaLoadError {
    /// File or input path the error relates to; `None` for errors that are not
    /// tied to one file (e.g. UDT validation during registration)
    pub file_path: Option<PathBuf>,
    /// Category of the error
    pub error_type: LoadErrorType,
    /// Human-readable error message
    pub message: String,
}
76
/// Category of a [`SchemaLoadError`].
///
/// Assigned by `load_from_paths` for discovery and parse failures, and by
/// `apply_schemas` for UDT registration failures.
#[derive(Debug, Clone)]
pub enum LoadErrorType {
    /// A path could not be discovered or a file could not be read
    FileRead,
    /// JSON content could not be deserialized into a known schema format
    InvalidJson,
    /// CQL content could not be parsed
    InvalidCql,
    /// Missing UDT dependency
    // NOTE(review): the visible code reports every UDT validation failure as
    // `CircularUdtDependency`; confirm where this variant is produced.
    MissingUdtDependency,
    /// Circular UDT dependency
    CircularUdtDependency,
    /// Schema validation failed (e.g. missing partition keys)
    ValidationFailed,
    /// Invalid file format (neither .cql nor .json)
    InvalidFileFormat,
}
95
/// A non-fatal issue recorded while loading schemas, such as a skipped file.
#[derive(Debug, Clone)]
pub struct SchemaLoadWarning {
    /// File path the warning relates to, if any
    pub file_path: Option<PathBuf>,
    /// Human-readable warning message
    pub message: String,
}
104
/// Contents of one parsed schema file, held before anything is inserted into
/// the registries.
#[derive(Debug, Clone)]
struct ParsedSchema {
    /// File-level keyspace (for context only; tables/udts are keyed by qualified names).
    // Presumably never read after construction, hence the allow.
    #[allow(dead_code)]
    keyspace: String,
    /// Table schemas (keyed by qualified name: "keyspace.table")
    tables: HashMap<String, TableSchema>,
    /// UDT definitions (keyed by qualified name: "keyspace.typename")
    udts: HashMap<String, UdtTypeDef>,
}
116
/// Accepted top-level JSON schema layouts.
///
/// Untagged: serde tries `Minimal` first and falls back to `Full` when the
/// document does not deserialize as a minimal schema.
#[derive(Debug, serde::Deserialize)]
#[serde(untagged)]
enum JsonSchemaFormat {
    /// Minimal format: single table with "table" field
    Minimal(MinimalTableSchema),
    /// Full format: with "tables" array and optional "udts" array
    Full(FullSchema),
}
126
/// Minimal JSON schema format describing a single table.
#[derive(Debug, serde::Deserialize)]
struct MinimalTableSchema {
    /// Keyspace the table belongs to
    keyspace: String,
    /// Table name
    table: String,
    /// All columns of the table, including key columns
    columns: Vec<JsonColumn>,
    /// Partition key column names; preferred over `primary_key` when non-empty
    #[serde(default)]
    partition_keys: Vec<String>,
    /// Synonym for partition_keys when no clustering; used only if `partition_keys` is empty
    #[serde(default)]
    primary_key: Vec<String>,
    /// Clustering key definitions in key order
    #[serde(default)]
    clustering_keys: Vec<JsonClusteringKey>,
}
140
/// Full JSON schema format: one keyspace with any number of UDTs and tables.
///
/// Both `udts` and `tables` are optional (default: empty) to support UDT-only
/// or table-only files.
#[derive(Debug, serde::Deserialize)]
struct FullSchema {
    /// Keyspace applied to every UDT and table in the file
    keyspace: String,
    /// UDT definitions
    #[serde(default)]
    udts: Vec<JsonUdt>,
    /// Table definitions
    #[serde(default)]
    tables: Vec<JsonTable>,
}
151
/// One table inside a full JSON schema; the keyspace comes from the enclosing file.
#[derive(Debug, serde::Deserialize)]
struct JsonTable {
    /// Table name
    name: String,
    /// All columns of the table, including key columns
    columns: Vec<JsonColumn>,
    /// Partition key column names; preferred over `primary_key` when non-empty
    #[serde(default)]
    partition_keys: Vec<String>,
    /// Fallback partition key list, used only if `partition_keys` is empty
    #[serde(default)]
    primary_key: Vec<String>,
    /// Clustering key definitions in key order
    #[serde(default)]
    clustering_keys: Vec<JsonClusteringKey>,
}
164
/// One column in a JSON table definition.
#[derive(Debug, serde::Deserialize)]
struct JsonColumn {
    /// Column name
    name: String,
    /// CQL type string, accepted under the key "type" or "data_type"
    #[serde(alias = "data_type")]
    r#type: String,
    /// Nullability; `false` when omitted (`bool::default()`).
    // NOTE(review): UDT fields default to nullable instead (`default_nullable`);
    // confirm the differing defaults are intentional.
    #[serde(default)]
    nullable: bool,
}
174
/// One clustering key in a JSON table definition.
#[derive(Debug, serde::Deserialize)]
struct JsonClusteringKey {
    /// Column name
    name: String,
    /// CQL type string, accepted under the key "type" or "data_type"
    #[serde(alias = "data_type")]
    r#type: String,
    /// Optional sort order string; when absent the table converters use the
    /// clustering order type's default
    #[serde(default)]
    order: Option<String>,
}
184
/// One user-defined type in a full JSON schema; the keyspace comes from the enclosing file.
#[derive(Debug, serde::Deserialize)]
struct JsonUdt {
    /// Type name (unqualified)
    name: String,
    /// Fields in declaration order
    fields: Vec<JsonUdtField>,
}
191
/// One field of a JSON UDT definition.
#[derive(Debug, serde::Deserialize)]
struct JsonUdtField {
    /// Field name
    name: String,
    /// CQL type string, accepted under the key "type" or "data_type"
    #[serde(alias = "data_type")]
    r#type: String,
    /// Nullability; `true` when omitted (see `default_nullable`)
    #[serde(default = "default_nullable")]
    nullable: bool,
}
201
/// Serde default for a JSON UDT field's `nullable` flag: fields are nullable
/// unless the file says otherwise.
const fn default_nullable() -> bool {
    true
}
205
/// Extract the keyspace name from a `USE` statement.
///
/// The `USE` keyword is matched case-insensitively and may be followed by any
/// whitespace (space, tab, newline). Trailing semicolons are ignored, and one
/// surrounding pair of double quotes is stripped. Returns `None` when the
/// statement is not a `USE` statement or names no keyspace.
///
/// Examples:
/// - `"USE test_basic;"` -> `Some("test_basic")`
/// - `"USE \"test-basic\";"` -> `Some("test-basic")`
/// - `"use\n  ks ;"` -> `Some("ks")`
fn extract_use_keyspace(statement: &str) -> Option<String> {
    let trimmed = statement.trim();

    // Compare the keyword on the original text. `get(..3)` never splits a
    // multi-byte character, and ASCII case folding keeps byte offsets intact.
    let keyword = trimmed.get(..3)?;
    if !keyword.eq_ignore_ascii_case("use") {
        return None;
    }

    // Require whitespace after the keyword so words like `username` don't match.
    let rest = &trimmed[3..];
    if !rest.starts_with(|c: char| c.is_whitespace()) {
        return None;
    }

    let mut ks_name = rest.trim().trim_end_matches(';').trim();

    // Strip quotes from keyspace name if present
    if ks_name.len() > 1 && ks_name.starts_with('"') && ks_name.ends_with('"') {
        ks_name = &ks_name[1..ks_name.len() - 1];
    }

    (!ks_name.is_empty()).then(|| ks_name.to_string())
}
230
/// Extract the keyspace name from a `CREATE KEYSPACE` statement.
///
/// Keywords are matched case-insensitively and may be separated by any
/// whitespace, including newlines. An optional `IF NOT EXISTS` is skipped. A
/// `;` glued to the name is dropped, and one surrounding pair of double quotes
/// is stripped. Returns `None` for other statements, or when no non-empty
/// name follows the keywords.
///
/// Examples:
/// - `"CREATE KEYSPACE IF NOT EXISTS test_basic WITH ..."` -> `Some("test_basic")`
/// - `"CREATE KEYSPACE \"test-basic\" WITH ..."` -> `Some("test-basic")`
fn extract_create_keyspace_name(statement: &str) -> Option<String> {
    let mut words = statement.split_whitespace();

    // Match CREATE and KEYSPACE as separate words so any whitespace between them works.
    let create = words.next()?;
    let keyspace = words.next()?;
    if !create.eq_ignore_ascii_case("create") || !keyspace.eq_ignore_ascii_case("keyspace") {
        return None;
    }

    // Pattern: CREATE KEYSPACE [IF NOT EXISTS] <name> ...
    let mut name = words.next()?;
    if name.eq_ignore_ascii_case("if") {
        // Skip "NOT" and "EXISTS"; the name is the word after them.
        name = words.nth(2)?;
    }

    let mut name = name.trim_end_matches(';');

    // Strip quotes from keyspace name if present
    if name.len() > 1 && name.starts_with('"') && name.ends_with('"') {
        name = &name[1..name.len() - 1];
    }

    (!name.is_empty()).then(|| name.to_string())
}
263
264impl SchemaAggregator {
265    /// Create a new schema aggregator
266    pub fn new(
267        registry: Arc<RwLock<crate::schema::registry::SchemaRegistry>>,
268        udt_registry: Arc<RwLock<UdtRegistry>>,
269        config: AggregatorConfig,
270    ) -> Self {
271        Self {
272            registry,
273            udt_registry,
274            config,
275            errors: Vec::new(),
276            warnings: Vec::new(),
277        }
278    }
279
280    /// Load schemas from multiple paths (files or directories)
281    pub async fn load_from_paths(&mut self, paths: &[PathBuf]) -> Result<LoadResult> {
282        self.errors.clear();
283        self.warnings.clear();
284
285        // Step 1: Discover all files from paths in order
286        let mut all_files = Vec::new();
287        for path in paths {
288            if let Err(e) = self.discover_files(path, &mut all_files) {
289                self.errors.push(SchemaLoadError {
290                    file_path: Some(path.clone()),
291                    error_type: LoadErrorType::FileRead,
292                    message: format!("Failed to discover files: {}", e),
293                });
294            }
295        }
296
297        if all_files.is_empty() && !self.errors.is_empty() {
298            return Ok(self.build_result(0, 0));
299        }
300
301        // Step 2: Parse all files into intermediate format
302        let mut parsed_schemas = Vec::new();
303        for file_path in &all_files {
304            match self.parse_file(file_path).await {
305                Ok(Some(schema)) => parsed_schemas.push(schema),
306                Ok(None) => {} // Skipped file
307                Err(e) => {
308                    // Map error type based on the actual error variant
309                    let error_type = match &e {
310                        Error::Io(_) => LoadErrorType::FileRead,
311                        Error::CqlParse(_) => LoadErrorType::InvalidCql,
312                        Error::Schema(_) => {
313                            // Schema errors are structural validation failures
314                            // Check message for JSON vs general validation
315                            let msg = e.to_string();
316                            if msg.contains("Invalid JSON")
317                                || msg.contains("JSON")
318                                || msg.contains("json")
319                            {
320                                LoadErrorType::InvalidJson
321                            } else {
322                                // Missing partition_keys, bad clustering config, etc.
323                                LoadErrorType::ValidationFailed
324                            }
325                        }
326                        _ => {
327                            // Fallback: check error message for clues
328                            let msg = e.to_string();
329                            if msg.contains("JSON") || msg.contains("json") {
330                                LoadErrorType::InvalidJson
331                            } else if msg.contains("CQL") || msg.contains("parse") {
332                                LoadErrorType::InvalidCql
333                            } else {
334                                // Unknown error types default to validation failure, not I/O
335                                LoadErrorType::ValidationFailed
336                            }
337                        }
338                    };
339                    self.errors.push(SchemaLoadError {
340                        file_path: Some(file_path.clone()),
341                        error_type,
342                        message: format!("Failed to parse file: {}", e),
343                    });
344                    // Check graceful_degradation after parse failure
345                    if !self.config.graceful_degradation {
346                        return Ok(self.build_result(0, 0));
347                    }
348                }
349            }
350        }
351
352        // Early return if parsing failed and strict mode is enabled
353        if !self.config.graceful_degradation && !self.errors.is_empty() {
354            return Ok(self.build_result(0, 0));
355        }
356
357        // Step 3: Two-pass loading - UDTs first, then tables
358        let (udts_loaded, tables_loaded) = self.apply_schemas(parsed_schemas).await;
359
360        Ok(self.build_result(tables_loaded, udts_loaded))
361    }
362
363    /// Discover files from a path (file or directory)
364    fn discover_files(&mut self, path: &Path, files: &mut Vec<PathBuf>) -> Result<()> {
365        if !path.exists() {
366            return Err(Error::InvalidPath(format!(
367                "Path does not exist: {}",
368                path.display()
369            )));
370        }
371
372        if path.is_file() {
373            // Single file - validate extension
374            if let Some(ext) = path.extension() {
375                let ext_str = ext.to_string_lossy().to_lowercase();
376                if ext_str == "cql" || ext_str == "json" {
377                    files.push(path.to_path_buf());
378                } else {
379                    self.warnings.push(SchemaLoadWarning {
380                        file_path: Some(path.to_path_buf()),
381                        message: format!("Skipping file with unsupported extension: {}", ext_str),
382                    });
383                }
384            }
385        } else if path.is_dir() {
386            // Directory - scan recursively in lexical order
387            self.scan_directory_recursive(path, files)?;
388        }
389
390        Ok(())
391    }
392
393    /// Recursively scan directory for schema files in lexical order
394    #[allow(clippy::only_used_in_recursion)]
395    fn scan_directory_recursive(&mut self, dir: &Path, files: &mut Vec<PathBuf>) -> Result<()> {
396        let mut entries: Vec<PathBuf> = std::fs::read_dir(dir)
397            .map_err(Error::Io)?
398            .filter_map(|entry| entry.ok().map(|e| e.path()))
399            .collect();
400
401        // Sort lexically for deterministic ordering
402        entries.sort();
403
404        for entry in entries {
405            if entry.is_file() {
406                if let Some(ext) = entry.extension() {
407                    let ext_str = ext.to_string_lossy().to_lowercase();
408                    if ext_str == "cql" || ext_str == "json" {
409                        files.push(entry);
410                    }
411                }
412            } else if entry.is_dir() {
413                self.scan_directory_recursive(&entry, files)?;
414            }
415        }
416
417        Ok(())
418    }
419
420    /// Parse a single file into intermediate schema format
421    async fn parse_file(&self, path: &Path) -> Result<Option<ParsedSchema>> {
422        let ext = path
423            .extension()
424            .ok_or_else(|| Error::InvalidPath("File has no extension".to_string()))?;
425
426        let ext_str = ext.to_string_lossy().to_lowercase();
427
428        match ext_str.as_str() {
429            "cql" => self.parse_cql_file(path).await,
430            "json" => self.parse_json_file(path).await,
431            _ => Err(Error::InvalidPath(format!(
432                "Unsupported file extension: {}",
433                ext_str
434            ))),
435        }
436    }
437
438    /// Parse a CQL file (supports multiple statements: CREATE TYPE and CREATE TABLE)
439    async fn parse_cql_file(&self, path: &Path) -> Result<Option<ParsedSchema>> {
440        let content = std::fs::read_to_string(path)?;
441
442        // Split file content into individual statements
443        let statements = split_cql_statements(&content);
444
445        if statements.is_empty() {
446            return Ok(None);
447        }
448
449        let mut keyspace: Option<String> = None;
450        let mut tables = HashMap::new();
451        let mut udts = HashMap::new();
452        let mut errors = Vec::new();
453
454        // Separate CREATE TYPE from CREATE TABLE statements
455        let mut create_type_stmts = Vec::new();
456        let mut create_table_stmts = Vec::new();
457
458        for statement in &statements {
459            match classify_statement(statement) {
460                StatementType::CreateType => create_type_stmts.push(statement.as_str()),
461                StatementType::CreateTable => create_table_stmts.push(statement.as_str()),
462                StatementType::Other(ref kind) if kind == "use" => {
463                    // Extract keyspace name from USE statement
464                    if let Some(ks_name) = extract_use_keyspace(statement) {
465                        keyspace = Some(ks_name);
466                    }
467                }
468                StatementType::Other(ref kind) if kind == "create" => {
469                    // Handle CREATE KEYSPACE statements - extract keyspace name
470                    if let Some(ks_name) = extract_create_keyspace_name(statement) {
471                        // Only set keyspace if not already set by USE statement
472                        if keyspace.is_none() {
473                            keyspace = Some(ks_name);
474                        }
475                    }
476                }
477                StatementType::Other(_kind) => {
478                    // Skip other statement types silently (e.g., ALTER, DROP, comments)
479                }
480            }
481        }
482
483        // Parse CREATE TYPE statements first (UDTs must be registered before tables)
484        for stmt in create_type_stmts {
485            match parse_create_type(stmt) {
486                Ok((_, (type_name, type_keyspace, fields))) => {
487                    // Determine keyspace (use from statement or inherit from file context)
488                    let udt_keyspace = type_keyspace.unwrap_or_else(|| {
489                        keyspace.clone().unwrap_or_else(|| "default".to_string())
490                    });
491
492                    // Update keyspace if not set
493                    if keyspace.is_none() {
494                        keyspace = Some(udt_keyspace.clone());
495                    }
496
497                    // Build UdtTypeDef
498                    let mut udt_def = UdtTypeDef::new(udt_keyspace.clone(), type_name.clone());
499                    for (field_name, field_type_str) in fields {
500                        // Parse field type
501                        let field_type = crate::schema::CqlType::parse(&field_type_str)?;
502                        udt_def = udt_def.with_field(field_name, field_type, true);
503                    }
504
505                    // Key by fully-qualified name (keyspace.typename) to avoid multi-keyspace collisions
506                    let qualified_name = format!("{}.{}", udt_keyspace, type_name);
507                    udts.insert(qualified_name, udt_def);
508                }
509                Err(e) => {
510                    errors.push(format!(
511                        "Failed to parse CREATE TYPE in {}: {:?}",
512                        path.display(),
513                        e
514                    ));
515                }
516            }
517        }
518
519        // Parse CREATE TABLE statements
520        for stmt in create_table_stmts {
521            match parse_cql_schema(stmt) {
522                Ok(mut table_schema) => {
523                    // Override keyspace with the one from USE statement or CREATE KEYSPACE
524                    // Only override if the table doesn't have an explicit qualified name
525                    if table_schema.keyspace == "default" {
526                        if let Some(ref active_keyspace) = keyspace {
527                            table_schema.keyspace = active_keyspace.clone();
528                        }
529                    }
530
531                    // Update keyspace if not set (from first table's explicit keyspace)
532                    if keyspace.is_none() {
533                        keyspace = Some(table_schema.keyspace.clone());
534                    }
535
536                    // Key by fully-qualified name (keyspace.table) to avoid multi-keyspace collisions
537                    let qualified_name =
538                        format!("{}.{}", table_schema.keyspace, table_schema.table);
539                    tables.insert(qualified_name, table_schema);
540                }
541                Err(e) => {
542                    errors.push(format!(
543                        "Failed to parse CREATE TABLE in {}: {}",
544                        path.display(),
545                        e
546                    ));
547                }
548            }
549        }
550
551        // If there were errors and no successful parses, return error
552        if !errors.is_empty() && tables.is_empty() && udts.is_empty() {
553            return Err(Error::CqlParse(format!(
554                "Failed to parse CQL file {}: {}",
555                path.display(),
556                errors.join("; ")
557            )));
558        }
559
560        // If no tables or UDTs were parsed, and statements exist, treat as error
561        // This catches truly invalid CQL that doesn't match any expected pattern
562        if tables.is_empty() && udts.is_empty() && !statements.is_empty() {
563            // Check if all statements are legitimate "other" types (USE, CREATE KEYSPACE, etc.)
564            // or if there are truly unrecognized/invalid statements
565            let legitimate_keywords = [
566                "use", "create", "alter", "drop", "grant", "revoke", "truncate",
567            ];
568            let has_invalid_statement = statements.iter().any(|stmt| {
569                let normalized = stmt.trim().to_lowercase();
570                let first_word = normalized.split_whitespace().next().unwrap_or("");
571
572                // If it's a CREATE statement that wasn't successfully parsed, it's invalid
573                if normalized.starts_with("create ") {
574                    return true;
575                }
576
577                // If it's not a legitimate keyword, it's invalid
578                !legitimate_keywords.contains(&first_word)
579            });
580
581            // If there are invalid statements, return an error
582            if has_invalid_statement {
583                return Err(Error::CqlParse(format!(
584                    "Failed to parse CQL file {}: No valid CREATE TABLE or CREATE TYPE statements found",
585                    path.display()
586                )));
587            }
588        }
589
590        // Determine final keyspace (use first discovered or default)
591        let final_keyspace = keyspace.unwrap_or_else(|| "default".to_string());
592
593        Ok(Some(ParsedSchema {
594            keyspace: final_keyspace,
595            tables,
596            udts,
597        }))
598    }
599
600    /// Parse a JSON file (either minimal or full format)
601    async fn parse_json_file(&self, path: &Path) -> Result<Option<ParsedSchema>> {
602        let content = std::fs::read_to_string(path)?;
603
604        let json_schema: JsonSchemaFormat = serde_json::from_str(&content)
605            .map_err(|e| Error::schema(format!("Invalid JSON in {}: {}", path.display(), e)))?;
606
607        match json_schema {
608            JsonSchemaFormat::Minimal(minimal) => self.parse_minimal_format(path, minimal).await,
609            JsonSchemaFormat::Full(full) => self.parse_full_format(path, full).await,
610        }
611    }
612
613    /// Parse minimal JSON format (single table)
614    async fn parse_minimal_format(
615        &self,
616        _path: &Path,
617        minimal: MinimalTableSchema,
618    ) -> Result<Option<ParsedSchema>> {
619        let table_schema = self.convert_minimal_to_table_schema(minimal)?;
620        let keyspace = table_schema.keyspace.clone();
621
622        let mut tables = HashMap::new();
623        // Key by fully-qualified name (keyspace.table) to avoid multi-keyspace collisions
624        let qualified_name = format!("{}.{}", table_schema.keyspace, table_schema.table);
625        tables.insert(qualified_name, table_schema);
626
627        Ok(Some(ParsedSchema {
628            keyspace,
629            tables,
630            udts: HashMap::new(),
631        }))
632    }
633
634    /// Parse full JSON format (multiple tables + UDTs)
635    async fn parse_full_format(
636        &self,
637        _path: &Path,
638        full: FullSchema,
639    ) -> Result<Option<ParsedSchema>> {
640        let keyspace = full.keyspace.clone();
641        let mut tables = HashMap::new();
642        let mut udts = HashMap::new();
643
644        // Parse UDTs
645        for udt_json in full.udts {
646            let udt_def = self.convert_json_udt_to_typedef(&keyspace, udt_json)?;
647            // Key by fully-qualified name (keyspace.typename) to avoid multi-keyspace collisions
648            let qualified_name = format!("{}.{}", udt_def.keyspace, udt_def.name);
649            udts.insert(qualified_name, udt_def);
650        }
651
652        // Parse tables
653        for table_json in full.tables {
654            let table_schema = self.convert_json_table_to_table_schema(&keyspace, table_json)?;
655            // Key by fully-qualified name (keyspace.table) to avoid multi-keyspace collisions
656            let qualified_name = format!("{}.{}", table_schema.keyspace, table_schema.table);
657            tables.insert(qualified_name, table_schema);
658        }
659
660        Ok(Some(ParsedSchema {
661            keyspace,
662            tables,
663            udts,
664        }))
665    }
666
667    /// Convert minimal JSON format to TableSchema
668    fn convert_minimal_to_table_schema(&self, minimal: MinimalTableSchema) -> Result<TableSchema> {
669        // Determine partition keys (prefer partition_keys, fallback to primary_key)
670        let partition_key_names = if !minimal.partition_keys.is_empty() {
671            minimal.partition_keys
672        } else if !minimal.primary_key.is_empty() {
673            minimal.primary_key
674        } else {
675            return Err(Error::schema(
676                "Table must have partition_keys or primary_key".to_string(),
677            ));
678        };
679
680        // Build columns
681        let columns: Vec<Column> = minimal
682            .columns
683            .iter()
684            .map(|col| Column {
685                name: col.name.clone(),
686                data_type: col.r#type.clone(),
687                nullable: col.nullable,
688                default: None,
689                is_static: false, // TODO: minimal schema doesn't track static columns yet
690            })
691            .collect();
692
693        // Build partition keys
694        let partition_keys: Vec<KeyColumn> = partition_key_names
695            .iter()
696            .enumerate()
697            .map(|(pos, name)| {
698                let col = minimal
699                    .columns
700                    .iter()
701                    .find(|c| &c.name == name)
702                    .ok_or_else(|| {
703                        Error::schema(format!("Partition key '{}' not found in columns", name))
704                    })?;
705
706                Ok(KeyColumn {
707                    name: col.name.clone(),
708                    data_type: col.r#type.clone(),
709                    position: pos,
710                })
711            })
712            .collect::<Result<Vec<_>>>()?;
713
714        // Build clustering keys
715        let clustering_keys: Vec<ClusteringColumn> = minimal
716            .clustering_keys
717            .iter()
718            .enumerate()
719            .map(|(pos, ck)| ClusteringColumn {
720                name: ck.name.clone(),
721                data_type: ck.r#type.clone(),
722                position: pos,
723                order: ck.order.as_deref().map(|s| s.into()).unwrap_or_default(),
724            })
725            .collect();
726
727        let schema = TableSchema {
728            keyspace: minimal.keyspace,
729            table: minimal.table,
730            partition_keys,
731            clustering_keys,
732            columns,
733            comments: HashMap::new(),
734        };
735
736        schema.validate()?;
737        Ok(schema)
738    }
739
740    /// Convert JSON table to TableSchema
741    fn convert_json_table_to_table_schema(
742        &self,
743        keyspace: &str,
744        table_json: JsonTable,
745    ) -> Result<TableSchema> {
746        let partition_key_names = if !table_json.partition_keys.is_empty() {
747            table_json.partition_keys
748        } else if !table_json.primary_key.is_empty() {
749            table_json.primary_key
750        } else {
751            return Err(Error::schema(format!(
752                "Table '{}' must have partition_keys or primary_key",
753                table_json.name
754            )));
755        };
756
757        let columns: Vec<Column> = table_json
758            .columns
759            .iter()
760            .map(|col| Column {
761                name: col.name.clone(),
762                data_type: col.r#type.clone(),
763                nullable: col.nullable,
764                default: None,
765                is_static: false, // TODO: JSON schema doesn't track static columns yet
766            })
767            .collect();
768
769        let partition_keys: Vec<KeyColumn> = partition_key_names
770            .iter()
771            .enumerate()
772            .map(|(pos, name)| {
773                let col = table_json
774                    .columns
775                    .iter()
776                    .find(|c| &c.name == name)
777                    .ok_or_else(|| {
778                        Error::schema(format!(
779                            "Partition key '{}' not found in columns of table '{}'",
780                            name, table_json.name
781                        ))
782                    })?;
783
784                Ok(KeyColumn {
785                    name: col.name.clone(),
786                    data_type: col.r#type.clone(),
787                    position: pos,
788                })
789            })
790            .collect::<Result<Vec<_>>>()?;
791
792        let clustering_keys: Vec<ClusteringColumn> = table_json
793            .clustering_keys
794            .iter()
795            .enumerate()
796            .map(|(pos, ck)| ClusteringColumn {
797                name: ck.name.clone(),
798                data_type: ck.r#type.clone(),
799                position: pos,
800                order: ck.order.as_deref().map(|s| s.into()).unwrap_or_default(),
801            })
802            .collect();
803
804        let schema = TableSchema {
805            keyspace: keyspace.to_string(),
806            table: table_json.name,
807            partition_keys,
808            clustering_keys,
809            columns,
810            comments: HashMap::new(),
811        };
812
813        schema.validate()?;
814        Ok(schema)
815    }
816
817    /// Convert JSON UDT to UdtTypeDef
818    fn convert_json_udt_to_typedef(&self, keyspace: &str, udt_json: JsonUdt) -> Result<UdtTypeDef> {
819        let mut udt_def = UdtTypeDef::new(keyspace.to_string(), udt_json.name);
820
821        for field in udt_json.fields {
822            let field_type = crate::schema::CqlType::parse(&field.r#type)?;
823            udt_def = udt_def.with_field(field.name, field_type, field.nullable);
824        }
825
826        Ok(udt_def)
827    }
828
829    /// Apply parsed schemas to registries (two-pass: UDTs first, then tables)
830    async fn apply_schemas(&mut self, parsed_schemas: Vec<ParsedSchema>) -> (usize, usize) {
831        // Pass 1: Register all UDTs with last-wins strategy
832        let mut udt_map: HashMap<String, (String, UdtTypeDef)> = HashMap::new(); // key: keyspace.udt_name -> (keyspace, UdtTypeDef)
833
834        for parsed in &parsed_schemas {
835            for (qualified_name, udt_def) in &parsed.udts {
836                // qualified_name is already "keyspace.typename" from parse_cql_file
837                udt_map.insert(
838                    qualified_name.clone(),
839                    (udt_def.keyspace.clone(), udt_def.clone()),
840                );
841            }
842        }
843
844        // Register UDTs in registry
845        let mut udts_loaded = 0;
846        {
847            let mut udt_registry = self.udt_registry.write().await;
848            for (_key, (_keyspace, udt_def)) in udt_map {
849                if self.config.validate_udt_dependencies {
850                    // Validate dependencies exist
851                    if let Err(e) = udt_registry.register_udt_with_validation(udt_def.clone()) {
852                        self.errors.push(SchemaLoadError {
853                            file_path: None,
854                            error_type: LoadErrorType::CircularUdtDependency,
855                            message: format!("UDT validation failed: {}", e),
856                        });
857                        // Check graceful_degradation after UDT validation failure
858                        if !self.config.graceful_degradation {
859                            // Return early with UDTs loaded so far, skip tables
860                            return (udts_loaded, 0);
861                        }
862                        continue;
863                    }
864                } else {
865                    udt_registry.register_udt(udt_def);
866                }
867                udts_loaded += 1;
868            }
869        }
870
871        // Early return after UDT phase if strict mode and errors exist
872        if !self.config.graceful_degradation && !self.errors.is_empty() {
873            return (udts_loaded, 0);
874        }
875
876        // Pass 2: Register all tables with last-wins strategy
877        let mut table_map: HashMap<String, TableSchema> = HashMap::new();
878
879        for parsed in &parsed_schemas {
880            for (qualified_name, table_schema) in &parsed.tables {
881                // qualified_name is already "keyspace.table" from parse_cql_file
882                table_map.insert(qualified_name.clone(), table_schema.clone());
883            }
884        }
885
886        // Register tables in registry
887        let mut tables_loaded = 0;
888        {
889            let registry = self.registry.write().await;
890            for (_key, table_schema) in table_map {
891                match registry
892                    .register_schema(
893                        table_schema.clone(),
894                        crate::schema::registry::SchemaSource::Manual,
895                    )
896                    .await
897                {
898                    Ok(_) => tables_loaded += 1,
899                    Err(e) => {
900                        self.errors.push(SchemaLoadError {
901                            file_path: None,
902                            error_type: LoadErrorType::ValidationFailed,
903                            message: format!(
904                                "Failed to register table '{}.{}': {}",
905                                table_schema.keyspace, table_schema.table, e
906                            ),
907                        });
908                        // Check graceful_degradation after table registration failure
909                        if !self.config.graceful_degradation {
910                            // Return early with counts so far
911                            return (udts_loaded, tables_loaded);
912                        }
913                    }
914                }
915            }
916        }
917
918        (udts_loaded, tables_loaded)
919    }
920
921    /// Build load result from current state
922    fn build_result(&self, schemas_loaded: usize, udts_loaded: usize) -> LoadResult {
923        LoadResult {
924            schemas_loaded,
925            udts_loaded,
926            errors: self.errors.clone(),
927            warnings: self.warnings.clone(),
928        }
929    }
930}
931
932#[cfg(test)]
933mod tests {
934    use super::*;
935    use crate::platform::Platform;
936    use crate::schema::registry::{SchemaRegistry, SchemaRegistryConfig};
937    use crate::Config;
938    use std::io::Write;
939    use tempfile::TempDir;
940
941    async fn setup_test_aggregator() -> (SchemaAggregator, TempDir) {
942        let temp_dir = TempDir::new().unwrap();
943        let config = Config::default();
944        let platform = Arc::new(Platform::new(&config).await.unwrap());
945
946        let registry_config = SchemaRegistryConfig::default();
947        let registry = Arc::new(RwLock::new(
948            SchemaRegistry::new(registry_config, platform, config)
949                .await
950                .unwrap(),
951        ));
952        let udt_registry = Arc::new(RwLock::new(UdtRegistry::new()));
953
954        let aggregator = SchemaAggregator::new(registry, udt_registry, AggregatorConfig::default());
955
956        (aggregator, temp_dir)
957    }
958
959    fn write_file(dir: &Path, name: &str, content: &str) -> PathBuf {
960        let path = dir.join(name);
961        let mut file = std::fs::File::create(&path).unwrap();
962        file.write_all(content.as_bytes()).unwrap();
963        path
964    }
965
966    #[tokio::test]
967    async fn test_load_single_json_file() {
968        let (mut aggregator, temp_dir) = setup_test_aggregator().await;
969
970        let json_content = r#"
971        {
972            "keyspace": "test_ks",
973            "table": "users",
974            "columns": [
975                {"name": "id", "type": "uuid"},
976                {"name": "name", "type": "text"}
977            ],
978            "partition_keys": ["id"],
979            "clustering_keys": []
980        }
981        "#;
982
983        let json_path = write_file(temp_dir.path(), "users.json", json_content);
984        let result = aggregator.load_from_paths(&[json_path]).await.unwrap();
985
986        assert_eq!(result.schemas_loaded, 1);
987        assert_eq!(result.udts_loaded, 0);
988        assert!(result.errors.is_empty());
989    }
990
991    #[tokio::test]
992    async fn test_load_single_cql_file() {
993        let (mut aggregator, temp_dir) = setup_test_aggregator().await;
994
995        let cql_content = r#"
996        CREATE TABLE test_ks.products (
997            id uuid PRIMARY KEY,
998            name text,
999            price decimal
1000        );
1001        "#;
1002
1003        let cql_path = write_file(temp_dir.path(), "products.cql", cql_content);
1004        let result = aggregator.load_from_paths(&[cql_path]).await.unwrap();
1005
1006        assert_eq!(result.schemas_loaded, 1);
1007        assert_eq!(result.udts_loaded, 0);
1008        assert!(result.errors.is_empty());
1009    }
1010
1011    #[tokio::test]
1012    async fn test_directory_scanning_lexical_order() {
1013        let (mut aggregator, temp_dir) = setup_test_aggregator().await;
1014
1015        // Create files in non-lexical order, verify they're processed lexically
1016        write_file(
1017            temp_dir.path(),
1018            "c_table.json",
1019            r#"{"keyspace":"ks","table":"c","columns":[{"name":"id","type":"uuid"}],"partition_keys":["id"]}"#,
1020        );
1021        write_file(
1022            temp_dir.path(),
1023            "a_table.json",
1024            r#"{"keyspace":"ks","table":"a","columns":[{"name":"id","type":"uuid"}],"partition_keys":["id"]}"#,
1025        );
1026        write_file(
1027            temp_dir.path(),
1028            "b_table.json",
1029            r#"{"keyspace":"ks","table":"b","columns":[{"name":"id","type":"uuid"}],"partition_keys":["id"]}"#,
1030        );
1031
1032        let result = aggregator
1033            .load_from_paths(&[temp_dir.path().to_path_buf()])
1034            .await
1035            .unwrap();
1036
1037        assert_eq!(result.schemas_loaded, 3);
1038        assert!(result.errors.is_empty());
1039    }
1040
1041    #[tokio::test]
1042    async fn test_last_wins_for_duplicate_tables() {
1043        let (mut aggregator, temp_dir) = setup_test_aggregator().await;
1044
1045        let first_json = r#"
1046        {
1047            "keyspace": "ks",
1048            "table": "users",
1049            "columns": [
1050                {"name": "id", "type": "uuid"},
1051                {"name": "name", "type": "text"}
1052            ],
1053            "partition_keys": ["id"]
1054        }
1055        "#;
1056
1057        let second_json = r#"
1058        {
1059            "keyspace": "ks",
1060            "table": "users",
1061            "columns": [
1062                {"name": "id", "type": "uuid"},
1063                {"name": "name", "type": "text"},
1064                {"name": "email", "type": "text"}
1065            ],
1066            "partition_keys": ["id"]
1067        }
1068        "#;
1069
1070        let path1 = write_file(temp_dir.path(), "users_v1.json", first_json);
1071        let path2 = write_file(temp_dir.path(), "users_v2.json", second_json);
1072
1073        let result = aggregator.load_from_paths(&[path1, path2]).await.unwrap();
1074
1075        // Last wins, so we should have 1 schema (the second one)
1076        assert_eq!(result.schemas_loaded, 1);
1077
1078        // Verify the schema has 3 columns (from second definition)
1079        let registry = aggregator.registry.read().await;
1080        let schema = registry.get_schema("ks", "users").await.unwrap();
1081        assert_eq!(schema.columns.len(), 3);
1082    }
1083
1084    #[tokio::test]
1085    async fn test_two_pass_udt_then_tables() {
1086        let (mut aggregator, temp_dir) = setup_test_aggregator().await;
1087
1088        let full_schema_json = r#"
1089        {
1090            "keyspace": "ks",
1091            "udts": [
1092                {
1093                    "name": "address",
1094                    "fields": [
1095                        {"name": "street", "type": "text"},
1096                        {"name": "city", "type": "text"}
1097                    ]
1098                }
1099            ],
1100            "tables": [
1101                {
1102                    "name": "users",
1103                    "columns": [
1104                        {"name": "id", "type": "uuid"},
1105                        {"name": "addr", "type": "frozen<address>"}
1106                    ],
1107                    "partition_keys": ["id"],
1108                    "clustering_keys": []
1109                }
1110            ]
1111        }
1112        "#;
1113
1114        let path = write_file(temp_dir.path(), "schema.json", full_schema_json);
1115        let result = aggregator.load_from_paths(&[path]).await.unwrap();
1116
1117        assert_eq!(result.schemas_loaded, 1);
1118        assert_eq!(result.udts_loaded, 1);
1119        assert!(result.errors.is_empty());
1120
1121        // Verify UDT was registered
1122        let udt_registry = aggregator.udt_registry.read().await;
1123        assert!(udt_registry.contains_udt("ks", "address"));
1124    }
1125
1126    /// Test for Issue #230: REPL fails when schema directory contains UDT-only JSON files
1127    /// UDT-only files (without "tables" array) should parse successfully
1128    #[tokio::test]
1129    async fn test_udt_only_json_schema_issue_230() {
1130        let (mut aggregator, temp_dir) = setup_test_aggregator().await;
1131
1132        // This JSON has "udts" but no "tables" - should work after fix
1133        let udt_only_json = r#"
1134        {
1135            "keyspace": "test_keyspace",
1136            "udts": [
1137                {
1138                    "name": "address_type",
1139                    "fields": [
1140                        { "name": "street", "type": "text" },
1141                        { "name": "city", "type": "text" },
1142                        { "name": "zip", "type": "int" }
1143                    ]
1144                }
1145            ]
1146        }
1147        "#;
1148
1149        let path = write_file(temp_dir.path(), "address.json", udt_only_json);
1150        let result = aggregator.load_from_paths(&[path]).await.unwrap();
1151
1152        // Should load successfully with no errors
1153        assert!(
1154            result.errors.is_empty(),
1155            "Expected no errors but got: {:?}",
1156            result.errors
1157        );
1158        assert_eq!(result.udts_loaded, 1, "Expected 1 UDT to be loaded");
1159        assert_eq!(
1160            result.schemas_loaded, 0,
1161            "Expected 0 tables (UDT-only file)"
1162        );
1163
1164        // Verify UDT was registered
1165        let udt_registry = aggregator.udt_registry.read().await;
1166        assert!(
1167            udt_registry.contains_udt("test_keyspace", "address_type"),
1168            "UDT address_type should be registered in test_keyspace"
1169        );
1170    }
1171
1172    /// Test symmetric case: table-only JSON files (tables without UDTs)
1173    /// This validates that #[serde(default)] works for both fields
1174    #[tokio::test]
1175    async fn test_table_only_json_schema_symmetry() {
1176        let (mut aggregator, temp_dir) = setup_test_aggregator().await;
1177
1178        // This JSON has "tables" but no "udts" - symmetric to Issue #230 fix
1179        let table_only_json = r#"
1180        {
1181            "keyspace": "test_keyspace",
1182            "tables": [
1183                {
1184                    "name": "simple_table",
1185                    "columns": [
1186                        { "name": "id", "type": "uuid" },
1187                        { "name": "data", "type": "text" }
1188                    ],
1189                    "partition_keys": ["id"],
1190                    "clustering_keys": []
1191                }
1192            ]
1193        }
1194        "#;
1195
1196        let path = write_file(temp_dir.path(), "table_only.json", table_only_json);
1197        let result = aggregator.load_from_paths(&[path]).await.unwrap();
1198
1199        // Should load successfully with no errors
1200        assert!(
1201            result.errors.is_empty(),
1202            "Expected no errors but got: {:?}",
1203            result.errors
1204        );
1205        assert_eq!(result.udts_loaded, 0, "Expected 0 UDTs (table-only file)");
1206        assert_eq!(result.schemas_loaded, 1, "Expected 1 table to be loaded");
1207
1208        // Verify table was registered
1209        let registry = aggregator.registry.read().await;
1210        assert!(
1211            registry
1212                .get_schema("test_keyspace", "simple_table")
1213                .await
1214                .is_ok(),
1215            "Table simple_table should be registered in test_keyspace"
1216        );
1217    }
1218
1219    #[tokio::test]
1220    async fn test_invalid_json_error_collection() {
1221        let (mut aggregator, temp_dir) = setup_test_aggregator().await;
1222
1223        let invalid_json = r#"{"keyspace": "ks", "table": "broken""#; // Missing closing brace
1224
1225        let path = write_file(temp_dir.path(), "broken.json", invalid_json);
1226        let result = aggregator.load_from_paths(&[path]).await.unwrap();
1227
1228        assert_eq!(result.schemas_loaded, 0);
1229        assert!(!result.errors.is_empty());
1230        assert!(matches!(
1231            result.errors[0].error_type,
1232            LoadErrorType::InvalidJson
1233        ));
1234    }
1235
1236    #[tokio::test]
1237    async fn test_minimal_format_with_primary_key_synonym() {
1238        let (mut aggregator, temp_dir) = setup_test_aggregator().await;
1239
1240        let json_content = r#"
1241        {
1242            "keyspace": "ks",
1243            "table": "items",
1244            "columns": [
1245                {"name": "id", "type": "uuid"},
1246                {"name": "data", "type": "text"}
1247            ],
1248            "primary_key": ["id"]
1249        }
1250        "#;
1251
1252        let path = write_file(temp_dir.path(), "items.json", json_content);
1253        let result = aggregator.load_from_paths(&[path]).await.unwrap();
1254
1255        assert_eq!(result.schemas_loaded, 1);
1256        assert!(result.errors.is_empty());
1257    }
1258
1259    #[tokio::test]
1260    async fn test_data_type_alias_support() {
1261        let (mut aggregator, temp_dir) = setup_test_aggregator().await;
1262
1263        let json_content = r#"
1264        {
1265            "keyspace": "ks",
1266            "table": "legacy",
1267            "columns": [
1268                {"name": "id", "data_type": "uuid"},
1269                {"name": "value", "data_type": "text"}
1270            ],
1271            "partition_keys": ["id"]
1272        }
1273        "#;
1274
1275        let path = write_file(temp_dir.path(), "legacy.json", json_content);
1276        let result = aggregator.load_from_paths(&[path]).await.unwrap();
1277
1278        assert_eq!(result.schemas_loaded, 1);
1279        assert!(result.errors.is_empty());
1280    }
1281
1282    #[tokio::test]
1283    async fn test_error_type_mapping_io_error() {
1284        let (mut aggregator, _temp_dir) = setup_test_aggregator().await;
1285
1286        // Test with a non-existent file to trigger IO error
1287        let non_existent_path = PathBuf::from("/nonexistent/path/schema.json");
1288        let result = aggregator
1289            .load_from_paths(std::slice::from_ref(&non_existent_path))
1290            .await
1291            .unwrap();
1292
1293        assert_eq!(result.schemas_loaded, 0);
1294        assert_eq!(result.errors.len(), 1);
1295        assert!(matches!(
1296            result.errors[0].error_type,
1297            LoadErrorType::FileRead
1298        ));
1299        assert!(result.errors[0]
1300            .message
1301            .contains("Failed to discover files"));
1302    }
1303
1304    #[tokio::test]
1305    async fn test_error_type_mapping_invalid_json() {
1306        let (mut aggregator, temp_dir) = setup_test_aggregator().await;
1307
1308        // Test with malformed JSON
1309        let invalid_json = r#"{"keyspace": "ks", "table": "broken", invalid}"#;
1310        let path = write_file(temp_dir.path(), "invalid.json", invalid_json);
1311        let result = aggregator.load_from_paths(&[path]).await.unwrap();
1312
1313        assert_eq!(result.schemas_loaded, 0);
1314        assert_eq!(result.errors.len(), 1);
1315        assert!(matches!(
1316            result.errors[0].error_type,
1317            LoadErrorType::InvalidJson
1318        ));
1319        assert!(result.errors[0].message.contains("Failed to parse file"));
1320        assert!(result.errors[0].message.contains("Invalid JSON"));
1321    }
1322
1323    #[tokio::test]
1324    async fn test_error_type_mapping_invalid_cql() {
1325        let (mut aggregator, temp_dir) = setup_test_aggregator().await;
1326
1327        // Test with invalid CQL syntax
1328        let invalid_cql = r#"
1329        CREATE INVALID SYNTAX HERE
1330        id uuid PRIMARY KEY
1331        "#;
1332        let path = write_file(temp_dir.path(), "invalid.cql", invalid_cql);
1333        let result = aggregator.load_from_paths(&[path]).await.unwrap();
1334
1335        assert_eq!(result.schemas_loaded, 0);
1336        assert_eq!(result.errors.len(), 1);
1337        assert!(matches!(
1338            result.errors[0].error_type,
1339            LoadErrorType::InvalidCql
1340        ));
1341        assert!(result.errors[0].message.contains("Failed to parse file"));
1342    }
1343
1344    #[tokio::test]
1345    async fn test_error_message_preservation() {
1346        let (mut aggregator, temp_dir) = setup_test_aggregator().await;
1347
1348        // Test that original error messages are preserved
1349        let invalid_json = r#"{"keyspace": "ks""#; // Missing closing brace
1350        let path = write_file(temp_dir.path(), "broken.json", invalid_json);
1351        let result = aggregator
1352            .load_from_paths(std::slice::from_ref(&path))
1353            .await
1354            .unwrap();
1355
1356        assert_eq!(result.errors.len(), 1);
1357        // Error message should contain both "Failed to parse file" and the original error
1358        assert!(result.errors[0].message.contains("Failed to parse file"));
1359        assert!(result.errors[0].message.contains("Invalid JSON"));
1360        // File path should be preserved
1361        assert_eq!(result.errors[0].file_path, Some(path));
1362    }
1363
1364    #[tokio::test]
1365    async fn test_multiple_error_types_in_batch() {
1366        let (mut aggregator, temp_dir) = setup_test_aggregator().await;
1367
1368        // Create multiple files with different error types
1369        let invalid_json = r#"{"invalid json"#;
1370        let invalid_cql = r#"INVALID CQL SYNTAX"#;
1371
1372        let json_path = write_file(temp_dir.path(), "bad.json", invalid_json);
1373        let cql_path = write_file(temp_dir.path(), "bad.cql", invalid_cql);
1374
1375        let result = aggregator
1376            .load_from_paths(&[json_path, cql_path])
1377            .await
1378            .unwrap();
1379
1380        assert_eq!(result.schemas_loaded, 0);
1381        assert_eq!(result.errors.len(), 2);
1382
1383        // Find the JSON and CQL errors
1384        let json_error = result
1385            .errors
1386            .iter()
1387            .find(|e| {
1388                e.file_path
1389                    .as_ref()
1390                    .unwrap()
1391                    .to_str()
1392                    .unwrap()
1393                    .ends_with(".json")
1394            })
1395            .unwrap();
1396        let cql_error = result
1397            .errors
1398            .iter()
1399            .find(|e| {
1400                e.file_path
1401                    .as_ref()
1402                    .unwrap()
1403                    .to_str()
1404                    .unwrap()
1405                    .ends_with(".cql")
1406            })
1407            .unwrap();
1408
1409        // Verify correct error types
1410        assert!(matches!(json_error.error_type, LoadErrorType::InvalidJson));
1411        assert!(matches!(cql_error.error_type, LoadErrorType::InvalidCql));
1412    }
1413
1414    #[tokio::test]
1415    #[cfg(unix)]
1416    async fn test_file_read_error_from_parse_file() {
1417        use std::fs;
1418        use std::os::unix::fs::PermissionsExt;
1419
1420        let (mut aggregator, temp_dir) = setup_test_aggregator().await;
1421
1422        // Create a file and make it unreadable (Unix-only test)
1423        let json_content =
1424            r#"{"keyspace": "ks", "table": "test", "columns": [], "partition_keys": ["id"]}"#;
1425        let path = write_file(temp_dir.path(), "unreadable.json", json_content);
1426
1427        // Make file unreadable
1428        let mut perms = fs::metadata(&path).unwrap().permissions();
1429        perms.set_mode(0o000);
1430        fs::set_permissions(&path, perms).unwrap();
1431
1432        // Privileged users (e.g. uid 0 in containerized CI) bypass file
1433        // permissions, so the read-error precondition cannot be created.
1434        if fs::File::open(&path).is_ok() {
1435            return;
1436        }
1437
1438        let result = aggregator
1439            .load_from_paths(std::slice::from_ref(&path))
1440            .await
1441            .unwrap();
1442
1443        // Restore permissions for cleanup
1444        let mut perms = fs::metadata(&path).unwrap().permissions();
1445        perms.set_mode(0o644);
1446        let _ = fs::set_permissions(&path, perms);
1447
1448        // Should have an IO error
1449        assert_eq!(result.schemas_loaded, 0);
1450        assert_eq!(result.errors.len(), 1);
1451        assert!(matches!(
1452            result.errors[0].error_type,
1453            LoadErrorType::FileRead
1454        ));
1455    }
1456
1457    #[tokio::test]
1458    async fn test_multi_statement_cql_file_with_create_type_and_create_table() {
1459        // Create aggregator without UDT validation to avoid dependency ordering issues
1460        let temp_dir = TempDir::new().unwrap();
1461        let config = Config::default();
1462        let platform = Arc::new(Platform::new(&config).await.unwrap());
1463
1464        let registry_config = SchemaRegistryConfig::default();
1465        let registry = Arc::new(RwLock::new(
1466            SchemaRegistry::new(registry_config, platform, config)
1467                .await
1468                .unwrap(),
1469        ));
1470        let udt_registry = Arc::new(RwLock::new(UdtRegistry::new()));
1471
1472        let mut aggregator = SchemaAggregator::new(
1473            registry,
1474            udt_registry,
1475            AggregatorConfig {
1476                graceful_degradation: true,
1477                validate_udt_dependencies: false, // Disable to avoid HashMap ordering issues
1478            },
1479        );
1480
1481        // Multi-statement CQL file with CREATE TYPE and CREATE TABLE
1482        let cql_content = r#"
1483        -- Test schema with UDTs
1484        CREATE TYPE test_ks.address (
1485            street text,
1486            city text,
1487            zip_code int
1488        );
1489
1490        CREATE TYPE test_ks.contact_info (
1491            email text,
1492            phone text,
1493            address address
1494        );
1495
1496        CREATE TABLE test_ks.users (
1497            id uuid PRIMARY KEY,
1498            name text,
1499            contact contact_info
1500        );
1501        "#;
1502
1503        let cql_path = write_file(temp_dir.path(), "schema.cql", cql_content);
1504        let result = aggregator.load_from_paths(&[cql_path]).await.unwrap();
1505
1506        // Verify both UDTs and table were loaded
1507        assert_eq!(result.udts_loaded, 2, "Expected 2 UDTs to be loaded");
1508        assert_eq!(result.schemas_loaded, 1, "Expected 1 table to be loaded");
1509        assert!(
1510            result.errors.is_empty(),
1511            "Expected no errors, got: {:?}",
1512            result.errors
1513        );
1514
1515        // Verify UDTs were registered
1516        let udt_registry = aggregator.udt_registry.read().await;
1517        assert!(
1518            udt_registry.contains_udt("test_ks", "address"),
1519            "address UDT should be registered"
1520        );
1521        assert!(
1522            udt_registry.contains_udt("test_ks", "contact_info"),
1523            "contact_info UDT should be registered"
1524        );
1525
1526        // Verify table was registered
1527        let registry = aggregator.registry.read().await;
1528        let schema = registry.get_schema("test_ks", "users").await.unwrap();
1529        assert_eq!(schema.table, "users");
1530        assert_eq!(schema.columns.len(), 3);
1531    }
1532
1533    #[tokio::test]
1534    #[ignore = "Test fails due to UDT dependency validation not implemented - see Issue #117 review"]
1535    async fn test_cql_file_with_comments_and_semicolons() {
1536        let (mut aggregator, temp_dir) = setup_test_aggregator().await;
1537
1538        // Test edge cases: comments with semicolons
1539        let cql_content = r#"
1540        -- This is a comment with ; semicolon
1541        CREATE TYPE test_ks.metadata (
1542            key text,
1543            value text
1544        );
1545
1546        /* Multi-line comment
1547           with ; semicolon */
1548        CREATE TABLE test_ks.data (
1549            id uuid PRIMARY KEY,
1550            info metadata
1551        );
1552        "#;
1553
1554        let cql_path = write_file(temp_dir.path(), "edge_cases.cql", cql_content);
1555        let result = aggregator.load_from_paths(&[cql_path]).await.unwrap();
1556
1557        assert_eq!(result.udts_loaded, 1);
1558        assert_eq!(result.schemas_loaded, 1);
1559        assert!(result.errors.is_empty());
1560    }
1561
1562    #[tokio::test]
1563    async fn test_backward_compat_single_create_table() {
1564        let (mut aggregator, temp_dir) = setup_test_aggregator().await;
1565
1566        // Ensure backward compatibility with single CREATE TABLE files
1567        let cql_content = r#"
1568        CREATE TABLE test_ks.simple (
1569            id uuid PRIMARY KEY,
1570            data text
1571        );
1572        "#;
1573
1574        let cql_path = write_file(temp_dir.path(), "simple.cql", cql_content);
1575        let result = aggregator.load_from_paths(&[cql_path]).await.unwrap();
1576
1577        assert_eq!(result.schemas_loaded, 1);
1578        assert_eq!(result.udts_loaded, 0);
1579        assert!(result.errors.is_empty());
1580    }
1581
1582    #[tokio::test]
1583    async fn test_graceful_degradation_false_fails_on_invalid_json() {
1584        let temp_dir = TempDir::new().unwrap();
1585        let config = Config::default();
1586        let platform = Arc::new(Platform::new(&config).await.unwrap());
1587
1588        let registry_config = SchemaRegistryConfig::default();
1589        let registry = Arc::new(RwLock::new(
1590            SchemaRegistry::new(registry_config, platform, config)
1591                .await
1592                .unwrap(),
1593        ));
1594        let udt_registry = Arc::new(RwLock::new(UdtRegistry::new()));
1595
1596        // Create aggregator with strict mode (graceful_degradation = false)
1597        let mut aggregator = SchemaAggregator::new(
1598            registry,
1599            udt_registry,
1600            AggregatorConfig {
1601                graceful_degradation: false,
1602                validate_udt_dependencies: true,
1603            },
1604        );
1605
1606        // Create invalid JSON file followed by valid JSON file
1607        let invalid_json = r#"{"keyspace": "ks", "table": "broken""#; // Missing closing brace
1608        let valid_json = r#"
1609        {
1610            "keyspace": "ks",
1611            "table": "valid_table",
1612            "columns": [
1613                {"name": "id", "type": "uuid"}
1614            ],
1615            "partition_keys": ["id"]
1616        }
1617        "#;
1618
1619        let invalid_path = write_file(temp_dir.path(), "01_invalid.json", invalid_json);
1620        let valid_path = write_file(temp_dir.path(), "02_valid.json", valid_json);
1621
1622        let result = aggregator
1623            .load_from_paths(&[invalid_path, valid_path])
1624            .await
1625            .unwrap();
1626
1627        // In strict mode, should fail immediately after first error
1628        assert_eq!(result.schemas_loaded, 0);
1629        assert_eq!(result.udts_loaded, 0);
1630        assert!(!result.errors.is_empty());
1631        assert!(matches!(
1632            result.errors[0].error_type,
1633            LoadErrorType::InvalidJson
1634        ));
1635    }
1636
1637    #[tokio::test]
1638    async fn test_graceful_degradation_true_continues_after_error() {
1639        let temp_dir = TempDir::new().unwrap();
1640        let config = Config::default();
1641        let platform = Arc::new(Platform::new(&config).await.unwrap());
1642
1643        let registry_config = SchemaRegistryConfig::default();
1644        let registry = Arc::new(RwLock::new(
1645            SchemaRegistry::new(registry_config, platform, config)
1646                .await
1647                .unwrap(),
1648        ));
1649        let udt_registry = Arc::new(RwLock::new(UdtRegistry::new()));
1650
1651        // Create aggregator with graceful mode (graceful_degradation = true)
1652        let mut aggregator = SchemaAggregator::new(
1653            registry,
1654            udt_registry,
1655            AggregatorConfig {
1656                graceful_degradation: true,
1657                validate_udt_dependencies: true,
1658            },
1659        );
1660
1661        // Create invalid JSON file followed by valid JSON file
1662        let invalid_json = r#"{"keyspace": "ks", "table": "broken""#; // Missing closing brace
1663        let valid_json = r#"
1664        {
1665            "keyspace": "ks",
1666            "table": "valid_table",
1667            "columns": [
1668                {"name": "id", "type": "uuid"}
1669            ],
1670            "partition_keys": ["id"]
1671        }
1672        "#;
1673
1674        let invalid_path = write_file(temp_dir.path(), "01_invalid.json", invalid_json);
1675        let valid_path = write_file(temp_dir.path(), "02_valid.json", valid_json);
1676
1677        let result = aggregator
1678            .load_from_paths(&[invalid_path, valid_path])
1679            .await
1680            .unwrap();
1681
1682        // In graceful mode, should continue and load valid table
1683        assert_eq!(result.schemas_loaded, 1);
1684        assert_eq!(result.udts_loaded, 0);
1685        assert_eq!(result.errors.len(), 1); // Should collect the error but continue
1686        assert!(matches!(
1687            result.errors[0].error_type,
1688            LoadErrorType::InvalidJson
1689        ));
1690    }
1691
1692    #[tokio::test]
1693    #[ignore = "Test fails because register_udt_with_validation does not catch invalid UDT references - pre-existing limitation"]
1694    async fn test_graceful_degradation_false_fails_on_invalid_udt() {
1695        let temp_dir = TempDir::new().unwrap();
1696        let config = Config::default();
1697        let platform = Arc::new(Platform::new(&config).await.unwrap());
1698
1699        let registry_config = SchemaRegistryConfig::default();
1700        let registry = Arc::new(RwLock::new(
1701            SchemaRegistry::new(registry_config, platform, config)
1702                .await
1703                .unwrap(),
1704        ));
1705        let udt_registry = Arc::new(RwLock::new(UdtRegistry::new()));
1706
1707        // Create aggregator with strict mode
1708        let mut aggregator = SchemaAggregator::new(
1709            registry,
1710            udt_registry,
1711            AggregatorConfig {
1712                graceful_degradation: false,
1713                validate_udt_dependencies: true,
1714            },
1715        );
1716
1717        // Create schema with UDT that references non-existent UDT
1718        let schema_with_invalid_udt = r#"
1719        {
1720            "keyspace": "ks",
1721            "udts": [
1722                {
1723                    "name": "user_type",
1724                    "fields": [
1725                        {"name": "addr", "type": "frozen<nonexistent_udt>"}
1726                    ]
1727                }
1728            ],
1729            "tables": [
1730                {
1731                    "name": "users",
1732                    "columns": [
1733                        {"name": "id", "type": "uuid"},
1734                        {"name": "data", "type": "text"}
1735                    ],
1736                    "partition_keys": ["id"]
1737                }
1738            ]
1739        }
1740        "#;
1741
1742        let path = write_file(temp_dir.path(), "schema.json", schema_with_invalid_udt);
1743        let result = aggregator.load_from_paths(&[path]).await.unwrap();
1744
1745        // In strict mode, should fail on UDT validation and NOT load tables
1746        // NOTE: This test currently fails because register_udt_with_validation
1747        // does not validate nested UDT references. This is a pre-existing limitation.
1748        // TODO: Implement proper UDT dependency validation before enabling this test.
1749        assert_eq!(result.schemas_loaded, 0); // Tables should NOT be loaded
1750        assert_eq!(result.udts_loaded, 0); // UDT should not be loaded
1751        assert!(!result.errors.is_empty());
1752        // Error should be about circular/missing dependency
1753        assert!(matches!(
1754            result.errors[0].error_type,
1755            LoadErrorType::CircularUdtDependency
1756        ));
1757    }
1758
1759    #[tokio::test]
1760    #[ignore = "Test fails because register_udt_with_validation does not catch invalid UDT references - pre-existing limitation"]
1761    async fn test_graceful_degradation_true_loads_tables_despite_invalid_udt() {
1762        let temp_dir = TempDir::new().unwrap();
1763        let config = Config::default();
1764        let platform = Arc::new(Platform::new(&config).await.unwrap());
1765
1766        let registry_config = SchemaRegistryConfig::default();
1767        let registry = Arc::new(RwLock::new(
1768            SchemaRegistry::new(registry_config, platform, config)
1769                .await
1770                .unwrap(),
1771        ));
1772        let udt_registry = Arc::new(RwLock::new(UdtRegistry::new()));
1773
1774        // Create aggregator with graceful mode
1775        let mut aggregator = SchemaAggregator::new(
1776            registry,
1777            udt_registry,
1778            AggregatorConfig {
1779                graceful_degradation: true,
1780                validate_udt_dependencies: true,
1781            },
1782        );
1783
1784        // Create schema with UDT that references non-existent UDT
1785        let schema_with_invalid_udt = r#"
1786        {
1787            "keyspace": "ks",
1788            "udts": [
1789                {
1790                    "name": "user_type",
1791                    "fields": [
1792                        {"name": "addr", "type": "frozen<nonexistent_udt>"}
1793                    ]
1794                }
1795            ],
1796            "tables": [
1797                {
1798                    "name": "users",
1799                    "columns": [
1800                        {"name": "id", "type": "uuid"},
1801                        {"name": "data", "type": "text"}
1802                    ],
1803                    "partition_keys": ["id"]
1804                }
1805            ]
1806        }
1807        "#;
1808
1809        let path = write_file(temp_dir.path(), "schema.json", schema_with_invalid_udt);
1810        let result = aggregator.load_from_paths(&[path]).await.unwrap();
1811
1812        // In graceful mode, should continue and load tables despite UDT failure
1813        assert_eq!(result.schemas_loaded, 1); // Table SHOULD be loaded
1814        assert_eq!(result.udts_loaded, 0); // UDT should not be loaded
1815        assert_eq!(result.errors.len(), 1); // Should collect the error
1816        assert!(matches!(
1817            result.errors[0].error_type,
1818            LoadErrorType::CircularUdtDependency
1819        ));
1820    }
1821
1822    #[tokio::test]
1823    async fn test_multi_keyspace_cql_file_no_collision() {
1824        let (mut aggregator, temp_dir) = setup_test_aggregator().await;
1825
1826        // CQL file with UDTs and tables from TWO different keyspaces
1827        let cql_content = r#"
1828        CREATE TYPE ks_a.address (
1829            street text,
1830            city text
1831        );
1832
1833        CREATE TYPE ks_b.address (
1834            country text,
1835            postal_code text
1836        );
1837
1838        CREATE TABLE ks_a.users (
1839            id uuid PRIMARY KEY,
1840            addr frozen<address>
1841        );
1842
1843        CREATE TABLE ks_b.customers (
1844            id uuid PRIMARY KEY,
1845            location frozen<address>
1846        );
1847        "#;
1848
1849        let cql_path = write_file(temp_dir.path(), "multi_ks.cql", cql_content);
1850        let result = aggregator.load_from_paths(&[cql_path]).await.unwrap();
1851
1852        // Both UDTs should be loaded (no collision)
1853        assert_eq!(
1854            result.udts_loaded, 2,
1855            "Expected 2 UDTs from different keyspaces"
1856        );
1857        assert_eq!(
1858            result.schemas_loaded, 2,
1859            "Expected 2 tables from different keyspaces"
1860        );
1861        assert!(
1862            result.errors.is_empty(),
1863            "Expected no errors, got: {:?}",
1864            result.errors
1865        );
1866
1867        // Verify both UDTs are registered with correct keyspaces
1868        let udt_registry = aggregator.udt_registry.read().await;
1869        assert!(
1870            udt_registry.contains_udt("ks_a", "address"),
1871            "ks_a.address should be registered"
1872        );
1873        assert!(
1874            udt_registry.contains_udt("ks_b", "address"),
1875            "ks_b.address should be registered"
1876        );
1877
1878        // Verify both tables are registered with correct keyspaces
1879        let registry = aggregator.registry.read().await;
1880        let schema_a = registry.get_schema("ks_a", "users").await.unwrap();
1881        assert_eq!(schema_a.keyspace, "ks_a");
1882        assert_eq!(schema_a.table, "users");
1883
1884        let schema_b = registry.get_schema("ks_b", "customers").await.unwrap();
1885        assert_eq!(schema_b.keyspace, "ks_b");
1886        assert_eq!(schema_b.table, "customers");
1887    }
1888
1889    #[tokio::test]
1890    async fn test_error_schema_validation_not_mislabeled_as_file_read() {
1891        let (mut aggregator, temp_dir) = setup_test_aggregator().await;
1892
1893        // JSON file with structural validation error (missing partition_keys)
1894        let invalid_schema = r#"
1895        {
1896            "keyspace": "ks",
1897            "table": "broken_table",
1898            "columns": [
1899                {"name": "id", "type": "uuid"},
1900                {"name": "data", "type": "text"}
1901            ]
1902        }
1903        "#;
1904
1905        let path = write_file(temp_dir.path(), "invalid_schema.json", invalid_schema);
1906        let result = aggregator.load_from_paths(&[path]).await.unwrap();
1907
1908        // Should fail with ValidationFailed, NOT FileRead
1909        assert_eq!(result.schemas_loaded, 0);
1910        assert!(!result.errors.is_empty());
1911        assert!(
1912            matches!(result.errors[0].error_type, LoadErrorType::ValidationFailed),
1913            "Expected ValidationFailed for missing partition_keys, got: {:?}",
1914            result.errors[0].error_type
1915        );
1916        assert!(
1917            result.errors[0].message.contains("partition_keys")
1918                || result.errors[0].message.contains("primary_key"),
1919            "Error message should mention missing keys: {}",
1920            result.errors[0].message
1921        );
1922    }
1923
1924    #[tokio::test]
1925    async fn test_multi_keyspace_json_files_no_collision() {
1926        let (mut aggregator, temp_dir) = setup_test_aggregator().await;
1927
1928        // First JSON file for keyspace ks_a
1929        let json_ks_a = r#"
1930        {
1931            "keyspace": "ks_a",
1932            "udts": [
1933                {
1934                    "name": "address",
1935                    "fields": [
1936                        {"name": "street", "type": "text"},
1937                        {"name": "city", "type": "text"}
1938                    ]
1939                }
1940            ],
1941            "tables": [
1942                {
1943                    "name": "users",
1944                    "columns": [
1945                        {"name": "id", "type": "uuid"},
1946                        {"name": "name", "type": "text"}
1947                    ],
1948                    "partition_keys": ["id"]
1949                }
1950            ]
1951        }
1952        "#;
1953
1954        // Second JSON file for keyspace ks_b with SAME UDT and table names
1955        let json_ks_b = r#"
1956        {
1957            "keyspace": "ks_b",
1958            "udts": [
1959                {
1960                    "name": "address",
1961                    "fields": [
1962                        {"name": "country", "type": "text"},
1963                        {"name": "postal_code", "type": "text"}
1964                    ]
1965                }
1966            ],
1967            "tables": [
1968                {
1969                    "name": "users",
1970                    "columns": [
1971                        {"name": "id", "type": "uuid"},
1972                        {"name": "email", "type": "text"}
1973                    ],
1974                    "partition_keys": ["id"]
1975                }
1976            ]
1977        }
1978        "#;
1979
1980        let path_a = write_file(temp_dir.path(), "ks_a.json", json_ks_a);
1981        let path_b = write_file(temp_dir.path(), "ks_b.json", json_ks_b);
1982
1983        let result = aggregator.load_from_paths(&[path_a, path_b]).await.unwrap();
1984
1985        // Both UDTs and both tables should be loaded (no collision)
1986        assert_eq!(
1987            result.udts_loaded, 2,
1988            "Expected 2 UDTs from different keyspaces"
1989        );
1990        assert_eq!(
1991            result.schemas_loaded, 2,
1992            "Expected 2 tables from different keyspaces"
1993        );
1994        assert!(
1995            result.errors.is_empty(),
1996            "Expected no errors, got: {:?}",
1997            result.errors
1998        );
1999
2000        // Verify both UDTs are registered with correct keyspaces
2001        let udt_registry = aggregator.udt_registry.read().await;
2002        assert!(
2003            udt_registry.contains_udt("ks_a", "address"),
2004            "ks_a.address should be registered"
2005        );
2006        assert!(
2007            udt_registry.contains_udt("ks_b", "address"),
2008            "ks_b.address should be registered"
2009        );
2010
2011        // Verify both tables are registered with correct keyspaces and different columns
2012        let registry = aggregator.registry.read().await;
2013        let schema_a = registry.get_schema("ks_a", "users").await.unwrap();
2014        assert_eq!(schema_a.keyspace, "ks_a");
2015        assert_eq!(schema_a.table, "users");
2016        assert!(
2017            schema_a.columns.iter().any(|c| c.name == "name"),
2018            "ks_a.users should have 'name' column"
2019        );
2020
2021        let schema_b = registry.get_schema("ks_b", "users").await.unwrap();
2022        assert_eq!(schema_b.keyspace, "ks_b");
2023        assert_eq!(schema_b.table, "users");
2024        assert!(
2025            schema_b.columns.iter().any(|c| c.name == "email"),
2026            "ks_b.users should have 'email' column"
2027        );
2028    }
2029}