Skip to main content

type_bridge_server/typedb/
client.rs

1use super::backend::{DriverBackend, QueryResultKind, TransactionType};
2use super::real_driver::RealTypeDBBackend;
3use crate::config::TypeDBSection;
4use crate::error::PipelineError;
5use crate::executor::QueryExecutor;
6
7/// Wrapper around the TypeDB Rust driver providing a clean async API
8/// for query execution and schema retrieval.
9pub struct TypeDBClient {
10    backend: Box<dyn DriverBackend>,
11}
12
13impl TypeDBClient {
14    /// Connect to a TypeDB server using the provided configuration.
15    #[cfg_attr(coverage_nightly, coverage(off))]
16    pub async fn connect(config: &TypeDBSection) -> Result<Self, PipelineError> {
17        let backend = RealTypeDBBackend::connect(config).await?;
18        Ok(Self {
19            backend: Box::new(backend),
20        })
21    }
22
23    /// Create a TypeDBClient with a custom backend (for testing).
24    #[cfg(test)]
25    pub(crate) fn with_backend(backend: Box<dyn DriverBackend>) -> Self {
26        Self { backend }
27    }
28
29    /// Execute a TypeQL query and return results as JSON.
30    ///
31    /// For read transactions, the transaction is used directly.
32    /// For write and schema transactions, the transaction is committed after execution.
33    pub async fn execute(
34        &self,
35        database: &str,
36        typeql: &str,
37        tx_type: &str,
38    ) -> Result<serde_json::Value, PipelineError> {
39        let transaction_type = parse_transaction_type(tx_type)?;
40
41        let mut tx = self
42            .backend
43            .open_transaction(database, transaction_type)
44            .await?;
45
46        let answer = tx.query(typeql).await?;
47
48        let needs_commit = matches!(
49            transaction_type,
50            TransactionType::Write | TransactionType::Schema
51        );
52
53        let results = match answer {
54            QueryResultKind::Ok => {
55                if needs_commit {
56                    tx.commit().await?;
57                }
58                serde_json::json!({ "ok": true })
59            }
60            QueryResultKind::Rows(rows) => {
61                if needs_commit {
62                    tx.commit().await?;
63                }
64                serde_json::Value::Array(rows)
65            }
66            QueryResultKind::Documents(docs) => {
67                if needs_commit {
68                    let _ = tx.commit().await;
69                }
70                serde_json::Value::Array(docs)
71            }
72        };
73
74        Ok(results)
75    }
76
77    /// Return whether a TypeDB database exists on the connected server.
78    pub async fn database_exists(&self, database: &str) -> Result<bool, PipelineError> {
79        self.backend.database_exists(database).await
80    }
81
82    /// Create a TypeDB database.
83    pub async fn create_database(&self, database: &str) -> Result<(), PipelineError> {
84        self.backend.create_database(database).await
85    }
86
87    /// Delete a TypeDB database.
88    pub async fn delete_database(&self, database: &str) -> Result<(), PipelineError> {
89        self.backend.delete_database(database).await
90    }
91
92    /// Delete a TypeDB database if it exists, then create it.
93    pub async fn reset_database(&self, database: &str) -> Result<(), PipelineError> {
94        if self.database_exists(database).await? {
95            self.delete_database(database).await?;
96        }
97        self.create_database(database).await
98    }
99
100    /// Check if the driver connection is open.
101    pub fn is_connected(&self) -> bool {
102        self.backend.is_open()
103    }
104}
105
106impl QueryExecutor for TypeDBClient {
107    fn execute<'a>(
108        &'a self,
109        database: &'a str,
110        typeql: &'a str,
111        transaction_type: &'a str,
112    ) -> std::pin::Pin<
113        Box<dyn std::future::Future<Output = Result<serde_json::Value, PipelineError>> + Send + 'a>,
114    > {
115        Box::pin(async move { self.execute(database, typeql, transaction_type).await })
116    }
117
118    fn is_connected(&self) -> bool {
119        self.is_connected()
120    }
121}
122
123/// Parse a transaction type string into a TypeDB TransactionType.
124pub(crate) fn parse_transaction_type(tx_type: &str) -> Result<TransactionType, PipelineError> {
125    match tx_type {
126        "read" => Ok(TransactionType::Read),
127        "write" => Ok(TransactionType::Write),
128        "schema" => Ok(TransactionType::Schema),
129        other => Err(PipelineError::QueryExecution(format!(
130            "Unknown transaction type: {other}"
131        ))),
132    }
133}
134
135#[cfg(test)]
136#[cfg(feature = "band8")]
137#[cfg_attr(coverage_nightly, coverage(off))]
138mod tests {
139    use std::collections::VecDeque;
140    use std::future::Future;
141    use std::pin::Pin;
142    use std::sync::Arc;
143    use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
144
145    use super::*;
146    use crate::error::PipelineError;
147    use crate::typedb::backend::TransactionOps;
148
149    // =============================================
150    // Mock infrastructure
151    // =============================================
152
153    struct MockTransaction {
154        query_result: Option<QueryResultKind>,
155        query_error: Option<String>,
156        commit_error: Option<String>,
157        committed: Arc<AtomicBool>,
158        query_called: Arc<AtomicBool>,
159    }
160
161    impl MockTransaction {
162        fn new(result: QueryResultKind) -> Self {
163            Self {
164                query_result: Some(result),
165                query_error: None,
166                commit_error: None,
167                committed: Arc::new(AtomicBool::new(false)),
168                query_called: Arc::new(AtomicBool::new(false)),
169            }
170        }
171
172        fn failing_query(msg: &str) -> Self {
173            Self {
174                query_result: None,
175                query_error: Some(msg.to_string()),
176                commit_error: None,
177                committed: Arc::new(AtomicBool::new(false)),
178                query_called: Arc::new(AtomicBool::new(false)),
179            }
180        }
181
182        fn with_commit_error(mut self, msg: &str) -> Self {
183            self.commit_error = Some(msg.to_string());
184            self
185        }
186    }
187
188    impl TransactionOps for MockTransaction {
189        fn query(
190            &mut self,
191            _typeql: &str,
192        ) -> Pin<Box<dyn Future<Output = Result<QueryResultKind, PipelineError>> + Send + '_>>
193        {
194            self.query_called.store(true, Ordering::SeqCst);
195            let result = self.query_result.take();
196            let error = self.query_error.take();
197            Box::pin(async move {
198                if let Some(msg) = error {
199                    return Err(PipelineError::QueryExecution(msg));
200                }
201                Ok(result.expect("MockTransaction::query called more than once"))
202            })
203        }
204
205        fn commit(
206            &mut self,
207        ) -> Pin<Box<dyn Future<Output = Result<(), PipelineError>> + Send + '_>> {
208            self.committed.store(true, Ordering::SeqCst);
209            let error = self.commit_error.take();
210            Box::pin(async move {
211                if let Some(msg) = error {
212                    return Err(PipelineError::QueryExecution(msg));
213                }
214                Ok(())
215            })
216        }
217    }
218
219    #[derive(Default)]
220    struct MockDatabaseAdmin {
221        exists_results: std::sync::Mutex<VecDeque<Result<bool, String>>>,
222        create_errors: std::sync::Mutex<VecDeque<String>>,
223        delete_errors: std::sync::Mutex<VecDeque<String>>,
224        exists_called: AtomicUsize,
225        create_called: AtomicUsize,
226        delete_called: AtomicUsize,
227        operations: std::sync::Mutex<Vec<String>>,
228    }
229
230    struct MockBackend {
231        transaction: std::sync::Mutex<Option<MockTransaction>>,
232        open_error: Option<String>,
233        is_open: bool,
234        open_called: Arc<AtomicUsize>,
235        database_admin: Arc<MockDatabaseAdmin>,
236    }
237
238    impl MockBackend {
239        fn new(tx: MockTransaction) -> Self {
240            Self {
241                transaction: std::sync::Mutex::new(Some(tx)),
242                open_error: None,
243                is_open: true,
244                open_called: Arc::new(AtomicUsize::new(0)),
245                database_admin: Arc::new(MockDatabaseAdmin::default()),
246            }
247        }
248
249        fn failing(msg: &str) -> Self {
250            Self {
251                transaction: std::sync::Mutex::new(None),
252                open_error: Some(msg.to_string()),
253                is_open: true,
254                open_called: Arc::new(AtomicUsize::new(0)),
255                database_admin: Arc::new(MockDatabaseAdmin::default()),
256            }
257        }
258
259        fn with_database_exists_results(self, results: Vec<Result<bool, String>>) -> Self {
260            *self.database_admin.exists_results.lock().unwrap() = results.into();
261            self
262        }
263
264        fn with_create_errors(self, errors: Vec<&str>) -> Self {
265            *self.database_admin.create_errors.lock().unwrap() =
266                errors.into_iter().map(str::to_string).collect();
267            self
268        }
269
270        fn with_delete_errors(self, errors: Vec<&str>) -> Self {
271            *self.database_admin.delete_errors.lock().unwrap() =
272                errors.into_iter().map(str::to_string).collect();
273            self
274        }
275
276        fn database_admin_state(&self) -> Arc<MockDatabaseAdmin> {
277            Arc::clone(&self.database_admin)
278        }
279    }
280
281    impl DriverBackend for MockBackend {
282        fn open_transaction(
283            &self,
284            _database: &str,
285            _tx_type: TransactionType,
286        ) -> Pin<Box<dyn Future<Output = Result<Box<dyn TransactionOps>, PipelineError>> + Send + '_>>
287        {
288            self.open_called.fetch_add(1, Ordering::SeqCst);
289            let tx = self.transaction.lock().unwrap().take();
290            let error = self.open_error.clone();
291            Box::pin(async move {
292                if let Some(msg) = error {
293                    return Err(PipelineError::QueryExecution(msg));
294                }
295                Ok(
296                    Box::new(tx.expect("MockBackend: no transaction configured"))
297                        as Box<dyn TransactionOps>,
298                )
299            })
300        }
301
302        fn is_open(&self) -> bool {
303            self.is_open
304        }
305
306        fn database_exists(
307            &self,
308            database: &str,
309        ) -> Pin<Box<dyn Future<Output = Result<bool, PipelineError>> + Send + '_>> {
310            self.database_admin
311                .exists_called
312                .fetch_add(1, Ordering::SeqCst);
313            self.database_admin
314                .operations
315                .lock()
316                .unwrap()
317                .push(format!("exists:{database}"));
318            let result = self
319                .database_admin
320                .exists_results
321                .lock()
322                .unwrap()
323                .pop_front()
324                .unwrap_or(Ok(false));
325            Box::pin(async move { result.map_err(PipelineError::Connection) })
326        }
327
328        fn create_database(
329            &self,
330            database: &str,
331        ) -> Pin<Box<dyn Future<Output = Result<(), PipelineError>> + Send + '_>> {
332            self.database_admin
333                .create_called
334                .fetch_add(1, Ordering::SeqCst);
335            self.database_admin
336                .operations
337                .lock()
338                .unwrap()
339                .push(format!("create:{database}"));
340            let error = self
341                .database_admin
342                .create_errors
343                .lock()
344                .unwrap()
345                .pop_front();
346            Box::pin(async move {
347                if let Some(msg) = error {
348                    return Err(PipelineError::Connection(msg));
349                }
350                Ok(())
351            })
352        }
353
354        fn delete_database(
355            &self,
356            database: &str,
357        ) -> Pin<Box<dyn Future<Output = Result<(), PipelineError>> + Send + '_>> {
358            self.database_admin
359                .delete_called
360                .fetch_add(1, Ordering::SeqCst);
361            self.database_admin
362                .operations
363                .lock()
364                .unwrap()
365                .push(format!("delete:{database}"));
366            let error = self
367                .database_admin
368                .delete_errors
369                .lock()
370                .unwrap()
371                .pop_front();
372            Box::pin(async move {
373                if let Some(msg) = error {
374                    return Err(PipelineError::Connection(msg));
375                }
376                Ok(())
377            })
378        }
379    }
380
381    fn make_client(backend: MockBackend) -> TypeDBClient {
382        TypeDBClient::with_backend(Box::new(backend))
383    }
384
385    // =============================================
386    // parse_transaction_type tests
387    // =============================================
388
389    #[test]
390    fn parse_transaction_type_read() {
391        let result = parse_transaction_type("read").unwrap();
392        assert_eq!(result, TransactionType::Read);
393    }
394
395    #[test]
396    fn parse_transaction_type_write() {
397        let result = parse_transaction_type("write").unwrap();
398        assert_eq!(result, TransactionType::Write);
399    }
400
401    #[test]
402    fn parse_transaction_type_schema() {
403        let result = parse_transaction_type("schema").unwrap();
404        assert_eq!(result, TransactionType::Schema);
405    }
406
407    #[test]
408    fn parse_transaction_type_unknown() {
409        let result = parse_transaction_type("unknown");
410        let err = result.unwrap_err();
411        assert!(
412            matches!(&err, PipelineError::QueryExecution(msg) if msg.contains("Unknown transaction type: unknown"))
413        );
414    }
415
416    #[test]
417    fn parse_transaction_type_empty() {
418        let result = parse_transaction_type("");
419        assert!(result.is_err());
420    }
421
422    #[test]
423    fn parse_transaction_type_case_sensitive() {
424        let result = parse_transaction_type("Read");
425        assert!(result.is_err());
426    }
427
428    // =============================================
429    // execute tests (via MockBackend)
430    // =============================================
431
432    #[tokio::test]
433    async fn execute_ok_read_no_commit() {
434        let tx = MockTransaction::new(QueryResultKind::Ok);
435        let committed = tx.committed.clone();
436        let client = make_client(MockBackend::new(tx));
437
438        let result = client
439            .execute("db", "match $x isa thing;", "read")
440            .await
441            .unwrap();
442        assert_eq!(result, serde_json::json!({"ok": true}));
443        assert!(!committed.load(Ordering::SeqCst));
444    }
445
446    #[tokio::test]
447    async fn execute_ok_write_commits() {
448        let tx = MockTransaction::new(QueryResultKind::Ok);
449        let committed = tx.committed.clone();
450        let client = make_client(MockBackend::new(tx));
451
452        let result = client
453            .execute("db", "insert $x isa thing;", "write")
454            .await
455            .unwrap();
456        assert_eq!(result, serde_json::json!({"ok": true}));
457        assert!(committed.load(Ordering::SeqCst));
458    }
459
460    #[tokio::test]
461    async fn execute_ok_schema_commits() {
462        let tx = MockTransaction::new(QueryResultKind::Ok);
463        let committed = tx.committed.clone();
464        let client = make_client(MockBackend::new(tx));
465
466        let result = client
467            .execute("db", "define entity thing;", "schema")
468            .await
469            .unwrap();
470        assert_eq!(result, serde_json::json!({"ok": true}));
471        assert!(committed.load(Ordering::SeqCst));
472    }
473
474    #[tokio::test]
475    async fn execute_rows_read_no_commit() {
476        let rows = vec![
477            serde_json::json!({"name": "Alice"}),
478            serde_json::json!({"name": "Bob"}),
479        ];
480        let tx = MockTransaction::new(QueryResultKind::Rows(rows.clone()));
481        let committed = tx.committed.clone();
482        let client = make_client(MockBackend::new(tx));
483
484        let result = client
485            .execute("db", "match $p isa person;", "read")
486            .await
487            .unwrap();
488        assert_eq!(result, serde_json::Value::Array(rows));
489        assert!(!committed.load(Ordering::SeqCst));
490    }
491
492    #[tokio::test]
493    async fn execute_rows_write_commits() {
494        let rows = vec![serde_json::json!({"id": 1})];
495        let tx = MockTransaction::new(QueryResultKind::Rows(rows));
496        let committed = tx.committed.clone();
497        let client = make_client(MockBackend::new(tx));
498
499        let result = client
500            .execute("db", "insert $x isa thing;", "write")
501            .await
502            .unwrap();
503        assert!(result.is_array());
504        assert!(committed.load(Ordering::SeqCst));
505    }
506
507    #[tokio::test]
508    async fn execute_rows_data_preserved() {
509        let rows = vec![
510            serde_json::json!({"name": "Alice", "age": 30}),
511            serde_json::json!({"name": "Bob", "age": 25}),
512        ];
513        let tx = MockTransaction::new(QueryResultKind::Rows(rows.clone()));
514        let client = make_client(MockBackend::new(tx));
515
516        let result = client
517            .execute("db", "match $p isa person;", "read")
518            .await
519            .unwrap();
520        let arr = result.as_array().unwrap();
521        assert_eq!(arr.len(), 2);
522        assert_eq!(arr[0]["name"], "Alice");
523        assert_eq!(arr[1]["age"], 25);
524    }
525
526    #[tokio::test]
527    async fn execute_docs_read_no_commit() {
528        let docs = vec![serde_json::json!({"doc": "data"})];
529        let tx = MockTransaction::new(QueryResultKind::Documents(docs.clone()));
530        let committed = tx.committed.clone();
531        let client = make_client(MockBackend::new(tx));
532
533        let result = client
534            .execute("db", "match $p isa person; fetch {};", "read")
535            .await
536            .unwrap();
537        assert_eq!(result, serde_json::Value::Array(docs));
538        assert!(!committed.load(Ordering::SeqCst));
539    }
540
541    #[tokio::test]
542    async fn execute_docs_write_commits() {
543        let docs = vec![serde_json::json!({"doc": "data"})];
544        let tx = MockTransaction::new(QueryResultKind::Documents(docs));
545        let committed = tx.committed.clone();
546        let client = make_client(MockBackend::new(tx));
547
548        let result = client
549            .execute("db", "insert $x isa thing;", "write")
550            .await
551            .unwrap();
552        assert!(result.is_array());
553        assert!(committed.load(Ordering::SeqCst));
554    }
555
556    #[tokio::test]
557    async fn execute_docs_commit_error_ignored() {
558        let docs = vec![serde_json::json!({"doc": "data"})];
559        let tx = MockTransaction::new(QueryResultKind::Documents(docs.clone()))
560            .with_commit_error("commit failed");
561        let client = make_client(MockBackend::new(tx));
562
563        // Documents + write: commit error is intentionally ignored (let _ = ...)
564        let result = client
565            .execute("db", "insert $x isa thing;", "write")
566            .await
567            .unwrap();
568        assert_eq!(result, serde_json::Value::Array(docs));
569    }
570
571    #[tokio::test]
572    async fn execute_transaction_open_failure() {
573        let client = make_client(MockBackend::failing("connection refused"));
574
575        let result = client.execute("db", "match $x isa thing;", "read").await;
576        let err = result.unwrap_err();
577        assert!(
578            matches!(&err, PipelineError::QueryExecution(msg) if msg.contains("connection refused"))
579        );
580    }
581
582    #[tokio::test]
583    async fn execute_query_failure() {
584        let tx = MockTransaction::failing_query("syntax error");
585        let client = make_client(MockBackend::new(tx));
586
587        let result = client.execute("db", "bad query", "read").await;
588        let err = result.unwrap_err();
589        assert!(matches!(&err, PipelineError::QueryExecution(msg) if msg.contains("syntax error")));
590    }
591
592    #[tokio::test]
593    async fn execute_commit_failure_ok_propagated() {
594        let tx = MockTransaction::new(QueryResultKind::Ok).with_commit_error("commit failed");
595        let client = make_client(MockBackend::new(tx));
596
597        // Ok + write: commit error IS propagated
598        let result = client.execute("db", "insert $x isa thing;", "write").await;
599        let err = result.unwrap_err();
600        assert!(
601            matches!(&err, PipelineError::QueryExecution(msg) if msg.contains("commit failed"))
602        );
603    }
604
605    #[tokio::test]
606    async fn execute_commit_failure_rows_propagated() {
607        let tx =
608            MockTransaction::new(QueryResultKind::Rows(vec![])).with_commit_error("commit failed");
609        let client = make_client(MockBackend::new(tx));
610
611        // Rows + write: commit error IS propagated
612        let result = client.execute("db", "insert $x isa thing;", "write").await;
613        let err = result.unwrap_err();
614        assert!(
615            matches!(&err, PipelineError::QueryExecution(msg) if msg.contains("commit failed"))
616        );
617    }
618
619    #[tokio::test]
620    async fn execute_invalid_transaction_type() {
621        let tx = MockTransaction::new(QueryResultKind::Ok);
622        let backend = MockBackend::new(tx);
623        let open_called = backend.open_called.clone();
624        let client = make_client(backend);
625
626        let result = client.execute("db", "match $x;", "invalid").await;
627        assert!(result.is_err());
628        // Backend should never be called if transaction type is invalid
629        assert_eq!(open_called.load(Ordering::SeqCst), 0);
630    }
631
632    #[test]
633    fn is_connected_delegates_to_backend() {
634        let mut backend = MockBackend::new(MockTransaction::new(QueryResultKind::Ok));
635        backend.is_open = true;
636        let client = make_client(backend);
637        assert!(client.is_connected());
638    }
639
640    #[test]
641    fn is_connected_false_when_backend_closed() {
642        let mut backend = MockBackend::new(MockTransaction::new(QueryResultKind::Ok));
643        backend.is_open = false;
644        let client = make_client(backend);
645        assert!(!client.is_connected());
646    }
647
648    // =============================================
649    // database admin tests (via MockBackend)
650    // =============================================
651
652    #[tokio::test]
653    async fn database_exists_true_delegates_to_backend() {
654        let backend = MockBackend::new(MockTransaction::new(QueryResultKind::Ok))
655            .with_database_exists_results(vec![Ok(true)]);
656        let admin = backend.database_admin_state();
657        let client = make_client(backend);
658
659        assert!(client.database_exists("admin_db").await.unwrap());
660        assert_eq!(admin.exists_called.load(Ordering::SeqCst), 1);
661        assert_eq!(
662            admin.operations.lock().unwrap().clone(),
663            vec!["exists:admin_db".to_string()]
664        );
665    }
666
667    #[tokio::test]
668    async fn database_exists_false_delegates_to_backend() {
669        let backend = MockBackend::new(MockTransaction::new(QueryResultKind::Ok))
670            .with_database_exists_results(vec![Ok(false)]);
671        let admin = backend.database_admin_state();
672        let client = make_client(backend);
673
674        assert!(!client.database_exists("missing_db").await.unwrap());
675        assert_eq!(admin.exists_called.load(Ordering::SeqCst), 1);
676        assert_eq!(
677            admin.operations.lock().unwrap().clone(),
678            vec!["exists:missing_db".to_string()]
679        );
680    }
681
682    #[tokio::test]
683    async fn create_database_delegates_to_backend() {
684        let backend = MockBackend::new(MockTransaction::new(QueryResultKind::Ok));
685        let admin = backend.database_admin_state();
686        let client = make_client(backend);
687
688        client.create_database("new_db").await.unwrap();
689        assert_eq!(admin.create_called.load(Ordering::SeqCst), 1);
690        assert_eq!(
691            admin.operations.lock().unwrap().clone(),
692            vec!["create:new_db".to_string()]
693        );
694    }
695
696    #[tokio::test]
697    async fn delete_database_delegates_to_backend() {
698        let backend = MockBackend::new(MockTransaction::new(QueryResultKind::Ok));
699        let admin = backend.database_admin_state();
700        let client = make_client(backend);
701
702        client.delete_database("old_db").await.unwrap();
703        assert_eq!(admin.delete_called.load(Ordering::SeqCst), 1);
704        assert_eq!(
705            admin.operations.lock().unwrap().clone(),
706            vec!["delete:old_db".to_string()]
707        );
708    }
709
710    #[tokio::test]
711    async fn reset_database_deletes_existing_database_before_create() {
712        let backend = MockBackend::new(MockTransaction::new(QueryResultKind::Ok))
713            .with_database_exists_results(vec![Ok(true)]);
714        let admin = backend.database_admin_state();
715        let client = make_client(backend);
716
717        client.reset_database("reset_db").await.unwrap();
718        assert_eq!(admin.exists_called.load(Ordering::SeqCst), 1);
719        assert_eq!(admin.delete_called.load(Ordering::SeqCst), 1);
720        assert_eq!(admin.create_called.load(Ordering::SeqCst), 1);
721        assert_eq!(
722            admin.operations.lock().unwrap().clone(),
723            vec![
724                "exists:reset_db".to_string(),
725                "delete:reset_db".to_string(),
726                "create:reset_db".to_string(),
727            ]
728        );
729    }
730
731    #[tokio::test]
732    async fn reset_database_creates_when_database_is_absent() {
733        let backend = MockBackend::new(MockTransaction::new(QueryResultKind::Ok))
734            .with_database_exists_results(vec![Ok(false)]);
735        let admin = backend.database_admin_state();
736        let client = make_client(backend);
737
738        client.reset_database("reset_db").await.unwrap();
739        assert_eq!(admin.exists_called.load(Ordering::SeqCst), 1);
740        assert_eq!(admin.delete_called.load(Ordering::SeqCst), 0);
741        assert_eq!(admin.create_called.load(Ordering::SeqCst), 1);
742        assert_eq!(
743            admin.operations.lock().unwrap().clone(),
744            vec!["exists:reset_db".to_string(), "create:reset_db".to_string(),]
745        );
746    }
747
748    #[tokio::test]
749    async fn database_exists_propagates_backend_error() {
750        let backend = MockBackend::new(MockTransaction::new(QueryResultKind::Ok))
751            .with_database_exists_results(vec![Err("lookup failed".to_string())]);
752        let client = make_client(backend);
753
754        let err = client.database_exists("db").await.unwrap_err();
755        assert!(matches!(&err, PipelineError::Connection(msg) if msg.contains("lookup failed")));
756    }
757
758    #[tokio::test]
759    async fn create_database_propagates_backend_error() {
760        let backend = MockBackend::new(MockTransaction::new(QueryResultKind::Ok))
761            .with_create_errors(vec!["create failed"]);
762        let client = make_client(backend);
763
764        let err = client.create_database("db").await.unwrap_err();
765        assert!(matches!(&err, PipelineError::Connection(msg) if msg.contains("create failed")));
766    }
767
768    #[tokio::test]
769    async fn delete_database_propagates_backend_error() {
770        let backend = MockBackend::new(MockTransaction::new(QueryResultKind::Ok))
771            .with_delete_errors(vec!["delete failed"]);
772        let client = make_client(backend);
773
774        let err = client.delete_database("db").await.unwrap_err();
775        assert!(matches!(&err, PipelineError::Connection(msg) if msg.contains("delete failed")));
776    }
777
778    #[tokio::test]
779    async fn reset_database_propagates_lookup_error_without_mutating() {
780        let backend = MockBackend::new(MockTransaction::new(QueryResultKind::Ok))
781            .with_database_exists_results(vec![Err("lookup failed".to_string())]);
782        let admin = backend.database_admin_state();
783        let client = make_client(backend);
784
785        let err = client.reset_database("db").await.unwrap_err();
786        assert!(matches!(&err, PipelineError::Connection(msg) if msg.contains("lookup failed")));
787        assert_eq!(admin.delete_called.load(Ordering::SeqCst), 0);
788        assert_eq!(admin.create_called.load(Ordering::SeqCst), 0);
789        assert_eq!(
790            admin.operations.lock().unwrap().clone(),
791            vec!["exists:db".to_string()]
792        );
793    }
794
795    #[tokio::test]
796    async fn reset_database_propagates_delete_error_without_create() {
797        let backend = MockBackend::new(MockTransaction::new(QueryResultKind::Ok))
798            .with_database_exists_results(vec![Ok(true)])
799            .with_delete_errors(vec!["delete failed"]);
800        let admin = backend.database_admin_state();
801        let client = make_client(backend);
802
803        let err = client.reset_database("db").await.unwrap_err();
804        assert!(matches!(&err, PipelineError::Connection(msg) if msg.contains("delete failed")));
805        assert_eq!(admin.create_called.load(Ordering::SeqCst), 0);
806        assert_eq!(
807            admin.operations.lock().unwrap().clone(),
808            vec!["exists:db".to_string(), "delete:db".to_string()]
809        );
810    }
811
812    // =============================================
813    // QueryExecutor trait impl tests
814    // =============================================
815
816    #[test]
817    fn type_db_client_implements_query_executor() {
818        fn assert_executor<T: QueryExecutor>() {}
819        assert_executor::<TypeDBClient>();
820    }
821
822    #[tokio::test]
823    async fn query_executor_execute_delegates_to_client() {
824        let tx = MockTransaction::new(QueryResultKind::Rows(vec![serde_json::json!({"x": 1})]));
825        let client = make_client(MockBackend::new(tx));
826        let executor: Box<dyn QueryExecutor> = Box::new(client);
827
828        let result = executor
829            .execute("db", "match $x isa thing;", "read")
830            .await
831            .unwrap();
832        assert!(result.is_array());
833        assert_eq!(result.as_array().unwrap().len(), 1);
834    }
835
836    #[test]
837    fn query_executor_is_connected_delegates_to_client() {
838        let mut backend = MockBackend::new(MockTransaction::new(QueryResultKind::Ok));
839        backend.is_open = true;
840        let client = make_client(backend);
841        let executor: Box<dyn QueryExecutor> = Box::new(client);
842        assert!(executor.is_connected());
843    }
844
845    // =============================================
846    // Integration tests (require running TypeDB)
847    // =============================================
848
849    #[tokio::test]
850    #[ignore = "requires running TypeDB server"]
851    #[cfg_attr(coverage_nightly, coverage(off))]
852    async fn integration_connect_invalid_address() {
853        let config = TypeDBSection {
854            address: "localhost:99999".to_string(),
855            database: "test".to_string(),
856            username: "admin".to_string(),
857            password: "password".to_string(),
858            http_port: 8000,
859            server_version: None,
860        };
861        let result = TypeDBClient::connect(&config).await;
862        assert!(result.is_err());
863    }
864
865    /// Live-test target resolved from the environment so the suite can point
866    /// at any disposable TypeDB container instead of a fixed local install.
867    fn live_config() -> TypeDBSection {
868        TypeDBSection {
869            address: std::env::var("TYPEDB_ADDRESS")
870                .unwrap_or_else(|_| "localhost:1729".to_string()),
871            database: std::env::var("TYPEDB_DATABASE").unwrap_or_else(|_| "test".to_string()),
872            username: "admin".to_string(),
873            password: "password".to_string(),
874            http_port: std::env::var("TYPEDB_HTTP_PORT")
875                .ok()
876                .and_then(|port| port.parse().ok())
877                .unwrap_or(8000),
878            server_version: std::env::var("TYPEDB_SERVER_VERSION").ok(),
879        }
880    }
881
882    #[tokio::test]
883    #[ignore = "requires running TypeDB server"]
884    #[cfg_attr(coverage_nightly, coverage(off))]
885    async fn integration_connect_success() {
886        let result = TypeDBClient::connect(&live_config()).await;
887        assert!(result.is_ok());
888        assert!(result.unwrap().is_connected());
889    }
890
891    #[tokio::test]
892    #[ignore = "requires running TypeDB server"]
893    #[cfg_attr(coverage_nightly, coverage(off))]
894    async fn integration_database_admin_roundtrip() {
895        let config = live_config();
896        let client = TypeDBClient::connect(&config)
897            .await
898            .expect("connect failed");
899        let database = format!("type_bridge_server_admin_{}", uuid::Uuid::new_v4().simple());
900
901        if client.database_exists(&database).await.unwrap_or(false) {
902            let _ = client.delete_database(&database).await;
903        }
904
905        assert!(!client.database_exists(&database).await.unwrap());
906        client
907            .create_database(&database)
908            .await
909            .expect("create failed");
910        assert!(client.database_exists(&database).await.unwrap());
911
912        client
913            .reset_database(&database)
914            .await
915            .expect("reset existing database failed");
916        assert!(client.database_exists(&database).await.unwrap());
917
918        client
919            .delete_database(&database)
920            .await
921            .expect("delete failed");
922        assert!(!client.database_exists(&database).await.unwrap());
923
924        client
925            .reset_database(&database)
926            .await
927            .expect("reset absent database failed");
928        assert!(client.database_exists(&database).await.unwrap());
929
930        client
931            .delete_database(&database)
932            .await
933            .expect("cleanup delete failed");
934    }
935
936    #[tokio::test]
937    #[ignore = "requires running TypeDB server"]
938    #[cfg_attr(coverage_nightly, coverage(off))]
939    async fn integration_execute_roundtrip() {
940        let config = live_config();
941        let client = TypeDBClient::connect(&config)
942            .await
943            .expect("connect failed");
944
945        client
946            .execute(&config.database, "define entity smoke_marker;", "schema")
947            .await
948            .expect("schema define failed");
949
950        let rows = client
951            .execute(&config.database, "match entity $t;", "read")
952            .await
953            .expect("read query failed");
954        let rows = rows.as_array().expect("read result must be a JSON array");
955        assert!(
956            !rows.is_empty(),
957            "expected at least the smoke_marker entity type"
958        );
959    }
960}