1use std::collections::HashSet;
6use std::sync::LazyLock;
7
8use anyhow::Result;
9use regex::Regex;
10use tracing::{debug, info};
11
12use super::intermediate::IntermediateSchema;
13
14static SAFE_IDENTIFIER: LazyLock<Regex> = LazyLock::new(|| {
17 Regex::new(r"^[A-Za-z_][A-Za-z0-9_]*(\.[A-Za-z_][A-Za-z0-9_]*)?$")
18 .expect("static regex is valid")
19});
20
21pub fn validate_sql_identifier(
37 value: &str,
38 field: &str,
39 path: &str,
40) -> std::result::Result<(), ValidationError> {
41 if value.is_empty() {
42 return Err(ValidationError {
43 message: format!(
44 "`{field}` at `{path}` must not be empty. \
45 Provide a view or function name such as \"v_user\" or \"public.v_user\"."
46 ),
47 path: path.to_string(),
48 severity: ErrorSeverity::Error,
49 suggestion: None,
50 });
51 }
52 if !SAFE_IDENTIFIER.is_match(value) {
53 return Err(ValidationError {
54 message: format!(
55 "`{field}` value {value:?} at `{path}` is not a valid SQL identifier. \
56 Only ASCII letters, digits, underscores, and an optional schema dot are \
57 allowed. Valid examples: \"v_user\", \"public.v_user\", \"fn_create_post\"."
58 ),
59 path: path.to_string(),
60 severity: ErrorSeverity::Error,
61 suggestion: Some(
62 "Remove semicolons, quotes, dashes, spaces, or any SQL syntax \
63 from the identifier value."
64 .to_string(),
65 ),
66 });
67 }
68 Ok(())
69}
70
/// A single finding produced while validating a schema.
///
/// Errors and warnings share this type; `severity` tells them apart.
#[derive(Debug, Clone)]
pub struct ValidationError {
    /// Human-readable description of the problem.
    pub message: String,
    /// Location of the problem as supplied by the check that produced it
    /// (for example `queries[0].return_type` or `Query.users`).
    pub path: String,
    /// Whether this finding is an error or a warning.
    pub severity: ErrorSeverity,
    /// Optional hint on how to resolve the problem.
    pub suggestion: Option<String>,
}
83
/// Severity attached to a [`ValidationError`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ErrorSeverity {
    /// Counted by `ValidationReport::error_count`; any finding of this
    /// severity makes `ValidationReport::is_valid` return `false`.
    Error,
    /// Counted by `ValidationReport::warning_count`; does not affect
    /// `ValidationReport::is_valid`.
    Warning,
}
92
/// Stateless entry point for schema validation; all functionality is exposed
/// through associated functions such as `SchemaValidator::validate`.
pub struct SchemaValidator;
95
96impl SchemaValidator {
97 pub fn validate(schema: &IntermediateSchema) -> Result<ValidationReport> {
99 info!("Validating schema structure");
100
101 let mut report = ValidationReport::default();
102
103 let mut type_names = HashSet::new();
105 for type_def in &schema.types {
106 if type_names.contains(&type_def.name) {
107 report.errors.push(ValidationError {
108 message: format!("Duplicate type name: '{}'", type_def.name),
109 path: format!("types[{}].name", type_names.len()),
110 severity: ErrorSeverity::Error,
111 suggestion: Some("Type names must be unique".to_string()),
112 });
113 }
114 type_names.insert(type_def.name.clone());
115 }
116
117 type_names.insert("Int".to_string());
119 type_names.insert("Float".to_string());
120 type_names.insert("String".to_string());
121 type_names.insert("Boolean".to_string());
122 type_names.insert("ID".to_string());
123
124 let mut query_names = HashSet::new();
126 for (idx, query) in schema.queries.iter().enumerate() {
127 debug!("Validating query: {}", query.name);
128
129 if query_names.contains(&query.name) {
131 report.errors.push(ValidationError {
132 message: format!("Duplicate query name: '{}'", query.name),
133 path: format!("queries[{idx}].name"),
134 severity: ErrorSeverity::Error,
135 suggestion: Some("Query names must be unique".to_string()),
136 });
137 }
138 query_names.insert(query.name.clone());
139
140 if !type_names.contains(&query.return_type) {
142 report.errors.push(ValidationError {
143 message: format!(
144 "Query '{}' references unknown type '{}'",
145 query.name, query.return_type
146 ),
147 path: format!("queries[{idx}].return_type"),
148 severity: ErrorSeverity::Error,
149 suggestion: Some(format!(
150 "Available types: {}",
151 Self::suggest_similar_type(&query.return_type, &type_names)
152 )),
153 });
154 }
155
156 for (arg_idx, arg) in query.arguments.iter().enumerate() {
158 if !type_names.contains(&arg.arg_type) {
159 report.errors.push(ValidationError {
160 message: format!(
161 "Query '{}' argument '{}' references unknown type '{}'",
162 query.name, arg.name, arg.arg_type
163 ),
164 path: format!("queries[{idx}].arguments[{arg_idx}].type"),
165 severity: ErrorSeverity::Error,
166 suggestion: Some(format!(
167 "Available types: {}",
168 Self::suggest_similar_type(&arg.arg_type, &type_names)
169 )),
170 });
171 }
172 }
173
174 if let Some(sql_source) = &query.sql_source {
176 if let Err(e) = validate_sql_identifier(
177 sql_source,
178 "sql_source",
179 &format!("Query.{}", query.name),
180 ) {
181 report.errors.push(e);
182 }
183 }
184
185 if query.sql_source.is_none() && query.returns_list {
187 report.errors.push(ValidationError {
188 message: format!(
189 "Query '{}' returns a list but has no sql_source",
190 query.name
191 ),
192 path: format!("queries[{idx}]"),
193 severity: ErrorSeverity::Warning,
194 suggestion: Some("Add sql_source for SQL-backed queries".to_string()),
195 });
196 }
197 }
198
199 let mut mutation_names = HashSet::new();
201 for (idx, mutation) in schema.mutations.iter().enumerate() {
202 debug!("Validating mutation: {}", mutation.name);
203
204 if mutation_names.contains(&mutation.name) {
206 report.errors.push(ValidationError {
207 message: format!("Duplicate mutation name: '{}'", mutation.name),
208 path: format!("mutations[{idx}].name"),
209 severity: ErrorSeverity::Error,
210 suggestion: Some("Mutation names must be unique".to_string()),
211 });
212 }
213 mutation_names.insert(mutation.name.clone());
214
215 if !type_names.contains(&mutation.return_type) {
217 report.errors.push(ValidationError {
218 message: format!(
219 "Mutation '{}' references unknown type '{}'",
220 mutation.name, mutation.return_type
221 ),
222 path: format!("mutations[{idx}].return_type"),
223 severity: ErrorSeverity::Error,
224 suggestion: Some(format!(
225 "Available types: {}",
226 Self::suggest_similar_type(&mutation.return_type, &type_names)
227 )),
228 });
229 }
230
231 for (arg_idx, arg) in mutation.arguments.iter().enumerate() {
233 if !type_names.contains(&arg.arg_type) {
234 report.errors.push(ValidationError {
235 message: format!(
236 "Mutation '{}' argument '{}' references unknown type '{}'",
237 mutation.name, arg.name, arg.arg_type
238 ),
239 path: format!("mutations[{idx}].arguments[{arg_idx}].type"),
240 severity: ErrorSeverity::Error,
241 suggestion: Some(format!(
242 "Available types: {}",
243 Self::suggest_similar_type(&arg.arg_type, &type_names)
244 )),
245 });
246 }
247 }
248
249 if let Some(sql_source) = &mutation.sql_source {
251 if let Err(e) = validate_sql_identifier(
252 sql_source,
253 "sql_source",
254 &format!("Mutation.{}", mutation.name),
255 ) {
256 report.errors.push(e);
257 }
258 }
259
260 if !mutation.inject.is_empty() {
262 let inject_names: Vec<&str> =
263 mutation.inject.keys().map(String::as_str).collect();
264 let fn_name = mutation
265 .sql_source
266 .as_deref()
267 .unwrap_or("<unknown>");
268 report.errors.push(ValidationError {
269 message: format!(
270 "Mutation '{}' has inject params {:?}. \
271 These are appended as the LAST positional arguments to \
272 `{fn_name}`. Your SQL function MUST declare injected \
273 parameters last, after all client-provided arguments.",
274 mutation.name, inject_names,
275 ),
276 path: format!("Mutation.{}", mutation.name),
277 severity: ErrorSeverity::Warning,
278 suggestion: None,
279 });
280 }
281 }
282
283 if let Some(observers) = &schema.observers {
285 let mut observer_names = HashSet::new();
286 for (idx, observer) in observers.iter().enumerate() {
287 debug!("Validating observer: {}", observer.name);
288
289 if observer_names.contains(&observer.name) {
291 report.errors.push(ValidationError {
292 message: format!("Duplicate observer name: '{}'", observer.name),
293 path: format!("observers[{idx}].name"),
294 severity: ErrorSeverity::Error,
295 suggestion: Some("Observer names must be unique".to_string()),
296 });
297 }
298 observer_names.insert(observer.name.clone());
299
300 if !type_names.contains(&observer.entity) {
302 report.errors.push(ValidationError {
303 message: format!(
304 "Observer '{}' references unknown entity '{}'",
305 observer.name, observer.entity
306 ),
307 path: format!("observers[{idx}].entity"),
308 severity: ErrorSeverity::Error,
309 suggestion: Some(format!(
310 "Available types: {}",
311 Self::suggest_similar_type(&observer.entity, &type_names)
312 )),
313 });
314 }
315
316 let valid_events = ["INSERT", "UPDATE", "DELETE"];
318 if !valid_events.contains(&observer.event.as_str()) {
319 report.errors.push(ValidationError {
320 message: format!(
321 "Observer '{}' has invalid event '{}'. Must be INSERT, UPDATE, or DELETE",
322 observer.name, observer.event
323 ),
324 path: format!("observers[{idx}].event"),
325 severity: ErrorSeverity::Error,
326 suggestion: Some("Valid events: INSERT, UPDATE, DELETE".to_string()),
327 });
328 }
329
330 if observer.actions.is_empty() {
332 report.errors.push(ValidationError {
333 message: format!(
334 "Observer '{}' must have at least one action",
335 observer.name
336 ),
337 path: format!("observers[{idx}].actions"),
338 severity: ErrorSeverity::Error,
339 suggestion: Some("Add a webhook, slack, or email action".to_string()),
340 });
341 }
342
343 for (action_idx, action) in observer.actions.iter().enumerate() {
345 if let Some(obj) = action.as_object() {
346 if let Some(action_type) = obj.get("type").and_then(|v| v.as_str()) {
348 let valid_action_types = ["webhook", "slack", "email"];
349 if !valid_action_types.contains(&action_type) {
350 report.errors.push(ValidationError {
351 message: format!(
352 "Observer '{}' action {} has invalid type '{}'",
353 observer.name, action_idx, action_type
354 ),
355 path: format!(
356 "observers[{idx}].actions[{action_idx}].type"
357 ),
358 severity: ErrorSeverity::Error,
359 suggestion: Some(
360 "Valid action types: webhook, slack, email".to_string(),
361 ),
362 });
363 }
364
365 match action_type {
367 "webhook" => {
368 let has_url = obj.contains_key("url");
369 let has_url_env = obj.contains_key("url_env");
370 if !has_url && !has_url_env {
371 report.errors.push(ValidationError {
372 message: format!(
373 "Observer '{}' webhook action must have 'url' or 'url_env'",
374 observer.name
375 ),
376 path: format!("observers[{idx}].actions[{action_idx}]"),
377 severity: ErrorSeverity::Error,
378 suggestion: Some("Add 'url' or 'url_env' field".to_string()),
379 });
380 }
381 },
382 "slack" => {
383 if !obj.contains_key("channel") {
384 report.errors.push(ValidationError {
385 message: format!(
386 "Observer '{}' slack action must have 'channel' field",
387 observer.name
388 ),
389 path: format!("observers[{idx}].actions[{action_idx}]"),
390 severity: ErrorSeverity::Error,
391 suggestion: Some("Add 'channel' field (e.g., '#sales')".to_string()),
392 });
393 }
394 if !obj.contains_key("message") {
395 report.errors.push(ValidationError {
396 message: format!(
397 "Observer '{}' slack action must have 'message' field",
398 observer.name
399 ),
400 path: format!("observers[{idx}].actions[{action_idx}]"),
401 severity: ErrorSeverity::Error,
402 suggestion: Some("Add 'message' field".to_string()),
403 });
404 }
405 },
406 "email" => {
407 let required_fields = ["to", "subject", "body"];
408 for field in &required_fields {
409 if !obj.contains_key(*field) {
410 report.errors.push(ValidationError {
411 message: format!(
412 "Observer '{}' email action must have '{}' field",
413 observer.name, field
414 ),
415 path: format!("observers[{idx}].actions[{action_idx}]"),
416 severity: ErrorSeverity::Error,
417 suggestion: Some(format!("Add '{field}' field")),
418 });
419 }
420 }
421 },
422 _ => {},
423 }
424 } else {
425 report.errors.push(ValidationError {
426 message: format!(
427 "Observer '{}' action {} missing 'type' field",
428 observer.name, action_idx
429 ),
430 path: format!("observers[{idx}].actions[{action_idx}]"),
431 severity: ErrorSeverity::Error,
432 suggestion: Some(
433 "Add 'type' field (webhook, slack, or email)".to_string(),
434 ),
435 });
436 }
437 } else {
438 report.errors.push(ValidationError {
439 message: format!(
440 "Observer '{}' action {} must be an object",
441 observer.name, action_idx
442 ),
443 path: format!("observers[{idx}].actions[{action_idx}]"),
444 severity: ErrorSeverity::Error,
445 suggestion: None,
446 });
447 }
448 }
449
450 let valid_backoff_strategies = ["exponential", "linear", "fixed"];
452 if !valid_backoff_strategies.contains(&observer.retry.backoff_strategy.as_str()) {
453 report.errors.push(ValidationError {
454 message: format!(
455 "Observer '{}' has invalid backoff_strategy '{}'",
456 observer.name, observer.retry.backoff_strategy
457 ),
458 path: format!("observers[{idx}].retry.backoff_strategy"),
459 severity: ErrorSeverity::Error,
460 suggestion: Some(
461 "Valid strategies: exponential, linear, fixed".to_string(),
462 ),
463 });
464 }
465
466 if observer.retry.max_attempts == 0 {
467 report.errors.push(ValidationError {
468 message: format!(
469 "Observer '{}' has max_attempts=0, actions will never execute",
470 observer.name
471 ),
472 path: format!("observers[{idx}].retry.max_attempts"),
473 severity: ErrorSeverity::Warning,
474 suggestion: Some("Set max_attempts >= 1".to_string()),
475 });
476 }
477
478 if observer.retry.initial_delay_ms == 0 {
479 report.errors.push(ValidationError {
480 message: format!(
481 "Observer '{}' has initial_delay_ms=0, retries will be immediate",
482 observer.name
483 ),
484 path: format!("observers[{idx}].retry.initial_delay_ms"),
485 severity: ErrorSeverity::Warning,
486 suggestion: Some("Consider setting initial_delay_ms > 0".to_string()),
487 });
488 }
489
490 if observer.retry.max_delay_ms < observer.retry.initial_delay_ms {
491 report.errors.push(ValidationError {
492 message: format!(
493 "Observer '{}' has max_delay_ms < initial_delay_ms",
494 observer.name
495 ),
496 path: format!("observers[{idx}].retry.max_delay_ms"),
497 severity: ErrorSeverity::Error,
498 suggestion: Some("max_delay_ms must be >= initial_delay_ms".to_string()),
499 });
500 }
501 }
502 }
503
504 info!(
505 "Validation complete: {} errors, {} warnings",
506 report.error_count(),
507 report.warning_count()
508 );
509
510 Ok(report)
511 }
512
513 fn suggest_similar_type(typo: &str, available: &HashSet<String>) -> String {
515 let similar: Vec<&String> = available
517 .iter()
518 .filter(|name| {
519 name.to_lowercase().starts_with(&typo[0..1].to_lowercase())
520 || typo.to_lowercase().starts_with(&name[0..1].to_lowercase())
521 })
522 .take(3)
523 .collect();
524
525 if similar.is_empty() {
526 available.iter().take(5).cloned().collect::<Vec<_>>().join(", ")
527 } else {
528 similar.iter().map(|s| s.as_str()).collect::<Vec<_>>().join(", ")
529 }
530 }
531}
532
/// Outcome of a validation run: every finding, in the order it was recorded.
#[derive(Debug, Default)]
pub struct ValidationReport {
    /// All findings, warnings included; check each entry's `severity` to tell
    /// errors from warnings.
    pub errors: Vec<ValidationError>,
}
539
540impl ValidationReport {
541 pub fn is_valid(&self) -> bool {
543 !self.has_errors()
544 }
545
546 pub fn has_errors(&self) -> bool {
548 self.errors.iter().any(|e| e.severity == ErrorSeverity::Error)
549 }
550
551 pub fn error_count(&self) -> usize {
553 self.errors.iter().filter(|e| e.severity == ErrorSeverity::Error).count()
554 }
555
556 pub fn warning_count(&self) -> usize {
558 self.errors.iter().filter(|e| e.severity == ErrorSeverity::Warning).count()
559 }
560
561 pub fn print(&self) {
563 if self.errors.is_empty() {
564 return;
565 }
566
567 println!("\n📋 Validation Report:");
568
569 let errors: Vec<_> =
570 self.errors.iter().filter(|e| e.severity == ErrorSeverity::Error).collect();
571
572 let warnings: Vec<_> =
573 self.errors.iter().filter(|e| e.severity == ErrorSeverity::Warning).collect();
574
575 if !errors.is_empty() {
576 println!("\n ❌ Errors ({}):", errors.len());
577 for error in errors {
578 println!(" {}", error.message);
579 println!(" at: {}", error.path);
580 if let Some(suggestion) = &error.suggestion {
581 println!(" 💡 {suggestion}");
582 }
583 println!();
584 }
585 }
586
587 if !warnings.is_empty() {
588 println!("\n ⚠️ Warnings ({}):", warnings.len());
589 for warning in warnings {
590 println!(" {}", warning.message);
591 println!(" at: {}", warning.path);
592 if let Some(suggestion) = &warning.suggestion {
593 println!(" 💡 {suggestion}");
594 }
595 println!();
596 }
597 }
598 }
599}
600
#[cfg(test)]
mod tests {
    use indexmap::IndexMap;
    use serde_json::json;

