Skip to main content

type_bridge_server/
pipeline.rs

1use std::collections::HashMap;
2use std::time::Instant;
3
4use type_bridge_core_lib::ast::Clause;
5use type_bridge_core_lib::compiler::QueryCompiler;
6use type_bridge_core_lib::query_parser;
7use type_bridge_core_lib::schema::TypeSchema;
8use type_bridge_core_lib::validation::ValidationEngine;
9
10use crate::error::PipelineError;
11use crate::executor::QueryExecutor;
12use crate::interceptor::{Interceptor, InterceptorChain, RequestContext};
13use crate::schema_source::SchemaSource;
14
/// Input for a structured (AST-based) query.
///
/// Consumed by [`QueryPipeline::execute_query`].
pub struct QueryInput {
    /// Target database; when `None`, the pipeline's default database is used.
    pub database: Option<String>,
    /// Transaction type, forwarded unchanged to the executor
    /// (the tests in this file use `"read"` and `"write"`).
    pub transaction_type: String,
    /// Query clauses; validated (when a schema is loaded), passed through the
    /// request interceptors, and then compiled to TypeQL.
    pub clauses: Vec<Clause>,
    /// Caller-supplied metadata; becomes the initial `metadata` of the request
    /// context handed to the interceptors.
    pub metadata: HashMap<String, serde_json::Value>,
}
22
/// Input for a raw TypeQL query.
///
/// Consumed by [`QueryPipeline::execute_raw`], which parses `query` into clauses
/// and forwards the remaining fields unchanged to [`QueryPipeline::execute_query`].
pub struct RawQueryInput {
    /// Target database; when `None`, the pipeline's default database is used.
    pub database: Option<String>,
    /// Transaction type, forwarded unchanged to the executor.
    pub transaction_type: String,
    /// TypeQL source text to parse.
    pub query: String,
    /// Caller-supplied metadata, forwarded into the request context.
    pub metadata: HashMap<String, serde_json::Value>,
}
30
/// Input for a validation-only request.
///
/// Consumed by [`QueryPipeline::validate`]; nothing is compiled or executed.
pub struct ValidateInput {
    /// Clauses to check against the pipeline's loaded schema.
    pub clauses: Vec<Clause>,
}
35
/// Output from a successful pipeline execution.
#[derive(Debug)]
pub struct QueryOutput {
    /// Value returned by the executor, passed through unmodified.
    pub results: serde_json::Value,
    /// UUID (v4) string generated by the pipeline for this request.
    pub request_id: String,
    /// Wall-clock time, in milliseconds, from the start of `execute_query`
    /// until the response interceptors have finished.
    pub execution_time_ms: u64,
    /// Names reported by the interceptor chain's `interceptor_names()`.
    pub interceptors_applied: Vec<String>,
}
44
/// Output from a validation-only request.
#[derive(Debug)]
pub struct ValidateOutput {
    /// Validity flag copied from the validation engine's result.
    pub is_valid: bool,
    /// One entry per error reported by the validation engine.
    pub errors: Vec<ValidationErrorDetail>,
}
51
/// A single validation error.
///
/// Every field is copied verbatim from the corresponding field of the error
/// produced by the validation engine.
#[derive(Debug)]
pub struct ValidationErrorDetail {
    /// Error code as reported by the validation engine.
    pub code: String,
    /// Human-readable description of the problem.
    pub message: String,
    /// NOTE(review): presumably the location of the offending element within
    /// the query — confirm against the core library's validation error type.
    pub path: String,
}
59
60#[cfg_attr(coverage_nightly, coverage(off))]
61fn log_query_execution(database: &str, transaction_type: &str, typeql: &str) {
62    tracing::info!(database, transaction_type, "Executing query");
63    tracing::debug!(typeql, "Compiled TypeQL");
64}
65
/// Transport-agnostic query pipeline.
///
/// Encapsulates the full query lifecycle: validate → intercept → compile → execute → intercept.
/// Use [`PipelineBuilder`] to construct an instance.
///
/// # Example
///
/// ```rust,ignore
/// use type_bridge_server::{PipelineBuilder, QueryInput};
///
/// let pipeline = PipelineBuilder::new(my_executor)
///     .with_schema_source(my_schema_source)
///     .with_default_database("my_db")
///     .build()?;
///
/// let output = pipeline.execute_query(QueryInput { ... }).await?;
/// ```
pub struct QueryPipeline {
    /// Schema loaded at build time. When `None`, `execute_query` skips validation
    /// and `validate` returns a schema error.
    schema: Option<TypeSchema>,
    /// Engine used to check clauses against `schema`.
    validation_engine: ValidationEngine,
    /// Interceptors run before compilation (request) and after execution (response).
    interceptor_chain: InterceptorChain,
    /// Database used when a request does not name one.
    default_database: String,
    /// Backend that runs the compiled TypeQL.
    executor: Box<dyn QueryExecutor>,
}
90
91impl QueryPipeline {
92    /// Execute a structured (AST-based) query through the full pipeline.
93    pub async fn execute_query(&self, input: QueryInput) -> Result<QueryOutput, PipelineError> {
94        let start = Instant::now();
95        let request_id = uuid::Uuid::new_v4().to_string();
96        let database = input
97            .database
98            .unwrap_or_else(|| self.default_database.clone());
99
100        let mut ctx = RequestContext {
101            request_id: request_id.clone(),
102            client_id: "unknown".to_string(),
103            database: database.clone(),
104            transaction_type: input.transaction_type.clone(),
105            metadata: input.metadata,
106            timestamp: chrono::Utc::now(),
107        };
108
109        // Validate against schema
110        if let Some(schema) = &self.schema {
111            let result = self.validation_engine.validate_query(&input.clauses, schema);
112            if !result.is_valid {
113                return Err(PipelineError::Validation(format!(
114                    "{} validation error(s)",
115                    result.errors.len()
116                )));
117            }
118        }
119
120        // Run request interceptors
121        let clauses = self
122            .interceptor_chain
123            .execute_request(input.clauses, &mut ctx)
124            .await
125            .map_err(|e| PipelineError::Interceptor(e.to_string()))?;
126
127        // Compile to TypeQL
128        let compiler = QueryCompiler::new();
129        let typeql = compiler.compile(&clauses);
130        ctx.metadata.insert(
131            "compiled_typeql".to_string(),
132            serde_json::Value::String(typeql.clone()),
133        );
134
135        // Execute
136        log_query_execution(&database, &input.transaction_type, &typeql);
137
138        let results = self
139            .executor
140            .execute(&database, &typeql, &input.transaction_type)
141            .await?;
142
143        // Run response interceptors
144        self.interceptor_chain
145            .execute_response(&results, &ctx)
146            .await
147            .map_err(|e| PipelineError::Interceptor(e.to_string()))?;
148
149        let elapsed = start.elapsed().as_millis() as u64;
150
151        Ok(QueryOutput {
152            results,
153            request_id,
154            execution_time_ms: elapsed,
155            interceptors_applied: self
156                .interceptor_chain
157                .interceptor_names()
158                .into_iter()
159                .map(String::from)
160                .collect(),
161        })
162    }
163
164    /// Execute a raw TypeQL query through the full pipeline (parse → validate → intercept → compile → execute).
165    pub async fn execute_raw(&self, input: RawQueryInput) -> Result<QueryOutput, PipelineError> {
166        let clauses = query_parser::parse_typeql_query(&input.query)
167            .map_err(|e| PipelineError::Parse(e.to_string()))?;
168
169        self.execute_query(QueryInput {
170            database: input.database,
171            transaction_type: input.transaction_type,
172            clauses,
173            metadata: input.metadata,
174        })
175        .await
176    }
177
178    /// Validate clauses against the loaded schema without executing.
179    pub fn validate(&self, input: &ValidateInput) -> Result<ValidateOutput, PipelineError> {
180        let schema = self
181            .schema
182            .as_ref()
183            .ok_or_else(|| PipelineError::Schema("No schema loaded".to_string()))?;
184
185        let result = self.validation_engine.validate_query(&input.clauses, schema);
186
187        let errors = result
188            .errors
189            .iter()
190            .map(|e| ValidationErrorDetail {
191                code: e.code.clone(),
192                message: e.message.clone(),
193                path: e.path.clone(),
194            })
195            .collect();
196
197        Ok(ValidateOutput {
198            is_valid: result.is_valid,
199            errors,
200        })
201    }
202
203    /// Get the loaded schema, if any.
204    pub fn schema(&self) -> Option<&TypeSchema> {
205        self.schema.as_ref()
206    }
207
208    /// Check if the backend executor is connected.
209    pub fn is_connected(&self) -> bool {
210        self.executor.is_connected()
211    }
212
213    /// Get the default database name.
214    pub fn default_database(&self) -> &str {
215        &self.default_database
216    }
217}
218
/// Builder for constructing a [`QueryPipeline`].
///
/// # Example
///
/// ```rust,ignore
/// use type_bridge_server::PipelineBuilder;
///
/// let pipeline = PipelineBuilder::new(my_executor)
///     .with_schema_source(FileSchemaSource::new("schema.tql"))
///     .with_interceptor(AuditLogInterceptor::new(&config)?)
///     .with_default_database("my_db")
///     .build()?;
/// ```
pub struct PipelineBuilder {
    /// Backend that will run compiled queries.
    executor: Box<dyn QueryExecutor>,
    /// Optional schema source; loaded once, inside `build()`.
    schema_source: Option<Box<dyn SchemaSource>>,
    /// Interceptors in the order they were added.
    interceptors: Vec<Box<dyn Interceptor>>,
    /// Database used when a request names none; empty until configured.
    default_database: String,
}
238
239impl PipelineBuilder {
240    /// Create a new builder with the given query executor.
241    pub fn new(executor: impl QueryExecutor + 'static) -> Self {
242        Self {
243            executor: Box::new(executor),
244            schema_source: None,
245            interceptors: Vec::new(),
246            default_database: String::new(),
247        }
248    }
249
250    /// Set the schema source. The schema will be loaded during [`build()`](Self::build).
251    pub fn with_schema_source(mut self, source: impl SchemaSource + 'static) -> Self {
252        self.schema_source = Some(Box::new(source));
253        self
254    }
255
256    /// Add an interceptor to the pipeline chain.
257    pub fn with_interceptor(mut self, interceptor: impl Interceptor + 'static) -> Self {
258        self.interceptors.push(Box::new(interceptor));
259        self
260    }
261
262    /// Set the default database name used when requests don't specify one.
263    pub fn with_default_database(mut self, database: impl Into<String>) -> Self {
264        self.default_database = database.into();
265        self
266    }
267
268    /// Build the pipeline, loading the schema if a source was provided.
269    pub fn build(self) -> Result<QueryPipeline, PipelineError> {
270        let schema = match self.schema_source {
271            Some(source) => Some(source.load()?),
272            None => None,
273        };
274
275        Ok(QueryPipeline {
276            schema,
277            validation_engine: ValidationEngine::new(),
278            interceptor_chain: InterceptorChain::new(self.interceptors),
279            default_database: self.default_database,
280            executor: self.executor,
281        })
282    }
283}
284
/// Unit tests for [`PipelineBuilder`] and [`QueryPipeline`].
///
/// The tests rely on `crate::test_helpers` (`make_pipeline`, `make_simple_clauses`,
/// `MockExecutor`) plus a handful of small interceptors defined below.
#[cfg(test)]
#[cfg_attr(coverage_nightly, coverage(off))]
mod tests {
    use std::future::Future;
    use std::pin::Pin;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::{Arc, Mutex};

