1use std::collections::HashMap;
7use std::path::{Path, PathBuf};
8use std::sync::Arc;
9use tokio::sync::RwLock;
10
11use crate::error::{Error, Result};
12use crate::schema::{
13 cql_parser::{classify_statement, parse_create_type, split_cql_statements, StatementType},
14 parse_cql_schema, ClusteringColumn, Column, KeyColumn, TableSchema, UdtRegistry,
15};
16use crate::types::UdtTypeDef;
17
18#[allow(unused_imports)]
19use crate::schema::cql_parser;
20
/// Tuning knobs that control how [`SchemaAggregator`] reacts to failures.
#[derive(Clone, Debug)]
pub struct AggregatorConfig {
    /// `true`: record a failing file/UDT/table as an error and keep going.
    /// `false`: `load_from_paths` returns early once an error has been recorded.
    pub graceful_degradation: bool,
    /// `true`: register UDTs through `register_udt_with_validation`;
    /// `false`: register them with the unvalidated `register_udt`.
    pub validate_udt_dependencies: bool,
}
29
30impl Default for AggregatorConfig {
31 fn default() -> Self {
32 Self {
33 graceful_degradation: true,
34 validate_udt_dependencies: true,
35 }
36 }
37}
38
39pub struct SchemaAggregator {
41 registry: Arc<RwLock<crate::schema::registry::SchemaRegistry>>,
43 udt_registry: Arc<RwLock<UdtRegistry>>,
45 config: AggregatorConfig,
47 errors: Vec<SchemaLoadError>,
49 warnings: Vec<SchemaLoadWarning>,
51}
52
53#[derive(Debug, Clone)]
55pub struct LoadResult {
56 pub schemas_loaded: usize,
58 pub udts_loaded: usize,
60 pub errors: Vec<SchemaLoadError>,
62 pub warnings: Vec<SchemaLoadWarning>,
64}
65
66#[derive(Debug, Clone)]
68pub struct SchemaLoadError {
69 pub file_path: Option<PathBuf>,
71 pub error_type: LoadErrorType,
73 pub message: String,
75}
76
/// Category of a [`SchemaLoadError`].
///
/// Derives `Copy`/`PartialEq`/`Eq`/`Hash` so callers can compare categories with `==`
/// or group errors by category instead of relying on `matches!`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum LoadErrorType {
    /// A path could not be discovered or a file could not be read.
    FileRead,
    /// A `.json` file was not valid JSON or did not match a supported layout.
    InvalidJson,
    /// A `.cql` file could not be parsed.
    InvalidCql,
    /// NOTE(review): not constructed by the aggregator code shown; confirm before relying on it.
    MissingUdtDependency,
    /// A UDT was rejected by dependency validation.
    CircularUdtDependency,
    /// A schema parsed but failed validation or registration.
    ValidationFailed,
    /// NOTE(review): not constructed by the aggregator code shown; confirm before relying on it.
    InvalidFileFormat,
}
95
/// A non-fatal issue noticed while loading (e.g. a skipped file).
#[derive(Clone, Debug)]
pub struct SchemaLoadWarning {
    /// File or directory the warning relates to, when there is one.
    pub file_path: Option<PathBuf>,
    /// Human-readable description.
    pub message: String,
}
104
105#[derive(Debug, Clone)]
107struct ParsedSchema {
108 #[allow(dead_code)]
110 keyspace: String,
111 tables: HashMap<String, TableSchema>,
113 udts: HashMap<String, UdtTypeDef>,
115}
116
117#[derive(Debug, serde::Deserialize)]
119#[serde(untagged)]
120enum JsonSchemaFormat {
121 Minimal(MinimalTableSchema),
123 Full(FullSchema),
125}
126
127#[derive(Debug, serde::Deserialize)]
129struct MinimalTableSchema {
130 keyspace: String,
131 table: String,
132 columns: Vec<JsonColumn>,
133 #[serde(default)]
134 partition_keys: Vec<String>,
135 #[serde(default)]
136 primary_key: Vec<String>, #[serde(default)]
138 clustering_keys: Vec<JsonClusteringKey>,
139}
140
141#[derive(Debug, serde::Deserialize)]
144struct FullSchema {
145 keyspace: String,
146 #[serde(default)]
147 udts: Vec<JsonUdt>,
148 #[serde(default)]
149 tables: Vec<JsonTable>,
150}
151
152#[derive(Debug, serde::Deserialize)]
154struct JsonTable {
155 name: String,
156 columns: Vec<JsonColumn>,
157 #[serde(default)]
158 partition_keys: Vec<String>,
159 #[serde(default)]
160 primary_key: Vec<String>,
161 #[serde(default)]
162 clustering_keys: Vec<JsonClusteringKey>,
163}
164
165#[derive(Debug, serde::Deserialize)]
167struct JsonColumn {
168 name: String,
169 #[serde(alias = "data_type")]
170 r#type: String,
171 #[serde(default)]
172 nullable: bool,
173}
174
175#[derive(Debug, serde::Deserialize)]
177struct JsonClusteringKey {
178 name: String,
179 #[serde(alias = "data_type")]
180 r#type: String,
181 #[serde(default)]
182 order: Option<String>,
183}
184
185#[derive(Debug, serde::Deserialize)]
187struct JsonUdt {
188 name: String,
189 fields: Vec<JsonUdtField>,
190}
191
192#[derive(Debug, serde::Deserialize)]
194struct JsonUdtField {
195 name: String,
196 #[serde(alias = "data_type")]
197 r#type: String,
198 #[serde(default = "default_nullable")]
199 nullable: bool,
200}
201
/// Serde default for `JsonUdtField::nullable`: UDT fields are nullable unless stated otherwise.
const fn default_nullable() -> bool {
    true
}
205
/// Extracts the keyspace name from a CQL `USE <keyspace>` statement.
///
/// The `USE` keyword is matched ASCII-case-insensitively and may be followed by any
/// whitespace (space, tab, newline), not only a single space. A trailing `;` is ignored
/// and a double-quoted name has its quotes removed (its case is kept as written).
///
/// Returns `None` when the statement is not a `USE` statement or names no keyspace.
fn extract_use_keyspace(statement: &str) -> Option<String> {
    let trimmed = statement.trim();

    // `get` returns None when byte 3 is not a char boundary, i.e. it cannot be "use".
    let keyword = trimmed.get(..3)?;
    if !keyword.eq_ignore_ascii_case("use") {
        return None;
    }

    // Require whitespace after the keyword so identifiers such as `users` are not matched.
    let rest = &trimmed[3..];
    if !rest.starts_with(char::is_whitespace) {
        return None;
    }

    let mut ks_name = rest.trim().trim_end_matches(';').trim();
    if let Some(inner) = ks_name.strip_prefix('"').and_then(|s| s.strip_suffix('"')) {
        ks_name = inner;
    }

    (!ks_name.is_empty()).then(|| ks_name.to_string())
}
230
/// Extracts the keyspace name from a `CREATE KEYSPACE [IF NOT EXISTS] <name> ...` statement.
///
/// Keywords are matched ASCII-case-insensitively and may be separated by any whitespace.
/// The `IF NOT EXISTS` clause is recognised only when all three words are present. A trailing
/// `;` on the name is dropped and double quotes around it are removed.
///
/// Returns `None` for other statements, when no name follows the keywords, or when the
/// name is empty (consistent with `extract_use_keyspace`).
fn extract_create_keyspace_name(statement: &str) -> Option<String> {
    let words: Vec<&str> = statement.split_whitespace().collect();

    let is_create_keyspace = matches!(
        words.as_slice(),
        [create, keyspace, ..]
            if create.eq_ignore_ascii_case("create") && keyspace.eq_ignore_ascii_case("keyspace")
    );
    if !is_create_keyspace {
        return None;
    }

    let has_if_not_exists = words.len() > 4
        && words[2].eq_ignore_ascii_case("if")
        && words[3].eq_ignore_ascii_case("not")
        && words[4].eq_ignore_ascii_case("exists");
    let name_idx = if has_if_not_exists { 5 } else { 2 };

    let raw = words.get(name_idx)?.trim_end_matches(';');
    let name = raw
        .strip_prefix('"')
        .and_then(|s| s.strip_suffix('"'))
        .unwrap_or(raw);

    (!name.is_empty()).then(|| name.to_string())
}
263
264impl SchemaAggregator {
265 pub fn new(
267 registry: Arc<RwLock<crate::schema::registry::SchemaRegistry>>,
268 udt_registry: Arc<RwLock<UdtRegistry>>,
269 config: AggregatorConfig,
270 ) -> Self {
271 Self {
272 registry,
273 udt_registry,
274 config,
275 errors: Vec::new(),
276 warnings: Vec::new(),
277 }
278 }
279
280 pub async fn load_from_paths(&mut self, paths: &[PathBuf]) -> Result<LoadResult> {
282 self.errors.clear();
283 self.warnings.clear();
284
285 let mut all_files = Vec::new();
287 for path in paths {
288 if let Err(e) = self.discover_files(path, &mut all_files) {
289 self.errors.push(SchemaLoadError {
290 file_path: Some(path.clone()),
291 error_type: LoadErrorType::FileRead,
292 message: format!("Failed to discover files: {}", e),
293 });
294 }
295 }
296
297 if all_files.is_empty() && !self.errors.is_empty() {
298 return Ok(self.build_result(0, 0));
299 }
300
301 let mut parsed_schemas = Vec::new();
303 for file_path in &all_files {
304 match self.parse_file(file_path).await {
305 Ok(Some(schema)) => parsed_schemas.push(schema),
306 Ok(None) => {} Err(e) => {
308 let error_type = match &e {
310 Error::Io(_) => LoadErrorType::FileRead,
311 Error::CqlParse(_) => LoadErrorType::InvalidCql,
312 Error::Schema(_) => {
313 let msg = e.to_string();
316 if msg.contains("Invalid JSON")
317 || msg.contains("JSON")
318 || msg.contains("json")
319 {
320 LoadErrorType::InvalidJson
321 } else {
322 LoadErrorType::ValidationFailed
324 }
325 }
326 _ => {
327 let msg = e.to_string();
329 if msg.contains("JSON") || msg.contains("json") {
330 LoadErrorType::InvalidJson
331 } else if msg.contains("CQL") || msg.contains("parse") {
332 LoadErrorType::InvalidCql
333 } else {
334 LoadErrorType::ValidationFailed
336 }
337 }
338 };
339 self.errors.push(SchemaLoadError {
340 file_path: Some(file_path.clone()),
341 error_type,
342 message: format!("Failed to parse file: {}", e),
343 });
344 if !self.config.graceful_degradation {
346 return Ok(self.build_result(0, 0));
347 }
348 }
349 }
350 }
351
352 if !self.config.graceful_degradation && !self.errors.is_empty() {
354 return Ok(self.build_result(0, 0));
355 }
356
357 let (udts_loaded, tables_loaded) = self.apply_schemas(parsed_schemas).await;
359
360 Ok(self.build_result(tables_loaded, udts_loaded))
361 }
362
363 fn discover_files(&mut self, path: &Path, files: &mut Vec<PathBuf>) -> Result<()> {
365 if !path.exists() {
366 return Err(Error::InvalidPath(format!(
367 "Path does not exist: {}",
368 path.display()
369 )));
370 }
371
372 if path.is_file() {
373 if let Some(ext) = path.extension() {
375 let ext_str = ext.to_string_lossy().to_lowercase();
376 if ext_str == "cql" || ext_str == "json" {
377 files.push(path.to_path_buf());
378 } else {
379 self.warnings.push(SchemaLoadWarning {
380 file_path: Some(path.to_path_buf()),
381 message: format!("Skipping file with unsupported extension: {}", ext_str),
382 });
383 }
384 }
385 } else if path.is_dir() {
386 self.scan_directory_recursive(path, files)?;
388 }
389
390 Ok(())
391 }
392
393 #[allow(clippy::only_used_in_recursion)]
395 fn scan_directory_recursive(&mut self, dir: &Path, files: &mut Vec<PathBuf>) -> Result<()> {
396 let mut entries: Vec<PathBuf> = std::fs::read_dir(dir)
397 .map_err(Error::Io)?
398 .filter_map(|entry| entry.ok().map(|e| e.path()))
399 .collect();
400
401 entries.sort();
403
404 for entry in entries {
405 if entry.is_file() {
406 if let Some(ext) = entry.extension() {
407 let ext_str = ext.to_string_lossy().to_lowercase();
408 if ext_str == "cql" || ext_str == "json" {
409 files.push(entry);
410 }
411 }
412 } else if entry.is_dir() {
413 self.scan_directory_recursive(&entry, files)?;
414 }
415 }
416
417 Ok(())
418 }
419
420 async fn parse_file(&self, path: &Path) -> Result<Option<ParsedSchema>> {
422 let ext = path
423 .extension()
424 .ok_or_else(|| Error::InvalidPath("File has no extension".to_string()))?;
425
426 let ext_str = ext.to_string_lossy().to_lowercase();
427
428 match ext_str.as_str() {
429 "cql" => self.parse_cql_file(path).await,
430 "json" => self.parse_json_file(path).await,
431 _ => Err(Error::InvalidPath(format!(
432 "Unsupported file extension: {}",
433 ext_str
434 ))),
435 }
436 }
437
438 async fn parse_cql_file(&self, path: &Path) -> Result<Option<ParsedSchema>> {
440 let content = std::fs::read_to_string(path)?;
441
442 let statements = split_cql_statements(&content);
444
445 if statements.is_empty() {
446 return Ok(None);
447 }
448
449 let mut keyspace: Option<String> = None;
450 let mut tables = HashMap::new();
451 let mut udts = HashMap::new();
452 let mut errors = Vec::new();
453
454 let mut create_type_stmts = Vec::new();
456 let mut create_table_stmts = Vec::new();
457
458 for statement in &statements {
459 match classify_statement(statement) {
460 StatementType::CreateType => create_type_stmts.push(statement.as_str()),
461 StatementType::CreateTable => create_table_stmts.push(statement.as_str()),
462 StatementType::Other(ref kind) if kind == "use" => {
463 if let Some(ks_name) = extract_use_keyspace(statement) {
465 keyspace = Some(ks_name);
466 }
467 }
468 StatementType::Other(ref kind) if kind == "create" => {
469 if let Some(ks_name) = extract_create_keyspace_name(statement) {
471 if keyspace.is_none() {
473 keyspace = Some(ks_name);
474 }
475 }
476 }
477 StatementType::Other(_kind) => {
478 }
480 }
481 }
482
483 for stmt in create_type_stmts {
485 match parse_create_type(stmt) {
486 Ok((_, (type_name, type_keyspace, fields))) => {
487 let udt_keyspace = type_keyspace.unwrap_or_else(|| {
489 keyspace.clone().unwrap_or_else(|| "default".to_string())
490 });
491
492 if keyspace.is_none() {
494 keyspace = Some(udt_keyspace.clone());
495 }
496
497 let mut udt_def = UdtTypeDef::new(udt_keyspace.clone(), type_name.clone());
499 for (field_name, field_type_str) in fields {
500 let field_type = crate::schema::CqlType::parse(&field_type_str)?;
502 udt_def = udt_def.with_field(field_name, field_type, true);
503 }
504
505 let qualified_name = format!("{}.{}", udt_keyspace, type_name);
507 udts.insert(qualified_name, udt_def);
508 }
509 Err(e) => {
510 errors.push(format!(
511 "Failed to parse CREATE TYPE in {}: {:?}",
512 path.display(),
513 e
514 ));
515 }
516 }
517 }
518
519 for stmt in create_table_stmts {
521 match parse_cql_schema(stmt) {
522 Ok(mut table_schema) => {
523 if table_schema.keyspace == "default" {
526 if let Some(ref active_keyspace) = keyspace {
527 table_schema.keyspace = active_keyspace.clone();
528 }
529 }
530
531 if keyspace.is_none() {
533 keyspace = Some(table_schema.keyspace.clone());
534 }
535
536 let qualified_name =
538 format!("{}.{}", table_schema.keyspace, table_schema.table);
539 tables.insert(qualified_name, table_schema);
540 }
541 Err(e) => {
542 errors.push(format!(
543 "Failed to parse CREATE TABLE in {}: {}",
544 path.display(),
545 e
546 ));
547 }
548 }
549 }
550
551 if !errors.is_empty() && tables.is_empty() && udts.is_empty() {
553 return Err(Error::CqlParse(format!(
554 "Failed to parse CQL file {}: {}",
555 path.display(),
556 errors.join("; ")
557 )));
558 }
559
560 if tables.is_empty() && udts.is_empty() && !statements.is_empty() {
563 let legitimate_keywords = [
566 "use", "create", "alter", "drop", "grant", "revoke", "truncate",
567 ];
568 let has_invalid_statement = statements.iter().any(|stmt| {
569 let normalized = stmt.trim().to_lowercase();
570 let first_word = normalized.split_whitespace().next().unwrap_or("");
571
572 if normalized.starts_with("create ") {
574 return true;
575 }
576
577 !legitimate_keywords.contains(&first_word)
579 });
580
581 if has_invalid_statement {
583 return Err(Error::CqlParse(format!(
584 "Failed to parse CQL file {}: No valid CREATE TABLE or CREATE TYPE statements found",
585 path.display()
586 )));
587 }
588 }
589
590 let final_keyspace = keyspace.unwrap_or_else(|| "default".to_string());
592
593 Ok(Some(ParsedSchema {
594 keyspace: final_keyspace,
595 tables,
596 udts,
597 }))
598 }
599
600 async fn parse_json_file(&self, path: &Path) -> Result<Option<ParsedSchema>> {
602 let content = std::fs::read_to_string(path)?;
603
604 let json_schema: JsonSchemaFormat = serde_json::from_str(&content)
605 .map_err(|e| Error::schema(format!("Invalid JSON in {}: {}", path.display(), e)))?;
606
607 match json_schema {
608 JsonSchemaFormat::Minimal(minimal) => self.parse_minimal_format(path, minimal).await,
609 JsonSchemaFormat::Full(full) => self.parse_full_format(path, full).await,
610 }
611 }
612
613 async fn parse_minimal_format(
615 &self,
616 _path: &Path,
617 minimal: MinimalTableSchema,
618 ) -> Result<Option<ParsedSchema>> {
619 let table_schema = self.convert_minimal_to_table_schema(minimal)?;
620 let keyspace = table_schema.keyspace.clone();
621
622 let mut tables = HashMap::new();
623 let qualified_name = format!("{}.{}", table_schema.keyspace, table_schema.table);
625 tables.insert(qualified_name, table_schema);
626
627 Ok(Some(ParsedSchema {
628 keyspace,
629 tables,
630 udts: HashMap::new(),
631 }))
632 }
633
634 async fn parse_full_format(
636 &self,
637 _path: &Path,
638 full: FullSchema,
639 ) -> Result<Option<ParsedSchema>> {
640 let keyspace = full.keyspace.clone();
641 let mut tables = HashMap::new();
642 let mut udts = HashMap::new();
643
644 for udt_json in full.udts {
646 let udt_def = self.convert_json_udt_to_typedef(&keyspace, udt_json)?;
647 let qualified_name = format!("{}.{}", udt_def.keyspace, udt_def.name);
649 udts.insert(qualified_name, udt_def);
650 }
651
652 for table_json in full.tables {
654 let table_schema = self.convert_json_table_to_table_schema(&keyspace, table_json)?;
655 let qualified_name = format!("{}.{}", table_schema.keyspace, table_schema.table);
657 tables.insert(qualified_name, table_schema);
658 }
659
660 Ok(Some(ParsedSchema {
661 keyspace,
662 tables,
663 udts,
664 }))
665 }
666
667 fn convert_minimal_to_table_schema(&self, minimal: MinimalTableSchema) -> Result<TableSchema> {
669 let partition_key_names = if !minimal.partition_keys.is_empty() {
671 minimal.partition_keys
672 } else if !minimal.primary_key.is_empty() {
673 minimal.primary_key
674 } else {
675 return Err(Error::schema(
676 "Table must have partition_keys or primary_key".to_string(),
677 ));
678 };
679
680 let columns: Vec<Column> = minimal
682 .columns
683 .iter()
684 .map(|col| Column {
685 name: col.name.clone(),
686 data_type: col.r#type.clone(),
687 nullable: col.nullable,
688 default: None,
689 is_static: false, })
691 .collect();
692
693 let partition_keys: Vec<KeyColumn> = partition_key_names
695 .iter()
696 .enumerate()
697 .map(|(pos, name)| {
698 let col = minimal
699 .columns
700 .iter()
701 .find(|c| &c.name == name)
702 .ok_or_else(|| {
703 Error::schema(format!("Partition key '{}' not found in columns", name))
704 })?;
705
706 Ok(KeyColumn {
707 name: col.name.clone(),
708 data_type: col.r#type.clone(),
709 position: pos,
710 })
711 })
712 .collect::<Result<Vec<_>>>()?;
713
714 let clustering_keys: Vec<ClusteringColumn> = minimal
716 .clustering_keys
717 .iter()
718 .enumerate()
719 .map(|(pos, ck)| ClusteringColumn {
720 name: ck.name.clone(),
721 data_type: ck.r#type.clone(),
722 position: pos,
723 order: ck.order.as_deref().map(|s| s.into()).unwrap_or_default(),
724 })
725 .collect();
726
727 let schema = TableSchema {
728 keyspace: minimal.keyspace,
729 table: minimal.table,
730 partition_keys,
731 clustering_keys,
732 columns,
733 comments: HashMap::new(),
734 };
735
736 schema.validate()?;
737 Ok(schema)
738 }
739
740 fn convert_json_table_to_table_schema(
742 &self,
743 keyspace: &str,
744 table_json: JsonTable,
745 ) -> Result<TableSchema> {
746 let partition_key_names = if !table_json.partition_keys.is_empty() {
747 table_json.partition_keys
748 } else if !table_json.primary_key.is_empty() {
749 table_json.primary_key
750 } else {
751 return Err(Error::schema(format!(
752 "Table '{}' must have partition_keys or primary_key",
753 table_json.name
754 )));
755 };
756
757 let columns: Vec<Column> = table_json
758 .columns
759 .iter()
760 .map(|col| Column {
761 name: col.name.clone(),
762 data_type: col.r#type.clone(),
763 nullable: col.nullable,
764 default: None,
765 is_static: false, })
767 .collect();
768
769 let partition_keys: Vec<KeyColumn> = partition_key_names
770 .iter()
771 .enumerate()
772 .map(|(pos, name)| {
773 let col = table_json
774 .columns
775 .iter()
776 .find(|c| &c.name == name)
777 .ok_or_else(|| {
778 Error::schema(format!(
779 "Partition key '{}' not found in columns of table '{}'",
780 name, table_json.name
781 ))
782 })?;
783
784 Ok(KeyColumn {
785 name: col.name.clone(),
786 data_type: col.r#type.clone(),
787 position: pos,
788 })
789 })
790 .collect::<Result<Vec<_>>>()?;
791
792 let clustering_keys: Vec<ClusteringColumn> = table_json
793 .clustering_keys
794 .iter()
795 .enumerate()
796 .map(|(pos, ck)| ClusteringColumn {
797 name: ck.name.clone(),
798 data_type: ck.r#type.clone(),
799 position: pos,
800 order: ck.order.as_deref().map(|s| s.into()).unwrap_or_default(),
801 })
802 .collect();
803
804 let schema = TableSchema {
805 keyspace: keyspace.to_string(),
806 table: table_json.name,
807 partition_keys,
808 clustering_keys,
809 columns,
810 comments: HashMap::new(),
811 };
812
813 schema.validate()?;
814 Ok(schema)
815 }
816
817 fn convert_json_udt_to_typedef(&self, keyspace: &str, udt_json: JsonUdt) -> Result<UdtTypeDef> {
819 let mut udt_def = UdtTypeDef::new(keyspace.to_string(), udt_json.name);
820
821 for field in udt_json.fields {
822 let field_type = crate::schema::CqlType::parse(&field.r#type)?;
823 udt_def = udt_def.with_field(field.name, field_type, field.nullable);
824 }
825
826 Ok(udt_def)
827 }
828
829 async fn apply_schemas(&mut self, parsed_schemas: Vec<ParsedSchema>) -> (usize, usize) {
831 let mut udt_map: HashMap<String, (String, UdtTypeDef)> = HashMap::new(); for parsed in &parsed_schemas {
835 for (qualified_name, udt_def) in &parsed.udts {
836 udt_map.insert(
838 qualified_name.clone(),
839 (udt_def.keyspace.clone(), udt_def.clone()),
840 );
841 }
842 }
843
844 let mut udts_loaded = 0;
846 {
847 let mut udt_registry = self.udt_registry.write().await;
848 for (_key, (_keyspace, udt_def)) in udt_map {
849 if self.config.validate_udt_dependencies {
850 if let Err(e) = udt_registry.register_udt_with_validation(udt_def.clone()) {
852 self.errors.push(SchemaLoadError {
853 file_path: None,
854 error_type: LoadErrorType::CircularUdtDependency,
855 message: format!("UDT validation failed: {}", e),
856 });
857 if !self.config.graceful_degradation {
859 return (udts_loaded, 0);
861 }
862 continue;
863 }
864 } else {
865 udt_registry.register_udt(udt_def);
866 }
867 udts_loaded += 1;
868 }
869 }
870
871 if !self.config.graceful_degradation && !self.errors.is_empty() {
873 return (udts_loaded, 0);
874 }
875
876 let mut table_map: HashMap<String, TableSchema> = HashMap::new();
878
879 for parsed in &parsed_schemas {
880 for (qualified_name, table_schema) in &parsed.tables {
881 table_map.insert(qualified_name.clone(), table_schema.clone());
883 }
884 }
885
886 let mut tables_loaded = 0;
888 {
889 let registry = self.registry.write().await;
890 for (_key, table_schema) in table_map {
891 match registry
892 .register_schema(
893 table_schema.clone(),
894 crate::schema::registry::SchemaSource::Manual,
895 )
896 .await
897 {
898 Ok(_) => tables_loaded += 1,
899 Err(e) => {
900 self.errors.push(SchemaLoadError {
901 file_path: None,
902 error_type: LoadErrorType::ValidationFailed,
903 message: format!(
904 "Failed to register table '{}.{}': {}",
905 table_schema.keyspace, table_schema.table, e
906 ),
907 });
908 if !self.config.graceful_degradation {
910 return (udts_loaded, tables_loaded);
912 }
913 }
914 }
915 }
916 }
917
918 (udts_loaded, tables_loaded)
919 }
920
921 fn build_result(&self, schemas_loaded: usize, udts_loaded: usize) -> LoadResult {
923 LoadResult {
924 schemas_loaded,
925 udts_loaded,
926 errors: self.errors.clone(),
927 warnings: self.warnings.clone(),
928 }
929 }
930}
931
932#[cfg(test)]
933mod tests {
934 use super::*;
935 use crate::platform::Platform;
936 use crate::schema::registry::{SchemaRegistry, SchemaRegistryConfig};
937 use crate::Config;
938 use std::io::Write;
939 use tempfile::TempDir;
940
941 async fn setup_test_aggregator() -> (SchemaAggregator, TempDir) {
942 let temp_dir = TempDir::new().unwrap();
943 let config = Config::default();
944 let platform = Arc::new(Platform::new(&config).await.unwrap());
945
946 let registry_config = SchemaRegistryConfig::default();
947 let registry = Arc::new(RwLock::new(
948 SchemaRegistry::new(registry_config, platform, config)
949 .await
950 .unwrap(),
951 ));
952 let udt_registry = Arc::new(RwLock::new(UdtRegistry::new()));
953
954 let aggregator = SchemaAggregator::new(registry, udt_registry, AggregatorConfig::default());
955
956 (aggregator, temp_dir)
957 }
958
959 fn write_file(dir: &Path, name: &str, content: &str) -> PathBuf {
960 let path = dir.join(name);
961 let mut file = std::fs::File::create(&path).unwrap();
962 file.write_all(content.as_bytes()).unwrap();
963 path
964 }
965
966 #[tokio::test]
967 async fn test_load_single_json_file() {
968 let (mut aggregator, temp_dir) = setup_test_aggregator().await;
969
970 let json_content = r#"
971 {
972 "keyspace": "test_ks",
973 "table": "users",
974 "columns": [
975 {"name": "id", "type": "uuid"},
976 {"name": "name", "type": "text"}
977 ],
978 "partition_keys": ["id"],
979 "clustering_keys": []
980 }
981 "#;
982
983 let json_path = write_file(temp_dir.path(), "users.json", json_content);
984 let result = aggregator.load_from_paths(&[json_path]).await.unwrap();
985
986 assert_eq!(result.schemas_loaded, 1);
987 assert_eq!(result.udts_loaded, 0);
988 assert!(result.errors.is_empty());
989 }
990
991 #[tokio::test]
992 async fn test_load_single_cql_file() {
993 let (mut aggregator, temp_dir) = setup_test_aggregator().await;
994
995 let cql_content = r#"
996 CREATE TABLE test_ks.products (
997 id uuid PRIMARY KEY,
998 name text,
999 price decimal
1000 );
1001 "#;
1002
1003 let cql_path = write_file(temp_dir.path(), "products.cql", cql_content);
1004 let result = aggregator.load_from_paths(&[cql_path]).await.unwrap();
1005
1006 assert_eq!(result.schemas_loaded, 1);
1007 assert_eq!(result.udts_loaded, 0);
1008 assert!(result.errors.is_empty());
1009 }
1010
1011 #[tokio::test]
1012 async fn test_directory_scanning_lexical_order() {
1013 let (mut aggregator, temp_dir) = setup_test_aggregator().await;
1014
1015 write_file(
1017 temp_dir.path(),
1018 "c_table.json",
1019 r#"{"keyspace":"ks","table":"c","columns":[{"name":"id","type":"uuid"}],"partition_keys":["id"]}"#,
1020 );
1021 write_file(
1022 temp_dir.path(),
1023 "a_table.json",
1024 r#"{"keyspace":"ks","table":"a","columns":[{"name":"id","type":"uuid"}],"partition_keys":["id"]}"#,
1025 );
1026 write_file(
1027 temp_dir.path(),
1028 "b_table.json",
1029 r#"{"keyspace":"ks","table":"b","columns":[{"name":"id","type":"uuid"}],"partition_keys":["id"]}"#,
1030 );
1031
1032 let result = aggregator
1033 .load_from_paths(&[temp_dir.path().to_path_buf()])
1034 .await
1035 .unwrap();
1036
1037 assert_eq!(result.schemas_loaded, 3);
1038 assert!(result.errors.is_empty());
1039 }
1040
1041 #[tokio::test]
1042 async fn test_last_wins_for_duplicate_tables() {
1043 let (mut aggregator, temp_dir) = setup_test_aggregator().await;
1044
1045 let first_json = r#"
1046 {
1047 "keyspace": "ks",
1048 "table": "users",
1049 "columns": [
1050 {"name": "id", "type": "uuid"},
1051 {"name": "name", "type": "text"}
1052 ],
1053 "partition_keys": ["id"]
1054 }
1055 "#;
1056
1057 let second_json = r#"
1058 {
1059 "keyspace": "ks",
1060 "table": "users",
1061 "columns": [
1062 {"name": "id", "type": "uuid"},
1063 {"name": "name", "type": "text"},
1064 {"name": "email", "type": "text"}
1065 ],
1066 "partition_keys": ["id"]
1067 }
1068 "#;
1069
1070 let path1 = write_file(temp_dir.path(), "users_v1.json", first_json);
1071 let path2 = write_file(temp_dir.path(), "users_v2.json", second_json);
1072
1073 let result = aggregator.load_from_paths(&[path1, path2]).await.unwrap();
1074
1075 assert_eq!(result.schemas_loaded, 1);
1077
1078 let registry = aggregator.registry.read().await;
1080 let schema = registry.get_schema("ks", "users").await.unwrap();
1081 assert_eq!(schema.columns.len(), 3);
1082 }
1083
1084 #[tokio::test]
1085 async fn test_two_pass_udt_then_tables() {
1086 let (mut aggregator, temp_dir) = setup_test_aggregator().await;
1087
1088 let full_schema_json = r#"
1089 {
1090 "keyspace": "ks",
1091 "udts": [
1092 {
1093 "name": "address",
1094 "fields": [
1095 {"name": "street", "type": "text"},
1096 {"name": "city", "type": "text"}
1097 ]
1098 }
1099 ],
1100 "tables": [
1101 {
1102 "name": "users",
1103 "columns": [
1104 {"name": "id", "type": "uuid"},
1105 {"name": "addr", "type": "frozen<address>"}
1106 ],
1107 "partition_keys": ["id"],
1108 "clustering_keys": []
1109 }
1110 ]
1111 }
1112 "#;
1113
1114 let path = write_file(temp_dir.path(), "schema.json", full_schema_json);
1115 let result = aggregator.load_from_paths(&[path]).await.unwrap();
1116
1117 assert_eq!(result.schemas_loaded, 1);
1118 assert_eq!(result.udts_loaded, 1);
1119 assert!(result.errors.is_empty());
1120
1121 let udt_registry = aggregator.udt_registry.read().await;
1123 assert!(udt_registry.contains_udt("ks", "address"));
1124 }
1125
1126 #[tokio::test]
1129 async fn test_udt_only_json_schema_issue_230() {
1130 let (mut aggregator, temp_dir) = setup_test_aggregator().await;
1131
1132 let udt_only_json = r#"
1134 {
1135 "keyspace": "test_keyspace",
1136 "udts": [
1137 {
1138 "name": "address_type",
1139 "fields": [
1140 { "name": "street", "type": "text" },
1141 { "name": "city", "type": "text" },
1142 { "name": "zip", "type": "int" }
1143 ]
1144 }
1145 ]
1146 }
1147 "#;
1148
1149 let path = write_file(temp_dir.path(), "address.json", udt_only_json);
1150 let result = aggregator.load_from_paths(&[path]).await.unwrap();
1151
1152 assert!(
1154 result.errors.is_empty(),
1155 "Expected no errors but got: {:?}",
1156 result.errors
1157 );
1158 assert_eq!(result.udts_loaded, 1, "Expected 1 UDT to be loaded");
1159 assert_eq!(
1160 result.schemas_loaded, 0,
1161 "Expected 0 tables (UDT-only file)"
1162 );
1163
1164 let udt_registry = aggregator.udt_registry.read().await;
1166 assert!(
1167 udt_registry.contains_udt("test_keyspace", "address_type"),
1168 "UDT address_type should be registered in test_keyspace"
1169 );
1170 }
1171
1172 #[tokio::test]
1175 async fn test_table_only_json_schema_symmetry() {
1176 let (mut aggregator, temp_dir) = setup_test_aggregator().await;
1177
1178 let table_only_json = r#"
1180 {
1181 "keyspace": "test_keyspace",
1182 "tables": [
1183 {
1184 "name": "simple_table",
1185 "columns": [
1186 { "name": "id", "type": "uuid" },
1187 { "name": "data", "type": "text" }
1188 ],
1189 "partition_keys": ["id"],
1190 "clustering_keys": []
1191 }
1192 ]
1193 }
1194 "#;
1195
1196 let path = write_file(temp_dir.path(), "table_only.json", table_only_json);
1197 let result = aggregator.load_from_paths(&[path]).await.unwrap();
1198
1199 assert!(
1201 result.errors.is_empty(),
1202 "Expected no errors but got: {:?}",
1203 result.errors
1204 );
1205 assert_eq!(result.udts_loaded, 0, "Expected 0 UDTs (table-only file)");
1206 assert_eq!(result.schemas_loaded, 1, "Expected 1 table to be loaded");
1207
1208 let registry = aggregator.registry.read().await;
1210 assert!(
1211 registry
1212 .get_schema("test_keyspace", "simple_table")
1213 .await
1214 .is_ok(),
1215 "Table simple_table should be registered in test_keyspace"
1216 );
1217 }
1218
1219 #[tokio::test]
1220 async fn test_invalid_json_error_collection() {
1221 let (mut aggregator, temp_dir) = setup_test_aggregator().await;
1222
1223 let invalid_json = r#"{"keyspace": "ks", "table": "broken""#; let path = write_file(temp_dir.path(), "broken.json", invalid_json);
1226 let result = aggregator.load_from_paths(&[path]).await.unwrap();
1227
1228 assert_eq!(result.schemas_loaded, 0);
1229 assert!(!result.errors.is_empty());
1230 assert!(matches!(
1231 result.errors[0].error_type,
1232 LoadErrorType::InvalidJson
1233 ));
1234 }
1235
1236 #[tokio::test]
1237 async fn test_minimal_format_with_primary_key_synonym() {
1238 let (mut aggregator, temp_dir) = setup_test_aggregator().await;
1239
1240 let json_content = r#"
1241 {
1242 "keyspace": "ks",
1243 "table": "items",
1244 "columns": [
1245 {"name": "id", "type": "uuid"},
1246 {"name": "data", "type": "text"}
1247 ],
1248 "primary_key": ["id"]
1249 }
1250 "#;
1251
1252 let path = write_file(temp_dir.path(), "items.json", json_content);
1253 let result = aggregator.load_from_paths(&[path]).await.unwrap();
1254
1255 assert_eq!(result.schemas_loaded, 1);
1256 assert!(result.errors.is_empty());
1257 }
1258
1259 #[tokio::test]
1260 async fn test_data_type_alias_support() {
1261 let (mut aggregator, temp_dir) = setup_test_aggregator().await;
1262
1263 let json_content = r#"
1264 {
1265 "keyspace": "ks",
1266 "table": "legacy",
1267 "columns": [
1268 {"name": "id", "data_type": "uuid"},
1269 {"name": "value", "data_type": "text"}
1270 ],
1271 "partition_keys": ["id"]
1272 }
1273 "#;
1274
1275 let path = write_file(temp_dir.path(), "legacy.json", json_content);
1276 let result = aggregator.load_from_paths(&[path]).await.unwrap();
1277
1278 assert_eq!(result.schemas_loaded, 1);
1279 assert!(result.errors.is_empty());
1280 }
1281
1282 #[tokio::test]
1283 async fn test_error_type_mapping_io_error() {
1284 let (mut aggregator, _temp_dir) = setup_test_aggregator().await;
1285
1286 let non_existent_path = PathBuf::from("/nonexistent/path/schema.json");
1288 let result = aggregator
1289 .load_from_paths(std::slice::from_ref(&non_existent_path))
1290 .await
1291 .unwrap();
1292
1293 assert_eq!(result.schemas_loaded, 0);
1294 assert_eq!(result.errors.len(), 1);
1295 assert!(matches!(
1296 result.errors[0].error_type,
1297 LoadErrorType::FileRead
1298 ));
1299 assert!(result.errors[0]
1300 .message
1301 .contains("Failed to discover files"));
1302 }
1303
1304 #[tokio::test]
1305 async fn test_error_type_mapping_invalid_json() {
1306 let (mut aggregator, temp_dir) = setup_test_aggregator().await;
1307
1308 let invalid_json = r#"{"keyspace": "ks", "table": "broken", invalid}"#;
1310 let path = write_file(temp_dir.path(), "invalid.json", invalid_json);
1311 let result = aggregator.load_from_paths(&[path]).await.unwrap();
1312
1313 assert_eq!(result.schemas_loaded, 0);
1314 assert_eq!(result.errors.len(), 1);
1315 assert!(matches!(
1316 result.errors[0].error_type,
1317 LoadErrorType::InvalidJson
1318 ));
1319 assert!(result.errors[0].message.contains("Failed to parse file"));
1320 assert!(result.errors[0].message.contains("Invalid JSON"));
1321 }
1322
1323 #[tokio::test]
1324 async fn test_error_type_mapping_invalid_cql() {
1325 let (mut aggregator, temp_dir) = setup_test_aggregator().await;
1326
1327 let invalid_cql = r#"
1329 CREATE INVALID SYNTAX HERE
1330 id uuid PRIMARY KEY
1331 "#;
1332 let path = write_file(temp_dir.path(), "invalid.cql", invalid_cql);
1333 let result = aggregator.load_from_paths(&[path]).await.unwrap();
1334
1335 assert_eq!(result.schemas_loaded, 0);
1336 assert_eq!(result.errors.len(), 1);
1337 assert!(matches!(
1338 result.errors[0].error_type,
1339 LoadErrorType::InvalidCql
1340 ));
1341 assert!(result.errors[0].message.contains("Failed to parse file"));
1342 }
1343
1344 #[tokio::test]
1345 async fn test_error_message_preservation() {
1346 let (mut aggregator, temp_dir) = setup_test_aggregator().await;
1347
1348 let invalid_json = r#"{"keyspace": "ks""#; let path = write_file(temp_dir.path(), "broken.json", invalid_json);
1351 let result = aggregator
1352 .load_from_paths(std::slice::from_ref(&path))
1353 .await
1354 .unwrap();
1355
1356 assert_eq!(result.errors.len(), 1);
1357 assert!(result.errors[0].message.contains("Failed to parse file"));
1359 assert!(result.errors[0].message.contains("Invalid JSON"));
1360 assert_eq!(result.errors[0].file_path, Some(path));
1362 }
1363
1364 #[tokio::test]
1365 async fn test_multiple_error_types_in_batch() {
1366 let (mut aggregator, temp_dir) = setup_test_aggregator().await;
1367
1368 let invalid_json = r#"{"invalid json"#;
1370 let invalid_cql = r#"INVALID CQL SYNTAX"#;
1371
1372 let json_path = write_file(temp_dir.path(), "bad.json", invalid_json);
1373 let cql_path = write_file(temp_dir.path(), "bad.cql", invalid_cql);
1374
1375 let result = aggregator
1376 .load_from_paths(&[json_path, cql_path])
1377 .await
1378 .unwrap();
1379
1380 assert_eq!(result.schemas_loaded, 0);
1381 assert_eq!(result.errors.len(), 2);
1382
1383 let json_error = result
1385 .errors
1386 .iter()
1387 .find(|e| {
1388 e.file_path
1389 .as_ref()
1390 .unwrap()
1391 .to_str()
1392 .unwrap()
1393 .ends_with(".json")
1394 })
1395 .unwrap();
1396 let cql_error = result
1397 .errors
1398 .iter()
1399 .find(|e| {
1400 e.file_path
1401 .as_ref()
1402 .unwrap()
1403 .to_str()
1404 .unwrap()
1405 .ends_with(".cql")
1406 })
1407 .unwrap();
1408
1409 assert!(matches!(json_error.error_type, LoadErrorType::InvalidJson));
1411 assert!(matches!(cql_error.error_type, LoadErrorType::InvalidCql));
1412 }
1413
1414 #[tokio::test]
1415 #[cfg(unix)]
1416 async fn test_file_read_error_from_parse_file() {
1417 use std::fs;
1418 use std::os::unix::fs::PermissionsExt;
1419
1420 let (mut aggregator, temp_dir) = setup_test_aggregator().await;
1421
1422 let json_content =
1424 r#"{"keyspace": "ks", "table": "test", "columns": [], "partition_keys": ["id"]}"#;
1425 let path = write_file(temp_dir.path(), "unreadable.json", json_content);
1426
1427 let mut perms = fs::metadata(&path).unwrap().permissions();
1429 perms.set_mode(0o000);
1430 fs::set_permissions(&path, perms).unwrap();
1431
1432 if fs::File::open(&path).is_ok() {
1435 return;
1436 }
1437
1438 let result = aggregator
1439 .load_from_paths(std::slice::from_ref(&path))
1440 .await
1441 .unwrap();
1442
1443 let mut perms = fs::metadata(&path).unwrap().permissions();
1445 perms.set_mode(0o644);
1446 let _ = fs::set_permissions(&path, perms);
1447
1448 assert_eq!(result.schemas_loaded, 0);
1450 assert_eq!(result.errors.len(), 1);
1451 assert!(matches!(
1452 result.errors[0].error_type,
1453 LoadErrorType::FileRead
1454 ));
1455 }
1456
1457 #[tokio::test]
1458 async fn test_multi_statement_cql_file_with_create_type_and_create_table() {
1459 let temp_dir = TempDir::new().unwrap();
1461 let config = Config::default();
1462 let platform = Arc::new(Platform::new(&config).await.unwrap());
1463
1464 let registry_config = SchemaRegistryConfig::default();
1465 let registry = Arc::new(RwLock::new(
1466 SchemaRegistry::new(registry_config, platform, config)
1467 .await
1468 .unwrap(),
1469 ));
1470 let udt_registry = Arc::new(RwLock::new(UdtRegistry::new()));
1471
1472 let mut aggregator = SchemaAggregator::new(
1473 registry,
1474 udt_registry,
1475 AggregatorConfig {
1476 graceful_degradation: true,
1477 validate_udt_dependencies: false, },
1479 );
1480
1481 let cql_content = r#"
1483 -- Test schema with UDTs
1484 CREATE TYPE test_ks.address (
1485 street text,
1486 city text,
1487 zip_code int
1488 );
1489
1490 CREATE TYPE test_ks.contact_info (
1491 email text,
1492 phone text,
1493 address address
1494 );
1495
1496 CREATE TABLE test_ks.users (
1497 id uuid PRIMARY KEY,
1498 name text,
1499 contact contact_info
1500 );
1501 "#;
1502
1503 let cql_path = write_file(temp_dir.path(), "schema.cql", cql_content);
1504 let result = aggregator.load_from_paths(&[cql_path]).await.unwrap();
1505
1506 assert_eq!(result.udts_loaded, 2, "Expected 2 UDTs to be loaded");
1508 assert_eq!(result.schemas_loaded, 1, "Expected 1 table to be loaded");
1509 assert!(
1510 result.errors.is_empty(),
1511 "Expected no errors, got: {:?}",
1512 result.errors
1513 );
1514
1515 let udt_registry = aggregator.udt_registry.read().await;
1517 assert!(
1518 udt_registry.contains_udt("test_ks", "address"),
1519 "address UDT should be registered"
1520 );
1521 assert!(
1522 udt_registry.contains_udt("test_ks", "contact_info"),
1523 "contact_info UDT should be registered"
1524 );
1525
1526 let registry = aggregator.registry.read().await;
1528 let schema = registry.get_schema("test_ks", "users").await.unwrap();
1529 assert_eq!(schema.table, "users");
1530 assert_eq!(schema.columns.len(), 3);
1531 }
1532
1533 #[tokio::test]
1534 #[ignore = "Test fails due to UDT dependency validation not implemented - see Issue #117 review"]
1535 async fn test_cql_file_with_comments_and_semicolons() {
1536 let (mut aggregator, temp_dir) = setup_test_aggregator().await;
1537
1538 let cql_content = r#"
1540 -- This is a comment with ; semicolon
1541 CREATE TYPE test_ks.metadata (
1542 key text,
1543 value text
1544 );
1545
1546 /* Multi-line comment
1547 with ; semicolon */
1548 CREATE TABLE test_ks.data (
1549 id uuid PRIMARY KEY,
1550 info metadata
1551 );
1552 "#;
1553
1554 let cql_path = write_file(temp_dir.path(), "edge_cases.cql", cql_content);
1555 let result = aggregator.load_from_paths(&[cql_path]).await.unwrap();
1556
1557 assert_eq!(result.udts_loaded, 1);
1558 assert_eq!(result.schemas_loaded, 1);
1559 assert!(result.errors.is_empty());
1560 }
1561
1562 #[tokio::test]
1563 async fn test_backward_compat_single_create_table() {
1564 let (mut aggregator, temp_dir) = setup_test_aggregator().await;
1565
1566 let cql_content = r#"
1568 CREATE TABLE test_ks.simple (
1569 id uuid PRIMARY KEY,
1570 data text
1571 );
1572 "#;
1573
1574 let cql_path = write_file(temp_dir.path(), "simple.cql", cql_content);
1575 let result = aggregator.load_from_paths(&[cql_path]).await.unwrap();
1576
1577 assert_eq!(result.schemas_loaded, 1);
1578 assert_eq!(result.udts_loaded, 0);
1579 assert!(result.errors.is_empty());
1580 }
1581
1582 #[tokio::test]
1583 async fn test_graceful_degradation_false_fails_on_invalid_json() {
1584 let temp_dir = TempDir::new().unwrap();
1585 let config = Config::default();
1586 let platform = Arc::new(Platform::new(&config).await.unwrap());
1587
1588 let registry_config = SchemaRegistryConfig::default();
1589 let registry = Arc::new(RwLock::new(
1590 SchemaRegistry::new(registry_config, platform, config)
1591 .await
1592 .unwrap(),
1593 ));
1594 let udt_registry = Arc::new(RwLock::new(UdtRegistry::new()));
1595
1596 let mut aggregator = SchemaAggregator::new(
1598 registry,
1599 udt_registry,
1600 AggregatorConfig {
1601 graceful_degradation: false,
1602 validate_udt_dependencies: true,
1603 },
1604 );
1605
1606 let invalid_json = r#"{"keyspace": "ks", "table": "broken""#; let valid_json = r#"
1609 {
1610 "keyspace": "ks",
1611 "table": "valid_table",
1612 "columns": [
1613 {"name": "id", "type": "uuid"}
1614 ],
1615 "partition_keys": ["id"]
1616 }
1617 "#;
1618
1619 let invalid_path = write_file(temp_dir.path(), "01_invalid.json", invalid_json);
1620 let valid_path = write_file(temp_dir.path(), "02_valid.json", valid_json);
1621
1622 let result = aggregator
1623 .load_from_paths(&[invalid_path, valid_path])
1624 .await
1625 .unwrap();
1626
1627 assert_eq!(result.schemas_loaded, 0);
1629 assert_eq!(result.udts_loaded, 0);
1630 assert!(!result.errors.is_empty());
1631 assert!(matches!(
1632 result.errors[0].error_type,
1633 LoadErrorType::InvalidJson
1634 ));
1635 }
1636
1637 #[tokio::test]
1638 async fn test_graceful_degradation_true_continues_after_error() {
1639 let temp_dir = TempDir::new().unwrap();
1640 let config = Config::default();
1641 let platform = Arc::new(Platform::new(&config).await.unwrap());
1642
1643 let registry_config = SchemaRegistryConfig::default();
1644 let registry = Arc::new(RwLock::new(
1645 SchemaRegistry::new(registry_config, platform, config)
1646 .await
1647 .unwrap(),
1648 ));
1649 let udt_registry = Arc::new(RwLock::new(UdtRegistry::new()));
1650
1651 let mut aggregator = SchemaAggregator::new(
1653 registry,
1654 udt_registry,
1655 AggregatorConfig {
1656 graceful_degradation: true,
1657 validate_udt_dependencies: true,
1658 },
1659 );
1660
1661 let invalid_json = r#"{"keyspace": "ks", "table": "broken""#; let valid_json = r#"
1664 {
1665 "keyspace": "ks",
1666 "table": "valid_table",
1667 "columns": [
1668 {"name": "id", "type": "uuid"}
1669 ],
1670 "partition_keys": ["id"]
1671 }
1672 "#;
1673
1674 let invalid_path = write_file(temp_dir.path(), "01_invalid.json", invalid_json);
1675 let valid_path = write_file(temp_dir.path(), "02_valid.json", valid_json);
1676
1677 let result = aggregator
1678 .load_from_paths(&[invalid_path, valid_path])
1679 .await
1680 .unwrap();
1681
1682 assert_eq!(result.schemas_loaded, 1);
1684 assert_eq!(result.udts_loaded, 0);
1685 assert_eq!(result.errors.len(), 1); assert!(matches!(
1687 result.errors[0].error_type,
1688 LoadErrorType::InvalidJson
1689 ));
1690 }
1691
1692 #[tokio::test]
1693 #[ignore = "Test fails because register_udt_with_validation does not catch invalid UDT references - pre-existing limitation"]
1694 async fn test_graceful_degradation_false_fails_on_invalid_udt() {
1695 let temp_dir = TempDir::new().unwrap();
1696 let config = Config::default();
1697 let platform = Arc::new(Platform::new(&config).await.unwrap());
1698
1699 let registry_config = SchemaRegistryConfig::default();
1700 let registry = Arc::new(RwLock::new(
1701 SchemaRegistry::new(registry_config, platform, config)
1702 .await
1703 .unwrap(),
1704 ));
1705 let udt_registry = Arc::new(RwLock::new(UdtRegistry::new()));
1706
1707 let mut aggregator = SchemaAggregator::new(
1709 registry,
1710 udt_registry,
1711 AggregatorConfig {
1712 graceful_degradation: false,
1713 validate_udt_dependencies: true,
1714 },
1715 );
1716
1717 let schema_with_invalid_udt = r#"
1719 {
1720 "keyspace": "ks",
1721 "udts": [
1722 {
1723 "name": "user_type",
1724 "fields": [
1725 {"name": "addr", "type": "frozen<nonexistent_udt>"}
1726 ]
1727 }
1728 ],
1729 "tables": [
1730 {
1731 "name": "users",
1732 "columns": [
1733 {"name": "id", "type": "uuid"},
1734 {"name": "data", "type": "text"}
1735 ],
1736 "partition_keys": ["id"]
1737 }
1738 ]
1739 }
1740 "#;
1741
1742 let path = write_file(temp_dir.path(), "schema.json", schema_with_invalid_udt);
1743 let result = aggregator.load_from_paths(&[path]).await.unwrap();
1744
1745 assert_eq!(result.schemas_loaded, 0); assert_eq!(result.udts_loaded, 0); assert!(!result.errors.is_empty());
1752 assert!(matches!(
1754 result.errors[0].error_type,
1755 LoadErrorType::CircularUdtDependency
1756 ));
1757 }
1758
1759 #[tokio::test]
1760 #[ignore = "Test fails because register_udt_with_validation does not catch invalid UDT references - pre-existing limitation"]
1761 async fn test_graceful_degradation_true_loads_tables_despite_invalid_udt() {
1762 let temp_dir = TempDir::new().unwrap();
1763 let config = Config::default();
1764 let platform = Arc::new(Platform::new(&config).await.unwrap());
1765
1766 let registry_config = SchemaRegistryConfig::default();
1767 let registry = Arc::new(RwLock::new(
1768 SchemaRegistry::new(registry_config, platform, config)
1769 .await
1770 .unwrap(),
1771 ));
1772 let udt_registry = Arc::new(RwLock::new(UdtRegistry::new()));
1773
1774 let mut aggregator = SchemaAggregator::new(
1776 registry,
1777 udt_registry,
1778 AggregatorConfig {
1779 graceful_degradation: true,
1780 validate_udt_dependencies: true,
1781 },
1782 );
1783
1784 let schema_with_invalid_udt = r#"
1786 {
1787 "keyspace": "ks",
1788 "udts": [
1789 {
1790 "name": "user_type",
1791 "fields": [
1792 {"name": "addr", "type": "frozen<nonexistent_udt>"}
1793 ]
1794 }
1795 ],
1796 "tables": [
1797 {
1798 "name": "users",
1799 "columns": [
1800 {"name": "id", "type": "uuid"},
1801 {"name": "data", "type": "text"}
1802 ],
1803 "partition_keys": ["id"]
1804 }
1805 ]
1806 }
1807 "#;
1808
1809 let path = write_file(temp_dir.path(), "schema.json", schema_with_invalid_udt);
1810 let result = aggregator.load_from_paths(&[path]).await.unwrap();
1811
1812 assert_eq!(result.schemas_loaded, 1); assert_eq!(result.udts_loaded, 0); assert_eq!(result.errors.len(), 1); assert!(matches!(
1817 result.errors[0].error_type,
1818 LoadErrorType::CircularUdtDependency
1819 ));
1820 }
1821
1822 #[tokio::test]
1823 async fn test_multi_keyspace_cql_file_no_collision() {
1824 let (mut aggregator, temp_dir) = setup_test_aggregator().await;
1825
1826 let cql_content = r#"
1828 CREATE TYPE ks_a.address (
1829 street text,
1830 city text
1831 );
1832
1833 CREATE TYPE ks_b.address (
1834 country text,
1835 postal_code text
1836 );
1837
1838 CREATE TABLE ks_a.users (
1839 id uuid PRIMARY KEY,
1840 addr frozen<address>
1841 );
1842
1843 CREATE TABLE ks_b.customers (
1844 id uuid PRIMARY KEY,
1845 location frozen<address>
1846 );
1847 "#;
1848
1849 let cql_path = write_file(temp_dir.path(), "multi_ks.cql", cql_content);
1850 let result = aggregator.load_from_paths(&[cql_path]).await.unwrap();
1851
1852 assert_eq!(
1854 result.udts_loaded, 2,
1855 "Expected 2 UDTs from different keyspaces"
1856 );
1857 assert_eq!(
1858 result.schemas_loaded, 2,
1859 "Expected 2 tables from different keyspaces"
1860 );
1861 assert!(
1862 result.errors.is_empty(),
1863 "Expected no errors, got: {:?}",
1864 result.errors
1865 );
1866
1867 let udt_registry = aggregator.udt_registry.read().await;
1869 assert!(
1870 udt_registry.contains_udt("ks_a", "address"),
1871 "ks_a.address should be registered"
1872 );
1873 assert!(
1874 udt_registry.contains_udt("ks_b", "address"),
1875 "ks_b.address should be registered"
1876 );
1877
1878 let registry = aggregator.registry.read().await;
1880 let schema_a = registry.get_schema("ks_a", "users").await.unwrap();
1881 assert_eq!(schema_a.keyspace, "ks_a");
1882 assert_eq!(schema_a.table, "users");
1883
1884 let schema_b = registry.get_schema("ks_b", "customers").await.unwrap();
1885 assert_eq!(schema_b.keyspace, "ks_b");
1886 assert_eq!(schema_b.table, "customers");
1887 }
1888
1889 #[tokio::test]
1890 async fn test_error_schema_validation_not_mislabeled_as_file_read() {
1891 let (mut aggregator, temp_dir) = setup_test_aggregator().await;
1892
1893 let invalid_schema = r#"
1895 {
1896 "keyspace": "ks",
1897 "table": "broken_table",
1898 "columns": [
1899 {"name": "id", "type": "uuid"},
1900 {"name": "data", "type": "text"}
1901 ]
1902 }
1903 "#;
1904
1905 let path = write_file(temp_dir.path(), "invalid_schema.json", invalid_schema);
1906 let result = aggregator.load_from_paths(&[path]).await.unwrap();
1907
1908 assert_eq!(result.schemas_loaded, 0);
1910 assert!(!result.errors.is_empty());
1911 assert!(
1912 matches!(result.errors[0].error_type, LoadErrorType::ValidationFailed),
1913 "Expected ValidationFailed for missing partition_keys, got: {:?}",
1914 result.errors[0].error_type
1915 );
1916 assert!(
1917 result.errors[0].message.contains("partition_keys")
1918 || result.errors[0].message.contains("primary_key"),
1919 "Error message should mention missing keys: {}",
1920 result.errors[0].message
1921 );
1922 }
1923
1924 #[tokio::test]
1925 async fn test_multi_keyspace_json_files_no_collision() {
1926 let (mut aggregator, temp_dir) = setup_test_aggregator().await;
1927
1928 let json_ks_a = r#"
1930 {
1931 "keyspace": "ks_a",
1932 "udts": [
1933 {
1934 "name": "address",
1935 "fields": [
1936 {"name": "street", "type": "text"},
1937 {"name": "city", "type": "text"}
1938 ]
1939 }
1940 ],
1941 "tables": [
1942 {
1943 "name": "users",
1944 "columns": [
1945 {"name": "id", "type": "uuid"},
1946 {"name": "name", "type": "text"}
1947 ],
1948 "partition_keys": ["id"]
1949 }
1950 ]
1951 }
1952 "#;
1953
1954 let json_ks_b = r#"
1956 {
1957 "keyspace": "ks_b",
1958 "udts": [
1959 {
1960 "name": "address",
1961 "fields": [
1962 {"name": "country", "type": "text"},
1963 {"name": "postal_code", "type": "text"}
1964 ]
1965 }
1966 ],
1967 "tables": [
1968 {
1969 "name": "users",
1970 "columns": [
1971 {"name": "id", "type": "uuid"},
1972 {"name": "email", "type": "text"}
1973 ],
1974 "partition_keys": ["id"]
1975 }
1976 ]
1977 }
1978 "#;
1979
1980 let path_a = write_file(temp_dir.path(), "ks_a.json", json_ks_a);
1981 let path_b = write_file(temp_dir.path(), "ks_b.json", json_ks_b);
1982
1983 let result = aggregator.load_from_paths(&[path_a, path_b]).await.unwrap();
1984
1985 assert_eq!(
1987 result.udts_loaded, 2,
1988 "Expected 2 UDTs from different keyspaces"
1989 );
1990 assert_eq!(
1991 result.schemas_loaded, 2,
1992 "Expected 2 tables from different keyspaces"
1993 );
1994 assert!(
1995 result.errors.is_empty(),
1996 "Expected no errors, got: {:?}",
1997 result.errors
1998 );
1999
2000 let udt_registry = aggregator.udt_registry.read().await;
2002 assert!(
2003 udt_registry.contains_udt("ks_a", "address"),
2004 "ks_a.address should be registered"
2005 );
2006 assert!(
2007 udt_registry.contains_udt("ks_b", "address"),
2008 "ks_b.address should be registered"
2009 );
2010
2011 let registry = aggregator.registry.read().await;
2013 let schema_a = registry.get_schema("ks_a", "users").await.unwrap();
2014 assert_eq!(schema_a.keyspace, "ks_a");
2015 assert_eq!(schema_a.table, "users");
2016 assert!(
2017 schema_a.columns.iter().any(|c| c.name == "name"),
2018 "ks_a.users should have 'name' column"
2019 );
2020
2021 let schema_b = registry.get_schema("ks_b", "users").await.unwrap();
2022 assert_eq!(schema_b.keyspace, "ks_b");
2023 assert_eq!(schema_b.table, "users");
2024 assert!(
2025 schema_b.columns.iter().any(|c| c.name == "email"),
2026 "ks_b.users should have 'email' column"
2027 );
2028 }
2029}