    use super::*;
    use crate::schema::intermediate::{
        IntermediateObserver, IntermediateQuery, IntermediateRetryConfig, IntermediateType,
    };

    /// Schema with no definitions; tests override only the fields they exercise.
    fn empty_schema() -> IntermediateSchema {
        IntermediateSchema {
            security: None,
            version: "2.0.0".to_string(),
            types: vec![],
            enums: vec![],
            input_types: vec![],
            interfaces: vec![],
            unions: vec![],
            queries: vec![],
            mutations: vec![],
            subscriptions: vec![],
            fragments: None,
            directives: None,
            fact_tables: None,
            aggregate_queries: None,
            observers: None,
            custom_scalars: None,
            observers_config: None,
            subscriptions_config: None,
            validation_config: None,
            federation_config: None,
            debug_config: None,
            mcp_config: None,
            query_defaults: None,
        }
    }

    /// Field-less object type with the given name.
    fn object_type(name: &str) -> IntermediateType {
        IntermediateType {
            name: name.to_string(),
            fields: vec![],
            description: None,
            implements: vec![],
            requires_role: None,
            is_error: false,
            relay: false,
        }
    }

    /// List query named `users` with the given return type and SQL source.
    fn users_query(return_type: &str, sql_source: Option<&str>) -> IntermediateQuery {
        IntermediateQuery {
            name: "users".to_string(),
            return_type: return_type.to_string(),
            returns_list: true,
            nullable: false,
            arguments: vec![],
            description: None,
            sql_source: sql_source.map(str::to_string),
            auto_params: None,
            deprecated: None,
            jsonb_column: None,
            relay: false,
            inject: IndexMap::default(),
            cache_ttl_seconds: None,
            additional_views: vec![],
            requires_role: None,
            relay_cursor_type: None,
        }
    }

