1#[cfg(feature = "rdf")]
8mod rdf;
9
10use std::sync::Arc;
11use std::sync::atomic::{AtomicUsize, Ordering};
12use std::time::{Duration, Instant};
13
14use grafeo_common::types::{EdgeId, EpochId, NodeId, TransactionId, Value};
15use grafeo_common::utils::error::Result;
16use grafeo_core::graph::Direction;
17use grafeo_core::graph::GraphStoreMut;
18use grafeo_core::graph::lpg::{Edge, LpgStore, Node};
19#[cfg(feature = "rdf")]
20use grafeo_core::graph::rdf::RdfStore;
21
22use crate::catalog::{Catalog, CatalogConstraintValidator};
23use crate::config::{AdaptiveConfig, GraphModel};
24use crate::database::QueryResult;
25use crate::query::cache::QueryCache;
26use crate::transaction::TransactionManager;
27
28fn parse_default_literal(text: &str) -> Value {
33 if text.eq_ignore_ascii_case("null") {
34 return Value::Null;
35 }
36 if text.eq_ignore_ascii_case("true") {
37 return Value::Bool(true);
38 }
39 if text.eq_ignore_ascii_case("false") {
40 return Value::Bool(false);
41 }
42 if (text.starts_with('\'') && text.ends_with('\''))
44 || (text.starts_with('"') && text.ends_with('"'))
45 {
46 return Value::String(text[1..text.len() - 1].into());
47 }
48 if let Ok(i) = text.parse::<i64>() {
50 return Value::Int64(i);
51 }
52 if let Ok(f) = text.parse::<f64>() {
53 return Value::Float64(f);
54 }
55 Value::String(text.into())
57}
58
/// Shared handles and settings handed to a [`Session`] constructor.
///
/// `Session::with_adaptive` and `Session::with_external_store` each take one
/// of these by value and move its fields into the session they build.
pub(crate) struct SessionConfig {
    /// Transaction manager handle; becomes the session's `transaction_manager`.
    pub transaction_manager: Arc<TransactionManager>,
    /// Query cache handle; becomes the session's `query_cache`.
    pub query_cache: Arc<QueryCache>,
    /// Catalog handle; becomes the session's `catalog`.
    pub catalog: Arc<Catalog>,
    /// Adaptive-execution settings; moved into the session unchanged.
    pub adaptive_config: AdaptiveConfig,
    /// Factorized-execution flag; moved into the session unchanged.
    pub factorized_execution: bool,
    /// Graph model that the session later reports from `Session::graph_model`.
    pub graph_model: GraphModel,
    /// Optional query timeout; moved into the session unchanged.
    pub query_timeout: Option<Duration>,
    /// Commit counter handle; becomes the session's `commit_counter`.
    pub commit_counter: Arc<AtomicUsize>,
    /// GC interval setting; moved into the session unchanged.
    /// NOTE(review): the unit (presumably "commits between GC runs") is not
    /// visible here — confirm at the use site.
    pub gc_interval: usize,
    /// Initialises both the session's `db_read_only` flag and the starting
    /// value of its `read_only_tx` flag.
    pub read_only: bool,
}
76
/// A database session: store/catalog handles plus per-session state such as
/// the selected graph and schema, time zone, session parameters, the current
/// transaction and its savepoints.
///
/// Mutable per-session state is wrapped in `parking_lot::Mutex` (or atomics)
/// so that it can be changed through `&self`.
pub struct Session {
    /// Concrete LPG store; named graphs are looked up via `store.graph(..)`.
    store: Arc<LpgStore>,
    /// Store handle returned by `active_store` when no named graph applies.
    /// `set_wal` replaces it with a WAL-wrapping store.
    graph_store: Arc<dyn GraphStoreMut>,
    /// Catalog handle (schemas, graph-type bindings, type definitions).
    catalog: Arc<Catalog>,
    /// RDF store; both constructors create a fresh `RdfStore` for it.
    #[cfg(feature = "rdf")]
    rdf_store: Arc<RdfStore>,
    /// Transaction manager handle taken from `SessionConfig`.
    transaction_manager: Arc<TransactionManager>,
    /// Query cache handle taken from `SessionConfig`.
    query_cache: Arc<QueryCache>,
    /// Id of the session's current transaction, or `None` when there is none.
    current_transaction: parking_lot::Mutex<Option<TransactionId>>,
    /// Read-only flag consulted before graph DDL in `execute_session_command`;
    /// starts out equal to `SessionConfig::read_only`.
    read_only_tx: parking_lot::Mutex<bool>,
    /// Copy of `SessionConfig::read_only` taken at construction time.
    db_read_only: bool,
    /// Auto-commit flag; both constructors set it to `true`.
    auto_commit: bool,
    /// Adaptive-execution settings taken from `SessionConfig`.
    #[allow(dead_code)]
    adaptive_config: AdaptiveConfig,
    /// Factorized-execution flag taken from `SessionConfig`.
    factorized_execution: bool,
    /// Graph model of the database; `require_lpg` rejects `GraphModel::Rdf`.
    graph_model: GraphModel,
    /// Optional query timeout taken from `SessionConfig`.
    query_timeout: Option<Duration>,
    /// Commit counter handle taken from `SessionConfig`.
    commit_counter: Arc<AtomicUsize>,
    /// GC interval setting taken from `SessionConfig`.
    gc_interval: usize,
    /// Node count recorded for the current transaction; starts at 0.
    /// NOTE(review): presumably captured when a transaction begins — confirm.
    transaction_start_node_count: AtomicUsize,
    /// Edge count recorded for the current transaction; starts at 0.
    /// NOTE(review): presumably captured when a transaction begins — confirm.
    transaction_start_edge_count: AtomicUsize,
    /// Write-ahead log handle; `None` until `set_wal` is called.
    #[cfg(feature = "wal")]
    wal: Option<Arc<grafeo_adapters::storage::wal::LpgWal>>,
    /// Shared WAL graph-context cell; `None` until `set_wal` is called.
    #[cfg(feature = "wal")]
    wal_graph_context: Option<Arc<parking_lot::Mutex<Option<String>>>>,
    /// CDC log; constructors create a fresh one, `set_cdc_log` replaces it.
    #[cfg(feature = "cdc")]
    cdc_log: Arc<crate::cdc::CdcLog>,
    /// Name of the graph selected with `use_graph`, if any.
    current_graph: parking_lot::Mutex<Option<String>>,
    /// Name of the schema selected with `set_schema`, if any; used as the
    /// `schema/` prefix when building graph storage keys.
    current_schema: parking_lot::Mutex<Option<String>>,
    /// Session time zone set with `set_time_zone`, if any.
    time_zone: parking_lot::Mutex<Option<String>>,
    /// Session parameters set with `set_parameter`, keyed by name.
    session_params:
        parking_lot::Mutex<std::collections::HashMap<String, grafeo_common::types::Value>>,
    /// Viewing-epoch override set with `set_viewing_epoch`, if any.
    viewing_epoch_override: parking_lot::Mutex<Option<EpochId>>,
    /// Savepoints recorded for the session; starts empty.
    savepoints: parking_lot::Mutex<Vec<SavepointState>>,
    /// Transaction nesting depth; starts at 0.
    transaction_nesting_depth: parking_lot::Mutex<u32>,
    /// Distinct graph storage keys recorded by `track_graph_touch` while a
    /// transaction is open (`None` is the key used when no named graph applies).
    touched_graphs: parking_lot::Mutex<Vec<Option<String>>>,
    /// Metrics registry; `None` until `set_metrics` is called.
    #[cfg(feature = "metrics")]
    pub(crate) metrics: Option<Arc<crate::metrics::MetricsRegistry>>,
    /// Transaction start timestamp for metrics; starts as `None`.
    #[cfg(feature = "metrics")]
    tx_start_time: parking_lot::Mutex<Option<Instant>>,
}
162
/// Per-graph state captured as part of a [`SavepointState`].
///
/// NOTE(review): field meanings below are inferred from their names; the code
/// that fills and consumes them is not visible in this part of the file.
#[derive(Clone)]
struct GraphSavepoint {
    /// Graph this snapshot belongs to (`None` presumably means the default graph).
    graph_name: Option<String>,
    /// Presumably the store's next node id at savepoint time — confirm.
    next_node_id: u64,
    /// Presumably the store's next edge id at savepoint time — confirm.
    next_edge_id: u64,
    /// Presumably the undo-log length at savepoint time — confirm.
    undo_log_position: usize,
}
171
/// A named savepoint held in `Session::savepoints`.
#[derive(Clone)]
struct SavepointState {
    /// Savepoint name.
    name: String,
    /// One [`GraphSavepoint`] per graph captured by this savepoint.
    graph_snapshots: Vec<GraphSavepoint>,
    /// Graph recorded alongside the savepoint; currently unread
    /// (hence `allow(dead_code)`).
    #[allow(dead_code)]
    active_graph: Option<String>,
}
182
183impl Session {
184 #[allow(dead_code)]
186 pub(crate) fn with_adaptive(store: Arc<LpgStore>, cfg: SessionConfig) -> Self {
187 let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
188 Self {
189 store,
190 graph_store,
191 catalog: cfg.catalog,
192 #[cfg(feature = "rdf")]
193 rdf_store: Arc::new(RdfStore::new()),
194 transaction_manager: cfg.transaction_manager,
195 query_cache: cfg.query_cache,
196 current_transaction: parking_lot::Mutex::new(None),
197 read_only_tx: parking_lot::Mutex::new(cfg.read_only),
198 db_read_only: cfg.read_only,
199 auto_commit: true,
200 adaptive_config: cfg.adaptive_config,
201 factorized_execution: cfg.factorized_execution,
202 graph_model: cfg.graph_model,
203 query_timeout: cfg.query_timeout,
204 commit_counter: cfg.commit_counter,
205 gc_interval: cfg.gc_interval,
206 transaction_start_node_count: AtomicUsize::new(0),
207 transaction_start_edge_count: AtomicUsize::new(0),
208 #[cfg(feature = "wal")]
209 wal: None,
210 #[cfg(feature = "wal")]
211 wal_graph_context: None,
212 #[cfg(feature = "cdc")]
213 cdc_log: Arc::new(crate::cdc::CdcLog::new()),
214 current_graph: parking_lot::Mutex::new(None),
215 current_schema: parking_lot::Mutex::new(None),
216 time_zone: parking_lot::Mutex::new(None),
217 session_params: parking_lot::Mutex::new(std::collections::HashMap::new()),
218 viewing_epoch_override: parking_lot::Mutex::new(None),
219 savepoints: parking_lot::Mutex::new(Vec::new()),
220 transaction_nesting_depth: parking_lot::Mutex::new(0),
221 touched_graphs: parking_lot::Mutex::new(Vec::new()),
222 #[cfg(feature = "metrics")]
223 metrics: None,
224 #[cfg(feature = "metrics")]
225 tx_start_time: parking_lot::Mutex::new(None),
226 }
227 }
228
229 #[cfg(feature = "wal")]
234 pub(crate) fn set_wal(
235 &mut self,
236 wal: Arc<grafeo_adapters::storage::wal::LpgWal>,
237 wal_graph_context: Arc<parking_lot::Mutex<Option<String>>>,
238 ) {
239 self.graph_store = Arc::new(crate::database::wal_store::WalGraphStore::new(
241 Arc::clone(&self.store),
242 Arc::clone(&wal),
243 Arc::clone(&wal_graph_context),
244 ));
245 self.wal = Some(wal);
246 self.wal_graph_context = Some(wal_graph_context);
247 }
248
249 #[cfg(feature = "cdc")]
251 pub(crate) fn set_cdc_log(&mut self, cdc_log: Arc<crate::cdc::CdcLog>) {
252 self.cdc_log = cdc_log;
253 }
254
255 #[cfg(feature = "metrics")]
257 pub(crate) fn set_metrics(&mut self, metrics: Arc<crate::metrics::MetricsRegistry>) {
258 self.metrics = Some(metrics);
259 }
260
261 pub(crate) fn with_external_store(
270 store: Arc<dyn GraphStoreMut>,
271 cfg: SessionConfig,
272 ) -> Result<Self> {
273 Ok(Self {
274 store: Arc::new(LpgStore::new()?),
275 graph_store: store,
276 catalog: cfg.catalog,
277 #[cfg(feature = "rdf")]
278 rdf_store: Arc::new(RdfStore::new()),
279 transaction_manager: cfg.transaction_manager,
280 query_cache: cfg.query_cache,
281 current_transaction: parking_lot::Mutex::new(None),
282 read_only_tx: parking_lot::Mutex::new(cfg.read_only),
283 db_read_only: cfg.read_only,
284 auto_commit: true,
285 adaptive_config: cfg.adaptive_config,
286 factorized_execution: cfg.factorized_execution,
287 graph_model: cfg.graph_model,
288 query_timeout: cfg.query_timeout,
289 commit_counter: cfg.commit_counter,
290 gc_interval: cfg.gc_interval,
291 transaction_start_node_count: AtomicUsize::new(0),
292 transaction_start_edge_count: AtomicUsize::new(0),
293 #[cfg(feature = "wal")]
294 wal: None,
295 #[cfg(feature = "wal")]
296 wal_graph_context: None,
297 #[cfg(feature = "cdc")]
298 cdc_log: Arc::new(crate::cdc::CdcLog::new()),
299 current_graph: parking_lot::Mutex::new(None),
300 current_schema: parking_lot::Mutex::new(None),
301 time_zone: parking_lot::Mutex::new(None),
302 session_params: parking_lot::Mutex::new(std::collections::HashMap::new()),
303 viewing_epoch_override: parking_lot::Mutex::new(None),
304 savepoints: parking_lot::Mutex::new(Vec::new()),
305 transaction_nesting_depth: parking_lot::Mutex::new(0),
306 touched_graphs: parking_lot::Mutex::new(Vec::new()),
307 #[cfg(feature = "metrics")]
308 metrics: None,
309 #[cfg(feature = "metrics")]
310 tx_start_time: parking_lot::Mutex::new(None),
311 })
312 }
313
314 #[must_use]
316 pub fn graph_model(&self) -> GraphModel {
317 self.graph_model
318 }
319
320 pub fn use_graph(&self, name: &str) {
324 *self.current_graph.lock() = Some(name.to_string());
325 }
326
327 #[must_use]
329 pub fn current_graph(&self) -> Option<String> {
330 self.current_graph.lock().clone()
331 }
332
333 pub fn set_schema(&self, name: &str) {
337 *self.current_schema.lock() = Some(name.to_string());
338 }
339
340 #[must_use]
344 pub fn current_schema(&self) -> Option<String> {
345 self.current_schema.lock().clone()
346 }
347
348 fn effective_graph_key(&self, graph_name: &str) -> String {
353 let schema = self.current_schema.lock().clone();
354 match schema {
355 Some(s) => format!("{s}/{graph_name}"),
356 None => graph_name.to_string(),
357 }
358 }
359
360 fn active_graph_storage_key(&self) -> Option<String> {
364 let graph = self.current_graph.lock().clone();
365 let schema = self.current_schema.lock().clone();
366 match (schema, graph) {
367 (_, None) => None,
368 (_, Some(ref name)) if name.eq_ignore_ascii_case("default") => None,
369 (None, Some(name)) => Some(name),
370 (Some(s), Some(g)) => Some(format!("{s}/{g}")),
371 }
372 }
373
374 fn active_store(&self) -> Arc<dyn GraphStoreMut> {
382 let key = self.active_graph_storage_key();
383 match key {
384 None => Arc::clone(&self.graph_store),
385 Some(ref name) => match self.store.graph(name) {
386 Some(named_store) => {
387 #[cfg(feature = "wal")]
388 if let (Some(wal), Some(ctx)) = (&self.wal, &self.wal_graph_context) {
389 return Arc::new(crate::database::wal_store::WalGraphStore::new_for_graph(
390 named_store,
391 Arc::clone(wal),
392 name.clone(),
393 Arc::clone(ctx),
394 )) as Arc<dyn GraphStoreMut>;
395 }
396 named_store as Arc<dyn GraphStoreMut>
397 }
398 None => Arc::clone(&self.graph_store),
399 },
400 }
401 }
402
403 fn active_lpg_store(&self) -> Arc<LpgStore> {
408 let key = self.active_graph_storage_key();
409 match key {
410 None => Arc::clone(&self.store),
411 Some(ref name) => self
412 .store
413 .graph(name)
414 .unwrap_or_else(|| Arc::clone(&self.store)),
415 }
416 }
417
418 fn resolve_store(&self, graph_name: &Option<String>) -> Arc<LpgStore> {
421 match graph_name {
422 None => Arc::clone(&self.store),
423 Some(name) if name.eq_ignore_ascii_case("default") => Arc::clone(&self.store),
424 Some(name) => self
425 .store
426 .graph(name)
427 .unwrap_or_else(|| Arc::clone(&self.store)),
428 }
429 }
430
431 fn track_graph_touch(&self) {
436 if self.current_transaction.lock().is_some() {
437 let key = self.active_graph_storage_key();
438 let mut touched = self.touched_graphs.lock();
439 if !touched.contains(&key) {
440 touched.push(key);
441 }
442 }
443 }
444
445 pub fn set_time_zone(&self, tz: &str) {
447 *self.time_zone.lock() = Some(tz.to_string());
448 }
449
450 #[must_use]
452 pub fn time_zone(&self) -> Option<String> {
453 self.time_zone.lock().clone()
454 }
455
456 pub fn set_parameter(&self, key: &str, value: grafeo_common::types::Value) {
458 self.session_params.lock().insert(key.to_string(), value);
459 }
460
461 #[must_use]
463 pub fn get_parameter(&self, key: &str) -> Option<grafeo_common::types::Value> {
464 self.session_params.lock().get(key).cloned()
465 }
466
467 pub fn reset_session(&self) {
469 *self.current_schema.lock() = None;
470 *self.current_graph.lock() = None;
471 *self.time_zone.lock() = None;
472 self.session_params.lock().clear();
473 *self.viewing_epoch_override.lock() = None;
474 }
475
476 pub fn reset_schema(&self) {
478 *self.current_schema.lock() = None;
479 }
480
481 pub fn reset_graph(&self) {
483 *self.current_graph.lock() = None;
484 }
485
486 pub fn reset_time_zone(&self) {
488 *self.time_zone.lock() = None;
489 }
490
491 pub fn reset_parameters(&self) {
493 self.session_params.lock().clear();
494 }
495
496 pub fn set_viewing_epoch(&self, epoch: EpochId) {
504 *self.viewing_epoch_override.lock() = Some(epoch);
505 }
506
507 pub fn clear_viewing_epoch(&self) {
509 *self.viewing_epoch_override.lock() = None;
510 }
511
512 #[must_use]
514 pub fn viewing_epoch(&self) -> Option<EpochId> {
515 *self.viewing_epoch_override.lock()
516 }
517
518 #[must_use]
522 pub fn get_node_history(&self, id: NodeId) -> Vec<(EpochId, Option<EpochId>, Node)> {
523 self.active_lpg_store().get_node_history(id)
524 }
525
526 #[must_use]
530 pub fn get_edge_history(&self, id: EdgeId) -> Vec<(EpochId, Option<EpochId>, Edge)> {
531 self.active_lpg_store().get_edge_history(id)
532 }
533
534 fn require_lpg(&self, language: &str) -> Result<()> {
536 if self.graph_model == GraphModel::Rdf {
537 return Err(grafeo_common::utils::error::Error::Internal(format!(
538 "This is an RDF database. {language} queries require an LPG database."
539 )));
540 }
541 Ok(())
542 }
543
    /// Executes a GQL session-level command against this session.
    ///
    /// Covers graph DDL (`CreateGraph`, `DropGraph`), graph/schema/time-zone/
    /// parameter selection, session reset/close, and transaction control
    /// (start, commit, rollback, savepoints). Graph names are resolved to
    /// storage keys through `effective_graph_key`, i.e. relative to the
    /// current schema.
    ///
    /// # Errors
    ///
    /// * `Error::Transaction(ReadOnly)` for `CreateGraph`/`DropGraph` while the
    ///   session's read-only flag is set.
    /// * `Error::Query` with `QueryErrorKind::Semantic` for missing or already
    ///   existing graphs, missing schemas, a failed graph-type binding, or an
    ///   invalid `viewing_epoch` value.
    /// * `Error::Internal` when the store fails to create or copy a graph.
    /// * Any error returned by the delegated transaction/savepoint methods.
    #[cfg(feature = "gql")]
    fn execute_session_command(
        &self,
        cmd: grafeo_adapters::query::gql::ast::SessionCommand,
    ) -> Result<QueryResult> {
        use grafeo_adapters::query::gql::ast::{SessionCommand, TransactionIsolationLevel};
        use grafeo_common::utils::error::{Error, QueryError, QueryErrorKind};

        // Graph DDL is rejected while the read-only flag is set; every other
        // command falls through to the main dispatch below.
        if *self.read_only_tx.lock() {
            match &cmd {
                SessionCommand::CreateGraph { .. } | SessionCommand::DropGraph { .. } => {
                    return Err(Error::Transaction(
                        grafeo_common::utils::error::TransactionError::ReadOnly,
                    ));
                }
                _ => {}
            }
        }

        match cmd {
            SessionCommand::CreateGraph {
                name,
                if_not_exists,
                typed,
                like_graph,
                copy_of,
                open: _,
            } => {
                let storage_key = self.effective_graph_key(&name);

                // Validate both optional source graphs before creating anything.
                if let Some(ref src) = like_graph {
                    let src_key = self.effective_graph_key(src);
                    if self.store.graph(&src_key).is_none() {
                        return Err(Error::Query(QueryError::new(
                            QueryErrorKind::Semantic,
                            format!("Source graph '{src}' does not exist"),
                        )));
                    }
                }
                if let Some(ref src) = copy_of {
                    let src_key = self.effective_graph_key(src);
                    if self.store.graph(&src_key).is_none() {
                        return Err(Error::Query(QueryError::new(
                            QueryErrorKind::Semantic,
                            format!("Source graph '{src}' does not exist"),
                        )));
                    }
                }

                // `created` is false when a graph with this key already existed.
                let created = self
                    .store
                    .create_graph(&storage_key)
                    .map_err(|e| Error::Internal(e.to_string()))?;
                if !created && !if_not_exists {
                    return Err(Error::Query(QueryError::new(
                        QueryErrorKind::Semantic,
                        format!("Graph '{name}' already exists"),
                    )));
                }
                // Only a genuinely new graph is written to the WAL.
                if created {
                    #[cfg(feature = "wal")]
                    self.log_schema_wal(
                        &grafeo_adapters::storage::wal::WalRecord::CreateNamedGraph {
                            name: storage_key.clone(),
                        },
                    );
                }

                // NOTE(review): this copy also runs when the target already
                // existed and `if_not_exists` suppressed the error above —
                // confirm that copying into an existing graph is intended.
                if let Some(ref src) = copy_of {
                    let src_key = self.effective_graph_key(src);
                    self.store
                        .copy_graph(Some(&src_key), Some(&storage_key))
                        .map_err(|e| Error::Internal(e.to_string()))?;
                }

                // An explicit graph type must bind successfully.
                if let Some(type_name) = typed
                    && let Err(e) = self
                        .catalog
                        .bind_graph_type(&storage_key, type_name.clone())
                {
                    return Err(Error::Query(QueryError::new(
                        QueryErrorKind::Semantic,
                        e.to_string(),
                    )));
                }

                // A graph type inherited via LIKE is best-effort: a binding
                // failure is deliberately ignored.
                if let Some(ref src) = like_graph {
                    let src_key = self.effective_graph_key(src);
                    if let Some(src_type) = self.catalog.get_graph_type_binding(&src_key) {
                        let _ = self.catalog.bind_graph_type(&storage_key, src_type);
                    }
                }

                Ok(QueryResult::empty())
            }
            SessionCommand::DropGraph { name, if_exists } => {
                let storage_key = self.effective_graph_key(&name);
                let dropped = self.store.drop_graph(&storage_key);
                if !dropped && !if_exists {
                    return Err(Error::Query(QueryError::new(
                        QueryErrorKind::Semantic,
                        format!("Graph '{name}' does not exist"),
                    )));
                }
                if dropped {
                    #[cfg(feature = "wal")]
                    self.log_schema_wal(
                        &grafeo_adapters::storage::wal::WalRecord::DropNamedGraph {
                            name: storage_key.clone(),
                        },
                    );
                    // Deselect the graph if it was the session's current one
                    // (compared by unqualified name, ASCII case-insensitive).
                    let mut current = self.current_graph.lock();
                    if current
                        .as_deref()
                        .is_some_and(|g| g.eq_ignore_ascii_case(&name))
                    {
                        *current = None;
                    }
                }
                Ok(QueryResult::empty())
            }
            SessionCommand::UseGraph(name) => {
                let effective_key = self.effective_graph_key(&name);
                // "default" is always accepted; any other graph must exist.
                if !name.eq_ignore_ascii_case("default")
                    && self.store.graph(&effective_key).is_none()
                {
                    return Err(Error::Query(QueryError::new(
                        QueryErrorKind::Semantic,
                        format!("Graph '{name}' does not exist"),
                    )));
                }
                self.use_graph(&name);
                self.track_graph_touch();
                Ok(QueryResult::empty())
            }
            // Same handling as `UseGraph` above.
            SessionCommand::SessionSetGraph(name) => {
                let effective_key = self.effective_graph_key(&name);
                if !name.eq_ignore_ascii_case("default")
                    && self.store.graph(&effective_key).is_none()
                {
                    return Err(Error::Query(QueryError::new(
                        QueryErrorKind::Semantic,
                        format!("Graph '{name}' does not exist"),
                    )));
                }
                self.use_graph(&name);
                self.track_graph_touch();
                Ok(QueryResult::empty())
            }
            SessionCommand::SessionSetSchema(name) => {
                if !self.catalog.schema_exists(&name) {
                    return Err(Error::Query(QueryError::new(
                        QueryErrorKind::Semantic,
                        format!("Schema '{name}' does not exist"),
                    )));
                }
                self.set_schema(&name);
                Ok(QueryResult::empty())
            }
            SessionCommand::SessionSetTimeZone(tz) => {
                self.set_time_zone(&tz);
                Ok(QueryResult::empty())
            }
            SessionCommand::SessionSetParameter(key, expr) => {
                // `viewing_epoch` is special-cased: it sets the epoch override
                // instead of an ordinary session parameter.
                if key.eq_ignore_ascii_case("viewing_epoch") {
                    match Self::eval_integer_literal(&expr) {
                        // The guard makes the `as u64` cast lossless.
                        Some(n) if n >= 0 => {
                            self.set_viewing_epoch(EpochId::new(n as u64));
                            Ok(QueryResult::status(format!("Set viewing_epoch to {n}")))
                        }
                        _ => Err(Error::Query(QueryError::new(
                            QueryErrorKind::Semantic,
                            "viewing_epoch must be a non-negative integer literal",
                        ))),
                    }
                } else {
                    // NOTE(review): `expr` is not evaluated here; every other
                    // parameter is stored as `Value::Null`.
                    self.set_parameter(&key, Value::Null);
                    Ok(QueryResult::empty())
                }
            }
            SessionCommand::SessionReset(target) => {
                use grafeo_adapters::query::gql::ast::SessionResetTarget;
                match target {
                    SessionResetTarget::All => self.reset_session(),
                    SessionResetTarget::Schema => self.reset_schema(),
                    SessionResetTarget::Graph => self.reset_graph(),
                    SessionResetTarget::TimeZone => self.reset_time_zone(),
                    SessionResetTarget::Parameters => self.reset_parameters(),
                }
                Ok(QueryResult::empty())
            }
            // Closing a session performs the same work as a full reset.
            SessionCommand::SessionClose => {
                self.reset_session();
                Ok(QueryResult::empty())
            }
            SessionCommand::StartTransaction {
                read_only,
                isolation_level,
            } => {
                // Translate the AST isolation level into the engine's enum.
                let engine_level = isolation_level.map(|l| match l {
                    TransactionIsolationLevel::ReadCommitted => {
                        crate::transaction::IsolationLevel::ReadCommitted
                    }
                    TransactionIsolationLevel::SnapshotIsolation => {
                        crate::transaction::IsolationLevel::SnapshotIsolation
                    }
                    TransactionIsolationLevel::Serializable => {
                        crate::transaction::IsolationLevel::Serializable
                    }
                });
                self.begin_transaction_inner(read_only, engine_level)?;
                Ok(QueryResult::status("Transaction started"))
            }
            SessionCommand::Commit => {
                self.commit_inner()?;
                Ok(QueryResult::status("Transaction committed"))
            }
            SessionCommand::Rollback => {
                self.rollback_inner()?;
                Ok(QueryResult::status("Transaction rolled back"))
            }
            SessionCommand::Savepoint(name) => {
                self.savepoint(&name)?;
                Ok(QueryResult::status(format!("Savepoint '{name}' created")))
            }
            SessionCommand::RollbackToSavepoint(name) => {
                self.rollback_to_savepoint(&name)?;
                Ok(QueryResult::status(format!(
                    "Rolled back to savepoint '{name}'"
                )))
            }
            SessionCommand::ReleaseSavepoint(name) => {
                self.release_savepoint(&name)?;
                Ok(QueryResult::status(format!("Savepoint '{name}' released")))
            }
        }
    }
796
797 #[cfg(feature = "wal")]
799 fn log_schema_wal(&self, record: &grafeo_adapters::storage::wal::WalRecord) {
800 if let Some(ref wal) = self.wal
801 && let Err(e) = wal.log(record)
802 {
803 tracing::warn!("Failed to log schema change to WAL: {}", e);
804 }
805 }
806
807 #[cfg(feature = "gql")]
809 fn execute_schema_command(
810 &self,
811 cmd: grafeo_adapters::query::gql::ast::SchemaStatement,
812 ) -> Result<QueryResult> {
813 use crate::catalog::{
814 EdgeTypeDefinition, NodeTypeDefinition, PropertyDataType, TypedProperty,
815 };
816 use grafeo_adapters::query::gql::ast::SchemaStatement;
817 #[cfg(feature = "wal")]
818 use grafeo_adapters::storage::wal::WalRecord;
819 use grafeo_common::utils::error::{Error, QueryError, QueryErrorKind};
820
821 macro_rules! wal_log {
823 ($self:expr, $record:expr) => {
824 #[cfg(feature = "wal")]
825 $self.log_schema_wal(&$record);
826 };
827 }
828
829 let result = match cmd {
830 SchemaStatement::CreateNodeType(stmt) => {
831 #[cfg(feature = "wal")]
832 let props_for_wal: Vec<(String, String, bool)> = stmt
833 .properties
834 .iter()
835 .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
836 .collect();
837 let def = NodeTypeDefinition {
838 name: stmt.name.clone(),
839 properties: stmt
840 .properties
841 .iter()
842 .map(|p| TypedProperty {
843 name: p.name.clone(),
844 data_type: PropertyDataType::from_type_name(&p.data_type),
845 nullable: p.nullable,
846 default_value: p
847 .default_value
848 .as_ref()
849 .map(|s| parse_default_literal(s)),
850 })
851 .collect(),
852 constraints: Vec::new(),
853 parent_types: stmt.parent_types.clone(),
854 };
855 let result = if stmt.or_replace {
856 let _ = self.catalog.drop_node_type(&stmt.name);
857 self.catalog.register_node_type(def)
858 } else {
859 self.catalog.register_node_type(def)
860 };
861 match result {
862 Ok(()) => {
863 wal_log!(
864 self,
865 WalRecord::CreateNodeType {
866 name: stmt.name.clone(),
867 properties: props_for_wal,
868 constraints: Vec::new(),
869 }
870 );
871 Ok(QueryResult::status(format!(
872 "Created node type '{}'",
873 stmt.name
874 )))
875 }
876 Err(e) if stmt.if_not_exists => {
877 let _ = e;
878 Ok(QueryResult::status("No change"))
879 }
880 Err(e) => Err(Error::Query(QueryError::new(
881 QueryErrorKind::Semantic,
882 e.to_string(),
883 ))),
884 }
885 }
886 SchemaStatement::CreateEdgeType(stmt) => {
887 #[cfg(feature = "wal")]
888 let props_for_wal: Vec<(String, String, bool)> = stmt
889 .properties
890 .iter()
891 .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
892 .collect();
893 let def = EdgeTypeDefinition {
894 name: stmt.name.clone(),
895 properties: stmt
896 .properties
897 .iter()
898 .map(|p| TypedProperty {
899 name: p.name.clone(),
900 data_type: PropertyDataType::from_type_name(&p.data_type),
901 nullable: p.nullable,
902 default_value: p
903 .default_value
904 .as_ref()
905 .map(|s| parse_default_literal(s)),
906 })
907 .collect(),
908 constraints: Vec::new(),
909 source_node_types: stmt.source_node_types.clone(),
910 target_node_types: stmt.target_node_types.clone(),
911 };
912 let result = if stmt.or_replace {
913 let _ = self.catalog.drop_edge_type_def(&stmt.name);
914 self.catalog.register_edge_type_def(def)
915 } else {
916 self.catalog.register_edge_type_def(def)
917 };
918 match result {
919 Ok(()) => {
920 wal_log!(
921 self,
922 WalRecord::CreateEdgeType {
923 name: stmt.name.clone(),
924 properties: props_for_wal,
925 constraints: Vec::new(),
926 }
927 );
928 Ok(QueryResult::status(format!(
929 "Created edge type '{}'",
930 stmt.name
931 )))
932 }
933 Err(e) if stmt.if_not_exists => {
934 let _ = e;
935 Ok(QueryResult::status("No change"))
936 }
937 Err(e) => Err(Error::Query(QueryError::new(
938 QueryErrorKind::Semantic,
939 e.to_string(),
940 ))),
941 }
942 }
943 SchemaStatement::CreateVectorIndex(stmt) => {
944 Self::create_vector_index_on_store(
945 &self.active_lpg_store(),
946 &stmt.node_label,
947 &stmt.property,
948 stmt.dimensions,
949 stmt.metric.as_deref(),
950 )?;
951 wal_log!(
952 self,
953 WalRecord::CreateIndex {
954 name: stmt.name.clone(),
955 label: stmt.node_label.clone(),
956 property: stmt.property.clone(),
957 index_type: "vector".to_string(),
958 }
959 );
960 Ok(QueryResult::status(format!(
961 "Created vector index '{}'",
962 stmt.name
963 )))
964 }
965 SchemaStatement::DropNodeType { name, if_exists } => {
966 match self.catalog.drop_node_type(&name) {
967 Ok(()) => {
968 wal_log!(self, WalRecord::DropNodeType { name: name.clone() });
969 Ok(QueryResult::status(format!("Dropped node type '{name}'")))
970 }
971 Err(e) if if_exists => {
972 let _ = e;
973 Ok(QueryResult::status("No change"))
974 }
975 Err(e) => Err(Error::Query(QueryError::new(
976 QueryErrorKind::Semantic,
977 e.to_string(),
978 ))),
979 }
980 }
981 SchemaStatement::DropEdgeType { name, if_exists } => {
982 match self.catalog.drop_edge_type_def(&name) {
983 Ok(()) => {
984 wal_log!(self, WalRecord::DropEdgeType { name: name.clone() });
985 Ok(QueryResult::status(format!("Dropped edge type '{name}'")))
986 }
987 Err(e) if if_exists => {
988 let _ = e;
989 Ok(QueryResult::status("No change"))
990 }
991 Err(e) => Err(Error::Query(QueryError::new(
992 QueryErrorKind::Semantic,
993 e.to_string(),
994 ))),
995 }
996 }
997 SchemaStatement::CreateIndex(stmt) => {
998 use grafeo_adapters::query::gql::ast::IndexKind;
999 let active = self.active_lpg_store();
1000 let index_type_str = match stmt.index_kind {
1001 IndexKind::Property => "property",
1002 IndexKind::BTree => "btree",
1003 IndexKind::Text => "text",
1004 IndexKind::Vector => "vector",
1005 };
1006 match stmt.index_kind {
1007 IndexKind::Property | IndexKind::BTree => {
1008 for prop in &stmt.properties {
1009 active.create_property_index(prop);
1010 }
1011 }
1012 IndexKind::Text => {
1013 for prop in &stmt.properties {
1014 Self::create_text_index_on_store(&active, &stmt.label, prop)?;
1015 }
1016 }
1017 IndexKind::Vector => {
1018 for prop in &stmt.properties {
1019 Self::create_vector_index_on_store(
1020 &active,
1021 &stmt.label,
1022 prop,
1023 stmt.options.dimensions,
1024 stmt.options.metric.as_deref(),
1025 )?;
1026 }
1027 }
1028 }
1029 #[cfg(feature = "wal")]
1030 for prop in &stmt.properties {
1031 wal_log!(
1032 self,
1033 WalRecord::CreateIndex {
1034 name: stmt.name.clone(),
1035 label: stmt.label.clone(),
1036 property: prop.clone(),
1037 index_type: index_type_str.to_string(),
1038 }
1039 );
1040 }
1041 Ok(QueryResult::status(format!(
1042 "Created {} index '{}'",
1043 index_type_str, stmt.name
1044 )))
1045 }
1046 SchemaStatement::DropIndex { name, if_exists } => {
1047 let dropped = self.active_lpg_store().drop_property_index(&name);
1049 if dropped || if_exists {
1050 if dropped {
1051 wal_log!(self, WalRecord::DropIndex { name: name.clone() });
1052 }
1053 Ok(QueryResult::status(if dropped {
1054 format!("Dropped index '{name}'")
1055 } else {
1056 "No change".to_string()
1057 }))
1058 } else {
1059 Err(Error::Query(QueryError::new(
1060 QueryErrorKind::Semantic,
1061 format!("Index '{name}' does not exist"),
1062 )))
1063 }
1064 }
1065 SchemaStatement::CreateConstraint(stmt) => {
1066 use crate::catalog::TypeConstraint;
1067 use grafeo_adapters::query::gql::ast::ConstraintKind;
1068 let kind_str = match stmt.constraint_kind {
1069 ConstraintKind::Unique => "unique",
1070 ConstraintKind::NodeKey => "node_key",
1071 ConstraintKind::NotNull => "not_null",
1072 ConstraintKind::Exists => "exists",
1073 };
1074 let constraint_name = stmt
1075 .name
1076 .clone()
1077 .unwrap_or_else(|| format!("{}_{kind_str}", stmt.label));
1078
1079 match stmt.constraint_kind {
1081 ConstraintKind::Unique => {
1082 for prop in &stmt.properties {
1083 let label_id = self.catalog.get_or_create_label(&stmt.label);
1084 let prop_id = self.catalog.get_or_create_property_key(prop);
1085 let _ = self.catalog.add_unique_constraint(label_id, prop_id);
1086 }
1087 let _ = self.catalog.add_constraint_to_type(
1088 &stmt.label,
1089 TypeConstraint::Unique(stmt.properties.clone()),
1090 );
1091 }
1092 ConstraintKind::NodeKey => {
1093 for prop in &stmt.properties {
1094 let label_id = self.catalog.get_or_create_label(&stmt.label);
1095 let prop_id = self.catalog.get_or_create_property_key(prop);
1096 let _ = self.catalog.add_unique_constraint(label_id, prop_id);
1097 let _ = self.catalog.add_required_property(label_id, prop_id);
1098 }
1099 let _ = self.catalog.add_constraint_to_type(
1100 &stmt.label,
1101 TypeConstraint::PrimaryKey(stmt.properties.clone()),
1102 );
1103 }
1104 ConstraintKind::NotNull | ConstraintKind::Exists => {
1105 for prop in &stmt.properties {
1106 let label_id = self.catalog.get_or_create_label(&stmt.label);
1107 let prop_id = self.catalog.get_or_create_property_key(prop);
1108 let _ = self.catalog.add_required_property(label_id, prop_id);
1109 let _ = self.catalog.add_constraint_to_type(
1110 &stmt.label,
1111 TypeConstraint::NotNull(prop.clone()),
1112 );
1113 }
1114 }
1115 }
1116
1117 wal_log!(
1118 self,
1119 WalRecord::CreateConstraint {
1120 name: constraint_name.clone(),
1121 label: stmt.label.clone(),
1122 properties: stmt.properties.clone(),
1123 kind: kind_str.to_string(),
1124 }
1125 );
1126 Ok(QueryResult::status(format!(
1127 "Created {kind_str} constraint '{constraint_name}'"
1128 )))
1129 }
1130 SchemaStatement::DropConstraint { name, if_exists } => {
1131 let _ = if_exists;
1132 wal_log!(self, WalRecord::DropConstraint { name: name.clone() });
1133 Ok(QueryResult::status(format!("Dropped constraint '{name}'")))
1134 }
1135 SchemaStatement::CreateGraphType(stmt) => {
1136 use crate::catalog::GraphTypeDefinition;
1137 use grafeo_adapters::query::gql::ast::InlineElementType;
1138
1139 let (mut node_types, mut edge_types, open) =
1141 if let Some(ref like_graph) = stmt.like_graph {
1142 if let Some(type_name) = self.catalog.get_graph_type_binding(like_graph) {
1144 if let Some(existing) = self
1145 .catalog
1146 .schema()
1147 .and_then(|s| s.get_graph_type(&type_name))
1148 {
1149 (
1150 existing.allowed_node_types.clone(),
1151 existing.allowed_edge_types.clone(),
1152 existing.open,
1153 )
1154 } else {
1155 (Vec::new(), Vec::new(), true)
1156 }
1157 } else {
1158 let nt = self.catalog.all_node_type_names();
1160 let et = self.catalog.all_edge_type_names();
1161 if nt.is_empty() && et.is_empty() {
1162 (Vec::new(), Vec::new(), true)
1163 } else {
1164 (nt, et, false)
1165 }
1166 }
1167 } else {
1168 (stmt.node_types.clone(), stmt.edge_types.clone(), stmt.open)
1169 };
1170
1171 for inline in &stmt.inline_types {
1173 match inline {
1174 InlineElementType::Node {
1175 name,
1176 properties,
1177 key_labels,
1178 ..
1179 } => {
1180 let def = NodeTypeDefinition {
1181 name: name.clone(),
1182 properties: properties
1183 .iter()
1184 .map(|p| TypedProperty {
1185 name: p.name.clone(),
1186 data_type: PropertyDataType::from_type_name(&p.data_type),
1187 nullable: p.nullable,
1188 default_value: None,
1189 })
1190 .collect(),
1191 constraints: Vec::new(),
1192 parent_types: key_labels.clone(),
1193 };
1194 self.catalog.register_or_replace_node_type(def);
1196 #[cfg(feature = "wal")]
1197 {
1198 let props_for_wal: Vec<(String, String, bool)> = properties
1199 .iter()
1200 .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
1201 .collect();
1202 self.log_schema_wal(&WalRecord::CreateNodeType {
1203 name: name.clone(),
1204 properties: props_for_wal,
1205 constraints: Vec::new(),
1206 });
1207 }
1208 if !node_types.contains(name) {
1209 node_types.push(name.clone());
1210 }
1211 }
1212 InlineElementType::Edge {
1213 name,
1214 properties,
1215 source_node_types,
1216 target_node_types,
1217 ..
1218 } => {
1219 let def = EdgeTypeDefinition {
1220 name: name.clone(),
1221 properties: properties
1222 .iter()
1223 .map(|p| TypedProperty {
1224 name: p.name.clone(),
1225 data_type: PropertyDataType::from_type_name(&p.data_type),
1226 nullable: p.nullable,
1227 default_value: None,
1228 })
1229 .collect(),
1230 constraints: Vec::new(),
1231 source_node_types: source_node_types.clone(),
1232 target_node_types: target_node_types.clone(),
1233 };
1234 self.catalog.register_or_replace_edge_type_def(def);
1235 #[cfg(feature = "wal")]
1236 {
1237 let props_for_wal: Vec<(String, String, bool)> = properties
1238 .iter()
1239 .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
1240 .collect();
1241 self.log_schema_wal(&WalRecord::CreateEdgeType {
1242 name: name.clone(),
1243 properties: props_for_wal,
1244 constraints: Vec::new(),
1245 });
1246 }
1247 if !edge_types.contains(name) {
1248 edge_types.push(name.clone());
1249 }
1250 }
1251 }
1252 }
1253
1254 let def = GraphTypeDefinition {
1255 name: stmt.name.clone(),
1256 allowed_node_types: node_types.clone(),
1257 allowed_edge_types: edge_types.clone(),
1258 open,
1259 };
1260 let result = if stmt.or_replace {
1261 let _ = self.catalog.drop_graph_type(&stmt.name);
1263 self.catalog.register_graph_type(def)
1264 } else {
1265 self.catalog.register_graph_type(def)
1266 };
1267 match result {
1268 Ok(()) => {
1269 wal_log!(
1270 self,
1271 WalRecord::CreateGraphType {
1272 name: stmt.name.clone(),
1273 node_types,
1274 edge_types,
1275 open,
1276 }
1277 );
1278 Ok(QueryResult::status(format!(
1279 "Created graph type '{}'",
1280 stmt.name
1281 )))
1282 }
1283 Err(e) if stmt.if_not_exists => {
1284 let _ = e;
1285 Ok(QueryResult::status("No change"))
1286 }
1287 Err(e) => Err(Error::Query(QueryError::new(
1288 QueryErrorKind::Semantic,
1289 e.to_string(),
1290 ))),
1291 }
1292 }
1293 SchemaStatement::DropGraphType { name, if_exists } => {
1294 match self.catalog.drop_graph_type(&name) {
1295 Ok(()) => {
1296 wal_log!(self, WalRecord::DropGraphType { name: name.clone() });
1297 Ok(QueryResult::status(format!("Dropped graph type '{name}'")))
1298 }
1299 Err(e) if if_exists => {
1300 let _ = e;
1301 Ok(QueryResult::status("No change"))
1302 }
1303 Err(e) => Err(Error::Query(QueryError::new(
1304 QueryErrorKind::Semantic,
1305 e.to_string(),
1306 ))),
1307 }
1308 }
1309 SchemaStatement::CreateSchema {
1310 name,
1311 if_not_exists,
1312 } => match self.catalog.register_schema_namespace(name.clone()) {
1313 Ok(()) => {
1314 wal_log!(self, WalRecord::CreateSchema { name: name.clone() });
1315 Ok(QueryResult::status(format!("Created schema '{name}'")))
1316 }
1317 Err(e) if if_not_exists => {
1318 let _ = e;
1319 Ok(QueryResult::status("No change"))
1320 }
1321 Err(e) => Err(Error::Query(QueryError::new(
1322 QueryErrorKind::Semantic,
1323 e.to_string(),
1324 ))),
1325 },
1326 SchemaStatement::DropSchema { name, if_exists } => {
1327 let prefix = format!("{name}/");
1329 let has_graphs = self
1330 .store
1331 .graph_names()
1332 .iter()
1333 .any(|g| g.starts_with(&prefix));
1334 if has_graphs {
1335 return Err(Error::Query(QueryError::new(
1336 QueryErrorKind::Semantic,
1337 format!(
1338 "Schema '{name}' is not empty: drop all graphs in the schema first"
1339 ),
1340 )));
1341 }
1342 match self.catalog.drop_schema_namespace(&name) {
1343 Ok(()) => {
1344 wal_log!(self, WalRecord::DropSchema { name: name.clone() });
1345 let mut current = self.current_schema.lock();
1347 if current
1348 .as_deref()
1349 .is_some_and(|s| s.eq_ignore_ascii_case(&name))
1350 {
1351 *current = None;
1352 }
1353 Ok(QueryResult::status(format!("Dropped schema '{name}'")))
1354 }
1355 Err(e) if if_exists => {
1356 let _ = e;
1357 Ok(QueryResult::status("No change"))
1358 }
1359 Err(e) => Err(Error::Query(QueryError::new(
1360 QueryErrorKind::Semantic,
1361 e.to_string(),
1362 ))),
1363 }
1364 }
1365 SchemaStatement::AlterNodeType(stmt) => {
1366 use grafeo_adapters::query::gql::ast::TypeAlteration;
1367 let mut wal_alts = Vec::new();
1368 for alt in &stmt.alterations {
1369 match alt {
1370 TypeAlteration::AddProperty(prop) => {
1371 let typed = TypedProperty {
1372 name: prop.name.clone(),
1373 data_type: PropertyDataType::from_type_name(&prop.data_type),
1374 nullable: prop.nullable,
1375 default_value: prop
1376 .default_value
1377 .as_ref()
1378 .map(|s| parse_default_literal(s)),
1379 };
1380 self.catalog
1381 .alter_node_type_add_property(&stmt.name, typed)
1382 .map_err(|e| {
1383 Error::Query(QueryError::new(
1384 QueryErrorKind::Semantic,
1385 e.to_string(),
1386 ))
1387 })?;
1388 wal_alts.push((
1389 "add".to_string(),
1390 prop.name.clone(),
1391 prop.data_type.clone(),
1392 prop.nullable,
1393 ));
1394 }
1395 TypeAlteration::DropProperty(name) => {
1396 self.catalog
1397 .alter_node_type_drop_property(&stmt.name, name)
1398 .map_err(|e| {
1399 Error::Query(QueryError::new(
1400 QueryErrorKind::Semantic,
1401 e.to_string(),
1402 ))
1403 })?;
1404 wal_alts.push(("drop".to_string(), name.clone(), String::new(), false));
1405 }
1406 }
1407 }
1408 wal_log!(
1409 self,
1410 WalRecord::AlterNodeType {
1411 name: stmt.name.clone(),
1412 alterations: wal_alts,
1413 }
1414 );
1415 Ok(QueryResult::status(format!(
1416 "Altered node type '{}'",
1417 stmt.name
1418 )))
1419 }
1420 SchemaStatement::AlterEdgeType(stmt) => {
1421 use grafeo_adapters::query::gql::ast::TypeAlteration;
1422 let mut wal_alts = Vec::new();
1423 for alt in &stmt.alterations {
1424 match alt {
1425 TypeAlteration::AddProperty(prop) => {
1426 let typed = TypedProperty {
1427 name: prop.name.clone(),
1428 data_type: PropertyDataType::from_type_name(&prop.data_type),
1429 nullable: prop.nullable,
1430 default_value: prop
1431 .default_value
1432 .as_ref()
1433 .map(|s| parse_default_literal(s)),
1434 };
1435 self.catalog
1436 .alter_edge_type_add_property(&stmt.name, typed)
1437 .map_err(|e| {
1438 Error::Query(QueryError::new(
1439 QueryErrorKind::Semantic,
1440 e.to_string(),
1441 ))
1442 })?;
1443 wal_alts.push((
1444 "add".to_string(),
1445 prop.name.clone(),
1446 prop.data_type.clone(),
1447 prop.nullable,
1448 ));
1449 }
1450 TypeAlteration::DropProperty(name) => {
1451 self.catalog
1452 .alter_edge_type_drop_property(&stmt.name, name)
1453 .map_err(|e| {
1454 Error::Query(QueryError::new(
1455 QueryErrorKind::Semantic,
1456 e.to_string(),
1457 ))
1458 })?;
1459 wal_alts.push(("drop".to_string(), name.clone(), String::new(), false));
1460 }
1461 }
1462 }
1463 wal_log!(
1464 self,
1465 WalRecord::AlterEdgeType {
1466 name: stmt.name.clone(),
1467 alterations: wal_alts,
1468 }
1469 );
1470 Ok(QueryResult::status(format!(
1471 "Altered edge type '{}'",
1472 stmt.name
1473 )))
1474 }
1475 SchemaStatement::AlterGraphType(stmt) => {
1476 use grafeo_adapters::query::gql::ast::GraphTypeAlteration;
1477 let mut wal_alts = Vec::new();
1478 for alt in &stmt.alterations {
1479 match alt {
1480 GraphTypeAlteration::AddNodeType(name) => {
1481 self.catalog
1482 .alter_graph_type_add_node_type(&stmt.name, name.clone())
1483 .map_err(|e| {
1484 Error::Query(QueryError::new(
1485 QueryErrorKind::Semantic,
1486 e.to_string(),
1487 ))
1488 })?;
1489 wal_alts.push(("add_node_type".to_string(), name.clone()));
1490 }
1491 GraphTypeAlteration::DropNodeType(name) => {
1492 self.catalog
1493 .alter_graph_type_drop_node_type(&stmt.name, name)
1494 .map_err(|e| {
1495 Error::Query(QueryError::new(
1496 QueryErrorKind::Semantic,
1497 e.to_string(),
1498 ))
1499 })?;
1500 wal_alts.push(("drop_node_type".to_string(), name.clone()));
1501 }
1502 GraphTypeAlteration::AddEdgeType(name) => {
1503 self.catalog
1504 .alter_graph_type_add_edge_type(&stmt.name, name.clone())
1505 .map_err(|e| {
1506 Error::Query(QueryError::new(
1507 QueryErrorKind::Semantic,
1508 e.to_string(),
1509 ))
1510 })?;
1511 wal_alts.push(("add_edge_type".to_string(), name.clone()));
1512 }
1513 GraphTypeAlteration::DropEdgeType(name) => {
1514 self.catalog
1515 .alter_graph_type_drop_edge_type(&stmt.name, name)
1516 .map_err(|e| {
1517 Error::Query(QueryError::new(
1518 QueryErrorKind::Semantic,
1519 e.to_string(),
1520 ))
1521 })?;
1522 wal_alts.push(("drop_edge_type".to_string(), name.clone()));
1523 }
1524 }
1525 }
1526 wal_log!(
1527 self,
1528 WalRecord::AlterGraphType {
1529 name: stmt.name.clone(),
1530 alterations: wal_alts,
1531 }
1532 );
1533 Ok(QueryResult::status(format!(
1534 "Altered graph type '{}'",
1535 stmt.name
1536 )))
1537 }
1538 SchemaStatement::CreateProcedure(stmt) => {
1539 use crate::catalog::ProcedureDefinition;
1540
1541 let def = ProcedureDefinition {
1542 name: stmt.name.clone(),
1543 params: stmt
1544 .params
1545 .iter()
1546 .map(|p| (p.name.clone(), p.param_type.clone()))
1547 .collect(),
1548 returns: stmt
1549 .returns
1550 .iter()
1551 .map(|r| (r.name.clone(), r.return_type.clone()))
1552 .collect(),
1553 body: stmt.body.clone(),
1554 };
1555
1556 if stmt.or_replace {
1557 self.catalog.replace_procedure(def).map_err(|e| {
1558 Error::Query(QueryError::new(QueryErrorKind::Semantic, e.to_string()))
1559 })?;
1560 } else {
1561 match self.catalog.register_procedure(def) {
1562 Ok(()) => {}
1563 Err(_) if stmt.if_not_exists => {
1564 return Ok(QueryResult::empty());
1565 }
1566 Err(e) => {
1567 return Err(Error::Query(QueryError::new(
1568 QueryErrorKind::Semantic,
1569 e.to_string(),
1570 )));
1571 }
1572 }
1573 }
1574
1575 wal_log!(
1576 self,
1577 WalRecord::CreateProcedure {
1578 name: stmt.name.clone(),
1579 params: stmt
1580 .params
1581 .iter()
1582 .map(|p| (p.name.clone(), p.param_type.clone()))
1583 .collect(),
1584 returns: stmt
1585 .returns
1586 .iter()
1587 .map(|r| (r.name.clone(), r.return_type.clone()))
1588 .collect(),
1589 body: stmt.body,
1590 }
1591 );
1592 Ok(QueryResult::status(format!(
1593 "Created procedure '{}'",
1594 stmt.name
1595 )))
1596 }
1597 SchemaStatement::DropProcedure { name, if_exists } => {
1598 match self.catalog.drop_procedure(&name) {
1599 Ok(()) => {}
1600 Err(_) if if_exists => {
1601 return Ok(QueryResult::empty());
1602 }
1603 Err(e) => {
1604 return Err(Error::Query(QueryError::new(
1605 QueryErrorKind::Semantic,
1606 e.to_string(),
1607 )));
1608 }
1609 }
1610 wal_log!(self, WalRecord::DropProcedure { name: name.clone() });
1611 Ok(QueryResult::status(format!("Dropped procedure '{name}'")))
1612 }
1613 SchemaStatement::ShowIndexes => {
1614 return self.execute_show_indexes();
1615 }
1616 SchemaStatement::ShowConstraints => {
1617 return self.execute_show_constraints();
1618 }
1619 SchemaStatement::ShowNodeTypes => {
1620 return self.execute_show_node_types();
1621 }
1622 SchemaStatement::ShowEdgeTypes => {
1623 return self.execute_show_edge_types();
1624 }
1625 SchemaStatement::ShowGraphTypes => {
1626 return self.execute_show_graph_types();
1627 }
1628 SchemaStatement::ShowGraphType(name) => {
1629 return self.execute_show_graph_type(&name);
1630 }
1631 SchemaStatement::ShowCurrentGraphType => {
1632 return self.execute_show_current_graph_type();
1633 }
1634 SchemaStatement::ShowGraphs => {
1635 return self.execute_show_graphs();
1636 }
1637 SchemaStatement::ShowSchemas => {
1638 return self.execute_show_schemas();
1639 }
1640 };
1641
1642 if result.is_ok() {
1645 self.query_cache.clear();
1646 }
1647
1648 result
1649 }
1650
/// Builds an `HnswIndex` over `property` for the nodes carrying `label` and
/// registers it on `store` via `add_vector_index`.
///
/// * `dimensions` - expected vector length; when `None`, the length of the
///   first vector encountered becomes the expected length.
/// * `metric` - name of the distance metric; `DistanceMetric::Cosine` is used
///   when `None`.
///
/// Only property values that are `Value::Vector` are indexed; nodes whose
/// property is missing or of another kind are ignored.
///
/// # Errors
///
/// Returns `Error::Internal` when the metric name is not recognised, when a
/// vector's length differs from the expected length, or when no vector was
/// found and `dimensions` was not supplied.
#[cfg(all(feature = "gql", feature = "vector-index"))]
fn create_vector_index_on_store(
    store: &LpgStore,
    label: &str,
    property: &str,
    dimensions: Option<usize>,
    metric: Option<&str>,
) -> Result<()> {
    use grafeo_common::types::{PropertyKey, Value};
    use grafeo_common::utils::error::Error;
    use grafeo_core::index::vector::{
        DistanceMetric, HnswConfig, HnswIndex, PropertyVectorAccessor,
    };

    // Resolve the requested metric, defaulting to cosine distance.
    let distance = if let Some(requested) = metric {
        let Some(parsed) = DistanceMetric::from_str(requested) else {
            return Err(Error::Internal(format!(
                "Unknown distance metric '{requested}'. Use: cosine, euclidean, dot_product, manhattan"
            )));
        };
        parsed
    } else {
        DistanceMetric::Cosine
    };

    let key = PropertyKey::new(property);
    let mut dims_seen: Option<usize> = dimensions;
    let mut collected: Vec<(grafeo_common::types::NodeId, Vec<f32>)> = Vec::new();

    // Gather every vector first so that a dimension mismatch aborts before
    // any index is created or registered.
    for node in store.nodes_with_label(label) {
        let Some(Value::Vector(v)) = node.properties.get(&key) else {
            continue;
        };
        match dims_seen {
            Some(expected) if v.len() != expected => {
                return Err(Error::Internal(format!(
                    "Vector dimension mismatch: expected {expected}, found {} on node {}",
                    v.len(),
                    node.id.0
                )));
            }
            Some(_) => {}
            // The first vector seen fixes the dimensionality.
            None => dims_seen = Some(v.len()),
        }
        collected.push((node.id, v.to_vec()));
    }

    let dims = dims_seen.ok_or_else(|| {
        Error::Internal(format!(
            "No vector properties found on :{label}({property}) and no dimensions specified"
        ))
    })?;

    let index = HnswIndex::with_capacity(HnswConfig::new(dims, distance), collected.len());
    let accessor = PropertyVectorAccessor::new(store, property);
    for (node_id, vector) in &collected {
        index.insert(*node_id, vector, &accessor);
    }

    store.add_vector_index(label, property, Arc::new(index));
    Ok(())
}
1710
/// Stand-in compiled when `gql` is enabled without `vector-index`.
///
/// Ignores all arguments and always fails.
///
/// # Errors
///
/// Always returns `Error::Internal` naming the missing `vector-index` feature.
#[cfg(all(feature = "gql", not(feature = "vector-index")))]
fn create_vector_index_on_store(
    _store: &LpgStore,
    _label: &str,
    _property: &str,
    _dimensions: Option<usize>,
    _metric: Option<&str>,
) -> Result<()> {
    use grafeo_common::utils::error::Error;

    let message = String::from("Vector index support requires the 'vector-index' feature");
    Err(Error::Internal(message))
}
1724
/// Builds an `InvertedIndex` (default `BM25Config`) over `property` for the
/// nodes returned by `store.nodes_by_label(label)` and registers it on `store`
/// via `add_text_index`.
///
/// Only property values that are `Value::String` are inserted; other nodes
/// are ignored. This function itself never returns an error.
#[cfg(all(feature = "gql", feature = "text-index"))]
fn create_text_index_on_store(store: &LpgStore, label: &str, property: &str) -> Result<()> {
    use grafeo_common::types::{PropertyKey, Value};
    use grafeo_core::index::text::{BM25Config, InvertedIndex};

    let key = PropertyKey::new(property);
    let mut inverted = InvertedIndex::new(BM25Config::default());

    for node_id in store.nodes_by_label(label) {
        // Nodes without a string value for the property contribute nothing.
        let Some(Value::String(text)) = store.get_node_property(node_id, &key) else {
            continue;
        };
        inverted.insert(node_id, text.as_str());
    }

    let shared = Arc::new(parking_lot::RwLock::new(inverted));
    store.add_text_index(label, property, shared);
    Ok(())
}
1744
/// Stand-in compiled when `gql` is enabled without `text-index`.
///
/// Ignores all arguments and always fails.
///
/// # Errors
///
/// Always returns `Error::Internal` naming the missing `text-index` feature.
#[cfg(all(feature = "gql", not(feature = "text-index")))]
fn create_text_index_on_store(_store: &LpgStore, _label: &str, _property: &str) -> Result<()> {
    use grafeo_common::utils::error::Error;

    let message = String::from("Text index support requires the 'text-index' feature");
    Err(Error::Internal(message))
}
1752
1753 fn execute_show_indexes(&self) -> Result<QueryResult> {
1755 let indexes = self.catalog.all_indexes();
1756 let columns = vec![
1757 "name".to_string(),
1758 "type".to_string(),
1759 "label".to_string(),
1760 "property".to_string(),
1761 ];
1762 let rows: Vec<Vec<Value>> = indexes
1763 .into_iter()
1764 .map(|def| {
1765 let label_name = self
1766 .catalog
1767 .get_label_name(def.label)
1768 .unwrap_or_else(|| "?".into());
1769 let prop_name = self
1770 .catalog
1771 .get_property_key_name(def.property_key)
1772 .unwrap_or_else(|| "?".into());
1773 vec![
1774 Value::from(format!("idx_{}_{}", label_name, prop_name)),
1775 Value::from(format!("{:?}", def.index_type)),
1776 Value::from(&*label_name),
1777 Value::from(&*prop_name),
1778 ]
1779 })
1780 .collect();
1781 Ok(QueryResult {
1782 columns,
1783 column_types: Vec::new(),
1784 rows,
1785 ..QueryResult::empty()
1786 })
1787 }
1788
1789 fn execute_show_constraints(&self) -> Result<QueryResult> {
1791 Ok(QueryResult {
1794 columns: vec![
1795 "name".to_string(),
1796 "type".to_string(),
1797 "label".to_string(),
1798 "properties".to_string(),
1799 ],
1800 column_types: Vec::new(),
1801 rows: Vec::new(),
1802 ..QueryResult::empty()
1803 })
1804 }
1805
1806 fn execute_show_node_types(&self) -> Result<QueryResult> {
1808 let columns = vec![
1809 "name".to_string(),
1810 "properties".to_string(),
1811 "constraints".to_string(),
1812 "parents".to_string(),
1813 ];
1814 let type_names = self.catalog.all_node_type_names();
1815 let rows: Vec<Vec<Value>> = type_names
1816 .into_iter()
1817 .filter_map(|name| {
1818 let def = self.catalog.get_node_type(&name)?;
1819 let props: Vec<String> = def
1820 .properties
1821 .iter()
1822 .map(|p| {
1823 let nullable = if p.nullable { "" } else { " NOT NULL" };
1824 format!("{} {}{}", p.name, p.data_type, nullable)
1825 })
1826 .collect();
1827 let constraints: Vec<String> =
1828 def.constraints.iter().map(|c| format!("{c:?}")).collect();
1829 let parents = def.parent_types.join(", ");
1830 Some(vec![
1831 Value::from(name),
1832 Value::from(props.join(", ")),
1833 Value::from(constraints.join(", ")),
1834 Value::from(parents),
1835 ])
1836 })
1837 .collect();
1838 Ok(QueryResult {
1839 columns,
1840 column_types: Vec::new(),
1841 rows,
1842 ..QueryResult::empty()
1843 })
1844 }
1845
1846 fn execute_show_edge_types(&self) -> Result<QueryResult> {
1848 let columns = vec![
1849 "name".to_string(),
1850 "properties".to_string(),
1851 "source_types".to_string(),
1852 "target_types".to_string(),
1853 ];
1854 let type_names = self.catalog.all_edge_type_names();
1855 let rows: Vec<Vec<Value>> = type_names
1856 .into_iter()
1857 .filter_map(|name| {
1858 let def = self.catalog.get_edge_type_def(&name)?;
1859 let props: Vec<String> = def
1860 .properties
1861 .iter()
1862 .map(|p| {
1863 let nullable = if p.nullable { "" } else { " NOT NULL" };
1864 format!("{} {}{}", p.name, p.data_type, nullable)
1865 })
1866 .collect();
1867 let src = def.source_node_types.join(", ");
1868 let tgt = def.target_node_types.join(", ");
1869 Some(vec![
1870 Value::from(name),
1871 Value::from(props.join(", ")),
1872 Value::from(src),
1873 Value::from(tgt),
1874 ])
1875 })
1876 .collect();
1877 Ok(QueryResult {
1878 columns,
1879 column_types: Vec::new(),
1880 rows,
1881 ..QueryResult::empty()
1882 })
1883 }
1884
1885 fn execute_show_graph_types(&self) -> Result<QueryResult> {
1887 let columns = vec![
1888 "name".to_string(),
1889 "open".to_string(),
1890 "node_types".to_string(),
1891 "edge_types".to_string(),
1892 ];
1893 let type_names = self.catalog.all_graph_type_names();
1894 let rows: Vec<Vec<Value>> = type_names
1895 .into_iter()
1896 .filter_map(|name| {
1897 let def = self.catalog.get_graph_type_def(&name)?;
1898 Some(vec![
1899 Value::from(name),
1900 Value::from(def.open),
1901 Value::from(def.allowed_node_types.join(", ")),
1902 Value::from(def.allowed_edge_types.join(", ")),
1903 ])
1904 })
1905 .collect();
1906 Ok(QueryResult {
1907 columns,
1908 column_types: Vec::new(),
1909 rows,
1910 ..QueryResult::empty()
1911 })
1912 }
1913
1914 fn execute_show_graphs(&self) -> Result<QueryResult> {
1920 let schema = self.current_schema.lock().clone();
1921 let all_names = self.store.graph_names();
1922
1923 let mut names: Vec<String> = match &schema {
1924 Some(s) => {
1925 let prefix = format!("{s}/");
1926 all_names
1927 .into_iter()
1928 .filter_map(|n| n.strip_prefix(&prefix).map(String::from))
1929 .collect()
1930 }
1931 None => all_names.into_iter().filter(|n| !n.contains('/')).collect(),
1932 };
1933 names.sort();
1934
1935 let rows: Vec<Vec<Value>> = names.into_iter().map(|n| vec![Value::from(n)]).collect();
1936 Ok(QueryResult {
1937 columns: vec!["name".to_string()],
1938 column_types: Vec::new(),
1939 rows,
1940 ..QueryResult::empty()
1941 })
1942 }
1943
1944 fn execute_show_schemas(&self) -> Result<QueryResult> {
1946 let mut names = self.catalog.schema_names();
1947 names.sort();
1948 let rows: Vec<Vec<Value>> = names.into_iter().map(|n| vec![Value::from(n)]).collect();
1949 Ok(QueryResult {
1950 columns: vec!["name".to_string()],
1951 column_types: Vec::new(),
1952 rows,
1953 ..QueryResult::empty()
1954 })
1955 }
1956
1957 fn execute_show_graph_type(&self, name: &str) -> Result<QueryResult> {
1959 use grafeo_common::utils::error::{Error, QueryError, QueryErrorKind};
1960
1961 let def = self.catalog.get_graph_type_def(name).ok_or_else(|| {
1962 Error::Query(QueryError::new(
1963 QueryErrorKind::Semantic,
1964 format!("Graph type '{name}' not found"),
1965 ))
1966 })?;
1967
1968 let columns = vec![
1969 "name".to_string(),
1970 "open".to_string(),
1971 "node_types".to_string(),
1972 "edge_types".to_string(),
1973 ];
1974 let rows = vec![vec![
1975 Value::from(def.name),
1976 Value::from(def.open),
1977 Value::from(def.allowed_node_types.join(", ")),
1978 Value::from(def.allowed_edge_types.join(", ")),
1979 ]];
1980 Ok(QueryResult {
1981 columns,
1982 column_types: Vec::new(),
1983 rows,
1984 ..QueryResult::empty()
1985 })
1986 }
1987
1988 fn execute_show_current_graph_type(&self) -> Result<QueryResult> {
1990 let graph_name = self
1991 .current_graph()
1992 .unwrap_or_else(|| "default".to_string());
1993 let columns = vec![
1994 "graph".to_string(),
1995 "graph_type".to_string(),
1996 "open".to_string(),
1997 "node_types".to_string(),
1998 "edge_types".to_string(),
1999 ];
2000
2001 if let Some(type_name) = self.catalog.get_graph_type_binding(&graph_name)
2002 && let Some(def) = self.catalog.get_graph_type_def(&type_name)
2003 {
2004 let rows = vec![vec![
2005 Value::from(graph_name),
2006 Value::from(type_name),
2007 Value::from(def.open),
2008 Value::from(def.allowed_node_types.join(", ")),
2009 Value::from(def.allowed_edge_types.join(", ")),
2010 ]];
2011 return Ok(QueryResult {
2012 columns,
2013 column_types: Vec::new(),
2014 rows,
2015 ..QueryResult::empty()
2016 });
2017 }
2018
2019 Ok(QueryResult {
2021 columns,
2022 column_types: Vec::new(),
2023 rows: vec![vec![
2024 Value::from(graph_name),
2025 Value::Null,
2026 Value::Null,
2027 Value::Null,
2028 Value::Null,
2029 ]],
2030 ..QueryResult::empty()
2031 })
2032 }
2033
/// Executes a GQL query string against this session.
///
/// The query is first translated with `gql::translate_full`. Session commands
/// and schema commands are dispatched to their dedicated handlers; anything
/// else is a logical plan that is optimized (or fetched from the query cache),
/// planned, and executed.
///
/// `EXPLAIN` plans return the annotated plan without executing; `PROFILE`
/// plans are executed and return the profile tree instead of the query rows.
///
/// # Errors
///
/// Propagates errors from `require_lpg`, translation, binding, optimization,
/// planning and execution. Returns `TransactionError::ReadOnly` when the
/// session's read-only transaction flag is set and the query is a schema
/// command or a plan containing mutations.
#[cfg(feature = "gql")]
pub fn execute(&self, query: &str) -> Result<QueryResult> {
    // NOTE(review): presumably rejects sessions whose graph model is not LPG — confirm.
    self.require_lpg("GQL")?;

    use crate::query::{
        Executor, binder::Binder, cache::CacheKey, optimizer::Optimizer,
        processor::QueryLanguage, translators::gql,
    };

    // Tracing span held for the remainder of the call.
    let _span = tracing::info_span!(
        "grafeo::session::execute",
        language = "gql",
        query_len = query.len(),
    )
    .entered();

    // Timing is only collected off wasm32.
    #[cfg(not(target_arch = "wasm32"))]
    let start_time = std::time::Instant::now();

    // Translation runs on every call, before the cache lookup, because its
    // result decides whether this is a session command, a schema command or a plan.
    let translation = gql::translate_full(query)?;
    let logical_plan = match translation {
        gql::GqlTranslationResult::SessionCommand(cmd) => {
            return self.execute_session_command(cmd);
        }
        gql::GqlTranslationResult::SchemaCommand(cmd) => {
            // Schema commands are refused while the read-only flag is set.
            if *self.read_only_tx.lock() {
                return Err(grafeo_common::utils::error::Error::Transaction(
                    grafeo_common::utils::error::TransactionError::ReadOnly,
                ));
            }
            return self.execute_schema_command(cmd);
        }
        gql::GqlTranslationResult::Plan(plan) => {
            // Mutating plans are refused while the read-only flag is set.
            if *self.read_only_tx.lock() && plan.root.has_mutations() {
                return Err(grafeo_common::utils::error::Error::Transaction(
                    grafeo_common::utils::error::TransactionError::ReadOnly,
                ));
            }
            plan
        }
    };

    // Cache key combines the query text, the language and the current graph.
    let cache_key = CacheKey::with_graph(query, QueryLanguage::Gql, self.current_graph());

    let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
        cached_plan
    } else {
        // Cache miss: bind, optimize, then store the optimized plan.
        let mut binder = Binder::new();
        // The binding context is not used further here; binding acts as a check.
        let _binding_context = binder.bind(&logical_plan)?;

        let active = self.active_store();
        let optimizer = Optimizer::from_graph_store(&*active);
        let plan = optimizer.optimize(logical_plan)?;

        self.query_cache.put_optimized(cache_key, plan.clone());

        plan
    };

    let active = self.active_store();

    // EXPLAIN: return the annotated plan without executing it.
    if optimized_plan.explain {
        use crate::query::processor::{annotate_pushdown_hints, explain_result};
        let mut plan = optimized_plan;
        annotate_pushdown_hints(&mut plan.root, active.as_ref());
        return Ok(explain_result(&plan));
    }

    // PROFILE: execute the plan, discard its rows, and return the profile tree.
    if optimized_plan.profile {
        let has_mutations = optimized_plan.root.has_mutations();
        // NOTE(review): `with_auto_commit` presumably wraps the closure in an
        // implicit transaction when needed — confirm against its definition.
        return self.with_auto_commit(has_mutations, || {
            let (viewing_epoch, transaction_id) = self.get_transaction_context();
            let planner = self.create_planner_for_store(
                Arc::clone(&active),
                viewing_epoch,
                transaction_id,
            );
            let (mut physical_plan, entries) = planner.plan_profiled(&optimized_plan)?;

            let executor = Executor::with_columns(physical_plan.columns.clone())
                .with_deadline(self.query_deadline());
            let _result = executor.execute(physical_plan.operator.as_mut())?;

            // Total time is measured from the start of this call; 0.0 on wasm32.
            let total_time_ms;
            #[cfg(not(target_arch = "wasm32"))]
            {
                total_time_ms = start_time.elapsed().as_secs_f64() * 1000.0;
            }
            #[cfg(target_arch = "wasm32")]
            {
                total_time_ms = 0.0;
            }

            let profile_tree = crate::query::profile::build_profile_tree(
                &optimized_plan.root,
                &mut entries.into_iter(),
            );
            Ok(crate::query::profile::profile_result(
                &profile_tree,
                total_time_ms,
            ))
        });
    }

    let has_mutations = optimized_plan.root.has_mutations();

    let result = self.with_auto_commit(has_mutations, || {
        let (viewing_epoch, transaction_id) = self.get_transaction_context();

        // The planner is told the query is read-only only when the plan has no
        // mutations and no transaction is currently open on this session.
        let has_active_tx = self.current_transaction.lock().is_some();
        let read_only = !has_mutations && !has_active_tx;
        let planner = self.create_planner_for_store_with_read_only(
            Arc::clone(&active),
            viewing_epoch,
            transaction_id,
            read_only,
        );
        let mut physical_plan = planner.plan(&optimized_plan)?;

        let executor = Executor::with_columns(physical_plan.columns.clone())
            .with_deadline(self.query_deadline());
        let mut result = executor.execute(physical_plan.operator.as_mut())?;

        // NOTE(review): `rows_scanned` is filled with the number of result rows,
        // which may differ from the rows actually scanned by operators.
        let rows_scanned = result.rows.len() as u64;
        #[cfg(not(target_arch = "wasm32"))]
        {
            let elapsed_ms = start_time.elapsed().as_secs_f64() * 1000.0;
            result.execution_time_ms = Some(elapsed_ms);
        }
        result.rows_scanned = Some(rows_scanned);

        Ok(result)
    });

    // Metrics are recorded for both successful and failed executions.
    #[cfg(feature = "metrics")]
    {
        #[cfg(not(target_arch = "wasm32"))]
        let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
        #[cfg(target_arch = "wasm32")]
        let elapsed_ms = None;
        self.record_query_metrics("gql", elapsed_ms, &result);
    }

    result
}
2224
/// Runs a GQL `query` with the session's viewing-epoch override set to
/// `epoch` for the duration of the call.
///
/// The override that was in place before the call is put back once
/// `execute` returns, whether it succeeded or failed.
#[cfg(feature = "gql")]
pub fn execute_at_epoch(&self, query: &str, epoch: EpochId) -> Result<QueryResult> {
    // Install the override, remembering whatever was there before.
    let saved = {
        let mut slot = self.viewing_epoch_override.lock();
        slot.replace(epoch)
    };

    let outcome = self.execute(query);

    // Put the earlier override back.
    {
        let mut slot = self.viewing_epoch_override.lock();
        *slot = saved;
    }
    outcome
}
2240
/// Runs a GQL `query` with the session's viewing-epoch override set to
/// `epoch`, optionally binding `params`.
///
/// With `Some(params)` the query goes through `execute_with_params`;
/// with `None` it goes through `execute`. The override that was in place
/// before the call is put back once the query returns, whether it succeeded
/// or failed.
#[cfg(feature = "gql")]
pub fn execute_at_epoch_with_params(
    &self,
    query: &str,
    epoch: EpochId,
    params: Option<std::collections::HashMap<String, Value>>,
) -> Result<QueryResult> {
    let saved = self.viewing_epoch_override.lock().replace(epoch);

    let outcome = match params {
        Some(bindings) => self.execute_with_params(query, bindings),
        None => self.execute(query),
    };

    *self.viewing_epoch_override.lock() = saved;
    outcome
}
2264
/// Executes a GQL query with named parameters.
///
/// The query is handed to a `QueryProcessor` built over the session's active
/// store and transaction manager; when the session reports a transaction id,
/// the processor is additionally given the viewing epoch and that id.
///
/// Whether the query mutates data is decided by
/// `Self::query_looks_like_mutation` on the raw query text, and that flag is
/// passed to `with_auto_commit`.
///
/// # Errors
///
/// Propagates errors from `require_lpg`, processor construction and query
/// processing.
#[cfg(feature = "gql")]
pub fn execute_with_params(
    &self,
    query: &str,
    params: std::collections::HashMap<String, Value>,
) -> Result<QueryResult> {
    self.require_lpg("GQL")?;

    use crate::query::processor::{QueryLanguage, QueryProcessor};

    let has_mutations = Self::query_looks_like_mutation(query);
    let active = self.active_store();

    self.with_auto_commit(has_mutations, || {
        let (viewing_epoch, transaction_id) = self.get_transaction_context();

        let processor = QueryProcessor::for_graph_store_with_transaction(
            Arc::clone(&active),
            Arc::clone(&self.transaction_manager),
        )?;

        // Attach the transaction context only when a transaction id is available.
        let processor = if let Some(transaction_id) = transaction_id {
            processor.with_transaction_context(viewing_epoch, transaction_id)
        } else {
            processor
        };

        // The parameter map is lent to the processor by reference.
        processor.process(query, QueryLanguage::Gql, Some(&params))
    })
}
2303
2304 #[cfg(not(any(feature = "gql", feature = "cypher")))]
2310 pub fn execute_with_params(
2311 &self,
2312 _query: &str,
2313 _params: std::collections::HashMap<String, Value>,
2314 ) -> Result<QueryResult> {
2315 Err(grafeo_common::utils::error::Error::Internal(
2316 "No query language enabled".to_string(),
2317 ))
2318 }
2319
2320 #[cfg(not(any(feature = "gql", feature = "cypher")))]
2326 pub fn execute(&self, _query: &str) -> Result<QueryResult> {
2327 Err(grafeo_common::utils::error::Error::Internal(
2328 "No query language enabled".to_string(),
2329 ))
2330 }
2331
/// Executes a Cypher query string against this session.
///
/// The query is first translated with `cypher::translate_full` so that schema
/// commands and the `SHOW` variants can be routed to their dedicated handlers.
/// Regular queries are optimized (or fetched from the query cache), planned,
/// and executed.
///
/// `EXPLAIN` plans return the annotated plan without executing; `PROFILE`
/// plans are executed and return the profile tree instead of the query rows.
///
/// # Errors
///
/// Propagates errors from translation, binding, optimization, planning and
/// execution. Returns a semantic `Error::Query` when a schema command is
/// issued while the session's read-only transaction flag is set.
#[cfg(feature = "cypher")]
pub fn execute_cypher(&self, query: &str) -> Result<QueryResult> {
    use crate::query::{
        Executor, binder::Binder, cache::CacheKey, optimizer::Optimizer,
        processor::QueryLanguage, translators::cypher,
    };
    use grafeo_common::utils::error::{Error as GrafeoError, QueryError, QueryErrorKind};

    // Full translation is used only for routing; see the `Plan(_)` arm below.
    let translation = cypher::translate_full(query)?;
    match translation {
        cypher::CypherTranslationResult::SchemaCommand(cmd) => {
            // Schema DDL is refused while the read-only flag is set.
            if *self.read_only_tx.lock() {
                return Err(GrafeoError::Query(QueryError::new(
                    QueryErrorKind::Semantic,
                    "Cannot execute schema DDL in a read-only transaction",
                )));
            }
            return self.execute_schema_command(cmd);
        }
        cypher::CypherTranslationResult::ShowIndexes => {
            return self.execute_show_indexes();
        }
        cypher::CypherTranslationResult::ShowConstraints => {
            return self.execute_show_constraints();
        }
        cypher::CypherTranslationResult::ShowCurrentGraphType => {
            return self.execute_show_current_graph_type();
        }
        cypher::CypherTranslationResult::Plan(_) => {
            // Regular query: fall through to the cached/optimized path below.
            // NOTE(review): the translated plan is discarded here and the query
            // is translated again on a cache miss — confirm this is intended.
            // NOTE(review): no read-only check is applied to plans in this
            // function — confirm mutations are rejected elsewhere.
        }
    }

    // Timing is only collected off wasm32.
    #[cfg(not(target_arch = "wasm32"))]
    let start_time = std::time::Instant::now();

    // Cache key combines the query text, the language and the current graph.
    let cache_key = CacheKey::with_graph(query, QueryLanguage::Cypher, self.current_graph());

    let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
        cached_plan
    } else {
        // Cache miss: translate, bind, optimize, then store the optimized plan.
        let logical_plan = cypher::translate(query)?;

        let mut binder = Binder::new();
        // The binding context is not used further here; binding acts as a check.
        let _binding_context = binder.bind(&logical_plan)?;

        let active = self.active_store();
        let optimizer = Optimizer::from_graph_store(&*active);
        let plan = optimizer.optimize(logical_plan)?;

        self.query_cache.put_optimized(cache_key, plan.clone());

        plan
    };

    let active = self.active_store();

    // EXPLAIN: return the annotated plan without executing it.
    if optimized_plan.explain {
        use crate::query::processor::{annotate_pushdown_hints, explain_result};
        let mut plan = optimized_plan;
        annotate_pushdown_hints(&mut plan.root, active.as_ref());
        return Ok(explain_result(&plan));
    }

    // PROFILE: execute the plan, discard its rows, and return the profile tree.
    if optimized_plan.profile {
        let has_mutations = optimized_plan.root.has_mutations();
        // NOTE(review): `with_auto_commit` presumably wraps the closure in an
        // implicit transaction when needed — confirm against its definition.
        return self.with_auto_commit(has_mutations, || {
            let (viewing_epoch, transaction_id) = self.get_transaction_context();
            let planner = self.create_planner_for_store(
                Arc::clone(&active),
                viewing_epoch,
                transaction_id,
            );
            let (mut physical_plan, entries) = planner.plan_profiled(&optimized_plan)?;

            let executor = Executor::with_columns(physical_plan.columns.clone())
                .with_deadline(self.query_deadline());
            let _result = executor.execute(physical_plan.operator.as_mut())?;

            // Total time is measured from `start_time`; 0.0 on wasm32.
            let total_time_ms;
            #[cfg(not(target_arch = "wasm32"))]
            {
                total_time_ms = start_time.elapsed().as_secs_f64() * 1000.0;
            }
            #[cfg(target_arch = "wasm32")]
            {
                total_time_ms = 0.0;
            }

            let profile_tree = crate::query::profile::build_profile_tree(
                &optimized_plan.root,
                &mut entries.into_iter(),
            );
            Ok(crate::query::profile::profile_result(
                &profile_tree,
                total_time_ms,
            ))
        });
    }

    let has_mutations = optimized_plan.root.has_mutations();

    let result = self.with_auto_commit(has_mutations, || {
        let (viewing_epoch, transaction_id) = self.get_transaction_context();

        let planner =
            self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
        let mut physical_plan = planner.plan(&optimized_plan)?;

        let executor = Executor::with_columns(physical_plan.columns.clone())
            .with_deadline(self.query_deadline());
        executor.execute(physical_plan.operator.as_mut())
    });

    // Metrics are recorded for both successful and failed executions.
    #[cfg(feature = "metrics")]
    {
        #[cfg(not(target_arch = "wasm32"))]
        let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
        #[cfg(target_arch = "wasm32")]
        let elapsed_ms = None;
        self.record_query_metrics("cypher", elapsed_ms, &result);
    }

    result
}
2475
/// Executes a Gremlin query against the active graph store.
///
/// The query text is translated to a logical plan, bound, optimized, and then
/// planned and executed inside `with_auto_commit`, so a plan containing
/// mutations is wrapped in an implicit transaction when auto-commit applies.
/// With the `metrics` feature enabled the outcome is recorded under the
/// `"gremlin"` language label.
///
/// # Errors
///
/// Propagates any error from translation, binding, optimization, planning,
/// execution, or the implicit begin/commit.
#[cfg(feature = "gremlin")]
pub fn execute_gremlin(&self, query: &str) -> Result<QueryResult> {
    use crate::query::{Executor, binder::Binder, optimizer::Optimizer, translators::gremlin};

    #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
    let start_time = Instant::now();

    // Front end: text -> logical plan, then name binding (result unused here).
    let translated = gremlin::translate(query)?;
    let mut name_binder = Binder::new();
    let _binding_context = name_binder.bind(&translated)?;

    // Optimize against the currently active store.
    let active = self.active_store();
    let optimized_plan = Optimizer::from_graph_store(&*active).optimize(translated)?;
    let mutates = optimized_plan.root.has_mutations();

    let result = self.with_auto_commit(mutates, || {
        // Resolve the snapshot only after any implicit transaction has begun.
        let (epoch, tx) = self.get_transaction_context();
        let planner = self.create_planner_for_store(Arc::clone(&active), epoch, tx);
        let mut physical = planner.plan(&optimized_plan)?;

        Executor::with_columns(physical.columns.clone())
            .with_deadline(self.query_deadline())
            .execute(physical.operator.as_mut())
    });

    #[cfg(feature = "metrics")]
    {
        // No wall clock is sampled on wasm32, so latency is omitted there.
        #[cfg(not(target_arch = "wasm32"))]
        let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
        #[cfg(target_arch = "wasm32")]
        let elapsed_ms = None;
        self.record_query_metrics("gremlin", elapsed_ms, &result);
    }

    result
}
2546
/// Executes a Gremlin query with named parameters.
///
/// Unlike `execute_gremlin`, this path hands the raw query text to a
/// `QueryProcessor` and decides whether to auto-commit from the textual
/// heuristic `query_looks_like_mutation`. With the `metrics` feature enabled
/// the outcome is recorded under the `"gremlin"` language label.
///
/// # Errors
///
/// Propagates any error from constructing the processor, processing the
/// query, or the implicit begin/commit.
#[cfg(feature = "gremlin")]
pub fn execute_gremlin_with_params(
    &self,
    query: &str,
    params: std::collections::HashMap<String, Value>,
) -> Result<QueryResult> {
    use crate::query::processor::{QueryLanguage, QueryProcessor};

    #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
    let start_time = Instant::now();

    let has_mutations = Self::query_looks_like_mutation(query);
    let active = self.active_store();

    let result = self.with_auto_commit(has_mutations, || {
        let (viewing_epoch, transaction_id) = self.get_transaction_context();
        let processor = QueryProcessor::for_graph_store_with_transaction(
            Arc::clone(&active),
            Arc::clone(&self.transaction_manager),
        )?;
        // Attach the transaction context only when a transaction is active.
        let processor = if let Some(transaction_id) = transaction_id {
            processor.with_transaction_context(viewing_epoch, transaction_id)
        } else {
            processor
        };
        // Fixed: the argument was the mis-encoded token `¶ms`; it is a borrow
        // of the `params` map.
        processor.process(query, QueryLanguage::Gremlin, Some(&params))
    });

    #[cfg(feature = "metrics")]
    {
        #[cfg(not(target_arch = "wasm32"))]
        let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
        #[cfg(target_arch = "wasm32")]
        let elapsed_ms = None;
        self.record_query_metrics("gremlin", elapsed_ms, &result);
    }

    result
}
2591
/// Executes a GraphQL query against the active graph store.
///
/// The query text is translated to a logical plan, bound, optimized, and then
/// planned and executed inside `with_auto_commit`, so a plan containing
/// mutations is wrapped in an implicit transaction when auto-commit applies.
/// With the `metrics` feature enabled the outcome is recorded under the
/// `"graphql"` language label.
///
/// # Errors
///
/// Propagates any error from translation, binding, optimization, planning,
/// execution, or the implicit begin/commit.
#[cfg(feature = "graphql")]
pub fn execute_graphql(&self, query: &str) -> Result<QueryResult> {
    use crate::query::{Executor, binder::Binder, optimizer::Optimizer, translators::graphql};

    #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
    let start_time = Instant::now();

    // Front end: text -> logical plan, then name binding (result unused here).
    let translated = graphql::translate(query)?;
    let mut name_binder = Binder::new();
    let _binding_context = name_binder.bind(&translated)?;

    // Optimize against the currently active store.
    let active = self.active_store();
    let optimized_plan = Optimizer::from_graph_store(&*active).optimize(translated)?;
    let mutates = optimized_plan.root.has_mutations();

    let result = self.with_auto_commit(mutates, || {
        // Resolve the snapshot only after any implicit transaction has begun.
        let (epoch, tx) = self.get_transaction_context();
        let planner = self.create_planner_for_store(Arc::clone(&active), epoch, tx);
        let mut physical = planner.plan(&optimized_plan)?;

        Executor::with_columns(physical.columns.clone())
            .with_deadline(self.query_deadline())
            .execute(physical.operator.as_mut())
    });

    #[cfg(feature = "metrics")]
    {
        // No wall clock is sampled on wasm32, so latency is omitted there.
        #[cfg(not(target_arch = "wasm32"))]
        let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
        #[cfg(target_arch = "wasm32")]
        let elapsed_ms = None;
        self.record_query_metrics("graphql", elapsed_ms, &result);
    }

    result
}
2652
/// Executes a GraphQL query with named parameters.
///
/// Unlike `execute_graphql`, this path hands the raw query text to a
/// `QueryProcessor` and decides whether to auto-commit from the textual
/// heuristic `query_looks_like_mutation`. With the `metrics` feature enabled
/// the outcome is recorded under the `"graphql"` language label.
///
/// # Errors
///
/// Propagates any error from constructing the processor, processing the
/// query, or the implicit begin/commit.
#[cfg(feature = "graphql")]
pub fn execute_graphql_with_params(
    &self,
    query: &str,
    params: std::collections::HashMap<String, Value>,
) -> Result<QueryResult> {
    use crate::query::processor::{QueryLanguage, QueryProcessor};

    #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
    let start_time = Instant::now();

    let has_mutations = Self::query_looks_like_mutation(query);
    let active = self.active_store();

    let result = self.with_auto_commit(has_mutations, || {
        let (viewing_epoch, transaction_id) = self.get_transaction_context();
        let processor = QueryProcessor::for_graph_store_with_transaction(
            Arc::clone(&active),
            Arc::clone(&self.transaction_manager),
        )?;
        // Attach the transaction context only when a transaction is active.
        let processor = if let Some(transaction_id) = transaction_id {
            processor.with_transaction_context(viewing_epoch, transaction_id)
        } else {
            processor
        };
        // Fixed: the argument was the mis-encoded token `¶ms`; it is a borrow
        // of the `params` map.
        processor.process(query, QueryLanguage::GraphQL, Some(&params))
    });

    #[cfg(feature = "metrics")]
    {
        #[cfg(not(target_arch = "wasm32"))]
        let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
        #[cfg(target_arch = "wasm32")]
        let elapsed_ms = None;
        self.record_query_metrics("graphql", elapsed_ms, &result);
    }

    result
}
2697
/// Executes a SQL/PGQ query against the active graph store.
///
/// A `CreatePropertyGraph` plan short-circuits with a one-row `status`
/// result. Any other plan is looked up in the optimized-plan cache (keyed by
/// query text, language and current graph); on a miss it is bound, optimized
/// and stored. The optimized plan is then planned and executed inside
/// `with_auto_commit`. With the `metrics` feature enabled the outcome is
/// recorded under the `"sql"` language label (the early status return is not
/// recorded).
///
/// # Errors
///
/// Propagates any error from translation, binding, optimization, planning,
/// execution, or the implicit begin/commit.
#[cfg(feature = "sql-pgq")]
pub fn execute_sql(&self, query: &str) -> Result<QueryResult> {
    use crate::query::{
        Executor, binder::Binder, cache::CacheKey, optimizer::Optimizer, plan::LogicalOperator,
        processor::QueryLanguage, translators::sql_pgq,
    };

    #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
    let start_time = Instant::now();

    let translated = sql_pgq::translate(query)?;

    // DDL: answer with a status row instead of running a plan.
    if let LogicalOperator::CreatePropertyGraph(ref cpg) = translated.root {
        let status = format!(
            "Property graph '{}' created ({} node tables, {} edge tables)",
            cpg.name,
            cpg.node_tables.len(),
            cpg.edge_tables.len()
        );
        return Ok(QueryResult {
            columns: vec!["status".into()],
            column_types: vec![grafeo_common::types::LogicalType::String],
            rows: vec![vec![Value::from(status)]],
            execution_time_ms: None,
            rows_scanned: None,
            status_message: None,
            gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
        });
    }

    let cache_key = CacheKey::with_graph(query, QueryLanguage::SqlPgq, self.current_graph());

    // Reuse a cached optimized plan when available; otherwise build and cache one.
    let optimized_plan = match self.query_cache.get_optimized(&cache_key) {
        Some(cached) => cached,
        None => {
            let mut name_binder = Binder::new();
            let _binding_context = name_binder.bind(&translated)?;
            let store = self.active_store();
            let fresh = Optimizer::from_graph_store(&*store).optimize(translated)?;
            self.query_cache.put_optimized(cache_key, fresh.clone());
            fresh
        }
    };

    let active = self.active_store();
    let mutates = optimized_plan.root.has_mutations();

    let result = self.with_auto_commit(mutates, || {
        // Resolve the snapshot only after any implicit transaction has begun.
        let (epoch, tx) = self.get_transaction_context();
        let planner = self.create_planner_for_store(Arc::clone(&active), epoch, tx);
        let mut physical = planner.plan(&optimized_plan)?;

        Executor::with_columns(physical.columns.clone())
            .with_deadline(self.query_deadline())
            .execute(physical.operator.as_mut())
    });

    #[cfg(feature = "metrics")]
    {
        // No wall clock is sampled on wasm32, so latency is omitted there.
        #[cfg(not(target_arch = "wasm32"))]
        let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
        #[cfg(target_arch = "wasm32")]
        let elapsed_ms = None;
        self.record_query_metrics("sql", elapsed_ms, &result);
    }

    result
}
2791
/// Executes a SQL/PGQ query with named parameters.
///
/// Unlike `execute_sql`, this path hands the raw query text to a
/// `QueryProcessor` and decides whether to auto-commit from the textual
/// heuristic `query_looks_like_mutation`. With the `metrics` feature enabled
/// the outcome is recorded under the `"sql"` language label.
///
/// # Errors
///
/// Propagates any error from constructing the processor, processing the
/// query, or the implicit begin/commit.
#[cfg(feature = "sql-pgq")]
pub fn execute_sql_with_params(
    &self,
    query: &str,
    params: std::collections::HashMap<String, Value>,
) -> Result<QueryResult> {
    use crate::query::processor::{QueryLanguage, QueryProcessor};

    #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
    let start_time = Instant::now();

    let has_mutations = Self::query_looks_like_mutation(query);
    let active = self.active_store();

    let result = self.with_auto_commit(has_mutations, || {
        let (viewing_epoch, transaction_id) = self.get_transaction_context();
        let processor = QueryProcessor::for_graph_store_with_transaction(
            Arc::clone(&active),
            Arc::clone(&self.transaction_manager),
        )?;
        // Attach the transaction context only when a transaction is active.
        let processor = if let Some(transaction_id) = transaction_id {
            processor.with_transaction_context(viewing_epoch, transaction_id)
        } else {
            processor
        };
        // Fixed: the argument was the mis-encoded token `¶ms`; it is a borrow
        // of the `params` map.
        processor.process(query, QueryLanguage::SqlPgq, Some(&params))
    });

    #[cfg(feature = "metrics")]
    {
        #[cfg(not(target_arch = "wasm32"))]
        let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
        #[cfg(target_arch = "wasm32")]
        let elapsed_ms = None;
        self.record_query_metrics("sql", elapsed_ms, &result);
    }

    result
}
2836
2837 pub fn execute_language(
2846 &self,
2847 query: &str,
2848 language: &str,
2849 params: Option<std::collections::HashMap<String, Value>>,
2850 ) -> Result<QueryResult> {
2851 let _span = tracing::info_span!(
2852 "grafeo::session::execute",
2853 language,
2854 query_len = query.len(),
2855 )
2856 .entered();
2857 match language {
2858 "gql" => {
2859 if let Some(p) = params {
2860 self.execute_with_params(query, p)
2861 } else {
2862 self.execute(query)
2863 }
2864 }
2865 #[cfg(feature = "cypher")]
2866 "cypher" => {
2867 if let Some(p) = params {
2868 use crate::query::processor::{QueryLanguage, QueryProcessor};
2869
2870 #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
2871 let start_time = Instant::now();
2872
2873 let has_mutations = Self::query_looks_like_mutation(query);
2874 let active = self.active_store();
2875 let result = self.with_auto_commit(has_mutations, || {
2876 let processor = QueryProcessor::for_graph_store_with_transaction(
2877 Arc::clone(&active),
2878 Arc::clone(&self.transaction_manager),
2879 )?;
2880 let (viewing_epoch, transaction_id) = self.get_transaction_context();
2881 let processor = if let Some(transaction_id) = transaction_id {
2882 processor.with_transaction_context(viewing_epoch, transaction_id)
2883 } else {
2884 processor
2885 };
2886 processor.process(query, QueryLanguage::Cypher, Some(&p))
2887 });
2888
2889 #[cfg(feature = "metrics")]
2890 {
2891 #[cfg(not(target_arch = "wasm32"))]
2892 let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
2893 #[cfg(target_arch = "wasm32")]
2894 let elapsed_ms = None;
2895 self.record_query_metrics("cypher", elapsed_ms, &result);
2896 }
2897
2898 result
2899 } else {
2900 self.execute_cypher(query)
2901 }
2902 }
2903 #[cfg(feature = "gremlin")]
2904 "gremlin" => {
2905 if let Some(p) = params {
2906 self.execute_gremlin_with_params(query, p)
2907 } else {
2908 self.execute_gremlin(query)
2909 }
2910 }
2911 #[cfg(feature = "graphql")]
2912 "graphql" => {
2913 if let Some(p) = params {
2914 self.execute_graphql_with_params(query, p)
2915 } else {
2916 self.execute_graphql(query)
2917 }
2918 }
2919 #[cfg(all(feature = "graphql", feature = "rdf"))]
2920 "graphql-rdf" => {
2921 if let Some(p) = params {
2922 self.execute_graphql_rdf_with_params(query, p)
2923 } else {
2924 self.execute_graphql_rdf(query)
2925 }
2926 }
2927 #[cfg(feature = "sql-pgq")]
2928 "sql" | "sql-pgq" => {
2929 if let Some(p) = params {
2930 self.execute_sql_with_params(query, p)
2931 } else {
2932 self.execute_sql(query)
2933 }
2934 }
2935 #[cfg(all(feature = "sparql", feature = "rdf"))]
2936 "sparql" => {
2937 if let Some(p) = params {
2938 self.execute_sparql_with_params(query, p)
2939 } else {
2940 self.execute_sparql(query)
2941 }
2942 }
2943 other => Err(grafeo_common::utils::error::Error::Query(
2944 grafeo_common::utils::error::QueryError::new(
2945 grafeo_common::utils::error::QueryErrorKind::Semantic,
2946 format!("Unknown query language: '{other}'"),
2947 ),
2948 )),
2949 }
2950 }
2951
/// Clears this session's query cache by calling `clear` on it.
pub fn clear_plan_cache(&self) {
    self.query_cache.clear();
}
2981
/// Begins a read-write transaction with no explicit isolation level.
///
/// Delegates to `begin_transaction_inner(false, None)`; if a transaction is
/// already active, that helper opens a nested level backed by a savepoint.
///
/// # Errors
///
/// Propagates any error from `begin_transaction_inner`.
pub fn begin_transaction(&mut self) -> Result<()> {
    self.begin_transaction_inner(false, None)
}
2992
/// Begins a read-write transaction using the given isolation level.
///
/// Delegates to `begin_transaction_inner(false, Some(isolation_level))`.
///
/// # Errors
///
/// Propagates any error from `begin_transaction_inner`.
pub fn begin_transaction_with_isolation(
    &mut self,
    isolation_level: crate::transaction::IsolationLevel,
) -> Result<()> {
    self.begin_transaction_inner(false, Some(isolation_level))
}
3006
/// Shared implementation behind the public "begin" entry points.
///
/// * If a transaction is already active, the nesting depth is incremented and
///   a savepoint named `_nested_tx_<depth>` is created instead of starting a
///   new transaction.
/// * Otherwise the current node/edge counts of the active LPG store are
///   recorded, a new transaction is obtained from the transaction manager
///   (with `isolation_level` when given), the read-only flag is set, and the
///   touched-graph list is reset to just the active graph's storage key.
///
/// # Errors
///
/// In the nested case, propagates the error of `savepoint`.
fn begin_transaction_inner(
    &self,
    read_only: bool,
    isolation_level: Option<crate::transaction::IsolationLevel>,
) -> Result<()> {
    let _span = tracing::debug_span!("grafeo::tx::begin", read_only).entered();
    let mut current = self.current_transaction.lock();
    if current.is_some() {
        // Release the transaction lock first: `savepoint` locks it again.
        drop(current);
        let mut depth = self.transaction_nesting_depth.lock();
        *depth += 1;
        let sp_name = format!("_nested_tx_{}", *depth);
        // NOTE(review): if `savepoint` fails, `depth` stays incremented — confirm
        // whether that can happen in practice.
        self.savepoint(&sp_name)?;
        return Ok(());
    }

    // Snapshot entity counts at transaction start (read back by the
    // `*_count_delta` accessors).
    let active = self.active_lpg_store();
    self.transaction_start_node_count
        .store(active.node_count(), Ordering::Relaxed);
    self.transaction_start_edge_count
        .store(active.edge_count(), Ordering::Relaxed);
    let transaction_id = if let Some(level) = isolation_level {
        self.transaction_manager.begin_with_isolation(level)
    } else {
        self.transaction_manager.begin()
    };
    *current = Some(transaction_id);
    // A read-only database forces every transaction to be read-only.
    *self.read_only_tx.lock() = read_only || self.db_read_only;

    // Start tracking touched graphs afresh, seeded with the active graph.
    let key = self.active_graph_storage_key();
    let mut touched = self.touched_graphs.lock();
    touched.clear();
    touched.push(key);

    #[cfg(feature = "metrics")]
    {
        crate::metrics::record_metric!(self.metrics, tx_active, inc);
        #[cfg(not(target_arch = "wasm32"))]
        {
            *self.tx_start_time.lock() = Some(Instant::now());
        }
    }

    Ok(())
}
3056
/// Commits the current transaction (or the innermost nested level).
///
/// Delegates to `commit_inner`.
///
/// # Errors
///
/// Propagates any error from `commit_inner`, including the "No active
/// transaction" state error.
pub fn commit(&mut self) -> Result<()> {
    self.commit_inner()
}
3067
/// Shared commit implementation usable through `&self`.
///
/// When the nesting depth is non-zero, only the innermost nested level is
/// closed by releasing its `_nested_tx_<depth>` savepoint. Otherwise the
/// active transaction is taken out of the session and committed through the
/// transaction manager; on success every touched graph store is finalized,
/// and on failure property changes are rolled back and session state reset.
///
/// # Errors
///
/// Returns an invalid-state transaction error when no transaction is active,
/// and propagates errors from `release_savepoint` or the transaction
/// manager's `commit`.
fn commit_inner(&self) -> Result<()> {
    let _span = tracing::debug_span!("grafeo::tx::commit").entered();
    {
        let mut depth = self.transaction_nesting_depth.lock();
        if *depth > 0 {
            // Nested level: release its savepoint; the outer transaction stays open.
            let sp_name = format!("_nested_tx_{depth}");
            *depth -= 1;
            drop(depth);
            return self.release_savepoint(&sp_name);
        }
    }

    // `take()` clears the session's current transaction before committing.
    let transaction_id = self.current_transaction.lock().take().ok_or_else(|| {
        grafeo_common::utils::error::Error::Transaction(
            grafeo_common::utils::error::TransactionError::InvalidState(
                "No active transaction".to_string(),
            ),
        )
    })?;

    // Work on a copy so the lock is not held across the store calls below.
    let touched = self.touched_graphs.lock().clone();
    let commit_epoch = match self.transaction_manager.commit(transaction_id) {
        Ok(epoch) => epoch,
        Err(e) => {
            // Commit rejected: undo property changes in every touched graph
            // and reset per-transaction session state before reporting.
            for graph_name in &touched {
                let store = self.resolve_store(graph_name);
                store.rollback_transaction_properties(transaction_id);
            }
            #[cfg(feature = "rdf")]
            self.rollback_rdf_transaction(transaction_id);
            *self.read_only_tx.lock() = self.db_read_only;
            self.savepoints.lock().clear();
            self.touched_graphs.lock().clear();
            #[cfg(feature = "metrics")]
            {
                crate::metrics::record_metric!(self.metrics, tx_active, dec);
                crate::metrics::record_metric!(self.metrics, tx_conflicts, inc);
                #[cfg(not(target_arch = "wasm32"))]
                if let Some(start) = self.tx_start_time.lock().take() {
                    let duration_ms = start.elapsed().as_secs_f64() * 1000.0;
                    crate::metrics::record_metric!(
                        self.metrics,
                        tx_duration,
                        observe duration_ms
                    );
                }
            }
            return Err(e);
        }
    };

    // Stamp this transaction's versions with the commit epoch.
    for graph_name in &touched {
        let store = self.resolve_store(graph_name);
        store.finalize_version_epochs(transaction_id, commit_epoch);
    }

    #[cfg(feature = "rdf")]
    self.commit_rdf_transaction(transaction_id);

    for graph_name in &touched {
        let store = self.resolve_store(graph_name);
        store.commit_transaction_properties(transaction_id);
    }

    // Bring every touched store up to the manager's current epoch.
    let current_epoch = self.transaction_manager.current_epoch();
    for graph_name in &touched {
        let store = self.resolve_store(graph_name);
        store.sync_epoch(current_epoch);
    }

    // Reset per-transaction session state.
    *self.read_only_tx.lock() = self.db_read_only;
    self.savepoints.lock().clear();
    self.touched_graphs.lock().clear();

    // Garbage-collect once every `gc_interval` commits; 0 disables it.
    if self.gc_interval > 0 {
        let count = self.commit_counter.fetch_add(1, Ordering::Relaxed) + 1;
        if count.is_multiple_of(self.gc_interval) {
            let min_epoch = self.transaction_manager.min_active_epoch();
            for graph_name in &touched {
                let store = self.resolve_store(graph_name);
                store.gc_versions(min_epoch);
            }
            self.transaction_manager.gc();
            #[cfg(feature = "metrics")]
            crate::metrics::record_metric!(self.metrics, gc_runs, inc);
        }
    }

    #[cfg(feature = "metrics")]
    {
        crate::metrics::record_metric!(self.metrics, tx_active, dec);
        crate::metrics::record_metric!(self.metrics, tx_committed, inc);
        #[cfg(not(target_arch = "wasm32"))]
        if let Some(start) = self.tx_start_time.lock().take() {
            let duration_ms = start.elapsed().as_secs_f64() * 1000.0;
            crate::metrics::record_metric!(self.metrics, tx_duration, observe duration_ms);
        }
    }

    Ok(())
}
3180
/// Rolls back the current transaction (or the innermost nested level).
///
/// Delegates to `rollback_inner`.
///
/// # Errors
///
/// Propagates any error from `rollback_inner`, including the "No active
/// transaction" state error.
pub fn rollback(&mut self) -> Result<()> {
    self.rollback_inner()
}
3207
/// Shared rollback implementation usable through `&self`.
///
/// When the nesting depth is non-zero, only the innermost nested level is
/// undone by rolling back to its `_nested_tx_<depth>` savepoint. Otherwise
/// the active transaction is taken out of the session, its uncommitted
/// versions are discarded in every touched graph store, session state is
/// reset, and the transaction is aborted in the transaction manager.
///
/// # Errors
///
/// Returns an invalid-state transaction error when no transaction is active,
/// and propagates errors from `rollback_to_savepoint` or the transaction
/// manager's `abort`.
fn rollback_inner(&self) -> Result<()> {
    let _span = tracing::debug_span!("grafeo::tx::rollback").entered();
    {
        let mut depth = self.transaction_nesting_depth.lock();
        if *depth > 0 {
            // Nested level: roll back to its savepoint; the outer transaction stays open.
            let sp_name = format!("_nested_tx_{depth}");
            *depth -= 1;
            drop(depth);
            return self.rollback_to_savepoint(&sp_name);
        }
    }

    // `take()` clears the session's current transaction before aborting.
    let transaction_id = self.current_transaction.lock().take().ok_or_else(|| {
        grafeo_common::utils::error::Error::Transaction(
            grafeo_common::utils::error::TransactionError::InvalidState(
                "No active transaction".to_string(),
            ),
        )
    })?;

    *self.read_only_tx.lock() = self.db_read_only;

    // Work on a copy so the lock is not held across the store calls below.
    let touched = self.touched_graphs.lock().clone();
    for graph_name in &touched {
        let store = self.resolve_store(graph_name);
        store.discard_uncommitted_versions(transaction_id);
    }

    #[cfg(feature = "rdf")]
    self.rollback_rdf_transaction(transaction_id);

    self.savepoints.lock().clear();
    self.touched_graphs.lock().clear();

    let result = self.transaction_manager.abort(transaction_id);

    // NOTE(review): metrics are updated only when `abort` succeeds, whereas the
    // failed-commit path always decrements `tx_active` — confirm this is intended.
    #[cfg(feature = "metrics")]
    if result.is_ok() {
        crate::metrics::record_metric!(self.metrics, tx_active, dec);
        crate::metrics::record_metric!(self.metrics, tx_rolled_back, inc);
        #[cfg(not(target_arch = "wasm32"))]
        if let Some(start) = self.tx_start_time.lock().take() {
            let duration_ms = start.elapsed().as_secs_f64() * 1000.0;
            crate::metrics::record_metric!(self.metrics, tx_duration, observe duration_ms);
        }
    }

    result
}
3264
3265 pub fn savepoint(&self, name: &str) -> Result<()> {
3275 let tx_id = self.current_transaction.lock().ok_or_else(|| {
3276 grafeo_common::utils::error::Error::Transaction(
3277 grafeo_common::utils::error::TransactionError::InvalidState(
3278 "No active transaction".to_string(),
3279 ),
3280 )
3281 })?;
3282
3283 let touched = self.touched_graphs.lock().clone();
3285 let graph_snapshots: Vec<GraphSavepoint> = touched
3286 .iter()
3287 .map(|graph_name| {
3288 let store = self.resolve_store(graph_name);
3289 GraphSavepoint {
3290 graph_name: graph_name.clone(),
3291 next_node_id: store.peek_next_node_id(),
3292 next_edge_id: store.peek_next_edge_id(),
3293 undo_log_position: store.property_undo_log_position(tx_id),
3294 }
3295 })
3296 .collect();
3297
3298 self.savepoints.lock().push(SavepointState {
3299 name: name.to_string(),
3300 graph_snapshots,
3301 active_graph: self.current_graph.lock().clone(),
3302 });
3303 Ok(())
3304 }
3305
/// Rolls the active transaction back to the most recent savepoint named `name`.
///
/// The matching savepoint and every savepoint created after it are removed.
/// For each graph captured in the savepoint, property changes are undone to
/// the recorded undo-log position and entities whose ids were allocated after
/// the savepoint are discarded. Graphs touched only after the savepoint have
/// all of this transaction's uncommitted versions discarded. Finally the
/// touched-graph list is reset to the graphs captured in the savepoint.
///
/// # Errors
///
/// Returns an invalid-state transaction error when no transaction is active
/// or when no savepoint with the given name exists.
pub fn rollback_to_savepoint(&self, name: &str) -> Result<()> {
    let transaction_id = self.current_transaction.lock().ok_or_else(|| {
        grafeo_common::utils::error::Error::Transaction(
            grafeo_common::utils::error::TransactionError::InvalidState(
                "No active transaction".to_string(),
            ),
        )
    })?;

    let mut savepoints = self.savepoints.lock();

    // Search from the back so the latest savepoint with this name wins.
    let pos = savepoints
        .iter()
        .rposition(|sp| sp.name == name)
        .ok_or_else(|| {
            grafeo_common::utils::error::Error::Transaction(
                grafeo_common::utils::error::TransactionError::InvalidState(format!(
                    "Savepoint '{name}' not found"
                )),
            )
        })?;

    let sp_state = savepoints[pos].clone();

    // Drops the target savepoint itself and everything created after it.
    savepoints.truncate(pos);
    drop(savepoints);

    for gs in &sp_state.graph_snapshots {
        let store = self.resolve_store(&gs.graph_name);

        store.rollback_transaction_properties_to(transaction_id, gs.undo_log_position);

        let current_next_node = store.peek_next_node_id();
        let current_next_edge = store.peek_next_edge_id();

        // Ids in [saved next id, current next id) were allocated after the savepoint.
        let node_ids: Vec<NodeId> = (gs.next_node_id..current_next_node)
            .map(NodeId::new)
            .collect();
        let edge_ids: Vec<EdgeId> = (gs.next_edge_id..current_next_edge)
            .map(EdgeId::new)
            .collect();

        if !node_ids.is_empty() || !edge_ids.is_empty() {
            store.discard_entities_by_id(transaction_id, &node_ids, &edge_ids);
        }
    }

    // Graphs first touched after the savepoint have no snapshot: discard all
    // of this transaction's uncommitted versions in them.
    let touched = self.touched_graphs.lock().clone();
    for graph_name in &touched {
        let already_captured = sp_state
            .graph_snapshots
            .iter()
            .any(|gs| gs.graph_name == *graph_name);
        if !already_captured {
            let store = self.resolve_store(graph_name);
            store.discard_uncommitted_versions(transaction_id);
        }
    }

    // Reset the touched list to exactly the graphs known at the savepoint.
    let mut touched = self.touched_graphs.lock();
    touched.clear();
    for gs in &sp_state.graph_snapshots {
        if !touched.contains(&gs.graph_name) {
            touched.push(gs.graph_name.clone());
        }
    }

    Ok(())
}
3392
3393 pub fn release_savepoint(&self, name: &str) -> Result<()> {
3399 let _tx_id = self.current_transaction.lock().ok_or_else(|| {
3400 grafeo_common::utils::error::Error::Transaction(
3401 grafeo_common::utils::error::TransactionError::InvalidState(
3402 "No active transaction".to_string(),
3403 ),
3404 )
3405 })?;
3406
3407 let mut savepoints = self.savepoints.lock();
3408 let pos = savepoints
3409 .iter()
3410 .rposition(|sp| sp.name == name)
3411 .ok_or_else(|| {
3412 grafeo_common::utils::error::Error::Transaction(
3413 grafeo_common::utils::error::TransactionError::InvalidState(format!(
3414 "Savepoint '{name}' not found"
3415 )),
3416 )
3417 })?;
3418 savepoints.remove(pos);
3419 Ok(())
3420 }
3421
/// Returns `true` while this session has an active transaction.
#[must_use]
pub fn in_transaction(&self) -> bool {
    self.current_transaction.lock().is_some()
}
3427
/// Returns the id of the active transaction, or `None` when there is none.
#[must_use]
pub(crate) fn current_transaction_id(&self) -> Option<TransactionId> {
    *self.current_transaction.lock()
}
3433
/// Returns a reference to the transaction manager used by this session.
#[must_use]
pub(crate) fn transaction_manager(&self) -> &TransactionManager {
    &self.transaction_manager
}
3439
3440 #[must_use]
3442 pub(crate) fn node_count_delta(&self) -> (usize, usize) {
3443 (
3444 self.transaction_start_node_count.load(Ordering::Relaxed),
3445 self.active_lpg_store().node_count(),
3446 )
3447 }
3448
3449 #[must_use]
3451 pub(crate) fn edge_count_delta(&self) -> (usize, usize) {
3452 (
3453 self.transaction_start_edge_count.load(Ordering::Relaxed),
3454 self.active_lpg_store().edge_count(),
3455 )
3456 }
3457
/// Creates a `PreparedCommit` that mutably borrows this session.
///
/// # Errors
///
/// Propagates any error returned by `PreparedCommit::new`.
pub fn prepare_commit(&mut self) -> Result<crate::transaction::PreparedCommit<'_>> {
    crate::transaction::PreparedCommit::new(self)
}
3494
/// Enables or disables auto-commit for this session.
pub fn set_auto_commit(&mut self, auto_commit: bool) {
    self.auto_commit = auto_commit;
}
3499
/// Returns whether auto-commit is enabled for this session.
#[must_use]
pub fn auto_commit(&self) -> bool {
    self.auto_commit
}
3505
3506 fn needs_auto_commit(&self, has_mutations: bool) -> bool {
3511 self.auto_commit && has_mutations && self.current_transaction.lock().is_none()
3512 }
3513
3514 fn with_auto_commit<F>(&self, has_mutations: bool, body: F) -> Result<QueryResult>
3517 where
3518 F: FnOnce() -> Result<QueryResult>,
3519 {
3520 if self.needs_auto_commit(has_mutations) {
3521 self.begin_transaction_inner(false, None)?;
3522 match body() {
3523 Ok(result) => {
3524 self.commit_inner()?;
3525 Ok(result)
3526 }
3527 Err(e) => {
3528 let _ = self.rollback_inner();
3529 Err(e)
3530 }
3531 }
3532 } else {
3533 body()
3534 }
3535 }
3536
/// Textual heuristic: does the query contain a mutation keyword?
///
/// The query is upper-cased (ASCII only) and searched for any of `INSERT`,
/// `CREATE`, `DELETE`, `MERGE`, `SET`, `REMOVE`, `DROP` or `ALTER` as a plain
/// substring. Because no word boundaries are checked, text such as `OFFSET`
/// also matches.
fn query_looks_like_mutation(query: &str) -> bool {
    const MUTATION_KEYWORDS: [&str; 8] = [
        "INSERT", "CREATE", "DELETE", "MERGE", "SET", "REMOVE", "DROP", "ALTER",
    ];

    let normalized = query.to_ascii_uppercase();
    MUTATION_KEYWORDS
        .iter()
        .any(|keyword| normalized.contains(keyword))
}
3553
3554 #[must_use]
3556 fn query_deadline(&self) -> Option<Instant> {
3557 #[cfg(not(target_arch = "wasm32"))]
3558 {
3559 self.query_timeout.map(|d| Instant::now() + d)
3560 }
3561 #[cfg(target_arch = "wasm32")]
3562 {
3563 let _ = &self.query_timeout;
3564 None
3565 }
3566 }
3567
/// Records metrics for one finished query.
///
/// Always counts the query (overall and per `language`). Latency is observed
/// only when `elapsed_ms` is `Some`. On success the returned row count and,
/// when present, the scanned row count are added; on failure the error
/// counter is incremented, plus the timeout counter when the error message
/// contains `"exceeded timeout"`.
#[cfg(feature = "metrics")]
fn record_query_metrics(
    &self,
    language: &str,
    elapsed_ms: Option<f64>,
    result: &Result<crate::database::QueryResult>,
) {
    use crate::metrics::record_metric;

    record_metric!(self.metrics, query_count, inc);
    if let Some(registry) = &self.metrics {
        registry.query_count_by_language.increment(language);
    }
    if let Some(latency_ms) = elapsed_ms {
        record_metric!(self.metrics, query_latency, observe latency_ms);
    }

    match result {
        Err(err) => {
            record_metric!(self.metrics, query_errors, inc);
            // Timeouts are recognised by their message text.
            if err.to_string().contains("exceeded timeout") {
                record_metric!(self.metrics, query_timeouts, inc);
            }
        }
        Ok(outcome) => {
            let row_count = outcome.rows.len() as u64;
            record_metric!(self.metrics, rows_returned, add row_count);
            if let Some(scanned_rows) = outcome.rows_scanned {
                record_metric!(self.metrics, rows_scanned, add scanned_rows);
            }
        }
    }
}
3607
3608 fn eval_integer_literal(expr: &grafeo_adapters::query::gql::ast::Expression) -> Option<i64> {
3610 use grafeo_adapters::query::gql::ast::{Expression, Literal};
3611 match expr {
3612 Expression::Literal(Literal::Integer(n)) => Some(*n),
3613 _ => None,
3614 }
3615 }
3616
3617 #[must_use]
3623 fn get_transaction_context(&self) -> (EpochId, Option<TransactionId>) {
3624 if let Some(epoch) = *self.viewing_epoch_override.lock() {
3626 return (epoch, None);
3627 }
3628
3629 if let Some(transaction_id) = *self.current_transaction.lock() {
3630 let epoch = self
3632 .transaction_manager
3633 .start_epoch(transaction_id)
3634 .unwrap_or_else(|| self.transaction_manager.current_epoch());
3635 (epoch, Some(transaction_id))
3636 } else {
3637 (self.transaction_manager.current_epoch(), None)
3639 }
3640 }
3641
/// Builds a planner for `store` with the read-only flag set to `false`.
///
/// Thin wrapper over `create_planner_for_store_with_read_only`.
fn create_planner_for_store(
    &self,
    store: Arc<dyn GraphStoreMut>,
    viewing_epoch: EpochId,
    transaction_id: Option<TransactionId>,
) -> crate::query::Planner {
    self.create_planner_for_store_with_read_only(store, viewing_epoch, transaction_id, false)
}
3654
3655 fn create_planner_for_store_with_read_only(
3656 &self,
3657 store: Arc<dyn GraphStoreMut>,
3658 viewing_epoch: EpochId,
3659 transaction_id: Option<TransactionId>,
3660 read_only: bool,
3661 ) -> crate::query::Planner {
3662 use crate::query::Planner;
3663 use grafeo_core::execution::operators::{LazyValue, SessionContext};
3664
3665 let info_store = Arc::clone(&store);
3667 let schema_store = Arc::clone(&store);
3668
3669 let session_context = SessionContext {
3670 current_schema: self.current_schema(),
3671 current_graph: self.current_graph(),
3672 db_info: LazyValue::new(move || Self::build_info_value(&*info_store)),
3673 schema_info: LazyValue::new(move || Self::build_schema_value(&*schema_store)),
3674 };
3675
3676 let mut planner = Planner::with_context(
3677 Arc::clone(&store),
3678 Arc::clone(&self.transaction_manager),
3679 transaction_id,
3680 viewing_epoch,
3681 )
3682 .with_factorized_execution(self.factorized_execution)
3683 .with_catalog(Arc::clone(&self.catalog))
3684 .with_session_context(session_context)
3685 .with_read_only(read_only);
3686
3687 let validator =
3689 CatalogConstraintValidator::new(Arc::clone(&self.catalog)).with_store(store);
3690 planner = planner.with_validator(Arc::new(validator));
3691
3692 planner
3693 }
3694
3695 fn build_info_value(store: &dyn GraphStoreMut) -> Value {
3697 use grafeo_common::types::PropertyKey;
3698 use std::collections::BTreeMap;
3699
3700 let mut map = BTreeMap::new();
3701 map.insert(PropertyKey::from("mode"), Value::String("lpg".into()));
3702 map.insert(
3703 PropertyKey::from("node_count"),
3704 Value::Int64(store.node_count() as i64),
3705 );
3706 map.insert(
3707 PropertyKey::from("edge_count"),
3708 Value::Int64(store.edge_count() as i64),
3709 );
3710 map.insert(
3711 PropertyKey::from("version"),
3712 Value::String(env!("CARGO_PKG_VERSION").into()),
3713 );
3714 Value::Map(map.into())
3715 }
3716
3717 fn build_schema_value(store: &dyn GraphStoreMut) -> Value {
3719 use grafeo_common::types::PropertyKey;
3720 use std::collections::BTreeMap;
3721
3722 let labels: Vec<Value> = store
3723 .all_labels()
3724 .into_iter()
3725 .map(|l| Value::String(l.into()))
3726 .collect();
3727 let edge_types: Vec<Value> = store
3728 .all_edge_types()
3729 .into_iter()
3730 .map(|t| Value::String(t.into()))
3731 .collect();
3732 let property_keys: Vec<Value> = store
3733 .all_property_keys()
3734 .into_iter()
3735 .map(|k| Value::String(k.into()))
3736 .collect();
3737
3738 let mut map = BTreeMap::new();
3739 map.insert(PropertyKey::from("labels"), Value::List(labels.into()));
3740 map.insert(
3741 PropertyKey::from("edge_types"),
3742 Value::List(edge_types.into()),
3743 );
3744 map.insert(
3745 PropertyKey::from("property_keys"),
3746 Value::List(property_keys.into()),
3747 );
3748 Value::Map(map.into())
3749 }
3750
3751 pub fn create_node(&self, labels: &[&str]) -> NodeId {
3756 let (epoch, transaction_id) = self.get_transaction_context();
3757 self.active_lpg_store().create_node_versioned(
3758 labels,
3759 epoch,
3760 transaction_id.unwrap_or(TransactionId::SYSTEM),
3761 )
3762 }
3763
3764 pub fn create_node_with_props<'a>(
3768 &self,
3769 labels: &[&str],
3770 properties: impl IntoIterator<Item = (&'a str, Value)>,
3771 ) -> NodeId {
3772 let (epoch, transaction_id) = self.get_transaction_context();
3773 self.active_lpg_store().create_node_with_props_versioned(
3774 labels,
3775 properties,
3776 epoch,
3777 transaction_id.unwrap_or(TransactionId::SYSTEM),
3778 )
3779 }
3780
3781 pub fn create_edge(
3786 &self,
3787 src: NodeId,
3788 dst: NodeId,
3789 edge_type: &str,
3790 ) -> grafeo_common::types::EdgeId {
3791 let (epoch, transaction_id) = self.get_transaction_context();
3792 self.active_lpg_store().create_edge_versioned(
3793 src,
3794 dst,
3795 edge_type,
3796 epoch,
3797 transaction_id.unwrap_or(TransactionId::SYSTEM),
3798 )
3799 }
3800
3801 #[must_use]
3829 pub fn get_node(&self, id: NodeId) -> Option<Node> {
3830 let (epoch, transaction_id) = self.get_transaction_context();
3831 self.active_lpg_store().get_node_versioned(
3832 id,
3833 epoch,
3834 transaction_id.unwrap_or(TransactionId::SYSTEM),
3835 )
3836 }
3837
3838 #[must_use]
3862 pub fn get_node_property(&self, id: NodeId, key: &str) -> Option<Value> {
3863 self.get_node(id)
3864 .and_then(|node| node.get_property(key).cloned())
3865 }
3866
3867 #[must_use]
3874 pub fn get_edge(&self, id: EdgeId) -> Option<Edge> {
3875 let (epoch, transaction_id) = self.get_transaction_context();
3876 self.active_lpg_store().get_edge_versioned(
3877 id,
3878 epoch,
3879 transaction_id.unwrap_or(TransactionId::SYSTEM),
3880 )
3881 }
3882
3883 #[must_use]
3909 pub fn get_neighbors_outgoing(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
3910 self.active_lpg_store()
3911 .edges_from(node, Direction::Outgoing)
3912 .collect()
3913 }
3914
3915 #[must_use]
3924 pub fn get_neighbors_incoming(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
3925 self.active_lpg_store()
3926 .edges_from(node, Direction::Incoming)
3927 .collect()
3928 }
3929
3930 #[must_use]
3942 pub fn get_neighbors_outgoing_by_type(
3943 &self,
3944 node: NodeId,
3945 edge_type: &str,
3946 ) -> Vec<(NodeId, EdgeId)> {
3947 self.active_lpg_store()
3948 .edges_from(node, Direction::Outgoing)
3949 .filter(|(_, edge_id)| {
3950 self.get_edge(*edge_id)
3951 .is_some_and(|e| e.edge_type.as_str() == edge_type)
3952 })
3953 .collect()
3954 }
3955
3956 #[must_use]
3963 pub fn node_exists(&self, id: NodeId) -> bool {
3964 self.get_node(id).is_some()
3965 }
3966
3967 #[must_use]
3969 pub fn edge_exists(&self, id: EdgeId) -> bool {
3970 self.get_edge(id).is_some()
3971 }
3972
3973 #[must_use]
3977 pub fn get_degree(&self, node: NodeId) -> (usize, usize) {
3978 let active = self.active_lpg_store();
3979 let out = active.out_degree(node);
3980 let in_degree = active.in_degree(node);
3981 (out, in_degree)
3982 }
3983
3984 #[must_use]
3994 pub fn get_nodes_batch(&self, ids: &[NodeId]) -> Vec<Option<Node>> {
3995 let (epoch, transaction_id) = self.get_transaction_context();
3996 let tx = transaction_id.unwrap_or(TransactionId::SYSTEM);
3997 let active = self.active_lpg_store();
3998 ids.iter()
3999 .map(|&id| active.get_node_versioned(id, epoch, tx))
4000 .collect()
4001 }
4002
/// Returns the change events the CDC log holds for `entity_id`.
///
/// # Errors
///
/// The visible implementation always returns `Ok`.
#[cfg(feature = "cdc")]
pub fn history(
    &self,
    entity_id: impl Into<crate::cdc::EntityId>,
) -> Result<Vec<crate::cdc::ChangeEvent>> {
    let entity = entity_id.into();
    let events = self.cdc_log.history(entity);
    Ok(events)
}
4013
/// Returns the change events the CDC log holds for `entity_id`, restricted
/// by `since_epoch` as implemented by the log's `history_since`.
///
/// # Errors
///
/// The visible implementation always returns `Ok`.
#[cfg(feature = "cdc")]
pub fn history_since(
    &self,
    entity_id: impl Into<crate::cdc::EntityId>,
    since_epoch: EpochId,
) -> Result<Vec<crate::cdc::ChangeEvent>> {
    let entity = entity_id.into();
    let events = self.cdc_log.history_since(entity, since_epoch);
    Ok(events)
}
4023
/// Returns the change events the CDC log reports for the range given by
/// `start_epoch` and `end_epoch`.
///
/// NOTE(review): whether the bounds are inclusive is decided by the log's
/// `changes_between`; confirm there.
///
/// # Errors
///
/// The visible implementation always returns `Ok`.
#[cfg(feature = "cdc")]
pub fn changes_between(
    &self,
    start_epoch: EpochId,
    end_epoch: EpochId,
) -> Result<Vec<crate::cdc::ChangeEvent>> {
    let events = self.cdc_log.changes_between(start_epoch, end_epoch);
    Ok(events)
}
4033}
4034
4035impl Drop for Session {
4036 fn drop(&mut self) {
4037 if self.in_transaction() {
4040 let _ = self.rollback_inner();
4041 }
4042
4043 #[cfg(feature = "metrics")]
4044 if let Some(ref reg) = self.metrics {
4045 reg.session_active
4046 .fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
4047 }
4048 }
4049}
4050
4051#[cfg(test)]
4052mod tests {
4053 use super::parse_default_literal;
4054 use crate::database::GrafeoDB;
4055 use grafeo_common::types::Value;
4056
/// `null` is recognised regardless of letter case.
#[test]
fn parse_default_literal_null() {
    for spelling in ["null", "NULL", "Null"] {
        assert_eq!(parse_default_literal(spelling), Value::Null);
    }
}
4067
/// `true` / `false` are recognised in lower and upper case.
#[test]
fn parse_default_literal_bool() {
    let cases = [
        ("true", true),
        ("TRUE", true),
        ("false", false),
        ("FALSE", false),
    ];
    for (text, expected) in cases {
        assert_eq!(parse_default_literal(text), Value::Bool(expected));
    }
}
4075
/// Single quotes are stripped from a quoted string literal.
#[test]
fn parse_default_literal_string_single_quoted() {
    let parsed = parse_default_literal("'hello'");
    assert_eq!(parsed, Value::String("hello".into()));
}
4083
/// Double quotes are stripped from a quoted string literal.
#[test]
fn parse_default_literal_string_double_quoted() {
    let parsed = parse_default_literal("\"world\"");
    assert_eq!(parsed, Value::String("world".into()));
}
4091
/// Positive, negative and zero integers parse to `Value::Int64`.
#[test]
fn parse_default_literal_integer() {
    let cases: [(&str, i64); 3] = [("42", 42), ("-7", -7), ("0", 0)];
    for (text, expected) in cases {
        assert_eq!(parse_default_literal(text), Value::Int64(expected));
    }
}
4098
/// Decimal literals parse to `Value::Float64`.
#[test]
fn parse_default_literal_float() {
    let cases: [(&str, f64); 2] = [("9.81", 9.81), ("-0.5", -0.5)];
    for (text, expected) in cases {
        assert_eq!(parse_default_literal(text), Value::Float64(expected));
    }
}
4104
/// Text matching no other literal form is returned unchanged as a string.
#[test]
fn parse_default_literal_fallback_string() {
    let parsed = parse_default_literal("some_identifier");
    assert_eq!(parsed, Value::String("some_identifier".into()));
}
4113
/// A node created through a session outside any transaction is counted by
/// the database.
#[test]
fn test_session_create_node() {
    let db = GrafeoDB::new_in_memory();
    let session = db.session();

    let person = session.create_node(&["Person"]);

    assert!(person.is_valid());
    assert_eq!(db.node_count(), 1);
}
4123
/// `in_transaction` tracks the begin / commit lifecycle.
#[test]
fn test_session_transaction() {
    let db = GrafeoDB::new_in_memory();
    let mut session = db.session();

    // Fresh session: nothing open yet.
    assert!(!session.in_transaction());

    session.begin_transaction().unwrap();
    assert!(session.in_transaction());

    session.commit().unwrap();
    assert!(!session.in_transaction());
}
4137
/// The transaction context exposes a transaction id only while a
/// transaction is open, and the epoch does not go backwards across a commit.
#[test]
fn test_session_transaction_context() {
    let db = GrafeoDB::new_in_memory();
    let mut session = db.session();

    // Before any transaction: no transaction id.
    let (_, tx_before) = session.get_transaction_context();
    assert!(tx_before.is_none());

    // Inside a transaction: an id is present.
    session.begin_transaction().unwrap();
    let (epoch_during, tx_during) = session.get_transaction_context();
    assert!(tx_during.is_some());

    // After commit: the id is gone again and the epoch has not regressed.
    session.commit().unwrap();
    let (epoch_after, tx_after) = session.get_transaction_context();
    assert!(tx_after.is_none());
    assert!(epoch_after.as_u64() >= epoch_during.as_u64());
}
4161
/// Rolling back closes the open transaction.
#[test]
fn test_session_rollback() {
    let db = GrafeoDB::new_in_memory();
    let mut session = db.session();

    session.begin_transaction().unwrap();
    session.rollback().unwrap();

    let still_open = session.in_transaction();
    assert!(!still_open);
}
4171
/// A node written straight to the store under the session's transaction id
/// is discarded by rollback, while a node created beforehand survives.
#[test]
fn test_session_rollback_discards_versions() {
    use grafeo_common::types::TransactionId;

    let db = GrafeoDB::new_in_memory();

    // Baseline node, created outside any transaction.
    let baseline_node = db.store().create_node(&["Person"]);
    assert!(baseline_node.is_valid());
    assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");

    let mut session = db.session();
    session.begin_transaction().unwrap();
    let tx = session.current_transaction.lock().unwrap();

    // Write directly to the store, tagged with the open transaction.
    let tx_epoch = db.store().current_epoch();
    let pending_node = db
        .store()
        .create_node_versioned(&["Person"], tx_epoch, tx);
    assert!(pending_node.is_valid());

    assert_eq!(
        db.node_count(),
        1,
        "PENDING nodes should be invisible to non-versioned node_count()"
    );
    let seen_by_owner = db.store().get_node_versioned(pending_node, tx_epoch, tx);
    assert!(
        seen_by_owner.is_some(),
        "Transaction node should be visible to its own transaction"
    );

    session.rollback().unwrap();
    assert!(!session.in_transaction());

    let count_after = db.node_count();
    assert_eq!(
        count_after, 1,
        "Rollback should discard uncommitted node, but got {count_after}"
    );

    // Re-check both nodes from the system transaction's point of view.
    let epoch_after = db.store().current_epoch();
    let baseline_lookup =
        db.store()
            .get_node_versioned(baseline_node, epoch_after, TransactionId::SYSTEM);
    assert!(baseline_lookup.is_some(), "Original node should still exist");

    let pending_lookup =
        db.store()
            .get_node_versioned(pending_node, epoch_after, TransactionId::SYSTEM);
    assert!(pending_lookup.is_none(), "Transaction node should be gone");
}
4239
/// A node created via `Session::create_node` inside a transaction is
/// visible to that transaction and is discarded by rollback.
#[test]
fn test_session_create_node_in_transaction() {
    let db = GrafeoDB::new_in_memory();

    // Baseline node, created outside any transaction.
    let baseline_node = db.create_node(&["Person"]);
    assert!(baseline_node.is_valid());
    assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");

    let mut session = db.session();
    session.begin_transaction().unwrap();
    let tx = session.current_transaction.lock().unwrap();

    let pending_node = session.create_node(&["Person"]);
    assert!(pending_node.is_valid());

    assert_eq!(
        db.node_count(),
        1,
        "PENDING nodes should be invisible to non-versioned node_count()"
    );
    let tx_epoch = db.store().current_epoch();
    let seen_by_owner = db.store().get_node_versioned(pending_node, tx_epoch, tx);
    assert!(
        seen_by_owner.is_some(),
        "Transaction node should be visible to its own transaction"
    );

    session.rollback().unwrap();

    let count_after = db.node_count();
    assert_eq!(
        count_after, 1,
        "Rollback should discard node created via session.create_node(), but got {count_after}"
    );
}
4284
/// A node created via `Session::create_node_with_props` inside a
/// transaction is visible to that transaction and is discarded by rollback.
#[test]
fn test_session_create_node_with_props_in_transaction() {
    use grafeo_common::types::Value;

    let db = GrafeoDB::new_in_memory();

    // Baseline node, created outside any transaction.
    db.create_node(&["Person"]);
    assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");

    let mut session = db.session();
    session.begin_transaction().unwrap();
    let tx = session.current_transaction.lock().unwrap();

    let name_property = ("name", Value::String("Alix".into()));
    let pending_node = session.create_node_with_props(&["Person"], [name_property]);
    assert!(pending_node.is_valid());

    assert_eq!(
        db.node_count(),
        1,
        "PENDING nodes should be invisible to non-versioned node_count()"
    );
    let tx_epoch = db.store().current_epoch();
    let seen_by_owner = db.store().get_node_versioned(pending_node, tx_epoch, tx);
    assert!(
        seen_by_owner.is_some(),
        "Transaction node should be visible to its own transaction"
    );

    session.rollback().unwrap();

    let count_after = db.node_count();
    assert_eq!(
        count_after, 1,
        "Rollback should discard node created via session.create_node_with_props()"
    );
}
4330
/// Tests that run GQL text through `Session::execute`.
#[cfg(feature = "gql")]
mod gql_tests {
    use super::*;

    /// A label-filtered MATCH returns only nodes with that label.
    #[test]
    fn test_gql_query_execution() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        for label in ["Person", "Person", "Animal"] {
            session.create_node(&[label]);
        }

        let people = session.execute("MATCH (n:Person) RETURN n").unwrap();

        assert_eq!(people.row_count(), 2);
        assert_eq!(people.column_count(), 1);
        assert_eq!(people.columns[0], "n");
    }

    /// A MATCH over an empty database yields zero rows.
    #[test]
    fn test_gql_empty_result() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        let people = session.execute("MATCH (n:Person) RETURN n").unwrap();

        assert_eq!(people.row_count(), 0);
    }

    /// Malformed query text is reported as an error.
    #[test]
    fn test_gql_parse_error() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        // The node pattern is missing its closing parenthesis.
        let outcome = session.execute("MATCH (n RETURN n");

        assert!(outcome.is_err());
    }

    /// A one-hop pattern returns one row per matching edge.
    #[test]
    fn test_gql_relationship_traversal() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        let alix = session.create_node(&["Person"]);
        let gus = session.create_node(&["Person"]);
        let vincent = session.create_node(&["Person"]);
        for friend in [gus, vincent] {
            session.create_edge(alix, friend, "KNOWS");
        }

        let pairs = session
            .execute("MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b")
            .unwrap();

        assert_eq!(pairs.row_count(), 2);
        assert_eq!(pairs.column_count(), 2);
        assert_eq!(pairs.columns[0], "a");
        assert_eq!(pairs.columns[1], "b");
    }

    /// The edge type in the pattern filters out edges of other types.
    #[test]
    fn test_gql_relationship_with_type_filter() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        let alix = session.create_node(&["Person"]);
        let gus = session.create_node(&["Person"]);
        let vincent = session.create_node(&["Person"]);
        session.create_edge(alix, gus, "KNOWS");
        session.create_edge(alix, vincent, "WORKS_WITH");

        let pairs = session
            .execute("MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b")
            .unwrap();

        assert_eq!(pairs.row_count(), 1);
    }

    /// Returning a variable that was never bound is rejected with an
    /// "Undefined variable" error.
    #[test]
    fn test_gql_semantic_error_undefined_variable() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        let outcome = session.execute("MATCH (n:Person) RETURN x");

        assert!(outcome.is_err());
        let message = match outcome {
            Err(err) => err.to_string(),
            Ok(_) => panic!("Expected error"),
        };
        assert!(
            message.contains("Undefined variable"),
            "Expected undefined variable error, got: {}",
            message
        );
    }

    /// A WHERE comparison on a property keeps only the matching nodes.
    #[test]
    fn test_gql_where_clause_property_filter() {
        use grafeo_common::types::Value;

        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        for age in [25, 35, 45] {
            session.create_node_with_props(&["Person"], [("age", Value::Int64(age))]);
        }

        let older = session
            .execute("MATCH (n:Person) WHERE n.age > 30 RETURN n")
            .unwrap();

        assert_eq!(older.row_count(), 2);
    }

    /// A WHERE equality on a string property keeps only the matching nodes.
    #[test]
    fn test_gql_where_clause_equality() {
        use grafeo_common::types::Value;

        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        for name in ["Alix", "Gus", "Alix"] {
            session.create_node_with_props(&["Person"], [("name", Value::String(name.into()))]);
        }

        let matches = session
            .execute("MATCH (n:Person) WHERE n.name = \"Alix\" RETURN n")
            .unwrap();

        assert_eq!(matches.row_count(), 2);
    }

    /// Property access in RETURN yields one column per expression, named
    /// after the expression text.
    #[test]
    fn test_gql_return_property_access() {
        use grafeo_common::types::Value;

        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        session.create_node_with_props(
            &["Person"],
            [
                ("name", Value::String("Alix".into())),
                ("age", Value::Int64(30)),
            ],
        );
        session.create_node_with_props(
            &["Person"],
            [
                ("name", Value::String("Gus".into())),
                ("age", Value::Int64(25)),
            ],
        );

        let projected = session
            .execute("MATCH (n:Person) RETURN n.name, n.age")
            .unwrap();

        assert_eq!(projected.row_count(), 2);
        assert_eq!(projected.column_count(), 2);
        assert_eq!(projected.columns[0], "n.name");
        assert_eq!(projected.columns[1], "n.age");

        // Row order is not asserted; only check that both names occur.
        let has_name = |wanted: Value| projected.rows.iter().any(|row| row[0] == wanted);
        assert!(has_name(Value::String("Alix".into())));
        assert!(has_name(Value::String("Gus".into())));
    }

    /// A RETURN list may mix a whole node with a property access.
    #[test]
    fn test_gql_return_mixed_expressions() {
        use grafeo_common::types::Value;

        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        session.create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))]);

        let projected = session
            .execute("MATCH (n:Person) RETURN n, n.name")
            .unwrap();

        assert_eq!(projected.row_count(), 1);
        assert_eq!(projected.column_count(), 2);
        assert_eq!(projected.columns[0], "n");
        assert_eq!(projected.columns[1], "n.name");

        let first_row = &projected.rows[0];
        assert_eq!(first_row[1], Value::String("Alix".into()));
    }
}
4549
/// Tests that run Cypher text through `Session::execute_cypher`.
#[cfg(feature = "cypher")]
mod cypher_tests {
    use super::*;

