1pub mod algorithms;
4mod error;
5#[cfg(not(target_arch = "wasm32"))]
6pub mod ffi;
7#[cfg(all(feature = "fts", not(target_arch = "wasm32")))]
8mod fts_index;
9#[cfg(not(target_arch = "wasm32"))]
10pub mod migration;
11pub mod parser;
12pub mod query;
13pub mod storage;
14pub mod triple;
15#[cfg(all(feature = "vector", not(target_arch = "wasm32")))]
16mod vector_index;
17
18use std::collections::HashMap;
19use std::path::{Path, PathBuf};
20#[cfg(not(target_arch = "wasm32"))]
21use std::sync::Arc;
22
23#[cfg(not(target_arch = "wasm32"))]
24use storage::{Hexastore, open_store};
25
/// Dictionary id of an interned string (produced by `Database::intern`, mapped
/// back and forth by `Database::resolve_str` / `Database::resolve_id`).
pub type StringId = u64;
27pub use error::{Error, Result};
28#[cfg(not(target_arch = "wasm32"))]
29use redb::{Database as RedbDatabase, WriteTransaction};
30#[cfg(all(feature = "temporal", not(target_arch = "wasm32")))]
32pub use nervusdb_temporal::{
33 EnsureEntityOptions, EpisodeInput, EpisodeLinkOptions, EpisodeLinkRecord, FactWriteInput,
34 StoredAlias, StoredEntity, StoredEpisode, StoredFact, TemporalStoreV2 as TemporalStore,
35 TimelineQuery, TimelineRole,
36};
37pub use triple::{Fact, Triple};
38
/// Configuration accepted by `Database::open`.
#[derive(Debug, Clone)]
pub struct Options {
    /// Base path of the database. On native targets `Database::open` creates its
    /// parent directory and opens the redb file at this path with a `redb`
    /// extension; on `wasm32` the path is handed to the store directly.
    data_path: PathBuf,
}

