Skip to main content

graphmind_sdk/
embedded.rs

1//! EmbeddedClient — in-process graph database client
2//!
3//! Uses GraphStore and QueryEngine directly, no network needed.
4
5use async_trait::async_trait;
6use std::collections::HashMap;
7use std::sync::Arc;
8use tokio::sync::RwLock;
9
10use graphmind::graph::GraphStore;
11use graphmind::query::{QueryEngine, RecordBatch, Value};
12
13use crate::client::GraphmindClient;
14use crate::error::{GraphmindError, GraphmindResult};
15use crate::models::{QueryResult, SdkEdge, SdkNode, ServerStatus, StorageStats};
16
17/// In-process client that wraps a GraphStore directly.
18///
19/// No network overhead — queries execute in the same process.
20/// Ideal for examples, tests, and embedded applications.
21pub struct EmbeddedClient {
22    pub(crate) store: Arc<RwLock<GraphStore>>,
23    engine: QueryEngine,
24}
25
26impl EmbeddedClient {
27    /// Create a new EmbeddedClient with a fresh empty graph store
28    pub fn new() -> Self {
29        Self {
30            store: Arc::new(RwLock::new(GraphStore::new())),
31            engine: QueryEngine::new(),
32        }
33    }
34
35    /// Create an EmbeddedClient wrapping an existing store
36    pub fn with_store(store: Arc<RwLock<GraphStore>>) -> Self {
37        Self {
38            store,
39            engine: QueryEngine::new(),
40        }
41    }
42
43    /// Get a reference to the underlying store (for direct graph manipulation)
44    pub fn store(&self) -> &Arc<RwLock<GraphStore>> {
45        &self.store
46    }
47
48    /// Acquire a read lock on the store.
49    ///
50    /// Use for direct read-only access to graph data (node/edge lookups, iterations).
51    pub async fn store_read(&self) -> tokio::sync::RwLockReadGuard<'_, GraphStore> {
52        self.store.read().await
53    }
54
55    /// Acquire a write lock on the store.
56    ///
57    /// Use for direct mutation (create_node, set_property, etc.).
58    pub async fn store_write(&self) -> tokio::sync::RwLockWriteGuard<'_, GraphStore> {
59        self.store.write().await
60    }
61
62    /// Create an NLQ pipeline for natural language → Cypher translation.
63    pub fn nlq_pipeline(
64        &self,
65        config: graphmind::persistence::tenant::NLQConfig,
66    ) -> Result<graphmind::NLQPipeline, graphmind::NLQError> {
67        graphmind::NLQPipeline::new(config)
68    }
69
70    /// Create an agent runtime for agentic enrichment workflows.
71    pub fn agent_runtime(
72        &self,
73        config: graphmind::persistence::tenant::AgentConfig,
74    ) -> graphmind::agent::AgentRuntime {
75        graphmind::agent::AgentRuntime::new(config)
76    }
77
78    /// Create a persistence manager for durable storage.
79    pub fn persistence_manager(
80        &self,
81        base_path: impl AsRef<std::path::Path>,
82    ) -> Result<graphmind::PersistenceManager, graphmind::PersistenceError> {
83        graphmind::PersistenceManager::new(base_path)
84    }
85
86    /// Return AST cache statistics (hits, misses).
87    pub fn cache_stats(&self) -> &graphmind::query::CacheStats {
88        self.engine.cache_stats()
89    }
90
91    /// Export a snapshot of the current graph store to a file.
92    pub async fn export_snapshot(
93        &self,
94        _tenant: &str,
95        path: &std::path::Path,
96    ) -> Result<graphmind::snapshot::format::ExportStats, Box<dyn std::error::Error>> {
97        let store_guard = self.store.read().await;
98        let file = std::fs::File::create(path)?;
99        let writer = std::io::BufWriter::new(file);
100        let stats = graphmind::snapshot::export_tenant(&store_guard, writer)?;
101        Ok(stats)
102    }
103
104    /// Import a snapshot into the current graph store from a file.
105    pub async fn import_snapshot(
106        &self,
107        _tenant: &str,
108        path: &std::path::Path,
109    ) -> Result<graphmind::snapshot::format::ImportStats, Box<dyn std::error::Error>> {
110        let mut store_guard = self.store.write().await;
111        let file = std::fs::File::open(path)?;
112        let reader = std::io::BufReader::new(file);
113        let stats = graphmind::snapshot::import_tenant(&mut store_guard, reader)?;
114        Ok(stats)
115    }
116}
117
118impl Default for EmbeddedClient {
119    fn default() -> Self {
120        Self::new()
121    }
122}
123
124/// Convert a RecordBatch from the query engine into an SDK QueryResult.
125fn record_batch_to_query_result(batch: &RecordBatch, store: &GraphStore) -> QueryResult {
126    let mut nodes_map: HashMap<String, SdkNode> = HashMap::new();
127    let mut edges_map: HashMap<String, SdkEdge> = HashMap::new();
128    let mut records = Vec::new();
129
130    for record in &batch.records {
131        let mut row = Vec::new();
132        for col in &batch.columns {
133            let val = match record.get(col) {
134                Some(v) => v,
135                None => {
136                    row.push(serde_json::Value::Null);
137                    continue;
138                }
139            };
140
141            match val {
142                Value::Node(id, node) => {
143                    let mut properties = serde_json::Map::new();
144                    for (k, v) in &node.properties {
145                        properties.insert(k.clone(), v.to_json());
146                    }
147                    let id_str = id.as_u64().to_string();
148                    let labels: Vec<String> =
149                        node.labels.iter().map(|l| l.as_str().to_string()).collect();
150
151                    let node_json = serde_json::json!({
152                        "id": id_str,
153                        "labels": labels,
154                        "properties": properties,
155                    });
156
157                    nodes_map.entry(id_str.clone()).or_insert_with(|| SdkNode {
158                        id: id_str,
159                        labels,
160                        properties: properties.into_iter().collect(),
161                    });
162
163                    row.push(node_json);
164                }
165                Value::NodeRef(id) => {
166                    let id_str = id.as_u64().to_string();
167                    // Try to resolve from store
168                    let (labels, properties, node_json) = if let Some(node) = store.get_node(*id) {
169                        let mut props = serde_json::Map::new();
170                        for (k, v) in &node.properties {
171                            props.insert(k.clone(), v.to_json());
172                        }
173                        let lbls: Vec<String> =
174                            node.labels.iter().map(|l| l.as_str().to_string()).collect();
175                        let json = serde_json::json!({
176                            "id": id_str,
177                            "labels": lbls,
178                            "properties": props,
179                        });
180                        (lbls, props.into_iter().collect(), json)
181                    } else {
182                        let json =
183                            serde_json::json!({ "id": id_str, "labels": [], "properties": {} });
184                        (vec![], HashMap::new(), json)
185                    };
186
187                    nodes_map.entry(id_str.clone()).or_insert_with(|| SdkNode {
188                        id: id_str,
189                        labels,
190                        properties,
191                    });
192
193                    row.push(node_json);
194                }
195                Value::Edge(id, edge) => {
196                    let mut properties = serde_json::Map::new();
197                    for (k, v) in &edge.properties {
198                        properties.insert(k.clone(), v.to_json());
199                    }
200                    let id_str = id.as_u64().to_string();
201                    let edge_json = serde_json::json!({
202                        "id": id_str,
203                        "source": edge.source.as_u64().to_string(),
204                        "target": edge.target.as_u64().to_string(),
205                        "type": edge.edge_type.as_str(),
206                        "properties": properties,
207                    });
208
209                    edges_map.entry(id_str.clone()).or_insert_with(|| SdkEdge {
210                        id: id_str,
211                        source: edge.source.as_u64().to_string(),
212                        target: edge.target.as_u64().to_string(),
213                        edge_type: edge.edge_type.as_str().to_string(),
214                        properties: properties.into_iter().collect(),
215                    });
216
217                    row.push(edge_json);
218                }
219                Value::EdgeRef(id, src, tgt, et) => {
220                    let id_str = id.as_u64().to_string();
221                    let edge_json = serde_json::json!({
222                        "id": id_str,
223                        "source": src.as_u64().to_string(),
224                        "target": tgt.as_u64().to_string(),
225                        "type": et.as_str(),
226                        "properties": {},
227                    });
228
229                    edges_map.entry(id_str.clone()).or_insert_with(|| SdkEdge {
230                        id: id_str,
231                        source: src.as_u64().to_string(),
232                        target: tgt.as_u64().to_string(),
233                        edge_type: et.as_str().to_string(),
234                        properties: HashMap::new(),
235                    });
236
237                    row.push(edge_json);
238                }
239                Value::Property(p) => {
240                    row.push(p.to_json());
241                }
242                Value::Path {
243                    nodes: path_nodes,
244                    edges: path_edges,
245                } => {
246                    row.push(serde_json::json!({
247                        "nodes": path_nodes.iter().map(|n| n.as_u64().to_string()).collect::<Vec<_>>(),
248                        "edges": path_edges.iter().map(|e| e.as_u64().to_string()).collect::<Vec<_>>(),
249                        "length": path_edges.len(),
250                    }));
251                }
252                Value::Null => {
253                    row.push(serde_json::Value::Null);
254                }
255            }
256        }
257        records.push(row);
258    }
259
260    QueryResult {
261        nodes: nodes_map.into_values().collect(),
262        edges: edges_map.into_values().collect(),
263        columns: batch.columns.clone(),
264        records,
265    }
266}
267
/// Heuristically decides whether `cypher` may modify the graph.
///
/// The query text is scanned word by word (a word is a run of alphanumeric
/// characters and `_`) and the function returns `true` as soon as one word
/// equals, ignoring ASCII case, one of the updating keywords `CREATE`,
/// `DELETE`, `DETACH`, `SET`, `MERGE`, `CALL` or `REMOVE`.
///
/// Matching whole words rather than space-delimited substrings means the
/// keyword is found no matter what surrounds it: `CREATE(a)`, a clause that
/// starts on a new line or after a tab, or a query that begins with `REMOVE`.
/// Words written directly after `.`, `:` or `$` (property names, labels and
/// relationship types, parameters) are skipped so that `n.set` or `(:Set)`
/// do not count as a write.
///
/// This is not a parser: a keyword inside a string literal or a quoted
/// identifier still counts. `query` only uses the result to choose between
/// the store's write lock and its read lock, so such a false positive costs
/// an unnecessary exclusive lock.
fn is_write_query(cypher: &str) -> bool {
    const WRITE_KEYWORDS: [&str; 7] = [
        "CREATE", "DELETE", "DETACH", "SET", "MERGE", "CALL", "REMOVE",
    ];

    // Byte offset at which the word currently being scanned started.
    let mut word_start: Option<usize> = None;
    // Last non-word character seen, i.e. the one right before the current word.
    let mut preceding: Option<char> = None;

    // A trailing sentinel space closes a word that runs to the end of input.
    let chars = cypher
        .char_indices()
        .chain(std::iter::once((cypher.len(), ' ')));

    for (idx, ch) in chars {
        let is_word_char = ch.is_alphanumeric() || ch == '_';
        match (is_word_char, word_start) {
            (true, None) => word_start = Some(idx),
            (true, Some(_)) => {}
            (false, Some(start)) => {
                let word = &cypher[start..idx];
                let qualified = matches!(preceding, Some('.') | Some(':') | Some('$'));
                if !qualified
                    && WRITE_KEYWORDS
                        .iter()
                        .any(|keyword| word.eq_ignore_ascii_case(keyword))
                {
                    return true;
                }
                word_start = None;
                preceding = Some(ch);
            }
            (false, None) => preceding = Some(ch),
        }
    }

    false
}
283
284#[async_trait]
285impl GraphmindClient for EmbeddedClient {
286    async fn query(&self, graph: &str, cypher: &str) -> GraphmindResult<QueryResult> {
287        if is_write_query(cypher) {
288            let mut store_guard = self.store.write().await;
289            let batch = self
290                .engine
291                .execute_mut(cypher, &mut *store_guard, graph)
292                .map_err(|e| GraphmindError::QueryError(e.to_string()))?;
293            Ok(record_batch_to_query_result(&batch, &*store_guard))
294        } else {
295            let store_guard = self.store.read().await;
296            let batch = self
297                .engine
298                .execute(cypher, &*store_guard)
299                .map_err(|e| GraphmindError::QueryError(e.to_string()))?;
300            Ok(record_batch_to_query_result(&batch, &*store_guard))
301        }
302    }
303
304    async fn query_with_params(
305        &self,
306        graph: &str,
307        cypher: &str,
308        params: HashMap<String, serde_json::Value>,
309    ) -> GraphmindResult<QueryResult> {
310        // Convert JSON params to PropertyValue
311        let pv_params: HashMap<String, graphmind::graph::PropertyValue> = params
312            .into_iter()
313            .map(|(k, v)| {
314                let pv = match v {
315                    serde_json::Value::Null => graphmind::graph::PropertyValue::Null,
316                    serde_json::Value::Bool(b) => graphmind::graph::PropertyValue::Boolean(b),
317                    serde_json::Value::Number(n) => {
318                        if let Some(i) = n.as_i64() {
319                            graphmind::graph::PropertyValue::Integer(i)
320                        } else {
321                            graphmind::graph::PropertyValue::Float(n.as_f64().unwrap_or(0.0))
322                        }
323                    }
324                    serde_json::Value::String(s) => graphmind::graph::PropertyValue::String(s),
325                    _ => graphmind::graph::PropertyValue::String(v.to_string()),
326                };
327                (k, pv)
328            })
329            .collect();
330
331        if is_write_query(cypher) {
332            let mut store_guard = self.store.write().await;
333            let batch = self
334                .engine
335                .execute_mut_with_params(cypher, &mut *store_guard, graph, &pv_params)
336                .map_err(|e| GraphmindError::QueryError(e.to_string()))?;
337            Ok(record_batch_to_query_result(&batch, &*store_guard))
338        } else {
339            let store_guard = self.store.read().await;
340            let batch = self
341                .engine
342                .execute_with_params(cypher, &*store_guard, &pv_params)
343                .map_err(|e| GraphmindError::QueryError(e.to_string()))?;
344            Ok(record_batch_to_query_result(&batch, &*store_guard))
345        }
346    }
347
348    async fn query_readonly(&self, _graph: &str, cypher: &str) -> GraphmindResult<QueryResult> {
349        let store_guard = self.store.read().await;
350        let batch = self
351            .engine
352            .execute(cypher, &*store_guard)
353            .map_err(|e| GraphmindError::QueryError(e.to_string()))?;
354        Ok(record_batch_to_query_result(&batch, &*store_guard))
355    }
356
357    async fn delete_graph(&self, _graph: &str) -> GraphmindResult<()> {
358        let mut store_guard = self.store.write().await;
359        store_guard.clear();
360        Ok(())
361    }
362
363    async fn list_graphs(&self) -> GraphmindResult<Vec<String>> {
364        Ok(vec!["default".to_string()])
365    }
366
367    async fn status(&self, _graph: &str) -> GraphmindResult<ServerStatus> {
368        let store_guard = self.store.read().await;
369        Ok(ServerStatus {
370            status: "healthy".to_string(),
371            version: graphmind::VERSION.to_string(),
372            storage: StorageStats {
373                nodes: store_guard.node_count() as u64,
374                edges: store_guard.edge_count() as u64,
375            },
376        })
377    }
378
379    async fn ping(&self) -> GraphmindResult<String> {
380        Ok("PONG".to_string())
381    }
382
383    async fn schema(&self, _graph: &str) -> GraphmindResult<String> {
384        let store_guard = self.store.read().await;
385        let mut lines = Vec::new();
386        lines.push(format!("Nodes: {}", store_guard.node_count()));
387        lines.push(format!("Edges: {}", store_guard.edge_count()));
388
389        // Collect label counts
390        let mut label_counts: HashMap<String, usize> = HashMap::new();
391        for node in store_guard.all_nodes() {
392            for label in &node.labels {
393                *label_counts.entry(label.as_str().to_string()).or_insert(0) += 1;
394            }
395        }
396        if !label_counts.is_empty() {
397            lines.push("Node labels:".to_string());
398            for (label, count) in &label_counts {
399                lines.push(format!("  :{} ({})", label, count));
400            }
401        }
402
403        // Collect edge type counts
404        let mut edge_type_counts: HashMap<String, usize> = HashMap::new();
405        for edge in store_guard.all_edges() {
406            *edge_type_counts
407                .entry(edge.edge_type.as_str().to_string())
408                .or_insert(0) += 1;
409        }
410        if !edge_type_counts.is_empty() {
411            lines.push("Edge types:".to_string());
412            for (et, count) in &edge_type_counts {
413                lines.push(format!("  :{} ({})", et, count));
414            }
415        }
416
417        Ok(lines.join("\n"))
418    }
419
420    async fn explain(&self, _graph: &str, cypher: &str) -> GraphmindResult<QueryResult> {
421        let prefixed = if cypher.trim().to_uppercase().starts_with("EXPLAIN") {
422            cypher.to_string()
423        } else {
424            format!("EXPLAIN {}", cypher)
425        };
426        let store_guard = self.store.read().await;
427        let batch = self
428            .engine
429            .execute(&prefixed, &*store_guard)
430            .map_err(|e| GraphmindError::QueryError(e.to_string()))?;
431        Ok(record_batch_to_query_result(&batch, &*store_guard))
432    }
433
434    async fn profile(&self, _graph: &str, cypher: &str) -> GraphmindResult<QueryResult> {
435        let prefixed = if cypher.trim().to_uppercase().starts_with("PROFILE") {
436            cypher.to_string()
437        } else {
438            format!("PROFILE {}", cypher)
439        };
440        let store_guard = self.store.read().await;
441        let batch = self
442            .engine
443            .execute(&prefixed, &*store_guard)
444            .map_err(|e| GraphmindError::QueryError(e.to_string()))?;
445        Ok(record_batch_to_query_result(&batch, &*store_guard))
446    }
447}
448
#[cfg(test)]
mod tests {
    use super::*;

