1use std::sync::Arc;
8use std::sync::atomic::{AtomicUsize, Ordering};
9use std::time::{Duration, Instant};
10
11use grafeo_common::types::{EdgeId, EpochId, NodeId, TransactionId, Value};
12use grafeo_common::utils::error::Result;
13use grafeo_core::graph::Direction;
14use grafeo_core::graph::GraphStoreMut;
15use grafeo_core::graph::lpg::{Edge, LpgStore, Node};
16#[cfg(feature = "rdf")]
17use grafeo_core::graph::rdf::RdfStore;
18
19use crate::catalog::{Catalog, CatalogConstraintValidator};
20use crate::config::{AdaptiveConfig, GraphModel};
21use crate::database::QueryResult;
22use crate::query::cache::QueryCache;
23use crate::transaction::TransactionManager;
24
25pub struct Session {
31 store: Arc<LpgStore>,
33 graph_store: Arc<dyn GraphStoreMut>,
35 catalog: Arc<Catalog>,
37 #[cfg(feature = "rdf")]
39 rdf_store: Arc<RdfStore>,
40 transaction_manager: Arc<TransactionManager>,
42 query_cache: Arc<QueryCache>,
44 current_transaction: parking_lot::Mutex<Option<TransactionId>>,
48 read_only_tx: parking_lot::Mutex<bool>,
50 auto_commit: bool,
52 #[allow(dead_code)] adaptive_config: AdaptiveConfig,
55 factorized_execution: bool,
57 graph_model: GraphModel,
59 query_timeout: Option<Duration>,
61 commit_counter: Arc<AtomicUsize>,
63 gc_interval: usize,
65 transaction_start_node_count: AtomicUsize,
67 transaction_start_edge_count: AtomicUsize,
69 #[cfg(feature = "wal")]
71 wal: Option<Arc<grafeo_adapters::storage::wal::LpgWal>>,
72 #[cfg(feature = "cdc")]
74 cdc_log: Arc<crate::cdc::CdcLog>,
75 current_graph: parking_lot::Mutex<Option<String>>,
77 time_zone: parking_lot::Mutex<Option<String>>,
79 session_params:
81 parking_lot::Mutex<std::collections::HashMap<String, grafeo_common::types::Value>>,
82 viewing_epoch_override: parking_lot::Mutex<Option<EpochId>>,
84 savepoints: parking_lot::Mutex<Vec<(String, u64, u64, usize)>>,
86 transaction_nesting_depth: parking_lot::Mutex<u32>,
90}
91
92impl Session {
93 #[allow(dead_code, clippy::too_many_arguments)]
95 pub(crate) fn with_adaptive(
96 store: Arc<LpgStore>,
97 transaction_manager: Arc<TransactionManager>,
98 query_cache: Arc<QueryCache>,
99 catalog: Arc<Catalog>,
100 adaptive_config: AdaptiveConfig,
101 factorized_execution: bool,
102 graph_model: GraphModel,
103 query_timeout: Option<Duration>,
104 commit_counter: Arc<AtomicUsize>,
105 gc_interval: usize,
106 ) -> Self {
107 let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
108 Self {
109 store,
110 graph_store,
111 catalog,
112 #[cfg(feature = "rdf")]
113 rdf_store: Arc::new(RdfStore::new()),
114 transaction_manager,
115 query_cache,
116 current_transaction: parking_lot::Mutex::new(None),
117 read_only_tx: parking_lot::Mutex::new(false),
118 auto_commit: true,
119 adaptive_config,
120 factorized_execution,
121 graph_model,
122 query_timeout,
123 commit_counter,
124 gc_interval,
125 transaction_start_node_count: AtomicUsize::new(0),
126 transaction_start_edge_count: AtomicUsize::new(0),
127 #[cfg(feature = "wal")]
128 wal: None,
129 #[cfg(feature = "cdc")]
130 cdc_log: Arc::new(crate::cdc::CdcLog::new()),
131 current_graph: parking_lot::Mutex::new(None),
132 time_zone: parking_lot::Mutex::new(None),
133 session_params: parking_lot::Mutex::new(std::collections::HashMap::new()),
134 viewing_epoch_override: parking_lot::Mutex::new(None),
135 savepoints: parking_lot::Mutex::new(Vec::new()),
136 transaction_nesting_depth: parking_lot::Mutex::new(0),
137 }
138 }
139
140 #[cfg(feature = "wal")]
145 pub(crate) fn set_wal(&mut self, wal: Arc<grafeo_adapters::storage::wal::LpgWal>) {
146 self.graph_store = Arc::new(crate::database::wal_store::WalGraphStore::new(
148 Arc::clone(&self.store),
149 Arc::clone(&wal),
150 ));
151 self.wal = Some(wal);
152 }
153
154 #[cfg(feature = "cdc")]
156 pub(crate) fn set_cdc_log(&mut self, cdc_log: Arc<crate::cdc::CdcLog>) {
157 self.cdc_log = cdc_log;
158 }
159
160 #[cfg(feature = "rdf")]
162 #[allow(clippy::too_many_arguments)]
163 pub(crate) fn with_rdf_store_and_adaptive(
164 store: Arc<LpgStore>,
165 rdf_store: Arc<RdfStore>,
166 transaction_manager: Arc<TransactionManager>,
167 query_cache: Arc<QueryCache>,
168 catalog: Arc<Catalog>,
169 adaptive_config: AdaptiveConfig,
170 factorized_execution: bool,
171 graph_model: GraphModel,
172 query_timeout: Option<Duration>,
173 commit_counter: Arc<AtomicUsize>,
174 gc_interval: usize,
175 ) -> Self {
176 let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
177 Self {
178 store,
179 graph_store,
180 catalog,
181 rdf_store,
182 transaction_manager,
183 query_cache,
184 current_transaction: parking_lot::Mutex::new(None),
185 read_only_tx: parking_lot::Mutex::new(false),
186 auto_commit: true,
187 adaptive_config,
188 factorized_execution,
189 graph_model,
190 query_timeout,
191 commit_counter,
192 gc_interval,
193 transaction_start_node_count: AtomicUsize::new(0),
194 transaction_start_edge_count: AtomicUsize::new(0),
195 #[cfg(feature = "wal")]
196 wal: None,
197 #[cfg(feature = "cdc")]
198 cdc_log: Arc::new(crate::cdc::CdcLog::new()),
199 current_graph: parking_lot::Mutex::new(None),
200 time_zone: parking_lot::Mutex::new(None),
201 session_params: parking_lot::Mutex::new(std::collections::HashMap::new()),
202 viewing_epoch_override: parking_lot::Mutex::new(None),
203 savepoints: parking_lot::Mutex::new(Vec::new()),
204 transaction_nesting_depth: parking_lot::Mutex::new(0),
205 }
206 }
207
208 #[allow(clippy::too_many_arguments)]
213 pub(crate) fn with_external_store(
214 store: Arc<dyn GraphStoreMut>,
215 transaction_manager: Arc<TransactionManager>,
216 query_cache: Arc<QueryCache>,
217 catalog: Arc<Catalog>,
218 adaptive_config: AdaptiveConfig,
219 factorized_execution: bool,
220 graph_model: GraphModel,
221 query_timeout: Option<Duration>,
222 commit_counter: Arc<AtomicUsize>,
223 gc_interval: usize,
224 ) -> Self {
225 Self {
226 store: Arc::new(LpgStore::new().expect("arena allocation for dummy LpgStore")), graph_store: store,
228 catalog,
229 #[cfg(feature = "rdf")]
230 rdf_store: Arc::new(RdfStore::new()),
231 transaction_manager,
232 query_cache,
233 current_transaction: parking_lot::Mutex::new(None),
234 read_only_tx: parking_lot::Mutex::new(false),
235 auto_commit: true,
236 adaptive_config,
237 factorized_execution,
238 graph_model,
239 query_timeout,
240 commit_counter,
241 gc_interval,
242 transaction_start_node_count: AtomicUsize::new(0),
243 transaction_start_edge_count: AtomicUsize::new(0),
244 #[cfg(feature = "wal")]
245 wal: None,
246 #[cfg(feature = "cdc")]
247 cdc_log: Arc::new(crate::cdc::CdcLog::new()),
248 current_graph: parking_lot::Mutex::new(None),
249 time_zone: parking_lot::Mutex::new(None),
250 session_params: parking_lot::Mutex::new(std::collections::HashMap::new()),
251 viewing_epoch_override: parking_lot::Mutex::new(None),
252 savepoints: parking_lot::Mutex::new(Vec::new()),
253 transaction_nesting_depth: parking_lot::Mutex::new(0),
254 }
255 }
256
257 #[must_use]
259 pub fn graph_model(&self) -> GraphModel {
260 self.graph_model
261 }
262
263 pub fn use_graph(&self, name: &str) {
267 *self.current_graph.lock() = Some(name.to_string());
268 }
269
270 #[must_use]
272 pub fn current_graph(&self) -> Option<String> {
273 self.current_graph.lock().clone()
274 }
275
276 pub fn set_time_zone(&self, tz: &str) {
278 *self.time_zone.lock() = Some(tz.to_string());
279 }
280
281 #[must_use]
283 pub fn time_zone(&self) -> Option<String> {
284 self.time_zone.lock().clone()
285 }
286
287 pub fn set_parameter(&self, key: &str, value: grafeo_common::types::Value) {
289 self.session_params.lock().insert(key.to_string(), value);
290 }
291
292 #[must_use]
294 pub fn get_parameter(&self, key: &str) -> Option<grafeo_common::types::Value> {
295 self.session_params.lock().get(key).cloned()
296 }
297
298 pub fn reset_session(&self) {
300 *self.current_graph.lock() = None;
301 *self.time_zone.lock() = None;
302 self.session_params.lock().clear();
303 *self.viewing_epoch_override.lock() = None;
304 }
305
306 pub fn set_viewing_epoch(&self, epoch: EpochId) {
314 *self.viewing_epoch_override.lock() = Some(epoch);
315 }
316
317 pub fn clear_viewing_epoch(&self) {
319 *self.viewing_epoch_override.lock() = None;
320 }
321
322 #[must_use]
324 pub fn viewing_epoch(&self) -> Option<EpochId> {
325 *self.viewing_epoch_override.lock()
326 }
327
328 #[must_use]
332 pub fn get_node_history(&self, id: NodeId) -> Vec<(EpochId, Option<EpochId>, Node)> {
333 self.store.get_node_history(id)
334 }
335
336 #[must_use]
340 pub fn get_edge_history(&self, id: EdgeId) -> Vec<(EpochId, Option<EpochId>, Edge)> {
341 self.store.get_edge_history(id)
342 }
343
344 fn require_lpg(&self, language: &str) -> Result<()> {
346 if self.graph_model == GraphModel::Rdf {
347 return Err(grafeo_common::utils::error::Error::Internal(format!(
348 "This is an RDF database. {language} queries require an LPG database."
349 )));
350 }
351 Ok(())
352 }
353
354 #[cfg(feature = "gql")]
356 fn execute_session_command(
357 &self,
358 cmd: grafeo_adapters::query::gql::ast::SessionCommand,
359 ) -> Result<QueryResult> {
360 use grafeo_adapters::query::gql::ast::{SessionCommand, TransactionIsolationLevel};
361 use grafeo_common::utils::error::{Error, QueryError, QueryErrorKind};
362
363 match cmd {
364 SessionCommand::CreateGraph {
365 name,
366 if_not_exists,
367 typed,
368 like_graph,
369 copy_of,
370 open: _,
371 } => {
372 if let Some(ref src) = like_graph
374 && self.store.graph(src).is_none()
375 {
376 return Err(Error::Query(QueryError::new(
377 QueryErrorKind::Semantic,
378 format!("Source graph '{src}' does not exist"),
379 )));
380 }
381 if let Some(ref src) = copy_of
382 && self.store.graph(src).is_none()
383 {
384 return Err(Error::Query(QueryError::new(
385 QueryErrorKind::Semantic,
386 format!("Source graph '{src}' does not exist"),
387 )));
388 }
389
390 let created = self
391 .store
392 .create_graph(&name)
393 .map_err(|e| Error::Internal(e.to_string()))?;
394 if !created && !if_not_exists {
395 return Err(Error::Query(QueryError::new(
396 QueryErrorKind::Semantic,
397 format!("Graph '{name}' already exists"),
398 )));
399 }
400
401 if let Some(ref src) = copy_of {
403 self.store
404 .copy_graph(Some(src), Some(&name))
405 .map_err(|e| Error::Internal(e.to_string()))?;
406 }
407
408 if let Some(type_name) = typed
410 && let Err(e) = self.catalog.bind_graph_type(&name, type_name.clone())
411 {
412 return Err(Error::Query(QueryError::new(
413 QueryErrorKind::Semantic,
414 e.to_string(),
415 )));
416 }
417
418 if let Some(ref src) = like_graph
420 && let Some(src_type) = self.catalog.get_graph_type_binding(src)
421 {
422 let _ = self.catalog.bind_graph_type(&name, src_type);
423 }
424
425 Ok(QueryResult::empty())
426 }
427 SessionCommand::DropGraph { name, if_exists } => {
428 let dropped = self.store.drop_graph(&name);
429 if !dropped && !if_exists {
430 return Err(Error::Query(QueryError::new(
431 QueryErrorKind::Semantic,
432 format!("Graph '{name}' does not exist"),
433 )));
434 }
435 Ok(QueryResult::empty())
436 }
437 SessionCommand::UseGraph(name) => {
438 if !name.eq_ignore_ascii_case("default") && self.store.graph(&name).is_none() {
440 return Err(Error::Query(QueryError::new(
441 QueryErrorKind::Semantic,
442 format!("Graph '{name}' does not exist"),
443 )));
444 }
445 self.use_graph(&name);
446 Ok(QueryResult::empty())
447 }
448 SessionCommand::SessionSetGraph(name) => {
449 self.use_graph(&name);
450 Ok(QueryResult::empty())
451 }
452 SessionCommand::SessionSetTimeZone(tz) => {
453 self.set_time_zone(&tz);
454 Ok(QueryResult::empty())
455 }
456 SessionCommand::SessionSetParameter(key, expr) => {
457 if key.eq_ignore_ascii_case("viewing_epoch") {
458 match Self::eval_integer_literal(&expr) {
459 Some(n) if n >= 0 => {
460 self.set_viewing_epoch(EpochId::new(n as u64));
461 Ok(QueryResult::status(format!("Set viewing_epoch to {n}")))
462 }
463 _ => Err(Error::Query(QueryError::new(
464 QueryErrorKind::Semantic,
465 "viewing_epoch must be a non-negative integer literal",
466 ))),
467 }
468 } else {
469 self.set_parameter(&key, Value::Null);
472 Ok(QueryResult::empty())
473 }
474 }
475 SessionCommand::SessionReset => {
476 self.reset_session();
477 Ok(QueryResult::empty())
478 }
479 SessionCommand::SessionClose => {
480 self.reset_session();
481 Ok(QueryResult::empty())
482 }
483 SessionCommand::StartTransaction {
484 read_only,
485 isolation_level,
486 } => {
487 let engine_level = isolation_level.map(|l| match l {
488 TransactionIsolationLevel::ReadCommitted => {
489 crate::transaction::IsolationLevel::ReadCommitted
490 }
491 TransactionIsolationLevel::SnapshotIsolation => {
492 crate::transaction::IsolationLevel::SnapshotIsolation
493 }
494 TransactionIsolationLevel::Serializable => {
495 crate::transaction::IsolationLevel::Serializable
496 }
497 });
498 self.begin_transaction_inner(read_only, engine_level)?;
499 Ok(QueryResult::status("Transaction started"))
500 }
501 SessionCommand::Commit => {
502 self.commit_inner()?;
503 Ok(QueryResult::status("Transaction committed"))
504 }
505 SessionCommand::Rollback => {
506 self.rollback_inner()?;
507 Ok(QueryResult::status("Transaction rolled back"))
508 }
509 SessionCommand::Savepoint(name) => {
510 self.savepoint(&name)?;
511 Ok(QueryResult::status(format!("Savepoint '{name}' created")))
512 }
513 SessionCommand::RollbackToSavepoint(name) => {
514 self.rollback_to_savepoint(&name)?;
515 Ok(QueryResult::status(format!(
516 "Rolled back to savepoint '{name}'"
517 )))
518 }
519 SessionCommand::ReleaseSavepoint(name) => {
520 self.release_savepoint(&name)?;
521 Ok(QueryResult::status(format!("Savepoint '{name}' released")))
522 }
523 }
524 }
525
526 #[cfg(feature = "wal")]
528 fn log_schema_wal(&self, record: &grafeo_adapters::storage::wal::WalRecord) {
529 if let Some(ref wal) = self.wal
530 && let Err(e) = wal.log(record)
531 {
532 tracing::warn!("Failed to log schema change to WAL: {}", e);
533 }
534 }
535
536 #[cfg(feature = "gql")]
538 fn execute_schema_command(
539 &self,
540 cmd: grafeo_adapters::query::gql::ast::SchemaStatement,
541 ) -> Result<QueryResult> {
542 use crate::catalog::{
543 EdgeTypeDefinition, NodeTypeDefinition, PropertyDataType, TypedProperty,
544 };
545 use grafeo_adapters::query::gql::ast::SchemaStatement;
546 #[cfg(feature = "wal")]
547 use grafeo_adapters::storage::wal::WalRecord;
548 use grafeo_common::utils::error::{Error, QueryError, QueryErrorKind};
549
550 macro_rules! wal_log {
552 ($self:expr, $record:expr) => {
553 #[cfg(feature = "wal")]
554 $self.log_schema_wal(&$record);
555 };
556 }
557
558 let result = match cmd {
559 SchemaStatement::CreateNodeType(stmt) => {
560 #[cfg(feature = "wal")]
561 let props_for_wal: Vec<(String, String, bool)> = stmt
562 .properties
563 .iter()
564 .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
565 .collect();
566 let def = NodeTypeDefinition {
567 name: stmt.name.clone(),
568 properties: stmt
569 .properties
570 .iter()
571 .map(|p| TypedProperty {
572 name: p.name.clone(),
573 data_type: PropertyDataType::from_type_name(&p.data_type),
574 nullable: p.nullable,
575 default_value: None,
576 })
577 .collect(),
578 constraints: Vec::new(),
579 };
580 let result = if stmt.or_replace {
581 let _ = self.catalog.drop_node_type(&stmt.name);
582 self.catalog.register_node_type(def)
583 } else {
584 self.catalog.register_node_type(def)
585 };
586 match result {
587 Ok(()) => {
588 wal_log!(
589 self,
590 WalRecord::CreateNodeType {
591 name: stmt.name.clone(),
592 properties: props_for_wal,
593 constraints: Vec::new(),
594 }
595 );
596 Ok(QueryResult::status(format!(
597 "Created node type '{}'",
598 stmt.name
599 )))
600 }
601 Err(e) if stmt.if_not_exists => {
602 let _ = e;
603 Ok(QueryResult::status("No change"))
604 }
605 Err(e) => Err(Error::Query(QueryError::new(
606 QueryErrorKind::Semantic,
607 e.to_string(),
608 ))),
609 }
610 }
611 SchemaStatement::CreateEdgeType(stmt) => {
612 #[cfg(feature = "wal")]
613 let props_for_wal: Vec<(String, String, bool)> = stmt
614 .properties
615 .iter()
616 .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
617 .collect();
618 let def = EdgeTypeDefinition {
619 name: stmt.name.clone(),
620 properties: stmt
621 .properties
622 .iter()
623 .map(|p| TypedProperty {
624 name: p.name.clone(),
625 data_type: PropertyDataType::from_type_name(&p.data_type),
626 nullable: p.nullable,
627 default_value: None,
628 })
629 .collect(),
630 constraints: Vec::new(),
631 };
632 let result = if stmt.or_replace {
633 let _ = self.catalog.drop_edge_type_def(&stmt.name);
634 self.catalog.register_edge_type_def(def)
635 } else {
636 self.catalog.register_edge_type_def(def)
637 };
638 match result {
639 Ok(()) => {
640 wal_log!(
641 self,
642 WalRecord::CreateEdgeType {
643 name: stmt.name.clone(),
644 properties: props_for_wal,
645 constraints: Vec::new(),
646 }
647 );
648 Ok(QueryResult::status(format!(
649 "Created edge type '{}'",
650 stmt.name
651 )))
652 }
653 Err(e) if stmt.if_not_exists => {
654 let _ = e;
655 Ok(QueryResult::status("No change"))
656 }
657 Err(e) => Err(Error::Query(QueryError::new(
658 QueryErrorKind::Semantic,
659 e.to_string(),
660 ))),
661 }
662 }
663 SchemaStatement::CreateVectorIndex(stmt) => {
664 Self::create_vector_index_on_store(
665 &self.store,
666 &stmt.node_label,
667 &stmt.property,
668 stmt.dimensions,
669 stmt.metric.as_deref(),
670 )?;
671 wal_log!(
672 self,
673 WalRecord::CreateIndex {
674 name: stmt.name.clone(),
675 label: stmt.node_label.clone(),
676 property: stmt.property.clone(),
677 index_type: "vector".to_string(),
678 }
679 );
680 Ok(QueryResult::status(format!(
681 "Created vector index '{}'",
682 stmt.name
683 )))
684 }
685 SchemaStatement::DropNodeType { name, if_exists } => {
686 match self.catalog.drop_node_type(&name) {
687 Ok(()) => {
688 wal_log!(self, WalRecord::DropNodeType { name: name.clone() });
689 Ok(QueryResult::status(format!("Dropped node type '{name}'")))
690 }
691 Err(e) if if_exists => {
692 let _ = e;
693 Ok(QueryResult::status("No change"))
694 }
695 Err(e) => Err(Error::Query(QueryError::new(
696 QueryErrorKind::Semantic,
697 e.to_string(),
698 ))),
699 }
700 }
701 SchemaStatement::DropEdgeType { name, if_exists } => {
702 match self.catalog.drop_edge_type_def(&name) {
703 Ok(()) => {
704 wal_log!(self, WalRecord::DropEdgeType { name: name.clone() });
705 Ok(QueryResult::status(format!("Dropped edge type '{name}'")))
706 }
707 Err(e) if if_exists => {
708 let _ = e;
709 Ok(QueryResult::status("No change"))
710 }
711 Err(e) => Err(Error::Query(QueryError::new(
712 QueryErrorKind::Semantic,
713 e.to_string(),
714 ))),
715 }
716 }
717 SchemaStatement::CreateIndex(stmt) => {
718 use grafeo_adapters::query::gql::ast::IndexKind;
719 let index_type_str = match stmt.index_kind {
720 IndexKind::Property => "property",
721 IndexKind::BTree => "btree",
722 IndexKind::Text => "text",
723 IndexKind::Vector => "vector",
724 };
725 match stmt.index_kind {
726 IndexKind::Property | IndexKind::BTree => {
727 for prop in &stmt.properties {
728 self.store.create_property_index(prop);
729 }
730 }
731 IndexKind::Text => {
732 for prop in &stmt.properties {
733 Self::create_text_index_on_store(&self.store, &stmt.label, prop)?;
734 }
735 }
736 IndexKind::Vector => {
737 for prop in &stmt.properties {
738 Self::create_vector_index_on_store(
739 &self.store,
740 &stmt.label,
741 prop,
742 stmt.options.dimensions,
743 stmt.options.metric.as_deref(),
744 )?;
745 }
746 }
747 }
748 #[cfg(feature = "wal")]
749 for prop in &stmt.properties {
750 wal_log!(
751 self,
752 WalRecord::CreateIndex {
753 name: stmt.name.clone(),
754 label: stmt.label.clone(),
755 property: prop.clone(),
756 index_type: index_type_str.to_string(),
757 }
758 );
759 }
760 Ok(QueryResult::status(format!(
761 "Created {} index '{}'",
762 index_type_str, stmt.name
763 )))
764 }
765 SchemaStatement::DropIndex { name, if_exists } => {
766 let dropped = self.store.drop_property_index(&name);
768 if dropped || if_exists {
769 if dropped {
770 wal_log!(self, WalRecord::DropIndex { name: name.clone() });
771 }
772 Ok(QueryResult::status(if dropped {
773 format!("Dropped index '{name}'")
774 } else {
775 "No change".to_string()
776 }))
777 } else {
778 Err(Error::Query(QueryError::new(
779 QueryErrorKind::Semantic,
780 format!("Index '{name}' does not exist"),
781 )))
782 }
783 }
784 SchemaStatement::CreateConstraint(stmt) => {
785 use grafeo_adapters::query::gql::ast::ConstraintKind;
786 let kind_str = match stmt.constraint_kind {
787 ConstraintKind::Unique => "unique",
788 ConstraintKind::NodeKey => "node_key",
789 ConstraintKind::NotNull => "not_null",
790 ConstraintKind::Exists => "exists",
791 };
792 let constraint_name = stmt
793 .name
794 .clone()
795 .unwrap_or_else(|| format!("{}_{kind_str}", stmt.label));
796 wal_log!(
797 self,
798 WalRecord::CreateConstraint {
799 name: constraint_name.clone(),
800 label: stmt.label.clone(),
801 properties: stmt.properties.clone(),
802 kind: kind_str.to_string(),
803 }
804 );
805 Ok(QueryResult::status(format!(
806 "Created {kind_str} constraint '{constraint_name}'"
807 )))
808 }
809 SchemaStatement::DropConstraint { name, if_exists } => {
810 let _ = if_exists;
811 wal_log!(self, WalRecord::DropConstraint { name: name.clone() });
812 Ok(QueryResult::status(format!("Dropped constraint '{name}'")))
813 }
814 SchemaStatement::CreateGraphType(stmt) => {
815 use crate::catalog::GraphTypeDefinition;
816 use grafeo_adapters::query::gql::ast::InlineElementType;
817
818 let (mut node_types, mut edge_types, open) =
820 if let Some(ref like_graph) = stmt.like_graph {
821 if let Some(type_name) = self.catalog.get_graph_type_binding(like_graph) {
823 if let Some(existing) = self
824 .catalog
825 .schema()
826 .and_then(|s| s.get_graph_type(&type_name))
827 {
828 (
829 existing.allowed_node_types.clone(),
830 existing.allowed_edge_types.clone(),
831 existing.open,
832 )
833 } else {
834 (Vec::new(), Vec::new(), true)
835 }
836 } else {
837 let nt = self.catalog.all_node_type_names();
839 let et = self.catalog.all_edge_type_names();
840 if nt.is_empty() && et.is_empty() {
841 (Vec::new(), Vec::new(), true)
842 } else {
843 (nt, et, false)
844 }
845 }
846 } else {
847 (stmt.node_types.clone(), stmt.edge_types.clone(), stmt.open)
848 };
849
850 for inline in &stmt.inline_types {
852 match inline {
853 InlineElementType::Node {
854 name, properties, ..
855 } => {
856 let def = NodeTypeDefinition {
857 name: name.clone(),
858 properties: properties
859 .iter()
860 .map(|p| TypedProperty {
861 name: p.name.clone(),
862 data_type: PropertyDataType::from_type_name(&p.data_type),
863 nullable: p.nullable,
864 default_value: None,
865 })
866 .collect(),
867 constraints: Vec::new(),
868 };
869 self.catalog.register_or_replace_node_type(def);
871 #[cfg(feature = "wal")]
872 {
873 let props_for_wal: Vec<(String, String, bool)> = properties
874 .iter()
875 .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
876 .collect();
877 self.log_schema_wal(&WalRecord::CreateNodeType {
878 name: name.clone(),
879 properties: props_for_wal,
880 constraints: Vec::new(),
881 });
882 }
883 if !node_types.contains(name) {
884 node_types.push(name.clone());
885 }
886 }
887 InlineElementType::Edge {
888 name, properties, ..
889 } => {
890 let def = EdgeTypeDefinition {
891 name: name.clone(),
892 properties: properties
893 .iter()
894 .map(|p| TypedProperty {
895 name: p.name.clone(),
896 data_type: PropertyDataType::from_type_name(&p.data_type),
897 nullable: p.nullable,
898 default_value: None,
899 })
900 .collect(),
901 constraints: Vec::new(),
902 };
903 self.catalog.register_or_replace_edge_type_def(def);
904 #[cfg(feature = "wal")]
905 {
906 let props_for_wal: Vec<(String, String, bool)> = properties
907 .iter()
908 .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
909 .collect();
910 self.log_schema_wal(&WalRecord::CreateEdgeType {
911 name: name.clone(),
912 properties: props_for_wal,
913 constraints: Vec::new(),
914 });
915 }
916 if !edge_types.contains(name) {
917 edge_types.push(name.clone());
918 }
919 }
920 }
921 }
922
923 let def = GraphTypeDefinition {
924 name: stmt.name.clone(),
925 allowed_node_types: node_types.clone(),
926 allowed_edge_types: edge_types.clone(),
927 open,
928 };
929 let result = if stmt.or_replace {
930 let _ = self.catalog.drop_graph_type(&stmt.name);
932 self.catalog.register_graph_type(def)
933 } else {
934 self.catalog.register_graph_type(def)
935 };
936 match result {
937 Ok(()) => {
938 wal_log!(
939 self,
940 WalRecord::CreateGraphType {
941 name: stmt.name.clone(),
942 node_types,
943 edge_types,
944 open,
945 }
946 );
947 Ok(QueryResult::status(format!(
948 "Created graph type '{}'",
949 stmt.name
950 )))
951 }
952 Err(e) if stmt.if_not_exists => {
953 let _ = e;
954 Ok(QueryResult::status("No change"))
955 }
956 Err(e) => Err(Error::Query(QueryError::new(
957 QueryErrorKind::Semantic,
958 e.to_string(),
959 ))),
960 }
961 }
962 SchemaStatement::DropGraphType { name, if_exists } => {
963 match self.catalog.drop_graph_type(&name) {
964 Ok(()) => {
965 wal_log!(self, WalRecord::DropGraphType { name: name.clone() });
966 Ok(QueryResult::status(format!("Dropped graph type '{name}'")))
967 }
968 Err(e) if if_exists => {
969 let _ = e;
970 Ok(QueryResult::status("No change"))
971 }
972 Err(e) => Err(Error::Query(QueryError::new(
973 QueryErrorKind::Semantic,
974 e.to_string(),
975 ))),
976 }
977 }
978 SchemaStatement::CreateSchema {
979 name,
980 if_not_exists,
981 } => match self.catalog.register_schema_namespace(name.clone()) {
982 Ok(()) => {
983 wal_log!(self, WalRecord::CreateSchema { name: name.clone() });
984 Ok(QueryResult::status(format!("Created schema '{name}'")))
985 }
986 Err(e) if if_not_exists => {
987 let _ = e;
988 Ok(QueryResult::status("No change"))
989 }
990 Err(e) => Err(Error::Query(QueryError::new(
991 QueryErrorKind::Semantic,
992 e.to_string(),
993 ))),
994 },
995 SchemaStatement::DropSchema { name, if_exists } => {
996 match self.catalog.drop_schema_namespace(&name) {
997 Ok(()) => {
998 wal_log!(self, WalRecord::DropSchema { name: name.clone() });
999 Ok(QueryResult::status(format!("Dropped schema '{name}'")))
1000 }
1001 Err(e) if if_exists => {
1002 let _ = e;
1003 Ok(QueryResult::status("No change"))
1004 }
1005 Err(e) => Err(Error::Query(QueryError::new(
1006 QueryErrorKind::Semantic,
1007 e.to_string(),
1008 ))),
1009 }
1010 }
1011 SchemaStatement::AlterNodeType(stmt) => {
1012 use grafeo_adapters::query::gql::ast::TypeAlteration;
1013 let mut wal_alts = Vec::new();
1014 for alt in &stmt.alterations {
1015 match alt {
1016 TypeAlteration::AddProperty(prop) => {
1017 let typed = TypedProperty {
1018 name: prop.name.clone(),
1019 data_type: PropertyDataType::from_type_name(&prop.data_type),
1020 nullable: prop.nullable,
1021 default_value: None,
1022 };
1023 self.catalog
1024 .alter_node_type_add_property(&stmt.name, typed)
1025 .map_err(|e| {
1026 Error::Query(QueryError::new(
1027 QueryErrorKind::Semantic,
1028 e.to_string(),
1029 ))
1030 })?;
1031 wal_alts.push((
1032 "add".to_string(),
1033 prop.name.clone(),
1034 prop.data_type.clone(),
1035 prop.nullable,
1036 ));
1037 }
1038 TypeAlteration::DropProperty(name) => {
1039 self.catalog
1040 .alter_node_type_drop_property(&stmt.name, name)
1041 .map_err(|e| {
1042 Error::Query(QueryError::new(
1043 QueryErrorKind::Semantic,
1044 e.to_string(),
1045 ))
1046 })?;
1047 wal_alts.push(("drop".to_string(), name.clone(), String::new(), false));
1048 }
1049 }
1050 }
1051 wal_log!(
1052 self,
1053 WalRecord::AlterNodeType {
1054 name: stmt.name.clone(),
1055 alterations: wal_alts,
1056 }
1057 );
1058 Ok(QueryResult::status(format!(
1059 "Altered node type '{}'",
1060 stmt.name
1061 )))
1062 }
1063 SchemaStatement::AlterEdgeType(stmt) => {
1064 use grafeo_adapters::query::gql::ast::TypeAlteration;
1065 let mut wal_alts = Vec::new();
1066 for alt in &stmt.alterations {
1067 match alt {
1068 TypeAlteration::AddProperty(prop) => {
1069 let typed = TypedProperty {
1070 name: prop.name.clone(),
1071 data_type: PropertyDataType::from_type_name(&prop.data_type),
1072 nullable: prop.nullable,
1073 default_value: None,
1074 };
1075 self.catalog
1076 .alter_edge_type_add_property(&stmt.name, typed)
1077 .map_err(|e| {
1078 Error::Query(QueryError::new(
1079 QueryErrorKind::Semantic,
1080 e.to_string(),
1081 ))
1082 })?;
1083 wal_alts.push((
1084 "add".to_string(),
1085 prop.name.clone(),
1086 prop.data_type.clone(),
1087 prop.nullable,
1088 ));
1089 }
1090 TypeAlteration::DropProperty(name) => {
1091 self.catalog
1092 .alter_edge_type_drop_property(&stmt.name, name)
1093 .map_err(|e| {
1094 Error::Query(QueryError::new(
1095 QueryErrorKind::Semantic,
1096 e.to_string(),
1097 ))
1098 })?;
1099 wal_alts.push(("drop".to_string(), name.clone(), String::new(), false));
1100 }
1101 }
1102 }
1103 wal_log!(
1104 self,
1105 WalRecord::AlterEdgeType {
1106 name: stmt.name.clone(),
1107 alterations: wal_alts,
1108 }
1109 );
1110 Ok(QueryResult::status(format!(
1111 "Altered edge type '{}'",
1112 stmt.name
1113 )))
1114 }
1115 SchemaStatement::AlterGraphType(stmt) => {
1116 use grafeo_adapters::query::gql::ast::GraphTypeAlteration;
1117 let mut wal_alts = Vec::new();
1118 for alt in &stmt.alterations {
1119 match alt {
1120 GraphTypeAlteration::AddNodeType(name) => {
1121 self.catalog
1122 .alter_graph_type_add_node_type(&stmt.name, name.clone())
1123 .map_err(|e| {
1124 Error::Query(QueryError::new(
1125 QueryErrorKind::Semantic,
1126 e.to_string(),
1127 ))
1128 })?;
1129 wal_alts.push(("add_node_type".to_string(), name.clone()));
1130 }
1131 GraphTypeAlteration::DropNodeType(name) => {
1132 self.catalog
1133 .alter_graph_type_drop_node_type(&stmt.name, name)
1134 .map_err(|e| {
1135 Error::Query(QueryError::new(
1136 QueryErrorKind::Semantic,
1137 e.to_string(),
1138 ))
1139 })?;
1140 wal_alts.push(("drop_node_type".to_string(), name.clone()));
1141 }
1142 GraphTypeAlteration::AddEdgeType(name) => {
1143 self.catalog
1144 .alter_graph_type_add_edge_type(&stmt.name, name.clone())
1145 .map_err(|e| {
1146 Error::Query(QueryError::new(
1147 QueryErrorKind::Semantic,
1148 e.to_string(),
1149 ))
1150 })?;
1151 wal_alts.push(("add_edge_type".to_string(), name.clone()));
1152 }
1153 GraphTypeAlteration::DropEdgeType(name) => {
1154 self.catalog
1155 .alter_graph_type_drop_edge_type(&stmt.name, name)
1156 .map_err(|e| {
1157 Error::Query(QueryError::new(
1158 QueryErrorKind::Semantic,
1159 e.to_string(),
1160 ))
1161 })?;
1162 wal_alts.push(("drop_edge_type".to_string(), name.clone()));
1163 }
1164 }
1165 }
1166 wal_log!(
1167 self,
1168 WalRecord::AlterGraphType {
1169 name: stmt.name.clone(),
1170 alterations: wal_alts,
1171 }
1172 );
1173 Ok(QueryResult::status(format!(
1174 "Altered graph type '{}'",
1175 stmt.name
1176 )))
1177 }
1178 SchemaStatement::CreateProcedure(stmt) => {
1179 use crate::catalog::ProcedureDefinition;
1180
1181 let def = ProcedureDefinition {
1182 name: stmt.name.clone(),
1183 params: stmt
1184 .params
1185 .iter()
1186 .map(|p| (p.name.clone(), p.param_type.clone()))
1187 .collect(),
1188 returns: stmt
1189 .returns
1190 .iter()
1191 .map(|r| (r.name.clone(), r.return_type.clone()))
1192 .collect(),
1193 body: stmt.body.clone(),
1194 };
1195
1196 if stmt.or_replace {
1197 self.catalog.replace_procedure(def).map_err(|e| {
1198 Error::Query(QueryError::new(QueryErrorKind::Semantic, e.to_string()))
1199 })?;
1200 } else {
1201 match self.catalog.register_procedure(def) {
1202 Ok(()) => {}
1203 Err(_) if stmt.if_not_exists => {
1204 return Ok(QueryResult::empty());
1205 }
1206 Err(e) => {
1207 return Err(Error::Query(QueryError::new(
1208 QueryErrorKind::Semantic,
1209 e.to_string(),
1210 )));
1211 }
1212 }
1213 }
1214
1215 wal_log!(
1216 self,
1217 WalRecord::CreateProcedure {
1218 name: stmt.name.clone(),
1219 params: stmt
1220 .params
1221 .iter()
1222 .map(|p| (p.name.clone(), p.param_type.clone()))
1223 .collect(),
1224 returns: stmt
1225 .returns
1226 .iter()
1227 .map(|r| (r.name.clone(), r.return_type.clone()))
1228 .collect(),
1229 body: stmt.body,
1230 }
1231 );
1232 Ok(QueryResult::status(format!(
1233 "Created procedure '{}'",
1234 stmt.name
1235 )))
1236 }
1237 SchemaStatement::DropProcedure { name, if_exists } => {
1238 match self.catalog.drop_procedure(&name) {
1239 Ok(()) => {}
1240 Err(_) if if_exists => {
1241 return Ok(QueryResult::empty());
1242 }
1243 Err(e) => {
1244 return Err(Error::Query(QueryError::new(
1245 QueryErrorKind::Semantic,
1246 e.to_string(),
1247 )));
1248 }
1249 }
1250 wal_log!(self, WalRecord::DropProcedure { name: name.clone() });
1251 Ok(QueryResult::status(format!("Dropped procedure '{name}'")))
1252 }
1253 };
1254
1255 if result.is_ok() {
1258 self.query_cache.clear();
1259 }
1260
1261 result
1262 }
1263
/// Builds an HNSW vector index over `property` for the nodes labelled `label`
/// and registers it on `store`.
///
/// * `dimensions` - required vector length; when `None`, the length of the
///   first vector encountered is adopted.
/// * `metric` - distance metric name parsed by `DistanceMetric::from_str`;
///   cosine is used when `None`.
///
/// # Errors
///
/// Returns `Error::Internal` when the metric name is not recognised, when a
/// vector's length differs from the expected dimensionality, or when no
/// dimensionality was given and no vector value was found.
#[cfg(all(feature = "gql", feature = "vector-index"))]
fn create_vector_index_on_store(
    store: &LpgStore,
    label: &str,
    property: &str,
    dimensions: Option<usize>,
    metric: Option<&str>,
) -> Result<()> {
    use grafeo_common::types::{PropertyKey, Value};
    use grafeo_common::utils::error::Error;
    use grafeo_core::index::vector::{DistanceMetric, HnswConfig, HnswIndex};

    let distance = if let Some(requested) = metric {
        DistanceMetric::from_str(requested).ok_or_else(|| {
            Error::Internal(format!(
                "Unknown distance metric '{requested}'. Use: cosine, euclidean, dot_product, manhattan"
            ))
        })?
    } else {
        DistanceMetric::Cosine
    };

    let key = PropertyKey::new(property);
    let mut expected_dims = dimensions;
    let mut collected: Vec<(grafeo_common::types::NodeId, Vec<f32>)> = Vec::new();

    // First pass: validate every vector and copy it out, so nothing is
    // inserted into the index if any node has the wrong dimensionality.
    for node in store.nodes_with_label(label) {
        if let Some(Value::Vector(vector)) = node.properties.get(&key) {
            match expected_dims {
                Some(expected) if vector.len() != expected => {
                    return Err(Error::Internal(format!(
                        "Vector dimension mismatch: expected {expected}, found {} on node {}",
                        vector.len(),
                        node.id.0
                    )));
                }
                Some(_) => {}
                // No dimensionality known yet: the first vector defines it.
                None => expected_dims = Some(vector.len()),
            }
            collected.push((node.id, vector.to_vec()));
        }
    }

    let Some(dims) = expected_dims else {
        return Err(Error::Internal(format!(
            "No vector properties found on :{label}({property}) and no dimensions specified"
        )));
    };

    // Second pass: populate the index from the validated copies.
    let index = HnswIndex::with_capacity(HnswConfig::new(dims, distance), collected.len());
    let accessor = grafeo_core::index::vector::PropertyVectorAccessor::new(store, property);
    for (node_id, vector) in &collected {
        index.insert(*node_id, vector, &accessor);
    }

    store.add_vector_index(label, property, Arc::new(index));
    Ok(())
}
1323
/// Fallback used when the crate is built with `gql` but without
/// `vector-index`: always fails with `Error::Internal`.
#[cfg(all(feature = "gql", not(feature = "vector-index")))]
fn create_vector_index_on_store(
    _store: &LpgStore,
    _label: &str,
    _property: &str,
    _dimensions: Option<usize>,
    _metric: Option<&str>,
) -> Result<()> {
    use grafeo_common::utils::error::Error;

    let reason = "Vector index support requires the 'vector-index' feature".to_string();
    Err(Error::Internal(reason))
}
1337
/// Builds a BM25 inverted index over the string values of `property` on the
/// nodes labelled `label`, then registers it on `store`.
///
/// Nodes whose property is missing or is not a `Value::String` are skipped.
#[cfg(all(feature = "gql", feature = "text-index"))]
fn create_text_index_on_store(store: &LpgStore, label: &str, property: &str) -> Result<()> {
    use grafeo_common::types::{PropertyKey, Value};
    use grafeo_core::index::text::{BM25Config, InvertedIndex};

    let key = PropertyKey::new(property);
    let mut inverted = InvertedIndex::new(BM25Config::default());

    for node_id in store.nodes_by_label(label) {
        if let Some(Value::String(text)) = store.get_node_property(node_id, &key) {
            inverted.insert(node_id, text.as_str());
        }
    }

    // The index is stored behind a lock inside a shared pointer.
    let shared = Arc::new(parking_lot::RwLock::new(inverted));
    store.add_text_index(label, property, shared);
    Ok(())
}
1357
/// Fallback used when the crate is built with `gql` but without
/// `text-index`: always fails with `Error::Internal`.
#[cfg(all(feature = "gql", not(feature = "text-index")))]
fn create_text_index_on_store(_store: &LpgStore, _label: &str, _property: &str) -> Result<()> {
    use grafeo_common::utils::error::Error;

    let reason = "Text index support requires the 'text-index' feature".to_string();
    Err(Error::Internal(reason))
}
1365
1366 fn execute_show_indexes(&self) -> Result<QueryResult> {
1368 let indexes = self.catalog.all_indexes();
1369 let columns = vec![
1370 "name".to_string(),
1371 "type".to_string(),
1372 "label".to_string(),
1373 "property".to_string(),
1374 ];
1375 let rows: Vec<Vec<Value>> = indexes
1376 .into_iter()
1377 .map(|def| {
1378 let label_name = self
1379 .catalog
1380 .get_label_name(def.label)
1381 .unwrap_or_else(|| "?".into());
1382 let prop_name = self
1383 .catalog
1384 .get_property_key_name(def.property_key)
1385 .unwrap_or_else(|| "?".into());
1386 vec![
1387 Value::from(format!("idx_{}_{}", label_name, prop_name)),
1388 Value::from(format!("{:?}", def.index_type)),
1389 Value::from(&*label_name),
1390 Value::from(&*prop_name),
1391 ]
1392 })
1393 .collect();
1394 Ok(QueryResult {
1395 columns,
1396 column_types: Vec::new(),
1397 rows,
1398 ..QueryResult::empty()
1399 })
1400 }
1401
1402 fn execute_show_constraints(&self) -> Result<QueryResult> {
1404 Ok(QueryResult {
1407 columns: vec![
1408 "name".to_string(),
1409 "type".to_string(),
1410 "label".to_string(),
1411 "properties".to_string(),
1412 ],
1413 column_types: Vec::new(),
1414 rows: Vec::new(),
1415 ..QueryResult::empty()
1416 })
1417 }
1418
/// Executes a GQL query string.
///
/// The query is translated with `gql::translate_full`, which yields one of:
///
/// * a session command, forwarded to `execute_session_command`;
/// * a schema command, forwarded to `execute_schema_command`;
/// * a logical plan, which is bound, optimized (or fetched from the plan
///   cache), planned and executed.
///
/// Plans flagged `explain` return the explain output without executing; plans
/// flagged `profile` are executed and return the profile tree instead of rows.
///
/// # Errors
///
/// Returns `TransactionError::ReadOnly` when the session's read-only flag is
/// set and the query is a schema command or a plan containing mutations.
/// Errors from `require_lpg`, translation, binding, optimization, planning and
/// execution are propagated unchanged.
#[cfg(feature = "gql")]
pub fn execute(&self, query: &str) -> Result<QueryResult> {
    // NOTE(review): presumably rejects sessions whose graph model is not LPG;
    // confirm in `require_lpg`.
    self.require_lpg("GQL")?;

    use crate::query::{
        Executor, binder::Binder, cache::CacheKey, optimizer::Optimizer,
        processor::QueryLanguage, translators::gql,
    };

    // The timer starts before translation; it is compiled out on wasm32.
    #[cfg(not(target_arch = "wasm32"))]
    let start_time = std::time::Instant::now();

    let translation = gql::translate_full(query)?;
    let logical_plan = match translation {
        // Session commands return immediately and never reach the planner.
        gql::GqlTranslationResult::SessionCommand(cmd) => {
            return self.execute_session_command(cmd);
        }
        gql::GqlTranslationResult::SchemaCommand(cmd) => {
            // Schema changes are refused while the read-only flag is set.
            if *self.read_only_tx.lock() {
                return Err(grafeo_common::utils::error::Error::Transaction(
                    grafeo_common::utils::error::TransactionError::ReadOnly,
                ));
            }
            return self.execute_schema_command(cmd);
        }
        gql::GqlTranslationResult::Plan(plan) => {
            // Mutating plans are refused while the read-only flag is set.
            if *self.read_only_tx.lock() && plan.root.has_mutations() {
                return Err(grafeo_common::utils::error::Error::Transaction(
                    grafeo_common::utils::error::TransactionError::ReadOnly,
                ));
            }
            plan
        }
    };

    // The cache key is built from the raw query text and the language.
    let cache_key = CacheKey::new(query, QueryLanguage::Gql);

    let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
        cached_plan
    } else {
        // Cache miss: bind, optimize, then store a clone for later calls.
        let mut binder = Binder::new();
        let _binding_context = binder.bind(&logical_plan)?;

        let optimizer = Optimizer::from_graph_store(&*self.graph_store);
        let plan = optimizer.optimize(logical_plan)?;

        self.query_cache.put_optimized(cache_key, plan.clone());

        plan
    };

    // EXPLAIN: describe the plan; nothing is executed.
    if optimized_plan.explain {
        use crate::query::processor::{annotate_pushdown_hints, explain_result};
        let mut plan = optimized_plan;
        annotate_pushdown_hints(&mut plan.root, self.graph_store.as_ref());
        return Ok(explain_result(&plan));
    }

    // PROFILE: execute with profiling entries and return the profile tree.
    if optimized_plan.profile {
        let has_mutations = optimized_plan.root.has_mutations();
        return self.with_auto_commit(has_mutations, || {
            let (viewing_epoch, transaction_id) = self.get_transaction_context();
            let planner = self.create_planner(viewing_epoch, transaction_id);
            let (mut physical_plan, entries) = planner.plan_profiled(&optimized_plan)?;

            let executor = Executor::with_columns(physical_plan.columns.clone())
                .with_deadline(self.query_deadline());
            // The rows themselves are discarded; only the profile is returned.
            let _result = executor.execute(physical_plan.operator.as_mut())?;

            // Assigned exactly once by whichever cfg branch is compiled in.
            let total_time_ms;
            #[cfg(not(target_arch = "wasm32"))]
            {
                total_time_ms = start_time.elapsed().as_secs_f64() * 1000.0;
            }
            #[cfg(target_arch = "wasm32")]
            {
                total_time_ms = 0.0;
            }

            let profile_tree = crate::query::profile::build_profile_tree(
                &optimized_plan.root,
                &mut entries.into_iter(),
            );
            Ok(crate::query::profile::profile_result(
                &profile_tree,
                total_time_ms,
            ))
        });
    }

    let has_mutations = optimized_plan.root.has_mutations();

    // Regular execution path.
    self.with_auto_commit(has_mutations, || {
        let (viewing_epoch, transaction_id) = self.get_transaction_context();

        let planner = self.create_planner(viewing_epoch, transaction_id);
        let mut physical_plan = planner.plan(&optimized_plan)?;

        let executor = Executor::with_columns(physical_plan.columns.clone())
            .with_deadline(self.query_deadline());
        let mut result = executor.execute(physical_plan.operator.as_mut())?;

        // `rows_scanned` is reported as the number of rows in the result.
        let rows_scanned = result.rows.len() as u64;
        #[cfg(not(target_arch = "wasm32"))]
        {
            let elapsed_ms = start_time.elapsed().as_secs_f64() * 1000.0;
            result.execution_time_ms = Some(elapsed_ms);
        }
        result.rows_scanned = Some(rows_scanned);

        Ok(result)
    })
}
1573
/// Executes a GQL query with the session's viewing-epoch override set to
/// `epoch` for the duration of the call.
///
/// The previous override value is put back afterwards. Restoration is done by
/// a drop guard so it also happens if `execute` unwinds; otherwise the session
/// would keep the temporary epoch for later queries.
///
/// # Errors
///
/// Propagates any error returned by [`Session::execute`].
#[cfg(feature = "gql")]
pub fn execute_at_epoch(&self, query: &str, epoch: EpochId) -> Result<QueryResult> {
    /// Writes the saved override back into the slot when dropped.
    struct RestoreOverride<'a> {
        slot: &'a parking_lot::Mutex<Option<EpochId>>,
        saved: Option<EpochId>,
    }

    impl Drop for RestoreOverride<'_> {
        fn drop(&mut self) {
            *self.slot.lock() = self.saved.take();
        }
    }

    // The lock is released at the end of this statement, so `execute` can
    // read the override without deadlocking.
    let saved = self.viewing_epoch_override.lock().replace(epoch);
    let _restore = RestoreOverride {
        slot: &self.viewing_epoch_override,
        saved,
    };

    self.execute(query)
}
1589
/// Executes a GQL query with named parameters.
///
/// The query is handed to a `QueryProcessor` together with `params`. Whether
/// the call is treated as mutating is decided from the raw query text by
/// `Self::query_looks_like_mutation`, before any parsing takes place.
///
/// # Errors
///
/// Propagates errors from `require_lpg`, from `with_auto_commit`, and from
/// query processing.
#[cfg(feature = "gql")]
pub fn execute_with_params(
    &self,
    query: &str,
    params: std::collections::HashMap<String, Value>,
) -> Result<QueryResult> {
    self.require_lpg("GQL")?;

    use crate::query::processor::{QueryLanguage, QueryProcessor};

    let has_mutations = Self::query_looks_like_mutation(query);

    self.with_auto_commit(has_mutations, || {
        let (viewing_epoch, transaction_id) = self.get_transaction_context();

        let processor = QueryProcessor::for_graph_store_with_transaction(
            Arc::clone(&self.graph_store),
            Arc::clone(&self.transaction_manager),
        );

        // Attach the transaction context only when a transaction id exists.
        let processor = match transaction_id {
            Some(id) => processor.with_transaction_context(viewing_epoch, id),
            None => processor,
        };

        processor.process(query, QueryLanguage::Gql, Some(&params))
    })
}
1627
1628 #[cfg(not(any(feature = "gql", feature = "cypher")))]
1634 pub fn execute_with_params(
1635 &self,
1636 _query: &str,
1637 _params: std::collections::HashMap<String, Value>,
1638 ) -> Result<QueryResult> {
1639 Err(grafeo_common::utils::error::Error::Internal(
1640 "No query language enabled".to_string(),
1641 ))
1642 }
1643
1644 #[cfg(not(any(feature = "gql", feature = "cypher")))]
1650 pub fn execute(&self, _query: &str) -> Result<QueryResult> {
1651 Err(grafeo_common::utils::error::Error::Internal(
1652 "No query language enabled".to_string(),
1653 ))
1654 }
1655
/// Executes a Cypher query string.
///
/// `cypher::translate_full` is consulted first so that schema commands,
/// "show indexes" and "show constraints" can be answered directly. Ordinary
/// queries then go through the plan cache, binder, optimizer, planner and
/// executor. Plans flagged `explain` return the explain output without
/// executing; plans flagged `profile` return the profile tree.
///
/// # Errors
///
/// Returns a `QueryErrorKind::Semantic` query error when a schema command is
/// issued while the session's read-only flag is set. Errors from translation,
/// binding, optimization, planning and execution are propagated unchanged.
#[cfg(feature = "cypher")]
pub fn execute_cypher(&self, query: &str) -> Result<QueryResult> {
    use crate::query::{
        Executor, binder::Binder, cache::CacheKey, optimizer::Optimizer,
        processor::QueryLanguage, translators::cypher,
    };
    use grafeo_common::utils::error::{Error as GrafeoError, QueryError, QueryErrorKind};

    let translation = cypher::translate_full(query)?;
    match translation {
        cypher::CypherTranslationResult::SchemaCommand(cmd) => {
            // Schema changes are refused while the read-only flag is set.
            if *self.read_only_tx.lock() {
                return Err(GrafeoError::Query(QueryError::new(
                    QueryErrorKind::Semantic,
                    "Cannot execute schema DDL in a read-only transaction",
                )));
            }
            return self.execute_schema_command(cmd);
        }
        cypher::CypherTranslationResult::ShowIndexes => {
            return self.execute_show_indexes();
        }
        cypher::CypherTranslationResult::ShowConstraints => {
            return self.execute_show_constraints();
        }
        cypher::CypherTranslationResult::Plan(_) => {
            // Ordinary query: fall through. The plan carried by this variant
            // is discarded here.
        }
    }

    // Unlike the GQL path, the timer starts after the first translation.
    #[cfg(not(target_arch = "wasm32"))]
    let start_time = std::time::Instant::now();

    let cache_key = CacheKey::new(query, QueryLanguage::Cypher);

    let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
        cached_plan
    } else {
        // NOTE(review): on a cache miss the query text is translated a second
        // time here; the plan from `translate_full` above might be reusable -
        // confirm the two produce the same plan type.
        let logical_plan = cypher::translate(query)?;

        let mut binder = Binder::new();
        let _binding_context = binder.bind(&logical_plan)?;

        let optimizer = Optimizer::from_graph_store(&*self.graph_store);
        let plan = optimizer.optimize(logical_plan)?;

        self.query_cache.put_optimized(cache_key, plan.clone());

        plan
    };

    // EXPLAIN: describe the plan; nothing is executed.
    if optimized_plan.explain {
        use crate::query::processor::{annotate_pushdown_hints, explain_result};
        let mut plan = optimized_plan;
        annotate_pushdown_hints(&mut plan.root, self.graph_store.as_ref());
        return Ok(explain_result(&plan));
    }

    // PROFILE: execute with profiling entries and return the profile tree.
    if optimized_plan.profile {
        let has_mutations = optimized_plan.root.has_mutations();
        return self.with_auto_commit(has_mutations, || {
            let (viewing_epoch, transaction_id) = self.get_transaction_context();
            let planner = self.create_planner(viewing_epoch, transaction_id);
            let (mut physical_plan, entries) = planner.plan_profiled(&optimized_plan)?;

            let executor = Executor::with_columns(physical_plan.columns.clone())
                .with_deadline(self.query_deadline());
            // The rows themselves are discarded; only the profile is returned.
            let _result = executor.execute(physical_plan.operator.as_mut())?;

            // Assigned exactly once by whichever cfg branch is compiled in.
            let total_time_ms;
            #[cfg(not(target_arch = "wasm32"))]
            {
                total_time_ms = start_time.elapsed().as_secs_f64() * 1000.0;
            }
            #[cfg(target_arch = "wasm32")]
            {
                total_time_ms = 0.0;
            }

            let profile_tree = crate::query::profile::build_profile_tree(
                &optimized_plan.root,
                &mut entries.into_iter(),
            );
            Ok(crate::query::profile::profile_result(
                &profile_tree,
                total_time_ms,
            ))
        });
    }

    // NOTE(review): the GQL `execute` path rejects mutating plans while the
    // read-only flag is set; no equivalent check is visible here - confirm
    // whether that is enforced elsewhere.
    let has_mutations = optimized_plan.root.has_mutations();

    self.with_auto_commit(has_mutations, || {
        let (viewing_epoch, transaction_id) = self.get_transaction_context();

        let planner = self.create_planner(viewing_epoch, transaction_id);
        let mut physical_plan = planner.plan(&optimized_plan)?;

        let executor = Executor::with_columns(physical_plan.columns.clone())
            .with_deadline(self.query_deadline());
        executor.execute(physical_plan.operator.as_mut())
    })
}
1776
/// Executes a Gremlin query string.
///
/// The query is translated, bound and optimized on every call (this path
/// does not consult the plan cache), then planned and executed inside
/// `with_auto_commit`.
///
/// # Errors
///
/// Propagates errors from translation, binding, optimization, planning and
/// execution.
#[cfg(feature = "gremlin")]
pub fn execute_gremlin(&self, query: &str) -> Result<QueryResult> {
    use crate::query::{Executor, binder::Binder, optimizer::Optimizer, translators::gremlin};

    let translated = gremlin::translate(query)?;

    let mut binder = Binder::new();
    let _bound = binder.bind(&translated)?;

    let optimized = Optimizer::from_graph_store(&*self.graph_store).optimize(translated)?;
    let mutates = optimized.root.has_mutations();

    self.with_auto_commit(mutates, || {
        let (epoch, tx) = self.get_transaction_context();

        let planner = self.create_planner(epoch, tx);
        let mut physical = planner.plan(&optimized)?;

        let columns = physical.columns.clone();
        let executor = Executor::with_columns(columns).with_deadline(self.query_deadline());
        executor.execute(physical.operator.as_mut())
    })
}
1831
/// Executes a Gremlin query with named parameters.
///
/// The query is handed to a `QueryProcessor` together with `params`. Whether
/// the call is treated as mutating is decided from the raw query text by
/// `Self::query_looks_like_mutation`.
///
/// # Errors
///
/// Propagates errors from `with_auto_commit` and from query processing.
#[cfg(feature = "gremlin")]
pub fn execute_gremlin_with_params(
    &self,
    query: &str,
    params: std::collections::HashMap<String, Value>,
) -> Result<QueryResult> {
    use crate::query::processor::{QueryLanguage, QueryProcessor};

    let has_mutations = Self::query_looks_like_mutation(query);

    self.with_auto_commit(has_mutations, || {
        let (viewing_epoch, transaction_id) = self.get_transaction_context();

        let processor = QueryProcessor::for_graph_store_with_transaction(
            Arc::clone(&self.graph_store),
            Arc::clone(&self.transaction_manager),
        );

        // Attach the transaction context only when a transaction id exists.
        let processor = match transaction_id {
            Some(id) => processor.with_transaction_context(viewing_epoch, id),
            None => processor,
        };

        processor.process(query, QueryLanguage::Gremlin, Some(&params))
    })
}
1867
/// Executes a GraphQL query string against the graph store.
///
/// The query is translated, bound and optimized on every call (this path
/// does not consult the plan cache), then planned and executed inside
/// `with_auto_commit`.
///
/// # Errors
///
/// Propagates errors from translation, binding, optimization, planning and
/// execution.
#[cfg(feature = "graphql")]
pub fn execute_graphql(&self, query: &str) -> Result<QueryResult> {
    use crate::query::{Executor, binder::Binder, optimizer::Optimizer, translators::graphql};

    let translated = graphql::translate(query)?;

    let mut binder = Binder::new();
    let _bound = binder.bind(&translated)?;

    let optimized = Optimizer::from_graph_store(&*self.graph_store).optimize(translated)?;
    let mutates = optimized.root.has_mutations();

    self.with_auto_commit(mutates, || {
        let (epoch, tx) = self.get_transaction_context();

        let planner = self.create_planner(epoch, tx);
        let mut physical = planner.plan(&optimized)?;

        let columns = physical.columns.clone();
        let executor = Executor::with_columns(columns).with_deadline(self.query_deadline());
        executor.execute(physical.operator.as_mut())
    })
}
1922
/// Executes a GraphQL query with named parameters.
///
/// The query is handed to a `QueryProcessor` together with `params`. Whether
/// the call is treated as mutating is decided from the raw query text by
/// `Self::query_looks_like_mutation`.
///
/// # Errors
///
/// Propagates errors from `with_auto_commit` and from query processing.
#[cfg(feature = "graphql")]
pub fn execute_graphql_with_params(
    &self,
    query: &str,
    params: std::collections::HashMap<String, Value>,
) -> Result<QueryResult> {
    use crate::query::processor::{QueryLanguage, QueryProcessor};

    let has_mutations = Self::query_looks_like_mutation(query);

    self.with_auto_commit(has_mutations, || {
        let (viewing_epoch, transaction_id) = self.get_transaction_context();

        let processor = QueryProcessor::for_graph_store_with_transaction(
            Arc::clone(&self.graph_store),
            Arc::clone(&self.transaction_manager),
        );

        // Attach the transaction context only when a transaction id exists.
        let processor = match transaction_id {
            Some(id) => processor.with_transaction_context(viewing_epoch, id),
            None => processor,
        };

        processor.process(query, QueryLanguage::GraphQL, Some(&params))
    })
}
1958
/// Executes a GraphQL query against the RDF store.
///
/// The query is translated with the fixed base IRI `http://example.org/`,
/// optimized, planned with an `RdfPlanner` over the session's RDF store and
/// executed. The planner receives the session's current transaction id, if
/// any. This path does not go through `with_auto_commit`.
///
/// # Errors
///
/// Propagates errors from translation, optimization, planning and execution.
#[cfg(all(feature = "graphql", feature = "rdf"))]
pub fn execute_graphql_rdf(&self, query: &str) -> Result<QueryResult> {
    use crate::query::{
        Executor, optimizer::Optimizer, planner::rdf::RdfPlanner, translators::graphql_rdf,
    };

    let translated = graphql_rdf::translate(query, "http://example.org/")?;
    let optimized = Optimizer::from_graph_store(&*self.graph_store).optimize(translated)?;

    let planner = RdfPlanner::new(Arc::clone(&self.rdf_store))
        .with_transaction_id(*self.current_transaction.lock());
    let mut physical = planner.plan(&optimized)?;

    let columns = physical.columns.clone();
    let executor = Executor::with_columns(columns).with_deadline(self.query_deadline());
    executor.execute(physical.operator.as_mut())
}
1983
/// Executes a GraphQL-over-RDF query with named parameters.
///
/// The query is handed to a `QueryProcessor` (language
/// `QueryLanguage::GraphQLRdf`) together with `params`. Whether the call is
/// treated as mutating is decided from the raw query text by
/// `Self::query_looks_like_mutation`.
///
/// # Errors
///
/// Propagates errors from `with_auto_commit` and from query processing.
#[cfg(all(feature = "graphql", feature = "rdf"))]
pub fn execute_graphql_rdf_with_params(
    &self,
    query: &str,
    params: std::collections::HashMap<String, Value>,
) -> Result<QueryResult> {
    use crate::query::processor::{QueryLanguage, QueryProcessor};

    let has_mutations = Self::query_looks_like_mutation(query);

    self.with_auto_commit(has_mutations, || {
        let (viewing_epoch, transaction_id) = self.get_transaction_context();

        // NOTE(review): the processor is built over `graph_store`, not
        // `rdf_store` - confirm this is intended for the RDF variant.
        let processor = QueryProcessor::for_graph_store_with_transaction(
            Arc::clone(&self.graph_store),
            Arc::clone(&self.transaction_manager),
        );

        // Attach the transaction context only when a transaction id exists.
        let processor = match transaction_id {
            Some(id) => processor.with_transaction_context(viewing_epoch, id),
            None => processor,
        };

        processor.process(query, QueryLanguage::GraphQLRdf, Some(&params))
    })
}
2016
/// Executes a SQL/PGQ query string.
///
/// A plan whose root is `CreatePropertyGraph` is answered immediately with a
/// one-row status result describing the graph name and its table counts;
/// nothing else is done for it here. Any other plan goes through the plan
/// cache, binder, optimizer, planner and executor inside `with_auto_commit`.
///
/// # Errors
///
/// Propagates errors from translation, binding, optimization, planning and
/// execution.
#[cfg(feature = "sql-pgq")]
pub fn execute_sql(&self, query: &str) -> Result<QueryResult> {
    use crate::query::{
        Executor, binder::Binder, cache::CacheKey, optimizer::Optimizer, plan::LogicalOperator,
        processor::QueryLanguage, translators::sql_pgq,
    };

    let translated = sql_pgq::translate(query)?;

    if let LogicalOperator::CreatePropertyGraph(graph_def) = &translated.root {
        let summary = format!(
            "Property graph '{}' created ({} node tables, {} edge tables)",
            graph_def.name,
            graph_def.node_tables.len(),
            graph_def.edge_tables.len()
        );
        return Ok(QueryResult {
            columns: vec!["status".into()],
            column_types: vec![grafeo_common::types::LogicalType::String],
            rows: vec![vec![Value::from(summary)]],
            execution_time_ms: None,
            rows_scanned: None,
            status_message: None,
            gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
        });
    }

    // The cache key is built from the raw query text and the language.
    let cache_key = CacheKey::new(query, QueryLanguage::SqlPgq);

    let optimized = match self.query_cache.get_optimized(&cache_key) {
        Some(hit) => hit,
        None => {
            // Cache miss: bind, optimize, then store a clone for later calls.
            let mut binder = Binder::new();
            let _bound = binder.bind(&translated)?;

            let fresh = Optimizer::from_graph_store(&*self.graph_store).optimize(translated)?;
            self.query_cache.put_optimized(cache_key, fresh.clone());
            fresh
        }
    };

    let mutates = optimized.root.has_mutations();

    self.with_auto_commit(mutates, || {
        let (epoch, tx) = self.get_transaction_context();

        let planner = self.create_planner(epoch, tx);
        let mut physical = planner.plan(&optimized)?;

        let columns = physical.columns.clone();
        let executor = Executor::with_columns(columns).with_deadline(self.query_deadline());
        executor.execute(physical.operator.as_mut())
    })
}
2106
/// Executes a SQL/PGQ query with named parameters.
///
/// The query is handed to a `QueryProcessor` together with `params`. Whether
/// the call is treated as mutating is decided from the raw query text by
/// `Self::query_looks_like_mutation`.
///
/// # Errors
///
/// Propagates errors from `with_auto_commit` and from query processing.
#[cfg(feature = "sql-pgq")]
pub fn execute_sql_with_params(
    &self,
    query: &str,
    params: std::collections::HashMap<String, Value>,
) -> Result<QueryResult> {
    use crate::query::processor::{QueryLanguage, QueryProcessor};

    let has_mutations = Self::query_looks_like_mutation(query);

    self.with_auto_commit(has_mutations, || {
        let (viewing_epoch, transaction_id) = self.get_transaction_context();

        let processor = QueryProcessor::for_graph_store_with_transaction(
            Arc::clone(&self.graph_store),
            Arc::clone(&self.transaction_manager),
        );

        // Attach the transaction context only when a transaction id exists.
        let processor = match transaction_id {
            Some(id) => processor.with_transaction_context(viewing_epoch, id),
            None => processor,
        };

        processor.process(query, QueryLanguage::SqlPgq, Some(&params))
    })
}
2142
/// Executes a SPARQL query against the RDF store.
///
/// The query is translated, optimized, planned with an `RdfPlanner` over the
/// session's RDF store and executed. The planner receives the session's
/// current transaction id, if any. This path does not go through
/// `with_auto_commit`.
///
/// # Errors
///
/// Propagates errors from translation, optimization, planning and execution.
#[cfg(all(feature = "sparql", feature = "rdf"))]
pub fn execute_sparql(&self, query: &str) -> Result<QueryResult> {
    use crate::query::{
        Executor, optimizer::Optimizer, planner::rdf::RdfPlanner, translators::sparql,
    };

    let translated = sparql::translate(query)?;
    let optimized = Optimizer::from_graph_store(&*self.graph_store).optimize(translated)?;

    let planner = RdfPlanner::new(Arc::clone(&self.rdf_store))
        .with_transaction_id(*self.current_transaction.lock());
    let mut physical = planner.plan(&optimized)?;

    let columns = physical.columns.clone();
    let executor = Executor::with_columns(columns).with_deadline(self.query_deadline());
    executor.execute(physical.operator.as_mut())
}
2171
/// Executes a SPARQL query with named parameters against the RDF store.
///
/// Parameters are substituted into the translated logical plan by
/// `substitute_params` before optimization. The plan is then planned with an
/// `RdfPlanner` over the session's RDF store and executed. This path does not
/// go through `with_auto_commit`.
///
/// # Errors
///
/// Propagates errors from translation, parameter substitution, optimization,
/// planning and execution.
#[cfg(all(feature = "sparql", feature = "rdf"))]
pub fn execute_sparql_with_params(
    &self,
    query: &str,
    params: std::collections::HashMap<String, Value>,
) -> Result<QueryResult> {
    use crate::query::{
        Executor, optimizer::Optimizer, planner::rdf::RdfPlanner, processor::substitute_params,
        translators::sparql,
    };

    let mut logical_plan = sparql::translate(query)?;

    // Substitution happens on the logical plan, before the optimizer sees it.
    substitute_params(&mut logical_plan, &params)?;

    let optimizer = Optimizer::from_graph_store(&*self.graph_store);
    let optimized_plan = optimizer.optimize(logical_plan)?;

    let planner = RdfPlanner::new(Arc::clone(&self.rdf_store))
        .with_transaction_id(*self.current_transaction.lock());
    let mut physical_plan = planner.plan(&optimized_plan)?;

    let executor = Executor::with_columns(physical_plan.columns.clone())
        .with_deadline(self.query_deadline());
    executor.execute(physical_plan.operator.as_mut())
}
2203
/// Executes `query` in the language named by `language`, optionally with
/// parameters.
///
/// Recognised names are `"gql"`, `"cypher"`, `"gremlin"`, `"graphql"`,
/// `"graphql-rdf"`, `"sql"` / `"sql-pgq"` and `"sparql"`. Every arm except
/// `"gql"` is only compiled in when the matching cargo feature(s) are enabled.
/// When `params` is `Some`, the `*_with_params` variant of the language's
/// entry point is used; otherwise the plain one.
///
/// # Errors
///
/// Returns a `QueryErrorKind::Semantic` query error for a language name that
/// matches no compiled-in arm. Errors from the selected entry point are
/// propagated unchanged.
pub fn execute_language(
    &self,
    query: &str,
    language: &str,
    params: Option<std::collections::HashMap<String, Value>>,
) -> Result<QueryResult> {
    match language {
        // NOTE(review): this arm is not feature-gated, while the `execute` /
        // `execute_with_params` definitions visible nearby are gated on `gql`
        // (or on neither `gql` nor `cypher`) - confirm a cypher-only build
        // still resolves these calls.
        "gql" => {
            if let Some(p) = params {
                self.execute_with_params(query, p)
            } else {
                self.execute(query)
            }
        }
        #[cfg(feature = "cypher")]
        "cypher" => {
            if let Some(p) = params {
                // No dedicated `*_with_params` method is called for Cypher;
                // the processor pipeline is driven inline here.
                use crate::query::processor::{QueryLanguage, QueryProcessor};
                let has_mutations = Self::query_looks_like_mutation(query);
                self.with_auto_commit(has_mutations, || {
                    let processor = QueryProcessor::for_graph_store_with_transaction(
                        Arc::clone(&self.graph_store),
                        Arc::clone(&self.transaction_manager),
                    );
                    let (viewing_epoch, transaction_id) = self.get_transaction_context();
                    // Attach the transaction context only when an id exists.
                    let processor = if let Some(transaction_id) = transaction_id {
                        processor.with_transaction_context(viewing_epoch, transaction_id)
                    } else {
                        processor
                    };
                    processor.process(query, QueryLanguage::Cypher, Some(&p))
                })
            } else {
                self.execute_cypher(query)
            }
        }
        #[cfg(feature = "gremlin")]
        "gremlin" => {
            if let Some(p) = params {
                self.execute_gremlin_with_params(query, p)
            } else {
                self.execute_gremlin(query)
            }
        }
        #[cfg(feature = "graphql")]
        "graphql" => {
            if let Some(p) = params {
                self.execute_graphql_with_params(query, p)
            } else {
                self.execute_graphql(query)
            }
        }
        #[cfg(all(feature = "graphql", feature = "rdf"))]
        "graphql-rdf" => {
            if let Some(p) = params {
                self.execute_graphql_rdf_with_params(query, p)
            } else {
                self.execute_graphql_rdf(query)
            }
        }
        #[cfg(feature = "sql-pgq")]
        "sql" | "sql-pgq" => {
            if let Some(p) = params {
                self.execute_sql_with_params(query, p)
            } else {
                self.execute_sql(query)
            }
        }
        #[cfg(all(feature = "sparql", feature = "rdf"))]
        "sparql" => {
            if let Some(p) = params {
                self.execute_sparql_with_params(query, p)
            } else {
                self.execute_sparql(query)
            }
        }
        // Anything else, including a language whose feature is compiled out.
        other => Err(grafeo_common::utils::error::Error::Query(
            grafeo_common::utils::error::QueryError::new(
                grafeo_common::utils::error::QueryErrorKind::Semantic,
                format!("Unknown query language: '{other}'"),
            ),
        )),
    }
}
2296
/// Empties the session's query cache by calling `clear` on it.
pub fn clear_plan_cache(&self) {
    self.query_cache.clear();
}
2326
/// Begins a transaction that is not read-only, with no explicit isolation
/// level.
///
/// Delegates to `begin_transaction_inner(false, None)`; when a transaction is
/// already active that call creates a nested savepoint instead.
///
/// # Errors
///
/// Propagates any error from `begin_transaction_inner`.
pub fn begin_transaction(&mut self) -> Result<()> {
    self.begin_transaction_inner(false, None)
}
2337
/// Begins a transaction that is not read-only, at the requested isolation
/// level.
///
/// Delegates to `begin_transaction_inner(false, Some(isolation_level))`.
///
/// # Errors
///
/// Propagates any error from `begin_transaction_inner`.
pub fn begin_transaction_with_isolation(
    &mut self,
    isolation_level: crate::transaction::IsolationLevel,
) -> Result<()> {
    self.begin_transaction_inner(false, Some(isolation_level))
}
2351
2352 fn begin_transaction_inner(
2354 &self,
2355 read_only: bool,
2356 isolation_level: Option<crate::transaction::IsolationLevel>,
2357 ) -> Result<()> {
2358 let mut current = self.current_transaction.lock();
2359 if current.is_some() {
2360 drop(current);
2362 let mut depth = self.transaction_nesting_depth.lock();
2363 *depth += 1;
2364 let sp_name = format!("_nested_tx_{}", *depth);
2365 self.savepoint(&sp_name)?;
2366 return Ok(());
2367 }
2368
2369 self.transaction_start_node_count
2370 .store(self.store.node_count(), Ordering::Relaxed);
2371 self.transaction_start_edge_count
2372 .store(self.store.edge_count(), Ordering::Relaxed);
2373 let transaction_id = if let Some(level) = isolation_level {
2374 self.transaction_manager.begin_with_isolation(level)
2375 } else {
2376 self.transaction_manager.begin()
2377 };
2378 *current = Some(transaction_id);
2379 *self.read_only_tx.lock() = read_only;
2380 Ok(())
2381 }
2382
/// Commits the current transaction, or closes the innermost nested level.
///
/// Delegates to `commit_inner`.
///
/// # Errors
///
/// Propagates any error from `commit_inner`, including the
/// "No active transaction" invalid-state error.
pub fn commit(&mut self) -> Result<()> {
    self.commit_inner()
}
2393
2394 fn commit_inner(&self) -> Result<()> {
2396 {
2398 let mut depth = self.transaction_nesting_depth.lock();
2399 if *depth > 0 {
2400 let sp_name = format!("_nested_tx_{depth}");
2401 *depth -= 1;
2402 drop(depth);
2403 return self.release_savepoint(&sp_name);
2404 }
2405 }
2406
2407 let transaction_id = self.current_transaction.lock().take().ok_or_else(|| {
2408 grafeo_common::utils::error::Error::Transaction(
2409 grafeo_common::utils::error::TransactionError::InvalidState(
2410 "No active transaction".to_string(),
2411 ),
2412 )
2413 })?;
2414
2415 #[cfg(feature = "rdf")]
2417 self.rdf_store.commit_transaction(transaction_id);
2418
2419 self.store.commit_transaction_properties(transaction_id);
2421
2422 self.transaction_manager.commit(transaction_id)?;
2423
2424 self.store
2428 .sync_epoch(self.transaction_manager.current_epoch());
2429
2430 *self.read_only_tx.lock() = false;
2432 self.savepoints.lock().clear();
2433
2434 if self.gc_interval > 0 {
2436 let count = self.commit_counter.fetch_add(1, Ordering::Relaxed) + 1;
2437 if count.is_multiple_of(self.gc_interval) {
2438 let min_epoch = self.transaction_manager.min_active_epoch();
2439 self.store.gc_versions(min_epoch);
2440 self.transaction_manager.gc();
2441 }
2442 }
2443
2444 Ok(())
2445 }
2446
/// Rolls back the current transaction, or the innermost nested level.
///
/// Delegates to `rollback_inner`.
///
/// # Errors
///
/// Propagates any error from `rollback_inner`, including the
/// "No active transaction" invalid-state error.
pub fn rollback(&mut self) -> Result<()> {
    self.rollback_inner()
}
2473
2474 fn rollback_inner(&self) -> Result<()> {
2476 {
2478 let mut depth = self.transaction_nesting_depth.lock();
2479 if *depth > 0 {
2480 let sp_name = format!("_nested_tx_{depth}");
2481 *depth -= 1;
2482 drop(depth);
2483 return self.rollback_to_savepoint(&sp_name);
2484 }
2485 }
2486
2487 let transaction_id = self.current_transaction.lock().take().ok_or_else(|| {
2488 grafeo_common::utils::error::Error::Transaction(
2489 grafeo_common::utils::error::TransactionError::InvalidState(
2490 "No active transaction".to_string(),
2491 ),
2492 )
2493 })?;
2494
2495 *self.read_only_tx.lock() = false;
2497
2498 self.store.discard_uncommitted_versions(transaction_id);
2500
2501 #[cfg(feature = "rdf")]
2503 self.rdf_store.rollback_transaction(transaction_id);
2504
2505 self.savepoints.lock().clear();
2507
2508 self.transaction_manager.abort(transaction_id)
2510 }
2511
2512 pub fn savepoint(&self, name: &str) -> Result<()> {
2522 let tx_id = self.current_transaction.lock().ok_or_else(|| {
2523 grafeo_common::utils::error::Error::Transaction(
2524 grafeo_common::utils::error::TransactionError::InvalidState(
2525 "No active transaction".to_string(),
2526 ),
2527 )
2528 })?;
2529
2530 let next_node = self.store.peek_next_node_id();
2531 let next_edge = self.store.peek_next_edge_id();
2532 let undo_position = self.store.property_undo_log_position(tx_id);
2533 self.savepoints
2534 .lock()
2535 .push((name.to_string(), next_node, next_edge, undo_position));
2536 Ok(())
2537 }
2538
2539 pub fn rollback_to_savepoint(&self, name: &str) -> Result<()> {
2548 let transaction_id = self.current_transaction.lock().ok_or_else(|| {
2549 grafeo_common::utils::error::Error::Transaction(
2550 grafeo_common::utils::error::TransactionError::InvalidState(
2551 "No active transaction".to_string(),
2552 ),
2553 )
2554 })?;
2555
2556 let mut savepoints = self.savepoints.lock();
2557
2558 let pos = savepoints
2560 .iter()
2561 .rposition(|(n, _, _, _)| n == name)
2562 .ok_or_else(|| {
2563 grafeo_common::utils::error::Error::Transaction(
2564 grafeo_common::utils::error::TransactionError::InvalidState(format!(
2565 "Savepoint '{name}' not found"
2566 )),
2567 )
2568 })?;
2569
2570 let (_, sp_next_node, sp_next_edge, sp_undo_position) = savepoints[pos].clone();
2571
2572 savepoints.truncate(pos);
2574 drop(savepoints);
2575
2576 self.store
2578 .rollback_transaction_properties_to(transaction_id, sp_undo_position);
2579
2580 let current_next_node = self.store.peek_next_node_id();
2582 let current_next_edge = self.store.peek_next_edge_id();
2583
2584 let node_ids: Vec<NodeId> = (sp_next_node..current_next_node).map(NodeId::new).collect();
2585 let edge_ids: Vec<EdgeId> = (sp_next_edge..current_next_edge).map(EdgeId::new).collect();
2586
2587 if !node_ids.is_empty() || !edge_ids.is_empty() {
2588 self.store
2589 .discard_entities_by_id(transaction_id, &node_ids, &edge_ids);
2590 }
2591
2592 Ok(())
2593 }
2594
2595 pub fn release_savepoint(&self, name: &str) -> Result<()> {
2601 let _tx_id = self.current_transaction.lock().ok_or_else(|| {
2602 grafeo_common::utils::error::Error::Transaction(
2603 grafeo_common::utils::error::TransactionError::InvalidState(
2604 "No active transaction".to_string(),
2605 ),
2606 )
2607 })?;
2608
2609 let mut savepoints = self.savepoints.lock();
2610 let pos = savepoints
2611 .iter()
2612 .rposition(|(n, _, _, _)| n == name)
2613 .ok_or_else(|| {
2614 grafeo_common::utils::error::Error::Transaction(
2615 grafeo_common::utils::error::TransactionError::InvalidState(format!(
2616 "Savepoint '{name}' not found"
2617 )),
2618 )
2619 })?;
2620 savepoints.remove(pos);
2621 Ok(())
2622 }
2623
2624 #[must_use]
2626 pub fn in_transaction(&self) -> bool {
2627 self.current_transaction.lock().is_some()
2628 }
2629
2630 #[must_use]
2632 pub(crate) fn current_transaction_id(&self) -> Option<TransactionId> {
2633 *self.current_transaction.lock()
2634 }
2635
2636 #[must_use]
2638 pub(crate) fn transaction_manager(&self) -> &TransactionManager {
2639 &self.transaction_manager
2640 }
2641
2642 #[must_use]
2644 pub(crate) fn node_count_delta(&self) -> (usize, usize) {
2645 (
2646 self.transaction_start_node_count.load(Ordering::Relaxed),
2647 self.store.node_count(),
2648 )
2649 }
2650
2651 #[must_use]
2653 pub(crate) fn edge_count_delta(&self) -> (usize, usize) {
2654 (
2655 self.transaction_start_edge_count.load(Ordering::Relaxed),
2656 self.store.edge_count(),
2657 )
2658 }
2659
2660 pub fn prepare_commit(&mut self) -> Result<crate::transaction::PreparedCommit<'_>> {
2694 crate::transaction::PreparedCommit::new(self)
2695 }
2696
2697 pub fn set_auto_commit(&mut self, auto_commit: bool) {
2699 self.auto_commit = auto_commit;
2700 }
2701
2702 #[must_use]
2704 pub fn auto_commit(&self) -> bool {
2705 self.auto_commit
2706 }
2707
2708 fn needs_auto_commit(&self, has_mutations: bool) -> bool {
2713 self.auto_commit && has_mutations && self.current_transaction.lock().is_none()
2714 }
2715
2716 fn with_auto_commit<F>(&self, has_mutations: bool, body: F) -> Result<QueryResult>
2719 where
2720 F: FnOnce() -> Result<QueryResult>,
2721 {
2722 if self.needs_auto_commit(has_mutations) {
2723 self.begin_transaction_inner(false, None)?;
2724 match body() {
2725 Ok(result) => {
2726 self.commit_inner()?;
2727 Ok(result)
2728 }
2729 Err(e) => {
2730 let _ = self.rollback_inner();
2731 Err(e)
2732 }
2733 }
2734 } else {
2735 body()
2736 }
2737 }
2738
/// Cheap textual heuristic: does `query` contain a mutation keyword?
///
/// The query is upper-cased (ASCII only) and searched for each keyword as a
/// plain substring, so a keyword embedded in a longer word (for example
/// `SET` inside `OFFSET`) also counts as a match.
fn query_looks_like_mutation(query: &str) -> bool {
    const MUTATION_KEYWORDS: [&str; 8] = [
        "INSERT", "CREATE", "DELETE", "MERGE", "SET", "REMOVE", "DROP", "ALTER",
    ];

    let normalized = query.to_ascii_uppercase();
    MUTATION_KEYWORDS
        .iter()
        .any(|&keyword| normalized.contains(keyword))
}
2755
2756 #[must_use]
2758 fn query_deadline(&self) -> Option<Instant> {
2759 #[cfg(not(target_arch = "wasm32"))]
2760 {
2761 self.query_timeout.map(|d| Instant::now() + d)
2762 }
2763 #[cfg(target_arch = "wasm32")]
2764 {
2765 let _ = &self.query_timeout;
2766 None
2767 }
2768 }
2769
2770 fn eval_integer_literal(expr: &grafeo_adapters::query::gql::ast::Expression) -> Option<i64> {
2772 use grafeo_adapters::query::gql::ast::{Expression, Literal};
2773 match expr {
2774 Expression::Literal(Literal::Integer(n)) => Some(*n),
2775 _ => None,
2776 }
2777 }
2778
2779 #[must_use]
2785 fn get_transaction_context(&self) -> (EpochId, Option<TransactionId>) {
2786 if let Some(epoch) = *self.viewing_epoch_override.lock() {
2788 return (epoch, None);
2789 }
2790
2791 if let Some(transaction_id) = *self.current_transaction.lock() {
2792 let epoch = self
2794 .transaction_manager
2795 .start_epoch(transaction_id)
2796 .unwrap_or_else(|| self.transaction_manager.current_epoch());
2797 (epoch, Some(transaction_id))
2798 } else {
2799 (self.transaction_manager.current_epoch(), None)
2801 }
2802 }
2803
2804 fn create_planner(
2806 &self,
2807 viewing_epoch: EpochId,
2808 transaction_id: Option<TransactionId>,
2809 ) -> crate::query::Planner {
2810 use crate::query::Planner;
2811
2812 let mut planner = Planner::with_context(
2813 Arc::clone(&self.graph_store),
2814 Arc::clone(&self.transaction_manager),
2815 transaction_id,
2816 viewing_epoch,
2817 )
2818 .with_factorized_execution(self.factorized_execution)
2819 .with_catalog(Arc::clone(&self.catalog));
2820
2821 let validator = CatalogConstraintValidator::new(Arc::clone(&self.catalog));
2823 planner = planner.with_validator(Arc::new(validator));
2824
2825 planner
2826 }
2827
2828 pub fn create_node(&self, labels: &[&str]) -> NodeId {
2833 let (epoch, transaction_id) = self.get_transaction_context();
2834 self.store.create_node_versioned(
2835 labels,
2836 epoch,
2837 transaction_id.unwrap_or(TransactionId::SYSTEM),
2838 )
2839 }
2840
2841 pub fn create_node_with_props<'a>(
2845 &self,
2846 labels: &[&str],
2847 properties: impl IntoIterator<Item = (&'a str, Value)>,
2848 ) -> NodeId {
2849 let (epoch, transaction_id) = self.get_transaction_context();
2850 self.store.create_node_with_props_versioned(
2851 labels,
2852 properties,
2853 epoch,
2854 transaction_id.unwrap_or(TransactionId::SYSTEM),
2855 )
2856 }
2857
2858 pub fn create_edge(
2863 &self,
2864 src: NodeId,
2865 dst: NodeId,
2866 edge_type: &str,
2867 ) -> grafeo_common::types::EdgeId {
2868 let (epoch, transaction_id) = self.get_transaction_context();
2869 self.store.create_edge_versioned(
2870 src,
2871 dst,
2872 edge_type,
2873 epoch,
2874 transaction_id.unwrap_or(TransactionId::SYSTEM),
2875 )
2876 }
2877
2878 #[must_use]
2906 pub fn get_node(&self, id: NodeId) -> Option<Node> {
2907 let (epoch, transaction_id) = self.get_transaction_context();
2908 self.store
2909 .get_node_versioned(id, epoch, transaction_id.unwrap_or(TransactionId::SYSTEM))
2910 }
2911
2912 #[must_use]
2936 pub fn get_node_property(&self, id: NodeId, key: &str) -> Option<Value> {
2937 self.get_node(id)
2938 .and_then(|node| node.get_property(key).cloned())
2939 }
2940
2941 #[must_use]
2948 pub fn get_edge(&self, id: EdgeId) -> Option<Edge> {
2949 let (epoch, transaction_id) = self.get_transaction_context();
2950 self.store
2951 .get_edge_versioned(id, epoch, transaction_id.unwrap_or(TransactionId::SYSTEM))
2952 }
2953
2954 #[must_use]
2980 pub fn get_neighbors_outgoing(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
2981 self.store.edges_from(node, Direction::Outgoing).collect()
2982 }
2983
2984 #[must_use]
2993 pub fn get_neighbors_incoming(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
2994 self.store.edges_from(node, Direction::Incoming).collect()
2995 }
2996
2997 #[must_use]
3009 pub fn get_neighbors_outgoing_by_type(
3010 &self,
3011 node: NodeId,
3012 edge_type: &str,
3013 ) -> Vec<(NodeId, EdgeId)> {
3014 self.store
3015 .edges_from(node, Direction::Outgoing)
3016 .filter(|(_, edge_id)| {
3017 self.get_edge(*edge_id)
3018 .is_some_and(|e| e.edge_type.as_str() == edge_type)
3019 })
3020 .collect()
3021 }
3022
3023 #[must_use]
3030 pub fn node_exists(&self, id: NodeId) -> bool {
3031 self.get_node(id).is_some()
3032 }
3033
3034 #[must_use]
3036 pub fn edge_exists(&self, id: EdgeId) -> bool {
3037 self.get_edge(id).is_some()
3038 }
3039
3040 #[must_use]
3044 pub fn get_degree(&self, node: NodeId) -> (usize, usize) {
3045 let out = self.store.out_degree(node);
3046 let in_degree = self.store.in_degree(node);
3047 (out, in_degree)
3048 }
3049
3050 #[must_use]
3060 pub fn get_nodes_batch(&self, ids: &[NodeId]) -> Vec<Option<Node>> {
3061 let (epoch, transaction_id) = self.get_transaction_context();
3062 let tx = transaction_id.unwrap_or(TransactionId::SYSTEM);
3063 ids.iter()
3064 .map(|&id| self.store.get_node_versioned(id, epoch, tx))
3065 .collect()
3066 }
3067
/// Returns the change events the CDC log holds for one entity.
///
/// # Errors
///
/// This implementation always returns `Ok`.
#[cfg(feature = "cdc")]
pub fn history(
    &self,
    entity_id: impl Into<crate::cdc::EntityId>,
) -> Result<Vec<crate::cdc::ChangeEvent>> {
    let target: crate::cdc::EntityId = entity_id.into();
    let events = self.cdc_log.history(target);
    Ok(events)
}
3078
/// Returns the change events the CDC log's `history_since` yields for one
/// entity and the given `since_epoch`.
///
/// # Errors
///
/// This implementation always returns `Ok`.
#[cfg(feature = "cdc")]
pub fn history_since(
    &self,
    entity_id: impl Into<crate::cdc::EntityId>,
    since_epoch: EpochId,
) -> Result<Vec<crate::cdc::ChangeEvent>> {
    let target: crate::cdc::EntityId = entity_id.into();
    let events = self.cdc_log.history_since(target, since_epoch);
    Ok(events)
}
3088
/// Returns the change events the CDC log's `changes_between` yields for the
/// given start and end epochs.
///
/// # Errors
///
/// This implementation always returns `Ok`.
#[cfg(feature = "cdc")]
pub fn changes_between(
    &self,
    start_epoch: EpochId,
    end_epoch: EpochId,
) -> Result<Vec<crate::cdc::ChangeEvent>> {
    let events = self.cdc_log.changes_between(start_epoch, end_epoch);
    Ok(events)
}
3098}
3099
3100#[cfg(test)]
3101mod tests {
3102 use crate::database::GrafeoDB;
3103
#[test]
fn test_session_create_node() {
    // A node created through a session must show up in the database count.
    let db = GrafeoDB::new_in_memory();
    let session = db.session();

    let created = session.create_node(&["Person"]);

    assert!(created.is_valid());
    assert_eq!(db.node_count(), 1);
}
3113
#[test]
fn test_session_transaction() {
    // begin/commit must toggle `in_transaction` on and off again.
    let db = GrafeoDB::new_in_memory();
    let mut session = db.session();
    assert!(!session.in_transaction());

    session.begin_transaction().unwrap();
    assert!(session.in_transaction());

    session.commit().unwrap();
    assert!(!session.in_transaction());
}
3127
#[test]
fn test_session_transaction_context() {
    let db = GrafeoDB::new_in_memory();
    let mut session = db.session();

    // Outside a transaction no transaction ID is reported.
    let (_epoch_before, tx_before) = session.get_transaction_context();
    assert!(tx_before.is_none());

    // Inside a transaction the context carries its ID.
    session.begin_transaction().unwrap();
    let (epoch_during, tx_during) = session.get_transaction_context();
    assert!(tx_during.is_some());
    session.commit().unwrap();

    // After commit the ID is gone and the epoch has not moved backwards.
    let (epoch_after, tx_after) = session.get_transaction_context();
    assert!(tx_after.is_none());
    assert!(epoch_after.as_u64() >= epoch_during.as_u64());
}
3151
#[test]
fn test_session_rollback() {
    // Rolling back must close the transaction.
    let db = GrafeoDB::new_in_memory();
    let mut session = db.session();
    session.begin_transaction().unwrap();

    session.rollback().unwrap();

    assert!(!session.in_transaction());
}
3161
#[test]
fn test_session_rollback_discards_versions() {
    use grafeo_common::types::TransactionId;

    let db = GrafeoDB::new_in_memory();

    // One node exists before any transaction is started.
    let committed_node = db.store().create_node(&["Person"]);
    assert!(committed_node.is_valid());
    assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");

    let mut session = db.session();
    session.begin_transaction().unwrap();
    let transaction_id = session.current_transaction.lock().unwrap();

    // Write a second node directly through the store on behalf of the transaction.
    let write_epoch = db.store().current_epoch();
    let uncommitted_node =
        db.store()
            .create_node_versioned(&["Person"], write_epoch, transaction_id);
    assert!(uncommitted_node.is_valid());
    assert_eq!(db.node_count(), 2, "Should have 2 nodes during transaction");

    session.rollback().unwrap();
    assert!(!session.in_transaction());

    let count_after = db.node_count();
    assert_eq!(
        count_after, 1,
        "Rollback should discard uncommitted node, but got {count_after}"
    );

    // The pre-existing node survives; the transactional one is gone.
    let read_epoch = db.store().current_epoch();
    let survivor = db
        .store()
        .get_node_versioned(committed_node, read_epoch, TransactionId::SYSTEM);
    assert!(survivor.is_some(), "Original node should still exist");

    let discarded = db
        .store()
        .get_node_versioned(uncommitted_node, read_epoch, TransactionId::SYSTEM);
    assert!(discarded.is_none(), "Transaction node should be gone");
}
3217
#[test]
fn test_session_create_node_in_transaction() {
    let db = GrafeoDB::new_in_memory();

    // Baseline: one node created outside any transaction.
    let baseline_node = db.create_node(&["Person"]);
    assert!(baseline_node.is_valid());
    assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");

    let mut session = db.session();
    session.begin_transaction().unwrap();

    // A node created through the session while the transaction is open.
    let transactional_node = session.create_node(&["Person"]);
    assert!(transactional_node.is_valid());
    assert_eq!(db.node_count(), 2, "Should have 2 nodes during transaction");

    session.rollback().unwrap();

    let count_after = db.node_count();
    assert_eq!(
        count_after, 1,
        "Rollback should discard node created via session.create_node(), but got {count_after}"
    );
}
3249
#[test]
fn test_session_create_node_with_props_in_transaction() {
    use grafeo_common::types::Value;

    let db = GrafeoDB::new_in_memory();

    // Baseline: one node created outside any transaction.
    db.create_node(&["Person"]);
    assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");

    let mut session = db.session();
    session.begin_transaction().unwrap();

    // A node with properties created while the transaction is open.
    let props = [("name", Value::String("Alix".into()))];
    let transactional_node = session.create_node_with_props(&["Person"], props);
    assert!(transactional_node.is_valid());
    assert_eq!(db.node_count(), 2, "Should have 2 nodes during transaction");

    session.rollback().unwrap();

    let count_after = db.node_count();
    assert_eq!(
        count_after, 1,
        "Rollback should discard node created via session.create_node_with_props()"
    );
}
3282
#[cfg(feature = "gql")]
mod gql_tests {
    //! GQL queries executed through `Session::execute`.
    use super::*;

    #[test]
    fn test_gql_query_execution() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        // Two people and one animal; only the people should match.
        for labels in [["Person"], ["Person"], ["Animal"]] {
            session.create_node(&labels);
        }

        let people = session.execute("MATCH (n:Person) RETURN n").unwrap();

        assert_eq!(people.row_count(), 2);
        assert_eq!(people.column_count(), 1);
        assert_eq!(people.columns[0], "n");
    }

    #[test]
    fn test_gql_empty_result() {
        // Matching against an empty database yields no rows.
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        let nothing = session.execute("MATCH (n:Person) RETURN n").unwrap();

        assert_eq!(nothing.row_count(), 0);
    }

    #[test]
    fn test_gql_parse_error() {
        // The node pattern is missing its closing parenthesis.
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        let outcome = session.execute("MATCH (n RETURN n");

        assert!(outcome.is_err());
    }

    #[test]
    fn test_gql_relationship_traversal() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        let alix = session.create_node(&["Person"]);
        let gus = session.create_node(&["Person"]);
        let vincent = session.create_node(&["Person"]);

        // Alix knows both Gus and Vincent.
        session.create_edge(alix, gus, "KNOWS");
        session.create_edge(alix, vincent, "KNOWS");

        let pairs = session
            .execute("MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b")
            .unwrap();

        assert_eq!(pairs.row_count(), 2);
        assert_eq!(pairs.column_count(), 2);
        assert_eq!(pairs.columns[0], "a");
        assert_eq!(pairs.columns[1], "b");
    }

    #[test]
    fn test_gql_relationship_with_type_filter() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        let alix = session.create_node(&["Person"]);
        let gus = session.create_node(&["Person"]);
        let vincent = session.create_node(&["Person"]);

        // Only one of the two edges has the type the query asks for.
        session.create_edge(alix, gus, "KNOWS");
        session.create_edge(alix, vincent, "WORKS_WITH");

        let pairs = session
            .execute("MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b")
            .unwrap();

        assert_eq!(pairs.row_count(), 1);
    }

    #[test]
    fn test_gql_semantic_error_undefined_variable() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        // `x` is never bound by the MATCH clause.
        let outcome = session.execute("MATCH (n:Person) RETURN x");

        assert!(outcome.is_err());
        let Err(err) = outcome else {
            panic!("Expected error")
        };
        let message = err.to_string();
        assert!(
            message.contains("Undefined variable"),
            "Expected undefined variable error, got: {}",
            err
        );
    }

    #[test]
    fn test_gql_where_clause_property_filter() {
        use grafeo_common::types::Value;

        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        // Ages 25, 35 and 45: two of them exceed 30.
        for age in [25, 35, 45] {
            session.create_node_with_props(&["Person"], [("age", Value::Int64(age))]);
        }

        let older = session
            .execute("MATCH (n:Person) WHERE n.age > 30 RETURN n")
            .unwrap();

        assert_eq!(older.row_count(), 2);
    }

    #[test]
    fn test_gql_where_clause_equality() {
        use grafeo_common::types::Value;

        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        // Two nodes share the name "Alix".
        for name in ["Alix", "Gus", "Alix"] {
            session.create_node_with_props(&["Person"], [("name", Value::String(name.into()))]);
        }

        let matches = session
            .execute("MATCH (n:Person) WHERE n.name = \"Alix\" RETURN n")
            .unwrap();

        assert_eq!(matches.row_count(), 2);
    }

    #[test]
    fn test_gql_return_property_access() {
        use grafeo_common::types::Value;

        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        session.create_node_with_props(
            &["Person"],
            [
                ("name", Value::String("Alix".into())),
                ("age", Value::Int64(30)),
            ],
        );
        session.create_node_with_props(
            &["Person"],
            [
                ("name", Value::String("Gus".into())),
                ("age", Value::Int64(25)),
            ],
        );

        let projected = session
            .execute("MATCH (n:Person) RETURN n.name, n.age")
            .unwrap();

        assert_eq!(projected.row_count(), 2);
        assert_eq!(projected.column_count(), 2);
        assert_eq!(projected.columns[0], "n.name");
        assert_eq!(projected.columns[1], "n.age");

        // Row order is not asserted; both names just have to be present.
        let names: Vec<&Value> = projected.rows.iter().map(|row| &row[0]).collect();
        assert!(names.contains(&&Value::String("Alix".into())));
        assert!(names.contains(&&Value::String("Gus".into())));
    }

    #[test]
    fn test_gql_return_mixed_expressions() {
        use grafeo_common::types::Value;

        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        session.create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))]);

        // Return the whole node alongside one of its properties.
        let mixed = session
            .execute("MATCH (n:Person) RETURN n, n.name")
            .unwrap();

        assert_eq!(mixed.row_count(), 1);
        assert_eq!(mixed.column_count(), 2);
        assert_eq!(mixed.columns[0], "n");
        assert_eq!(mixed.columns[1], "n.name");

        assert_eq!(mixed.rows[0][1], Value::String("Alix".into()));
    }
}
3501
#[cfg(feature = "cypher")]
mod cypher_tests {
    //! Cypher queries executed through `Session::execute_cypher`.
    use super::*;

