1#[cfg(feature = "rdf")]
8mod rdf;
9
10use std::sync::Arc;
11use std::sync::atomic::{AtomicUsize, Ordering};
12use std::time::{Duration, Instant};
13
14use grafeo_common::types::{EdgeId, EpochId, NodeId, TransactionId, Value};
15use grafeo_common::utils::error::Result;
16use grafeo_common::{grafeo_debug_span, grafeo_info_span, grafeo_warn};
17use grafeo_core::graph::Direction;
18use grafeo_core::graph::GraphStoreMut;
19use grafeo_core::graph::lpg::{Edge, LpgStore, Node};
20#[cfg(feature = "rdf")]
21use grafeo_core::graph::rdf::RdfStore;
22
23use crate::catalog::{Catalog, CatalogConstraintValidator};
24use crate::config::{AdaptiveConfig, GraphModel};
25use crate::database::QueryResult;
26use crate::query::cache::QueryCache;
27use crate::transaction::TransactionManager;
28
29fn parse_default_literal(text: &str) -> Value {
34 if text.eq_ignore_ascii_case("null") {
35 return Value::Null;
36 }
37 if text.eq_ignore_ascii_case("true") {
38 return Value::Bool(true);
39 }
40 if text.eq_ignore_ascii_case("false") {
41 return Value::Bool(false);
42 }
43 if (text.starts_with('\'') && text.ends_with('\''))
45 || (text.starts_with('"') && text.ends_with('"'))
46 {
47 return Value::String(text[1..text.len() - 1].into());
48 }
49 if let Ok(i) = text.parse::<i64>() {
51 return Value::Int64(i);
52 }
53 if let Ok(f) = text.parse::<f64>() {
54 return Value::Float64(f);
55 }
56 Value::String(text.into())
58}
59
/// Bundle of handles and settings consumed by the `Session` constructors
/// (`Session::with_adaptive` and `Session::with_external_store`).
///
/// Every field is moved or copied into the session field of the same name,
/// except `read_only`, which seeds two session fields (see below).
pub(crate) struct SessionConfig {
    /// Transaction manager handle stored on the session.
    pub transaction_manager: Arc<TransactionManager>,
    /// Query cache handle stored on the session.
    pub query_cache: Arc<QueryCache>,
    /// Catalog handle; the session uses it for schema, type and constraint calls.
    pub catalog: Arc<Catalog>,
    /// Adaptive-execution settings, stored on the session unchanged.
    pub adaptive_config: AdaptiveConfig,
    /// Stored on the session unchanged; presumably toggles factorized
    /// execution in the query path (not visible here).
    pub factorized_execution: bool,
    /// Graph model of the database; `Session::require_lpg` rejects LPG
    /// queries when this is `GraphModel::Rdf`.
    pub graph_model: GraphModel,
    /// Optional per-query timeout, stored on the session unchanged.
    pub query_timeout: Option<Duration>,
    /// Commit counter handle, stored on the session unchanged.
    pub commit_counter: Arc<AtomicUsize>,
    /// Stored on the session unchanged; presumably the number of commits
    /// between garbage-collection runs (TODO confirm against the commit path).
    pub gc_interval: usize,
    /// Seeds both `Session::db_read_only` and the initial value of
    /// `Session::read_only_tx`.
    pub read_only: bool,
}
77
/// A database session.
///
/// Holds the store handles, the per-session selections (graph, schema, time
/// zone, parameters, viewing epoch) and the transaction bookkeeping used by
/// the methods on this type. Mutable state sits behind `parking_lot::Mutex`
/// or atomics, so the accessors take `&self`.
pub struct Session {
    /// LPG store; named graphs are resolved through `store.graph(..)`.
    store: Arc<LpgStore>,
    /// Mutable graph-store handle returned by `active_store` when no named
    /// graph is selected; `set_wal` replaces it with a WAL-wrapping store.
    graph_store: Arc<dyn GraphStoreMut>,
    /// Catalog used for schema, type and constraint calls.
    catalog: Arc<Catalog>,
    /// RDF store; the constructors in this file create a fresh one per session.
    #[cfg(feature = "rdf")]
    rdf_store: Arc<RdfStore>,
    /// Transaction manager handle taken from `SessionConfig`.
    transaction_manager: Arc<TransactionManager>,
    /// Query cache handle taken from `SessionConfig`.
    query_cache: Arc<QueryCache>,
    /// `Some` while a transaction is open; `track_graph_touch` records
    /// graphs only in that state.
    current_transaction: parking_lot::Mutex<Option<TransactionId>>,
    /// While `true`, `execute_session_command` rejects CREATE/DROP GRAPH with
    /// a read-only transaction error. Starts as `SessionConfig::read_only`.
    read_only_tx: parking_lot::Mutex<bool>,
    /// Copy of `SessionConfig::read_only` taken at construction.
    db_read_only: bool,
    /// Set to `true` by both constructors in this file.
    auto_commit: bool,
    /// Adaptive-execution settings; currently flagged as unused.
    #[allow(dead_code)] adaptive_config: AdaptiveConfig,
    /// Taken from `SessionConfig`; presumably consulted by the query path.
    factorized_execution: bool,
    /// Graph model of the database (see `graph_model` / `require_lpg`).
    graph_model: GraphModel,
    /// Optional per-query timeout taken from `SessionConfig`.
    query_timeout: Option<Duration>,
    /// Commit counter handle taken from `SessionConfig`.
    commit_counter: Arc<AtomicUsize>,
    /// Taken from `SessionConfig`; presumably the commit interval between GC runs.
    gc_interval: usize,
    /// Starts at 0; presumably the node count captured at transaction start.
    transaction_start_node_count: AtomicUsize,
    /// Starts at 0; presumably the edge count captured at transaction start.
    transaction_start_edge_count: AtomicUsize,
    /// WAL handle, installed by `set_wal`; `None` until then.
    #[cfg(feature = "wal")]
    wal: Option<Arc<grafeo_adapters::storage::wal::LpgWal>>,
    /// Graph-context cell handed to the WAL store wrappers; installed by `set_wal`.
    #[cfg(feature = "wal")]
    wal_graph_context: Option<Arc<parking_lot::Mutex<Option<String>>>>,
    /// Change-data-capture log; replaceable through `set_cdc_log`.
    #[cfg(feature = "cdc")]
    cdc_log: Arc<crate::cdc::CdcLog>,
    /// Graph name selected for this session (unqualified); `None` means no selection.
    current_graph: parking_lot::Mutex<Option<String>>,
    /// Schema selected for this session; used to qualify graph and type names.
    current_schema: parking_lot::Mutex<Option<String>>,
    /// Session time zone string, stored exactly as given to `set_time_zone`.
    time_zone: parking_lot::Mutex<Option<String>>,
    /// Session parameters keyed by name.
    session_params:
        parking_lot::Mutex<std::collections::HashMap<String, grafeo_common::types::Value>>,
    /// Epoch override set by `set_viewing_epoch`; `None` when not overridden.
    viewing_epoch_override: parking_lot::Mutex<Option<EpochId>>,
    /// Savepoint records for this session; starts empty.
    savepoints: parking_lot::Mutex<Vec<SavepointState>>,
    /// Starts at 0; presumably counts nested transaction begins.
    transaction_nesting_depth: parking_lot::Mutex<u32>,
    /// Storage keys (without duplicates) recorded by `track_graph_touch`
    /// while a transaction is open; `None` is the key of the default graph.
    touched_graphs: parking_lot::Mutex<Vec<Option<String>>>,
    /// Metrics registry, installed by `set_metrics`; `None` until then.
    #[cfg(feature = "metrics")]
    pub(crate) metrics: Option<Arc<crate::metrics::MetricsRegistry>>,
    /// Starts as `None`; presumably the start instant of the open transaction.
    #[cfg(feature = "metrics")]
    tx_start_time: parking_lot::Mutex<Option<Instant>>,
}
163
/// Per-graph portion of a savepoint (one entry of
/// `SavepointState::graph_snapshots`).
///
/// NOTE(review): the code that fills and restores these values is outside
/// this view; the field notes below are inferred from names and types only.
#[derive(Clone)]
struct GraphSavepoint {
    /// Graph this entry belongs to; presumably `None` for the default graph.
    graph_name: Option<String>,
    /// Presumably the store's next node id when the savepoint was taken.
    next_node_id: u64,
    /// Presumably the store's next edge id when the savepoint was taken.
    next_edge_id: u64,
    /// Presumably the undo-log length when the savepoint was taken.
    undo_log_position: usize,
}
172
/// One named savepoint, stored in `Session::savepoints`.
#[derive(Clone)]
struct SavepointState {
    /// Savepoint name.
    name: String,
    /// One `GraphSavepoint` per graph covered by this savepoint.
    graph_snapshots: Vec<GraphSavepoint>,
    /// Presumably the graph that was selected when the savepoint was taken;
    /// currently flagged as unused.
    #[allow(dead_code)]
    active_graph: Option<String>,
}
183
184impl Session {
185 #[allow(dead_code)]
187 pub(crate) fn with_adaptive(store: Arc<LpgStore>, cfg: SessionConfig) -> Self {
188 let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
189 Self {
190 store,
191 graph_store,
192 catalog: cfg.catalog,
193 #[cfg(feature = "rdf")]
194 rdf_store: Arc::new(RdfStore::new()),
195 transaction_manager: cfg.transaction_manager,
196 query_cache: cfg.query_cache,
197 current_transaction: parking_lot::Mutex::new(None),
198 read_only_tx: parking_lot::Mutex::new(cfg.read_only),
199 db_read_only: cfg.read_only,
200 auto_commit: true,
201 adaptive_config: cfg.adaptive_config,
202 factorized_execution: cfg.factorized_execution,
203 graph_model: cfg.graph_model,
204 query_timeout: cfg.query_timeout,
205 commit_counter: cfg.commit_counter,
206 gc_interval: cfg.gc_interval,
207 transaction_start_node_count: AtomicUsize::new(0),
208 transaction_start_edge_count: AtomicUsize::new(0),
209 #[cfg(feature = "wal")]
210 wal: None,
211 #[cfg(feature = "wal")]
212 wal_graph_context: None,
213 #[cfg(feature = "cdc")]
214 cdc_log: Arc::new(crate::cdc::CdcLog::new()),
215 current_graph: parking_lot::Mutex::new(None),
216 current_schema: parking_lot::Mutex::new(None),
217 time_zone: parking_lot::Mutex::new(None),
218 session_params: parking_lot::Mutex::new(std::collections::HashMap::new()),
219 viewing_epoch_override: parking_lot::Mutex::new(None),
220 savepoints: parking_lot::Mutex::new(Vec::new()),
221 transaction_nesting_depth: parking_lot::Mutex::new(0),
222 touched_graphs: parking_lot::Mutex::new(Vec::new()),
223 #[cfg(feature = "metrics")]
224 metrics: None,
225 #[cfg(feature = "metrics")]
226 tx_start_time: parking_lot::Mutex::new(None),
227 }
228 }
229
230 #[cfg(feature = "wal")]
235 pub(crate) fn set_wal(
236 &mut self,
237 wal: Arc<grafeo_adapters::storage::wal::LpgWal>,
238 wal_graph_context: Arc<parking_lot::Mutex<Option<String>>>,
239 ) {
240 self.graph_store = Arc::new(crate::database::wal_store::WalGraphStore::new(
242 Arc::clone(&self.store),
243 Arc::clone(&wal),
244 Arc::clone(&wal_graph_context),
245 ));
246 self.wal = Some(wal);
247 self.wal_graph_context = Some(wal_graph_context);
248 }
249
    /// Replaces the session's change-data-capture log with `cdc_log`.
    #[cfg(feature = "cdc")]
    pub(crate) fn set_cdc_log(&mut self, cdc_log: Arc<crate::cdc::CdcLog>) {
        self.cdc_log = cdc_log;
    }
255
    /// Attaches a metrics registry to this session, replacing any previous one.
    #[cfg(feature = "metrics")]
    pub(crate) fn set_metrics(&mut self, metrics: Arc<crate::metrics::MetricsRegistry>) {
        self.metrics = Some(metrics);
    }
261
262 pub(crate) fn with_external_store(
271 store: Arc<dyn GraphStoreMut>,
272 cfg: SessionConfig,
273 ) -> Result<Self> {
274 Ok(Self {
275 store: Arc::new(LpgStore::new()?),
276 graph_store: store,
277 catalog: cfg.catalog,
278 #[cfg(feature = "rdf")]
279 rdf_store: Arc::new(RdfStore::new()),
280 transaction_manager: cfg.transaction_manager,
281 query_cache: cfg.query_cache,
282 current_transaction: parking_lot::Mutex::new(None),
283 read_only_tx: parking_lot::Mutex::new(cfg.read_only),
284 db_read_only: cfg.read_only,
285 auto_commit: true,
286 adaptive_config: cfg.adaptive_config,
287 factorized_execution: cfg.factorized_execution,
288 graph_model: cfg.graph_model,
289 query_timeout: cfg.query_timeout,
290 commit_counter: cfg.commit_counter,
291 gc_interval: cfg.gc_interval,
292 transaction_start_node_count: AtomicUsize::new(0),
293 transaction_start_edge_count: AtomicUsize::new(0),
294 #[cfg(feature = "wal")]
295 wal: None,
296 #[cfg(feature = "wal")]
297 wal_graph_context: None,
298 #[cfg(feature = "cdc")]
299 cdc_log: Arc::new(crate::cdc::CdcLog::new()),
300 current_graph: parking_lot::Mutex::new(None),
301 current_schema: parking_lot::Mutex::new(None),
302 time_zone: parking_lot::Mutex::new(None),
303 session_params: parking_lot::Mutex::new(std::collections::HashMap::new()),
304 viewing_epoch_override: parking_lot::Mutex::new(None),
305 savepoints: parking_lot::Mutex::new(Vec::new()),
306 transaction_nesting_depth: parking_lot::Mutex::new(0),
307 touched_graphs: parking_lot::Mutex::new(Vec::new()),
308 #[cfg(feature = "metrics")]
309 metrics: None,
310 #[cfg(feature = "metrics")]
311 tx_start_time: parking_lot::Mutex::new(None),
312 })
313 }
314
    /// Returns the graph model this session was configured with.
    #[must_use]
    pub fn graph_model(&self) -> GraphModel {
        self.graph_model
    }
320
321 pub fn use_graph(&self, name: &str) {
325 *self.current_graph.lock() = Some(name.to_string());
326 }
327
328 #[must_use]
330 pub fn current_graph(&self) -> Option<String> {
331 self.current_graph.lock().clone()
332 }
333
334 pub fn set_schema(&self, name: &str) {
338 *self.current_schema.lock() = Some(name.to_string());
339 }
340
341 #[must_use]
345 pub fn current_schema(&self) -> Option<String> {
346 self.current_schema.lock().clone()
347 }
348
349 fn effective_graph_key(&self, graph_name: &str) -> String {
354 let schema = self.current_schema.lock().clone();
355 match schema {
356 Some(s) => format!("{s}/{graph_name}"),
357 None => graph_name.to_string(),
358 }
359 }
360
361 fn effective_type_key(&self, type_name: &str) -> String {
365 let schema = self.current_schema.lock().clone();
366 match schema {
367 Some(s) => format!("{s}/{type_name}"),
368 None => type_name.to_string(),
369 }
370 }
371
372 fn active_graph_storage_key(&self) -> Option<String> {
376 let graph = self.current_graph.lock().clone();
377 let schema = self.current_schema.lock().clone();
378 match (schema, graph) {
379 (_, None) => None,
380 (_, Some(ref name)) if name.eq_ignore_ascii_case("default") => None,
381 (None, Some(name)) => Some(name),
382 (Some(s), Some(g)) => Some(format!("{s}/{g}")),
383 }
384 }
385
386 fn active_store(&self) -> Arc<dyn GraphStoreMut> {
394 let key = self.active_graph_storage_key();
395 match key {
396 None => Arc::clone(&self.graph_store),
397 Some(ref name) => match self.store.graph(name) {
398 Some(named_store) => {
399 #[cfg(feature = "wal")]
400 if let (Some(wal), Some(ctx)) = (&self.wal, &self.wal_graph_context) {
401 return Arc::new(crate::database::wal_store::WalGraphStore::new_for_graph(
402 named_store,
403 Arc::clone(wal),
404 name.clone(),
405 Arc::clone(ctx),
406 )) as Arc<dyn GraphStoreMut>;
407 }
408 named_store as Arc<dyn GraphStoreMut>
409 }
410 None => Arc::clone(&self.graph_store),
411 },
412 }
413 }
414
415 fn active_lpg_store(&self) -> Arc<LpgStore> {
420 let key = self.active_graph_storage_key();
421 match key {
422 None => Arc::clone(&self.store),
423 Some(ref name) => self
424 .store
425 .graph(name)
426 .unwrap_or_else(|| Arc::clone(&self.store)),
427 }
428 }
429
430 fn resolve_store(&self, graph_name: &Option<String>) -> Arc<LpgStore> {
433 match graph_name {
434 None => Arc::clone(&self.store),
435 Some(name) if name.eq_ignore_ascii_case("default") => Arc::clone(&self.store),
436 Some(name) => self
437 .store
438 .graph(name)
439 .unwrap_or_else(|| Arc::clone(&self.store)),
440 }
441 }
442
443 fn track_graph_touch(&self) {
448 if self.current_transaction.lock().is_some() {
449 let key = self.active_graph_storage_key();
450 let mut touched = self.touched_graphs.lock();
451 if !touched.contains(&key) {
452 touched.push(key);
453 }
454 }
455 }
456
457 pub fn set_time_zone(&self, tz: &str) {
459 *self.time_zone.lock() = Some(tz.to_string());
460 }
461
462 #[must_use]
464 pub fn time_zone(&self) -> Option<String> {
465 self.time_zone.lock().clone()
466 }
467
468 pub fn set_parameter(&self, key: &str, value: grafeo_common::types::Value) {
470 self.session_params.lock().insert(key.to_string(), value);
471 }
472
473 #[must_use]
475 pub fn get_parameter(&self, key: &str) -> Option<grafeo_common::types::Value> {
476 self.session_params.lock().get(key).cloned()
477 }
478
479 pub fn reset_session(&self) {
481 *self.current_schema.lock() = None;
482 *self.current_graph.lock() = None;
483 *self.time_zone.lock() = None;
484 self.session_params.lock().clear();
485 *self.viewing_epoch_override.lock() = None;
486 }
487
488 pub fn reset_schema(&self) {
490 *self.current_schema.lock() = None;
491 }
492
493 pub fn reset_graph(&self) {
495 *self.current_graph.lock() = None;
496 }
497
498 pub fn reset_time_zone(&self) {
500 *self.time_zone.lock() = None;
501 }
502
503 pub fn reset_parameters(&self) {
505 self.session_params.lock().clear();
506 }
507
508 pub fn set_viewing_epoch(&self, epoch: EpochId) {
516 *self.viewing_epoch_override.lock() = Some(epoch);
517 }
518
519 pub fn clear_viewing_epoch(&self) {
521 *self.viewing_epoch_override.lock() = None;
522 }
523
524 #[must_use]
526 pub fn viewing_epoch(&self) -> Option<EpochId> {
527 *self.viewing_epoch_override.lock()
528 }
529
530 #[must_use]
534 pub fn get_node_history(&self, id: NodeId) -> Vec<(EpochId, Option<EpochId>, Node)> {
535 self.active_lpg_store().get_node_history(id)
536 }
537
538 #[must_use]
542 pub fn get_edge_history(&self, id: EdgeId) -> Vec<(EpochId, Option<EpochId>, Edge)> {
543 self.active_lpg_store().get_edge_history(id)
544 }
545
546 fn require_lpg(&self, language: &str) -> Result<()> {
548 if self.graph_model == GraphModel::Rdf {
549 return Err(grafeo_common::utils::error::Error::Internal(format!(
550 "This is an RDF database. {language} queries require an LPG database."
551 )));
552 }
553 Ok(())
554 }
555
    /// Executes a GQL session-level command: graph DDL, session settings,
    /// transaction control and savepoints.
    ///
    /// Returns `QueryResult::empty()` for commands with nothing to report
    /// and a `QueryResult::status(..)` message otherwise.
    ///
    /// # Errors
    ///
    /// - `TransactionError::ReadOnly` for CREATE/DROP GRAPH while the
    ///   session's `read_only_tx` flag is set.
    /// - A semantic `QueryError` when a referenced graph or schema does not
    ///   exist, a graph already exists (without `if_not_exists`), binding a
    ///   graph type fails, or `viewing_epoch` is not a non-negative integer
    ///   literal.
    /// - `Error::Internal` when `create_graph` or `copy_graph` fails.
    /// - Any error returned by the transaction and savepoint helpers.
    #[cfg(feature = "gql")]
    fn execute_session_command(
        &self,
        cmd: grafeo_adapters::query::gql::ast::SessionCommand,
    ) -> Result<QueryResult> {
        use grafeo_adapters::query::gql::ast::{SessionCommand, TransactionIsolationLevel};
        use grafeo_common::utils::error::{Error, QueryError, QueryErrorKind};

        // Graph DDL is refused while the read-only flag is set; every other
        // command passes through this gate.
        if *self.read_only_tx.lock() {
            match &cmd {
                SessionCommand::CreateGraph { .. } | SessionCommand::DropGraph { .. } => {
                    return Err(Error::Transaction(
                        grafeo_common::utils::error::TransactionError::ReadOnly,
                    ));
                }
                _ => {}
            }
        }

        match cmd {
            SessionCommand::CreateGraph {
                name,
                if_not_exists,
                typed,
                like_graph,
                copy_of,
                open: _,
            } => {
                // Graph names are qualified with the session schema, if any.
                let storage_key = self.effective_graph_key(&name);

                // Check both possible source graphs up front so that nothing
                // is created when a source is missing.
                if let Some(ref src) = like_graph {
                    let src_key = self.effective_graph_key(src);
                    if self.store.graph(&src_key).is_none() {
                        return Err(Error::Query(QueryError::new(
                            QueryErrorKind::Semantic,
                            format!("Source graph '{src}' does not exist"),
                        )));
                    }
                }
                if let Some(ref src) = copy_of {
                    let src_key = self.effective_graph_key(src);
                    if self.store.graph(&src_key).is_none() {
                        return Err(Error::Query(QueryError::new(
                            QueryErrorKind::Semantic,
                            format!("Source graph '{src}' does not exist"),
                        )));
                    }
                }

                // `created == false` is treated as "a graph with this key
                // already existed".
                let created = self
                    .store
                    .create_graph(&storage_key)
                    .map_err(|e| Error::Internal(e.to_string()))?;
                if !created && !if_not_exists {
                    return Err(Error::Query(QueryError::new(
                        QueryErrorKind::Semantic,
                        format!("Graph '{name}' already exists"),
                    )));
                }
                // Only a graph that was actually created is written to the WAL.
                if created {
                    #[cfg(feature = "wal")]
                    self.log_schema_wal(
                        &grafeo_adapters::storage::wal::WalRecord::CreateNamedGraph {
                            name: storage_key.clone(),
                        },
                    );
                }

                // NOTE(review): the copy and the type bindings below also run
                // when the graph already existed and `if_not_exists` was
                // given. Confirm that touching a pre-existing graph is intended.
                if let Some(ref src) = copy_of {
                    let src_key = self.effective_graph_key(src);
                    self.store
                        .copy_graph(Some(&src_key), Some(&storage_key))
                        .map_err(|e| Error::Internal(e.to_string()))?;
                }

                // A type name that already contains '/' is used as given;
                // otherwise it is qualified with the session schema.
                if let Some(type_name) = typed
                    && let Err(e) = self.catalog.bind_graph_type(
                        &storage_key,
                        if type_name.contains('/') {
                            type_name.clone()
                        } else {
                            self.effective_type_key(&type_name)
                        },
                    )
                {
                    return Err(Error::Query(QueryError::new(
                        QueryErrorKind::Semantic,
                        e.to_string(),
                    )));
                }

                // Best effort: carry over the source graph's type binding,
                // if it has one. A failure to bind is ignored.
                if let Some(ref src) = like_graph {
                    let src_key = self.effective_graph_key(src);
                    if let Some(src_type) = self.catalog.get_graph_type_binding(&src_key) {
                        let _ = self.catalog.bind_graph_type(&storage_key, src_type);
                    }
                }

                Ok(QueryResult::empty())
            }
            SessionCommand::DropGraph { name, if_exists } => {
                let storage_key = self.effective_graph_key(&name);
                let dropped = self.store.drop_graph(&storage_key);
                if !dropped && !if_exists {
                    return Err(Error::Query(QueryError::new(
                        QueryErrorKind::Semantic,
                        format!("Graph '{name}' does not exist"),
                    )));
                }
                if dropped {
                    #[cfg(feature = "wal")]
                    self.log_schema_wal(
                        &grafeo_adapters::storage::wal::WalRecord::DropNamedGraph {
                            name: storage_key.clone(),
                        },
                    );
                    // If the session was pointing at the dropped graph
                    // (compared by unqualified name, ASCII case-insensitive),
                    // clear the selection.
                    let mut current = self.current_graph.lock();
                    if current
                        .as_deref()
                        .is_some_and(|g| g.eq_ignore_ascii_case(&name))
                    {
                        *current = None;
                    }
                }
                Ok(QueryResult::empty())
            }
            SessionCommand::UseGraph(name) => {
                let effective_key = self.effective_graph_key(&name);
                // `default` is always accepted; any other name must exist.
                if !name.eq_ignore_ascii_case("default")
                    && self.store.graph(&effective_key).is_none()
                {
                    return Err(Error::Query(QueryError::new(
                        QueryErrorKind::Semantic,
                        format!("Graph '{name}' does not exist"),
                    )));
                }
                self.use_graph(&name);
                // Record the newly selected graph if a transaction is open.
                self.track_graph_touch();
                Ok(QueryResult::empty())
            }
            // Same handling as `UseGraph`.
            SessionCommand::SessionSetGraph(name) => {
                let effective_key = self.effective_graph_key(&name);
                // `default` is always accepted; any other name must exist.
                if !name.eq_ignore_ascii_case("default")
                    && self.store.graph(&effective_key).is_none()
                {
                    return Err(Error::Query(QueryError::new(
                        QueryErrorKind::Semantic,
                        format!("Graph '{name}' does not exist"),
                    )));
                }
                self.use_graph(&name);
                // Record the newly selected graph if a transaction is open.
                self.track_graph_touch();
                Ok(QueryResult::empty())
            }
            SessionCommand::SessionSetSchema(name) => {
                // The schema must already be known to the catalog.
                if !self.catalog.schema_exists(&name) {
                    return Err(Error::Query(QueryError::new(
                        QueryErrorKind::Semantic,
                        format!("Schema '{name}' does not exist"),
                    )));
                }
                self.set_schema(&name);
                Ok(QueryResult::empty())
            }
            SessionCommand::SessionSetTimeZone(tz) => {
                self.set_time_zone(&tz);
                Ok(QueryResult::empty())
            }
            SessionCommand::SessionSetParameter(key, expr) => {
                // `viewing_epoch` is not stored as a parameter: it sets the
                // session's viewing-epoch override instead.
                if key.eq_ignore_ascii_case("viewing_epoch") {
                    match Self::eval_integer_literal(&expr) {
                        // The guard guarantees `n` is non-negative, so the
                        // cast to `u64` cannot wrap.
                        Some(n) if n >= 0 => {
                            self.set_viewing_epoch(EpochId::new(n as u64));
                            Ok(QueryResult::status(format!("Set viewing_epoch to {n}")))
                        }
                        _ => Err(Error::Query(QueryError::new(
                            QueryErrorKind::Semantic,
                            "viewing_epoch must be a non-negative integer literal",
                        ))),
                    }
                } else {
                    // NOTE(review): `expr` is not evaluated here; the key is
                    // recorded with a `Null` value. Confirm whether
                    // evaluating the expression is still pending.
                    self.set_parameter(&key, Value::Null);
                    Ok(QueryResult::empty())
                }
            }
            SessionCommand::SessionReset(target) => {
                use grafeo_adapters::query::gql::ast::SessionResetTarget;
                match target {
                    SessionResetTarget::All => self.reset_session(),
                    SessionResetTarget::Schema => self.reset_schema(),
                    SessionResetTarget::Graph => self.reset_graph(),
                    SessionResetTarget::TimeZone => self.reset_time_zone(),
                    SessionResetTarget::Parameters => self.reset_parameters(),
                }
                Ok(QueryResult::empty())
            }
            // Closing the session clears the same state as a full reset.
            SessionCommand::SessionClose => {
                self.reset_session();
                Ok(QueryResult::empty())
            }
            SessionCommand::StartTransaction {
                read_only,
                isolation_level,
            } => {
                // Translate the AST isolation level into the engine's enum.
                let engine_level = isolation_level.map(|l| match l {
                    TransactionIsolationLevel::ReadCommitted => {
                        crate::transaction::IsolationLevel::ReadCommitted
                    }
                    TransactionIsolationLevel::SnapshotIsolation => {
                        crate::transaction::IsolationLevel::SnapshotIsolation
                    }
                    TransactionIsolationLevel::Serializable => {
                        crate::transaction::IsolationLevel::Serializable
                    }
                });
                self.begin_transaction_inner(read_only, engine_level)?;
                Ok(QueryResult::status("Transaction started"))
            }
            SessionCommand::Commit => {
                self.commit_inner()?;
                Ok(QueryResult::status("Transaction committed"))
            }
            SessionCommand::Rollback => {
                self.rollback_inner()?;
                Ok(QueryResult::status("Transaction rolled back"))
            }
            SessionCommand::Savepoint(name) => {
                self.savepoint(&name)?;
                Ok(QueryResult::status(format!("Savepoint '{name}' created")))
            }
            SessionCommand::RollbackToSavepoint(name) => {
                self.rollback_to_savepoint(&name)?;
                Ok(QueryResult::status(format!(
                    "Rolled back to savepoint '{name}'"
                )))
            }
            SessionCommand::ReleaseSavepoint(name) => {
                self.release_savepoint(&name)?;
                Ok(QueryResult::status(format!("Savepoint '{name}' released")))
            }
        }
    }
815
816 #[cfg(feature = "wal")]
818 fn log_schema_wal(&self, record: &grafeo_adapters::storage::wal::WalRecord) {
819 if let Some(ref wal) = self.wal
820 && let Err(e) = wal.log(record)
821 {
822 grafeo_warn!("Failed to log schema change to WAL: {}", e);
823 }
824 }
825
826 #[cfg(feature = "gql")]
828 fn execute_schema_command(
829 &self,
830 cmd: grafeo_adapters::query::gql::ast::SchemaStatement,
831 ) -> Result<QueryResult> {
832 use crate::catalog::{
833 EdgeTypeDefinition, NodeTypeDefinition, PropertyDataType, TypedProperty,
834 };
835 use grafeo_adapters::query::gql::ast::SchemaStatement;
836 #[cfg(feature = "wal")]
837 use grafeo_adapters::storage::wal::WalRecord;
838 use grafeo_common::utils::error::{Error, QueryError, QueryErrorKind};
839
840 macro_rules! wal_log {
842 ($self:expr, $record:expr) => {
843 #[cfg(feature = "wal")]
844 $self.log_schema_wal(&$record);
845 };
846 }
847
848 let result = match cmd {
849 SchemaStatement::CreateNodeType(stmt) => {
850 let effective_name = self.effective_type_key(&stmt.name);
851 #[cfg(feature = "wal")]
852 let props_for_wal: Vec<(String, String, bool)> = stmt
853 .properties
854 .iter()
855 .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
856 .collect();
857 let def = NodeTypeDefinition {
858 name: effective_name.clone(),
859 properties: stmt
860 .properties
861 .iter()
862 .map(|p| TypedProperty {
863 name: p.name.clone(),
864 data_type: PropertyDataType::from_type_name(&p.data_type),
865 nullable: p.nullable,
866 default_value: p
867 .default_value
868 .as_ref()
869 .map(|s| parse_default_literal(s)),
870 })
871 .collect(),
872 constraints: Vec::new(),
873 parent_types: stmt.parent_types.clone(),
874 };
875 let result = if stmt.or_replace {
876 let _ = self.catalog.drop_node_type(&effective_name);
877 self.catalog.register_node_type(def)
878 } else {
879 self.catalog.register_node_type(def)
880 };
881 match result {
882 Ok(()) => {
883 wal_log!(
884 self,
885 WalRecord::CreateNodeType {
886 name: effective_name.clone(),
887 properties: props_for_wal,
888 constraints: Vec::new(),
889 }
890 );
891 Ok(QueryResult::status(format!(
892 "Created node type '{}'",
893 stmt.name
894 )))
895 }
896 Err(e) if stmt.if_not_exists => {
897 let _ = e;
898 Ok(QueryResult::status("No change"))
899 }
900 Err(e) => Err(Error::Query(QueryError::new(
901 QueryErrorKind::Semantic,
902 e.to_string(),
903 ))),
904 }
905 }
906 SchemaStatement::CreateEdgeType(stmt) => {
907 let effective_name = self.effective_type_key(&stmt.name);
908 #[cfg(feature = "wal")]
909 let props_for_wal: Vec<(String, String, bool)> = stmt
910 .properties
911 .iter()
912 .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
913 .collect();
914 let def = EdgeTypeDefinition {
915 name: effective_name.clone(),
916 properties: stmt
917 .properties
918 .iter()
919 .map(|p| TypedProperty {
920 name: p.name.clone(),
921 data_type: PropertyDataType::from_type_name(&p.data_type),
922 nullable: p.nullable,
923 default_value: p
924 .default_value
925 .as_ref()
926 .map(|s| parse_default_literal(s)),
927 })
928 .collect(),
929 constraints: Vec::new(),
930 source_node_types: stmt.source_node_types.clone(),
931 target_node_types: stmt.target_node_types.clone(),
932 };
933 let result = if stmt.or_replace {
934 let _ = self.catalog.drop_edge_type_def(&effective_name);
935 self.catalog.register_edge_type_def(def)
936 } else {
937 self.catalog.register_edge_type_def(def)
938 };
939 match result {
940 Ok(()) => {
941 wal_log!(
942 self,
943 WalRecord::CreateEdgeType {
944 name: effective_name.clone(),
945 properties: props_for_wal,
946 constraints: Vec::new(),
947 }
948 );
949 Ok(QueryResult::status(format!(
950 "Created edge type '{}'",
951 stmt.name
952 )))
953 }
954 Err(e) if stmt.if_not_exists => {
955 let _ = e;
956 Ok(QueryResult::status("No change"))
957 }
958 Err(e) => Err(Error::Query(QueryError::new(
959 QueryErrorKind::Semantic,
960 e.to_string(),
961 ))),
962 }
963 }
964 SchemaStatement::CreateVectorIndex(stmt) => {
965 Self::create_vector_index_on_store(
966 &self.active_lpg_store(),
967 &stmt.node_label,
968 &stmt.property,
969 stmt.dimensions,
970 stmt.metric.as_deref(),
971 )?;
972 wal_log!(
973 self,
974 WalRecord::CreateIndex {
975 name: stmt.name.clone(),
976 label: stmt.node_label.clone(),
977 property: stmt.property.clone(),
978 index_type: "vector".to_string(),
979 }
980 );
981 Ok(QueryResult::status(format!(
982 "Created vector index '{}'",
983 stmt.name
984 )))
985 }
986 SchemaStatement::DropNodeType { name, if_exists } => {
987 let effective_name = self.effective_type_key(&name);
988 match self.catalog.drop_node_type(&effective_name) {
989 Ok(()) => {
990 wal_log!(
991 self,
992 WalRecord::DropNodeType {
993 name: effective_name
994 }
995 );
996 Ok(QueryResult::status(format!("Dropped node type '{name}'")))
997 }
998 Err(e) if if_exists => {
999 let _ = e;
1000 Ok(QueryResult::status("No change"))
1001 }
1002 Err(e) => Err(Error::Query(QueryError::new(
1003 QueryErrorKind::Semantic,
1004 e.to_string(),
1005 ))),
1006 }
1007 }
1008 SchemaStatement::DropEdgeType { name, if_exists } => {
1009 let effective_name = self.effective_type_key(&name);
1010 match self.catalog.drop_edge_type_def(&effective_name) {
1011 Ok(()) => {
1012 wal_log!(
1013 self,
1014 WalRecord::DropEdgeType {
1015 name: effective_name
1016 }
1017 );
1018 Ok(QueryResult::status(format!("Dropped edge type '{name}'")))
1019 }
1020 Err(e) if if_exists => {
1021 let _ = e;
1022 Ok(QueryResult::status("No change"))
1023 }
1024 Err(e) => Err(Error::Query(QueryError::new(
1025 QueryErrorKind::Semantic,
1026 e.to_string(),
1027 ))),
1028 }
1029 }
1030 SchemaStatement::CreateIndex(stmt) => {
1031 use grafeo_adapters::query::gql::ast::IndexKind;
1032 let active = self.active_lpg_store();
1033 let index_type_str = match stmt.index_kind {
1034 IndexKind::Property => "property",
1035 IndexKind::BTree => "btree",
1036 IndexKind::Text => "text",
1037 IndexKind::Vector => "vector",
1038 };
1039 match stmt.index_kind {
1040 IndexKind::Property | IndexKind::BTree => {
1041 for prop in &stmt.properties {
1042 active.create_property_index(prop);
1043 }
1044 }
1045 IndexKind::Text => {
1046 for prop in &stmt.properties {
1047 Self::create_text_index_on_store(&active, &stmt.label, prop)?;
1048 }
1049 }
1050 IndexKind::Vector => {
1051 for prop in &stmt.properties {
1052 Self::create_vector_index_on_store(
1053 &active,
1054 &stmt.label,
1055 prop,
1056 stmt.options.dimensions,
1057 stmt.options.metric.as_deref(),
1058 )?;
1059 }
1060 }
1061 }
1062 #[cfg(feature = "wal")]
1063 for prop in &stmt.properties {
1064 wal_log!(
1065 self,
1066 WalRecord::CreateIndex {
1067 name: stmt.name.clone(),
1068 label: stmt.label.clone(),
1069 property: prop.clone(),
1070 index_type: index_type_str.to_string(),
1071 }
1072 );
1073 }
1074 Ok(QueryResult::status(format!(
1075 "Created {} index '{}'",
1076 index_type_str, stmt.name
1077 )))
1078 }
1079 SchemaStatement::DropIndex { name, if_exists } => {
1080 let dropped = self.active_lpg_store().drop_property_index(&name);
1082 if dropped || if_exists {
1083 if dropped {
1084 wal_log!(self, WalRecord::DropIndex { name: name.clone() });
1085 }
1086 Ok(QueryResult::status(if dropped {
1087 format!("Dropped index '{name}'")
1088 } else {
1089 "No change".to_string()
1090 }))
1091 } else {
1092 Err(Error::Query(QueryError::new(
1093 QueryErrorKind::Semantic,
1094 format!("Index '{name}' does not exist"),
1095 )))
1096 }
1097 }
1098 SchemaStatement::CreateConstraint(stmt) => {
1099 use crate::catalog::TypeConstraint;
1100 use grafeo_adapters::query::gql::ast::ConstraintKind;
1101 let kind_str = match stmt.constraint_kind {
1102 ConstraintKind::Unique => "unique",
1103 ConstraintKind::NodeKey => "node_key",
1104 ConstraintKind::NotNull => "not_null",
1105 ConstraintKind::Exists => "exists",
1106 };
1107 let constraint_name = stmt
1108 .name
1109 .clone()
1110 .unwrap_or_else(|| format!("{}_{kind_str}", stmt.label));
1111
1112 match stmt.constraint_kind {
1114 ConstraintKind::Unique => {
1115 for prop in &stmt.properties {
1116 let label_id = self.catalog.get_or_create_label(&stmt.label);
1117 let prop_id = self.catalog.get_or_create_property_key(prop);
1118 let _ = self.catalog.add_unique_constraint(label_id, prop_id);
1119 }
1120 let _ = self.catalog.add_constraint_to_type(
1121 &stmt.label,
1122 TypeConstraint::Unique(stmt.properties.clone()),
1123 );
1124 }
1125 ConstraintKind::NodeKey => {
1126 for prop in &stmt.properties {
1127 let label_id = self.catalog.get_or_create_label(&stmt.label);
1128 let prop_id = self.catalog.get_or_create_property_key(prop);
1129 let _ = self.catalog.add_unique_constraint(label_id, prop_id);
1130 let _ = self.catalog.add_required_property(label_id, prop_id);
1131 }
1132 let _ = self.catalog.add_constraint_to_type(
1133 &stmt.label,
1134 TypeConstraint::PrimaryKey(stmt.properties.clone()),
1135 );
1136 }
1137 ConstraintKind::NotNull | ConstraintKind::Exists => {
1138 for prop in &stmt.properties {
1139 let label_id = self.catalog.get_or_create_label(&stmt.label);
1140 let prop_id = self.catalog.get_or_create_property_key(prop);
1141 let _ = self.catalog.add_required_property(label_id, prop_id);
1142 let _ = self.catalog.add_constraint_to_type(
1143 &stmt.label,
1144 TypeConstraint::NotNull(prop.clone()),
1145 );
1146 }
1147 }
1148 }
1149
1150 wal_log!(
1151 self,
1152 WalRecord::CreateConstraint {
1153 name: constraint_name.clone(),
1154 label: stmt.label.clone(),
1155 properties: stmt.properties.clone(),
1156 kind: kind_str.to_string(),
1157 }
1158 );
1159 Ok(QueryResult::status(format!(
1160 "Created {kind_str} constraint '{constraint_name}'"
1161 )))
1162 }
1163 SchemaStatement::DropConstraint { name, if_exists } => {
1164 let _ = if_exists;
1165 wal_log!(self, WalRecord::DropConstraint { name: name.clone() });
1166 Ok(QueryResult::status(format!("Dropped constraint '{name}'")))
1167 }
1168 SchemaStatement::CreateGraphType(stmt) => {
1169 use crate::catalog::GraphTypeDefinition;
1170 use grafeo_adapters::query::gql::ast::InlineElementType;
1171
1172 let effective_name = self.effective_type_key(&stmt.name);
1173
1174 let (mut node_types, mut edge_types, open) =
1176 if let Some(ref like_graph) = stmt.like_graph {
1177 if let Some(type_name) = self.catalog.get_graph_type_binding(like_graph) {
1179 if let Some(existing) = self
1180 .catalog
1181 .schema()
1182 .and_then(|s| s.get_graph_type(&type_name))
1183 {
1184 (
1185 existing.allowed_node_types.clone(),
1186 existing.allowed_edge_types.clone(),
1187 existing.open,
1188 )
1189 } else {
1190 (Vec::new(), Vec::new(), true)
1191 }
1192 } else {
1193 let nt = self.catalog.all_node_type_names();
1195 let et = self.catalog.all_edge_type_names();
1196 if nt.is_empty() && et.is_empty() {
1197 (Vec::new(), Vec::new(), true)
1198 } else {
1199 (nt, et, false)
1200 }
1201 }
1202 } else {
1203 let nt = stmt
1205 .node_types
1206 .iter()
1207 .map(|n| self.effective_type_key(n))
1208 .collect();
1209 let et = stmt
1210 .edge_types
1211 .iter()
1212 .map(|n| self.effective_type_key(n))
1213 .collect();
1214 (nt, et, stmt.open)
1215 };
1216
1217 for inline in &stmt.inline_types {
1219 match inline {
1220 InlineElementType::Node {
1221 name,
1222 properties,
1223 key_labels,
1224 ..
1225 } => {
1226 let inline_effective = self.effective_type_key(name);
1227 let def = NodeTypeDefinition {
1228 name: inline_effective.clone(),
1229 properties: properties
1230 .iter()
1231 .map(|p| TypedProperty {
1232 name: p.name.clone(),
1233 data_type: PropertyDataType::from_type_name(&p.data_type),
1234 nullable: p.nullable,
1235 default_value: None,
1236 })
1237 .collect(),
1238 constraints: Vec::new(),
1239 parent_types: key_labels.clone(),
1240 };
1241 self.catalog.register_or_replace_node_type(def);
1243 #[cfg(feature = "wal")]
1244 {
1245 let props_for_wal: Vec<(String, String, bool)> = properties
1246 .iter()
1247 .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
1248 .collect();
1249 self.log_schema_wal(&WalRecord::CreateNodeType {
1250 name: inline_effective.clone(),
1251 properties: props_for_wal,
1252 constraints: Vec::new(),
1253 });
1254 }
1255 if !node_types.contains(&inline_effective) {
1256 node_types.push(inline_effective);
1257 }
1258 }
1259 InlineElementType::Edge {
1260 name,
1261 properties,
1262 source_node_types,
1263 target_node_types,
1264 ..
1265 } => {
1266 let inline_effective = self.effective_type_key(name);
1267 let def = EdgeTypeDefinition {
1268 name: inline_effective.clone(),
1269 properties: properties
1270 .iter()
1271 .map(|p| TypedProperty {
1272 name: p.name.clone(),
1273 data_type: PropertyDataType::from_type_name(&p.data_type),
1274 nullable: p.nullable,
1275 default_value: None,
1276 })
1277 .collect(),
1278 constraints: Vec::new(),
1279 source_node_types: source_node_types.clone(),
1280 target_node_types: target_node_types.clone(),
1281 };
1282 self.catalog.register_or_replace_edge_type_def(def);
1283 #[cfg(feature = "wal")]
1284 {
1285 let props_for_wal: Vec<(String, String, bool)> = properties
1286 .iter()
1287 .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
1288 .collect();
1289 self.log_schema_wal(&WalRecord::CreateEdgeType {
1290 name: inline_effective.clone(),
1291 properties: props_for_wal,
1292 constraints: Vec::new(),
1293 });
1294 }
1295 if !edge_types.contains(&inline_effective) {
1296 edge_types.push(inline_effective);
1297 }
1298 }
1299 }
1300 }
1301
1302 let def = GraphTypeDefinition {
1303 name: effective_name.clone(),
1304 allowed_node_types: node_types.clone(),
1305 allowed_edge_types: edge_types.clone(),
1306 open,
1307 };
1308 let result = if stmt.or_replace {
1309 let _ = self.catalog.drop_graph_type(&effective_name);
1311 self.catalog.register_graph_type(def)
1312 } else {
1313 self.catalog.register_graph_type(def)
1314 };
1315 match result {
1316 Ok(()) => {
1317 wal_log!(
1318 self,
1319 WalRecord::CreateGraphType {
1320 name: effective_name.clone(),
1321 node_types,
1322 edge_types,
1323 open,
1324 }
1325 );
1326 Ok(QueryResult::status(format!(
1327 "Created graph type '{}'",
1328 stmt.name
1329 )))
1330 }
1331 Err(e) if stmt.if_not_exists => {
1332 let _ = e;
1333 Ok(QueryResult::status("No change"))
1334 }
1335 Err(e) => Err(Error::Query(QueryError::new(
1336 QueryErrorKind::Semantic,
1337 e.to_string(),
1338 ))),
1339 }
1340 }
1341 SchemaStatement::DropGraphType { name, if_exists } => {
1342 let effective_name = self.effective_type_key(&name);
1343 match self.catalog.drop_graph_type(&effective_name) {
1344 Ok(()) => {
1345 wal_log!(
1346 self,
1347 WalRecord::DropGraphType {
1348 name: effective_name
1349 }
1350 );
1351 Ok(QueryResult::status(format!("Dropped graph type '{name}'")))
1352 }
1353 Err(e) if if_exists => {
1354 let _ = e;
1355 Ok(QueryResult::status("No change"))
1356 }
1357 Err(e) => Err(Error::Query(QueryError::new(
1358 QueryErrorKind::Semantic,
1359 e.to_string(),
1360 ))),
1361 }
1362 }
1363 SchemaStatement::CreateSchema {
1364 name,
1365 if_not_exists,
1366 } => match self.catalog.register_schema_namespace(name.clone()) {
1367 Ok(()) => {
1368 wal_log!(self, WalRecord::CreateSchema { name: name.clone() });
1369 Ok(QueryResult::status(format!("Created schema '{name}'")))
1370 }
1371 Err(e) if if_not_exists => {
1372 let _ = e;
1373 Ok(QueryResult::status("No change"))
1374 }
1375 Err(e) => Err(Error::Query(QueryError::new(
1376 QueryErrorKind::Semantic,
1377 e.to_string(),
1378 ))),
1379 },
1380 SchemaStatement::DropSchema { name, if_exists } => {
1381 let prefix = format!("{name}/");
1383 let has_graphs = self
1384 .store
1385 .graph_names()
1386 .iter()
1387 .any(|g| g.starts_with(&prefix));
1388 let has_types = self
1389 .catalog
1390 .all_node_type_names()
1391 .iter()
1392 .any(|n| n.starts_with(&prefix))
1393 || self
1394 .catalog
1395 .all_edge_type_names()
1396 .iter()
1397 .any(|n| n.starts_with(&prefix))
1398 || self
1399 .catalog
1400 .all_graph_type_names()
1401 .iter()
1402 .any(|n| n.starts_with(&prefix));
1403 if has_graphs || has_types {
1404 return Err(Error::Query(QueryError::new(
1405 QueryErrorKind::Semantic,
1406 format!("Schema '{name}' is not empty: drop all graphs and types first"),
1407 )));
1408 }
1409 match self.catalog.drop_schema_namespace(&name) {
1410 Ok(()) => {
1411 wal_log!(self, WalRecord::DropSchema { name: name.clone() });
1412 let mut current = self.current_schema.lock();
1414 if current
1415 .as_deref()
1416 .is_some_and(|s| s.eq_ignore_ascii_case(&name))
1417 {
1418 *current = None;
1419 }
1420 Ok(QueryResult::status(format!("Dropped schema '{name}'")))
1421 }
1422 Err(e) if if_exists => {
1423 let _ = e;
1424 Ok(QueryResult::status("No change"))
1425 }
1426 Err(e) => Err(Error::Query(QueryError::new(
1427 QueryErrorKind::Semantic,
1428 e.to_string(),
1429 ))),
1430 }
1431 }
1432 SchemaStatement::AlterNodeType(stmt) => {
1433 use grafeo_adapters::query::gql::ast::TypeAlteration;
1434 let effective_name = self.effective_type_key(&stmt.name);
1435 let mut wal_alts = Vec::new();
1436 for alt in &stmt.alterations {
1437 match alt {
1438 TypeAlteration::AddProperty(prop) => {
1439 let typed = TypedProperty {
1440 name: prop.name.clone(),
1441 data_type: PropertyDataType::from_type_name(&prop.data_type),
1442 nullable: prop.nullable,
1443 default_value: prop
1444 .default_value
1445 .as_ref()
1446 .map(|s| parse_default_literal(s)),
1447 };
1448 self.catalog
1449 .alter_node_type_add_property(&effective_name, typed)
1450 .map_err(|e| {
1451 Error::Query(QueryError::new(
1452 QueryErrorKind::Semantic,
1453 e.to_string(),
1454 ))
1455 })?;
1456 wal_alts.push((
1457 "add".to_string(),
1458 prop.name.clone(),
1459 prop.data_type.clone(),
1460 prop.nullable,
1461 ));
1462 }
1463 TypeAlteration::DropProperty(name) => {
1464 self.catalog
1465 .alter_node_type_drop_property(&effective_name, name)
1466 .map_err(|e| {
1467 Error::Query(QueryError::new(
1468 QueryErrorKind::Semantic,
1469 e.to_string(),
1470 ))
1471 })?;
1472 wal_alts.push(("drop".to_string(), name.clone(), String::new(), false));
1473 }
1474 }
1475 }
1476 wal_log!(
1477 self,
1478 WalRecord::AlterNodeType {
1479 name: effective_name,
1480 alterations: wal_alts,
1481 }
1482 );
1483 Ok(QueryResult::status(format!(
1484 "Altered node type '{}'",
1485 stmt.name
1486 )))
1487 }
1488 SchemaStatement::AlterEdgeType(stmt) => {
1489 use grafeo_adapters::query::gql::ast::TypeAlteration;
1490 let effective_name = self.effective_type_key(&stmt.name);
1491 let mut wal_alts = Vec::new();
1492 for alt in &stmt.alterations {
1493 match alt {
1494 TypeAlteration::AddProperty(prop) => {
1495 let typed = TypedProperty {
1496 name: prop.name.clone(),
1497 data_type: PropertyDataType::from_type_name(&prop.data_type),
1498 nullable: prop.nullable,
1499 default_value: prop
1500 .default_value
1501 .as_ref()
1502 .map(|s| parse_default_literal(s)),
1503 };
1504 self.catalog
1505 .alter_edge_type_add_property(&effective_name, typed)
1506 .map_err(|e| {
1507 Error::Query(QueryError::new(
1508 QueryErrorKind::Semantic,
1509 e.to_string(),
1510 ))
1511 })?;
1512 wal_alts.push((
1513 "add".to_string(),
1514 prop.name.clone(),
1515 prop.data_type.clone(),
1516 prop.nullable,
1517 ));
1518 }
1519 TypeAlteration::DropProperty(name) => {
1520 self.catalog
1521 .alter_edge_type_drop_property(&effective_name, name)
1522 .map_err(|e| {
1523 Error::Query(QueryError::new(
1524 QueryErrorKind::Semantic,
1525 e.to_string(),
1526 ))
1527 })?;
1528 wal_alts.push(("drop".to_string(), name.clone(), String::new(), false));
1529 }
1530 }
1531 }
1532 wal_log!(
1533 self,
1534 WalRecord::AlterEdgeType {
1535 name: effective_name,
1536 alterations: wal_alts,
1537 }
1538 );
1539 Ok(QueryResult::status(format!(
1540 "Altered edge type '{}'",
1541 stmt.name
1542 )))
1543 }
1544 SchemaStatement::AlterGraphType(stmt) => {
1545 use grafeo_adapters::query::gql::ast::GraphTypeAlteration;
1546 let effective_name = self.effective_type_key(&stmt.name);
1547 let mut wal_alts = Vec::new();
1548 for alt in &stmt.alterations {
1549 match alt {
1550 GraphTypeAlteration::AddNodeType(name) => {
1551 self.catalog
1552 .alter_graph_type_add_node_type(&effective_name, name.clone())
1553 .map_err(|e| {
1554 Error::Query(QueryError::new(
1555 QueryErrorKind::Semantic,
1556 e.to_string(),
1557 ))
1558 })?;
1559 wal_alts.push(("add_node_type".to_string(), name.clone()));
1560 }
1561 GraphTypeAlteration::DropNodeType(name) => {
1562 self.catalog
1563 .alter_graph_type_drop_node_type(&effective_name, name)
1564 .map_err(|e| {
1565 Error::Query(QueryError::new(
1566 QueryErrorKind::Semantic,
1567 e.to_string(),
1568 ))
1569 })?;
1570 wal_alts.push(("drop_node_type".to_string(), name.clone()));
1571 }
1572 GraphTypeAlteration::AddEdgeType(name) => {
1573 self.catalog
1574 .alter_graph_type_add_edge_type(&effective_name, name.clone())
1575 .map_err(|e| {
1576 Error::Query(QueryError::new(
1577 QueryErrorKind::Semantic,
1578 e.to_string(),
1579 ))
1580 })?;
1581 wal_alts.push(("add_edge_type".to_string(), name.clone()));
1582 }
1583 GraphTypeAlteration::DropEdgeType(name) => {
1584 self.catalog
1585 .alter_graph_type_drop_edge_type(&effective_name, name)
1586 .map_err(|e| {
1587 Error::Query(QueryError::new(
1588 QueryErrorKind::Semantic,
1589 e.to_string(),
1590 ))
1591 })?;
1592 wal_alts.push(("drop_edge_type".to_string(), name.clone()));
1593 }
1594 }
1595 }
1596 wal_log!(
1597 self,
1598 WalRecord::AlterGraphType {
1599 name: effective_name,
1600 alterations: wal_alts,
1601 }
1602 );
1603 Ok(QueryResult::status(format!(
1604 "Altered graph type '{}'",
1605 stmt.name
1606 )))
1607 }
1608 SchemaStatement::CreateProcedure(stmt) => {
1609 use crate::catalog::ProcedureDefinition;
1610
1611 let def = ProcedureDefinition {
1612 name: stmt.name.clone(),
1613 params: stmt
1614 .params
1615 .iter()
1616 .map(|p| (p.name.clone(), p.param_type.clone()))
1617 .collect(),
1618 returns: stmt
1619 .returns
1620 .iter()
1621 .map(|r| (r.name.clone(), r.return_type.clone()))
1622 .collect(),
1623 body: stmt.body.clone(),
1624 };
1625
1626 if stmt.or_replace {
1627 self.catalog.replace_procedure(def).map_err(|e| {
1628 Error::Query(QueryError::new(QueryErrorKind::Semantic, e.to_string()))
1629 })?;
1630 } else {
1631 match self.catalog.register_procedure(def) {
1632 Ok(()) => {}
1633 Err(_) if stmt.if_not_exists => {
1634 return Ok(QueryResult::empty());
1635 }
1636 Err(e) => {
1637 return Err(Error::Query(QueryError::new(
1638 QueryErrorKind::Semantic,
1639 e.to_string(),
1640 )));
1641 }
1642 }
1643 }
1644
1645 wal_log!(
1646 self,
1647 WalRecord::CreateProcedure {
1648 name: stmt.name.clone(),
1649 params: stmt
1650 .params
1651 .iter()
1652 .map(|p| (p.name.clone(), p.param_type.clone()))
1653 .collect(),
1654 returns: stmt
1655 .returns
1656 .iter()
1657 .map(|r| (r.name.clone(), r.return_type.clone()))
1658 .collect(),
1659 body: stmt.body,
1660 }
1661 );
1662 Ok(QueryResult::status(format!(
1663 "Created procedure '{}'",
1664 stmt.name
1665 )))
1666 }
1667 SchemaStatement::DropProcedure { name, if_exists } => {
1668 match self.catalog.drop_procedure(&name) {
1669 Ok(()) => {}
1670 Err(_) if if_exists => {
1671 return Ok(QueryResult::empty());
1672 }
1673 Err(e) => {
1674 return Err(Error::Query(QueryError::new(
1675 QueryErrorKind::Semantic,
1676 e.to_string(),
1677 )));
1678 }
1679 }
1680 wal_log!(self, WalRecord::DropProcedure { name: name.clone() });
1681 Ok(QueryResult::status(format!("Dropped procedure '{name}'")))
1682 }
1683 SchemaStatement::ShowIndexes => {
1684 return self.execute_show_indexes();
1685 }
1686 SchemaStatement::ShowConstraints => {
1687 return self.execute_show_constraints();
1688 }
1689 SchemaStatement::ShowNodeTypes => {
1690 return self.execute_show_node_types();
1691 }
1692 SchemaStatement::ShowEdgeTypes => {
1693 return self.execute_show_edge_types();
1694 }
1695 SchemaStatement::ShowGraphTypes => {
1696 return self.execute_show_graph_types();
1697 }
1698 SchemaStatement::ShowGraphType(name) => {
1699 return self.execute_show_graph_type(&name);
1700 }
1701 SchemaStatement::ShowCurrentGraphType => {
1702 return self.execute_show_current_graph_type();
1703 }
1704 SchemaStatement::ShowGraphs => {
1705 return self.execute_show_graphs();
1706 }
1707 SchemaStatement::ShowSchemas => {
1708 return self.execute_show_schemas();
1709 }
1710 };
1711
1712 if result.is_ok() {
1715 self.query_cache.clear();
1716 }
1717
1718 result
1719 }
1720
/// Builds an HNSW vector index over `property` for the nodes labelled
/// `label` and registers it on `store`.
///
/// `metric` is parsed with `DistanceMetric::from_str`; when it is `None` the
/// index uses cosine distance. `dimensions`, when supplied, is the expected
/// vector length; otherwise the length of the first vector encountered
/// becomes the expected length.
///
/// # Errors
///
/// Returns `Error::Internal` when the metric name is not recognised, when a
/// vector's length differs from the expected length, or when no dimension was
/// supplied and no vector value was found for the property.
#[cfg(all(feature = "gql", feature = "vector-index"))]
fn create_vector_index_on_store(
    store: &LpgStore,
    label: &str,
    property: &str,
    dimensions: Option<usize>,
    metric: Option<&str>,
) -> Result<()> {
    use grafeo_common::types::{PropertyKey, Value};
    use grafeo_common::utils::error::Error;
    use grafeo_core::index::vector::{DistanceMetric, HnswConfig, HnswIndex};

    let distance = if let Some(requested) = metric {
        DistanceMetric::from_str(requested).ok_or_else(|| {
            Error::Internal(format!(
                "Unknown distance metric '{requested}'. Use: cosine, euclidean, dot_product, manhattan"
            ))
        })?
    } else {
        DistanceMetric::Cosine
    };

    let key = PropertyKey::new(property);
    let mut expected_dims: Option<usize> = dimensions;
    let mut collected: Vec<(grafeo_common::types::NodeId, Vec<f32>)> = Vec::new();

    // Gather every vector first so the index can be sized up front and so a
    // dimension mismatch aborts before anything is registered on the store.
    for node in store.nodes_with_label(label) {
        if let Some(Value::Vector(components)) = node.properties.get(&key) {
            match expected_dims {
                Some(expected) if components.len() != expected => {
                    return Err(Error::Internal(format!(
                        "Vector dimension mismatch: expected {expected}, found {} on node {}",
                        components.len(),
                        node.id.0
                    )));
                }
                Some(_) => {}
                // No explicit dimension: the first vector seen fixes it.
                None => expected_dims = Some(components.len()),
            }
            collected.push((node.id, components.to_vec()));
        }
    }

    let dims = expected_dims.ok_or_else(|| {
        Error::Internal(format!(
            "No vector properties found on :{label}({property}) and no dimensions specified"
        ))
    })?;

    let index = HnswIndex::with_capacity(HnswConfig::new(dims, distance), collected.len());
    let accessor = grafeo_core::index::vector::PropertyVectorAccessor::new(store, property);
    for (id, components) in &collected {
        index.insert(*id, components, &accessor);
    }

    store.add_vector_index(label, property, Arc::new(index));
    Ok(())
}
1780
/// Fallback used when the crate is built without the `vector-index` feature.
///
/// # Errors
///
/// Always returns `Error::Internal` stating that the feature is required.
#[cfg(all(feature = "gql", not(feature = "vector-index")))]
fn create_vector_index_on_store(
    _store: &LpgStore,
    _label: &str,
    _property: &str,
    _dimensions: Option<usize>,
    _metric: Option<&str>,
) -> Result<()> {
    use grafeo_common::utils::error::Error;

    let message = String::from("Vector index support requires the 'vector-index' feature");
    Err(Error::Internal(message))
}
1794
/// Builds a BM25 inverted index over the string values of `property` on the
/// nodes labelled `label`, then registers it on `store`.
///
/// Nodes whose property is missing or is not a `Value::String` are skipped.
#[cfg(all(feature = "gql", feature = "text-index"))]
fn create_text_index_on_store(store: &LpgStore, label: &str, property: &str) -> Result<()> {
    use grafeo_common::types::{PropertyKey, Value};
    use grafeo_core::index::text::{BM25Config, InvertedIndex};

    let key = PropertyKey::new(property);
    let mut inverted = InvertedIndex::new(BM25Config::default());

    for id in store.nodes_by_label(label) {
        match store.get_node_property(id, &key) {
            Some(Value::String(content)) => {
                inverted.insert(id, content.as_str());
            }
            _ => {}
        }
    }

    let shared = Arc::new(parking_lot::RwLock::new(inverted));
    store.add_text_index(label, property, shared);
    Ok(())
}
1814
/// Fallback used when the crate is built without the `text-index` feature.
///
/// # Errors
///
/// Always returns `Error::Internal` stating that the feature is required.
#[cfg(all(feature = "gql", not(feature = "text-index")))]
fn create_text_index_on_store(_store: &LpgStore, _label: &str, _property: &str) -> Result<()> {
    use grafeo_common::utils::error::Error;

    let message = String::from("Text index support requires the 'text-index' feature");
    Err(Error::Internal(message))
}
1822
1823 fn execute_show_indexes(&self) -> Result<QueryResult> {
1825 let indexes = self.catalog.all_indexes();
1826 let columns = vec![
1827 "name".to_string(),
1828 "type".to_string(),
1829 "label".to_string(),
1830 "property".to_string(),
1831 ];
1832 let rows: Vec<Vec<Value>> = indexes
1833 .into_iter()
1834 .map(|def| {
1835 let label_name = self
1836 .catalog
1837 .get_label_name(def.label)
1838 .unwrap_or_else(|| "?".into());
1839 let prop_name = self
1840 .catalog
1841 .get_property_key_name(def.property_key)
1842 .unwrap_or_else(|| "?".into());
1843 vec![
1844 Value::from(format!("idx_{}_{}", label_name, prop_name)),
1845 Value::from(format!("{:?}", def.index_type)),
1846 Value::from(&*label_name),
1847 Value::from(&*prop_name),
1848 ]
1849 })
1850 .collect();
1851 Ok(QueryResult {
1852 columns,
1853 column_types: Vec::new(),
1854 rows,
1855 ..QueryResult::empty()
1856 })
1857 }
1858
1859 fn execute_show_constraints(&self) -> Result<QueryResult> {
1861 Ok(QueryResult {
1864 columns: vec![
1865 "name".to_string(),
1866 "type".to_string(),
1867 "label".to_string(),
1868 "properties".to_string(),
1869 ],
1870 column_types: Vec::new(),
1871 rows: Vec::new(),
1872 ..QueryResult::empty()
1873 })
1874 }
1875
1876 fn execute_show_node_types(&self) -> Result<QueryResult> {
1878 let columns = vec![
1879 "name".to_string(),
1880 "properties".to_string(),
1881 "constraints".to_string(),
1882 "parents".to_string(),
1883 ];
1884 let schema = self.current_schema.lock().clone();
1885 let all_names = self.catalog.all_node_type_names();
1886 let type_names: Vec<String> = match &schema {
1887 Some(s) => {
1888 let prefix = format!("{s}/");
1889 all_names
1890 .into_iter()
1891 .filter_map(|n| n.strip_prefix(&prefix).map(String::from))
1892 .collect()
1893 }
1894 None => all_names.into_iter().filter(|n| !n.contains('/')).collect(),
1895 };
1896 let rows: Vec<Vec<Value>> = type_names
1897 .into_iter()
1898 .filter_map(|name| {
1899 let lookup = match &schema {
1900 Some(s) => format!("{s}/{name}"),
1901 None => name.clone(),
1902 };
1903 let def = self.catalog.get_node_type(&lookup)?;
1904 let props: Vec<String> = def
1905 .properties
1906 .iter()
1907 .map(|p| {
1908 let nullable = if p.nullable { "" } else { " NOT NULL" };
1909 format!("{} {}{}", p.name, p.data_type, nullable)
1910 })
1911 .collect();
1912 let constraints: Vec<String> =
1913 def.constraints.iter().map(|c| format!("{c:?}")).collect();
1914 let parents = def.parent_types.join(", ");
1915 Some(vec![
1916 Value::from(name),
1917 Value::from(props.join(", ")),
1918 Value::from(constraints.join(", ")),
1919 Value::from(parents),
1920 ])
1921 })
1922 .collect();
1923 Ok(QueryResult {
1924 columns,
1925 column_types: Vec::new(),
1926 rows,
1927 ..QueryResult::empty()
1928 })
1929 }
1930
1931 fn execute_show_edge_types(&self) -> Result<QueryResult> {
1933 let columns = vec![
1934 "name".to_string(),
1935 "properties".to_string(),
1936 "source_types".to_string(),
1937 "target_types".to_string(),
1938 ];
1939 let schema = self.current_schema.lock().clone();
1940 let all_names = self.catalog.all_edge_type_names();
1941 let type_names: Vec<String> = match &schema {
1942 Some(s) => {
1943 let prefix = format!("{s}/");
1944 all_names
1945 .into_iter()
1946 .filter_map(|n| n.strip_prefix(&prefix).map(String::from))
1947 .collect()
1948 }
1949 None => all_names.into_iter().filter(|n| !n.contains('/')).collect(),
1950 };
1951 let rows: Vec<Vec<Value>> = type_names
1952 .into_iter()
1953 .filter_map(|name| {
1954 let lookup = match &schema {
1955 Some(s) => format!("{s}/{name}"),
1956 None => name.clone(),
1957 };
1958 let def = self.catalog.get_edge_type_def(&lookup)?;
1959 let props: Vec<String> = def
1960 .properties
1961 .iter()
1962 .map(|p| {
1963 let nullable = if p.nullable { "" } else { " NOT NULL" };
1964 format!("{} {}{}", p.name, p.data_type, nullable)
1965 })
1966 .collect();
1967 let src = def.source_node_types.join(", ");
1968 let tgt = def.target_node_types.join(", ");
1969 Some(vec![
1970 Value::from(name),
1971 Value::from(props.join(", ")),
1972 Value::from(src),
1973 Value::from(tgt),
1974 ])
1975 })
1976 .collect();
1977 Ok(QueryResult {
1978 columns,
1979 column_types: Vec::new(),
1980 rows,
1981 ..QueryResult::empty()
1982 })
1983 }
1984
1985 fn execute_show_graph_types(&self) -> Result<QueryResult> {
1987 let columns = vec![
1988 "name".to_string(),
1989 "open".to_string(),
1990 "node_types".to_string(),
1991 "edge_types".to_string(),
1992 ];
1993 let schema = self.current_schema.lock().clone();
1994 let all_names = self.catalog.all_graph_type_names();
1995 let type_names: Vec<String> = match &schema {
1996 Some(s) => {
1997 let prefix = format!("{s}/");
1998 all_names
1999 .into_iter()
2000 .filter_map(|n| n.strip_prefix(&prefix).map(String::from))
2001 .collect()
2002 }
2003 None => all_names.into_iter().filter(|n| !n.contains('/')).collect(),
2004 };
2005 let rows: Vec<Vec<Value>> = type_names
2006 .into_iter()
2007 .filter_map(|name| {
2008 let lookup = match &schema {
2009 Some(s) => format!("{s}/{name}"),
2010 None => name.clone(),
2011 };
2012 let def = self.catalog.get_graph_type_def(&lookup)?;
2013 let strip = |n: &String| -> String {
2015 match &schema {
2016 Some(s) => n.strip_prefix(&format!("{s}/")).unwrap_or(n).to_string(),
2017 None => n.clone(),
2018 }
2019 };
2020 let node_types: Vec<String> = def.allowed_node_types.iter().map(strip).collect();
2021 let edge_types: Vec<String> = def.allowed_edge_types.iter().map(strip).collect();
2022 Some(vec![
2023 Value::from(name),
2024 Value::from(def.open),
2025 Value::from(node_types.join(", ")),
2026 Value::from(edge_types.join(", ")),
2027 ])
2028 })
2029 .collect();
2030 Ok(QueryResult {
2031 columns,
2032 column_types: Vec::new(),
2033 rows,
2034 ..QueryResult::empty()
2035 })
2036 }
2037
2038 fn execute_show_graphs(&self) -> Result<QueryResult> {
2044 let schema = self.current_schema.lock().clone();
2045 let all_names = self.store.graph_names();
2046
2047 let mut names: Vec<String> = match &schema {
2048 Some(s) => {
2049 let prefix = format!("{s}/");
2050 all_names
2051 .into_iter()
2052 .filter_map(|n| n.strip_prefix(&prefix).map(String::from))
2053 .collect()
2054 }
2055 None => all_names.into_iter().filter(|n| !n.contains('/')).collect(),
2056 };
2057 names.sort();
2058
2059 let rows: Vec<Vec<Value>> = names.into_iter().map(|n| vec![Value::from(n)]).collect();
2060 Ok(QueryResult {
2061 columns: vec!["name".to_string()],
2062 column_types: Vec::new(),
2063 rows,
2064 ..QueryResult::empty()
2065 })
2066 }
2067
2068 fn execute_show_schemas(&self) -> Result<QueryResult> {
2070 let mut names = self.catalog.schema_names();
2071 names.sort();
2072 let rows: Vec<Vec<Value>> = names.into_iter().map(|n| vec![Value::from(n)]).collect();
2073 Ok(QueryResult {
2074 columns: vec!["name".to_string()],
2075 column_types: Vec::new(),
2076 rows,
2077 ..QueryResult::empty()
2078 })
2079 }
2080
2081 fn execute_show_graph_type(&self, name: &str) -> Result<QueryResult> {
2083 use grafeo_common::utils::error::{Error, QueryError, QueryErrorKind};
2084
2085 let def = self.catalog.get_graph_type_def(name).ok_or_else(|| {
2086 Error::Query(QueryError::new(
2087 QueryErrorKind::Semantic,
2088 format!("Graph type '{name}' not found"),
2089 ))
2090 })?;
2091
2092 let columns = vec![
2093 "name".to_string(),
2094 "open".to_string(),
2095 "node_types".to_string(),
2096 "edge_types".to_string(),
2097 ];
2098 let rows = vec![vec![
2099 Value::from(def.name),
2100 Value::from(def.open),
2101 Value::from(def.allowed_node_types.join(", ")),
2102 Value::from(def.allowed_edge_types.join(", ")),
2103 ]];
2104 Ok(QueryResult {
2105 columns,
2106 column_types: Vec::new(),
2107 rows,
2108 ..QueryResult::empty()
2109 })
2110 }
2111
2112 fn execute_show_current_graph_type(&self) -> Result<QueryResult> {
2114 let graph_name = self
2115 .current_graph()
2116 .unwrap_or_else(|| "default".to_string());
2117 let columns = vec![
2118 "graph".to_string(),
2119 "graph_type".to_string(),
2120 "open".to_string(),
2121 "node_types".to_string(),
2122 "edge_types".to_string(),
2123 ];
2124
2125 if let Some(type_name) = self.catalog.get_graph_type_binding(&graph_name)
2126 && let Some(def) = self.catalog.get_graph_type_def(&type_name)
2127 {
2128 let rows = vec![vec![
2129 Value::from(graph_name),
2130 Value::from(type_name),
2131 Value::from(def.open),
2132 Value::from(def.allowed_node_types.join(", ")),
2133 Value::from(def.allowed_edge_types.join(", ")),
2134 ]];
2135 return Ok(QueryResult {
2136 columns,
2137 column_types: Vec::new(),
2138 rows,
2139 ..QueryResult::empty()
2140 });
2141 }
2142
2143 Ok(QueryResult {
2145 columns,
2146 column_types: Vec::new(),
2147 rows: vec![vec![
2148 Value::from(graph_name),
2149 Value::Null,
2150 Value::Null,
2151 Value::Null,
2152 Value::Null,
2153 ]],
2154 ..QueryResult::empty()
2155 })
2156 }
2157
/// Executes a GQL query string against the session's active store.
///
/// The query is translated first. Session commands and schema commands are
/// dispatched to their own handlers; anything else is a logical plan that is
/// bound, optimized (or fetched from the query cache, keyed by query text,
/// language and current graph), planned and executed inside
/// `with_auto_commit`.
///
/// Plans flagged `explain` return the annotated plan without executing it.
/// Plans flagged `profile` are executed and return the profile tree instead of
/// the query rows.
///
/// # Errors
///
/// Returns `TransactionError::ReadOnly` when the session's read-only
/// transaction flag is set and the query is a schema command or a plan with
/// mutations. Translation, binding, optimization, planning and execution
/// errors are propagated.
#[cfg(feature = "gql")]
pub fn execute(&self, query: &str) -> Result<QueryResult> {
    use crate::query::{
        Executor, binder::Binder, cache::CacheKey, optimizer::Optimizer,
        processor::QueryLanguage, translators::gql,
    };
    use grafeo_common::utils::error::{Error, TransactionError};

    self.require_lpg("GQL")?;

    let _span = grafeo_info_span!(
        "grafeo::session::execute",
        language = "gql",
        query_len = query.len(),
    );

    #[cfg(not(target_arch = "wasm32"))]
    let started = std::time::Instant::now();

    // Commands return from inside the match; only a plan falls through.
    let logical_plan = match gql::translate_full(query)? {
        gql::GqlTranslationResult::SessionCommand(cmd) => {
            return self.execute_session_command(cmd);
        }
        gql::GqlTranslationResult::SchemaCommand(_) if *self.read_only_tx.lock() => {
            return Err(Error::Transaction(TransactionError::ReadOnly));
        }
        gql::GqlTranslationResult::SchemaCommand(cmd) => {
            return self.execute_schema_command(cmd);
        }
        gql::GqlTranslationResult::Plan(plan)
            if *self.read_only_tx.lock() && plan.root.has_mutations() =>
        {
            return Err(Error::Transaction(TransactionError::ReadOnly));
        }
        gql::GqlTranslationResult::Plan(plan) => plan,
    };

    let cache_key = CacheKey::with_graph(query, QueryLanguage::Gql, self.current_graph());

    let optimized_plan = match self.query_cache.get_optimized(&cache_key) {
        Some(cached) => cached,
        None => {
            // Binding runs for its validation; the resulting context is unused.
            let mut binder = Binder::new();
            let _binding_context = binder.bind(&logical_plan)?;

            let store_for_stats = self.active_store();
            let optimizer = Optimizer::from_graph_store(&*store_for_stats);
            let fresh = optimizer.optimize(logical_plan)?;

            self.query_cache.put_optimized(cache_key, fresh.clone());

            fresh
        }
    };

    let active = self.active_store();

    if optimized_plan.explain {
        use crate::query::processor::{annotate_pushdown_hints, explain_result};
        let mut explained = optimized_plan;
        annotate_pushdown_hints(&mut explained.root, active.as_ref());
        return Ok(explain_result(&explained));
    }

    let has_mutations = optimized_plan.root.has_mutations();

    if optimized_plan.profile {
        return self.with_auto_commit(has_mutations, || {
            let (viewing_epoch, transaction_id) = self.get_transaction_context();
            let planner =
                self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
            let (mut physical_plan, entries) = planner.plan_profiled(&optimized_plan)?;

            let executor = Executor::with_columns(physical_plan.columns.clone())
                .with_deadline(self.query_deadline());
            // The rows are discarded; profiling only reports timings.
            let _discarded = executor.execute(physical_plan.operator.as_mut())?;

            #[cfg(not(target_arch = "wasm32"))]
            let total_time_ms = started.elapsed().as_secs_f64() * 1000.0;
            #[cfg(target_arch = "wasm32")]
            let total_time_ms = 0.0;

            let profile_tree = crate::query::profile::build_profile_tree(
                &optimized_plan.root,
                &mut entries.into_iter(),
            );
            Ok(crate::query::profile::profile_result(
                &profile_tree,
                total_time_ms,
            ))
        });
    }

    let outcome = self.with_auto_commit(has_mutations, || {
        let (viewing_epoch, transaction_id) = self.get_transaction_context();

        // The planner is read-only only for a non-mutating query that runs
        // outside any explicit transaction.
        let has_active_tx = self.current_transaction.lock().is_some();
        let read_only = !(has_mutations || has_active_tx);
        let planner = self.create_planner_for_store_with_read_only(
            Arc::clone(&active),
            viewing_epoch,
            transaction_id,
            read_only,
        );
        let mut physical_plan = planner.plan(&optimized_plan)?;

        let executor = Executor::with_columns(physical_plan.columns.clone())
            .with_deadline(self.query_deadline());
        let mut output = executor.execute(physical_plan.operator.as_mut())?;

        // Reported as the number of rows in the result set.
        let row_count = output.rows.len() as u64;
        #[cfg(not(target_arch = "wasm32"))]
        {
            output.execution_time_ms = Some(started.elapsed().as_secs_f64() * 1000.0);
        }
        output.rows_scanned = Some(row_count);

        Ok(output)
    });

    #[cfg(feature = "metrics")]
    {
        #[cfg(not(target_arch = "wasm32"))]
        let elapsed_ms = Some(started.elapsed().as_secs_f64() * 1000.0);
        #[cfg(target_arch = "wasm32")]
        let elapsed_ms = None;
        self.record_query_metrics("gql", elapsed_ms, &outcome);
    }

    outcome
}
2347
/// Executes a GQL query while the session's viewing-epoch override is set
/// to `epoch`, then puts the previous override back.
///
/// Equivalent to `execute_at_epoch_with_params` with no parameters.
#[cfg(feature = "gql")]
pub fn execute_at_epoch(&self, query: &str, epoch: EpochId) -> Result<QueryResult> {
    self.execute_at_epoch_with_params(query, epoch, None)
}
2363
/// Executes a GQL query, optionally parameterised, while the session's
/// viewing-epoch override is set to `epoch`.
///
/// With `params` set the query goes through `execute_with_params`; otherwise
/// it goes through `execute`. The override that was in place before the call
/// is put back afterwards, including when execution unwinds, so a panicking
/// query cannot leave the session pinned to `epoch`.
///
/// # Errors
///
/// Propagates whatever error the underlying execution returns.
#[cfg(feature = "gql")]
pub fn execute_at_epoch_with_params(
    &self,
    query: &str,
    epoch: EpochId,
    params: Option<std::collections::HashMap<String, Value>>,
) -> Result<QueryResult> {
    /// Writes the saved override back into the session when dropped.
    struct RestoreEpochOverride<'a> {
        session: &'a Session,
        previous: Option<EpochId>,
    }

    impl Drop for RestoreEpochOverride<'_> {
        fn drop(&mut self) {
            *self.session.viewing_epoch_override.lock() = self.previous.take();
        }
    }

    let previous = self.viewing_epoch_override.lock().replace(epoch);
    // Lives until the end of the function, so the restore runs after the
    // query has finished, on both the return path and the unwind path.
    let _restore = RestoreEpochOverride {
        session: self,
        previous,
    };

    match params {
        Some(p) => self.execute_with_params(query, p),
        None => self.execute(query),
    }
}
2387
/// Executes a GQL query with named parameters.
///
/// The query runs through a `QueryProcessor` built over the active store and
/// the session's transaction manager, inside `with_auto_commit`. Whether the
/// query counts as a mutation for auto-commit is decided by
/// `Self::query_looks_like_mutation` on the query text. When the session has
/// a transaction id, the processor is given that transaction's context.
///
/// # Errors
///
/// Propagates errors from `require_lpg`, from constructing the processor and
/// from processing the query.
#[cfg(feature = "gql")]
pub fn execute_with_params(
    &self,
    query: &str,
    params: std::collections::HashMap<String, Value>,
) -> Result<QueryResult> {
    self.require_lpg("GQL")?;

    use crate::query::processor::{QueryLanguage, QueryProcessor};

    let has_mutations = Self::query_looks_like_mutation(query);
    let active = self.active_store();

    self.with_auto_commit(has_mutations, || {
        let (viewing_epoch, transaction_id) = self.get_transaction_context();

        let processor = QueryProcessor::for_graph_store_with_transaction(
            Arc::clone(&active),
            Arc::clone(&self.transaction_manager),
        )?;

        // Attach the transaction context only when a transaction id exists.
        let processor = if let Some(transaction_id) = transaction_id {
            processor.with_transaction_context(viewing_epoch, transaction_id)
        } else {
            processor
        };

        processor.process(query, QueryLanguage::Gql, Some(&params))
    })
}
2426
2427 #[cfg(not(any(feature = "gql", feature = "cypher")))]
2433 pub fn execute_with_params(
2434 &self,
2435 _query: &str,
2436 _params: std::collections::HashMap<String, Value>,
2437 ) -> Result<QueryResult> {
2438 Err(grafeo_common::utils::error::Error::Internal(
2439 "No query language enabled".to_string(),
2440 ))
2441 }
2442
2443 #[cfg(not(any(feature = "gql", feature = "cypher")))]
2449 pub fn execute(&self, _query: &str) -> Result<QueryResult> {
2450 Err(grafeo_common::utils::error::Error::Internal(
2451 "No query language enabled".to_string(),
2452 ))
2453 }
2454
/// Executes a Cypher query without parameters.
///
/// Schema commands and the `SHOW ...` statements are dispatched directly to
/// their dedicated handlers. Everything else goes through the optimized-plan
/// cache, keyed by query text, language and current graph, and is then either
/// explained, profiled, or executed.
///
/// # Errors
///
/// Returns a semantic query error when a schema command is issued while the
/// session's read-only flag is set. Otherwise propagates translation, binding,
/// optimization, planning, execution and auto-commit errors.
#[cfg(feature = "cypher")]
pub fn execute_cypher(&self, query: &str) -> Result<QueryResult> {
    use crate::query::{
        Executor, binder::Binder, cache::CacheKey, optimizer::Optimizer,
        processor::QueryLanguage, translators::cypher,
    };
    use grafeo_common::utils::error::{Error as GrafeoError, QueryError, QueryErrorKind};

    // Non-plan statements are answered immediately and never reach the cache.
    match cypher::translate_full(query)? {
        cypher::CypherTranslationResult::SchemaCommand(cmd) => {
            if *self.read_only_tx.lock() {
                return Err(GrafeoError::Query(QueryError::new(
                    QueryErrorKind::Semantic,
                    "Cannot execute schema DDL in a read-only transaction",
                )));
            }
            return self.execute_schema_command(cmd);
        }
        cypher::CypherTranslationResult::ShowIndexes => return self.execute_show_indexes(),
        cypher::CypherTranslationResult::ShowConstraints => {
            return self.execute_show_constraints();
        }
        cypher::CypherTranslationResult::ShowCurrentGraphType => {
            return self.execute_show_current_graph_type();
        }
        // Regular queries fall through to the cached planning pipeline below.
        cypher::CypherTranslationResult::Plan(_) => {}
    }

    #[cfg(not(target_arch = "wasm32"))]
    let start_time = std::time::Instant::now();

    let cache_key = CacheKey::with_graph(query, QueryLanguage::Cypher, self.current_graph());

    let optimized_plan = match self.query_cache.get_optimized(&cache_key) {
        Some(cached) => cached,
        None => {
            // Cache miss: translate, bind and optimize, then remember the plan.
            let logical_plan = cypher::translate(query)?;

            let mut binder = Binder::new();
            let _bound = binder.bind(&logical_plan)?;

            let stats_store = self.active_store();
            let optimizer = Optimizer::from_graph_store(&*stats_store);
            let fresh = optimizer.optimize(logical_plan)?;

            self.query_cache.put_optimized(cache_key, fresh.clone());

            fresh
        }
    };

    let active = self.active_store();

    // EXPLAIN: return the annotated plan without executing anything.
    if optimized_plan.explain {
        use crate::query::processor::{annotate_pushdown_hints, explain_result};
        let mut explained = optimized_plan;
        annotate_pushdown_hints(&mut explained.root, active.as_ref());
        return Ok(explain_result(&explained));
    }

    let has_mutations = optimized_plan.root.has_mutations();

    // PROFILE: execute, discard the rows, and report the per-operator profile.
    if optimized_plan.profile {
        return self.with_auto_commit(has_mutations, || {
            let (viewing_epoch, transaction_id) = self.get_transaction_context();
            let planner = self.create_planner_for_store(
                Arc::clone(&active),
                viewing_epoch,
                transaction_id,
            );
            let (mut physical_plan, entries) = planner.plan_profiled(&optimized_plan)?;

            let executor = Executor::with_columns(physical_plan.columns.clone())
                .with_deadline(self.query_deadline());
            let _rows = executor.execute(physical_plan.operator.as_mut())?;

            // No wall-clock measurement is taken on wasm32; report zero there.
            #[cfg(not(target_arch = "wasm32"))]
            let total_time_ms = start_time.elapsed().as_secs_f64() * 1000.0;
            #[cfg(target_arch = "wasm32")]
            let total_time_ms = 0.0;

            let profile_tree = crate::query::profile::build_profile_tree(
                &optimized_plan.root,
                &mut entries.into_iter(),
            );
            Ok(crate::query::profile::profile_result(
                &profile_tree,
                total_time_ms,
            ))
        });
    }

    let outcome = self.with_auto_commit(has_mutations, || {
        let (viewing_epoch, transaction_id) = self.get_transaction_context();

        let planner =
            self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
        let mut physical_plan = planner.plan(&optimized_plan)?;

        let executor = Executor::with_columns(physical_plan.columns.clone())
            .with_deadline(self.query_deadline());
        executor.execute(physical_plan.operator.as_mut())
    });

    #[cfg(feature = "metrics")]
    {
        #[cfg(not(target_arch = "wasm32"))]
        let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
        #[cfg(target_arch = "wasm32")]
        let elapsed_ms = None;
        self.record_query_metrics("cypher", elapsed_ms, &outcome);
    }

    outcome
}
2598
/// Executes a Gremlin query without parameters.
///
/// The query is translated, bound and optimized on every call (no plan cache
/// is consulted here), then planned and executed inside `with_auto_commit`.
///
/// # Errors
///
/// Propagates translation, binding, optimization, planning, execution and
/// auto-commit errors.
#[cfg(feature = "gremlin")]
pub fn execute_gremlin(&self, query: &str) -> Result<QueryResult> {
    use crate::query::{Executor, binder::Binder, optimizer::Optimizer, translators::gremlin};

    #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
    let start_time = Instant::now();

    let logical = gremlin::translate(query)?;

    let mut binder = Binder::new();
    let _bound = binder.bind(&logical)?;

    let active = self.active_store();
    let optimizer = Optimizer::from_graph_store(&*active);
    let optimized = optimizer.optimize(logical)?;

    // Mutation detection comes from the optimized plan, not from the query text.
    let mutating = optimized.root.has_mutations();

    let outcome = self.with_auto_commit(mutating, || {
        let (epoch, tx) = self.get_transaction_context();

        let planner = self.create_planner_for_store(Arc::clone(&active), epoch, tx);
        let mut physical = planner.plan(&optimized)?;

        let executor = Executor::with_columns(physical.columns.clone())
            .with_deadline(self.query_deadline());
        executor.execute(physical.operator.as_mut())
    });

    #[cfg(feature = "metrics")]
    {
        #[cfg(not(target_arch = "wasm32"))]
        let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
        #[cfg(target_arch = "wasm32")]
        let elapsed_ms = None;
        self.record_query_metrics("gremlin", elapsed_ms, &outcome);
    }

    outcome
}
2669
/// Executes a Gremlin query with bound parameters through `QueryProcessor`.
///
/// Whether an implicit transaction is opened is decided by the keyword
/// heuristic in `query_looks_like_mutation`, not by the parsed plan.
///
/// # Errors
///
/// Propagates processor construction, query processing and auto-commit errors.
#[cfg(feature = "gremlin")]
pub fn execute_gremlin_with_params(
    &self,
    query: &str,
    params: std::collections::HashMap<String, Value>,
) -> Result<QueryResult> {
    use crate::query::processor::{QueryLanguage, QueryProcessor};

    #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
    let start_time = Instant::now();

    let mutating = Self::query_looks_like_mutation(query);
    let store = self.active_store();

    let outcome = self.with_auto_commit(mutating, || {
        let (epoch, tx) = self.get_transaction_context();
        let base = QueryProcessor::for_graph_store_with_transaction(
            Arc::clone(&store),
            Arc::clone(&self.transaction_manager),
        )?;
        // Only attach a transaction context when a transaction is active.
        let processor = match tx {
            Some(id) => base.with_transaction_context(epoch, id),
            None => base,
        };
        processor.process(query, QueryLanguage::Gremlin, Some(&params))
    });

    #[cfg(feature = "metrics")]
    {
        #[cfg(not(target_arch = "wasm32"))]
        let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
        #[cfg(target_arch = "wasm32")]
        let elapsed_ms = None;
        self.record_query_metrics("gremlin", elapsed_ms, &outcome);
    }

    outcome
}
2714
/// Executes a GraphQL query without parameters.
///
/// The query is translated, bound and optimized on every call (no plan cache
/// is consulted here), then planned and executed inside `with_auto_commit`.
///
/// # Errors
///
/// Propagates translation, binding, optimization, planning, execution and
/// auto-commit errors.
#[cfg(feature = "graphql")]
pub fn execute_graphql(&self, query: &str) -> Result<QueryResult> {
    use crate::query::{Executor, binder::Binder, optimizer::Optimizer, translators::graphql};

    #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
    let start_time = Instant::now();

    let logical = graphql::translate(query)?;
    let mut binder = Binder::new();
    let _bound = binder.bind(&logical)?;

    let active = self.active_store();
    let optimizer = Optimizer::from_graph_store(&*active);
    let optimized = optimizer.optimize(logical)?;
    // Mutation detection comes from the optimized plan, not from the query text.
    let mutating = optimized.root.has_mutations();

    let outcome = self.with_auto_commit(mutating, || {
        let (epoch, tx) = self.get_transaction_context();
        let planner = self.create_planner_for_store(Arc::clone(&active), epoch, tx);
        let mut physical = planner.plan(&optimized)?;
        let executor = Executor::with_columns(physical.columns.clone())
            .with_deadline(self.query_deadline());
        executor.execute(physical.operator.as_mut())
    });

    #[cfg(feature = "metrics")]
    {
        #[cfg(not(target_arch = "wasm32"))]
        let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
        #[cfg(target_arch = "wasm32")]
        let elapsed_ms = None;
        self.record_query_metrics("graphql", elapsed_ms, &outcome);
    }

    outcome
}
2775
/// Executes a GraphQL query with bound parameters through `QueryProcessor`.
///
/// Whether an implicit transaction is opened is decided by the keyword
/// heuristic in `query_looks_like_mutation`, not by the parsed plan.
///
/// # Errors
///
/// Propagates processor construction, query processing and auto-commit errors.
#[cfg(feature = "graphql")]
pub fn execute_graphql_with_params(
    &self,
    query: &str,
    params: std::collections::HashMap<String, Value>,
) -> Result<QueryResult> {
    use crate::query::processor::{QueryLanguage, QueryProcessor};

    #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
    let start_time = Instant::now();

    let mutating = Self::query_looks_like_mutation(query);
    let store = self.active_store();

    let outcome = self.with_auto_commit(mutating, || {
        let (epoch, tx) = self.get_transaction_context();
        let base = QueryProcessor::for_graph_store_with_transaction(
            Arc::clone(&store),
            Arc::clone(&self.transaction_manager),
        )?;
        // Only attach a transaction context when a transaction is active.
        let processor = match tx {
            Some(id) => base.with_transaction_context(epoch, id),
            None => base,
        };
        processor.process(query, QueryLanguage::GraphQL, Some(&params))
    });

    #[cfg(feature = "metrics")]
    {
        #[cfg(not(target_arch = "wasm32"))]
        let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
        #[cfg(target_arch = "wasm32")]
        let elapsed_ms = None;
        self.record_query_metrics("graphql", elapsed_ms, &outcome);
    }

    outcome
}
2820
/// Executes a SQL/PGQ query without parameters.
///
/// A `CREATE PROPERTY GRAPH` statement is answered with a one-row status
/// result and never reaches the optimizer. Other statements use the
/// optimized-plan cache keyed by query text, language and current graph.
/// Translation itself happens on every call, before the cache lookup.
///
/// # Errors
///
/// Propagates translation, binding, optimization, planning, execution and
/// auto-commit errors.
#[cfg(feature = "sql-pgq")]
pub fn execute_sql(&self, query: &str) -> Result<QueryResult> {
    use crate::query::{
        Executor, binder::Binder, cache::CacheKey, optimizer::Optimizer, plan::LogicalOperator,
        processor::QueryLanguage, translators::sql_pgq,
    };

    #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
    let start_time = Instant::now();

    let logical = sql_pgq::translate(query)?;

    // DDL short-circuit: report what was declared and return immediately.
    if let LogicalOperator::CreatePropertyGraph(cpg) = &logical.root {
        let status = format!(
            "Property graph '{}' created ({} node tables, {} edge tables)",
            cpg.name,
            cpg.node_tables.len(),
            cpg.edge_tables.len()
        );
        return Ok(QueryResult {
            columns: vec!["status".into()],
            column_types: vec![grafeo_common::types::LogicalType::String],
            rows: vec![vec![Value::from(status)]],
            execution_time_ms: None,
            rows_scanned: None,
            status_message: None,
            gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
        });
    }

    let cache_key = CacheKey::with_graph(query, QueryLanguage::SqlPgq, self.current_graph());

    let optimized = match self.query_cache.get_optimized(&cache_key) {
        Some(cached) => cached,
        None => {
            // Cache miss: bind and optimize, then remember the plan.
            let mut binder = Binder::new();
            let _bound = binder.bind(&logical)?;
            let stats_store = self.active_store();
            let optimizer = Optimizer::from_graph_store(&*stats_store);
            let fresh = optimizer.optimize(logical)?;
            self.query_cache.put_optimized(cache_key, fresh.clone());
            fresh
        }
    };

    let active = self.active_store();
    let mutating = optimized.root.has_mutations();

    let outcome = self.with_auto_commit(mutating, || {
        let (epoch, tx) = self.get_transaction_context();
        let planner = self.create_planner_for_store(Arc::clone(&active), epoch, tx);
        let mut physical = planner.plan(&optimized)?;
        let executor = Executor::with_columns(physical.columns.clone())
            .with_deadline(self.query_deadline());
        executor.execute(physical.operator.as_mut())
    });

    #[cfg(feature = "metrics")]
    {
        #[cfg(not(target_arch = "wasm32"))]
        let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
        #[cfg(target_arch = "wasm32")]
        let elapsed_ms = None;
        self.record_query_metrics("sql", elapsed_ms, &outcome);
    }

    outcome
}
2914
/// Executes a SQL/PGQ query with bound parameters through `QueryProcessor`.
///
/// Whether an implicit transaction is opened is decided by the keyword
/// heuristic in `query_looks_like_mutation`, not by the parsed plan.
///
/// # Errors
///
/// Propagates processor construction, query processing and auto-commit errors.
#[cfg(feature = "sql-pgq")]
pub fn execute_sql_with_params(
    &self,
    query: &str,
    params: std::collections::HashMap<String, Value>,
) -> Result<QueryResult> {
    use crate::query::processor::{QueryLanguage, QueryProcessor};

    #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
    let start_time = Instant::now();

    let mutating = Self::query_looks_like_mutation(query);
    let store = self.active_store();

    let outcome = self.with_auto_commit(mutating, || {
        let (epoch, tx) = self.get_transaction_context();
        let base = QueryProcessor::for_graph_store_with_transaction(
            Arc::clone(&store),
            Arc::clone(&self.transaction_manager),
        )?;
        // Only attach a transaction context when a transaction is active.
        let processor = match tx {
            Some(id) => base.with_transaction_context(epoch, id),
            None => base,
        };
        processor.process(query, QueryLanguage::SqlPgq, Some(&params))
    });

    #[cfg(feature = "metrics")]
    {
        #[cfg(not(target_arch = "wasm32"))]
        let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
        #[cfg(target_arch = "wasm32")]
        let elapsed_ms = None;
        self.record_query_metrics("sql", elapsed_ms, &outcome);
    }

    outcome
}
2959
2960 pub fn execute_language(
2969 &self,
2970 query: &str,
2971 language: &str,
2972 params: Option<std::collections::HashMap<String, Value>>,
2973 ) -> Result<QueryResult> {
2974 let _span = grafeo_info_span!(
2975 "grafeo::session::execute",
2976 language,
2977 query_len = query.len(),
2978 );
2979 match language {
2980 "gql" => {
2981 if let Some(p) = params {
2982 self.execute_with_params(query, p)
2983 } else {
2984 self.execute(query)
2985 }
2986 }
2987 #[cfg(feature = "cypher")]
2988 "cypher" => {
2989 if let Some(p) = params {
2990 use crate::query::processor::{QueryLanguage, QueryProcessor};
2991
2992 #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
2993 let start_time = Instant::now();
2994
2995 let has_mutations = Self::query_looks_like_mutation(query);
2996 let active = self.active_store();
2997 let result = self.with_auto_commit(has_mutations, || {
2998 let processor = QueryProcessor::for_graph_store_with_transaction(
2999 Arc::clone(&active),
3000 Arc::clone(&self.transaction_manager),
3001 )?;
3002 let (viewing_epoch, transaction_id) = self.get_transaction_context();
3003 let processor = if let Some(transaction_id) = transaction_id {
3004 processor.with_transaction_context(viewing_epoch, transaction_id)
3005 } else {
3006 processor
3007 };
3008 processor.process(query, QueryLanguage::Cypher, Some(&p))
3009 });
3010
3011 #[cfg(feature = "metrics")]
3012 {
3013 #[cfg(not(target_arch = "wasm32"))]
3014 let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
3015 #[cfg(target_arch = "wasm32")]
3016 let elapsed_ms = None;
3017 self.record_query_metrics("cypher", elapsed_ms, &result);
3018 }
3019
3020 result
3021 } else {
3022 self.execute_cypher(query)
3023 }
3024 }
3025 #[cfg(feature = "gremlin")]
3026 "gremlin" => {
3027 if let Some(p) = params {
3028 self.execute_gremlin_with_params(query, p)
3029 } else {
3030 self.execute_gremlin(query)
3031 }
3032 }
3033 #[cfg(feature = "graphql")]
3034 "graphql" => {
3035 if let Some(p) = params {
3036 self.execute_graphql_with_params(query, p)
3037 } else {
3038 self.execute_graphql(query)
3039 }
3040 }
3041 #[cfg(all(feature = "graphql", feature = "rdf"))]
3042 "graphql-rdf" => {
3043 if let Some(p) = params {
3044 self.execute_graphql_rdf_with_params(query, p)
3045 } else {
3046 self.execute_graphql_rdf(query)
3047 }
3048 }
3049 #[cfg(feature = "sql-pgq")]
3050 "sql" | "sql-pgq" => {
3051 if let Some(p) = params {
3052 self.execute_sql_with_params(query, p)
3053 } else {
3054 self.execute_sql(query)
3055 }
3056 }
3057 #[cfg(all(feature = "sparql", feature = "rdf"))]
3058 "sparql" => {
3059 if let Some(p) = params {
3060 self.execute_sparql_with_params(query, p)
3061 } else {
3062 self.execute_sparql(query)
3063 }
3064 }
3065 other => Err(grafeo_common::utils::error::Error::Query(
3066 grafeo_common::utils::error::QueryError::new(
3067 grafeo_common::utils::error::QueryErrorKind::Semantic,
3068 format!("Unknown query language: '{other}'"),
3069 ),
3070 )),
3071 }
3072 }
3073
/// Empties the query cache held by this session by calling `clear` on it.
///
/// NOTE(review): the cache is handed to the session as an `Arc<QueryCache>`,
/// so this presumably also affects other holders of the same cache — confirm.
pub fn clear_plan_cache(&self) {
    self.query_cache.clear();
}
3103
/// Begins a read-write transaction with the transaction manager's default
/// isolation level.
///
/// If a transaction is already active on this session, the call opens a
/// nested level backed by a savepoint instead (see `begin_transaction_inner`).
///
/// # Errors
///
/// Propagates any error from `begin_transaction_inner`.
pub fn begin_transaction(&mut self) -> Result<()> {
    self.begin_transaction_inner(false, None)
}
3114
/// Begins a read-write transaction using the given isolation level.
///
/// If a transaction is already active on this session, the call opens a
/// nested level backed by a savepoint and `isolation_level` is not applied
/// (see `begin_transaction_inner`).
///
/// # Errors
///
/// Propagates any error from `begin_transaction_inner`.
pub fn begin_transaction_with_isolation(
    &mut self,
    isolation_level: crate::transaction::IsolationLevel,
) -> Result<()> {
    self.begin_transaction_inner(false, Some(isolation_level))
}
3128
3129 fn begin_transaction_inner(
3131 &self,
3132 read_only: bool,
3133 isolation_level: Option<crate::transaction::IsolationLevel>,
3134 ) -> Result<()> {
3135 let _span = grafeo_debug_span!("grafeo::tx::begin", read_only);
3136 let mut current = self.current_transaction.lock();
3137 if current.is_some() {
3138 drop(current);
3140 let mut depth = self.transaction_nesting_depth.lock();
3141 *depth += 1;
3142 let sp_name = format!("_nested_tx_{}", *depth);
3143 self.savepoint(&sp_name)?;
3144 return Ok(());
3145 }
3146
3147 let active = self.active_lpg_store();
3148 self.transaction_start_node_count
3149 .store(active.node_count(), Ordering::Relaxed);
3150 self.transaction_start_edge_count
3151 .store(active.edge_count(), Ordering::Relaxed);
3152 let transaction_id = if let Some(level) = isolation_level {
3153 self.transaction_manager.begin_with_isolation(level)
3154 } else {
3155 self.transaction_manager.begin()
3156 };
3157 *current = Some(transaction_id);
3158 *self.read_only_tx.lock() = read_only || self.db_read_only;
3159
3160 let key = self.active_graph_storage_key();
3163 let mut touched = self.touched_graphs.lock();
3164 touched.clear();
3165 touched.push(key);
3166
3167 #[cfg(feature = "metrics")]
3168 {
3169 crate::metrics::record_metric!(self.metrics, tx_active, inc);
3170 #[cfg(not(target_arch = "wasm32"))]
3171 {
3172 *self.tx_start_time.lock() = Some(Instant::now());
3173 }
3174 }
3175
3176 Ok(())
3177 }
3178
/// Commits the current transaction, or releases the innermost nested level.
///
/// # Errors
///
/// Propagates any error from `commit_inner`, including the
/// "No active transaction" state error.
pub fn commit(&mut self) -> Result<()> {
    self.commit_inner()
}
3189
/// Commit implementation shared by `commit` and the auto-commit path.
///
/// When the nesting depth is non-zero, only the innermost nested level is
/// closed by releasing its `_nested_tx_<depth>` savepoint. Otherwise the
/// active transaction is taken out of the session and committed through the
/// transaction manager; on success every touched graph store is finalised,
/// and on failure the property changes of every touched store are rolled back.
///
/// # Errors
///
/// Returns a transaction state error ("No active transaction") when nothing
/// is active, the error from `release_savepoint` for nested levels, or the
/// error returned by the transaction manager's `commit`.
fn commit_inner(&self) -> Result<()> {
    let _span = grafeo_debug_span!("grafeo::tx::commit");
    {
        // Nested level: release its savepoint and leave the outer transaction open.
        let mut depth = self.transaction_nesting_depth.lock();
        if *depth > 0 {
            let sp_name = format!("_nested_tx_{depth}");
            *depth -= 1;
            drop(depth);
            return self.release_savepoint(&sp_name);
        }
    }

    // From here on the session no longer has an active transaction, whether
    // the commit below succeeds or not.
    let transaction_id = self.current_transaction.lock().take().ok_or_else(|| {
        grafeo_common::utils::error::Error::Transaction(
            grafeo_common::utils::error::TransactionError::InvalidState(
                "No active transaction".to_string(),
            ),
        )
    })?;

    // Snapshot the touched graphs so the lock is not held during the commit.
    let touched = self.touched_graphs.lock().clone();
    let commit_epoch = match self.transaction_manager.commit(transaction_id) {
        Ok(epoch) => epoch,
        Err(e) => {
            // Commit was rejected: undo property changes and reset session state.
            for graph_name in &touched {
                let store = self.resolve_store(graph_name);
                store.rollback_transaction_properties(transaction_id);
            }
            #[cfg(feature = "rdf")]
            self.rollback_rdf_transaction(transaction_id);
            *self.read_only_tx.lock() = self.db_read_only;
            self.savepoints.lock().clear();
            self.touched_graphs.lock().clear();
            #[cfg(feature = "metrics")]
            {
                // Every failed commit is counted under `tx_conflicts`.
                crate::metrics::record_metric!(self.metrics, tx_active, dec);
                crate::metrics::record_metric!(self.metrics, tx_conflicts, inc);
                #[cfg(not(target_arch = "wasm32"))]
                if let Some(start) = self.tx_start_time.lock().take() {
                    let duration_ms = start.elapsed().as_secs_f64() * 1000.0;
                    crate::metrics::record_metric!(
                        self.metrics,
                        tx_duration,
                        observe duration_ms
                    );
                }
            }
            return Err(e);
        }
    };

    // Stamp the versions written by this transaction with the commit epoch.
    for graph_name in &touched {
        let store = self.resolve_store(graph_name);
        store.finalize_version_epochs(transaction_id, commit_epoch);
    }

    #[cfg(feature = "rdf")]
    self.commit_rdf_transaction(transaction_id);

    for graph_name in &touched {
        let store = self.resolve_store(graph_name);
        store.commit_transaction_properties(transaction_id);
    }

    // Bring every touched store up to the manager's current epoch.
    let current_epoch = self.transaction_manager.current_epoch();
    for graph_name in &touched {
        let store = self.resolve_store(graph_name);
        store.sync_epoch(current_epoch);
    }

    // Reset per-transaction session state.
    *self.read_only_tx.lock() = self.db_read_only;
    self.savepoints.lock().clear();
    self.touched_graphs.lock().clear();

    // Periodic garbage collection: runs on every `gc_interval`-th commit
    // counted by the shared commit counter; a zero interval disables it.
    if self.gc_interval > 0 {
        let count = self.commit_counter.fetch_add(1, Ordering::Relaxed) + 1;
        if count.is_multiple_of(self.gc_interval) {
            let min_epoch = self.transaction_manager.min_active_epoch();
            // Only the graphs touched by this transaction are collected here.
            for graph_name in &touched {
                let store = self.resolve_store(graph_name);
                store.gc_versions(min_epoch);
            }
            self.transaction_manager.gc();
            #[cfg(feature = "metrics")]
            crate::metrics::record_metric!(self.metrics, gc_runs, inc);
        }
    }

    #[cfg(feature = "metrics")]
    {
        crate::metrics::record_metric!(self.metrics, tx_active, dec);
        crate::metrics::record_metric!(self.metrics, tx_committed, inc);
        #[cfg(not(target_arch = "wasm32"))]
        if let Some(start) = self.tx_start_time.lock().take() {
            let duration_ms = start.elapsed().as_secs_f64() * 1000.0;
            crate::metrics::record_metric!(self.metrics, tx_duration, observe duration_ms);
        }
    }

    Ok(())
}
3302
/// Rolls back the current transaction, or only the innermost nested level.
///
/// # Errors
///
/// Propagates any error from `rollback_inner`, including the
/// "No active transaction" state error.
pub fn rollback(&mut self) -> Result<()> {
    self.rollback_inner()
}
3329
/// Rollback implementation shared by `rollback` and the auto-commit path.
///
/// When the nesting depth is non-zero, only the innermost nested level is
/// undone by rolling back to its `_nested_tx_<depth>` savepoint. Otherwise the
/// active transaction is taken out of the session, its uncommitted versions
/// are discarded in every touched graph store, and the transaction is aborted
/// on the transaction manager.
///
/// # Errors
///
/// Returns a transaction state error ("No active transaction") when nothing
/// is active, the error from `rollback_to_savepoint` for nested levels, or
/// the error returned by the transaction manager's `abort`.
fn rollback_inner(&self) -> Result<()> {
    let _span = grafeo_debug_span!("grafeo::tx::rollback");
    {
        // Nested level: undo back to its savepoint and keep the outer transaction.
        let mut depth = self.transaction_nesting_depth.lock();
        if *depth > 0 {
            let sp_name = format!("_nested_tx_{depth}");
            *depth -= 1;
            drop(depth);
            return self.rollback_to_savepoint(&sp_name);
        }
    }

    let transaction_id = self.current_transaction.lock().take().ok_or_else(|| {
        grafeo_common::utils::error::Error::Transaction(
            grafeo_common::utils::error::TransactionError::InvalidState(
                "No active transaction".to_string(),
            ),
        )
    })?;

    // Fall back to the database-level read-only setting.
    *self.read_only_tx.lock() = self.db_read_only;

    let touched = self.touched_graphs.lock().clone();
    for graph_name in &touched {
        let store = self.resolve_store(graph_name);
        store.discard_uncommitted_versions(transaction_id);
    }

    #[cfg(feature = "rdf")]
    self.rollback_rdf_transaction(transaction_id);

    self.savepoints.lock().clear();
    self.touched_graphs.lock().clear();

    let result = self.transaction_manager.abort(transaction_id);

    // NOTE(review): metrics are only updated when `abort` succeeds, so a
    // failed abort leaves `tx_active` and `tx_start_time` untouched even
    // though the session no longer holds the transaction — confirm intended.
    #[cfg(feature = "metrics")]
    if result.is_ok() {
        crate::metrics::record_metric!(self.metrics, tx_active, dec);
        crate::metrics::record_metric!(self.metrics, tx_rolled_back, inc);
        #[cfg(not(target_arch = "wasm32"))]
        if let Some(start) = self.tx_start_time.lock().take() {
            let duration_ms = start.elapsed().as_secs_f64() * 1000.0;
            crate::metrics::record_metric!(self.metrics, tx_duration, observe duration_ms);
        }
    }

    result
}
3386
3387 pub fn savepoint(&self, name: &str) -> Result<()> {
3397 let tx_id = self.current_transaction.lock().ok_or_else(|| {
3398 grafeo_common::utils::error::Error::Transaction(
3399 grafeo_common::utils::error::TransactionError::InvalidState(
3400 "No active transaction".to_string(),
3401 ),
3402 )
3403 })?;
3404
3405 let touched = self.touched_graphs.lock().clone();
3407 let graph_snapshots: Vec<GraphSavepoint> = touched
3408 .iter()
3409 .map(|graph_name| {
3410 let store = self.resolve_store(graph_name);
3411 GraphSavepoint {
3412 graph_name: graph_name.clone(),
3413 next_node_id: store.peek_next_node_id(),
3414 next_edge_id: store.peek_next_edge_id(),
3415 undo_log_position: store.property_undo_log_position(tx_id),
3416 }
3417 })
3418 .collect();
3419
3420 self.savepoints.lock().push(SavepointState {
3421 name: name.to_string(),
3422 graph_snapshots,
3423 active_graph: self.current_graph.lock().clone(),
3424 });
3425 Ok(())
3426 }
3427
/// Undoes all changes made in the active transaction since the savepoint `name`.
///
/// The most recently created savepoint with that name is used. That savepoint
/// and every savepoint created after it are removed from the session.
///
/// # Errors
///
/// Returns a transaction state error when no transaction is active
/// ("No active transaction") or when no savepoint with the given name exists
/// ("Savepoint '<name>' not found").
pub fn rollback_to_savepoint(&self, name: &str) -> Result<()> {
    let transaction_id = self.current_transaction.lock().ok_or_else(|| {
        grafeo_common::utils::error::Error::Transaction(
            grafeo_common::utils::error::TransactionError::InvalidState(
                "No active transaction".to_string(),
            ),
        )
    })?;

    let mut savepoints = self.savepoints.lock();

    // Search from the back so the newest savepoint with this name wins.
    let pos = savepoints
        .iter()
        .rposition(|sp| sp.name == name)
        .ok_or_else(|| {
            grafeo_common::utils::error::Error::Transaction(
                grafeo_common::utils::error::TransactionError::InvalidState(format!(
                    "Savepoint '{name}' not found"
                )),
            )
        })?;

    let sp_state = savepoints[pos].clone();

    // Drops the target savepoint itself together with all later ones.
    savepoints.truncate(pos);
    drop(savepoints);

    // Graphs that were already touched when the savepoint was taken:
    // rewind their property undo log and discard entities created since.
    for gs in &sp_state.graph_snapshots {
        let store = self.resolve_store(&gs.graph_name);

        store.rollback_transaction_properties_to(transaction_id, gs.undo_log_position);

        let current_next_node = store.peek_next_node_id();
        let current_next_edge = store.peek_next_edge_id();

        // Every ID in [snapshot next ID, current next ID) is a candidate for
        // removal; the store call is scoped to this transaction.
        let node_ids: Vec<NodeId> = (gs.next_node_id..current_next_node)
            .map(NodeId::new)
            .collect();
        let edge_ids: Vec<EdgeId> = (gs.next_edge_id..current_next_edge)
            .map(EdgeId::new)
            .collect();

        if !node_ids.is_empty() || !edge_ids.is_empty() {
            store.discard_entities_by_id(transaction_id, &node_ids, &edge_ids);
        }
    }

    // Graphs first touched after the savepoint have no snapshot, so all of
    // this transaction's uncommitted versions in them are discarded.
    let touched = self.touched_graphs.lock().clone();
    for graph_name in &touched {
        let already_captured = sp_state
            .graph_snapshots
            .iter()
            .any(|gs| gs.graph_name == *graph_name);
        if !already_captured {
            let store = self.resolve_store(graph_name);
            store.discard_uncommitted_versions(transaction_id);
        }
    }

    // Restore the touched-graph list to what it was at the savepoint.
    let mut touched = self.touched_graphs.lock();
    touched.clear();
    for gs in &sp_state.graph_snapshots {
        if !touched.contains(&gs.graph_name) {
            touched.push(gs.graph_name.clone());
        }
    }

    Ok(())
}
3514
3515 pub fn release_savepoint(&self, name: &str) -> Result<()> {
3521 let _tx_id = self.current_transaction.lock().ok_or_else(|| {
3522 grafeo_common::utils::error::Error::Transaction(
3523 grafeo_common::utils::error::TransactionError::InvalidState(
3524 "No active transaction".to_string(),
3525 ),
3526 )
3527 })?;
3528
3529 let mut savepoints = self.savepoints.lock();
3530 let pos = savepoints
3531 .iter()
3532 .rposition(|sp| sp.name == name)
3533 .ok_or_else(|| {
3534 grafeo_common::utils::error::Error::Transaction(
3535 grafeo_common::utils::error::TransactionError::InvalidState(format!(
3536 "Savepoint '{name}' not found"
3537 )),
3538 )
3539 })?;
3540 savepoints.remove(pos);
3541 Ok(())
3542 }
3543
/// Returns `true` while this session holds an active transaction ID.
#[must_use]
pub fn in_transaction(&self) -> bool {
    self.current_transaction.lock().is_some()
}
3549
/// Returns a copy of the active transaction's ID, or `None` when no
/// transaction is active.
#[must_use]
pub(crate) fn current_transaction_id(&self) -> Option<TransactionId> {
    *self.current_transaction.lock()
}
3555
/// Borrows the transaction manager used by this session.
#[must_use]
pub(crate) fn transaction_manager(&self) -> &TransactionManager {
    &self.transaction_manager
}
3561
/// Returns `(node count recorded at transaction start, current node count)`.
///
/// The first value is whatever `begin_transaction_inner` last stored; the
/// second is read from the active LPG store at call time. Despite the name,
/// the difference is not computed here — callers receive both values.
#[must_use]
pub(crate) fn node_count_delta(&self) -> (usize, usize) {
    (
        self.transaction_start_node_count.load(Ordering::Relaxed),
        self.active_lpg_store().node_count(),
    )
}
3570
/// Returns `(edge count recorded at transaction start, current edge count)`.
///
/// The first value is whatever `begin_transaction_inner` last stored; the
/// second is read from the active LPG store at call time. Despite the name,
/// the difference is not computed here — callers receive both values.
#[must_use]
pub(crate) fn edge_count_delta(&self) -> (usize, usize) {
    (
        self.transaction_start_edge_count.load(Ordering::Relaxed),
        self.active_lpg_store().edge_count(),
    )
}
3579
/// Creates a `PreparedCommit` that mutably borrows this session for its
/// lifetime.
///
/// # Errors
///
/// Propagates any error from `PreparedCommit::new`.
pub fn prepare_commit(&mut self) -> Result<crate::transaction::PreparedCommit<'_>> {
    crate::transaction::PreparedCommit::new(self)
}
3616
/// Enables or disables auto-commit for this session.
///
/// The flag is consulted by `needs_auto_commit` when a mutating statement is
/// executed outside an explicit transaction.
pub fn set_auto_commit(&mut self, auto_commit: bool) {
    self.auto_commit = auto_commit;
}
3621
/// Returns whether auto-commit is currently enabled for this session.
#[must_use]
pub fn auto_commit(&self) -> bool {
    self.auto_commit
}
3627
/// Decides whether a statement must be wrapped in an implicit transaction.
///
/// True only when auto-commit is enabled, the statement mutates data, and no
/// transaction is already active. The transaction lock is only taken when
/// the first two conditions hold.
fn needs_auto_commit(&self, has_mutations: bool) -> bool {
    self.auto_commit && has_mutations && self.current_transaction.lock().is_none()
}
3635
3636 fn with_auto_commit<F>(&self, has_mutations: bool, body: F) -> Result<QueryResult>
3639 where
3640 F: FnOnce() -> Result<QueryResult>,
3641 {
3642 if self.needs_auto_commit(has_mutations) {
3643 self.begin_transaction_inner(false, None)?;
3644 match body() {
3645 Ok(result) => {
3646 self.commit_inner()?;
3647 Ok(result)
3648 }
3649 Err(e) => {
3650 let _ = self.rollback_inner();
3651 Err(e)
3652 }
3653 }
3654 } else {
3655 body()
3656 }
3657 }
3658
/// Cheap textual check for whether a query might modify data.
///
/// The query is upper-cased (ASCII only) and searched for the keywords
/// `INSERT`, `CREATE`, `DELETE`, `MERGE`, `SET`, `REMOVE`, `DROP` and `ALTER`.
/// This is a plain substring match, so it also fires on words that merely
/// contain a keyword (for example `OFFSET` contains `SET`) and on keywords
/// inside string literals.
fn query_looks_like_mutation(query: &str) -> bool {
    const MUTATION_KEYWORDS: [&str; 8] = [
        "INSERT", "CREATE", "DELETE", "MERGE", "SET", "REMOVE", "DROP", "ALTER",
    ];

    let normalized = query.to_ascii_uppercase();
    MUTATION_KEYWORDS
        .iter()
        .any(|&keyword| normalized.contains(keyword))
}
3675
3676 #[must_use]
3678 fn query_deadline(&self) -> Option<Instant> {
3679 #[cfg(not(target_arch = "wasm32"))]
3680 {
3681 self.query_timeout.map(|d| Instant::now() + d)
3682 }
3683 #[cfg(target_arch = "wasm32")]
3684 {
3685 let _ = &self.query_timeout;
3686 None
3687 }
3688 }
3689
/// Records metrics for one finished query.
///
/// Always bumps the total and per-language query counters. Latency is
/// observed only when `elapsed_ms` is provided. Successful results contribute
/// their returned row count and, when present, their scanned row count.
/// Failures bump the error counter, and additionally the timeout counter
/// when the error's message contains the text `exceeded timeout`.
#[cfg(feature = "metrics")]
fn record_query_metrics(
    &self,
    language: &str,
    elapsed_ms: Option<f64>,
    result: &Result<crate::database::QueryResult>,
) {
    use crate::metrics::record_metric;

    record_metric!(self.metrics, query_count, inc);
    if let Some(registry) = self.metrics.as_ref() {
        registry.query_count_by_language.increment(language);
    }
    if let Some(latency_ms) = elapsed_ms {
        record_metric!(self.metrics, query_latency, observe latency_ms);
    }

    match result {
        Err(error) => {
            record_metric!(self.metrics, query_errors, inc);
            // Timeouts are recognised by their error message text.
            let rendered = error.to_string();
            if rendered.contains("exceeded timeout") {
                record_metric!(self.metrics, query_timeouts, inc);
            }
        }
        Ok(query_result) => {
            let row_total = query_result.rows.len() as u64;
            record_metric!(self.metrics, rows_returned, add row_total);
            if let Some(scanned) = query_result.rows_scanned {
                record_metric!(self.metrics, rows_scanned, add scanned);
            }
        }
    }
}
3729
3730 fn eval_integer_literal(expr: &grafeo_adapters::query::gql::ast::Expression) -> Option<i64> {
3732 use grafeo_adapters::query::gql::ast::{Expression, Literal};
3733 match expr {
3734 Expression::Literal(Literal::Integer(n)) => Some(*n),
3735 _ => None,
3736 }
3737 }
3738
3739 #[must_use]
3745 fn get_transaction_context(&self) -> (EpochId, Option<TransactionId>) {
3746 if let Some(epoch) = *self.viewing_epoch_override.lock() {
3748 return (epoch, None);
3749 }
3750
3751 if let Some(transaction_id) = *self.current_transaction.lock() {
3752 let epoch = self
3754 .transaction_manager
3755 .start_epoch(transaction_id)
3756 .unwrap_or_else(|| self.transaction_manager.current_epoch());
3757 (epoch, Some(transaction_id))
3758 } else {
3759 (self.transaction_manager.current_epoch(), None)
3761 }
3762 }
3763
/// Builds a planner for `store` with the read-only flag set to `false`.
///
/// Shorthand for `create_planner_for_store_with_read_only(.., false)`.
fn create_planner_for_store(
    &self,
    store: Arc<dyn GraphStoreMut>,
    viewing_epoch: EpochId,
    transaction_id: Option<TransactionId>,
) -> crate::query::Planner {
    self.create_planner_for_store_with_read_only(store, viewing_epoch, transaction_id, false)
}
3776
3777 fn create_planner_for_store_with_read_only(
3778 &self,
3779 store: Arc<dyn GraphStoreMut>,
3780 viewing_epoch: EpochId,
3781 transaction_id: Option<TransactionId>,
3782 read_only: bool,
3783 ) -> crate::query::Planner {
3784 use crate::query::Planner;
3785 use grafeo_core::execution::operators::{LazyValue, SessionContext};
3786
3787 let info_store = Arc::clone(&store);
3789 let schema_store = Arc::clone(&store);
3790
3791 let session_context = SessionContext {
3792 current_schema: self.current_schema(),
3793 current_graph: self.current_graph(),
3794 db_info: LazyValue::new(move || Self::build_info_value(&*info_store)),
3795 schema_info: LazyValue::new(move || Self::build_schema_value(&*schema_store)),
3796 };
3797
3798 let mut planner = Planner::with_context(
3799 Arc::clone(&store),
3800 Arc::clone(&self.transaction_manager),
3801 transaction_id,
3802 viewing_epoch,
3803 )
3804 .with_factorized_execution(self.factorized_execution)
3805 .with_catalog(Arc::clone(&self.catalog))
3806 .with_session_context(session_context)
3807 .with_read_only(read_only);
3808
3809 let validator =
3811 CatalogConstraintValidator::new(Arc::clone(&self.catalog)).with_store(store);
3812 planner = planner.with_validator(Arc::new(validator));
3813
3814 planner
3815 }
3816
3817 fn build_info_value(store: &dyn GraphStoreMut) -> Value {
3819 use grafeo_common::types::PropertyKey;
3820 use std::collections::BTreeMap;
3821
3822 let mut map = BTreeMap::new();
3823 map.insert(PropertyKey::from("mode"), Value::String("lpg".into()));
3824 map.insert(
3825 PropertyKey::from("node_count"),
3826 Value::Int64(store.node_count() as i64),
3827 );
3828 map.insert(
3829 PropertyKey::from("edge_count"),
3830 Value::Int64(store.edge_count() as i64),
3831 );
3832 map.insert(
3833 PropertyKey::from("version"),
3834 Value::String(env!("CARGO_PKG_VERSION").into()),
3835 );
3836 Value::Map(map.into())
3837 }
3838
3839 fn build_schema_value(store: &dyn GraphStoreMut) -> Value {
3841 use grafeo_common::types::PropertyKey;
3842 use std::collections::BTreeMap;
3843
3844 let labels: Vec<Value> = store
3845 .all_labels()
3846 .into_iter()
3847 .map(|l| Value::String(l.into()))
3848 .collect();
3849 let edge_types: Vec<Value> = store
3850 .all_edge_types()
3851 .into_iter()
3852 .map(|t| Value::String(t.into()))
3853 .collect();
3854 let property_keys: Vec<Value> = store
3855 .all_property_keys()
3856 .into_iter()
3857 .map(|k| Value::String(k.into()))
3858 .collect();
3859
3860 let mut map = BTreeMap::new();
3861 map.insert(PropertyKey::from("labels"), Value::List(labels.into()));
3862 map.insert(
3863 PropertyKey::from("edge_types"),
3864 Value::List(edge_types.into()),
3865 );
3866 map.insert(
3867 PropertyKey::from("property_keys"),
3868 Value::List(property_keys.into()),
3869 );
3870 Value::Map(map.into())
3871 }
3872
3873 pub fn create_node(&self, labels: &[&str]) -> NodeId {
3878 let (epoch, transaction_id) = self.get_transaction_context();
3879 self.active_lpg_store().create_node_versioned(
3880 labels,
3881 epoch,
3882 transaction_id.unwrap_or(TransactionId::SYSTEM),
3883 )
3884 }
3885
3886 pub fn create_node_with_props<'a>(
3890 &self,
3891 labels: &[&str],
3892 properties: impl IntoIterator<Item = (&'a str, Value)>,
3893 ) -> NodeId {
3894 let (epoch, transaction_id) = self.get_transaction_context();
3895 self.active_lpg_store().create_node_with_props_versioned(
3896 labels,
3897 properties,
3898 epoch,
3899 transaction_id.unwrap_or(TransactionId::SYSTEM),
3900 )
3901 }
3902
3903 pub fn create_edge(
3908 &self,
3909 src: NodeId,
3910 dst: NodeId,
3911 edge_type: &str,
3912 ) -> grafeo_common::types::EdgeId {
3913 let (epoch, transaction_id) = self.get_transaction_context();
3914 self.active_lpg_store().create_edge_versioned(
3915 src,
3916 dst,
3917 edge_type,
3918 epoch,
3919 transaction_id.unwrap_or(TransactionId::SYSTEM),
3920 )
3921 }
3922
3923 #[must_use]
3951 pub fn get_node(&self, id: NodeId) -> Option<Node> {
3952 let (epoch, transaction_id) = self.get_transaction_context();
3953 self.active_lpg_store().get_node_versioned(
3954 id,
3955 epoch,
3956 transaction_id.unwrap_or(TransactionId::SYSTEM),
3957 )
3958 }
3959
3960 #[must_use]
3984 pub fn get_node_property(&self, id: NodeId, key: &str) -> Option<Value> {
3985 self.get_node(id)
3986 .and_then(|node| node.get_property(key).cloned())
3987 }
3988
3989 #[must_use]
3996 pub fn get_edge(&self, id: EdgeId) -> Option<Edge> {
3997 let (epoch, transaction_id) = self.get_transaction_context();
3998 self.active_lpg_store().get_edge_versioned(
3999 id,
4000 epoch,
4001 transaction_id.unwrap_or(TransactionId::SYSTEM),
4002 )
4003 }
4004
4005 #[must_use]
4031 pub fn get_neighbors_outgoing(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
4032 self.active_lpg_store()
4033 .edges_from(node, Direction::Outgoing)
4034 .collect()
4035 }
4036
4037 #[must_use]
4046 pub fn get_neighbors_incoming(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
4047 self.active_lpg_store()
4048 .edges_from(node, Direction::Incoming)
4049 .collect()
4050 }
4051
4052 #[must_use]
4064 pub fn get_neighbors_outgoing_by_type(
4065 &self,
4066 node: NodeId,
4067 edge_type: &str,
4068 ) -> Vec<(NodeId, EdgeId)> {
4069 self.active_lpg_store()
4070 .edges_from(node, Direction::Outgoing)
4071 .filter(|(_, edge_id)| {
4072 self.get_edge(*edge_id)
4073 .is_some_and(|e| e.edge_type.as_str() == edge_type)
4074 })
4075 .collect()
4076 }
4077
4078 #[must_use]
4085 pub fn node_exists(&self, id: NodeId) -> bool {
4086 self.get_node(id).is_some()
4087 }
4088
4089 #[must_use]
4091 pub fn edge_exists(&self, id: EdgeId) -> bool {
4092 self.get_edge(id).is_some()
4093 }
4094
4095 #[must_use]
4099 pub fn get_degree(&self, node: NodeId) -> (usize, usize) {
4100 let active = self.active_lpg_store();
4101 let out = active.out_degree(node);
4102 let in_degree = active.in_degree(node);
4103 (out, in_degree)
4104 }
4105
4106 #[must_use]
4116 pub fn get_nodes_batch(&self, ids: &[NodeId]) -> Vec<Option<Node>> {
4117 let (epoch, transaction_id) = self.get_transaction_context();
4118 let tx = transaction_id.unwrap_or(TransactionId::SYSTEM);
4119 let active = self.active_lpg_store();
4120 ids.iter()
4121 .map(|&id| active.get_node_versioned(id, epoch, tx))
4122 .collect()
4123 }
4124
4125 #[cfg(feature = "cdc")]
4129 pub fn history(
4130 &self,
4131 entity_id: impl Into<crate::cdc::EntityId>,
4132 ) -> Result<Vec<crate::cdc::ChangeEvent>> {
4133 Ok(self.cdc_log.history(entity_id.into()))
4134 }
4135
4136 #[cfg(feature = "cdc")]
4138 pub fn history_since(
4139 &self,
4140 entity_id: impl Into<crate::cdc::EntityId>,
4141 since_epoch: EpochId,
4142 ) -> Result<Vec<crate::cdc::ChangeEvent>> {
4143 Ok(self.cdc_log.history_since(entity_id.into(), since_epoch))
4144 }
4145
4146 #[cfg(feature = "cdc")]
4148 pub fn changes_between(
4149 &self,
4150 start_epoch: EpochId,
4151 end_epoch: EpochId,
4152 ) -> Result<Vec<crate::cdc::ChangeEvent>> {
4153 Ok(self.cdc_log.changes_between(start_epoch, end_epoch))
4154 }
4155}
4156
4157impl Drop for Session {
4158 fn drop(&mut self) {
4159 if self.in_transaction() {
4162 let _ = self.rollback_inner();
4163 }
4164
4165 #[cfg(feature = "metrics")]
4166 if let Some(ref reg) = self.metrics {
4167 reg.session_active
4168 .fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
4169 }
4170 }
4171}
4172
4173#[cfg(test)]
4174mod tests {
4175 use super::parse_default_literal;
4176 use crate::database::GrafeoDB;
4177 use grafeo_common::types::Value;
4178
4179 #[test]
4184 fn parse_default_literal_null() {
4185 assert_eq!(parse_default_literal("null"), Value::Null);
4186 assert_eq!(parse_default_literal("NULL"), Value::Null);
4187 assert_eq!(parse_default_literal("Null"), Value::Null);
4188 }
4189
4190 #[test]
4191 fn parse_default_literal_bool() {
4192 assert_eq!(parse_default_literal("true"), Value::Bool(true));
4193 assert_eq!(parse_default_literal("TRUE"), Value::Bool(true));
4194 assert_eq!(parse_default_literal("false"), Value::Bool(false));
4195 assert_eq!(parse_default_literal("FALSE"), Value::Bool(false));
4196 }
4197
4198 #[test]
4199 fn parse_default_literal_string_single_quoted() {
4200 assert_eq!(
4201 parse_default_literal("'hello'"),
4202 Value::String("hello".into())
4203 );
4204 }
4205
4206 #[test]
4207 fn parse_default_literal_string_double_quoted() {
4208 assert_eq!(
4209 parse_default_literal("\"world\""),
4210 Value::String("world".into())
4211 );
4212 }
4213
4214 #[test]
4215 fn parse_default_literal_integer() {
4216 assert_eq!(parse_default_literal("42"), Value::Int64(42));
4217 assert_eq!(parse_default_literal("-7"), Value::Int64(-7));
4218 assert_eq!(parse_default_literal("0"), Value::Int64(0));
4219 }
4220
4221 #[test]
4222 fn parse_default_literal_float() {
4223 assert_eq!(parse_default_literal("9.81"), Value::Float64(9.81_f64));
4224 assert_eq!(parse_default_literal("-0.5"), Value::Float64(-0.5));
4225 }
4226
4227 #[test]
4228 fn parse_default_literal_fallback_string() {
4229 assert_eq!(
4231 parse_default_literal("some_identifier"),
4232 Value::String("some_identifier".into())
4233 );
4234 }
4235
4236 #[test]
4237 fn test_session_create_node() {
4238 let db = GrafeoDB::new_in_memory();
4239 let session = db.session();
4240
4241 let id = session.create_node(&["Person"]);
4242 assert!(id.is_valid());
4243 assert_eq!(db.node_count(), 1);
4244 }
4245
4246 #[test]
4247 fn test_session_transaction() {
4248 let db = GrafeoDB::new_in_memory();
4249 let mut session = db.session();
4250
4251 assert!(!session.in_transaction());
4252
4253 session.begin_transaction().unwrap();
4254 assert!(session.in_transaction());
4255
4256 session.commit().unwrap();
4257 assert!(!session.in_transaction());
4258 }
4259
4260 #[test]
4261 fn test_session_transaction_context() {
4262 let db = GrafeoDB::new_in_memory();
4263 let mut session = db.session();
4264
4265 let (_epoch1, transaction_id1) = session.get_transaction_context();
4267 assert!(transaction_id1.is_none());
4268
4269 session.begin_transaction().unwrap();
4271 let (epoch2, transaction_id2) = session.get_transaction_context();
4272 assert!(transaction_id2.is_some());
4273 let _ = epoch2; session.commit().unwrap();
4278 let (epoch3, tx_id3) = session.get_transaction_context();
4279 assert!(tx_id3.is_none());
4280 assert!(epoch3.as_u64() >= epoch2.as_u64());
4282 }
4283
4284 #[test]
4285 fn test_session_rollback() {
4286 let db = GrafeoDB::new_in_memory();
4287 let mut session = db.session();
4288
4289 session.begin_transaction().unwrap();
4290 session.rollback().unwrap();
4291 assert!(!session.in_transaction());
4292 }
4293
4294 #[test]
4295 fn test_session_rollback_discards_versions() {
4296 use grafeo_common::types::TransactionId;
4297
4298 let db = GrafeoDB::new_in_memory();
4299
4300 let node_before = db.store().create_node(&["Person"]);
4302 assert!(node_before.is_valid());
4303 assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");
4304
4305 let mut session = db.session();
4307 session.begin_transaction().unwrap();
4308 let transaction_id = session.current_transaction.lock().unwrap();
4309
4310 let epoch = db.store().current_epoch();
4312 let node_in_tx = db
4313 .store()
4314 .create_node_versioned(&["Person"], epoch, transaction_id);
4315 assert!(node_in_tx.is_valid());
4316
4317 assert_eq!(
4321 db.node_count(),
4322 1,
4323 "PENDING nodes should be invisible to non-versioned node_count()"
4324 );
4325 assert!(
4326 db.store()
4327 .get_node_versioned(node_in_tx, epoch, transaction_id)
4328 .is_some(),
4329 "Transaction node should be visible to its own transaction"
4330 );
4331
4332 session.rollback().unwrap();
4334 assert!(!session.in_transaction());
4335
4336 let count_after = db.node_count();
4339 assert_eq!(
4340 count_after, 1,
4341 "Rollback should discard uncommitted node, but got {count_after}"
4342 );
4343
4344 let current_epoch = db.store().current_epoch();
4346 assert!(
4347 db.store()
4348 .get_node_versioned(node_before, current_epoch, TransactionId::SYSTEM)
4349 .is_some(),
4350 "Original node should still exist"
4351 );
4352
4353 assert!(
4355 db.store()
4356 .get_node_versioned(node_in_tx, current_epoch, TransactionId::SYSTEM)
4357 .is_none(),
4358 "Transaction node should be gone"
4359 );
4360 }
4361
4362 #[test]
4363 fn test_session_create_node_in_transaction() {
4364 let db = GrafeoDB::new_in_memory();
4366
4367 let node_before = db.create_node(&["Person"]);
4369 assert!(node_before.is_valid());
4370 assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");
4371
4372 let mut session = db.session();
4374 session.begin_transaction().unwrap();
4375 let transaction_id = session.current_transaction.lock().unwrap();
4376
4377 let node_in_tx = session.create_node(&["Person"]);
4379 assert!(node_in_tx.is_valid());
4380
4381 assert_eq!(
4384 db.node_count(),
4385 1,
4386 "PENDING nodes should be invisible to non-versioned node_count()"
4387 );
4388 let epoch = db.store().current_epoch();
4389 assert!(
4390 db.store()
4391 .get_node_versioned(node_in_tx, epoch, transaction_id)
4392 .is_some(),
4393 "Transaction node should be visible to its own transaction"
4394 );
4395
4396 session.rollback().unwrap();
4398
4399 let count_after = db.node_count();
4401 assert_eq!(
4402 count_after, 1,
4403 "Rollback should discard node created via session.create_node(), but got {count_after}"
4404 );
4405 }
4406
4407 #[test]
4408 fn test_session_create_node_with_props_in_transaction() {
4409 use grafeo_common::types::Value;
4410
4411 let db = GrafeoDB::new_in_memory();
4413
4414 db.create_node(&["Person"]);
4416 assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");
4417
4418 let mut session = db.session();
4420 session.begin_transaction().unwrap();
4421 let transaction_id = session.current_transaction.lock().unwrap();
4422
4423 let node_in_tx =
4424 session.create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))]);
4425 assert!(node_in_tx.is_valid());
4426
4427 assert_eq!(
4430 db.node_count(),
4431 1,
4432 "PENDING nodes should be invisible to non-versioned node_count()"
4433 );
4434 let epoch = db.store().current_epoch();
4435 assert!(
4436 db.store()
4437 .get_node_versioned(node_in_tx, epoch, transaction_id)
4438 .is_some(),
4439 "Transaction node should be visible to its own transaction"
4440 );
4441
4442 session.rollback().unwrap();
4444
4445 let count_after = db.node_count();
4447 assert_eq!(
4448 count_after, 1,
4449 "Rollback should discard node created via session.create_node_with_props()"
4450 );
4451 }
4452
4453 #[cfg(feature = "gql")]
4454 mod gql_tests {
4455 use super::*;
4456
4457 #[test]
4458 fn test_gql_query_execution() {
4459 let db = GrafeoDB::new_in_memory();
4460 let session = db.session();
4461
4462 session.create_node(&["Person"]);
4464 session.create_node(&["Person"]);
4465 session.create_node(&["Animal"]);
4466
4467 let result = session.execute("MATCH (n:Person) RETURN n").unwrap();
4469
4470 assert_eq!(result.row_count(), 2);
4472 assert_eq!(result.column_count(), 1);
4473 assert_eq!(result.columns[0], "n");
4474 }
4475
4476 #[test]
4477 fn test_gql_empty_result() {
4478 let db = GrafeoDB::new_in_memory();
4479 let session = db.session();
4480
4481 let result = session.execute("MATCH (n:Person) RETURN n").unwrap();
4483
4484 assert_eq!(result.row_count(), 0);
4485 }
4486
4487 #[test]
4488 fn test_gql_parse_error() {
4489 let db = GrafeoDB::new_in_memory();
4490 let session = db.session();
4491
4492 let result = session.execute("MATCH (n RETURN n");
4494
4495 assert!(result.is_err());
4496 }
4497
4498 #[test]
4499 fn test_gql_relationship_traversal() {
4500 let db = GrafeoDB::new_in_memory();
4501 let session = db.session();
4502
4503 let alix = session.create_node(&["Person"]);
4505 let gus = session.create_node(&["Person"]);
4506 let vincent = session.create_node(&["Person"]);
4507
4508 session.create_edge(alix, gus, "KNOWS");
4509 session.create_edge(alix, vincent, "KNOWS");
4510
4511 let result = session
4513 .execute("MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b")
4514 .unwrap();
4515
4516 assert_eq!(result.row_count(), 2);
4518 assert_eq!(result.column_count(), 2);
4519 assert_eq!(result.columns[0], "a");
4520 assert_eq!(result.columns[1], "b");
4521 }
4522
4523 #[test]
4524 fn test_gql_relationship_with_type_filter() {
4525 let db = GrafeoDB::new_in_memory();
4526 let session = db.session();
4527
4528 let alix = session.create_node(&["Person"]);
4530 let gus = session.create_node(&["Person"]);
4531 let vincent = session.create_node(&["Person"]);
4532
4533 session.create_edge(alix, gus, "KNOWS");
4534 session.create_edge(alix, vincent, "WORKS_WITH");
4535
4536 let result = session
4538 .execute("MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b")
4539 .unwrap();
4540
4541 assert_eq!(result.row_count(), 1);
4543 }
4544
4545 #[test]
4546 fn test_gql_semantic_error_undefined_variable() {
4547 let db = GrafeoDB::new_in_memory();
4548 let session = db.session();
4549
4550 let result = session.execute("MATCH (n:Person) RETURN x");
4552
4553 assert!(result.is_err());
4555 let Err(err) = result else {
4556 panic!("Expected error")
4557 };
4558 assert!(
4559 err.to_string().contains("Undefined variable"),
4560 "Expected undefined variable error, got: {}",
4561 err
4562 );
4563 }
4564
4565 #[test]
4566 fn test_gql_where_clause_property_filter() {
4567 use grafeo_common::types::Value;
4568
4569 let db = GrafeoDB::new_in_memory();
4570 let session = db.session();
4571
4572 session.create_node_with_props(&["Person"], [("age", Value::Int64(25))]);
4574 session.create_node_with_props(&["Person"], [("age", Value::Int64(35))]);
4575 session.create_node_with_props(&["Person"], [("age", Value::Int64(45))]);
4576
4577 let result = session
4579 .execute("MATCH (n:Person) WHERE n.age > 30 RETURN n")
4580 .unwrap();
4581
4582 assert_eq!(result.row_count(), 2);
4584 }
4585
4586 #[test]
4587 fn test_gql_where_clause_equality() {
4588 use grafeo_common::types::Value;
4589
4590 let db = GrafeoDB::new_in_memory();
4591 let session = db.session();
4592
4593 session.create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))]);
4595 session.create_node_with_props(&["Person"], [("name", Value::String("Gus".into()))]);
4596 session.create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))]);
4597
4598 let result = session
4600 .execute("MATCH (n:Person) WHERE n.name = \"Alix\" RETURN n")
4601 .unwrap();
4602
4603 assert_eq!(result.row_count(), 2);
4605 }
4606
4607 #[test]
4608 fn test_gql_return_property_access() {
4609 use grafeo_common::types::Value;
4610
4611 let db = GrafeoDB::new_in_memory();
4612 let session = db.session();
4613
4614 session.create_node_with_props(
4616 &["Person"],
4617 [
4618 ("name", Value::String("Alix".into())),
4619 ("age", Value::Int64(30)),
4620 ],
4621 );
4622 session.create_node_with_props(
4623 &["Person"],
4624 [
4625 ("name", Value::String("Gus".into())),
4626 ("age", Value::Int64(25)),
4627 ],
4628 );
4629
4630 let result = session
4632 .execute("MATCH (n:Person) RETURN n.name, n.age")
4633 .unwrap();
4634
4635 assert_eq!(result.row_count(), 2);
4637 assert_eq!(result.column_count(), 2);
4638 assert_eq!(result.columns[0], "n.name");
4639 assert_eq!(result.columns[1], "n.age");
4640
4641 let names: Vec<&Value> = result.rows.iter().map(|r| &r[0]).collect();
4643 assert!(names.contains(&&Value::String("Alix".into())));
4644 assert!(names.contains(&&Value::String("Gus".into())));
4645 }
4646
4647 #[test]
4648 fn test_gql_return_mixed_expressions() {
4649 use grafeo_common::types::Value;
4650
4651 let db = GrafeoDB::new_in_memory();
4652 let session = db.session();
4653
4654 session.create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))]);
4656
4657 let result = session
4659 .execute("MATCH (n:Person) RETURN n, n.name")
4660 .unwrap();
4661
4662 assert_eq!(result.row_count(), 1);
4663 assert_eq!(result.column_count(), 2);
4664 assert_eq!(result.columns[0], "n");
4665 assert_eq!(result.columns[1], "n.name");
4666
4667 assert_eq!(result.rows[0][1], Value::String("Alix".into()));
4669 }
4670 }
4671
4672 #[cfg(feature = "cypher")]
4673 mod cypher_tests {
4674 use super::*;
4675
4676 #[test]
4677 fn test_cypher_query_execution() {
4678 let db = GrafeoDB::new_in_memory();
4679 let session = db.session();
4680
4681 session.create_node(&["Person"]);
4683 session.create_node(&["Person"]);
4684 session.create_node(&["Animal"]);
4685
4686 let result = session.execute_cypher("MATCH (n:Person) RETURN n").unwrap();
4688
4689 assert_eq!(result.row_count(), 2);
4691 assert_eq!(result.column_count(), 1);
4692 assert_eq!(result.columns[0], "n");
4693 }
4694
4695 #[test]
4696 fn test_cypher_empty_result() {
4697 let db = GrafeoDB::new_in_memory();
4698 let session = db.session();
4699
4700 let result = session.execute_cypher("MATCH (n:Person) RETURN n").unwrap();
4702
4703 assert_eq!(result.row_count(), 0);
4704 }
4705
4706 #[test]
4707 fn test_cypher_parse_error() {
4708 let db = GrafeoDB::new_in_memory();
4709 let session = db.session();
4710
4711 let result = session.execute_cypher("MATCH (n RETURN n");
4713
4714 assert!(result.is_err());
4715 }
4716 }
4717
4718 mod direct_lookup_tests {
4721 use super::*;
4722 use grafeo_common::types::Value;
4723
4724 #[test]
4725 fn test_get_node() {
4726 let db = GrafeoDB::new_in_memory();
4727 let session = db.session();
4728
4729 let id = session.create_node(&["Person"]);
4730 let node = session.get_node(id);
4731
4732 assert!(node.is_some());
4733 let node = node.unwrap();
4734 assert_eq!(node.id, id);
4735 }
4736
4737 #[test]
4738 fn test_get_node_not_found() {
4739 use grafeo_common::types::NodeId;
4740
4741 let db = GrafeoDB::new_in_memory();
4742 let session = db.session();
4743
4744 let node = session.get_node(NodeId::new(9999));
4746 assert!(node.is_none());
4747 }
4748
4749 #[test]
4750 fn test_get_node_property() {
4751 let db = GrafeoDB::new_in_memory();
4752 let session = db.session();
4753
4754 let id = session
4755 .create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))]);
4756
4757 let name = session.get_node_property(id, "name");
4758 assert_eq!(name, Some(Value::String("Alix".into())));
4759
4760 let missing = session.get_node_property(id, "missing");
4762 assert!(missing.is_none());
4763 }
4764
4765 #[test]
4766 fn test_get_edge() {
4767 let db = GrafeoDB::new_in_memory();
4768 let session = db.session();
4769
4770 let alix = session.create_node(&["Person"]);
4771 let gus = session.create_node(&["Person"]);
4772 let edge_id = session.create_edge(alix, gus, "KNOWS");
4773
4774 let edge = session.get_edge(edge_id);
4775 assert!(edge.is_some());
4776 let edge = edge.unwrap();
4777 assert_eq!(edge.id, edge_id);
4778 assert_eq!(edge.src, alix);
4779 assert_eq!(edge.dst, gus);
4780 }
4781
4782 #[test]
4783 fn test_get_edge_not_found() {
4784 use grafeo_common::types::EdgeId;
4785
4786 let db = GrafeoDB::new_in_memory();
4787 let session = db.session();
4788
4789 let edge = session.get_edge(EdgeId::new(9999));
4790 assert!(edge.is_none());
4791 }
4792
4793 #[test]
4794 fn test_get_neighbors_outgoing() {
4795 let db = GrafeoDB::new_in_memory();
4796 let session = db.session();
4797
4798 let alix = session.create_node(&["Person"]);
4799 let gus = session.create_node(&["Person"]);
4800 let harm = session.create_node(&["Person"]);
4801
4802 session.create_edge(alix, gus, "KNOWS");
4803 session.create_edge(alix, harm, "KNOWS");
4804
4805 let neighbors = session.get_neighbors_outgoing(alix);
4806 assert_eq!(neighbors.len(), 2);
4807
4808 let neighbor_ids: Vec<_> = neighbors.iter().map(|(node_id, _)| *node_id).collect();
4809 assert!(neighbor_ids.contains(&gus));
4810 assert!(neighbor_ids.contains(&harm));
4811 }
4812
4813 #[test]
4814 fn test_get_neighbors_incoming() {
4815 let db = GrafeoDB::new_in_memory();
4816 let session = db.session();
4817
4818 let alix = session.create_node(&["Person"]);
4819 let gus = session.create_node(&["Person"]);
4820 let harm = session.create_node(&["Person"]);
4821
4822 session.create_edge(gus, alix, "KNOWS");
4823 session.create_edge(harm, alix, "KNOWS");
4824
4825 let neighbors = session.get_neighbors_incoming(alix);
4826 assert_eq!(neighbors.len(), 2);
4827
4828 let neighbor_ids: Vec<_> = neighbors.iter().map(|(node_id, _)| *node_id).collect();
4829 assert!(neighbor_ids.contains(&gus));
4830 assert!(neighbor_ids.contains(&harm));
4831 }
4832
4833 #[test]
4834 fn test_get_neighbors_outgoing_by_type() {
4835 let db = GrafeoDB::new_in_memory();
4836 let session = db.session();
4837
4838 let alix = session.create_node(&["Person"]);
4839 let gus = session.create_node(&["Person"]);
4840 let company = session.create_node(&["Company"]);
4841
4842 session.create_edge(alix, gus, "KNOWS");
4843 session.create_edge(alix, company, "WORKS_AT");
4844
4845 let knows_neighbors = session.get_neighbors_outgoing_by_type(alix, "KNOWS");
4846 assert_eq!(knows_neighbors.len(), 1);
4847 assert_eq!(knows_neighbors[0].0, gus);
4848
4849 let works_neighbors = session.get_neighbors_outgoing_by_type(alix, "WORKS_AT");
4850 assert_eq!(works_neighbors.len(), 1);
4851 assert_eq!(works_neighbors[0].0, company);
4852
4853 let no_neighbors = session.get_neighbors_outgoing_by_type(alix, "LIKES");
4855 assert!(no_neighbors.is_empty());
4856 }
4857
4858 #[test]
4859 fn test_node_exists() {
4860 use grafeo_common::types::NodeId;
4861
4862 let db = GrafeoDB::new_in_memory();
4863 let session = db.session();
4864
4865 let id = session.create_node(&["Person"]);
4866
4867 assert!(session.node_exists(id));
4868 assert!(!session.node_exists(NodeId::new(9999)));
4869 }
4870
4871 #[test]
4872 fn test_edge_exists() {
4873 use grafeo_common::types::EdgeId;
4874
4875 let db = GrafeoDB::new_in_memory();
4876 let session = db.session();
4877
4878 let alix = session.create_node(&["Person"]);
4879 let gus = session.create_node(&["Person"]);
4880 let edge_id = session.create_edge(alix, gus, "KNOWS");
4881
4882 assert!(session.edge_exists(edge_id));
4883 assert!(!session.edge_exists(EdgeId::new(9999)));
4884 }
4885
4886 #[test]
4887 fn test_get_degree() {
4888 let db = GrafeoDB::new_in_memory();
4889 let session = db.session();
4890
4891 let alix = session.create_node(&["Person"]);
4892 let gus = session.create_node(&["Person"]);
4893 let harm = session.create_node(&["Person"]);
4894
4895 session.create_edge(alix, gus, "KNOWS");
4897 session.create_edge(alix, harm, "KNOWS");
4898 session.create_edge(gus, alix, "KNOWS");
4900
4901 let (out_degree, in_degree) = session.get_degree(alix);
4902 assert_eq!(out_degree, 2);
4903 assert_eq!(in_degree, 1);
4904
4905 let lonely = session.create_node(&["Person"]);
4907 let (out, in_deg) = session.get_degree(lonely);
4908 assert_eq!(out, 0);
4909 assert_eq!(in_deg, 0);
4910 }
4911
4912 #[test]
4913 fn test_get_nodes_batch() {
4914 let db = GrafeoDB::new_in_memory();
4915 let session = db.session();
4916
4917 let alix = session.create_node(&["Person"]);
4918 let gus = session.create_node(&["Person"]);
4919 let harm = session.create_node(&["Person"]);
4920
4921 let nodes = session.get_nodes_batch(&[alix, gus, harm]);
4922 assert_eq!(nodes.len(), 3);
4923 assert!(nodes[0].is_some());
4924 assert!(nodes[1].is_some());
4925 assert!(nodes[2].is_some());
4926
4927 use grafeo_common::types::NodeId;
4929 let nodes_with_missing = session.get_nodes_batch(&[alix, NodeId::new(9999), harm]);
4930 assert_eq!(nodes_with_missing.len(), 3);
4931 assert!(nodes_with_missing[0].is_some());
4932 assert!(nodes_with_missing[1].is_none()); assert!(nodes_with_missing[2].is_some());
4934 }
4935
4936 #[test]
4937 fn test_auto_commit_setting() {
4938 let db = GrafeoDB::new_in_memory();
4939 let mut session = db.session();
4940
4941 assert!(session.auto_commit());
4943
4944 session.set_auto_commit(false);
4945 assert!(!session.auto_commit());
4946
4947 session.set_auto_commit(true);
4948 assert!(session.auto_commit());
4949 }
4950
4951 #[test]
4952 fn test_transaction_double_begin_nests() {
4953 let db = GrafeoDB::new_in_memory();
4954 let mut session = db.session();
4955
4956 session.begin_transaction().unwrap();
4957 let result = session.begin_transaction();
4959 assert!(result.is_ok());
4960 session.commit().unwrap();
4962 session.commit().unwrap();
4964 }
4965
4966 #[test]
4967 fn test_commit_without_transaction_error() {
4968 let db = GrafeoDB::new_in_memory();
4969 let mut session = db.session();
4970
4971 let result = session.commit();
4972 assert!(result.is_err());
4973 }
4974
4975 #[test]
4976 fn test_rollback_without_transaction_error() {
4977 let db = GrafeoDB::new_in_memory();
4978 let mut session = db.session();
4979
4980 let result = session.rollback();
4981 assert!(result.is_err());
4982 }
4983
4984 #[test]
4985 fn test_create_edge_in_transaction() {
4986 let db = GrafeoDB::new_in_memory();
4987 let mut session = db.session();
4988
4989 let alix = session.create_node(&["Person"]);
4991 let gus = session.create_node(&["Person"]);
4992
4993 session.begin_transaction().unwrap();
4995 let edge_id = session.create_edge(alix, gus, "KNOWS");
4996
4997 assert!(session.edge_exists(edge_id));
4999
5000 session.commit().unwrap();
5002
5003 assert!(session.edge_exists(edge_id));
5005 }
5006
5007 #[test]
5008 fn test_neighbors_empty_node() {
5009 let db = GrafeoDB::new_in_memory();
5010 let session = db.session();
5011
5012 let lonely = session.create_node(&["Person"]);
5013
5014 assert!(session.get_neighbors_outgoing(lonely).is_empty());
5015 assert!(session.get_neighbors_incoming(lonely).is_empty());
5016 assert!(
5017 session
5018 .get_neighbors_outgoing_by_type(lonely, "KNOWS")
5019 .is_empty()
5020 );
5021 }
5022 }
5023
5024 #[test]
5025 fn test_auto_gc_triggers_on_commit_interval() {
5026 use crate::config::Config;
5027
5028 let config = Config::in_memory().with_gc_interval(2);
5029 let db = GrafeoDB::with_config(config).unwrap();
5030 let mut session = db.session();
5031
5032 session.begin_transaction().unwrap();
5034 session.create_node(&["A"]);
5035 session.commit().unwrap();
5036
5037 session.begin_transaction().unwrap();
5039 session.create_node(&["B"]);
5040 session.commit().unwrap();
5041
5042 assert_eq!(db.node_count(), 2);
5044 }
5045
5046 #[test]
5047 fn test_query_timeout_config_propagates_to_session() {
5048 use crate::config::Config;
5049 use std::time::Duration;
5050
5051 let config = Config::in_memory().with_query_timeout(Duration::from_secs(5));
5052 let db = GrafeoDB::with_config(config).unwrap();
5053 let session = db.session();
5054
5055 assert!(session.query_deadline().is_some());
5057 }
5058
5059 #[test]
5060 fn test_no_query_timeout_returns_no_deadline() {
5061 let db = GrafeoDB::new_in_memory();
5062 let session = db.session();
5063
5064 assert!(session.query_deadline().is_none());
5066 }
5067
5068 #[test]
5069 fn test_graph_model_accessor() {
5070 use crate::config::GraphModel;
5071
5072 let db = GrafeoDB::new_in_memory();
5073 let session = db.session();
5074
5075 assert_eq!(session.graph_model(), GraphModel::Lpg);
5076 }
5077
5078 #[cfg(feature = "gql")]
5079 #[test]
5080 fn test_external_store_session() {
5081 use grafeo_core::graph::GraphStoreMut;
5082 use std::sync::Arc;
5083
5084 let config = crate::config::Config::in_memory();
5085 let store =
5086 Arc::new(grafeo_core::graph::lpg::LpgStore::new().unwrap()) as Arc<dyn GraphStoreMut>;
5087 let db = GrafeoDB::with_store(store, config).unwrap();
5088
5089 let mut session = db.session();
5090
5091 session.begin_transaction().unwrap();
5095 session.execute("INSERT (:Test {name: 'hello'})").unwrap();
5096
5097 let result = session.execute("MATCH (n:Test) RETURN n.name").unwrap();
5099 assert_eq!(result.row_count(), 1);
5100
5101 session.commit().unwrap();
5102 }
5103
5104 #[cfg(feature = "gql")]
5107 mod session_command_tests {
5108 use super::*;
5109 use grafeo_common::types::Value;
5110
5111 #[test]
5112 fn test_use_graph_sets_current_graph() {
5113 let db = GrafeoDB::new_in_memory();
5114 let session = db.session();
5115
5116 session.execute("CREATE GRAPH mydb").unwrap();
5118 session.execute("USE GRAPH mydb").unwrap();
5119
5120 assert_eq!(session.current_graph(), Some("mydb".to_string()));
5121 }
5122
/// `USE GRAPH` on a graph that was never created fails, and the error text
/// contains "does not exist".
#[test]
fn test_use_graph_nonexistent_errors() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    let outcome = sess.execute("USE GRAPH doesnotexist");
    assert!(outcome.is_err());

    let err = outcome.unwrap_err().to_string();
    let mentions_missing = err.contains("does not exist");
    assert!(
        mentions_missing,
        "Expected 'does not exist' error, got: {err}"
    );
}
5136
/// `USE GRAPH default` succeeds without any prior `CREATE GRAPH`, and the
/// session then reports `default` as its current graph.
#[test]
fn test_use_graph_default_always_valid() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    sess.execute("USE GRAPH default").unwrap();

    let active = sess.current_graph();
    assert_eq!(active, Some(String::from("default")));
}
5146
/// `SESSION SET GRAPH <name>` on an existing graph makes it the session's
/// current graph.
#[test]
fn test_session_set_graph() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    for statement in ["CREATE GRAPH analytics", "SESSION SET GRAPH analytics"] {
        sess.execute(statement).unwrap();
    }

    let active = sess.current_graph();
    assert_eq!(active, Some(String::from("analytics")));
}
5156
/// `SESSION SET GRAPH` naming a graph that was never created returns an error.
#[test]
fn test_session_set_graph_nonexistent_errors() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    let outcome = sess.execute("SESSION SET GRAPH nosuchgraph");
    let failed = outcome.is_err();
    assert!(failed);
}
5165
/// The session time zone starts unset and follows each
/// `SESSION SET TIME ZONE '<zone>'` statement that is executed.
#[test]
fn test_session_set_time_zone() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    // Nothing has been configured yet.
    let initial = sess.time_zone();
    assert_eq!(initial, None);

    sess.execute("SESSION SET TIME ZONE 'UTC'").unwrap();
    let after_utc = sess.time_zone();
    assert_eq!(after_utc, Some(String::from("UTC")));

    // A second SET replaces the previous value.
    sess.execute("SESSION SET TIME ZONE 'America/New_York'")
        .unwrap();
    let after_new_york = sess.time_zone();
    assert_eq!(after_new_york, Some(String::from("America/New_York")));
}
5181
/// After `SESSION SET PARAMETER $timeout = 30`, looking up `timeout` on the
/// session yields a value.
#[test]
fn test_session_set_parameter() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    sess.execute("SESSION SET PARAMETER $timeout = 30").unwrap();

    let has_timeout = sess.get_parameter("timeout").is_some();
    assert!(has_timeout);
}
5195
/// `SESSION RESET` clears the current graph, the time zone and a previously
/// set parameter.
#[test]
fn test_session_reset_clears_all_state() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    // Populate every piece of session state the test later checks.
    let setup = [
        "CREATE GRAPH analytics",
        "SESSION SET GRAPH analytics",
        "SESSION SET TIME ZONE 'UTC'",
        "SESSION SET PARAMETER $limit = 100",
    ];
    for statement in setup {
        sess.execute(statement).unwrap();
    }

    // Confirm the state is really there before resetting.
    assert!(sess.current_graph().is_some());
    assert!(sess.time_zone().is_some());
    assert!(sess.get_parameter("limit").is_some());

    sess.execute("SESSION RESET").unwrap();

    assert_eq!(sess.current_graph(), None);
    assert_eq!(sess.time_zone(), None);
    assert!(sess.get_parameter("limit").is_none());
}
5221
/// `SESSION CLOSE` leaves the session with no current graph and no time zone.
#[test]
fn test_session_close_clears_state() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    let setup = [
        "CREATE GRAPH analytics",
        "SESSION SET GRAPH analytics",
        "SESSION SET TIME ZONE 'UTC'",
    ];
    for statement in setup {
        sess.execute(statement).unwrap();
    }

    sess.execute("SESSION CLOSE").unwrap();

    let active = sess.current_graph();
    assert_eq!(active, None);
    let zone = sess.time_zone();
    assert_eq!(zone, None);
}
5236
/// A graph created with `CREATE GRAPH` can afterwards be selected with
/// `USE GRAPH`.
#[test]
fn test_create_graph() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    sess.execute("CREATE GRAPH mydb").unwrap();

    // Selecting the graph only works if the creation took effect.
    sess.execute("USE GRAPH mydb").unwrap();
    let active = sess.current_graph();
    assert_eq!(active, Some(String::from("mydb")));
}
5248
/// Creating the same graph twice fails the second time with an
/// "already exists" message.
#[test]
fn test_create_graph_duplicate_errors() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    sess.execute("CREATE GRAPH mydb").unwrap();
    let second_attempt = sess.execute("CREATE GRAPH mydb");
    assert!(second_attempt.is_err());

    let err = second_attempt.unwrap_err().to_string();
    let mentions_duplicate = err.contains("already exists");
    assert!(
        mentions_duplicate,
        "Expected 'already exists' error, got: {err}"
    );
}
5264
/// `CREATE GRAPH IF NOT EXISTS` succeeds even when the graph already exists.
#[test]
fn test_create_graph_if_not_exists() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    // The second statement targets the graph the first one just created.
    for statement in ["CREATE GRAPH mydb", "CREATE GRAPH IF NOT EXISTS mydb"] {
        sess.execute(statement).unwrap();
    }
}
5274
/// Once a graph has been dropped, `USE GRAPH` on it returns an error.
#[test]
fn test_drop_graph() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    for statement in ["CREATE GRAPH mydb", "DROP GRAPH mydb"] {
        sess.execute(statement).unwrap();
    }

    let outcome = sess.execute("USE GRAPH mydb");
    let failed = outcome.is_err();
    assert!(failed);
}
5287
/// `DROP GRAPH` on an unknown graph fails, and the error text contains
/// "does not exist".
#[test]
fn test_drop_graph_nonexistent_errors() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    let outcome = sess.execute("DROP GRAPH nosuchgraph");
    assert!(outcome.is_err());

    let err = outcome.unwrap_err().to_string();
    let mentions_missing = err.contains("does not exist");
    assert!(
        mentions_missing,
        "Expected 'does not exist' error, got: {err}"
    );
}
5301
/// `DROP GRAPH IF EXISTS` succeeds for a graph that was never created.
#[test]
fn test_drop_graph_if_exists() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    let statement = "DROP GRAPH IF EXISTS nosuchgraph";
    sess.execute(statement).unwrap();
}
5310
/// `START TRANSACTION` … `COMMIT` issued as query text toggles
/// `in_transaction()` and leaves the inserted node visible afterwards.
#[test]
fn test_start_transaction_via_gql() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    sess.execute("START TRANSACTION").unwrap();
    let open_after_start = sess.in_transaction();
    assert!(open_after_start);

    sess.execute("INSERT (:Person {name: 'Alix'})").unwrap();
    sess.execute("COMMIT").unwrap();
    let open_after_commit = sess.in_transaction();
    assert!(!open_after_commit);

    // The committed node must be readable outside the transaction.
    let people = sess.execute("MATCH (n:Person) RETURN n.name").unwrap();
    assert_eq!(people.rows.len(), 1);
}
5325
/// Inside `START TRANSACTION READ ONLY`, an `INSERT` fails with an error
/// whose text contains "read-only"; the transaction is then rolled back.
#[test]
fn test_start_transaction_read_only_blocks_insert() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    sess.execute("START TRANSACTION READ ONLY").unwrap();

    let write_attempt = sess.execute("INSERT (:Person {name: 'Alix'})");
    assert!(write_attempt.is_err());

    let err = write_attempt.unwrap_err().to_string();
    let mentions_read_only = err.contains("read-only");
    assert!(mentions_read_only, "Expected read-only error, got: {err}");

    sess.execute("ROLLBACK").unwrap();
}
5341
/// A `MATCH` inside `START TRANSACTION READ ONLY` succeeds and sees data
/// committed by an earlier read-write transaction.
#[test]
fn test_start_transaction_read_only_allows_reads() {
    let database = GrafeoDB::new_in_memory();
    let mut sess = database.session();

    // Seed one node through the programmatic transaction API.
    sess.begin_transaction().unwrap();
    sess.execute("INSERT (:Person {name: 'Alix'})").unwrap();
    sess.commit().unwrap();

    sess.execute("START TRANSACTION READ ONLY").unwrap();
    let people = sess.execute("MATCH (n:Person) RETURN n.name").unwrap();
    let seen = people.rows.len();
    assert_eq!(seen, 1);
    sess.execute("COMMIT").unwrap();
}
5355
/// A node inserted between `START TRANSACTION` and `ROLLBACK` is not
/// returned by a later `MATCH`.
#[test]
fn test_rollback_via_gql() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    let discarded_work = [
        "START TRANSACTION",
        "INSERT (:Person {name: 'Alix'})",
        "ROLLBACK",
    ];
    for statement in discarded_work {
        sess.execute(statement).unwrap();
    }

    let people = sess.execute("MATCH (n:Person) RETURN n.name").unwrap();
    let nothing_persisted = people.rows.is_empty();
    assert!(nothing_persisted);
}
5368
/// `START TRANSACTION ISOLATION LEVEL SERIALIZABLE` is accepted and opens a
/// transaction, which is then rolled back.
#[test]
fn test_start_transaction_with_isolation_level() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    let statement = "START TRANSACTION ISOLATION LEVEL SERIALIZABLE";
    sess.execute(statement).unwrap();

    let open = sess.in_transaction();
    assert!(open);

    sess.execute("ROLLBACK").unwrap();
}
5380
/// The result of `SESSION SET GRAPH` has zero rows and zero columns.
#[test]
fn test_session_commands_return_empty_result() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    sess.execute("CREATE GRAPH test").unwrap();
    let outcome = sess.execute("SESSION SET GRAPH test").unwrap();

    let rows = outcome.row_count();
    assert_eq!(rows, 0);
    let columns = outcome.column_count();
    assert_eq!(columns, 0);
}
5391
/// A fresh session has no current graph.
#[test]
fn test_current_graph_default_is_none() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    let active = sess.current_graph();
    assert_eq!(active, None);
}
5399
/// A fresh session has no time zone configured.
#[test]
fn test_time_zone_default_is_none() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    let zone = sess.time_zone();
    assert_eq!(zone, None);
}
5407
/// Two sessions on the same database each keep their own current graph.
#[test]
fn test_session_state_independent_across_sessions() {
    let database = GrafeoDB::new_in_memory();
    let first_session = database.session();
    let second_session = database.session();

    // Both graphs are created through the first session; the second session
    // then selects one of them.
    first_session.execute("CREATE GRAPH first").unwrap();
    first_session.execute("CREATE GRAPH second").unwrap();
    first_session.execute("SESSION SET GRAPH first").unwrap();
    second_session.execute("SESSION SET GRAPH second").unwrap();

    let first_active = first_session.current_graph();
    let second_active = second_session.current_graph();
    assert_eq!(first_active, Some(String::from("first")));
    assert_eq!(second_active, Some(String::from("second")));
}
5422
/// `SHOW NODE TYPES` returns the columns name/properties/constraints/parents
/// and one row whose first cell is the created type's name.
#[test]
fn test_show_node_types() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    let ddl = "CREATE NODE TYPE Person (name STRING NOT NULL, age INTEGER)";
    sess.execute(ddl).unwrap();

    let listing = sess.execute("SHOW NODE TYPES").unwrap();

    let expected_columns = vec!["name", "properties", "constraints", "parents"];
    assert_eq!(listing.columns, expected_columns);
    assert_eq!(listing.rows.len(), 1);
    assert_eq!(listing.rows[0][0], Value::from("Person"));
}
5441
/// `SHOW EDGE TYPES` returns the columns name/properties/source_types/
/// target_types and one row whose first cell is the created type's name.
#[test]
fn test_show_edge_types() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    let ddl = "CREATE EDGE TYPE KNOWS CONNECTING (Person) TO (Person) (since INTEGER)";
    sess.execute(ddl).unwrap();

    let listing = sess.execute("SHOW EDGE TYPES").unwrap();

    let expected_columns = vec!["name", "properties", "source_types", "target_types"];
    assert_eq!(listing.columns, expected_columns);
    assert_eq!(listing.rows.len(), 1);
    assert_eq!(listing.rows[0][0], Value::from("KNOWS"));
}
5459
/// `SHOW GRAPH TYPES` returns the columns name/open/node_types/edge_types
/// and one row whose first cell is the created graph type's name.
#[test]
fn test_show_graph_types() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    let node_type_ddl = "CREATE NODE TYPE Person (name STRING)";
    // The trailing backslashes join the lines without inserting whitespace.
    let graph_type_ddl = "CREATE GRAPH TYPE social (\
        NODE TYPE Person (name STRING)\
        )";
    sess.execute(node_type_ddl).unwrap();
    sess.execute(graph_type_ddl).unwrap();

    let listing = sess.execute("SHOW GRAPH TYPES").unwrap();

    let expected_columns = vec!["name", "open", "node_types", "edge_types"];
    assert_eq!(listing.columns, expected_columns);
    assert_eq!(listing.rows.len(), 1);
    assert_eq!(listing.rows[0][0], Value::from("social"));
}
5484
/// `SHOW GRAPH TYPE <name>` returns a single row whose first cell is that
/// graph type's name.
#[test]
fn test_show_graph_type_named() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    let node_type_ddl = "CREATE NODE TYPE Person (name STRING)";
    // The trailing backslashes join the lines without inserting whitespace.
    let graph_type_ddl = "CREATE GRAPH TYPE social (\
        NODE TYPE Person (name STRING)\
        )";
    sess.execute(node_type_ddl).unwrap();
    sess.execute(graph_type_ddl).unwrap();

    let described = sess.execute("SHOW GRAPH TYPE social").unwrap();
    assert_eq!(described.rows.len(), 1);
    assert_eq!(described.rows[0][0], Value::from("social"));
}
5505
/// `SHOW GRAPH TYPE` for a name that was never defined returns an error.
#[test]
fn test_show_graph_type_not_found() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    let outcome = sess.execute("SHOW GRAPH TYPE nonexistent");
    let failed = outcome.is_err();
    assert!(failed);
}
5514
/// `SHOW INDEXES` on an empty database succeeds and exposes the columns
/// name/type/label/property.
#[test]
fn test_show_indexes_via_gql() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    let listing = sess.execute("SHOW INDEXES").unwrap();

    let expected_columns = vec!["name", "type", "label", "property"];
    assert_eq!(listing.columns, expected_columns);
}
5523
/// `SHOW CONSTRAINTS` on an empty database succeeds and exposes the columns
/// name/type/label/properties.
#[test]
fn test_show_constraints_via_gql() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    let listing = sess.execute("SHOW CONSTRAINTS").unwrap();

    let expected_columns = vec!["name", "type", "label", "properties"];
    assert_eq!(listing.columns, expected_columns);
}
5532
/// A graph type declared with path-pattern syntax can be created and then
/// retrieved by name with `SHOW GRAPH TYPE`.
#[test]
fn test_pattern_form_graph_type_roundtrip() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    // Register the node and edge types the patterns below refer to.
    let type_definitions = [
        "CREATE NODE TYPE Person (name STRING NOT NULL)",
        "CREATE NODE TYPE City (name STRING)",
        "CREATE EDGE TYPE KNOWS (since INTEGER)",
        "CREATE EDGE TYPE LIVES_IN",
    ];
    for statement in type_definitions {
        sess.execute(statement).unwrap();
    }

    // The trailing backslashes join the lines without inserting whitespace.
    let graph_type_ddl = "CREATE GRAPH TYPE social (\
        (:Person {name STRING NOT NULL})-[:KNOWS {since INTEGER}]->(:Person),\
        (:Person)-[:LIVES_IN]->(:City)\
        )";
    sess.execute(graph_type_ddl).unwrap();

    let described = sess.execute("SHOW GRAPH TYPE social").unwrap();
    assert_eq!(described.rows.len(), 1);
    assert_eq!(described.rows[0][0], Value::from("social"));
}
5565 }
5566}