Skip to main content

type_bridge_server/typedb/
client.rs

1use super::backend::{DriverBackend, QueryResultKind, TransactionType};
2use super::real_driver::RealTypeDBBackend;
3use crate::config::TypeDBSection;
4use crate::error::PipelineError;
5use crate::executor::QueryExecutor;
6
7/// Wrapper around the TypeDB Rust driver providing a clean async API
8/// for query execution and schema retrieval.
9pub struct TypeDBClient {
10    backend: Box<dyn DriverBackend>,
11}
12
13impl TypeDBClient {
14    /// Connect to a TypeDB server using the provided configuration.
15    #[cfg_attr(coverage_nightly, coverage(off))]
16    pub async fn connect(config: &TypeDBSection) -> Result<Self, PipelineError> {
17        let backend = RealTypeDBBackend::connect(config).await?;
18        Ok(Self {
19            backend: Box::new(backend),
20        })
21    }
22
23    /// Create a TypeDBClient with a custom backend (for testing).
24    #[cfg(test)]
25    pub(crate) fn with_backend(backend: Box<dyn DriverBackend>) -> Self {
26        Self { backend }
27    }
28
29    /// Execute a TypeQL query and return results as JSON.
30    ///
31    /// For read transactions, the transaction is used directly.
32    /// For write and schema transactions, the transaction is committed after execution.
33    pub async fn execute(
34        &self,
35        database: &str,
36        typeql: &str,
37        tx_type: &str,
38    ) -> Result<serde_json::Value, PipelineError> {
39        let transaction_type = parse_transaction_type(tx_type)?;
40
41        let mut tx = self
42            .backend
43            .open_transaction(database, transaction_type)
44            .await?;
45
46        let answer = tx.query(typeql).await?;
47
48        let needs_commit = matches!(
49            transaction_type,
50            TransactionType::Write | TransactionType::Schema
51        );
52
53        let results = match answer {
54            QueryResultKind::Ok => {
55                if needs_commit {
56                    tx.commit().await?;
57                }
58                serde_json::json!({ "ok": true })
59            }
60            QueryResultKind::Rows(rows) => {
61                if needs_commit {
62                    tx.commit().await?;
63                }
64                serde_json::Value::Array(rows)
65            }
66            QueryResultKind::Documents(docs) => {
67                if needs_commit {
68                    let _ = tx.commit().await;
69                }
70                serde_json::Value::Array(docs)
71            }
72        };
73
74        Ok(results)
75    }
76
77    /// Return whether a TypeDB database exists on the connected server.
78    pub async fn database_exists(&self, database: &str) -> Result<bool, PipelineError> {
79        self.backend.database_exists(database).await
80    }
81
82    /// Create a TypeDB database.
83    pub async fn create_database(&self, database: &str) -> Result<(), PipelineError> {
84        self.backend.create_database(database).await
85    }
86
87    /// Delete a TypeDB database.
88    pub async fn delete_database(&self, database: &str) -> Result<(), PipelineError> {
89        self.backend.delete_database(database).await
90    }
91
92    /// Delete a TypeDB database if it exists, then create it.
93    pub async fn reset_database(&self, database: &str) -> Result<(), PipelineError> {
94        if self.database_exists(database).await? {
95            self.delete_database(database).await?;
96        }
97        self.create_database(database).await
98    }
99
100    /// Check if the driver connection is open.
101    pub fn is_connected(&self) -> bool {
102        self.backend.is_open()
103    }
104}
105
106impl QueryExecutor for TypeDBClient {
107    fn execute<'a>(
108        &'a self,
109        database: &'a str,
110        typeql: &'a str,
111        transaction_type: &'a str,
112    ) -> std::pin::Pin<
113        Box<dyn std::future::Future<Output = Result<serde_json::Value, PipelineError>> + Send + 'a>,
114    > {
115        Box::pin(async move { self.execute(database, typeql, transaction_type).await })
116    }
117
118    fn is_connected(&self) -> bool {
119        self.is_connected()
120    }
121}
122
123/// Parse a transaction type string into a TypeDB TransactionType.
124pub(crate) fn parse_transaction_type(tx_type: &str) -> Result<TransactionType, PipelineError> {
125    match tx_type {
126        "read" => Ok(TransactionType::Read),
127        "write" => Ok(TransactionType::Write),
128        "schema" => Ok(TransactionType::Schema),
129        other => Err(PipelineError::QueryExecution(format!(
130            "Unknown transaction type: {other}"
131        ))),
132    }
133}
134
135/// Convert a band-7 TypeDB concept to a JSON value.
136///
137/// Output shape is identical to [`concept_to_json_b8`] for all common concepts.
138#[cfg(feature = "band7")]
139pub(crate) fn concept_to_json_b7(
140    concept: &type_bridge_typedb_driver_b7::concept::Concept,
141) -> serde_json::Value {
142    let mut obj = serde_json::Map::new();
143    obj.insert(
144        "category".to_string(),
145        serde_json::Value::String(concept.get_category().name().to_string()),
146    );
147    obj.insert(
148        "label".to_string(),
149        serde_json::Value::String(concept.get_label().to_string()),
150    );
151    if let Some(iid) = concept.try_get_iid() {
152        obj.insert(
153            "iid".to_string(),
154            serde_json::Value::String(iid.to_string()),
155        );
156    }
157    if let Some(value) = concept.try_get_value() {
158        obj.insert("value".to_string(), value_to_json_b7(value));
159    }
160    if let Some(value_type) = concept.try_get_value_type() {
161        obj.insert(
162            "value_type".to_string(),
163            serde_json::Value::String(value_type.name().to_string()),
164        );
165    }
166    serde_json::Value::Object(obj)
167}
168
169/// Convert a band-7 TypeDB value to a JSON value.
170#[cfg(feature = "band7")]
171#[cfg_attr(coverage_nightly, coverage(off))]
172fn value_to_json_b7(value: &type_bridge_typedb_driver_b7::concept::Value) -> serde_json::Value {
173    if let Some(b) = value.get_boolean() {
174        return serde_json::Value::Bool(b);
175    }
176    if let Some(i) = value.get_integer() {
177        return serde_json::json!(i);
178    }
179    if let Some(d) = value.get_double() {
180        return serde_json::json!(d);
181    }
182    if let Some(s) = value.get_string() {
183        return serde_json::Value::String(s.to_string());
184    }
185    if let Some(date) = value.get_date() {
186        return serde_json::Value::String(date.to_string());
187    }
188    if let Some(dt) = value.get_datetime() {
189        return serde_json::Value::String(dt.to_string());
190    }
191    if let Some(dt_tz) = value.get_datetime_tz() {
192        return serde_json::Value::String(dt_tz.to_string());
193    }
194    if let Some(dec) = value.get_decimal() {
195        return serde_json::Value::String(dec.to_string());
196    }
197    if let Some(dur) = value.get_duration() {
198        return serde_json::Value::String(dur.to_string());
199    }
200    serde_json::Value::String(value.to_string())
201}
202
203/// Convert a band-8 TypeDB concept to a JSON value.
204///
205/// Output shape is identical to [`concept_to_json_b7`] for all common concepts.
206#[cfg(feature = "band8")]
207pub(crate) fn concept_to_json_b8(concept: &typedb_driver::concept::Concept) -> serde_json::Value {
208    let mut obj = serde_json::Map::new();
209    obj.insert(
210        "category".to_string(),
211        serde_json::Value::String(concept.get_category().name().to_string()),
212    );
213    obj.insert(
214        "label".to_string(),
215        serde_json::Value::String(concept.get_label().to_string()),
216    );
217    if let Some(iid) = concept.try_get_iid() {
218        obj.insert(
219            "iid".to_string(),
220            serde_json::Value::String(iid.to_string()),
221        );
222    }
223    if let Some(value) = concept.try_get_value() {
224        obj.insert("value".to_string(), value_to_json_b8(value));
225    }
226    if let Some(value_type) = concept.try_get_value_type() {
227        obj.insert(
228            "value_type".to_string(),
229            serde_json::Value::String(value_type.name().to_string()),
230        );
231    }
232    serde_json::Value::Object(obj)
233}
234
235/// Convert a band-8 TypeDB value to a JSON value.
236#[cfg(feature = "band8")]
237#[cfg_attr(coverage_nightly, coverage(off))]
238pub(crate) fn value_to_json_b8(value: &typedb_driver::concept::Value) -> serde_json::Value {
239    if let Some(b) = value.get_boolean() {
240        return serde_json::Value::Bool(b);
241    }
242    if let Some(i) = value.get_integer() {
243        return serde_json::json!(i);
244    }
245    if let Some(d) = value.get_double() {
246        return serde_json::json!(d);
247    }
248    if let Some(s) = value.get_string() {
249        return serde_json::Value::String(s.to_string());
250    }
251    if let Some(date) = value.get_date() {
252        return serde_json::Value::String(date.to_string());
253    }
254    if let Some(dt) = value.get_datetime() {
255        return serde_json::Value::String(dt.to_string());
256    }
257    if let Some(dt_tz) = value.get_datetime_tz() {
258        return serde_json::Value::String(dt_tz.to_string());
259    }
260    if let Some(dec) = value.get_decimal() {
261        return serde_json::Value::String(dec.to_string());
262    }
263    if let Some(dur) = value.get_duration() {
264        return serde_json::Value::String(dur.to_string());
265    }
266    // Fallback: use Display representation
267    serde_json::Value::String(value.to_string())
268}
269
270#[cfg(test)]
271#[cfg(feature = "band8")]
272#[cfg_attr(coverage_nightly, coverage(off))]
273mod tests {
274    use std::collections::VecDeque;
275    use std::future::Future;
276    use std::pin::Pin;
277    use std::sync::Arc;
278    use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
279
280    use typedb_driver::IID;
281    use typedb_driver::concept::value::{Decimal, Duration, TimeZone};
282    use typedb_driver::concept::{
283        Attribute, AttributeType, Concept, Entity, EntityType, Value, ValueType,
284    };
285
286    use super::*;
287    use crate::error::PipelineError;
288    use crate::typedb::backend::TransactionOps;
289
290    // =============================================
291    // Mock infrastructure
292    // =============================================
293
294    struct MockTransaction {
295        query_result: Option<QueryResultKind>,
296        query_error: Option<String>,
297        commit_error: Option<String>,
298        committed: Arc<AtomicBool>,
299        query_called: Arc<AtomicBool>,
300    }
301
302    impl MockTransaction {
303        fn new(result: QueryResultKind) -> Self {
304            Self {
305                query_result: Some(result),
306                query_error: None,
307                commit_error: None,
308                committed: Arc::new(AtomicBool::new(false)),
309                query_called: Arc::new(AtomicBool::new(false)),
310            }
311        }
312
313        fn failing_query(msg: &str) -> Self {
314            Self {
315                query_result: None,
316                query_error: Some(msg.to_string()),
317                commit_error: None,
318                committed: Arc::new(AtomicBool::new(false)),
319                query_called: Arc::new(AtomicBool::new(false)),
320            }
321        }
322
323        fn with_commit_error(mut self, msg: &str) -> Self {
324            self.commit_error = Some(msg.to_string());
325            self
326        }
327    }
328
329    impl TransactionOps for MockTransaction {
330        fn query(
331            &mut self,
332            _typeql: &str,
333        ) -> Pin<Box<dyn Future<Output = Result<QueryResultKind, PipelineError>> + Send + '_>>
334        {
335            self.query_called.store(true, Ordering::SeqCst);
336            let result = self.query_result.take();
337            let error = self.query_error.take();
338            Box::pin(async move {
339                if let Some(msg) = error {
340                    return Err(PipelineError::QueryExecution(msg));
341                }
342                Ok(result.expect("MockTransaction::query called more than once"))
343            })
344        }
345
346        fn commit(
347            &mut self,
348        ) -> Pin<Box<dyn Future<Output = Result<(), PipelineError>> + Send + '_>> {
349            self.committed.store(true, Ordering::SeqCst);
350            let error = self.commit_error.take();
351            Box::pin(async move {
352                if let Some(msg) = error {
353                    return Err(PipelineError::QueryExecution(msg));
354                }
355                Ok(())
356            })
357        }
358    }
359
360    #[derive(Default)]
361    struct MockDatabaseAdmin {
362        exists_results: std::sync::Mutex<VecDeque<Result<bool, String>>>,
363        create_errors: std::sync::Mutex<VecDeque<String>>,
364        delete_errors: std::sync::Mutex<VecDeque<String>>,
365        exists_called: AtomicUsize,
366        create_called: AtomicUsize,
367        delete_called: AtomicUsize,
368        operations: std::sync::Mutex<Vec<String>>,
369    }
370
371    struct MockBackend {
372        transaction: std::sync::Mutex<Option<MockTransaction>>,
373        open_error: Option<String>,
374        is_open: bool,
375        open_called: Arc<AtomicUsize>,
376        database_admin: Arc<MockDatabaseAdmin>,
377    }
378
379    impl MockBackend {
380        fn new(tx: MockTransaction) -> Self {
381            Self {
382                transaction: std::sync::Mutex::new(Some(tx)),
383                open_error: None,
384                is_open: true,
385                open_called: Arc::new(AtomicUsize::new(0)),
386                database_admin: Arc::new(MockDatabaseAdmin::default()),
387            }
388        }
389
390        fn failing(msg: &str) -> Self {
391            Self {
392                transaction: std::sync::Mutex::new(None),
393                open_error: Some(msg.to_string()),
394                is_open: true,
395                open_called: Arc::new(AtomicUsize::new(0)),
396                database_admin: Arc::new(MockDatabaseAdmin::default()),
397            }
398        }
399
400        fn with_database_exists_results(self, results: Vec<Result<bool, String>>) -> Self {
401            *self.database_admin.exists_results.lock().unwrap() = results.into();
402            self
403        }
404
405        fn with_create_errors(self, errors: Vec<&str>) -> Self {
406            *self.database_admin.create_errors.lock().unwrap() =
407                errors.into_iter().map(str::to_string).collect();
408            self
409        }
410
411        fn with_delete_errors(self, errors: Vec<&str>) -> Self {
412            *self.database_admin.delete_errors.lock().unwrap() =
413                errors.into_iter().map(str::to_string).collect();
414            self
415        }
416
417        fn database_admin_state(&self) -> Arc<MockDatabaseAdmin> {
418            Arc::clone(&self.database_admin)
419        }
420    }
421
422    impl DriverBackend for MockBackend {
423        fn open_transaction(
424            &self,
425            _database: &str,
426            _tx_type: TransactionType,
427        ) -> Pin<Box<dyn Future<Output = Result<Box<dyn TransactionOps>, PipelineError>> + Send + '_>>
428        {
429            self.open_called.fetch_add(1, Ordering::SeqCst);
430            let tx = self.transaction.lock().unwrap().take();
431            let error = self.open_error.clone();
432            Box::pin(async move {
433                if let Some(msg) = error {
434                    return Err(PipelineError::QueryExecution(msg));
435                }
436                Ok(
437                    Box::new(tx.expect("MockBackend: no transaction configured"))
438                        as Box<dyn TransactionOps>,
439                )
440            })
441        }
442
443        fn is_open(&self) -> bool {
444            self.is_open
445        }
446
447        fn database_exists(
448            &self,
449            database: &str,
450        ) -> Pin<Box<dyn Future<Output = Result<bool, PipelineError>> + Send + '_>> {
451            self.database_admin
452                .exists_called
453                .fetch_add(1, Ordering::SeqCst);
454            self.database_admin
455                .operations
456                .lock()
457                .unwrap()
458                .push(format!("exists:{database}"));
459            let result = self
460                .database_admin
461                .exists_results
462                .lock()
463                .unwrap()
464                .pop_front()
465                .unwrap_or(Ok(false));
466            Box::pin(async move { result.map_err(PipelineError::Connection) })
467        }
468
469        fn create_database(
470            &self,
471            database: &str,
472        ) -> Pin<Box<dyn Future<Output = Result<(), PipelineError>> + Send + '_>> {
473            self.database_admin
474                .create_called
475                .fetch_add(1, Ordering::SeqCst);
476            self.database_admin
477                .operations
478                .lock()
479                .unwrap()
480                .push(format!("create:{database}"));
481            let error = self
482                .database_admin
483                .create_errors
484                .lock()
485                .unwrap()
486                .pop_front();
487            Box::pin(async move {
488                if let Some(msg) = error {
489                    return Err(PipelineError::Connection(msg));
490                }
491                Ok(())
492            })
493        }
494
495        fn delete_database(
496            &self,
497            database: &str,
498        ) -> Pin<Box<dyn Future<Output = Result<(), PipelineError>> + Send + '_>> {
499            self.database_admin
500                .delete_called
501                .fetch_add(1, Ordering::SeqCst);
502            self.database_admin
503                .operations
504                .lock()
505                .unwrap()
506                .push(format!("delete:{database}"));
507            let error = self
508                .database_admin
509                .delete_errors
510                .lock()
511                .unwrap()
512                .pop_front();
513            Box::pin(async move {
514                if let Some(msg) = error {
515                    return Err(PipelineError::Connection(msg));
516                }
517                Ok(())
518            })
519        }
520    }
521
522    fn make_client(backend: MockBackend) -> TypeDBClient {
523        TypeDBClient::with_backend(Box::new(backend))
524    }
525
526    // =============================================
527    // parse_transaction_type tests
528    // =============================================
529
530    #[test]
531    fn parse_transaction_type_read() {
532        let result = parse_transaction_type("read").unwrap();
533        assert_eq!(result, TransactionType::Read);
534    }
535
536    #[test]
537    fn parse_transaction_type_write() {
538        let result = parse_transaction_type("write").unwrap();
539        assert_eq!(result, TransactionType::Write);
540    }
541
542    #[test]
543    fn parse_transaction_type_schema() {
544        let result = parse_transaction_type("schema").unwrap();
545        assert_eq!(result, TransactionType::Schema);
546    }
547
548    #[test]
549    fn parse_transaction_type_unknown() {
550        let result = parse_transaction_type("unknown");
551        let err = result.unwrap_err();
552        assert!(
553            matches!(&err, PipelineError::QueryExecution(msg) if msg.contains("Unknown transaction type: unknown"))
554        );
555    }
556
557    #[test]
558    fn parse_transaction_type_empty() {
559        let result = parse_transaction_type("");
560        assert!(result.is_err());
561    }
562
563    #[test]
564    fn parse_transaction_type_case_sensitive() {
565        let result = parse_transaction_type("Read");
566        assert!(result.is_err());
567    }
568
569    // =============================================
570    // execute tests (via MockBackend)
571    // =============================================
572
573    #[tokio::test]
574    async fn execute_ok_read_no_commit() {
575        let tx = MockTransaction::new(QueryResultKind::Ok);
576        let committed = tx.committed.clone();
577        let client = make_client(MockBackend::new(tx));
578
579        let result = client
580            .execute("db", "match $x isa thing;", "read")
581            .await
582            .unwrap();
583        assert_eq!(result, serde_json::json!({"ok": true}));
584        assert!(!committed.load(Ordering::SeqCst));
585    }
586
587    #[tokio::test]
588    async fn execute_ok_write_commits() {
589        let tx = MockTransaction::new(QueryResultKind::Ok);
590        let committed = tx.committed.clone();
591        let client = make_client(MockBackend::new(tx));
592
593        let result = client
594            .execute("db", "insert $x isa thing;", "write")
595            .await
596            .unwrap();
597        assert_eq!(result, serde_json::json!({"ok": true}));
598        assert!(committed.load(Ordering::SeqCst));
599    }
600
601    #[tokio::test]
602    async fn execute_ok_schema_commits() {
603        let tx = MockTransaction::new(QueryResultKind::Ok);
604        let committed = tx.committed.clone();
605        let client = make_client(MockBackend::new(tx));
606
607        let result = client
608            .execute("db", "define entity thing;", "schema")
609            .await
610            .unwrap();
611        assert_eq!(result, serde_json::json!({"ok": true}));
612        assert!(committed.load(Ordering::SeqCst));
613    }
614
615    #[tokio::test]
616    async fn execute_rows_read_no_commit() {
617        let rows = vec![
618            serde_json::json!({"name": "Alice"}),
619            serde_json::json!({"name": "Bob"}),
620        ];
621        let tx = MockTransaction::new(QueryResultKind::Rows(rows.clone()));
622        let committed = tx.committed.clone();
623        let client = make_client(MockBackend::new(tx));
624
625        let result = client
626            .execute("db", "match $p isa person;", "read")
627            .await
628            .unwrap();
629        assert_eq!(result, serde_json::Value::Array(rows));
630        assert!(!committed.load(Ordering::SeqCst));
631    }
632
633    #[tokio::test]
634    async fn execute_rows_write_commits() {
635        let rows = vec![serde_json::json!({"id": 1})];
636        let tx = MockTransaction::new(QueryResultKind::Rows(rows));
637        let committed = tx.committed.clone();
638        let client = make_client(MockBackend::new(tx));
639
640        let result = client
641            .execute("db", "insert $x isa thing;", "write")
642            .await
643            .unwrap();
644        assert!(result.is_array());
645        assert!(committed.load(Ordering::SeqCst));
646    }
647
648    #[tokio::test]
649    async fn execute_rows_data_preserved() {
650        let rows = vec![
651            serde_json::json!({"name": "Alice", "age": 30}),
652            serde_json::json!({"name": "Bob", "age": 25}),
653        ];
654        let tx = MockTransaction::new(QueryResultKind::Rows(rows.clone()));
655        let client = make_client(MockBackend::new(tx));
656
657        let result = client
658            .execute("db", "match $p isa person;", "read")
659            .await
660            .unwrap();
661        let arr = result.as_array().unwrap();
662        assert_eq!(arr.len(), 2);
663        assert_eq!(arr[0]["name"], "Alice");
664        assert_eq!(arr[1]["age"], 25);
665    }
666
667    #[tokio::test]
668    async fn execute_docs_read_no_commit() {
669        let docs = vec![serde_json::json!({"doc": "data"})];
670        let tx = MockTransaction::new(QueryResultKind::Documents(docs.clone()));
671        let committed = tx.committed.clone();
672        let client = make_client(MockBackend::new(tx));
673
674        let result = client
675            .execute("db", "match $p isa person; fetch {};", "read")
676            .await
677            .unwrap();
678        assert_eq!(result, serde_json::Value::Array(docs));
679        assert!(!committed.load(Ordering::SeqCst));
680    }
681
682    #[tokio::test]
683    async fn execute_docs_write_commits() {
684        let docs = vec![serde_json::json!({"doc": "data"})];
685        let tx = MockTransaction::new(QueryResultKind::Documents(docs));
686        let committed = tx.committed.clone();
687        let client = make_client(MockBackend::new(tx));
688
689        let result = client
690            .execute("db", "insert $x isa thing;", "write")
691            .await
692            .unwrap();
693        assert!(result.is_array());
694        assert!(committed.load(Ordering::SeqCst));
695    }
696
697    #[tokio::test]
698    async fn execute_docs_commit_error_ignored() {
699        let docs = vec![serde_json::json!({"doc": "data"})];
700        let tx = MockTransaction::new(QueryResultKind::Documents(docs.clone()))
701            .with_commit_error("commit failed");
702        let client = make_client(MockBackend::new(tx));
703
704        // Documents + write: commit error is intentionally ignored (let _ = ...)
705        let result = client
706            .execute("db", "insert $x isa thing;", "write")
707            .await
708            .unwrap();
709        assert_eq!(result, serde_json::Value::Array(docs));
710    }
711
712    #[tokio::test]
713    async fn execute_transaction_open_failure() {
714        let client = make_client(MockBackend::failing("connection refused"));
715
716        let result = client.execute("db", "match $x isa thing;", "read").await;
717        let err = result.unwrap_err();
718        assert!(
719            matches!(&err, PipelineError::QueryExecution(msg) if msg.contains("connection refused"))
720        );
721    }
722
723    #[tokio::test]
724    async fn execute_query_failure() {
725        let tx = MockTransaction::failing_query("syntax error");
726        let client = make_client(MockBackend::new(tx));
727
728        let result = client.execute("db", "bad query", "read").await;
729        let err = result.unwrap_err();
730        assert!(matches!(&err, PipelineError::QueryExecution(msg) if msg.contains("syntax error")));
731    }
732
733    #[tokio::test]
734    async fn execute_commit_failure_ok_propagated() {
735        let tx = MockTransaction::new(QueryResultKind::Ok).with_commit_error("commit failed");
736        let client = make_client(MockBackend::new(tx));
737
738        // Ok + write: commit error IS propagated
739        let result = client.execute("db", "insert $x isa thing;", "write").await;
740        let err = result.unwrap_err();
741        assert!(
742            matches!(&err, PipelineError::QueryExecution(msg) if msg.contains("commit failed"))
743        );
744    }
745
746    #[tokio::test]
747    async fn execute_commit_failure_rows_propagated() {
748        let tx =
749            MockTransaction::new(QueryResultKind::Rows(vec![])).with_commit_error("commit failed");
750        let client = make_client(MockBackend::new(tx));
751
752        // Rows + write: commit error IS propagated
753        let result = client.execute("db", "insert $x isa thing;", "write").await;
754        let err = result.unwrap_err();
755        assert!(
756            matches!(&err, PipelineError::QueryExecution(msg) if msg.contains("commit failed"))
757        );
758    }
759
760    #[tokio::test]
761    async fn execute_invalid_transaction_type() {
762        let tx = MockTransaction::new(QueryResultKind::Ok);
763        let backend = MockBackend::new(tx);
764        let open_called = backend.open_called.clone();
765        let client = make_client(backend);
766
767        let result = client.execute("db", "match $x;", "invalid").await;
768        assert!(result.is_err());
769        // Backend should never be called if transaction type is invalid
770        assert_eq!(open_called.load(Ordering::SeqCst), 0);
771    }
772
773    #[test]
774    fn is_connected_delegates_to_backend() {
775        let mut backend = MockBackend::new(MockTransaction::new(QueryResultKind::Ok));
776        backend.is_open = true;
777        let client = make_client(backend);
778        assert!(client.is_connected());
779    }
780
781    #[test]
782    fn is_connected_false_when_backend_closed() {
783        let mut backend = MockBackend::new(MockTransaction::new(QueryResultKind::Ok));
784        backend.is_open = false;
785        let client = make_client(backend);
786        assert!(!client.is_connected());
787    }
788
789    // =============================================
790    // database admin tests (via MockBackend)
791    // =============================================
792
793    #[tokio::test]
794    async fn database_exists_true_delegates_to_backend() {
795        let backend = MockBackend::new(MockTransaction::new(QueryResultKind::Ok))
796            .with_database_exists_results(vec![Ok(true)]);
797        let admin = backend.database_admin_state();
798        let client = make_client(backend);
799
800        assert!(client.database_exists("admin_db").await.unwrap());
801        assert_eq!(admin.exists_called.load(Ordering::SeqCst), 1);
802        assert_eq!(
803            admin.operations.lock().unwrap().clone(),
804            vec!["exists:admin_db".to_string()]
805        );
806    }
807
808    #[tokio::test]
809    async fn database_exists_false_delegates_to_backend() {
810        let backend = MockBackend::new(MockTransaction::new(QueryResultKind::Ok))
811            .with_database_exists_results(vec![Ok(false)]);
812        let admin = backend.database_admin_state();
813        let client = make_client(backend);
814
815        assert!(!client.database_exists("missing_db").await.unwrap());
816        assert_eq!(admin.exists_called.load(Ordering::SeqCst), 1);
817        assert_eq!(
818            admin.operations.lock().unwrap().clone(),
819            vec!["exists:missing_db".to_string()]
820        );
821    }
822
823    #[tokio::test]
824    async fn create_database_delegates_to_backend() {
825        let backend = MockBackend::new(MockTransaction::new(QueryResultKind::Ok));
826        let admin = backend.database_admin_state();
827        let client = make_client(backend);
828
829        client.create_database("new_db").await.unwrap();
830        assert_eq!(admin.create_called.load(Ordering::SeqCst), 1);
831        assert_eq!(
832            admin.operations.lock().unwrap().clone(),
833            vec!["create:new_db".to_string()]
834        );
835    }
836
837    #[tokio::test]
838    async fn delete_database_delegates_to_backend() {
839        let backend = MockBackend::new(MockTransaction::new(QueryResultKind::Ok));
840        let admin = backend.database_admin_state();
841        let client = make_client(backend);
842
843        client.delete_database("old_db").await.unwrap();
844        assert_eq!(admin.delete_called.load(Ordering::SeqCst), 1);
845        assert_eq!(
846            admin.operations.lock().unwrap().clone(),
847            vec!["delete:old_db".to_string()]
848        );
849    }
850
851    #[tokio::test]
852    async fn reset_database_deletes_existing_database_before_create() {
853        let backend = MockBackend::new(MockTransaction::new(QueryResultKind::Ok))
854            .with_database_exists_results(vec![Ok(true)]);
855        let admin = backend.database_admin_state();
856        let client = make_client(backend);
857
858        client.reset_database("reset_db").await.unwrap();
859        assert_eq!(admin.exists_called.load(Ordering::SeqCst), 1);
860        assert_eq!(admin.delete_called.load(Ordering::SeqCst), 1);
861        assert_eq!(admin.create_called.load(Ordering::SeqCst), 1);
862        assert_eq!(
863            admin.operations.lock().unwrap().clone(),
864            vec![
865                "exists:reset_db".to_string(),
866                "delete:reset_db".to_string(),
867                "create:reset_db".to_string(),
868            ]
869        );
870    }
871
872    #[tokio::test]
873    async fn reset_database_creates_when_database_is_absent() {
874        let backend = MockBackend::new(MockTransaction::new(QueryResultKind::Ok))
875            .with_database_exists_results(vec![Ok(false)]);
876        let admin = backend.database_admin_state();
877        let client = make_client(backend);
878
879        client.reset_database("reset_db").await.unwrap();
880        assert_eq!(admin.exists_called.load(Ordering::SeqCst), 1);
881        assert_eq!(admin.delete_called.load(Ordering::SeqCst), 0);
882        assert_eq!(admin.create_called.load(Ordering::SeqCst), 1);
883        assert_eq!(
884            admin.operations.lock().unwrap().clone(),
885            vec!["exists:reset_db".to_string(), "create:reset_db".to_string(),]
886        );
887    }
888
889    #[tokio::test]
890    async fn database_exists_propagates_backend_error() {
891        let backend = MockBackend::new(MockTransaction::new(QueryResultKind::Ok))
892            .with_database_exists_results(vec![Err("lookup failed".to_string())]);
893        let client = make_client(backend);
894
895        let err = client.database_exists("db").await.unwrap_err();
896        assert!(matches!(&err, PipelineError::Connection(msg) if msg.contains("lookup failed")));
897    }
898
899    #[tokio::test]
900    async fn create_database_propagates_backend_error() {
901        let backend = MockBackend::new(MockTransaction::new(QueryResultKind::Ok))
902            .with_create_errors(vec!["create failed"]);
903        let client = make_client(backend);
904
905        let err = client.create_database("db").await.unwrap_err();
906        assert!(matches!(&err, PipelineError::Connection(msg) if msg.contains("create failed")));
907    }
908
909    #[tokio::test]
910    async fn delete_database_propagates_backend_error() {
911        let backend = MockBackend::new(MockTransaction::new(QueryResultKind::Ok))
912            .with_delete_errors(vec!["delete failed"]);
913        let client = make_client(backend);
914
915        let err = client.delete_database("db").await.unwrap_err();
916        assert!(matches!(&err, PipelineError::Connection(msg) if msg.contains("delete failed")));
917    }
918
919    #[tokio::test]
920    async fn reset_database_propagates_lookup_error_without_mutating() {
921        let backend = MockBackend::new(MockTransaction::new(QueryResultKind::Ok))
922            .with_database_exists_results(vec![Err("lookup failed".to_string())]);
923        let admin = backend.database_admin_state();
924        let client = make_client(backend);
925
926        let err = client.reset_database("db").await.unwrap_err();
927        assert!(matches!(&err, PipelineError::Connection(msg) if msg.contains("lookup failed")));
928        assert_eq!(admin.delete_called.load(Ordering::SeqCst), 0);
929        assert_eq!(admin.create_called.load(Ordering::SeqCst), 0);
930        assert_eq!(
931            admin.operations.lock().unwrap().clone(),
932            vec!["exists:db".to_string()]
933        );
934    }
935
936    #[tokio::test]
937    async fn reset_database_propagates_delete_error_without_create() {
938        let backend = MockBackend::new(MockTransaction::new(QueryResultKind::Ok))
939            .with_database_exists_results(vec![Ok(true)])
940            .with_delete_errors(vec!["delete failed"]);
941        let admin = backend.database_admin_state();
942        let client = make_client(backend);
943
944        let err = client.reset_database("db").await.unwrap_err();
945        assert!(matches!(&err, PipelineError::Connection(msg) if msg.contains("delete failed")));
946        assert_eq!(admin.create_called.load(Ordering::SeqCst), 0);
947        assert_eq!(
948            admin.operations.lock().unwrap().clone(),
949            vec!["exists:db".to_string(), "delete:db".to_string()]
950        );
951    }
952
953    // =============================================
954    // value_to_json tests
955    // =============================================
956
957    #[test]
958    fn value_to_json_boolean_true() {
959        let value = Value::Boolean(true);
960        let json = value_to_json_b8(&value);
961        assert_eq!(json, serde_json::Value::Bool(true));
962    }
963
964    #[test]
965    fn value_to_json_boolean_false() {
966        let value = Value::Boolean(false);
967        let json = value_to_json_b8(&value);
968        assert_eq!(json, serde_json::Value::Bool(false));
969    }
970
971    #[test]
972    fn value_to_json_integer() {
973        let value = Value::Integer(42);
974        let json = value_to_json_b8(&value);
975        assert_eq!(json, serde_json::json!(42));
976    }
977
978    #[test]
979    fn value_to_json_integer_negative() {
980        let value = Value::Integer(-100);
981        let json = value_to_json_b8(&value);
982        assert_eq!(json, serde_json::json!(-100));
983    }
984
985    #[test]
986    fn value_to_json_double() {
987        let value = Value::Double(3.15);
988        let json = value_to_json_b8(&value);
989        assert_eq!(json, serde_json::json!(3.15));
990    }
991
992    #[test]
993    fn value_to_json_string() {
994        let value = Value::String("hello".to_string());
995        let json = value_to_json_b8(&value);
996        assert_eq!(json, serde_json::Value::String("hello".to_string()));
997    }
998
999    #[test]
1000    fn value_to_json_string_empty() {
1001        let value = Value::String(String::new());
1002        let json = value_to_json_b8(&value);
1003        assert_eq!(json, serde_json::Value::String(String::new()));
1004    }
1005
1006    #[test]
1007    fn value_to_json_date() {
1008        let date = chrono::NaiveDate::from_ymd_opt(2024, 1, 15).unwrap();
1009        let value = Value::Date(date);
1010        let json = value_to_json_b8(&value);
1011        assert_eq!(json, serde_json::Value::String("2024-01-15".to_string()));
1012    }
1013
1014    #[test]
1015    fn value_to_json_datetime() {
1016        let dt = chrono::NaiveDate::from_ymd_opt(2024, 1, 15)
1017            .unwrap()
1018            .and_hms_opt(10, 30, 0)
1019            .unwrap();
1020        let value = Value::Datetime(dt);
1021        let json = value_to_json_b8(&value);
1022        let s = json.as_str().unwrap();
1023        assert!(s.contains("2024-01-15"));
1024    }
1025
1026    #[test]
1027    fn value_to_json_decimal() {
1028        let dec = Decimal::new(42, 0);
1029        let value = Value::Decimal(dec);
1030        let json = value_to_json_b8(&value);
1031        assert!(json.is_string());
1032    }
1033
1034    #[test]
1035    fn value_to_json_duration() {
1036        let dur = Duration::new(1, 2, 3_000_000_000);
1037        let value = Value::Duration(dur);
1038        let json = value_to_json_b8(&value);
1039        assert!(json.is_string());
1040    }
1041
1042    #[test]
1043    fn value_to_json_datetime_tz() {
1044        use chrono::TimeZone as _;
1045        let tz = TimeZone::Fixed(chrono::FixedOffset::east_opt(3600).unwrap());
1046        let dt = tz.with_ymd_and_hms(2024, 6, 15, 12, 30, 0).unwrap();
1047        let value = Value::DatetimeTZ(dt);
1048        let json = value_to_json_b8(&value);
1049        let s = json.as_str().unwrap();
1050        assert!(s.contains("2024"));
1051    }
1052
1053    // =============================================
1054    // concept_to_json tests
1055    // =============================================
1056
1057    #[test]
1058    fn concept_to_json_entity_type() {
1059        let concept = Concept::EntityType(EntityType {
1060            label: "person".to_string(),
1061        });
1062        let json = concept_to_json_b8(&concept);
1063        assert_eq!(json["category"], "EntityType");
1064        assert_eq!(json["label"], "person");
1065        assert!(json.get("iid").is_none());
1066        assert!(json.get("value").is_none());
1067    }
1068
1069    #[test]
1070    fn concept_to_json_attribute_type() {
1071        let concept = Concept::AttributeType(AttributeType {
1072            label: "name".to_string(),
1073            value_type: Some(ValueType::String),
1074        });
1075        let json = concept_to_json_b8(&concept);
1076        assert_eq!(json["category"], "AttributeType");
1077        assert_eq!(json["label"], "name");
1078        assert_eq!(json["value_type"], "string");
1079    }
1080
1081    #[test]
1082    fn concept_to_json_value_boolean() {
1083        let concept = Concept::Value(Value::Boolean(true));
1084        let json = concept_to_json_b8(&concept);
1085        assert_eq!(json["category"], "Value");
1086        assert_eq!(json["value"], true);
1087    }
1088
1089    #[test]
1090    fn concept_to_json_value_integer() {
1091        let concept = Concept::Value(Value::Integer(42));
1092        let json = concept_to_json_b8(&concept);
1093        assert_eq!(json["value"], 42);
1094    }
1095
1096    #[test]
1097    fn concept_to_json_value_string() {
1098        let concept = Concept::Value(Value::String("hello".to_string()));
1099        let json = concept_to_json_b8(&concept);
1100        assert_eq!(json["value"], "hello");
1101    }
1102
1103    #[test]
1104    fn concept_to_json_entity_with_iid() {
1105        let iid: IID = vec![0x01, 0x02, 0x03].into();
1106        let concept = Concept::Entity(Entity {
1107            iid,
1108            type_: Some(EntityType {
1109                label: "person".to_string(),
1110            }),
1111        });
1112        let json = concept_to_json_b8(&concept);
1113        assert_eq!(json["category"], "Entity");
1114        assert_eq!(json["label"], "person");
1115        // IID should be present
1116        let iid_str = json["iid"].as_str().unwrap();
1117        assert!(iid_str.starts_with("0x"));
1118    }
1119
1120    #[test]
1121    fn concept_to_json_attribute_with_value() {
1122        let iid: IID = vec![0xAA, 0xBB].into();
1123        let concept = Concept::Attribute(Attribute {
1124            iid,
1125            value: Value::String("hello".to_string()),
1126            type_: Some(AttributeType {
1127                label: "name".to_string(),
1128                value_type: Some(ValueType::String),
1129            }),
1130        });
1131        let json = concept_to_json_b8(&concept);
1132        assert_eq!(json["category"], "Attribute");
1133        assert_eq!(json["label"], "name");
1134        // Attribute IID is not exposed via try_get_iid()
1135        assert!(json.get("iid").is_none());
1136        assert_eq!(json["value"], "hello");
1137        assert_eq!(json["value_type"], "string");
1138    }
1139
1140    #[test]
1141    fn concept_to_json_attribute_type_without_value_type() {
1142        let concept = Concept::AttributeType(AttributeType {
1143            label: "abstract_attr".to_string(),
1144            value_type: None,
1145        });
1146        let json = concept_to_json_b8(&concept);
1147        assert_eq!(json["label"], "abstract_attr");
1148        assert!(json.get("value_type").is_none());
1149    }
1150
1151    // =============================================
1152    // QueryExecutor trait impl tests
1153    // =============================================
1154
1155    #[test]
1156    fn type_db_client_implements_query_executor() {
1157        fn assert_executor<T: QueryExecutor>() {}
1158        assert_executor::<TypeDBClient>();
1159    }
1160
1161    #[tokio::test]
1162    async fn query_executor_execute_delegates_to_client() {
1163        let tx = MockTransaction::new(QueryResultKind::Rows(vec![serde_json::json!({"x": 1})]));
1164        let client = make_client(MockBackend::new(tx));
1165        let executor: Box<dyn QueryExecutor> = Box::new(client);
1166
1167        let result = executor
1168            .execute("db", "match $x isa thing;", "read")
1169            .await
1170            .unwrap();
1171        assert!(result.is_array());
1172        assert_eq!(result.as_array().unwrap().len(), 1);
1173    }
1174
1175    #[test]
1176    fn query_executor_is_connected_delegates_to_client() {
1177        let mut backend = MockBackend::new(MockTransaction::new(QueryResultKind::Ok));
1178        backend.is_open = true;
1179        let client = make_client(backend);
1180        let executor: Box<dyn QueryExecutor> = Box::new(client);
1181        assert!(executor.is_connected());
1182    }
1183
1184    // =============================================
1185    // Integration tests (require running TypeDB)
1186    // =============================================
1187
1188    #[tokio::test]
1189    #[ignore = "requires running TypeDB server"]
1190    #[cfg_attr(coverage_nightly, coverage(off))]
1191    async fn integration_connect_invalid_address() {
1192        let config = TypeDBSection {
1193            address: "localhost:99999".to_string(),
1194            database: "test".to_string(),
1195            username: "admin".to_string(),
1196            password: "password".to_string(),
1197            http_port: 8000,
1198        };
1199        let result = TypeDBClient::connect(&config).await;
1200        assert!(result.is_err());
1201    }
1202
1203    /// Live-test target resolved from the environment so the suite can point
1204    /// at any disposable TypeDB container instead of a fixed local install.
1205    fn live_config() -> TypeDBSection {
1206        TypeDBSection {
1207            address: std::env::var("TYPEDB_ADDRESS")
1208                .unwrap_or_else(|_| "localhost:1729".to_string()),
1209            database: std::env::var("TYPEDB_DATABASE").unwrap_or_else(|_| "test".to_string()),
1210            username: "admin".to_string(),
1211            password: "password".to_string(),
1212            http_port: std::env::var("TYPEDB_HTTP_PORT")
1213                .ok()
1214                .and_then(|port| port.parse().ok())
1215                .unwrap_or(8000),
1216        }
1217    }
1218
1219    #[tokio::test]
1220    #[ignore = "requires running TypeDB server"]
1221    #[cfg_attr(coverage_nightly, coverage(off))]
1222    async fn integration_connect_success() {
1223        let result = TypeDBClient::connect(&live_config()).await;
1224        assert!(result.is_ok());
1225        assert!(result.unwrap().is_connected());
1226    }
1227
1228    #[tokio::test]
1229    #[ignore = "requires running TypeDB server"]
1230    #[cfg_attr(coverage_nightly, coverage(off))]
1231    async fn integration_database_admin_roundtrip() {
1232        let config = live_config();
1233        let client = TypeDBClient::connect(&config)
1234            .await
1235            .expect("connect failed");
1236        let database = format!("type_bridge_server_admin_{}", uuid::Uuid::new_v4().simple());
1237
1238        if client.database_exists(&database).await.unwrap_or(false) {
1239            let _ = client.delete_database(&database).await;
1240        }
1241
1242        assert!(!client.database_exists(&database).await.unwrap());
1243        client
1244            .create_database(&database)
1245            .await
1246            .expect("create failed");
1247        assert!(client.database_exists(&database).await.unwrap());
1248
1249        client
1250            .reset_database(&database)
1251            .await
1252            .expect("reset existing database failed");
1253        assert!(client.database_exists(&database).await.unwrap());
1254
1255        client
1256            .delete_database(&database)
1257            .await
1258            .expect("delete failed");
1259        assert!(!client.database_exists(&database).await.unwrap());
1260
1261        client
1262            .reset_database(&database)
1263            .await
1264            .expect("reset absent database failed");
1265        assert!(client.database_exists(&database).await.unwrap());
1266
1267        client
1268            .delete_database(&database)
1269            .await
1270            .expect("cleanup delete failed");
1271    }
1272
1273    #[tokio::test]
1274    #[ignore = "requires running TypeDB server"]
1275    #[cfg_attr(coverage_nightly, coverage(off))]
1276    async fn integration_execute_roundtrip() {
1277        let config = live_config();
1278        let client = TypeDBClient::connect(&config)
1279            .await
1280            .expect("connect failed");
1281
1282        client
1283            .execute(&config.database, "define entity smoke_marker;", "schema")
1284            .await
1285            .expect("schema define failed");
1286
1287        let rows = client
1288            .execute(&config.database, "match entity $t;", "read")
1289            .await
1290            .expect("read query failed");
1291        let rows = rows.as_array().expect("read result must be a JSON array");
1292        assert!(
1293            !rows.is_empty(),
1294            "expected at least the smoke_marker entity type"
1295        );
1296    }
1297}