Skip to main content

nervusdb_core/
lib.rs

1//! NervusDB core Rust library providing the low level storage primitives.
2
3pub mod algorithms;
4mod error;
5#[cfg(not(target_arch = "wasm32"))]
6pub mod ffi;
7#[cfg(all(feature = "fts", not(target_arch = "wasm32")))]
8mod fts_index;
9#[cfg(not(target_arch = "wasm32"))]
10pub mod migration;
11pub mod parser;
12pub mod query;
13pub mod storage;
14pub mod triple;
15#[cfg(all(feature = "vector", not(target_arch = "wasm32")))]
16mod vector_index;
17
18use std::collections::HashMap;
19use std::path::{Path, PathBuf};
20#[cfg(not(target_arch = "wasm32"))]
21use std::sync::Arc;
22
23#[cfg(not(target_arch = "wasm32"))]
24use storage::{Hexastore, open_store};
25
26pub type StringId = u64;
27pub use error::{Error, Result};
28#[cfg(not(target_arch = "wasm32"))]
29use redb::{Database as RedbDatabase, WriteTransaction};
30// Re-export Temporal Store types from nervusdb-temporal crate
31#[cfg(all(feature = "temporal", not(target_arch = "wasm32")))]
32pub use nervusdb_temporal::{
33    EnsureEntityOptions, EpisodeInput, EpisodeLinkOptions, EpisodeLinkRecord, FactWriteInput,
34    StoredAlias, StoredEntity, StoredEpisode, StoredFact, TemporalStoreV2 as TemporalStore,
35    TimelineQuery, TimelineRole,
36};
37pub use triple::{Fact, Triple};
38
39/// Database configuration used when opening an instance.
40#[derive(Debug, Clone)]
41pub struct Options {
42    data_path: PathBuf,
43}
44
45impl Options {
46    pub fn new<P: AsRef<Path>>(path: P) -> Self {
47        Self {
48            data_path: path.as_ref().to_owned(),
49        }
50    }
51}
52
53pub struct Database {
54    store: Box<dyn Hexastore + Send>,
55    #[cfg(not(target_arch = "wasm32"))]
56    redb: Arc<RedbDatabase>,
57    #[cfg(all(any(feature = "vector", feature = "fts"), not(target_arch = "wasm32")))]
58    redb_path: PathBuf,
59    #[cfg(not(target_arch = "wasm32"))]
60    active_write: Option<WriteTransaction>,
61    #[cfg(all(feature = "fts", not(target_arch = "wasm32")))]
62    fts_index: Option<fts_index::FtsIndex>,
63    #[cfg(all(feature = "fts", not(target_arch = "wasm32")))]
64    fts_write_log: HashMap<u64, Vec<u8>>,
65    #[cfg(all(feature = "vector", not(target_arch = "wasm32")))]
66    vector_index: Option<vector_index::VectorIndex>,
67    #[cfg(all(feature = "vector", not(target_arch = "wasm32")))]
68    vector_undo_log: Vec<VectorUndoEntry>,
69    #[cfg(all(feature = "temporal", not(target_arch = "wasm32")))]
70    temporal: TemporalStore,
71    cursors: HashMap<u64, QueryCursor>,
72    next_cursor_id: u64,
73}
74
75#[cfg(all(feature = "vector", not(target_arch = "wasm32")))]
76#[derive(Debug, Clone)]
77struct VectorUndoEntry {
78    node_id: u64,
79    old: Option<Vec<f32>>,
80}
81
82struct QueryCursor {
83    iter: crate::storage::HexastoreIter,
84    finished: bool,
85}
86
87impl QueryCursor {
88    fn new(iter: crate::storage::HexastoreIter) -> Self {
89        Self {
90            iter,
91            finished: false,
92        }
93    }
94
95    fn next_batch(&mut self, batch_size: usize) -> (Vec<Triple>, bool) {
96        let mut batch = Vec::with_capacity(batch_size);
97        for _ in 0..batch_size.max(1) {
98            match self.iter.next() {
99                Some(triple) => batch.push(triple),
100                None => {
101                    self.finished = true;
102                    break;
103                }
104            }
105        }
106        let done = self.finished || batch.is_empty();
107        (batch, done)
108    }
109}
110
111fn debug_env_enabled() -> bool {
112    match std::env::var("NERVUSDB_DEBUG_NATIVE") {
113        Ok(val) => val == "1" || val.eq_ignore_ascii_case("true"),
114        Err(_) => false,
115    }
116}
117
118fn emit_debug(message: &str) {
119    if debug_env_enabled() {
120        eprintln!("{}", message);
121    }
122}
123
124#[derive(Debug, Default, Clone, Copy)]
125pub struct QueryCriteria {
126    pub subject_id: Option<StringId>,
127    pub predicate_id: Option<StringId>,
128    pub object_id: Option<StringId>,
129}
130
131impl Database {
132    /// Opens a database at the specified path.
133    ///
134    /// The path should be a base path or directory.
135    /// We will create: path.redb (single-file storage for triples, dictionary, temporal data)
136    pub fn open(options: Options) -> Result<Self> {
137        let path = options.data_path;
138        // Ensure parent dir exists
139        #[cfg(not(target_arch = "wasm32"))]
140        if let Some(parent) = path.parent() {
141            std::fs::create_dir_all(parent).map_err(|e| Error::Other(e.to_string()))?;
142        }
143
144        #[cfg(not(target_arch = "wasm32"))]
145        let redb_path = path.with_extension("redb");
146        #[cfg(not(target_arch = "wasm32"))]
147        let redb = Arc::new(
148            RedbDatabase::create(redb_path.clone())
149                .map_err(|e| Error::Other(format!("failed to open redb: {e}")))?,
150        );
151
152        #[cfg(not(target_arch = "wasm32"))]
153        let store = open_store(redb.clone())?;
154        #[cfg(target_arch = "wasm32")]
155        let store = open_store(&path)?;
156
157        #[cfg(all(feature = "temporal", not(target_arch = "wasm32")))]
158        let temporal = TemporalStore::open(redb.clone())?;
159
160        #[cfg_attr(
161            not(all(any(feature = "vector", feature = "fts"), not(target_arch = "wasm32"))),
162            allow(unused_mut)
163        )]
164        let mut db = Self {
165            store,
166            #[cfg(not(target_arch = "wasm32"))]
167            redb,
168            #[cfg(all(any(feature = "vector", feature = "fts"), not(target_arch = "wasm32")))]
169            redb_path: redb_path.clone(),
170            #[cfg(not(target_arch = "wasm32"))]
171            active_write: None,
172            #[cfg(all(feature = "fts", not(target_arch = "wasm32")))]
173            fts_index: None,
174            #[cfg(all(feature = "fts", not(target_arch = "wasm32")))]
175            fts_write_log: HashMap::new(),
176            #[cfg(all(feature = "vector", not(target_arch = "wasm32")))]
177            vector_index: None,
178            #[cfg(all(feature = "vector", not(target_arch = "wasm32")))]
179            vector_undo_log: Vec::new(),
180            #[cfg(all(feature = "temporal", not(target_arch = "wasm32")))]
181            temporal,
182            cursors: HashMap::new(),
183            next_cursor_id: 1,
184        };
185
186        #[cfg(all(feature = "vector", not(target_arch = "wasm32")))]
187        {
188            db.vector_index = vector_index::VectorIndex::open_or_rebuild(&db, &db.redb_path)?;
189        }
190
191        #[cfg(all(feature = "fts", not(target_arch = "wasm32")))]
192        {
193            db.fts_index = fts_index::FtsIndex::open_or_rebuild(&db, &db.redb_path)?;
194        }
195
196        Ok(db)
197    }
198
199    pub fn hydrate(
200        &mut self,
201        dictionary_values: Vec<String>,
202        triples: Vec<(StringId, StringId, StringId)>,
203    ) -> Result<()> {
204        // Pre-intern dictionary values to ensure IDs match if they are sequential?
205        // Wait, if we just intern, we get new IDs.
206        // hydrate assumes specific IDs?
207        // The signature is `triples: Vec<(StringId, StringId, StringId)>`. StringId is u64.
208        // If the input triples use IDs that correspond to `dictionary_values` indices, we need to ensure that mapping.
209        // But `intern` assigns IDs.
210        // If `hydrate` is used for restoring a dump, the dump should probably contain string triples, not ID triples.
211        // Or we assume `dictionary_values` are in ID order 1..N.
212        // Let's assume `dictionary_values` are ordered by ID.
213
214        // Note: This implementation assumes that `dictionary_values` index + 1 == ID.
215        // This is fragile but matches the previous behavior where Dictionary assigned sequential IDs.
216        for value in dictionary_values {
217            self.store.intern(&value)?;
218        }
219
220        for (subject_id, predicate_id, object_id) in triples {
221            let triple = Triple::new(subject_id, predicate_id, object_id);
222            self.store.insert(&triple)?;
223        }
224
225        self.reset_cursors();
226
227        Ok(())
228    }
229
230    /// Set node properties from JSON string (converts to FlexBuffers internally)
231    pub fn set_node_property(&mut self, id: u64, json: &str) -> Result<()> {
232        // Parse JSON and convert to FlexBuffers for unified storage
233        let props: std::collections::HashMap<String, serde_json::Value> =
234            serde_json::from_str(json)
235                .map_err(|e| Error::Other(format!("Invalid JSON in set_node_property: {}", e)))?;
236        let binary = crate::storage::property::serialize_properties(&props)?;
237        self.set_node_property_binary(id, &binary)
238    }
239
240    /// Get node properties as JSON string (reads from FlexBuffers and converts)
241    pub fn get_node_property(&self, id: u64) -> Result<Option<String>> {
242        // Get binary data (FlexBuffers format)
243        if let Some(binary) = self.get_node_property_binary(id)? {
244            // Deserialize from FlexBuffers and convert to JSON
245            let props = crate::storage::property::deserialize_properties(&binary)?;
246            let json_string = serde_json::to_string(&props)
247                .map_err(|e| Error::Other(format!("Failed to serialize to JSON: {}", e)))?;
248            Ok(Some(json_string))
249        } else {
250            Ok(None)
251        }
252    }
253
254    /// Set edge properties from JSON string (converts to FlexBuffers internally)
255    pub fn set_edge_property(&mut self, s: u64, p: u64, o: u64, json: &str) -> Result<()> {
256        // Parse JSON and convert to FlexBuffers for unified storage
257        let props: std::collections::HashMap<String, serde_json::Value> =
258            serde_json::from_str(json)
259                .map_err(|e| Error::Other(format!("Invalid JSON in set_edge_property: {}", e)))?;
260        let binary = crate::storage::property::serialize_properties(&props)?;
261        self.set_edge_property_binary(s, p, o, &binary)
262    }
263
264    /// Get edge properties as JSON string (reads from FlexBuffers and converts)
265    pub fn get_edge_property(&self, s: u64, p: u64, o: u64) -> Result<Option<String>> {
266        // Get binary data (FlexBuffers format)
267        if let Some(binary) = self.get_edge_property_binary(s, p, o)? {
268            // Deserialize from FlexBuffers and convert to JSON
269            let props = crate::storage::property::deserialize_properties(&binary)?;
270            let json_string = serde_json::to_string(&props)
271                .map_err(|e| Error::Other(format!("Failed to serialize to JSON: {}", e)))?;
272            Ok(Some(json_string))
273        } else {
274            Ok(None)
275        }
276    }
277
278    // Binary property methods (v2.0, FlexBuffers for 10x performance)
279
280    pub fn set_node_property_binary(&mut self, id: u64, value: &[u8]) -> Result<()> {
281        #[cfg(not(target_arch = "wasm32"))]
282        if let Some(txn) = self.active_write.as_mut() {
283            {
284                let mut table = txn
285                    .open_table(crate::storage::schema::TABLE_NODE_PROPS_BINARY)
286                    .map_err(|e| Error::Other(e.to_string()))?;
287                table
288                    .insert(id, value)
289                    .map_err(|e| Error::Other(e.to_string()))?;
290            }
291
292            #[cfg(all(feature = "fts", not(target_arch = "wasm32")))]
293            fts_index::bump_committed_writes_in_txn(txn, 1)?;
294        } else {
295            let tx = self
296                .redb
297                .begin_write()
298                .map_err(|e| Error::Other(e.to_string()))?;
299            {
300                let mut table = tx
301                    .open_table(crate::storage::schema::TABLE_NODE_PROPS_BINARY)
302                    .map_err(|e| Error::Other(e.to_string()))?;
303                table
304                    .insert(id, value)
305                    .map_err(|e| Error::Other(e.to_string()))?;
306            }
307
308            #[cfg(all(feature = "fts", not(target_arch = "wasm32")))]
309            fts_index::bump_committed_writes_in_txn(&tx, 1)?;
310
311            tx.commit().map_err(|e| Error::Other(e.to_string()))?;
312            self.store.after_write_commit();
313        }
314
315        #[cfg(target_arch = "wasm32")]
316        self.store.set_node_property_binary(id, value)?;
317
318        #[cfg(all(feature = "vector", not(target_arch = "wasm32")))]
319        self.update_vector_index_from_node_props(id, value);
320
321        #[cfg(all(feature = "fts", not(target_arch = "wasm32")))]
322        self.update_fts_index_from_node_props(id, value);
323
324        Ok(())
325    }
326
327    pub fn get_node_property_binary(&self, id: u64) -> Result<Option<Vec<u8>>> {
328        self.store.get_node_property_binary(id)
329    }
330
331    pub fn set_edge_property_binary(&mut self, s: u64, p: u64, o: u64, value: &[u8]) -> Result<()> {
332        #[cfg(not(target_arch = "wasm32"))]
333        if let Some(txn) = self.active_write.as_mut() {
334            let mut table = txn
335                .open_table(crate::storage::schema::TABLE_EDGE_PROPS_BINARY)
336                .map_err(|e| Error::Other(e.to_string()))?;
337            table
338                .insert((s, p, o), value)
339                .map_err(|e| Error::Other(e.to_string()))?;
340        } else {
341            self.store.set_edge_property_binary(s, p, o, value)?;
342        }
343
344        Ok(())
345    }
346
347    pub fn get_edge_property_binary(&self, s: u64, p: u64, o: u64) -> Result<Option<Vec<u8>>> {
348        self.store.get_edge_property_binary(s, p, o)
349    }
350
351    #[cfg(all(feature = "vector", not(target_arch = "wasm32")))]
352    fn update_vector_index_from_node_props(&mut self, node_id: u64, value: &[u8]) {
353        let Some(index) = self.vector_index.as_mut() else {
354            return;
355        };
356
357        let Ok(props) = crate::storage::property::deserialize_properties(value) else {
358            return;
359        };
360
361        if self.active_write.is_some() {
362            let old = index.get_vector(node_id).ok().flatten();
363            self.vector_undo_log.push(VectorUndoEntry { node_id, old });
364        }
365
366        if index.upsert_from_props(node_id, &props).is_err() {
367            self.vector_index = None;
368            self.vector_undo_log.clear();
369        }
370    }
371
372    #[cfg(all(feature = "vector", not(target_arch = "wasm32")))]
373    fn rollback_vector_index(&mut self) {
374        let Some(index) = self.vector_index.as_mut() else {
375            self.vector_undo_log.clear();
376            return;
377        };
378
379        for entry in self.vector_undo_log.drain(..).rev() {
380            let _ = index.upsert(entry.node_id, entry.old.as_deref());
381        }
382    }
383
384    #[cfg(all(feature = "fts", not(target_arch = "wasm32")))]
385    fn update_fts_index_from_node_props(&mut self, node_id: u64, value: &[u8]) {
386        let Some(index) = self.fts_index.as_mut() else {
387            return;
388        };
389
390        if self.active_write.is_some() {
391            self.fts_write_log.insert(node_id, value.to_vec());
392            return;
393        }
394
395        let Ok(props) = crate::storage::property::deserialize_properties(value) else {
396            return;
397        };
398        if index.upsert_from_props(node_id, &props).is_err() {
399            self.fts_index = None;
400            self.fts_write_log.clear();
401        }
402    }
403
404    #[cfg(all(feature = "fts", not(target_arch = "wasm32")))]
405    pub(crate) fn fts_txt_score(&self, node_id: u64, property: &str, query: &str) -> f64 {
406        let Some(index) = self.fts_index.as_ref() else {
407            return 0.0;
408        };
409        index.txt_score(node_id, property, query).unwrap_or(0.0) as f64
410    }
411
412    #[cfg(all(feature = "fts", not(target_arch = "wasm32")))]
413    pub(crate) fn fts_scores_for_query(
414        &self,
415        property: &str,
416        query: &str,
417    ) -> Option<Arc<HashMap<u64, f32>>> {
418        let index = self.fts_index.as_ref()?;
419        index.scores_for_query(property, query).ok()
420    }
421
422    #[cfg(all(feature = "fts", not(target_arch = "wasm32")))]
423    pub fn configure_fts_index(&mut self, mode: &str) -> Result<()> {
424        if self.active_write.is_some() {
425            return Err(Error::Other(
426                "cannot configure fts index during active transaction".to_string(),
427            ));
428        }
429
430        let config = fts_index::FtsIndexConfig {
431            mode: if mode.is_empty() {
432                "all_string_props".to_string()
433            } else {
434                mode.to_string()
435            },
436        };
437        fts_index::write_config(self, &config)?;
438        self.fts_index = fts_index::FtsIndex::open_or_rebuild(self, &self.redb_path)?;
439        self.fts_write_log.clear();
440        Ok(())
441    }
442
443    #[cfg(all(feature = "fts", not(target_arch = "wasm32")))]
444    pub fn disable_fts_index(&mut self) -> Result<()> {
445        if self.active_write.is_some() {
446            return Err(Error::Other(
447                "cannot disable fts index during active transaction".to_string(),
448            ));
449        }
450        fts_index::clear_config(self)?;
451        self.fts_index = None;
452        self.fts_write_log.clear();
453        Ok(())
454    }
455
456    #[cfg(all(feature = "vector", not(target_arch = "wasm32")))]
457    pub fn configure_vector_index(
458        &mut self,
459        dim: usize,
460        property: &str,
461        metric: &str,
462    ) -> Result<()> {
463        if self.active_write.is_some() {
464            return Err(Error::Other(
465                "cannot configure vector index during active transaction".to_string(),
466            ));
467        }
468        if dim == 0 {
469            return Err(Error::Other("vector dim must be > 0".to_string()));
470        }
471
472        let config = vector_index::VectorIndexConfig {
473            dim,
474            property: property.to_string(),
475            metric: metric.to_string(),
476        };
477        vector_index::write_config(self, &config)?;
478        self.vector_index = vector_index::VectorIndex::open_or_rebuild(self, &self.redb_path)?;
479        Ok(())
480    }
481
482    #[cfg(all(feature = "vector", not(target_arch = "wasm32")))]
483    pub fn disable_vector_index(&mut self) -> Result<()> {
484        if self.active_write.is_some() {
485            return Err(Error::Other(
486                "cannot disable vector index during active transaction".to_string(),
487            ));
488        }
489        vector_index::clear_config(self)?;
490        self.vector_index = None;
491        self.vector_undo_log.clear();
492        Ok(())
493    }
494
495    #[cfg(all(feature = "vector", not(target_arch = "wasm32")))]
496    pub fn vector_search(&self, query: &[f32], limit: usize) -> Result<Vec<(u64, f32)>> {
497        let Some(index) = self.vector_index.as_ref() else {
498            return Err(Error::Other("vector index not configured".to_string()));
499        };
500        index.search(query, limit)
501    }
502
503    #[cfg(all(feature = "vector", not(target_arch = "wasm32")))]
504    pub(crate) fn vector_index_config(&self) -> Option<&vector_index::VectorIndexConfig> {
505        self.vector_index.as_ref().map(|index| index.config())
506    }
507
508    pub fn flush_indexes(&mut self) -> Result<()> {
509        #[cfg(not(target_arch = "wasm32"))]
510        if self.active_write.is_some() {
511            return Err(Error::Other(
512                "cannot flush indexes during active transaction".to_string(),
513            ));
514        }
515
516        #[cfg(all(feature = "vector", not(target_arch = "wasm32")))]
517        if let Some(index) = self.vector_index.as_mut() {
518            index.flush()?;
519        }
520
521        #[cfg(all(feature = "fts", not(target_arch = "wasm32")))]
522        {
523            let committed_writes = fts_index::read_committed_writes(self)?;
524            if let Some(index) = self.fts_index.as_mut() {
525                index.flush(committed_writes)?;
526            }
527        }
528
529        Ok(())
530    }
531
532    // Batch operations (v2.0, for migration and bulk operations)
533
534    /// Insert multiple triples in a single transaction
535    /// Returns the number of triples actually inserted (excludes duplicates)
536    pub fn batch_insert(&mut self, triples: &[Triple]) -> Result<usize> {
537        self.store.batch_insert(triples)
538    }
539
540    /// Delete multiple triples in a single transaction
541    /// Returns the number of triples actually deleted
542    pub fn batch_delete(&mut self, triples: &[Triple]) -> Result<usize> {
543        self.store.batch_delete(triples)
544    }
545
546    /// Insert multiple facts in a single optimized transaction
547    /// Uses cached table handles for maximum performance
548    /// Returns the triples that were inserted
549    pub fn batch_add_facts(&mut self, facts: &[Fact<'_>]) -> Result<Vec<Triple>> {
550        self.store.batch_insert_facts(facts)
551    }
552
553    pub fn add_fact(&mut self, fact: Fact<'_>) -> Result<Triple> {
554        #[cfg(not(target_arch = "wasm32"))]
555        if let Some(txn) = self.active_write.as_mut() {
556            let s = crate::storage::disk::intern_in_txn(txn, fact.subject)?;
557            let p = crate::storage::disk::intern_in_txn(txn, fact.predicate)?;
558            let o = crate::storage::disk::intern_in_txn(txn, fact.object)?;
559            let triple = Triple::new(s, p, o);
560            crate::storage::disk::insert_triple(txn, &triple)?;
561            return Ok(triple);
562        }
563        self.store.insert_fact(fact)
564    }
565
566    pub fn delete_fact(&mut self, fact: Fact<'_>) -> Result<bool> {
567        let s = self.resolve_id(fact.subject)?.ok_or(Error::NotFound)?;
568        let p = self.resolve_id(fact.predicate)?.ok_or(Error::NotFound)?;
569        let o = self.resolve_id(fact.object)?.ok_or(Error::NotFound)?;
570        let triple = Triple::new(s, p, o);
571        #[cfg(not(target_arch = "wasm32"))]
572        if let Some(txn) = self.active_write.as_mut() {
573            return crate::storage::disk::delete_triple(txn, &triple);
574        }
575        self.store.delete(&triple)
576    }
577
578    pub fn all_triples(&self) -> Vec<Triple> {
579        self.store.iter().collect()
580    }
581
582    pub fn resolve_str(&self, id: StringId) -> Result<Option<String>> {
583        self.store.resolve_str(id)
584    }
585
586    pub fn resolve_id(&self, value: &str) -> Result<Option<StringId>> {
587        self.store.resolve_id(value)
588    }
589
590    pub fn intern(&mut self, value: &str) -> Result<u64> {
591        #[cfg(not(target_arch = "wasm32"))]
592        if let Some(txn) = self.active_write.as_mut() {
593            return crate::storage::disk::intern_in_txn(txn, value);
594        }
595        self.store.intern(value)
596    }
597
598    /// Bulk intern strings in a single transaction (order preserving).
599    pub fn bulk_intern(&mut self, values: &[&str]) -> Result<Vec<u64>> {
600        self.store.bulk_intern(values)
601    }
602
603    pub fn dictionary_size(&self) -> Result<u64> {
604        self.store.dictionary_size()
605    }
606
607    pub fn execute_query(
608        &mut self,
609        query_string: &str,
610    ) -> Result<Vec<std::collections::HashMap<String, query::executor::Value>>> {
611        self.execute_query_with_params(query_string, None)
612    }
613
614    pub fn execute_query_with_params(
615        &mut self,
616        query_string: &str,
617        params: Option<HashMap<String, serde_json::Value>>,
618    ) -> Result<Vec<std::collections::HashMap<String, query::executor::Value>>> {
619        use query::parser::Parser;
620
621        let query = Parser::parse(query_string)?;
622
623        let param_values: HashMap<String, query::executor::Value> = params
624            .unwrap_or_default()
625            .into_iter()
626            .map(|(k, v)| (k, Self::serde_value_to_executor_value(v)))
627            .collect();
628
629        if debug_env_enabled() {
630            let keys: Vec<_> = param_values.keys().cloned().collect();
631            emit_debug(&format!(
632                "[nervusdb-core] execute_query_with_params received: {:?}",
633                keys
634            ));
635        }
636
637        self.execute_parsed_query_with_params(query, &param_values)
638    }
639
640    fn execute_parsed_query_with_params(
641        &mut self,
642        query: query::ast::Query,
643        param_values: &HashMap<String, query::executor::Value>,
644    ) -> Result<Vec<std::collections::HashMap<String, query::executor::Value>>> {
645        use query::ast::Clause;
646        use query::executor::{ExecutionContext, ExecutionPlan};
647        use query::planner::QueryPlanner;
648
649        // Handle CALL { ... } queries directly (simplified: only standalone CALL for MVP).
650        if query.clauses.len() == 1
651            && let Clause::Call(call_clause) = &query.clauses[0]
652        {
653            return self.execute_parsed_query_with_params(call_clause.query.clone(), param_values);
654        }
655        if query
656            .clauses
657            .iter()
658            .any(|clause| matches!(clause, Clause::Call(_)))
659        {
660            return Err(Error::NotImplemented("CALL with other clauses"));
661        }
662        if query
663            .clauses
664            .iter()
665            .any(|clause| matches!(clause, Clause::Union(_)))
666        {
667            return self.execute_union_query_with_params(query, param_values);
668        }
669
670        // Check if query contains SET clause (needs special handling due to mutation)
671        let has_set = query
672            .clauses
673            .iter()
674            .any(|clause| matches!(clause, Clause::Set(_)));
675
676        // Check if query contains DELETE clause (needs special handling due to mutation)
677        let has_delete = query
678            .clauses
679            .iter()
680            .any(|clause| matches!(clause, Clause::Delete(_)));
681
682        // Handle CREATE queries directly (simplified: only standalone CREATE for MVP)
683        if query.clauses.len() == 1
684            && let Clause::Create(create_clause) = &query.clauses[0]
685        {
686            // Execute CREATE immediately
687            return self.execute_create_pattern(&create_clause.pattern);
688        }
689
690        // Handle MERGE queries directly (simplified: only standalone MERGE for MVP)
691        if query.clauses.len() == 1
692            && let Clause::Merge(merge_clause) = &query.clauses[0]
693        {
694            return self.execute_merge_pattern(&merge_clause.pattern);
695        }
696
697        // Handle SET queries specially (needs mutation)
698        if has_set {
699            return self.execute_set_query_with_plan(&query, param_values);
700        }
701
702        // Handle DELETE queries specially (needs mutation)
703        if has_delete {
704            return self.execute_delete_query_with_plan(&query, param_values);
705        }
706
707        // Handle read queries through execution plan
708        let planner = QueryPlanner::new();
709        let plan = planner.plan(query)?;
710
711        let ctx = ExecutionContext {
712            db: self,
713            params: param_values,
714        };
715        let iterator = plan.execute(&ctx)?;
716
717        let mut results = Vec::new();
718        for record in iterator {
719            results.push(record?.values);
720        }
721
722        Ok(results)
723    }
724
725    fn execute_union_query_with_params(
726        &mut self,
727        query: query::ast::Query,
728        param_values: &HashMap<String, query::executor::Value>,
729    ) -> Result<Vec<std::collections::HashMap<String, query::executor::Value>>> {
730        use query::ast::{Clause, Expression, Query, ReturnClause, UnionClause};
731
732        fn is_write_clause(clause: &Clause) -> bool {
733            matches!(
734                clause,
735                Clause::Create(_) | Clause::Merge(_) | Clause::Set(_) | Clause::Delete(_)
736            )
737        }
738
739        fn infer_alias(expr: &Expression) -> String {
740            match expr {
741                Expression::Variable(name) => name.clone(),
742                Expression::PropertyAccess(pa) => format!("{}.{}", pa.variable, pa.property),
743                _ => "col".to_string(),
744            }
745        }
746
747        fn return_columns(query: &Query) -> Option<Vec<String>> {
748            query.clauses.iter().find_map(|clause| match clause {
749                Clause::Return(ReturnClause { items, .. }) => Some(
750                    items
751                        .iter()
752                        .map(|item| {
753                            item.alias
754                                .clone()
755                                .unwrap_or_else(|| infer_alias(&item.expression))
756                        })
757                        .collect(),
758                ),
759                _ => None,
760            })
761        }
762
763        fn validate_row_columns(
764            expected: &[String],
765            row: &std::collections::HashMap<String, query::executor::Value>,
766        ) -> Result<()> {
767            if row.len() != expected.len() {
768                return Err(Error::Other("UNION schema mismatch".to_string()));
769            }
770            for col in expected {
771                if !row.contains_key(col) {
772                    return Err(Error::Other("UNION schema mismatch".to_string()));
773                }
774            }
775            Ok(())
776        }
777
778        fn row_key(row: &std::collections::HashMap<String, query::executor::Value>) -> String {
779            let mut items: Vec<_> = row.iter().collect();
780            items.sort_by(|(a, _), (b, _)| a.cmp(b));
781            items
782                .into_iter()
783                .map(|(k, v)| format!("{k}={v:?}"))
784                .collect::<Vec<_>>()
785                .join("|")
786        }
787
788        let mut left_clauses = Vec::new();
789        let mut unions: Vec<UnionClause> = Vec::new();
790
791        for clause in query.clauses {
792            match clause {
793                Clause::Union(u) => unions.push(u),
794                other => left_clauses.push(other),
795            }
796        }
797
798        let left_query = Query {
799            clauses: left_clauses,
800        };
801
802        if left_query.clauses.iter().any(is_write_clause)
803            || unions
804                .iter()
805                .any(|u| u.query.clauses.iter().any(is_write_clause))
806        {
807            return Err(Error::NotImplemented("UNION with write clauses"));
808        }
809
810        let Some(expected_cols) = return_columns(&left_query) else {
811            return Err(Error::Other("UNION requires explicit RETURN".to_string()));
812        };
813
814        for u in &unions {
815            let Some(cols) = return_columns(&u.query) else {
816                return Err(Error::Other("UNION requires explicit RETURN".to_string()));
817            };
818            if cols != expected_cols {
819                return Err(Error::Other(
820                    "UNION queries must return the same columns".to_string(),
821                ));
822            }
823        }
824
825        let mut current = self.execute_parsed_query_with_params(left_query, param_values)?;
826        for row in &current {
827            validate_row_columns(&expected_cols, row)?;
828        }
829
830        for u in unions {
831            let mut right = self.execute_parsed_query_with_params(u.query, param_values)?;
832            for row in &right {
833                validate_row_columns(&expected_cols, row)?;
834            }
835
836            if u.all {
837                current.append(&mut right);
838            } else {
839                let mut deduped = Vec::new();
840                let mut seen = std::collections::HashSet::new();
841                for row in current.into_iter().chain(right) {
842                    if seen.insert(row_key(&row)) {
843                        deduped.push(row);
844                    }
845                }
846                current = deduped;
847            }
848        }
849
850        Ok(current)
851    }
852
853    /// Convert serde_json::Value to executor::Value (public for FFI)
854    pub fn serde_value_to_executor_value(value: serde_json::Value) -> query::executor::Value {
855        use query::executor::Value as ExecValue;
856
857        match value {
858            serde_json::Value::String(s) => ExecValue::String(s),
859            serde_json::Value::Number(n) => ExecValue::Float(n.as_f64().unwrap_or(0.0)),
860            serde_json::Value::Bool(b) => ExecValue::Boolean(b),
861            serde_json::Value::Null => ExecValue::Null,
862            serde_json::Value::Array(items) => {
863                let mut out = Vec::with_capacity(items.len());
864                for item in &items {
865                    let Some(n) = item.as_f64() else {
866                        return ExecValue::String(serde_json::Value::Array(items).to_string());
867                    };
868                    out.push(n as f32);
869                }
870                ExecValue::Vector(out)
871            }
872            _ => ExecValue::Null,
873        }
874    }
875
876    fn execute_create_pattern(
877        &mut self,
878        pattern: &query::ast::Pattern,
879    ) -> Result<Vec<std::collections::HashMap<String, query::executor::Value>>> {
880        use query::ast::{PathElement, RelationshipDirection};
881        use query::executor::Value;
882        use std::collections::HashMap;
883
884        let mut result_record: HashMap<String, Value> = HashMap::new();
885        let mut last_node_info: Option<(String, u64)> = None;
886
887        let mut i = 0;
888        while i < pattern.elements.len() {
889            match &pattern.elements[i] {
890                PathElement::Node(node_pattern) => {
891                    // Create a node by adding a type triple
892                    let anon_name = format!("_anon{}", i);
893                    let node_str = node_pattern.variable.as_deref().unwrap_or(&anon_name);
894                    let label = node_pattern
895                        .labels
896                        .first()
897                        .map(|s| s.as_str())
898                        .unwrap_or("Node");
899
900                    let fact = self.add_fact(Fact::new(node_str, "type", label))?;
901                    let node_id = fact.subject_id;
902
903                    // Set node properties if specified
904                    if let Some(props) = &node_pattern.properties {
905                        let props_map = self.convert_property_map_to_json(props)?;
906                        let binary = crate::storage::property::serialize_properties(&props_map)?;
907                        self.set_node_property_binary(node_id, &binary)?;
908                    }
909
910                    if let Some(var) = &node_pattern.variable {
911                        result_record.insert(var.clone(), Value::Node(node_id));
912                        last_node_info = Some((var.clone(), node_id));
913                    } else {
914                        last_node_info = Some((format!("_anon{}", i), node_id));
915                    }
916                }
917                PathElement::Relationship(rel_pattern) => {
918                    // Relationship must be between two nodes: (a)-[r]->(b)
919                    // We just processed a node, now expect another node after this relationship
920                    if i + 1 >= pattern.elements.len() {
921                        return Err(Error::Other(
922                            "Relationship must be followed by a node".to_string(),
923                        ));
924                    }
925
926                    if let Some((_, start_node_id)) = last_node_info {
927                        // Process the next node (end node)
928                        i += 1;
929                        if let PathElement::Node(end_node_pattern) = &pattern.elements[i] {
930                            let end_anon_name = format!("_anon{}", i);
931                            let end_node_str = end_node_pattern
932                                .variable
933                                .as_deref()
934                                .unwrap_or(&end_anon_name);
935                            let end_label = end_node_pattern
936                                .labels
937                                .first()
938                                .map(|s| s.as_str())
939                                .unwrap_or("Node");
940
941                            let end_fact =
942                                self.add_fact(Fact::new(end_node_str, "type", end_label))?;
943                            let end_node_id = end_fact.subject_id;
944
945                            // Set end node properties
946                            if let Some(props) = &end_node_pattern.properties {
947                                let props_map = self.convert_property_map_to_json(props)?;
948                                let binary =
949                                    crate::storage::property::serialize_properties(&props_map)?;
950                                self.set_node_property_binary(end_node_id, &binary)?;
951                            }
952
953                            if let Some(var) = &end_node_pattern.variable {
954                                result_record.insert(var.clone(), Value::Node(end_node_id));
955                            }
956
957                            // Create the relationship triple
958                            let rel_type = rel_pattern
959                                .types
960                                .first()
961                                .map(|s| s.as_str())
962                                .unwrap_or("RELATED_TO");
963
964                            // Determine direction
965                            let (subject_id, object_id) = match rel_pattern.direction {
966                                RelationshipDirection::LeftToRight => (start_node_id, end_node_id),
967                                RelationshipDirection::RightToLeft => (end_node_id, start_node_id),
968                                RelationshipDirection::Undirected => (start_node_id, end_node_id), // Default to left-to-right
969                            };
970
971                            let subject_str = self.resolve_str(subject_id)?.ok_or_else(|| {
972                                Error::Other("Subject node not found".to_string())
973                            })?;
974                            let object_str = self
975                                .resolve_str(object_id)?
976                                .ok_or_else(|| Error::Other("Object node not found".to_string()))?;
977
978                            let rel_fact =
979                                self.add_fact(Fact::new(&subject_str, rel_type, &object_str))?;
980
981                            // Set relationship properties if specified
982                            if let Some(props) = &rel_pattern.properties {
983                                let props_map = self.convert_property_map_to_json(props)?;
984                                let binary =
985                                    crate::storage::property::serialize_properties(&props_map)?;
986                                self.set_edge_property_binary(
987                                    rel_fact.subject_id,
988                                    rel_fact.predicate_id,
989                                    rel_fact.object_id,
990                                    &binary,
991                                )?;
992                            }
993
994                            // Store relationship in result if it has a variable
995                            if let Some(var) = &rel_pattern.variable {
996                                result_record.insert(var.clone(), Value::Relationship(rel_fact));
997                            }
998
999                            // Update last node info for potential next relationship
1000                            last_node_info = end_node_pattern
1001                                .variable
1002                                .as_ref()
1003                                .map(|v| (v.clone(), end_node_id))
1004                                .or(Some((format!("_anon{}", i), end_node_id)));
1005                        } else {
1006                            return Err(Error::Other(
1007                                "Expected node after relationship".to_string(),
1008                            ));
1009                        }
1010                    } else {
1011                        return Err(Error::Other("Relationship must follow a node".to_string()));
1012                    }
1013                }
1014            }
1015            i += 1;
1016        }
1017
1018        Ok(vec![result_record])
1019    }
1020
1021    fn execute_merge_pattern(
1022        &mut self,
1023        pattern: &query::ast::Pattern,
1024    ) -> Result<Vec<std::collections::HashMap<String, query::executor::Value>>> {
1025        use query::ast::{PathElement, RelationshipDirection};
1026        use query::executor::Value;
1027        use std::collections::HashMap;
1028
1029        let mut result_record: HashMap<String, Value> = HashMap::new();
1030        let mut last_node_info: Option<(String, u64)> = None;
1031
1032        let mut i = 0;
1033        while i < pattern.elements.len() {
1034            match &pattern.elements[i] {
1035                PathElement::Node(node_pattern) => {
1036                    let anon_name = format!("_anon{}", i);
1037                    let node_str = node_pattern.variable.as_deref().unwrap_or(&anon_name);
1038                    let label = node_pattern
1039                        .labels
1040                        .first()
1041                        .map(|s| s.as_str())
1042                        .unwrap_or("Node");
1043
1044                    let node_id = self.ensure_node(node_str, label)?;
1045
1046                    // Upsert node properties if specified.
1047                    if let Some(props) = &node_pattern.properties {
1048                        let props_map = self.convert_property_map_to_json(props)?;
1049                        let binary = crate::storage::property::serialize_properties(&props_map)?;
1050                        self.set_node_property_binary(node_id, &binary)?;
1051                    }
1052
1053                    if let Some(var) = &node_pattern.variable {
1054                        result_record.insert(var.clone(), Value::Node(node_id));
1055                        last_node_info = Some((var.clone(), node_id));
1056                    } else {
1057                        last_node_info = Some((anon_name, node_id));
1058                    }
1059                }
1060                PathElement::Relationship(rel_pattern) => {
1061                    if i + 1 >= pattern.elements.len() {
1062                        return Err(Error::Other(
1063                            "Relationship must be followed by a node".to_string(),
1064                        ));
1065                    }
1066
1067                    let Some((_, start_node_id)) = last_node_info else {
1068                        return Err(Error::Other("Relationship must follow a node".to_string()));
1069                    };
1070
1071                    i += 1;
1072                    let PathElement::Node(end_node_pattern) = &pattern.elements[i] else {
1073                        return Err(Error::Other("Expected node after relationship".to_string()));
1074                    };
1075
1076                    let end_anon_name = format!("_anon{}", i);
1077                    let end_node_str = end_node_pattern
1078                        .variable
1079                        .as_deref()
1080                        .unwrap_or(&end_anon_name);
1081                    let end_label = end_node_pattern
1082                        .labels
1083                        .first()
1084                        .map(|s| s.as_str())
1085                        .unwrap_or("Node");
1086
1087                    let end_node_id = self.ensure_node(end_node_str, end_label)?;
1088
1089                    // Upsert end node properties
1090                    if let Some(props) = &end_node_pattern.properties {
1091                        let props_map = self.convert_property_map_to_json(props)?;
1092                        let binary = crate::storage::property::serialize_properties(&props_map)?;
1093                        self.set_node_property_binary(end_node_id, &binary)?;
1094                    }
1095
1096                    if let Some(var) = &end_node_pattern.variable {
1097                        result_record.insert(var.clone(), Value::Node(end_node_id));
1098                    }
1099
1100                    let rel_type = rel_pattern
1101                        .types
1102                        .first()
1103                        .map(|s| s.as_str())
1104                        .unwrap_or("RELATED_TO");
1105
1106                    let (subject_id, object_id) = match rel_pattern.direction {
1107                        RelationshipDirection::LeftToRight => (start_node_id, end_node_id),
1108                        RelationshipDirection::RightToLeft => (end_node_id, start_node_id),
1109                        RelationshipDirection::Undirected => (start_node_id, end_node_id),
1110                    };
1111
1112                    let rel_triple = self.ensure_relationship(subject_id, rel_type, object_id)?;
1113
1114                    // Upsert relationship properties if specified.
1115                    if let Some(props) = &rel_pattern.properties {
1116                        let props_map = self.convert_property_map_to_json(props)?;
1117                        let binary = crate::storage::property::serialize_properties(&props_map)?;
1118                        self.set_edge_property_binary(
1119                            rel_triple.subject_id,
1120                            rel_triple.predicate_id,
1121                            rel_triple.object_id,
1122                            &binary,
1123                        )?;
1124                    }
1125
1126                    if let Some(var) = &rel_pattern.variable {
1127                        result_record.insert(var.clone(), Value::Relationship(rel_triple));
1128                    }
1129
1130                    last_node_info = end_node_pattern
1131                        .variable
1132                        .as_ref()
1133                        .map(|v| (v.clone(), end_node_id))
1134                        .or(Some((end_anon_name, end_node_id)));
1135                }
1136            }
1137            i += 1;
1138        }
1139
1140        Ok(vec![result_record])
1141    }
1142
1143    fn ensure_node(&mut self, node_str: &str, label: &str) -> Result<u64> {
1144        let node_id = self.resolve_id(node_str)?;
1145        let type_id = self.resolve_id("type")?;
1146        let label_id = self.resolve_id(label)?;
1147
1148        if let (Some(node_id), Some(type_id), Some(label_id)) = (node_id, type_id, label_id) {
1149            let criteria = QueryCriteria {
1150                subject_id: Some(node_id),
1151                predicate_id: Some(type_id),
1152                object_id: Some(label_id),
1153            };
1154            if self.query(criteria).next().is_some() {
1155                return Ok(node_id);
1156            }
1157        }
1158
1159        let fact = self.add_fact(Fact::new(node_str, "type", label))?;
1160        Ok(fact.subject_id)
1161    }
1162
1163    fn ensure_relationship(
1164        &mut self,
1165        subject_id: u64,
1166        rel_type: &str,
1167        object_id: u64,
1168    ) -> Result<Triple> {
1169        if let Some(predicate_id) = self.resolve_id(rel_type)? {
1170            let criteria = QueryCriteria {
1171                subject_id: Some(subject_id),
1172                predicate_id: Some(predicate_id),
1173                object_id: Some(object_id),
1174            };
1175            if self.query(criteria).next().is_some() {
1176                return Ok(Triple::new(subject_id, predicate_id, object_id));
1177            }
1178        }
1179
1180        let subject_str = self
1181            .resolve_str(subject_id)?
1182            .ok_or_else(|| Error::Other("Subject node not found".to_string()))?;
1183        let object_str = self
1184            .resolve_str(object_id)?
1185            .ok_or_else(|| Error::Other("Object node not found".to_string()))?;
1186
1187        self.add_fact(Fact::new(&subject_str, rel_type, &object_str))
1188    }
1189
1190    fn convert_property_map_to_json(
1191        &self,
1192        prop_map: &query::ast::PropertyMap,
1193    ) -> Result<HashMap<String, serde_json::Value>> {
1194        use query::ast::{Expression, Literal};
1195        let mut map = HashMap::new();
1196
1197        for pair in &prop_map.properties {
1198            let value = match &pair.value {
1199                Expression::Literal(lit) => match lit {
1200                    Literal::String(s) => serde_json::Value::String(s.clone()),
1201                    Literal::Float(f) => serde_json::json!(f),
1202                    Literal::Boolean(b) => serde_json::Value::Bool(*b),
1203                    Literal::Null => serde_json::Value::Null,
1204                    _ => serde_json::Value::Null,
1205                },
1206                _ => serde_json::Value::Null, // TODO: Support computed properties
1207            };
1208            map.insert(pair.key.clone(), value);
1209        }
1210
1211        Ok(map)
1212    }
1213
1214    fn execute_set_query_with_plan(
1215        &mut self,
1216        query: &query::ast::Query,
1217        params: &HashMap<String, query::executor::Value>,
1218    ) -> Result<Vec<std::collections::HashMap<String, query::executor::Value>>> {
1219        use query::executor::{ExecutionContext, ExecutionPlan, Value, evaluate_expression_value};
1220        use query::planner::QueryPlanner;
1221        use std::collections::HashMap;
1222
1223        // Build the execution plan
1224        let planner = QueryPlanner::new();
1225        let plan = planner.plan(query.clone())?;
1226
1227        // Extract SetNode and optional ReturnClause from the plan
1228        let (set_node, return_clause) = self.extract_set_node(&plan, query)?;
1229
1230        // Execute input plan to get all matching records
1231        let mut records: Vec<query::executor::Record> = {
1232            let ctx = ExecutionContext { db: &*self, params };
1233            let iterator = set_node.input.execute(&ctx)?;
1234            let mut rows = Vec::new();
1235            for record in iterator {
1236                rows.push(record?);
1237            }
1238            rows
1239        };
1240
1241        // Now we can modify the database (no more borrowing conflict)
1242        for record in &mut records {
1243            // Apply each SET item
1244            for set_item in &set_node.items {
1245                let var_name = &set_item.property.variable;
1246
1247                // Get the node ID from the record
1248                if let Some(Value::Node(node_id)) = record.get(var_name) {
1249                    let node_id = *node_id;
1250
1251                    // Evaluate the new value expression
1252                    let new_value = {
1253                        let ctx = ExecutionContext { db: &*self, params };
1254                        evaluate_expression_value(&set_item.value, record, &ctx)
1255                    };
1256
1257                    // Read existing properties
1258                    let mut props = if let Ok(Some(binary)) = self.get_node_property_binary(node_id)
1259                    {
1260                        crate::storage::property::deserialize_properties(&binary)?
1261                    } else {
1262                        HashMap::new()
1263                    };
1264
1265                    // Update the property
1266                    let json_value = match new_value {
1267                        Value::String(s) => serde_json::Value::String(s),
1268                        Value::Float(f) => serde_json::json!(f),
1269                        Value::Boolean(b) => serde_json::Value::Bool(b),
1270                        Value::Null => serde_json::Value::Null,
1271                        _ => serde_json::Value::Null,
1272                    };
1273                    props.insert(set_item.property.property.clone(), json_value);
1274
1275                    // Write back to database
1276                    let binary = crate::storage::property::serialize_properties(&props)?;
1277                    self.set_node_property_binary(node_id, &binary)?;
1278                }
1279            }
1280        }
1281
1282        // Apply RETURN projection if present
1283        if let Some(return_clause) = return_clause {
1284            let mut results = Vec::new();
1285            for record in records {
1286                let mut result = HashMap::new();
1287                for item in &return_clause.items {
1288                    let alias = item
1289                        .alias
1290                        .clone()
1291                        .unwrap_or_else(|| match &item.expression {
1292                            query::ast::Expression::Variable(name) => name.clone(),
1293                            query::ast::Expression::PropertyAccess(pa) => {
1294                                format!("{}.{}", pa.variable, pa.property)
1295                            }
1296                            _ => "col".to_string(),
1297                        });
1298                    let value = {
1299                        let ctx = ExecutionContext { db: &*self, params };
1300                        evaluate_expression_value(&item.expression, &record, &ctx)
1301                    };
1302                    result.insert(alias, value);
1303                }
1304                results.push(result);
1305            }
1306            Ok(results)
1307        } else {
1308            // No RETURN clause, just return the records as-is
1309            Ok(records.into_iter().map(|r| r.values).collect())
1310        }
1311    }
1312
1313    fn extract_set_node<'a>(
1314        &self,
1315        plan: &'a query::planner::PhysicalPlan,
1316        query: &query::ast::Query,
1317    ) -> Result<(
1318        &'a query::planner::SetNode,
1319        Option<query::ast::ReturnClause>,
1320    )> {
1321        // Find SET clause and RETURN clause
1322        let _set_clause = query
1323            .clauses
1324            .iter()
1325            .find_map(|c| {
1326                if let query::ast::Clause::Set(s) = c {
1327                    Some(s.clone())
1328                } else {
1329                    None
1330                }
1331            })
1332            .ok_or_else(|| Error::Other("No SET clause found".to_string()))?;
1333
1334        let return_clause = query.clauses.iter().find_map(|c| {
1335            if let query::ast::Clause::Return(r) = c {
1336                Some(r.clone())
1337            } else {
1338                None
1339            }
1340        });
1341
1342        // Extract SetNode from plan (may be wrapped in Project/Filter)
1343        fn find_set_node(plan: &query::planner::PhysicalPlan) -> Option<&query::planner::SetNode> {
1344            match plan {
1345                query::planner::PhysicalPlan::Set(node) => Some(node),
1346                query::planner::PhysicalPlan::Project(node) => find_set_node(&node.input),
1347                query::planner::PhysicalPlan::Filter(node) => find_set_node(&node.input),
1348                _ => None,
1349            }
1350        }
1351
1352        let set_node = find_set_node(plan)
1353            .ok_or_else(|| Error::Other("No SetNode found in plan".to_string()))?;
1354
1355        Ok((set_node, return_clause))
1356    }
1357
1358    fn execute_delete_query_with_plan(
1359        &mut self,
1360        query: &query::ast::Query,
1361        params: &HashMap<String, query::executor::Value>,
1362    ) -> Result<Vec<std::collections::HashMap<String, query::executor::Value>>> {
1363        use query::executor::{ExecutionContext, ExecutionPlan, Value, evaluate_expression_value};
1364        use query::planner::{PhysicalPlan, QueryPlanner};
1365
1366        // Build the execution plan
1367        let planner = QueryPlanner::new();
1368        let plan = planner.plan(query.clone())?;
1369
1370        // Extract DeleteNode and determine the correct input plan to execute
1371        // The plan might be Filter(Delete(Scan)), so we need to build Filter(Scan)
1372        let (delete_node, input_plan) = self.extract_delete_and_input_plan(&plan, query)?;
1373
1374        // Build the actual plan to execute (without the Delete in the middle)
1375        let exec_plan: &PhysicalPlan = match &plan {
1376            PhysicalPlan::Delete(_) => {
1377                // Simple case: just Delete(input), execute the input directly
1378                &delete_node.input
1379            }
1380            PhysicalPlan::Filter(_filter_node) => {
1381                // Filter(Delete(input)) - we need to execute Filter(input)
1382                // But we can't modify the plan tree here, so just use input_plan
1383                // which is the entire Filter(Delete(...)) - NO that's wrong!
1384                //
1385                // Actually, we need to reconstruct Filter(delete_node.input)
1386                // But we can't easily do that because FilterNode contains Box<PhysicalPlan>
1387                //
1388                // Simpler solution: execute delete_node.input and manually apply the filter
1389                &delete_node.input
1390            }
1391            _ => input_plan,
1392        };
1393
1394        // Execute input plan to get all matching records
1395        let base_records: Vec<query::executor::Record> = {
1396            let ctx = ExecutionContext { db: &*self, params };
1397            let iterator = exec_plan.execute(&ctx)?;
1398            let mut rows = Vec::new();
1399            for record in iterator {
1400                rows.push(record?);
1401            }
1402            rows
1403        };
1404
1405        // If we have a Filter wrapping the Delete, we need to apply it manually
1406        // Extract the filter predicate from the plan
1407        let filter_predicate = if let PhysicalPlan::Filter(filter_node) = &plan {
1408            Some(&filter_node.predicate)
1409        } else {
1410            None
1411        };
1412
1413        let mut records: Vec<query::executor::Record> = Vec::new();
1414        for rec in base_records {
1415            // Apply filter if present
1416            if let Some(predicate) = filter_predicate {
1417                use query::executor::evaluate_expression_value;
1418                let filter_result = {
1419                    let ctx = ExecutionContext { db: &*self, params };
1420                    evaluate_expression_value(predicate, &rec, &ctx)
1421                };
1422                if filter_result == Value::Boolean(true) {
1423                    records.push(rec);
1424                }
1425            } else {
1426                records.push(rec);
1427            }
1428        }
1429
1430        // Collect all node IDs to delete
1431        let mut node_ids_to_delete = Vec::new();
1432
1433        for record in &records {
1434            for expr in &delete_node.expressions {
1435                let value = {
1436                    let ctx = ExecutionContext { db: &*self, params };
1437                    evaluate_expression_value(expr, record, &ctx)
1438                };
1439                if let Value::Node(node_id) = value {
1440                    node_ids_to_delete.push(node_id);
1441                }
1442            }
1443        }
1444
1445        // Now perform the deletion (no more borrowing conflict)
1446        for node_id in node_ids_to_delete {
1447            self.delete_node(node_id, delete_node.detach)?;
1448        }
1449
1450        // DELETE doesn't return anything by default (Neo4j-style)
1451        Ok(Vec::new())
1452    }
1453
1454    fn extract_delete_and_input_plan<'a>(
1455        &self,
1456        plan: &'a query::planner::PhysicalPlan,
1457        query: &query::ast::Query,
1458    ) -> Result<(
1459        &'a query::planner::DeleteNode,
1460        &'a query::planner::PhysicalPlan,
1461    )> {
1462        // Find DELETE clause
1463        let _delete_clause = query
1464            .clauses
1465            .iter()
1466            .find_map(|c| {
1467                if let query::ast::Clause::Delete(d) = c {
1468                    Some(d.clone())
1469                } else {
1470                    None
1471                }
1472            })
1473            .ok_or_else(|| Error::Other("No DELETE clause found".to_string()))?;
1474
1475        // The plan structure can be:
1476        // - Delete(input) - simple case
1477        // - Filter(Delete(input)) - with WHERE clause
1478        // - Project(Delete(input)) or Project(Filter(Delete(input))) - with RETURN clause
1479
1480        // We want to return (DeleteNode, input_plan_with_filters)
1481        // The input_plan_with_filters should include any Filter that wraps the Delete
1482
1483        match plan {
1484            query::planner::PhysicalPlan::Delete(delete_node) => {
1485                // Simple case: no Filter wrapping
1486                Ok((delete_node, &delete_node.input))
1487            }
1488            query::planner::PhysicalPlan::Filter(filter_node) => {
1489                // Filter wraps Delete
1490                match &*filter_node.input {
1491                    query::planner::PhysicalPlan::Delete(delete_node) => {
1492                        // Return DeleteNode and the Filter as the input plan
1493                        Ok((delete_node, plan))
1494                    }
1495                    _ => Err(Error::Other("Expected Delete inside Filter".to_string())),
1496                }
1497            }
1498            query::planner::PhysicalPlan::Project(project_node) => {
1499                // Project wraps Delete or Filter(Delete)
1500                match &*project_node.input {
1501                    query::planner::PhysicalPlan::Delete(delete_node) => {
1502                        Ok((delete_node, &delete_node.input))
1503                    }
1504                    query::planner::PhysicalPlan::Filter(filter_node) => {
1505                        match &*filter_node.input {
1506                            query::planner::PhysicalPlan::Delete(delete_node) => {
1507                                Ok((delete_node, &project_node.input))
1508                            }
1509                            _ => Err(Error::Other("Expected Delete inside Filter".to_string())),
1510                        }
1511                    }
1512                    _ => Err(Error::Other(
1513                        "Expected Delete or Filter(Delete) inside Project".to_string(),
1514                    )),
1515                }
1516            }
1517            _ => Err(Error::Other("No DELETE plan found".to_string())),
1518        }
1519    }
1520
1521    fn delete_node(&mut self, node_id: u64, detach: bool) -> Result<()> {
1522        // First check if node has any relationships
1523        let has_relationships = self.node_has_relationships(node_id);
1524
1525        if has_relationships && !detach {
1526            return Err(Error::Other(format!(
1527                "Cannot delete node {} because it has relationships. Use DETACH DELETE to remove relationships first.",
1528                node_id
1529            )));
1530        }
1531
1532        // If DETACH, delete all relationships first
1533        if detach {
1534            self.delete_all_relationships(node_id)?;
1535        }
1536
1537        // Delete node type metadata (triples where subject is node_id and predicate is "type")
1538        if let Some(type_id) = self.resolve_id("type")? {
1539            let criteria = QueryCriteria {
1540                subject_id: Some(node_id),
1541                predicate_id: Some(type_id),
1542                object_id: None,
1543            };
1544
1545            let triples_to_delete: Vec<Triple> = self.query(criteria).collect();
1546            for triple in triples_to_delete {
1547                self.store.delete(&triple)?;
1548            }
1549        }
1550
1551        // Delete node properties
1552        #[cfg(not(target_arch = "wasm32"))]
1553        if let Some(txn) = self.active_write.as_mut() {
1554            {
1555                // Delete from binary table (v2.0)
1556                let mut binary_table = txn
1557                    .open_table(crate::storage::schema::TABLE_NODE_PROPS_BINARY)
1558                    .map_err(|e| Error::Other(e.to_string()))?;
1559                binary_table
1560                    .remove(node_id)
1561                    .map_err(|e| Error::Other(e.to_string()))?;
1562
1563                // Delete from legacy string table (v1.x)
1564                let mut string_table = txn
1565                    .open_table(crate::storage::schema::TABLE_NODE_PROPS)
1566                    .map_err(|e| Error::Other(e.to_string()))?;
1567                string_table
1568                    .remove(node_id)
1569                    .map_err(|e| Error::Other(e.to_string()))?;
1570            }
1571
1572            #[cfg(all(feature = "fts", not(target_arch = "wasm32")))]
1573            fts_index::bump_committed_writes_in_txn(txn, 1)?;
1574        } else {
1575            let tx = self
1576                .redb
1577                .begin_write()
1578                .map_err(|e| Error::Other(e.to_string()))?;
1579            {
1580                // Delete from binary table (v2.0)
1581                let mut binary_table = tx
1582                    .open_table(crate::storage::schema::TABLE_NODE_PROPS_BINARY)
1583                    .map_err(|e| Error::Other(e.to_string()))?;
1584                binary_table
1585                    .remove(node_id)
1586                    .map_err(|e| Error::Other(e.to_string()))?;
1587
1588                // Delete from legacy string table (v1.x)
1589                let mut string_table = tx
1590                    .open_table(crate::storage::schema::TABLE_NODE_PROPS)
1591                    .map_err(|e| Error::Other(e.to_string()))?;
1592                string_table
1593                    .remove(node_id)
1594                    .map_err(|e| Error::Other(e.to_string()))?;
1595            }
1596
1597            #[cfg(all(feature = "fts", not(target_arch = "wasm32")))]
1598            fts_index::bump_committed_writes_in_txn(&tx, 1)?;
1599
1600            tx.commit().map_err(|e| Error::Other(e.to_string()))?;
1601            self.store.after_write_commit();
1602        }
1603
1604        #[cfg(target_arch = "wasm32")]
1605        self.store.delete_node_properties(node_id)?;
1606
1607        #[cfg(all(feature = "vector", not(target_arch = "wasm32")))]
1608        if let Some(index) = self.vector_index.as_mut() {
1609            let _ = index.remove(node_id);
1610        }
1611
1612        #[cfg(all(feature = "fts", not(target_arch = "wasm32")))]
1613        if let Some(index) = self.fts_index.as_mut() {
1614            let _ = index.delete_node(node_id);
1615        }
1616
1617        Ok(())
1618    }
1619
1620    fn node_has_relationships(&self, node_id: u64) -> bool {
1621        // Get type predicate ID
1622        let type_id = match self.resolve_id("type") {
1623            Ok(Some(id)) => id,
1624            _ => return false,
1625        };
1626
1627        // Check if node is subject of any non-type triple
1628        let criteria_as_subject = QueryCriteria {
1629            subject_id: Some(node_id),
1630            predicate_id: None,
1631            object_id: None,
1632        };
1633
1634        for triple in self.query(criteria_as_subject) {
1635            if triple.predicate_id != type_id {
1636                return true; // Found a relationship
1637            }
1638        }
1639
1640        // Check if node is object of any triple (all relationships)
1641        let criteria_as_object = QueryCriteria {
1642            subject_id: None,
1643            predicate_id: None,
1644            object_id: Some(node_id),
1645        };
1646
1647        self.query(criteria_as_object).next().is_some()
1648    }
1649
1650    fn delete_all_relationships(&mut self, node_id: u64) -> Result<()> {
1651        // Get type predicate ID to exclude metadata triples
1652        let type_id = self
1653            .resolve_id("type")?
1654            .ok_or_else(|| Error::Other("Type predicate not found".to_string()))?;
1655
1656        // Delete all triples where node is subject (except type metadata)
1657        let criteria_as_subject = QueryCriteria {
1658            subject_id: Some(node_id),
1659            predicate_id: None,
1660            object_id: None,
1661        };
1662
1663        let triples_to_delete: Vec<Triple> = self
1664            .query(criteria_as_subject)
1665            .filter(|t| t.predicate_id != type_id)
1666            .collect();
1667
1668        for triple in triples_to_delete {
1669            self.store.delete(&triple)?;
1670        }
1671
1672        // Delete all triples where node is object
1673        let criteria_as_object = QueryCriteria {
1674            subject_id: None,
1675            predicate_id: None,
1676            object_id: Some(node_id),
1677        };
1678
1679        let triples_to_delete: Vec<Triple> = self.query(criteria_as_object).collect();
1680
1681        for triple in triples_to_delete {
1682            self.store.delete(&triple)?;
1683        }
1684
1685        Ok(())
1686    }
1687
1688    pub fn query(&self, criteria: QueryCriteria) -> crate::storage::HexastoreIter {
1689        self.store.query(
1690            criteria.subject_id,
1691            criteria.predicate_id,
1692            criteria.object_id,
1693        )
1694    }
1695
1696    /// Get a reference to the underlying Hexastore for algorithm operations
1697    pub fn get_store(&self) -> &dyn crate::storage::Hexastore {
1698        self.store.as_ref()
1699    }
1700
1701    pub fn open_cursor(&mut self, criteria: QueryCriteria) -> Result<u64> {
1702        let iter = self.query(criteria);
1703        let cursor_id = self.next_cursor_id;
1704        self.next_cursor_id = self.next_cursor_id.wrapping_add(1).max(1);
1705        self.cursors.insert(cursor_id, QueryCursor::new(iter));
1706        Ok(cursor_id)
1707    }
1708
1709    pub fn cursor_next(
1710        &mut self,
1711        cursor_id: u64,
1712        batch_size: usize,
1713    ) -> Result<(Vec<Triple>, bool)> {
1714        let cursor = self
1715            .cursors
1716            .get_mut(&cursor_id)
1717            .ok_or(Error::InvalidCursor(cursor_id))?;
1718        let (batch, done) = cursor.next_batch(batch_size.max(1));
1719        if done {
1720            self.cursors.remove(&cursor_id);
1721        }
1722        Ok((batch, done))
1723    }
1724
1725    pub fn close_cursor(&mut self, cursor_id: u64) -> Result<()> {
1726        self.cursors
1727            .remove(&cursor_id)
1728            .ok_or(Error::InvalidCursor(cursor_id))?;
1729        Ok(())
1730    }
1731
1732    fn reset_cursors(&mut self) {
1733        self.cursors.clear();
1734        self.next_cursor_id = 1;
1735    }
1736
1737    #[cfg(all(feature = "temporal", not(target_arch = "wasm32")))]
1738    pub fn temporal_store(&self) -> &TemporalStore {
1739        &self.temporal
1740    }
1741
1742    #[cfg(all(feature = "temporal", not(target_arch = "wasm32")))]
1743    pub fn temporal_store_mut(&mut self) -> &mut TemporalStore {
1744        &mut self.temporal
1745    }
1746
1747    #[cfg(all(feature = "temporal", not(target_arch = "wasm32")))]
1748    pub fn timeline_query(&self, query: TimelineQuery) -> Vec<StoredFact> {
1749        self.temporal.query_timeline(&query).unwrap_or_default()
1750    }
1751
1752    #[cfg(all(feature = "temporal", not(target_arch = "wasm32")))]
1753    pub fn timeline_trace(&self, fact_id: u64) -> Vec<StoredEpisode> {
1754        self.temporal.trace_back(fact_id).unwrap_or_default()
1755    }
1756
1757    // ========================================================================
1758    // Enhanced Transaction API
1759    // ========================================================================
1760
1761    /// Begin a new write transaction
1762    ///
1763    /// Only one write transaction can be active at a time.
1764    /// All data operations will be buffered until commit().
1765    #[cfg(not(target_arch = "wasm32"))]
1766    pub fn begin_transaction(&mut self) -> Result<()> {
1767        if self.active_write.is_some() {
1768            return Err(Error::Other("transaction already open".to_string()));
1769        }
1770        let tx = self
1771            .redb
1772            .begin_write()
1773            .map_err(|e| Error::Other(e.to_string()))?;
1774        self.active_write = Some(tx);
1775        Ok(())
1776    }
1777
1778    /// Commit the active transaction, making all changes durable
1779    #[cfg(not(target_arch = "wasm32"))]
1780    pub fn commit_transaction(&mut self) -> Result<()> {
1781        let tx = self
1782            .active_write
1783            .take()
1784            .ok_or_else(|| Error::Other("no active transaction".to_string()))?;
1785        tx.commit().map_err(|e| Error::Other(e.to_string()))?;
1786        #[cfg(all(feature = "fts", not(target_arch = "wasm32")))]
1787        {
1788            let staged = std::mem::take(&mut self.fts_write_log);
1789            if let Some(index) = self.fts_index.as_mut() {
1790                for (node_id, value) in staged {
1791                    if let Ok(props) = crate::storage::property::deserialize_properties(&value) {
1792                        let _ = index.upsert_from_props(node_id, &props);
1793                    }
1794                }
1795            }
1796        }
1797        #[cfg(all(feature = "vector", not(target_arch = "wasm32")))]
1798        self.vector_undo_log.clear();
1799        self.store.after_write_commit();
1800        Ok(())
1801    }
1802
1803    /// Abort the active transaction, discarding all changes
1804    #[cfg(not(target_arch = "wasm32"))]
1805    pub fn abort_transaction(&mut self) -> Result<()> {
1806        #[cfg(all(feature = "fts", not(target_arch = "wasm32")))]
1807        self.fts_write_log.clear();
1808        #[cfg(all(feature = "vector", not(target_arch = "wasm32")))]
1809        self.rollback_vector_index();
1810        self.active_write = None;
1811        Ok(())
1812    }
1813
1814    /// Check if a transaction is currently active
1815    #[cfg(not(target_arch = "wasm32"))]
1816    pub fn is_transaction_active(&self) -> bool {
1817        self.active_write.is_some()
1818    }
1819
1820    /// Execute a closure within a transaction, automatically committing on success
1821    /// or aborting on error. This provides RAII-style transaction management.
1822    #[cfg(not(target_arch = "wasm32"))]
1823    pub fn with_transaction<F, R>(&mut self, f: F) -> Result<R>
1824    where
1825        F: FnOnce(&mut Self) -> Result<R>,
1826    {
1827        if self.is_transaction_active() {
1828            return Err(Error::Other("transaction already active".to_string()));
1829        }
1830
1831        self.begin_transaction()?;
1832
1833        match f(self) {
1834            Ok(result) => {
1835                self.commit_transaction()?;
1836                Ok(result)
1837            }
1838            Err(error) => {
1839                // Best effort abort on error
1840                let _ = self.abort_transaction();
1841                Err(error)
1842            }
1843        }
1844    }
1845}
1846
1847#[cfg(test)]
1848mod tests {
1849    use super::*;
1850    use tempfile::tempdir;
1851
1852    #[test]
1853    fn open_and_insert() {
1854        let tmp = tempdir().unwrap();
1855        let mut db = Database::open(Options::new(tmp.path())).unwrap();
1856        let triple = db.add_fact(Fact::new("alice", "knows", "bob")).unwrap();
1857        assert_eq!(db.all_triples(), vec![triple]);
1858        assert_eq!(db.resolve_str(triple.subject_id).unwrap().unwrap(), "alice");
1859
1860        let results: Vec<_> = db
1861            .query(QueryCriteria {
1862                subject_id: Some(triple.subject_id),
1863                predicate_id: None,
1864                object_id: None,
1865            })
1866            .collect();
1867        assert_eq!(results, vec![triple]);
1868
1869        let cursor_id = db
1870            .open_cursor(QueryCriteria {
1871                subject_id: Some(triple.subject_id),
1872                predicate_id: None,
1873                object_id: None,
1874            })
1875            .unwrap();
1876        let (batch, done) = db.cursor_next(cursor_id, 10).unwrap();
1877        assert!(done);
1878        assert_eq!(batch, vec![triple]);
1879    }
1880
1881    #[cfg(not(target_arch = "wasm32"))]
1882    #[test]
1883    fn node_and_edge_properties_roundtrip() {
1884        let tmp = tempdir().unwrap();
1885        let mut db = Database::open(Options::new(tmp.path())).unwrap();
1886        let triple = db.add_fact(Fact::new("alice", "knows", "bob")).unwrap();
1887
1888        db.set_node_property(triple.subject_id, r#"{"name":"alice"}"#)
1889            .unwrap();
1890        db.set_edge_property(
1891            triple.subject_id,
1892            triple.predicate_id,
1893            triple.object_id,
1894            r#"{"since":2020}"#,
1895        )
1896        .unwrap();
1897
1898        assert_eq!(
1899            db.get_node_property(triple.subject_id).unwrap().unwrap(),
1900            r#"{"name":"alice"}"#
1901        );
1902        assert_eq!(
1903            db.get_edge_property(triple.subject_id, triple.predicate_id, triple.object_id)
1904                .unwrap()
1905                .unwrap(),
1906            r#"{"since":2020}"#
1907        );
1908    }
1909
1910    #[test]
1911    #[cfg(all(feature = "temporal", not(target_arch = "wasm32")))]
1912    fn timeline_query_via_database() {
1913        let dir = tempdir().unwrap();
1914        let path = dir.path().join("timeline-db");
1915        let mut db = Database::open(Options::new(&path)).unwrap();
1916
1917        {
1918            let store = db.temporal_store_mut();
1919            let alice = store
1920                .ensure_entity(
1921                    "agent",
1922                    "alice",
1923                    EnsureEntityOptions {
1924                        alias: Some("Alice".into()),
1925                        occurred_at: Some("2025-01-01T00:00:00Z".into()),
1926                        ..Default::default()
1927                    },
1928                )
1929                .unwrap();
1930            let bob = store
1931                .ensure_entity(
1932                    "agent",
1933                    "bob",
1934                    EnsureEntityOptions {
1935                        alias: Some("Bob".into()),
1936                        occurred_at: Some("2025-01-01T00:00:00Z".into()),
1937                        ..Default::default()
1938                    },
1939                )
1940                .unwrap();
1941            let episode = store
1942                .add_episode(EpisodeInput {
1943                    source_type: "conversation".into(),
1944                    payload: serde_json::json!({ "text": "hello" }),
1945                    occurred_at: "2025-01-01T00:00:00Z".into(),
1946                    trace_hash: None,
1947                })
1948                .unwrap();
1949            let fact = store
1950                .upsert_fact(FactWriteInput {
1951                    subject_entity_id: alice.entity_id,
1952                    predicate_key: "mentions".into(),
1953                    object_entity_id: Some(bob.entity_id),
1954                    object_value: None,
1955                    valid_from: Some("2025-01-01T00:00:00Z".into()),
1956                    valid_to: None,
1957                    confidence: None,
1958                    source_episode_id: episode.episode_id,
1959                })
1960                .unwrap();
1961            store
1962                .link_episode(
1963                    episode.episode_id,
1964                    EpisodeLinkOptions {
1965                        entity_id: Some(alice.entity_id),
1966                        fact_id: Some(fact.fact_id),
1967                        role: "author".into(),
1968                    },
1969                )
1970                .unwrap();
1971        }
1972
1973        let alice_id = db.temporal_store().get_entities().unwrap()[0].entity_id;
1974        let timeline = db.timeline_query(TimelineQuery {
1975            entity_id: alice_id,
1976            predicate_key: Some("mentions".into()),
1977            role: Some(TimelineRole::Subject),
1978            ..Default::default()
1979        });
1980        assert_eq!(timeline.len(), 1);
1981
1982        let episodes = db.timeline_trace(timeline[0].fact_id);
1983        assert_eq!(episodes.len(), 1);
1984    }
1985
1986    #[test]
1987    #[cfg(not(target_arch = "wasm32"))]
1988    fn test_transaction_api() {
1989        use tempfile::tempdir;
1990
1991        let dir = tempdir().unwrap();
1992        let path = dir.path().join("tx_test.nervus");
1993        let mut db = Database::open(Options::new(&path)).unwrap();
1994
1995        // Test basic transaction operations
1996        assert!(!db.is_transaction_active());
1997
1998        db.begin_transaction().unwrap();
1999        assert!(db.is_transaction_active());
2000
2001        // Add data within transaction
2002        db.add_fact(Fact::new("alice", "knows", "bob")).unwrap();
2003
2004        // Commit
2005        db.commit_transaction().unwrap();
2006        assert!(!db.is_transaction_active());
2007
2008        // Verify data persisted
2009        let query_result = db.query(QueryCriteria::default()).count();
2010        assert!(query_result > 0);
2011
2012        // Test abort transaction
2013        db.begin_transaction().unwrap();
2014        db.add_fact(Fact::new("bob", "knows", "charlie")).unwrap();
2015        db.abort_transaction().unwrap();
2016
2017        // Data should not persist after abort
2018        let alice_knows_bob = db.query(QueryCriteria::default()).count();
2019        let should_be_same = db.query(QueryCriteria::default()).count();
2020        assert_eq!(alice_knows_bob, should_be_same);
2021    }
2022
2023    #[test]
2024    #[cfg(not(target_arch = "wasm32"))]
2025    fn test_with_transaction_api() {
2026        use tempfile::tempdir;
2027
2028        let dir = tempdir().unwrap();
2029        let path = dir.path().join("with_tx_test.nervus");
2030        let mut db = Database::open(Options::new(&path)).unwrap();
2031
2032        // Test successful transaction
2033        let result = db.with_transaction(|db| {
2034            db.add_fact(Fact::new("alice", "knows", "bob"))?;
2035            db.add_fact(Fact::new("bob", "knows", "charlie"))?;
2036            Ok("success")
2037        });
2038        assert!(result.is_ok());
2039        assert_eq!(result.unwrap(), "success");
2040        assert!(!db.is_transaction_active());
2041
2042        // Verify data committed
2043        let triples_count = db.query(QueryCriteria::default()).count();
2044        assert!(triples_count >= 2);
2045
2046        // Test failed transaction (should rollback)
2047        let original_count = db.query(QueryCriteria::default()).count();
2048        let result: Result<&str> = db.with_transaction(|db| {
2049            db.add_fact(Fact::new("dave", "knows", "eve"))?;
2050            // Simulate error
2051            Err(crate::Error::Other("simulated error".to_string()))
2052        });
2053        assert!(result.is_err());
2054        assert!(!db.is_transaction_active());
2055
2056        // Verify data was rolled back
2057        let final_count = db.query(QueryCriteria::default()).count();
2058        assert_eq!(original_count, final_count);
2059    }
2060}