1use std::collections::HashMap;
2use std::time::Instant;
3
4use type_bridge_core_lib::ast::Clause;
5use type_bridge_core_lib::compiler::QueryCompiler;
6use type_bridge_core_lib::schema::TypeSchema;
7use type_bridge_core_lib::validation::ValidationEngine;
8
9use crate::error::PipelineError;
10use crate::executor::QueryExecutor;
11use crate::interceptor::{Interceptor, InterceptorChain, RequestContext};
12use crate::schema_source::SchemaSource;
13
14pub struct QueryInput {
16 pub database: Option<String>,
17 pub transaction_type: String,
18 pub clauses: Vec<Clause>,
19 pub metadata: HashMap<String, serde_json::Value>,
20}
21
22pub struct ValidateInput {
24 pub clauses: Vec<Clause>,
25}
26
27#[derive(Debug)]
29pub struct QueryOutput {
30 pub results: serde_json::Value,
31 pub request_id: String,
32 pub execution_time_ms: u64,
33 pub interceptors_applied: Vec<String>,
34}
35
36#[derive(Debug)]
38pub struct ValidateOutput {
39 pub is_valid: bool,
40 pub errors: Vec<ValidationErrorDetail>,
41}
42
43#[derive(Debug)]
45pub struct ValidationErrorDetail {
46 pub code: String,
47 pub message: String,
48 pub path: String,
49}
50
51#[cfg_attr(coverage_nightly, coverage(off))]
52fn log_query_execution(database: &str, transaction_type: &str, typeql: &str) {
53 tracing::info!(database, transaction_type, "Executing query");
54 tracing::debug!(typeql, "Compiled TypeQL");
55}
56
57pub struct QueryPipeline {
75 schema: Option<TypeSchema>,
76 validation_engine: ValidationEngine,
77 interceptor_chain: InterceptorChain,
78 default_database: String,
79 executor: Box<dyn QueryExecutor>,
80 skip_validation: bool,
81}
82
83impl QueryPipeline {
84 pub async fn execute_query(&self, input: QueryInput) -> Result<QueryOutput, PipelineError> {
86 let start = Instant::now();
87 let request_id = uuid::Uuid::new_v4().to_string();
88 let database = input
89 .database
90 .unwrap_or_else(|| self.default_database.clone());
91
92 let mut ctx = RequestContext {
93 request_id: request_id.clone(),
94 client_id: "unknown".to_string(),
95 database: database.clone(),
96 transaction_type: input.transaction_type.clone(),
97 metadata: input.metadata,
98 timestamp: chrono::Utc::now(),
99 };
100
101 if !self.skip_validation
103 && let Some(schema) = &self.schema
104 {
105 let result = self.validation_engine.validate_query(&input.clauses, schema);
106 if !result.is_valid {
107 return Err(PipelineError::Validation(format!(
108 "{} validation error(s)",
109 result.errors.len()
110 )));
111 }
112 }
113
114 let clauses = self
116 .interceptor_chain
117 .execute_request(input.clauses, &mut ctx)
118 .await
119 .map_err(|e| PipelineError::Interceptor(e.to_string()))?;
120
121 let compiler = QueryCompiler::new();
123 let typeql = compiler.compile(&clauses);
124 ctx.metadata.insert(
125 "compiled_typeql".to_string(),
126 serde_json::Value::String(typeql.clone()),
127 );
128
129 log_query_execution(&database, &input.transaction_type, &typeql);
131
132 let results = self
133 .executor
134 .execute(&database, &typeql, &input.transaction_type)
135 .await?;
136
137 self.interceptor_chain
139 .execute_response(&results, &ctx)
140 .await
141 .map_err(|e| PipelineError::Interceptor(e.to_string()))?;
142
143 let elapsed = start.elapsed().as_millis() as u64;
144
145 Ok(QueryOutput {
146 results,
147 request_id,
148 execution_time_ms: elapsed,
149 interceptors_applied: self
150 .interceptor_chain
151 .interceptor_names()
152 .into_iter()
153 .map(String::from)
154 .collect(),
155 })
156 }
157
158 pub fn validate(&self, input: &ValidateInput) -> Result<ValidateOutput, PipelineError> {
160 let schema = self
161 .schema
162 .as_ref()
163 .ok_or_else(|| PipelineError::Schema("No schema loaded".to_string()))?;
164
165 let result = self.validation_engine.validate_query(&input.clauses, schema);
166
167 let errors = result
168 .errors
169 .iter()
170 .map(|e| ValidationErrorDetail {
171 code: e.code.clone(),
172 message: e.message.clone(),
173 path: e.path.clone(),
174 })
175 .collect();
176
177 Ok(ValidateOutput {
178 is_valid: result.is_valid,
179 errors,
180 })
181 }
182
183 pub fn schema(&self) -> Option<&TypeSchema> {
185 self.schema.as_ref()
186 }
187
188 pub fn is_connected(&self) -> bool {
190 self.executor.is_connected()
191 }
192
193 pub fn default_database(&self) -> &str {
195 &self.default_database
196 }
197}
198
199pub struct PipelineBuilder {
213 executor: Box<dyn QueryExecutor>,
214 schema_source: Option<Box<dyn SchemaSource>>,
215 interceptors: Vec<Box<dyn Interceptor>>,
216 default_database: String,
217 skip_validation: bool,
218}
219
220impl PipelineBuilder {
221 pub fn new(executor: impl QueryExecutor + 'static) -> Self {
223 Self {
224 executor: Box::new(executor),
225 schema_source: None,
226 interceptors: Vec::new(),
227 default_database: String::new(),
228 skip_validation: false,
229 }
230 }
231
232 pub fn with_schema_source(mut self, source: impl SchemaSource + 'static) -> Self {
234 self.schema_source = Some(Box::new(source));
235 self
236 }
237
238 pub fn with_interceptor(mut self, interceptor: impl Interceptor + 'static) -> Self {
240 self.interceptors.push(Box::new(interceptor));
241 self
242 }
243
244 pub fn with_default_database(mut self, database: impl Into<String>) -> Self {
246 self.default_database = database.into();
247 self
248 }
249
250 pub fn with_skip_validation(mut self) -> Self {
255 self.skip_validation = true;
256 self
257 }
258
259 pub fn build(self) -> Result<QueryPipeline, PipelineError> {
261 let schema = match self.schema_source {
262 Some(source) => Some(source.load()?),
263 None => None,
264 };
265
266 Ok(QueryPipeline {
267 schema,
268 validation_engine: ValidationEngine::new(),
269 interceptor_chain: InterceptorChain::new(self.interceptors),
270 default_database: self.default_database,
271 executor: self.executor,
272 skip_validation: self.skip_validation,
273 })
274 }
275}
276
277#[cfg(test)]
278#[cfg_attr(coverage_nightly, coverage(off))]
279mod tests {
280 use std::future::Future;
281 use std::pin::Pin;
282 use std::sync::atomic::{AtomicUsize, Ordering};
283 use std::sync::Arc;
284
285 use type_bridge_core_lib::ast::{Constraint, Pattern, Value};
286
287 use super::*;
288 use crate::interceptor::traits::InterceptError;
289 use crate::test_helpers::{make_pipeline, make_simple_clauses, MockExecutor};
290
291 fn init_tracing() -> tracing::subscriber::DefaultGuard {
292 let subscriber = tracing_subscriber::fmt()
293 .with_max_level(tracing::Level::DEBUG)
294 .with_test_writer()
295 .finish();
296 tracing::subscriber::set_default(subscriber)
297 }
298
299 struct PassthroughInterceptor {
302 name: String,
303 }
304
305 impl Interceptor for PassthroughInterceptor {
306 fn name(&self) -> &str {
307 &self.name
308 }
309 fn on_request<'a>(
310 &'a self,
311 clauses: Vec<Clause>,
312 _ctx: &'a mut RequestContext,
313 ) -> Pin<Box<dyn Future<Output = Result<Vec<Clause>, InterceptError>> + Send + 'a>> {
314 Box::pin(async move { Ok(clauses) })
315 }
316 }
317
318 struct RejectingRequestInterceptor;
319
320 impl Interceptor for RejectingRequestInterceptor {
321 fn name(&self) -> &str {
322 "rejector"
323 }
324 fn on_request<'a>(
325 &'a self,
326 _clauses: Vec<Clause>,
327 _ctx: &'a mut RequestContext,
328 ) -> Pin<Box<dyn Future<Output = Result<Vec<Clause>, InterceptError>> + Send + 'a>> {
329 Box::pin(async {
330 Err(InterceptError::AccessDenied {
331 reason: "test rejection".into(),
332 })
333 })
334 }
335 }
336
337 struct RejectingResponseInterceptor;
338
339 impl Interceptor for RejectingResponseInterceptor {
340 fn name(&self) -> &str {
341 "resp-rejector"
342 }
343 fn on_request<'a>(
344 &'a self,
345 clauses: Vec<Clause>,
346 _ctx: &'a mut RequestContext,
347 ) -> Pin<Box<dyn Future<Output = Result<Vec<Clause>, InterceptError>> + Send + 'a>> {
348 Box::pin(async move { Ok(clauses) })
349 }
350 fn on_response<'a>(
351 &'a self,
352 _result: &'a serde_json::Value,
353 _ctx: &'a RequestContext,
354 ) -> Pin<Box<dyn Future<Output = Result<(), InterceptError>> + Send + 'a>> {
355 Box::pin(async { Err(InterceptError::Internal("response rejected".into())) })
356 }
357 }
358
359 struct CountingInterceptor {
360 name: String,
361 count: Arc<AtomicUsize>,
362 }
363
364 impl Interceptor for CountingInterceptor {
365 fn name(&self) -> &str {
366 &self.name
367 }
368 fn on_request<'a>(
369 &'a self,
370 clauses: Vec<Clause>,
371 _ctx: &'a mut RequestContext,
372 ) -> Pin<Box<dyn Future<Output = Result<Vec<Clause>, InterceptError>> + Send + 'a>> {
373 Box::pin(async move {
374 self.count.fetch_add(1, Ordering::SeqCst);
375 Ok(clauses)
376 })
377 }
378 }
379
380 struct FailingSchemaSource;
382
383 impl crate::schema_source::SchemaSource for FailingSchemaSource {
384 fn load(&self) -> Result<TypeSchema, PipelineError> {
385 Err(PipelineError::Schema("source failed".into()))
386 }
387 }
388
389 fn make_query_input(clauses: Vec<Clause>) -> QueryInput {
390 QueryInput {
391 database: None,
392 transaction_type: "read".to_string(),
393 clauses,
394 metadata: HashMap::new(),
395 }
396 }
397
398 fn make_query_input_with_db(clauses: Vec<Clause>, db: &str) -> QueryInput {
399 QueryInput {
400 database: Some(db.to_string()),
401 transaction_type: "read".to_string(),
402 clauses,
403 metadata: HashMap::new(),
404 }
405 }
406
407 #[test]
412 fn builder_without_schema_source() {
413 let pipeline = PipelineBuilder::new(MockExecutor::new()).build().unwrap();
414 assert!(pipeline.schema().is_none());
415 }
416
417 #[test]
418 fn builder_with_valid_schema_source() {
419 let pipeline = make_pipeline(MockExecutor::new(), true);
420 assert!(pipeline.schema().is_some());
421 let schema = pipeline.schema().unwrap();
422 assert!(schema.entities.contains_key("person"));
423 }
424
425 #[test]
426 fn builder_with_failing_schema_source() {
427 let result = PipelineBuilder::new(MockExecutor::new())
428 .with_schema_source(FailingSchemaSource)
429 .build();
430 let err = result.err().expect("Expected build error");
431 assert!(matches!(&err, PipelineError::Schema(msg) if msg.contains("source failed")));
432 }
433
434 #[test]
435 fn builder_with_default_database() {
436 let pipeline = PipelineBuilder::new(MockExecutor::new())
437 .with_default_database("mydb")
438 .build()
439 .unwrap();
440 assert_eq!(pipeline.default_database(), "mydb");
441 }
442
443 #[test]
444 fn builder_default_empty_database() {
445 let pipeline = PipelineBuilder::new(MockExecutor::new()).build().unwrap();
446 assert_eq!(pipeline.default_database(), "");
447 }
448
449 #[tokio::test]
450 async fn builder_with_interceptors() {
451 let pipeline = PipelineBuilder::new(MockExecutor::new())
452 .with_interceptor(PassthroughInterceptor {
453 name: "first".into(),
454 })
455 .with_interceptor(PassthroughInterceptor {
456 name: "second".into(),
457 })
458 .build()
459 .unwrap();
460 assert!(pipeline.schema().is_none());
461
462 let input = make_query_input(vec![]);
463 let output = pipeline.execute_query(input).await.unwrap();
464 assert_eq!(output.interceptors_applied, vec!["first", "second"]);
465 }
466
467 #[tokio::test]
472 async fn execute_query_uses_input_database() {
473 let executor = MockExecutor::new();
474 let calls = executor.calls.clone();
475 let pipeline = make_pipeline(executor, false);
476
477 let input = make_query_input_with_db(vec![], "custom_db");
478 pipeline.execute_query(input).await.unwrap();
479
480 let recorded = calls.lock().unwrap();
481 assert_eq!(recorded[0].0, "custom_db");
482 }
483
484 #[tokio::test]
485 async fn execute_query_uses_default_database_when_none() {
486 let executor = MockExecutor::new();
487 let calls = executor.calls.clone();
488 let pipeline = make_pipeline(executor, false);
489
490 let input = make_query_input(vec![]);
491 pipeline.execute_query(input).await.unwrap();
492
493 let recorded = calls.lock().unwrap();
494 assert_eq!(recorded[0].0, "test_db"); }
496
497 #[tokio::test]
498 async fn execute_query_skips_validation_when_no_schema() {
499 let pipeline = make_pipeline(MockExecutor::new(), false);
500 let clauses = vec![Clause::Match(vec![Pattern::Entity {
501 variable: "x".to_string(),
502 type_name: "nonexistent_type".to_string(),
503 constraints: vec![],
504 is_strict: false,
505 }])];
506 let input = make_query_input(clauses);
507 let result = pipeline.execute_query(input).await;
508 assert!(result.is_ok());
509 }
510
511 #[tokio::test]
512 async fn execute_query_validates_when_schema_present_valid() {
513 let pipeline = make_pipeline(MockExecutor::new(), true);
514 let input = make_query_input(make_simple_clauses());
515 let result = pipeline.execute_query(input).await;
516 assert!(result.is_ok());
517 }
518
519 #[tokio::test]
520 async fn execute_query_validates_when_schema_present_invalid() {
521 let pipeline = make_pipeline(MockExecutor::new(), true);
522 let clauses = vec![Clause::Match(vec![Pattern::Entity {
523 variable: "p".to_string(),
524 type_name: "person".to_string(),
525 constraints: vec![Constraint::Has {
526 attr_name: "nonexistent_attr".to_string(),
527 value: Value::Literal(type_bridge_core_lib::ast::LiteralValue {
528 value: serde_json::json!("val"),
529 value_type: "string".to_string(),
530 }),
531 }],
532 is_strict: false,
533 }])];
534 let input = make_query_input(clauses);
535 let result = pipeline.execute_query(input).await;
536 let err = result.unwrap_err();
537 assert!(matches!(&err, PipelineError::Validation(msg) if msg.contains("validation error")));
538 }
539
540 #[tokio::test]
541 async fn execute_query_request_interceptor_failure() {
542 assert_eq!(RejectingRequestInterceptor.name(), "rejector");
543 let pipeline = PipelineBuilder::new(MockExecutor::new())
544 .with_interceptor(RejectingRequestInterceptor)
545 .build()
546 .unwrap();
547 let input = make_query_input(vec![]);
548 let result = pipeline.execute_query(input).await;
549 let err = result.unwrap_err();
550 assert!(matches!(&err, PipelineError::Interceptor(msg) if msg.contains("test rejection")));
551 }
552
553 #[tokio::test]
554 async fn execute_query_executor_failure() {
555 let pipeline = make_pipeline(MockExecutor::failing("db crash"), false);
556 let input = make_query_input(vec![]);
557 let result = pipeline.execute_query(input).await;
558 let err = result.unwrap_err();
559 assert!(matches!(&err, PipelineError::QueryExecution(msg) if msg.contains("db crash")));
560 }
561
562 #[tokio::test]
563 async fn execute_query_response_interceptor_failure() {
564 assert_eq!(RejectingResponseInterceptor.name(), "resp-rejector");
565 let pipeline = PipelineBuilder::new(MockExecutor::new())
566 .with_interceptor(RejectingResponseInterceptor)
567 .build()
568 .unwrap();
569 let input = make_query_input(vec![]);
570 let result = pipeline.execute_query(input).await;
571 let err = result.unwrap_err();
572 assert!(matches!(&err, PipelineError::Interceptor(msg) if msg.contains("response rejected")));
573 }
574
575 #[tokio::test]
576 async fn execute_query_success_output_fields() {
577 let _guard = init_tracing();
578 let count = Arc::new(AtomicUsize::new(0));
579 let pipeline = PipelineBuilder::new(MockExecutor::with_result(serde_json::json!({"ok": true})))
580 .with_default_database("test_db")
581 .with_interceptor(CountingInterceptor {
582 name: "counter".into(),
583 count: count.clone(),
584 })
585 .build()
586 .unwrap();
587
588 let input = make_query_input(vec![]);
589 let output = pipeline.execute_query(input).await.unwrap();
590
591 assert!(!output.request_id.is_empty());
592 assert_eq!(output.results, serde_json::json!({"ok": true}));
593 assert_eq!(output.interceptors_applied, vec!["counter"]);
594 assert_eq!(count.load(Ordering::SeqCst), 1);
595 }
596
597 #[tokio::test]
598 async fn execute_query_empty_clauses_success() {
599 let pipeline = make_pipeline(MockExecutor::new(), false);
600 let input = make_query_input(vec![]);
601 let result = pipeline.execute_query(input).await;
602 assert!(result.is_ok());
603 }
604
605 #[tokio::test]
606 async fn execute_query_compiled_typeql_in_metadata() {
607 let executor = MockExecutor::new();
608 let calls = executor.calls.clone();
609 let pipeline = make_pipeline(executor, false);
610
611 let clauses = make_simple_clauses();
612 let input = make_query_input(clauses);
613 pipeline.execute_query(input).await.unwrap();
614
615 let recorded = calls.lock().unwrap();
616 assert!(!recorded[0].1.is_empty());
617 }
618
619 #[tokio::test]
620 async fn execute_query_passes_transaction_type() {
621 let executor = MockExecutor::new();
622 let calls = executor.calls.clone();
623 let pipeline = make_pipeline(executor, false);
624
625 let input = QueryInput {
626 database: None,
627 transaction_type: "write".to_string(),
628 clauses: vec![],
629 metadata: HashMap::new(),
630 };
631 pipeline.execute_query(input).await.unwrap();
632
633 let recorded = calls.lock().unwrap();
634 assert_eq!(recorded[0].2, "write");
635 }
636
637 #[test]
642 fn validate_no_schema_returns_error() {
643 let pipeline = make_pipeline(MockExecutor::new(), false);
644 let input = ValidateInput { clauses: vec![] };
645 let result = pipeline.validate(&input);
646 let err = result.unwrap_err();
647 assert!(matches!(&err, PipelineError::Schema(msg) if msg.contains("No schema loaded")));
648 }
649
650 #[test]
651 fn validate_valid_clauses() {
652 let pipeline = make_pipeline(MockExecutor::new(), true);
653 let input = ValidateInput {
654 clauses: make_simple_clauses(),
655 };
656 let result = pipeline.validate(&input).unwrap();
657 assert!(result.is_valid);
658 assert!(result.errors.is_empty());
659 }
660
661 #[test]
662 fn validate_invalid_clauses() {
663 let pipeline = make_pipeline(MockExecutor::new(), true);
664 let input = ValidateInput {
665 clauses: vec![Clause::Match(vec![Pattern::Entity {
666 variable: "p".to_string(),
667 type_name: "person".to_string(),
668 constraints: vec![Constraint::Has {
669 attr_name: "nonexistent_attr".to_string(),
670 value: Value::Literal(type_bridge_core_lib::ast::LiteralValue {
671 value: serde_json::json!("val"),
672 value_type: "string".to_string(),
673 }),
674 }],
675 is_strict: false,
676 }])],
677 };
678 let result = pipeline.validate(&input).unwrap();
679 assert!(!result.is_valid);
680 assert!(!result.errors.is_empty());
681 }
682
683 #[test]
684 fn validate_error_detail_fields() {
685 let pipeline = make_pipeline(MockExecutor::new(), true);
686 let input = ValidateInput {
687 clauses: vec![Clause::Match(vec![Pattern::Entity {
688 variable: "x".to_string(),
689 type_name: "person".to_string(),
690 constraints: vec![Constraint::Has {
691 attr_name: "nonexistent_attr".to_string(),
692 value: Value::Literal(type_bridge_core_lib::ast::LiteralValue {
693 value: serde_json::json!("val"),
694 value_type: "string".to_string(),
695 }),
696 }],
697 is_strict: false,
698 }])],
699 };
700 let result = pipeline.validate(&input).unwrap();
701 assert!(!result.is_valid);
702 let error = &result.errors[0];
703 assert!(!error.code.is_empty());
704 assert!(!error.message.is_empty());
705 }
706
707 #[test]
708 fn validate_empty_clauses_with_schema() {
709 let pipeline = make_pipeline(MockExecutor::new(), true);
710 let input = ValidateInput { clauses: vec![] };
711 let result = pipeline.validate(&input).unwrap();
712 assert!(result.is_valid);
713 }
714
715 #[test]
720 fn schema_returns_some_when_loaded() {
721 let pipeline = make_pipeline(MockExecutor::new(), true);
722 assert!(pipeline.schema().is_some());
723 }
724
725 #[test]
726 fn schema_returns_none_when_not_loaded() {
727 let pipeline = make_pipeline(MockExecutor::new(), false);
728 assert!(pipeline.schema().is_none());
729 }
730
731 #[test]
732 fn is_connected_delegates_to_executor() {
733 let executor = MockExecutor::new();
734 *executor.connected.lock().unwrap() = true;
735 let pipeline = make_pipeline(executor, false);
736 assert!(pipeline.is_connected());
737 }
738
739 #[test]
740 fn is_connected_false_when_executor_disconnected() {
741 let executor = MockExecutor::new();
742 *executor.connected.lock().unwrap() = false;
743 let pipeline = make_pipeline(executor, false);
744 assert!(!pipeline.is_connected());
745 }
746
747 #[test]
748 fn default_database_returns_configured_value() {
749 let pipeline = PipelineBuilder::new(MockExecutor::new())
750 .with_default_database("my_database")
751 .build()
752 .unwrap();
753 assert_eq!(pipeline.default_database(), "my_database");
754 }
755
756 #[test]
757 fn default_database_empty_when_not_set() {
758 let pipeline = PipelineBuilder::new(MockExecutor::new()).build().unwrap();
759 assert_eq!(pipeline.default_database(), "");
760 }
761}