    use type_bridge_core_lib::ast::{Constraint, Pattern, Value};

    use super::*;
    use crate::interceptor::traits::InterceptError;
    use crate::test_helpers::{make_pipeline, make_simple_clauses, MockExecutor};

    /// Boxed future shape used by the `Interceptor` trait methods.
    type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;

    /// Raw TypeQL accepted by the parser; shared by the `execute_raw` tests.
    const PERSON_QUERY: &str = "match $p isa person;";

    /// Install a DEBUG-level subscriber for the current thread so the logging
    /// path is exercised; the returned guard restores the previous default on drop.
    fn init_tracing() -> tracing::subscriber::DefaultGuard {
        let subscriber = tracing_subscriber::fmt()
            .with_max_level(tracing::Level::DEBUG)
            .with_test_writer()
            .finish();
        tracing::subscriber::set_default(subscriber)
    }

    // --- Helper interceptors ---

    /// Returns the clauses untouched.
    struct PassthroughInterceptor {
        name: String,
    }

    impl Interceptor for PassthroughInterceptor {
        fn name(&self) -> &str {
            &self.name
        }
        fn on_request<'a>(
            &'a self,
            clauses: Vec<Clause>,
            _ctx: &'a mut RequestContext,
        ) -> BoxFuture<'a, Result<Vec<Clause>, InterceptError>> {
            Box::pin(async move { Ok(clauses) })
        }
    }

    /// Fails every request with `AccessDenied`.
    struct RejectingRequestInterceptor;

    impl Interceptor for RejectingRequestInterceptor {
        fn name(&self) -> &str {
            "rejector"
        }
        fn on_request<'a>(
            &'a self,
            _clauses: Vec<Clause>,
            _ctx: &'a mut RequestContext,
        ) -> BoxFuture<'a, Result<Vec<Clause>, InterceptError>> {
            Box::pin(async {
                Err(InterceptError::AccessDenied {
                    reason: "test rejection".into(),
                })
            })
        }
    }

    /// Lets requests through but fails every response.
    struct RejectingResponseInterceptor;

    impl Interceptor for RejectingResponseInterceptor {
        fn name(&self) -> &str {
            "resp-rejector"
        }
        fn on_request<'a>(
            &'a self,
            clauses: Vec<Clause>,
            _ctx: &'a mut RequestContext,
        ) -> BoxFuture<'a, Result<Vec<Clause>, InterceptError>> {
            Box::pin(async move { Ok(clauses) })
        }
        fn on_response<'a>(
            &'a self,
            _result: &'a serde_json::Value,
            _ctx: &'a RequestContext,
        ) -> BoxFuture<'a, Result<(), InterceptError>> {
            Box::pin(async { Err(InterceptError::Internal("response rejected".into())) })
        }
    }

    /// Counts how many requests passed through it.
    struct CountingInterceptor {
        name: String,
        count: Arc<AtomicUsize>,
    }

    impl Interceptor for CountingInterceptor {
        fn name(&self) -> &str {
            &self.name
        }
        fn on_request<'a>(
            &'a self,
            clauses: Vec<Clause>,
            _ctx: &'a mut RequestContext,
        ) -> BoxFuture<'a, Result<Vec<Clause>, InterceptError>> {
            Box::pin(async move {
                self.count.fetch_add(1, Ordering::SeqCst);
                Ok(clauses)
            })
        }
    }

    /// Records the `"compiled_typeql"` metadata entry seen by the response hook.
    struct MetadataCapturingInterceptor {
        compiled_typeql: Arc<Mutex<Option<serde_json::Value>>>,
    }

    impl Interceptor for MetadataCapturingInterceptor {
        fn name(&self) -> &str {
            "metadata-capture"
        }
        fn on_request<'a>(
            &'a self,
            clauses: Vec<Clause>,
            _ctx: &'a mut RequestContext,
        ) -> BoxFuture<'a, Result<Vec<Clause>, InterceptError>> {
            Box::pin(async move { Ok(clauses) })
        }
        fn on_response<'a>(
            &'a self,
            _result: &'a serde_json::Value,
            ctx: &'a RequestContext,
        ) -> BoxFuture<'a, Result<(), InterceptError>> {
            // Copy the value out before building the future so the future does not
            // need to hold a reference to the context.
            let seen = ctx.metadata.get("compiled_typeql").cloned();
            Box::pin(async move {
                *self.compiled_typeql.lock().unwrap() = seen;
                Ok(())
            })
        }
    }

    /// SchemaSource that always fails.
    struct FailingSchemaSource;

    impl crate::schema_source::SchemaSource for FailingSchemaSource {
        fn load(&self) -> Result<TypeSchema, PipelineError> {
            Err(PipelineError::Schema("source failed".into()))
        }
    }

    // --- Input fixtures ---

    /// Read-transaction input with no explicit database and empty metadata.
    fn make_query_input(clauses: Vec<Clause>) -> QueryInput {
        QueryInput {
            database: None,
            transaction_type: "read".to_string(),
            clauses,
            metadata: HashMap::new(),
        }
    }

    /// Same as [`make_query_input`] but targeting `db`.
    fn make_query_input_with_db(clauses: Vec<Clause>, db: &str) -> QueryInput {
        QueryInput {
            database: Some(db.to_string()),
            ..make_query_input(clauses)
        }
    }

    /// Read-transaction raw input with empty metadata.
    fn make_raw_input(query: &str, database: Option<&str>) -> RawQueryInput {
        RawQueryInput {
            database: database.map(String::from),
            transaction_type: "read".to_string(),
            query: query.to_string(),
            metadata: HashMap::new(),
        }
    }

    /// A `person` match whose `has` constraint names an attribute (`nonexistent_attr`)
    /// that the test schema does not define, bound to `variable`.
    fn invalid_attr_clauses(variable: &str) -> Vec<Clause> {
        vec![Clause::Match(vec![Pattern::Entity {
            variable: variable.to_string(),
            type_name: "person".to_string(),
            constraints: vec![Constraint::Has {
                attr_name: "nonexistent_attr".to_string(),
                value: Value::Literal(type_bridge_core_lib::ast::LiteralValue {
                    value: serde_json::json!("val"),
                    value_type: "string".to_string(),
                }),
            }],
            is_strict: false,
        }])]
    }

    // =============================================
    // PipelineBuilder tests
    // =============================================

    #[test]
    fn builder_without_schema_source() {
        let pipeline = PipelineBuilder::new(MockExecutor::new()).build().unwrap();
        assert!(pipeline.schema().is_none());
    }

    #[test]
    fn builder_with_valid_schema_source() {
        let pipeline = make_pipeline(MockExecutor::new(), true);
        assert!(pipeline.schema().is_some());
        let schema = pipeline.schema().unwrap();
        assert!(schema.entities.contains_key("person"));
    }

    #[test]
    fn builder_with_failing_schema_source() {
        let result = PipelineBuilder::new(MockExecutor::new())
            .with_schema_source(FailingSchemaSource)
            .build();
        let err = result.err().expect("Expected build error");
        assert!(matches!(&err, PipelineError::Schema(msg) if msg.contains("source failed")));
    }

    #[test]
    fn builder_with_default_database() {
        let pipeline = PipelineBuilder::new(MockExecutor::new())
            .with_default_database("mydb")
            .build()
            .unwrap();
        assert_eq!(pipeline.default_database(), "mydb");
    }

    #[test]
    fn builder_default_empty_database() {
        let pipeline = PipelineBuilder::new(MockExecutor::new()).build().unwrap();
        assert_eq!(pipeline.default_database(), "");
    }

    #[tokio::test]
    async fn builder_with_interceptors() {
        let pipeline = PipelineBuilder::new(MockExecutor::new())
            .with_interceptor(PassthroughInterceptor {
                name: "first".into(),
            })
            .with_interceptor(PassthroughInterceptor {
                name: "second".into(),
            })
            .build()
            .unwrap();
        assert!(pipeline.schema().is_none());

        // Execute a query to exercise PassthroughInterceptor's name() and on_request()
        let input = make_query_input(vec![]);
        let output = pipeline.execute_query(input).await.unwrap();
        assert_eq!(output.interceptors_applied, vec!["first", "second"]);
    }

    // =============================================
    // execute_query tests
    // =============================================

    #[tokio::test]
    async fn execute_query_uses_input_database() {
        let executor = MockExecutor::new();
        let calls = executor.calls.clone();
        let pipeline = make_pipeline(executor, false);

        let input = make_query_input_with_db(vec![], "custom_db");
        pipeline.execute_query(input).await.unwrap();

        let recorded = calls.lock().unwrap();
        assert_eq!(recorded[0].0, "custom_db");
    }

    #[tokio::test]
    async fn execute_query_uses_default_database_when_none() {
        let executor = MockExecutor::new();
        let calls = executor.calls.clone();
        let pipeline = make_pipeline(executor, false);

        let input = make_query_input(vec![]);
        pipeline.execute_query(input).await.unwrap();

        let recorded = calls.lock().unwrap();
        assert_eq!(recorded[0].0, "test_db"); // from make_pipeline
    }

    #[tokio::test]
    async fn execute_query_skips_validation_when_no_schema() {
        let pipeline = make_pipeline(MockExecutor::new(), false);
        // Even invalid type names pass when there's no schema
        let clauses = vec![Clause::Match(vec![Pattern::Entity {
            variable: "x".to_string(),
            type_name: "nonexistent_type".to_string(),
            constraints: vec![],
            is_strict: false,
        }])];
        let input = make_query_input(clauses);
        let result = pipeline.execute_query(input).await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn execute_query_validates_when_schema_present_valid() {
        let pipeline = make_pipeline(MockExecutor::new(), true);
        let input = make_query_input(make_simple_clauses());
        let result = pipeline.execute_query(input).await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn execute_query_validates_when_schema_present_invalid() {
        let pipeline = make_pipeline(MockExecutor::new(), true);
        // Reference an attribute type not in schema
        let input = make_query_input(invalid_attr_clauses("p"));
        let result = pipeline.execute_query(input).await;
        let err = result.unwrap_err();
        assert!(matches!(&err, PipelineError::Validation(msg) if msg.contains("validation error")));
    }

    #[tokio::test]
    async fn execute_query_request_interceptor_failure() {
        assert_eq!(RejectingRequestInterceptor.name(), "rejector");
        let pipeline = PipelineBuilder::new(MockExecutor::new())
            .with_interceptor(RejectingRequestInterceptor)
            .build()
            .unwrap();
        let input = make_query_input(vec![]);
        let result = pipeline.execute_query(input).await;
        let err = result.unwrap_err();
        assert!(matches!(&err, PipelineError::Interceptor(msg) if msg.contains("test rejection")));
    }

    #[tokio::test]
    async fn execute_query_executor_failure() {
        let pipeline = make_pipeline(MockExecutor::failing("db crash"), false);
        let input = make_query_input(vec![]);
        let result = pipeline.execute_query(input).await;
        let err = result.unwrap_err();
        assert!(matches!(&err, PipelineError::QueryExecution(msg) if msg.contains("db crash")));
    }

    #[tokio::test]
    async fn execute_query_response_interceptor_failure() {
        assert_eq!(RejectingResponseInterceptor.name(), "resp-rejector");
        let pipeline = PipelineBuilder::new(MockExecutor::new())
            .with_interceptor(RejectingResponseInterceptor)
            .build()
            .unwrap();
        let input = make_query_input(vec![]);
        let result = pipeline.execute_query(input).await;
        let err = result.unwrap_err();
        assert!(matches!(&err, PipelineError::Interceptor(msg) if msg.contains("response rejected")));
    }

    #[tokio::test]
    async fn execute_query_success_output_fields() {
        let _guard = init_tracing();
        let count = Arc::new(AtomicUsize::new(0));
        let pipeline = PipelineBuilder::new(MockExecutor::with_result(serde_json::json!({"ok": true})))
            .with_default_database("test_db")
            .with_interceptor(CountingInterceptor {
                name: "counter".into(),
                count: count.clone(),
            })
            .build()
            .unwrap();

        let input = make_query_input(vec![]);
        let output = pipeline.execute_query(input).await.unwrap();

        assert!(!output.request_id.is_empty());
        assert_eq!(output.results, serde_json::json!({"ok": true}));
        assert_eq!(output.interceptors_applied, vec!["counter"]);
        // `execution_time_ms` is a u64, so a lower-bound assertion would be vacuous;
        // it is intentionally not checked here.
        assert_eq!(count.load(Ordering::SeqCst), 1);
    }

    #[tokio::test]
    async fn execute_query_empty_clauses_success() {
        let pipeline = make_pipeline(MockExecutor::new(), false);
        let input = make_query_input(vec![]);
        let result = pipeline.execute_query(input).await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn execute_query_compiled_typeql_in_metadata() {
        let executor = MockExecutor::new();
        let calls = executor.calls.clone();
        let captured = Arc::new(Mutex::new(None));
        let pipeline = PipelineBuilder::new(executor)
            .with_default_database("test_db")
            .with_interceptor(MetadataCapturingInterceptor {
                compiled_typeql: captured.clone(),
            })
            .build()
            .unwrap();

        let input = make_query_input(make_simple_clauses());
        pipeline.execute_query(input).await.unwrap();

        let recorded = calls.lock().unwrap();
        // The compiled TypeQL should be a non-empty string
        assert!(!recorded[0].1.is_empty());

        // ...and the same text must be visible to response interceptors via the metadata.
        let seen = captured.lock().unwrap();
        assert_eq!(
            seen.as_ref().and_then(serde_json::Value::as_str),
            Some(recorded[0].1.as_str())
        );
    }

    #[tokio::test]
    async fn execute_query_passes_transaction_type() {
        let executor = MockExecutor::new();
        let calls = executor.calls.clone();
        let pipeline = make_pipeline(executor, false);

        let input = QueryInput {
            transaction_type: "write".to_string(),
            ..make_query_input(vec![])
        };
        pipeline.execute_query(input).await.unwrap();

        let recorded = calls.lock().unwrap();
        assert_eq!(recorded[0].2, "write");
    }

    // =============================================
    // execute_raw tests
    // =============================================

    #[tokio::test]
    async fn execute_raw_valid_typeql() {
        let executor = MockExecutor::new();
        let calls = executor.calls.clone();
        let pipeline = make_pipeline(executor, false);

        let result = pipeline.execute_raw(make_raw_input(PERSON_QUERY, None)).await;
        assert!(result.is_ok());

        let recorded = calls.lock().unwrap();
        assert_eq!(recorded.len(), 1);
    }

    #[tokio::test]
    async fn execute_raw_invalid_typeql() {
        let pipeline = make_pipeline(MockExecutor::new(), false);
        let input = make_raw_input("this is totally invalid <<>>", None);
        let result = pipeline.execute_raw(input).await;
        let err = result.unwrap_err();
        assert!(matches!(&err, PipelineError::Parse(_)));
    }

    #[tokio::test]
    async fn execute_raw_database_passthrough() {
        let executor = MockExecutor::new();
        let calls = executor.calls.clone();
        let pipeline = make_pipeline(executor, false);

        let input = make_raw_input(PERSON_QUERY, Some("raw_db"));
        pipeline.execute_raw(input).await.unwrap();

        let recorded = calls.lock().unwrap();
        assert_eq!(recorded[0].0, "raw_db");
    }

    #[tokio::test]
    async fn execute_raw_executor_failure() {
        let pipeline = make_pipeline(MockExecutor::failing("raw fail"), false);
        let result = pipeline.execute_raw(make_raw_input(PERSON_QUERY, None)).await;
        let err = result.unwrap_err();
        assert!(matches!(&err, PipelineError::QueryExecution(msg) if msg.contains("raw fail")));
    }

    // =============================================
    // validate tests
    // =============================================

    #[test]
    fn validate_no_schema_returns_error() {
        let pipeline = make_pipeline(MockExecutor::new(), false);
        let input = ValidateInput { clauses: vec![] };
        let result = pipeline.validate(&input);
        let err = result.unwrap_err();
        assert!(matches!(&err, PipelineError::Schema(msg) if msg.contains("No schema loaded")));
    }

    #[test]
    fn validate_valid_clauses() {
        let pipeline = make_pipeline(MockExecutor::new(), true);
        let input = ValidateInput {
            clauses: make_simple_clauses(),
        };
        let result = pipeline.validate(&input).unwrap();
        assert!(result.is_valid);
        assert!(result.errors.is_empty());
    }

    #[test]
    fn validate_invalid_clauses() {
        let pipeline = make_pipeline(MockExecutor::new(), true);
        let input = ValidateInput {
            clauses: invalid_attr_clauses("p"),
        };
        let result = pipeline.validate(&input).unwrap();
        assert!(!result.is_valid);
        assert!(!result.errors.is_empty());
    }

    #[test]
    fn validate_error_detail_fields() {
        let pipeline = make_pipeline(MockExecutor::new(), true);
        let input = ValidateInput {
            clauses: invalid_attr_clauses("x"),
        };
        let result = pipeline.validate(&input).unwrap();
        assert!(!result.is_valid);
        let error = &result.errors[0];
        assert!(!error.code.is_empty());
        assert!(!error.message.is_empty());
    }

    #[test]
    fn validate_empty_clauses_with_schema() {
        let pipeline = make_pipeline(MockExecutor::new(), true);
        let input = ValidateInput { clauses: vec![] };
        let result = pipeline.validate(&input).unwrap();
        assert!(result.is_valid);
    }

    // =============================================
    // Accessor tests
    // =============================================

    #[test]
    fn schema_returns_some_when_loaded() {
        let pipeline = make_pipeline(MockExecutor::new(), true);
        assert!(pipeline.schema().is_some());
    }

    #[test]
    fn schema_returns_none_when_not_loaded() {
        let pipeline = make_pipeline(MockExecutor::new(), false);
        assert!(pipeline.schema().is_none());
    }

    #[test]
    fn is_connected_delegates_to_executor() {
        let executor = MockExecutor::new();
        *executor.connected.lock().unwrap() = true;
        let pipeline = make_pipeline(executor, false);
        assert!(pipeline.is_connected());
    }

    #[test]
    fn is_connected_false_when_executor_disconnected() {
        let executor = MockExecutor::new();
        *executor.connected.lock().unwrap() = false;
        let pipeline = make_pipeline(executor, false);
        assert!(!pipeline.is_connected());
    }

    #[test]
    fn default_database_returns_configured_value() {
        let pipeline = PipelineBuilder::new(MockExecutor::new())
            .with_default_database("my_database")
            .build()
            .unwrap();
        assert_eq!(pipeline.default_database(), "my_database");
    }

    #[test]
    fn default_database_empty_when_not_set() {
        let pipeline = PipelineBuilder::new(MockExecutor::new()).build().unwrap();
        assert_eq!(pipeline.default_database(), "");
    }
}