    /// Well-formed observer on `Order` inserts with a single webhook action.
    fn order_observer() -> IntermediateObserver {
        IntermediateObserver {
            name: "onOrderCreated".to_string(),
            entity: "Order".to_string(),
            event: "INSERT".to_string(),
            actions: vec![json!({"type": "webhook", "url": "https://example.com"})],
            condition: None,
            retry: IntermediateRetryConfig {
                max_attempts: 3,
                backoff_strategy: "exponential".to_string(),
                initial_delay_ms: 100,
                max_delay_ms: 60000,
            },
        }
    }

    /// Schema declaring the `Order` type and exactly one observer.
    fn schema_with_observer(observer: IntermediateObserver) -> IntermediateSchema {
        IntermediateSchema {
            types: vec![object_type("Order")],
            observers: Some(vec![observer]),
            ..empty_schema()
        }
    }

    #[test]
    fn test_validate_empty_schema() {
        let report = SchemaValidator::validate(&empty_schema()).unwrap();
        assert!(report.is_valid());
    }

    #[test]
    fn test_detect_unknown_return_type() {
        let schema = IntermediateSchema {
            queries: vec![users_query("UnknownType", Some("users"))],
            ..empty_schema()
        };

        let report = SchemaValidator::validate(&schema).unwrap();
        assert!(!report.is_valid());
        assert_eq!(report.error_count(), 1);
        assert!(report.errors[0].message.contains("unknown type 'UnknownType'"));
    }

