1use std::collections::HashMap;
2use std::time::Instant;
3
4use type_bridge_core_lib::ast::Clause;
5use type_bridge_core_lib::compiler::QueryCompiler;
6use type_bridge_core_lib::query_parser;
7use type_bridge_core_lib::schema::TypeSchema;
8use type_bridge_core_lib::validation::ValidationEngine;
9
10use crate::error::PipelineError;
11use crate::executor::QueryExecutor;
12use crate::interceptor::{Interceptor, InterceptorChain, RequestContext};
13use crate::schema_source::SchemaSource;
14
/// A structured (already-parsed) query submitted to the pipeline.
pub struct QueryInput {
    /// Target database; when `None` the pipeline's default database is used.
    pub database: Option<String>,
    /// Transaction mode forwarded verbatim to the executor (e.g. "read" / "write").
    pub transaction_type: String,
    /// Parsed query clauses to validate, intercept, compile, and execute.
    pub clauses: Vec<Clause>,
    /// Caller-supplied metadata; copied into the `RequestContext` seen by interceptors.
    pub metadata: HashMap<String, serde_json::Value>,
}
22
/// A raw TypeQL query string submitted to the pipeline; it is parsed into
/// clauses before being processed exactly like a [`QueryInput`].
pub struct RawQueryInput {
    /// Target database; when `None` the pipeline's default database is used.
    pub database: Option<String>,
    /// Transaction mode forwarded verbatim to the executor.
    pub transaction_type: String,
    /// Raw TypeQL text to be parsed by `query_parser::parse_typeql_query`.
    pub query: String,
    /// Caller-supplied metadata; copied into the `RequestContext` seen by interceptors.
    pub metadata: HashMap<String, serde_json::Value>,
}
30
/// Input for schema validation without execution (see `QueryPipeline::validate`).
pub struct ValidateInput {
    /// Parsed clauses to check against the loaded schema.
    pub clauses: Vec<Clause>,
}
35
/// Result of a successful pipeline run.
#[derive(Debug)]
pub struct QueryOutput {
    /// JSON results returned by the executor.
    pub results: serde_json::Value,
    /// UUID generated per request; also present in the `RequestContext`.
    pub request_id: String,
    /// Wall-clock duration of the whole pipeline run, in milliseconds.
    pub execution_time_ms: u64,
    /// Names of all interceptors configured on the pipeline's chain.
    pub interceptors_applied: Vec<String>,
}
44
/// Outcome of a standalone validation pass.
#[derive(Debug)]
pub struct ValidateOutput {
    /// `true` when the clauses satisfied the loaded schema.
    pub is_valid: bool,
    /// One entry per validation failure; empty when `is_valid` is `true`.
    pub errors: Vec<ValidationErrorDetail>,
}
51
/// A single validation failure, copied from the validation engine's error.
#[derive(Debug)]
pub struct ValidationErrorDetail {
    /// Machine-readable error code from the validation engine.
    pub code: String,
    /// Human-readable description of the failure.
    pub message: String,
    /// Location of the offending element within the query.
    pub path: String,
}
59
/// Emits tracing events for a query that is about to be executed.
/// Excluded from coverage instrumentation on nightly coverage builds.
#[cfg_attr(coverage_nightly, coverage(off))]
fn log_query_execution(database: &str, transaction_type: &str, typeql: &str) {
    // Operational context at INFO; the full compiled query only at DEBUG
    // to keep production logs compact.
    tracing::info!(database, transaction_type, "Executing query");
    tracing::debug!(typeql, "Compiled TypeQL");
}
65
/// The assembled query pipeline: optional schema validation, an interceptor
/// chain, compilation to TypeQL, and delegation to a query executor.
/// Construct via [`PipelineBuilder`].
pub struct QueryPipeline {
    /// Schema loaded at build time; `None` disables validation in `execute_query`.
    schema: Option<TypeSchema>,
    /// Engine used to validate clauses against `schema`.
    validation_engine: ValidationEngine,
    /// Ordered chain of request/response interceptors.
    interceptor_chain: InterceptorChain,
    /// Database used when an input does not specify one.
    default_database: String,
    /// Backend that actually runs the compiled TypeQL.
    executor: Box<dyn QueryExecutor>,
}
90
91impl QueryPipeline {
92 pub async fn execute_query(&self, input: QueryInput) -> Result<QueryOutput, PipelineError> {
94 let start = Instant::now();
95 let request_id = uuid::Uuid::new_v4().to_string();
96 let database = input
97 .database
98 .unwrap_or_else(|| self.default_database.clone());
99
100 let mut ctx = RequestContext {
101 request_id: request_id.clone(),
102 client_id: "unknown".to_string(),
103 database: database.clone(),
104 transaction_type: input.transaction_type.clone(),
105 metadata: input.metadata,
106 timestamp: chrono::Utc::now(),
107 };
108
109 if let Some(schema) = &self.schema {
111 let result = self.validation_engine.validate_query(&input.clauses, schema);
112 if !result.is_valid {
113 return Err(PipelineError::Validation(format!(
114 "{} validation error(s)",
115 result.errors.len()
116 )));
117 }
118 }
119
120 let clauses = self
122 .interceptor_chain
123 .execute_request(input.clauses, &mut ctx)
124 .await
125 .map_err(|e| PipelineError::Interceptor(e.to_string()))?;
126
127 let compiler = QueryCompiler::new();
129 let typeql = compiler.compile(&clauses);
130 ctx.metadata.insert(
131 "compiled_typeql".to_string(),
132 serde_json::Value::String(typeql.clone()),
133 );
134
135 log_query_execution(&database, &input.transaction_type, &typeql);
137
138 let results = self
139 .executor
140 .execute(&database, &typeql, &input.transaction_type)
141 .await?;
142
143 self.interceptor_chain
145 .execute_response(&results, &ctx)
146 .await
147 .map_err(|e| PipelineError::Interceptor(e.to_string()))?;
148
149 let elapsed = start.elapsed().as_millis() as u64;
150
151 Ok(QueryOutput {
152 results,
153 request_id,
154 execution_time_ms: elapsed,
155 interceptors_applied: self
156 .interceptor_chain
157 .interceptor_names()
158 .into_iter()
159 .map(String::from)
160 .collect(),
161 })
162 }
163
164 pub async fn execute_raw(&self, input: RawQueryInput) -> Result<QueryOutput, PipelineError> {
166 let clauses = query_parser::parse_typeql_query(&input.query)
167 .map_err(|e| PipelineError::Parse(e.to_string()))?;
168
169 self.execute_query(QueryInput {
170 database: input.database,
171 transaction_type: input.transaction_type,
172 clauses,
173 metadata: input.metadata,
174 })
175 .await
176 }
177
178 pub fn validate(&self, input: &ValidateInput) -> Result<ValidateOutput, PipelineError> {
180 let schema = self
181 .schema
182 .as_ref()
183 .ok_or_else(|| PipelineError::Schema("No schema loaded".to_string()))?;
184
185 let result = self.validation_engine.validate_query(&input.clauses, schema);
186
187 let errors = result
188 .errors
189 .iter()
190 .map(|e| ValidationErrorDetail {
191 code: e.code.clone(),
192 message: e.message.clone(),
193 path: e.path.clone(),
194 })
195 .collect();
196
197 Ok(ValidateOutput {
198 is_valid: result.is_valid,
199 errors,
200 })
201 }
202
203 pub fn schema(&self) -> Option<&TypeSchema> {
205 self.schema.as_ref()
206 }
207
208 pub fn is_connected(&self) -> bool {
210 self.executor.is_connected()
211 }
212
213 pub fn default_database(&self) -> &str {
215 &self.default_database
216 }
217}
218
/// Builder for [`QueryPipeline`]: wires an executor with an optional schema
/// source, an ordered list of interceptors, and a default database.
pub struct PipelineBuilder {
    /// Backend the finished pipeline will execute queries against.
    executor: Box<dyn QueryExecutor>,
    /// Optional provider of the schema; loaded once in `build()`.
    schema_source: Option<Box<dyn SchemaSource>>,
    /// Interceptors in registration order.
    interceptors: Vec<Box<dyn Interceptor>>,
    /// Database used when an input does not specify one; empty by default.
    default_database: String,
}
238
239impl PipelineBuilder {
240 pub fn new(executor: impl QueryExecutor + 'static) -> Self {
242 Self {
243 executor: Box::new(executor),
244 schema_source: None,
245 interceptors: Vec::new(),
246 default_database: String::new(),
247 }
248 }
249
250 pub fn with_schema_source(mut self, source: impl SchemaSource + 'static) -> Self {
252 self.schema_source = Some(Box::new(source));
253 self
254 }
255
256 pub fn with_interceptor(mut self, interceptor: impl Interceptor + 'static) -> Self {
258 self.interceptors.push(Box::new(interceptor));
259 self
260 }
261
262 pub fn with_default_database(mut self, database: impl Into<String>) -> Self {
264 self.default_database = database.into();
265 self
266 }
267
268 pub fn build(self) -> Result<QueryPipeline, PipelineError> {
270 let schema = match self.schema_source {
271 Some(source) => Some(source.load()?),
272 None => None,
273 };
274
275 Ok(QueryPipeline {
276 schema,
277 validation_engine: ValidationEngine::new(),
278 interceptor_chain: InterceptorChain::new(self.interceptors),
279 default_database: self.default_database,
280 executor: self.executor,
281 })
282 }
283}
284
#[cfg(test)]
#[cfg_attr(coverage_nightly, coverage(off))]
mod tests {
    use std::future::Future;
    use std::pin::Pin;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;

