1use super::backend::{DriverBackend, QueryResultKind, TransactionType};
2use super::real_driver::RealTypeDBBackend;
3use crate::config::TypeDBSection;
4use crate::error::PipelineError;
5use crate::executor::QueryExecutor;
6
7pub struct TypeDBClient {
10 backend: Box<dyn DriverBackend>,
11}
12
13impl TypeDBClient {
14 #[cfg_attr(coverage_nightly, coverage(off))]
16 pub async fn connect(config: &TypeDBSection) -> Result<Self, PipelineError> {
17 let backend = RealTypeDBBackend::connect(config).await?;
18 Ok(Self {
19 backend: Box::new(backend),
20 })
21 }
22
23 #[cfg(test)]
25 pub(crate) fn with_backend(backend: Box<dyn DriverBackend>) -> Self {
26 Self { backend }
27 }
28
29 pub async fn execute(
34 &self,
35 database: &str,
36 typeql: &str,
37 tx_type: &str,
38 ) -> Result<serde_json::Value, PipelineError> {
39 let transaction_type = parse_transaction_type(tx_type)?;
40
41 let mut tx = self
42 .backend
43 .open_transaction(database, transaction_type)
44 .await?;
45
46 let answer = tx.query(typeql).await?;
47
48 let needs_commit = matches!(
49 transaction_type,
50 TransactionType::Write | TransactionType::Schema
51 );
52
53 let results = match answer {
54 QueryResultKind::Ok => {
55 if needs_commit {
56 tx.commit().await?;
57 }
58 serde_json::json!({ "ok": true })
59 }
60 QueryResultKind::Rows(rows) => {
61 if needs_commit {
62 tx.commit().await?;
63 }
64 serde_json::Value::Array(rows)
65 }
66 QueryResultKind::Documents(docs) => {
67 if needs_commit {
68 let _ = tx.commit().await;
69 }
70 serde_json::Value::Array(docs)
71 }
72 };
73
74 Ok(results)
75 }
76
77 pub async fn database_exists(&self, database: &str) -> Result<bool, PipelineError> {
79 self.backend.database_exists(database).await
80 }
81
82 pub async fn create_database(&self, database: &str) -> Result<(), PipelineError> {
84 self.backend.create_database(database).await
85 }
86
87 pub async fn delete_database(&self, database: &str) -> Result<(), PipelineError> {
89 self.backend.delete_database(database).await
90 }
91
92 pub async fn reset_database(&self, database: &str) -> Result<(), PipelineError> {
94 if self.database_exists(database).await? {
95 self.delete_database(database).await?;
96 }
97 self.create_database(database).await
98 }
99
100 pub fn is_connected(&self) -> bool {
102 self.backend.is_open()
103 }
104}
105
106impl QueryExecutor for TypeDBClient {
107 fn execute<'a>(
108 &'a self,
109 database: &'a str,
110 typeql: &'a str,
111 transaction_type: &'a str,
112 ) -> std::pin::Pin<
113 Box<dyn std::future::Future<Output = Result<serde_json::Value, PipelineError>> + Send + 'a>,
114 > {
115 Box::pin(async move { self.execute(database, typeql, transaction_type).await })
116 }
117
118 fn is_connected(&self) -> bool {
119 self.is_connected()
120 }
121}
122
123pub(crate) fn parse_transaction_type(tx_type: &str) -> Result<TransactionType, PipelineError> {
125 match tx_type {
126 "read" => Ok(TransactionType::Read),
127 "write" => Ok(TransactionType::Write),
128 "schema" => Ok(TransactionType::Schema),
129 other => Err(PipelineError::QueryExecution(format!(
130 "Unknown transaction type: {other}"
131 ))),
132 }
133}
134
/// Renders a band-7 driver concept as a JSON object.
///
/// The object always has `category` and `label`; `iid`, `value` and
/// `value_type` are added only when the concept exposes them.
#[cfg(feature = "band7")]
pub(crate) fn concept_to_json_b7(
    concept: &type_bridge_typedb_driver_b7::concept::Concept,
) -> serde_json::Value {
    let mut fields = serde_json::Map::new();

    let category = concept.get_category().name().to_string();
    fields.insert("category".to_string(), serde_json::Value::String(category));

    let label = concept.get_label().to_string();
    fields.insert("label".to_string(), serde_json::Value::String(label));

    if let Some(iid) = concept.try_get_iid() {
        let rendered = iid.to_string();
        fields.insert("iid".to_string(), serde_json::Value::String(rendered));
    }

    if let Some(inner) = concept.try_get_value() {
        let converted = value_to_json_b7(inner);
        fields.insert("value".to_string(), converted);
    }

    if let Some(value_type) = concept.try_get_value_type() {
        let type_name = value_type.name().to_string();
        fields.insert(
            "value_type".to_string(),
            serde_json::Value::String(type_name),
        );
    }

    serde_json::Value::Object(fields)
}
168
/// Converts a band-7 driver value into JSON.
///
/// Booleans map to JSON booleans, integers and doubles go through
/// `serde_json::json!`, and every other kind is rendered with `to_string()`
/// into a JSON string. The getters are tried in a fixed order and the first
/// one that yields a value wins.
#[cfg(feature = "band7")]
#[cfg_attr(coverage_nightly, coverage(off))]
fn value_to_json_b7(value: &type_bridge_typedb_driver_b7::concept::Value) -> serde_json::Value {
    use serde_json::Value as Json;

    value
        .get_boolean()
        .map(Json::Bool)
        .or_else(|| value.get_integer().map(|i| serde_json::json!(i)))
        .or_else(|| value.get_double().map(|d| serde_json::json!(d)))
        .or_else(|| value.get_string().map(|s| Json::String(s.to_string())))
        .or_else(|| value.get_date().map(|date| Json::String(date.to_string())))
        .or_else(|| value.get_datetime().map(|dt| Json::String(dt.to_string())))
        .or_else(|| {
            value
                .get_datetime_tz()
                .map(|dt_tz| Json::String(dt_tz.to_string()))
        })
        .or_else(|| value.get_decimal().map(|dec| Json::String(dec.to_string())))
        .or_else(|| value.get_duration().map(|dur| Json::String(dur.to_string())))
        // Fallback for anything none of the getters matched.
        .unwrap_or_else(|| Json::String(value.to_string()))
}
202
/// Renders a band-8 driver concept as a JSON object.
///
/// The object always has `category` and `label`; `iid`, `value` and
/// `value_type` are added only when the concept exposes them.
#[cfg(feature = "band8")]
pub(crate) fn concept_to_json_b8(concept: &typedb_driver::concept::Concept) -> serde_json::Value {
    let mut fields = serde_json::Map::new();

    let category = concept.get_category().name().to_string();
    fields.insert("category".to_string(), serde_json::Value::String(category));

    let label = concept.get_label().to_string();
    fields.insert("label".to_string(), serde_json::Value::String(label));

    if let Some(iid) = concept.try_get_iid() {
        let rendered = iid.to_string();
        fields.insert("iid".to_string(), serde_json::Value::String(rendered));
    }

    if let Some(inner) = concept.try_get_value() {
        let converted = value_to_json_b8(inner);
        fields.insert("value".to_string(), converted);
    }

    if let Some(value_type) = concept.try_get_value_type() {
        let type_name = value_type.name().to_string();
        fields.insert(
            "value_type".to_string(),
            serde_json::Value::String(type_name),
        );
    }

    serde_json::Value::Object(fields)
}
234
/// Converts a band-8 driver value into JSON.
///
/// Booleans map to JSON booleans, integers and doubles go through
/// `serde_json::json!`, and every other kind is rendered with `to_string()`
/// into a JSON string. The getters are tried in a fixed order and the first
/// one that yields a value wins.
#[cfg(feature = "band8")]
#[cfg_attr(coverage_nightly, coverage(off))]
pub(crate) fn value_to_json_b8(value: &typedb_driver::concept::Value) -> serde_json::Value {
    use serde_json::Value as Json;

    value
        .get_boolean()
        .map(Json::Bool)
        .or_else(|| value.get_integer().map(|i| serde_json::json!(i)))
        .or_else(|| value.get_double().map(|d| serde_json::json!(d)))
        .or_else(|| value.get_string().map(|s| Json::String(s.to_string())))
        .or_else(|| value.get_date().map(|date| Json::String(date.to_string())))
        .or_else(|| value.get_datetime().map(|dt| Json::String(dt.to_string())))
        .or_else(|| {
            value
                .get_datetime_tz()
                .map(|dt_tz| Json::String(dt_tz.to_string()))
        })
        .or_else(|| value.get_decimal().map(|dec| Json::String(dec.to_string())))
        .or_else(|| value.get_duration().map(|dur| Json::String(dur.to_string())))
        // Fallback for anything none of the getters matched.
        .unwrap_or_else(|| Json::String(value.to_string()))
}
269
270#[cfg(test)]
271#[cfg(feature = "band8")]
272#[cfg_attr(coverage_nightly, coverage(off))]
273mod tests {
274 use std::collections::VecDeque;
275 use std::future::Future;
276 use std::pin::Pin;
277 use std::sync::Arc;
278 use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
279
280 use typedb_driver::IID;
281 use typedb_driver::concept::value::{Decimal, Duration, TimeZone};
282 use typedb_driver::concept::{
283 Attribute, AttributeType, Concept, Entity, EntityType, Value, ValueType,
284 };
285
286 use super::*;
287 use crate::error::PipelineError;
288 use crate::typedb::backend::TransactionOps;
289
290 struct MockTransaction {
295 query_result: Option<QueryResultKind>,
296 query_error: Option<String>,
297 commit_error: Option<String>,
298 committed: Arc<AtomicBool>,
299 query_called: Arc<AtomicBool>,
300 }
301
302 impl MockTransaction {
303 fn new(result: QueryResultKind) -> Self {
304 Self {
305 query_result: Some(result),
306 query_error: None,
307 commit_error: None,
308 committed: Arc::new(AtomicBool::new(false)),
309 query_called: Arc::new(AtomicBool::new(false)),
310 }
311 }
312
313 fn failing_query(msg: &str) -> Self {
314 Self {
315 query_result: None,
316 query_error: Some(msg.to_string()),
317 commit_error: None,
318 committed: Arc::new(AtomicBool::new(false)),
319 query_called: Arc::new(AtomicBool::new(false)),
320 }
321 }
322
323 fn with_commit_error(mut self, msg: &str) -> Self {
324 self.commit_error = Some(msg.to_string());
325 self
326 }
327 }
328
329 impl TransactionOps for MockTransaction {
330 fn query(
331 &mut self,
332 _typeql: &str,
333 ) -> Pin<Box<dyn Future<Output = Result<QueryResultKind, PipelineError>> + Send + '_>>
334 {
335 self.query_called.store(true, Ordering::SeqCst);
336 let result = self.query_result.take();
337 let error = self.query_error.take();
338 Box::pin(async move {
339 if let Some(msg) = error {
340 return Err(PipelineError::QueryExecution(msg));
341 }
342 Ok(result.expect("MockTransaction::query called more than once"))
343 })
344 }
345
346 fn commit(
347 &mut self,
348 ) -> Pin<Box<dyn Future<Output = Result<(), PipelineError>> + Send + '_>> {
349 self.committed.store(true, Ordering::SeqCst);
350 let error = self.commit_error.take();
351 Box::pin(async move {
352 if let Some(msg) = error {
353 return Err(PipelineError::QueryExecution(msg));
354 }
355 Ok(())
356 })
357 }
358 }
359
360 #[derive(Default)]
361 struct MockDatabaseAdmin {
362 exists_results: std::sync::Mutex<VecDeque<Result<bool, String>>>,
363 create_errors: std::sync::Mutex<VecDeque<String>>,
364 delete_errors: std::sync::Mutex<VecDeque<String>>,
365 exists_called: AtomicUsize,
366 create_called: AtomicUsize,
367 delete_called: AtomicUsize,
368 operations: std::sync::Mutex<Vec<String>>,
369 }
370
371 struct MockBackend {
372 transaction: std::sync::Mutex<Option<MockTransaction>>,
373 open_error: Option<String>,
374 is_open: bool,
375 open_called: Arc<AtomicUsize>,
376 database_admin: Arc<MockDatabaseAdmin>,
377 }
378
379 impl MockBackend {
380 fn new(tx: MockTransaction) -> Self {
381 Self {
382 transaction: std::sync::Mutex::new(Some(tx)),
383 open_error: None,
384 is_open: true,
385 open_called: Arc::new(AtomicUsize::new(0)),
386 database_admin: Arc::new(MockDatabaseAdmin::default()),
387 }
388 }
389
390 fn failing(msg: &str) -> Self {
391 Self {
392 transaction: std::sync::Mutex::new(None),
393 open_error: Some(msg.to_string()),
394 is_open: true,
395 open_called: Arc::new(AtomicUsize::new(0)),
396 database_admin: Arc::new(MockDatabaseAdmin::default()),
397 }
398 }
399
400 fn with_database_exists_results(self, results: Vec<Result<bool, String>>) -> Self {
401 *self.database_admin.exists_results.lock().unwrap() = results.into();
402 self
403 }
404
405 fn with_create_errors(self, errors: Vec<&str>) -> Self {
406 *self.database_admin.create_errors.lock().unwrap() =
407 errors.into_iter().map(str::to_string).collect();
408 self
409 }
410
411 fn with_delete_errors(self, errors: Vec<&str>) -> Self {
412 *self.database_admin.delete_errors.lock().unwrap() =
413 errors.into_iter().map(str::to_string).collect();
414 self
415 }
416
417 fn database_admin_state(&self) -> Arc<MockDatabaseAdmin> {
418 Arc::clone(&self.database_admin)
419 }
420 }
421
422 impl DriverBackend for MockBackend {
423 fn open_transaction(
424 &self,
425 _database: &str,
426 _tx_type: TransactionType,
427 ) -> Pin<Box<dyn Future<Output = Result<Box<dyn TransactionOps>, PipelineError>> + Send + '_>>
428 {
429 self.open_called.fetch_add(1, Ordering::SeqCst);
430 let tx = self.transaction.lock().unwrap().take();
431 let error = self.open_error.clone();
432 Box::pin(async move {
433 if let Some(msg) = error {
434 return Err(PipelineError::QueryExecution(msg));
435 }
436 Ok(
437 Box::new(tx.expect("MockBackend: no transaction configured"))
438 as Box<dyn TransactionOps>,
439 )
440 })
441 }
442
443 fn is_open(&self) -> bool {
444 self.is_open
445 }
446
447 fn database_exists(
448 &self,
449 database: &str,
450 ) -> Pin<Box<dyn Future<Output = Result<bool, PipelineError>> + Send + '_>> {
451 self.database_admin
452 .exists_called
453 .fetch_add(1, Ordering::SeqCst);
454 self.database_admin
455 .operations
456 .lock()
457 .unwrap()
458 .push(format!("exists:{database}"));
459 let result = self
460 .database_admin
461 .exists_results
462 .lock()
463 .unwrap()
464 .pop_front()
465 .unwrap_or(Ok(false));
466 Box::pin(async move { result.map_err(PipelineError::Connection) })
467 }
468
469 fn create_database(
470 &self,
471 database: &str,
472 ) -> Pin<Box<dyn Future<Output = Result<(), PipelineError>> + Send + '_>> {
473 self.database_admin
474 .create_called
475 .fetch_add(1, Ordering::SeqCst);
476 self.database_admin
477 .operations
478 .lock()
479 .unwrap()
480 .push(format!("create:{database}"));
481 let error = self
482 .database_admin
483 .create_errors
484 .lock()
485 .unwrap()
486 .pop_front();
487 Box::pin(async move {
488 if let Some(msg) = error {
489 return Err(PipelineError::Connection(msg));
490 }
491 Ok(())
492 })
493 }
494
495 fn delete_database(
496 &self,
497 database: &str,
498 ) -> Pin<Box<dyn Future<Output = Result<(), PipelineError>> + Send + '_>> {
499 self.database_admin
500 .delete_called
501 .fetch_add(1, Ordering::SeqCst);
502 self.database_admin
503 .operations
504 .lock()
505 .unwrap()
506 .push(format!("delete:{database}"));
507 let error = self
508 .database_admin
509 .delete_errors
510 .lock()
511 .unwrap()
512 .pop_front();
513 Box::pin(async move {
514 if let Some(msg) = error {
515 return Err(PipelineError::Connection(msg));
516 }
517 Ok(())
518 })
519 }
520 }
521
522 fn make_client(backend: MockBackend) -> TypeDBClient {
523 TypeDBClient::with_backend(Box::new(backend))
524 }
525
526 #[test]
531 fn parse_transaction_type_read() {
532 let result = parse_transaction_type("read").unwrap();
533 assert_eq!(result, TransactionType::Read);
534 }
535
536 #[test]
537 fn parse_transaction_type_write() {
538 let result = parse_transaction_type("write").unwrap();
539 assert_eq!(result, TransactionType::Write);
540 }
541
542 #[test]
543 fn parse_transaction_type_schema() {
544 let result = parse_transaction_type("schema").unwrap();
545 assert_eq!(result, TransactionType::Schema);
546 }
547
548 #[test]
549 fn parse_transaction_type_unknown() {
550 let result = parse_transaction_type("unknown");
551 let err = result.unwrap_err();
552 assert!(
553 matches!(&err, PipelineError::QueryExecution(msg) if msg.contains("Unknown transaction type: unknown"))
554 );
555 }
556
557 #[test]
558 fn parse_transaction_type_empty() {
559 let result = parse_transaction_type("");
560 assert!(result.is_err());
561 }
562
563 #[test]
564 fn parse_transaction_type_case_sensitive() {
565 let result = parse_transaction_type("Read");
566 assert!(result.is_err());
567 }
568
569 #[tokio::test]
574 async fn execute_ok_read_no_commit() {
575 let tx = MockTransaction::new(QueryResultKind::Ok);
576 let committed = tx.committed.clone();
577 let client = make_client(MockBackend::new(tx));
578
579 let result = client
580 .execute("db", "match $x isa thing;", "read")
581 .await
582 .unwrap();
583 assert_eq!(result, serde_json::json!({"ok": true}));
584 assert!(!committed.load(Ordering::SeqCst));
585 }
586
587 #[tokio::test]
588 async fn execute_ok_write_commits() {
589 let tx = MockTransaction::new(QueryResultKind::Ok);
590 let committed = tx.committed.clone();
591 let client = make_client(MockBackend::new(tx));
592
593 let result = client
594 .execute("db", "insert $x isa thing;", "write")
595 .await
596 .unwrap();
597 assert_eq!(result, serde_json::json!({"ok": true}));
598 assert!(committed.load(Ordering::SeqCst));
599 }
600
601 #[tokio::test]
602 async fn execute_ok_schema_commits() {
603 let tx = MockTransaction::new(QueryResultKind::Ok);
604 let committed = tx.committed.clone();
605 let client = make_client(MockBackend::new(tx));
606
607 let result = client
608 .execute("db", "define entity thing;", "schema")
609 .await
610 .unwrap();
611 assert_eq!(result, serde_json::json!({"ok": true}));
612 assert!(committed.load(Ordering::SeqCst));
613 }
614
615 #[tokio::test]
616 async fn execute_rows_read_no_commit() {
617 let rows = vec![
618 serde_json::json!({"name": "Alice"}),
619 serde_json::json!({"name": "Bob"}),
620 ];
621 let tx = MockTransaction::new(QueryResultKind::Rows(rows.clone()));
622 let committed = tx.committed.clone();
623 let client = make_client(MockBackend::new(tx));
624
625 let result = client
626 .execute("db", "match $p isa person;", "read")
627 .await
628 .unwrap();
629 assert_eq!(result, serde_json::Value::Array(rows));
630 assert!(!committed.load(Ordering::SeqCst));
631 }
632
633 #[tokio::test]
634 async fn execute_rows_write_commits() {
635 let rows = vec![serde_json::json!({"id": 1})];
636 let tx = MockTransaction::new(QueryResultKind::Rows(rows));
637 let committed = tx.committed.clone();
638 let client = make_client(MockBackend::new(tx));
639
640 let result = client
641 .execute("db", "insert $x isa thing;", "write")
642 .await
643 .unwrap();
644 assert!(result.is_array());
645 assert!(committed.load(Ordering::SeqCst));
646 }
647
648 #[tokio::test]
649 async fn execute_rows_data_preserved() {
650 let rows = vec![
651 serde_json::json!({"name": "Alice", "age": 30}),
652 serde_json::json!({"name": "Bob", "age": 25}),
653 ];
654 let tx = MockTransaction::new(QueryResultKind::Rows(rows.clone()));
655 let client = make_client(MockBackend::new(tx));
656
657 let result = client
658 .execute("db", "match $p isa person;", "read")
659 .await
660 .unwrap();
661 let arr = result.as_array().unwrap();
662 assert_eq!(arr.len(), 2);
663 assert_eq!(arr[0]["name"], "Alice");
664 assert_eq!(arr[1]["age"], 25);
665 }
666
667 #[tokio::test]
668 async fn execute_docs_read_no_commit() {
669 let docs = vec![serde_json::json!({"doc": "data"})];
670 let tx = MockTransaction::new(QueryResultKind::Documents(docs.clone()));
671 let committed = tx.committed.clone();
672 let client = make_client(MockBackend::new(tx));
673
674 let result = client
675 .execute("db", "match $p isa person; fetch {};", "read")
676 .await
677 .unwrap();
678 assert_eq!(result, serde_json::Value::Array(docs));
679 assert!(!committed.load(Ordering::SeqCst));
680 }
681
682 #[tokio::test]
683 async fn execute_docs_write_commits() {
684 let docs = vec![serde_json::json!({"doc": "data"})];
685 let tx = MockTransaction::new(QueryResultKind::Documents(docs));
686 let committed = tx.committed.clone();
687 let client = make_client(MockBackend::new(tx));
688
689 let result = client
690 .execute("db", "insert $x isa thing;", "write")
691 .await
692 .unwrap();
693 assert!(result.is_array());
694 assert!(committed.load(Ordering::SeqCst));
695 }
696
697 #[tokio::test]
698 async fn execute_docs_commit_error_ignored() {
699 let docs = vec![serde_json::json!({"doc": "data"})];
700 let tx = MockTransaction::new(QueryResultKind::Documents(docs.clone()))
701 .with_commit_error("commit failed");
702 let client = make_client(MockBackend::new(tx));
703
704 let result = client
706 .execute("db", "insert $x isa thing;", "write")
707 .await
708 .unwrap();
709 assert_eq!(result, serde_json::Value::Array(docs));
710 }
711
712 #[tokio::test]
713 async fn execute_transaction_open_failure() {
714 let client = make_client(MockBackend::failing("connection refused"));
715
716 let result = client.execute("db", "match $x isa thing;", "read").await;
717 let err = result.unwrap_err();
718 assert!(
719 matches!(&err, PipelineError::QueryExecution(msg) if msg.contains("connection refused"))
720 );
721 }
722
723 #[tokio::test]
724 async fn execute_query_failure() {
725 let tx = MockTransaction::failing_query("syntax error");
726 let client = make_client(MockBackend::new(tx));
727
728 let result = client.execute("db", "bad query", "read").await;
729 let err = result.unwrap_err();
730 assert!(matches!(&err, PipelineError::QueryExecution(msg) if msg.contains("syntax error")));
731 }
732
733 #[tokio::test]
734 async fn execute_commit_failure_ok_propagated() {
735 let tx = MockTransaction::new(QueryResultKind::Ok).with_commit_error("commit failed");
736 let client = make_client(MockBackend::new(tx));
737
738 let result = client.execute("db", "insert $x isa thing;", "write").await;
740 let err = result.unwrap_err();
741 assert!(
742 matches!(&err, PipelineError::QueryExecution(msg) if msg.contains("commit failed"))
743 );
744 }
745
746 #[tokio::test]
747 async fn execute_commit_failure_rows_propagated() {
748 let tx =
749 MockTransaction::new(QueryResultKind::Rows(vec![])).with_commit_error("commit failed");
750 let client = make_client(MockBackend::new(tx));
751
752 let result = client.execute("db", "insert $x isa thing;", "write").await;
754 let err = result.unwrap_err();
755 assert!(
756 matches!(&err, PipelineError::QueryExecution(msg) if msg.contains("commit failed"))
757 );
758 }
759
760 #[tokio::test]
761 async fn execute_invalid_transaction_type() {
762 let tx = MockTransaction::new(QueryResultKind::Ok);
763 let backend = MockBackend::new(tx);
764 let open_called = backend.open_called.clone();
765 let client = make_client(backend);
766
767 let result = client.execute("db", "match $x;", "invalid").await;
768 assert!(result.is_err());
769 assert_eq!(open_called.load(Ordering::SeqCst), 0);
771 }
772
773 #[test]
774 fn is_connected_delegates_to_backend() {
775 let mut backend = MockBackend::new(MockTransaction::new(QueryResultKind::Ok));
776 backend.is_open = true;
777 let client = make_client(backend);
778 assert!(client.is_connected());
779 }
780
781 #[test]
782 fn is_connected_false_when_backend_closed() {
783 let mut backend = MockBackend::new(MockTransaction::new(QueryResultKind::Ok));
784 backend.is_open = false;
785 let client = make_client(backend);
786 assert!(!client.is_connected());
787 }
788
789 #[tokio::test]
794 async fn database_exists_true_delegates_to_backend() {
795 let backend = MockBackend::new(MockTransaction::new(QueryResultKind::Ok))
796 .with_database_exists_results(vec![Ok(true)]);
797 let admin = backend.database_admin_state();
798 let client = make_client(backend);
799
800 assert!(client.database_exists("admin_db").await.unwrap());
801 assert_eq!(admin.exists_called.load(Ordering::SeqCst), 1);
802 assert_eq!(
803 admin.operations.lock().unwrap().clone(),
804 vec!["exists:admin_db".to_string()]
805 );
806 }
807
808 #[tokio::test]
809 async fn database_exists_false_delegates_to_backend() {
810 let backend = MockBackend::new(MockTransaction::new(QueryResultKind::Ok))
811 .with_database_exists_results(vec![Ok(false)]);
812 let admin = backend.database_admin_state();
813 let client = make_client(backend);
814
815 assert!(!client.database_exists("missing_db").await.unwrap());
816 assert_eq!(admin.exists_called.load(Ordering::SeqCst), 1);
817 assert_eq!(
818 admin.operations.lock().unwrap().clone(),
819 vec!["exists:missing_db".to_string()]
820 );
821 }
822
823 #[tokio::test]
824 async fn create_database_delegates_to_backend() {
825 let backend = MockBackend::new(MockTransaction::new(QueryResultKind::Ok));
826 let admin = backend.database_admin_state();
827 let client = make_client(backend);
828
829 client.create_database("new_db").await.unwrap();
830 assert_eq!(admin.create_called.load(Ordering::SeqCst), 1);
831 assert_eq!(
832 admin.operations.lock().unwrap().clone(),
833 vec!["create:new_db".to_string()]
834 );
835 }
836
837 #[tokio::test]
838 async fn delete_database_delegates_to_backend() {
839 let backend = MockBackend::new(MockTransaction::new(QueryResultKind::Ok));
840 let admin = backend.database_admin_state();
841 let client = make_client(backend);
842
843 client.delete_database("old_db").await.unwrap();
844 assert_eq!(admin.delete_called.load(Ordering::SeqCst), 1);
845 assert_eq!(
846 admin.operations.lock().unwrap().clone(),
847 vec!["delete:old_db".to_string()]
848 );
849 }
850
851 #[tokio::test]
852 async fn reset_database_deletes_existing_database_before_create() {
853 let backend = MockBackend::new(MockTransaction::new(QueryResultKind::Ok))
854 .with_database_exists_results(vec![Ok(true)]);
855 let admin = backend.database_admin_state();
856 let client = make_client(backend);
857
858 client.reset_database("reset_db").await.unwrap();
859 assert_eq!(admin.exists_called.load(Ordering::SeqCst), 1);
860 assert_eq!(admin.delete_called.load(Ordering::SeqCst), 1);
861 assert_eq!(admin.create_called.load(Ordering::SeqCst), 1);
862 assert_eq!(
863 admin.operations.lock().unwrap().clone(),
864 vec![
865 "exists:reset_db".to_string(),
866 "delete:reset_db".to_string(),
867 "create:reset_db".to_string(),
868 ]
869 );
870 }
871
872 #[tokio::test]
873 async fn reset_database_creates_when_database_is_absent() {
874 let backend = MockBackend::new(MockTransaction::new(QueryResultKind::Ok))
875 .with_database_exists_results(vec![Ok(false)]);
876 let admin = backend.database_admin_state();
877 let client = make_client(backend);
878
879 client.reset_database("reset_db").await.unwrap();
880 assert_eq!(admin.exists_called.load(Ordering::SeqCst), 1);
881 assert_eq!(admin.delete_called.load(Ordering::SeqCst), 0);
882 assert_eq!(admin.create_called.load(Ordering::SeqCst), 1);
883 assert_eq!(
884 admin.operations.lock().unwrap().clone(),
885 vec!["exists:reset_db".to_string(), "create:reset_db".to_string(),]
886 );
887 }
888
889 #[tokio::test]
890 async fn database_exists_propagates_backend_error() {
891 let backend = MockBackend::new(MockTransaction::new(QueryResultKind::Ok))
892 .with_database_exists_results(vec![Err("lookup failed".to_string())]);
893 let client = make_client(backend);
894
895 let err = client.database_exists("db").await.unwrap_err();
896 assert!(matches!(&err, PipelineError::Connection(msg) if msg.contains("lookup failed")));
897 }
898
899 #[tokio::test]
900 async fn create_database_propagates_backend_error() {
901 let backend = MockBackend::new(MockTransaction::new(QueryResultKind::Ok))
902 .with_create_errors(vec!["create failed"]);
903 let client = make_client(backend);
904
905 let err = client.create_database("db").await.unwrap_err();
906 assert!(matches!(&err, PipelineError::Connection(msg) if msg.contains("create failed")));
907 }
908
909 #[tokio::test]
910 async fn delete_database_propagates_backend_error() {
911 let backend = MockBackend::new(MockTransaction::new(QueryResultKind::Ok))
912 .with_delete_errors(vec!["delete failed"]);
913 let client = make_client(backend);
914
915 let err = client.delete_database("db").await.unwrap_err();
916 assert!(matches!(&err, PipelineError::Connection(msg) if msg.contains("delete failed")));
917 }
918
919 #[tokio::test]
920 async fn reset_database_propagates_lookup_error_without_mutating() {
921 let backend = MockBackend::new(MockTransaction::new(QueryResultKind::Ok))
922 .with_database_exists_results(vec![Err("lookup failed".to_string())]);
923 let admin = backend.database_admin_state();
924 let client = make_client(backend);
925
926 let err = client.reset_database("db").await.unwrap_err();
927 assert!(matches!(&err, PipelineError::Connection(msg) if msg.contains("lookup failed")));
928 assert_eq!(admin.delete_called.load(Ordering::SeqCst), 0);
929 assert_eq!(admin.create_called.load(Ordering::SeqCst), 0);
930 assert_eq!(
931 admin.operations.lock().unwrap().clone(),
932 vec!["exists:db".to_string()]
933 );
934 }
935
936 #[tokio::test]
937 async fn reset_database_propagates_delete_error_without_create() {
938 let backend = MockBackend::new(MockTransaction::new(QueryResultKind::Ok))
939 .with_database_exists_results(vec![Ok(true)])
940 .with_delete_errors(vec!["delete failed"]);
941 let admin = backend.database_admin_state();
942 let client = make_client(backend);
943
944 let err = client.reset_database("db").await.unwrap_err();
945 assert!(matches!(&err, PipelineError::Connection(msg) if msg.contains("delete failed")));
946 assert_eq!(admin.create_called.load(Ordering::SeqCst), 0);
947 assert_eq!(
948 admin.operations.lock().unwrap().clone(),
949 vec!["exists:db".to_string(), "delete:db".to_string()]
950 );
951 }
952
953 #[test]
958 fn value_to_json_boolean_true() {
959 let value = Value::Boolean(true);
960 let json = value_to_json_b8(&value);
961 assert_eq!(json, serde_json::Value::Bool(true));
962 }
963
964 #[test]
965 fn value_to_json_boolean_false() {
966 let value = Value::Boolean(false);
967 let json = value_to_json_b8(&value);
968 assert_eq!(json, serde_json::Value::Bool(false));
969 }
970
971 #[test]
972 fn value_to_json_integer() {
973 let value = Value::Integer(42);
974 let json = value_to_json_b8(&value);
975 assert_eq!(json, serde_json::json!(42));
976 }
977
978 #[test]
979 fn value_to_json_integer_negative() {
980 let value = Value::Integer(-100);
981 let json = value_to_json_b8(&value);
982 assert_eq!(json, serde_json::json!(-100));
983 }
984
985 #[test]
986 fn value_to_json_double() {
987 let value = Value::Double(3.15);
988 let json = value_to_json_b8(&value);
989 assert_eq!(json, serde_json::json!(3.15));
990 }
991
992 #[test]
993 fn value_to_json_string() {
994 let value = Value::String("hello".to_string());
995 let json = value_to_json_b8(&value);
996 assert_eq!(json, serde_json::Value::String("hello".to_string()));
997 }
998
999 #[test]
1000 fn value_to_json_string_empty() {
1001 let value = Value::String(String::new());
1002 let json = value_to_json_b8(&value);
1003 assert_eq!(json, serde_json::Value::String(String::new()));
1004 }
1005
1006 #[test]
1007 fn value_to_json_date() {
1008 let date = chrono::NaiveDate::from_ymd_opt(2024, 1, 15).unwrap();
1009 let value = Value::Date(date);
1010 let json = value_to_json_b8(&value);
1011 assert_eq!(json, serde_json::Value::String("2024-01-15".to_string()));
1012 }
1013
1014 #[test]
1015 fn value_to_json_datetime() {
1016 let dt = chrono::NaiveDate::from_ymd_opt(2024, 1, 15)
1017 .unwrap()
1018 .and_hms_opt(10, 30, 0)
1019 .unwrap();
1020 let value = Value::Datetime(dt);
1021 let json = value_to_json_b8(&value);
1022 let s = json.as_str().unwrap();
1023 assert!(s.contains("2024-01-15"));
1024 }
1025
1026 #[test]
1027 fn value_to_json_decimal() {
1028 let dec = Decimal::new(42, 0);
1029 let value = Value::Decimal(dec);
1030 let json = value_to_json_b8(&value);
1031 assert!(json.is_string());
1032 }
1033
1034 #[test]
1035 fn value_to_json_duration() {
1036 let dur = Duration::new(1, 2, 3_000_000_000);
1037 let value = Value::Duration(dur);
1038 let json = value_to_json_b8(&value);
1039 assert!(json.is_string());
1040 }
1041
1042 #[test]
1043 fn value_to_json_datetime_tz() {
1044 use chrono::TimeZone as _;
1045 let tz = TimeZone::Fixed(chrono::FixedOffset::east_opt(3600).unwrap());
1046 let dt = tz.with_ymd_and_hms(2024, 6, 15, 12, 30, 0).unwrap();
1047 let value = Value::DatetimeTZ(dt);
1048 let json = value_to_json_b8(&value);
1049 let s = json.as_str().unwrap();
1050 assert!(s.contains("2024"));
1051 }
1052
1053 #[test]
1058 fn concept_to_json_entity_type() {
1059 let concept = Concept::EntityType(EntityType {
1060 label: "person".to_string(),
1061 });
1062 let json = concept_to_json_b8(&concept);
1063 assert_eq!(json["category"], "EntityType");
1064 assert_eq!(json["label"], "person");
1065 assert!(json.get("iid").is_none());
1066 assert!(json.get("value").is_none());
1067 }
1068
1069 #[test]
1070 fn concept_to_json_attribute_type() {
1071 let concept = Concept::AttributeType(AttributeType {
1072 label: "name".to_string(),
1073 value_type: Some(ValueType::String),
1074 });
1075 let json = concept_to_json_b8(&concept);
1076 assert_eq!(json["category"], "AttributeType");
1077 assert_eq!(json["label"], "name");
1078 assert_eq!(json["value_type"], "string");
1079 }
1080
/// A boolean `Concept::Value` is expected to serialize with category
/// `"Value"` and a JSON boolean under `value`.
#[test]
fn concept_to_json_value_boolean() {
    let concept = Concept::Value(Value::Boolean(true));

    let json = concept_to_json_b8(&concept);

    assert_eq!(json["category"].as_str(), Some("Value"));
    assert_eq!(json["value"].as_bool(), Some(true));
}
1088
/// An integer `Concept::Value` is expected to serialize as a JSON number
/// under `value`.
#[test]
fn concept_to_json_value_integer() {
    let concept = Concept::Value(Value::Integer(42));

    let json = concept_to_json_b8(&concept);

    assert_eq!(json["value"].as_i64(), Some(42));
}
1095
/// A string `Concept::Value` is expected to serialize as a JSON string
/// under `value`.
#[test]
fn concept_to_json_value_string() {
    let concept = Concept::Value(Value::String(String::from("hello")));

    let json = concept_to_json_b8(&concept);

    assert_eq!(json["value"].as_str(), Some("hello"));
}
1102
/// An `Entity` instance is expected to serialize its category, the label of
/// its type, and an `iid` string carrying a `0x` prefix.
#[test]
fn concept_to_json_entity_with_iid() {
    let raw_iid: Vec<u8> = vec![0x01, 0x02, 0x03];
    let iid: IID = raw_iid.into();
    let person_type = EntityType {
        label: String::from("person"),
    };
    let concept = Concept::Entity(Entity {
        iid,
        type_: Some(person_type),
    });

    let json = concept_to_json_b8(&concept);

    assert_eq!(json["category"].as_str(), Some("Entity"));
    assert_eq!(json["label"].as_str(), Some("person"));
    // Only the prefix is pinned; the digits after it are not checked here.
    let rendered_iid = json["iid"].as_str().unwrap();
    assert!(rendered_iid.starts_with("0x"));
}
1119
/// An `Attribute` instance is expected to serialize its category, type label,
/// value and value type, but no `iid` key even though the struct carries one.
#[test]
fn concept_to_json_attribute_with_value() {
    let raw_iid: Vec<u8> = vec![0xAA, 0xBB];
    let iid: IID = raw_iid.into();
    let name_type = AttributeType {
        label: String::from("name"),
        value_type: Some(ValueType::String),
    };
    let concept = Concept::Attribute(Attribute {
        iid,
        value: Value::String(String::from("hello")),
        type_: Some(name_type),
    });

    let json = concept_to_json_b8(&concept);

    assert_eq!(json["category"].as_str(), Some("Attribute"));
    assert_eq!(json["label"].as_str(), Some("name"));
    // The serializer is expected to omit the iid for attributes.
    assert!(json.get("iid").is_none());
    assert_eq!(json["value"].as_str(), Some("hello"));
    assert_eq!(json["value_type"].as_str(), Some("string"));
}
1139
/// An `AttributeType` whose `value_type` is `None` is expected to serialize
/// its label and to leave the `value_type` key out entirely.
#[test]
fn concept_to_json_attribute_type_without_value_type() {
    let untyped = AttributeType {
        label: String::from("abstract_attr"),
        value_type: None,
    };
    let concept = Concept::AttributeType(untyped);

    let json = concept_to_json_b8(&concept);

    assert_eq!(json["label"].as_str(), Some("abstract_attr"));
    assert!(json.get("value_type").is_none());
}
1150
/// Compile-time check that `TypeDBClient` satisfies the `QueryExecutor` bound.
#[test]
fn type_db_client_implements_query_executor() {
    // The body is empty on purpose: instantiating the generic is the test.
    fn requires_executor<T>()
    where
        T: QueryExecutor,
    {
    }

    requires_executor::<TypeDBClient>();
}
1160
1161 #[tokio::test]
1162 async fn query_executor_execute_delegates_to_client() {
1163 let tx = MockTransaction::new(QueryResultKind::Rows(vec![serde_json::json!({"x": 1})]));
1164 let client = make_client(MockBackend::new(tx));
1165 let executor: Box<dyn QueryExecutor> = Box::new(client);
1166
1167 let result = executor
1168 .execute("db", "match $x isa thing;", "read")
1169 .await
1170 .unwrap();
1171 assert!(result.is_array());
1172 assert_eq!(result.as_array().unwrap().len(), 1);
1173 }
1174
/// `is_connected` called through a `Box<dyn QueryExecutor>` must report true
/// when the mock backend's `is_open` flag is set.
#[test]
fn query_executor_is_connected_delegates_to_client() {
    let mut open_backend = MockBackend::new(MockTransaction::new(QueryResultKind::Ok));
    open_backend.is_open = true;

    let executor: Box<dyn QueryExecutor> = Box::new(make_client(open_backend));

    assert!(executor.is_connected());
}
1183
1184 #[tokio::test]
1189 #[ignore = "requires running TypeDB server"]
1190 #[cfg_attr(coverage_nightly, coverage(off))]
1191 async fn integration_connect_invalid_address() {
1192 let config = TypeDBSection {
1193 address: "localhost:99999".to_string(),
1194 database: "test".to_string(),
1195 username: "admin".to_string(),
1196 password: "password".to_string(),
1197 http_port: 8000,
1198 };
1199 let result = TypeDBClient::connect(&config).await;
1200 assert!(result.is_err());
1201 }
1202
1203 fn live_config() -> TypeDBSection {
1206 TypeDBSection {
1207 address: std::env::var("TYPEDB_ADDRESS")
1208 .unwrap_or_else(|_| "localhost:1729".to_string()),
1209 database: std::env::var("TYPEDB_DATABASE").unwrap_or_else(|_| "test".to_string()),
1210 username: "admin".to_string(),
1211 password: "password".to_string(),
1212 http_port: std::env::var("TYPEDB_HTTP_PORT")
1213 .ok()
1214 .and_then(|port| port.parse().ok())
1215 .unwrap_or(8000),
1216 }
1217 }
1218
1219 #[tokio::test]
1220 #[ignore = "requires running TypeDB server"]
1221 #[cfg_attr(coverage_nightly, coverage(off))]
1222 async fn integration_connect_success() {
1223 let result = TypeDBClient::connect(&live_config()).await;
1224 assert!(result.is_ok());
1225 assert!(result.unwrap().is_connected());
1226 }
1227
1228 #[tokio::test]
1229 #[ignore = "requires running TypeDB server"]
1230 #[cfg_attr(coverage_nightly, coverage(off))]
1231 async fn integration_database_admin_roundtrip() {
1232 let config = live_config();
1233 let client = TypeDBClient::connect(&config)
1234 .await
1235 .expect("connect failed");
1236 let database = format!("type_bridge_server_admin_{}", uuid::Uuid::new_v4().simple());
1237
1238 if client.database_exists(&database).await.unwrap_or(false) {
1239 let _ = client.delete_database(&database).await;
1240 }
1241
1242 assert!(!client.database_exists(&database).await.unwrap());
1243 client
1244 .create_database(&database)
1245 .await
1246 .expect("create failed");
1247 assert!(client.database_exists(&database).await.unwrap());
1248
1249 client
1250 .reset_database(&database)
1251 .await
1252 .expect("reset existing database failed");
1253 assert!(client.database_exists(&database).await.unwrap());
1254
1255 client
1256 .delete_database(&database)
1257 .await
1258 .expect("delete failed");
1259 assert!(!client.database_exists(&database).await.unwrap());
1260
1261 client
1262 .reset_database(&database)
1263 .await
1264 .expect("reset absent database failed");
1265 assert!(client.database_exists(&database).await.unwrap());
1266
1267 client
1268 .delete_database(&database)
1269 .await
1270 .expect("cleanup delete failed");
1271 }
1272
1273 #[tokio::test]
1274 #[ignore = "requires running TypeDB server"]
1275 #[cfg_attr(coverage_nightly, coverage(off))]
1276 async fn integration_execute_roundtrip() {
1277 let config = live_config();
1278 let client = TypeDBClient::connect(&config)
1279 .await
1280 .expect("connect failed");
1281
1282 client
1283 .execute(&config.database, "define entity smoke_marker;", "schema")
1284 .await
1285 .expect("schema define failed");
1286
1287 let rows = client
1288 .execute(&config.database, "match entity $t;", "read")
1289 .await
1290 .expect("read query failed");
1291 let rows = rows.as_array().expect("read result must be a JSON array");
1292 assert!(
1293 !rows.is_empty(),
1294 "expected at least the smoke_marker entity type"
1295 );
1296 }
1297}