    /// Graph name handed to every client call in this module.
    const GRAPH: &str = "default";

    /// Runs `cypher` through `query` and unwraps the result.
    async fn run(client: &EmbeddedClient, cypher: &str) -> QueryResult {
        client.query(GRAPH, cypher).await.unwrap()
    }

    /// Runs `cypher` through `query_readonly` and unwraps the result.
    async fn read(client: &EmbeddedClient, cypher: &str) -> QueryResult {
        client.query_readonly(GRAPH, cypher).await.unwrap()
    }

    /// Node count as reported by `status`.
    async fn node_total(client: &EmbeddedClient) -> u64 {
        client.status(GRAPH).await.unwrap().storage.nodes
    }

    #[tokio::test]
    async fn test_embedded_ping() {
        let client = EmbeddedClient::new();
        let reply = client.ping().await.unwrap();
        assert_eq!(reply, "PONG");
    }

    #[tokio::test]
    async fn test_embedded_status() {
        let client = EmbeddedClient::new();
        let report = client.status(GRAPH).await.unwrap();
        assert_eq!(report.status, "healthy");
        assert_eq!(report.storage.nodes, 0);
    }

    #[tokio::test]
    async fn test_embedded_create_and_query() {
        let client = EmbeddedClient::new();

        // Two separate CREATE statements, one node each.
        run(&client, r#"CREATE (n:Person {name: "Alice", age: 30})"#).await;
        run(&client, r#"CREATE (n:Person {name: "Bob", age: 25})"#).await;

        let people = read(&client, "MATCH (n:Person) RETURN n.name, n.age").await;
        assert_eq!(people.columns.len(), 2);
        assert_eq!(people.records.len(), 2);

        // Both nodes must show up in the status counters.
        assert_eq!(node_total(&client).await, 2);
    }

    #[tokio::test]
    async fn test_embedded_delete_graph() {
        let client = EmbeddedClient::new();

        run(&client, r#"CREATE (n:Person {name: "Alice"})"#).await;
        assert_eq!(node_total(&client).await, 1);

        client.delete_graph(GRAPH).await.unwrap();
        assert_eq!(node_total(&client).await, 0);
    }

    #[tokio::test]
    async fn test_embedded_list_graphs() {
        let client = EmbeddedClient::new();
        let names = client.list_graphs().await.unwrap();
        assert_eq!(names, vec!["default"]);
    }

    #[tokio::test]
    async fn test_embedded_query_with_edges() {
        let client = EmbeddedClient::new();

        run(
            &client,
            r#"CREATE (a:Person {name: "Alice"})-[:KNOWS]->(b:Person {name: "Bob"})"#,
        )
        .await;

        let pairs = read(
            &client,
            "MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a.name, b.name",
        )
        .await;

        assert_eq!(pairs.records.len(), 1);
    }

    #[tokio::test]
    async fn test_embedded_with_existing_store() {
        // Populate the store directly before any client exists.
        let mut graph = GraphStore::new();
        let alice = graph.create_node("Person");
        if let Some(node) = graph.get_node_mut(alice) {
            node.set_property("name", "Alice");
        }

        let client = EmbeddedClient::with_store(Arc::new(RwLock::new(graph)));

        let found = read(&client, "MATCH (n:Person) RETURN n.name").await;
        assert_eq!(found.records.len(), 1);
    }

    // ========== Additional Embedded Client Coverage Tests ==========

    #[test]
    fn test_embedded_default() {
        // `Default` must hand back a usable client with a live store handle.
        let client = EmbeddedClient::default();
        let handle = client.store();
        assert!(Arc::strong_count(handle) >= 1);
    }

    #[tokio::test]
    async fn test_embedded_store_read() {
        let client = EmbeddedClient::new();
        run(&client, r#"CREATE (n:Person {name: "Alice"})"#).await;

        let snapshot = client.store_read().await;
        assert_eq!(snapshot.node_count(), 1);
    }

    #[tokio::test]
    async fn test_embedded_store_write() {
        let client = EmbeddedClient::new();
        {
            // Scope the write guard so it is released before querying.
            let mut graph = client.store_write().await;
            let id = graph.create_node("Person");
            if let Some(node) = graph.get_node_mut(id) {
                node.set_property("name", "DirectWrite");
            }
        }

        let found = read(&client, "MATCH (n:Person) RETURN n.name").await;
        assert_eq!(found.records.len(), 1);
    }

    #[tokio::test]
    async fn test_embedded_cache_stats() {
        let client = EmbeddedClient::new();
        // Nothing has been executed yet, so there cannot be any cache hit.
        assert_eq!(client.cache_stats().hits(), 0);
    }

    #[tokio::test]
    async fn test_embedded_cache_stats_after_queries() {
        let client = EmbeddedClient::new();
        run(&client, r#"CREATE (n:Person {name: "Alice"})"#).await;

        // Issue the identical read twice so the second run can be served
        // from the cache.
        for _ in 0..2 {
            read(&client, "MATCH (n:Person) RETURN n.name").await;
        }

        let stats = client.cache_stats();
        // Every lookup is either a hit or a miss; expect at least two.
        assert!(stats.hits() + stats.misses() >= 2);
    }

    #[tokio::test]
    async fn test_embedded_query_readonly_error() {
        let client = EmbeddedClient::new();
        // Text that is not Cypher must be rejected.
        let outcome = client.query_readonly(GRAPH, "INVALID SYNTAX !!!").await;
        assert!(outcome.is_err());
    }

    #[tokio::test]
    async fn test_embedded_query_write_error() {
        let client = EmbeddedClient::new();
        // A malformed write statement must be rejected as well.
        let outcome = client.query(GRAPH, "CREATE INVALID").await;
        assert!(outcome.is_err());
    }

    #[tokio::test]
    async fn test_embedded_version_in_status() {
        let client = EmbeddedClient::new();
        let report = client.status(GRAPH).await.unwrap();
        assert!(!report.version.is_empty());
    }

    #[tokio::test]
    async fn test_embedded_query_returns_nodes() {
        let client = EmbeddedClient::new();
        run(&client, r#"CREATE (n:Person {name: "Alice", age: 30})"#).await;

        let found = read(&client, "MATCH (n:Person) RETURN n").await;
        assert_eq!(found.records.len(), 1);
        assert!(!found.nodes.is_empty());
        // The collected node must carry its label.
        let first = &found.nodes[0];
        assert!(first.labels.contains(&"Person".to_string()));
    }

    #[tokio::test]
    async fn test_embedded_query_returns_edges() {
        let client = EmbeddedClient::new();
        run(
            &client,
            r#"CREATE (a:Person {name: "Alice"})-[:KNOWS {since: 2020}]->(b:Person {name: "Bob"})"#,
        )
        .await;

        let found = read(&client, "MATCH (a)-[r:KNOWS]->(b) RETURN r").await;
        assert_eq!(found.records.len(), 1);
        assert!(!found.edges.is_empty());
        let first = &found.edges[0];
        assert_eq!(first.edge_type, "KNOWS");
    }

    #[tokio::test]
    async fn test_embedded_query_returns_null() {
        let client = EmbeddedClient::new();
        run(&client, r#"CREATE (n:Person {name: "Alice"})"#).await;

        // `missing` was never set on the node, so the cell must be JSON null.
        let found = read(&client, "MATCH (n:Person) RETURN n.missing").await;
        assert_eq!(found.records.len(), 1);
        assert_eq!(found.records[0][0], serde_json::Value::Null);
    }

    #[tokio::test]
    async fn test_embedded_multiple_writes_and_reads() {
        let client = EmbeddedClient::new();

        for i in 0..5 {
            let statement = format!(r#"CREATE (n:Item {{id: {}}})"#, i);
            run(&client, &statement).await;
        }

        let items = read(&client, "MATCH (n:Item) RETURN n.id").await;
        assert_eq!(items.records.len(), 5);
    }

    #[tokio::test]
    async fn test_embedded_delete_graph_and_recreate() {
        let client = EmbeddedClient::new();

        run(&client, r#"CREATE (n:Person {name: "Alice"})"#).await;
        assert_eq!(node_total(&client).await, 1);

        client.delete_graph(GRAPH).await.unwrap();
        assert_eq!(node_total(&client).await, 0);

        // The cleared store must accept new data again.
        run(&client, r#"CREATE (n:Person {name: "Bob"})"#).await;
        assert_eq!(node_total(&client).await, 1);
    }

    #[tokio::test]
    async fn test_embedded_with_store_shares_state() {
        let shared = Arc::new(RwLock::new(GraphStore::new()));
        let client = EmbeddedClient::with_store(Arc::clone(&shared));

        run(&client, r#"CREATE (n:Person {name: "Alice"})"#).await;

        // The write made through the client is visible via the outer handle.
        let graph = shared.read().await;
        assert_eq!(graph.node_count(), 1);
    }

    #[test]
    fn test_is_write_query_variants() {
        let writes = [
            "CREATE (n:Person)",
            "DELETE n",
            "SET n.name = 'x'",
            "MERGE (n:Person)",
            "CALL db.something()",
            "MATCH (n) CREATE (m)",
            "MATCH (n) DELETE n",
            "MATCH (n) SET n.x = 1",
            "MATCH (n) MERGE (m)",
            "MATCH (n) CALL db.x()",
        ];
        for statement in writes.iter() {
            assert!(is_write_query(statement), "expected a write: {}", statement);
        }

        let reads = [
            "MATCH (n) RETURN n",
            "MATCH (n:Person) RETURN n.name",
            "RETURN 1 + 2",
        ];
        for statement in reads.iter() {
            assert!(!is_write_query(statement), "expected a read: {}", statement);
        }
    }

    #[tokio::test]
    async fn test_embedded_query_property_values() {
        let client = EmbeddedClient::new();
        run(
            &client,
            r#"CREATE (n:Person {name: "Alice", age: 30, score: 95.5, active: true})"#,
        )
        .await;

        let found = read(
            &client,
            "MATCH (n:Person) RETURN n.name, n.age, n.score, n.active",
        )
        .await;
        assert_eq!(found.records.len(), 1);
        assert_eq!(found.columns.len(), 4);
    }

    #[tokio::test]
    async fn test_embedded_store_accessor() {
        let client = EmbeddedClient::new();
        let handle = client.store();
        // Cloning the handle must bump the reference count.
        let _second = Arc::clone(handle);
        assert!(Arc::strong_count(handle) >= 2);
    }
}