    #[test]
    fn test_detect_duplicate_query_names() {
        let schema = IntermediateSchema {
            types: vec![object_type("User")],
            queries: vec![
                users_query("User", Some("users")),
                users_query("User", Some("users")),
            ],
            ..empty_schema()
        };

        let report = SchemaValidator::validate(&schema).unwrap();
        assert!(!report.is_valid());
        assert!(report.errors.iter().any(|e| e.message.contains("Duplicate query name")));
    }

    #[test]
    fn test_warning_for_query_without_sql_source() {
        let schema = IntermediateSchema {
            types: vec![object_type("User")],
            queries: vec![users_query("User", None)],
            ..empty_schema()
        };

        let report = SchemaValidator::validate(&schema).unwrap();
        // A missing sql_source is only a warning, so the schema stays valid.
        assert!(report.is_valid());
        assert_eq!(report.warning_count(), 1);
        assert!(report.errors[0].message.contains("no sql_source"));
    }

    #[test]
    fn test_valid_observer() {
        let schema = schema_with_observer(IntermediateObserver {
            actions: vec![json!({
                "type": "webhook",
                "url": "https://example.com/orders"
            })],
            ..order_observer()
        });

        let report = SchemaValidator::validate(&schema).unwrap();
        assert!(report.is_valid(), "Valid observer should pass validation");
        assert_eq!(report.error_count(), 0);
    }

