Skip to main content

type_bridge_server/
pipeline.rs

1use std::collections::HashMap;
2use std::time::Instant;
3
4use type_bridge_core_lib::ast::Clause;
5use type_bridge_core_lib::compiler::QueryCompiler;
6use type_bridge_core_lib::schema::TypeSchema;
7use type_bridge_core_lib::validation::ValidationEngine;
8
9use crate::error::PipelineError;
10use crate::executor::QueryExecutor;
11use crate::interceptor::crud_interceptor::{CrudInterceptor, CrudInterceptorAdapter};
12use crate::interceptor::{Interceptor, InterceptorChain, RequestContext};
13use crate::schema_source::SchemaSource;
14
/// Input for a structured (AST-based) query.
///
/// Consumed by [`QueryPipeline::execute_query`].
pub struct QueryInput {
    /// Target database; when `None`, the pipeline's default database is used.
    pub database: Option<String>,
    /// Transaction type, forwarded as-is to the executor (the tests use `"read"` and `"write"`).
    pub transaction_type: String,
    /// Query clauses to validate, pass through the interceptors, and compile to TypeQL.
    pub clauses: Vec<Clause>,
    /// Caller-supplied metadata; becomes the initial metadata of the request context.
    pub metadata: HashMap<String, serde_json::Value>,
}
22
/// Input for a validation-only request.
///
/// Consumed by [`QueryPipeline::validate`]; nothing is compiled or executed.
pub struct ValidateInput {
    /// Query clauses to check against the loaded schema.
    pub clauses: Vec<Clause>,
}
27
/// Output from a successful pipeline execution.
#[derive(Debug)]
pub struct QueryOutput {
    /// Value returned by the executor for the compiled query.
    pub results: serde_json::Value,
    /// Identifier generated for this request (a freshly generated v4 UUID string).
    pub request_id: String,
    /// Wall-clock time spent inside `execute_query`, in milliseconds.
    pub execution_time_ms: u64,
    /// Names of the interceptors registered in the pipeline's interceptor chain.
    pub interceptors_applied: Vec<String>,
}
36
/// Output from a validation-only request.
#[derive(Debug)]
pub struct ValidateOutput {
    /// Validity flag copied from the validation engine's result.
    pub is_valid: bool,
    /// One entry per error reported by the validation engine.
    pub errors: Vec<ValidationErrorDetail>,
}
43
/// A single validation error.
///
/// Each field is copied verbatim from the corresponding field of the validation
/// engine's error.
#[derive(Debug)]
pub struct ValidationErrorDetail {
    /// Error code reported by the validation engine.
    pub code: String,
    /// Human-readable description of the problem.
    pub message: String,
    /// Presumably the location of the offending element within the query —
    /// NOTE(review): confirm the format against the validation engine.
    pub path: String,
}
51
52#[cfg_attr(coverage_nightly, coverage(off))]
53fn log_query_execution(database: &str, transaction_type: &str, typeql: &str) {
54    tracing::info!(database, transaction_type, "Executing query");
55    tracing::debug!(typeql, "Compiled TypeQL");
56}
57
/// Transport-agnostic query pipeline.
///
/// Encapsulates the full query lifecycle: validate → intercept → compile → execute → intercept.
/// Use [`PipelineBuilder`] to construct an instance.
///
/// # Example
///
/// ```rust,ignore
/// use type_bridge_server::{PipelineBuilder, QueryInput};
///
/// let pipeline = PipelineBuilder::new(my_executor)
///     .with_schema_source(my_schema_source)
///     .with_default_database("my_db")
///     .build()?;
///
/// let output = pipeline.execute_query(QueryInput { ... }).await?;
/// ```
pub struct QueryPipeline {
    /// Schema loaded at build time; `None` when no schema source was configured.
    schema: Option<TypeSchema>,
    /// Engine used to check clauses against `schema`.
    validation_engine: ValidationEngine,
    /// Interceptors run before compilation (request) and after execution (response).
    interceptor_chain: InterceptorChain,
    /// Database used when a query input does not name one.
    default_database: String,
    /// Backend that runs the compiled TypeQL.
    executor: Box<dyn QueryExecutor>,
    /// When `true`, `execute_query` does not validate clauses against the schema.
    skip_validation: bool,
}
83
84impl QueryPipeline {
85    /// Execute a structured (AST-based) query through the full pipeline.
86    pub async fn execute_query(&self, input: QueryInput) -> Result<QueryOutput, PipelineError> {
87        let start = Instant::now();
88        let request_id = uuid::Uuid::new_v4().to_string();
89        let database = input
90            .database
91            .unwrap_or_else(|| self.default_database.clone());
92
93        let mut ctx = RequestContext {
94            request_id: request_id.clone(),
95            client_id: "unknown".to_string(),
96            database: database.clone(),
97            transaction_type: input.transaction_type.clone(),
98            metadata: input.metadata,
99            timestamp: chrono::Utc::now(),
100            crud_info: None,
101        };
102
103        // Validate against schema
104        if !self.skip_validation
105            && let Some(schema) = &self.schema
106        {
107            let result = self
108                .validation_engine
109                .validate_query(&input.clauses, schema);
110            if !result.is_valid {
111                return Err(PipelineError::Validation(format!(
112                    "{} validation error(s)",
113                    result.errors.len()
114                )));
115            }
116        }
117
118        // Run request interceptors
119        let clauses = self
120            .interceptor_chain
121            .execute_request(input.clauses, &mut ctx)
122            .await
123            .map_err(|e| PipelineError::Interceptor(e.to_string()))?;
124
125        // Compile to TypeQL
126        let compiler = QueryCompiler::new();
127        let typeql = compiler.compile(&clauses);
128        ctx.metadata.insert(
129            "compiled_typeql".to_string(),
130            serde_json::Value::String(typeql.clone()),
131        );
132
133        // Execute
134        log_query_execution(&database, &input.transaction_type, &typeql);
135
136        let results = self
137            .executor
138            .execute(&database, &typeql, &input.transaction_type)
139            .await?;
140
141        // Run response interceptors
142        self.interceptor_chain
143            .execute_response(&results, &ctx)
144            .await
145            .map_err(|e| PipelineError::Interceptor(e.to_string()))?;
146
147        let elapsed = start.elapsed().as_millis() as u64;
148
149        Ok(QueryOutput {
150            results,
151            request_id,
152            execution_time_ms: elapsed,
153            interceptors_applied: self
154                .interceptor_chain
155                .interceptor_names()
156                .into_iter()
157                .map(String::from)
158                .collect(),
159        })
160    }
161
162    /// Validate clauses against the loaded schema without executing.
163    pub fn validate(&self, input: &ValidateInput) -> Result<ValidateOutput, PipelineError> {
164        let schema = self
165            .schema
166            .as_ref()
167            .ok_or_else(|| PipelineError::Schema("No schema loaded".to_string()))?;
168
169        let result = self
170            .validation_engine
171            .validate_query(&input.clauses, schema);
172
173        let errors = result
174            .errors
175            .iter()
176            .map(|e| ValidationErrorDetail {
177                code: e.code.clone(),
178                message: e.message.clone(),
179                path: e.path.clone(),
180            })
181            .collect();
182
183        Ok(ValidateOutput {
184            is_valid: result.is_valid,
185            errors,
186        })
187    }
188
189    /// Get the loaded schema, if any.
190    pub fn schema(&self) -> Option<&TypeSchema> {
191        self.schema.as_ref()
192    }
193
194    /// Check if the backend executor is connected.
195    pub fn is_connected(&self) -> bool {
196        self.executor.is_connected()
197    }
198
199    /// Get the default database name.
200    pub fn default_database(&self) -> &str {
201        &self.default_database
202    }
203}
204
/// Builder for constructing a [`QueryPipeline`].
///
/// # Example
///
/// ```rust,ignore
/// use type_bridge_server::PipelineBuilder;
///
/// let pipeline = PipelineBuilder::new(my_executor)
///     .with_schema_source(FileSchemaSource::new("schema.tql"))
///     .with_interceptor(AuditLogInterceptor::new(&config)?)
///     .with_default_database("my_db")
///     .build()?;
/// ```
pub struct PipelineBuilder {
    /// Backend that will run compiled queries.
    executor: Box<dyn QueryExecutor>,
    /// Optional source the schema is loaded from during `build()`.
    schema_source: Option<Box<dyn SchemaSource>>,
    /// Interceptors in the order they were added.
    interceptors: Vec<Box<dyn Interceptor>>,
    /// Database used when a request does not specify one; empty by default.
    default_database: String,
    /// When `true`, the built pipeline does not validate queries before execution.
    skip_validation: bool,
}
225
226impl PipelineBuilder {
227    /// Create a new builder with the given query executor.
228    pub fn new(executor: impl QueryExecutor + 'static) -> Self {
229        Self {
230            executor: Box::new(executor),
231            schema_source: None,
232            interceptors: Vec::new(),
233            default_database: String::new(),
234            skip_validation: false,
235        }
236    }
237
238    /// Set the schema source. The schema will be loaded during [`build()`](Self::build).
239    pub fn with_schema_source(mut self, source: impl SchemaSource + 'static) -> Self {
240        self.schema_source = Some(Box::new(source));
241        self
242    }
243
244    /// Add an interceptor to the pipeline chain.
245    pub fn with_interceptor(mut self, interceptor: impl Interceptor + 'static) -> Self {
246        self.interceptors.push(Box::new(interceptor));
247        self
248    }
249
250    /// Set the default database name used when requests don't specify one.
251    pub fn with_default_database(mut self, database: impl Into<String>) -> Self {
252        self.default_database = database.into();
253        self
254    }
255
256    /// Add a CRUD-aware interceptor to the pipeline chain.
257    ///
258    /// The interceptor is automatically wrapped in a [`CrudInterceptorAdapter`]
259    /// that extracts [`CrudInfo`](crate::interceptor::CrudInfo) and delegates
260    /// to the CRUD-specific hooks.
261    pub fn with_crud_interceptor(self, interceptor: impl CrudInterceptor + 'static) -> Self {
262        self.with_interceptor(CrudInterceptorAdapter::new(interceptor))
263    }
264
265    /// Skip schema validation during query execution.
266    ///
267    /// The schema is still loaded (and accessible via [`QueryPipeline::schema`]),
268    /// but queries are not validated against it before execution.
269    pub fn with_skip_validation(mut self) -> Self {
270        self.skip_validation = true;
271        self
272    }
273
274    /// Build the pipeline, loading the schema if a source was provided.
275    pub fn build(self) -> Result<QueryPipeline, PipelineError> {
276        let schema = match self.schema_source {
277            Some(source) => Some(source.load()?),
278            None => None,
279        };
280
281        Ok(QueryPipeline {
282            schema,
283            validation_engine: ValidationEngine::new(),
284            interceptor_chain: InterceptorChain::new(self.interceptors),
285            default_database: self.default_database,
286            executor: self.executor,
287            skip_validation: self.skip_validation,
288        })
289    }
290}
291
// Unit tests for `PipelineBuilder` and `QueryPipeline`.
//
// The tests rely on `crate::test_helpers` (`MockExecutor`, `make_pipeline`,
// `make_simple_clauses`), which are defined outside this file. Judging by the
// assertions below, `make_pipeline(executor, with_schema)` builds a pipeline whose
// default database is "test_db" and which has a schema only when the flag is `true`
// — NOTE(review): confirm against `test_helpers`.
#[cfg(test)]
#[cfg_attr(coverage_nightly, coverage(off))]
mod tests {
    use std::future::Future;
    use std::pin::Pin;
    use std::sync::Arc;
    use std::sync::atomic::{AtomicUsize, Ordering};