impl Options {
    /// Builds options rooted at `path` (anything viewable as a `Path`).
    pub fn new<P: AsRef<Path>>(path: P) -> Self {
        let data_path = PathBuf::from(path.as_ref());
        Options { data_path }
    }
}
52
/// Open handle to a NervusDB database.
///
/// Bundles the triple store with, on non-`wasm32` targets, the shared redb
/// handle and an optional in-progress write transaction, plus (behind the
/// `fts` / `vector` / `temporal` features) the secondary indexes and the
/// temporal store. It also owns the table of open query cursors.
///
/// NOTE: field declaration order is also drop order; keep `active_write`
/// declared after `redb` unless the redb drop requirements are re-checked.
pub struct Database {
    /// Store backend. Reads such as `resolve_id`, `resolve_str`, `all_triples`
    /// and the property getters delegate to it, as do most writes made while
    /// `active_write` is `None`.
    store: Box<dyn Hexastore + Send>,
    /// Shared redb handle. `open` passes clones of it to `open_store` (and, with
    /// `temporal`, to `TemporalStore::open`); `set_node_property_binary` opens its
    /// own write transaction on it when none is active.
    #[cfg(not(target_arch = "wasm32"))]
    redb: Arc<RedbDatabase>,
    /// Path of the redb file (the data path with a `redb` extension), handed to
    /// the vector / FTS `open_or_rebuild` calls.
    #[cfg(all(any(feature = "vector", feature = "fts"), not(target_arch = "wasm32")))]
    redb_path: PathBuf,
    /// Write transaction currently open on this handle, if any. While it is set,
    /// `add_fact`, `delete_fact`, `intern` and the binary property setters write
    /// into it instead of committing on their own, and index configuration /
    /// flushing is refused.
    #[cfg(not(target_arch = "wasm32"))]
    active_write: Option<WriteTransaction>,
    /// Full-text index; `None` when not configured, disabled, or dropped after a
    /// failed update.
    #[cfg(all(feature = "fts", not(target_arch = "wasm32")))]
    fts_index: Option<fts_index::FtsIndex>,
    /// Latest serialized property blob per node id written while a transaction
    /// is active; FTS updates for those nodes are deferred rather than applied
    /// immediately (NOTE(review): presumably replayed or discarded on
    /// commit/rollback outside this view — confirm).
    #[cfg(all(feature = "fts", not(target_arch = "wasm32")))]
    fts_write_log: HashMap<u64, Vec<u8>>,
    /// Vector index; `None` when not configured, disabled, or dropped after a
    /// failed update.
    #[cfg(all(feature = "vector", not(target_arch = "wasm32")))]
    vector_index: Option<vector_index::VectorIndex>,
    /// Pre-write vectors recorded while a transaction is active; replayed in
    /// reverse by `rollback_vector_index`.
    #[cfg(all(feature = "vector", not(target_arch = "wasm32")))]
    vector_undo_log: Vec<VectorUndoEntry>,
    /// Temporal store opened by `open` on the same redb handle.
    #[cfg(all(feature = "temporal", not(target_arch = "wasm32")))]
    temporal: TemporalStore,
    /// Open query cursors keyed by cursor id.
    cursors: HashMap<u64, QueryCursor>,
    /// Cursor-id counter, initialised to 1 by `open` (presumably the id handed
    /// to the next cursor — allocation happens outside this view).
    next_cursor_id: u64,
}
74
/// Undo record for one vector-index write made while a write transaction is
/// active; `rollback_vector_index` replays these in reverse order.
#[cfg(all(feature = "vector", not(target_arch = "wasm32")))]
#[derive(Debug, Clone)]
struct VectorUndoEntry {
    /// Node whose indexed vector was overwritten.
    node_id: u64,
    /// Vector the index held for `node_id` before the write; `None` when there
    /// was none or it could not be read.
    old: Option<Vec<f32>>,
}
81
82struct QueryCursor {
83 iter: crate::storage::HexastoreIter,
84 finished: bool,
85}
86
87impl QueryCursor {
88 fn new(iter: crate::storage::HexastoreIter) -> Self {
89 Self {
90 iter,
91 finished: false,
92 }
93 }
94
95 fn next_batch(&mut self, batch_size: usize) -> (Vec<Triple>, bool) {
96 let mut batch = Vec::with_capacity(batch_size);
97 for _ in 0..batch_size.max(1) {
98 match self.iter.next() {
99 Some(triple) => batch.push(triple),
100 None => {
101 self.finished = true;
102 break;
103 }
104 }
105 }
106 let done = self.finished || batch.is_empty();
107 (batch, done)
108 }
109}
110
/// Reports whether native debug logging is switched on.
///
/// Reads `NERVUSDB_DEBUG_NATIVE` on every call. The values `1` and `true`
/// (ASCII case-insensitive) enable it; anything else, including an unset or
/// non-UTF-8 variable, disables it.
fn debug_env_enabled() -> bool {
    std::env::var("NERVUSDB_DEBUG_NATIVE")
        .map(|flag| flag == "1" || flag.eq_ignore_ascii_case("true"))
        .unwrap_or(false)
}
117
118fn emit_debug(message: &str) {
119 if debug_env_enabled() {
120 eprintln!("{}", message);
121 }
122}
123
/// Triple-pattern filter for `Database::query`.
///
/// Each field pins one position of a triple by dictionary id; `ensure_node`
/// and `ensure_relationship` set all three to test for one exact triple.
/// NOTE(review): presumably a `None` field leaves that position unconstrained
/// — confirm against `Database::query`.
#[derive(Debug, Default, Clone, Copy)]
pub struct QueryCriteria {
    /// Required subject id, if constrained.
    pub subject_id: Option<StringId>,
    /// Required predicate id, if constrained.
    pub predicate_id: Option<StringId>,
    /// Required object id, if constrained.
    pub object_id: Option<StringId>,
}
130
131impl Database {
132 pub fn open(options: Options) -> Result<Self> {
137 let path = options.data_path;
138 #[cfg(not(target_arch = "wasm32"))]
140 if let Some(parent) = path.parent() {
141 std::fs::create_dir_all(parent).map_err(|e| Error::Other(e.to_string()))?;
142 }
143
144 #[cfg(not(target_arch = "wasm32"))]
145 let redb_path = path.with_extension("redb");
146 #[cfg(not(target_arch = "wasm32"))]
147 let redb = Arc::new(
148 RedbDatabase::create(redb_path.clone())
149 .map_err(|e| Error::Other(format!("failed to open redb: {e}")))?,
150 );
151
152 #[cfg(not(target_arch = "wasm32"))]
153 let store = open_store(redb.clone())?;
154 #[cfg(target_arch = "wasm32")]
155 let store = open_store(&path)?;
156
157 #[cfg(all(feature = "temporal", not(target_arch = "wasm32")))]
158 let temporal = TemporalStore::open(redb.clone())?;
159
160 #[cfg_attr(
161 not(all(any(feature = "vector", feature = "fts"), not(target_arch = "wasm32"))),
162 allow(unused_mut)
163 )]
164 let mut db = Self {
165 store,
166 #[cfg(not(target_arch = "wasm32"))]
167 redb,
168 #[cfg(all(any(feature = "vector", feature = "fts"), not(target_arch = "wasm32")))]
169 redb_path: redb_path.clone(),
170 #[cfg(not(target_arch = "wasm32"))]
171 active_write: None,
172 #[cfg(all(feature = "fts", not(target_arch = "wasm32")))]
173 fts_index: None,
174 #[cfg(all(feature = "fts", not(target_arch = "wasm32")))]
175 fts_write_log: HashMap::new(),
176 #[cfg(all(feature = "vector", not(target_arch = "wasm32")))]
177 vector_index: None,
178 #[cfg(all(feature = "vector", not(target_arch = "wasm32")))]
179 vector_undo_log: Vec::new(),
180 #[cfg(all(feature = "temporal", not(target_arch = "wasm32")))]
181 temporal,
182 cursors: HashMap::new(),
183 next_cursor_id: 1,
184 };
185
186 #[cfg(all(feature = "vector", not(target_arch = "wasm32")))]
187 {
188 db.vector_index = vector_index::VectorIndex::open_or_rebuild(&db, &db.redb_path)?;
189 }
190
191 #[cfg(all(feature = "fts", not(target_arch = "wasm32")))]
192 {
193 db.fts_index = fts_index::FtsIndex::open_or_rebuild(&db, &db.redb_path)?;
194 }
195
196 Ok(db)
197 }
198
199 pub fn hydrate(
200 &mut self,
201 dictionary_values: Vec<String>,
202 triples: Vec<(StringId, StringId, StringId)>,
203 ) -> Result<()> {
204 for value in dictionary_values {
217 self.store.intern(&value)?;
218 }
219
220 for (subject_id, predicate_id, object_id) in triples {
221 let triple = Triple::new(subject_id, predicate_id, object_id);
222 self.store.insert(&triple)?;
223 }
224
225 self.reset_cursors();
226
227 Ok(())
228 }
229
230 pub fn set_node_property(&mut self, id: u64, json: &str) -> Result<()> {
232 let props: std::collections::HashMap<String, serde_json::Value> =
234 serde_json::from_str(json)
235 .map_err(|e| Error::Other(format!("Invalid JSON in set_node_property: {}", e)))?;
236 let binary = crate::storage::property::serialize_properties(&props)?;
237 self.set_node_property_binary(id, &binary)
238 }
239
240 pub fn get_node_property(&self, id: u64) -> Result<Option<String>> {
242 if let Some(binary) = self.get_node_property_binary(id)? {
244 let props = crate::storage::property::deserialize_properties(&binary)?;
246 let json_string = serde_json::to_string(&props)
247 .map_err(|e| Error::Other(format!("Failed to serialize to JSON: {}", e)))?;
248 Ok(Some(json_string))
249 } else {
250 Ok(None)
251 }
252 }
253
254 pub fn set_edge_property(&mut self, s: u64, p: u64, o: u64, json: &str) -> Result<()> {
256 let props: std::collections::HashMap<String, serde_json::Value> =
258 serde_json::from_str(json)
259 .map_err(|e| Error::Other(format!("Invalid JSON in set_edge_property: {}", e)))?;
260 let binary = crate::storage::property::serialize_properties(&props)?;
261 self.set_edge_property_binary(s, p, o, &binary)
262 }
263
264 pub fn get_edge_property(&self, s: u64, p: u64, o: u64) -> Result<Option<String>> {
266 if let Some(binary) = self.get_edge_property_binary(s, p, o)? {
268 let props = crate::storage::property::deserialize_properties(&binary)?;
270 let json_string = serde_json::to_string(&props)
271 .map_err(|e| Error::Other(format!("Failed to serialize to JSON: {}", e)))?;
272 Ok(Some(json_string))
273 } else {
274 Ok(None)
275 }
276 }
277
278 pub fn set_node_property_binary(&mut self, id: u64, value: &[u8]) -> Result<()> {
281 #[cfg(not(target_arch = "wasm32"))]
282 if let Some(txn) = self.active_write.as_mut() {
283 {
284 let mut table = txn
285 .open_table(crate::storage::schema::TABLE_NODE_PROPS_BINARY)
286 .map_err(|e| Error::Other(e.to_string()))?;
287 table
288 .insert(id, value)
289 .map_err(|e| Error::Other(e.to_string()))?;
290 }
291
292 #[cfg(all(feature = "fts", not(target_arch = "wasm32")))]
293 fts_index::bump_committed_writes_in_txn(txn, 1)?;
294 } else {
295 let tx = self
296 .redb
297 .begin_write()
298 .map_err(|e| Error::Other(e.to_string()))?;
299 {
300 let mut table = tx
301 .open_table(crate::storage::schema::TABLE_NODE_PROPS_BINARY)
302 .map_err(|e| Error::Other(e.to_string()))?;
303 table
304 .insert(id, value)
305 .map_err(|e| Error::Other(e.to_string()))?;
306 }
307
308 #[cfg(all(feature = "fts", not(target_arch = "wasm32")))]
309 fts_index::bump_committed_writes_in_txn(&tx, 1)?;
310
311 tx.commit().map_err(|e| Error::Other(e.to_string()))?;
312 self.store.after_write_commit();
313 }
314
315 #[cfg(target_arch = "wasm32")]
316 self.store.set_node_property_binary(id, value)?;
317
318 #[cfg(all(feature = "vector", not(target_arch = "wasm32")))]
319 self.update_vector_index_from_node_props(id, value);
320
321 #[cfg(all(feature = "fts", not(target_arch = "wasm32")))]
322 self.update_fts_index_from_node_props(id, value);
323
324 Ok(())
325 }
326
327 pub fn get_node_property_binary(&self, id: u64) -> Result<Option<Vec<u8>>> {
328 self.store.get_node_property_binary(id)
329 }
330
331 pub fn set_edge_property_binary(&mut self, s: u64, p: u64, o: u64, value: &[u8]) -> Result<()> {
332 #[cfg(not(target_arch = "wasm32"))]
333 if let Some(txn) = self.active_write.as_mut() {
334 let mut table = txn
335 .open_table(crate::storage::schema::TABLE_EDGE_PROPS_BINARY)
336 .map_err(|e| Error::Other(e.to_string()))?;
337 table
338 .insert((s, p, o), value)
339 .map_err(|e| Error::Other(e.to_string()))?;
340 } else {
341 self.store.set_edge_property_binary(s, p, o, value)?;
342 }
343
344 Ok(())
345 }
346
347 pub fn get_edge_property_binary(&self, s: u64, p: u64, o: u64) -> Result<Option<Vec<u8>>> {
348 self.store.get_edge_property_binary(s, p, o)
349 }
350
351 #[cfg(all(feature = "vector", not(target_arch = "wasm32")))]
352 fn update_vector_index_from_node_props(&mut self, node_id: u64, value: &[u8]) {
353 let Some(index) = self.vector_index.as_mut() else {
354 return;
355 };
356
357 let Ok(props) = crate::storage::property::deserialize_properties(value) else {
358 return;
359 };
360
361 if self.active_write.is_some() {
362 let old = index.get_vector(node_id).ok().flatten();
363 self.vector_undo_log.push(VectorUndoEntry { node_id, old });
364 }
365
366 if index.upsert_from_props(node_id, &props).is_err() {
367 self.vector_index = None;
368 self.vector_undo_log.clear();
369 }
370 }
371
372 #[cfg(all(feature = "vector", not(target_arch = "wasm32")))]
373 fn rollback_vector_index(&mut self) {
374 let Some(index) = self.vector_index.as_mut() else {
375 self.vector_undo_log.clear();
376 return;
377 };
378
379 for entry in self.vector_undo_log.drain(..).rev() {
380 let _ = index.upsert(entry.node_id, entry.old.as_deref());
381 }
382 }
383
384 #[cfg(all(feature = "fts", not(target_arch = "wasm32")))]
385 fn update_fts_index_from_node_props(&mut self, node_id: u64, value: &[u8]) {
386 let Some(index) = self.fts_index.as_mut() else {
387 return;
388 };
389
390 if self.active_write.is_some() {
391 self.fts_write_log.insert(node_id, value.to_vec());
392 return;
393 }
394
395 let Ok(props) = crate::storage::property::deserialize_properties(value) else {
396 return;
397 };
398 if index.upsert_from_props(node_id, &props).is_err() {
399 self.fts_index = None;
400 self.fts_write_log.clear();
401 }
402 }
403
404 #[cfg(all(feature = "fts", not(target_arch = "wasm32")))]
405 pub(crate) fn fts_txt_score(&self, node_id: u64, property: &str, query: &str) -> f64 {
406 let Some(index) = self.fts_index.as_ref() else {
407 return 0.0;
408 };
409 index.txt_score(node_id, property, query).unwrap_or(0.0) as f64
410 }
411
412 #[cfg(all(feature = "fts", not(target_arch = "wasm32")))]
413 pub(crate) fn fts_scores_for_query(
414 &self,
415 property: &str,
416 query: &str,
417 ) -> Option<Arc<HashMap<u64, f32>>> {
418 let index = self.fts_index.as_ref()?;
419 index.scores_for_query(property, query).ok()
420 }
421
422 #[cfg(all(feature = "fts", not(target_arch = "wasm32")))]
423 pub fn configure_fts_index(&mut self, mode: &str) -> Result<()> {
424 if self.active_write.is_some() {
425 return Err(Error::Other(
426 "cannot configure fts index during active transaction".to_string(),
427 ));
428 }
429
430 let config = fts_index::FtsIndexConfig {
431 mode: if mode.is_empty() {
432 "all_string_props".to_string()
433 } else {
434 mode.to_string()
435 },
436 };
437 fts_index::write_config(self, &config)?;
438 self.fts_index = fts_index::FtsIndex::open_or_rebuild(self, &self.redb_path)?;
439 self.fts_write_log.clear();
440 Ok(())
441 }
442
443 #[cfg(all(feature = "fts", not(target_arch = "wasm32")))]
444 pub fn disable_fts_index(&mut self) -> Result<()> {
445 if self.active_write.is_some() {
446 return Err(Error::Other(
447 "cannot disable fts index during active transaction".to_string(),
448 ));
449 }
450 fts_index::clear_config(self)?;
451 self.fts_index = None;
452 self.fts_write_log.clear();
453 Ok(())
454 }
455
456 #[cfg(all(feature = "vector", not(target_arch = "wasm32")))]
457 pub fn configure_vector_index(
458 &mut self,
459 dim: usize,
460 property: &str,
461 metric: &str,
462 ) -> Result<()> {
463 if self.active_write.is_some() {
464 return Err(Error::Other(
465 "cannot configure vector index during active transaction".to_string(),
466 ));
467 }
468 if dim == 0 {
469 return Err(Error::Other("vector dim must be > 0".to_string()));
470 }
471
472 let config = vector_index::VectorIndexConfig {
473 dim,
474 property: property.to_string(),
475 metric: metric.to_string(),
476 };
477 vector_index::write_config(self, &config)?;
478 self.vector_index = vector_index::VectorIndex::open_or_rebuild(self, &self.redb_path)?;
479 Ok(())
480 }
481
482 #[cfg(all(feature = "vector", not(target_arch = "wasm32")))]
483 pub fn disable_vector_index(&mut self) -> Result<()> {
484 if self.active_write.is_some() {
485 return Err(Error::Other(
486 "cannot disable vector index during active transaction".to_string(),
487 ));
488 }
489 vector_index::clear_config(self)?;
490 self.vector_index = None;
491 self.vector_undo_log.clear();
492 Ok(())
493 }
494
495 #[cfg(all(feature = "vector", not(target_arch = "wasm32")))]
496 pub fn vector_search(&self, query: &[f32], limit: usize) -> Result<Vec<(u64, f32)>> {
497 let Some(index) = self.vector_index.as_ref() else {
498 return Err(Error::Other("vector index not configured".to_string()));
499 };
500 index.search(query, limit)
501 }
502
503 #[cfg(all(feature = "vector", not(target_arch = "wasm32")))]
504 pub(crate) fn vector_index_config(&self) -> Option<&vector_index::VectorIndexConfig> {
505 self.vector_index.as_ref().map(|index| index.config())
506 }
507
508 pub fn flush_indexes(&mut self) -> Result<()> {
509 #[cfg(not(target_arch = "wasm32"))]
510 if self.active_write.is_some() {
511 return Err(Error::Other(
512 "cannot flush indexes during active transaction".to_string(),
513 ));
514 }
515
516 #[cfg(all(feature = "vector", not(target_arch = "wasm32")))]
517 if let Some(index) = self.vector_index.as_mut() {
518 index.flush()?;
519 }
520
521 #[cfg(all(feature = "fts", not(target_arch = "wasm32")))]
522 {
523 let committed_writes = fts_index::read_committed_writes(self)?;
524 if let Some(index) = self.fts_index.as_mut() {
525 index.flush(committed_writes)?;
526 }
527 }
528
529 Ok(())
530 }
531
532 pub fn batch_insert(&mut self, triples: &[Triple]) -> Result<usize> {
537 self.store.batch_insert(triples)
538 }
539
540 pub fn batch_delete(&mut self, triples: &[Triple]) -> Result<usize> {
543 self.store.batch_delete(triples)
544 }
545
546 pub fn batch_add_facts(&mut self, facts: &[Fact<'_>]) -> Result<Vec<Triple>> {
550 self.store.batch_insert_facts(facts)
551 }
552
553 pub fn add_fact(&mut self, fact: Fact<'_>) -> Result<Triple> {
554 #[cfg(not(target_arch = "wasm32"))]
555 if let Some(txn) = self.active_write.as_mut() {
556 let s = crate::storage::disk::intern_in_txn(txn, fact.subject)?;
557 let p = crate::storage::disk::intern_in_txn(txn, fact.predicate)?;
558 let o = crate::storage::disk::intern_in_txn(txn, fact.object)?;
559 let triple = Triple::new(s, p, o);
560 crate::storage::disk::insert_triple(txn, &triple)?;
561 return Ok(triple);
562 }
563 self.store.insert_fact(fact)
564 }
565
566 pub fn delete_fact(&mut self, fact: Fact<'_>) -> Result<bool> {
567 let s = self.resolve_id(fact.subject)?.ok_or(Error::NotFound)?;
568 let p = self.resolve_id(fact.predicate)?.ok_or(Error::NotFound)?;
569 let o = self.resolve_id(fact.object)?.ok_or(Error::NotFound)?;
570 let triple = Triple::new(s, p, o);
571 #[cfg(not(target_arch = "wasm32"))]
572 if let Some(txn) = self.active_write.as_mut() {
573 return crate::storage::disk::delete_triple(txn, &triple);
574 }
575 self.store.delete(&triple)
576 }
577
578 pub fn all_triples(&self) -> Vec<Triple> {
579 self.store.iter().collect()
580 }
581
582 pub fn resolve_str(&self, id: StringId) -> Result<Option<String>> {
583 self.store.resolve_str(id)
584 }
585
586 pub fn resolve_id(&self, value: &str) -> Result<Option<StringId>> {
587 self.store.resolve_id(value)
588 }
589
590 pub fn intern(&mut self, value: &str) -> Result<u64> {
591 #[cfg(not(target_arch = "wasm32"))]
592 if let Some(txn) = self.active_write.as_mut() {
593 return crate::storage::disk::intern_in_txn(txn, value);
594 }
595 self.store.intern(value)
596 }
597
598 pub fn bulk_intern(&mut self, values: &[&str]) -> Result<Vec<u64>> {
600 self.store.bulk_intern(values)
601 }
602
603 pub fn dictionary_size(&self) -> Result<u64> {
604 self.store.dictionary_size()
605 }
606
607 pub fn execute_query(
608 &mut self,
609 query_string: &str,
610 ) -> Result<Vec<std::collections::HashMap<String, query::executor::Value>>> {
611 self.execute_query_with_params(query_string, None)
612 }
613
614 pub fn execute_query_with_params(
615 &mut self,
616 query_string: &str,
617 params: Option<HashMap<String, serde_json::Value>>,
618 ) -> Result<Vec<std::collections::HashMap<String, query::executor::Value>>> {
619 use query::parser::Parser;
620
621 let query = Parser::parse(query_string)?;
622
623 let param_values: HashMap<String, query::executor::Value> = params
624 .unwrap_or_default()
625 .into_iter()
626 .map(|(k, v)| (k, Self::serde_value_to_executor_value(v)))
627 .collect();
628
629 if debug_env_enabled() {
630 let keys: Vec<_> = param_values.keys().cloned().collect();
631 emit_debug(&format!(
632 "[nervusdb-core] execute_query_with_params received: {:?}",
633 keys
634 ));
635 }
636
637 self.execute_parsed_query_with_params(query, ¶m_values)
638 }
639
640 fn execute_parsed_query_with_params(
641 &mut self,
642 query: query::ast::Query,
643 param_values: &HashMap<String, query::executor::Value>,
644 ) -> Result<Vec<std::collections::HashMap<String, query::executor::Value>>> {
645 use query::ast::Clause;
646 use query::executor::{ExecutionContext, ExecutionPlan};
647 use query::planner::QueryPlanner;
648
649 if query.clauses.len() == 1
651 && let Clause::Call(call_clause) = &query.clauses[0]
652 {
653 return self.execute_parsed_query_with_params(call_clause.query.clone(), param_values);
654 }
655 if query
656 .clauses
657 .iter()
658 .any(|clause| matches!(clause, Clause::Call(_)))
659 {
660 return Err(Error::NotImplemented("CALL with other clauses"));
661 }
662 if query
663 .clauses
664 .iter()
665 .any(|clause| matches!(clause, Clause::Union(_)))
666 {
667 return self.execute_union_query_with_params(query, param_values);
668 }
669
670 let has_set = query
672 .clauses
673 .iter()
674 .any(|clause| matches!(clause, Clause::Set(_)));
675
676 let has_delete = query
678 .clauses
679 .iter()
680 .any(|clause| matches!(clause, Clause::Delete(_)));
681
682 if query.clauses.len() == 1
684 && let Clause::Create(create_clause) = &query.clauses[0]
685 {
686 return self.execute_create_pattern(&create_clause.pattern);
688 }
689
690 if query.clauses.len() == 1
692 && let Clause::Merge(merge_clause) = &query.clauses[0]
693 {
694 return self.execute_merge_pattern(&merge_clause.pattern);
695 }
696
697 if has_set {
699 return self.execute_set_query_with_plan(&query, param_values);
700 }
701
702 if has_delete {
704 return self.execute_delete_query_with_plan(&query, param_values);
705 }
706
707 let planner = QueryPlanner::new();
709 let plan = planner.plan(query)?;
710
711 let ctx = ExecutionContext {
712 db: self,
713 params: param_values,
714 };
715 let iterator = plan.execute(&ctx)?;
716
717 let mut results = Vec::new();
718 for record in iterator {
719 results.push(record?.values);
720 }
721
722 Ok(results)
723 }
724
725 fn execute_union_query_with_params(
726 &mut self,
727 query: query::ast::Query,
728 param_values: &HashMap<String, query::executor::Value>,
729 ) -> Result<Vec<std::collections::HashMap<String, query::executor::Value>>> {
730 use query::ast::{Clause, Expression, Query, ReturnClause, UnionClause};
731
732 fn is_write_clause(clause: &Clause) -> bool {
733 matches!(
734 clause,
735 Clause::Create(_) | Clause::Merge(_) | Clause::Set(_) | Clause::Delete(_)
736 )
737 }
738
739 fn infer_alias(expr: &Expression) -> String {
740 match expr {
741 Expression::Variable(name) => name.clone(),
742 Expression::PropertyAccess(pa) => format!("{}.{}", pa.variable, pa.property),
743 _ => "col".to_string(),
744 }
745 }
746
747 fn return_columns(query: &Query) -> Option<Vec<String>> {
748 query.clauses.iter().find_map(|clause| match clause {
749 Clause::Return(ReturnClause { items, .. }) => Some(
750 items
751 .iter()
752 .map(|item| {
753 item.alias
754 .clone()
755 .unwrap_or_else(|| infer_alias(&item.expression))
756 })
757 .collect(),
758 ),
759 _ => None,
760 })
761 }
762
763 fn validate_row_columns(
764 expected: &[String],
765 row: &std::collections::HashMap<String, query::executor::Value>,
766 ) -> Result<()> {
767 if row.len() != expected.len() {
768 return Err(Error::Other("UNION schema mismatch".to_string()));
769 }
770 for col in expected {
771 if !row.contains_key(col) {
772 return Err(Error::Other("UNION schema mismatch".to_string()));
773 }
774 }
775 Ok(())
776 }
777
778 fn row_key(row: &std::collections::HashMap<String, query::executor::Value>) -> String {
779 let mut items: Vec<_> = row.iter().collect();
780 items.sort_by(|(a, _), (b, _)| a.cmp(b));
781 items
782 .into_iter()
783 .map(|(k, v)| format!("{k}={v:?}"))
784 .collect::<Vec<_>>()
785 .join("|")
786 }
787
788 let mut left_clauses = Vec::new();
789 let mut unions: Vec<UnionClause> = Vec::new();
790
791 for clause in query.clauses {
792 match clause {
793 Clause::Union(u) => unions.push(u),
794 other => left_clauses.push(other),
795 }
796 }
797
798 let left_query = Query {
799 clauses: left_clauses,
800 };
801
802 if left_query.clauses.iter().any(is_write_clause)
803 || unions
804 .iter()
805 .any(|u| u.query.clauses.iter().any(is_write_clause))
806 {
807 return Err(Error::NotImplemented("UNION with write clauses"));
808 }
809
810 let Some(expected_cols) = return_columns(&left_query) else {
811 return Err(Error::Other("UNION requires explicit RETURN".to_string()));
812 };
813
814 for u in &unions {
815 let Some(cols) = return_columns(&u.query) else {
816 return Err(Error::Other("UNION requires explicit RETURN".to_string()));
817 };
818 if cols != expected_cols {
819 return Err(Error::Other(
820 "UNION queries must return the same columns".to_string(),
821 ));
822 }
823 }
824
825 let mut current = self.execute_parsed_query_with_params(left_query, param_values)?;
826 for row in ¤t {
827 validate_row_columns(&expected_cols, row)?;
828 }
829
830 for u in unions {
831 let mut right = self.execute_parsed_query_with_params(u.query, param_values)?;
832 for row in &right {
833 validate_row_columns(&expected_cols, row)?;
834 }
835
836 if u.all {
837 current.append(&mut right);
838 } else {
839 let mut deduped = Vec::new();
840 let mut seen = std::collections::HashSet::new();
841 for row in current.into_iter().chain(right) {
842 if seen.insert(row_key(&row)) {
843 deduped.push(row);
844 }
845 }
846 current = deduped;
847 }
848 }
849
850 Ok(current)
851 }
852
853 pub fn serde_value_to_executor_value(value: serde_json::Value) -> query::executor::Value {
855 use query::executor::Value as ExecValue;
856
857 match value {
858 serde_json::Value::String(s) => ExecValue::String(s),
859 serde_json::Value::Number(n) => ExecValue::Float(n.as_f64().unwrap_or(0.0)),
860 serde_json::Value::Bool(b) => ExecValue::Boolean(b),
861 serde_json::Value::Null => ExecValue::Null,
862 serde_json::Value::Array(items) => {
863 let mut out = Vec::with_capacity(items.len());
864 for item in &items {
865 let Some(n) = item.as_f64() else {
866 return ExecValue::String(serde_json::Value::Array(items).to_string());
867 };
868 out.push(n as f32);
869 }
870 ExecValue::Vector(out)
871 }
872 _ => ExecValue::Null,
873 }
874 }
875
876 fn execute_create_pattern(
877 &mut self,
878 pattern: &query::ast::Pattern,
879 ) -> Result<Vec<std::collections::HashMap<String, query::executor::Value>>> {
880 use query::ast::{PathElement, RelationshipDirection};
881 use query::executor::Value;
882 use std::collections::HashMap;
883
884 let mut result_record: HashMap<String, Value> = HashMap::new();
885 let mut last_node_info: Option<(String, u64)> = None;
886
887 let mut i = 0;
888 while i < pattern.elements.len() {
889 match &pattern.elements[i] {
890 PathElement::Node(node_pattern) => {
891 let anon_name = format!("_anon{}", i);
893 let node_str = node_pattern.variable.as_deref().unwrap_or(&anon_name);
894 let label = node_pattern
895 .labels
896 .first()
897 .map(|s| s.as_str())
898 .unwrap_or("Node");
899
900 let fact = self.add_fact(Fact::new(node_str, "type", label))?;
901 let node_id = fact.subject_id;
902
903 if let Some(props) = &node_pattern.properties {
905 let props_map = self.convert_property_map_to_json(props)?;
906 let binary = crate::storage::property::serialize_properties(&props_map)?;
907 self.set_node_property_binary(node_id, &binary)?;
908 }
909
910 if let Some(var) = &node_pattern.variable {
911 result_record.insert(var.clone(), Value::Node(node_id));
912 last_node_info = Some((var.clone(), node_id));
913 } else {
914 last_node_info = Some((format!("_anon{}", i), node_id));
915 }
916 }
917 PathElement::Relationship(rel_pattern) => {
918 if i + 1 >= pattern.elements.len() {
921 return Err(Error::Other(
922 "Relationship must be followed by a node".to_string(),
923 ));
924 }
925
926 if let Some((_, start_node_id)) = last_node_info {
927 i += 1;
929 if let PathElement::Node(end_node_pattern) = &pattern.elements[i] {
930 let end_anon_name = format!("_anon{}", i);
931 let end_node_str = end_node_pattern
932 .variable
933 .as_deref()
934 .unwrap_or(&end_anon_name);
935 let end_label = end_node_pattern
936 .labels
937 .first()
938 .map(|s| s.as_str())
939 .unwrap_or("Node");
940
941 let end_fact =
942 self.add_fact(Fact::new(end_node_str, "type", end_label))?;
943 let end_node_id = end_fact.subject_id;
944
945 if let Some(props) = &end_node_pattern.properties {
947 let props_map = self.convert_property_map_to_json(props)?;
948 let binary =
949 crate::storage::property::serialize_properties(&props_map)?;
950 self.set_node_property_binary(end_node_id, &binary)?;
951 }
952
953 if let Some(var) = &end_node_pattern.variable {
954 result_record.insert(var.clone(), Value::Node(end_node_id));
955 }
956
957 let rel_type = rel_pattern
959 .types
960 .first()
961 .map(|s| s.as_str())
962 .unwrap_or("RELATED_TO");
963
964 let (subject_id, object_id) = match rel_pattern.direction {
966 RelationshipDirection::LeftToRight => (start_node_id, end_node_id),
967 RelationshipDirection::RightToLeft => (end_node_id, start_node_id),
968 RelationshipDirection::Undirected => (start_node_id, end_node_id), };
970
971 let subject_str = self.resolve_str(subject_id)?.ok_or_else(|| {
972 Error::Other("Subject node not found".to_string())
973 })?;
974 let object_str = self
975 .resolve_str(object_id)?
976 .ok_or_else(|| Error::Other("Object node not found".to_string()))?;
977
978 let rel_fact =
979 self.add_fact(Fact::new(&subject_str, rel_type, &object_str))?;
980
981 if let Some(props) = &rel_pattern.properties {
983 let props_map = self.convert_property_map_to_json(props)?;
984 let binary =
985 crate::storage::property::serialize_properties(&props_map)?;
986 self.set_edge_property_binary(
987 rel_fact.subject_id,
988 rel_fact.predicate_id,
989 rel_fact.object_id,
990 &binary,
991 )?;
992 }
993
994 if let Some(var) = &rel_pattern.variable {
996 result_record.insert(var.clone(), Value::Relationship(rel_fact));
997 }
998
999 last_node_info = end_node_pattern
1001 .variable
1002 .as_ref()
1003 .map(|v| (v.clone(), end_node_id))
1004 .or(Some((format!("_anon{}", i), end_node_id)));
1005 } else {
1006 return Err(Error::Other(
1007 "Expected node after relationship".to_string(),
1008 ));
1009 }
1010 } else {
1011 return Err(Error::Other("Relationship must follow a node".to_string()));
1012 }
1013 }
1014 }
1015 i += 1;
1016 }
1017
1018 Ok(vec![result_record])
1019 }
1020
    /// Executes a query consisting of a single `MERGE` pattern.
    ///
    /// Walks the pattern left to right like `execute_create_pattern`, with two
    /// differences:
    /// - nodes go through `ensure_node`, which reuses an existing
    ///   `(<key>, "type", <label>)` fact instead of writing another;
    /// - relationships go through `ensure_relationship`, which first looks for
    ///   an existing `(subject, type, object)` triple.
    ///
    /// Node keys are the variable name or `_anon<element index>`. Inline
    /// properties, when present, are always written via
    /// `set_node_property_binary` / `set_edge_property_binary`, whether the
    /// node or relationship was found or created. Undirected relationships are
    /// handled left to right.
    ///
    /// Returns a single row binding each named node / relationship variable.
    ///
    /// # Errors
    /// Fails if a relationship is not preceded and followed by a node, or if any
    /// underlying lookup or write fails.
    fn execute_merge_pattern(
        &mut self,
        pattern: &query::ast::Pattern,
    ) -> Result<Vec<std::collections::HashMap<String, query::executor::Value>>> {
        use query::ast::{PathElement, RelationshipDirection};
        use query::executor::Value;
        use std::collections::HashMap;

        let mut result_record: HashMap<String, Value> = HashMap::new();
        // Key string and id of the most recently merged node; a following
        // relationship uses its id as the start node.
        let mut last_node_info: Option<(String, u64)> = None;

        let mut i = 0;
        while i < pattern.elements.len() {
            match &pattern.elements[i] {
                PathElement::Node(node_pattern) => {
                    // Anonymous nodes are keyed by their position in the pattern.
                    let anon_name = format!("_anon{}", i);
                    let node_str = node_pattern.variable.as_deref().unwrap_or(&anon_name);
                    let label = node_pattern
                        .labels
                        .first()
                        .map(|s| s.as_str())
                        .unwrap_or("Node");

                    let node_id = self.ensure_node(node_str, label)?;

                    if let Some(props) = &node_pattern.properties {
                        let props_map = self.convert_property_map_to_json(props)?;
                        let binary = crate::storage::property::serialize_properties(&props_map)?;
                        self.set_node_property_binary(node_id, &binary)?;
                    }

                    if let Some(var) = &node_pattern.variable {
                        result_record.insert(var.clone(), Value::Node(node_id));
                        last_node_info = Some((var.clone(), node_id));
                    } else {
                        last_node_info = Some((anon_name, node_id));
                    }
                }
                PathElement::Relationship(rel_pattern) => {
                    if i + 1 >= pattern.elements.len() {
                        return Err(Error::Other(
                            "Relationship must be followed by a node".to_string(),
                        ));
                    }

                    let Some((_, start_node_id)) = last_node_info else {
                        return Err(Error::Other("Relationship must follow a node".to_string()));
                    };

                    // The end node is consumed here, so the outer loop's `i += 1`
                    // skips past it.
                    i += 1;
                    let PathElement::Node(end_node_pattern) = &pattern.elements[i] else {
                        return Err(Error::Other("Expected node after relationship".to_string()));
                    };

                    let end_anon_name = format!("_anon{}", i);
                    let end_node_str = end_node_pattern
                        .variable
                        .as_deref()
                        .unwrap_or(&end_anon_name);
                    let end_label = end_node_pattern
                        .labels
                        .first()
                        .map(|s| s.as_str())
                        .unwrap_or("Node");

                    let end_node_id = self.ensure_node(end_node_str, end_label)?;

                    if let Some(props) = &end_node_pattern.properties {
                        let props_map = self.convert_property_map_to_json(props)?;
                        let binary = crate::storage::property::serialize_properties(&props_map)?;
                        self.set_node_property_binary(end_node_id, &binary)?;
                    }

                    if let Some(var) = &end_node_pattern.variable {
                        result_record.insert(var.clone(), Value::Node(end_node_id));
                    }

                    let rel_type = rel_pattern
                        .types
                        .first()
                        .map(|s| s.as_str())
                        .unwrap_or("RELATED_TO");

                    // Undirected relationships are stored left to right.
                    let (subject_id, object_id) = match rel_pattern.direction {
                        RelationshipDirection::LeftToRight => (start_node_id, end_node_id),
                        RelationshipDirection::RightToLeft => (end_node_id, start_node_id),
                        RelationshipDirection::Undirected => (start_node_id, end_node_id),
                    };

                    let rel_triple = self.ensure_relationship(subject_id, rel_type, object_id)?;

                    if let Some(props) = &rel_pattern.properties {
                        let props_map = self.convert_property_map_to_json(props)?;
                        let binary = crate::storage::property::serialize_properties(&props_map)?;
                        self.set_edge_property_binary(
                            rel_triple.subject_id,
                            rel_triple.predicate_id,
                            rel_triple.object_id,
                            &binary,
                        )?;
                    }

                    if let Some(var) = &rel_pattern.variable {
                        result_record.insert(var.clone(), Value::Relationship(rel_triple));
                    }

                    // The end node becomes the start of any following relationship.
                    last_node_info = end_node_pattern
                        .variable
                        .as_ref()
                        .map(|v| (v.clone(), end_node_id))
                        .or(Some((end_anon_name, end_node_id)));
                }
            }
            i += 1;
        }

        Ok(vec![result_record])
    }
1142
1143 fn ensure_node(&mut self, node_str: &str, label: &str) -> Result<u64> {
1144 let node_id = self.resolve_id(node_str)?;
1145 let type_id = self.resolve_id("type")?;
1146 let label_id = self.resolve_id(label)?;
1147
1148 if let (Some(node_id), Some(type_id), Some(label_id)) = (node_id, type_id, label_id) {
1149 let criteria = QueryCriteria {
1150 subject_id: Some(node_id),
1151 predicate_id: Some(type_id),
1152 object_id: Some(label_id),
1153 };
1154 if self.query(criteria).next().is_some() {
1155 return Ok(node_id);
1156 }
1157 }
1158
1159 let fact = self.add_fact(Fact::new(node_str, "type", label))?;
1160 Ok(fact.subject_id)
1161 }
1162
1163 fn ensure_relationship(
1164 &mut self,
1165 subject_id: u64,
1166 rel_type: &str,
1167 object_id: u64,
1168 ) -> Result<Triple> {
1169 if let Some(predicate_id) = self.resolve_id(rel_type)? {
1170 let criteria = QueryCriteria {
1171 subject_id: Some(subject_id),
1172 predicate_id: Some(predicate_id),
1173 object_id: Some(object_id),
1174 };
1175 if self.query(criteria).next().is_some() {
1176 return Ok(Triple::new(subject_id, predicate_id, object_id));
1177 }
1178 }
1179
1180 let subject_str = self
1181 .resolve_str(subject_id)?
1182 .ok_or_else(|| Error::Other("Subject node not found".to_string()))?;
1183 let object_str = self
1184 .resolve_str(object_id)?
1185 .ok_or_else(|| Error::Other("Object node not found".to_string()))?;
1186
1187 self.add_fact(Fact::new(&subject_str, rel_type, &object_str))
1188 }
1189
1190 fn convert_property_map_to_json(
1191 &self,
1192 prop_map: &query::ast::PropertyMap,
1193 ) -> Result<HashMap<String, serde_json::Value>> {
1194 use query::ast::{Expression, Literal};
1195 let mut map = HashMap::new();
1196
1197 for pair in &prop_map.properties {
1198 let value = match &pair.value {
1199 Expression::Literal(lit) => match lit {
1200 Literal::String(s) => serde_json::Value::String(s.clone()),
1201 Literal::Float(f) => serde_json::json!(f),
1202 Literal::Boolean(b) => serde_json::Value::Bool(*b),
1203 Literal::Null => serde_json::Value::Null,
1204 _ => serde_json::Value::Null,
1205 },
1206 _ => serde_json::Value::Null, };
1208 map.insert(pair.key.clone(), value);
1209 }
1210
1211 Ok(map)
1212 }
1213
1214 fn execute_set_query_with_plan(
1215 &mut self,
1216 query: &query::ast::Query,
1217 params: &HashMap<String, query::executor::Value>,
1218 ) -> Result<Vec<std::collections::HashMap<String, query::executor::Value>>> {
1219 use query::executor::{ExecutionContext, ExecutionPlan, Value, evaluate_expression_value};
1220 use query::planner::QueryPlanner;
1221 use std::collections::HashMap;
1222
1223 let planner = QueryPlanner::new();
1225 let plan = planner.plan(query.clone())?;
1226
1227 let (set_node, return_clause) = self.extract_set_node(&plan, query)?;
1229
1230 let mut records: Vec<query::executor::Record> = {
1232 let ctx = ExecutionContext { db: &*self, params };
1233 let iterator = set_node.input.execute(&ctx)?;
1234 let mut rows = Vec::new();
1235 for record in iterator {
1236 rows.push(record?);
1237 }
1238 rows
1239 };
1240
1241 for record in &mut records {
1243 for set_item in &set_node.items {
1245 let var_name = &set_item.property.variable;
1246
1247 if let Some(Value::Node(node_id)) = record.get(var_name) {
1249 let node_id = *node_id;
1250
1251 let new_value = {
1253 let ctx = ExecutionContext { db: &*self, params };
1254 evaluate_expression_value(&set_item.value, record, &ctx)
1255 };
1256
1257 let mut props = if let Ok(Some(binary)) = self.get_node_property_binary(node_id)
1259 {
1260 crate::storage::property::deserialize_properties(&binary)?
1261 } else {
1262 HashMap::new()
1263 };
1264
1265 let json_value = match new_value {
1267 Value::String(s) => serde_json::Value::String(s),
1268 Value::Float(f) => serde_json::json!(f),
1269 Value::Boolean(b) => serde_json::Value::Bool(b),
1270 Value::Null => serde_json::Value::Null,
1271 _ => serde_json::Value::Null,
1272 };
1273 props.insert(set_item.property.property.clone(), json_value);
1274
1275 let binary = crate::storage::property::serialize_properties(&props)?;
1277 self.set_node_property_binary(node_id, &binary)?;
1278 }
1279 }
1280 }
1281
1282 if let Some(return_clause) = return_clause {
1284 let mut results = Vec::new();
1285 for record in records {
1286 let mut result = HashMap::new();
1287 for item in &return_clause.items {
1288 let alias = item
1289 .alias
1290 .clone()
1291 .unwrap_or_else(|| match &item.expression {
1292 query::ast::Expression::Variable(name) => name.clone(),
1293 query::ast::Expression::PropertyAccess(pa) => {
1294 format!("{}.{}", pa.variable, pa.property)
1295 }
1296 _ => "col".to_string(),
1297 });
1298 let value = {
1299 let ctx = ExecutionContext { db: &*self, params };
1300 evaluate_expression_value(&item.expression, &record, &ctx)
1301 };
1302 result.insert(alias, value);
1303 }
1304 results.push(result);
1305 }
1306 Ok(results)
1307 } else {
1308 Ok(records.into_iter().map(|r| r.values).collect())
1310 }
1311 }
1312
1313 fn extract_set_node<'a>(
1314 &self,
1315 plan: &'a query::planner::PhysicalPlan,
1316 query: &query::ast::Query,
1317 ) -> Result<(
1318 &'a query::planner::SetNode,
1319 Option<query::ast::ReturnClause>,
1320 )> {
1321 let _set_clause = query
1323 .clauses
1324 .iter()
1325 .find_map(|c| {
1326 if let query::ast::Clause::Set(s) = c {
1327 Some(s.clone())
1328 } else {
1329 None
1330 }
1331 })
1332 .ok_or_else(|| Error::Other("No SET clause found".to_string()))?;
1333
1334 let return_clause = query.clauses.iter().find_map(|c| {
1335 if let query::ast::Clause::Return(r) = c {
1336 Some(r.clone())
1337 } else {
1338 None
1339 }
1340 });
1341
1342 fn find_set_node(plan: &query::planner::PhysicalPlan) -> Option<&query::planner::SetNode> {
1344 match plan {
1345 query::planner::PhysicalPlan::Set(node) => Some(node),
1346 query::planner::PhysicalPlan::Project(node) => find_set_node(&node.input),
1347 query::planner::PhysicalPlan::Filter(node) => find_set_node(&node.input),
1348 _ => None,
1349 }
1350 }
1351
1352 let set_node = find_set_node(plan)
1353 .ok_or_else(|| Error::Other("No SetNode found in plan".to_string()))?;
1354
1355 Ok((set_node, return_clause))
1356 }
1357
1358 fn execute_delete_query_with_plan(
1359 &mut self,
1360 query: &query::ast::Query,
1361 params: &HashMap<String, query::executor::Value>,
1362 ) -> Result<Vec<std::collections::HashMap<String, query::executor::Value>>> {
1363 use query::executor::{ExecutionContext, ExecutionPlan, Value, evaluate_expression_value};
1364 use query::planner::{PhysicalPlan, QueryPlanner};
1365
1366 let planner = QueryPlanner::new();
1368 let plan = planner.plan(query.clone())?;
1369
1370 let (delete_node, input_plan) = self.extract_delete_and_input_plan(&plan, query)?;
1373
1374 let exec_plan: &PhysicalPlan = match &plan {
1376 PhysicalPlan::Delete(_) => {
1377 &delete_node.input
1379 }
1380 PhysicalPlan::Filter(_filter_node) => {
1381 &delete_node.input
1390 }
1391 _ => input_plan,
1392 };
1393
1394 let base_records: Vec<query::executor::Record> = {
1396 let ctx = ExecutionContext { db: &*self, params };
1397 let iterator = exec_plan.execute(&ctx)?;
1398 let mut rows = Vec::new();
1399 for record in iterator {
1400 rows.push(record?);
1401 }
1402 rows
1403 };
1404
1405 let filter_predicate = if let PhysicalPlan::Filter(filter_node) = &plan {
1408 Some(&filter_node.predicate)
1409 } else {
1410 None
1411 };
1412
1413 let mut records: Vec<query::executor::Record> = Vec::new();
1414 for rec in base_records {
1415 if let Some(predicate) = filter_predicate {
1417 use query::executor::evaluate_expression_value;
1418 let filter_result = {
1419 let ctx = ExecutionContext { db: &*self, params };
1420 evaluate_expression_value(predicate, &rec, &ctx)
1421 };
1422 if filter_result == Value::Boolean(true) {
1423 records.push(rec);
1424 }
1425 } else {
1426 records.push(rec);
1427 }
1428 }
1429
1430 let mut node_ids_to_delete = Vec::new();
1432
1433 for record in &records {
1434 for expr in &delete_node.expressions {
1435 let value = {
1436 let ctx = ExecutionContext { db: &*self, params };
1437 evaluate_expression_value(expr, record, &ctx)
1438 };
1439 if let Value::Node(node_id) = value {
1440 node_ids_to_delete.push(node_id);
1441 }
1442 }
1443 }
1444
1445 for node_id in node_ids_to_delete {
1447 self.delete_node(node_id, delete_node.detach)?;
1448 }
1449
1450 Ok(Vec::new())
1452 }
1453
1454 fn extract_delete_and_input_plan<'a>(
1455 &self,
1456 plan: &'a query::planner::PhysicalPlan,
1457 query: &query::ast::Query,
1458 ) -> Result<(
1459 &'a query::planner::DeleteNode,
1460 &'a query::planner::PhysicalPlan,
1461 )> {
1462 let _delete_clause = query
1464 .clauses
1465 .iter()
1466 .find_map(|c| {
1467 if let query::ast::Clause::Delete(d) = c {
1468 Some(d.clone())
1469 } else {
1470 None
1471 }
1472 })
1473 .ok_or_else(|| Error::Other("No DELETE clause found".to_string()))?;
1474
1475 match plan {
1484 query::planner::PhysicalPlan::Delete(delete_node) => {
1485 Ok((delete_node, &delete_node.input))
1487 }
1488 query::planner::PhysicalPlan::Filter(filter_node) => {
1489 match &*filter_node.input {
1491 query::planner::PhysicalPlan::Delete(delete_node) => {
1492 Ok((delete_node, plan))
1494 }
1495 _ => Err(Error::Other("Expected Delete inside Filter".to_string())),
1496 }
1497 }
1498 query::planner::PhysicalPlan::Project(project_node) => {
1499 match &*project_node.input {
1501 query::planner::PhysicalPlan::Delete(delete_node) => {
1502 Ok((delete_node, &delete_node.input))
1503 }
1504 query::planner::PhysicalPlan::Filter(filter_node) => {
1505 match &*filter_node.input {
1506 query::planner::PhysicalPlan::Delete(delete_node) => {
1507 Ok((delete_node, &project_node.input))
1508 }
1509 _ => Err(Error::Other("Expected Delete inside Filter".to_string())),
1510 }
1511 }
1512 _ => Err(Error::Other(
1513 "Expected Delete or Filter(Delete) inside Project".to_string(),
1514 )),
1515 }
1516 }
1517 _ => Err(Error::Other("No DELETE plan found".to_string())),
1518 }
1519 }
1520
1521 fn delete_node(&mut self, node_id: u64, detach: bool) -> Result<()> {
1522 let has_relationships = self.node_has_relationships(node_id);
1524
1525 if has_relationships && !detach {
1526 return Err(Error::Other(format!(
1527 "Cannot delete node {} because it has relationships. Use DETACH DELETE to remove relationships first.",
1528 node_id
1529 )));
1530 }
1531
1532 if detach {
1534 self.delete_all_relationships(node_id)?;
1535 }
1536
1537 if let Some(type_id) = self.resolve_id("type")? {
1539 let criteria = QueryCriteria {
1540 subject_id: Some(node_id),
1541 predicate_id: Some(type_id),
1542 object_id: None,
1543 };
1544
1545 let triples_to_delete: Vec<Triple> = self.query(criteria).collect();
1546 for triple in triples_to_delete {
1547 self.store.delete(&triple)?;
1548 }
1549 }
1550
1551 #[cfg(not(target_arch = "wasm32"))]
1553 if let Some(txn) = self.active_write.as_mut() {
1554 {
1555 let mut binary_table = txn
1557 .open_table(crate::storage::schema::TABLE_NODE_PROPS_BINARY)
1558 .map_err(|e| Error::Other(e.to_string()))?;
1559 binary_table
1560 .remove(node_id)
1561 .map_err(|e| Error::Other(e.to_string()))?;
1562
1563 let mut string_table = txn
1565 .open_table(crate::storage::schema::TABLE_NODE_PROPS)
1566 .map_err(|e| Error::Other(e.to_string()))?;
1567 string_table
1568 .remove(node_id)
1569 .map_err(|e| Error::Other(e.to_string()))?;
1570 }
1571
1572 #[cfg(all(feature = "fts", not(target_arch = "wasm32")))]
1573 fts_index::bump_committed_writes_in_txn(txn, 1)?;
1574 } else {
1575 let tx = self
1576 .redb
1577 .begin_write()
1578 .map_err(|e| Error::Other(e.to_string()))?;
1579 {
1580 let mut binary_table = tx
1582 .open_table(crate::storage::schema::TABLE_NODE_PROPS_BINARY)
1583 .map_err(|e| Error::Other(e.to_string()))?;
1584 binary_table
1585 .remove(node_id)
1586 .map_err(|e| Error::Other(e.to_string()))?;
1587
1588 let mut string_table = tx
1590 .open_table(crate::storage::schema::TABLE_NODE_PROPS)
1591 .map_err(|e| Error::Other(e.to_string()))?;
1592 string_table
1593 .remove(node_id)
1594 .map_err(|e| Error::Other(e.to_string()))?;
1595 }
1596
1597 #[cfg(all(feature = "fts", not(target_arch = "wasm32")))]
1598 fts_index::bump_committed_writes_in_txn(&tx, 1)?;
1599
1600 tx.commit().map_err(|e| Error::Other(e.to_string()))?;
1601 self.store.after_write_commit();
1602 }
1603
1604 #[cfg(target_arch = "wasm32")]
1605 self.store.delete_node_properties(node_id)?;
1606
1607 #[cfg(all(feature = "vector", not(target_arch = "wasm32")))]
1608 if let Some(index) = self.vector_index.as_mut() {
1609 let _ = index.remove(node_id);
1610 }
1611
1612 #[cfg(all(feature = "fts", not(target_arch = "wasm32")))]
1613 if let Some(index) = self.fts_index.as_mut() {
1614 let _ = index.delete_node(node_id);
1615 }
1616
1617 Ok(())
1618 }
1619
1620 fn node_has_relationships(&self, node_id: u64) -> bool {
1621 let type_id = match self.resolve_id("type") {
1623 Ok(Some(id)) => id,
1624 _ => return false,
1625 };
1626
1627 let criteria_as_subject = QueryCriteria {
1629 subject_id: Some(node_id),
1630 predicate_id: None,
1631 object_id: None,
1632 };
1633
1634 for triple in self.query(criteria_as_subject) {
1635 if triple.predicate_id != type_id {
1636 return true; }
1638 }
1639
1640 let criteria_as_object = QueryCriteria {
1642 subject_id: None,
1643 predicate_id: None,
1644 object_id: Some(node_id),
1645 };
1646
1647 self.query(criteria_as_object).next().is_some()
1648 }
1649
1650 fn delete_all_relationships(&mut self, node_id: u64) -> Result<()> {
1651 let type_id = self
1653 .resolve_id("type")?
1654 .ok_or_else(|| Error::Other("Type predicate not found".to_string()))?;
1655
1656 let criteria_as_subject = QueryCriteria {
1658 subject_id: Some(node_id),
1659 predicate_id: None,
1660 object_id: None,
1661 };
1662
1663 let triples_to_delete: Vec<Triple> = self
1664 .query(criteria_as_subject)
1665 .filter(|t| t.predicate_id != type_id)
1666 .collect();
1667
1668 for triple in triples_to_delete {
1669 self.store.delete(&triple)?;
1670 }
1671
1672 let criteria_as_object = QueryCriteria {
1674 subject_id: None,
1675 predicate_id: None,
1676 object_id: Some(node_id),
1677 };
1678
1679 let triples_to_delete: Vec<Triple> = self.query(criteria_as_object).collect();
1680
1681 for triple in triples_to_delete {
1682 self.store.delete(&triple)?;
1683 }
1684
1685 Ok(())
1686 }
1687
1688 pub fn query(&self, criteria: QueryCriteria) -> crate::storage::HexastoreIter {
1689 self.store.query(
1690 criteria.subject_id,
1691 criteria.predicate_id,
1692 criteria.object_id,
1693 )
1694 }
1695
1696 pub fn get_store(&self) -> &dyn crate::storage::Hexastore {
1698 self.store.as_ref()
1699 }
1700
1701 pub fn open_cursor(&mut self, criteria: QueryCriteria) -> Result<u64> {
1702 let iter = self.query(criteria);
1703 let cursor_id = self.next_cursor_id;
1704 self.next_cursor_id = self.next_cursor_id.wrapping_add(1).max(1);
1705 self.cursors.insert(cursor_id, QueryCursor::new(iter));
1706 Ok(cursor_id)
1707 }
1708
1709 pub fn cursor_next(
1710 &mut self,
1711 cursor_id: u64,
1712 batch_size: usize,
1713 ) -> Result<(Vec<Triple>, bool)> {
1714 let cursor = self
1715 .cursors
1716 .get_mut(&cursor_id)
1717 .ok_or(Error::InvalidCursor(cursor_id))?;
1718 let (batch, done) = cursor.next_batch(batch_size.max(1));
1719 if done {
1720 self.cursors.remove(&cursor_id);
1721 }
1722 Ok((batch, done))
1723 }
1724
1725 pub fn close_cursor(&mut self, cursor_id: u64) -> Result<()> {
1726 self.cursors
1727 .remove(&cursor_id)
1728 .ok_or(Error::InvalidCursor(cursor_id))?;
1729 Ok(())
1730 }
1731
1732 fn reset_cursors(&mut self) {
1733 self.cursors.clear();
1734 self.next_cursor_id = 1;
1735 }
1736
1737 #[cfg(all(feature = "temporal", not(target_arch = "wasm32")))]
1738 pub fn temporal_store(&self) -> &TemporalStore {
1739 &self.temporal
1740 }
1741
1742 #[cfg(all(feature = "temporal", not(target_arch = "wasm32")))]
1743 pub fn temporal_store_mut(&mut self) -> &mut TemporalStore {
1744 &mut self.temporal
1745 }
1746
1747 #[cfg(all(feature = "temporal", not(target_arch = "wasm32")))]
1748 pub fn timeline_query(&self, query: TimelineQuery) -> Vec<StoredFact> {
1749 self.temporal.query_timeline(&query).unwrap_or_default()
1750 }
1751
1752 #[cfg(all(feature = "temporal", not(target_arch = "wasm32")))]
1753 pub fn timeline_trace(&self, fact_id: u64) -> Vec<StoredEpisode> {
1754 self.temporal.trace_back(fact_id).unwrap_or_default()
1755 }
1756
1757 #[cfg(not(target_arch = "wasm32"))]
1766 pub fn begin_transaction(&mut self) -> Result<()> {
1767 if self.active_write.is_some() {
1768 return Err(Error::Other("transaction already open".to_string()));
1769 }
1770 let tx = self
1771 .redb
1772 .begin_write()
1773 .map_err(|e| Error::Other(e.to_string()))?;
1774 self.active_write = Some(tx);
1775 Ok(())
1776 }
1777
1778 #[cfg(not(target_arch = "wasm32"))]
1780 pub fn commit_transaction(&mut self) -> Result<()> {
1781 let tx = self
1782 .active_write
1783 .take()
1784 .ok_or_else(|| Error::Other("no active transaction".to_string()))?;
1785 tx.commit().map_err(|e| Error::Other(e.to_string()))?;
1786 #[cfg(all(feature = "fts", not(target_arch = "wasm32")))]
1787 {
1788 let staged = std::mem::take(&mut self.fts_write_log);
1789 if let Some(index) = self.fts_index.as_mut() {
1790 for (node_id, value) in staged {
1791 if let Ok(props) = crate::storage::property::deserialize_properties(&value) {
1792 let _ = index.upsert_from_props(node_id, &props);
1793 }
1794 }
1795 }
1796 }
1797 #[cfg(all(feature = "vector", not(target_arch = "wasm32")))]
1798 self.vector_undo_log.clear();
1799 self.store.after_write_commit();
1800 Ok(())
1801 }
1802
1803 #[cfg(not(target_arch = "wasm32"))]
1805 pub fn abort_transaction(&mut self) -> Result<()> {
1806 #[cfg(all(feature = "fts", not(target_arch = "wasm32")))]
1807 self.fts_write_log.clear();
1808 #[cfg(all(feature = "vector", not(target_arch = "wasm32")))]
1809 self.rollback_vector_index();
1810 self.active_write = None;
1811 Ok(())
1812 }
1813
1814 #[cfg(not(target_arch = "wasm32"))]
1816 pub fn is_transaction_active(&self) -> bool {
1817 self.active_write.is_some()
1818 }
1819
1820 #[cfg(not(target_arch = "wasm32"))]
1823 pub fn with_transaction<F, R>(&mut self, f: F) -> Result<R>
1824 where
1825 F: FnOnce(&mut Self) -> Result<R>,
1826 {
1827 if self.is_transaction_active() {
1828 return Err(Error::Other("transaction already active".to_string()));
1829 }
1830
1831 self.begin_transaction()?;
1832
1833 match f(self) {
1834 Ok(result) => {
1835 self.commit_transaction()?;
1836 Ok(result)
1837 }
1838 Err(error) => {
1839 let _ = self.abort_transaction();
1841 Err(error)
1842 }
1843 }
1844 }
1845}
1846
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    #[test]
    fn open_and_insert() {
        let tmp = tempdir().unwrap();
        let mut db = Database::open(Options::new(tmp.path())).unwrap();
        let triple = db.add_fact(Fact::new("alice", "knows", "bob")).unwrap();
        assert_eq!(db.all_triples(), vec![triple]);
        assert_eq!(db.resolve_str(triple.subject_id).unwrap().unwrap(), "alice");

        let results: Vec<_> = db
            .query(QueryCriteria {
                subject_id: Some(triple.subject_id),
                predicate_id: None,
                object_id: None,
            })
            .collect();
        assert_eq!(results, vec![triple]);

        let cursor_id = db
            .open_cursor(QueryCriteria {
                subject_id: Some(triple.subject_id),
                predicate_id: None,
                object_id: None,
            })
            .unwrap();
        let (batch, done) = db.cursor_next(cursor_id, 10).unwrap();
        assert!(done);
        assert_eq!(batch, vec![triple]);
        // An exhausted cursor is released automatically.
        assert!(db.close_cursor(cursor_id).is_err());
    }

    #[test]
    fn unknown_cursor_ids_are_rejected() {
        let tmp = tempdir().unwrap();
        let mut db = Database::open(Options::new(tmp.path())).unwrap();

        assert!(matches!(db.cursor_next(42, 1), Err(Error::InvalidCursor(42))));
        assert!(matches!(db.close_cursor(42), Err(Error::InvalidCursor(42))));

        let cursor_id = db.open_cursor(QueryCriteria::default()).unwrap();
        db.close_cursor(cursor_id).unwrap();
        assert!(matches!(
            db.close_cursor(cursor_id),
            Err(Error::InvalidCursor(id)) if id == cursor_id
        ));
    }

    #[cfg(not(target_arch = "wasm32"))]
    #[test]
    fn node_and_edge_properties_roundtrip() {
        let tmp = tempdir().unwrap();
        let mut db = Database::open(Options::new(tmp.path())).unwrap();
        let triple = db.add_fact(Fact::new("alice", "knows", "bob")).unwrap();

        db.set_node_property(triple.subject_id, r#"{"name":"alice"}"#)
            .unwrap();
        db.set_edge_property(
            triple.subject_id,
            triple.predicate_id,
            triple.object_id,
            r#"{"since":2020}"#,
        )
        .unwrap();

        assert_eq!(
            db.get_node_property(triple.subject_id).unwrap().unwrap(),
            r#"{"name":"alice"}"#
        );
        assert_eq!(
            db.get_edge_property(triple.subject_id, triple.predicate_id, triple.object_id)
                .unwrap()
                .unwrap(),
            r#"{"since":2020}"#
        );
    }

    #[test]
    #[cfg(all(feature = "temporal", not(target_arch = "wasm32")))]
    fn timeline_query_via_database() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("timeline-db");
        let mut db = Database::open(Options::new(&path)).unwrap();

        {
            let store = db.temporal_store_mut();
            let alice = store
                .ensure_entity(
                    "agent",
                    "alice",
                    EnsureEntityOptions {
                        alias: Some("Alice".into()),
                        occurred_at: Some("2025-01-01T00:00:00Z".into()),
                        ..Default::default()
                    },
                )
                .unwrap();
            let bob = store
                .ensure_entity(
                    "agent",
                    "bob",
                    EnsureEntityOptions {
                        alias: Some("Bob".into()),
                        occurred_at: Some("2025-01-01T00:00:00Z".into()),
                        ..Default::default()
                    },
                )
                .unwrap();
            let episode = store
                .add_episode(EpisodeInput {
                    source_type: "conversation".into(),
                    payload: serde_json::json!({ "text": "hello" }),
                    occurred_at: "2025-01-01T00:00:00Z".into(),
                    trace_hash: None,
                })
                .unwrap();
            let fact = store
                .upsert_fact(FactWriteInput {
                    subject_entity_id: alice.entity_id,
                    predicate_key: "mentions".into(),
                    object_entity_id: Some(bob.entity_id),
                    object_value: None,
                    valid_from: Some("2025-01-01T00:00:00Z".into()),
                    valid_to: None,
                    confidence: None,
                    source_episode_id: episode.episode_id,
                })
                .unwrap();
            store
                .link_episode(
                    episode.episode_id,
                    EpisodeLinkOptions {
                        entity_id: Some(alice.entity_id),
                        fact_id: Some(fact.fact_id),
                        role: "author".into(),
                    },
                )
                .unwrap();
        }

        let alice_id = db.temporal_store().get_entities().unwrap()[0].entity_id;
        let timeline = db.timeline_query(TimelineQuery {
            entity_id: alice_id,
            predicate_key: Some("mentions".into()),
            role: Some(TimelineRole::Subject),
            ..Default::default()
        });
        assert_eq!(timeline.len(), 1);

        let episodes = db.timeline_trace(timeline[0].fact_id);
        assert_eq!(episodes.len(), 1);
    }

    #[test]
    #[cfg(not(target_arch = "wasm32"))]
    fn test_transaction_api() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("tx_test.nervus");
        let mut db = Database::open(Options::new(&path)).unwrap();

        assert!(!db.is_transaction_active());

        db.begin_transaction().unwrap();
        assert!(db.is_transaction_active());
        db.add_fact(Fact::new("alice", "knows", "bob")).unwrap();
        db.commit_transaction().unwrap();
        assert!(!db.is_transaction_active());

        let committed = db.query(QueryCriteria::default()).count();
        assert!(committed > 0);

        // Writes made inside an aborted transaction must not be visible afterwards.
        // The count is taken BEFORE the transaction so the comparison is meaningful.
        db.begin_transaction().unwrap();
        db.add_fact(Fact::new("bob", "knows", "charlie")).unwrap();
        db.abort_transaction().unwrap();
        assert!(!db.is_transaction_active());
        assert_eq!(db.query(QueryCriteria::default()).count(), committed);
    }

    #[test]
    #[cfg(not(target_arch = "wasm32"))]
    fn transaction_state_errors() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("tx_state.nervus");
        let mut db = Database::open(Options::new(&path)).unwrap();

        assert!(db.commit_transaction().is_err());

        db.begin_transaction().unwrap();
        assert!(db.begin_transaction().is_err());
        assert!(db.with_transaction(|_| Ok(())).is_err());
        db.abort_transaction().unwrap();
        assert!(!db.is_transaction_active());
    }

    #[test]
    #[cfg(not(target_arch = "wasm32"))]
    fn test_with_transaction_api() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("with_tx_test.nervus");
        let mut db = Database::open(Options::new(&path)).unwrap();

        let result = db.with_transaction(|db| {
            db.add_fact(Fact::new("alice", "knows", "bob"))?;
            db.add_fact(Fact::new("bob", "knows", "charlie"))?;
            Ok("success")
        });
        assert_eq!(result.unwrap(), "success");
        assert!(!db.is_transaction_active());

        let original_count = db.query(QueryCriteria::default()).count();
        assert!(original_count >= 2);

        let result: Result<&str> = db.with_transaction(|db| {
            db.add_fact(Fact::new("dave", "knows", "eve"))?;
            Err(crate::Error::Other("simulated error".to_string()))
        });
        assert!(result.is_err());
        assert!(!db.is_transaction_active());
        assert_eq!(db.query(QueryCriteria::default()).count(), original_count);
    }
}