    #[test]
    fn test_observer_with_unknown_entity() {
        // No types are declared here, so the observer's entity cannot resolve.
        let schema = IntermediateSchema {
            observers: Some(vec![IntermediateObserver {
                entity: "UnknownEntity".to_string(),
                ..order_observer()
            }]),
            ..empty_schema()
        };

        let report = SchemaValidator::validate(&schema).unwrap();
        assert!(!report.is_valid());
        assert!(report.errors.iter().any(|e| e.message.contains("unknown entity")));
    }

    #[test]
    fn test_observer_with_invalid_event() {
        let schema = schema_with_observer(IntermediateObserver {
            event: "INVALID_EVENT".to_string(),
            ..order_observer()
        });

        let report = SchemaValidator::validate(&schema).unwrap();
        assert!(!report.is_valid());
        assert!(report.errors.iter().any(|e| e.message.contains("invalid event")));
    }

    #[test]
    fn test_observer_with_invalid_action_type() {
        let schema = schema_with_observer(IntermediateObserver {
            actions: vec![json!({"type": "invalid_action"})],
            ..order_observer()
        });

        let report = SchemaValidator::validate(&schema).unwrap();
        assert!(!report.is_valid());
        assert!(report.errors.iter().any(|e| e.message.contains("invalid type")));
    }