    use type_bridge_core_lib::ast::{Constraint, Pattern, Value};

    use super::*;
    use crate::interceptor::traits::InterceptError;
    use crate::test_helpers::{MockExecutor, make_pipeline, make_simple_clauses};

    /// Install a DEBUG-level subscriber that writes to the test writer; it stays the
    /// default subscriber for as long as the returned guard is alive.
    fn init_tracing() -> tracing::subscriber::DefaultGuard {
        let subscriber = tracing_subscriber::fmt()
            .with_max_level(tracing::Level::DEBUG)
            .with_test_writer()
            .finish();
        tracing::subscriber::set_default(subscriber)
    }

    // --- Helper interceptors ---

    /// Interceptor with a configurable name that returns the clauses untouched.
    struct PassthroughInterceptor {
        name: String,
    }

    impl Interceptor for PassthroughInterceptor {
        fn name(&self) -> &str {
            &self.name
        }
        fn on_request<'a>(
            &'a self,
            clauses: Vec<Clause>,
            _ctx: &'a mut RequestContext,
        ) -> Pin<Box<dyn Future<Output = Result<Vec<Clause>, InterceptError>> + Send + 'a>>
        {
            Box::pin(async move { Ok(clauses) })
        }
    }

    /// Interceptor whose request hook always fails with `AccessDenied`.
    struct RejectingRequestInterceptor;

    impl Interceptor for RejectingRequestInterceptor {
        fn name(&self) -> &str {
            "rejector"
        }
        fn on_request<'a>(
            &'a self,
            _clauses: Vec<Clause>,
            _ctx: &'a mut RequestContext,
        ) -> Pin<Box<dyn Future<Output = Result<Vec<Clause>, InterceptError>> + Send + 'a>>
        {
            Box::pin(async {
                Err(InterceptError::AccessDenied {
                    reason: "test rejection".into(),
                })
            })
        }
    }

    /// Interceptor that passes requests through but always fails in the response hook.
    struct RejectingResponseInterceptor;

    impl Interceptor for RejectingResponseInterceptor {
        fn name(&self) -> &str {
            "resp-rejector"
        }
        fn on_request<'a>(
            &'a self,
            clauses: Vec<Clause>,
            _ctx: &'a mut RequestContext,
        ) -> Pin<Box<dyn Future<Output = Result<Vec<Clause>, InterceptError>> + Send + 'a>>
        {
            Box::pin(async move { Ok(clauses) })
        }
        fn on_response<'a>(
            &'a self,
            _result: &'a serde_json::Value,
            _ctx: &'a RequestContext,
        ) -> Pin<Box<dyn Future<Output = Result<(), InterceptError>> + Send + 'a>> {
            Box::pin(async { Err(InterceptError::Internal("response rejected".into())) })
        }
    }

    /// Interceptor that increments a shared counter each time its request hook runs.
    struct CountingInterceptor {
        name: String,
        count: Arc<AtomicUsize>,
    }

    impl Interceptor for CountingInterceptor {
        fn name(&self) -> &str {
            &self.name
        }
        fn on_request<'a>(
            &'a self,
            clauses: Vec<Clause>,
            _ctx: &'a mut RequestContext,
        ) -> Pin<Box<dyn Future<Output = Result<Vec<Clause>, InterceptError>> + Send + 'a>>
        {
            Box::pin(async move {
                self.count.fetch_add(1, Ordering::SeqCst);
                Ok(clauses)
            })
        }
    }

    /// SchemaSource that always fails.
    struct FailingSchemaSource;

    impl crate::schema_source::SchemaSource for FailingSchemaSource {
        fn load(&self) -> Result<TypeSchema, PipelineError> {
            Err(PipelineError::Schema("source failed".into()))
        }
    }

    /// Build a "read" query input with no explicit database and empty metadata.
    fn make_query_input(clauses: Vec<Clause>) -> QueryInput {
        QueryInput {
            database: None,
            transaction_type: "read".to_string(),
            clauses,
            metadata: HashMap::new(),
        }
    }

    /// Build a "read" query input that targets the database `db`.
    fn make_query_input_with_db(clauses: Vec<Clause>, db: &str) -> QueryInput {
        QueryInput {
            database: Some(db.to_string()),
            transaction_type: "read".to_string(),
            clauses,
            metadata: HashMap::new(),
        }
    }

    // =============================================
    // PipelineBuilder tests
    // =============================================

    // Without a schema source the built pipeline has no schema.
    #[test]
    fn builder_without_schema_source() {
        let pipeline = PipelineBuilder::new(MockExecutor::new()).build().unwrap();
        assert!(pipeline.schema().is_none());
    }

    // With the helper's schema enabled, the schema is loaded and has a "person" entity.
    #[test]
    fn builder_with_valid_schema_source() {
        let pipeline = make_pipeline(MockExecutor::new(), true);
        assert!(pipeline.schema().is_some());
        let schema = pipeline.schema().unwrap();
        assert!(schema.entities.contains_key("person"));
    }

    // A failing schema source makes `build()` return the source's error.
    #[test]
    fn builder_with_failing_schema_source() {
        let result = PipelineBuilder::new(MockExecutor::new())
            .with_schema_source(FailingSchemaSource)
            .build();
        let err = result.err().expect("Expected build error");
        assert!(matches!(&err, PipelineError::Schema(msg) if msg.contains("source failed")));
    }

    // The configured default database is exposed by the pipeline.
    #[test]
    fn builder_with_default_database() {
        let pipeline = PipelineBuilder::new(MockExecutor::new())
            .with_default_database("mydb")
            .build()
            .unwrap();
        assert_eq!(pipeline.default_database(), "mydb");
    }

    // The default database is the empty string when never configured.
    #[test]
    fn builder_default_empty_database() {
        let pipeline = PipelineBuilder::new(MockExecutor::new()).build().unwrap();
        assert_eq!(pipeline.default_database(), "");
    }

    // Interceptors are reported in the order they were added to the builder.
    #[tokio::test]
    async fn builder_with_interceptors() {
        let pipeline = PipelineBuilder::new(MockExecutor::new())
            .with_interceptor(PassthroughInterceptor {
                name: "first".into(),
            })
            .with_interceptor(PassthroughInterceptor {
                name: "second".into(),
            })
            .build()
            .unwrap();
        assert!(pipeline.schema().is_none());

        let input = make_query_input(vec![]);
        let output = pipeline.execute_query(input).await.unwrap();
        assert_eq!(output.interceptors_applied, vec!["first", "second"]);
    }

    // =============================================
    // execute_query tests
    // =============================================

    // NOTE(review): the tests below read recorded executor calls as tuples; the
    // assertions imply the order (database, typeql, transaction_type) — confirm
    // against `MockExecutor`.

    // An explicit database on the input wins over the pipeline default.
    #[tokio::test]
    async fn execute_query_uses_input_database() {
        let executor = MockExecutor::new();
        let calls = executor.calls.clone();
        let pipeline = make_pipeline(executor, false);

        let input = make_query_input_with_db(vec![], "custom_db");
        pipeline.execute_query(input).await.unwrap();

        let recorded = calls.lock().unwrap();
        assert_eq!(recorded[0].0, "custom_db");
    }

    // With no database on the input, the pipeline default is passed to the executor.
    #[tokio::test]
    async fn execute_query_uses_default_database_when_none() {
        let executor = MockExecutor::new();
        let calls = executor.calls.clone();
        let pipeline = make_pipeline(executor, false);

        let input = make_query_input(vec![]);
        pipeline.execute_query(input).await.unwrap();

        let recorded = calls.lock().unwrap();
        assert_eq!(recorded[0].0, "test_db"); // from make_pipeline
    }

    // Without a schema, even an unknown type name is executed rather than rejected.
    #[tokio::test]
    async fn execute_query_skips_validation_when_no_schema() {
        let pipeline = make_pipeline(MockExecutor::new(), false);
        let clauses = vec![Clause::Match(vec![Pattern::Entity {
            variable: "x".to_string(),
            type_name: "nonexistent_type".to_string(),
            constraints: vec![],
            is_strict: false,
        }])];
        let input = make_query_input(clauses);
        let result = pipeline.execute_query(input).await;
        assert!(result.is_ok());
    }

    // Clauses from the helper pass validation against the helper schema.
    #[tokio::test]
    async fn execute_query_validates_when_schema_present_valid() {
        let pipeline = make_pipeline(MockExecutor::new(), true);
        let input = make_query_input(make_simple_clauses());
        let result = pipeline.execute_query(input).await;
        assert!(result.is_ok());
    }

    // An attribute that the schema does not know yields a validation error.
    #[tokio::test]
    async fn execute_query_validates_when_schema_present_invalid() {
        let pipeline = make_pipeline(MockExecutor::new(), true);
        let clauses = vec![Clause::Match(vec![Pattern::Entity {
            variable: "p".to_string(),
            type_name: "person".to_string(),
            constraints: vec![Constraint::Has {
                attr_name: "nonexistent_attr".to_string(),
                value: Value::Literal(type_bridge_core_lib::ast::LiteralValue {
                    value: serde_json::json!("val"),
                    value_type: "string".to_string(),
                }),
            }],
            is_strict: false,
        }])];
        let input = make_query_input(clauses);
        let result = pipeline.execute_query(input).await;
        let err = result.unwrap_err();
        assert!(matches!(&err, PipelineError::Validation(msg) if msg.contains("validation error")));
    }

    // A failing request interceptor surfaces as `PipelineError::Interceptor`.
    #[tokio::test]
    async fn execute_query_request_interceptor_failure() {
        assert_eq!(RejectingRequestInterceptor.name(), "rejector");
        let pipeline = PipelineBuilder::new(MockExecutor::new())
            .with_interceptor(RejectingRequestInterceptor)
            .build()
            .unwrap();
        let input = make_query_input(vec![]);
        let result = pipeline.execute_query(input).await;
        let err = result.unwrap_err();
        assert!(matches!(&err, PipelineError::Interceptor(msg) if msg.contains("test rejection")));
    }

    // An executor failure is propagated as `PipelineError::QueryExecution`.
    #[tokio::test]
    async fn execute_query_executor_failure() {
        let pipeline = make_pipeline(MockExecutor::failing("db crash"), false);
        let input = make_query_input(vec![]);
        let result = pipeline.execute_query(input).await;
        let err = result.unwrap_err();
        assert!(matches!(&err, PipelineError::QueryExecution(msg) if msg.contains("db crash")));
    }

    // A failing response interceptor surfaces as `PipelineError::Interceptor`.
    #[tokio::test]
    async fn execute_query_response_interceptor_failure() {
        assert_eq!(RejectingResponseInterceptor.name(), "resp-rejector");
        let pipeline = PipelineBuilder::new(MockExecutor::new())
            .with_interceptor(RejectingResponseInterceptor)
            .build()
            .unwrap();
        let input = make_query_input(vec![]);
        let result = pipeline.execute_query(input).await;
        let err = result.unwrap_err();
        assert!(
            matches!(&err, PipelineError::Interceptor(msg) if msg.contains("response rejected"))
        );
    }

    // A successful run fills in the request id, results, and interceptor names, and
    // invokes the request hook exactly once.
    #[tokio::test]
    async fn execute_query_success_output_fields() {
        // Keep the guard alive so the logging calls run with a subscriber installed.
        let _guard = init_tracing();
        let count = Arc::new(AtomicUsize::new(0));
        let pipeline =
            PipelineBuilder::new(MockExecutor::with_result(serde_json::json!({"ok": true})))
                .with_default_database("test_db")
                .with_interceptor(CountingInterceptor {
                    name: "counter".into(),
                    count: count.clone(),
                })
                .build()
                .unwrap();

        let input = make_query_input(vec![]);
        let output = pipeline.execute_query(input).await.unwrap();

        assert!(!output.request_id.is_empty());
        assert_eq!(output.results, serde_json::json!({"ok": true}));
        assert_eq!(output.interceptors_applied, vec!["counter"]);
        assert_eq!(count.load(Ordering::SeqCst), 1);
    }

    // An empty clause list is accepted.
    #[tokio::test]
    async fn execute_query_empty_clauses_success() {
        let pipeline = make_pipeline(MockExecutor::new(), false);
        let input = make_query_input(vec![]);
        let result = pipeline.execute_query(input).await;
        assert!(result.is_ok());
    }

    // NOTE(review): despite its name, this test only checks that the TypeQL handed to
    // the executor is non-empty; it does not inspect the request context metadata.
    #[tokio::test]
    async fn execute_query_compiled_typeql_in_metadata() {
        let executor = MockExecutor::new();
        let calls = executor.calls.clone();
        let pipeline = make_pipeline(executor, false);

        let clauses = make_simple_clauses();
        let input = make_query_input(clauses);
        pipeline.execute_query(input).await.unwrap();

        let recorded = calls.lock().unwrap();
        assert!(!recorded[0].1.is_empty());
    }

    // The transaction type from the input reaches the executor unchanged.
    #[tokio::test]
    async fn execute_query_passes_transaction_type() {
        let executor = MockExecutor::new();
        let calls = executor.calls.clone();
        let pipeline = make_pipeline(executor, false);

        let input = QueryInput {
            database: None,
            transaction_type: "write".to_string(),
            clauses: vec![],
            metadata: HashMap::new(),
        };
        pipeline.execute_query(input).await.unwrap();

        let recorded = calls.lock().unwrap();
        assert_eq!(recorded[0].2, "write");
    }

    // =============================================
    // validate tests
    // =============================================

    // `validate` fails with a schema error when no schema is loaded.
    #[test]
    fn validate_no_schema_returns_error() {
        let pipeline = make_pipeline(MockExecutor::new(), false);
        let input = ValidateInput { clauses: vec![] };
        let result = pipeline.validate(&input);
        let err = result.unwrap_err();
        assert!(matches!(&err, PipelineError::Schema(msg) if msg.contains("No schema loaded")));
    }

    // Valid clauses produce `is_valid == true` and no errors.
    #[test]
    fn validate_valid_clauses() {
        let pipeline = make_pipeline(MockExecutor::new(), true);
        let input = ValidateInput {
            clauses: make_simple_clauses(),
        };
        let result = pipeline.validate(&input).unwrap();
        assert!(result.is_valid);
        assert!(result.errors.is_empty());
    }

    // Invalid clauses are reported through the output, not as an `Err`.
    #[test]
    fn validate_invalid_clauses() {
        let pipeline = make_pipeline(MockExecutor::new(), true);
        let input = ValidateInput {
            clauses: vec![Clause::Match(vec![Pattern::Entity {
                variable: "p".to_string(),
                type_name: "person".to_string(),
                constraints: vec![Constraint::Has {
                    attr_name: "nonexistent_attr".to_string(),
                    value: Value::Literal(type_bridge_core_lib::ast::LiteralValue {
                        value: serde_json::json!("val"),
                        value_type: "string".to_string(),
                    }),
                }],
                is_strict: false,
            }])],
        };
        let result = pipeline.validate(&input).unwrap();
        assert!(!result.is_valid);
        assert!(!result.errors.is_empty());
    }

    // The first reported error carries a non-empty code and message
    // (the `path` field is not checked here).
    #[test]
    fn validate_error_detail_fields() {
        let pipeline = make_pipeline(MockExecutor::new(), true);
        let input = ValidateInput {
            clauses: vec![Clause::Match(vec![Pattern::Entity {
                variable: "x".to_string(),
                type_name: "person".to_string(),
                constraints: vec![Constraint::Has {
                    attr_name: "nonexistent_attr".to_string(),
                    value: Value::Literal(type_bridge_core_lib::ast::LiteralValue {
                        value: serde_json::json!("val"),
                        value_type: "string".to_string(),
                    }),
                }],
                is_strict: false,
            }])],
        };
        let result = pipeline.validate(&input).unwrap();
        assert!(!result.is_valid);
        let error = &result.errors[0];
        assert!(!error.code.is_empty());
        assert!(!error.message.is_empty());
    }

    // An empty clause list is valid when a schema is loaded.
    #[test]
    fn validate_empty_clauses_with_schema() {
        let pipeline = make_pipeline(MockExecutor::new(), true);
        let input = ValidateInput { clauses: vec![] };
        let result = pipeline.validate(&input).unwrap();
        assert!(result.is_valid);
    }

    // =============================================
    // Accessor tests
    // =============================================

    // `schema()` is `Some` when the pipeline was built with a schema.
    #[test]
    fn schema_returns_some_when_loaded() {
        let pipeline = make_pipeline(MockExecutor::new(), true);
        assert!(pipeline.schema().is_some());
    }

    // `schema()` is `None` when the pipeline was built without a schema.
    #[test]
    fn schema_returns_none_when_not_loaded() {
        let pipeline = make_pipeline(MockExecutor::new(), false);
        assert!(pipeline.schema().is_none());
    }

    // `is_connected()` reflects a connected executor.
    #[test]
    fn is_connected_delegates_to_executor() {
        let executor = MockExecutor::new();
        *executor.connected.lock().unwrap() = true;
        let pipeline = make_pipeline(executor, false);
        assert!(pipeline.is_connected());
    }

    // `is_connected()` reflects a disconnected executor.
    #[test]
    fn is_connected_false_when_executor_disconnected() {
        let executor = MockExecutor::new();
        *executor.connected.lock().unwrap() = false;
        let pipeline = make_pipeline(executor, false);
        assert!(!pipeline.is_connected());
    }

    // `default_database()` returns the value given to the builder.
    #[test]
    fn default_database_returns_configured_value() {
        let pipeline = PipelineBuilder::new(MockExecutor::new())
            .with_default_database("my_database")
            .build()
            .unwrap();
        assert_eq!(pipeline.default_database(), "my_database");
    }

    // `default_database()` is empty when the builder was never given one.
    #[test]
    fn default_database_empty_when_not_set() {
        let pipeline = PipelineBuilder::new(MockExecutor::new()).build().unwrap();
        assert_eq!(pipeline.default_database(), "");
    }
}