Skip to main content

type_bridge_server/
pipeline.rs

1use std::collections::HashMap;
2use std::time::Instant;
3
4use type_bridge_core_lib::ast::Clause;
5use type_bridge_core_lib::compiler::QueryCompiler;
6use type_bridge_core_lib::schema::TypeSchema;
7use type_bridge_core_lib::validation::ValidationEngine;
8
9use crate::error::PipelineError;
10use crate::executor::QueryExecutor;
11use crate::interceptor::{Interceptor, InterceptorChain, RequestContext};
12use crate::schema_source::SchemaSource;
13
14/// Input for a structured (AST-based) query.
15pub struct QueryInput {
16    pub database: Option<String>,
17    pub transaction_type: String,
18    pub clauses: Vec<Clause>,
19    pub metadata: HashMap<String, serde_json::Value>,
20}
21
22/// Input for a validation-only request.
23pub struct ValidateInput {
24    pub clauses: Vec<Clause>,
25}
26
27/// Output from a successful pipeline execution.
28#[derive(Debug)]
29pub struct QueryOutput {
30    pub results: serde_json::Value,
31    pub request_id: String,
32    pub execution_time_ms: u64,
33    pub interceptors_applied: Vec<String>,
34}
35
36/// Output from a validation-only request.
37#[derive(Debug)]
38pub struct ValidateOutput {
39    pub is_valid: bool,
40    pub errors: Vec<ValidationErrorDetail>,
41}
42
43/// A single validation error.
44#[derive(Debug)]
45pub struct ValidationErrorDetail {
46    pub code: String,
47    pub message: String,
48    pub path: String,
49}
50
51#[cfg_attr(coverage_nightly, coverage(off))]
52fn log_query_execution(database: &str, transaction_type: &str, typeql: &str) {
53    tracing::info!(database, transaction_type, "Executing query");
54    tracing::debug!(typeql, "Compiled TypeQL");
55}
56
57/// Transport-agnostic query pipeline.
58///
59/// Encapsulates the full query lifecycle: validate → intercept → compile → execute → intercept.
60/// Use [`PipelineBuilder`] to construct an instance.
61///
62/// # Example
63///
64/// ```rust,ignore
65/// use type_bridge_server::{PipelineBuilder, QueryInput};
66///
67/// let pipeline = PipelineBuilder::new(my_executor)
68///     .with_schema_source(my_schema_source)
69///     .with_default_database("my_db")
70///     .build()?;
71///
72/// let output = pipeline.execute_query(QueryInput { ... }).await?;
73/// ```
74pub struct QueryPipeline {
75    schema: Option<TypeSchema>,
76    validation_engine: ValidationEngine,
77    interceptor_chain: InterceptorChain,
78    default_database: String,
79    executor: Box<dyn QueryExecutor>,
80    skip_validation: bool,
81}
82
83impl QueryPipeline {
84    /// Execute a structured (AST-based) query through the full pipeline.
85    pub async fn execute_query(&self, input: QueryInput) -> Result<QueryOutput, PipelineError> {
86        let start = Instant::now();
87        let request_id = uuid::Uuid::new_v4().to_string();
88        let database = input
89            .database
90            .unwrap_or_else(|| self.default_database.clone());
91
92        let mut ctx = RequestContext {
93            request_id: request_id.clone(),
94            client_id: "unknown".to_string(),
95            database: database.clone(),
96            transaction_type: input.transaction_type.clone(),
97            metadata: input.metadata,
98            timestamp: chrono::Utc::now(),
99        };
100
101        // Validate against schema
102        if !self.skip_validation
103            && let Some(schema) = &self.schema
104        {
105            let result = self.validation_engine.validate_query(&input.clauses, schema);
106            if !result.is_valid {
107                return Err(PipelineError::Validation(format!(
108                    "{} validation error(s)",
109                    result.errors.len()
110                )));
111            }
112        }
113
114        // Run request interceptors
115        let clauses = self
116            .interceptor_chain
117            .execute_request(input.clauses, &mut ctx)
118            .await
119            .map_err(|e| PipelineError::Interceptor(e.to_string()))?;
120
121        // Compile to TypeQL
122        let compiler = QueryCompiler::new();
123        let typeql = compiler.compile(&clauses);
124        ctx.metadata.insert(
125            "compiled_typeql".to_string(),
126            serde_json::Value::String(typeql.clone()),
127        );
128
129        // Execute
130        log_query_execution(&database, &input.transaction_type, &typeql);
131
132        let results = self
133            .executor
134            .execute(&database, &typeql, &input.transaction_type)
135            .await?;
136
137        // Run response interceptors
138        self.interceptor_chain
139            .execute_response(&results, &ctx)
140            .await
141            .map_err(|e| PipelineError::Interceptor(e.to_string()))?;
142
143        let elapsed = start.elapsed().as_millis() as u64;
144
145        Ok(QueryOutput {
146            results,
147            request_id,
148            execution_time_ms: elapsed,
149            interceptors_applied: self
150                .interceptor_chain
151                .interceptor_names()
152                .into_iter()
153                .map(String::from)
154                .collect(),
155        })
156    }
157
158    /// Validate clauses against the loaded schema without executing.
159    pub fn validate(&self, input: &ValidateInput) -> Result<ValidateOutput, PipelineError> {
160        let schema = self
161            .schema
162            .as_ref()
163            .ok_or_else(|| PipelineError::Schema("No schema loaded".to_string()))?;
164
165        let result = self.validation_engine.validate_query(&input.clauses, schema);
166
167        let errors = result
168            .errors
169            .iter()
170            .map(|e| ValidationErrorDetail {
171                code: e.code.clone(),
172                message: e.message.clone(),
173                path: e.path.clone(),
174            })
175            .collect();
176
177        Ok(ValidateOutput {
178            is_valid: result.is_valid,
179            errors,
180        })
181    }
182
183    /// Get the loaded schema, if any.
184    pub fn schema(&self) -> Option<&TypeSchema> {
185        self.schema.as_ref()
186    }
187
188    /// Check if the backend executor is connected.
189    pub fn is_connected(&self) -> bool {
190        self.executor.is_connected()
191    }
192
193    /// Get the default database name.
194    pub fn default_database(&self) -> &str {
195        &self.default_database
196    }
197}
198
199/// Builder for constructing a [`QueryPipeline`].
200///
201/// # Example
202///
203/// ```rust,ignore
204/// use type_bridge_server::PipelineBuilder;
205///
206/// let pipeline = PipelineBuilder::new(my_executor)
207///     .with_schema_source(FileSchemaSource::new("schema.tql"))
208///     .with_interceptor(AuditLogInterceptor::new(&config)?)
209///     .with_default_database("my_db")
210///     .build()?;
211/// ```
212pub struct PipelineBuilder {
213    executor: Box<dyn QueryExecutor>,
214    schema_source: Option<Box<dyn SchemaSource>>,
215    interceptors: Vec<Box<dyn Interceptor>>,
216    default_database: String,
217    skip_validation: bool,
218}
219
220impl PipelineBuilder {
221    /// Create a new builder with the given query executor.
222    pub fn new(executor: impl QueryExecutor + 'static) -> Self {
223        Self {
224            executor: Box::new(executor),
225            schema_source: None,
226            interceptors: Vec::new(),
227            default_database: String::new(),
228            skip_validation: false,
229        }
230    }
231
232    /// Set the schema source. The schema will be loaded during [`build()`](Self::build).
233    pub fn with_schema_source(mut self, source: impl SchemaSource + 'static) -> Self {
234        self.schema_source = Some(Box::new(source));
235        self
236    }
237
238    /// Add an interceptor to the pipeline chain.
239    pub fn with_interceptor(mut self, interceptor: impl Interceptor + 'static) -> Self {
240        self.interceptors.push(Box::new(interceptor));
241        self
242    }
243
244    /// Set the default database name used when requests don't specify one.
245    pub fn with_default_database(mut self, database: impl Into<String>) -> Self {
246        self.default_database = database.into();
247        self
248    }
249
250    /// Skip schema validation during query execution.
251    ///
252    /// The schema is still loaded (and accessible via [`QueryPipeline::schema`]),
253    /// but queries are not validated against it before execution.
254    pub fn with_skip_validation(mut self) -> Self {
255        self.skip_validation = true;
256        self
257    }
258
259    /// Build the pipeline, loading the schema if a source was provided.
260    pub fn build(self) -> Result<QueryPipeline, PipelineError> {
261        let schema = match self.schema_source {
262            Some(source) => Some(source.load()?),
263            None => None,
264        };
265
266        Ok(QueryPipeline {
267            schema,
268            validation_engine: ValidationEngine::new(),
269            interceptor_chain: InterceptorChain::new(self.interceptors),
270            default_database: self.default_database,
271            executor: self.executor,
272            skip_validation: self.skip_validation,
273        })
274    }
275}
276
277#[cfg(test)]
278#[cfg_attr(coverage_nightly, coverage(off))]
279mod tests {
280    use std::future::Future;
281    use std::pin::Pin;
282    use std::sync::atomic::{AtomicUsize, Ordering};
283    use std::sync::Arc;
284
285    use type_bridge_core_lib::ast::{Constraint, Pattern, Value};
286
287    use super::*;
288    use crate::interceptor::traits::InterceptError;
289    use crate::test_helpers::{make_pipeline, make_simple_clauses, MockExecutor};
290
291    fn init_tracing() -> tracing::subscriber::DefaultGuard {
292        let subscriber = tracing_subscriber::fmt()
293            .with_max_level(tracing::Level::DEBUG)
294            .with_test_writer()
295            .finish();
296        tracing::subscriber::set_default(subscriber)
297    }
298
299    // --- Helper interceptors ---
300
301    struct PassthroughInterceptor {
302        name: String,
303    }
304
305    impl Interceptor for PassthroughInterceptor {
306        fn name(&self) -> &str {
307            &self.name
308        }
309        fn on_request<'a>(
310            &'a self,
311            clauses: Vec<Clause>,
312            _ctx: &'a mut RequestContext,
313        ) -> Pin<Box<dyn Future<Output = Result<Vec<Clause>, InterceptError>> + Send + 'a>> {
314            Box::pin(async move { Ok(clauses) })
315        }
316    }
317
318    struct RejectingRequestInterceptor;
319
320    impl Interceptor for RejectingRequestInterceptor {
321        fn name(&self) -> &str {
322            "rejector"
323        }
324        fn on_request<'a>(
325            &'a self,
326            _clauses: Vec<Clause>,
327            _ctx: &'a mut RequestContext,
328        ) -> Pin<Box<dyn Future<Output = Result<Vec<Clause>, InterceptError>> + Send + 'a>> {
329            Box::pin(async {
330                Err(InterceptError::AccessDenied {
331                    reason: "test rejection".into(),
332                })
333            })
334        }
335    }
336
337    struct RejectingResponseInterceptor;
338
339    impl Interceptor for RejectingResponseInterceptor {
340        fn name(&self) -> &str {
341            "resp-rejector"
342        }
343        fn on_request<'a>(
344            &'a self,
345            clauses: Vec<Clause>,
346            _ctx: &'a mut RequestContext,
347        ) -> Pin<Box<dyn Future<Output = Result<Vec<Clause>, InterceptError>> + Send + 'a>> {
348            Box::pin(async move { Ok(clauses) })
349        }
350        fn on_response<'a>(
351            &'a self,
352            _result: &'a serde_json::Value,
353            _ctx: &'a RequestContext,
354        ) -> Pin<Box<dyn Future<Output = Result<(), InterceptError>> + Send + 'a>> {
355            Box::pin(async { Err(InterceptError::Internal("response rejected".into())) })
356        }
357    }
358
359    struct CountingInterceptor {
360        name: String,
361        count: Arc<AtomicUsize>,
362    }
363
364    impl Interceptor for CountingInterceptor {
365        fn name(&self) -> &str {
366            &self.name
367        }
368        fn on_request<'a>(
369            &'a self,
370            clauses: Vec<Clause>,
371            _ctx: &'a mut RequestContext,
372        ) -> Pin<Box<dyn Future<Output = Result<Vec<Clause>, InterceptError>> + Send + 'a>> {
373            Box::pin(async move {
374                self.count.fetch_add(1, Ordering::SeqCst);
375                Ok(clauses)
376            })
377        }
378    }
379
380    /// SchemaSource that always fails.
381    struct FailingSchemaSource;
382
383    impl crate::schema_source::SchemaSource for FailingSchemaSource {
384        fn load(&self) -> Result<TypeSchema, PipelineError> {
385            Err(PipelineError::Schema("source failed".into()))
386        }
387    }
388
389    fn make_query_input(clauses: Vec<Clause>) -> QueryInput {
390        QueryInput {
391            database: None,
392            transaction_type: "read".to_string(),
393            clauses,
394            metadata: HashMap::new(),
395        }
396    }
397
398    fn make_query_input_with_db(clauses: Vec<Clause>, db: &str) -> QueryInput {
399        QueryInput {
400            database: Some(db.to_string()),
401            transaction_type: "read".to_string(),
402            clauses,
403            metadata: HashMap::new(),
404        }
405    }
406
407    // =============================================
408    // PipelineBuilder tests
409    // =============================================
410
411    #[test]
412    fn builder_without_schema_source() {
413        let pipeline = PipelineBuilder::new(MockExecutor::new()).build().unwrap();
414        assert!(pipeline.schema().is_none());
415    }
416
417    #[test]
418    fn builder_with_valid_schema_source() {
419        let pipeline = make_pipeline(MockExecutor::new(), true);
420        assert!(pipeline.schema().is_some());
421        let schema = pipeline.schema().unwrap();
422        assert!(schema.entities.contains_key("person"));
423    }
424
425    #[test]
426    fn builder_with_failing_schema_source() {
427        let result = PipelineBuilder::new(MockExecutor::new())
428            .with_schema_source(FailingSchemaSource)
429            .build();
430        let err = result.err().expect("Expected build error");
431        assert!(matches!(&err, PipelineError::Schema(msg) if msg.contains("source failed")));
432    }
433
434    #[test]
435    fn builder_with_default_database() {
436        let pipeline = PipelineBuilder::new(MockExecutor::new())
437            .with_default_database("mydb")
438            .build()
439            .unwrap();
440        assert_eq!(pipeline.default_database(), "mydb");
441    }
442
443    #[test]
444    fn builder_default_empty_database() {
445        let pipeline = PipelineBuilder::new(MockExecutor::new()).build().unwrap();
446        assert_eq!(pipeline.default_database(), "");
447    }
448
449    #[tokio::test]
450    async fn builder_with_interceptors() {
451        let pipeline = PipelineBuilder::new(MockExecutor::new())
452            .with_interceptor(PassthroughInterceptor {
453                name: "first".into(),
454            })
455            .with_interceptor(PassthroughInterceptor {
456                name: "second".into(),
457            })
458            .build()
459            .unwrap();
460        assert!(pipeline.schema().is_none());
461
462        let input = make_query_input(vec![]);
463        let output = pipeline.execute_query(input).await.unwrap();
464        assert_eq!(output.interceptors_applied, vec!["first", "second"]);
465    }
466
467    // =============================================
468    // execute_query tests
469    // =============================================
470
471    #[tokio::test]
472    async fn execute_query_uses_input_database() {
473        let executor = MockExecutor::new();
474        let calls = executor.calls.clone();
475        let pipeline = make_pipeline(executor, false);
476
477        let input = make_query_input_with_db(vec![], "custom_db");
478        pipeline.execute_query(input).await.unwrap();
479
480        let recorded = calls.lock().unwrap();
481        assert_eq!(recorded[0].0, "custom_db");
482    }
483
484    #[tokio::test]
485    async fn execute_query_uses_default_database_when_none() {
486        let executor = MockExecutor::new();
487        let calls = executor.calls.clone();
488        let pipeline = make_pipeline(executor, false);
489
490        let input = make_query_input(vec![]);
491        pipeline.execute_query(input).await.unwrap();
492
493        let recorded = calls.lock().unwrap();
494        assert_eq!(recorded[0].0, "test_db"); // from make_pipeline
495    }
496
497    #[tokio::test]
498    async fn execute_query_skips_validation_when_no_schema() {
499        let pipeline = make_pipeline(MockExecutor::new(), false);
500        let clauses = vec![Clause::Match(vec![Pattern::Entity {
501            variable: "x".to_string(),
502            type_name: "nonexistent_type".to_string(),
503            constraints: vec![],
504            is_strict: false,
505        }])];
506        let input = make_query_input(clauses);
507        let result = pipeline.execute_query(input).await;
508        assert!(result.is_ok());
509    }
510
511    #[tokio::test]
512    async fn execute_query_validates_when_schema_present_valid() {
513        let pipeline = make_pipeline(MockExecutor::new(), true);
514        let input = make_query_input(make_simple_clauses());
515        let result = pipeline.execute_query(input).await;
516        assert!(result.is_ok());
517    }
518
519    #[tokio::test]
520    async fn execute_query_validates_when_schema_present_invalid() {
521        let pipeline = make_pipeline(MockExecutor::new(), true);
522        let clauses = vec![Clause::Match(vec![Pattern::Entity {
523            variable: "p".to_string(),
524            type_name: "person".to_string(),
525            constraints: vec![Constraint::Has {
526                attr_name: "nonexistent_attr".to_string(),
527                value: Value::Literal(type_bridge_core_lib::ast::LiteralValue {
528                    value: serde_json::json!("val"),
529                    value_type: "string".to_string(),
530                }),
531            }],
532            is_strict: false,
533        }])];
534        let input = make_query_input(clauses);
535        let result = pipeline.execute_query(input).await;
536        let err = result.unwrap_err();
537        assert!(matches!(&err, PipelineError::Validation(msg) if msg.contains("validation error")));
538    }
539
540    #[tokio::test]
541    async fn execute_query_request_interceptor_failure() {
542        assert_eq!(RejectingRequestInterceptor.name(), "rejector");
543        let pipeline = PipelineBuilder::new(MockExecutor::new())
544            .with_interceptor(RejectingRequestInterceptor)
545            .build()
546            .unwrap();
547        let input = make_query_input(vec![]);
548        let result = pipeline.execute_query(input).await;
549        let err = result.unwrap_err();
550        assert!(matches!(&err, PipelineError::Interceptor(msg) if msg.contains("test rejection")));
551    }
552
553    #[tokio::test]
554    async fn execute_query_executor_failure() {
555        let pipeline = make_pipeline(MockExecutor::failing("db crash"), false);
556        let input = make_query_input(vec![]);
557        let result = pipeline.execute_query(input).await;
558        let err = result.unwrap_err();
559        assert!(matches!(&err, PipelineError::QueryExecution(msg) if msg.contains("db crash")));
560    }
561
562    #[tokio::test]
563    async fn execute_query_response_interceptor_failure() {
564        assert_eq!(RejectingResponseInterceptor.name(), "resp-rejector");
565        let pipeline = PipelineBuilder::new(MockExecutor::new())
566            .with_interceptor(RejectingResponseInterceptor)
567            .build()
568            .unwrap();
569        let input = make_query_input(vec![]);
570        let result = pipeline.execute_query(input).await;
571        let err = result.unwrap_err();
572        assert!(matches!(&err, PipelineError::Interceptor(msg) if msg.contains("response rejected")));
573    }
574
575    #[tokio::test]
576    async fn execute_query_success_output_fields() {
577        let _guard = init_tracing();
578        let count = Arc::new(AtomicUsize::new(0));
579        let pipeline = PipelineBuilder::new(MockExecutor::with_result(serde_json::json!({"ok": true})))
580            .with_default_database("test_db")
581            .with_interceptor(CountingInterceptor {
582                name: "counter".into(),
583                count: count.clone(),
584            })
585            .build()
586            .unwrap();
587
588        let input = make_query_input(vec![]);
589        let output = pipeline.execute_query(input).await.unwrap();
590
591        assert!(!output.request_id.is_empty());
592        assert_eq!(output.results, serde_json::json!({"ok": true}));
593        assert_eq!(output.interceptors_applied, vec!["counter"]);
594        assert_eq!(count.load(Ordering::SeqCst), 1);
595    }
596
597    #[tokio::test]
598    async fn execute_query_empty_clauses_success() {
599        let pipeline = make_pipeline(MockExecutor::new(), false);
600        let input = make_query_input(vec![]);
601        let result = pipeline.execute_query(input).await;
602        assert!(result.is_ok());
603    }
604
605    #[tokio::test]
606    async fn execute_query_compiled_typeql_in_metadata() {
607        let executor = MockExecutor::new();
608        let calls = executor.calls.clone();
609        let pipeline = make_pipeline(executor, false);
610
611        let clauses = make_simple_clauses();
612        let input = make_query_input(clauses);
613        pipeline.execute_query(input).await.unwrap();
614
615        let recorded = calls.lock().unwrap();
616        assert!(!recorded[0].1.is_empty());
617    }
618
619    #[tokio::test]
620    async fn execute_query_passes_transaction_type() {
621        let executor = MockExecutor::new();
622        let calls = executor.calls.clone();
623        let pipeline = make_pipeline(executor, false);
624
625        let input = QueryInput {
626            database: None,
627            transaction_type: "write".to_string(),
628            clauses: vec![],
629            metadata: HashMap::new(),
630        };
631        pipeline.execute_query(input).await.unwrap();
632
633        let recorded = calls.lock().unwrap();
634        assert_eq!(recorded[0].2, "write");
635    }
636
637    // =============================================
638    // validate tests
639    // =============================================
640
641    #[test]
642    fn validate_no_schema_returns_error() {
643        let pipeline = make_pipeline(MockExecutor::new(), false);
644        let input = ValidateInput { clauses: vec![] };
645        let result = pipeline.validate(&input);
646        let err = result.unwrap_err();
647        assert!(matches!(&err, PipelineError::Schema(msg) if msg.contains("No schema loaded")));
648    }
649
650    #[test]
651    fn validate_valid_clauses() {
652        let pipeline = make_pipeline(MockExecutor::new(), true);
653        let input = ValidateInput {
654            clauses: make_simple_clauses(),
655        };
656        let result = pipeline.validate(&input).unwrap();
657        assert!(result.is_valid);
658        assert!(result.errors.is_empty());
659    }
660
661    #[test]
662    fn validate_invalid_clauses() {
663        let pipeline = make_pipeline(MockExecutor::new(), true);
664        let input = ValidateInput {
665            clauses: vec![Clause::Match(vec![Pattern::Entity {
666                variable: "p".to_string(),
667                type_name: "person".to_string(),
668                constraints: vec![Constraint::Has {
669                    attr_name: "nonexistent_attr".to_string(),
670                    value: Value::Literal(type_bridge_core_lib::ast::LiteralValue {
671                        value: serde_json::json!("val"),
672                        value_type: "string".to_string(),
673                    }),
674                }],
675                is_strict: false,
676            }])],
677        };
678        let result = pipeline.validate(&input).unwrap();
679        assert!(!result.is_valid);
680        assert!(!result.errors.is_empty());
681    }
682
683    #[test]
684    fn validate_error_detail_fields() {
685        let pipeline = make_pipeline(MockExecutor::new(), true);
686        let input = ValidateInput {
687            clauses: vec![Clause::Match(vec![Pattern::Entity {
688                variable: "x".to_string(),
689                type_name: "person".to_string(),
690                constraints: vec![Constraint::Has {
691                    attr_name: "nonexistent_attr".to_string(),
692                    value: Value::Literal(type_bridge_core_lib::ast::LiteralValue {
693                        value: serde_json::json!("val"),
694                        value_type: "string".to_string(),
695                    }),
696                }],
697                is_strict: false,
698            }])],
699        };
700        let result = pipeline.validate(&input).unwrap();
701        assert!(!result.is_valid);
702        let error = &result.errors[0];
703        assert!(!error.code.is_empty());
704        assert!(!error.message.is_empty());
705    }
706
707    #[test]
708    fn validate_empty_clauses_with_schema() {
709        let pipeline = make_pipeline(MockExecutor::new(), true);
710        let input = ValidateInput { clauses: vec![] };
711        let result = pipeline.validate(&input).unwrap();
712        assert!(result.is_valid);
713    }
714
715    // =============================================
716    // Accessor tests
717    // =============================================
718
719    #[test]
720    fn schema_returns_some_when_loaded() {
721        let pipeline = make_pipeline(MockExecutor::new(), true);
722        assert!(pipeline.schema().is_some());
723    }
724
725    #[test]
726    fn schema_returns_none_when_not_loaded() {
727        let pipeline = make_pipeline(MockExecutor::new(), false);
728        assert!(pipeline.schema().is_none());
729    }
730
731    #[test]
732    fn is_connected_delegates_to_executor() {
733        let executor = MockExecutor::new();
734        *executor.connected.lock().unwrap() = true;
735        let pipeline = make_pipeline(executor, false);
736        assert!(pipeline.is_connected());
737    }
738
739    #[test]
740    fn is_connected_false_when_executor_disconnected() {
741        let executor = MockExecutor::new();
742        *executor.connected.lock().unwrap() = false;
743        let pipeline = make_pipeline(executor, false);
744        assert!(!pipeline.is_connected());
745    }
746
747    #[test]
748    fn default_database_returns_configured_value() {
749        let pipeline = PipelineBuilder::new(MockExecutor::new())
750            .with_default_database("my_database")
751            .build()
752            .unwrap();
753        assert_eq!(pipeline.default_database(), "my_database");
754    }
755
756    #[test]
757    fn default_database_empty_when_not_set() {
758        let pipeline = PipelineBuilder::new(MockExecutor::new()).build().unwrap();
759        assert_eq!(pipeline.default_database(), "");
760    }
761}