1use std::collections::HashMap;
2use std::time::Instant;
3
4use type_bridge_core_lib::ast::Clause;
5use type_bridge_core_lib::compiler::QueryCompiler;
6use type_bridge_core_lib::schema::TypeSchema;
7use type_bridge_core_lib::validation::ValidationEngine;
8
9use crate::error::PipelineError;
10use crate::executor::QueryExecutor;
11use crate::interceptor::crud_interceptor::{CrudInterceptor, CrudInterceptorAdapter};
12use crate::interceptor::{Interceptor, InterceptorChain, RequestContext};
13use crate::schema_source::SchemaSource;
14
15pub struct QueryInput {
17 pub database: Option<String>,
18 pub transaction_type: String,
19 pub clauses: Vec<Clause>,
20 pub metadata: HashMap<String, serde_json::Value>,
21}
22
23pub struct ValidateInput {
25 pub clauses: Vec<Clause>,
26}
27
28#[derive(Debug)]
30pub struct QueryOutput {
31 pub results: serde_json::Value,
32 pub request_id: String,
33 pub execution_time_ms: u64,
34 pub interceptors_applied: Vec<String>,
35}
36
37#[derive(Debug)]
39pub struct ValidateOutput {
40 pub is_valid: bool,
41 pub errors: Vec<ValidationErrorDetail>,
42}
43
44#[derive(Debug)]
46pub struct ValidationErrorDetail {
47 pub code: String,
48 pub message: String,
49 pub path: String,
50}
51
52#[cfg_attr(coverage_nightly, coverage(off))]
53fn log_query_execution(database: &str, transaction_type: &str, typeql: &str) {
54 tracing::info!(database, transaction_type, "Executing query");
55 tracing::debug!(typeql, "Compiled TypeQL");
56}
57
58pub struct QueryPipeline {
76 schema: Option<TypeSchema>,
77 validation_engine: ValidationEngine,
78 interceptor_chain: InterceptorChain,
79 default_database: String,
80 executor: Box<dyn QueryExecutor>,
81 skip_validation: bool,
82}
83
84impl QueryPipeline {
85 pub async fn execute_query(&self, input: QueryInput) -> Result<QueryOutput, PipelineError> {
87 let start = Instant::now();
88 let request_id = uuid::Uuid::new_v4().to_string();
89 let database = input
90 .database
91 .unwrap_or_else(|| self.default_database.clone());
92
93 let mut ctx = RequestContext {
94 request_id: request_id.clone(),
95 client_id: "unknown".to_string(),
96 database: database.clone(),
97 transaction_type: input.transaction_type.clone(),
98 metadata: input.metadata,
99 timestamp: chrono::Utc::now(),
100 crud_info: None,
101 };
102
103 if !self.skip_validation
105 && let Some(schema) = &self.schema
106 {
107 let result = self
108 .validation_engine
109 .validate_query(&input.clauses, schema);
110 if !result.is_valid {
111 return Err(PipelineError::Validation(format!(
112 "{} validation error(s)",
113 result.errors.len()
114 )));
115 }
116 }
117
118 let clauses = self
120 .interceptor_chain
121 .execute_request(input.clauses, &mut ctx)
122 .await
123 .map_err(|e| PipelineError::Interceptor(e.to_string()))?;
124
125 let compiler = QueryCompiler::new();
127 let typeql = compiler.compile(&clauses);
128 ctx.metadata.insert(
129 "compiled_typeql".to_string(),
130 serde_json::Value::String(typeql.clone()),
131 );
132
133 log_query_execution(&database, &input.transaction_type, &typeql);
135
136 let results = self
137 .executor
138 .execute(&database, &typeql, &input.transaction_type)
139 .await?;
140
141 self.interceptor_chain
143 .execute_response(&results, &ctx)
144 .await
145 .map_err(|e| PipelineError::Interceptor(e.to_string()))?;
146
147 let elapsed = start.elapsed().as_millis() as u64;
148
149 Ok(QueryOutput {
150 results,
151 request_id,
152 execution_time_ms: elapsed,
153 interceptors_applied: self
154 .interceptor_chain
155 .interceptor_names()
156 .into_iter()
157 .map(String::from)
158 .collect(),
159 })
160 }
161
162 pub fn validate(&self, input: &ValidateInput) -> Result<ValidateOutput, PipelineError> {
164 let schema = self
165 .schema
166 .as_ref()
167 .ok_or_else(|| PipelineError::Schema("No schema loaded".to_string()))?;
168
169 let result = self
170 .validation_engine
171 .validate_query(&input.clauses, schema);
172
173 let errors = result
174 .errors
175 .iter()
176 .map(|e| ValidationErrorDetail {
177 code: e.code.clone(),
178 message: e.message.clone(),
179 path: e.path.clone(),
180 })
181 .collect();
182
183 Ok(ValidateOutput {
184 is_valid: result.is_valid,
185 errors,
186 })
187 }
188
189 pub fn schema(&self) -> Option<&TypeSchema> {
191 self.schema.as_ref()
192 }
193
194 pub fn is_connected(&self) -> bool {
196 self.executor.is_connected()
197 }
198
199 pub fn default_database(&self) -> &str {
201 &self.default_database
202 }
203}
204
205pub struct PipelineBuilder {
219 executor: Box<dyn QueryExecutor>,
220 schema_source: Option<Box<dyn SchemaSource>>,
221 interceptors: Vec<Box<dyn Interceptor>>,
222 default_database: String,
223 skip_validation: bool,
224}
225
226impl PipelineBuilder {
227 pub fn new(executor: impl QueryExecutor + 'static) -> Self {
229 Self {
230 executor: Box::new(executor),
231 schema_source: None,
232 interceptors: Vec::new(),
233 default_database: String::new(),
234 skip_validation: false,
235 }
236 }
237
238 pub fn with_schema_source(mut self, source: impl SchemaSource + 'static) -> Self {
240 self.schema_source = Some(Box::new(source));
241 self
242 }
243
244 pub fn with_interceptor(mut self, interceptor: impl Interceptor + 'static) -> Self {
246 self.interceptors.push(Box::new(interceptor));
247 self
248 }
249
250 pub fn with_default_database(mut self, database: impl Into<String>) -> Self {
252 self.default_database = database.into();
253 self
254 }
255
256 pub fn with_crud_interceptor(self, interceptor: impl CrudInterceptor + 'static) -> Self {
262 self.with_interceptor(CrudInterceptorAdapter::new(interceptor))
263 }
264
265 pub fn with_skip_validation(mut self) -> Self {
270 self.skip_validation = true;
271 self
272 }
273
274 pub fn build(self) -> Result<QueryPipeline, PipelineError> {
276 let schema = match self.schema_source {
277 Some(source) => Some(source.load()?),
278 None => None,
279 };
280
281 Ok(QueryPipeline {
282 schema,
283 validation_engine: ValidationEngine::new(),
284 interceptor_chain: InterceptorChain::new(self.interceptors),
285 default_database: self.default_database,
286 executor: self.executor,
287 skip_validation: self.skip_validation,
288 })
289 }
290}
291
292#[cfg(test)]
293#[cfg_attr(coverage_nightly, coverage(off))]
294mod tests {
295 use std::future::Future;
296 use std::pin::Pin;
297 use std::sync::Arc;
298 use std::sync::atomic::{AtomicUsize, Ordering};
299
300 use type_bridge_core_lib::ast::{Constraint, Pattern, Value};
301
302 use super::*;
303 use crate::interceptor::traits::InterceptError;
304 use crate::test_helpers::{MockExecutor, make_pipeline, make_simple_clauses};
305
306 fn init_tracing() -> tracing::subscriber::DefaultGuard {
307 let subscriber = tracing_subscriber::fmt()
308 .with_max_level(tracing::Level::DEBUG)
309 .with_test_writer()
310 .finish();
311 tracing::subscriber::set_default(subscriber)
312 }
313
314 struct PassthroughInterceptor {
317 name: String,
318 }
319
320 impl Interceptor for PassthroughInterceptor {
321 fn name(&self) -> &str {
322 &self.name
323 }
324 fn on_request<'a>(
325 &'a self,
326 clauses: Vec<Clause>,
327 _ctx: &'a mut RequestContext,
328 ) -> Pin<Box<dyn Future<Output = Result<Vec<Clause>, InterceptError>> + Send + 'a>>
329 {
330 Box::pin(async move { Ok(clauses) })
331 }
332 }
333
334 struct RejectingRequestInterceptor;
335
336 impl Interceptor for RejectingRequestInterceptor {
337 fn name(&self) -> &str {
338 "rejector"
339 }
340 fn on_request<'a>(
341 &'a self,
342 _clauses: Vec<Clause>,
343 _ctx: &'a mut RequestContext,
344 ) -> Pin<Box<dyn Future<Output = Result<Vec<Clause>, InterceptError>> + Send + 'a>>
345 {
346 Box::pin(async {
347 Err(InterceptError::AccessDenied {
348 reason: "test rejection".into(),
349 })
350 })
351 }
352 }
353
354 struct RejectingResponseInterceptor;
355
356 impl Interceptor for RejectingResponseInterceptor {
357 fn name(&self) -> &str {
358 "resp-rejector"
359 }
360 fn on_request<'a>(
361 &'a self,
362 clauses: Vec<Clause>,
363 _ctx: &'a mut RequestContext,
364 ) -> Pin<Box<dyn Future<Output = Result<Vec<Clause>, InterceptError>> + Send + 'a>>
365 {
366 Box::pin(async move { Ok(clauses) })
367 }
368 fn on_response<'a>(
369 &'a self,
370 _result: &'a serde_json::Value,
371 _ctx: &'a RequestContext,
372 ) -> Pin<Box<dyn Future<Output = Result<(), InterceptError>> + Send + 'a>> {
373 Box::pin(async { Err(InterceptError::Internal("response rejected".into())) })
374 }
375 }
376
377 struct CountingInterceptor {
378 name: String,
379 count: Arc<AtomicUsize>,
380 }
381
382 impl Interceptor for CountingInterceptor {
383 fn name(&self) -> &str {
384 &self.name
385 }
386 fn on_request<'a>(
387 &'a self,
388 clauses: Vec<Clause>,
389 _ctx: &'a mut RequestContext,
390 ) -> Pin<Box<dyn Future<Output = Result<Vec<Clause>, InterceptError>> + Send + 'a>>
391 {
392 Box::pin(async move {
393 self.count.fetch_add(1, Ordering::SeqCst);
394 Ok(clauses)
395 })
396 }
397 }
398
399 struct FailingSchemaSource;
401
402 impl crate::schema_source::SchemaSource for FailingSchemaSource {
403 fn load(&self) -> Result<TypeSchema, PipelineError> {
404 Err(PipelineError::Schema("source failed".into()))
405 }
406 }
407
408 fn make_query_input(clauses: Vec<Clause>) -> QueryInput {
409 QueryInput {
410 database: None,
411 transaction_type: "read".to_string(),
412 clauses,
413 metadata: HashMap::new(),
414 }
415 }
416
417 fn make_query_input_with_db(clauses: Vec<Clause>, db: &str) -> QueryInput {
418 QueryInput {
419 database: Some(db.to_string()),
420 transaction_type: "read".to_string(),
421 clauses,
422 metadata: HashMap::new(),
423 }
424 }
425
426 #[test]
431 fn builder_without_schema_source() {
432 let pipeline = PipelineBuilder::new(MockExecutor::new()).build().unwrap();
433 assert!(pipeline.schema().is_none());
434 }
435
436 #[test]
437 fn builder_with_valid_schema_source() {
438 let pipeline = make_pipeline(MockExecutor::new(), true);
439 assert!(pipeline.schema().is_some());
440 let schema = pipeline.schema().unwrap();
441 assert!(schema.entities.contains_key("person"));
442 }
443
444 #[test]
445 fn builder_with_failing_schema_source() {
446 let result = PipelineBuilder::new(MockExecutor::new())
447 .with_schema_source(FailingSchemaSource)
448 .build();
449 let err = result.err().expect("Expected build error");
450 assert!(matches!(&err, PipelineError::Schema(msg) if msg.contains("source failed")));
451 }
452
453 #[test]
454 fn builder_with_default_database() {
455 let pipeline = PipelineBuilder::new(MockExecutor::new())
456 .with_default_database("mydb")
457 .build()
458 .unwrap();
459 assert_eq!(pipeline.default_database(), "mydb");
460 }
461
462 #[test]
463 fn builder_default_empty_database() {
464 let pipeline = PipelineBuilder::new(MockExecutor::new()).build().unwrap();
465 assert_eq!(pipeline.default_database(), "");
466 }
467
468 #[tokio::test]
469 async fn builder_with_interceptors() {
470 let pipeline = PipelineBuilder::new(MockExecutor::new())
471 .with_interceptor(PassthroughInterceptor {
472 name: "first".into(),
473 })
474 .with_interceptor(PassthroughInterceptor {
475 name: "second".into(),
476 })
477 .build()
478 .unwrap();
479 assert!(pipeline.schema().is_none());
480
481 let input = make_query_input(vec![]);
482 let output = pipeline.execute_query(input).await.unwrap();
483 assert_eq!(output.interceptors_applied, vec!["first", "second"]);
484 }
485
486 #[tokio::test]
491 async fn execute_query_uses_input_database() {
492 let executor = MockExecutor::new();
493 let calls = executor.calls.clone();
494 let pipeline = make_pipeline(executor, false);
495
496 let input = make_query_input_with_db(vec![], "custom_db");
497 pipeline.execute_query(input).await.unwrap();
498
499 let recorded = calls.lock().unwrap();
500 assert_eq!(recorded[0].0, "custom_db");
501 }
502
503 #[tokio::test]
504 async fn execute_query_uses_default_database_when_none() {
505 let executor = MockExecutor::new();
506 let calls = executor.calls.clone();
507 let pipeline = make_pipeline(executor, false);
508
509 let input = make_query_input(vec![]);
510 pipeline.execute_query(input).await.unwrap();
511
512 let recorded = calls.lock().unwrap();
513 assert_eq!(recorded[0].0, "test_db"); }
515
516 #[tokio::test]
517 async fn execute_query_skips_validation_when_no_schema() {
518 let pipeline = make_pipeline(MockExecutor::new(), false);
519 let clauses = vec![Clause::Match(vec![Pattern::Entity {
520 variable: "x".to_string(),
521 type_name: "nonexistent_type".to_string(),
522 constraints: vec![],
523 is_strict: false,
524 }])];
525 let input = make_query_input(clauses);
526 let result = pipeline.execute_query(input).await;
527 assert!(result.is_ok());
528 }
529
530 #[tokio::test]
531 async fn execute_query_validates_when_schema_present_valid() {
532 let pipeline = make_pipeline(MockExecutor::new(), true);
533 let input = make_query_input(make_simple_clauses());
534 let result = pipeline.execute_query(input).await;
535 assert!(result.is_ok());
536 }
537
538 #[tokio::test]
539 async fn execute_query_validates_when_schema_present_invalid() {
540 let pipeline = make_pipeline(MockExecutor::new(), true);
541 let clauses = vec![Clause::Match(vec![Pattern::Entity {
542 variable: "p".to_string(),
543 type_name: "person".to_string(),
544 constraints: vec![Constraint::Has {
545 attr_name: "nonexistent_attr".to_string(),
546 value: Value::Literal(type_bridge_core_lib::ast::LiteralValue {
547 value: serde_json::json!("val"),
548 value_type: "string".to_string(),
549 }),
550 }],
551 is_strict: false,
552 }])];
553 let input = make_query_input(clauses);
554 let result = pipeline.execute_query(input).await;
555 let err = result.unwrap_err();
556 assert!(matches!(&err, PipelineError::Validation(msg) if msg.contains("validation error")));
557 }
558
559 #[tokio::test]
560 async fn execute_query_request_interceptor_failure() {
561 assert_eq!(RejectingRequestInterceptor.name(), "rejector");
562 let pipeline = PipelineBuilder::new(MockExecutor::new())
563 .with_interceptor(RejectingRequestInterceptor)
564 .build()
565 .unwrap();
566 let input = make_query_input(vec![]);
567 let result = pipeline.execute_query(input).await;
568 let err = result.unwrap_err();
569 assert!(matches!(&err, PipelineError::Interceptor(msg) if msg.contains("test rejection")));
570 }
571
572 #[tokio::test]
573 async fn execute_query_executor_failure() {
574 let pipeline = make_pipeline(MockExecutor::failing("db crash"), false);
575 let input = make_query_input(vec![]);
576 let result = pipeline.execute_query(input).await;
577 let err = result.unwrap_err();
578 assert!(matches!(&err, PipelineError::QueryExecution(msg) if msg.contains("db crash")));
579 }
580
581 #[tokio::test]
582 async fn execute_query_response_interceptor_failure() {
583 assert_eq!(RejectingResponseInterceptor.name(), "resp-rejector");
584 let pipeline = PipelineBuilder::new(MockExecutor::new())
585 .with_interceptor(RejectingResponseInterceptor)
586 .build()
587 .unwrap();
588 let input = make_query_input(vec![]);
589 let result = pipeline.execute_query(input).await;
590 let err = result.unwrap_err();
591 assert!(
592 matches!(&err, PipelineError::Interceptor(msg) if msg.contains("response rejected"))
593 );
594 }
595
596 #[tokio::test]
597 async fn execute_query_success_output_fields() {
598 let _guard = init_tracing();
599 let count = Arc::new(AtomicUsize::new(0));
600 let pipeline =
601 PipelineBuilder::new(MockExecutor::with_result(serde_json::json!({"ok": true})))
602 .with_default_database("test_db")
603 .with_interceptor(CountingInterceptor {
604 name: "counter".into(),
605 count: count.clone(),
606 })
607 .build()
608 .unwrap();
609
610 let input = make_query_input(vec![]);
611 let output = pipeline.execute_query(input).await.unwrap();
612
613 assert!(!output.request_id.is_empty());
614 assert_eq!(output.results, serde_json::json!({"ok": true}));
615 assert_eq!(output.interceptors_applied, vec!["counter"]);
616 assert_eq!(count.load(Ordering::SeqCst), 1);
617 }
618
619 #[tokio::test]
620 async fn execute_query_empty_clauses_success() {
621 let pipeline = make_pipeline(MockExecutor::new(), false);
622 let input = make_query_input(vec![]);
623 let result = pipeline.execute_query(input).await;
624 assert!(result.is_ok());
625 }
626
627 #[tokio::test]
628 async fn execute_query_compiled_typeql_in_metadata() {
629 let executor = MockExecutor::new();
630 let calls = executor.calls.clone();
631 let pipeline = make_pipeline(executor, false);
632
633 let clauses = make_simple_clauses();
634 let input = make_query_input(clauses);
635 pipeline.execute_query(input).await.unwrap();
636
637 let recorded = calls.lock().unwrap();
638 assert!(!recorded[0].1.is_empty());
639 }
640
641 #[tokio::test]
642 async fn execute_query_passes_transaction_type() {
643 let executor = MockExecutor::new();
644 let calls = executor.calls.clone();
645 let pipeline = make_pipeline(executor, false);
646
647 let input = QueryInput {
648 database: None,
649 transaction_type: "write".to_string(),
650 clauses: vec![],
651 metadata: HashMap::new(),
652 };
653 pipeline.execute_query(input).await.unwrap();
654
655 let recorded = calls.lock().unwrap();
656 assert_eq!(recorded[0].2, "write");
657 }
658
659 #[test]
664 fn validate_no_schema_returns_error() {
665 let pipeline = make_pipeline(MockExecutor::new(), false);
666 let input = ValidateInput { clauses: vec![] };
667 let result = pipeline.validate(&input);
668 let err = result.unwrap_err();
669 assert!(matches!(&err, PipelineError::Schema(msg) if msg.contains("No schema loaded")));
670 }
671
672 #[test]
673 fn validate_valid_clauses() {
674 let pipeline = make_pipeline(MockExecutor::new(), true);
675 let input = ValidateInput {
676 clauses: make_simple_clauses(),
677 };
678 let result = pipeline.validate(&input).unwrap();
679 assert!(result.is_valid);
680 assert!(result.errors.is_empty());
681 }
682
683 #[test]
684 fn validate_invalid_clauses() {
685 let pipeline = make_pipeline(MockExecutor::new(), true);
686 let input = ValidateInput {
687 clauses: vec![Clause::Match(vec![Pattern::Entity {
688 variable: "p".to_string(),
689 type_name: "person".to_string(),
690 constraints: vec![Constraint::Has {
691 attr_name: "nonexistent_attr".to_string(),
692 value: Value::Literal(type_bridge_core_lib::ast::LiteralValue {
693 value: serde_json::json!("val"),
694 value_type: "string".to_string(),
695 }),
696 }],
697 is_strict: false,
698 }])],
699 };
700 let result = pipeline.validate(&input).unwrap();
701 assert!(!result.is_valid);
702 assert!(!result.errors.is_empty());
703 }
704
705 #[test]
706 fn validate_error_detail_fields() {
707 let pipeline = make_pipeline(MockExecutor::new(), true);
708 let input = ValidateInput {
709 clauses: vec![Clause::Match(vec![Pattern::Entity {
710 variable: "x".to_string(),
711 type_name: "person".to_string(),
712 constraints: vec![Constraint::Has {
713 attr_name: "nonexistent_attr".to_string(),
714 value: Value::Literal(type_bridge_core_lib::ast::LiteralValue {
715 value: serde_json::json!("val"),
716 value_type: "string".to_string(),
717 }),
718 }],
719 is_strict: false,
720 }])],
721 };
722 let result = pipeline.validate(&input).unwrap();
723 assert!(!result.is_valid);
724 let error = &result.errors[0];
725 assert!(!error.code.is_empty());
726 assert!(!error.message.is_empty());
727 }
728
729 #[test]
730 fn validate_empty_clauses_with_schema() {
731 let pipeline = make_pipeline(MockExecutor::new(), true);
732 let input = ValidateInput { clauses: vec![] };
733 let result = pipeline.validate(&input).unwrap();
734 assert!(result.is_valid);
735 }
736
737 #[test]
742 fn schema_returns_some_when_loaded() {
743 let pipeline = make_pipeline(MockExecutor::new(), true);
744 assert!(pipeline.schema().is_some());
745 }
746
747 #[test]
748 fn schema_returns_none_when_not_loaded() {
749 let pipeline = make_pipeline(MockExecutor::new(), false);
750 assert!(pipeline.schema().is_none());
751 }
752
753 #[test]
754 fn is_connected_delegates_to_executor() {
755 let executor = MockExecutor::new();
756 *executor.connected.lock().unwrap() = true;
757 let pipeline = make_pipeline(executor, false);
758 assert!(pipeline.is_connected());
759 }
760
761 #[test]
762 fn is_connected_false_when_executor_disconnected() {
763 let executor = MockExecutor::new();
764 *executor.connected.lock().unwrap() = false;
765 let pipeline = make_pipeline(executor, false);
766 assert!(!pipeline.is_connected());
767 }
768
769 #[test]
770 fn default_database_returns_configured_value() {
771 let pipeline = PipelineBuilder::new(MockExecutor::new())
772 .with_default_database("my_database")
773 .build()
774 .unwrap();
775 assert_eq!(pipeline.default_database(), "my_database");
776 }
777
778 #[test]
779 fn default_database_empty_when_not_set() {
780 let pipeline = PipelineBuilder::new(MockExecutor::new()).build().unwrap();
781 assert_eq!(pipeline.default_database(), "");
782 }
783}