    #[test]
    fn test_cypher_query_execution() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        // Two people and one animal; only the people should match.
        for labels in [["Person"], ["Person"], ["Animal"]] {
            session.create_node(&labels);
        }

        let people = session.execute_cypher("MATCH (n:Person) RETURN n").unwrap();

        assert_eq!(people.row_count(), 2);
        assert_eq!(people.column_count(), 1);
        assert_eq!(people.columns[0], "n");
    }

    #[test]
    fn test_cypher_empty_result() {
        // Matching against an empty database yields no rows.
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        let nothing = session.execute_cypher("MATCH (n:Person) RETURN n").unwrap();

        assert_eq!(nothing.row_count(), 0);
    }

    #[test]
    fn test_cypher_parse_error() {
        // The node pattern is missing its closing parenthesis.
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        let outcome = session.execute_cypher("MATCH (n RETURN n");

        assert!(outcome.is_err());
    }
}
3547
3548 mod direct_lookup_tests {
3551 use super::*;
3552 use grafeo_common::types::Value;
3553
3554 #[test]
3555 fn test_get_node() {
3556 let db = GrafeoDB::new_in_memory();
3557 let session = db.session();
3558
3559 let id = session.create_node(&["Person"]);
3560 let node = session.get_node(id);
3561
3562 assert!(node.is_some());
3563 let node = node.unwrap();
3564 assert_eq!(node.id, id);
3565 }
3566
3567 #[test]
3568 fn test_get_node_not_found() {
3569 use grafeo_common::types::NodeId;
3570
3571 let db = GrafeoDB::new_in_memory();
3572 let session = db.session();
3573
3574 let node = session.get_node(NodeId::new(9999));
3576 assert!(node.is_none());
3577 }
3578
3579 #[test]
3580 fn test_get_node_property() {
3581 let db = GrafeoDB::new_in_memory();
3582 let session = db.session();
3583
3584 let id = session
3585 .create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))]);
3586
3587 let name = session.get_node_property(id, "name");
3588 assert_eq!(name, Some(Value::String("Alix".into())));
3589
3590 let missing = session.get_node_property(id, "missing");
3592 assert!(missing.is_none());
3593 }
3594
3595 #[test]
3596 fn test_get_edge() {
3597 let db = GrafeoDB::new_in_memory();
3598 let session = db.session();
3599
3600 let alix = session.create_node(&["Person"]);
3601 let gus = session.create_node(&["Person"]);
3602 let edge_id = session.create_edge(alix, gus, "KNOWS");
3603
3604 let edge = session.get_edge(edge_id);
3605 assert!(edge.is_some());
3606 let edge = edge.unwrap();
3607 assert_eq!(edge.id, edge_id);
3608 assert_eq!(edge.src, alix);
3609 assert_eq!(edge.dst, gus);
3610 }
3611
3612 #[test]
3613 fn test_get_edge_not_found() {
3614 use grafeo_common::types::EdgeId;
3615
3616 let db = GrafeoDB::new_in_memory();
3617 let session = db.session();
3618
3619 let edge = session.get_edge(EdgeId::new(9999));
3620 assert!(edge.is_none());
3621 }
3622
3623 #[test]
3624 fn test_get_neighbors_outgoing() {
3625 let db = GrafeoDB::new_in_memory();
3626 let session = db.session();
3627
3628 let alix = session.create_node(&["Person"]);
3629 let gus = session.create_node(&["Person"]);
3630 let harm = session.create_node(&["Person"]);
3631
3632 session.create_edge(alix, gus, "KNOWS");
3633 session.create_edge(alix, harm, "KNOWS");
3634
3635 let neighbors = session.get_neighbors_outgoing(alix);
3636 assert_eq!(neighbors.len(), 2);
3637
3638 let neighbor_ids: Vec<_> = neighbors.iter().map(|(node_id, _)| *node_id).collect();
3639 assert!(neighbor_ids.contains(&gus));
3640 assert!(neighbor_ids.contains(&harm));
3641 }
3642
3643 #[test]
3644 fn test_get_neighbors_incoming() {
3645 let db = GrafeoDB::new_in_memory();
3646 let session = db.session();
3647
3648 let alix = session.create_node(&["Person"]);
3649 let gus = session.create_node(&["Person"]);
3650 let harm = session.create_node(&["Person"]);
3651
3652 session.create_edge(gus, alix, "KNOWS");
3653 session.create_edge(harm, alix, "KNOWS");
3654
3655 let neighbors = session.get_neighbors_incoming(alix);
3656 assert_eq!(neighbors.len(), 2);
3657
3658 let neighbor_ids: Vec<_> = neighbors.iter().map(|(node_id, _)| *node_id).collect();
3659 assert!(neighbor_ids.contains(&gus));
3660 assert!(neighbor_ids.contains(&harm));
3661 }
3662
3663 #[test]
3664 fn test_get_neighbors_outgoing_by_type() {
3665 let db = GrafeoDB::new_in_memory();
3666 let session = db.session();
3667
3668 let alix = session.create_node(&["Person"]);
3669 let gus = session.create_node(&["Person"]);
3670 let company = session.create_node(&["Company"]);
3671
3672 session.create_edge(alix, gus, "KNOWS");
3673 session.create_edge(alix, company, "WORKS_AT");
3674
3675 let knows_neighbors = session.get_neighbors_outgoing_by_type(alix, "KNOWS");
3676 assert_eq!(knows_neighbors.len(), 1);
3677 assert_eq!(knows_neighbors[0].0, gus);
3678
3679 let works_neighbors = session.get_neighbors_outgoing_by_type(alix, "WORKS_AT");
3680 assert_eq!(works_neighbors.len(), 1);
3681 assert_eq!(works_neighbors[0].0, company);
3682
3683 let no_neighbors = session.get_neighbors_outgoing_by_type(alix, "LIKES");
3685 assert!(no_neighbors.is_empty());
3686 }
3687
3688 #[test]
3689 fn test_node_exists() {
3690 use grafeo_common::types::NodeId;
3691
3692 let db = GrafeoDB::new_in_memory();
3693 let session = db.session();
3694
3695 let id = session.create_node(&["Person"]);
3696
3697 assert!(session.node_exists(id));
3698 assert!(!session.node_exists(NodeId::new(9999)));
3699 }
3700
3701 #[test]
3702 fn test_edge_exists() {
3703 use grafeo_common::types::EdgeId;
3704
3705 let db = GrafeoDB::new_in_memory();
3706 let session = db.session();
3707
3708 let alix = session.create_node(&["Person"]);
3709 let gus = session.create_node(&["Person"]);
3710 let edge_id = session.create_edge(alix, gus, "KNOWS");
3711
3712 assert!(session.edge_exists(edge_id));
3713 assert!(!session.edge_exists(EdgeId::new(9999)));
3714 }
3715
3716 #[test]
3717 fn test_get_degree() {
3718 let db = GrafeoDB::new_in_memory();
3719 let session = db.session();
3720
3721 let alix = session.create_node(&["Person"]);
3722 let gus = session.create_node(&["Person"]);
3723 let harm = session.create_node(&["Person"]);
3724
3725 session.create_edge(alix, gus, "KNOWS");
3727 session.create_edge(alix, harm, "KNOWS");
3728 session.create_edge(gus, alix, "KNOWS");
3730
3731 let (out_degree, in_degree) = session.get_degree(alix);
3732 assert_eq!(out_degree, 2);
3733 assert_eq!(in_degree, 1);
3734
3735 let lonely = session.create_node(&["Person"]);
3737 let (out, in_deg) = session.get_degree(lonely);
3738 assert_eq!(out, 0);
3739 assert_eq!(in_deg, 0);
3740 }
3741
3742 #[test]
3743 fn test_get_nodes_batch() {
3744 let db = GrafeoDB::new_in_memory();
3745 let session = db.session();
3746
3747 let alix = session.create_node(&["Person"]);
3748 let gus = session.create_node(&["Person"]);
3749 let harm = session.create_node(&["Person"]);
3750
3751 let nodes = session.get_nodes_batch(&[alix, gus, harm]);
3752 assert_eq!(nodes.len(), 3);
3753 assert!(nodes[0].is_some());
3754 assert!(nodes[1].is_some());
3755 assert!(nodes[2].is_some());
3756
3757 use grafeo_common::types::NodeId;
3759 let nodes_with_missing = session.get_nodes_batch(&[alix, NodeId::new(9999), harm]);
3760 assert_eq!(nodes_with_missing.len(), 3);
3761 assert!(nodes_with_missing[0].is_some());
3762 assert!(nodes_with_missing[1].is_none()); assert!(nodes_with_missing[2].is_some());
3764 }
3765
3766 #[test]
3767 fn test_auto_commit_setting() {
3768 let db = GrafeoDB::new_in_memory();
3769 let mut session = db.session();
3770
3771 assert!(session.auto_commit());
3773
3774 session.set_auto_commit(false);
3775 assert!(!session.auto_commit());
3776
3777 session.set_auto_commit(true);
3778 assert!(session.auto_commit());
3779 }
3780
3781 #[test]
3782 fn test_transaction_double_begin_nests() {
3783 let db = GrafeoDB::new_in_memory();
3784 let mut session = db.session();
3785
3786 session.begin_transaction().unwrap();
3787 let result = session.begin_transaction();
3789 assert!(result.is_ok());
3790 session.commit().unwrap();
3792 session.commit().unwrap();
3794 }
3795
3796 #[test]
3797 fn test_commit_without_transaction_error() {
3798 let db = GrafeoDB::new_in_memory();
3799 let mut session = db.session();
3800
3801 let result = session.commit();
3802 assert!(result.is_err());
3803 }
3804
3805 #[test]
3806 fn test_rollback_without_transaction_error() {
3807 let db = GrafeoDB::new_in_memory();
3808 let mut session = db.session();
3809
3810 let result = session.rollback();
3811 assert!(result.is_err());
3812 }
3813
3814 #[test]
3815 fn test_create_edge_in_transaction() {
3816 let db = GrafeoDB::new_in_memory();
3817 let mut session = db.session();
3818
3819 let alix = session.create_node(&["Person"]);
3821 let gus = session.create_node(&["Person"]);
3822
3823 session.begin_transaction().unwrap();
3825 let edge_id = session.create_edge(alix, gus, "KNOWS");
3826
3827 assert!(session.edge_exists(edge_id));
3829
3830 session.commit().unwrap();
3832
3833 assert!(session.edge_exists(edge_id));
3835 }
3836
3837 #[test]
3838 fn test_neighbors_empty_node() {
3839 let db = GrafeoDB::new_in_memory();
3840 let session = db.session();
3841
3842 let lonely = session.create_node(&["Person"]);
3843
3844 assert!(session.get_neighbors_outgoing(lonely).is_empty());
3845 assert!(session.get_neighbors_incoming(lonely).is_empty());
3846 assert!(
3847 session
3848 .get_neighbors_outgoing_by_type(lonely, "KNOWS")
3849 .is_empty()
3850 );
3851 }
3852 }
3853
#[test]
fn test_auto_gc_triggers_on_commit_interval() {
    use crate::config::Config;

    // With a GC interval of 2, committing twice must leave both nodes intact.
    let db = GrafeoDB::with_config(Config::in_memory().with_gc_interval(2)).unwrap();
    let mut session = db.session();

    for label in ["A", "B"] {
        session.begin_transaction().unwrap();
        session.create_node(&[label]);
        session.commit().unwrap();
    }

    assert_eq!(db.node_count(), 2);
}
3875
#[test]
fn test_query_timeout_config_propagates_to_session() {
    use crate::config::Config;
    use std::time::Duration;

    // A timeout configured on the database must yield a deadline in its sessions.
    let five_seconds = Duration::from_secs(5);
    let db = GrafeoDB::with_config(Config::in_memory().with_query_timeout(five_seconds)).unwrap();
    let session = db.session();

    let deadline = session.query_deadline();
    assert!(deadline.is_some());
}
3888
#[test]
fn test_no_query_timeout_returns_no_deadline() {
    // A default in-memory database yields no deadline.
    let db = GrafeoDB::new_in_memory();
    let session = db.session();

    let deadline = session.query_deadline();
    assert!(deadline.is_none());
}
3897
#[test]
fn test_graph_model_accessor() {
    use crate::config::GraphModel;

    // A default in-memory database reports the LPG graph model.
    let db = GrafeoDB::new_in_memory();
    let session = db.session();

    let model = session.graph_model();
    assert_eq!(model, GraphModel::Lpg);
}
3907
#[cfg(feature = "gql")]
#[test]
fn test_external_store_session() {
    use grafeo_core::graph::GraphStoreMut;
    use std::sync::Arc;

    // Build the database on top of an externally supplied store.
    let external_store =
        Arc::new(grafeo_core::graph::lpg::LpgStore::new().unwrap()) as Arc<dyn GraphStoreMut>;
    let config = crate::config::Config::in_memory();
    let db = GrafeoDB::with_store(external_store, config).unwrap();

    let session = db.session();

    // A node inserted through GQL must be readable through GQL again.
    session.execute("INSERT (:Test {name: 'hello'})").unwrap();

    let found = session.execute("MATCH (n:Test) RETURN n.name").unwrap();
    assert_eq!(found.row_count(), 1);
}
3928
3929 #[cfg(feature = "gql")]
3932 mod session_command_tests {
3933 use super::*;
3934
#[test]
fn test_use_graph_sets_current_graph() {
    let db = GrafeoDB::new_in_memory();
    let session = db.session();

    // The graph is created first, then selected.
    for statement in ["CREATE GRAPH mydb", "USE GRAPH mydb"] {
        session.execute(statement).unwrap();
    }

    let selected = session.current_graph();
    assert_eq!(selected, Some("mydb".to_string()));
}
3946
#[test]
fn test_use_graph_nonexistent_errors() {
    let db = GrafeoDB::new_in_memory();
    let session = db.session();

    // Selecting a graph that was never created must fail with a clear message.
    let outcome = session.execute("USE GRAPH doesnotexist");
    assert!(outcome.is_err());

    let err = outcome.unwrap_err().to_string();
    let mentions_missing = err.contains("does not exist");
    assert!(
        mentions_missing,
        "Expected 'does not exist' error, got: {err}"
    );
}
3960
#[test]
fn test_use_graph_default_always_valid() {
    let db = GrafeoDB::new_in_memory();
    let session = db.session();

    // "default" can be selected without a prior CREATE GRAPH.
    session.execute("USE GRAPH default").unwrap();

    let selected = session.current_graph();
    assert_eq!(selected, Some("default".to_string()));
}
3970
3971 #[test]
3972 fn test_session_set_graph() {
3973 let db = GrafeoDB::new_in_memory();
3974 let session = db.session();
3975
3976 session.execute("SESSION SET GRAPH analytics").unwrap();
3978 assert_eq!(session.current_graph(), Some("analytics".to_string()));
3979 }
3980
3981 #[test]
3982 fn test_session_set_time_zone() {
3983 let db = GrafeoDB::new_in_memory();
3984 let session = db.session();
3985
3986 assert_eq!(session.time_zone(), None);
3987
3988 session.execute("SESSION SET TIME ZONE 'UTC'").unwrap();
3989 assert_eq!(session.time_zone(), Some("UTC".to_string()));
3990
3991 session
3992 .execute("SESSION SET TIME ZONE 'America/New_York'")
3993 .unwrap();
3994 assert_eq!(session.time_zone(), Some("America/New_York".to_string()));
3995 }
3996
3997 #[test]
3998 fn test_session_set_parameter() {
3999 let db = GrafeoDB::new_in_memory();
4000 let session = db.session();
4001
4002 session
4003 .execute("SESSION SET PARAMETER $timeout = 30")
4004 .unwrap();
4005
4006 assert!(session.get_parameter("timeout").is_some());
4009 }
4010
4011 #[test]
4012 fn test_session_reset_clears_all_state() {
4013 let db = GrafeoDB::new_in_memory();
4014 let session = db.session();
4015
4016 session.execute("SESSION SET GRAPH analytics").unwrap();
4018 session.execute("SESSION SET TIME ZONE 'UTC'").unwrap();
4019 session
4020 .execute("SESSION SET PARAMETER $limit = 100")
4021 .unwrap();
4022
4023 assert!(session.current_graph().is_some());
4025 assert!(session.time_zone().is_some());
4026 assert!(session.get_parameter("limit").is_some());
4027
4028 session.execute("SESSION RESET").unwrap();
4030
4031 assert_eq!(session.current_graph(), None);
4032 assert_eq!(session.time_zone(), None);
4033 assert!(session.get_parameter("limit").is_none());
4034 }
4035
4036 #[test]
4037 fn test_session_close_clears_state() {
4038 let db = GrafeoDB::new_in_memory();
4039 let session = db.session();
4040
4041 session.execute("SESSION SET GRAPH analytics").unwrap();
4042 session.execute("SESSION SET TIME ZONE 'UTC'").unwrap();
4043
4044 session.execute("SESSION CLOSE").unwrap();
4045
4046 assert_eq!(session.current_graph(), None);
4047 assert_eq!(session.time_zone(), None);
4048 }
4049
4050 #[test]
4051 fn test_create_graph() {
4052 let db = GrafeoDB::new_in_memory();
4053 let session = db.session();
4054
4055 session.execute("CREATE GRAPH mydb").unwrap();
4056
4057 session.execute("USE GRAPH mydb").unwrap();
4059 assert_eq!(session.current_graph(), Some("mydb".to_string()));
4060 }
4061
4062 #[test]
4063 fn test_create_graph_duplicate_errors() {
4064 let db = GrafeoDB::new_in_memory();
4065 let session = db.session();
4066
4067 session.execute("CREATE GRAPH mydb").unwrap();
4068 let result = session.execute("CREATE GRAPH mydb");
4069
4070 assert!(result.is_err());
4071 let err = result.unwrap_err().to_string();
4072 assert!(
4073 err.contains("already exists"),
4074 "Expected 'already exists' error, got: {err}"
4075 );
4076 }
4077
4078 #[test]
4079 fn test_create_graph_if_not_exists() {
4080 let db = GrafeoDB::new_in_memory();
4081 let session = db.session();
4082
4083 session.execute("CREATE GRAPH mydb").unwrap();
4084 session.execute("CREATE GRAPH IF NOT EXISTS mydb").unwrap();
4086 }
4087
4088 #[test]
4089 fn test_drop_graph() {
4090 let db = GrafeoDB::new_in_memory();
4091 let session = db.session();
4092
4093 session.execute("CREATE GRAPH mydb").unwrap();
4094 session.execute("DROP GRAPH mydb").unwrap();
4095
4096 let result = session.execute("USE GRAPH mydb");
4098 assert!(result.is_err());
4099 }
4100
4101 #[test]
4102 fn test_drop_graph_nonexistent_errors() {
4103 let db = GrafeoDB::new_in_memory();
4104 let session = db.session();
4105
4106 let result = session.execute("DROP GRAPH nosuchgraph");
4107 assert!(result.is_err());
4108 let err = result.unwrap_err().to_string();
4109 assert!(
4110 err.contains("does not exist"),
4111 "Expected 'does not exist' error, got: {err}"
4112 );
4113 }
4114
4115 #[test]
4116 fn test_drop_graph_if_exists() {
4117 let db = GrafeoDB::new_in_memory();
4118 let session = db.session();
4119
4120 session.execute("DROP GRAPH IF EXISTS nosuchgraph").unwrap();
4122 }
4123
4124 #[test]
4125 fn test_start_transaction_via_gql() {
4126 let db = GrafeoDB::new_in_memory();
4127 let session = db.session();
4128
4129 session.execute("START TRANSACTION").unwrap();
4130 assert!(session.in_transaction());
4131 session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
4132 session.execute("COMMIT").unwrap();
4133 assert!(!session.in_transaction());
4134
4135 let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
4136 assert_eq!(result.rows.len(), 1);
4137 }
4138
4139 #[test]
4140 fn test_start_transaction_read_only_blocks_insert() {
4141 let db = GrafeoDB::new_in_memory();
4142 let session = db.session();
4143
4144 session.execute("START TRANSACTION READ ONLY").unwrap();
4145 let result = session.execute("INSERT (:Person {name: 'Alix'})");
4146 assert!(result.is_err());
4147 let err = result.unwrap_err().to_string();
4148 assert!(
4149 err.contains("read-only"),
4150 "Expected read-only error, got: {err}"
4151 );
4152 session.execute("ROLLBACK").unwrap();
4153 }
4154
4155 #[test]
4156 fn test_start_transaction_read_only_allows_reads() {
4157 let db = GrafeoDB::new_in_memory();
4158 let mut session = db.session();
4159 session.begin_transaction().unwrap();
4160 session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
4161 session.commit().unwrap();
4162
4163 session.execute("START TRANSACTION READ ONLY").unwrap();
4164 let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
4165 assert_eq!(result.rows.len(), 1);
4166 session.execute("COMMIT").unwrap();
4167 }
4168
4169 #[test]
4170 fn test_rollback_via_gql() {
4171 let db = GrafeoDB::new_in_memory();
4172 let session = db.session();
4173
4174 session.execute("START TRANSACTION").unwrap();
4175 session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
4176 session.execute("ROLLBACK").unwrap();
4177
4178 let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
4179 assert!(result.rows.is_empty());
4180 }
4181
4182 #[test]
4183 fn test_start_transaction_with_isolation_level() {
4184 let db = GrafeoDB::new_in_memory();
4185 let session = db.session();
4186
4187 session
4188 .execute("START TRANSACTION ISOLATION LEVEL SERIALIZABLE")
4189 .unwrap();
4190 assert!(session.in_transaction());
4191 session.execute("ROLLBACK").unwrap();
4192 }
4193
4194 #[test]
4195 fn test_session_commands_return_empty_result() {
4196 let db = GrafeoDB::new_in_memory();
4197 let session = db.session();
4198
4199 let result = session.execute("SESSION SET GRAPH test").unwrap();
4200 assert_eq!(result.row_count(), 0);
4201 assert_eq!(result.column_count(), 0);
4202 }
4203
4204 #[test]
4205 fn test_current_graph_default_is_none() {
4206 let db = GrafeoDB::new_in_memory();
4207 let session = db.session();
4208
4209 assert_eq!(session.current_graph(), None);
4210 }
4211
4212 #[test]
4213 fn test_time_zone_default_is_none() {
4214 let db = GrafeoDB::new_in_memory();
4215 let session = db.session();
4216
4217 assert_eq!(session.time_zone(), None);
4218 }
4219
4220 #[test]
4221 fn test_session_state_independent_across_sessions() {
4222 let db = GrafeoDB::new_in_memory();
4223 let session1 = db.session();
4224 let session2 = db.session();
4225
4226 session1.execute("SESSION SET GRAPH first").unwrap();
4227 session2.execute("SESSION SET GRAPH second").unwrap();
4228
4229 assert_eq!(session1.current_graph(), Some("first".to_string()));
4230 assert_eq!(session2.current_graph(), Some("second".to_string()));
4231 }
4232 }
4233}