    use type_bridge_core_lib::ast::{Constraint, Pattern, Value};

    use super::*;
    use crate::interceptor::traits::InterceptError;
    use crate::test_helpers::{make_pipeline, make_simple_clauses, MockExecutor};

    /// Installs a DEBUG-level test subscriber; dropping the returned guard
    /// restores the previously active subscriber.
    fn init_tracing() -> tracing::subscriber::DefaultGuard {
        let subscriber = tracing_subscriber::fmt()
            .with_max_level(tracing::Level::DEBUG)
            .with_test_writer()
            .finish();
        tracing::subscriber::set_default(subscriber)
    }

    // ---------- interceptor test doubles ----------

    /// Forwards clauses unchanged; used to verify interceptor naming/ordering.
    struct PassthroughInterceptor {
        name: String,
    }

    impl Interceptor for PassthroughInterceptor {
        fn name(&self) -> &str {
            &self.name
        }
        fn on_request<'a>(
            &'a self,
            clauses: Vec<Clause>,
            _ctx: &'a mut RequestContext,
        ) -> Pin<Box<dyn Future<Output = Result<Vec<Clause>, InterceptError>> + Send + 'a>> {
            Box::pin(async move { Ok(clauses) })
        }
    }

    /// Always rejects the request phase with `AccessDenied`.
    struct RejectingRequestInterceptor;

    impl Interceptor for RejectingRequestInterceptor {
        fn name(&self) -> &str {
            "rejector"
        }
        fn on_request<'a>(
            &'a self,
            _clauses: Vec<Clause>,
            _ctx: &'a mut RequestContext,
        ) -> Pin<Box<dyn Future<Output = Result<Vec<Clause>, InterceptError>> + Send + 'a>> {
            Box::pin(async {
                Err(InterceptError::AccessDenied {
                    reason: "test rejection".into(),
                })
            })
        }
    }

    /// Passes the request through but rejects the response phase.
    struct RejectingResponseInterceptor;

    impl Interceptor for RejectingResponseInterceptor {
        fn name(&self) -> &str {
            "resp-rejector"
        }
        fn on_request<'a>(
            &'a self,
            clauses: Vec<Clause>,
            _ctx: &'a mut RequestContext,
        ) -> Pin<Box<dyn Future<Output = Result<Vec<Clause>, InterceptError>> + Send + 'a>> {
            Box::pin(async move { Ok(clauses) })
        }
        fn on_response<'a>(
            &'a self,
            _result: &'a serde_json::Value,
            _ctx: &'a RequestContext,
        ) -> Pin<Box<dyn Future<Output = Result<(), InterceptError>> + Send + 'a>> {
            Box::pin(async { Err(InterceptError::Internal("response rejected".into())) })
        }
    }

    /// Counts how many times its request hook runs (shared atomic counter).
    struct CountingInterceptor {
        name: String,
        count: Arc<AtomicUsize>,
    }

    impl Interceptor for CountingInterceptor {
        fn name(&self) -> &str {
            &self.name
        }
        fn on_request<'a>(
            &'a self,
            clauses: Vec<Clause>,
            _ctx: &'a mut RequestContext,
        ) -> Pin<Box<dyn Future<Output = Result<Vec<Clause>, InterceptError>> + Send + 'a>> {
            Box::pin(async move {
                self.count.fetch_add(1, Ordering::SeqCst);
                Ok(clauses)
            })
        }
    }

    /// Schema source whose `load` always fails; exercises builder error paths.
    struct FailingSchemaSource;

    impl crate::schema_source::SchemaSource for FailingSchemaSource {
        fn load(&self) -> Result<TypeSchema, PipelineError> {
            Err(PipelineError::Schema("source failed".into()))
        }
    }

    // ---------- input helpers ----------

    /// A read-mode `QueryInput` with no database and empty metadata.
    fn make_query_input(clauses: Vec<Clause>) -> QueryInput {
        QueryInput {
            database: None,
            transaction_type: "read".to_string(),
            clauses,
            metadata: HashMap::new(),
        }
    }

    /// Same as `make_query_input` but with an explicit database name.
    fn make_query_input_with_db(clauses: Vec<Clause>, db: &str) -> QueryInput {
        QueryInput {
            database: Some(db.to_string()),
            transaction_type: "read".to_string(),
            clauses,
            metadata: HashMap::new(),
        }
    }

    // ---------- PipelineBuilder ----------

    #[test]
    fn builder_without_schema_source() {
        let pipeline = PipelineBuilder::new(MockExecutor::new()).build().unwrap();
        assert!(pipeline.schema().is_none());
    }

    #[test]
    fn builder_with_valid_schema_source() {
        let pipeline = make_pipeline(MockExecutor::new(), true);
        assert!(pipeline.schema().is_some());
        let schema = pipeline.schema().unwrap();
        assert!(schema.entities.contains_key("person"));
    }

    #[test]
    fn builder_with_failing_schema_source() {
        let result = PipelineBuilder::new(MockExecutor::new())
            .with_schema_source(FailingSchemaSource)
            .build();
        let err = result.err().expect("Expected build error");
        assert!(matches!(&err, PipelineError::Schema(msg) if msg.contains("source failed")));
    }

    #[test]
    fn builder_with_default_database() {
        let pipeline = PipelineBuilder::new(MockExecutor::new())
            .with_default_database("mydb")
            .build()
            .unwrap();
        assert_eq!(pipeline.default_database(), "mydb");
    }

    #[test]
    fn builder_default_empty_database() {
        let pipeline = PipelineBuilder::new(MockExecutor::new()).build().unwrap();
        assert_eq!(pipeline.default_database(), "");
    }

    #[tokio::test]
    async fn builder_with_interceptors() {
        let pipeline = PipelineBuilder::new(MockExecutor::new())
            .with_interceptor(PassthroughInterceptor {
                name: "first".into(),
            })
            .with_interceptor(PassthroughInterceptor {
                name: "second".into(),
            })
            .build()
            .unwrap();
        assert!(pipeline.schema().is_none());

        let input = make_query_input(vec![]);
        let output = pipeline.execute_query(input).await.unwrap();
        // Names must come back in registration order.
        assert_eq!(output.interceptors_applied, vec!["first", "second"]);
    }

    // ---------- execute_query ----------

    #[tokio::test]
    async fn execute_query_uses_input_database() {
        let executor = MockExecutor::new();
        let calls = executor.calls.clone();
        let pipeline = make_pipeline(executor, false);

        let input = make_query_input_with_db(vec![], "custom_db");
        pipeline.execute_query(input).await.unwrap();

        let recorded = calls.lock().unwrap();
        assert_eq!(recorded[0].0, "custom_db");
    }

    #[tokio::test]
    async fn execute_query_uses_default_database_when_none() {
        let executor = MockExecutor::new();
        let calls = executor.calls.clone();
        let pipeline = make_pipeline(executor, false);

        let input = make_query_input(vec![]);
        pipeline.execute_query(input).await.unwrap();

        let recorded = calls.lock().unwrap();
        // make_pipeline configures "test_db" as the default database.
        assert_eq!(recorded[0].0, "test_db");
    }

    #[tokio::test]
    async fn execute_query_skips_validation_when_no_schema() {
        let pipeline = make_pipeline(MockExecutor::new(), false);
        // Unknown type would fail validation — but no schema means no check.
        let clauses = vec![Clause::Match(vec![Pattern::Entity {
            variable: "x".to_string(),
            type_name: "nonexistent_type".to_string(),
            constraints: vec![],
            is_strict: false,
        }])];
        let input = make_query_input(clauses);
        let result = pipeline.execute_query(input).await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn execute_query_validates_when_schema_present_valid() {
        let pipeline = make_pipeline(MockExecutor::new(), true);
        let input = make_query_input(make_simple_clauses());
        let result = pipeline.execute_query(input).await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn execute_query_validates_when_schema_present_invalid() {
        let pipeline = make_pipeline(MockExecutor::new(), true);
        let clauses = vec![Clause::Match(vec![Pattern::Entity {
            variable: "p".to_string(),
            type_name: "person".to_string(),
            constraints: vec![Constraint::Has {
                attr_name: "nonexistent_attr".to_string(),
                value: Value::Literal(type_bridge_core_lib::ast::LiteralValue {
                    value: serde_json::json!("val"),
                    value_type: "string".to_string(),
                }),
            }],
            is_strict: false,
        }])];
        let input = make_query_input(clauses);
        let result = pipeline.execute_query(input).await;
        let err = result.unwrap_err();
        assert!(matches!(&err, PipelineError::Validation(msg) if msg.contains("validation error")));
    }

    #[tokio::test]
    async fn execute_query_request_interceptor_failure() {
        assert_eq!(RejectingRequestInterceptor.name(), "rejector");
        let pipeline = PipelineBuilder::new(MockExecutor::new())
            .with_interceptor(RejectingRequestInterceptor)
            .build()
            .unwrap();
        let input = make_query_input(vec![]);
        let result = pipeline.execute_query(input).await;
        let err = result.unwrap_err();
        assert!(matches!(&err, PipelineError::Interceptor(msg) if msg.contains("test rejection")));
    }

    #[tokio::test]
    async fn execute_query_executor_failure() {
        let pipeline = make_pipeline(MockExecutor::failing("db crash"), false);
        let input = make_query_input(vec![]);
        let result = pipeline.execute_query(input).await;
        let err = result.unwrap_err();
        assert!(matches!(&err, PipelineError::QueryExecution(msg) if msg.contains("db crash")));
    }

    #[tokio::test]
    async fn execute_query_response_interceptor_failure() {
        assert_eq!(RejectingResponseInterceptor.name(), "resp-rejector");
        let pipeline = PipelineBuilder::new(MockExecutor::new())
            .with_interceptor(RejectingResponseInterceptor)
            .build()
            .unwrap();
        let input = make_query_input(vec![]);
        let result = pipeline.execute_query(input).await;
        let err = result.unwrap_err();
        assert!(matches!(&err, PipelineError::Interceptor(msg) if msg.contains("response rejected")));
    }

    #[tokio::test]
    async fn execute_query_success_output_fields() {
        let _guard = init_tracing();
        let count = Arc::new(AtomicUsize::new(0));
        let pipeline = PipelineBuilder::new(MockExecutor::with_result(serde_json::json!({"ok": true})))
            .with_default_database("test_db")
            .with_interceptor(CountingInterceptor {
                name: "counter".into(),
                count: count.clone(),
            })
            .build()
            .unwrap();

        let input = make_query_input(vec![]);
        let output = pipeline.execute_query(input).await.unwrap();

        assert!(!output.request_id.is_empty());
        assert_eq!(output.results, serde_json::json!({"ok": true}));
        assert_eq!(output.interceptors_applied, vec!["counter"]);
        // The request hook must have run exactly once.
        assert_eq!(count.load(Ordering::SeqCst), 1);
    }

    #[tokio::test]
    async fn execute_query_empty_clauses_success() {
        let pipeline = make_pipeline(MockExecutor::new(), false);
        let input = make_query_input(vec![]);
        let result = pipeline.execute_query(input).await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn execute_query_compiled_typeql_in_metadata() {
        let executor = MockExecutor::new();
        let calls = executor.calls.clone();
        let pipeline = make_pipeline(executor, false);

        let clauses = make_simple_clauses();
        let input = make_query_input(clauses);
        pipeline.execute_query(input).await.unwrap();

        let recorded = calls.lock().unwrap();
        // Second tuple element is the compiled TypeQL passed to the executor.
        assert!(!recorded[0].1.is_empty());
    }

    #[tokio::test]
    async fn execute_query_passes_transaction_type() {
        let executor = MockExecutor::new();
        let calls = executor.calls.clone();
        let pipeline = make_pipeline(executor, false);

        let input = QueryInput {
            database: None,
            transaction_type: "write".to_string(),
            clauses: vec![],
            metadata: HashMap::new(),
        };
        pipeline.execute_query(input).await.unwrap();

        let recorded = calls.lock().unwrap();
        assert_eq!(recorded[0].2, "write");
    }

    // ---------- execute_raw ----------

    #[tokio::test]
    async fn execute_raw_valid_typeql() {
        let executor = MockExecutor::new();
        let calls = executor.calls.clone();
        let pipeline = make_pipeline(executor, false);

        let input = RawQueryInput {
            database: None,
            transaction_type: "read".to_string(),
            query: "match $p isa person;".to_string(),
            metadata: HashMap::new(),
        };
        let result = pipeline.execute_raw(input).await;
        assert!(result.is_ok());

        let recorded = calls.lock().unwrap();
        assert_eq!(recorded.len(), 1);
    }

    #[tokio::test]
    async fn execute_raw_invalid_typeql() {
        let pipeline = make_pipeline(MockExecutor::new(), false);
        let input = RawQueryInput {
            database: None,
            transaction_type: "read".to_string(),
            query: "this is totally invalid <<>>".to_string(),
            metadata: HashMap::new(),
        };
        let result = pipeline.execute_raw(input).await;
        let err = result.unwrap_err();
        assert!(matches!(&err, PipelineError::Parse(_)));
    }

    #[tokio::test]
    async fn execute_raw_database_passthrough() {
        let executor = MockExecutor::new();
        let calls = executor.calls.clone();
        let pipeline = make_pipeline(executor, false);

        let input = RawQueryInput {
            database: Some("raw_db".to_string()),
            transaction_type: "read".to_string(),
            query: "match $p isa person;".to_string(),
            metadata: HashMap::new(),
        };
        pipeline.execute_raw(input).await.unwrap();

        let recorded = calls.lock().unwrap();
        assert_eq!(recorded[0].0, "raw_db");
    }

    #[tokio::test]
    async fn execute_raw_executor_failure() {
        let pipeline = make_pipeline(MockExecutor::failing("raw fail"), false);
        let input = RawQueryInput {
            database: None,
            transaction_type: "read".to_string(),
            query: "match $p isa person;".to_string(),
            metadata: HashMap::new(),
        };
        let result = pipeline.execute_raw(input).await;
        let err = result.unwrap_err();
        assert!(matches!(&err, PipelineError::QueryExecution(msg) if msg.contains("raw fail")));
    }

    // ---------- validate ----------

    #[test]
    fn validate_no_schema_returns_error() {
        let pipeline = make_pipeline(MockExecutor::new(), false);
        let input = ValidateInput { clauses: vec![] };
        let result = pipeline.validate(&input);
        let err = result.unwrap_err();
        assert!(matches!(&err, PipelineError::Schema(msg) if msg.contains("No schema loaded")));
    }

    #[test]
    fn validate_valid_clauses() {
        let pipeline = make_pipeline(MockExecutor::new(), true);
        let input = ValidateInput {
            clauses: make_simple_clauses(),
        };
        let result = pipeline.validate(&input).unwrap();
        assert!(result.is_valid);
        assert!(result.errors.is_empty());
    }

    #[test]
    fn validate_invalid_clauses() {
        let pipeline = make_pipeline(MockExecutor::new(), true);
        let input = ValidateInput {
            clauses: vec![Clause::Match(vec![Pattern::Entity {
                variable: "p".to_string(),
                type_name: "person".to_string(),
                constraints: vec![Constraint::Has {
                    attr_name: "nonexistent_attr".to_string(),
                    value: Value::Literal(type_bridge_core_lib::ast::LiteralValue {
                        value: serde_json::json!("val"),
                        value_type: "string".to_string(),
                    }),
                }],
                is_strict: false,
            }])],
        };
        let result = pipeline.validate(&input).unwrap();
        assert!(!result.is_valid);
        assert!(!result.errors.is_empty());
    }

    #[test]
    fn validate_error_detail_fields() {
        let pipeline = make_pipeline(MockExecutor::new(), true);
        let input = ValidateInput {
            clauses: vec![Clause::Match(vec![Pattern::Entity {
                variable: "x".to_string(),
                type_name: "person".to_string(),
                constraints: vec![Constraint::Has {
                    attr_name: "nonexistent_attr".to_string(),
                    value: Value::Literal(type_bridge_core_lib::ast::LiteralValue {
                        value: serde_json::json!("val"),
                        value_type: "string".to_string(),
                    }),
                }],
                is_strict: false,
            }])],
        };
        let result = pipeline.validate(&input).unwrap();
        assert!(!result.is_valid);
        let error = &result.errors[0];
        assert!(!error.code.is_empty());
        assert!(!error.message.is_empty());
    }

    #[test]
    fn validate_empty_clauses_with_schema() {
        let pipeline = make_pipeline(MockExecutor::new(), true);
        let input = ValidateInput { clauses: vec![] };
        let result = pipeline.validate(&input).unwrap();
        assert!(result.is_valid);
    }

    // ---------- accessors ----------

    #[test]
    fn schema_returns_some_when_loaded() {
        let pipeline = make_pipeline(MockExecutor::new(), true);
        assert!(pipeline.schema().is_some());
    }

    #[test]
    fn schema_returns_none_when_not_loaded() {
        let pipeline = make_pipeline(MockExecutor::new(), false);
        assert!(pipeline.schema().is_none());
    }

    #[test]
    fn is_connected_delegates_to_executor() {
        let executor = MockExecutor::new();
        *executor.connected.lock().unwrap() = true;
        let pipeline = make_pipeline(executor, false);
        assert!(pipeline.is_connected());
    }

    #[test]
    fn is_connected_false_when_executor_disconnected() {
        let executor = MockExecutor::new();
        *executor.connected.lock().unwrap() = false;
        let pipeline = make_pipeline(executor, false);
        assert!(!pipeline.is_connected());
    }

    #[test]
    fn default_database_returns_configured_value() {
        let pipeline = PipelineBuilder::new(MockExecutor::new())
            .with_default_database("my_database")
            .build()
            .unwrap();
        assert_eq!(pipeline.default_database(), "my_database");
    }

    #[test]
    fn default_database_empty_when_not_set() {
        let pipeline = PipelineBuilder::new(MockExecutor::new()).build().unwrap();
        assert_eq!(pipeline.default_database(), "");
    }
}