    #[test]
    fn test_observer_with_invalid_retry_config() {
        let schema = schema_with_observer(IntermediateObserver {
            retry: IntermediateRetryConfig {
                max_attempts: 3,
                backoff_strategy: "invalid_strategy".to_string(),
                initial_delay_ms: 100,
                max_delay_ms: 60000,
            },
            ..order_observer()
        });

        let report = SchemaValidator::validate(&schema).unwrap();
        assert!(!report.is_valid());
        assert!(report.errors.iter().any(|e| e.message.contains("invalid backoff_strategy")));
    }

    #[test]
    fn test_query_injection_in_sql_source_rejected() {
        let schema = IntermediateSchema {
            types: vec![object_type("User")],
            queries: vec![users_query("User", Some("v_user\"; DROP TABLE users; --"))],
            ..empty_schema()
        };

        let report = SchemaValidator::validate(&schema).unwrap();
        assert!(!report.is_valid());
        assert!(report.errors.iter().any(|e| e.message.contains("valid SQL identifier")));
    }

    #[test]
    fn test_query_schema_qualified_sql_source_passes() {
        let schema = IntermediateSchema {
            types: vec![object_type("User")],
            queries: vec![users_query("User", Some("public.v_user"))],
            ..empty_schema()
        };

        let report = SchemaValidator::validate(&schema).unwrap();
        assert!(report.is_valid(), "Schema-qualified sql_source should be valid");
    }
}