1use super::backend::{DriverBackend, QueryResultKind, TransactionType};
2use super::real_driver::RealTypeDBBackend;
3use crate::config::TypeDBSection;
4use crate::error::PipelineError;
5use crate::executor::QueryExecutor;
6
7pub struct TypeDBClient {
10 backend: Box<dyn DriverBackend>,
11}
12
13impl TypeDBClient {
14 #[cfg_attr(coverage_nightly, coverage(off))]
16 pub async fn connect(config: &TypeDBSection) -> Result<Self, PipelineError> {
17 let backend = RealTypeDBBackend::connect(config).await?;
18 Ok(Self {
19 backend: Box::new(backend),
20 })
21 }
22
23 #[cfg(test)]
25 pub(crate) fn with_backend(backend: Box<dyn DriverBackend>) -> Self {
26 Self { backend }
27 }
28
29 pub async fn execute(
34 &self,
35 database: &str,
36 typeql: &str,
37 tx_type: &str,
38 ) -> Result<serde_json::Value, PipelineError> {
39 let transaction_type = parse_transaction_type(tx_type)?;
40
41 let mut tx = self
42 .backend
43 .open_transaction(database, transaction_type)
44 .await?;
45
46 let answer = tx.query(typeql).await?;
47
48 let needs_commit = matches!(
49 transaction_type,
50 TransactionType::Write | TransactionType::Schema
51 );
52
53 let results = match answer {
54 QueryResultKind::Ok => {
55 if needs_commit {
56 tx.commit().await?;
57 }
58 serde_json::json!({ "ok": true })
59 }
60 QueryResultKind::Rows(rows) => {
61 if needs_commit {
62 tx.commit().await?;
63 }
64 serde_json::Value::Array(rows)
65 }
66 QueryResultKind::Documents(docs) => {
67 if needs_commit {
68 let _ = tx.commit().await;
69 }
70 serde_json::Value::Array(docs)
71 }
72 };
73
74 Ok(results)
75 }
76
77 pub async fn database_exists(&self, database: &str) -> Result<bool, PipelineError> {
79 self.backend.database_exists(database).await
80 }
81
82 pub async fn create_database(&self, database: &str) -> Result<(), PipelineError> {
84 self.backend.create_database(database).await
85 }
86
87 pub async fn delete_database(&self, database: &str) -> Result<(), PipelineError> {
89 self.backend.delete_database(database).await
90 }
91
92 pub async fn reset_database(&self, database: &str) -> Result<(), PipelineError> {
94 if self.database_exists(database).await? {
95 self.delete_database(database).await?;
96 }
97 self.create_database(database).await
98 }
99
100 pub fn is_connected(&self) -> bool {
102 self.backend.is_open()
103 }
104}
105
106impl QueryExecutor for TypeDBClient {
107 fn execute<'a>(
108 &'a self,
109 database: &'a str,
110 typeql: &'a str,
111 transaction_type: &'a str,
112 ) -> std::pin::Pin<
113 Box<dyn std::future::Future<Output = Result<serde_json::Value, PipelineError>> + Send + 'a>,
114 > {
115 Box::pin(async move { self.execute(database, typeql, transaction_type).await })
116 }
117
118 fn is_connected(&self) -> bool {
119 self.is_connected()
120 }
121}
122
123pub(crate) fn parse_transaction_type(tx_type: &str) -> Result<TransactionType, PipelineError> {
125 match tx_type {
126 "read" => Ok(TransactionType::Read),
127 "write" => Ok(TransactionType::Write),
128 "schema" => Ok(TransactionType::Schema),
129 other => Err(PipelineError::QueryExecution(format!(
130 "Unknown transaction type: {other}"
131 ))),
132 }
133}
134
135#[cfg(test)]
136#[cfg(feature = "band8")]
137#[cfg_attr(coverage_nightly, coverage(off))]
138mod tests {
139 use std::collections::VecDeque;
140 use std::future::Future;
141 use std::pin::Pin;
142 use std::sync::Arc;
143 use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
144
145 use super::*;
146 use crate::error::PipelineError;
147 use crate::typedb::backend::TransactionOps;
148
149 struct MockTransaction {
154 query_result: Option<QueryResultKind>,
155 query_error: Option<String>,
156 commit_error: Option<String>,
157 committed: Arc<AtomicBool>,
158 query_called: Arc<AtomicBool>,
159 }
160
161 impl MockTransaction {
162 fn new(result: QueryResultKind) -> Self {
163 Self {
164 query_result: Some(result),
165 query_error: None,
166 commit_error: None,
167 committed: Arc::new(AtomicBool::new(false)),
168 query_called: Arc::new(AtomicBool::new(false)),
169 }
170 }
171
172 fn failing_query(msg: &str) -> Self {
173 Self {
174 query_result: None,
175 query_error: Some(msg.to_string()),
176 commit_error: None,
177 committed: Arc::new(AtomicBool::new(false)),
178 query_called: Arc::new(AtomicBool::new(false)),
179 }
180 }
181
182 fn with_commit_error(mut self, msg: &str) -> Self {
183 self.commit_error = Some(msg.to_string());
184 self
185 }
186 }
187
188 impl TransactionOps for MockTransaction {
189 fn query(
190 &mut self,
191 _typeql: &str,
192 ) -> Pin<Box<dyn Future<Output = Result<QueryResultKind, PipelineError>> + Send + '_>>
193 {
194 self.query_called.store(true, Ordering::SeqCst);
195 let result = self.query_result.take();
196 let error = self.query_error.take();
197 Box::pin(async move {
198 if let Some(msg) = error {
199 return Err(PipelineError::QueryExecution(msg));
200 }
201 Ok(result.expect("MockTransaction::query called more than once"))
202 })
203 }
204
205 fn commit(
206 &mut self,
207 ) -> Pin<Box<dyn Future<Output = Result<(), PipelineError>> + Send + '_>> {
208 self.committed.store(true, Ordering::SeqCst);
209 let error = self.commit_error.take();
210 Box::pin(async move {
211 if let Some(msg) = error {
212 return Err(PipelineError::QueryExecution(msg));
213 }
214 Ok(())
215 })
216 }
217 }
218
219 #[derive(Default)]
220 struct MockDatabaseAdmin {
221 exists_results: std::sync::Mutex<VecDeque<Result<bool, String>>>,
222 create_errors: std::sync::Mutex<VecDeque<String>>,
223 delete_errors: std::sync::Mutex<VecDeque<String>>,
224 exists_called: AtomicUsize,
225 create_called: AtomicUsize,
226 delete_called: AtomicUsize,
227 operations: std::sync::Mutex<Vec<String>>,
228 }
229
230 struct MockBackend {
231 transaction: std::sync::Mutex<Option<MockTransaction>>,
232 open_error: Option<String>,
233 is_open: bool,
234 open_called: Arc<AtomicUsize>,
235 database_admin: Arc<MockDatabaseAdmin>,
236 }
237
238 impl MockBackend {
239 fn new(tx: MockTransaction) -> Self {
240 Self {
241 transaction: std::sync::Mutex::new(Some(tx)),
242 open_error: None,
243 is_open: true,
244 open_called: Arc::new(AtomicUsize::new(0)),
245 database_admin: Arc::new(MockDatabaseAdmin::default()),
246 }
247 }
248
249 fn failing(msg: &str) -> Self {
250 Self {
251 transaction: std::sync::Mutex::new(None),
252 open_error: Some(msg.to_string()),
253 is_open: true,
254 open_called: Arc::new(AtomicUsize::new(0)),
255 database_admin: Arc::new(MockDatabaseAdmin::default()),
256 }
257 }
258
259 fn with_database_exists_results(self, results: Vec<Result<bool, String>>) -> Self {
260 *self.database_admin.exists_results.lock().unwrap() = results.into();
261 self
262 }
263
264 fn with_create_errors(self, errors: Vec<&str>) -> Self {
265 *self.database_admin.create_errors.lock().unwrap() =
266 errors.into_iter().map(str::to_string).collect();
267 self
268 }
269
270 fn with_delete_errors(self, errors: Vec<&str>) -> Self {
271 *self.database_admin.delete_errors.lock().unwrap() =
272 errors.into_iter().map(str::to_string).collect();
273 self
274 }
275
276 fn database_admin_state(&self) -> Arc<MockDatabaseAdmin> {
277 Arc::clone(&self.database_admin)
278 }
279 }
280
281 impl DriverBackend for MockBackend {
282 fn open_transaction(
283 &self,
284 _database: &str,
285 _tx_type: TransactionType,
286 ) -> Pin<Box<dyn Future<Output = Result<Box<dyn TransactionOps>, PipelineError>> + Send + '_>>
287 {
288 self.open_called.fetch_add(1, Ordering::SeqCst);
289 let tx = self.transaction.lock().unwrap().take();
290 let error = self.open_error.clone();
291 Box::pin(async move {
292 if let Some(msg) = error {
293 return Err(PipelineError::QueryExecution(msg));
294 }
295 Ok(
296 Box::new(tx.expect("MockBackend: no transaction configured"))
297 as Box<dyn TransactionOps>,
298 )
299 })
300 }
301
302 fn is_open(&self) -> bool {
303 self.is_open
304 }
305
306 fn database_exists(
307 &self,
308 database: &str,
309 ) -> Pin<Box<dyn Future<Output = Result<bool, PipelineError>> + Send + '_>> {
310 self.database_admin
311 .exists_called
312 .fetch_add(1, Ordering::SeqCst);
313 self.database_admin
314 .operations
315 .lock()
316 .unwrap()
317 .push(format!("exists:{database}"));
318 let result = self
319 .database_admin
320 .exists_results
321 .lock()
322 .unwrap()
323 .pop_front()
324 .unwrap_or(Ok(false));
325 Box::pin(async move { result.map_err(PipelineError::Connection) })
326 }
327
328 fn create_database(
329 &self,
330 database: &str,
331 ) -> Pin<Box<dyn Future<Output = Result<(), PipelineError>> + Send + '_>> {
332 self.database_admin
333 .create_called
334 .fetch_add(1, Ordering::SeqCst);
335 self.database_admin
336 .operations
337 .lock()
338 .unwrap()
339 .push(format!("create:{database}"));
340 let error = self
341 .database_admin
342 .create_errors
343 .lock()
344 .unwrap()
345 .pop_front();
346 Box::pin(async move {
347 if let Some(msg) = error {
348 return Err(PipelineError::Connection(msg));
349 }
350 Ok(())
351 })
352 }
353
354 fn delete_database(
355 &self,
356 database: &str,
357 ) -> Pin<Box<dyn Future<Output = Result<(), PipelineError>> + Send + '_>> {
358 self.database_admin
359 .delete_called
360 .fetch_add(1, Ordering::SeqCst);
361 self.database_admin
362 .operations
363 .lock()
364 .unwrap()
365 .push(format!("delete:{database}"));
366 let error = self
367 .database_admin
368 .delete_errors
369 .lock()
370 .unwrap()
371 .pop_front();
372 Box::pin(async move {
373 if let Some(msg) = error {
374 return Err(PipelineError::Connection(msg));
375 }
376 Ok(())
377 })
378 }
379 }
380
381 fn make_client(backend: MockBackend) -> TypeDBClient {
382 TypeDBClient::with_backend(Box::new(backend))
383 }
384
385 #[test]
390 fn parse_transaction_type_read() {
391 let result = parse_transaction_type("read").unwrap();
392 assert_eq!(result, TransactionType::Read);
393 }
394
395 #[test]
396 fn parse_transaction_type_write() {
397 let result = parse_transaction_type("write").unwrap();
398 assert_eq!(result, TransactionType::Write);
399 }
400
401 #[test]
402 fn parse_transaction_type_schema() {
403 let result = parse_transaction_type("schema").unwrap();
404 assert_eq!(result, TransactionType::Schema);
405 }
406
407 #[test]
408 fn parse_transaction_type_unknown() {
409 let result = parse_transaction_type("unknown");
410 let err = result.unwrap_err();
411 assert!(
412 matches!(&err, PipelineError::QueryExecution(msg) if msg.contains("Unknown transaction type: unknown"))
413 );
414 }
415
416 #[test]
417 fn parse_transaction_type_empty() {
418 let result = parse_transaction_type("");
419 assert!(result.is_err());
420 }
421
422 #[test]
423 fn parse_transaction_type_case_sensitive() {
424 let result = parse_transaction_type("Read");
425 assert!(result.is_err());
426 }
427
428 #[tokio::test]
433 async fn execute_ok_read_no_commit() {
434 let tx = MockTransaction::new(QueryResultKind::Ok);
435 let committed = tx.committed.clone();
436 let client = make_client(MockBackend::new(tx));
437
438 let result = client
439 .execute("db", "match $x isa thing;", "read")
440 .await
441 .unwrap();
442 assert_eq!(result, serde_json::json!({"ok": true}));
443 assert!(!committed.load(Ordering::SeqCst));
444 }
445
446 #[tokio::test]
447 async fn execute_ok_write_commits() {
448 let tx = MockTransaction::new(QueryResultKind::Ok);
449 let committed = tx.committed.clone();
450 let client = make_client(MockBackend::new(tx));
451
452 let result = client
453 .execute("db", "insert $x isa thing;", "write")
454 .await
455 .unwrap();
456 assert_eq!(result, serde_json::json!({"ok": true}));
457 assert!(committed.load(Ordering::SeqCst));
458 }
459
460 #[tokio::test]
461 async fn execute_ok_schema_commits() {
462 let tx = MockTransaction::new(QueryResultKind::Ok);
463 let committed = tx.committed.clone();
464 let client = make_client(MockBackend::new(tx));
465
466 let result = client
467 .execute("db", "define entity thing;", "schema")
468 .await
469 .unwrap();
470 assert_eq!(result, serde_json::json!({"ok": true}));
471 assert!(committed.load(Ordering::SeqCst));
472 }
473
474 #[tokio::test]
475 async fn execute_rows_read_no_commit() {
476 let rows = vec![
477 serde_json::json!({"name": "Alice"}),
478 serde_json::json!({"name": "Bob"}),
479 ];
480 let tx = MockTransaction::new(QueryResultKind::Rows(rows.clone()));
481 let committed = tx.committed.clone();
482 let client = make_client(MockBackend::new(tx));
483
484 let result = client
485 .execute("db", "match $p isa person;", "read")
486 .await
487 .unwrap();
488 assert_eq!(result, serde_json::Value::Array(rows));
489 assert!(!committed.load(Ordering::SeqCst));
490 }
491
492 #[tokio::test]
493 async fn execute_rows_write_commits() {
494 let rows = vec![serde_json::json!({"id": 1})];
495 let tx = MockTransaction::new(QueryResultKind::Rows(rows));
496 let committed = tx.committed.clone();
497 let client = make_client(MockBackend::new(tx));
498
499 let result = client
500 .execute("db", "insert $x isa thing;", "write")
501 .await
502 .unwrap();
503 assert!(result.is_array());
504 assert!(committed.load(Ordering::SeqCst));
505 }
506
507 #[tokio::test]
508 async fn execute_rows_data_preserved() {
509 let rows = vec![
510 serde_json::json!({"name": "Alice", "age": 30}),
511 serde_json::json!({"name": "Bob", "age": 25}),
512 ];
513 let tx = MockTransaction::new(QueryResultKind::Rows(rows.clone()));
514 let client = make_client(MockBackend::new(tx));
515
516 let result = client
517 .execute("db", "match $p isa person;", "read")
518 .await
519 .unwrap();
520 let arr = result.as_array().unwrap();
521 assert_eq!(arr.len(), 2);
522 assert_eq!(arr[0]["name"], "Alice");
523 assert_eq!(arr[1]["age"], 25);
524 }
525
526 #[tokio::test]
527 async fn execute_docs_read_no_commit() {
528 let docs = vec![serde_json::json!({"doc": "data"})];
529 let tx = MockTransaction::new(QueryResultKind::Documents(docs.clone()));
530 let committed = tx.committed.clone();
531 let client = make_client(MockBackend::new(tx));
532
533 let result = client
534 .execute("db", "match $p isa person; fetch {};", "read")
535 .await
536 .unwrap();
537 assert_eq!(result, serde_json::Value::Array(docs));
538 assert!(!committed.load(Ordering::SeqCst));
539 }
540
541 #[tokio::test]
542 async fn execute_docs_write_commits() {
543 let docs = vec![serde_json::json!({"doc": "data"})];
544 let tx = MockTransaction::new(QueryResultKind::Documents(docs));
545 let committed = tx.committed.clone();
546 let client = make_client(MockBackend::new(tx));
547
548 let result = client
549 .execute("db", "insert $x isa thing;", "write")
550 .await
551 .unwrap();
552 assert!(result.is_array());
553 assert!(committed.load(Ordering::SeqCst));
554 }
555
556 #[tokio::test]
557 async fn execute_docs_commit_error_ignored() {
558 let docs = vec![serde_json::json!({"doc": "data"})];
559 let tx = MockTransaction::new(QueryResultKind::Documents(docs.clone()))
560 .with_commit_error("commit failed");
561 let client = make_client(MockBackend::new(tx));
562
563 let result = client
565 .execute("db", "insert $x isa thing;", "write")
566 .await
567 .unwrap();
568 assert_eq!(result, serde_json::Value::Array(docs));
569 }
570
571 #[tokio::test]
572 async fn execute_transaction_open_failure() {
573 let client = make_client(MockBackend::failing("connection refused"));
574
575 let result = client.execute("db", "match $x isa thing;", "read").await;
576 let err = result.unwrap_err();
577 assert!(
578 matches!(&err, PipelineError::QueryExecution(msg) if msg.contains("connection refused"))
579 );
580 }
581
582 #[tokio::test]
583 async fn execute_query_failure() {
584 let tx = MockTransaction::failing_query("syntax error");
585 let client = make_client(MockBackend::new(tx));
586
587 let result = client.execute("db", "bad query", "read").await;
588 let err = result.unwrap_err();
589 assert!(matches!(&err, PipelineError::QueryExecution(msg) if msg.contains("syntax error")));
590 }
591
592 #[tokio::test]
593 async fn execute_commit_failure_ok_propagated() {
594 let tx = MockTransaction::new(QueryResultKind::Ok).with_commit_error("commit failed");
595 let client = make_client(MockBackend::new(tx));
596
597 let result = client.execute("db", "insert $x isa thing;", "write").await;
599 let err = result.unwrap_err();
600 assert!(
601 matches!(&err, PipelineError::QueryExecution(msg) if msg.contains("commit failed"))
602 );
603 }
604
605 #[tokio::test]
606 async fn execute_commit_failure_rows_propagated() {
607 let tx =
608 MockTransaction::new(QueryResultKind::Rows(vec![])).with_commit_error("commit failed");
609 let client = make_client(MockBackend::new(tx));
610
611 let result = client.execute("db", "insert $x isa thing;", "write").await;
613 let err = result.unwrap_err();
614 assert!(
615 matches!(&err, PipelineError::QueryExecution(msg) if msg.contains("commit failed"))
616 );
617 }
618
619 #[tokio::test]
620 async fn execute_invalid_transaction_type() {
621 let tx = MockTransaction::new(QueryResultKind::Ok);
622 let backend = MockBackend::new(tx);
623 let open_called = backend.open_called.clone();
624 let client = make_client(backend);
625
626 let result = client.execute("db", "match $x;", "invalid").await;
627 assert!(result.is_err());
628 assert_eq!(open_called.load(Ordering::SeqCst), 0);
630 }
631
632 #[test]
633 fn is_connected_delegates_to_backend() {
634 let mut backend = MockBackend::new(MockTransaction::new(QueryResultKind::Ok));
635 backend.is_open = true;
636 let client = make_client(backend);
637 assert!(client.is_connected());
638 }
639
640 #[test]
641 fn is_connected_false_when_backend_closed() {
642 let mut backend = MockBackend::new(MockTransaction::new(QueryResultKind::Ok));
643 backend.is_open = false;
644 let client = make_client(backend);
645 assert!(!client.is_connected());
646 }
647
648 #[tokio::test]
653 async fn database_exists_true_delegates_to_backend() {
654 let backend = MockBackend::new(MockTransaction::new(QueryResultKind::Ok))
655 .with_database_exists_results(vec![Ok(true)]);
656 let admin = backend.database_admin_state();
657 let client = make_client(backend);
658
659 assert!(client.database_exists("admin_db").await.unwrap());
660 assert_eq!(admin.exists_called.load(Ordering::SeqCst), 1);
661 assert_eq!(
662 admin.operations.lock().unwrap().clone(),
663 vec!["exists:admin_db".to_string()]
664 );
665 }
666
667 #[tokio::test]
668 async fn database_exists_false_delegates_to_backend() {
669 let backend = MockBackend::new(MockTransaction::new(QueryResultKind::Ok))
670 .with_database_exists_results(vec![Ok(false)]);
671 let admin = backend.database_admin_state();
672 let client = make_client(backend);
673
674 assert!(!client.database_exists("missing_db").await.unwrap());
675 assert_eq!(admin.exists_called.load(Ordering::SeqCst), 1);
676 assert_eq!(
677 admin.operations.lock().unwrap().clone(),
678 vec!["exists:missing_db".to_string()]
679 );
680 }
681
682 #[tokio::test]
683 async fn create_database_delegates_to_backend() {
684 let backend = MockBackend::new(MockTransaction::new(QueryResultKind::Ok));
685 let admin = backend.database_admin_state();
686 let client = make_client(backend);
687
688 client.create_database("new_db").await.unwrap();
689 assert_eq!(admin.create_called.load(Ordering::SeqCst), 1);
690 assert_eq!(
691 admin.operations.lock().unwrap().clone(),
692 vec!["create:new_db".to_string()]
693 );
694 }
695
696 #[tokio::test]
697 async fn delete_database_delegates_to_backend() {
698 let backend = MockBackend::new(MockTransaction::new(QueryResultKind::Ok));
699 let admin = backend.database_admin_state();
700 let client = make_client(backend);
701
702 client.delete_database("old_db").await.unwrap();
703 assert_eq!(admin.delete_called.load(Ordering::SeqCst), 1);
704 assert_eq!(
705 admin.operations.lock().unwrap().clone(),
706 vec!["delete:old_db".to_string()]
707 );
708 }
709
710 #[tokio::test]
711 async fn reset_database_deletes_existing_database_before_create() {
712 let backend = MockBackend::new(MockTransaction::new(QueryResultKind::Ok))
713 .with_database_exists_results(vec![Ok(true)]);
714 let admin = backend.database_admin_state();
715 let client = make_client(backend);
716
717 client.reset_database("reset_db").await.unwrap();
718 assert_eq!(admin.exists_called.load(Ordering::SeqCst), 1);
719 assert_eq!(admin.delete_called.load(Ordering::SeqCst), 1);
720 assert_eq!(admin.create_called.load(Ordering::SeqCst), 1);
721 assert_eq!(
722 admin.operations.lock().unwrap().clone(),
723 vec![
724 "exists:reset_db".to_string(),
725 "delete:reset_db".to_string(),
726 "create:reset_db".to_string(),
727 ]
728 );
729 }
730
731 #[tokio::test]
732 async fn reset_database_creates_when_database_is_absent() {
733 let backend = MockBackend::new(MockTransaction::new(QueryResultKind::Ok))
734 .with_database_exists_results(vec![Ok(false)]);
735 let admin = backend.database_admin_state();
736 let client = make_client(backend);
737
738 client.reset_database("reset_db").await.unwrap();
739 assert_eq!(admin.exists_called.load(Ordering::SeqCst), 1);
740 assert_eq!(admin.delete_called.load(Ordering::SeqCst), 0);
741 assert_eq!(admin.create_called.load(Ordering::SeqCst), 1);
742 assert_eq!(
743 admin.operations.lock().unwrap().clone(),
744 vec!["exists:reset_db".to_string(), "create:reset_db".to_string(),]
745 );
746 }
747
748 #[tokio::test]
749 async fn database_exists_propagates_backend_error() {
750 let backend = MockBackend::new(MockTransaction::new(QueryResultKind::Ok))
751 .with_database_exists_results(vec![Err("lookup failed".to_string())]);
752 let client = make_client(backend);
753
754 let err = client.database_exists("db").await.unwrap_err();
755 assert!(matches!(&err, PipelineError::Connection(msg) if msg.contains("lookup failed")));
756 }
757
758 #[tokio::test]
759 async fn create_database_propagates_backend_error() {
760 let backend = MockBackend::new(MockTransaction::new(QueryResultKind::Ok))
761 .with_create_errors(vec!["create failed"]);
762 let client = make_client(backend);
763
764 let err = client.create_database("db").await.unwrap_err();
765 assert!(matches!(&err, PipelineError::Connection(msg) if msg.contains("create failed")));
766 }
767
768 #[tokio::test]
769 async fn delete_database_propagates_backend_error() {
770 let backend = MockBackend::new(MockTransaction::new(QueryResultKind::Ok))
771 .with_delete_errors(vec!["delete failed"]);
772 let client = make_client(backend);
773
774 let err = client.delete_database("db").await.unwrap_err();
775 assert!(matches!(&err, PipelineError::Connection(msg) if msg.contains("delete failed")));
776 }
777
778 #[tokio::test]
779 async fn reset_database_propagates_lookup_error_without_mutating() {
780 let backend = MockBackend::new(MockTransaction::new(QueryResultKind::Ok))
781 .with_database_exists_results(vec![Err("lookup failed".to_string())]);
782 let admin = backend.database_admin_state();
783 let client = make_client(backend);
784
785 let err = client.reset_database("db").await.unwrap_err();
786 assert!(matches!(&err, PipelineError::Connection(msg) if msg.contains("lookup failed")));
787 assert_eq!(admin.delete_called.load(Ordering::SeqCst), 0);
788 assert_eq!(admin.create_called.load(Ordering::SeqCst), 0);
789 assert_eq!(
790 admin.operations.lock().unwrap().clone(),
791 vec!["exists:db".to_string()]
792 );
793 }
794
795 #[tokio::test]
796 async fn reset_database_propagates_delete_error_without_create() {
797 let backend = MockBackend::new(MockTransaction::new(QueryResultKind::Ok))
798 .with_database_exists_results(vec![Ok(true)])
799 .with_delete_errors(vec!["delete failed"]);
800 let admin = backend.database_admin_state();
801 let client = make_client(backend);
802
803 let err = client.reset_database("db").await.unwrap_err();
804 assert!(matches!(&err, PipelineError::Connection(msg) if msg.contains("delete failed")));
805 assert_eq!(admin.create_called.load(Ordering::SeqCst), 0);
806 assert_eq!(
807 admin.operations.lock().unwrap().clone(),
808 vec!["exists:db".to_string(), "delete:db".to_string()]
809 );
810 }
811
812 #[test]
817 fn type_db_client_implements_query_executor() {
818 fn assert_executor<T: QueryExecutor>() {}
819 assert_executor::<TypeDBClient>();
820 }
821
822 #[tokio::test]
823 async fn query_executor_execute_delegates_to_client() {
824 let tx = MockTransaction::new(QueryResultKind::Rows(vec![serde_json::json!({"x": 1})]));
825 let client = make_client(MockBackend::new(tx));
826 let executor: Box<dyn QueryExecutor> = Box::new(client);
827
828 let result = executor
829 .execute("db", "match $x isa thing;", "read")
830 .await
831 .unwrap();
832 assert!(result.is_array());
833 assert_eq!(result.as_array().unwrap().len(), 1);
834 }
835
836 #[test]
837 fn query_executor_is_connected_delegates_to_client() {
838 let mut backend = MockBackend::new(MockTransaction::new(QueryResultKind::Ok));
839 backend.is_open = true;
840 let client = make_client(backend);
841 let executor: Box<dyn QueryExecutor> = Box::new(client);
842 assert!(executor.is_connected());
843 }
844
845 #[tokio::test]
850 #[ignore = "requires running TypeDB server"]
851 #[cfg_attr(coverage_nightly, coverage(off))]
852 async fn integration_connect_invalid_address() {
853 let config = TypeDBSection {
854 address: "localhost:99999".to_string(),
855 database: "test".to_string(),
856 username: "admin".to_string(),
857 password: "password".to_string(),
858 http_port: 8000,
859 server_version: None,
860 };
861 let result = TypeDBClient::connect(&config).await;
862 assert!(result.is_err());
863 }
864
865 fn live_config() -> TypeDBSection {
868 TypeDBSection {
869 address: std::env::var("TYPEDB_ADDRESS")
870 .unwrap_or_else(|_| "localhost:1729".to_string()),
871 database: std::env::var("TYPEDB_DATABASE").unwrap_or_else(|_| "test".to_string()),
872 username: "admin".to_string(),
873 password: "password".to_string(),
874 http_port: std::env::var("TYPEDB_HTTP_PORT")
875 .ok()
876 .and_then(|port| port.parse().ok())
877 .unwrap_or(8000),
878 server_version: std::env::var("TYPEDB_SERVER_VERSION").ok(),
879 }
880 }
881
882 #[tokio::test]
883 #[ignore = "requires running TypeDB server"]
884 #[cfg_attr(coverage_nightly, coverage(off))]
885 async fn integration_connect_success() {
886 let result = TypeDBClient::connect(&live_config()).await;
887 assert!(result.is_ok());
888 assert!(result.unwrap().is_connected());
889 }
890
891 #[tokio::test]
892 #[ignore = "requires running TypeDB server"]
893 #[cfg_attr(coverage_nightly, coverage(off))]
894 async fn integration_database_admin_roundtrip() {
895 let config = live_config();
896 let client = TypeDBClient::connect(&config)
897 .await
898 .expect("connect failed");
899 let database = format!("type_bridge_server_admin_{}", uuid::Uuid::new_v4().simple());
900
901 if client.database_exists(&database).await.unwrap_or(false) {
902 let _ = client.delete_database(&database).await;
903 }
904
905 assert!(!client.database_exists(&database).await.unwrap());
906 client
907 .create_database(&database)
908 .await
909 .expect("create failed");
910 assert!(client.database_exists(&database).await.unwrap());
911
912 client
913 .reset_database(&database)
914 .await
915 .expect("reset existing database failed");
916 assert!(client.database_exists(&database).await.unwrap());
917
918 client
919 .delete_database(&database)
920 .await
921 .expect("delete failed");
922 assert!(!client.database_exists(&database).await.unwrap());
923
924 client
925 .reset_database(&database)
926 .await
927 .expect("reset absent database failed");
928 assert!(client.database_exists(&database).await.unwrap());
929
930 client
931 .delete_database(&database)
932 .await
933 .expect("cleanup delete failed");
934 }
935
936 #[tokio::test]
937 #[ignore = "requires running TypeDB server"]
938 #[cfg_attr(coverage_nightly, coverage(off))]
939 async fn integration_execute_roundtrip() {
940 let config = live_config();
941 let client = TypeDBClient::connect(&config)
942 .await
943 .expect("connect failed");
944
945 client
946 .execute(&config.database, "define entity smoke_marker;", "schema")
947 .await
948 .expect("schema define failed");
949
950 let rows = client
951 .execute(&config.database, "match entity $t;", "read")
952 .await
953 .expect("read query failed");
954 let rows = rows.as_array().expect("read result must be a JSON array");
955 assert!(
956 !rows.is_empty(),
957 "expected at least the smoke_marker entity type"
958 );
959 }
960}