    /// A label-filtered MATCH returns only nodes with that label.
    #[test]
    fn test_cypher_query_execution() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        for label in ["Person", "Person", "Animal"] {
            session.create_node(&[label]);
        }

        let people = session.execute_cypher("MATCH (n:Person) RETURN n").unwrap();

        assert_eq!(people.row_count(), 2);
        assert_eq!(people.column_count(), 1);
        assert_eq!(people.columns[0], "n");
    }

    /// A MATCH over an empty database yields zero rows.
    #[test]
    fn test_cypher_empty_result() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        let people = session.execute_cypher("MATCH (n:Person) RETURN n").unwrap();

        assert_eq!(people.row_count(), 0);
    }

    /// Malformed query text is reported as an error.
    #[test]
    fn test_cypher_parse_error() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        // The node pattern is missing its closing parenthesis.
        let outcome = session.execute_cypher("MATCH (n RETURN n");

        assert!(outcome.is_err());
    }
}
4595
4596 mod direct_lookup_tests {
4599 use super::*;
4600 use grafeo_common::types::Value;
4601
4602 #[test]
4603 fn test_get_node() {
4604 let db = GrafeoDB::new_in_memory();
4605 let session = db.session();
4606
4607 let id = session.create_node(&["Person"]);
4608 let node = session.get_node(id);
4609
4610 assert!(node.is_some());
4611 let node = node.unwrap();
4612 assert_eq!(node.id, id);
4613 }
4614
4615 #[test]
4616 fn test_get_node_not_found() {
4617 use grafeo_common::types::NodeId;
4618
4619 let db = GrafeoDB::new_in_memory();
4620 let session = db.session();
4621
4622 let node = session.get_node(NodeId::new(9999));
4624 assert!(node.is_none());
4625 }
4626
4627 #[test]
4628 fn test_get_node_property() {
4629 let db = GrafeoDB::new_in_memory();
4630 let session = db.session();
4631
4632 let id = session
4633 .create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))]);
4634
4635 let name = session.get_node_property(id, "name");
4636 assert_eq!(name, Some(Value::String("Alix".into())));
4637
4638 let missing = session.get_node_property(id, "missing");
4640 assert!(missing.is_none());
4641 }
4642
4643 #[test]
4644 fn test_get_edge() {
4645 let db = GrafeoDB::new_in_memory();
4646 let session = db.session();
4647
4648 let alix = session.create_node(&["Person"]);
4649 let gus = session.create_node(&["Person"]);
4650 let edge_id = session.create_edge(alix, gus, "KNOWS");
4651
4652 let edge = session.get_edge(edge_id);
4653 assert!(edge.is_some());
4654 let edge = edge.unwrap();
4655 assert_eq!(edge.id, edge_id);
4656 assert_eq!(edge.src, alix);
4657 assert_eq!(edge.dst, gus);
4658 }
4659
4660 #[test]
4661 fn test_get_edge_not_found() {
4662 use grafeo_common::types::EdgeId;
4663
4664 let db = GrafeoDB::new_in_memory();
4665 let session = db.session();
4666
4667 let edge = session.get_edge(EdgeId::new(9999));
4668 assert!(edge.is_none());
4669 }
4670
4671 #[test]
4672 fn test_get_neighbors_outgoing() {
4673 let db = GrafeoDB::new_in_memory();
4674 let session = db.session();
4675
4676 let alix = session.create_node(&["Person"]);
4677 let gus = session.create_node(&["Person"]);
4678 let harm = session.create_node(&["Person"]);
4679
4680 session.create_edge(alix, gus, "KNOWS");
4681 session.create_edge(alix, harm, "KNOWS");
4682
4683 let neighbors = session.get_neighbors_outgoing(alix);
4684 assert_eq!(neighbors.len(), 2);
4685
4686 let neighbor_ids: Vec<_> = neighbors.iter().map(|(node_id, _)| *node_id).collect();
4687 assert!(neighbor_ids.contains(&gus));
4688 assert!(neighbor_ids.contains(&harm));
4689 }
4690
4691 #[test]
4692 fn test_get_neighbors_incoming() {
4693 let db = GrafeoDB::new_in_memory();
4694 let session = db.session();
4695
4696 let alix = session.create_node(&["Person"]);
4697 let gus = session.create_node(&["Person"]);
4698 let harm = session.create_node(&["Person"]);
4699
4700 session.create_edge(gus, alix, "KNOWS");
4701 session.create_edge(harm, alix, "KNOWS");
4702
4703 let neighbors = session.get_neighbors_incoming(alix);
4704 assert_eq!(neighbors.len(), 2);
4705
4706 let neighbor_ids: Vec<_> = neighbors.iter().map(|(node_id, _)| *node_id).collect();
4707 assert!(neighbor_ids.contains(&gus));
4708 assert!(neighbor_ids.contains(&harm));
4709 }
4710
4711 #[test]
4712 fn test_get_neighbors_outgoing_by_type() {
4713 let db = GrafeoDB::new_in_memory();
4714 let session = db.session();
4715
4716 let alix = session.create_node(&["Person"]);
4717 let gus = session.create_node(&["Person"]);
4718 let company = session.create_node(&["Company"]);
4719
4720 session.create_edge(alix, gus, "KNOWS");
4721 session.create_edge(alix, company, "WORKS_AT");
4722
4723 let knows_neighbors = session.get_neighbors_outgoing_by_type(alix, "KNOWS");
4724 assert_eq!(knows_neighbors.len(), 1);
4725 assert_eq!(knows_neighbors[0].0, gus);
4726
4727 let works_neighbors = session.get_neighbors_outgoing_by_type(alix, "WORKS_AT");
4728 assert_eq!(works_neighbors.len(), 1);
4729 assert_eq!(works_neighbors[0].0, company);
4730
4731 let no_neighbors = session.get_neighbors_outgoing_by_type(alix, "LIKES");
4733 assert!(no_neighbors.is_empty());
4734 }
4735
4736 #[test]
4737 fn test_node_exists() {
4738 use grafeo_common::types::NodeId;
4739
4740 let db = GrafeoDB::new_in_memory();
4741 let session = db.session();
4742
4743 let id = session.create_node(&["Person"]);
4744
4745 assert!(session.node_exists(id));
4746 assert!(!session.node_exists(NodeId::new(9999)));
4747 }
4748
4749 #[test]
4750 fn test_edge_exists() {
4751 use grafeo_common::types::EdgeId;
4752
4753 let db = GrafeoDB::new_in_memory();
4754 let session = db.session();
4755
4756 let alix = session.create_node(&["Person"]);
4757 let gus = session.create_node(&["Person"]);
4758 let edge_id = session.create_edge(alix, gus, "KNOWS");
4759
4760 assert!(session.edge_exists(edge_id));
4761 assert!(!session.edge_exists(EdgeId::new(9999)));
4762 }
4763
4764 #[test]
4765 fn test_get_degree() {
4766 let db = GrafeoDB::new_in_memory();
4767 let session = db.session();
4768
4769 let alix = session.create_node(&["Person"]);
4770 let gus = session.create_node(&["Person"]);
4771 let harm = session.create_node(&["Person"]);
4772
4773 session.create_edge(alix, gus, "KNOWS");
4775 session.create_edge(alix, harm, "KNOWS");
4776 session.create_edge(gus, alix, "KNOWS");
4778
4779 let (out_degree, in_degree) = session.get_degree(alix);
4780 assert_eq!(out_degree, 2);
4781 assert_eq!(in_degree, 1);
4782
4783 let lonely = session.create_node(&["Person"]);
4785 let (out, in_deg) = session.get_degree(lonely);
4786 assert_eq!(out, 0);
4787 assert_eq!(in_deg, 0);
4788 }
4789
4790 #[test]
4791 fn test_get_nodes_batch() {
4792 let db = GrafeoDB::new_in_memory();
4793 let session = db.session();
4794
4795 let alix = session.create_node(&["Person"]);
4796 let gus = session.create_node(&["Person"]);
4797 let harm = session.create_node(&["Person"]);
4798
4799 let nodes = session.get_nodes_batch(&[alix, gus, harm]);
4800 assert_eq!(nodes.len(), 3);
4801 assert!(nodes[0].is_some());
4802 assert!(nodes[1].is_some());
4803 assert!(nodes[2].is_some());
4804
4805 use grafeo_common::types::NodeId;
4807 let nodes_with_missing = session.get_nodes_batch(&[alix, NodeId::new(9999), harm]);
4808 assert_eq!(nodes_with_missing.len(), 3);
4809 assert!(nodes_with_missing[0].is_some());
4810 assert!(nodes_with_missing[1].is_none()); assert!(nodes_with_missing[2].is_some());
4812 }
4813
4814 #[test]
4815 fn test_auto_commit_setting() {
4816 let db = GrafeoDB::new_in_memory();
4817 let mut session = db.session();
4818
4819 assert!(session.auto_commit());
4821
4822 session.set_auto_commit(false);
4823 assert!(!session.auto_commit());
4824
4825 session.set_auto_commit(true);
4826 assert!(session.auto_commit());
4827 }
4828
4829 #[test]
4830 fn test_transaction_double_begin_nests() {
4831 let db = GrafeoDB::new_in_memory();
4832 let mut session = db.session();
4833
4834 session.begin_transaction().unwrap();
4835 let result = session.begin_transaction();
4837 assert!(result.is_ok());
4838 session.commit().unwrap();
4840 session.commit().unwrap();
4842 }
4843
4844 #[test]
4845 fn test_commit_without_transaction_error() {
4846 let db = GrafeoDB::new_in_memory();
4847 let mut session = db.session();
4848
4849 let result = session.commit();
4850 assert!(result.is_err());
4851 }
4852
4853 #[test]
4854 fn test_rollback_without_transaction_error() {
4855 let db = GrafeoDB::new_in_memory();
4856 let mut session = db.session();
4857
4858 let result = session.rollback();
4859 assert!(result.is_err());
4860 }
4861
4862 #[test]
4863 fn test_create_edge_in_transaction() {
4864 let db = GrafeoDB::new_in_memory();
4865 let mut session = db.session();
4866
4867 let alix = session.create_node(&["Person"]);
4869 let gus = session.create_node(&["Person"]);
4870
4871 session.begin_transaction().unwrap();
4873 let edge_id = session.create_edge(alix, gus, "KNOWS");
4874
4875 assert!(session.edge_exists(edge_id));
4877
4878 session.commit().unwrap();
4880
4881 assert!(session.edge_exists(edge_id));
4883 }
4884
4885 #[test]
4886 fn test_neighbors_empty_node() {
4887 let db = GrafeoDB::new_in_memory();
4888 let session = db.session();
4889
4890 let lonely = session.create_node(&["Person"]);
4891
4892 assert!(session.get_neighbors_outgoing(lonely).is_empty());
4893 assert!(session.get_neighbors_incoming(lonely).is_empty());
4894 assert!(
4895 session
4896 .get_neighbors_outgoing_by_type(lonely, "KNOWS")
4897 .is_empty()
4898 );
4899 }
4900 }
4901
/// With a GC interval of 2, two committed transactions that each create a
/// node leave both nodes in place.
#[test]
fn test_auto_gc_triggers_on_commit_interval() {
    use crate::config::Config;

    let db = GrafeoDB::with_config(Config::in_memory().with_gc_interval(2)).unwrap();
    let mut session = db.session();

    // One committed transaction per label.
    for label in ["A", "B"] {
        session.begin_transaction().unwrap();
        session.create_node(&[label]);
        session.commit().unwrap();
    }

    assert_eq!(db.node_count(), 2);
}
4923
/// A query timeout set on the config gives sessions a query deadline.
#[test]
fn test_query_timeout_config_propagates_to_session() {
    use crate::config::Config;
    use std::time::Duration;

    let timeout = Duration::from_secs(5);
    let db = GrafeoDB::with_config(Config::in_memory().with_query_timeout(timeout)).unwrap();
    let session = db.session();

    let deadline = session.query_deadline();
    assert!(deadline.is_some());
}
4936
/// A default in-memory database gives sessions no query deadline.
#[test]
fn test_no_query_timeout_returns_no_deadline() {
    let db = GrafeoDB::new_in_memory();
    let session = db.session();

    let deadline = session.query_deadline();
    assert!(deadline.is_none());
}
4945
/// A default in-memory database reports the LPG graph model.
#[test]
fn test_graph_model_accessor() {
    use crate::config::GraphModel;

    let db = GrafeoDB::new_in_memory();
    let session = db.session();

    let model = session.graph_model();
    assert_eq!(model, GraphModel::Lpg);
}
4955
/// A database built over an externally supplied store supports an INSERT
/// and a MATCH inside one transaction.
#[cfg(feature = "gql")]
#[test]
fn test_external_store_session() {
    use grafeo_core::graph::GraphStoreMut;
    use std::sync::Arc;

    let external_store =
        Arc::new(grafeo_core::graph::lpg::LpgStore::new().unwrap()) as Arc<dyn GraphStoreMut>;
    let config = crate::config::Config::in_memory();
    let db = GrafeoDB::with_store(external_store, config).unwrap();

    let mut session = db.session();
    session.begin_transaction().unwrap();

    session.execute("INSERT (:Test {name: 'hello'})").unwrap();

    // The insert is visible to a query in the same transaction.
    let inserted = session.execute("MATCH (n:Test) RETURN n.name").unwrap();
    assert_eq!(inserted.row_count(), 1);

    session.commit().unwrap();
}
4981
4982 #[cfg(feature = "gql")]
4985 mod session_command_tests {
4986 use super::*;
4987 use grafeo_common::types::Value;
4988
/// `USE GRAPH` on an existing graph makes it the session's current graph.
#[test]
fn test_use_graph_sets_current_graph() {
    let db = GrafeoDB::new_in_memory();
    let session = db.session();

    // The graph has to exist before it can be selected.
    session.execute("CREATE GRAPH mydb").unwrap();
    session.execute("USE GRAPH mydb").unwrap();

    let selected = session.current_graph();
    assert_eq!(selected, Some("mydb".to_string()));
}
5000
/// `USE GRAPH` on an unknown graph fails with a "does not exist" error.
#[test]
fn test_use_graph_nonexistent_errors() {
    let db = GrafeoDB::new_in_memory();
    let session = db.session();

    let outcome = session.execute("USE GRAPH doesnotexist");

    assert!(outcome.is_err());
    let err = outcome.unwrap_err().to_string();
    assert!(
        err.contains("does not exist"),
        "Expected 'does not exist' error, got: {err}"
    );
}
5014
/// `USE GRAPH default` succeeds without creating a graph first.
#[test]
fn test_use_graph_default_always_valid() {
    let db = GrafeoDB::new_in_memory();
    let session = db.session();

    session.execute("USE GRAPH default").unwrap();

    let selected = session.current_graph();
    assert_eq!(selected, Some("default".to_string()));
}
5024
/// `SESSION SET GRAPH` on an existing graph makes it the current graph.
#[test]
fn test_session_set_graph() {
    let db = GrafeoDB::new_in_memory();
    let session = db.session();

    session.execute("CREATE GRAPH analytics").unwrap();
    session.execute("SESSION SET GRAPH analytics").unwrap();

    let selected = session.current_graph();
    assert_eq!(selected, Some("analytics".to_string()));
}
5034
/// `SESSION SET GRAPH` on an unknown graph is an error.
#[test]
fn test_session_set_graph_nonexistent_errors() {
    let db = GrafeoDB::new_in_memory();
    let session = db.session();

    let outcome = session.execute("SESSION SET GRAPH nosuchgraph");

    assert!(outcome.is_err());
}
5043
/// The session time zone starts unset and follows `SESSION SET TIME ZONE`.
#[test]
fn test_session_set_time_zone() {
    let db = GrafeoDB::new_in_memory();
    let session = db.session();

    // Nothing is configured on a fresh session.
    assert_eq!(session.time_zone(), None);

    session.execute("SESSION SET TIME ZONE 'UTC'").unwrap();
    let first_zone = session.time_zone();
    assert_eq!(first_zone, Some("UTC".to_string()));

    // A second statement replaces the earlier value.
    session
        .execute("SESSION SET TIME ZONE 'America/New_York'")
        .unwrap();
    let second_zone = session.time_zone();
    assert_eq!(second_zone, Some("America/New_York".to_string()));
}
5059
/// `SESSION SET PARAMETER` stores a value retrievable by parameter name.
#[test]
fn test_session_set_parameter() {
    let db = GrafeoDB::new_in_memory();
    let session = db.session();

    session
        .execute("SESSION SET PARAMETER $timeout = 30")
        .unwrap();

    // The parameter is looked up without the leading `$`.
    let stored = session.get_parameter("timeout");
    assert!(stored.is_some());
}
5073
/// `SESSION RESET` clears the current graph, the time zone and parameters.
#[test]
fn test_session_reset_clears_all_state() {
    let db = GrafeoDB::new_in_memory();
    let session = db.session();

    // Populate every piece of session state first.
    session.execute("CREATE GRAPH analytics").unwrap();
    session.execute("SESSION SET GRAPH analytics").unwrap();
    session.execute("SESSION SET TIME ZONE 'UTC'").unwrap();
    session
        .execute("SESSION SET PARAMETER $limit = 100")
        .unwrap();

    assert!(session.current_graph().is_some());
    assert!(session.time_zone().is_some());
    assert!(session.get_parameter("limit").is_some());

    session.execute("SESSION RESET").unwrap();

    assert_eq!(session.current_graph(), None);
    assert_eq!(session.time_zone(), None);
    let limit_after_reset = session.get_parameter("limit");
    assert!(limit_after_reset.is_none());
}
5099
/// `SESSION CLOSE` clears the current graph and the time zone.
#[test]
fn test_session_close_clears_state() {
    let db = GrafeoDB::new_in_memory();
    let session = db.session();

    let setup = [
        "CREATE GRAPH analytics",
        "SESSION SET GRAPH analytics",
        "SESSION SET TIME ZONE 'UTC'",
    ];
    for statement in setup {
        session.execute(statement).unwrap();
    }

    session.execute("SESSION CLOSE").unwrap();

    assert_eq!(session.current_graph(), None);
    assert_eq!(session.time_zone(), None);
}
5114
/// A graph created with `CREATE GRAPH` can then be selected.
#[test]
fn test_create_graph() {
    let db = GrafeoDB::new_in_memory();
    let session = db.session();

    session.execute("CREATE GRAPH mydb").unwrap();

    // Selecting the graph proves it was registered.
    session.execute("USE GRAPH mydb").unwrap();
    let selected = session.current_graph();
    assert_eq!(selected, Some("mydb".to_string()));
}
5126
/// Creating the same graph twice fails with an "already exists" error.
#[test]
fn test_create_graph_duplicate_errors() {
    let db = GrafeoDB::new_in_memory();
    let session = db.session();

    session.execute("CREATE GRAPH mydb").unwrap();
    let second_attempt = session.execute("CREATE GRAPH mydb");

    assert!(second_attempt.is_err());
    let err = second_attempt.unwrap_err().to_string();
    assert!(
        err.contains("already exists"),
        "Expected 'already exists' error, got: {err}"
    );
}
5142
/// `CREATE GRAPH IF NOT EXISTS` must succeed even when the graph is
/// already present.
#[test]
fn test_create_graph_if_not_exists() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    let created = sess.execute("CREATE GRAPH mydb");
    created.unwrap();

    // The guarded form must not report an error for the existing graph.
    let repeated = sess.execute("CREATE GRAPH IF NOT EXISTS mydb");
    repeated.unwrap();
}
5152
/// Once a graph has been dropped, trying to select it with `USE GRAPH`
/// must return an error.
#[test]
fn test_drop_graph() {
    let db = GrafeoDB::new_in_memory();
    let session = db.session();

    session.execute("CREATE GRAPH mydb").unwrap();
    session.execute("DROP GRAPH mydb").unwrap();

    // The graph no longer exists, so selecting it has to be rejected. The
    // message makes a regression readable without digging into the test.
    let result = session.execute("USE GRAPH mydb");
    assert!(
        result.is_err(),
        "USE GRAPH on a dropped graph should fail, but it succeeded"
    );
}
5165
/// Dropping a graph that was never created must fail with an error that
/// mentions "does not exist".
#[test]
fn test_drop_graph_nonexistent_errors() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    let attempt = sess.execute("DROP GRAPH nosuchgraph");
    assert!(attempt.is_err());

    let err = attempt.unwrap_err().to_string();
    let mentions_missing = err.contains("does not exist");
    assert!(
        mentions_missing,
        "Expected 'does not exist' error, got: {err}"
    );
}
5179
/// `DROP GRAPH IF EXISTS` must succeed when the named graph is absent.
#[test]
fn test_drop_graph_if_exists() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    // No graph of this name was created; the guarded drop must still be Ok.
    let outcome = sess.execute("DROP GRAPH IF EXISTS nosuchgraph");
    outcome.unwrap();
}
5188
/// `START TRANSACTION` / `COMMIT` issued as GQL statements should toggle
/// the session's transaction state, and the committed insert should be
/// visible to a later query.
#[test]
fn test_start_transaction_via_gql() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    sess.execute("START TRANSACTION").unwrap();
    let open_after_start = sess.in_transaction();
    assert!(open_after_start);

    sess.execute("INSERT (:Person {name: 'Alix'})").unwrap();
    sess.execute("COMMIT").unwrap();
    let open_after_commit = sess.in_transaction();
    assert!(!open_after_commit);

    // The committed node must be visible outside the transaction.
    let people = sess.execute("MATCH (n:Person) RETURN n.name").unwrap();
    let found = people.rows.len();
    assert_eq!(found, 1);
}
5203
/// Inside a `READ ONLY` transaction an `INSERT` must be rejected with an
/// error that mentions "read-only".
#[test]
fn test_start_transaction_read_only_blocks_insert() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    sess.execute("START TRANSACTION READ ONLY").unwrap();

    let write_attempt = sess.execute("INSERT (:Person {name: 'Alix'})");
    assert!(write_attempt.is_err());

    let err = write_attempt.unwrap_err().to_string();
    let mentions_read_only = err.contains("read-only");
    assert!(mentions_read_only, "Expected read-only error, got: {err}");

    // Close the transaction that was opened above.
    sess.execute("ROLLBACK").unwrap();
}
5219
/// A `READ ONLY` transaction must still be able to run queries and see
/// data committed before it started.
#[test]
fn test_start_transaction_read_only_allows_reads() {
    let database = GrafeoDB::new_in_memory();
    let mut sess = database.session();

    // Seed one node through the programmatic transaction API.
    sess.begin_transaction().unwrap();
    sess.execute("INSERT (:Person {name: 'Alix'})").unwrap();
    sess.commit().unwrap();

    sess.execute("START TRANSACTION READ ONLY").unwrap();
    let people = sess.execute("MATCH (n:Person) RETURN n.name").unwrap();
    let found = people.rows.len();
    assert_eq!(found, 1);
    sess.execute("COMMIT").unwrap();
}
5233
/// An insert made inside a transaction that is then rolled back via GQL
/// must not be visible to a later query.
#[test]
fn test_rollback_via_gql() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    for statement in [
        "START TRANSACTION",
        "INSERT (:Person {name: 'Alix'})",
        "ROLLBACK",
    ] {
        sess.execute(statement).unwrap();
    }

    // Nothing from the rolled-back transaction should remain.
    let people = sess.execute("MATCH (n:Person) RETURN n.name").unwrap();
    let nothing_left = people.rows.is_empty();
    assert!(nothing_left);
}
5246
/// `START TRANSACTION` with an explicit `ISOLATION LEVEL SERIALIZABLE`
/// clause must be accepted and leave the session inside a transaction.
#[test]
fn test_start_transaction_with_isolation_level() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    let started = sess.execute("START TRANSACTION ISOLATION LEVEL SERIALIZABLE");
    started.unwrap();

    let open = sess.in_transaction();
    assert!(open);

    // Close the transaction that was opened above.
    sess.execute("ROLLBACK").unwrap();
}
5258
/// A session command such as `SESSION SET GRAPH` should return a result
/// with zero rows and zero columns.
#[test]
fn test_session_commands_return_empty_result() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    sess.execute("CREATE GRAPH test").unwrap();

    let outcome = sess.execute("SESSION SET GRAPH test").unwrap();
    let (row_total, column_total) = (outcome.row_count(), outcome.column_count());
    assert_eq!(row_total, 0);
    assert_eq!(column_total, 0);
}
5269
/// A freshly opened session has no current graph selected.
#[test]
fn test_current_graph_default_is_none() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    let initial_graph = sess.current_graph();
    assert_eq!(initial_graph, None);
}
5277
/// A freshly opened session has no time zone set.
#[test]
fn test_time_zone_default_is_none() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    let initial_zone = sess.time_zone();
    assert_eq!(initial_zone, None);
}
5285
/// Two sessions on the same database each keep their own current graph:
/// selecting a graph in one must not affect the other.
#[test]
fn test_session_state_independent_across_sessions() {
    let database = GrafeoDB::new_in_memory();
    let alpha = database.session();
    let beta = database.session();

    // Both graphs are created through the first session.
    for statement in ["CREATE GRAPH first", "CREATE GRAPH second"] {
        alpha.execute(statement).unwrap();
    }

    alpha.execute("SESSION SET GRAPH first").unwrap();
    beta.execute("SESSION SET GRAPH second").unwrap();

    let alpha_graph = alpha.current_graph();
    let beta_graph = beta.current_graph();
    assert_eq!(alpha_graph, Some(String::from("first")));
    assert_eq!(beta_graph, Some(String::from("second")));
}
5300
/// `SHOW NODE TYPES` should list a previously created node type under the
/// columns `name`, `properties`, `constraints`, `parents`.
#[test]
fn test_show_node_types() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    let ddl = "CREATE NODE TYPE Person (name STRING NOT NULL, age INTEGER)";
    sess.execute(ddl).unwrap();

    let listing = sess.execute("SHOW NODE TYPES").unwrap();

    let expected_columns = vec!["name", "properties", "constraints", "parents"];
    assert_eq!(listing.columns, expected_columns);

    assert_eq!(listing.rows.len(), 1);
    // The first cell of the single row carries the type name.
    let type_name = &listing.rows[0][0];
    assert_eq!(*type_name, Value::from("Person"));
}
5319
/// `SHOW EDGE TYPES` should list a previously created edge type under the
/// columns `name`, `properties`, `source_types`, `target_types`.
#[test]
fn test_show_edge_types() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    let ddl = "CREATE EDGE TYPE KNOWS CONNECTING (Person) TO (Person) (since INTEGER)";
    sess.execute(ddl).unwrap();

    let listing = sess.execute("SHOW EDGE TYPES").unwrap();

    let expected_columns = vec!["name", "properties", "source_types", "target_types"];
    assert_eq!(listing.columns, expected_columns);

    assert_eq!(listing.rows.len(), 1);
    // The first cell of the single row carries the type name.
    let type_name = &listing.rows[0][0];
    assert_eq!(*type_name, Value::from("KNOWS"));
}
5337
/// `SHOW GRAPH TYPES` should list a previously created graph type under
/// the columns `name`, `open`, `node_types`, `edge_types`.
#[test]
fn test_show_graph_types() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    // Same statement text as a line-continued literal would produce.
    for ddl in [
        "CREATE NODE TYPE Person (name STRING)",
        "CREATE GRAPH TYPE social (NODE TYPE Person (name STRING))",
    ] {
        sess.execute(ddl).unwrap();
    }

    let listing = sess.execute("SHOW GRAPH TYPES").unwrap();

    let expected_columns = vec!["name", "open", "node_types", "edge_types"];
    assert_eq!(listing.columns, expected_columns);

    assert_eq!(listing.rows.len(), 1);
    // The first cell of the single row carries the graph type name.
    let type_name = &listing.rows[0][0];
    assert_eq!(*type_name, Value::from("social"));
}
5362
/// `SHOW GRAPH TYPE <name>` should return exactly one row whose first cell
/// is the name of the requested graph type.
#[test]
fn test_show_graph_type_named() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    // Same statement text as a line-continued literal would produce.
    for ddl in [
        "CREATE NODE TYPE Person (name STRING)",
        "CREATE GRAPH TYPE social (NODE TYPE Person (name STRING))",
    ] {
        sess.execute(ddl).unwrap();
    }

    let listing = sess.execute("SHOW GRAPH TYPE social").unwrap();

    assert_eq!(listing.rows.len(), 1);
    let type_name = &listing.rows[0][0];
    assert_eq!(*type_name, Value::from("social"));
}
5383
/// `SHOW GRAPH TYPE` for a name that was never defined must return an
/// error.
#[test]
fn test_show_graph_type_not_found() {
    let db = GrafeoDB::new_in_memory();
    let session = db.session();

    // No graph type was created, so the lookup has to fail. The message
    // makes a regression readable without digging into the test.
    let result = session.execute("SHOW GRAPH TYPE nonexistent");
    assert!(
        result.is_err(),
        "SHOW GRAPH TYPE for an undefined graph type should fail, but it succeeded"
    );
}
5392
/// `SHOW INDEXES` on a fresh database should succeed and expose the
/// columns `name`, `type`, `label`, `property`.
#[test]
fn test_show_indexes_via_gql() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    let listing = sess.execute("SHOW INDEXES").unwrap();

    let expected_columns = vec!["name", "type", "label", "property"];
    assert_eq!(listing.columns, expected_columns);
}
5401
/// `SHOW CONSTRAINTS` on a fresh database should succeed and expose the
/// columns `name`, `type`, `label`, `properties`.
#[test]
fn test_show_constraints_via_gql() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    let listing = sess.execute("SHOW CONSTRAINTS").unwrap();

    let expected_columns = vec!["name", "type", "label", "properties"];
    assert_eq!(listing.columns, expected_columns);
}
5410
/// A graph type declared in pattern form (node and edge patterns rather
/// than `NODE TYPE` clauses) should be accepted and then be reported by
/// `SHOW GRAPH TYPE` under its name.
#[test]
fn test_pattern_form_graph_type_roundtrip() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    // Register the node and edge types that the patterns below refer to.
    for ddl in [
        "CREATE NODE TYPE Person (name STRING NOT NULL)",
        "CREATE NODE TYPE City (name STRING)",
        "CREATE EDGE TYPE KNOWS (since INTEGER)",
        "CREATE EDGE TYPE LIVES_IN",
    ] {
        sess.execute(ddl).unwrap();
    }

    // Pieces are joined with no separator, yielding the same statement text
    // as a line-continued string literal.
    let create_graph_type = concat!(
        "CREATE GRAPH TYPE social (",
        "(:Person {name STRING NOT NULL})-[:KNOWS {since INTEGER}]->(:Person),",
        "(:Person)-[:LIVES_IN]->(:City)",
        ")",
    );
    sess.execute(create_graph_type).unwrap();

    let listing = sess.execute("SHOW GRAPH TYPE social").unwrap();

    assert_eq!(listing.rows.len(), 1);
    let type_name = &listing.rows[0][0];
    assert_eq!(*type_name, Value::from("social"));
}
5443 }
5444}