1#[cfg(feature = "triple-store")]
8mod rdf;
9
10use std::sync::Arc;
11use std::sync::atomic::AtomicUsize;
12#[cfg(feature = "lpg")]
13use std::sync::atomic::Ordering;
14use std::time::{Duration, Instant};
15
16#[cfg(feature = "lpg")]
17use grafeo_common::grafeo_debug_span;
18#[cfg(feature = "lpg")]
19use grafeo_common::types::{EdgeId, NodeId};
20use grafeo_common::types::{EpochId, TransactionId, Value};
21use grafeo_common::utils::error::Result;
22use grafeo_common::{grafeo_info_span, grafeo_warn};
23#[cfg(feature = "lpg")]
24use grafeo_core::graph::Direction;
25#[cfg(feature = "lpg")]
26use grafeo_core::graph::lpg::LpgStore;
27#[cfg(feature = "lpg")]
28use grafeo_core::graph::lpg::{Edge, Node};
29#[cfg(feature = "triple-store")]
30use grafeo_core::graph::rdf::RdfStore;
31use grafeo_core::graph::{GraphStore, GraphStoreMut};
32
33use crate::catalog::{Catalog, CatalogConstraintValidator};
34use crate::config::{AdaptiveConfig, GraphModel};
35use crate::database::QueryResult;
36use crate::query::cache::QueryCache;
37use crate::transaction::TransactionManager;
38
39const SCHEMA_DEFAULT_GRAPH: &str = "__default__";
42
43fn parse_default_literal(text: &str) -> Value {
48 if text.eq_ignore_ascii_case("null") {
49 return Value::Null;
50 }
51 if text.eq_ignore_ascii_case("true") {
52 return Value::Bool(true);
53 }
54 if text.eq_ignore_ascii_case("false") {
55 return Value::Bool(false);
56 }
57 if (text.starts_with('\'') && text.ends_with('\''))
59 || (text.starts_with('"') && text.ends_with('"'))
60 {
61 return Value::String(text[1..text.len() - 1].into());
62 }
63 if let Ok(i) = text.parse::<i64>() {
65 return Value::Int64(i);
66 }
67 if let Ok(f) = text.parse::<f64>() {
68 return Value::Float64(f);
69 }
70 Value::String(text.into())
72}
73
74pub(crate) struct SessionConfig {
79 pub transaction_manager: Arc<TransactionManager>,
80 pub query_cache: Arc<QueryCache>,
81 pub catalog: Arc<Catalog>,
82 pub adaptive_config: AdaptiveConfig,
83 pub factorized_execution: bool,
84 pub graph_model: GraphModel,
85 pub query_timeout: Option<Duration>,
86 pub commit_counter: Arc<AtomicUsize>,
87 pub gc_interval: usize,
88 pub read_only: bool,
90 pub identity: crate::auth::Identity,
92 #[cfg(feature = "lpg")]
94 pub projections: Arc<
95 parking_lot::RwLock<
96 std::collections::HashMap<String, Arc<grafeo_core::graph::GraphProjection>>,
97 >,
98 >,
99}
100
101pub struct Session {
107 #[cfg(feature = "lpg")]
109 store: Arc<LpgStore>,
110 graph_store: Arc<dyn GraphStore>,
112 graph_store_mut: Option<Arc<dyn GraphStoreMut>>,
114 catalog: Arc<Catalog>,
116 #[cfg(feature = "triple-store")]
118 rdf_store: Arc<RdfStore>,
119 transaction_manager: Arc<TransactionManager>,
121 query_cache: Arc<QueryCache>,
123 current_transaction: parking_lot::Mutex<Option<TransactionId>>,
127 read_only_tx: parking_lot::Mutex<bool>,
129 db_read_only: bool,
132 identity: crate::auth::Identity,
134 auto_commit: bool,
136 #[allow(dead_code)] adaptive_config: AdaptiveConfig,
139 factorized_execution: bool,
141 graph_model: GraphModel,
143 query_timeout: Option<Duration>,
145 commit_counter: Arc<AtomicUsize>,
147 gc_interval: usize,
149 transaction_start_node_count: AtomicUsize,
151 transaction_start_edge_count: AtomicUsize,
153 #[cfg(feature = "wal")]
155 wal: Option<Arc<grafeo_storage::wal::LpgWal>>,
156 #[cfg(feature = "wal")]
158 wal_graph_context: Option<Arc<parking_lot::Mutex<Option<String>>>>,
159 #[cfg(feature = "cdc")]
161 cdc_log: Arc<crate::cdc::CdcLog>,
162 #[cfg(feature = "cdc")]
165 cdc_pending_events: Option<Arc<parking_lot::Mutex<Vec<crate::cdc::ChangeEvent>>>>,
166 current_graph: parking_lot::Mutex<Option<String>>,
168 current_schema: parking_lot::Mutex<Option<String>>,
171 time_zone: parking_lot::Mutex<Option<String>>,
173 session_params:
175 parking_lot::Mutex<std::collections::HashMap<String, grafeo_common::types::Value>>,
176 viewing_epoch_override: parking_lot::Mutex<Option<EpochId>>,
178 savepoints: parking_lot::Mutex<Vec<SavepointState>>,
180 transaction_nesting_depth: parking_lot::Mutex<u32>,
184 touched_graphs: parking_lot::Mutex<Vec<Option<String>>>,
188 #[cfg(feature = "metrics")]
190 pub(crate) metrics: Option<Arc<crate::metrics::MetricsRegistry>>,
191 #[cfg(feature = "metrics")]
193 tx_start_time: parking_lot::Mutex<Option<Instant>>,
194 #[cfg(feature = "lpg")]
196 projections: Arc<
197 parking_lot::RwLock<
198 std::collections::HashMap<String, Arc<grafeo_core::graph::GraphProjection>>,
199 >,
200 >,
201}
202
203#[derive(Clone)]
205struct GraphSavepoint {
206 graph_name: Option<String>,
207 next_node_id: u64,
208 next_edge_id: u64,
209 undo_log_position: usize,
210}
211
212#[derive(Clone)]
214struct SavepointState {
215 name: String,
216 graph_snapshots: Vec<GraphSavepoint>,
217 #[allow(dead_code)]
220 active_graph: Option<String>,
221 #[cfg(feature = "cdc")]
224 cdc_event_position: usize,
225}
226
227impl Session {
228 #[cfg(feature = "lpg")]
230 #[allow(dead_code)] pub(crate) fn with_adaptive(store: Arc<LpgStore>, cfg: SessionConfig) -> Self {
232 let graph_store = Arc::clone(&store) as Arc<dyn GraphStore>;
233 let graph_store_mut = Some(Arc::clone(&store) as Arc<dyn GraphStoreMut>);
234 Self {
235 store,
236 graph_store,
237 graph_store_mut,
238 catalog: cfg.catalog,
239 #[cfg(feature = "triple-store")]
240 rdf_store: Arc::new(RdfStore::new()),
241 transaction_manager: cfg.transaction_manager,
242 query_cache: cfg.query_cache,
243 current_transaction: parking_lot::Mutex::new(None),
244 read_only_tx: parking_lot::Mutex::new(cfg.read_only),
245 db_read_only: cfg.read_only,
246 identity: cfg.identity,
247 auto_commit: true,
248 adaptive_config: cfg.adaptive_config,
249 factorized_execution: cfg.factorized_execution,
250 graph_model: cfg.graph_model,
251 query_timeout: cfg.query_timeout,
252 commit_counter: cfg.commit_counter,
253 gc_interval: cfg.gc_interval,
254 transaction_start_node_count: AtomicUsize::new(0),
255 transaction_start_edge_count: AtomicUsize::new(0),
256 #[cfg(feature = "wal")]
257 wal: None,
258 #[cfg(feature = "wal")]
259 wal_graph_context: None,
260 #[cfg(feature = "cdc")]
261 cdc_log: Arc::new(crate::cdc::CdcLog::new()),
262 #[cfg(feature = "cdc")]
263 cdc_pending_events: None,
264 current_graph: parking_lot::Mutex::new(None),
265 current_schema: parking_lot::Mutex::new(None),
266 time_zone: parking_lot::Mutex::new(None),
267 session_params: parking_lot::Mutex::new(std::collections::HashMap::new()),
268 viewing_epoch_override: parking_lot::Mutex::new(None),
269 savepoints: parking_lot::Mutex::new(Vec::new()),
270 transaction_nesting_depth: parking_lot::Mutex::new(0),
271 touched_graphs: parking_lot::Mutex::new(Vec::new()),
272 #[cfg(feature = "metrics")]
273 metrics: None,
274 #[cfg(feature = "metrics")]
275 tx_start_time: parking_lot::Mutex::new(None),
276 projections: cfg.projections,
277 }
278 }
279
280 #[cfg(all(feature = "wal", feature = "lpg"))]
285 pub(crate) fn set_wal(
286 &mut self,
287 wal: Arc<grafeo_storage::wal::LpgWal>,
288 wal_graph_context: Arc<parking_lot::Mutex<Option<String>>>,
289 ) {
290 let wal_store = Arc::new(crate::database::wal_store::WalGraphStore::new(
292 Arc::clone(&self.store),
293 Arc::clone(&wal),
294 Arc::clone(&wal_graph_context),
295 ));
296 self.graph_store = Arc::clone(&wal_store) as Arc<dyn GraphStore>;
297 self.graph_store_mut = Some(wal_store as Arc<dyn GraphStoreMut>);
298 self.wal = Some(wal);
299 self.wal_graph_context = Some(wal_graph_context);
300 }
301
302 #[cfg(feature = "cdc")]
309 pub(crate) fn set_cdc_log(&mut self, cdc_log: Arc<crate::cdc::CdcLog>) {
310 if let Some(ref write_store) = self.graph_store_mut {
313 let cdc_store = Arc::new(crate::database::cdc_store::CdcGraphStore::new(
314 Arc::clone(write_store),
315 Arc::clone(&cdc_log),
316 ));
317 self.cdc_pending_events = Some(cdc_store.pending_events());
318 self.graph_store_mut = Some(cdc_store as Arc<dyn grafeo_core::graph::GraphStoreMut>);
319 }
320 self.cdc_log = cdc_log;
321 }
322
323 #[cfg(feature = "metrics")]
325 pub(crate) fn set_metrics(&mut self, metrics: Arc<crate::metrics::MetricsRegistry>) {
326 self.metrics = Some(metrics);
327 }
328
329 pub(crate) fn with_external_store(
338 read_store: Arc<dyn GraphStore>,
339 write_store: Option<Arc<dyn GraphStoreMut>>,
340 cfg: SessionConfig,
341 ) -> Result<Self> {
342 Ok(Self {
343 #[cfg(feature = "lpg")]
344 store: Arc::new(LpgStore::new()?),
345 graph_store: read_store,
346 graph_store_mut: write_store,
347 catalog: cfg.catalog,
348 #[cfg(feature = "triple-store")]
349 rdf_store: Arc::new(RdfStore::new()),
350 transaction_manager: cfg.transaction_manager,
351 query_cache: cfg.query_cache,
352 current_transaction: parking_lot::Mutex::new(None),
353 read_only_tx: parking_lot::Mutex::new(cfg.read_only),
354 db_read_only: cfg.read_only,
355 identity: cfg.identity,
356 auto_commit: true,
357 adaptive_config: cfg.adaptive_config,
358 factorized_execution: cfg.factorized_execution,
359 graph_model: cfg.graph_model,
360 query_timeout: cfg.query_timeout,
361 commit_counter: cfg.commit_counter,
362 gc_interval: cfg.gc_interval,
363 transaction_start_node_count: AtomicUsize::new(0),
364 transaction_start_edge_count: AtomicUsize::new(0),
365 #[cfg(feature = "wal")]
366 wal: None,
367 #[cfg(feature = "wal")]
368 wal_graph_context: None,
369 #[cfg(feature = "cdc")]
370 cdc_log: Arc::new(crate::cdc::CdcLog::new()),
371 #[cfg(feature = "cdc")]
372 cdc_pending_events: None,
373 current_graph: parking_lot::Mutex::new(None),
374 current_schema: parking_lot::Mutex::new(None),
375 time_zone: parking_lot::Mutex::new(None),
376 session_params: parking_lot::Mutex::new(std::collections::HashMap::new()),
377 viewing_epoch_override: parking_lot::Mutex::new(None),
378 savepoints: parking_lot::Mutex::new(Vec::new()),
379 transaction_nesting_depth: parking_lot::Mutex::new(0),
380 touched_graphs: parking_lot::Mutex::new(Vec::new()),
381 #[cfg(feature = "metrics")]
382 metrics: None,
383 #[cfg(feature = "metrics")]
384 tx_start_time: parking_lot::Mutex::new(None),
385 #[cfg(feature = "lpg")]
386 projections: cfg.projections,
387 })
388 }
389
390 #[must_use]
392 pub fn graph_model(&self) -> GraphModel {
393 self.graph_model
394 }
395
396 #[must_use]
398 pub fn identity(&self) -> &crate::auth::Identity {
399 &self.identity
400 }
401
402 pub fn use_graph(&self, name: &str) {
406 *self.current_graph.lock() = Some(name.to_string());
407 }
408
409 #[must_use]
411 pub fn current_graph(&self) -> Option<String> {
412 self.current_graph.lock().clone()
413 }
414
415 pub fn set_schema(&self, name: &str) {
419 *self.current_schema.lock() = Some(name.to_string());
420 }
421
422 #[must_use]
426 pub fn current_schema(&self) -> Option<String> {
427 self.current_schema.lock().clone()
428 }
429
430 fn effective_graph_key(&self, graph_name: &str) -> String {
435 let schema = self.current_schema.lock().clone();
436 match schema {
437 Some(s) => format!("{s}/{graph_name}"),
438 None => graph_name.to_string(),
439 }
440 }
441
442 fn effective_type_key(&self, type_name: &str) -> String {
446 let schema = self.current_schema.lock().clone();
447 match schema {
448 Some(s) => format!("{s}/{type_name}"),
449 None => type_name.to_string(),
450 }
451 }
452
453 fn active_graph_storage_key(&self) -> Option<String> {
457 let graph = self.current_graph.lock().clone();
458 let schema = self.current_schema.lock().clone();
459 match (&schema, &graph) {
460 (None, None) => None,
461 (Some(s), None) => Some(format!("{s}/{SCHEMA_DEFAULT_GRAPH}")),
462 (None, Some(name)) if name.eq_ignore_ascii_case("default") => None,
463 (Some(s), Some(name)) if name.eq_ignore_ascii_case("default") => {
464 Some(format!("{s}/{SCHEMA_DEFAULT_GRAPH}"))
465 }
466 (None, Some(name)) => Some(name.clone()),
467 (Some(s), Some(g)) => Some(format!("{s}/{g}")),
468 }
469 }
470
471 fn active_store(&self) -> Arc<dyn GraphStore> {
479 let key = self.active_graph_storage_key();
480 match key {
481 None => Arc::clone(&self.graph_store),
482 #[cfg(feature = "lpg")]
483 Some(ref name) => match self.store.graph(name) {
484 Some(named_store) => {
485 #[cfg(feature = "wal")]
486 if let (Some(wal), Some(ctx)) = (&self.wal, &self.wal_graph_context) {
487 return Arc::new(crate::database::wal_store::WalGraphStore::new_for_graph(
488 named_store,
489 Arc::clone(wal),
490 name.clone(),
491 Arc::clone(ctx),
492 )) as Arc<dyn GraphStore>;
493 }
494 named_store as Arc<dyn GraphStore>
495 }
496 None => Arc::clone(&self.graph_store),
497 },
498 #[cfg(not(feature = "lpg"))]
499 Some(_) => Arc::clone(&self.graph_store),
500 }
501 }
502
503 fn active_write_store(&self) -> Option<Arc<dyn GraphStoreMut>> {
508 let key = self.active_graph_storage_key();
509 match key {
510 None => self.graph_store_mut.as_ref().map(Arc::clone),
511 #[cfg(feature = "lpg")]
512 Some(ref name) => match self.store.graph(name) {
513 Some(named_store) => {
514 let mut store: Arc<dyn GraphStoreMut> = named_store;
515
516 #[cfg(feature = "wal")]
517 if let (Some(wal), Some(ctx)) = (&self.wal, &self.wal_graph_context) {
518 store = Arc::new(crate::database::wal_store::WalGraphStore::new_for_graph(
519 self.store
521 .graph(name)
522 .unwrap_or_else(|| Arc::clone(&self.store)),
523 Arc::clone(wal),
524 name.clone(),
525 Arc::clone(ctx),
526 ));
527 }
528
529 #[cfg(feature = "cdc")]
530 if let Some(ref pending) = self.cdc_pending_events {
531 store = Arc::new(crate::database::cdc_store::CdcGraphStore::wrap(
532 store,
533 Arc::clone(&self.cdc_log),
534 Arc::clone(pending),
535 ));
536 }
537
538 Some(store)
539 }
540 None => self.graph_store_mut.as_ref().map(Arc::clone),
541 },
542 #[cfg(not(feature = "lpg"))]
543 Some(_) => self.graph_store_mut.as_ref().map(Arc::clone),
544 }
545 }
546
547 #[cfg(feature = "lpg")]
552 fn active_lpg_store(&self) -> Arc<LpgStore> {
553 let key = self.active_graph_storage_key();
554 match key {
555 None => Arc::clone(&self.store),
556 Some(ref name) => self
557 .store
558 .graph(name)
559 .unwrap_or_else(|| Arc::clone(&self.store)),
560 }
561 }
562
563 #[cfg(feature = "lpg")]
566 fn resolve_store(&self, graph_name: &Option<String>) -> Arc<LpgStore> {
567 match graph_name {
568 None => Arc::clone(&self.store),
569 Some(name) if name.eq_ignore_ascii_case("default") => Arc::clone(&self.store),
570 Some(name) => self
571 .store
572 .graph(name)
573 .unwrap_or_else(|| Arc::clone(&self.store)),
574 }
575 }
576
577 fn track_graph_touch(&self) {
582 if self.current_transaction.lock().is_some() {
583 let key = self.active_graph_storage_key();
584 let mut touched = self.touched_graphs.lock();
585 if !touched.contains(&key) {
586 touched.push(key);
587 }
588 }
589 }
590
591 pub fn set_time_zone(&self, tz: &str) {
593 *self.time_zone.lock() = Some(tz.to_string());
594 }
595
596 #[must_use]
598 pub fn time_zone(&self) -> Option<String> {
599 self.time_zone.lock().clone()
600 }
601
602 pub fn set_parameter(&self, key: &str, value: grafeo_common::types::Value) {
604 self.session_params.lock().insert(key.to_string(), value);
605 }
606
607 #[must_use]
609 pub fn get_parameter(&self, key: &str) -> Option<grafeo_common::types::Value> {
610 self.session_params.lock().get(key).cloned()
611 }
612
613 pub fn reset_session(&self) {
615 *self.current_schema.lock() = None;
616 *self.current_graph.lock() = None;
617 *self.time_zone.lock() = None;
618 self.session_params.lock().clear();
619 *self.viewing_epoch_override.lock() = None;
620 }
621
622 pub fn reset_schema(&self) {
624 *self.current_schema.lock() = None;
625 }
626
627 pub fn reset_graph(&self) {
629 *self.current_graph.lock() = None;
630 }
631
632 pub fn reset_time_zone(&self) {
634 *self.time_zone.lock() = None;
635 }
636
637 pub fn reset_parameters(&self) {
639 self.session_params.lock().clear();
640 }
641
642 pub fn set_viewing_epoch(&self, epoch: EpochId) {
650 *self.viewing_epoch_override.lock() = Some(epoch);
651 }
652
653 pub fn clear_viewing_epoch(&self) {
655 *self.viewing_epoch_override.lock() = None;
656 }
657
658 #[must_use]
660 pub fn viewing_epoch(&self) -> Option<EpochId> {
661 *self.viewing_epoch_override.lock()
662 }
663
664 #[cfg(feature = "lpg")]
668 #[must_use]
669 pub fn get_node_history(&self, id: NodeId) -> Vec<(EpochId, Option<EpochId>, Node)> {
670 self.active_lpg_store().get_node_history(id)
671 }
672
673 #[cfg(feature = "lpg")]
677 #[must_use]
678 pub fn get_edge_history(&self, id: EdgeId) -> Vec<(EpochId, Option<EpochId>, Edge)> {
679 self.active_lpg_store().get_edge_history(id)
680 }
681
682 fn require_lpg(&self, language: &str) -> Result<()> {
684 if self.graph_model == GraphModel::Rdf {
685 return Err(grafeo_common::utils::error::Error::Internal(format!(
686 "This is an RDF database. {language} queries require an LPG database."
687 )));
688 }
689 Ok(())
690 }
691
692 fn require_permission(&self, kind: crate::auth::StatementKind) -> Result<()> {
695 crate::auth::check_permission(&self.identity, kind).map_err(|denied| {
696 grafeo_common::utils::error::Error::Query(grafeo_common::utils::error::QueryError::new(
697 grafeo_common::utils::error::QueryErrorKind::Semantic,
698 denied.to_string(),
699 ))
700 })
701 }
702
703 #[cfg(feature = "gql")]
705 fn execute_session_command(
706 &self,
707 cmd: grafeo_adapters::query::gql::ast::SessionCommand,
708 ) -> Result<QueryResult> {
709 use grafeo_adapters::query::gql::ast::SessionCommand;
710 #[cfg(feature = "lpg")]
711 use grafeo_adapters::query::gql::ast::TransactionIsolationLevel;
712 use grafeo_common::utils::error::{Error, QueryError, QueryErrorKind};
713
714 match &cmd {
716 SessionCommand::CreateGraph { .. }
717 | SessionCommand::DropGraph { .. }
718 | SessionCommand::CreateProjection { .. }
719 | SessionCommand::DropProjection { .. } => {
720 self.require_permission(crate::auth::StatementKind::Write)?;
721 }
722 _ => {} }
724
725 if self.identity.has_grants() {
727 match &cmd {
728 SessionCommand::CreateGraph { name, .. }
729 | SessionCommand::DropGraph { name, .. } => {
730 if !self
731 .identity
732 .can_access_graph(name, crate::auth::Role::ReadWrite)
733 {
734 return Err(Error::Query(QueryError::new(
735 QueryErrorKind::Semantic,
736 format!(
737 "permission denied: no grant for graph '{name}' (user: {})",
738 self.identity.user_id()
739 ),
740 )));
741 }
742 }
743 _ => {}
744 }
745 }
746
747 if *self.read_only_tx.lock() {
749 match &cmd {
750 SessionCommand::CreateGraph { .. }
751 | SessionCommand::DropGraph { .. }
752 | SessionCommand::CreateProjection { .. }
753 | SessionCommand::DropProjection { .. } => {
754 return Err(Error::Transaction(
755 grafeo_common::utils::error::TransactionError::ReadOnly,
756 ));
757 }
758 _ => {} }
760 }
761
762 match cmd {
763 #[cfg(feature = "lpg")]
764 SessionCommand::CreateGraph {
765 name,
766 if_not_exists,
767 typed,
768 like_graph,
769 copy_of,
770 open: _,
771 } => {
772 let storage_key = self.effective_graph_key(&name);
774
775 if let Some(ref src) = like_graph {
777 let src_key = self.effective_graph_key(src);
778 if self.store.graph(&src_key).is_none() {
779 return Err(Error::Query(QueryError::new(
780 QueryErrorKind::Semantic,
781 format!("Source graph '{src}' does not exist"),
782 )));
783 }
784 }
785 if let Some(ref src) = copy_of {
786 let src_key = self.effective_graph_key(src);
787 if self.store.graph(&src_key).is_none() {
788 return Err(Error::Query(QueryError::new(
789 QueryErrorKind::Semantic,
790 format!("Source graph '{src}' does not exist"),
791 )));
792 }
793 }
794
795 let created = self
796 .store
797 .create_graph(&storage_key)
798 .map_err(|e| Error::Internal(e.to_string()))?;
799 if !created && !if_not_exists {
800 return Err(Error::Query(QueryError::new(
801 QueryErrorKind::Semantic,
802 format!("Graph '{name}' already exists"),
803 )));
804 }
805 if created {
806 #[cfg(feature = "wal")]
807 self.log_schema_wal(&grafeo_storage::wal::WalRecord::CreateNamedGraph {
808 name: storage_key.clone(),
809 });
810 }
811
812 if let Some(ref src) = copy_of {
814 let src_key = self.effective_graph_key(src);
815 self.store
816 .copy_graph(Some(&src_key), Some(&storage_key))
817 .map_err(|e| Error::Internal(e.to_string()))?;
818 }
819
820 if let Some(type_name) = typed
824 && let Err(e) = self.catalog.bind_graph_type(
825 &storage_key,
826 if type_name.contains('/') {
827 type_name.clone()
828 } else {
829 self.effective_type_key(&type_name)
830 },
831 )
832 {
833 return Err(Error::Query(QueryError::new(
834 QueryErrorKind::Semantic,
835 e.to_string(),
836 )));
837 }
838
839 if let Some(ref src) = like_graph {
841 let src_key = self.effective_graph_key(src);
842 if let Some(src_type) = self.catalog.get_graph_type_binding(&src_key) {
843 let _ = self.catalog.bind_graph_type(&storage_key, src_type);
844 }
845 }
846
847 Ok(QueryResult::empty())
848 }
849 #[cfg(feature = "lpg")]
850 SessionCommand::DropGraph { name, if_exists } => {
851 let storage_key = self.effective_graph_key(&name);
852 let dropped = self.store.drop_graph(&storage_key);
853 if !dropped && !if_exists {
854 return Err(Error::Query(QueryError::new(
855 QueryErrorKind::Semantic,
856 format!("Graph '{name}' does not exist"),
857 )));
858 }
859 if dropped {
860 #[cfg(feature = "wal")]
861 self.log_schema_wal(&grafeo_storage::wal::WalRecord::DropNamedGraph {
862 name: storage_key.clone(),
863 });
864 let mut current = self.current_graph.lock();
866 if current
867 .as_deref()
868 .is_some_and(|g| g.eq_ignore_ascii_case(&name))
869 {
870 *current = None;
871 }
872 }
873 Ok(QueryResult::empty())
874 }
875 #[cfg(feature = "lpg")]
876 SessionCommand::UseGraph(name) => {
877 if self.identity.has_grants()
879 && !name.eq_ignore_ascii_case("default")
880 && !self
881 .identity
882 .can_access_graph(&name, crate::auth::Role::ReadOnly)
883 {
884 return Err(Error::Query(QueryError::new(
885 QueryErrorKind::Semantic,
886 format!(
887 "permission denied: no grant for graph '{name}' (user: {})",
888 self.identity.user_id()
889 ),
890 )));
891 }
892 let effective_key = self.effective_graph_key(&name);
894 if !name.eq_ignore_ascii_case("default")
895 && self.store.graph(&effective_key).is_none()
896 {
897 return Err(Error::Query(QueryError::new(
898 QueryErrorKind::Semantic,
899 format!("Graph '{name}' does not exist"),
900 )));
901 }
902 self.use_graph(&name);
903 self.track_graph_touch();
905 Ok(QueryResult::empty())
906 }
907 #[cfg(feature = "lpg")]
908 SessionCommand::SessionSetGraph(name) => {
909 if self.identity.has_grants()
912 && !name.eq_ignore_ascii_case("default")
913 && !self
914 .identity
915 .can_access_graph(&name, crate::auth::Role::ReadOnly)
916 {
917 return Err(Error::Query(QueryError::new(
918 QueryErrorKind::Semantic,
919 format!(
920 "permission denied: no grant for graph '{name}' (user: {})",
921 self.identity.user_id()
922 ),
923 )));
924 }
925 let effective_key = self.effective_graph_key(&name);
926 if !name.eq_ignore_ascii_case("default")
927 && self.store.graph(&effective_key).is_none()
928 {
929 return Err(Error::Query(QueryError::new(
930 QueryErrorKind::Semantic,
931 format!("Graph '{name}' does not exist"),
932 )));
933 }
934 self.use_graph(&name);
935 self.track_graph_touch();
937 Ok(QueryResult::empty())
938 }
939 SessionCommand::SessionSetSchema(name) => {
940 if !self.catalog.schema_exists(&name) {
942 return Err(Error::Query(QueryError::new(
943 QueryErrorKind::Semantic,
944 format!("Schema '{name}' does not exist"),
945 )));
946 }
947 self.set_schema(&name);
948 Ok(QueryResult::empty())
949 }
950 SessionCommand::SessionSetTimeZone(tz) => {
951 self.set_time_zone(&tz);
952 Ok(QueryResult::empty())
953 }
954 SessionCommand::SessionSetParameter(key, expr) => {
955 if key.eq_ignore_ascii_case("viewing_epoch") {
956 match Self::eval_integer_literal(&expr) {
957 Some(n) if n >= 0 => {
958 self.set_viewing_epoch(EpochId::new(n as u64));
959 Ok(QueryResult::status(format!("Set viewing_epoch to {n}")))
960 }
961 _ => Err(Error::Query(QueryError::new(
962 QueryErrorKind::Semantic,
963 "viewing_epoch must be a non-negative integer literal",
964 ))),
965 }
966 } else {
967 self.set_parameter(&key, Value::Null);
970 Ok(QueryResult::empty())
971 }
972 }
973 SessionCommand::SessionReset(target) => {
974 use grafeo_adapters::query::gql::ast::SessionResetTarget;
975 match target {
976 SessionResetTarget::All => self.reset_session(),
977 SessionResetTarget::Schema => self.reset_schema(),
978 SessionResetTarget::Graph => self.reset_graph(),
979 SessionResetTarget::TimeZone => self.reset_time_zone(),
980 SessionResetTarget::Parameters => self.reset_parameters(),
981 }
982 Ok(QueryResult::empty())
983 }
984 SessionCommand::SessionClose => {
985 self.reset_session();
986 Ok(QueryResult::empty())
987 }
988 #[cfg(feature = "lpg")]
989 SessionCommand::StartTransaction {
990 read_only,
991 isolation_level,
992 } => {
993 let engine_level = isolation_level.map(|l| match l {
994 TransactionIsolationLevel::ReadCommitted => {
995 crate::transaction::IsolationLevel::ReadCommitted
996 }
997 TransactionIsolationLevel::SnapshotIsolation => {
998 crate::transaction::IsolationLevel::SnapshotIsolation
999 }
1000 TransactionIsolationLevel::Serializable => {
1001 crate::transaction::IsolationLevel::Serializable
1002 }
1003 });
1004 self.begin_transaction_inner(read_only, engine_level)?;
1005 Ok(QueryResult::status("Transaction started"))
1006 }
1007 #[cfg(feature = "lpg")]
1008 SessionCommand::Commit => {
1009 self.commit_inner()?;
1010 Ok(QueryResult::status("Transaction committed"))
1011 }
1012 #[cfg(feature = "lpg")]
1013 SessionCommand::Rollback => {
1014 self.rollback_inner()?;
1015 Ok(QueryResult::status("Transaction rolled back"))
1016 }
1017 #[cfg(feature = "lpg")]
1018 SessionCommand::Savepoint(name) => {
1019 self.savepoint(&name)?;
1020 Ok(QueryResult::status(format!("Savepoint '{name}' created")))
1021 }
1022 #[cfg(feature = "lpg")]
1023 SessionCommand::RollbackToSavepoint(name) => {
1024 self.rollback_to_savepoint(&name)?;
1025 Ok(QueryResult::status(format!(
1026 "Rolled back to savepoint '{name}'"
1027 )))
1028 }
1029 #[cfg(feature = "lpg")]
1030 SessionCommand::ReleaseSavepoint(name) => {
1031 self.release_savepoint(&name)?;
1032 Ok(QueryResult::status(format!("Savepoint '{name}' released")))
1033 }
1034 #[cfg(feature = "lpg")]
1035 SessionCommand::CreateProjection {
1036 name,
1037 node_labels,
1038 edge_types,
1039 } => {
1040 use grafeo_core::graph::{GraphProjection, ProjectionSpec};
1041 use std::collections::hash_map::Entry;
1042
1043 let spec = ProjectionSpec::new()
1044 .with_node_labels(node_labels)
1045 .with_edge_types(edge_types);
1046
1047 let store = self.active_store();
1048 let projection = Arc::new(GraphProjection::new(store, spec));
1049 let mut projections = self.projections.write();
1050 match projections.entry(name.clone()) {
1051 Entry::Occupied(_) => Err(Error::Query(QueryError::new(
1052 QueryErrorKind::Semantic,
1053 format!("Projection '{name}' already exists"),
1054 ))),
1055 Entry::Vacant(e) => {
1056 e.insert(projection);
1057 Ok(QueryResult::status(format!("Projection '{name}' created")))
1058 }
1059 }
1060 }
1061 #[cfg(feature = "lpg")]
1062 SessionCommand::DropProjection { name } => {
1063 let removed = self.projections.write().remove(&name).is_some();
1064 if !removed {
1065 return Err(Error::Query(QueryError::new(
1066 QueryErrorKind::Semantic,
1067 format!("Projection '{name}' does not exist"),
1068 )));
1069 }
1070 Ok(QueryResult::status(format!("Projection '{name}' dropped")))
1071 }
1072 #[cfg(feature = "lpg")]
1073 SessionCommand::ShowProjections => {
1074 let mut names: Vec<String> = self.projections.read().keys().cloned().collect();
1075 names.sort();
1076 let rows: Vec<Vec<Value>> =
1077 names.into_iter().map(|n| vec![Value::from(n)]).collect();
1078 Ok(QueryResult {
1079 columns: vec!["name".to_string()],
1080 column_types: Vec::new(),
1081 rows,
1082 ..QueryResult::empty()
1083 })
1084 }
1085 #[cfg(not(feature = "lpg"))]
1086 _ => Err(grafeo_common::utils::error::Error::Internal(
1087 "This command requires the `lpg` feature".to_string(),
1088 )),
1089 }
1090 }
1091
1092 #[cfg(feature = "wal")]
1094 fn log_schema_wal(&self, record: &grafeo_storage::wal::WalRecord) {
1095 if let Some(ref wal) = self.wal
1096 && let Err(e) = wal.log(record)
1097 {
1098 grafeo_warn!("Failed to log schema change to WAL: {}", e);
1099 }
1100 }
1101
1102 #[cfg(all(feature = "lpg", feature = "gql"))]
1104 fn execute_schema_command(
1105 &self,
1106 cmd: grafeo_adapters::query::gql::ast::SchemaStatement,
1107 ) -> Result<QueryResult> {
1108 use crate::catalog::{
1109 EdgeTypeDefinition, NodeTypeDefinition, PropertyDataType, TypedProperty,
1110 };
1111 use grafeo_adapters::query::gql::ast::SchemaStatement;
1112 use grafeo_common::utils::error::{Error, QueryError, QueryErrorKind};
1113 #[cfg(feature = "wal")]
1114 use grafeo_storage::wal::WalRecord;
1115
1116 macro_rules! wal_log {
1118 ($self:expr, $record:expr) => {
1119 #[cfg(feature = "wal")]
1120 $self.log_schema_wal(&$record);
1121 };
1122 }
1123
1124 let result = match cmd {
1125 SchemaStatement::CreateNodeType(stmt) => {
1126 let effective_name = self.effective_type_key(&stmt.name);
1127 #[cfg(feature = "wal")]
1128 let props_for_wal: Vec<(String, String, bool)> = stmt
1129 .properties
1130 .iter()
1131 .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
1132 .collect();
1133 let def = NodeTypeDefinition {
1134 name: effective_name.clone(),
1135 properties: stmt
1136 .properties
1137 .iter()
1138 .map(|p| TypedProperty {
1139 name: p.name.clone(),
1140 data_type: PropertyDataType::from_type_name(&p.data_type),
1141 nullable: p.nullable,
1142 default_value: p
1143 .default_value
1144 .as_ref()
1145 .map(|s| parse_default_literal(s)),
1146 })
1147 .collect(),
1148 constraints: Vec::new(),
1149 parent_types: stmt.parent_types.clone(),
1150 };
1151 let result = if stmt.or_replace {
1152 let _ = self.catalog.drop_node_type(&effective_name);
1153 self.catalog.register_node_type(def)
1154 } else {
1155 self.catalog.register_node_type(def)
1156 };
1157 match result {
1158 Ok(()) => {
1159 wal_log!(
1160 self,
1161 WalRecord::CreateNodeType {
1162 name: effective_name.clone(),
1163 properties: props_for_wal,
1164 constraints: Vec::new(),
1165 }
1166 );
1167 Ok(QueryResult::status(format!(
1168 "Created node type '{}'",
1169 stmt.name
1170 )))
1171 }
1172 Err(e) if stmt.if_not_exists => {
1173 let _ = e;
1174 Ok(QueryResult::status("No change"))
1175 }
1176 Err(e) => Err(Error::Query(QueryError::new(
1177 QueryErrorKind::Semantic,
1178 e.to_string(),
1179 ))),
1180 }
1181 }
1182 SchemaStatement::CreateEdgeType(stmt) => {
1183 let effective_name = self.effective_type_key(&stmt.name);
1184 #[cfg(feature = "wal")]
1185 let props_for_wal: Vec<(String, String, bool)> = stmt
1186 .properties
1187 .iter()
1188 .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
1189 .collect();
1190 let def = EdgeTypeDefinition {
1191 name: effective_name.clone(),
1192 properties: stmt
1193 .properties
1194 .iter()
1195 .map(|p| TypedProperty {
1196 name: p.name.clone(),
1197 data_type: PropertyDataType::from_type_name(&p.data_type),
1198 nullable: p.nullable,
1199 default_value: p
1200 .default_value
1201 .as_ref()
1202 .map(|s| parse_default_literal(s)),
1203 })
1204 .collect(),
1205 constraints: Vec::new(),
1206 source_node_types: stmt.source_node_types.clone(),
1207 target_node_types: stmt.target_node_types.clone(),
1208 };
1209 let result = if stmt.or_replace {
1210 let _ = self.catalog.drop_edge_type_def(&effective_name);
1211 self.catalog.register_edge_type_def(def)
1212 } else {
1213 self.catalog.register_edge_type_def(def)
1214 };
1215 match result {
1216 Ok(()) => {
1217 wal_log!(
1218 self,
1219 WalRecord::CreateEdgeType {
1220 name: effective_name.clone(),
1221 properties: props_for_wal,
1222 constraints: Vec::new(),
1223 }
1224 );
1225 Ok(QueryResult::status(format!(
1226 "Created edge type '{}'",
1227 stmt.name
1228 )))
1229 }
1230 Err(e) if stmt.if_not_exists => {
1231 let _ = e;
1232 Ok(QueryResult::status("No change"))
1233 }
1234 Err(e) => Err(Error::Query(QueryError::new(
1235 QueryErrorKind::Semantic,
1236 e.to_string(),
1237 ))),
1238 }
1239 }
1240 SchemaStatement::CreateVectorIndex(stmt) => {
1241 Self::create_vector_index_on_store(
1242 &self.active_lpg_store(),
1243 &stmt.node_label,
1244 &stmt.property,
1245 stmt.dimensions,
1246 stmt.metric.as_deref(),
1247 )?;
1248 wal_log!(
1249 self,
1250 WalRecord::CreateIndex {
1251 name: stmt.name.clone(),
1252 label: stmt.node_label.clone(),
1253 property: stmt.property.clone(),
1254 index_type: "vector".to_string(),
1255 }
1256 );
1257 Ok(QueryResult::status(format!(
1258 "Created vector index '{}'",
1259 stmt.name
1260 )))
1261 }
1262 SchemaStatement::DropNodeType { name, if_exists } => {
1263 let effective_name = self.effective_type_key(&name);
1264 match self.catalog.drop_node_type(&effective_name) {
1265 Ok(()) => {
1266 wal_log!(
1267 self,
1268 WalRecord::DropNodeType {
1269 name: effective_name
1270 }
1271 );
1272 Ok(QueryResult::status(format!("Dropped node type '{name}'")))
1273 }
1274 Err(e) if if_exists => {
1275 let _ = e;
1276 Ok(QueryResult::status("No change"))
1277 }
1278 Err(e) => Err(Error::Query(QueryError::new(
1279 QueryErrorKind::Semantic,
1280 e.to_string(),
1281 ))),
1282 }
1283 }
1284 SchemaStatement::DropEdgeType { name, if_exists } => {
1285 let effective_name = self.effective_type_key(&name);
1286 match self.catalog.drop_edge_type_def(&effective_name) {
1287 Ok(()) => {
1288 wal_log!(
1289 self,
1290 WalRecord::DropEdgeType {
1291 name: effective_name
1292 }
1293 );
1294 Ok(QueryResult::status(format!("Dropped edge type '{name}'")))
1295 }
1296 Err(e) if if_exists => {
1297 let _ = e;
1298 Ok(QueryResult::status("No change"))
1299 }
1300 Err(e) => Err(Error::Query(QueryError::new(
1301 QueryErrorKind::Semantic,
1302 e.to_string(),
1303 ))),
1304 }
1305 }
1306 SchemaStatement::CreateIndex(stmt) => {
1307 use crate::catalog::IndexType as CatalogIndexType;
1308 use grafeo_adapters::query::gql::ast::IndexKind;
1309 let active = self.active_lpg_store();
1310 let index_type_str = match stmt.index_kind {
1311 IndexKind::Property => "property",
1312 IndexKind::BTree => "btree",
1313 IndexKind::Text => "text",
1314 IndexKind::Vector => "vector",
1315 };
1316 match stmt.index_kind {
1317 IndexKind::Property | IndexKind::BTree => {
1318 for prop in &stmt.properties {
1319 active.create_property_index(prop);
1320 }
1321 }
1322 IndexKind::Text => {
1323 for prop in &stmt.properties {
1324 Self::create_text_index_on_store(&active, &stmt.label, prop)?;
1325 }
1326 }
1327 IndexKind::Vector => {
1328 for prop in &stmt.properties {
1329 Self::create_vector_index_on_store(
1330 &active,
1331 &stmt.label,
1332 prop,
1333 stmt.options.dimensions,
1334 stmt.options.metric.as_deref(),
1335 )?;
1336 }
1337 }
1338 }
1339 let catalog_index_type = match stmt.index_kind {
1342 IndexKind::Property => CatalogIndexType::Hash,
1343 IndexKind::BTree => CatalogIndexType::BTree,
1344 IndexKind::Text => CatalogIndexType::FullText,
1345 IndexKind::Vector => CatalogIndexType::Hash,
1346 };
1347 let label_id = self.catalog.get_or_create_label(&stmt.label);
1348 for prop in &stmt.properties {
1349 let prop_id = self.catalog.get_or_create_property_key(prop);
1350 self.catalog
1351 .create_index(&stmt.name, label_id, prop_id, catalog_index_type);
1352 }
1353 #[cfg(feature = "wal")]
1354 for prop in &stmt.properties {
1355 wal_log!(
1356 self,
1357 WalRecord::CreateIndex {
1358 name: stmt.name.clone(),
1359 label: stmt.label.clone(),
1360 property: prop.clone(),
1361 index_type: index_type_str.to_string(),
1362 }
1363 );
1364 }
1365 Ok(QueryResult::status(format!(
1366 "Created {} index '{}'",
1367 index_type_str, stmt.name
1368 )))
1369 }
1370 SchemaStatement::DropIndex { name, if_exists } => {
1371 if let Some(index_id) = self.catalog.find_index_by_name(&name) {
1374 let def = self.catalog.get_index(index_id);
1375 self.catalog.drop_index(index_id);
1376 if let Some(def) = def
1377 && let Some(prop_name) =
1378 self.catalog.get_property_key_name(def.property_key)
1379 {
1380 self.active_lpg_store().drop_property_index(&prop_name);
1381 }
1382 wal_log!(self, WalRecord::DropIndex { name: name.clone() });
1383 Ok(QueryResult::status(format!("Dropped index '{name}'")))
1384 } else if if_exists {
1385 Ok(QueryResult::status("No change".to_string()))
1386 } else {
1387 Err(Error::Query(QueryError::new(
1388 QueryErrorKind::Semantic,
1389 format!("Index '{name}' does not exist"),
1390 )))
1391 }
1392 }
1393 SchemaStatement::CreateConstraint(stmt) => {
1394 use crate::catalog::TypeConstraint;
1395 use grafeo_adapters::query::gql::ast::ConstraintKind;
1396 let kind_str = match stmt.constraint_kind {
1397 ConstraintKind::Unique => "unique",
1398 ConstraintKind::NodeKey => "node_key",
1399 ConstraintKind::NotNull => "not_null",
1400 ConstraintKind::Exists => "exists",
1401 };
1402 let constraint_name = stmt
1403 .name
1404 .clone()
1405 .unwrap_or_else(|| format!("{}_{kind_str}", stmt.label));
1406
1407 match stmt.constraint_kind {
1409 ConstraintKind::Unique => {
1410 for prop in &stmt.properties {
1411 let label_id = self.catalog.get_or_create_label(&stmt.label);
1412 let prop_id = self.catalog.get_or_create_property_key(prop);
1413 let _ = self.catalog.add_unique_constraint(label_id, prop_id);
1414 }
1415 let _ = self.catalog.add_constraint_to_type(
1416 &stmt.label,
1417 TypeConstraint::Unique(stmt.properties.clone()),
1418 );
1419 }
1420 ConstraintKind::NodeKey => {
1421 for prop in &stmt.properties {
1422 let label_id = self.catalog.get_or_create_label(&stmt.label);
1423 let prop_id = self.catalog.get_or_create_property_key(prop);
1424 let _ = self.catalog.add_unique_constraint(label_id, prop_id);
1425 let _ = self.catalog.add_required_property(label_id, prop_id);
1426 }
1427 let _ = self.catalog.add_constraint_to_type(
1428 &stmt.label,
1429 TypeConstraint::PrimaryKey(stmt.properties.clone()),
1430 );
1431 }
1432 ConstraintKind::NotNull | ConstraintKind::Exists => {
1433 for prop in &stmt.properties {
1434 let label_id = self.catalog.get_or_create_label(&stmt.label);
1435 let prop_id = self.catalog.get_or_create_property_key(prop);
1436 let _ = self.catalog.add_required_property(label_id, prop_id);
1437 let _ = self.catalog.add_constraint_to_type(
1438 &stmt.label,
1439 TypeConstraint::NotNull(prop.clone()),
1440 );
1441 }
1442 }
1443 }
1444
1445 wal_log!(
1446 self,
1447 WalRecord::CreateConstraint {
1448 name: constraint_name.clone(),
1449 label: stmt.label.clone(),
1450 properties: stmt.properties.clone(),
1451 kind: kind_str.to_string(),
1452 }
1453 );
1454 Ok(QueryResult::status(format!(
1455 "Created {kind_str} constraint '{constraint_name}'"
1456 )))
1457 }
1458 SchemaStatement::DropConstraint { name, if_exists } => {
1459 let _ = if_exists;
1460 wal_log!(self, WalRecord::DropConstraint { name: name.clone() });
1461 Ok(QueryResult::status(format!("Dropped constraint '{name}'")))
1462 }
1463 SchemaStatement::CreateGraphType(stmt) => {
1464 use crate::catalog::GraphTypeDefinition;
1465 use grafeo_adapters::query::gql::ast::InlineElementType;
1466
1467 let effective_name = self.effective_type_key(&stmt.name);
1468
1469 let (mut node_types, mut edge_types, open) =
1471 if let Some(ref like_graph) = stmt.like_graph {
1472 if let Some(type_name) = self.catalog.get_graph_type_binding(like_graph) {
1474 if let Some(existing) = self
1475 .catalog
1476 .schema()
1477 .and_then(|s| s.get_graph_type(&type_name))
1478 {
1479 (
1480 existing.allowed_node_types.clone(),
1481 existing.allowed_edge_types.clone(),
1482 existing.open,
1483 )
1484 } else {
1485 (Vec::new(), Vec::new(), true)
1486 }
1487 } else {
1488 let nt = self.catalog.all_node_type_names();
1490 let et = self.catalog.all_edge_type_names();
1491 if nt.is_empty() && et.is_empty() {
1492 (Vec::new(), Vec::new(), true)
1493 } else {
1494 (nt, et, false)
1495 }
1496 }
1497 } else {
1498 let nt = stmt
1500 .node_types
1501 .iter()
1502 .map(|n| self.effective_type_key(n))
1503 .collect();
1504 let et = stmt
1505 .edge_types
1506 .iter()
1507 .map(|n| self.effective_type_key(n))
1508 .collect();
1509 (nt, et, stmt.open)
1510 };
1511
1512 for inline in &stmt.inline_types {
1514 match inline {
1515 InlineElementType::Node {
1516 name,
1517 properties,
1518 key_labels,
1519 ..
1520 } => {
1521 let inline_effective = self.effective_type_key(name);
1522 let def = NodeTypeDefinition {
1523 name: inline_effective.clone(),
1524 properties: properties
1525 .iter()
1526 .map(|p| TypedProperty {
1527 name: p.name.clone(),
1528 data_type: PropertyDataType::from_type_name(&p.data_type),
1529 nullable: p.nullable,
1530 default_value: None,
1531 })
1532 .collect(),
1533 constraints: Vec::new(),
1534 parent_types: key_labels.clone(),
1535 };
1536 self.catalog.register_or_replace_node_type(def);
1538 #[cfg(feature = "wal")]
1539 {
1540 let props_for_wal: Vec<(String, String, bool)> = properties
1541 .iter()
1542 .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
1543 .collect();
1544 self.log_schema_wal(&WalRecord::CreateNodeType {
1545 name: inline_effective.clone(),
1546 properties: props_for_wal,
1547 constraints: Vec::new(),
1548 });
1549 }
1550 if !node_types.contains(&inline_effective) {
1551 node_types.push(inline_effective);
1552 }
1553 }
1554 InlineElementType::Edge {
1555 name,
1556 properties,
1557 source_node_types,
1558 target_node_types,
1559 ..
1560 } => {
1561 let inline_effective = self.effective_type_key(name);
1562 let def = EdgeTypeDefinition {
1563 name: inline_effective.clone(),
1564 properties: properties
1565 .iter()
1566 .map(|p| TypedProperty {
1567 name: p.name.clone(),
1568 data_type: PropertyDataType::from_type_name(&p.data_type),
1569 nullable: p.nullable,
1570 default_value: None,
1571 })
1572 .collect(),
1573 constraints: Vec::new(),
1574 source_node_types: source_node_types.clone(),
1575 target_node_types: target_node_types.clone(),
1576 };
1577 self.catalog.register_or_replace_edge_type_def(def);
1578 #[cfg(feature = "wal")]
1579 {
1580 let props_for_wal: Vec<(String, String, bool)> = properties
1581 .iter()
1582 .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
1583 .collect();
1584 self.log_schema_wal(&WalRecord::CreateEdgeType {
1585 name: inline_effective.clone(),
1586 properties: props_for_wal,
1587 constraints: Vec::new(),
1588 });
1589 }
1590 if !edge_types.contains(&inline_effective) {
1591 edge_types.push(inline_effective);
1592 }
1593 }
1594 }
1595 }
1596
1597 let def = GraphTypeDefinition {
1598 name: effective_name.clone(),
1599 allowed_node_types: node_types.clone(),
1600 allowed_edge_types: edge_types.clone(),
1601 open,
1602 };
1603 let result = if stmt.or_replace {
1604 let _ = self.catalog.drop_graph_type(&effective_name);
1606 self.catalog.register_graph_type(def)
1607 } else {
1608 self.catalog.register_graph_type(def)
1609 };
1610 match result {
1611 Ok(()) => {
1612 wal_log!(
1613 self,
1614 WalRecord::CreateGraphType {
1615 name: effective_name.clone(),
1616 node_types,
1617 edge_types,
1618 open,
1619 }
1620 );
1621 Ok(QueryResult::status(format!(
1622 "Created graph type '{}'",
1623 stmt.name
1624 )))
1625 }
1626 Err(e) if stmt.if_not_exists => {
1627 let _ = e;
1628 Ok(QueryResult::status("No change"))
1629 }
1630 Err(e) => Err(Error::Query(QueryError::new(
1631 QueryErrorKind::Semantic,
1632 e.to_string(),
1633 ))),
1634 }
1635 }
1636 SchemaStatement::DropGraphType { name, if_exists } => {
1637 let effective_name = self.effective_type_key(&name);
1638 match self.catalog.drop_graph_type(&effective_name) {
1639 Ok(()) => {
1640 wal_log!(
1641 self,
1642 WalRecord::DropGraphType {
1643 name: effective_name
1644 }
1645 );
1646 Ok(QueryResult::status(format!("Dropped graph type '{name}'")))
1647 }
1648 Err(e) if if_exists => {
1649 let _ = e;
1650 Ok(QueryResult::status("No change"))
1651 }
1652 Err(e) => Err(Error::Query(QueryError::new(
1653 QueryErrorKind::Semantic,
1654 e.to_string(),
1655 ))),
1656 }
1657 }
1658 SchemaStatement::CreateSchema {
1659 name,
1660 if_not_exists,
1661 } => match self.catalog.register_schema_namespace(name.clone()) {
1662 Ok(()) => {
1663 wal_log!(self, WalRecord::CreateSchema { name: name.clone() });
1664 let default_key = format!("{name}/{SCHEMA_DEFAULT_GRAPH}");
1667 if self.store.create_graph(&default_key).unwrap_or(false) {
1668 wal_log!(self, WalRecord::CreateNamedGraph { name: default_key });
1669 }
1670 Ok(QueryResult::status(format!("Created schema '{name}'")))
1671 }
1672 Err(e) if if_not_exists => {
1673 let _ = e;
1674 Ok(QueryResult::status("No change"))
1675 }
1676 Err(e) => Err(Error::Query(QueryError::new(
1677 QueryErrorKind::Semantic,
1678 e.to_string(),
1679 ))),
1680 },
1681 SchemaStatement::DropSchema { name, if_exists } => {
1682 let prefix = format!("{name}/");
1685 let default_graph_key = format!("{name}/{SCHEMA_DEFAULT_GRAPH}");
1686 let has_graphs = self
1687 .store
1688 .graph_names()
1689 .iter()
1690 .any(|g| g.starts_with(&prefix) && *g != default_graph_key);
1691 let has_types = self
1692 .catalog
1693 .all_node_type_names()
1694 .iter()
1695 .any(|n| n.starts_with(&prefix))
1696 || self
1697 .catalog
1698 .all_edge_type_names()
1699 .iter()
1700 .any(|n| n.starts_with(&prefix))
1701 || self
1702 .catalog
1703 .all_graph_type_names()
1704 .iter()
1705 .any(|n| n.starts_with(&prefix));
1706 if has_graphs || has_types {
1707 return Err(Error::Query(QueryError::new(
1708 QueryErrorKind::Semantic,
1709 format!("Schema '{name}' is not empty: drop all graphs and types first"),
1710 )));
1711 }
1712 match self.catalog.drop_schema_namespace(&name) {
1713 Ok(()) => {
1714 wal_log!(self, WalRecord::DropSchema { name: name.clone() });
1715 if self.store.drop_graph(&default_graph_key) {
1717 wal_log!(
1718 self,
1719 WalRecord::DropNamedGraph {
1720 name: default_graph_key,
1721 }
1722 );
1723 }
1724 let mut current = self.current_schema.lock();
1726 if current
1727 .as_deref()
1728 .is_some_and(|s| s.eq_ignore_ascii_case(&name))
1729 {
1730 *current = None;
1731 }
1732 Ok(QueryResult::status(format!("Dropped schema '{name}'")))
1733 }
1734 Err(e) if if_exists => {
1735 let _ = e;
1736 Ok(QueryResult::status("No change"))
1737 }
1738 Err(e) => Err(Error::Query(QueryError::new(
1739 QueryErrorKind::Semantic,
1740 e.to_string(),
1741 ))),
1742 }
1743 }
1744 SchemaStatement::AlterNodeType(stmt) => {
1745 use grafeo_adapters::query::gql::ast::TypeAlteration;
1746 let effective_name = self.effective_type_key(&stmt.name);
1747 let mut wal_alts = Vec::new();
1748 for alt in &stmt.alterations {
1749 match alt {
1750 TypeAlteration::AddProperty(prop) => {
1751 let typed = TypedProperty {
1752 name: prop.name.clone(),
1753 data_type: PropertyDataType::from_type_name(&prop.data_type),
1754 nullable: prop.nullable,
1755 default_value: prop
1756 .default_value
1757 .as_ref()
1758 .map(|s| parse_default_literal(s)),
1759 };
1760 self.catalog
1761 .alter_node_type_add_property(&effective_name, typed)
1762 .map_err(|e| {
1763 Error::Query(QueryError::new(
1764 QueryErrorKind::Semantic,
1765 e.to_string(),
1766 ))
1767 })?;
1768 wal_alts.push((
1769 "add".to_string(),
1770 prop.name.clone(),
1771 prop.data_type.clone(),
1772 prop.nullable,
1773 ));
1774 }
1775 TypeAlteration::DropProperty(name) => {
1776 self.catalog
1777 .alter_node_type_drop_property(&effective_name, name)
1778 .map_err(|e| {
1779 Error::Query(QueryError::new(
1780 QueryErrorKind::Semantic,
1781 e.to_string(),
1782 ))
1783 })?;
1784 wal_alts.push(("drop".to_string(), name.clone(), String::new(), false));
1785 }
1786 }
1787 }
1788 wal_log!(
1789 self,
1790 WalRecord::AlterNodeType {
1791 name: effective_name,
1792 alterations: wal_alts,
1793 }
1794 );
1795 Ok(QueryResult::status(format!(
1796 "Altered node type '{}'",
1797 stmt.name
1798 )))
1799 }
1800 SchemaStatement::AlterEdgeType(stmt) => {
1801 use grafeo_adapters::query::gql::ast::TypeAlteration;
1802 let effective_name = self.effective_type_key(&stmt.name);
1803 let mut wal_alts = Vec::new();
1804 for alt in &stmt.alterations {
1805 match alt {
1806 TypeAlteration::AddProperty(prop) => {
1807 let typed = TypedProperty {
1808 name: prop.name.clone(),
1809 data_type: PropertyDataType::from_type_name(&prop.data_type),
1810 nullable: prop.nullable,
1811 default_value: prop
1812 .default_value
1813 .as_ref()
1814 .map(|s| parse_default_literal(s)),
1815 };
1816 self.catalog
1817 .alter_edge_type_add_property(&effective_name, typed)
1818 .map_err(|e| {
1819 Error::Query(QueryError::new(
1820 QueryErrorKind::Semantic,
1821 e.to_string(),
1822 ))
1823 })?;
1824 wal_alts.push((
1825 "add".to_string(),
1826 prop.name.clone(),
1827 prop.data_type.clone(),
1828 prop.nullable,
1829 ));
1830 }
1831 TypeAlteration::DropProperty(name) => {
1832 self.catalog
1833 .alter_edge_type_drop_property(&effective_name, name)
1834 .map_err(|e| {
1835 Error::Query(QueryError::new(
1836 QueryErrorKind::Semantic,
1837 e.to_string(),
1838 ))
1839 })?;
1840 wal_alts.push(("drop".to_string(), name.clone(), String::new(), false));
1841 }
1842 }
1843 }
1844 wal_log!(
1845 self,
1846 WalRecord::AlterEdgeType {
1847 name: effective_name,
1848 alterations: wal_alts,
1849 }
1850 );
1851 Ok(QueryResult::status(format!(
1852 "Altered edge type '{}'",
1853 stmt.name
1854 )))
1855 }
1856 SchemaStatement::AlterGraphType(stmt) => {
1857 use grafeo_adapters::query::gql::ast::GraphTypeAlteration;
1858 let effective_name = self.effective_type_key(&stmt.name);
1859 let mut wal_alts = Vec::new();
1860 for alt in &stmt.alterations {
1861 match alt {
1862 GraphTypeAlteration::AddNodeType(name) => {
1863 self.catalog
1864 .alter_graph_type_add_node_type(&effective_name, name.clone())
1865 .map_err(|e| {
1866 Error::Query(QueryError::new(
1867 QueryErrorKind::Semantic,
1868 e.to_string(),
1869 ))
1870 })?;
1871 wal_alts.push(("add_node_type".to_string(), name.clone()));
1872 }
1873 GraphTypeAlteration::DropNodeType(name) => {
1874 self.catalog
1875 .alter_graph_type_drop_node_type(&effective_name, name)
1876 .map_err(|e| {
1877 Error::Query(QueryError::new(
1878 QueryErrorKind::Semantic,
1879 e.to_string(),
1880 ))
1881 })?;
1882 wal_alts.push(("drop_node_type".to_string(), name.clone()));
1883 }
1884 GraphTypeAlteration::AddEdgeType(name) => {
1885 self.catalog
1886 .alter_graph_type_add_edge_type(&effective_name, name.clone())
1887 .map_err(|e| {
1888 Error::Query(QueryError::new(
1889 QueryErrorKind::Semantic,
1890 e.to_string(),
1891 ))
1892 })?;
1893 wal_alts.push(("add_edge_type".to_string(), name.clone()));
1894 }
1895 GraphTypeAlteration::DropEdgeType(name) => {
1896 self.catalog
1897 .alter_graph_type_drop_edge_type(&effective_name, name)
1898 .map_err(|e| {
1899 Error::Query(QueryError::new(
1900 QueryErrorKind::Semantic,
1901 e.to_string(),
1902 ))
1903 })?;
1904 wal_alts.push(("drop_edge_type".to_string(), name.clone()));
1905 }
1906 }
1907 }
1908 wal_log!(
1909 self,
1910 WalRecord::AlterGraphType {
1911 name: effective_name,
1912 alterations: wal_alts,
1913 }
1914 );
1915 Ok(QueryResult::status(format!(
1916 "Altered graph type '{}'",
1917 stmt.name
1918 )))
1919 }
1920 SchemaStatement::CreateProcedure(stmt) => {
1921 use crate::catalog::ProcedureDefinition;
1922
1923 let def = ProcedureDefinition {
1924 name: stmt.name.clone(),
1925 params: stmt
1926 .params
1927 .iter()
1928 .map(|p| (p.name.clone(), p.param_type.clone()))
1929 .collect(),
1930 returns: stmt
1931 .returns
1932 .iter()
1933 .map(|r| (r.name.clone(), r.return_type.clone()))
1934 .collect(),
1935 body: stmt.body.clone(),
1936 };
1937
1938 if stmt.or_replace {
1939 self.catalog.replace_procedure(def).map_err(|e| {
1940 Error::Query(QueryError::new(QueryErrorKind::Semantic, e.to_string()))
1941 })?;
1942 } else {
1943 match self.catalog.register_procedure(def) {
1944 Ok(()) => {}
1945 Err(_) if stmt.if_not_exists => {
1946 return Ok(QueryResult::empty());
1947 }
1948 Err(e) => {
1949 return Err(Error::Query(QueryError::new(
1950 QueryErrorKind::Semantic,
1951 e.to_string(),
1952 )));
1953 }
1954 }
1955 }
1956
1957 wal_log!(
1958 self,
1959 WalRecord::CreateProcedure {
1960 name: stmt.name.clone(),
1961 params: stmt
1962 .params
1963 .iter()
1964 .map(|p| (p.name.clone(), p.param_type.clone()))
1965 .collect(),
1966 returns: stmt
1967 .returns
1968 .iter()
1969 .map(|r| (r.name.clone(), r.return_type.clone()))
1970 .collect(),
1971 body: stmt.body,
1972 }
1973 );
1974 Ok(QueryResult::status(format!(
1975 "Created procedure '{}'",
1976 stmt.name
1977 )))
1978 }
1979 SchemaStatement::DropProcedure { name, if_exists } => {
1980 match self.catalog.drop_procedure(&name) {
1981 Ok(()) => {}
1982 Err(_) if if_exists => {
1983 return Ok(QueryResult::empty());
1984 }
1985 Err(e) => {
1986 return Err(Error::Query(QueryError::new(
1987 QueryErrorKind::Semantic,
1988 e.to_string(),
1989 )));
1990 }
1991 }
1992 wal_log!(self, WalRecord::DropProcedure { name: name.clone() });
1993 Ok(QueryResult::status(format!("Dropped procedure '{name}'")))
1994 }
1995 SchemaStatement::ShowIndexes => {
1996 return self.execute_show_indexes();
1997 }
1998 SchemaStatement::ShowConstraints => {
1999 return self.execute_show_constraints();
2000 }
2001 SchemaStatement::ShowNodeTypes => {
2002 return self.execute_show_node_types();
2003 }
2004 SchemaStatement::ShowEdgeTypes => {
2005 return self.execute_show_edge_types();
2006 }
2007 SchemaStatement::ShowGraphTypes => {
2008 return self.execute_show_graph_types();
2009 }
2010 SchemaStatement::ShowGraphType(name) => {
2011 return self.execute_show_graph_type(&name);
2012 }
2013 SchemaStatement::ShowCurrentGraphType => {
2014 return self.execute_show_current_graph_type();
2015 }
2016 SchemaStatement::ShowGraphs => {
2017 return self.execute_show_graphs();
2018 }
2019 SchemaStatement::ShowSchemas => {
2020 return self.execute_show_schemas();
2021 }
2022 };
2023
2024 if result.is_ok() {
2027 self.query_cache.clear();
2028 }
2029
2030 result
2031 }
2032
2033 #[cfg(all(feature = "lpg", feature = "gql", feature = "vector-index"))]
2035 fn create_vector_index_on_store(
2036 store: &LpgStore,
2037 label: &str,
2038 property: &str,
2039 dimensions: Option<usize>,
2040 metric: Option<&str>,
2041 ) -> Result<()> {
2042 use grafeo_common::types::{PropertyKey, Value};
2043 use grafeo_common::utils::error::Error;
2044 use grafeo_core::index::vector::{DistanceMetric, HnswConfig, HnswIndex};
2045
2046 let metric = match metric {
2047 Some(m) => DistanceMetric::from_str(m).ok_or_else(|| {
2048 Error::Internal(format!(
2049 "Unknown distance metric '{m}'. Use: cosine, euclidean, dot_product, manhattan"
2050 ))
2051 })?,
2052 None => DistanceMetric::Cosine,
2053 };
2054
2055 let prop_key = PropertyKey::new(property);
2056 let mut found_dims: Option<usize> = dimensions;
2057 let mut vectors: Vec<(grafeo_common::types::NodeId, Vec<f32>)> = Vec::new();
2058
2059 for node in store.nodes_with_label(label) {
2060 if let Some(Value::Vector(v)) = node.properties.get(&prop_key) {
2061 if let Some(expected) = found_dims {
2062 if v.len() != expected {
2063 return Err(Error::Internal(format!(
2064 "Vector dimension mismatch: expected {expected}, found {} on node {}",
2065 v.len(),
2066 node.id.0
2067 )));
2068 }
2069 } else {
2070 found_dims = Some(v.len());
2071 }
2072 vectors.push((node.id, v.to_vec()));
2073 }
2074 }
2075
2076 let Some(dims) = found_dims else {
2077 return Err(Error::Internal(format!(
2078 "No vector properties found on :{label}({property}) and no dimensions specified"
2079 )));
2080 };
2081
2082 let config = HnswConfig::new(dims, metric);
2083 let index = HnswIndex::with_capacity(config, vectors.len());
2084 let accessor = grafeo_core::index::vector::PropertyVectorAccessor::new(store, property);
2085 for (node_id, vec) in &vectors {
2086 index.insert(*node_id, vec, &accessor);
2087 }
2088
2089 store.add_vector_index(label, property, Arc::new(index));
2090 Ok(())
2091 }
2092
2093 #[cfg(all(feature = "lpg", feature = "gql", not(feature = "vector-index")))]
2095 fn create_vector_index_on_store(
2096 _store: &LpgStore,
2097 _label: &str,
2098 _property: &str,
2099 _dimensions: Option<usize>,
2100 _metric: Option<&str>,
2101 ) -> Result<()> {
2102 Err(grafeo_common::utils::error::Error::Internal(
2103 "Vector index support requires the 'vector-index' feature".to_string(),
2104 ))
2105 }
2106
2107 #[cfg(all(feature = "lpg", feature = "gql", feature = "text-index"))]
2109 fn create_text_index_on_store(store: &LpgStore, label: &str, property: &str) -> Result<()> {
2110 use grafeo_common::types::{PropertyKey, Value};
2111 use grafeo_core::index::text::{BM25Config, InvertedIndex};
2112
2113 let mut index = InvertedIndex::new(BM25Config::default());
2114 let prop_key = PropertyKey::new(property);
2115
2116 let nodes = store.nodes_by_label(label);
2117 for node_id in nodes {
2118 if let Some(Value::String(text)) = store.get_node_property(node_id, &prop_key) {
2119 index.insert(node_id, text.as_str());
2120 }
2121 }
2122
2123 store.add_text_index(label, property, Arc::new(parking_lot::RwLock::new(index)));
2124 Ok(())
2125 }
2126
2127 #[cfg(all(feature = "lpg", feature = "gql", not(feature = "text-index")))]
2129 fn create_text_index_on_store(_store: &LpgStore, _label: &str, _property: &str) -> Result<()> {
2130 Err(grafeo_common::utils::error::Error::Internal(
2131 "Text index support requires the 'text-index' feature".to_string(),
2132 ))
2133 }
2134
2135 fn execute_show_indexes(&self) -> Result<QueryResult> {
2137 let indexes = self.catalog.all_indexes();
2138 let columns = vec![
2139 "name".to_string(),
2140 "type".to_string(),
2141 "label".to_string(),
2142 "property".to_string(),
2143 ];
2144 let rows: Vec<Vec<Value>> = indexes
2145 .into_iter()
2146 .map(|def| {
2147 let label_name = self
2148 .catalog
2149 .get_label_name(def.label)
2150 .unwrap_or_else(|| "?".into());
2151 let prop_name = self
2152 .catalog
2153 .get_property_key_name(def.property_key)
2154 .unwrap_or_else(|| "?".into());
2155 vec![
2156 Value::from(def.name),
2157 Value::from(format!("{:?}", def.index_type)),
2158 Value::from(&*label_name),
2159 Value::from(&*prop_name),
2160 ]
2161 })
2162 .collect();
2163 Ok(QueryResult {
2164 columns,
2165 column_types: Vec::new(),
2166 rows,
2167 ..QueryResult::empty()
2168 })
2169 }
2170
2171 fn execute_show_constraints(&self) -> Result<QueryResult> {
2173 Ok(QueryResult {
2176 columns: vec![
2177 "name".to_string(),
2178 "type".to_string(),
2179 "label".to_string(),
2180 "properties".to_string(),
2181 ],
2182 column_types: Vec::new(),
2183 rows: Vec::new(),
2184 ..QueryResult::empty()
2185 })
2186 }
2187
2188 fn execute_show_node_types(&self) -> Result<QueryResult> {
2190 let columns = vec![
2191 "name".to_string(),
2192 "properties".to_string(),
2193 "constraints".to_string(),
2194 "parents".to_string(),
2195 ];
2196 let schema = self.current_schema.lock().clone();
2197 let all_names = self.catalog.all_node_type_names();
2198 let type_names: Vec<String> = match &schema {
2199 Some(s) => {
2200 let prefix = format!("{s}/");
2201 all_names
2202 .into_iter()
2203 .filter_map(|n| n.strip_prefix(&prefix).map(String::from))
2204 .collect()
2205 }
2206 None => all_names.into_iter().filter(|n| !n.contains('/')).collect(),
2207 };
2208 let rows: Vec<Vec<Value>> = type_names
2209 .into_iter()
2210 .filter_map(|name| {
2211 let lookup = match &schema {
2212 Some(s) => format!("{s}/{name}"),
2213 None => name.clone(),
2214 };
2215 let def = self.catalog.get_node_type(&lookup)?;
2216 let props: Vec<String> = def
2217 .properties
2218 .iter()
2219 .map(|p| {
2220 let nullable = if p.nullable { "" } else { " NOT NULL" };
2221 format!("{} {}{}", p.name, p.data_type, nullable)
2222 })
2223 .collect();
2224 let constraints: Vec<String> =
2225 def.constraints.iter().map(|c| format!("{c:?}")).collect();
2226 let parents = def.parent_types.join(", ");
2227 Some(vec![
2228 Value::from(name),
2229 Value::from(props.join(", ")),
2230 Value::from(constraints.join(", ")),
2231 Value::from(parents),
2232 ])
2233 })
2234 .collect();
2235 Ok(QueryResult {
2236 columns,
2237 column_types: Vec::new(),
2238 rows,
2239 ..QueryResult::empty()
2240 })
2241 }
2242
2243 fn execute_show_edge_types(&self) -> Result<QueryResult> {
2245 let columns = vec![
2246 "name".to_string(),
2247 "properties".to_string(),
2248 "source_types".to_string(),
2249 "target_types".to_string(),
2250 ];
2251 let schema = self.current_schema.lock().clone();
2252 let all_names = self.catalog.all_edge_type_names();
2253 let type_names: Vec<String> = match &schema {
2254 Some(s) => {
2255 let prefix = format!("{s}/");
2256 all_names
2257 .into_iter()
2258 .filter_map(|n| n.strip_prefix(&prefix).map(String::from))
2259 .collect()
2260 }
2261 None => all_names.into_iter().filter(|n| !n.contains('/')).collect(),
2262 };
2263 let rows: Vec<Vec<Value>> = type_names
2264 .into_iter()
2265 .filter_map(|name| {
2266 let lookup = match &schema {
2267 Some(s) => format!("{s}/{name}"),
2268 None => name.clone(),
2269 };
2270 let def = self.catalog.get_edge_type_def(&lookup)?;
2271 let props: Vec<String> = def
2272 .properties
2273 .iter()
2274 .map(|p| {
2275 let nullable = if p.nullable { "" } else { " NOT NULL" };
2276 format!("{} {}{}", p.name, p.data_type, nullable)
2277 })
2278 .collect();
2279 let src = def.source_node_types.join(", ");
2280 let tgt = def.target_node_types.join(", ");
2281 Some(vec![
2282 Value::from(name),
2283 Value::from(props.join(", ")),
2284 Value::from(src),
2285 Value::from(tgt),
2286 ])
2287 })
2288 .collect();
2289 Ok(QueryResult {
2290 columns,
2291 column_types: Vec::new(),
2292 rows,
2293 ..QueryResult::empty()
2294 })
2295 }
2296
2297 fn execute_show_graph_types(&self) -> Result<QueryResult> {
2299 let columns = vec![
2300 "name".to_string(),
2301 "open".to_string(),
2302 "node_types".to_string(),
2303 "edge_types".to_string(),
2304 ];
2305 let schema = self.current_schema.lock().clone();
2306 let all_names = self.catalog.all_graph_type_names();
2307 let type_names: Vec<String> = match &schema {
2308 Some(s) => {
2309 let prefix = format!("{s}/");
2310 all_names
2311 .into_iter()
2312 .filter_map(|n| n.strip_prefix(&prefix).map(String::from))
2313 .collect()
2314 }
2315 None => all_names.into_iter().filter(|n| !n.contains('/')).collect(),
2316 };
2317 let rows: Vec<Vec<Value>> = type_names
2318 .into_iter()
2319 .filter_map(|name| {
2320 let lookup = match &schema {
2321 Some(s) => format!("{s}/{name}"),
2322 None => name.clone(),
2323 };
2324 let def = self.catalog.get_graph_type_def(&lookup)?;
2325 let strip = |n: &String| -> String {
2327 match &schema {
2328 Some(s) => n.strip_prefix(&format!("{s}/")).unwrap_or(n).to_string(),
2329 None => n.clone(),
2330 }
2331 };
2332 let node_types: Vec<String> = def.allowed_node_types.iter().map(strip).collect();
2333 let edge_types: Vec<String> = def.allowed_edge_types.iter().map(strip).collect();
2334 Some(vec![
2335 Value::from(name),
2336 Value::from(def.open),
2337 Value::from(node_types.join(", ")),
2338 Value::from(edge_types.join(", ")),
2339 ])
2340 })
2341 .collect();
2342 Ok(QueryResult {
2343 columns,
2344 column_types: Vec::new(),
2345 rows,
2346 ..QueryResult::empty()
2347 })
2348 }
2349
2350 #[cfg(feature = "lpg")]
2356 fn execute_show_graphs(&self) -> Result<QueryResult> {
2357 let schema = self.current_schema.lock().clone();
2358 let all_names = self.store.graph_names();
2359
2360 let mut names: Vec<String> = match &schema {
2361 Some(s) => {
2362 let prefix = format!("{s}/");
2363 all_names
2364 .into_iter()
2365 .filter_map(|n| n.strip_prefix(&prefix).map(String::from))
2366 .filter(|n| n != SCHEMA_DEFAULT_GRAPH)
2367 .collect()
2368 }
2369 None => all_names.into_iter().filter(|n| !n.contains('/')).collect(),
2370 };
2371 names.sort();
2372
2373 let rows: Vec<Vec<Value>> = names.into_iter().map(|n| vec![Value::from(n)]).collect();
2374 Ok(QueryResult {
2375 columns: vec!["name".to_string()],
2376 column_types: Vec::new(),
2377 rows,
2378 ..QueryResult::empty()
2379 })
2380 }
2381
2382 fn execute_show_schemas(&self) -> Result<QueryResult> {
2384 let mut names = self.catalog.schema_names();
2385 names.sort();
2386 let rows: Vec<Vec<Value>> = names.into_iter().map(|n| vec![Value::from(n)]).collect();
2387 Ok(QueryResult {
2388 columns: vec!["name".to_string()],
2389 column_types: Vec::new(),
2390 rows,
2391 ..QueryResult::empty()
2392 })
2393 }
2394
2395 fn execute_show_graph_type(&self, name: &str) -> Result<QueryResult> {
2397 use grafeo_common::utils::error::{Error, QueryError, QueryErrorKind};
2398
2399 let def = self.catalog.get_graph_type_def(name).ok_or_else(|| {
2400 Error::Query(QueryError::new(
2401 QueryErrorKind::Semantic,
2402 format!("Graph type '{name}' not found"),
2403 ))
2404 })?;
2405
2406 let columns = vec![
2407 "name".to_string(),
2408 "open".to_string(),
2409 "node_types".to_string(),
2410 "edge_types".to_string(),
2411 ];
2412 let rows = vec![vec![
2413 Value::from(def.name),
2414 Value::from(def.open),
2415 Value::from(def.allowed_node_types.join(", ")),
2416 Value::from(def.allowed_edge_types.join(", ")),
2417 ]];
2418 Ok(QueryResult {
2419 columns,
2420 column_types: Vec::new(),
2421 rows,
2422 ..QueryResult::empty()
2423 })
2424 }
2425
2426 fn execute_show_current_graph_type(&self) -> Result<QueryResult> {
2428 let graph_name = self
2429 .current_graph()
2430 .unwrap_or_else(|| "default".to_string());
2431 let columns = vec![
2432 "graph".to_string(),
2433 "graph_type".to_string(),
2434 "open".to_string(),
2435 "node_types".to_string(),
2436 "edge_types".to_string(),
2437 ];
2438
2439 if let Some(type_name) = self.catalog.get_graph_type_binding(&graph_name)
2440 && let Some(def) = self.catalog.get_graph_type_def(&type_name)
2441 {
2442 let rows = vec![vec![
2443 Value::from(graph_name),
2444 Value::from(type_name),
2445 Value::from(def.open),
2446 Value::from(def.allowed_node_types.join(", ")),
2447 Value::from(def.allowed_edge_types.join(", ")),
2448 ]];
2449 return Ok(QueryResult {
2450 columns,
2451 column_types: Vec::new(),
2452 rows,
2453 ..QueryResult::empty()
2454 });
2455 }
2456
2457 Ok(QueryResult {
2459 columns,
2460 column_types: Vec::new(),
2461 rows: vec![vec![
2462 Value::from(graph_name),
2463 Value::Null,
2464 Value::Null,
2465 Value::Null,
2466 Value::Null,
2467 ]],
2468 ..QueryResult::empty()
2469 })
2470 }
2471
2472 #[cfg(feature = "gql")]
2499 pub fn execute(&self, query: &str) -> Result<QueryResult> {
2500 self.require_lpg("GQL")?;
2501
2502 use crate::query::{
2503 Executor, binder::Binder, cache::CacheKey, optimizer::Optimizer,
2504 processor::QueryLanguage, translators::gql,
2505 };
2506
2507 let _span = grafeo_info_span!(
2508 "grafeo::session::execute",
2509 language = "gql",
2510 query_len = query.len(),
2511 );
2512
2513 #[cfg(not(target_arch = "wasm32"))]
2514 let start_time = std::time::Instant::now();
2515
2516 let translation = gql::translate_full(query)?;
2518 let logical_plan = match translation {
2519 gql::GqlTranslationResult::SessionCommand(cmd) => {
2520 return self.execute_session_command(cmd);
2521 }
2522 #[cfg(feature = "lpg")]
2523 gql::GqlTranslationResult::SchemaCommand(cmd) => {
2524 self.require_permission(crate::auth::StatementKind::Admin)?;
2526 if *self.read_only_tx.lock() {
2527 return Err(grafeo_common::utils::error::Error::Transaction(
2528 grafeo_common::utils::error::TransactionError::ReadOnly,
2529 ));
2530 }
2531 return self.execute_schema_command(cmd);
2532 }
2533 gql::GqlTranslationResult::Plan(plan) => {
2534 if plan.root.has_mutations() {
2536 self.require_permission(crate::auth::StatementKind::Write)?;
2537 }
2538 if *self.read_only_tx.lock() && plan.root.has_mutations() {
2540 return Err(grafeo_common::utils::error::Error::Transaction(
2541 grafeo_common::utils::error::TransactionError::ReadOnly,
2542 ));
2543 }
2544 plan
2545 }
2546 #[cfg(not(feature = "lpg"))]
2547 gql::GqlTranslationResult::SchemaCommand(_) => {
2548 return Err(grafeo_common::utils::error::Error::Internal(
2549 "Schema commands require the `lpg` feature".to_string(),
2550 ));
2551 }
2552 };
2553
2554 let cache_key = CacheKey::with_graph(query, QueryLanguage::Gql, self.current_graph());
2556
2557 let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
2559 cached_plan
2560 } else {
2561 let mut binder = Binder::new();
2563 let _binding_context = binder.bind(&logical_plan)?;
2564
2565 let active = self.active_store();
2567 let optimizer = Optimizer::from_graph_store(&*active);
2568 let plan = optimizer.optimize(logical_plan)?;
2569
2570 self.query_cache.put_optimized(cache_key, plan.clone());
2572
2573 plan
2574 };
2575
2576 let active = self.active_store();
2578
2579 if optimized_plan.explain {
2581 use crate::query::processor::{annotate_pushdown_hints, explain_result};
2582 let mut plan = optimized_plan;
2583 annotate_pushdown_hints(&mut plan.root, active.as_ref());
2584 return Ok(explain_result(&plan));
2585 }
2586
2587 if optimized_plan.profile {
2589 let has_mutations = optimized_plan.root.has_mutations();
2590 return self.with_auto_commit(has_mutations, || {
2591 let (viewing_epoch, transaction_id) = self.get_transaction_context();
2592 let planner = self.create_planner_for_store(
2593 Arc::clone(&active),
2594 viewing_epoch,
2595 transaction_id,
2596 );
2597 let (mut physical_plan, entries) = planner.plan_profiled(&optimized_plan)?;
2598
2599 let executor = Executor::with_columns(physical_plan.columns.clone())
2600 .with_deadline(self.query_deadline());
2601 let _result = executor.execute(physical_plan.operator.as_mut())?;
2602
2603 let total_time_ms;
2604 #[cfg(not(target_arch = "wasm32"))]
2605 {
2606 total_time_ms = start_time.elapsed().as_secs_f64() * 1000.0;
2607 }
2608 #[cfg(target_arch = "wasm32")]
2609 {
2610 total_time_ms = 0.0;
2611 }
2612
2613 let profile_tree = crate::query::profile::build_profile_tree(
2614 &optimized_plan.root,
2615 &mut entries.into_iter(),
2616 );
2617 Ok(crate::query::profile::profile_result(
2618 &profile_tree,
2619 total_time_ms,
2620 ))
2621 });
2622 }
2623
2624 let has_mutations = optimized_plan.root.has_mutations();
2625
2626 let result = self.with_auto_commit(has_mutations, || {
2627 let (viewing_epoch, transaction_id) = self.get_transaction_context();
2629
2630 let has_active_tx = self.current_transaction.lock().is_some();
2635 let read_only = !has_mutations && !has_active_tx;
2636 let planner = self.create_planner_for_store_with_read_only(
2637 Arc::clone(&active),
2638 viewing_epoch,
2639 transaction_id,
2640 read_only,
2641 );
2642 let mut physical_plan = planner.plan(&optimized_plan)?;
2643
2644 let executor = Executor::with_columns(physical_plan.columns.clone())
2646 .with_deadline(self.query_deadline());
2647 let mut result = executor.execute(physical_plan.operator.as_mut())?;
2648
2649 let rows_scanned = result.rows.len() as u64;
2651 #[cfg(not(target_arch = "wasm32"))]
2652 {
2653 let elapsed_ms = start_time.elapsed().as_secs_f64() * 1000.0;
2654 result.execution_time_ms = Some(elapsed_ms);
2655 }
2656 result.rows_scanned = Some(rows_scanned);
2657
2658 Ok(result)
2659 });
2660
2661 #[cfg(feature = "metrics")]
2663 {
2664 #[cfg(not(target_arch = "wasm32"))]
2665 let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
2666 #[cfg(target_arch = "wasm32")]
2667 let elapsed_ms = None;
2668 self.record_query_metrics("gql", elapsed_ms, &result);
2669 }
2670
2671 result
2672 }
2673
2674 #[cfg(feature = "gql")]
2683 pub fn execute_at_epoch(&self, query: &str, epoch: EpochId) -> Result<QueryResult> {
2684 let previous = self.viewing_epoch_override.lock().replace(epoch);
2685 let result = self.execute(query);
2686 *self.viewing_epoch_override.lock() = previous;
2687 result
2688 }
2689
2690 #[cfg(feature = "gql")]
2698 pub fn execute_at_epoch_with_params(
2699 &self,
2700 query: &str,
2701 epoch: EpochId,
2702 params: Option<std::collections::HashMap<String, Value>>,
2703 ) -> Result<QueryResult> {
2704 let previous = self.viewing_epoch_override.lock().replace(epoch);
2705 let result = if let Some(p) = params {
2706 self.execute_with_params(query, p)
2707 } else {
2708 self.execute(query)
2709 };
2710 *self.viewing_epoch_override.lock() = previous;
2711 result
2712 }
2713
2714 #[cfg(feature = "gql")]
2720 pub fn execute_with_params(
2721 &self,
2722 query: &str,
2723 params: std::collections::HashMap<String, Value>,
2724 ) -> Result<QueryResult> {
2725 self.require_lpg("GQL")?;
2726
2727 use crate::query::processor::{QueryLanguage, QueryProcessor};
2728
2729 let has_mutations = if self.identity.can_write() {
2733 Self::query_looks_like_mutation(query)
2735 } else {
2736 use crate::query::translators::gql;
2738 match gql::translate(query) {
2739 Ok(plan) if plan.root.has_mutations() => {
2740 self.require_permission(crate::auth::StatementKind::Write)?;
2741 true
2742 }
2743 Ok(_) => false,
2744 Err(_) => Self::query_looks_like_mutation(query),
2746 }
2747 };
2748 let active = self.active_store();
2749
2750 self.with_auto_commit(has_mutations, || {
2751 let (viewing_epoch, transaction_id) = self.get_transaction_context();
2753
2754 let processor = QueryProcessor::for_stores_with_transaction(
2756 Arc::clone(&active),
2757 self.active_write_store(),
2758 Arc::clone(&self.transaction_manager),
2759 )?;
2760
2761 let processor = if let Some(transaction_id) = transaction_id {
2763 processor.with_transaction_context(viewing_epoch, transaction_id)
2764 } else {
2765 processor
2766 };
2767
2768 processor.process(query, QueryLanguage::Gql, Some(¶ms))
2769 })
2770 }
2771
2772 #[cfg(not(any(feature = "gql", feature = "cypher")))]
2778 pub fn execute_with_params(
2779 &self,
2780 _query: &str,
2781 _params: std::collections::HashMap<String, Value>,
2782 ) -> Result<QueryResult> {
2783 Err(grafeo_common::utils::error::Error::Internal(
2784 "No query language enabled".to_string(),
2785 ))
2786 }
2787
2788 #[cfg(not(any(feature = "gql", feature = "cypher")))]
2794 pub fn execute(&self, _query: &str) -> Result<QueryResult> {
2795 Err(grafeo_common::utils::error::Error::Internal(
2796 "No query language enabled".to_string(),
2797 ))
2798 }
2799
2800 #[cfg(feature = "cypher")]
2806 pub fn execute_cypher(&self, query: &str) -> Result<QueryResult> {
2807 use crate::query::{
2808 Executor, binder::Binder, cache::CacheKey, optimizer::Optimizer,
2809 processor::QueryLanguage, translators::cypher,
2810 };
2811
2812 let translation = cypher::translate_full(query)?;
2814 match translation {
2815 #[cfg(feature = "lpg")]
2816 cypher::CypherTranslationResult::SchemaCommand(cmd) => {
2817 use grafeo_common::utils::error::{
2818 Error as GrafeoError, QueryError, QueryErrorKind,
2819 };
2820 self.require_permission(crate::auth::StatementKind::Admin)?;
2821 if *self.read_only_tx.lock() {
2822 return Err(GrafeoError::Query(QueryError::new(
2823 QueryErrorKind::Semantic,
2824 "Cannot execute schema DDL in a read-only transaction",
2825 )));
2826 }
2827 return self.execute_schema_command(cmd);
2828 }
2829 #[cfg(not(feature = "lpg"))]
2830 cypher::CypherTranslationResult::SchemaCommand(_) => {
2831 return Err(grafeo_common::utils::error::Error::Internal(
2832 "Schema DDL requires the `lpg` feature".to_string(),
2833 ));
2834 }
2835 cypher::CypherTranslationResult::ShowIndexes => {
2836 return self.execute_show_indexes();
2837 }
2838 cypher::CypherTranslationResult::ShowConstraints => {
2839 return self.execute_show_constraints();
2840 }
2841 cypher::CypherTranslationResult::ShowCurrentGraphType => {
2842 return self.execute_show_current_graph_type();
2843 }
2844 cypher::CypherTranslationResult::Plan(_) => {
2845 }
2847 }
2848
2849 #[cfg(not(target_arch = "wasm32"))]
2850 let start_time = std::time::Instant::now();
2851
2852 let cache_key = CacheKey::with_graph(query, QueryLanguage::Cypher, self.current_graph());
2854
2855 let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
2857 cached_plan
2858 } else {
2859 let logical_plan = cypher::translate(query)?;
2861
2862 let mut binder = Binder::new();
2864 let _binding_context = binder.bind(&logical_plan)?;
2865
2866 let active = self.active_store();
2868 let optimizer = Optimizer::from_graph_store(&*active);
2869 let plan = optimizer.optimize(logical_plan)?;
2870
2871 self.query_cache.put_optimized(cache_key, plan.clone());
2873
2874 plan
2875 };
2876
2877 if optimized_plan.root.has_mutations() {
2879 self.require_permission(crate::auth::StatementKind::Write)?;
2880 }
2881
2882 let active = self.active_store();
2884
2885 if optimized_plan.explain {
2887 use crate::query::processor::{annotate_pushdown_hints, explain_result};
2888 let mut plan = optimized_plan;
2889 annotate_pushdown_hints(&mut plan.root, active.as_ref());
2890 return Ok(explain_result(&plan));
2891 }
2892
2893 if optimized_plan.profile {
2895 let has_mutations = optimized_plan.root.has_mutations();
2896 return self.with_auto_commit(has_mutations, || {
2897 let (viewing_epoch, transaction_id) = self.get_transaction_context();
2898 let planner = self.create_planner_for_store(
2899 Arc::clone(&active),
2900 viewing_epoch,
2901 transaction_id,
2902 );
2903 let (mut physical_plan, entries) = planner.plan_profiled(&optimized_plan)?;
2904
2905 let executor = Executor::with_columns(physical_plan.columns.clone())
2906 .with_deadline(self.query_deadline());
2907 let _result = executor.execute(physical_plan.operator.as_mut())?;
2908
2909 let total_time_ms;
2910 #[cfg(not(target_arch = "wasm32"))]
2911 {
2912 total_time_ms = start_time.elapsed().as_secs_f64() * 1000.0;
2913 }
2914 #[cfg(target_arch = "wasm32")]
2915 {
2916 total_time_ms = 0.0;
2917 }
2918
2919 let profile_tree = crate::query::profile::build_profile_tree(
2920 &optimized_plan.root,
2921 &mut entries.into_iter(),
2922 );
2923 Ok(crate::query::profile::profile_result(
2924 &profile_tree,
2925 total_time_ms,
2926 ))
2927 });
2928 }
2929
2930 let has_mutations = optimized_plan.root.has_mutations();
2931
2932 let result = self.with_auto_commit(has_mutations, || {
2933 let (viewing_epoch, transaction_id) = self.get_transaction_context();
2935
2936 let planner =
2938 self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
2939 let mut physical_plan = planner.plan(&optimized_plan)?;
2940
2941 let executor = Executor::with_columns(physical_plan.columns.clone())
2943 .with_deadline(self.query_deadline());
2944 executor.execute(physical_plan.operator.as_mut())
2945 });
2946
2947 #[cfg(feature = "metrics")]
2948 {
2949 #[cfg(not(target_arch = "wasm32"))]
2950 let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
2951 #[cfg(target_arch = "wasm32")]
2952 let elapsed_ms = None;
2953 self.record_query_metrics("cypher", elapsed_ms, &result);
2954 }
2955
2956 result
2957 }
2958
2959 #[cfg(feature = "gremlin")]
2983 pub fn execute_gremlin(&self, query: &str) -> Result<QueryResult> {
2984 use crate::query::{Executor, binder::Binder, optimizer::Optimizer, translators::gremlin};
2985
2986 #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
2987 let start_time = Instant::now();
2988
2989 let logical_plan = gremlin::translate(query)?;
2991
2992 let mut binder = Binder::new();
2994 let _binding_context = binder.bind(&logical_plan)?;
2995
2996 let active = self.active_store();
2998 let optimizer = Optimizer::from_graph_store(&*active);
2999 let optimized_plan = optimizer.optimize(logical_plan)?;
3000
3001 let has_mutations = optimized_plan.root.has_mutations();
3002 if has_mutations {
3003 self.require_permission(crate::auth::StatementKind::Write)?;
3004 }
3005
3006 let result = self.with_auto_commit(has_mutations, || {
3007 let (viewing_epoch, transaction_id) = self.get_transaction_context();
3009
3010 let planner =
3012 self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
3013 let mut physical_plan = planner.plan(&optimized_plan)?;
3014
3015 let executor = Executor::with_columns(physical_plan.columns.clone())
3017 .with_deadline(self.query_deadline());
3018 executor.execute(physical_plan.operator.as_mut())
3019 });
3020
3021 #[cfg(feature = "metrics")]
3022 {
3023 #[cfg(not(target_arch = "wasm32"))]
3024 let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
3025 #[cfg(target_arch = "wasm32")]
3026 let elapsed_ms = None;
3027 self.record_query_metrics("gremlin", elapsed_ms, &result);
3028 }
3029
3030 result
3031 }
3032
3033 #[cfg(feature = "gremlin")]
3039 pub fn execute_gremlin_with_params(
3040 &self,
3041 query: &str,
3042 params: std::collections::HashMap<String, Value>,
3043 ) -> Result<QueryResult> {
3044 use crate::query::{
3045 Executor, binder::Binder, optimizer::Optimizer, processor::substitute_params,
3046 translators::gremlin,
3047 };
3048
3049 #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
3050 let start_time = Instant::now();
3051
3052 let mut logical_plan = gremlin::translate(query)?;
3054
3055 substitute_params(&mut logical_plan, ¶ms)?;
3057
3058 let mut binder = Binder::new();
3060 let _binding_context = binder.bind(&logical_plan)?;
3061
3062 let active = self.active_store();
3064 let optimizer = Optimizer::from_graph_store(&*active);
3065 let optimized_plan = optimizer.optimize(logical_plan)?;
3066
3067 let has_mutations = optimized_plan.root.has_mutations();
3068 if has_mutations {
3069 self.require_permission(crate::auth::StatementKind::Write)?;
3070 }
3071
3072 let result = self.with_auto_commit(has_mutations, || {
3073 let (viewing_epoch, transaction_id) = self.get_transaction_context();
3074 let planner =
3075 self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
3076 let mut physical_plan = planner.plan(&optimized_plan)?;
3077 let executor = Executor::with_columns(physical_plan.columns.clone())
3078 .with_deadline(self.query_deadline());
3079 executor.execute(physical_plan.operator.as_mut())
3080 });
3081
3082 #[cfg(feature = "metrics")]
3083 {
3084 #[cfg(not(target_arch = "wasm32"))]
3085 let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
3086 #[cfg(target_arch = "wasm32")]
3087 let elapsed_ms = None;
3088 self.record_query_metrics("gremlin", elapsed_ms, &result);
3089 }
3090
3091 result
3092 }
3093
3094 #[cfg(feature = "graphql")]
3118 pub fn execute_graphql(&self, query: &str) -> Result<QueryResult> {
3119 use crate::query::{
3120 Executor, binder::Binder, optimizer::Optimizer, processor::substitute_params,
3121 translators::graphql,
3122 };
3123
3124 #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
3125 let start_time = Instant::now();
3126
3127 let mut logical_plan = graphql::translate(query)?;
3128
3129 if !logical_plan.default_params.is_empty() {
3131 let defaults = logical_plan.default_params.clone();
3132 substitute_params(&mut logical_plan, &defaults)?;
3133 }
3134
3135 let mut binder = Binder::new();
3136 let _binding_context = binder.bind(&logical_plan)?;
3137
3138 let active = self.active_store();
3139 let optimizer = Optimizer::from_graph_store(&*active);
3140 let optimized_plan = optimizer.optimize(logical_plan)?;
3141 let has_mutations = optimized_plan.root.has_mutations();
3142 if has_mutations {
3143 self.require_permission(crate::auth::StatementKind::Write)?;
3144 }
3145
3146 let result = self.with_auto_commit(has_mutations, || {
3147 let (viewing_epoch, transaction_id) = self.get_transaction_context();
3148 let planner =
3149 self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
3150 let mut physical_plan = planner.plan(&optimized_plan)?;
3151 let executor = Executor::with_columns(physical_plan.columns.clone())
3152 .with_deadline(self.query_deadline());
3153 executor.execute(physical_plan.operator.as_mut())
3154 });
3155
3156 #[cfg(feature = "metrics")]
3157 {
3158 #[cfg(not(target_arch = "wasm32"))]
3159 let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
3160 #[cfg(target_arch = "wasm32")]
3161 let elapsed_ms = None;
3162 self.record_query_metrics("graphql", elapsed_ms, &result);
3163 }
3164
3165 result
3166 }
3167
3168 #[cfg(feature = "graphql")]
3174 pub fn execute_graphql_with_params(
3175 &self,
3176 query: &str,
3177 params: std::collections::HashMap<String, Value>,
3178 ) -> Result<QueryResult> {
3179 use crate::query::{
3180 Executor, binder::Binder, optimizer::Optimizer, processor::substitute_params,
3181 translators::graphql,
3182 };
3183
3184 #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
3185 let start_time = Instant::now();
3186
3187 let mut logical_plan = graphql::translate(query)?;
3189
3190 if !logical_plan.default_params.is_empty() {
3192 let mut merged = logical_plan.default_params.clone();
3193 merged.extend(params.iter().map(|(k, v)| (k.clone(), v.clone())));
3194 substitute_params(&mut logical_plan, &merged)?;
3195 } else {
3196 substitute_params(&mut logical_plan, ¶ms)?;
3197 }
3198
3199 let mut binder = Binder::new();
3201 let _binding_context = binder.bind(&logical_plan)?;
3202
3203 let active = self.active_store();
3205 let optimizer = Optimizer::from_graph_store(&*active);
3206 let optimized_plan = optimizer.optimize(logical_plan)?;
3207
3208 let has_mutations = optimized_plan.root.has_mutations();
3209 if has_mutations {
3210 self.require_permission(crate::auth::StatementKind::Write)?;
3211 }
3212
3213 let result = self.with_auto_commit(has_mutations, || {
3214 let (viewing_epoch, transaction_id) = self.get_transaction_context();
3215 let planner =
3216 self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
3217 let mut physical_plan = planner.plan(&optimized_plan)?;
3218 let executor = Executor::with_columns(physical_plan.columns.clone())
3219 .with_deadline(self.query_deadline());
3220 executor.execute(physical_plan.operator.as_mut())
3221 });
3222
3223 #[cfg(feature = "metrics")]
3224 {
3225 #[cfg(not(target_arch = "wasm32"))]
3226 let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
3227 #[cfg(target_arch = "wasm32")]
3228 let elapsed_ms = None;
3229 self.record_query_metrics("graphql", elapsed_ms, &result);
3230 }
3231
3232 result
3233 }
3234
3235 #[cfg(feature = "sql-pgq")]
3260 pub fn execute_sql(&self, query: &str) -> Result<QueryResult> {
3261 use crate::query::{
3262 Executor, binder::Binder, cache::CacheKey, optimizer::Optimizer, plan::LogicalOperator,
3263 processor::QueryLanguage, translators::sql_pgq,
3264 };
3265
3266 #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
3267 let start_time = Instant::now();
3268
3269 let logical_plan = sql_pgq::translate(query)?;
3271
3272 if let LogicalOperator::CreatePropertyGraph(ref cpg) = logical_plan.root {
3274 self.require_permission(crate::auth::StatementKind::Admin)?;
3275 return Ok(QueryResult {
3276 columns: vec!["status".into()],
3277 column_types: vec![grafeo_common::types::LogicalType::String],
3278 rows: vec![vec![Value::from(format!(
3279 "Property graph '{}' created ({} node tables, {} edge tables)",
3280 cpg.name,
3281 cpg.node_tables.len(),
3282 cpg.edge_tables.len()
3283 ))]],
3284 execution_time_ms: None,
3285 rows_scanned: None,
3286 status_message: None,
3287 gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
3288 });
3289 }
3290
3291 let cache_key = CacheKey::with_graph(query, QueryLanguage::SqlPgq, self.current_graph());
3292
3293 let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
3294 cached_plan
3295 } else {
3296 let mut binder = Binder::new();
3297 let _binding_context = binder.bind(&logical_plan)?;
3298 let active = self.active_store();
3299 let optimizer = Optimizer::from_graph_store(&*active);
3300 let plan = optimizer.optimize(logical_plan)?;
3301 self.query_cache.put_optimized(cache_key, plan.clone());
3302 plan
3303 };
3304
3305 let active = self.active_store();
3306 let has_mutations = optimized_plan.root.has_mutations();
3307 if has_mutations {
3308 self.require_permission(crate::auth::StatementKind::Write)?;
3309 }
3310
3311 let result = self.with_auto_commit(has_mutations, || {
3312 let (viewing_epoch, transaction_id) = self.get_transaction_context();
3313 let planner =
3314 self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
3315 let mut physical_plan = planner.plan(&optimized_plan)?;
3316 let executor = Executor::with_columns(physical_plan.columns.clone())
3317 .with_deadline(self.query_deadline());
3318 executor.execute(physical_plan.operator.as_mut())
3319 });
3320
3321 #[cfg(feature = "metrics")]
3322 {
3323 #[cfg(not(target_arch = "wasm32"))]
3324 let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
3325 #[cfg(target_arch = "wasm32")]
3326 let elapsed_ms = None;
3327 self.record_query_metrics("sql", elapsed_ms, &result);
3328 }
3329
3330 result
3331 }
3332
3333 #[cfg(feature = "sql-pgq")]
3339 pub fn execute_sql_with_params(
3340 &self,
3341 query: &str,
3342 params: std::collections::HashMap<String, Value>,
3343 ) -> Result<QueryResult> {
3344 use crate::query::processor::{QueryLanguage, QueryProcessor};
3345
3346 #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
3347 let start_time = Instant::now();
3348
3349 let has_mutations = if self.identity.can_write() {
3350 Self::query_looks_like_mutation(query)
3351 } else {
3352 use crate::query::translators::sql_pgq;
3353 match sql_pgq::translate(query) {
3354 Ok(plan) if plan.root.has_mutations() => {
3355 self.require_permission(crate::auth::StatementKind::Write)?;
3356 true
3357 }
3358 Ok(_) => false,
3359 Err(_) => Self::query_looks_like_mutation(query),
3360 }
3361 };
3362 if has_mutations {
3363 self.require_permission(crate::auth::StatementKind::Write)?;
3364 }
3365 let active = self.active_store();
3366
3367 let result = self.with_auto_commit(has_mutations, || {
3368 let (viewing_epoch, transaction_id) = self.get_transaction_context();
3369 let processor = QueryProcessor::for_stores_with_transaction(
3370 Arc::clone(&active),
3371 self.active_write_store(),
3372 Arc::clone(&self.transaction_manager),
3373 )?;
3374 let processor = if let Some(transaction_id) = transaction_id {
3375 processor.with_transaction_context(viewing_epoch, transaction_id)
3376 } else {
3377 processor
3378 };
3379 processor.process(query, QueryLanguage::SqlPgq, Some(¶ms))
3380 });
3381
3382 #[cfg(feature = "metrics")]
3383 {
3384 #[cfg(not(target_arch = "wasm32"))]
3385 let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
3386 #[cfg(target_arch = "wasm32")]
3387 let elapsed_ms = None;
3388 self.record_query_metrics("sql", elapsed_ms, &result);
3389 }
3390
3391 result
3392 }
3393
3394 pub fn execute_language(
3403 &self,
3404 query: &str,
3405 language: &str,
3406 params: Option<std::collections::HashMap<String, Value>>,
3407 ) -> Result<QueryResult> {
3408 let _span = grafeo_info_span!(
3409 "grafeo::session::execute",
3410 language,
3411 query_len = query.len(),
3412 );
3413 match language {
3414 "gql" => {
3415 if let Some(p) = params {
3416 self.execute_with_params(query, p)
3417 } else {
3418 self.execute(query)
3419 }
3420 }
3421 #[cfg(feature = "cypher")]
3422 "cypher" => {
3423 if let Some(p) = params {
3424 use crate::query::processor::{QueryLanguage, QueryProcessor};
3425
3426 #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
3427 let start_time = Instant::now();
3428
3429 let has_mutations = if self.identity.can_write() {
3430 Self::query_looks_like_mutation(query)
3431 } else {
3432 use crate::query::translators::cypher;
3433 match cypher::translate(query) {
3434 Ok(plan) if plan.root.has_mutations() => {
3435 self.require_permission(crate::auth::StatementKind::Write)?;
3436 true
3437 }
3438 Ok(_) => false,
3439 Err(_) => Self::query_looks_like_mutation(query),
3440 }
3441 };
3442 let active = self.active_store();
3443 let result = self.with_auto_commit(has_mutations, || {
3444 let processor = QueryProcessor::for_stores_with_transaction(
3445 Arc::clone(&active),
3446 self.active_write_store(),
3447 Arc::clone(&self.transaction_manager),
3448 )?;
3449 let (viewing_epoch, transaction_id) = self.get_transaction_context();
3450 let processor = if let Some(transaction_id) = transaction_id {
3451 processor.with_transaction_context(viewing_epoch, transaction_id)
3452 } else {
3453 processor
3454 };
3455 processor.process(query, QueryLanguage::Cypher, Some(&p))
3456 });
3457
3458 #[cfg(feature = "metrics")]
3459 {
3460 #[cfg(not(target_arch = "wasm32"))]
3461 let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
3462 #[cfg(target_arch = "wasm32")]
3463 let elapsed_ms = None;
3464 self.record_query_metrics("cypher", elapsed_ms, &result);
3465 }
3466
3467 result
3468 } else {
3469 self.execute_cypher(query)
3470 }
3471 }
3472 #[cfg(feature = "gremlin")]
3473 "gremlin" => {
3474 if let Some(p) = params {
3475 self.execute_gremlin_with_params(query, p)
3476 } else {
3477 self.execute_gremlin(query)
3478 }
3479 }
3480 #[cfg(feature = "graphql")]
3481 "graphql" => {
3482 if let Some(p) = params {
3483 self.execute_graphql_with_params(query, p)
3484 } else {
3485 self.execute_graphql(query)
3486 }
3487 }
3488 #[cfg(all(feature = "graphql", feature = "triple-store"))]
3489 "graphql-rdf" => {
3490 if let Some(p) = params {
3491 self.execute_graphql_rdf_with_params(query, p)
3492 } else {
3493 self.execute_graphql_rdf(query)
3494 }
3495 }
3496 #[cfg(feature = "sql-pgq")]
3497 "sql" | "sql-pgq" => {
3498 if let Some(p) = params {
3499 self.execute_sql_with_params(query, p)
3500 } else {
3501 self.execute_sql(query)
3502 }
3503 }
3504 #[cfg(all(feature = "sparql", feature = "triple-store"))]
3505 "sparql" => {
3506 if let Some(p) = params {
3507 self.execute_sparql_with_params(query, p)
3508 } else {
3509 self.execute_sparql(query)
3510 }
3511 }
3512 other => Err(grafeo_common::utils::error::Error::Query(
3513 grafeo_common::utils::error::QueryError::new(
3514 grafeo_common::utils::error::QueryErrorKind::Semantic,
3515 format!("Unknown query language: '{other}'"),
3516 ),
3517 )),
3518 }
3519 }
3520
3521 pub fn clear_plan_cache(&self) {
3548 self.query_cache.clear();
3549 }
3550
3551 #[cfg(feature = "lpg")]
3559 pub fn begin_transaction(&mut self) -> Result<()> {
3560 self.begin_transaction_inner(false, None)
3561 }
3562
3563 #[cfg(feature = "lpg")]
3571 pub fn begin_transaction_with_isolation(
3572 &mut self,
3573 isolation_level: crate::transaction::IsolationLevel,
3574 ) -> Result<()> {
3575 self.begin_transaction_inner(false, Some(isolation_level))
3576 }
3577
3578 #[cfg(feature = "lpg")]
3580 fn begin_transaction_inner(
3581 &self,
3582 read_only: bool,
3583 isolation_level: Option<crate::transaction::IsolationLevel>,
3584 ) -> Result<()> {
3585 let _span = grafeo_debug_span!("grafeo::tx::begin", read_only);
3586 let mut current = self.current_transaction.lock();
3587 if current.is_some() {
3588 drop(current);
3590 let mut depth = self.transaction_nesting_depth.lock();
3591 *depth += 1;
3592 let sp_name = format!("_nested_tx_{}", *depth);
3593 self.savepoint(&sp_name)?;
3594 return Ok(());
3595 }
3596
3597 let active = self.active_lpg_store();
3598 self.transaction_start_node_count
3599 .store(active.node_count(), Ordering::Relaxed);
3600 self.transaction_start_edge_count
3601 .store(active.edge_count(), Ordering::Relaxed);
3602 let transaction_id = if let Some(level) = isolation_level {
3603 self.transaction_manager.begin_with_isolation(level)
3604 } else {
3605 self.transaction_manager.begin()
3606 };
3607 *current = Some(transaction_id);
3608 *self.read_only_tx.lock() = read_only || self.db_read_only;
3609
3610 let key = self.active_graph_storage_key();
3613 let mut touched = self.touched_graphs.lock();
3614 touched.clear();
3615 touched.push(key);
3616
3617 #[cfg(feature = "metrics")]
3618 {
3619 crate::metrics::record_metric!(self.metrics, tx_active, inc);
3620 #[cfg(not(target_arch = "wasm32"))]
3621 {
3622 *self.tx_start_time.lock() = Some(Instant::now());
3623 }
3624 }
3625
3626 Ok(())
3627 }
3628
3629 #[cfg(feature = "lpg")]
3637 pub fn commit(&mut self) -> Result<()> {
3638 self.commit_inner()
3639 }
3640
3641 #[cfg(feature = "lpg")]
3643 fn commit_inner(&self) -> Result<()> {
3644 let _span = grafeo_debug_span!("grafeo::tx::commit");
3645 {
3647 let mut depth = self.transaction_nesting_depth.lock();
3648 if *depth > 0 {
3649 let sp_name = format!("_nested_tx_{depth}");
3650 *depth -= 1;
3651 drop(depth);
3652 return self.release_savepoint(&sp_name);
3653 }
3654 }
3655
3656 let transaction_id = self.current_transaction.lock().take().ok_or_else(|| {
3657 grafeo_common::utils::error::Error::Transaction(
3658 grafeo_common::utils::error::TransactionError::InvalidState(
3659 "No active transaction".to_string(),
3660 ),
3661 )
3662 })?;
3663
3664 let touched = self.touched_graphs.lock().clone();
3667 let commit_epoch = match self.transaction_manager.commit(transaction_id) {
3668 Ok(epoch) => epoch,
3669 Err(e) => {
3670 for graph_name in &touched {
3672 let store = self.resolve_store(graph_name);
3673 store.rollback_transaction_properties(transaction_id);
3674 }
3675 #[cfg(feature = "triple-store")]
3676 self.rollback_rdf_transaction(transaction_id);
3677 #[cfg(feature = "cdc")]
3679 if let Some(ref pending) = self.cdc_pending_events {
3680 pending.lock().clear();
3681 }
3682 *self.read_only_tx.lock() = self.db_read_only;
3683 self.savepoints.lock().clear();
3684 self.touched_graphs.lock().clear();
3685 #[cfg(feature = "metrics")]
3686 {
3687 crate::metrics::record_metric!(self.metrics, tx_active, dec);
3688 crate::metrics::record_metric!(self.metrics, tx_conflicts, inc);
3689 #[cfg(not(target_arch = "wasm32"))]
3690 if let Some(start) = self.tx_start_time.lock().take() {
3691 let duration_ms = start.elapsed().as_secs_f64() * 1000.0;
3692 crate::metrics::record_metric!(
3693 self.metrics,
3694 tx_duration,
3695 observe duration_ms
3696 );
3697 }
3698 }
3699 return Err(e);
3700 }
3701 };
3702
3703 for graph_name in &touched {
3705 let store = self.resolve_store(graph_name);
3706 store.finalize_version_epochs(transaction_id, commit_epoch);
3707 }
3708
3709 #[cfg(feature = "triple-store")]
3711 self.commit_rdf_transaction(transaction_id);
3712
3713 for graph_name in &touched {
3714 let store = self.resolve_store(graph_name);
3715 store.commit_transaction_properties(transaction_id);
3716 }
3717
3718 #[cfg(feature = "cdc")]
3722 if let Some(ref pending) = self.cdc_pending_events {
3723 let events: Vec<crate::cdc::ChangeEvent> = pending.lock().drain(..).collect();
3724 self.cdc_log.record_batch(events.into_iter().map(|mut e| {
3725 e.epoch = commit_epoch;
3726 e
3727 }));
3728 }
3729
3730 #[cfg(feature = "wal")]
3735 if let Some(ref wal) = self.wal {
3736 use grafeo_storage::wal::WalRecord;
3737 if let Err(e) = wal.log(&WalRecord::TransactionCommit { transaction_id }) {
3738 grafeo_warn!("Failed to log transaction commit to WAL: {}", e);
3739 }
3740 if let Err(e) = wal.log(&WalRecord::EpochAdvance {
3741 epoch: commit_epoch,
3742 }) {
3743 grafeo_warn!("Failed to log epoch advance to WAL: {}", e);
3744 }
3745 }
3746
3747 let current_epoch = self.transaction_manager.current_epoch();
3750 for graph_name in &touched {
3751 let store = self.resolve_store(graph_name);
3752 store.sync_epoch(current_epoch);
3753 }
3754
3755 *self.read_only_tx.lock() = self.db_read_only;
3757 self.savepoints.lock().clear();
3758 self.touched_graphs.lock().clear();
3759
3760 if self.gc_interval > 0 {
3762 let count = self.commit_counter.fetch_add(1, Ordering::Relaxed) + 1;
3763 if count.is_multiple_of(self.gc_interval) {
3764 let min_epoch = self.transaction_manager.min_active_epoch();
3765 for graph_name in &touched {
3766 let store = self.resolve_store(graph_name);
3767 store.gc_versions(min_epoch);
3768 }
3769 self.transaction_manager.gc();
3770 #[cfg(feature = "metrics")]
3771 crate::metrics::record_metric!(self.metrics, gc_runs, inc);
3772 }
3773 }
3774
3775 #[cfg(feature = "metrics")]
3776 {
3777 crate::metrics::record_metric!(self.metrics, tx_active, dec);
3778 crate::metrics::record_metric!(self.metrics, tx_committed, inc);
3779 #[cfg(not(target_arch = "wasm32"))]
3780 if let Some(start) = self.tx_start_time.lock().take() {
3781 let duration_ms = start.elapsed().as_secs_f64() * 1000.0;
3782 crate::metrics::record_metric!(self.metrics, tx_duration, observe duration_ms);
3783 }
3784 }
3785
3786 Ok(())
3787 }
3788
3789 #[cfg(feature = "lpg")]
3813 pub fn rollback(&mut self) -> Result<()> {
3814 self.rollback_inner()
3815 }
3816
3817 #[cfg(feature = "lpg")]
3819 fn rollback_inner(&self) -> Result<()> {
3820 let _span = grafeo_debug_span!("grafeo::tx::rollback");
3821 {
3823 let mut depth = self.transaction_nesting_depth.lock();
3824 if *depth > 0 {
3825 let sp_name = format!("_nested_tx_{depth}");
3826 *depth -= 1;
3827 drop(depth);
3828 return self.rollback_to_savepoint(&sp_name);
3829 }
3830 }
3831
3832 let transaction_id = self.current_transaction.lock().take().ok_or_else(|| {
3833 grafeo_common::utils::error::Error::Transaction(
3834 grafeo_common::utils::error::TransactionError::InvalidState(
3835 "No active transaction".to_string(),
3836 ),
3837 )
3838 })?;
3839
3840 *self.read_only_tx.lock() = self.db_read_only;
3842
3843 let touched = self.touched_graphs.lock().clone();
3845 for graph_name in &touched {
3846 let store = self.resolve_store(graph_name);
3847 store.discard_uncommitted_versions(transaction_id);
3848 }
3849
3850 #[cfg(feature = "triple-store")]
3852 self.rollback_rdf_transaction(transaction_id);
3853
3854 #[cfg(feature = "cdc")]
3856 if let Some(ref pending) = self.cdc_pending_events {
3857 pending.lock().clear();
3858 }
3859
3860 self.savepoints.lock().clear();
3862 self.touched_graphs.lock().clear();
3863
3864 let result = self.transaction_manager.abort(transaction_id);
3866
3867 #[cfg(feature = "metrics")]
3868 if result.is_ok() {
3869 crate::metrics::record_metric!(self.metrics, tx_active, dec);
3870 crate::metrics::record_metric!(self.metrics, tx_rolled_back, inc);
3871 #[cfg(not(target_arch = "wasm32"))]
3872 if let Some(start) = self.tx_start_time.lock().take() {
3873 let duration_ms = start.elapsed().as_secs_f64() * 1000.0;
3874 crate::metrics::record_metric!(self.metrics, tx_duration, observe duration_ms);
3875 }
3876 }
3877
3878 result
3879 }
3880
3881 #[cfg(feature = "lpg")]
3891 pub fn savepoint(&self, name: &str) -> Result<()> {
3892 let tx_id = self.current_transaction.lock().ok_or_else(|| {
3893 grafeo_common::utils::error::Error::Transaction(
3894 grafeo_common::utils::error::TransactionError::InvalidState(
3895 "No active transaction".to_string(),
3896 ),
3897 )
3898 })?;
3899
3900 let touched = self.touched_graphs.lock().clone();
3902 let graph_snapshots: Vec<GraphSavepoint> = touched
3903 .iter()
3904 .map(|graph_name| {
3905 let store = self.resolve_store(graph_name);
3906 GraphSavepoint {
3907 graph_name: graph_name.clone(),
3908 next_node_id: store.peek_next_node_id(),
3909 next_edge_id: store.peek_next_edge_id(),
3910 undo_log_position: store.property_undo_log_position(tx_id),
3911 }
3912 })
3913 .collect();
3914
3915 self.savepoints.lock().push(SavepointState {
3916 name: name.to_string(),
3917 graph_snapshots,
3918 active_graph: self.current_graph.lock().clone(),
3919 #[cfg(feature = "cdc")]
3920 cdc_event_position: self
3921 .cdc_pending_events
3922 .as_ref()
3923 .map_or(0, |p| p.lock().len()),
3924 });
3925 Ok(())
3926 }
3927
3928 #[cfg(feature = "lpg")]
3937 pub fn rollback_to_savepoint(&self, name: &str) -> Result<()> {
3938 let transaction_id = self.current_transaction.lock().ok_or_else(|| {
3939 grafeo_common::utils::error::Error::Transaction(
3940 grafeo_common::utils::error::TransactionError::InvalidState(
3941 "No active transaction".to_string(),
3942 ),
3943 )
3944 })?;
3945
3946 let mut savepoints = self.savepoints.lock();
3947
3948 let pos = savepoints
3950 .iter()
3951 .rposition(|sp| sp.name == name)
3952 .ok_or_else(|| {
3953 grafeo_common::utils::error::Error::Transaction(
3954 grafeo_common::utils::error::TransactionError::InvalidState(format!(
3955 "Savepoint '{name}' not found"
3956 )),
3957 )
3958 })?;
3959
3960 let sp_state = savepoints[pos].clone();
3961
3962 savepoints.truncate(pos);
3964 drop(savepoints);
3965
3966 for gs in &sp_state.graph_snapshots {
3968 let store = self.resolve_store(&gs.graph_name);
3969
3970 store.rollback_transaction_properties_to(transaction_id, gs.undo_log_position);
3972
3973 let current_next_node = store.peek_next_node_id();
3975 let current_next_edge = store.peek_next_edge_id();
3976
3977 let node_ids: Vec<NodeId> = (gs.next_node_id..current_next_node)
3978 .map(NodeId::new)
3979 .collect();
3980 let edge_ids: Vec<EdgeId> = (gs.next_edge_id..current_next_edge)
3981 .map(EdgeId::new)
3982 .collect();
3983
3984 if !node_ids.is_empty() || !edge_ids.is_empty() {
3985 store.discard_entities_by_id(transaction_id, &node_ids, &edge_ids);
3986 }
3987 }
3988
3989 let touched = self.touched_graphs.lock().clone();
3993 for graph_name in &touched {
3994 let already_captured = sp_state
3995 .graph_snapshots
3996 .iter()
3997 .any(|gs| gs.graph_name == *graph_name);
3998 if !already_captured {
3999 let store = self.resolve_store(graph_name);
4000 store.discard_uncommitted_versions(transaction_id);
4001 }
4002 }
4003
4004 #[cfg(feature = "cdc")]
4006 if let Some(ref pending) = self.cdc_pending_events {
4007 pending.lock().truncate(sp_state.cdc_event_position);
4008 }
4009
4010 let mut touched = self.touched_graphs.lock();
4012 touched.clear();
4013 for gs in &sp_state.graph_snapshots {
4014 if !touched.contains(&gs.graph_name) {
4015 touched.push(gs.graph_name.clone());
4016 }
4017 }
4018
4019 Ok(())
4020 }
4021
4022 pub fn release_savepoint(&self, name: &str) -> Result<()> {
4028 let _tx_id = self.current_transaction.lock().ok_or_else(|| {
4029 grafeo_common::utils::error::Error::Transaction(
4030 grafeo_common::utils::error::TransactionError::InvalidState(
4031 "No active transaction".to_string(),
4032 ),
4033 )
4034 })?;
4035
4036 let mut savepoints = self.savepoints.lock();
4037 let pos = savepoints
4038 .iter()
4039 .rposition(|sp| sp.name == name)
4040 .ok_or_else(|| {
4041 grafeo_common::utils::error::Error::Transaction(
4042 grafeo_common::utils::error::TransactionError::InvalidState(format!(
4043 "Savepoint '{name}' not found"
4044 )),
4045 )
4046 })?;
4047 savepoints.remove(pos);
4048 Ok(())
4049 }
4050
4051 #[must_use]
4053 pub fn in_transaction(&self) -> bool {
4054 self.current_transaction.lock().is_some()
4055 }
4056
4057 #[must_use]
4059 pub(crate) fn current_transaction_id(&self) -> Option<TransactionId> {
4060 *self.current_transaction.lock()
4061 }
4062
4063 #[must_use]
4065 pub(crate) fn transaction_manager(&self) -> &TransactionManager {
4066 &self.transaction_manager
4067 }
4068
4069 #[cfg(feature = "lpg")]
4071 #[must_use]
4072 pub(crate) fn node_count_delta(&self) -> (usize, usize) {
4073 (
4074 self.transaction_start_node_count.load(Ordering::Relaxed),
4075 self.active_lpg_store().node_count(),
4076 )
4077 }
4078
4079 #[cfg(feature = "lpg")]
4081 #[must_use]
4082 pub(crate) fn edge_count_delta(&self) -> (usize, usize) {
4083 (
4084 self.transaction_start_edge_count.load(Ordering::Relaxed),
4085 self.active_lpg_store().edge_count(),
4086 )
4087 }
4088
4089 #[cfg(feature = "lpg")]
4123 pub fn prepare_commit(&mut self) -> Result<crate::transaction::PreparedCommit<'_>> {
4124 crate::transaction::PreparedCommit::new(self)
4125 }
4126
4127 pub fn set_auto_commit(&mut self, auto_commit: bool) {
4129 self.auto_commit = auto_commit;
4130 }
4131
4132 #[must_use]
4134 pub fn auto_commit(&self) -> bool {
4135 self.auto_commit
4136 }
4137
4138 fn needs_auto_commit(&self, has_mutations: bool) -> bool {
4143 self.auto_commit && has_mutations && self.current_transaction.lock().is_none()
4144 }
4145
4146 #[cfg(feature = "lpg")]
4149 fn with_auto_commit<F>(&self, has_mutations: bool, body: F) -> Result<QueryResult>
4150 where
4151 F: FnOnce() -> Result<QueryResult>,
4152 {
4153 if self.needs_auto_commit(has_mutations) {
4154 self.begin_transaction_inner(false, None)?;
4155 match body() {
4156 Ok(result) => {
4157 self.commit_inner()?;
4158 Ok(result)
4159 }
4160 Err(e) => {
4161 let _ = self.rollback_inner();
4162 Err(e)
4163 }
4164 }
4165 } else {
4166 body()
4167 }
4168 }
4169
4170 #[cfg(not(feature = "lpg"))]
4172 fn with_auto_commit<F>(&self, _has_mutations: bool, body: F) -> Result<QueryResult>
4173 where
4174 F: FnOnce() -> Result<QueryResult>,
4175 {
4176 body()
4177 }
4178
4179 fn query_looks_like_mutation(query: &str) -> bool {
4185 let upper = query.to_ascii_uppercase();
4186 upper.contains("INSERT")
4187 || upper.contains("CREATE")
4188 || upper.contains("DELETE")
4189 || upper.contains("MERGE")
4190 || upper.contains("SET")
4191 || upper.contains("REMOVE")
4192 || upper.contains("DROP")
4193 || upper.contains("ALTER")
4194 }
4195
4196 #[must_use]
4198 fn query_deadline(&self) -> Option<Instant> {
4199 #[cfg(not(target_arch = "wasm32"))]
4200 {
4201 self.query_timeout.map(|d| Instant::now() + d)
4202 }
4203 #[cfg(target_arch = "wasm32")]
4204 {
4205 let _ = &self.query_timeout;
4206 None
4207 }
4208 }
4209
4210 #[cfg(feature = "metrics")]
4216 fn record_query_metrics(
4217 &self,
4218 language: &str,
4219 elapsed_ms: Option<f64>,
4220 result: &Result<crate::database::QueryResult>,
4221 ) {
4222 use crate::metrics::record_metric;
4223
4224 record_metric!(self.metrics, query_count, inc);
4225 if let Some(ref reg) = self.metrics {
4226 reg.query_count_by_language.increment(language);
4227 }
4228 if let Some(ms) = elapsed_ms {
4229 record_metric!(self.metrics, query_latency, observe ms);
4230 }
4231 match result {
4232 Ok(r) => {
4233 let returned = r.rows.len() as u64;
4234 record_metric!(self.metrics, rows_returned, add returned);
4235 if let Some(scanned) = r.rows_scanned {
4236 record_metric!(self.metrics, rows_scanned, add scanned);
4237 }
4238 }
4239 Err(e) => {
4240 record_metric!(self.metrics, query_errors, inc);
4241 let msg = e.to_string();
4243 if msg.contains("exceeded timeout") {
4244 record_metric!(self.metrics, query_timeouts, inc);
4245 }
4246 }
4247 }
4248 }
4249
4250 fn eval_integer_literal(expr: &grafeo_adapters::query::gql::ast::Expression) -> Option<i64> {
4252 use grafeo_adapters::query::gql::ast::{Expression, Literal};
4253 match expr {
4254 Expression::Literal(Literal::Integer(n)) => Some(*n),
4255 _ => None,
4256 }
4257 }
4258
4259 #[must_use]
4265 fn get_transaction_context(&self) -> (EpochId, Option<TransactionId>) {
4266 if let Some(epoch) = *self.viewing_epoch_override.lock() {
4268 return (epoch, None);
4269 }
4270
4271 if let Some(transaction_id) = *self.current_transaction.lock() {
4272 let epoch = self
4274 .transaction_manager
4275 .start_epoch(transaction_id)
4276 .unwrap_or_else(|| self.transaction_manager.current_epoch());
4277 (epoch, Some(transaction_id))
4278 } else {
4279 (self.transaction_manager.current_epoch(), None)
4281 }
4282 }
4283
4284 fn create_planner_for_store(
4289 &self,
4290 store: Arc<dyn GraphStore>,
4291 viewing_epoch: EpochId,
4292 transaction_id: Option<TransactionId>,
4293 ) -> crate::query::Planner {
4294 self.create_planner_for_store_with_read_only(store, viewing_epoch, transaction_id, false)
4295 }
4296
4297 fn create_planner_for_store_with_read_only(
4298 &self,
4299 store: Arc<dyn GraphStore>,
4300 viewing_epoch: EpochId,
4301 transaction_id: Option<TransactionId>,
4302 read_only: bool,
4303 ) -> crate::query::Planner {
4304 use crate::query::Planner;
4305 use grafeo_core::execution::operators::{LazyValue, SessionContext};
4306
4307 let info_store = Arc::clone(&store);
4309 let schema_store = Arc::clone(&store);
4310
4311 let session_context = SessionContext {
4312 current_schema: self.current_schema(),
4313 current_graph: self.current_graph(),
4314 db_info: LazyValue::new(move || Self::build_info_value(&*info_store)),
4315 schema_info: LazyValue::new(move || Self::build_schema_value(&*schema_store)),
4316 };
4317
4318 let write_store = self.active_write_store();
4319
4320 let mut planner = Planner::with_context(
4321 Arc::clone(&store),
4322 write_store,
4323 Arc::clone(&self.transaction_manager),
4324 transaction_id,
4325 viewing_epoch,
4326 )
4327 .with_factorized_execution(self.factorized_execution)
4328 .with_catalog(Arc::clone(&self.catalog))
4329 .with_session_context(session_context)
4330 .with_read_only(read_only);
4331
4332 let validator =
4334 CatalogConstraintValidator::new(Arc::clone(&self.catalog)).with_store(store);
4335 planner = planner.with_validator(Arc::new(validator));
4336
4337 planner
4338 }
4339
4340 fn build_info_value(store: &dyn GraphStore) -> Value {
4342 use grafeo_common::types::PropertyKey;
4343 use std::collections::BTreeMap;
4344
4345 let mut map = BTreeMap::new();
4346 map.insert(PropertyKey::from("mode"), Value::String("lpg".into()));
4347 map.insert(
4348 PropertyKey::from("node_count"),
4349 Value::Int64(store.node_count() as i64),
4350 );
4351 map.insert(
4352 PropertyKey::from("edge_count"),
4353 Value::Int64(store.edge_count() as i64),
4354 );
4355 map.insert(
4356 PropertyKey::from("version"),
4357 Value::String(env!("CARGO_PKG_VERSION").into()),
4358 );
4359 Value::Map(map.into())
4360 }
4361
4362 fn build_schema_value(store: &dyn GraphStore) -> Value {
4364 use grafeo_common::types::PropertyKey;
4365 use std::collections::BTreeMap;
4366
4367 let labels: Vec<Value> = store
4368 .all_labels()
4369 .into_iter()
4370 .map(|l| Value::String(l.into()))
4371 .collect();
4372 let edge_types: Vec<Value> = store
4373 .all_edge_types()
4374 .into_iter()
4375 .map(|t| Value::String(t.into()))
4376 .collect();
4377 let property_keys: Vec<Value> = store
4378 .all_property_keys()
4379 .into_iter()
4380 .map(|k| Value::String(k.into()))
4381 .collect();
4382
4383 let mut map = BTreeMap::new();
4384 map.insert(PropertyKey::from("labels"), Value::List(labels.into()));
4385 map.insert(
4386 PropertyKey::from("edge_types"),
4387 Value::List(edge_types.into()),
4388 );
4389 map.insert(
4390 PropertyKey::from("property_keys"),
4391 Value::List(property_keys.into()),
4392 );
4393 Value::Map(map.into())
4394 }
4395
4396 #[cfg(feature = "lpg")]
4401 pub fn create_node(&self, labels: &[&str]) -> NodeId {
4402 let (epoch, transaction_id) = self.get_transaction_context();
4403 self.active_lpg_store().create_node_versioned(
4404 labels,
4405 epoch,
4406 transaction_id.unwrap_or(TransactionId::SYSTEM),
4407 )
4408 }
4409
4410 #[cfg(feature = "lpg")]
4414 pub fn create_node_with_props<'a>(
4415 &self,
4416 labels: &[&str],
4417 properties: impl IntoIterator<Item = (&'a str, Value)>,
4418 ) -> NodeId {
4419 let (epoch, transaction_id) = self.get_transaction_context();
4420 self.active_lpg_store().create_node_with_props_versioned(
4421 labels,
4422 properties,
4423 epoch,
4424 transaction_id.unwrap_or(TransactionId::SYSTEM),
4425 )
4426 }
4427
4428 #[cfg(feature = "lpg")]
4433 pub fn create_edge(
4434 &self,
4435 src: NodeId,
4436 dst: NodeId,
4437 edge_type: &str,
4438 ) -> grafeo_common::types::EdgeId {
4439 let (epoch, transaction_id) = self.get_transaction_context();
4440 self.active_lpg_store().create_edge_versioned(
4441 src,
4442 dst,
4443 edge_type,
4444 epoch,
4445 transaction_id.unwrap_or(TransactionId::SYSTEM),
4446 )
4447 }
4448
4449 #[cfg(feature = "lpg")]
4451 pub fn create_edge_with_props<'a>(
4452 &self,
4453 src: NodeId,
4454 dst: NodeId,
4455 edge_type: &str,
4456 properties: impl IntoIterator<Item = (&'a str, Value)>,
4457 ) -> grafeo_common::types::EdgeId {
4458 let (epoch, transaction_id) = self.get_transaction_context();
4459 let tid = transaction_id.unwrap_or(TransactionId::SYSTEM);
4460 let store = self.active_lpg_store();
4461 let eid = store.create_edge_versioned(src, dst, edge_type, epoch, tid);
4462 for (key, value) in properties {
4463 store.set_edge_property_versioned(eid, key, value, tid);
4464 }
4465 eid
4466 }
4467
4468 #[cfg(feature = "lpg")]
4470 pub fn set_node_property(&self, id: NodeId, key: &str, value: Value) {
4471 let (_, transaction_id) = self.get_transaction_context();
4472 if let Some(tid) = transaction_id {
4473 self.active_lpg_store()
4474 .set_node_property_versioned(id, key, value, tid);
4475 } else {
4476 self.active_lpg_store().set_node_property(id, key, value);
4477 }
4478 }
4479
4480 #[cfg(feature = "lpg")]
4482 pub fn set_edge_property(&self, id: grafeo_common::types::EdgeId, key: &str, value: Value) {
4483 let (_, transaction_id) = self.get_transaction_context();
4484 if let Some(tid) = transaction_id {
4485 self.active_lpg_store()
4486 .set_edge_property_versioned(id, key, value, tid);
4487 } else {
4488 self.active_lpg_store().set_edge_property(id, key, value);
4489 }
4490 }
4491
4492 #[cfg(feature = "lpg")]
4494 pub fn delete_node(&self, id: NodeId) -> bool {
4495 let (epoch, transaction_id) = self.get_transaction_context();
4496 if let Some(tid) = transaction_id {
4497 self.active_lpg_store()
4498 .delete_node_versioned(id, epoch, tid)
4499 } else {
4500 self.active_lpg_store().delete_node(id)
4501 }
4502 }
4503
4504 #[cfg(feature = "lpg")]
4506 pub fn delete_edge(&self, id: grafeo_common::types::EdgeId) -> bool {
4507 let (epoch, transaction_id) = self.get_transaction_context();
4508 if let Some(tid) = transaction_id {
4509 self.active_lpg_store()
4510 .delete_edge_versioned(id, epoch, tid)
4511 } else {
4512 self.active_lpg_store().delete_edge(id)
4513 }
4514 }
4515
4516 #[cfg(feature = "lpg")]
4544 #[must_use]
4545 pub fn get_node(&self, id: NodeId) -> Option<Node> {
4546 let (epoch, transaction_id) = self.get_transaction_context();
4547 self.active_lpg_store().get_node_versioned(
4548 id,
4549 epoch,
4550 transaction_id.unwrap_or(TransactionId::SYSTEM),
4551 )
4552 }
4553
4554 #[cfg(feature = "lpg")]
4578 #[must_use]
4579 pub fn get_node_property(&self, id: NodeId, key: &str) -> Option<Value> {
4580 self.get_node(id)
4581 .and_then(|node| node.get_property(key).cloned())
4582 }
4583
4584 #[cfg(feature = "lpg")]
4591 #[must_use]
4592 pub fn get_edge(&self, id: EdgeId) -> Option<Edge> {
4593 let (epoch, transaction_id) = self.get_transaction_context();
4594 self.active_lpg_store().get_edge_versioned(
4595 id,
4596 epoch,
4597 transaction_id.unwrap_or(TransactionId::SYSTEM),
4598 )
4599 }
4600
4601 #[cfg(feature = "lpg")]
4627 #[must_use]
4628 pub fn get_neighbors_outgoing(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
4629 self.active_lpg_store()
4630 .edges_from(node, Direction::Outgoing)
4631 .collect()
4632 }
4633
4634 #[cfg(feature = "lpg")]
4643 #[must_use]
4644 pub fn get_neighbors_incoming(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
4645 self.active_lpg_store()
4646 .edges_from(node, Direction::Incoming)
4647 .collect()
4648 }
4649
4650 #[cfg(feature = "lpg")]
4662 #[must_use]
4663 pub fn get_neighbors_outgoing_by_type(
4664 &self,
4665 node: NodeId,
4666 edge_type: &str,
4667 ) -> Vec<(NodeId, EdgeId)> {
4668 self.active_lpg_store()
4669 .edges_from(node, Direction::Outgoing)
4670 .filter(|(_, edge_id)| {
4671 self.get_edge(*edge_id)
4672 .is_some_and(|e| e.edge_type.as_str() == edge_type)
4673 })
4674 .collect()
4675 }
4676
4677 #[cfg(feature = "lpg")]
4684 #[must_use]
4685 pub fn node_exists(&self, id: NodeId) -> bool {
4686 self.get_node(id).is_some()
4687 }
4688
4689 #[cfg(feature = "lpg")]
4691 #[must_use]
4692 pub fn edge_exists(&self, id: EdgeId) -> bool {
4693 self.get_edge(id).is_some()
4694 }
4695
4696 #[cfg(feature = "lpg")]
4700 #[must_use]
4701 pub fn get_degree(&self, node: NodeId) -> (usize, usize) {
4702 let active = self.active_lpg_store();
4703 let out = active.out_degree(node);
4704 let in_degree = active.in_degree(node);
4705 (out, in_degree)
4706 }
4707
4708 #[cfg(feature = "lpg")]
4718 #[must_use]
4719 pub fn get_nodes_batch(&self, ids: &[NodeId]) -> Vec<Option<Node>> {
4720 let (epoch, transaction_id) = self.get_transaction_context();
4721 let tx = transaction_id.unwrap_or(TransactionId::SYSTEM);
4722 let active = self.active_lpg_store();
4723 ids.iter()
4724 .map(|&id| active.get_node_versioned(id, epoch, tx))
4725 .collect()
4726 }
4727
4728 #[cfg(feature = "cdc")]
4736 pub fn history(
4737 &self,
4738 entity_id: impl Into<crate::cdc::EntityId>,
4739 ) -> Result<Vec<crate::cdc::ChangeEvent>> {
4740 Ok(self.cdc_log.history(entity_id.into()))
4741 }
4742
4743 #[cfg(feature = "cdc")]
4749 pub fn history_since(
4750 &self,
4751 entity_id: impl Into<crate::cdc::EntityId>,
4752 since_epoch: EpochId,
4753 ) -> Result<Vec<crate::cdc::ChangeEvent>> {
4754 Ok(self.cdc_log.history_since(entity_id.into(), since_epoch))
4755 }
4756
4757 #[cfg(feature = "cdc")]
4763 pub fn changes_between(
4764 &self,
4765 start_epoch: EpochId,
4766 end_epoch: EpochId,
4767 ) -> Result<Vec<crate::cdc::ChangeEvent>> {
4768 Ok(self.cdc_log.changes_between(start_epoch, end_epoch))
4769 }
4770}
4771
4772impl Drop for Session {
4773 fn drop(&mut self) {
4774 #[cfg(feature = "lpg")]
4777 if self.in_transaction() {
4778 let _ = self.rollback_inner();
4779 }
4780
4781 #[cfg(feature = "metrics")]
4782 if let Some(ref reg) = self.metrics {
4783 reg.session_active
4784 .fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
4785 }
4786 }
4787}
4788
4789#[cfg(test)]
4790mod tests {
4791 use super::parse_default_literal;
4792 use crate::database::GrafeoDB;
4793 use grafeo_common::types::Value;
4794
4795 #[test]
4800 fn parse_default_literal_null() {
4801 assert_eq!(parse_default_literal("null"), Value::Null);
4802 assert_eq!(parse_default_literal("NULL"), Value::Null);
4803 assert_eq!(parse_default_literal("Null"), Value::Null);
4804 }
4805
4806 #[test]
4807 fn parse_default_literal_bool() {
4808 assert_eq!(parse_default_literal("true"), Value::Bool(true));
4809 assert_eq!(parse_default_literal("TRUE"), Value::Bool(true));
4810 assert_eq!(parse_default_literal("false"), Value::Bool(false));
4811 assert_eq!(parse_default_literal("FALSE"), Value::Bool(false));
4812 }
4813
4814 #[test]
4815 fn parse_default_literal_string_single_quoted() {
4816 assert_eq!(
4817 parse_default_literal("'hello'"),
4818 Value::String("hello".into())
4819 );
4820 }
4821
4822 #[test]
4823 fn parse_default_literal_string_double_quoted() {
4824 assert_eq!(
4825 parse_default_literal("\"world\""),
4826 Value::String("world".into())
4827 );
4828 }
4829
4830 #[test]
4831 fn parse_default_literal_integer() {
4832 assert_eq!(parse_default_literal("42"), Value::Int64(42));
4833 assert_eq!(parse_default_literal("-7"), Value::Int64(-7));
4834 assert_eq!(parse_default_literal("0"), Value::Int64(0));
4835 }
4836
4837 #[test]
4838 fn parse_default_literal_float() {
4839 assert_eq!(parse_default_literal("9.81"), Value::Float64(9.81_f64));
4840 assert_eq!(parse_default_literal("-0.5"), Value::Float64(-0.5));
4841 }
4842
4843 #[test]
4844 fn parse_default_literal_fallback_string() {
4845 assert_eq!(
4847 parse_default_literal("some_identifier"),
4848 Value::String("some_identifier".into())
4849 );
4850 }
4851
4852 #[test]
4853 fn test_session_create_node() {
4854 let db = GrafeoDB::new_in_memory();
4855 let session = db.session();
4856
4857 let id = session.create_node(&["Person"]);
4858 assert!(id.is_valid());
4859 assert_eq!(db.node_count(), 1);
4860 }
4861
4862 #[test]
4863 fn test_session_transaction() {
4864 let db = GrafeoDB::new_in_memory();
4865 let mut session = db.session();
4866
4867 assert!(!session.in_transaction());
4868
4869 session.begin_transaction().unwrap();
4870 assert!(session.in_transaction());
4871
4872 session.commit().unwrap();
4873 assert!(!session.in_transaction());
4874 }
4875
4876 #[test]
4877 fn test_session_transaction_context() {
4878 let db = GrafeoDB::new_in_memory();
4879 let mut session = db.session();
4880
4881 let (_epoch1, transaction_id1) = session.get_transaction_context();
4883 assert!(transaction_id1.is_none());
4884
4885 session.begin_transaction().unwrap();
4887 let (epoch2, transaction_id2) = session.get_transaction_context();
4888 assert!(transaction_id2.is_some());
4889 let _ = epoch2; session.commit().unwrap();
4894 let (epoch3, tx_id3) = session.get_transaction_context();
4895 assert!(tx_id3.is_none());
4896 assert!(epoch3.as_u64() >= epoch2.as_u64());
4898 }
4899
4900 #[test]
4901 fn test_session_rollback() {
4902 let db = GrafeoDB::new_in_memory();
4903 let mut session = db.session();
4904
4905 session.begin_transaction().unwrap();
4906 session.rollback().unwrap();
4907 assert!(!session.in_transaction());
4908 }
4909
4910 #[test]
4911 fn test_session_rollback_discards_versions() {
4912 use grafeo_common::types::TransactionId;
4913
4914 let db = GrafeoDB::new_in_memory();
4915
4916 let node_before = db.store().create_node(&["Person"]);
4918 assert!(node_before.is_valid());
4919 assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");
4920
4921 let mut session = db.session();
4923 session.begin_transaction().unwrap();
4924 let transaction_id = session.current_transaction.lock().unwrap();
4925
4926 let epoch = db.store().current_epoch();
4928 let node_in_tx = db
4929 .store()
4930 .create_node_versioned(&["Person"], epoch, transaction_id);
4931 assert!(node_in_tx.is_valid());
4932
4933 assert_eq!(
4937 db.node_count(),
4938 1,
4939 "PENDING nodes should be invisible to non-versioned node_count()"
4940 );
4941 assert!(
4942 db.store()
4943 .get_node_versioned(node_in_tx, epoch, transaction_id)
4944 .is_some(),
4945 "Transaction node should be visible to its own transaction"
4946 );
4947
4948 session.rollback().unwrap();
4950 assert!(!session.in_transaction());
4951
4952 let count_after = db.node_count();
4955 assert_eq!(
4956 count_after, 1,
4957 "Rollback should discard uncommitted node, but got {count_after}"
4958 );
4959
4960 let current_epoch = db.store().current_epoch();
4962 assert!(
4963 db.store()
4964 .get_node_versioned(node_before, current_epoch, TransactionId::SYSTEM)
4965 .is_some(),
4966 "Original node should still exist"
4967 );
4968
4969 assert!(
4971 db.store()
4972 .get_node_versioned(node_in_tx, current_epoch, TransactionId::SYSTEM)
4973 .is_none(),
4974 "Transaction node should be gone"
4975 );
4976 }
4977
4978 #[test]
4979 fn test_session_create_node_in_transaction() {
4980 let db = GrafeoDB::new_in_memory();
4982
4983 let node_before = db.create_node(&["Person"]);
4985 assert!(node_before.is_valid());
4986 assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");
4987
4988 let mut session = db.session();
4990 session.begin_transaction().unwrap();
4991 let transaction_id = session.current_transaction.lock().unwrap();
4992
4993 let node_in_tx = session.create_node(&["Person"]);
4995 assert!(node_in_tx.is_valid());
4996
4997 assert_eq!(
5000 db.node_count(),
5001 1,
5002 "PENDING nodes should be invisible to non-versioned node_count()"
5003 );
5004 let epoch = db.store().current_epoch();
5005 assert!(
5006 db.store()
5007 .get_node_versioned(node_in_tx, epoch, transaction_id)
5008 .is_some(),
5009 "Transaction node should be visible to its own transaction"
5010 );
5011
5012 session.rollback().unwrap();
5014
5015 let count_after = db.node_count();
5017 assert_eq!(
5018 count_after, 1,
5019 "Rollback should discard node created via session.create_node(), but got {count_after}"
5020 );
5021 }
5022
5023 #[test]
5024 fn test_session_create_node_with_props_in_transaction() {
5025 use grafeo_common::types::Value;
5026
5027 let db = GrafeoDB::new_in_memory();
5029
5030 db.create_node(&["Person"]);
5032 assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");
5033
5034 let mut session = db.session();
5036 session.begin_transaction().unwrap();
5037 let transaction_id = session.current_transaction.lock().unwrap();
5038
5039 let node_in_tx =
5040 session.create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))]);
5041 assert!(node_in_tx.is_valid());
5042
5043 assert_eq!(
5046 db.node_count(),
5047 1,
5048 "PENDING nodes should be invisible to non-versioned node_count()"
5049 );
5050 let epoch = db.store().current_epoch();
5051 assert!(
5052 db.store()
5053 .get_node_versioned(node_in_tx, epoch, transaction_id)
5054 .is_some(),
5055 "Transaction node should be visible to its own transaction"
5056 );
5057
5058 session.rollback().unwrap();
5060
5061 let count_after = db.node_count();
5063 assert_eq!(
5064 count_after, 1,
5065 "Rollback should discard node created via session.create_node_with_props()"
5066 );
5067 }
5068
5069 #[cfg(feature = "gql")]
5070 mod gql_tests {
5071 use super::*;
5072
5073 #[test]
5074 fn test_gql_query_execution() {
5075 let db = GrafeoDB::new_in_memory();
5076 let session = db.session();
5077
5078 session.create_node(&["Person"]);
5080 session.create_node(&["Person"]);
5081 session.create_node(&["Animal"]);
5082
5083 let result = session.execute("MATCH (n:Person) RETURN n").unwrap();
5085
5086 assert_eq!(result.row_count(), 2);
5088 assert_eq!(result.column_count(), 1);
5089 assert_eq!(result.columns[0], "n");
5090 }
5091
5092 #[test]
5093 fn test_gql_empty_result() {
5094 let db = GrafeoDB::new_in_memory();
5095 let session = db.session();
5096
5097 let result = session.execute("MATCH (n:Person) RETURN n").unwrap();
5099
5100 assert_eq!(result.row_count(), 0);
5101 }
5102
5103 #[test]
5104 fn test_gql_parse_error() {
5105 let db = GrafeoDB::new_in_memory();
5106 let session = db.session();
5107
5108 let result = session.execute("MATCH (n RETURN n");
5110
5111 assert!(result.is_err());
5112 }
5113
5114 #[test]
5115 fn test_gql_relationship_traversal() {
5116 let db = GrafeoDB::new_in_memory();
5117 let session = db.session();
5118
5119 let alix = session.create_node(&["Person"]);
5121 let gus = session.create_node(&["Person"]);
5122 let vincent = session.create_node(&["Person"]);
5123
5124 session.create_edge(alix, gus, "KNOWS");
5125 session.create_edge(alix, vincent, "KNOWS");
5126
5127 let result = session
5129 .execute("MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b")
5130 .unwrap();
5131
5132 assert_eq!(result.row_count(), 2);
5134 assert_eq!(result.column_count(), 2);
5135 assert_eq!(result.columns[0], "a");
5136 assert_eq!(result.columns[1], "b");
5137 }
5138
5139 #[test]
5140 fn test_gql_relationship_with_type_filter() {
5141 let db = GrafeoDB::new_in_memory();
5142 let session = db.session();
5143
5144 let alix = session.create_node(&["Person"]);
5146 let gus = session.create_node(&["Person"]);
5147 let vincent = session.create_node(&["Person"]);
5148
5149 session.create_edge(alix, gus, "KNOWS");
5150 session.create_edge(alix, vincent, "WORKS_WITH");
5151
5152 let result = session
5154 .execute("MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b")
5155 .unwrap();
5156
5157 assert_eq!(result.row_count(), 1);
5159 }
5160
5161 #[test]
5162 fn test_gql_semantic_error_undefined_variable() {
5163 let db = GrafeoDB::new_in_memory();
5164 let session = db.session();
5165
5166 let result = session.execute("MATCH (n:Person) RETURN x");
5168
5169 assert!(result.is_err());
5171 let Err(err) = result else {
5172 panic!("Expected error")
5173 };
5174 assert!(
5175 err.to_string().contains("Undefined variable"),
5176 "Expected undefined variable error, got: {}",
5177 err
5178 );
5179 }
5180
5181 #[test]
5182 fn test_gql_where_clause_property_filter() {
5183 use grafeo_common::types::Value;
5184
5185 let db = GrafeoDB::new_in_memory();
5186 let session = db.session();
5187
5188 session.create_node_with_props(&["Person"], [("age", Value::Int64(25))]);
5190 session.create_node_with_props(&["Person"], [("age", Value::Int64(35))]);
5191 session.create_node_with_props(&["Person"], [("age", Value::Int64(45))]);
5192
5193 let result = session
5195 .execute("MATCH (n:Person) WHERE n.age > 30 RETURN n")
5196 .unwrap();
5197
5198 assert_eq!(result.row_count(), 2);
5200 }
5201
5202 #[test]
5203 fn test_gql_where_clause_equality() {
5204 use grafeo_common::types::Value;
5205
5206 let db = GrafeoDB::new_in_memory();
5207 let session = db.session();
5208
5209 session.create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))]);
5211 session.create_node_with_props(&["Person"], [("name", Value::String("Gus".into()))]);
5212 session.create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))]);
5213
5214 let result = session
5216 .execute("MATCH (n:Person) WHERE n.name = \"Alix\" RETURN n")
5217 .unwrap();
5218
5219 assert_eq!(result.row_count(), 2);
5221 }
5222
5223 #[test]
5224 fn test_gql_return_property_access() {
5225 use grafeo_common::types::Value;
5226
5227 let db = GrafeoDB::new_in_memory();
5228 let session = db.session();
5229
5230 session.create_node_with_props(
5232 &["Person"],
5233 [
5234 ("name", Value::String("Alix".into())),
5235 ("age", Value::Int64(30)),
5236 ],
5237 );
5238 session.create_node_with_props(
5239 &["Person"],
5240 [
5241 ("name", Value::String("Gus".into())),
5242 ("age", Value::Int64(25)),
5243 ],
5244 );
5245
5246 let result = session
5248 .execute("MATCH (n:Person) RETURN n.name, n.age")
5249 .unwrap();
5250
5251 assert_eq!(result.row_count(), 2);
5253 assert_eq!(result.column_count(), 2);
5254 assert_eq!(result.columns[0], "n.name");
5255 assert_eq!(result.columns[1], "n.age");
5256
5257 let names: Vec<&Value> = result.rows.iter().map(|r| &r[0]).collect();
5259 assert!(names.contains(&&Value::String("Alix".into())));
5260 assert!(names.contains(&&Value::String("Gus".into())));
5261 }
5262
5263 #[test]
5264 fn test_gql_return_mixed_expressions() {
5265 use grafeo_common::types::Value;
5266
5267 let db = GrafeoDB::new_in_memory();
5268 let session = db.session();
5269
5270 session.create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))]);
5272
5273 let result = session
5275 .execute("MATCH (n:Person) RETURN n, n.name")
5276 .unwrap();
5277
5278 assert_eq!(result.row_count(), 1);
5279 assert_eq!(result.column_count(), 2);
5280 assert_eq!(result.columns[0], "n");
5281 assert_eq!(result.columns[1], "n.name");
5282
5283 assert_eq!(result.rows[0][1], Value::String("Alix".into()));
5285 }
5286 }
5287
5288 #[cfg(feature = "cypher")]
5289 mod cypher_tests {
5290 use super::*;
5291
5292 #[test]
5293 fn test_cypher_query_execution() {
5294 let db = GrafeoDB::new_in_memory();
5295 let session = db.session();
5296
5297 session.create_node(&["Person"]);
5299 session.create_node(&["Person"]);
5300 session.create_node(&["Animal"]);
5301
5302 let result = session.execute_cypher("MATCH (n:Person) RETURN n").unwrap();
5304
5305 assert_eq!(result.row_count(), 2);
5307 assert_eq!(result.column_count(), 1);
5308 assert_eq!(result.columns[0], "n");
5309 }
5310
5311 #[test]
5312 fn test_cypher_empty_result() {
5313 let db = GrafeoDB::new_in_memory();
5314 let session = db.session();
5315
5316 let result = session.execute_cypher("MATCH (n:Person) RETURN n").unwrap();
5318
5319 assert_eq!(result.row_count(), 0);
5320 }
5321
5322 #[test]
5323 fn test_cypher_parse_error() {
5324 let db = GrafeoDB::new_in_memory();
5325 let session = db.session();
5326
5327 let result = session.execute_cypher("MATCH (n RETURN n");
5329
5330 assert!(result.is_err());
5331 }
5332 }
5333
5334 mod direct_lookup_tests {
5337 use super::*;
5338 use grafeo_common::types::Value;
5339
5340 #[test]
5341 fn test_get_node() {
5342 let db = GrafeoDB::new_in_memory();
5343 let session = db.session();
5344
5345 let id = session.create_node(&["Person"]);
5346 let node = session.get_node(id);
5347
5348 assert!(node.is_some());
5349 let node = node.unwrap();
5350 assert_eq!(node.id, id);
5351 }
5352
5353 #[test]
5354 fn test_get_node_not_found() {
5355 use grafeo_common::types::NodeId;
5356
5357 let db = GrafeoDB::new_in_memory();
5358 let session = db.session();
5359
5360 let node = session.get_node(NodeId::new(9999));
5362 assert!(node.is_none());
5363 }
5364
5365 #[test]
5366 fn test_get_node_property() {
5367 let db = GrafeoDB::new_in_memory();
5368 let session = db.session();
5369
5370 let id = session
5371 .create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))]);
5372
5373 let name = session.get_node_property(id, "name");
5374 assert_eq!(name, Some(Value::String("Alix".into())));
5375
5376 let missing = session.get_node_property(id, "missing");
5378 assert!(missing.is_none());
5379 }
5380
5381 #[test]
5382 fn test_get_edge() {
5383 let db = GrafeoDB::new_in_memory();
5384 let session = db.session();
5385
5386 let alix = session.create_node(&["Person"]);
5387 let gus = session.create_node(&["Person"]);
5388 let edge_id = session.create_edge(alix, gus, "KNOWS");
5389
5390 let edge = session.get_edge(edge_id);
5391 assert!(edge.is_some());
5392 let edge = edge.unwrap();
5393 assert_eq!(edge.id, edge_id);
5394 assert_eq!(edge.src, alix);
5395 assert_eq!(edge.dst, gus);
5396 }
5397
5398 #[test]
5399 fn test_get_edge_not_found() {
5400 use grafeo_common::types::EdgeId;
5401
5402 let db = GrafeoDB::new_in_memory();
5403 let session = db.session();
5404
5405 let edge = session.get_edge(EdgeId::new(9999));
5406 assert!(edge.is_none());
5407 }
5408
5409 #[test]
5410 fn test_get_neighbors_outgoing() {
5411 let db = GrafeoDB::new_in_memory();
5412 let session = db.session();
5413
5414 let alix = session.create_node(&["Person"]);
5415 let gus = session.create_node(&["Person"]);
5416 let harm = session.create_node(&["Person"]);
5417
5418 session.create_edge(alix, gus, "KNOWS");
5419 session.create_edge(alix, harm, "KNOWS");
5420
5421 let neighbors = session.get_neighbors_outgoing(alix);
5422 assert_eq!(neighbors.len(), 2);
5423
5424 let neighbor_ids: Vec<_> = neighbors.iter().map(|(node_id, _)| *node_id).collect();
5425 assert!(neighbor_ids.contains(&gus));
5426 assert!(neighbor_ids.contains(&harm));
5427 }
5428
5429 #[test]
5430 fn test_get_neighbors_incoming() {
5431 let db = GrafeoDB::new_in_memory();
5432 let session = db.session();
5433
5434 let alix = session.create_node(&["Person"]);
5435 let gus = session.create_node(&["Person"]);
5436 let harm = session.create_node(&["Person"]);
5437
5438 session.create_edge(gus, alix, "KNOWS");
5439 session.create_edge(harm, alix, "KNOWS");
5440
5441 let neighbors = session.get_neighbors_incoming(alix);
5442 assert_eq!(neighbors.len(), 2);
5443
5444 let neighbor_ids: Vec<_> = neighbors.iter().map(|(node_id, _)| *node_id).collect();
5445 assert!(neighbor_ids.contains(&gus));
5446 assert!(neighbor_ids.contains(&harm));
5447 }
5448
5449 #[test]
5450 fn test_get_neighbors_outgoing_by_type() {
5451 let db = GrafeoDB::new_in_memory();
5452 let session = db.session();
5453
5454 let alix = session.create_node(&["Person"]);
5455 let gus = session.create_node(&["Person"]);
5456 let company = session.create_node(&["Company"]);
5457
5458 session.create_edge(alix, gus, "KNOWS");
5459 session.create_edge(alix, company, "WORKS_AT");
5460
5461 let knows_neighbors = session.get_neighbors_outgoing_by_type(alix, "KNOWS");
5462 assert_eq!(knows_neighbors.len(), 1);
5463 assert_eq!(knows_neighbors[0].0, gus);
5464
5465 let works_neighbors = session.get_neighbors_outgoing_by_type(alix, "WORKS_AT");
5466 assert_eq!(works_neighbors.len(), 1);
5467 assert_eq!(works_neighbors[0].0, company);
5468
5469 let no_neighbors = session.get_neighbors_outgoing_by_type(alix, "LIKES");
5471 assert!(no_neighbors.is_empty());
5472 }
5473
5474 #[test]
5475 fn test_node_exists() {
5476 use grafeo_common::types::NodeId;
5477
5478 let db = GrafeoDB::new_in_memory();
5479 let session = db.session();
5480
5481 let id = session.create_node(&["Person"]);
5482
5483 assert!(session.node_exists(id));
5484 assert!(!session.node_exists(NodeId::new(9999)));
5485 }
5486
5487 #[test]
5488 fn test_edge_exists() {
5489 use grafeo_common::types::EdgeId;
5490
5491 let db = GrafeoDB::new_in_memory();
5492 let session = db.session();
5493
5494 let alix = session.create_node(&["Person"]);
5495 let gus = session.create_node(&["Person"]);
5496 let edge_id = session.create_edge(alix, gus, "KNOWS");
5497
5498 assert!(session.edge_exists(edge_id));
5499 assert!(!session.edge_exists(EdgeId::new(9999)));
5500 }
5501
5502 #[test]
5503 fn test_get_degree() {
5504 let db = GrafeoDB::new_in_memory();
5505 let session = db.session();
5506
5507 let alix = session.create_node(&["Person"]);
5508 let gus = session.create_node(&["Person"]);
5509 let harm = session.create_node(&["Person"]);
5510
5511 session.create_edge(alix, gus, "KNOWS");
5513 session.create_edge(alix, harm, "KNOWS");
5514 session.create_edge(gus, alix, "KNOWS");
5516
5517 let (out_degree, in_degree) = session.get_degree(alix);
5518 assert_eq!(out_degree, 2);
5519 assert_eq!(in_degree, 1);
5520
5521 let lonely = session.create_node(&["Person"]);
5523 let (out, in_deg) = session.get_degree(lonely);
5524 assert_eq!(out, 0);
5525 assert_eq!(in_deg, 0);
5526 }
5527
5528 #[test]
5529 fn test_get_nodes_batch() {
5530 let db = GrafeoDB::new_in_memory();
5531 let session = db.session();
5532
5533 let alix = session.create_node(&["Person"]);
5534 let gus = session.create_node(&["Person"]);
5535 let harm = session.create_node(&["Person"]);
5536
5537 let nodes = session.get_nodes_batch(&[alix, gus, harm]);
5538 assert_eq!(nodes.len(), 3);
5539 assert!(nodes[0].is_some());
5540 assert!(nodes[1].is_some());
5541 assert!(nodes[2].is_some());
5542
5543 use grafeo_common::types::NodeId;
5545 let nodes_with_missing = session.get_nodes_batch(&[alix, NodeId::new(9999), harm]);
5546 assert_eq!(nodes_with_missing.len(), 3);
5547 assert!(nodes_with_missing[0].is_some());
5548 assert!(nodes_with_missing[1].is_none()); assert!(nodes_with_missing[2].is_some());
5550 }
5551
5552 #[test]
5553 fn test_auto_commit_setting() {
5554 let db = GrafeoDB::new_in_memory();
5555 let mut session = db.session();
5556
5557 assert!(session.auto_commit());
5559
5560 session.set_auto_commit(false);
5561 assert!(!session.auto_commit());
5562
5563 session.set_auto_commit(true);
5564 assert!(session.auto_commit());
5565 }
5566
5567 #[test]
5568 fn test_transaction_double_begin_nests() {
5569 let db = GrafeoDB::new_in_memory();
5570 let mut session = db.session();
5571
5572 session.begin_transaction().unwrap();
5573 let result = session.begin_transaction();
5575 assert!(result.is_ok());
5576 session.commit().unwrap();
5578 session.commit().unwrap();
5580 }
5581
5582 #[test]
5583 fn test_commit_without_transaction_error() {
5584 let db = GrafeoDB::new_in_memory();
5585 let mut session = db.session();
5586
5587 let result = session.commit();
5588 assert!(result.is_err());
5589 }
5590
5591 #[test]
5592 fn test_rollback_without_transaction_error() {
5593 let db = GrafeoDB::new_in_memory();
5594 let mut session = db.session();
5595
5596 let result = session.rollback();
5597 assert!(result.is_err());
5598 }
5599
5600 #[test]
5601 fn test_create_edge_in_transaction() {
5602 let db = GrafeoDB::new_in_memory();
5603 let mut session = db.session();
5604
5605 let alix = session.create_node(&["Person"]);
5607 let gus = session.create_node(&["Person"]);
5608
5609 session.begin_transaction().unwrap();
5611 let edge_id = session.create_edge(alix, gus, "KNOWS");
5612
5613 assert!(session.edge_exists(edge_id));
5615
5616 session.commit().unwrap();
5618
5619 assert!(session.edge_exists(edge_id));
5621 }
5622
5623 #[test]
5624 fn test_neighbors_empty_node() {
5625 let db = GrafeoDB::new_in_memory();
5626 let session = db.session();
5627
5628 let lonely = session.create_node(&["Person"]);
5629
5630 assert!(session.get_neighbors_outgoing(lonely).is_empty());
5631 assert!(session.get_neighbors_incoming(lonely).is_empty());
5632 assert!(
5633 session
5634 .get_neighbors_outgoing_by_type(lonely, "KNOWS")
5635 .is_empty()
5636 );
5637 }
5638 }
5639
5640 #[test]
5641 fn test_auto_gc_triggers_on_commit_interval() {
5642 use crate::config::Config;
5643
5644 let config = Config::in_memory().with_gc_interval(2);
5645 let db = GrafeoDB::with_config(config).unwrap();
5646 let mut session = db.session();
5647
5648 session.begin_transaction().unwrap();
5650 session.create_node(&["A"]);
5651 session.commit().unwrap();
5652
5653 session.begin_transaction().unwrap();
5655 session.create_node(&["B"]);
5656 session.commit().unwrap();
5657
5658 assert_eq!(db.node_count(), 2);
5660 }
5661
5662 #[test]
5663 fn test_query_timeout_config_propagates_to_session() {
5664 use crate::config::Config;
5665 use std::time::Duration;
5666
5667 let config = Config::in_memory().with_query_timeout(Duration::from_secs(5));
5668 let db = GrafeoDB::with_config(config).unwrap();
5669 let session = db.session();
5670
5671 assert!(session.query_deadline().is_some());
5673 }
5674
5675 #[test]
5676 fn test_no_query_timeout_returns_no_deadline() {
5677 let db = GrafeoDB::new_in_memory();
5678 let session = db.session();
5679
5680 assert!(session.query_deadline().is_none());
5682 }
5683
5684 #[test]
5685 fn test_graph_model_accessor() {
5686 use crate::config::GraphModel;
5687
5688 let db = GrafeoDB::new_in_memory();
5689 let session = db.session();
5690
5691 assert_eq!(session.graph_model(), GraphModel::Lpg);
5692 }
5693
5694 #[cfg(feature = "gql")]
5695 #[test]
5696 fn test_external_store_session() {
5697 use grafeo_core::graph::GraphStoreMut;
5698 use std::sync::Arc;
5699
5700 let config = crate::config::Config::in_memory();
5701 let store =
5702 Arc::new(grafeo_core::graph::lpg::LpgStore::new().unwrap()) as Arc<dyn GraphStoreMut>;
5703 let db = GrafeoDB::with_store(store, config).unwrap();
5704
5705 let mut session = db.session();
5706
5707 session.begin_transaction().unwrap();
5711 session.execute("INSERT (:Test {name: 'hello'})").unwrap();
5712
5713 let result = session.execute("MATCH (n:Test) RETURN n.name").unwrap();
5715 assert_eq!(result.row_count(), 1);
5716
5717 session.commit().unwrap();
5718 }
5719
5720 #[cfg(feature = "gql")]
5723 mod session_command_tests {
5724 use super::*;
5725 use grafeo_common::types::Value;
5726
5727 #[test]
5728 fn test_use_graph_sets_current_graph() {
5729 let db = GrafeoDB::new_in_memory();
5730 let session = db.session();
5731
5732 session.execute("CREATE GRAPH mydb").unwrap();
5734 session.execute("USE GRAPH mydb").unwrap();
5735
5736 assert_eq!(session.current_graph(), Some("mydb".to_string()));
5737 }
5738
5739 #[test]
5740 fn test_use_graph_nonexistent_errors() {
5741 let db = GrafeoDB::new_in_memory();
5742 let session = db.session();
5743
5744 let result = session.execute("USE GRAPH doesnotexist");
5745 assert!(result.is_err());
5746 let err = result.unwrap_err().to_string();
5747 assert!(
5748 err.contains("does not exist"),
5749 "Expected 'does not exist' error, got: {err}"
5750 );
5751 }
5752
5753 #[test]
5754 fn test_use_graph_default_always_valid() {
5755 let db = GrafeoDB::new_in_memory();
5756 let session = db.session();
5757
5758 session.execute("USE GRAPH default").unwrap();
5760 assert_eq!(session.current_graph(), Some("default".to_string()));
5761 }
5762
5763 #[test]
5764 fn test_session_set_graph() {
5765 let db = GrafeoDB::new_in_memory();
5766 let session = db.session();
5767
5768 session.execute("CREATE GRAPH analytics").unwrap();
5769 session.execute("SESSION SET GRAPH analytics").unwrap();
5770 assert_eq!(session.current_graph(), Some("analytics".to_string()));
5771 }
5772
5773 #[test]
5774 fn test_session_set_graph_nonexistent_errors() {
5775 let db = GrafeoDB::new_in_memory();
5776 let session = db.session();
5777
5778 let result = session.execute("SESSION SET GRAPH nosuchgraph");
5779 assert!(result.is_err());
5780 }
5781
5782 #[test]
5783 fn test_session_set_time_zone() {
5784 let db = GrafeoDB::new_in_memory();
5785 let session = db.session();
5786
5787 assert_eq!(session.time_zone(), None);
5788
5789 session.execute("SESSION SET TIME ZONE 'UTC'").unwrap();
5790 assert_eq!(session.time_zone(), Some("UTC".to_string()));
5791
5792 session
5793 .execute("SESSION SET TIME ZONE 'America/New_York'")
5794 .unwrap();
5795 assert_eq!(session.time_zone(), Some("America/New_York".to_string()));
5796 }
5797
5798 #[test]
5799 fn test_session_set_parameter() {
5800 let db = GrafeoDB::new_in_memory();
5801 let session = db.session();
5802
5803 session
5804 .execute("SESSION SET PARAMETER $timeout = 30")
5805 .unwrap();
5806
5807 assert!(session.get_parameter("timeout").is_some());
5810 }
5811
5812 #[test]
5813 fn test_session_reset_clears_all_state() {
5814 let db = GrafeoDB::new_in_memory();
5815 let session = db.session();
5816
5817 session.execute("CREATE GRAPH analytics").unwrap();
5819 session.execute("SESSION SET GRAPH analytics").unwrap();
5820 session.execute("SESSION SET TIME ZONE 'UTC'").unwrap();
5821 session
5822 .execute("SESSION SET PARAMETER $limit = 100")
5823 .unwrap();
5824
5825 assert!(session.current_graph().is_some());
5827 assert!(session.time_zone().is_some());
5828 assert!(session.get_parameter("limit").is_some());
5829
5830 session.execute("SESSION RESET").unwrap();
5832
5833 assert_eq!(session.current_graph(), None);
5834 assert_eq!(session.time_zone(), None);
5835 assert!(session.get_parameter("limit").is_none());
5836 }
5837
5838 #[test]
5839 fn test_session_close_clears_state() {
5840 let db = GrafeoDB::new_in_memory();
5841 let session = db.session();
5842
5843 session.execute("CREATE GRAPH analytics").unwrap();
5844 session.execute("SESSION SET GRAPH analytics").unwrap();
5845 session.execute("SESSION SET TIME ZONE 'UTC'").unwrap();
5846
5847 session.execute("SESSION CLOSE").unwrap();
5848
5849 assert_eq!(session.current_graph(), None);
5850 assert_eq!(session.time_zone(), None);
5851 }
5852
5853 #[test]
5854 fn test_create_graph() {
5855 let db = GrafeoDB::new_in_memory();
5856 let session = db.session();
5857
5858 session.execute("CREATE GRAPH mydb").unwrap();
5859
5860 session.execute("USE GRAPH mydb").unwrap();
5862 assert_eq!(session.current_graph(), Some("mydb".to_string()));
5863 }
5864
5865 #[test]
5866 fn test_create_graph_duplicate_errors() {
5867 let db = GrafeoDB::new_in_memory();
5868 let session = db.session();
5869
5870 session.execute("CREATE GRAPH mydb").unwrap();
5871 let result = session.execute("CREATE GRAPH mydb");
5872
5873 assert!(result.is_err());
5874 let err = result.unwrap_err().to_string();
5875 assert!(
5876 err.contains("already exists"),
5877 "Expected 'already exists' error, got: {err}"
5878 );
5879 }
5880
5881 #[test]
5882 fn test_create_graph_if_not_exists() {
5883 let db = GrafeoDB::new_in_memory();
5884 let session = db.session();
5885
5886 session.execute("CREATE GRAPH mydb").unwrap();
5887 session.execute("CREATE GRAPH IF NOT EXISTS mydb").unwrap();
5889 }
5890
5891 #[test]
5892 fn test_drop_graph() {
5893 let db = GrafeoDB::new_in_memory();
5894 let session = db.session();
5895
5896 session.execute("CREATE GRAPH mydb").unwrap();
5897 session.execute("DROP GRAPH mydb").unwrap();
5898
5899 let result = session.execute("USE GRAPH mydb");
5901 assert!(result.is_err());
5902 }
5903
5904 #[test]
5905 fn test_drop_graph_nonexistent_errors() {
5906 let db = GrafeoDB::new_in_memory();
5907 let session = db.session();
5908
5909 let result = session.execute("DROP GRAPH nosuchgraph");
5910 assert!(result.is_err());
5911 let err = result.unwrap_err().to_string();
5912 assert!(
5913 err.contains("does not exist"),
5914 "Expected 'does not exist' error, got: {err}"
5915 );
5916 }
5917
5918 #[test]
5919 fn test_drop_graph_if_exists() {
5920 let db = GrafeoDB::new_in_memory();
5921 let session = db.session();
5922
5923 session.execute("DROP GRAPH IF EXISTS nosuchgraph").unwrap();
5925 }
5926
5927 #[test]
5928 fn test_start_transaction_via_gql() {
5929 let db = GrafeoDB::new_in_memory();
5930 let session = db.session();
5931
5932 session.execute("START TRANSACTION").unwrap();
5933 assert!(session.in_transaction());
5934 session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
5935 session.execute("COMMIT").unwrap();
5936 assert!(!session.in_transaction());
5937
5938 let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
5939 assert_eq!(result.rows.len(), 1);
5940 }
5941
5942 #[test]
5943 fn test_start_transaction_read_only_blocks_insert() {
5944 let db = GrafeoDB::new_in_memory();
5945 let session = db.session();
5946
5947 session.execute("START TRANSACTION READ ONLY").unwrap();
5948 let result = session.execute("INSERT (:Person {name: 'Alix'})");
5949 assert!(result.is_err());
5950 let err = result.unwrap_err().to_string();
5951 assert!(
5952 err.contains("read-only"),
5953 "Expected read-only error, got: {err}"
5954 );
5955 session.execute("ROLLBACK").unwrap();
5956 }
5957
5958 #[test]
5959 fn test_start_transaction_read_only_allows_reads() {
5960 let db = GrafeoDB::new_in_memory();
5961 let mut session = db.session();
5962 session.begin_transaction().unwrap();
5963 session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
5964 session.commit().unwrap();
5965
5966 session.execute("START TRANSACTION READ ONLY").unwrap();
5967 let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
5968 assert_eq!(result.rows.len(), 1);
5969 session.execute("COMMIT").unwrap();
5970 }
5971
5972 #[test]
5973 fn test_rollback_via_gql() {
5974 let db = GrafeoDB::new_in_memory();
5975 let session = db.session();
5976
5977 session.execute("START TRANSACTION").unwrap();
5978 session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
5979 session.execute("ROLLBACK").unwrap();
5980
5981 let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
5982 assert!(result.rows.is_empty());
5983 }
5984
5985 #[test]
5986 fn test_start_transaction_with_isolation_level() {
5987 let db = GrafeoDB::new_in_memory();
5988 let session = db.session();
5989
5990 session
5991 .execute("START TRANSACTION ISOLATION LEVEL SERIALIZABLE")
5992 .unwrap();
5993 assert!(session.in_transaction());
5994 session.execute("ROLLBACK").unwrap();
5995 }
5996
5997 #[test]
5998 fn test_session_commands_return_empty_result() {
5999 let db = GrafeoDB::new_in_memory();
6000 let session = db.session();
6001
6002 session.execute("CREATE GRAPH test").unwrap();
6003 let result = session.execute("SESSION SET GRAPH test").unwrap();
6004 assert_eq!(result.row_count(), 0);
6005 assert_eq!(result.column_count(), 0);
6006 }
6007
6008 #[test]
6009 fn test_current_graph_default_is_none() {
6010 let db = GrafeoDB::new_in_memory();
6011 let session = db.session();
6012
6013 assert_eq!(session.current_graph(), None);
6014 }
6015
6016 #[test]
6017 fn test_time_zone_default_is_none() {
6018 let db = GrafeoDB::new_in_memory();
6019 let session = db.session();
6020
6021 assert_eq!(session.time_zone(), None);
6022 }
6023
6024 #[test]
6025 fn test_session_state_independent_across_sessions() {
6026 let db = GrafeoDB::new_in_memory();
6027 let session1 = db.session();
6028 let session2 = db.session();
6029
6030 session1.execute("CREATE GRAPH first").unwrap();
6031 session1.execute("CREATE GRAPH second").unwrap();
6032 session1.execute("SESSION SET GRAPH first").unwrap();
6033 session2.execute("SESSION SET GRAPH second").unwrap();
6034
6035 assert_eq!(session1.current_graph(), Some("first".to_string()));
6036 assert_eq!(session2.current_graph(), Some("second".to_string()));
6037 }
6038
6039 #[test]
6040 fn test_show_node_types() {
6041 let db = GrafeoDB::new_in_memory();
6042 let session = db.session();
6043
6044 session
6045 .execute("CREATE NODE TYPE Person (name STRING NOT NULL, age INTEGER)")
6046 .unwrap();
6047
6048 let result = session.execute("SHOW NODE TYPES").unwrap();
6049 assert_eq!(
6050 result.columns,
6051 vec!["name", "properties", "constraints", "parents"]
6052 );
6053 assert_eq!(result.rows.len(), 1);
6054 assert_eq!(result.rows[0][0], Value::from("Person"));
6056 }
6057
6058 #[test]
6059 fn test_show_edge_types() {
6060 let db = GrafeoDB::new_in_memory();
6061 let session = db.session();
6062
6063 session
6064 .execute("CREATE EDGE TYPE KNOWS CONNECTING (Person) TO (Person) (since INTEGER)")
6065 .unwrap();
6066
6067 let result = session.execute("SHOW EDGE TYPES").unwrap();
6068 assert_eq!(
6069 result.columns,
6070 vec!["name", "properties", "source_types", "target_types"]
6071 );
6072 assert_eq!(result.rows.len(), 1);
6073 assert_eq!(result.rows[0][0], Value::from("KNOWS"));
6074 }
6075
6076 #[test]
6077 fn test_show_graph_types() {
6078 let db = GrafeoDB::new_in_memory();
6079 let session = db.session();
6080
6081 session
6082 .execute("CREATE NODE TYPE Person (name STRING)")
6083 .unwrap();
6084 session
6085 .execute(
6086 "CREATE GRAPH TYPE social (\
6087 NODE TYPE Person (name STRING)\
6088 )",
6089 )
6090 .unwrap();
6091
6092 let result = session.execute("SHOW GRAPH TYPES").unwrap();
6093 assert_eq!(
6094 result.columns,
6095 vec!["name", "open", "node_types", "edge_types"]
6096 );
6097 assert_eq!(result.rows.len(), 1);
6098 assert_eq!(result.rows[0][0], Value::from("social"));
6099 }
6100
6101 #[test]
6102 fn test_show_graph_type_named() {
6103 let db = GrafeoDB::new_in_memory();
6104 let session = db.session();
6105
6106 session
6107 .execute("CREATE NODE TYPE Person (name STRING)")
6108 .unwrap();
6109 session
6110 .execute(
6111 "CREATE GRAPH TYPE social (\
6112 NODE TYPE Person (name STRING)\
6113 )",
6114 )
6115 .unwrap();
6116
6117 let result = session.execute("SHOW GRAPH TYPE social").unwrap();
6118 assert_eq!(result.rows.len(), 1);
6119 assert_eq!(result.rows[0][0], Value::from("social"));
6120 }
6121
6122 #[test]
6123 fn test_show_graph_type_not_found() {
6124 let db = GrafeoDB::new_in_memory();
6125 let session = db.session();
6126
6127 let result = session.execute("SHOW GRAPH TYPE nonexistent");
6128 assert!(result.is_err());
6129 }
6130
6131 #[test]
6132 fn test_show_indexes_via_gql() {
6133 let db = GrafeoDB::new_in_memory();
6134 let session = db.session();
6135
6136 let result = session.execute("SHOW INDEXES").unwrap();
6137 assert_eq!(result.columns, vec!["name", "type", "label", "property"]);
6138 }
6139
6140 #[test]
6141 fn test_show_constraints_via_gql() {
6142 let db = GrafeoDB::new_in_memory();
6143 let session = db.session();
6144
6145 let result = session.execute("SHOW CONSTRAINTS").unwrap();
6146 assert_eq!(result.columns, vec!["name", "type", "label", "properties"]);
6147 }
6148
6149 #[test]
6150 fn test_pattern_form_graph_type_roundtrip() {
6151 let db = GrafeoDB::new_in_memory();
6152 let session = db.session();
6153
6154 session
6156 .execute("CREATE NODE TYPE Person (name STRING NOT NULL)")
6157 .unwrap();
6158 session
6159 .execute("CREATE NODE TYPE City (name STRING)")
6160 .unwrap();
6161 session
6162 .execute("CREATE EDGE TYPE KNOWS (since INTEGER)")
6163 .unwrap();
6164 session.execute("CREATE EDGE TYPE LIVES_IN").unwrap();
6165
6166 session
6168 .execute(
6169 "CREATE GRAPH TYPE social (\
6170 (:Person {name STRING NOT NULL})-[:KNOWS {since INTEGER}]->(:Person),\
6171 (:Person)-[:LIVES_IN]->(:City)\
6172 )",
6173 )
6174 .unwrap();
6175
6176 let result = session.execute("SHOW GRAPH TYPE social").unwrap();
6178 assert_eq!(result.rows.len(), 1);
6179 assert_eq!(result.rows[0][0], Value::from("social"));
6180 }
6181 }
6182}