1#[cfg(feature = "triple-store")]
8mod rdf;
9
10use std::sync::Arc;
11use std::sync::atomic::AtomicUsize;
12#[cfg(feature = "lpg")]
13use std::sync::atomic::Ordering;
14use std::time::{Duration, Instant};
15
16#[cfg(feature = "lpg")]
17use grafeo_common::grafeo_debug_span;
18#[cfg(feature = "lpg")]
19use grafeo_common::types::{EdgeId, NodeId};
20use grafeo_common::types::{EpochId, TransactionId, Value};
21use grafeo_common::utils::error::Result;
22use grafeo_common::{grafeo_info_span, grafeo_warn};
23#[cfg(feature = "lpg")]
24use grafeo_core::graph::Direction;
25#[cfg(feature = "lpg")]
26use grafeo_core::graph::lpg::LpgStore;
27#[cfg(feature = "lpg")]
28use grafeo_core::graph::lpg::{Edge, Node};
29#[cfg(feature = "triple-store")]
30use grafeo_core::graph::rdf::RdfStore;
31use grafeo_core::graph::{GraphStore, GraphStoreMut};
32
33use crate::catalog::{Catalog, CatalogConstraintValidator};
34use crate::config::{AdaptiveConfig, GraphModel};
35use crate::database::QueryResult;
36use crate::query::cache::QueryCache;
37use crate::transaction::TransactionManager;
38
/// Graph-name segment used when building a schema-qualified storage key for
/// the default graph (`"<schema>/__default__"`).
const SCHEMA_DEFAULT_GRAPH: &str = "__default__";
42
43fn parse_default_literal(text: &str) -> Value {
48 if text.eq_ignore_ascii_case("null") {
49 return Value::Null;
50 }
51 if text.eq_ignore_ascii_case("true") {
52 return Value::Bool(true);
53 }
54 if text.eq_ignore_ascii_case("false") {
55 return Value::Bool(false);
56 }
57 if (text.starts_with('\'') && text.ends_with('\''))
59 || (text.starts_with('"') && text.ends_with('"'))
60 {
61 return Value::String(text[1..text.len() - 1].into());
62 }
63 if let Ok(i) = text.parse::<i64>() {
65 return Value::Int64(i);
66 }
67 if let Ok(f) = text.parse::<f64>() {
68 return Value::Float64(f);
69 }
70 Value::String(text.into())
72}
73
74pub(crate) struct SessionConfig {
79 pub transaction_manager: Arc<TransactionManager>,
80 pub query_cache: Arc<QueryCache>,
81 pub catalog: Arc<Catalog>,
82 pub adaptive_config: AdaptiveConfig,
83 pub factorized_execution: bool,
84 pub graph_model: GraphModel,
85 pub query_timeout: Option<Duration>,
86 pub commit_counter: Arc<AtomicUsize>,
87 pub gc_interval: usize,
88 pub read_only: bool,
90 pub identity: crate::auth::Identity,
92 #[cfg(feature = "lpg")]
94 pub projections: Arc<
95 parking_lot::RwLock<
96 std::collections::HashMap<String, Arc<grafeo_core::graph::GraphProjection>>,
97 >,
98 >,
99}
100
101pub struct Session {
107 #[cfg(feature = "lpg")]
109 store: Arc<LpgStore>,
110 graph_store: Arc<dyn GraphStore>,
112 graph_store_mut: Option<Arc<dyn GraphStoreMut>>,
114 catalog: Arc<Catalog>,
116 #[cfg(feature = "triple-store")]
118 rdf_store: Arc<RdfStore>,
119 transaction_manager: Arc<TransactionManager>,
121 query_cache: Arc<QueryCache>,
123 current_transaction: parking_lot::Mutex<Option<TransactionId>>,
127 read_only_tx: parking_lot::Mutex<bool>,
129 db_read_only: bool,
132 identity: crate::auth::Identity,
134 auto_commit: bool,
136 #[allow(dead_code)] adaptive_config: AdaptiveConfig,
139 factorized_execution: bool,
141 graph_model: GraphModel,
143 query_timeout: Option<Duration>,
145 commit_counter: Arc<AtomicUsize>,
147 gc_interval: usize,
149 transaction_start_node_count: AtomicUsize,
151 transaction_start_edge_count: AtomicUsize,
153 #[cfg(feature = "wal")]
155 wal: Option<Arc<grafeo_storage::wal::LpgWal>>,
156 #[cfg(feature = "wal")]
158 wal_graph_context: Option<Arc<parking_lot::Mutex<Option<String>>>>,
159 #[cfg(feature = "cdc")]
161 cdc_log: Arc<crate::cdc::CdcLog>,
162 #[cfg(feature = "cdc")]
165 cdc_pending_events: Option<Arc<parking_lot::Mutex<Vec<crate::cdc::ChangeEvent>>>>,
166 current_graph: parking_lot::Mutex<Option<String>>,
168 current_schema: parking_lot::Mutex<Option<String>>,
171 time_zone: parking_lot::Mutex<Option<String>>,
173 session_params:
175 parking_lot::Mutex<std::collections::HashMap<String, grafeo_common::types::Value>>,
176 viewing_epoch_override: parking_lot::Mutex<Option<EpochId>>,
178 savepoints: parking_lot::Mutex<Vec<SavepointState>>,
180 transaction_nesting_depth: parking_lot::Mutex<u32>,
184 touched_graphs: parking_lot::Mutex<Vec<Option<String>>>,
188 #[cfg(feature = "metrics")]
190 pub(crate) metrics: Option<Arc<crate::metrics::MetricsRegistry>>,
191 #[cfg(feature = "metrics")]
193 tx_start_time: parking_lot::Mutex<Option<Instant>>,
194 #[cfg(feature = "lpg")]
196 projections: Arc<
197 parking_lot::RwLock<
198 std::collections::HashMap<String, Arc<grafeo_core::graph::GraphProjection>>,
199 >,
200 >,
201}
202
/// Per-graph values recorded as part of a savepoint.
///
/// NOTE(review): the code that fills and restores these fields is outside
/// this part of the file; the field notes below are inferred from names only.
#[derive(Clone)]
struct GraphSavepoint {
    /// Graph the snapshot belongs to (`None` presumably means the default graph).
    graph_name: Option<String>,
    /// Presumably the next node id of that graph at savepoint time.
    next_node_id: u64,
    /// Presumably the next edge id of that graph at savepoint time.
    next_edge_id: u64,
    /// Presumably the undo-log length at savepoint time.
    undo_log_position: usize,
}
211
212#[derive(Clone)]
214struct SavepointState {
215 name: String,
216 graph_snapshots: Vec<GraphSavepoint>,
217 #[allow(dead_code)]
220 active_graph: Option<String>,
221 #[cfg(feature = "cdc")]
224 cdc_event_position: usize,
225}
226
227impl Session {
228 #[cfg(feature = "lpg")]
230 #[allow(dead_code)] pub(crate) fn with_adaptive(store: Arc<LpgStore>, cfg: SessionConfig) -> Self {
232 let graph_store = Arc::clone(&store) as Arc<dyn GraphStore>;
233 let graph_store_mut = Some(Arc::clone(&store) as Arc<dyn GraphStoreMut>);
234 Self {
235 store,
236 graph_store,
237 graph_store_mut,
238 catalog: cfg.catalog,
239 #[cfg(feature = "triple-store")]
240 rdf_store: Arc::new(RdfStore::new()),
241 transaction_manager: cfg.transaction_manager,
242 query_cache: cfg.query_cache,
243 current_transaction: parking_lot::Mutex::new(None),
244 read_only_tx: parking_lot::Mutex::new(cfg.read_only),
245 db_read_only: cfg.read_only,
246 identity: cfg.identity,
247 auto_commit: true,
248 adaptive_config: cfg.adaptive_config,
249 factorized_execution: cfg.factorized_execution,
250 graph_model: cfg.graph_model,
251 query_timeout: cfg.query_timeout,
252 commit_counter: cfg.commit_counter,
253 gc_interval: cfg.gc_interval,
254 transaction_start_node_count: AtomicUsize::new(0),
255 transaction_start_edge_count: AtomicUsize::new(0),
256 #[cfg(feature = "wal")]
257 wal: None,
258 #[cfg(feature = "wal")]
259 wal_graph_context: None,
260 #[cfg(feature = "cdc")]
261 cdc_log: Arc::new(crate::cdc::CdcLog::new()),
262 #[cfg(feature = "cdc")]
263 cdc_pending_events: None,
264 current_graph: parking_lot::Mutex::new(None),
265 current_schema: parking_lot::Mutex::new(None),
266 time_zone: parking_lot::Mutex::new(None),
267 session_params: parking_lot::Mutex::new(std::collections::HashMap::new()),
268 viewing_epoch_override: parking_lot::Mutex::new(None),
269 savepoints: parking_lot::Mutex::new(Vec::new()),
270 transaction_nesting_depth: parking_lot::Mutex::new(0),
271 touched_graphs: parking_lot::Mutex::new(Vec::new()),
272 #[cfg(feature = "metrics")]
273 metrics: None,
274 #[cfg(feature = "metrics")]
275 tx_start_time: parking_lot::Mutex::new(None),
276 projections: cfg.projections,
277 }
278 }
279
280 #[cfg(all(feature = "wal", feature = "lpg"))]
285 pub(crate) fn set_wal(
286 &mut self,
287 wal: Arc<grafeo_storage::wal::LpgWal>,
288 wal_graph_context: Arc<parking_lot::Mutex<Option<String>>>,
289 ) {
290 let wal_store = Arc::new(crate::database::wal_store::WalGraphStore::new(
292 Arc::clone(&self.store),
293 Arc::clone(&wal),
294 Arc::clone(&wal_graph_context),
295 ));
296 self.graph_store = Arc::clone(&wal_store) as Arc<dyn GraphStore>;
297 self.graph_store_mut = Some(wal_store as Arc<dyn GraphStoreMut>);
298 self.wal = Some(wal);
299 self.wal_graph_context = Some(wal_graph_context);
300 }
301
302 #[cfg(feature = "cdc")]
309 pub(crate) fn set_cdc_log(&mut self, cdc_log: Arc<crate::cdc::CdcLog>) {
310 if let Some(ref write_store) = self.graph_store_mut {
313 let cdc_store = Arc::new(crate::database::cdc_store::CdcGraphStore::new(
314 Arc::clone(write_store),
315 Arc::clone(&cdc_log),
316 ));
317 self.cdc_pending_events = Some(cdc_store.pending_events());
318 self.graph_store_mut = Some(cdc_store as Arc<dyn grafeo_core::graph::GraphStoreMut>);
319 }
320 self.cdc_log = cdc_log;
321 }
322
323 #[cfg(feature = "metrics")]
325 pub(crate) fn set_metrics(&mut self, metrics: Arc<crate::metrics::MetricsRegistry>) {
326 self.metrics = Some(metrics);
327 }
328
329 pub(crate) fn with_external_store(
338 read_store: Arc<dyn GraphStore>,
339 write_store: Option<Arc<dyn GraphStoreMut>>,
340 cfg: SessionConfig,
341 ) -> Result<Self> {
342 Ok(Self {
343 #[cfg(feature = "lpg")]
344 store: Arc::new(LpgStore::new()?),
345 graph_store: read_store,
346 graph_store_mut: write_store,
347 catalog: cfg.catalog,
348 #[cfg(feature = "triple-store")]
349 rdf_store: Arc::new(RdfStore::new()),
350 transaction_manager: cfg.transaction_manager,
351 query_cache: cfg.query_cache,
352 current_transaction: parking_lot::Mutex::new(None),
353 read_only_tx: parking_lot::Mutex::new(cfg.read_only),
354 db_read_only: cfg.read_only,
355 identity: cfg.identity,
356 auto_commit: true,
357 adaptive_config: cfg.adaptive_config,
358 factorized_execution: cfg.factorized_execution,
359 graph_model: cfg.graph_model,
360 query_timeout: cfg.query_timeout,
361 commit_counter: cfg.commit_counter,
362 gc_interval: cfg.gc_interval,
363 transaction_start_node_count: AtomicUsize::new(0),
364 transaction_start_edge_count: AtomicUsize::new(0),
365 #[cfg(feature = "wal")]
366 wal: None,
367 #[cfg(feature = "wal")]
368 wal_graph_context: None,
369 #[cfg(feature = "cdc")]
370 cdc_log: Arc::new(crate::cdc::CdcLog::new()),
371 #[cfg(feature = "cdc")]
372 cdc_pending_events: None,
373 current_graph: parking_lot::Mutex::new(None),
374 current_schema: parking_lot::Mutex::new(None),
375 time_zone: parking_lot::Mutex::new(None),
376 session_params: parking_lot::Mutex::new(std::collections::HashMap::new()),
377 viewing_epoch_override: parking_lot::Mutex::new(None),
378 savepoints: parking_lot::Mutex::new(Vec::new()),
379 transaction_nesting_depth: parking_lot::Mutex::new(0),
380 touched_graphs: parking_lot::Mutex::new(Vec::new()),
381 #[cfg(feature = "metrics")]
382 metrics: None,
383 #[cfg(feature = "metrics")]
384 tx_start_time: parking_lot::Mutex::new(None),
385 #[cfg(feature = "lpg")]
386 projections: cfg.projections,
387 })
388 }
389
390 #[must_use]
392 pub fn graph_model(&self) -> GraphModel {
393 self.graph_model
394 }
395
396 #[must_use]
398 pub fn identity(&self) -> &crate::auth::Identity {
399 &self.identity
400 }
401
402 pub fn use_graph(&self, name: &str) {
406 *self.current_graph.lock() = Some(name.to_string());
407 }
408
409 #[must_use]
411 pub fn current_graph(&self) -> Option<String> {
412 self.current_graph.lock().clone()
413 }
414
415 pub fn set_schema(&self, name: &str) {
419 *self.current_schema.lock() = Some(name.to_string());
420 }
421
422 #[must_use]
426 pub fn current_schema(&self) -> Option<String> {
427 self.current_schema.lock().clone()
428 }
429
430 fn effective_graph_key(&self, graph_name: &str) -> String {
435 let schema = self.current_schema.lock().clone();
436 match schema {
437 Some(s) => format!("{s}/{graph_name}"),
438 None => graph_name.to_string(),
439 }
440 }
441
442 fn effective_type_key(&self, type_name: &str) -> String {
446 let schema = self.current_schema.lock().clone();
447 match schema {
448 Some(s) => format!("{s}/{type_name}"),
449 None => type_name.to_string(),
450 }
451 }
452
453 fn active_graph_storage_key(&self) -> Option<String> {
457 let graph = self.current_graph.lock().clone();
458 let schema = self.current_schema.lock().clone();
459 match (&schema, &graph) {
460 (None, None) => None,
461 (Some(s), None) => Some(format!("{s}/{SCHEMA_DEFAULT_GRAPH}")),
462 (None, Some(name)) if name.eq_ignore_ascii_case("default") => None,
463 (Some(s), Some(name)) if name.eq_ignore_ascii_case("default") => {
464 Some(format!("{s}/{SCHEMA_DEFAULT_GRAPH}"))
465 }
466 (None, Some(name)) => Some(name.clone()),
467 (Some(s), Some(g)) => Some(format!("{s}/{g}")),
468 }
469 }
470
471 fn active_store(&self) -> Arc<dyn GraphStore> {
479 let key = self.active_graph_storage_key();
480 match key {
481 None => Arc::clone(&self.graph_store),
482 #[cfg(feature = "lpg")]
483 Some(ref name) => match self.store.graph(name) {
484 Some(named_store) => {
485 #[cfg(feature = "wal")]
486 if let (Some(wal), Some(ctx)) = (&self.wal, &self.wal_graph_context) {
487 return Arc::new(crate::database::wal_store::WalGraphStore::new_for_graph(
488 named_store,
489 Arc::clone(wal),
490 name.clone(),
491 Arc::clone(ctx),
492 )) as Arc<dyn GraphStore>;
493 }
494 named_store as Arc<dyn GraphStore>
495 }
496 None => Arc::clone(&self.graph_store),
497 },
498 #[cfg(not(feature = "lpg"))]
499 Some(_) => Arc::clone(&self.graph_store),
500 }
501 }
502
503 fn active_write_store(&self) -> Option<Arc<dyn GraphStoreMut>> {
508 let key = self.active_graph_storage_key();
509 match key {
510 None => self.graph_store_mut.as_ref().map(Arc::clone),
511 #[cfg(feature = "lpg")]
512 Some(ref name) => match self.store.graph(name) {
513 Some(named_store) => {
514 let mut store: Arc<dyn GraphStoreMut> = named_store;
515
516 #[cfg(feature = "wal")]
517 if let (Some(wal), Some(ctx)) = (&self.wal, &self.wal_graph_context) {
518 store = Arc::new(crate::database::wal_store::WalGraphStore::new_for_graph(
519 self.store
521 .graph(name)
522 .unwrap_or_else(|| Arc::clone(&self.store)),
523 Arc::clone(wal),
524 name.clone(),
525 Arc::clone(ctx),
526 ));
527 }
528
529 #[cfg(feature = "cdc")]
530 if let Some(ref pending) = self.cdc_pending_events {
531 store = Arc::new(crate::database::cdc_store::CdcGraphStore::wrap(
532 store,
533 Arc::clone(&self.cdc_log),
534 Arc::clone(pending),
535 ));
536 }
537
538 Some(store)
539 }
540 None => self.graph_store_mut.as_ref().map(Arc::clone),
541 },
542 #[cfg(not(feature = "lpg"))]
543 Some(_) => self.graph_store_mut.as_ref().map(Arc::clone),
544 }
545 }
546
547 #[cfg(feature = "lpg")]
552 fn active_lpg_store(&self) -> Arc<LpgStore> {
553 let key = self.active_graph_storage_key();
554 match key {
555 None => Arc::clone(&self.store),
556 Some(ref name) => self
557 .store
558 .graph(name)
559 .unwrap_or_else(|| Arc::clone(&self.store)),
560 }
561 }
562
563 #[cfg(feature = "lpg")]
566 fn resolve_store(&self, graph_name: &Option<String>) -> Arc<LpgStore> {
567 match graph_name {
568 None => Arc::clone(&self.store),
569 Some(name) if name.eq_ignore_ascii_case("default") => Arc::clone(&self.store),
570 Some(name) => self
571 .store
572 .graph(name)
573 .unwrap_or_else(|| Arc::clone(&self.store)),
574 }
575 }
576
577 fn track_graph_touch(&self) {
582 if self.current_transaction.lock().is_some() {
583 let key = self.active_graph_storage_key();
584 let mut touched = self.touched_graphs.lock();
585 if !touched.contains(&key) {
586 touched.push(key);
587 }
588 }
589 }
590
591 pub fn set_time_zone(&self, tz: &str) {
593 *self.time_zone.lock() = Some(tz.to_string());
594 }
595
596 #[must_use]
598 pub fn time_zone(&self) -> Option<String> {
599 self.time_zone.lock().clone()
600 }
601
602 pub fn set_parameter(&self, key: &str, value: grafeo_common::types::Value) {
604 self.session_params.lock().insert(key.to_string(), value);
605 }
606
607 #[must_use]
609 pub fn get_parameter(&self, key: &str) -> Option<grafeo_common::types::Value> {
610 self.session_params.lock().get(key).cloned()
611 }
612
613 pub fn reset_session(&self) {
615 *self.current_schema.lock() = None;
616 *self.current_graph.lock() = None;
617 *self.time_zone.lock() = None;
618 self.session_params.lock().clear();
619 *self.viewing_epoch_override.lock() = None;
620 }
621
622 pub fn reset_schema(&self) {
624 *self.current_schema.lock() = None;
625 }
626
627 pub fn reset_graph(&self) {
629 *self.current_graph.lock() = None;
630 }
631
632 pub fn reset_time_zone(&self) {
634 *self.time_zone.lock() = None;
635 }
636
637 pub fn reset_parameters(&self) {
639 self.session_params.lock().clear();
640 }
641
642 pub fn set_viewing_epoch(&self, epoch: EpochId) {
650 *self.viewing_epoch_override.lock() = Some(epoch);
651 }
652
653 pub fn clear_viewing_epoch(&self) {
655 *self.viewing_epoch_override.lock() = None;
656 }
657
658 #[must_use]
660 pub fn viewing_epoch(&self) -> Option<EpochId> {
661 *self.viewing_epoch_override.lock()
662 }
663
664 #[cfg(feature = "lpg")]
668 #[must_use]
669 pub fn get_node_history(&self, id: NodeId) -> Vec<(EpochId, Option<EpochId>, Node)> {
670 self.active_lpg_store().get_node_history(id)
671 }
672
673 #[cfg(feature = "lpg")]
677 #[must_use]
678 pub fn get_edge_history(&self, id: EdgeId) -> Vec<(EpochId, Option<EpochId>, Edge)> {
679 self.active_lpg_store().get_edge_history(id)
680 }
681
682 fn require_lpg(&self, language: &str) -> Result<()> {
684 if self.graph_model == GraphModel::Rdf {
685 return Err(grafeo_common::utils::error::Error::Internal(format!(
686 "This is an RDF database. {language} queries require an LPG database."
687 )));
688 }
689 Ok(())
690 }
691
692 #[inline]
698 fn require_permission(&self, kind: crate::auth::StatementKind) -> Result<()> {
699 if self.identity.can_admin() {
701 return Ok(());
702 }
703 crate::auth::check_permission(&self.identity, kind).map_err(|denied| {
704 grafeo_common::utils::error::Error::Query(grafeo_common::utils::error::QueryError::new(
705 grafeo_common::utils::error::QueryErrorKind::Semantic,
706 denied.to_string(),
707 ))
708 })
709 }
710
711 #[cfg(feature = "gql")]
713 fn execute_session_command(
714 &self,
715 cmd: grafeo_adapters::query::gql::ast::SessionCommand,
716 ) -> Result<QueryResult> {
717 use grafeo_adapters::query::gql::ast::SessionCommand;
718 #[cfg(feature = "lpg")]
719 use grafeo_adapters::query::gql::ast::TransactionIsolationLevel;
720 use grafeo_common::utils::error::{Error, QueryError, QueryErrorKind};
721
722 match &cmd {
724 SessionCommand::CreateGraph { .. }
725 | SessionCommand::DropGraph { .. }
726 | SessionCommand::CreateProjection { .. }
727 | SessionCommand::DropProjection { .. } => {
728 self.require_permission(crate::auth::StatementKind::Write)?;
729 }
730 _ => {} }
732
733 if self.identity.has_grants() {
735 match &cmd {
736 SessionCommand::CreateGraph { name, .. }
737 | SessionCommand::DropGraph { name, .. } => {
738 if !self
739 .identity
740 .can_access_graph(name, crate::auth::Role::ReadWrite)
741 {
742 return Err(Error::Query(QueryError::new(
743 QueryErrorKind::Semantic,
744 format!(
745 "permission denied: no grant for graph '{name}' (user: {})",
746 self.identity.user_id()
747 ),
748 )));
749 }
750 }
751 _ => {}
752 }
753 }
754
755 if *self.read_only_tx.lock() {
757 match &cmd {
758 SessionCommand::CreateGraph { .. }
759 | SessionCommand::DropGraph { .. }
760 | SessionCommand::CreateProjection { .. }
761 | SessionCommand::DropProjection { .. } => {
762 return Err(Error::Transaction(
763 grafeo_common::utils::error::TransactionError::ReadOnly,
764 ));
765 }
766 _ => {} }
768 }
769
770 match cmd {
771 #[cfg(feature = "lpg")]
772 SessionCommand::CreateGraph {
773 name,
774 if_not_exists,
775 typed,
776 like_graph,
777 copy_of,
778 open: _,
779 } => {
780 let storage_key = self.effective_graph_key(&name);
782
783 if let Some(ref src) = like_graph {
785 let src_key = self.effective_graph_key(src);
786 if self.store.graph(&src_key).is_none() {
787 return Err(Error::Query(QueryError::new(
788 QueryErrorKind::Semantic,
789 format!("Source graph '{src}' does not exist"),
790 )));
791 }
792 }
793 if let Some(ref src) = copy_of {
794 let src_key = self.effective_graph_key(src);
795 if self.store.graph(&src_key).is_none() {
796 return Err(Error::Query(QueryError::new(
797 QueryErrorKind::Semantic,
798 format!("Source graph '{src}' does not exist"),
799 )));
800 }
801 }
802
803 let created = self
804 .store
805 .create_graph(&storage_key)
806 .map_err(|e| Error::Internal(e.to_string()))?;
807 if !created && !if_not_exists {
808 return Err(Error::Query(QueryError::new(
809 QueryErrorKind::Semantic,
810 format!("Graph '{name}' already exists"),
811 )));
812 }
813 if created {
814 #[cfg(feature = "wal")]
815 self.log_schema_wal(&grafeo_storage::wal::WalRecord::CreateNamedGraph {
816 name: storage_key.clone(),
817 });
818 }
819
820 if let Some(ref src) = copy_of {
822 let src_key = self.effective_graph_key(src);
823 self.store
824 .copy_graph(Some(&src_key), Some(&storage_key))
825 .map_err(|e| Error::Internal(e.to_string()))?;
826 }
827
828 if let Some(type_name) = typed
832 && let Err(e) = self.catalog.bind_graph_type(
833 &storage_key,
834 if type_name.contains('/') {
835 type_name.clone()
836 } else {
837 self.effective_type_key(&type_name)
838 },
839 )
840 {
841 return Err(Error::Query(QueryError::new(
842 QueryErrorKind::Semantic,
843 e.to_string(),
844 )));
845 }
846
847 if let Some(ref src) = like_graph {
849 let src_key = self.effective_graph_key(src);
850 if let Some(src_type) = self.catalog.get_graph_type_binding(&src_key) {
851 let _ = self.catalog.bind_graph_type(&storage_key, src_type);
852 }
853 }
854
855 Ok(QueryResult::empty())
856 }
857 #[cfg(feature = "lpg")]
858 SessionCommand::DropGraph { name, if_exists } => {
859 let storage_key = self.effective_graph_key(&name);
860 let dropped = self.store.drop_graph(&storage_key);
861 if !dropped && !if_exists {
862 return Err(Error::Query(QueryError::new(
863 QueryErrorKind::Semantic,
864 format!("Graph '{name}' does not exist"),
865 )));
866 }
867 if dropped {
868 #[cfg(feature = "wal")]
869 self.log_schema_wal(&grafeo_storage::wal::WalRecord::DropNamedGraph {
870 name: storage_key.clone(),
871 });
872 let mut current = self.current_graph.lock();
874 if current
875 .as_deref()
876 .is_some_and(|g| g.eq_ignore_ascii_case(&name))
877 {
878 *current = None;
879 }
880 }
881 Ok(QueryResult::empty())
882 }
883 #[cfg(feature = "lpg")]
884 SessionCommand::UseGraph(name) => {
885 if self.identity.has_grants()
887 && !name.eq_ignore_ascii_case("default")
888 && !self
889 .identity
890 .can_access_graph(&name, crate::auth::Role::ReadOnly)
891 {
892 return Err(Error::Query(QueryError::new(
893 QueryErrorKind::Semantic,
894 format!(
895 "permission denied: no grant for graph '{name}' (user: {})",
896 self.identity.user_id()
897 ),
898 )));
899 }
900 let effective_key = self.effective_graph_key(&name);
902 if !name.eq_ignore_ascii_case("default")
903 && self.store.graph(&effective_key).is_none()
904 {
905 return Err(Error::Query(QueryError::new(
906 QueryErrorKind::Semantic,
907 format!("Graph '{name}' does not exist"),
908 )));
909 }
910 self.use_graph(&name);
911 self.track_graph_touch();
913 Ok(QueryResult::empty())
914 }
915 #[cfg(feature = "lpg")]
916 SessionCommand::SessionSetGraph(name) => {
917 if self.identity.has_grants()
920 && !name.eq_ignore_ascii_case("default")
921 && !self
922 .identity
923 .can_access_graph(&name, crate::auth::Role::ReadOnly)
924 {
925 return Err(Error::Query(QueryError::new(
926 QueryErrorKind::Semantic,
927 format!(
928 "permission denied: no grant for graph '{name}' (user: {})",
929 self.identity.user_id()
930 ),
931 )));
932 }
933 let effective_key = self.effective_graph_key(&name);
934 if !name.eq_ignore_ascii_case("default")
935 && self.store.graph(&effective_key).is_none()
936 {
937 return Err(Error::Query(QueryError::new(
938 QueryErrorKind::Semantic,
939 format!("Graph '{name}' does not exist"),
940 )));
941 }
942 self.use_graph(&name);
943 self.track_graph_touch();
945 Ok(QueryResult::empty())
946 }
947 SessionCommand::SessionSetSchema(name) => {
948 if !self.catalog.schema_exists(&name) {
950 return Err(Error::Query(QueryError::new(
951 QueryErrorKind::Semantic,
952 format!("Schema '{name}' does not exist"),
953 )));
954 }
955 self.set_schema(&name);
956 Ok(QueryResult::empty())
957 }
958 SessionCommand::SessionSetTimeZone(tz) => {
959 self.set_time_zone(&tz);
960 Ok(QueryResult::empty())
961 }
962 SessionCommand::SessionSetParameter(key, expr) => {
963 if key.eq_ignore_ascii_case("viewing_epoch") {
964 match Self::eval_integer_literal(&expr) {
965 Some(n) if n >= 0 => {
966 self.set_viewing_epoch(EpochId::new(n as u64));
967 Ok(QueryResult::status(format!("Set viewing_epoch to {n}")))
968 }
969 _ => Err(Error::Query(QueryError::new(
970 QueryErrorKind::Semantic,
971 "viewing_epoch must be a non-negative integer literal",
972 ))),
973 }
974 } else {
975 self.set_parameter(&key, Value::Null);
978 Ok(QueryResult::empty())
979 }
980 }
981 SessionCommand::SessionReset(target) => {
982 use grafeo_adapters::query::gql::ast::SessionResetTarget;
983 match target {
984 SessionResetTarget::All => self.reset_session(),
985 SessionResetTarget::Schema => self.reset_schema(),
986 SessionResetTarget::Graph => self.reset_graph(),
987 SessionResetTarget::TimeZone => self.reset_time_zone(),
988 SessionResetTarget::Parameters => self.reset_parameters(),
989 }
990 Ok(QueryResult::empty())
991 }
992 SessionCommand::SessionClose => {
993 self.reset_session();
994 Ok(QueryResult::empty())
995 }
996 #[cfg(feature = "lpg")]
997 SessionCommand::StartTransaction {
998 read_only,
999 isolation_level,
1000 } => {
1001 let engine_level = isolation_level.map(|l| match l {
1002 TransactionIsolationLevel::ReadCommitted => {
1003 crate::transaction::IsolationLevel::ReadCommitted
1004 }
1005 TransactionIsolationLevel::SnapshotIsolation => {
1006 crate::transaction::IsolationLevel::SnapshotIsolation
1007 }
1008 TransactionIsolationLevel::Serializable => {
1009 crate::transaction::IsolationLevel::Serializable
1010 }
1011 });
1012 self.begin_transaction_inner(read_only, engine_level)?;
1013 Ok(QueryResult::status("Transaction started"))
1014 }
1015 #[cfg(feature = "lpg")]
1016 SessionCommand::Commit => {
1017 self.commit_inner()?;
1018 Ok(QueryResult::status("Transaction committed"))
1019 }
1020 #[cfg(feature = "lpg")]
1021 SessionCommand::Rollback => {
1022 self.rollback_inner()?;
1023 Ok(QueryResult::status("Transaction rolled back"))
1024 }
1025 #[cfg(feature = "lpg")]
1026 SessionCommand::Savepoint(name) => {
1027 self.savepoint(&name)?;
1028 Ok(QueryResult::status(format!("Savepoint '{name}' created")))
1029 }
1030 #[cfg(feature = "lpg")]
1031 SessionCommand::RollbackToSavepoint(name) => {
1032 self.rollback_to_savepoint(&name)?;
1033 Ok(QueryResult::status(format!(
1034 "Rolled back to savepoint '{name}'"
1035 )))
1036 }
1037 #[cfg(feature = "lpg")]
1038 SessionCommand::ReleaseSavepoint(name) => {
1039 self.release_savepoint(&name)?;
1040 Ok(QueryResult::status(format!("Savepoint '{name}' released")))
1041 }
1042 #[cfg(feature = "lpg")]
1043 SessionCommand::CreateProjection {
1044 name,
1045 node_labels,
1046 edge_types,
1047 } => {
1048 use grafeo_core::graph::{GraphProjection, ProjectionSpec};
1049 use std::collections::hash_map::Entry;
1050
1051 let spec = ProjectionSpec::new()
1052 .with_node_labels(node_labels)
1053 .with_edge_types(edge_types);
1054
1055 let store = self.active_store();
1056 let projection = Arc::new(GraphProjection::new(store, spec));
1057 let mut projections = self.projections.write();
1058 match projections.entry(name.clone()) {
1059 Entry::Occupied(_) => Err(Error::Query(QueryError::new(
1060 QueryErrorKind::Semantic,
1061 format!("Projection '{name}' already exists"),
1062 ))),
1063 Entry::Vacant(e) => {
1064 e.insert(projection);
1065 Ok(QueryResult::status(format!("Projection '{name}' created")))
1066 }
1067 }
1068 }
1069 #[cfg(feature = "lpg")]
1070 SessionCommand::DropProjection { name } => {
1071 let removed = self.projections.write().remove(&name).is_some();
1072 if !removed {
1073 return Err(Error::Query(QueryError::new(
1074 QueryErrorKind::Semantic,
1075 format!("Projection '{name}' does not exist"),
1076 )));
1077 }
1078 Ok(QueryResult::status(format!("Projection '{name}' dropped")))
1079 }
1080 #[cfg(feature = "lpg")]
1081 SessionCommand::ShowProjections => {
1082 let mut names: Vec<String> = self.projections.read().keys().cloned().collect();
1083 names.sort();
1084 let rows: Vec<Vec<Value>> =
1085 names.into_iter().map(|n| vec![Value::from(n)]).collect();
1086 Ok(QueryResult {
1087 columns: vec!["name".to_string()],
1088 column_types: Vec::new(),
1089 rows,
1090 ..QueryResult::empty()
1091 })
1092 }
1093 #[cfg(not(feature = "lpg"))]
1094 _ => Err(grafeo_common::utils::error::Error::Internal(
1095 "This command requires the `lpg` feature".to_string(),
1096 )),
1097 }
1098 }
1099
1100 #[cfg(feature = "wal")]
1102 fn log_schema_wal(&self, record: &grafeo_storage::wal::WalRecord) {
1103 if let Some(ref wal) = self.wal
1104 && let Err(e) = wal.log(record)
1105 {
1106 grafeo_warn!("Failed to log schema change to WAL: {}", e);
1107 }
1108 }
1109
1110 #[cfg(all(feature = "lpg", feature = "gql"))]
1112 fn execute_schema_command(
1113 &self,
1114 cmd: grafeo_adapters::query::gql::ast::SchemaStatement,
1115 ) -> Result<QueryResult> {
1116 use crate::catalog::{
1117 EdgeTypeDefinition, NodeTypeDefinition, PropertyDataType, TypedProperty,
1118 };
1119 use grafeo_adapters::query::gql::ast::SchemaStatement;
1120 use grafeo_common::utils::error::{Error, QueryError, QueryErrorKind};
1121 #[cfg(feature = "wal")]
1122 use grafeo_storage::wal::WalRecord;
1123
1124 macro_rules! wal_log {
1126 ($self:expr, $record:expr) => {
1127 #[cfg(feature = "wal")]
1128 $self.log_schema_wal(&$record);
1129 };
1130 }
1131
1132 let result = match cmd {
1133 SchemaStatement::CreateNodeType(stmt) => {
1134 let effective_name = self.effective_type_key(&stmt.name);
1135 #[cfg(feature = "wal")]
1136 let props_for_wal: Vec<(String, String, bool)> = stmt
1137 .properties
1138 .iter()
1139 .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
1140 .collect();
1141 let def = NodeTypeDefinition {
1142 name: effective_name.clone(),
1143 properties: stmt
1144 .properties
1145 .iter()
1146 .map(|p| TypedProperty {
1147 name: p.name.clone(),
1148 data_type: PropertyDataType::from_type_name(&p.data_type),
1149 nullable: p.nullable,
1150 default_value: p
1151 .default_value
1152 .as_ref()
1153 .map(|s| parse_default_literal(s)),
1154 })
1155 .collect(),
1156 constraints: Vec::new(),
1157 parent_types: stmt.parent_types.clone(),
1158 };
1159 let result = if stmt.or_replace {
1160 let _ = self.catalog.drop_node_type(&effective_name);
1161 self.catalog.register_node_type(def)
1162 } else {
1163 self.catalog.register_node_type(def)
1164 };
1165 match result {
1166 Ok(()) => {
1167 wal_log!(
1168 self,
1169 WalRecord::CreateNodeType {
1170 name: effective_name.clone(),
1171 properties: props_for_wal,
1172 constraints: Vec::new(),
1173 }
1174 );
1175 Ok(QueryResult::status(format!(
1176 "Created node type '{}'",
1177 stmt.name
1178 )))
1179 }
1180 Err(e) if stmt.if_not_exists => {
1181 let _ = e;
1182 Ok(QueryResult::status("No change"))
1183 }
1184 Err(e) => Err(Error::Query(QueryError::new(
1185 QueryErrorKind::Semantic,
1186 e.to_string(),
1187 ))),
1188 }
1189 }
1190 SchemaStatement::CreateEdgeType(stmt) => {
1191 let effective_name = self.effective_type_key(&stmt.name);
1192 #[cfg(feature = "wal")]
1193 let props_for_wal: Vec<(String, String, bool)> = stmt
1194 .properties
1195 .iter()
1196 .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
1197 .collect();
1198 let def = EdgeTypeDefinition {
1199 name: effective_name.clone(),
1200 properties: stmt
1201 .properties
1202 .iter()
1203 .map(|p| TypedProperty {
1204 name: p.name.clone(),
1205 data_type: PropertyDataType::from_type_name(&p.data_type),
1206 nullable: p.nullable,
1207 default_value: p
1208 .default_value
1209 .as_ref()
1210 .map(|s| parse_default_literal(s)),
1211 })
1212 .collect(),
1213 constraints: Vec::new(),
1214 source_node_types: stmt.source_node_types.clone(),
1215 target_node_types: stmt.target_node_types.clone(),
1216 };
1217 let result = if stmt.or_replace {
1218 let _ = self.catalog.drop_edge_type_def(&effective_name);
1219 self.catalog.register_edge_type_def(def)
1220 } else {
1221 self.catalog.register_edge_type_def(def)
1222 };
1223 match result {
1224 Ok(()) => {
1225 wal_log!(
1226 self,
1227 WalRecord::CreateEdgeType {
1228 name: effective_name.clone(),
1229 properties: props_for_wal,
1230 constraints: Vec::new(),
1231 }
1232 );
1233 Ok(QueryResult::status(format!(
1234 "Created edge type '{}'",
1235 stmt.name
1236 )))
1237 }
1238 Err(e) if stmt.if_not_exists => {
1239 let _ = e;
1240 Ok(QueryResult::status("No change"))
1241 }
1242 Err(e) => Err(Error::Query(QueryError::new(
1243 QueryErrorKind::Semantic,
1244 e.to_string(),
1245 ))),
1246 }
1247 }
1248 SchemaStatement::CreateVectorIndex(stmt) => {
1249 Self::create_vector_index_on_store(
1250 &self.active_lpg_store(),
1251 &stmt.node_label,
1252 &stmt.property,
1253 stmt.dimensions,
1254 stmt.metric.as_deref(),
1255 )?;
1256 wal_log!(
1257 self,
1258 WalRecord::CreateIndex {
1259 name: stmt.name.clone(),
1260 label: stmt.node_label.clone(),
1261 property: stmt.property.clone(),
1262 index_type: "vector".to_string(),
1263 }
1264 );
1265 Ok(QueryResult::status(format!(
1266 "Created vector index '{}'",
1267 stmt.name
1268 )))
1269 }
1270 SchemaStatement::DropNodeType { name, if_exists } => {
1271 let effective_name = self.effective_type_key(&name);
1272 match self.catalog.drop_node_type(&effective_name) {
1273 Ok(()) => {
1274 wal_log!(
1275 self,
1276 WalRecord::DropNodeType {
1277 name: effective_name
1278 }
1279 );
1280 Ok(QueryResult::status(format!("Dropped node type '{name}'")))
1281 }
1282 Err(e) if if_exists => {
1283 let _ = e;
1284 Ok(QueryResult::status("No change"))
1285 }
1286 Err(e) => Err(Error::Query(QueryError::new(
1287 QueryErrorKind::Semantic,
1288 e.to_string(),
1289 ))),
1290 }
1291 }
1292 SchemaStatement::DropEdgeType { name, if_exists } => {
1293 let effective_name = self.effective_type_key(&name);
1294 match self.catalog.drop_edge_type_def(&effective_name) {
1295 Ok(()) => {
1296 wal_log!(
1297 self,
1298 WalRecord::DropEdgeType {
1299 name: effective_name
1300 }
1301 );
1302 Ok(QueryResult::status(format!("Dropped edge type '{name}'")))
1303 }
1304 Err(e) if if_exists => {
1305 let _ = e;
1306 Ok(QueryResult::status("No change"))
1307 }
1308 Err(e) => Err(Error::Query(QueryError::new(
1309 QueryErrorKind::Semantic,
1310 e.to_string(),
1311 ))),
1312 }
1313 }
1314 SchemaStatement::CreateIndex(stmt) => {
1315 use crate::catalog::IndexType as CatalogIndexType;
1316 use grafeo_adapters::query::gql::ast::IndexKind;
1317 let active = self.active_lpg_store();
1318 let index_type_str = match stmt.index_kind {
1319 IndexKind::Property => "property",
1320 IndexKind::BTree => "btree",
1321 IndexKind::Text => "text",
1322 IndexKind::Vector => "vector",
1323 };
1324 match stmt.index_kind {
1325 IndexKind::Property | IndexKind::BTree => {
1326 for prop in &stmt.properties {
1327 active.create_property_index(prop);
1328 }
1329 }
1330 IndexKind::Text => {
1331 for prop in &stmt.properties {
1332 Self::create_text_index_on_store(&active, &stmt.label, prop)?;
1333 }
1334 }
1335 IndexKind::Vector => {
1336 for prop in &stmt.properties {
1337 Self::create_vector_index_on_store(
1338 &active,
1339 &stmt.label,
1340 prop,
1341 stmt.options.dimensions,
1342 stmt.options.metric.as_deref(),
1343 )?;
1344 }
1345 }
1346 }
1347 let catalog_index_type = match stmt.index_kind {
1350 IndexKind::Property => CatalogIndexType::Hash,
1351 IndexKind::BTree => CatalogIndexType::BTree,
1352 IndexKind::Text => CatalogIndexType::FullText,
1353 IndexKind::Vector => CatalogIndexType::Hash,
1354 };
1355 let label_id = self.catalog.get_or_create_label(&stmt.label);
1356 for prop in &stmt.properties {
1357 let prop_id = self.catalog.get_or_create_property_key(prop);
1358 self.catalog
1359 .create_index(&stmt.name, label_id, prop_id, catalog_index_type);
1360 }
1361 #[cfg(feature = "wal")]
1362 for prop in &stmt.properties {
1363 wal_log!(
1364 self,
1365 WalRecord::CreateIndex {
1366 name: stmt.name.clone(),
1367 label: stmt.label.clone(),
1368 property: prop.clone(),
1369 index_type: index_type_str.to_string(),
1370 }
1371 );
1372 }
1373 Ok(QueryResult::status(format!(
1374 "Created {} index '{}'",
1375 index_type_str, stmt.name
1376 )))
1377 }
1378 SchemaStatement::DropIndex { name, if_exists } => {
1379 if let Some(index_id) = self.catalog.find_index_by_name(&name) {
1382 let def = self.catalog.get_index(index_id);
1383 self.catalog.drop_index(index_id);
1384 if let Some(def) = def
1385 && let Some(prop_name) =
1386 self.catalog.get_property_key_name(def.property_key)
1387 {
1388 self.active_lpg_store().drop_property_index(&prop_name);
1389 }
1390 wal_log!(self, WalRecord::DropIndex { name: name.clone() });
1391 Ok(QueryResult::status(format!("Dropped index '{name}'")))
1392 } else if if_exists {
1393 Ok(QueryResult::status("No change".to_string()))
1394 } else {
1395 Err(Error::Query(QueryError::new(
1396 QueryErrorKind::Semantic,
1397 format!("Index '{name}' does not exist"),
1398 )))
1399 }
1400 }
1401 SchemaStatement::CreateConstraint(stmt) => {
1402 use crate::catalog::TypeConstraint;
1403 use grafeo_adapters::query::gql::ast::ConstraintKind;
1404 let kind_str = match stmt.constraint_kind {
1405 ConstraintKind::Unique => "unique",
1406 ConstraintKind::NodeKey => "node_key",
1407 ConstraintKind::NotNull => "not_null",
1408 ConstraintKind::Exists => "exists",
1409 };
1410 let constraint_name = stmt
1411 .name
1412 .clone()
1413 .unwrap_or_else(|| format!("{}_{kind_str}", stmt.label));
1414
1415 match stmt.constraint_kind {
1417 ConstraintKind::Unique => {
1418 for prop in &stmt.properties {
1419 let label_id = self.catalog.get_or_create_label(&stmt.label);
1420 let prop_id = self.catalog.get_or_create_property_key(prop);
1421 let _ = self.catalog.add_unique_constraint(label_id, prop_id);
1422 }
1423 let _ = self.catalog.add_constraint_to_type(
1424 &stmt.label,
1425 TypeConstraint::Unique(stmt.properties.clone()),
1426 );
1427 }
1428 ConstraintKind::NodeKey => {
1429 for prop in &stmt.properties {
1430 let label_id = self.catalog.get_or_create_label(&stmt.label);
1431 let prop_id = self.catalog.get_or_create_property_key(prop);
1432 let _ = self.catalog.add_unique_constraint(label_id, prop_id);
1433 let _ = self.catalog.add_required_property(label_id, prop_id);
1434 }
1435 let _ = self.catalog.add_constraint_to_type(
1436 &stmt.label,
1437 TypeConstraint::PrimaryKey(stmt.properties.clone()),
1438 );
1439 }
1440 ConstraintKind::NotNull | ConstraintKind::Exists => {
1441 for prop in &stmt.properties {
1442 let label_id = self.catalog.get_or_create_label(&stmt.label);
1443 let prop_id = self.catalog.get_or_create_property_key(prop);
1444 let _ = self.catalog.add_required_property(label_id, prop_id);
1445 let _ = self.catalog.add_constraint_to_type(
1446 &stmt.label,
1447 TypeConstraint::NotNull(prop.clone()),
1448 );
1449 }
1450 }
1451 }
1452
1453 wal_log!(
1454 self,
1455 WalRecord::CreateConstraint {
1456 name: constraint_name.clone(),
1457 label: stmt.label.clone(),
1458 properties: stmt.properties.clone(),
1459 kind: kind_str.to_string(),
1460 }
1461 );
1462 Ok(QueryResult::status(format!(
1463 "Created {kind_str} constraint '{constraint_name}'"
1464 )))
1465 }
1466 SchemaStatement::DropConstraint { name, if_exists } => {
1467 let _ = if_exists;
1468 wal_log!(self, WalRecord::DropConstraint { name: name.clone() });
1469 Ok(QueryResult::status(format!("Dropped constraint '{name}'")))
1470 }
1471 SchemaStatement::CreateGraphType(stmt) => {
1472 use crate::catalog::GraphTypeDefinition;
1473 use grafeo_adapters::query::gql::ast::InlineElementType;
1474
1475 let effective_name = self.effective_type_key(&stmt.name);
1476
1477 let (mut node_types, mut edge_types, open) =
1479 if let Some(ref like_graph) = stmt.like_graph {
1480 if let Some(type_name) = self.catalog.get_graph_type_binding(like_graph) {
1482 if let Some(existing) = self
1483 .catalog
1484 .schema()
1485 .and_then(|s| s.get_graph_type(&type_name))
1486 {
1487 (
1488 existing.allowed_node_types.clone(),
1489 existing.allowed_edge_types.clone(),
1490 existing.open,
1491 )
1492 } else {
1493 (Vec::new(), Vec::new(), true)
1494 }
1495 } else {
1496 let nt = self.catalog.all_node_type_names();
1498 let et = self.catalog.all_edge_type_names();
1499 if nt.is_empty() && et.is_empty() {
1500 (Vec::new(), Vec::new(), true)
1501 } else {
1502 (nt, et, false)
1503 }
1504 }
1505 } else {
1506 let nt = stmt
1508 .node_types
1509 .iter()
1510 .map(|n| self.effective_type_key(n))
1511 .collect();
1512 let et = stmt
1513 .edge_types
1514 .iter()
1515 .map(|n| self.effective_type_key(n))
1516 .collect();
1517 (nt, et, stmt.open)
1518 };
1519
1520 for inline in &stmt.inline_types {
1522 match inline {
1523 InlineElementType::Node {
1524 name,
1525 properties,
1526 key_labels,
1527 ..
1528 } => {
1529 let inline_effective = self.effective_type_key(name);
1530 let def = NodeTypeDefinition {
1531 name: inline_effective.clone(),
1532 properties: properties
1533 .iter()
1534 .map(|p| TypedProperty {
1535 name: p.name.clone(),
1536 data_type: PropertyDataType::from_type_name(&p.data_type),
1537 nullable: p.nullable,
1538 default_value: None,
1539 })
1540 .collect(),
1541 constraints: Vec::new(),
1542 parent_types: key_labels.clone(),
1543 };
1544 self.catalog.register_or_replace_node_type(def);
1546 #[cfg(feature = "wal")]
1547 {
1548 let props_for_wal: Vec<(String, String, bool)> = properties
1549 .iter()
1550 .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
1551 .collect();
1552 self.log_schema_wal(&WalRecord::CreateNodeType {
1553 name: inline_effective.clone(),
1554 properties: props_for_wal,
1555 constraints: Vec::new(),
1556 });
1557 }
1558 if !node_types.contains(&inline_effective) {
1559 node_types.push(inline_effective);
1560 }
1561 }
1562 InlineElementType::Edge {
1563 name,
1564 properties,
1565 source_node_types,
1566 target_node_types,
1567 ..
1568 } => {
1569 let inline_effective = self.effective_type_key(name);
1570 let def = EdgeTypeDefinition {
1571 name: inline_effective.clone(),
1572 properties: properties
1573 .iter()
1574 .map(|p| TypedProperty {
1575 name: p.name.clone(),
1576 data_type: PropertyDataType::from_type_name(&p.data_type),
1577 nullable: p.nullable,
1578 default_value: None,
1579 })
1580 .collect(),
1581 constraints: Vec::new(),
1582 source_node_types: source_node_types.clone(),
1583 target_node_types: target_node_types.clone(),
1584 };
1585 self.catalog.register_or_replace_edge_type_def(def);
1586 #[cfg(feature = "wal")]
1587 {
1588 let props_for_wal: Vec<(String, String, bool)> = properties
1589 .iter()
1590 .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
1591 .collect();
1592 self.log_schema_wal(&WalRecord::CreateEdgeType {
1593 name: inline_effective.clone(),
1594 properties: props_for_wal,
1595 constraints: Vec::new(),
1596 });
1597 }
1598 if !edge_types.contains(&inline_effective) {
1599 edge_types.push(inline_effective);
1600 }
1601 }
1602 }
1603 }
1604
1605 let def = GraphTypeDefinition {
1606 name: effective_name.clone(),
1607 allowed_node_types: node_types.clone(),
1608 allowed_edge_types: edge_types.clone(),
1609 open,
1610 };
1611 let result = if stmt.or_replace {
1612 let _ = self.catalog.drop_graph_type(&effective_name);
1614 self.catalog.register_graph_type(def)
1615 } else {
1616 self.catalog.register_graph_type(def)
1617 };
1618 match result {
1619 Ok(()) => {
1620 wal_log!(
1621 self,
1622 WalRecord::CreateGraphType {
1623 name: effective_name.clone(),
1624 node_types,
1625 edge_types,
1626 open,
1627 }
1628 );
1629 Ok(QueryResult::status(format!(
1630 "Created graph type '{}'",
1631 stmt.name
1632 )))
1633 }
1634 Err(e) if stmt.if_not_exists => {
1635 let _ = e;
1636 Ok(QueryResult::status("No change"))
1637 }
1638 Err(e) => Err(Error::Query(QueryError::new(
1639 QueryErrorKind::Semantic,
1640 e.to_string(),
1641 ))),
1642 }
1643 }
1644 SchemaStatement::DropGraphType { name, if_exists } => {
1645 let effective_name = self.effective_type_key(&name);
1646 match self.catalog.drop_graph_type(&effective_name) {
1647 Ok(()) => {
1648 wal_log!(
1649 self,
1650 WalRecord::DropGraphType {
1651 name: effective_name
1652 }
1653 );
1654 Ok(QueryResult::status(format!("Dropped graph type '{name}'")))
1655 }
1656 Err(e) if if_exists => {
1657 let _ = e;
1658 Ok(QueryResult::status("No change"))
1659 }
1660 Err(e) => Err(Error::Query(QueryError::new(
1661 QueryErrorKind::Semantic,
1662 e.to_string(),
1663 ))),
1664 }
1665 }
1666 SchemaStatement::CreateSchema {
1667 name,
1668 if_not_exists,
1669 } => match self.catalog.register_schema_namespace(name.clone()) {
1670 Ok(()) => {
1671 wal_log!(self, WalRecord::CreateSchema { name: name.clone() });
1672 let default_key = format!("{name}/{SCHEMA_DEFAULT_GRAPH}");
1675 if self.store.create_graph(&default_key).unwrap_or(false) {
1676 wal_log!(self, WalRecord::CreateNamedGraph { name: default_key });
1677 }
1678 Ok(QueryResult::status(format!("Created schema '{name}'")))
1679 }
1680 Err(e) if if_not_exists => {
1681 let _ = e;
1682 Ok(QueryResult::status("No change"))
1683 }
1684 Err(e) => Err(Error::Query(QueryError::new(
1685 QueryErrorKind::Semantic,
1686 e.to_string(),
1687 ))),
1688 },
1689 SchemaStatement::DropSchema { name, if_exists } => {
1690 let prefix = format!("{name}/");
1693 let default_graph_key = format!("{name}/{SCHEMA_DEFAULT_GRAPH}");
1694 let has_graphs = self
1695 .store
1696 .graph_names()
1697 .iter()
1698 .any(|g| g.starts_with(&prefix) && *g != default_graph_key);
1699 let has_types = self
1700 .catalog
1701 .all_node_type_names()
1702 .iter()
1703 .any(|n| n.starts_with(&prefix))
1704 || self
1705 .catalog
1706 .all_edge_type_names()
1707 .iter()
1708 .any(|n| n.starts_with(&prefix))
1709 || self
1710 .catalog
1711 .all_graph_type_names()
1712 .iter()
1713 .any(|n| n.starts_with(&prefix));
1714 if has_graphs || has_types {
1715 return Err(Error::Query(QueryError::new(
1716 QueryErrorKind::Semantic,
1717 format!("Schema '{name}' is not empty: drop all graphs and types first"),
1718 )));
1719 }
1720 match self.catalog.drop_schema_namespace(&name) {
1721 Ok(()) => {
1722 wal_log!(self, WalRecord::DropSchema { name: name.clone() });
1723 if self.store.drop_graph(&default_graph_key) {
1725 wal_log!(
1726 self,
1727 WalRecord::DropNamedGraph {
1728 name: default_graph_key,
1729 }
1730 );
1731 }
1732 let mut current = self.current_schema.lock();
1734 if current
1735 .as_deref()
1736 .is_some_and(|s| s.eq_ignore_ascii_case(&name))
1737 {
1738 *current = None;
1739 }
1740 Ok(QueryResult::status(format!("Dropped schema '{name}'")))
1741 }
1742 Err(e) if if_exists => {
1743 let _ = e;
1744 Ok(QueryResult::status("No change"))
1745 }
1746 Err(e) => Err(Error::Query(QueryError::new(
1747 QueryErrorKind::Semantic,
1748 e.to_string(),
1749 ))),
1750 }
1751 }
1752 SchemaStatement::AlterNodeType(stmt) => {
1753 use grafeo_adapters::query::gql::ast::TypeAlteration;
1754 let effective_name = self.effective_type_key(&stmt.name);
1755 let mut wal_alts = Vec::new();
1756 for alt in &stmt.alterations {
1757 match alt {
1758 TypeAlteration::AddProperty(prop) => {
1759 let typed = TypedProperty {
1760 name: prop.name.clone(),
1761 data_type: PropertyDataType::from_type_name(&prop.data_type),
1762 nullable: prop.nullable,
1763 default_value: prop
1764 .default_value
1765 .as_ref()
1766 .map(|s| parse_default_literal(s)),
1767 };
1768 self.catalog
1769 .alter_node_type_add_property(&effective_name, typed)
1770 .map_err(|e| {
1771 Error::Query(QueryError::new(
1772 QueryErrorKind::Semantic,
1773 e.to_string(),
1774 ))
1775 })?;
1776 wal_alts.push((
1777 "add".to_string(),
1778 prop.name.clone(),
1779 prop.data_type.clone(),
1780 prop.nullable,
1781 ));
1782 }
1783 TypeAlteration::DropProperty(name) => {
1784 self.catalog
1785 .alter_node_type_drop_property(&effective_name, name)
1786 .map_err(|e| {
1787 Error::Query(QueryError::new(
1788 QueryErrorKind::Semantic,
1789 e.to_string(),
1790 ))
1791 })?;
1792 wal_alts.push(("drop".to_string(), name.clone(), String::new(), false));
1793 }
1794 }
1795 }
1796 wal_log!(
1797 self,
1798 WalRecord::AlterNodeType {
1799 name: effective_name,
1800 alterations: wal_alts,
1801 }
1802 );
1803 Ok(QueryResult::status(format!(
1804 "Altered node type '{}'",
1805 stmt.name
1806 )))
1807 }
1808 SchemaStatement::AlterEdgeType(stmt) => {
1809 use grafeo_adapters::query::gql::ast::TypeAlteration;
1810 let effective_name = self.effective_type_key(&stmt.name);
1811 let mut wal_alts = Vec::new();
1812 for alt in &stmt.alterations {
1813 match alt {
1814 TypeAlteration::AddProperty(prop) => {
1815 let typed = TypedProperty {
1816 name: prop.name.clone(),
1817 data_type: PropertyDataType::from_type_name(&prop.data_type),
1818 nullable: prop.nullable,
1819 default_value: prop
1820 .default_value
1821 .as_ref()
1822 .map(|s| parse_default_literal(s)),
1823 };
1824 self.catalog
1825 .alter_edge_type_add_property(&effective_name, typed)
1826 .map_err(|e| {
1827 Error::Query(QueryError::new(
1828 QueryErrorKind::Semantic,
1829 e.to_string(),
1830 ))
1831 })?;
1832 wal_alts.push((
1833 "add".to_string(),
1834 prop.name.clone(),
1835 prop.data_type.clone(),
1836 prop.nullable,
1837 ));
1838 }
1839 TypeAlteration::DropProperty(name) => {
1840 self.catalog
1841 .alter_edge_type_drop_property(&effective_name, name)
1842 .map_err(|e| {
1843 Error::Query(QueryError::new(
1844 QueryErrorKind::Semantic,
1845 e.to_string(),
1846 ))
1847 })?;
1848 wal_alts.push(("drop".to_string(), name.clone(), String::new(), false));
1849 }
1850 }
1851 }
1852 wal_log!(
1853 self,
1854 WalRecord::AlterEdgeType {
1855 name: effective_name,
1856 alterations: wal_alts,
1857 }
1858 );
1859 Ok(QueryResult::status(format!(
1860 "Altered edge type '{}'",
1861 stmt.name
1862 )))
1863 }
1864 SchemaStatement::AlterGraphType(stmt) => {
1865 use grafeo_adapters::query::gql::ast::GraphTypeAlteration;
1866 let effective_name = self.effective_type_key(&stmt.name);
1867 let mut wal_alts = Vec::new();
1868 for alt in &stmt.alterations {
1869 match alt {
1870 GraphTypeAlteration::AddNodeType(name) => {
1871 self.catalog
1872 .alter_graph_type_add_node_type(&effective_name, name.clone())
1873 .map_err(|e| {
1874 Error::Query(QueryError::new(
1875 QueryErrorKind::Semantic,
1876 e.to_string(),
1877 ))
1878 })?;
1879 wal_alts.push(("add_node_type".to_string(), name.clone()));
1880 }
1881 GraphTypeAlteration::DropNodeType(name) => {
1882 self.catalog
1883 .alter_graph_type_drop_node_type(&effective_name, name)
1884 .map_err(|e| {
1885 Error::Query(QueryError::new(
1886 QueryErrorKind::Semantic,
1887 e.to_string(),
1888 ))
1889 })?;
1890 wal_alts.push(("drop_node_type".to_string(), name.clone()));
1891 }
1892 GraphTypeAlteration::AddEdgeType(name) => {
1893 self.catalog
1894 .alter_graph_type_add_edge_type(&effective_name, name.clone())
1895 .map_err(|e| {
1896 Error::Query(QueryError::new(
1897 QueryErrorKind::Semantic,
1898 e.to_string(),
1899 ))
1900 })?;
1901 wal_alts.push(("add_edge_type".to_string(), name.clone()));
1902 }
1903 GraphTypeAlteration::DropEdgeType(name) => {
1904 self.catalog
1905 .alter_graph_type_drop_edge_type(&effective_name, name)
1906 .map_err(|e| {
1907 Error::Query(QueryError::new(
1908 QueryErrorKind::Semantic,
1909 e.to_string(),
1910 ))
1911 })?;
1912 wal_alts.push(("drop_edge_type".to_string(), name.clone()));
1913 }
1914 }
1915 }
1916 wal_log!(
1917 self,
1918 WalRecord::AlterGraphType {
1919 name: effective_name,
1920 alterations: wal_alts,
1921 }
1922 );
1923 Ok(QueryResult::status(format!(
1924 "Altered graph type '{}'",
1925 stmt.name
1926 )))
1927 }
1928 SchemaStatement::CreateProcedure(stmt) => {
1929 use crate::catalog::ProcedureDefinition;
1930
1931 let def = ProcedureDefinition {
1932 name: stmt.name.clone(),
1933 params: stmt
1934 .params
1935 .iter()
1936 .map(|p| (p.name.clone(), p.param_type.clone()))
1937 .collect(),
1938 returns: stmt
1939 .returns
1940 .iter()
1941 .map(|r| (r.name.clone(), r.return_type.clone()))
1942 .collect(),
1943 body: stmt.body.clone(),
1944 };
1945
1946 if stmt.or_replace {
1947 self.catalog.replace_procedure(def).map_err(|e| {
1948 Error::Query(QueryError::new(QueryErrorKind::Semantic, e.to_string()))
1949 })?;
1950 } else {
1951 match self.catalog.register_procedure(def) {
1952 Ok(()) => {}
1953 Err(_) if stmt.if_not_exists => {
1954 return Ok(QueryResult::empty());
1955 }
1956 Err(e) => {
1957 return Err(Error::Query(QueryError::new(
1958 QueryErrorKind::Semantic,
1959 e.to_string(),
1960 )));
1961 }
1962 }
1963 }
1964
1965 wal_log!(
1966 self,
1967 WalRecord::CreateProcedure {
1968 name: stmt.name.clone(),
1969 params: stmt
1970 .params
1971 .iter()
1972 .map(|p| (p.name.clone(), p.param_type.clone()))
1973 .collect(),
1974 returns: stmt
1975 .returns
1976 .iter()
1977 .map(|r| (r.name.clone(), r.return_type.clone()))
1978 .collect(),
1979 body: stmt.body,
1980 }
1981 );
1982 Ok(QueryResult::status(format!(
1983 "Created procedure '{}'",
1984 stmt.name
1985 )))
1986 }
1987 SchemaStatement::DropProcedure { name, if_exists } => {
1988 match self.catalog.drop_procedure(&name) {
1989 Ok(()) => {}
1990 Err(_) if if_exists => {
1991 return Ok(QueryResult::empty());
1992 }
1993 Err(e) => {
1994 return Err(Error::Query(QueryError::new(
1995 QueryErrorKind::Semantic,
1996 e.to_string(),
1997 )));
1998 }
1999 }
2000 wal_log!(self, WalRecord::DropProcedure { name: name.clone() });
2001 Ok(QueryResult::status(format!("Dropped procedure '{name}'")))
2002 }
2003 SchemaStatement::ShowIndexes => {
2004 return self.execute_show_indexes();
2005 }
2006 SchemaStatement::ShowConstraints => {
2007 return self.execute_show_constraints();
2008 }
2009 SchemaStatement::ShowNodeTypes => {
2010 return self.execute_show_node_types();
2011 }
2012 SchemaStatement::ShowEdgeTypes => {
2013 return self.execute_show_edge_types();
2014 }
2015 SchemaStatement::ShowGraphTypes => {
2016 return self.execute_show_graph_types();
2017 }
2018 SchemaStatement::ShowGraphType(name) => {
2019 return self.execute_show_graph_type(&name);
2020 }
2021 SchemaStatement::ShowCurrentGraphType => {
2022 return self.execute_show_current_graph_type();
2023 }
2024 SchemaStatement::ShowGraphs => {
2025 return self.execute_show_graphs();
2026 }
2027 SchemaStatement::ShowSchemas => {
2028 return self.execute_show_schemas();
2029 }
2030 };
2031
2032 if result.is_ok() {
2035 self.query_cache.clear();
2036 }
2037
2038 result
2039 }
2040
#[cfg(all(feature = "lpg", feature = "gql", feature = "vector-index"))]
/// Builds an HNSW vector index over `property` for the nodes carrying `label`
/// and registers it on `store`.
///
/// `dimensions`, when supplied, is the length every indexed vector must have;
/// otherwise the length of the first vector encountered is used. `metric` is
/// a distance-metric name and defaults to cosine when omitted.
///
/// # Errors
///
/// Returns `Error::Internal` when the metric name is not recognised, when a
/// vector's length differs from the expected dimension, or when no dimension
/// was supplied and no node holds a vector under `property`.
fn create_vector_index_on_store(
    store: &LpgStore,
    label: &str,
    property: &str,
    dimensions: Option<usize>,
    metric: Option<&str>,
) -> Result<()> {
    use grafeo_common::types::{PropertyKey, Value};
    use grafeo_common::utils::error::Error;
    use grafeo_core::index::vector::{DistanceMetric, HnswConfig, HnswIndex, VectorIndexKind};

    // Resolve the metric first so an invalid name fails before any node scan.
    let distance = match metric {
        None => DistanceMetric::Cosine,
        Some(m) => DistanceMetric::from_str(m).ok_or_else(|| {
            Error::Internal(format!(
                "Unknown distance metric '{m}'. Use: cosine, euclidean, dot_product, manhattan"
            ))
        })?,
    };

    let key = PropertyKey::new(property);
    let mut expected_dims = dimensions;
    let mut pending: Vec<(grafeo_common::types::NodeId, Vec<f32>)> = Vec::new();

    for node in store.nodes_with_label(label) {
        // Nodes without a vector value under this property are not indexed.
        let Some(Value::Vector(v)) = node.properties.get(&key) else {
            continue;
        };
        match expected_dims {
            Some(expected) if v.len() != expected => {
                return Err(Error::Internal(format!(
                    "Vector dimension mismatch: expected {expected}, found {} on node {}",
                    v.len(),
                    node.id.0
                )));
            }
            Some(_) => {}
            // No dimension known yet: the first vector seen fixes it.
            None => expected_dims = Some(v.len()),
        }
        pending.push((node.id, v.to_vec()));
    }

    let dims = expected_dims.ok_or_else(|| {
        Error::Internal(format!(
            "No vector properties found on :{label}({property}) and no dimensions specified"
        ))
    })?;

    let index = HnswIndex::with_capacity(HnswConfig::new(dims, distance), pending.len());
    let accessor = grafeo_core::index::vector::PropertyVectorAccessor::new(store, property);
    for (node_id, vector) in &pending {
        index.insert(*node_id, vector, &accessor);
    }

    store.add_vector_index(label, property, Arc::new(VectorIndexKind::Hnsw(index)));
    Ok(())
}
2100
#[cfg(all(feature = "lpg", feature = "gql", not(feature = "vector-index")))]
/// Fallback compiled when the `vector-index` feature is disabled.
///
/// Every argument is ignored; the call always fails with `Error::Internal`.
fn create_vector_index_on_store(
    _store: &LpgStore,
    _label: &str,
    _property: &str,
    _dimensions: Option<usize>,
    _metric: Option<&str>,
) -> Result<()> {
    use grafeo_common::utils::error::Error;

    let reason = String::from("Vector index support requires the 'vector-index' feature");
    Err(Error::Internal(reason))
}
2114
#[cfg(all(feature = "lpg", feature = "gql", feature = "text-index"))]
/// Builds an inverted text index (default BM25 configuration) over `property`
/// for the nodes carrying `label` and registers it on `store`.
///
/// Only nodes whose property value is a string contribute to the index.
fn create_text_index_on_store(store: &LpgStore, label: &str, property: &str) -> Result<()> {
    use grafeo_common::types::{PropertyKey, Value};
    use grafeo_core::index::text::{BM25Config, InvertedIndex};

    let mut inverted = InvertedIndex::new(BM25Config::default());
    let key = PropertyKey::new(property);

    for node_id in store.nodes_by_label(label) {
        // Missing or non-string values are skipped rather than treated as errors.
        let Some(Value::String(text)) = store.get_node_property(node_id, &key) else {
            continue;
        };
        inverted.insert(node_id, text.as_str());
    }

    let shared = Arc::new(parking_lot::RwLock::new(inverted));
    store.add_text_index(label, property, shared);
    Ok(())
}
2134
#[cfg(all(feature = "lpg", feature = "gql", not(feature = "text-index")))]
/// Fallback compiled when the `text-index` feature is disabled.
///
/// Every argument is ignored; the call always fails with `Error::Internal`.
fn create_text_index_on_store(_store: &LpgStore, _label: &str, _property: &str) -> Result<()> {
    use grafeo_common::utils::error::Error;

    let reason = String::from("Text index support requires the 'text-index' feature");
    Err(Error::Internal(reason))
}
2142
2143 fn execute_show_indexes(&self) -> Result<QueryResult> {
2145 let indexes = self.catalog.all_indexes();
2146 let columns = vec![
2147 "name".to_string(),
2148 "type".to_string(),
2149 "label".to_string(),
2150 "property".to_string(),
2151 ];
2152 let rows: Vec<Vec<Value>> = indexes
2153 .into_iter()
2154 .map(|def| {
2155 let label_name = self
2156 .catalog
2157 .get_label_name(def.label)
2158 .unwrap_or_else(|| "?".into());
2159 let prop_name = self
2160 .catalog
2161 .get_property_key_name(def.property_key)
2162 .unwrap_or_else(|| "?".into());
2163 vec![
2164 Value::from(def.name),
2165 Value::from(format!("{:?}", def.index_type)),
2166 Value::from(&*label_name),
2167 Value::from(&*prop_name),
2168 ]
2169 })
2170 .collect();
2171 Ok(QueryResult {
2172 columns,
2173 column_types: Vec::new(),
2174 rows,
2175 ..QueryResult::empty()
2176 })
2177 }
2178
2179 fn execute_show_constraints(&self) -> Result<QueryResult> {
2181 Ok(QueryResult {
2184 columns: vec![
2185 "name".to_string(),
2186 "type".to_string(),
2187 "label".to_string(),
2188 "properties".to_string(),
2189 ],
2190 column_types: Vec::new(),
2191 rows: Vec::new(),
2192 ..QueryResult::empty()
2193 })
2194 }
2195
2196 fn execute_show_node_types(&self) -> Result<QueryResult> {
2198 let columns = vec![
2199 "name".to_string(),
2200 "properties".to_string(),
2201 "constraints".to_string(),
2202 "parents".to_string(),
2203 ];
2204 let schema = self.current_schema.lock().clone();
2205 let all_names = self.catalog.all_node_type_names();
2206 let type_names: Vec<String> = match &schema {
2207 Some(s) => {
2208 let prefix = format!("{s}/");
2209 all_names
2210 .into_iter()
2211 .filter_map(|n| n.strip_prefix(&prefix).map(String::from))
2212 .collect()
2213 }
2214 None => all_names.into_iter().filter(|n| !n.contains('/')).collect(),
2215 };
2216 let rows: Vec<Vec<Value>> = type_names
2217 .into_iter()
2218 .filter_map(|name| {
2219 let lookup = match &schema {
2220 Some(s) => format!("{s}/{name}"),
2221 None => name.clone(),
2222 };
2223 let def = self.catalog.get_node_type(&lookup)?;
2224 let props: Vec<String> = def
2225 .properties
2226 .iter()
2227 .map(|p| {
2228 let nullable = if p.nullable { "" } else { " NOT NULL" };
2229 format!("{} {}{}", p.name, p.data_type, nullable)
2230 })
2231 .collect();
2232 let constraints: Vec<String> =
2233 def.constraints.iter().map(|c| format!("{c:?}")).collect();
2234 let parents = def.parent_types.join(", ");
2235 Some(vec![
2236 Value::from(name),
2237 Value::from(props.join(", ")),
2238 Value::from(constraints.join(", ")),
2239 Value::from(parents),
2240 ])
2241 })
2242 .collect();
2243 Ok(QueryResult {
2244 columns,
2245 column_types: Vec::new(),
2246 rows,
2247 ..QueryResult::empty()
2248 })
2249 }
2250
2251 fn execute_show_edge_types(&self) -> Result<QueryResult> {
2253 let columns = vec![
2254 "name".to_string(),
2255 "properties".to_string(),
2256 "source_types".to_string(),
2257 "target_types".to_string(),
2258 ];
2259 let schema = self.current_schema.lock().clone();
2260 let all_names = self.catalog.all_edge_type_names();
2261 let type_names: Vec<String> = match &schema {
2262 Some(s) => {
2263 let prefix = format!("{s}/");
2264 all_names
2265 .into_iter()
2266 .filter_map(|n| n.strip_prefix(&prefix).map(String::from))
2267 .collect()
2268 }
2269 None => all_names.into_iter().filter(|n| !n.contains('/')).collect(),
2270 };
2271 let rows: Vec<Vec<Value>> = type_names
2272 .into_iter()
2273 .filter_map(|name| {
2274 let lookup = match &schema {
2275 Some(s) => format!("{s}/{name}"),
2276 None => name.clone(),
2277 };
2278 let def = self.catalog.get_edge_type_def(&lookup)?;
2279 let props: Vec<String> = def
2280 .properties
2281 .iter()
2282 .map(|p| {
2283 let nullable = if p.nullable { "" } else { " NOT NULL" };
2284 format!("{} {}{}", p.name, p.data_type, nullable)
2285 })
2286 .collect();
2287 let src = def.source_node_types.join(", ");
2288 let tgt = def.target_node_types.join(", ");
2289 Some(vec![
2290 Value::from(name),
2291 Value::from(props.join(", ")),
2292 Value::from(src),
2293 Value::from(tgt),
2294 ])
2295 })
2296 .collect();
2297 Ok(QueryResult {
2298 columns,
2299 column_types: Vec::new(),
2300 rows,
2301 ..QueryResult::empty()
2302 })
2303 }
2304
2305 fn execute_show_graph_types(&self) -> Result<QueryResult> {
2307 let columns = vec![
2308 "name".to_string(),
2309 "open".to_string(),
2310 "node_types".to_string(),
2311 "edge_types".to_string(),
2312 ];
2313 let schema = self.current_schema.lock().clone();
2314 let all_names = self.catalog.all_graph_type_names();
2315 let type_names: Vec<String> = match &schema {
2316 Some(s) => {
2317 let prefix = format!("{s}/");
2318 all_names
2319 .into_iter()
2320 .filter_map(|n| n.strip_prefix(&prefix).map(String::from))
2321 .collect()
2322 }
2323 None => all_names.into_iter().filter(|n| !n.contains('/')).collect(),
2324 };
2325 let rows: Vec<Vec<Value>> = type_names
2326 .into_iter()
2327 .filter_map(|name| {
2328 let lookup = match &schema {
2329 Some(s) => format!("{s}/{name}"),
2330 None => name.clone(),
2331 };
2332 let def = self.catalog.get_graph_type_def(&lookup)?;
2333 let strip = |n: &String| -> String {
2335 match &schema {
2336 Some(s) => n.strip_prefix(&format!("{s}/")).unwrap_or(n).to_string(),
2337 None => n.clone(),
2338 }
2339 };
2340 let node_types: Vec<String> = def.allowed_node_types.iter().map(strip).collect();
2341 let edge_types: Vec<String> = def.allowed_edge_types.iter().map(strip).collect();
2342 Some(vec![
2343 Value::from(name),
2344 Value::from(def.open),
2345 Value::from(node_types.join(", ")),
2346 Value::from(edge_types.join(", ")),
2347 ])
2348 })
2349 .collect();
2350 Ok(QueryResult {
2351 columns,
2352 column_types: Vec::new(),
2353 rows,
2354 ..QueryResult::empty()
2355 })
2356 }
2357
#[cfg(feature = "lpg")]
/// Lists the named graphs visible from the session's current schema.
///
/// With a current schema set, only store graph names starting with
/// `<schema>/` are listed, the prefix is removed, and the entry equal to
/// `SCHEMA_DEFAULT_GRAPH` is hidden. Without a schema, only names containing
/// no `/` are listed. The single `name` column is sorted ascending.
fn execute_show_graphs(&self) -> Result<QueryResult> {
    let schema = self.current_schema.lock().clone();
    let prefix = schema.map(|s| format!("{s}/"));

    let mut visible: Vec<String> = self
        .store
        .graph_names()
        .into_iter()
        .filter_map(|graph| match prefix.as_deref() {
            Some(p) => graph
                .strip_prefix(p)
                .filter(|rest| *rest != SCHEMA_DEFAULT_GRAPH)
                .map(String::from),
            None => (!graph.contains('/')).then_some(graph),
        })
        .collect();
    visible.sort();

    let mut rows: Vec<Vec<Value>> = Vec::with_capacity(visible.len());
    for graph in visible {
        rows.push(vec![Value::from(graph)]);
    }

    Ok(QueryResult {
        columns: vec![String::from("name")],
        column_types: Vec::new(),
        rows,
        ..QueryResult::empty()
    })
}
2389
2390 fn execute_show_schemas(&self) -> Result<QueryResult> {
2392 let mut names = self.catalog.schema_names();
2393 names.sort();
2394 let rows: Vec<Vec<Value>> = names.into_iter().map(|n| vec![Value::from(n)]).collect();
2395 Ok(QueryResult {
2396 columns: vec!["name".to_string()],
2397 column_types: Vec::new(),
2398 rows,
2399 ..QueryResult::empty()
2400 })
2401 }
2402
2403 fn execute_show_graph_type(&self, name: &str) -> Result<QueryResult> {
2405 use grafeo_common::utils::error::{Error, QueryError, QueryErrorKind};
2406
2407 let def = self.catalog.get_graph_type_def(name).ok_or_else(|| {
2408 Error::Query(QueryError::new(
2409 QueryErrorKind::Semantic,
2410 format!("Graph type '{name}' not found"),
2411 ))
2412 })?;
2413
2414 let columns = vec![
2415 "name".to_string(),
2416 "open".to_string(),
2417 "node_types".to_string(),
2418 "edge_types".to_string(),
2419 ];
2420 let rows = vec![vec![
2421 Value::from(def.name),
2422 Value::from(def.open),
2423 Value::from(def.allowed_node_types.join(", ")),
2424 Value::from(def.allowed_edge_types.join(", ")),
2425 ]];
2426 Ok(QueryResult {
2427 columns,
2428 column_types: Vec::new(),
2429 rows,
2430 ..QueryResult::empty()
2431 })
2432 }
2433
2434 fn execute_show_current_graph_type(&self) -> Result<QueryResult> {
2436 let graph_name = self
2437 .current_graph()
2438 .unwrap_or_else(|| "default".to_string());
2439 let columns = vec![
2440 "graph".to_string(),
2441 "graph_type".to_string(),
2442 "open".to_string(),
2443 "node_types".to_string(),
2444 "edge_types".to_string(),
2445 ];
2446
2447 if let Some(type_name) = self.catalog.get_graph_type_binding(&graph_name)
2448 && let Some(def) = self.catalog.get_graph_type_def(&type_name)
2449 {
2450 let rows = vec![vec![
2451 Value::from(graph_name),
2452 Value::from(type_name),
2453 Value::from(def.open),
2454 Value::from(def.allowed_node_types.join(", ")),
2455 Value::from(def.allowed_edge_types.join(", ")),
2456 ]];
2457 return Ok(QueryResult {
2458 columns,
2459 column_types: Vec::new(),
2460 rows,
2461 ..QueryResult::empty()
2462 });
2463 }
2464
2465 Ok(QueryResult {
2467 columns,
2468 column_types: Vec::new(),
2469 rows: vec![vec![
2470 Value::from(graph_name),
2471 Value::Null,
2472 Value::Null,
2473 Value::Null,
2474 Value::Null,
2475 ]],
2476 ..QueryResult::empty()
2477 })
2478 }
2479
2480 #[cfg(feature = "gql")]
/// Executes a GQL query string against the session's active store.
///
/// The query is translated first. Session commands and schema commands are dispatched
/// directly and return early. Regular plans are bound and optimized, or fetched from the
/// optimized-plan cache (keyed by query text, language and current graph), then planned and
/// run inside `with_auto_commit`. Plans flagged `explain` return the explain output without
/// being executed; plans flagged `profile` are executed and return the profile output
/// instead of the query rows.
///
/// # Errors
///
/// Propagates errors from `require_lpg`, translation, permission checks, binding,
/// optimization, planning and execution. Returns `TransactionError::ReadOnly` when a schema
/// command or a mutating plan is issued while `read_only_tx` is set.
pub fn execute(&self, query: &str) -> Result<QueryResult> {
    // NOTE(review): presumably rejects sessions that are not in LPG mode — confirm.
    self.require_lpg("GQL")?;

    use crate::query::{
        Executor, binder::Binder, cache::CacheKey, optimizer::Optimizer,
        processor::QueryLanguage, translators::gql,
    };

    let _span = grafeo_info_span!(
        "grafeo::session::execute",
        language = "gql",
        query_len = query.len(),
    );

    // Wall-clock start used for `execution_time_ms`, profiling and metrics; not taken on wasm32.
    #[cfg(not(target_arch = "wasm32"))]
    let start_time = std::time::Instant::now();

    let translation = gql::translate_full(query)?;
    // Only the `Plan` variant falls through; every other variant returns from this match.
    let logical_plan = match translation {
        gql::GqlTranslationResult::SessionCommand(cmd) => {
            return self.execute_session_command(cmd);
        }
        #[cfg(feature = "lpg")]
        gql::GqlTranslationResult::SchemaCommand(cmd) => {
            // Schema commands need the Admin permission and are refused in read-only transactions.
            self.require_permission(crate::auth::StatementKind::Admin)?;
            if *self.read_only_tx.lock() {
                return Err(grafeo_common::utils::error::Error::Transaction(
                    grafeo_common::utils::error::TransactionError::ReadOnly,
                ));
            }
            return self.execute_schema_command(cmd);
        }
        gql::GqlTranslationResult::Plan(plan) => {
            let read_only = *self.read_only_tx.lock();
            // `has_mutations()` is only evaluated when its answer can change the outcome:
            // inside a read-only transaction or for an identity without admin rights.
            let need_check = read_only || !self.identity.can_admin();
            let is_mutation = need_check && plan.root.has_mutations();
            if is_mutation {
                self.require_permission(crate::auth::StatementKind::Write)?;
            }
            if read_only && is_mutation {
                return Err(grafeo_common::utils::error::Error::Transaction(
                    grafeo_common::utils::error::TransactionError::ReadOnly,
                ));
            }
            plan
        }
        #[cfg(not(feature = "lpg"))]
        gql::GqlTranslationResult::SchemaCommand(_) => {
            return Err(grafeo_common::utils::error::Error::Internal(
                "Schema commands require the `lpg` feature".to_string(),
            ));
        }
    };

    // The cache key includes the current graph, so the same text on another graph is a miss.
    let cache_key = CacheKey::with_graph(query, QueryLanguage::Gql, self.current_graph());

    let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
        cached_plan
    } else {
        // Cache miss: bind, optimize against the active store, then store a clone in the cache.
        let mut binder = Binder::new();
        let _binding_context = binder.bind(&logical_plan)?;

        let active = self.active_store();
        let optimizer = Optimizer::from_graph_store(&*active);
        let plan = optimizer.optimize(logical_plan)?;

        self.query_cache.put_optimized(cache_key, plan.clone());

        plan
    };

    let active = self.active_store();

    if optimized_plan.explain {
        // The plan is annotated and rendered; nothing is planned physically or executed.
        use crate::query::processor::{annotate_pushdown_hints, explain_result};
        let mut plan = optimized_plan;
        annotate_pushdown_hints(&mut plan.root, active.as_ref());
        return Ok(explain_result(&plan));
    }

    if optimized_plan.profile {
        let has_mutations = optimized_plan.root.has_mutations();
        return self.with_auto_commit(has_mutations, || {
            let (viewing_epoch, transaction_id) = self.get_transaction_context();
            let planner = self.create_planner_for_store(
                Arc::clone(&active),
                viewing_epoch,
                transaction_id,
            );
            let (mut physical_plan, entries) = planner.plan_profiled(&optimized_plan)?;

            let executor = Executor::with_columns(physical_plan.columns.clone())
                .with_deadline(self.query_deadline());
            // The query is really executed, but its rows are discarded; only the profile is returned.
            let _result = executor.execute(physical_plan.operator.as_mut())?;

            // Total time is reported as 0.0 on wasm32, where no start time is taken.
            let total_time_ms;
            #[cfg(not(target_arch = "wasm32"))]
            {
                total_time_ms = start_time.elapsed().as_secs_f64() * 1000.0;
            }
            #[cfg(target_arch = "wasm32")]
            {
                total_time_ms = 0.0;
            }

            let profile_tree = crate::query::profile::build_profile_tree(
                &optimized_plan.root,
                &mut entries.into_iter(),
            );
            Ok(crate::query::profile::profile_result(
                &profile_tree,
                total_time_ms,
            ))
        });
    }

    let has_mutations = optimized_plan.root.has_mutations();

    let result = self.with_auto_commit(has_mutations, || {
        let (viewing_epoch, transaction_id) = self.get_transaction_context();

        let has_active_tx = self.current_transaction.lock().is_some();
        // The planner is told `read_only` only for mutation-free plans that run outside an
        // explicit transaction.
        let read_only = !has_mutations && !has_active_tx;
        let planner = self.create_planner_for_store_with_read_only(
            Arc::clone(&active),
            viewing_epoch,
            transaction_id,
            read_only,
        );
        let mut physical_plan = planner.plan(&optimized_plan)?;

        let executor = Executor::with_columns(physical_plan.columns.clone())
            .with_deadline(self.query_deadline());
        let mut result = executor.execute(physical_plan.operator.as_mut())?;

        // `rows_scanned` is reported as the number of rows in the result.
        let rows_scanned = result.rows.len() as u64;
        #[cfg(not(target_arch = "wasm32"))]
        {
            let elapsed_ms = start_time.elapsed().as_secs_f64() * 1000.0;
            result.execution_time_ms = Some(elapsed_ms);
        }
        result.rows_scanned = Some(rows_scanned);

        Ok(result)
    });

    // Metrics are recorded for both successful and failed executions (the `Result` is passed).
    #[cfg(feature = "metrics")]
    {
        #[cfg(not(target_arch = "wasm32"))]
        let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
        #[cfg(target_arch = "wasm32")]
        let elapsed_ms = None;
        self.record_query_metrics("gql", elapsed_ms, &result);
    }

    result
}
2686
2687 #[cfg(feature = "gql")]
2696 pub fn execute_at_epoch(&self, query: &str, epoch: EpochId) -> Result<QueryResult> {
2697 let previous = self.viewing_epoch_override.lock().replace(epoch);
2698 let result = self.execute(query);
2699 *self.viewing_epoch_override.lock() = previous;
2700 result
2701 }
2702
2703 #[cfg(feature = "gql")]
2711 pub fn execute_at_epoch_with_params(
2712 &self,
2713 query: &str,
2714 epoch: EpochId,
2715 params: Option<std::collections::HashMap<String, Value>>,
2716 ) -> Result<QueryResult> {
2717 let previous = self.viewing_epoch_override.lock().replace(epoch);
2718 let result = if let Some(p) = params {
2719 self.execute_with_params(query, p)
2720 } else {
2721 self.execute(query)
2722 };
2723 *self.viewing_epoch_override.lock() = previous;
2724 result
2725 }
2726
2727 #[cfg(feature = "gql")]
2733 pub fn execute_with_params(
2734 &self,
2735 query: &str,
2736 params: std::collections::HashMap<String, Value>,
2737 ) -> Result<QueryResult> {
2738 self.require_lpg("GQL")?;
2739
2740 use crate::query::processor::{QueryLanguage, QueryProcessor};
2741
2742 let has_mutations = if self.identity.can_write() {
2746 Self::query_looks_like_mutation(query)
2748 } else {
2749 use crate::query::translators::gql;
2751 match gql::translate(query) {
2752 Ok(plan) if plan.root.has_mutations() => {
2753 self.require_permission(crate::auth::StatementKind::Write)?;
2754 true
2755 }
2756 Ok(_) => false,
2757 Err(_) => Self::query_looks_like_mutation(query),
2759 }
2760 };
2761 let active = self.active_store();
2762
2763 self.with_auto_commit(has_mutations, || {
2764 let (viewing_epoch, transaction_id) = self.get_transaction_context();
2766
2767 let processor = QueryProcessor::for_stores_with_transaction(
2769 Arc::clone(&active),
2770 self.active_write_store(),
2771 Arc::clone(&self.transaction_manager),
2772 )?;
2773
2774 let processor = if let Some(transaction_id) = transaction_id {
2776 processor.with_transaction_context(viewing_epoch, transaction_id)
2777 } else {
2778 processor
2779 };
2780
2781 processor.process(query, QueryLanguage::Gql, Some(¶ms))
2782 })
2783 }
2784
2785 #[cfg(not(any(feature = "gql", feature = "cypher")))]
2791 pub fn execute_with_params(
2792 &self,
2793 _query: &str,
2794 _params: std::collections::HashMap<String, Value>,
2795 ) -> Result<QueryResult> {
2796 Err(grafeo_common::utils::error::Error::Internal(
2797 "No query language enabled".to_string(),
2798 ))
2799 }
2800
2801 #[cfg(not(any(feature = "gql", feature = "cypher")))]
2807 pub fn execute(&self, _query: &str) -> Result<QueryResult> {
2808 Err(grafeo_common::utils::error::Error::Internal(
2809 "No query language enabled".to_string(),
2810 ))
2811 }
2812
2813 #[cfg(feature = "cypher")]
/// Executes a Cypher query string against the session's active store.
///
/// The query is translated first. Schema commands and the `SHOW` variants handled here
/// (indexes, constraints, current graph type) are dispatched directly and return early.
/// Regular plans are fetched from the optimized-plan cache (keyed by query text, language
/// and current graph) or translated, bound and optimized on a miss, then planned and run
/// inside `with_auto_commit`. Plans flagged `explain` return the explain output without
/// being executed; plans flagged `profile` are executed and return the profile output
/// instead of the query rows.
///
/// # Errors
///
/// Propagates errors from translation, permission checks, binding, optimization, planning
/// and execution. Schema DDL inside a read-only transaction yields a semantic query error.
pub fn execute_cypher(&self, query: &str) -> Result<QueryResult> {
    use crate::query::{
        Executor, binder::Binder, cache::CacheKey, optimizer::Optimizer,
        processor::QueryLanguage, translators::cypher,
    };

    let translation = cypher::translate_full(query)?;
    // Every variant except `Plan` returns from this match.
    match translation {
        #[cfg(feature = "lpg")]
        cypher::CypherTranslationResult::SchemaCommand(cmd) => {
            use grafeo_common::utils::error::{
                Error as GrafeoError, QueryError, QueryErrorKind,
            };
            // Schema DDL needs the Admin permission and is refused in read-only transactions.
            self.require_permission(crate::auth::StatementKind::Admin)?;
            if *self.read_only_tx.lock() {
                return Err(GrafeoError::Query(QueryError::new(
                    QueryErrorKind::Semantic,
                    "Cannot execute schema DDL in a read-only transaction",
                )));
            }
            return self.execute_schema_command(cmd);
        }
        #[cfg(not(feature = "lpg"))]
        cypher::CypherTranslationResult::SchemaCommand(_) => {
            return Err(grafeo_common::utils::error::Error::Internal(
                "Schema DDL requires the `lpg` feature".to_string(),
            ));
        }
        cypher::CypherTranslationResult::ShowIndexes => {
            return self.execute_show_indexes();
        }
        cypher::CypherTranslationResult::ShowConstraints => {
            return self.execute_show_constraints();
        }
        cypher::CypherTranslationResult::ShowCurrentGraphType => {
            return self.execute_show_current_graph_type();
        }
        cypher::CypherTranslationResult::Plan(_) => {
            // Fall through to the regular query path below.
            // NOTE(review): the plan produced here is discarded and the query is translated
            // again on a cache miss — confirm whether the second parse is intentional.
        }
    }

    // Timing starts after the translation step above; no start time is taken on wasm32.
    #[cfg(not(target_arch = "wasm32"))]
    let start_time = std::time::Instant::now();

    // The cache key includes the current graph, so the same text on another graph is a miss.
    let cache_key = CacheKey::with_graph(query, QueryLanguage::Cypher, self.current_graph());

    let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
        cached_plan
    } else {
        // Cache miss: translate, bind, optimize against the active store, then cache a clone.
        let logical_plan = cypher::translate(query)?;

        let mut binder = Binder::new();
        let _binding_context = binder.bind(&logical_plan)?;

        let active = self.active_store();
        let optimizer = Optimizer::from_graph_store(&*active);
        let plan = optimizer.optimize(logical_plan)?;

        self.query_cache.put_optimized(cache_key, plan.clone());

        plan
    };

    // The Write permission is checked on the optimized plan, so cached plans are covered too.
    if optimized_plan.root.has_mutations() {
        self.require_permission(crate::auth::StatementKind::Write)?;
    }

    let active = self.active_store();

    if optimized_plan.explain {
        // The plan is annotated and rendered; nothing is planned physically or executed.
        use crate::query::processor::{annotate_pushdown_hints, explain_result};
        let mut plan = optimized_plan;
        annotate_pushdown_hints(&mut plan.root, active.as_ref());
        return Ok(explain_result(&plan));
    }

    if optimized_plan.profile {
        let has_mutations = optimized_plan.root.has_mutations();
        return self.with_auto_commit(has_mutations, || {
            let (viewing_epoch, transaction_id) = self.get_transaction_context();
            let planner = self.create_planner_for_store(
                Arc::clone(&active),
                viewing_epoch,
                transaction_id,
            );
            let (mut physical_plan, entries) = planner.plan_profiled(&optimized_plan)?;

            let executor = Executor::with_columns(physical_plan.columns.clone())
                .with_deadline(self.query_deadline());
            // The query is really executed, but its rows are discarded; only the profile is returned.
            let _result = executor.execute(physical_plan.operator.as_mut())?;

            // Total time is reported as 0.0 on wasm32, where no start time is taken.
            let total_time_ms;
            #[cfg(not(target_arch = "wasm32"))]
            {
                total_time_ms = start_time.elapsed().as_secs_f64() * 1000.0;
            }
            #[cfg(target_arch = "wasm32")]
            {
                total_time_ms = 0.0;
            }

            let profile_tree = crate::query::profile::build_profile_tree(
                &optimized_plan.root,
                &mut entries.into_iter(),
            );
            Ok(crate::query::profile::profile_result(
                &profile_tree,
                total_time_ms,
            ))
        });
    }

    let has_mutations = optimized_plan.root.has_mutations();

    let result = self.with_auto_commit(has_mutations, || {
        let (viewing_epoch, transaction_id) = self.get_transaction_context();

        let planner =
            self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
        let mut physical_plan = planner.plan(&optimized_plan)?;

        let executor = Executor::with_columns(physical_plan.columns.clone())
            .with_deadline(self.query_deadline());
        executor.execute(physical_plan.operator.as_mut())
    });

    // Metrics are recorded for both successful and failed executions (the `Result` is passed).
    #[cfg(feature = "metrics")]
    {
        #[cfg(not(target_arch = "wasm32"))]
        let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
        #[cfg(target_arch = "wasm32")]
        let elapsed_ms = None;
        self.record_query_metrics("cypher", elapsed_ms, &result);
    }

    result
}
2971
2972 #[cfg(feature = "gremlin")]
2996 pub fn execute_gremlin(&self, query: &str) -> Result<QueryResult> {
2997 use crate::query::{Executor, binder::Binder, optimizer::Optimizer, translators::gremlin};
2998
2999 #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
3000 let start_time = Instant::now();
3001
3002 let logical_plan = gremlin::translate(query)?;
3004
3005 let mut binder = Binder::new();
3007 let _binding_context = binder.bind(&logical_plan)?;
3008
3009 let active = self.active_store();
3011 let optimizer = Optimizer::from_graph_store(&*active);
3012 let optimized_plan = optimizer.optimize(logical_plan)?;
3013
3014 let has_mutations = optimized_plan.root.has_mutations();
3015 if has_mutations {
3016 self.require_permission(crate::auth::StatementKind::Write)?;
3017 }
3018
3019 let result = self.with_auto_commit(has_mutations, || {
3020 let (viewing_epoch, transaction_id) = self.get_transaction_context();
3022
3023 let planner =
3025 self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
3026 let mut physical_plan = planner.plan(&optimized_plan)?;
3027
3028 let executor = Executor::with_columns(physical_plan.columns.clone())
3030 .with_deadline(self.query_deadline());
3031 executor.execute(physical_plan.operator.as_mut())
3032 });
3033
3034 #[cfg(feature = "metrics")]
3035 {
3036 #[cfg(not(target_arch = "wasm32"))]
3037 let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
3038 #[cfg(target_arch = "wasm32")]
3039 let elapsed_ms = None;
3040 self.record_query_metrics("gremlin", elapsed_ms, &result);
3041 }
3042
3043 result
3044 }
3045
3046 #[cfg(feature = "gremlin")]
3052 pub fn execute_gremlin_with_params(
3053 &self,
3054 query: &str,
3055 params: std::collections::HashMap<String, Value>,
3056 ) -> Result<QueryResult> {
3057 use crate::query::{
3058 Executor, binder::Binder, optimizer::Optimizer, processor::substitute_params,
3059 translators::gremlin,
3060 };
3061
3062 #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
3063 let start_time = Instant::now();
3064
3065 let mut logical_plan = gremlin::translate(query)?;
3067
3068 substitute_params(&mut logical_plan, ¶ms)?;
3070
3071 let mut binder = Binder::new();
3073 let _binding_context = binder.bind(&logical_plan)?;
3074
3075 let active = self.active_store();
3077 let optimizer = Optimizer::from_graph_store(&*active);
3078 let optimized_plan = optimizer.optimize(logical_plan)?;
3079
3080 let has_mutations = optimized_plan.root.has_mutations();
3081 if has_mutations {
3082 self.require_permission(crate::auth::StatementKind::Write)?;
3083 }
3084
3085 let result = self.with_auto_commit(has_mutations, || {
3086 let (viewing_epoch, transaction_id) = self.get_transaction_context();
3087 let planner =
3088 self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
3089 let mut physical_plan = planner.plan(&optimized_plan)?;
3090 let executor = Executor::with_columns(physical_plan.columns.clone())
3091 .with_deadline(self.query_deadline());
3092 executor.execute(physical_plan.operator.as_mut())
3093 });
3094
3095 #[cfg(feature = "metrics")]
3096 {
3097 #[cfg(not(target_arch = "wasm32"))]
3098 let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
3099 #[cfg(target_arch = "wasm32")]
3100 let elapsed_ms = None;
3101 self.record_query_metrics("gremlin", elapsed_ms, &result);
3102 }
3103
3104 result
3105 }
3106
3107 #[cfg(feature = "graphql")]
3131 pub fn execute_graphql(&self, query: &str) -> Result<QueryResult> {
3132 use crate::query::{
3133 Executor, binder::Binder, optimizer::Optimizer, processor::substitute_params,
3134 translators::graphql,
3135 };
3136
3137 #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
3138 let start_time = Instant::now();
3139
3140 let mut logical_plan = graphql::translate(query)?;
3141
3142 if !logical_plan.default_params.is_empty() {
3144 let defaults = logical_plan.default_params.clone();
3145 substitute_params(&mut logical_plan, &defaults)?;
3146 }
3147
3148 let mut binder = Binder::new();
3149 let _binding_context = binder.bind(&logical_plan)?;
3150
3151 let active = self.active_store();
3152 let optimizer = Optimizer::from_graph_store(&*active);
3153 let optimized_plan = optimizer.optimize(logical_plan)?;
3154 let has_mutations = optimized_plan.root.has_mutations();
3155 if has_mutations {
3156 self.require_permission(crate::auth::StatementKind::Write)?;
3157 }
3158
3159 let result = self.with_auto_commit(has_mutations, || {
3160 let (viewing_epoch, transaction_id) = self.get_transaction_context();
3161 let planner =
3162 self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
3163 let mut physical_plan = planner.plan(&optimized_plan)?;
3164 let executor = Executor::with_columns(physical_plan.columns.clone())
3165 .with_deadline(self.query_deadline());
3166 executor.execute(physical_plan.operator.as_mut())
3167 });
3168
3169 #[cfg(feature = "metrics")]
3170 {
3171 #[cfg(not(target_arch = "wasm32"))]
3172 let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
3173 #[cfg(target_arch = "wasm32")]
3174 let elapsed_ms = None;
3175 self.record_query_metrics("graphql", elapsed_ms, &result);
3176 }
3177
3178 result
3179 }
3180
3181 #[cfg(feature = "graphql")]
3187 pub fn execute_graphql_with_params(
3188 &self,
3189 query: &str,
3190 params: std::collections::HashMap<String, Value>,
3191 ) -> Result<QueryResult> {
3192 use crate::query::{
3193 Executor, binder::Binder, optimizer::Optimizer, processor::substitute_params,
3194 translators::graphql,
3195 };
3196
3197 #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
3198 let start_time = Instant::now();
3199
3200 let mut logical_plan = graphql::translate(query)?;
3202
3203 if !logical_plan.default_params.is_empty() {
3205 let mut merged = logical_plan.default_params.clone();
3206 merged.extend(params.iter().map(|(k, v)| (k.clone(), v.clone())));
3207 substitute_params(&mut logical_plan, &merged)?;
3208 } else {
3209 substitute_params(&mut logical_plan, ¶ms)?;
3210 }
3211
3212 let mut binder = Binder::new();
3214 let _binding_context = binder.bind(&logical_plan)?;
3215
3216 let active = self.active_store();
3218 let optimizer = Optimizer::from_graph_store(&*active);
3219 let optimized_plan = optimizer.optimize(logical_plan)?;
3220
3221 let has_mutations = optimized_plan.root.has_mutations();
3222 if has_mutations {
3223 self.require_permission(crate::auth::StatementKind::Write)?;
3224 }
3225
3226 let result = self.with_auto_commit(has_mutations, || {
3227 let (viewing_epoch, transaction_id) = self.get_transaction_context();
3228 let planner =
3229 self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
3230 let mut physical_plan = planner.plan(&optimized_plan)?;
3231 let executor = Executor::with_columns(physical_plan.columns.clone())
3232 .with_deadline(self.query_deadline());
3233 executor.execute(physical_plan.operator.as_mut())
3234 });
3235
3236 #[cfg(feature = "metrics")]
3237 {
3238 #[cfg(not(target_arch = "wasm32"))]
3239 let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
3240 #[cfg(target_arch = "wasm32")]
3241 let elapsed_ms = None;
3242 self.record_query_metrics("graphql", elapsed_ms, &result);
3243 }
3244
3245 result
3246 }
3247
3248 #[cfg(feature = "sql-pgq")]
3273 pub fn execute_sql(&self, query: &str) -> Result<QueryResult> {
3274 use crate::query::{
3275 Executor, binder::Binder, cache::CacheKey, optimizer::Optimizer, plan::LogicalOperator,
3276 processor::QueryLanguage, translators::sql_pgq,
3277 };
3278
3279 #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
3280 let start_time = Instant::now();
3281
3282 let logical_plan = sql_pgq::translate(query)?;
3284
3285 if let LogicalOperator::CreatePropertyGraph(ref cpg) = logical_plan.root {
3287 self.require_permission(crate::auth::StatementKind::Admin)?;
3288 return Ok(QueryResult {
3289 columns: vec!["status".into()],
3290 column_types: vec![grafeo_common::types::LogicalType::String],
3291 rows: vec![vec![Value::from(format!(
3292 "Property graph '{}' created ({} node tables, {} edge tables)",
3293 cpg.name,
3294 cpg.node_tables.len(),
3295 cpg.edge_tables.len()
3296 ))]],
3297 execution_time_ms: None,
3298 rows_scanned: None,
3299 status_message: None,
3300 gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
3301 });
3302 }
3303
3304 let cache_key = CacheKey::with_graph(query, QueryLanguage::SqlPgq, self.current_graph());
3305
3306 let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
3307 cached_plan
3308 } else {
3309 let mut binder = Binder::new();
3310 let _binding_context = binder.bind(&logical_plan)?;
3311 let active = self.active_store();
3312 let optimizer = Optimizer::from_graph_store(&*active);
3313 let plan = optimizer.optimize(logical_plan)?;
3314 self.query_cache.put_optimized(cache_key, plan.clone());
3315 plan
3316 };
3317
3318 let active = self.active_store();
3319 let has_mutations = optimized_plan.root.has_mutations();
3320 if has_mutations {
3321 self.require_permission(crate::auth::StatementKind::Write)?;
3322 }
3323
3324 let result = self.with_auto_commit(has_mutations, || {
3325 let (viewing_epoch, transaction_id) = self.get_transaction_context();
3326 let planner =
3327 self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
3328 let mut physical_plan = planner.plan(&optimized_plan)?;
3329 let executor = Executor::with_columns(physical_plan.columns.clone())
3330 .with_deadline(self.query_deadline());
3331 executor.execute(physical_plan.operator.as_mut())
3332 });
3333
3334 #[cfg(feature = "metrics")]
3335 {
3336 #[cfg(not(target_arch = "wasm32"))]
3337 let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
3338 #[cfg(target_arch = "wasm32")]
3339 let elapsed_ms = None;
3340 self.record_query_metrics("sql", elapsed_ms, &result);
3341 }
3342
3343 result
3344 }
3345
3346 #[cfg(feature = "sql-pgq")]
3352 pub fn execute_sql_with_params(
3353 &self,
3354 query: &str,
3355 params: std::collections::HashMap<String, Value>,
3356 ) -> Result<QueryResult> {
3357 use crate::query::processor::{QueryLanguage, QueryProcessor};
3358
3359 #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
3360 let start_time = Instant::now();
3361
3362 let has_mutations = if self.identity.can_write() {
3363 Self::query_looks_like_mutation(query)
3364 } else {
3365 use crate::query::translators::sql_pgq;
3366 match sql_pgq::translate(query) {
3367 Ok(plan) if plan.root.has_mutations() => {
3368 self.require_permission(crate::auth::StatementKind::Write)?;
3369 true
3370 }
3371 Ok(_) => false,
3372 Err(_) => Self::query_looks_like_mutation(query),
3373 }
3374 };
3375 if has_mutations {
3376 self.require_permission(crate::auth::StatementKind::Write)?;
3377 }
3378 let active = self.active_store();
3379
3380 let result = self.with_auto_commit(has_mutations, || {
3381 let (viewing_epoch, transaction_id) = self.get_transaction_context();
3382 let processor = QueryProcessor::for_stores_with_transaction(
3383 Arc::clone(&active),
3384 self.active_write_store(),
3385 Arc::clone(&self.transaction_manager),
3386 )?;
3387 let processor = if let Some(transaction_id) = transaction_id {
3388 processor.with_transaction_context(viewing_epoch, transaction_id)
3389 } else {
3390 processor
3391 };
3392 processor.process(query, QueryLanguage::SqlPgq, Some(¶ms))
3393 });
3394
3395 #[cfg(feature = "metrics")]
3396 {
3397 #[cfg(not(target_arch = "wasm32"))]
3398 let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
3399 #[cfg(target_arch = "wasm32")]
3400 let elapsed_ms = None;
3401 self.record_query_metrics("sql", elapsed_ms, &result);
3402 }
3403
3404 result
3405 }
3406
3407 pub fn execute_language(
3416 &self,
3417 query: &str,
3418 language: &str,
3419 params: Option<std::collections::HashMap<String, Value>>,
3420 ) -> Result<QueryResult> {
3421 let _span = grafeo_info_span!(
3422 "grafeo::session::execute",
3423 language,
3424 query_len = query.len(),
3425 );
3426 match language {
3427 "gql" => {
3428 if let Some(p) = params {
3429 self.execute_with_params(query, p)
3430 } else {
3431 self.execute(query)
3432 }
3433 }
3434 #[cfg(feature = "cypher")]
3435 "cypher" => {
3436 if let Some(p) = params {
3437 use crate::query::processor::{QueryLanguage, QueryProcessor};
3438
3439 #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
3440 let start_time = Instant::now();
3441
3442 let has_mutations = if self.identity.can_write() {
3443 Self::query_looks_like_mutation(query)
3444 } else {
3445 use crate::query::translators::cypher;
3446 match cypher::translate(query) {
3447 Ok(plan) if plan.root.has_mutations() => {
3448 self.require_permission(crate::auth::StatementKind::Write)?;
3449 true
3450 }
3451 Ok(_) => false,
3452 Err(_) => Self::query_looks_like_mutation(query),
3453 }
3454 };
3455 let active = self.active_store();
3456 let result = self.with_auto_commit(has_mutations, || {
3457 let processor = QueryProcessor::for_stores_with_transaction(
3458 Arc::clone(&active),
3459 self.active_write_store(),
3460 Arc::clone(&self.transaction_manager),
3461 )?;
3462 let (viewing_epoch, transaction_id) = self.get_transaction_context();
3463 let processor = if let Some(transaction_id) = transaction_id {
3464 processor.with_transaction_context(viewing_epoch, transaction_id)
3465 } else {
3466 processor
3467 };
3468 processor.process(query, QueryLanguage::Cypher, Some(&p))
3469 });
3470
3471 #[cfg(feature = "metrics")]
3472 {
3473 #[cfg(not(target_arch = "wasm32"))]
3474 let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
3475 #[cfg(target_arch = "wasm32")]
3476 let elapsed_ms = None;
3477 self.record_query_metrics("cypher", elapsed_ms, &result);
3478 }
3479
3480 result
3481 } else {
3482 self.execute_cypher(query)
3483 }
3484 }
3485 #[cfg(feature = "gremlin")]
3486 "gremlin" => {
3487 if let Some(p) = params {
3488 self.execute_gremlin_with_params(query, p)
3489 } else {
3490 self.execute_gremlin(query)
3491 }
3492 }
3493 #[cfg(feature = "graphql")]
3494 "graphql" => {
3495 if let Some(p) = params {
3496 self.execute_graphql_with_params(query, p)
3497 } else {
3498 self.execute_graphql(query)
3499 }
3500 }
3501 #[cfg(all(feature = "graphql", feature = "triple-store"))]
3502 "graphql-rdf" => {
3503 if let Some(p) = params {
3504 self.execute_graphql_rdf_with_params(query, p)
3505 } else {
3506 self.execute_graphql_rdf(query)
3507 }
3508 }
3509 #[cfg(feature = "sql-pgq")]
3510 "sql" | "sql-pgq" => {
3511 if let Some(p) = params {
3512 self.execute_sql_with_params(query, p)
3513 } else {
3514 self.execute_sql(query)
3515 }
3516 }
3517 #[cfg(all(feature = "sparql", feature = "triple-store"))]
3518 "sparql" => {
3519 if let Some(p) = params {
3520 self.execute_sparql_with_params(query, p)
3521 } else {
3522 self.execute_sparql(query)
3523 }
3524 }
3525 other => Err(grafeo_common::utils::error::Error::Query(
3526 grafeo_common::utils::error::QueryError::new(
3527 grafeo_common::utils::error::QueryErrorKind::Semantic,
3528 format!("Unknown query language: '{other}'"),
3529 ),
3530 )),
3531 }
3532 }
3533
/// Clears the query cache held by this session by calling `clear` on `self.query_cache`.
///
/// NOTE(review): `SessionConfig` passes the cache as an `Arc<QueryCache>`, so this presumably
/// empties the cache for every holder of the same `Arc` — confirm.
pub fn clear_plan_cache(&self) {
    self.query_cache.clear();
}
3563
3564 #[cfg(feature = "lpg")]
/// Begins a transaction with default settings.
///
/// Delegates to `begin_transaction_inner(false, None)`: not read-only and with no explicit
/// isolation level.
///
/// # Errors
///
/// Propagates any error returned by `begin_transaction_inner`.
pub fn begin_transaction(&mut self) -> Result<()> {
    self.begin_transaction_inner(false, None)
}
3575
3576 #[cfg(feature = "lpg")]
/// Begins a transaction with an explicit isolation level.
///
/// Delegates to `begin_transaction_inner(false, Some(isolation_level))`: not read-only,
/// with the given isolation level.
///
/// # Errors
///
/// Propagates any error returned by `begin_transaction_inner`.
pub fn begin_transaction_with_isolation(
    &mut self,
    isolation_level: crate::transaction::IsolationLevel,
) -> Result<()> {
    self.begin_transaction_inner(false, Some(isolation_level))
}
3590
3591 #[cfg(feature = "lpg")]
3593 fn begin_transaction_inner(
3594 &self,
3595 read_only: bool,
3596 isolation_level: Option<crate::transaction::IsolationLevel>,
3597 ) -> Result<()> {
3598 let _span = grafeo_debug_span!("grafeo::tx::begin", read_only);
3599 let mut current = self.current_transaction.lock();
3600 if current.is_some() {
3601 drop(current);
3603 let mut depth = self.transaction_nesting_depth.lock();
3604 *depth += 1;
3605 let sp_name = format!("_nested_tx_{}", *depth);
3606 self.savepoint(&sp_name)?;
3607 return Ok(());
3608 }
3609
3610 let active = self.active_lpg_store();
3611 self.transaction_start_node_count
3612 .store(active.node_count(), Ordering::Relaxed);
3613 self.transaction_start_edge_count
3614 .store(active.edge_count(), Ordering::Relaxed);
3615 let transaction_id = if let Some(level) = isolation_level {
3616 self.transaction_manager.begin_with_isolation(level)
3617 } else {
3618 self.transaction_manager.begin()
3619 };
3620 *current = Some(transaction_id);
3621 *self.read_only_tx.lock() = read_only || self.db_read_only;
3622
3623 let key = self.active_graph_storage_key();
3626 let mut touched = self.touched_graphs.lock();
3627 touched.clear();
3628 touched.push(key);
3629
3630 #[cfg(feature = "metrics")]
3631 {
3632 crate::metrics::record_metric!(self.metrics, tx_active, inc);
3633 #[cfg(not(target_arch = "wasm32"))]
3634 {
3635 *self.tx_start_time.lock() = Some(Instant::now());
3636 }
3637 }
3638
3639 Ok(())
3640 }
3641
3642 #[cfg(feature = "lpg")]
/// Commits the current transaction by delegating to `commit_inner`.
///
/// # Errors
///
/// Propagates any error returned by `commit_inner`.
pub fn commit(&mut self) -> Result<()> {
    self.commit_inner()
}
3653
#[cfg(feature = "lpg")]
/// Commit implementation shared by `commit` and the auto-commit wrapper.
///
/// With a nesting depth above zero, only the matching `_nested_tx_<depth>`
/// savepoint is released and the depth is decremented. Otherwise the active
/// transaction is taken out of the session and committed through the
/// transaction manager, after which the touched stores, the CDC log, the WAL
/// and the metrics are updated in that order.
///
/// # Errors
///
/// * `InvalidState("No active transaction")` when nothing is active.
/// * The transaction manager's error when the commit is rejected; in that
///   case the per-transaction session state is cleaned up before returning.
/// * On the nested path, the error from `release_savepoint`.
///
/// WAL write failures are only logged as warnings and do not fail the commit.
fn commit_inner(&self) -> Result<()> {
    let _span = grafeo_debug_span!("grafeo::tx::commit");
    {
        // Nested level: release its savepoint instead of committing.
        let mut depth = self.transaction_nesting_depth.lock();
        if *depth > 0 {
            let sp_name = format!("_nested_tx_{depth}");
            *depth -= 1;
            // Free the depth lock before calling back into the session.
            drop(depth);
            return self.release_savepoint(&sp_name);
        }
    }

    // `take()` clears the session's current transaction right away, so the
    // session is out of the transaction whether the commit succeeds or not.
    let transaction_id = self.current_transaction.lock().take().ok_or_else(|| {
        grafeo_common::utils::error::Error::Transaction(
            grafeo_common::utils::error::TransactionError::InvalidState(
                "No active transaction".to_string(),
            ),
        )
    })?;

    // Snapshot the touched graphs; the shared list is cleared further down
    // while this copy is still needed (e.g. for GC).
    let touched = self.touched_graphs.lock().clone();
    let commit_epoch = match self.transaction_manager.commit(transaction_id) {
        Ok(epoch) => epoch,
        Err(e) => {
            // Commit rejected: undo property changes in every touched graph.
            for graph_name in &touched {
                let store = self.resolve_store(graph_name);
                store.rollback_transaction_properties(transaction_id);
            }
            #[cfg(feature = "triple-store")]
            self.rollback_rdf_transaction(transaction_id);
            // Buffered CDC events belong to the failed transaction; drop them.
            #[cfg(feature = "cdc")]
            if let Some(ref pending) = self.cdc_pending_events {
                pending.lock().clear();
            }
            // Reset per-transaction session state.
            *self.read_only_tx.lock() = self.db_read_only;
            self.savepoints.lock().clear();
            self.touched_graphs.lock().clear();
            #[cfg(feature = "metrics")]
            {
                crate::metrics::record_metric!(self.metrics, tx_active, dec);
                crate::metrics::record_metric!(self.metrics, tx_conflicts, inc);
                #[cfg(not(target_arch = "wasm32"))]
                if let Some(start) = self.tx_start_time.lock().take() {
                    let duration_ms = start.elapsed().as_secs_f64() * 1000.0;
                    crate::metrics::record_metric!(
                        self.metrics,
                        tx_duration,
                        observe duration_ms
                    );
                }
            }
            return Err(e);
        }
    };

    // Hand the commit epoch to every touched store for this transaction.
    for graph_name in &touched {
        let store = self.resolve_store(graph_name);
        store.finalize_version_epochs(transaction_id, commit_epoch);
    }

    #[cfg(feature = "triple-store")]
    self.commit_rdf_transaction(transaction_id);

    for graph_name in &touched {
        let store = self.resolve_store(graph_name);
        store.commit_transaction_properties(transaction_id);
    }

    // Stamp the buffered CDC events with the commit epoch and record them.
    #[cfg(feature = "cdc")]
    if let Some(ref pending) = self.cdc_pending_events {
        let events: Vec<crate::cdc::ChangeEvent> = pending.lock().drain(..).collect();
        self.cdc_log.record_batch(events.into_iter().map(|mut e| {
            e.epoch = commit_epoch;
            e
        }));
    }

    // WAL logging is best-effort here: failures are warned about, not returned.
    #[cfg(feature = "wal")]
    if let Some(ref wal) = self.wal {
        use grafeo_storage::wal::WalRecord;
        if let Err(e) = wal.log(&WalRecord::TransactionCommit { transaction_id }) {
            grafeo_warn!("Failed to log transaction commit to WAL: {}", e);
        }
        if let Err(e) = wal.log(&WalRecord::EpochAdvance {
            epoch: commit_epoch,
        }) {
            grafeo_warn!("Failed to log epoch advance to WAL: {}", e);
        }
    }

    // Bring every touched store up to the manager's current epoch.
    let current_epoch = self.transaction_manager.current_epoch();
    for graph_name in &touched {
        let store = self.resolve_store(graph_name);
        store.sync_epoch(current_epoch);
    }

    // Reset per-transaction session state.
    *self.read_only_tx.lock() = self.db_read_only;
    self.savepoints.lock().clear();
    self.touched_graphs.lock().clear();

    // Run version GC on every `gc_interval`-th commit (0 disables it).
    if self.gc_interval > 0 {
        let count = self.commit_counter.fetch_add(1, Ordering::Relaxed) + 1;
        if count.is_multiple_of(self.gc_interval) {
            let min_epoch = self.transaction_manager.min_active_epoch();
            for graph_name in &touched {
                let store = self.resolve_store(graph_name);
                store.gc_versions(min_epoch);
            }
            self.transaction_manager.gc();
            #[cfg(feature = "metrics")]
            crate::metrics::record_metric!(self.metrics, gc_runs, inc);
        }
    }

    #[cfg(feature = "metrics")]
    {
        crate::metrics::record_metric!(self.metrics, tx_active, dec);
        crate::metrics::record_metric!(self.metrics, tx_committed, inc);
        #[cfg(not(target_arch = "wasm32"))]
        if let Some(start) = self.tx_start_time.lock().take() {
            let duration_ms = start.elapsed().as_secs_f64() * 1000.0;
            crate::metrics::record_metric!(self.metrics, tx_duration, observe duration_ms);
        }
    }

    Ok(())
}
3801
#[cfg(feature = "lpg")]
/// Rolls back the current transaction, or only the innermost nested level
/// when nested transactions are open.
///
/// # Errors
///
/// Returns an error when no transaction is active or when the underlying
/// abort / savepoint rollback fails.
pub fn rollback(&mut self) -> Result<()> {
    self.rollback_inner()
}
3829
#[cfg(feature = "lpg")]
/// Rollback implementation shared by `rollback`, the auto-commit wrapper and
/// `Drop`.
///
/// With a nesting depth above zero, only the matching `_nested_tx_<depth>`
/// savepoint is rolled back and the depth is decremented; the outer
/// transaction stays active. Otherwise the active transaction is taken out of
/// the session, its uncommitted versions are discarded in every touched
/// graph, and the transaction manager is asked to abort it.
///
/// # Errors
///
/// * `InvalidState("No active transaction")` when nothing is active.
/// * The error from `TransactionManager::abort`, or from
///   `rollback_to_savepoint` on the nested path.
fn rollback_inner(&self) -> Result<()> {
    let _span = grafeo_debug_span!("grafeo::tx::rollback");
    {
        // Nested level: undo back to its savepoint instead of aborting.
        let mut depth = self.transaction_nesting_depth.lock();
        if *depth > 0 {
            let sp_name = format!("_nested_tx_{depth}");
            *depth -= 1;
            // Free the depth lock before calling back into the session.
            drop(depth);
            return self.rollback_to_savepoint(&sp_name);
        }
    }

    let transaction_id = self.current_transaction.lock().take().ok_or_else(|| {
        grafeo_common::utils::error::Error::Transaction(
            grafeo_common::utils::error::TransactionError::InvalidState(
                "No active transaction".to_string(),
            ),
        )
    })?;

    // Fall back to the database-level read-only setting.
    *self.read_only_tx.lock() = self.db_read_only;

    // Drop everything this transaction wrote in each graph it touched.
    let touched = self.touched_graphs.lock().clone();
    for graph_name in &touched {
        let store = self.resolve_store(graph_name);
        store.discard_uncommitted_versions(transaction_id);
    }

    #[cfg(feature = "triple-store")]
    self.rollback_rdf_transaction(transaction_id);

    // Buffered CDC events never become visible for a rolled-back transaction.
    #[cfg(feature = "cdc")]
    if let Some(ref pending) = self.cdc_pending_events {
        pending.lock().clear();
    }

    self.savepoints.lock().clear();
    self.touched_graphs.lock().clear();

    let result = self.transaction_manager.abort(transaction_id);

    // Metrics are only updated when the abort itself succeeded.
    #[cfg(feature = "metrics")]
    if result.is_ok() {
        crate::metrics::record_metric!(self.metrics, tx_active, dec);
        crate::metrics::record_metric!(self.metrics, tx_rolled_back, inc);
        #[cfg(not(target_arch = "wasm32"))]
        if let Some(start) = self.tx_start_time.lock().take() {
            let duration_ms = start.elapsed().as_secs_f64() * 1000.0;
            crate::metrics::record_metric!(self.metrics, tx_duration, observe duration_ms);
        }
    }

    result
}
3893
#[cfg(feature = "lpg")]
/// Records a named savepoint inside the active transaction.
///
/// For every graph touched so far the savepoint captures the next node id,
/// the next edge id and the transaction's property undo-log position. The
/// currently selected graph is stored too, plus (with the `cdc` feature) the
/// number of pending CDC events.
///
/// # Errors
///
/// Returns `InvalidState("No active transaction")` when no transaction is
/// active.
pub fn savepoint(&self, name: &str) -> Result<()> {
    use grafeo_common::utils::error::{Error, TransactionError};

    let Some(tx_id) = *self.current_transaction.lock() else {
        return Err(Error::Transaction(TransactionError::InvalidState(
            "No active transaction".to_string(),
        )));
    };

    // One snapshot per graph the transaction has touched so far.
    let touched = self.touched_graphs.lock().clone();
    let mut graph_snapshots: Vec<GraphSavepoint> = Vec::with_capacity(touched.len());
    for graph_name in &touched {
        let store = self.resolve_store(graph_name);
        graph_snapshots.push(GraphSavepoint {
            graph_name: graph_name.clone(),
            next_node_id: store.peek_next_node_id(),
            next_edge_id: store.peek_next_edge_id(),
            undo_log_position: store.property_undo_log_position(tx_id),
        });
    }

    let state = SavepointState {
        name: name.to_string(),
        graph_snapshots,
        active_graph: self.current_graph.lock().clone(),
        #[cfg(feature = "cdc")]
        cdc_event_position: match self.cdc_pending_events.as_ref() {
            Some(pending) => pending.lock().len(),
            None => 0,
        },
    };
    self.savepoints.lock().push(state);
    Ok(())
}
3940
#[cfg(feature = "lpg")]
/// Undoes everything done since the savepoint called `name` was recorded.
///
/// The most recent savepoint with that name is used. It and every savepoint
/// recorded after it are removed from the savepoint stack. The transaction
/// itself stays active.
///
/// # Errors
///
/// * `InvalidState("No active transaction")` when no transaction is active.
/// * `InvalidState("Savepoint '<name>' not found")` when no such savepoint
///   exists.
pub fn rollback_to_savepoint(&self, name: &str) -> Result<()> {
    let transaction_id = self.current_transaction.lock().ok_or_else(|| {
        grafeo_common::utils::error::Error::Transaction(
            grafeo_common::utils::error::TransactionError::InvalidState(
                "No active transaction".to_string(),
            ),
        )
    })?;

    let mut savepoints = self.savepoints.lock();

    // Search from the back so the latest savepoint with this name wins.
    let pos = savepoints
        .iter()
        .rposition(|sp| sp.name == name)
        .ok_or_else(|| {
            grafeo_common::utils::error::Error::Transaction(
                grafeo_common::utils::error::TransactionError::InvalidState(format!(
                    "Savepoint '{name}' not found"
                )),
            )
        })?;

    let sp_state = savepoints[pos].clone();

    // Drops the target savepoint itself and everything recorded after it.
    savepoints.truncate(pos);
    // Release the lock before touching the stores.
    drop(savepoints);

    for gs in &sp_state.graph_snapshots {
        let store = self.resolve_store(&gs.graph_name);

        // Rewind property changes to the undo-log position captured earlier.
        store.rollback_transaction_properties_to(transaction_id, gs.undo_log_position);

        let current_next_node = store.peek_next_node_id();
        let current_next_edge = store.peek_next_edge_id();

        // Ids handed out since the savepoint: [captured next id, current next id).
        let node_ids: Vec<NodeId> = (gs.next_node_id..current_next_node)
            .map(NodeId::new)
            .collect();
        let edge_ids: Vec<EdgeId> = (gs.next_edge_id..current_next_edge)
            .map(EdgeId::new)
            .collect();

        if !node_ids.is_empty() || !edge_ids.is_empty() {
            store.discard_entities_by_id(transaction_id, &node_ids, &edge_ids);
        }
    }

    // Graphs first touched after the savepoint have no snapshot; all of this
    // transaction's uncommitted versions in them are discarded.
    let touched = self.touched_graphs.lock().clone();
    for graph_name in &touched {
        let already_captured = sp_state
            .graph_snapshots
            .iter()
            .any(|gs| gs.graph_name == *graph_name);
        if !already_captured {
            let store = self.resolve_store(graph_name);
            store.discard_uncommitted_versions(transaction_id);
        }
    }

    // Keep only the CDC events that were pending when the savepoint was taken.
    #[cfg(feature = "cdc")]
    if let Some(ref pending) = self.cdc_pending_events {
        pending.lock().truncate(sp_state.cdc_event_position);
    }

    // Restore the touched-graph list to the graphs known at savepoint time.
    let mut touched = self.touched_graphs.lock();
    touched.clear();
    for gs in &sp_state.graph_snapshots {
        if !touched.contains(&gs.graph_name) {
            touched.push(gs.graph_name.clone());
        }
    }

    Ok(())
}
4034
4035 pub fn release_savepoint(&self, name: &str) -> Result<()> {
4041 let _tx_id = self.current_transaction.lock().ok_or_else(|| {
4042 grafeo_common::utils::error::Error::Transaction(
4043 grafeo_common::utils::error::TransactionError::InvalidState(
4044 "No active transaction".to_string(),
4045 ),
4046 )
4047 })?;
4048
4049 let mut savepoints = self.savepoints.lock();
4050 let pos = savepoints
4051 .iter()
4052 .rposition(|sp| sp.name == name)
4053 .ok_or_else(|| {
4054 grafeo_common::utils::error::Error::Transaction(
4055 grafeo_common::utils::error::TransactionError::InvalidState(format!(
4056 "Savepoint '{name}' not found"
4057 )),
4058 )
4059 })?;
4060 savepoints.remove(pos);
4061 Ok(())
4062 }
4063
#[must_use]
/// Returns `true` while this session has an active transaction.
pub fn in_transaction(&self) -> bool {
    self.current_transaction.lock().is_some()
}
4069
#[must_use]
/// Returns the id of the session's active transaction, or `None` when no
/// transaction is active.
pub(crate) fn current_transaction_id(&self) -> Option<TransactionId> {
    *self.current_transaction.lock()
}
4075
#[must_use]
/// Borrows the transaction manager this session works against.
pub(crate) fn transaction_manager(&self) -> &TransactionManager {
    &self.transaction_manager
}
4081
#[cfg(feature = "lpg")]
/// Returns `(count_at_transaction_start, current_count)` for nodes.
///
/// The first value is the count stored when the last top-level transaction
/// began; the second is read from the active LPG store right now.
#[must_use]
pub(crate) fn node_count_delta(&self) -> (usize, usize) {
    (
        self.transaction_start_node_count.load(Ordering::Relaxed),
        self.active_lpg_store().node_count(),
    )
}
4091
#[cfg(feature = "lpg")]
/// Returns `(count_at_transaction_start, current_count)` for edges.
///
/// The first value is the count stored when the last top-level transaction
/// began; the second is read from the active LPG store right now.
#[must_use]
pub(crate) fn edge_count_delta(&self) -> (usize, usize) {
    (
        self.transaction_start_edge_count.load(Ordering::Relaxed),
        self.active_lpg_store().edge_count(),
    )
}
4101
#[cfg(feature = "lpg")]
/// Builds a `PreparedCommit` that mutably borrows this session.
///
/// # Errors
///
/// Returns whatever error `PreparedCommit::new` reports.
// NOTE(review): presumably fails when no transaction is active - confirm
// against `PreparedCommit::new`.
pub fn prepare_commit(&mut self) -> Result<crate::transaction::PreparedCommit<'_>> {
    crate::transaction::PreparedCommit::new(self)
}
4139
/// Enables or disables auto-commit for this session.
pub fn set_auto_commit(&mut self, auto_commit: bool) {
    self.auto_commit = auto_commit;
}
4144
#[must_use]
/// Returns whether auto-commit is enabled for this session.
pub fn auto_commit(&self) -> bool {
    self.auto_commit
}
4150
/// Decides whether a statement must be wrapped in an implicit transaction.
///
/// That is the case only when auto-commit is enabled, the statement mutates
/// data (`has_mutations`) and no transaction is currently active. The
/// transaction lock is only taken when the first two conditions hold.
fn needs_auto_commit(&self, has_mutations: bool) -> bool {
    self.auto_commit && has_mutations && self.current_transaction.lock().is_none()
}
4158
#[cfg(feature = "lpg")]
/// Runs `body`, wrapping it in an implicit transaction when required.
///
/// When `needs_auto_commit(has_mutations)` is false, `body` simply runs as
/// is. Otherwise a transaction is begun first; it is committed when `body`
/// succeeds and rolled back (ignoring rollback errors) when `body` fails.
///
/// # Errors
///
/// Returns the error from beginning the transaction, from `body`, or from
/// the commit.
fn with_auto_commit<F>(&self, has_mutations: bool, body: F) -> Result<QueryResult>
where
    F: FnOnce() -> Result<QueryResult>,
{
    if !self.needs_auto_commit(has_mutations) {
        return body();
    }

    self.begin_transaction_inner(false, None)?;
    let outcome = body();
    if outcome.is_ok() {
        self.commit_inner()?;
    } else {
        // The body's error is what the caller needs to see; a rollback
        // failure on top of it is deliberately ignored.
        let _ = self.rollback_inner();
    }
    outcome
}
4182
#[cfg(not(feature = "lpg"))]
/// Variant used when the `lpg` feature is disabled: runs `body` directly,
/// with no implicit transaction.
fn with_auto_commit<F>(&self, _has_mutations: bool, body: F) -> Result<QueryResult>
where
    F: FnOnce() -> Result<QueryResult>,
{
    body()
}
4191
/// Cheap textual check for whether `query` might modify data.
///
/// The query is upper-cased (ASCII only) and searched for a fixed set of
/// mutation keywords. Matching is by plain substring, so a keyword embedded
/// in a longer word (for example `SET` inside `OFFSET`) also counts.
fn query_looks_like_mutation(query: &str) -> bool {
    const MUTATION_KEYWORDS: [&str; 8] = [
        "INSERT", "CREATE", "DELETE", "MERGE", "SET", "REMOVE", "DROP", "ALTER",
    ];

    let normalized = query.to_ascii_uppercase();
    MUTATION_KEYWORDS
        .iter()
        .any(|&keyword| normalized.contains(keyword))
}
4208
4209 #[must_use]
4211 fn query_deadline(&self) -> Option<Instant> {
4212 #[cfg(not(target_arch = "wasm32"))]
4213 {
4214 self.query_timeout.map(|d| Instant::now() + d)
4215 }
4216 #[cfg(target_arch = "wasm32")]
4217 {
4218 let _ = &self.query_timeout;
4219 None
4220 }
4221 }
4222
#[cfg(feature = "metrics")]
/// Records the metrics for one finished query.
///
/// * `language` - label used for the per-language query counter.
/// * `elapsed_ms` - query latency in milliseconds, when it was measured.
/// * `result` - the query outcome; successes contribute row counts, failures
///   bump the error counter.
fn record_query_metrics(
    &self,
    language: &str,
    elapsed_ms: Option<f64>,
    result: &Result<crate::database::QueryResult>,
) {
    use crate::metrics::record_metric;

    record_metric!(self.metrics, query_count, inc);
    if let Some(ref reg) = self.metrics {
        reg.query_count_by_language.increment(language);
    }
    if let Some(ms) = elapsed_ms {
        record_metric!(self.metrics, query_latency, observe ms);
    }
    match result {
        Ok(r) => {
            let returned = r.rows.len() as u64;
            record_metric!(self.metrics, rows_returned, add returned);
            if let Some(scanned) = r.rows_scanned {
                record_metric!(self.metrics, rows_scanned, add scanned);
            }
        }
        Err(e) => {
            record_metric!(self.metrics, query_errors, inc);
            let msg = e.to_string();
            // Timeouts are recognised by their error text.
            if msg.contains("exceeded timeout") {
                record_metric!(self.metrics, query_timeouts, inc);
            }
        }
    }
}
4262
4263 fn eval_integer_literal(expr: &grafeo_adapters::query::gql::ast::Expression) -> Option<i64> {
4265 use grafeo_adapters::query::gql::ast::{Expression, Literal};
4266 match expr {
4267 Expression::Literal(Literal::Integer(n)) => Some(*n),
4268 _ => None,
4269 }
4270 }
4271
4272 #[must_use]
4278 fn get_transaction_context(&self) -> (EpochId, Option<TransactionId>) {
4279 if let Some(epoch) = *self.viewing_epoch_override.lock() {
4281 return (epoch, None);
4282 }
4283
4284 if let Some(transaction_id) = *self.current_transaction.lock() {
4285 let epoch = self
4287 .transaction_manager
4288 .start_epoch(transaction_id)
4289 .unwrap_or_else(|| self.transaction_manager.current_epoch());
4290 (epoch, Some(transaction_id))
4291 } else {
4292 (self.transaction_manager.current_epoch(), None)
4294 }
4295 }
4296
/// Builds a query planner over `store` with read-only mode turned off.
///
/// Shorthand for `create_planner_for_store_with_read_only(.., false)`.
fn create_planner_for_store(
    &self,
    store: Arc<dyn GraphStore>,
    viewing_epoch: EpochId,
    transaction_id: Option<TransactionId>,
) -> crate::query::Planner {
    self.create_planner_for_store_with_read_only(store, viewing_epoch, transaction_id, false)
}
4309
4310 fn create_planner_for_store_with_read_only(
4311 &self,
4312 store: Arc<dyn GraphStore>,
4313 viewing_epoch: EpochId,
4314 transaction_id: Option<TransactionId>,
4315 read_only: bool,
4316 ) -> crate::query::Planner {
4317 use crate::query::Planner;
4318 use grafeo_core::execution::operators::{LazyValue, SessionContext};
4319
4320 let info_store = Arc::clone(&store);
4322 let schema_store = Arc::clone(&store);
4323
4324 let session_context = SessionContext {
4325 current_schema: self.current_schema(),
4326 current_graph: self.current_graph(),
4327 db_info: LazyValue::new(move || Self::build_info_value(&*info_store)),
4328 schema_info: LazyValue::new(move || Self::build_schema_value(&*schema_store)),
4329 };
4330
4331 let write_store = self.active_write_store();
4332
4333 let mut planner = Planner::with_context(
4334 Arc::clone(&store),
4335 write_store,
4336 Arc::clone(&self.transaction_manager),
4337 transaction_id,
4338 viewing_epoch,
4339 )
4340 .with_factorized_execution(self.factorized_execution)
4341 .with_catalog(Arc::clone(&self.catalog))
4342 .with_session_context(session_context)
4343 .with_read_only(read_only);
4344
4345 let validator =
4347 CatalogConstraintValidator::new(Arc::clone(&self.catalog)).with_store(store);
4348 planner = planner.with_validator(Arc::new(validator));
4349
4350 planner
4351 }
4352
4353 fn build_info_value(store: &dyn GraphStore) -> Value {
4355 use grafeo_common::types::PropertyKey;
4356 use std::collections::BTreeMap;
4357
4358 let mut map = BTreeMap::new();
4359 map.insert(PropertyKey::from("mode"), Value::String("lpg".into()));
4360 map.insert(
4361 PropertyKey::from("node_count"),
4362 Value::Int64(store.node_count() as i64),
4363 );
4364 map.insert(
4365 PropertyKey::from("edge_count"),
4366 Value::Int64(store.edge_count() as i64),
4367 );
4368 map.insert(
4369 PropertyKey::from("version"),
4370 Value::String(env!("CARGO_PKG_VERSION").into()),
4371 );
4372 Value::Map(map.into())
4373 }
4374
4375 fn build_schema_value(store: &dyn GraphStore) -> Value {
4377 use grafeo_common::types::PropertyKey;
4378 use std::collections::BTreeMap;
4379
4380 let labels: Vec<Value> = store
4381 .all_labels()
4382 .into_iter()
4383 .map(|l| Value::String(l.into()))
4384 .collect();
4385 let edge_types: Vec<Value> = store
4386 .all_edge_types()
4387 .into_iter()
4388 .map(|t| Value::String(t.into()))
4389 .collect();
4390 let property_keys: Vec<Value> = store
4391 .all_property_keys()
4392 .into_iter()
4393 .map(|k| Value::String(k.into()))
4394 .collect();
4395
4396 let mut map = BTreeMap::new();
4397 map.insert(PropertyKey::from("labels"), Value::List(labels.into()));
4398 map.insert(
4399 PropertyKey::from("edge_types"),
4400 Value::List(edge_types.into()),
4401 );
4402 map.insert(
4403 PropertyKey::from("property_keys"),
4404 Value::List(property_keys.into()),
4405 );
4406 Value::Map(map.into())
4407 }
4408
#[cfg(feature = "lpg")]
/// Creates a node with the given `labels` in the active LPG store.
///
/// The node is created under the session's current epoch and transaction;
/// outside a transaction it is attributed to `TransactionId::SYSTEM`.
pub fn create_node(&self, labels: &[&str]) -> NodeId {
    let (epoch, active_tx) = self.get_transaction_context();
    let owner = active_tx.unwrap_or(TransactionId::SYSTEM);
    let store = self.active_lpg_store();
    store.create_node_versioned(labels, epoch, owner)
}
4422
#[cfg(feature = "lpg")]
/// Creates a node with the given `labels` and initial `properties` in the
/// active LPG store.
///
/// Outside a transaction the node is attributed to `TransactionId::SYSTEM`.
pub fn create_node_with_props<'a>(
    &self,
    labels: &[&str],
    properties: impl IntoIterator<Item = (&'a str, Value)>,
) -> NodeId {
    let (epoch, active_tx) = self.get_transaction_context();
    let owner = active_tx.unwrap_or(TransactionId::SYSTEM);
    let store = self.active_lpg_store();
    store.create_node_with_props_versioned(labels, properties, epoch, owner)
}
4440
#[cfg(feature = "lpg")]
/// Creates an edge of type `edge_type` from `src` to `dst` in the active
/// LPG store.
///
/// Outside a transaction the edge is attributed to `TransactionId::SYSTEM`.
pub fn create_edge(
    &self,
    src: NodeId,
    dst: NodeId,
    edge_type: &str,
) -> grafeo_common::types::EdgeId {
    let (epoch, active_tx) = self.get_transaction_context();
    let owner = active_tx.unwrap_or(TransactionId::SYSTEM);
    let store = self.active_lpg_store();
    store.create_edge_versioned(src, dst, edge_type, epoch, owner)
}
4461
#[cfg(feature = "lpg")]
/// Creates an edge from `src` to `dst` and then sets each of `properties`
/// on it, all under the same transaction id.
///
/// Outside a transaction the work is attributed to `TransactionId::SYSTEM`.
pub fn create_edge_with_props<'a>(
    &self,
    src: NodeId,
    dst: NodeId,
    edge_type: &str,
    properties: impl IntoIterator<Item = (&'a str, Value)>,
) -> grafeo_common::types::EdgeId {
    let (epoch, active_tx) = self.get_transaction_context();
    let owner = active_tx.unwrap_or(TransactionId::SYSTEM);
    let store = self.active_lpg_store();

    let edge_id = store.create_edge_versioned(src, dst, edge_type, epoch, owner);
    properties.into_iter().for_each(|(key, value)| {
        store.set_edge_property_versioned(edge_id, key, value, owner);
    });
    edge_id
}
4480
#[cfg(feature = "lpg")]
/// Sets property `key` to `value` on node `id`.
///
/// Inside a transaction the versioned setter is used with the transaction's
/// id; otherwise the store's plain setter is called.
pub fn set_node_property(&self, id: NodeId, key: &str, value: Value) {
    let (_, active_tx) = self.get_transaction_context();
    let store = self.active_lpg_store();
    match active_tx {
        Some(tid) => {
            store.set_node_property_versioned(id, key, value, tid);
        }
        None => {
            store.set_node_property(id, key, value);
        }
    }
}
4492
#[cfg(feature = "lpg")]
/// Sets property `key` to `value` on edge `id`.
///
/// Inside a transaction the versioned setter is used with the transaction's
/// id; otherwise the store's plain setter is called.
pub fn set_edge_property(&self, id: grafeo_common::types::EdgeId, key: &str, value: Value) {
    let (_, active_tx) = self.get_transaction_context();
    let store = self.active_lpg_store();
    match active_tx {
        Some(tid) => {
            store.set_edge_property_versioned(id, key, value, tid);
        }
        None => {
            store.set_edge_property(id, key, value);
        }
    }
}
4504
#[cfg(feature = "lpg")]
/// Deletes node `id` and returns the store's success flag.
///
/// Inside a transaction the versioned delete is used with the current epoch
/// and transaction id; otherwise the store's plain delete is called.
pub fn delete_node(&self, id: NodeId) -> bool {
    let (epoch, active_tx) = self.get_transaction_context();
    let store = self.active_lpg_store();
    match active_tx {
        Some(tid) => store.delete_node_versioned(id, epoch, tid),
        None => store.delete_node(id),
    }
}
4516
#[cfg(feature = "lpg")]
/// Deletes edge `id` and returns the store's success flag.
///
/// Inside a transaction the versioned delete is used with the current epoch
/// and transaction id; otherwise the store's plain delete is called.
pub fn delete_edge(&self, id: grafeo_common::types::EdgeId) -> bool {
    let (epoch, active_tx) = self.get_transaction_context();
    let store = self.active_lpg_store();
    match active_tx {
        Some(tid) => store.delete_edge_versioned(id, epoch, tid),
        None => store.delete_edge(id),
    }
}
4528
#[cfg(feature = "lpg")]
/// Looks up node `id` as seen from the session's current epoch and
/// transaction.
///
/// Outside a transaction the lookup runs as `TransactionId::SYSTEM`.
/// Returns `None` when the store has no visible version of the node.
#[must_use]
pub fn get_node(&self, id: NodeId) -> Option<Node> {
    let (epoch, active_tx) = self.get_transaction_context();
    let viewer = active_tx.unwrap_or(TransactionId::SYSTEM);
    let store = self.active_lpg_store();
    store.get_node_versioned(id, epoch, viewer)
}
4566
#[cfg(feature = "lpg")]
/// Returns a copy of property `key` on node `id`.
///
/// Yields `None` when the node is not visible or does not have the property.
#[must_use]
pub fn get_node_property(&self, id: NodeId, key: &str) -> Option<Value> {
    let node = self.get_node(id)?;
    node.get_property(key).cloned()
}
4596
#[cfg(feature = "lpg")]
/// Looks up edge `id` as seen from the session's current epoch and
/// transaction.
///
/// Outside a transaction the lookup runs as `TransactionId::SYSTEM`.
/// Returns `None` when the store has no visible version of the edge.
#[must_use]
pub fn get_edge(&self, id: EdgeId) -> Option<Edge> {
    let (epoch, active_tx) = self.get_transaction_context();
    let viewer = active_tx.unwrap_or(TransactionId::SYSTEM);
    let store = self.active_lpg_store();
    store.get_edge_versioned(id, epoch, viewer)
}
4613
#[cfg(feature = "lpg")]
/// Collects the `(neighbor, edge)` pairs the active LPG store yields for
/// `node` in the outgoing direction.
///
/// The call passes no epoch or transaction id to the store.
#[must_use]
pub fn get_neighbors_outgoing(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
    self.active_lpg_store()
        .edges_from(node, Direction::Outgoing)
        .collect()
}
4646
#[cfg(feature = "lpg")]
/// Collects the `(neighbor, edge)` pairs the active LPG store yields for
/// `node` in the incoming direction.
///
/// The call passes no epoch or transaction id to the store.
#[must_use]
pub fn get_neighbors_incoming(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
    self.active_lpg_store()
        .edges_from(node, Direction::Incoming)
        .collect()
}
4662
#[cfg(feature = "lpg")]
/// Like `get_neighbors_outgoing`, but keeps only edges whose type equals
/// `edge_type`.
///
/// Each candidate edge is fetched through `get_edge`; edges that are not
/// visible to the session are dropped.
#[must_use]
pub fn get_neighbors_outgoing_by_type(
    &self,
    node: NodeId,
    edge_type: &str,
) -> Vec<(NodeId, EdgeId)> {
    let has_wanted_type = |edge_id: EdgeId| match self.get_edge(edge_id) {
        Some(edge) => edge.edge_type.as_str() == edge_type,
        None => false,
    };

    self.active_lpg_store()
        .edges_from(node, Direction::Outgoing)
        .filter(|&(_, edge_id)| has_wanted_type(edge_id))
        .collect()
}
4689
#[cfg(feature = "lpg")]
/// Returns `true` when `get_node(id)` finds a node visible to this session.
#[must_use]
pub fn node_exists(&self, id: NodeId) -> bool {
    self.get_node(id).is_some()
}
4701
#[cfg(feature = "lpg")]
/// Returns `true` when `get_edge(id)` finds an edge visible to this session.
#[must_use]
pub fn edge_exists(&self, id: EdgeId) -> bool {
    self.get_edge(id).is_some()
}
4708
#[cfg(feature = "lpg")]
/// Returns `(out_degree, in_degree)` of `node` as reported by the active
/// LPG store.
#[must_use]
pub fn get_degree(&self, node: NodeId) -> (usize, usize) {
    let store = self.active_lpg_store();
    (store.out_degree(node), store.in_degree(node))
}
4720
#[cfg(feature = "lpg")]
/// Looks up several nodes in one call.
///
/// The result has one entry per element of `ids`, in the same order; an
/// entry is `None` when that node is not visible. The transaction context
/// and the store are resolved once for the whole batch.
#[must_use]
pub fn get_nodes_batch(&self, ids: &[NodeId]) -> Vec<Option<Node>> {
    let (epoch, active_tx) = self.get_transaction_context();
    let viewer = active_tx.unwrap_or(TransactionId::SYSTEM);
    let store = self.active_lpg_store();

    let mut nodes = Vec::with_capacity(ids.len());
    for &id in ids {
        nodes.push(store.get_node_versioned(id, epoch, viewer));
    }
    nodes
}
4740
#[cfg(feature = "cdc")]
/// Returns the change events the CDC log holds for `entity_id`.
///
/// # Errors
///
/// The current implementation always returns `Ok`.
pub fn history(
    &self,
    entity_id: impl Into<crate::cdc::EntityId>,
) -> Result<Vec<crate::cdc::ChangeEvent>> {
    Ok(self.cdc_log.history(entity_id.into()))
}
4755
#[cfg(feature = "cdc")]
/// Returns the change events the CDC log holds for `entity_id`, restricted
/// by `since_epoch` (forwarded unchanged to `cdc_log.history_since`).
///
/// # Errors
///
/// The current implementation always returns `Ok`.
pub fn history_since(
    &self,
    entity_id: impl Into<crate::cdc::EntityId>,
    since_epoch: EpochId,
) -> Result<Vec<crate::cdc::ChangeEvent>> {
    Ok(self.cdc_log.history_since(entity_id.into(), since_epoch))
}
4769
#[cfg(feature = "cdc")]
/// Returns the change events the CDC log reports for the epoch range given
/// by `start_epoch` and `end_epoch`.
///
/// # Errors
///
/// The current implementation always returns `Ok`.
// NOTE(review): whether the bounds are inclusive is decided by
// `cdc_log.changes_between` - confirm there.
pub fn changes_between(
    &self,
    start_epoch: EpochId,
    end_epoch: EpochId,
) -> Result<Vec<crate::cdc::ChangeEvent>> {
    Ok(self.cdc_log.changes_between(start_epoch, end_epoch))
}
4783}
4784
4785impl Drop for Session {
4786 fn drop(&mut self) {
4787 #[cfg(feature = "lpg")]
4790 if self.in_transaction() {
4791 let _ = self.rollback_inner();
4792 }
4793
4794 #[cfg(feature = "metrics")]
4795 if let Some(ref reg) = self.metrics {
4796 reg.session_active
4797 .fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
4798 }
4799 }
4800}
4801
4802#[cfg(test)]
4803mod tests {
4804 use super::parse_default_literal;
4805 use crate::database::GrafeoDB;
4806 use grafeo_common::types::Value;
4807
/// `null` is recognised regardless of ASCII case.
#[test]
fn parse_default_literal_null() {
    for spelling in ["null", "NULL", "Null"] {
        assert_eq!(parse_default_literal(spelling), Value::Null);
    }
}
4818
/// Boolean literals are recognised regardless of ASCII case.
#[test]
fn parse_default_literal_bool() {
    let cases = [
        ("true", true),
        ("TRUE", true),
        ("false", false),
        ("FALSE", false),
    ];
    for (text, expected) in cases {
        assert_eq!(parse_default_literal(text), Value::Bool(expected));
    }
}
4826
/// Single quotes are stripped from a quoted string literal.
#[test]
fn parse_default_literal_string_single_quoted() {
    let parsed = parse_default_literal("'hello'");
    let expected = Value::String("hello".into());
    assert_eq!(parsed, expected);
}
4834
/// Double quotes are stripped from a quoted string literal.
#[test]
fn parse_default_literal_string_double_quoted() {
    let parsed = parse_default_literal("\"world\"");
    let expected = Value::String("world".into());
    assert_eq!(parsed, expected);
}
4842
/// Integer text, including negatives and zero, becomes `Value::Int64`.
#[test]
fn parse_default_literal_integer() {
    let cases: [(&str, i64); 3] = [("42", 42), ("-7", -7), ("0", 0)];
    for (text, expected) in cases {
        assert_eq!(parse_default_literal(text), Value::Int64(expected));
    }
}
4849
/// Decimal text that is not an integer becomes `Value::Float64`.
#[test]
fn parse_default_literal_float() {
    let cases: [(&str, f64); 2] = [("9.81", 9.81_f64), ("-0.5", -0.5)];
    for (text, expected) in cases {
        assert_eq!(parse_default_literal(text), Value::Float64(expected));
    }
}
4855
/// Text that matches no other literal form is kept verbatim as a string.
#[test]
fn parse_default_literal_fallback_string() {
    let parsed = parse_default_literal("some_identifier");
    let expected = Value::String("some_identifier".into());
    assert_eq!(parsed, expected);
}
4864
/// A node created through a session outside any transaction is immediately
/// counted by the database.
#[test]
fn test_session_create_node() {
    let db = GrafeoDB::new_in_memory();
    let session = db.session();

    let id = session.create_node(&["Person"]);
    assert!(id.is_valid());
    assert_eq!(db.node_count(), 1);
}
4874
/// `in_transaction` follows the begin / commit lifecycle.
#[test]
fn test_session_transaction() {
    let db = GrafeoDB::new_in_memory();
    let mut session = db.session();

    assert!(!session.in_transaction());

    session.begin_transaction().unwrap();
    assert!(session.in_transaction());

    session.commit().unwrap();
    assert!(!session.in_transaction());
}
4888
/// The transaction context carries a transaction id only while a
/// transaction is open, and the epoch never moves backwards across a commit.
#[test]
fn test_session_transaction_context() {
    let db = GrafeoDB::new_in_memory();
    let mut session = db.session();

    // Outside a transaction: no transaction id.
    let (_epoch1, transaction_id1) = session.get_transaction_context();
    assert!(transaction_id1.is_none());

    session.begin_transaction().unwrap();
    // Inside a transaction: the id is present.
    let (epoch2, transaction_id2) = session.get_transaction_context();
    assert!(transaction_id2.is_some());
    let _ = epoch2;
    session.commit().unwrap();
    // After commit: id gone, epoch at least as new as before.
    let (epoch3, tx_id3) = session.get_transaction_context();
    assert!(tx_id3.is_none());
    assert!(epoch3.as_u64() >= epoch2.as_u64());
}
4912
/// Rolling back ends the transaction.
#[test]
fn test_session_rollback() {
    let db = GrafeoDB::new_in_memory();
    let mut session = db.session();

    session.begin_transaction().unwrap();
    session.rollback().unwrap();
    assert!(!session.in_transaction());
}
4922
/// A node written directly to the store under the session's transaction id
/// disappears on rollback, while a node committed beforehand survives.
#[test]
fn test_session_rollback_discards_versions() {
    use grafeo_common::types::TransactionId;

    let db = GrafeoDB::new_in_memory();

    // Baseline node created outside any transaction.
    let node_before = db.store().create_node(&["Person"]);
    assert!(node_before.is_valid());
    assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");

    let mut session = db.session();
    session.begin_transaction().unwrap();
    let transaction_id = session.current_transaction.lock().unwrap();

    // Write through the store directly, tagged with the transaction id.
    let epoch = db.store().current_epoch();
    let node_in_tx = db
        .store()
        .create_node_versioned(&["Person"], epoch, transaction_id);
    assert!(node_in_tx.is_valid());

    assert_eq!(
        db.node_count(),
        1,
        "PENDING nodes should be invisible to non-versioned node_count()"
    );
    assert!(
        db.store()
            .get_node_versioned(node_in_tx, epoch, transaction_id)
            .is_some(),
        "Transaction node should be visible to its own transaction"
    );

    session.rollback().unwrap();
    assert!(!session.in_transaction());

    let count_after = db.node_count();
    assert_eq!(
        count_after, 1,
        "Rollback should discard uncommitted node, but got {count_after}"
    );

    let current_epoch = db.store().current_epoch();
    assert!(
        db.store()
            .get_node_versioned(node_before, current_epoch, TransactionId::SYSTEM)
            .is_some(),
        "Original node should still exist"
    );

    assert!(
        db.store()
            .get_node_versioned(node_in_tx, current_epoch, TransactionId::SYSTEM)
            .is_none(),
        "Transaction node should be gone"
    );
}
4990
/// A node created via `session.create_node` inside a transaction is tagged
/// with that transaction and is discarded on rollback.
#[test]
fn test_session_create_node_in_transaction() {
    let db = GrafeoDB::new_in_memory();

    // Baseline node created outside any transaction.
    let node_before = db.create_node(&["Person"]);
    assert!(node_before.is_valid());
    assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");

    let mut session = db.session();
    session.begin_transaction().unwrap();
    let transaction_id = session.current_transaction.lock().unwrap();

    let node_in_tx = session.create_node(&["Person"]);
    assert!(node_in_tx.is_valid());

    assert_eq!(
        db.node_count(),
        1,
        "PENDING nodes should be invisible to non-versioned node_count()"
    );
    let epoch = db.store().current_epoch();
    assert!(
        db.store()
            .get_node_versioned(node_in_tx, epoch, transaction_id)
            .is_some(),
        "Transaction node should be visible to its own transaction"
    );

    session.rollback().unwrap();

    let count_after = db.node_count();
    assert_eq!(
        count_after, 1,
        "Rollback should discard node created via session.create_node(), but got {count_after}"
    );
}
5035
/// A node created via `session.create_node_with_props` inside a transaction
/// is tagged with that transaction and is discarded on rollback.
#[test]
fn test_session_create_node_with_props_in_transaction() {
    use grafeo_common::types::Value;

    let db = GrafeoDB::new_in_memory();

    // Baseline node created outside any transaction.
    db.create_node(&["Person"]);
    assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");

    let mut session = db.session();
    session.begin_transaction().unwrap();
    let transaction_id = session.current_transaction.lock().unwrap();

    let node_in_tx =
        session.create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))]);
    assert!(node_in_tx.is_valid());

    assert_eq!(
        db.node_count(),
        1,
        "PENDING nodes should be invisible to non-versioned node_count()"
    );
    let epoch = db.store().current_epoch();
    assert!(
        db.store()
            .get_node_versioned(node_in_tx, epoch, transaction_id)
            .is_some(),
        "Transaction node should be visible to its own transaction"
    );

    session.rollback().unwrap();

    let count_after = db.node_count();
    assert_eq!(
        count_after, 1,
        "Rollback should discard node created via session.create_node_with_props()"
    );
}
5081
/// Tests that run GQL query text through `session.execute`; compiled only
/// when the `gql` feature is enabled.
#[cfg(feature = "gql")]
mod gql_tests {
    use super::*;

    /// A label-filtered `MATCH` yields one row per matching node and a single
    /// column named after the returned variable.
    #[test]
    fn test_gql_query_execution() {
        let graph_db = GrafeoDB::new_in_memory();
        let sess = graph_db.session();

        // Two `Person` nodes plus one `Animal` that the query must not return.
        for label in ["Person", "Person", "Animal"] {
            sess.create_node(&[label]);
        }

        let outcome = sess.execute("MATCH (n:Person) RETURN n").unwrap();

        assert_eq!(outcome.row_count(), 2);
        assert_eq!(outcome.column_count(), 1);
        assert_eq!(outcome.columns[0], "n");
    }

    /// Matching against an empty database succeeds and returns zero rows.
    #[test]
    fn test_gql_empty_result() {
        let graph_db = GrafeoDB::new_in_memory();
        let sess = graph_db.session();

        let outcome = sess.execute("MATCH (n:Person) RETURN n").unwrap();

        assert_eq!(outcome.row_count(), 0);
    }

    /// Query text with an unclosed node pattern is reported as an error.
    #[test]
    fn test_gql_parse_error() {
        let graph_db = GrafeoDB::new_in_memory();
        let sess = graph_db.session();

        assert!(sess.execute("MATCH (n RETURN n").is_err());
    }

    /// A single-hop pattern returns one row per matching edge, with one
    /// column per returned variable.
    #[test]
    fn test_gql_relationship_traversal() {
        let graph_db = GrafeoDB::new_in_memory();
        let sess = graph_db.session();

        let alix = sess.create_node(&["Person"]);
        let gus = sess.create_node(&["Person"]);
        let vincent = sess.create_node(&["Person"]);

        // alix -KNOWS-> gus and alix -KNOWS-> vincent
        for target in [gus, vincent] {
            sess.create_edge(alix, target, "KNOWS");
        }

        let outcome = sess
            .execute("MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b")
            .unwrap();

        assert_eq!(outcome.row_count(), 2);
        assert_eq!(outcome.column_count(), 2);
        assert_eq!(outcome.columns[0], "a");
        assert_eq!(outcome.columns[1], "b");
    }

    /// An edge-type filter in the pattern excludes edges of other types.
    #[test]
    fn test_gql_relationship_with_type_filter() {
        let graph_db = GrafeoDB::new_in_memory();
        let sess = graph_db.session();

        let alix = sess.create_node(&["Person"]);
        let gus = sess.create_node(&["Person"]);
        let vincent = sess.create_node(&["Person"]);

        // Only the first edge carries the type the query asks for.
        for (target, edge_type) in [(gus, "KNOWS"), (vincent, "WORKS_WITH")] {
            sess.create_edge(alix, target, edge_type);
        }

        let outcome = sess
            .execute("MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b")
            .unwrap();

        assert_eq!(outcome.row_count(), 1);
    }

    /// Returning a variable that was never bound is rejected with an
    /// "Undefined variable" error.
    #[test]
    fn test_gql_semantic_error_undefined_variable() {
        let graph_db = GrafeoDB::new_in_memory();
        let sess = graph_db.session();

        // `x` is not bound anywhere in the pattern.
        let outcome = sess.execute("MATCH (n:Person) RETURN x");

        assert!(outcome.is_err());
        let message = match outcome {
            Err(err) => err.to_string(),
            Ok(_) => panic!("Expected error"),
        };
        assert!(
            message.contains("Undefined variable"),
            "Expected undefined variable error, got: {message}"
        );
    }

    /// A `WHERE` comparison on a numeric property keeps only matching nodes.
    #[test]
    fn test_gql_where_clause_property_filter() {
        use grafeo_common::types::Value;

        let graph_db = GrafeoDB::new_in_memory();
        let sess = graph_db.session();

        // Ages 35 and 45 satisfy `n.age > 30`; 25 does not.
        for age in [25, 35, 45] {
            sess.create_node_with_props(&["Person"], [("age", Value::Int64(age))]);
        }

        let outcome = sess
            .execute("MATCH (n:Person) WHERE n.age > 30 RETURN n")
            .unwrap();

        assert_eq!(outcome.row_count(), 2);
    }

    /// A `WHERE` equality test on a string property keeps only matching nodes.
    #[test]
    fn test_gql_where_clause_equality() {
        use grafeo_common::types::Value;

        let graph_db = GrafeoDB::new_in_memory();
        let sess = graph_db.session();

        // Two of the three nodes are named "Alix".
        for name in ["Alix", "Gus", "Alix"] {
            sess.create_node_with_props(&["Person"], [("name", Value::String(name.into()))]);
        }

        let outcome = sess
            .execute("MATCH (n:Person) WHERE n.name = \"Alix\" RETURN n")
            .unwrap();

        assert_eq!(outcome.row_count(), 2);
    }

    /// Property-access expressions in `RETURN` produce columns named after the
    /// expression text and carry the stored property values.
    #[test]
    fn test_gql_return_property_access() {
        use grafeo_common::types::Value;

        let graph_db = GrafeoDB::new_in_memory();
        let sess = graph_db.session();

        for (name, age) in [("Alix", 30), ("Gus", 25)] {
            sess.create_node_with_props(
                &["Person"],
                [
                    ("name", Value::String(name.into())),
                    ("age", Value::Int64(age)),
                ],
            );
        }

        let outcome = sess
            .execute("MATCH (n:Person) RETURN n.name, n.age")
            .unwrap();

        assert_eq!(outcome.row_count(), 2);
        assert_eq!(outcome.column_count(), 2);
        assert_eq!(outcome.columns[0], "n.name");
        assert_eq!(outcome.columns[1], "n.age");

        // Both names must appear in the first column, in any row order.
        assert!(
            outcome
                .rows
                .iter()
                .any(|row| row[0] == Value::String("Alix".into()))
        );
        assert!(
            outcome
                .rows
                .iter()
                .any(|row| row[0] == Value::String("Gus".into()))
        );
    }

    /// A `RETURN` list may mix a whole-node variable with a property access.
    #[test]
    fn test_gql_return_mixed_expressions() {
        use grafeo_common::types::Value;

        let graph_db = GrafeoDB::new_in_memory();
        let sess = graph_db.session();

        let props = [("name", Value::String("Alix".into()))];
        sess.create_node_with_props(&["Person"], props);

        let outcome = sess.execute("MATCH (n:Person) RETURN n, n.name").unwrap();

        assert_eq!(outcome.row_count(), 1);
        assert_eq!(outcome.column_count(), 2);
        assert_eq!(outcome.columns[0], "n");
        assert_eq!(outcome.columns[1], "n.name");

        // The second column holds the property value itself.
        assert_eq!(outcome.rows[0][1], Value::String("Alix".into()));
    }
}
5300
/// Tests that run Cypher query text through `session.execute_cypher`;
/// compiled only when the `cypher` feature is enabled.
#[cfg(feature = "cypher")]
mod cypher_tests {
    use super::*;

    /// A label-filtered `MATCH` yields one row per matching node and a single
    /// column named after the returned variable.
    #[test]
    fn test_cypher_query_execution() {
        let graph_db = GrafeoDB::new_in_memory();
        let sess = graph_db.session();

        // Two `Person` nodes plus one `Animal` that the query must not return.
        for label in ["Person", "Person", "Animal"] {
            sess.create_node(&[label]);
        }

        let outcome = sess.execute_cypher("MATCH (n:Person) RETURN n").unwrap();

        assert_eq!(outcome.row_count(), 2);
        assert_eq!(outcome.column_count(), 1);
        assert_eq!(outcome.columns[0], "n");
    }

    /// Matching against an empty database succeeds and returns zero rows.
    #[test]
    fn test_cypher_empty_result() {
        let graph_db = GrafeoDB::new_in_memory();
        let sess = graph_db.session();

        let outcome = sess.execute_cypher("MATCH (n:Person) RETURN n").unwrap();

        assert_eq!(outcome.row_count(), 0);
    }

    /// Query text with an unclosed node pattern is reported as an error.
    #[test]
    fn test_cypher_parse_error() {
        let graph_db = GrafeoDB::new_in_memory();
        let sess = graph_db.session();

        assert!(sess.execute_cypher("MATCH (n RETURN n").is_err());
    }
}
5346
5347 mod direct_lookup_tests {
5350 use super::*;
5351 use grafeo_common::types::Value;
5352
5353 #[test]
5354 fn test_get_node() {
5355 let db = GrafeoDB::new_in_memory();
5356 let session = db.session();
5357
5358 let id = session.create_node(&["Person"]);
5359 let node = session.get_node(id);
5360
5361 assert!(node.is_some());
5362 let node = node.unwrap();
5363 assert_eq!(node.id, id);
5364 }
5365
5366 #[test]
5367 fn test_get_node_not_found() {
5368 use grafeo_common::types::NodeId;
5369
5370 let db = GrafeoDB::new_in_memory();
5371 let session = db.session();
5372
5373 let node = session.get_node(NodeId::new(9999));
5375 assert!(node.is_none());
5376 }
5377
5378 #[test]
5379 fn test_get_node_property() {
5380 let db = GrafeoDB::new_in_memory();
5381 let session = db.session();
5382
5383 let id = session
5384 .create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))]);
5385
5386 let name = session.get_node_property(id, "name");
5387 assert_eq!(name, Some(Value::String("Alix".into())));
5388
5389 let missing = session.get_node_property(id, "missing");
5391 assert!(missing.is_none());
5392 }
5393
5394 #[test]
5395 fn test_get_edge() {
5396 let db = GrafeoDB::new_in_memory();
5397 let session = db.session();
5398
5399 let alix = session.create_node(&["Person"]);
5400 let gus = session.create_node(&["Person"]);
5401 let edge_id = session.create_edge(alix, gus, "KNOWS");
5402
5403 let edge = session.get_edge(edge_id);
5404 assert!(edge.is_some());
5405 let edge = edge.unwrap();
5406 assert_eq!(edge.id, edge_id);
5407 assert_eq!(edge.src, alix);
5408 assert_eq!(edge.dst, gus);
5409 }
5410
5411 #[test]
5412 fn test_get_edge_not_found() {
5413 use grafeo_common::types::EdgeId;
5414
5415 let db = GrafeoDB::new_in_memory();
5416 let session = db.session();
5417
5418 let edge = session.get_edge(EdgeId::new(9999));
5419 assert!(edge.is_none());
5420 }
5421
5422 #[test]
5423 fn test_get_neighbors_outgoing() {
5424 let db = GrafeoDB::new_in_memory();
5425 let session = db.session();
5426
5427 let alix = session.create_node(&["Person"]);
5428 let gus = session.create_node(&["Person"]);
5429 let harm = session.create_node(&["Person"]);
5430
5431 session.create_edge(alix, gus, "KNOWS");
5432 session.create_edge(alix, harm, "KNOWS");
5433
5434 let neighbors = session.get_neighbors_outgoing(alix);
5435 assert_eq!(neighbors.len(), 2);
5436
5437 let neighbor_ids: Vec<_> = neighbors.iter().map(|(node_id, _)| *node_id).collect();
5438 assert!(neighbor_ids.contains(&gus));
5439 assert!(neighbor_ids.contains(&harm));
5440 }
5441
5442 #[test]
5443 fn test_get_neighbors_incoming() {
5444 let db = GrafeoDB::new_in_memory();
5445 let session = db.session();
5446
5447 let alix = session.create_node(&["Person"]);
5448 let gus = session.create_node(&["Person"]);
5449 let harm = session.create_node(&["Person"]);
5450
5451 session.create_edge(gus, alix, "KNOWS");
5452 session.create_edge(harm, alix, "KNOWS");
5453
5454 let neighbors = session.get_neighbors_incoming(alix);
5455 assert_eq!(neighbors.len(), 2);
5456
5457 let neighbor_ids: Vec<_> = neighbors.iter().map(|(node_id, _)| *node_id).collect();
5458 assert!(neighbor_ids.contains(&gus));
5459 assert!(neighbor_ids.contains(&harm));
5460 }
5461
5462 #[test]
5463 fn test_get_neighbors_outgoing_by_type() {
5464 let db = GrafeoDB::new_in_memory();
5465 let session = db.session();
5466
5467 let alix = session.create_node(&["Person"]);
5468 let gus = session.create_node(&["Person"]);
5469 let company = session.create_node(&["Company"]);
5470
5471 session.create_edge(alix, gus, "KNOWS");
5472 session.create_edge(alix, company, "WORKS_AT");
5473
5474 let knows_neighbors = session.get_neighbors_outgoing_by_type(alix, "KNOWS");
5475 assert_eq!(knows_neighbors.len(), 1);
5476 assert_eq!(knows_neighbors[0].0, gus);
5477
5478 let works_neighbors = session.get_neighbors_outgoing_by_type(alix, "WORKS_AT");
5479 assert_eq!(works_neighbors.len(), 1);
5480 assert_eq!(works_neighbors[0].0, company);
5481
5482 let no_neighbors = session.get_neighbors_outgoing_by_type(alix, "LIKES");
5484 assert!(no_neighbors.is_empty());
5485 }
5486
5487 #[test]
5488 fn test_node_exists() {
5489 use grafeo_common::types::NodeId;
5490
5491 let db = GrafeoDB::new_in_memory();
5492 let session = db.session();
5493
5494 let id = session.create_node(&["Person"]);
5495
5496 assert!(session.node_exists(id));
5497 assert!(!session.node_exists(NodeId::new(9999)));
5498 }
5499
5500 #[test]
5501 fn test_edge_exists() {
5502 use grafeo_common::types::EdgeId;
5503
5504 let db = GrafeoDB::new_in_memory();
5505 let session = db.session();
5506
5507 let alix = session.create_node(&["Person"]);
5508 let gus = session.create_node(&["Person"]);
5509 let edge_id = session.create_edge(alix, gus, "KNOWS");
5510
5511 assert!(session.edge_exists(edge_id));
5512 assert!(!session.edge_exists(EdgeId::new(9999)));
5513 }
5514
5515 #[test]
5516 fn test_get_degree() {
5517 let db = GrafeoDB::new_in_memory();
5518 let session = db.session();
5519
5520 let alix = session.create_node(&["Person"]);
5521 let gus = session.create_node(&["Person"]);
5522 let harm = session.create_node(&["Person"]);
5523
5524 session.create_edge(alix, gus, "KNOWS");
5526 session.create_edge(alix, harm, "KNOWS");
5527 session.create_edge(gus, alix, "KNOWS");
5529
5530 let (out_degree, in_degree) = session.get_degree(alix);
5531 assert_eq!(out_degree, 2);
5532 assert_eq!(in_degree, 1);
5533
5534 let lonely = session.create_node(&["Person"]);
5536 let (out, in_deg) = session.get_degree(lonely);
5537 assert_eq!(out, 0);
5538 assert_eq!(in_deg, 0);
5539 }
5540
5541 #[test]
5542 fn test_get_nodes_batch() {
5543 let db = GrafeoDB::new_in_memory();
5544 let session = db.session();
5545
5546 let alix = session.create_node(&["Person"]);
5547 let gus = session.create_node(&["Person"]);
5548 let harm = session.create_node(&["Person"]);
5549
5550 let nodes = session.get_nodes_batch(&[alix, gus, harm]);
5551 assert_eq!(nodes.len(), 3);
5552 assert!(nodes[0].is_some());
5553 assert!(nodes[1].is_some());
5554 assert!(nodes[2].is_some());
5555
5556 use grafeo_common::types::NodeId;
5558 let nodes_with_missing = session.get_nodes_batch(&[alix, NodeId::new(9999), harm]);
5559 assert_eq!(nodes_with_missing.len(), 3);
5560 assert!(nodes_with_missing[0].is_some());
5561 assert!(nodes_with_missing[1].is_none()); assert!(nodes_with_missing[2].is_some());
5563 }
5564
5565 #[test]
5566 fn test_auto_commit_setting() {
5567 let db = GrafeoDB::new_in_memory();
5568 let mut session = db.session();
5569
5570 assert!(session.auto_commit());
5572
5573 session.set_auto_commit(false);
5574 assert!(!session.auto_commit());
5575
5576 session.set_auto_commit(true);
5577 assert!(session.auto_commit());
5578 }
5579
5580 #[test]
5581 fn test_transaction_double_begin_nests() {
5582 let db = GrafeoDB::new_in_memory();
5583 let mut session = db.session();
5584
5585 session.begin_transaction().unwrap();
5586 let result = session.begin_transaction();
5588 assert!(result.is_ok());
5589 session.commit().unwrap();
5591 session.commit().unwrap();
5593 }
5594
5595 #[test]
5596 fn test_commit_without_transaction_error() {
5597 let db = GrafeoDB::new_in_memory();
5598 let mut session = db.session();
5599
5600 let result = session.commit();
5601 assert!(result.is_err());
5602 }
5603
5604 #[test]
5605 fn test_rollback_without_transaction_error() {
5606 let db = GrafeoDB::new_in_memory();
5607 let mut session = db.session();
5608
5609 let result = session.rollback();
5610 assert!(result.is_err());
5611 }
5612
5613 #[test]
5614 fn test_create_edge_in_transaction() {
5615 let db = GrafeoDB::new_in_memory();
5616 let mut session = db.session();
5617
5618 let alix = session.create_node(&["Person"]);
5620 let gus = session.create_node(&["Person"]);
5621
5622 session.begin_transaction().unwrap();
5624 let edge_id = session.create_edge(alix, gus, "KNOWS");
5625
5626 assert!(session.edge_exists(edge_id));
5628
5629 session.commit().unwrap();
5631
5632 assert!(session.edge_exists(edge_id));
5634 }
5635
5636 #[test]
5637 fn test_neighbors_empty_node() {
5638 let db = GrafeoDB::new_in_memory();
5639 let session = db.session();
5640
5641 let lonely = session.create_node(&["Person"]);
5642
5643 assert!(session.get_neighbors_outgoing(lonely).is_empty());
5644 assert!(session.get_neighbors_incoming(lonely).is_empty());
5645 assert!(
5646 session
5647 .get_neighbors_outgoing_by_type(lonely, "KNOWS")
5648 .is_empty()
5649 );
5650 }
5651 }
5652
/// With `with_gc_interval(2)` configured, two consecutive commits complete
/// and both committed nodes remain counted.
// NOTE(review): presumably the second commit is the one that triggers the
// automatic GC pass - confirm against the commit path.
#[test]
fn test_auto_gc_triggers_on_commit_interval() {
    use crate::config::Config;

    let graph_db = GrafeoDB::with_config(Config::in_memory().with_gc_interval(2)).unwrap();
    let mut sess = graph_db.session();

    // One transaction per label: begin, create a node, commit.
    for label in ["A", "B"] {
        sess.begin_transaction().unwrap();
        sess.create_node(&[label]);
        sess.commit().unwrap();
    }

    assert_eq!(graph_db.node_count(), 2);
}
5674
/// A query timeout set on the database config gives new sessions a deadline.
#[test]
fn test_query_timeout_config_propagates_to_session() {
    use crate::config::Config;
    use std::time::Duration;

    let five_seconds = Duration::from_secs(5);
    let graph_db =
        GrafeoDB::with_config(Config::in_memory().with_query_timeout(five_seconds)).unwrap();
    let sess = graph_db.session();

    let deadline = sess.query_deadline();
    assert!(deadline.is_some());
}
5687
/// A session of a default in-memory database has no query deadline.
#[test]
fn test_no_query_timeout_returns_no_deadline() {
    let graph_db = GrafeoDB::new_in_memory();
    let sess = graph_db.session();

    let deadline = sess.query_deadline();
    assert!(deadline.is_none());
}
5696
/// A session of a default in-memory database reports the LPG graph model.
#[test]
fn test_graph_model_accessor() {
    use crate::config::GraphModel;

    let graph_db = GrafeoDB::new_in_memory();
    let sess = graph_db.session();

    let model = sess.graph_model();
    assert_eq!(model, GraphModel::Lpg);
}
5706
/// A database built over an externally supplied store supports a full
/// begin / insert / read-back / commit cycle through a session.
#[cfg(feature = "gql")]
#[test]
fn test_external_store_session() {
    use grafeo_core::graph::GraphStoreMut;
    use std::sync::Arc;

    let config = crate::config::Config::in_memory();
    // The store is handed over as a trait object rather than a concrete type.
    let store: Arc<dyn GraphStoreMut> =
        Arc::new(grafeo_core::graph::lpg::LpgStore::new().unwrap());
    let graph_db = GrafeoDB::with_store(store, config).unwrap();

    let mut sess = graph_db.session();

    sess.begin_transaction().unwrap();
    sess.execute("INSERT (:Test {name: 'hello'})").unwrap();

    // The insert is readable from inside the still-open transaction.
    let rows = sess.execute("MATCH (n:Test) RETURN n.name").unwrap();
    assert_eq!(rows.row_count(), 1);

    sess.commit().unwrap();
}
5732
/// Tests for session, graph, transaction and schema commands issued as GQL
/// text through `session.execute`; compiled only with the `gql` feature.
#[cfg(feature = "gql")]
mod session_command_tests {
    use super::*;
    use grafeo_common::types::Value;

    /// `USE GRAPH` on an existing graph makes it the session's current graph.
    #[test]
    fn test_use_graph_sets_current_graph() {
        let graph_db = GrafeoDB::new_in_memory();
        let sess = graph_db.session();

        // The graph has to exist before it can be selected.
        sess.execute("CREATE GRAPH mydb").unwrap();
        sess.execute("USE GRAPH mydb").unwrap();

        assert_eq!(sess.current_graph().as_deref(), Some("mydb"));
    }

    /// `USE GRAPH` on an unknown graph fails with a "does not exist" error.
    #[test]
    fn test_use_graph_nonexistent_errors() {
        let graph_db = GrafeoDB::new_in_memory();
        let sess = graph_db.session();

        let outcome = sess.execute("USE GRAPH doesnotexist");
        assert!(outcome.is_err());
        let message = outcome.unwrap_err().to_string();
        assert!(
            message.contains("does not exist"),
            "Expected 'does not exist' error, got: {message}"
        );
    }

    /// `USE GRAPH default` succeeds without the graph being created first.
    #[test]
    fn test_use_graph_default_always_valid() {
        let graph_db = GrafeoDB::new_in_memory();
        let sess = graph_db.session();

        sess.execute("USE GRAPH default").unwrap();
        assert_eq!(sess.current_graph().as_deref(), Some("default"));
    }

    /// `SESSION SET GRAPH` on an existing graph makes it the current graph.
    #[test]
    fn test_session_set_graph() {
        let graph_db = GrafeoDB::new_in_memory();
        let sess = graph_db.session();

        sess.execute("CREATE GRAPH analytics").unwrap();
        sess.execute("SESSION SET GRAPH analytics").unwrap();
        assert_eq!(sess.current_graph().as_deref(), Some("analytics"));
    }

    /// `SESSION SET GRAPH` on an unknown graph is an error.
    #[test]
    fn test_session_set_graph_nonexistent_errors() {
        let graph_db = GrafeoDB::new_in_memory();
        let sess = graph_db.session();

        assert!(sess.execute("SESSION SET GRAPH nosuchgraph").is_err());
    }

    /// The time zone starts unset and reflects the latest
    /// `SESSION SET TIME ZONE` command.
    #[test]
    fn test_session_set_time_zone() {
        let graph_db = GrafeoDB::new_in_memory();
        let sess = graph_db.session();

        assert_eq!(sess.time_zone(), None);

        sess.execute("SESSION SET TIME ZONE 'UTC'").unwrap();
        assert_eq!(sess.time_zone().as_deref(), Some("UTC"));

        // A later command replaces the earlier value.
        sess.execute("SESSION SET TIME ZONE 'America/New_York'")
            .unwrap();
        assert_eq!(sess.time_zone().as_deref(), Some("America/New_York"));
    }

    /// `SESSION SET PARAMETER` makes the parameter retrievable by name.
    #[test]
    fn test_session_set_parameter() {
        let graph_db = GrafeoDB::new_in_memory();
        let sess = graph_db.session();

        sess.execute("SESSION SET PARAMETER $timeout = 30").unwrap();

        // Only presence is checked here, not the stored value.
        let stored = sess.get_parameter("timeout");
        assert!(stored.is_some());
    }

    /// `SESSION RESET` clears the current graph, the time zone and the
    /// session parameters.
    #[test]
    fn test_session_reset_clears_all_state() {
        let graph_db = GrafeoDB::new_in_memory();
        let sess = graph_db.session();

        // Populate every piece of session state the reset should clear.
        for command in [
            "CREATE GRAPH analytics",
            "SESSION SET GRAPH analytics",
            "SESSION SET TIME ZONE 'UTC'",
            "SESSION SET PARAMETER $limit = 100",
        ] {
            sess.execute(command).unwrap();
        }

        // Confirm the state is really there before resetting.
        assert!(sess.current_graph().is_some());
        assert!(sess.time_zone().is_some());
        assert!(sess.get_parameter("limit").is_some());

        sess.execute("SESSION RESET").unwrap();

        assert_eq!(sess.current_graph(), None);
        assert_eq!(sess.time_zone(), None);
        assert!(sess.get_parameter("limit").is_none());
    }

    /// `SESSION CLOSE` clears the current graph and the time zone.
    #[test]
    fn test_session_close_clears_state() {
        let graph_db = GrafeoDB::new_in_memory();
        let sess = graph_db.session();

        for command in [
            "CREATE GRAPH analytics",
            "SESSION SET GRAPH analytics",
            "SESSION SET TIME ZONE 'UTC'",
        ] {
            sess.execute(command).unwrap();
        }

        sess.execute("SESSION CLOSE").unwrap();

        assert_eq!(sess.current_graph(), None);
        assert_eq!(sess.time_zone(), None);
    }

    /// A graph created with `CREATE GRAPH` can afterwards be selected.
    #[test]
    fn test_create_graph() {
        let graph_db = GrafeoDB::new_in_memory();
        let sess = graph_db.session();

        sess.execute("CREATE GRAPH mydb").unwrap();

        // Selecting the graph succeeds only if the creation took effect.
        sess.execute("USE GRAPH mydb").unwrap();
        assert_eq!(sess.current_graph().as_deref(), Some("mydb"));
    }

    /// Creating the same graph twice fails with an "already exists" error.
    #[test]
    fn test_create_graph_duplicate_errors() {
        let graph_db = GrafeoDB::new_in_memory();
        let sess = graph_db.session();

        sess.execute("CREATE GRAPH mydb").unwrap();
        let second_attempt = sess.execute("CREATE GRAPH mydb");

        assert!(second_attempt.is_err());
        let message = second_attempt.unwrap_err().to_string();
        assert!(
            message.contains("already exists"),
            "Expected 'already exists' error, got: {message}"
        );
    }

    /// `CREATE GRAPH IF NOT EXISTS` succeeds when the graph already exists.
    #[test]
    fn test_create_graph_if_not_exists() {
        let graph_db = GrafeoDB::new_in_memory();
        let sess = graph_db.session();

        for command in ["CREATE GRAPH mydb", "CREATE GRAPH IF NOT EXISTS mydb"] {
            sess.execute(command).unwrap();
        }
    }

    /// A dropped graph can no longer be selected.
    #[test]
    fn test_drop_graph() {
        let graph_db = GrafeoDB::new_in_memory();
        let sess = graph_db.session();

        sess.execute("CREATE GRAPH mydb").unwrap();
        sess.execute("DROP GRAPH mydb").unwrap();

        assert!(sess.execute("USE GRAPH mydb").is_err());
    }

    /// Dropping an unknown graph fails with a "does not exist" error.
    #[test]
    fn test_drop_graph_nonexistent_errors() {
        let graph_db = GrafeoDB::new_in_memory();
        let sess = graph_db.session();

        let outcome = sess.execute("DROP GRAPH nosuchgraph");
        assert!(outcome.is_err());
        let message = outcome.unwrap_err().to_string();
        assert!(
            message.contains("does not exist"),
            "Expected 'does not exist' error, got: {message}"
        );
    }

    /// `DROP GRAPH IF EXISTS` succeeds when the graph does not exist.
    #[test]
    fn test_drop_graph_if_exists() {
        let graph_db = GrafeoDB::new_in_memory();
        let sess = graph_db.session();

        let outcome = sess.execute("DROP GRAPH IF EXISTS nosuchgraph");
        outcome.unwrap();
    }

    /// `START TRANSACTION` / `COMMIT` issued as query text open and close a
    /// transaction, and the inserted data is readable afterwards.
    #[test]
    fn test_start_transaction_via_gql() {
        let graph_db = GrafeoDB::new_in_memory();
        let sess = graph_db.session();

        sess.execute("START TRANSACTION").unwrap();
        assert!(sess.in_transaction());
        sess.execute("INSERT (:Person {name: 'Alix'})").unwrap();
        sess.execute("COMMIT").unwrap();
        assert!(!sess.in_transaction());

        let committed = sess.execute("MATCH (n:Person) RETURN n.name").unwrap();
        assert_eq!(committed.rows.len(), 1);
    }

    /// An insert inside a `READ ONLY` transaction fails with a "read-only"
    /// error.
    #[test]
    fn test_start_transaction_read_only_blocks_insert() {
        let graph_db = GrafeoDB::new_in_memory();
        let sess = graph_db.session();

        sess.execute("START TRANSACTION READ ONLY").unwrap();
        let write_attempt = sess.execute("INSERT (:Person {name: 'Alix'})");
        assert!(write_attempt.is_err());
        let message = write_attempt.unwrap_err().to_string();
        assert!(
            message.contains("read-only"),
            "Expected read-only error, got: {message}"
        );
        sess.execute("ROLLBACK").unwrap();
    }

    /// A `READ ONLY` transaction can still read previously committed data.
    #[test]
    fn test_start_transaction_read_only_allows_reads() {
        let graph_db = GrafeoDB::new_in_memory();
        let mut sess = graph_db.session();

        // Commit one node through the programmatic transaction API first.
        sess.begin_transaction().unwrap();
        sess.execute("INSERT (:Person {name: 'Alix'})").unwrap();
        sess.commit().unwrap();

        sess.execute("START TRANSACTION READ ONLY").unwrap();
        let visible = sess.execute("MATCH (n:Person) RETURN n.name").unwrap();
        assert_eq!(visible.rows.len(), 1);
        sess.execute("COMMIT").unwrap();
    }

    /// `ROLLBACK` issued as query text discards the transaction's inserts.
    #[test]
    fn test_rollback_via_gql() {
        let graph_db = GrafeoDB::new_in_memory();
        let sess = graph_db.session();

        for command in [
            "START TRANSACTION",
            "INSERT (:Person {name: 'Alix'})",
            "ROLLBACK",
        ] {
            sess.execute(command).unwrap();
        }

        let remaining = sess.execute("MATCH (n:Person) RETURN n.name").unwrap();
        assert!(remaining.rows.is_empty());
    }

    /// `START TRANSACTION` accepts an explicit isolation level clause.
    #[test]
    fn test_start_transaction_with_isolation_level() {
        let graph_db = GrafeoDB::new_in_memory();
        let sess = graph_db.session();

        let started = sess.execute("START TRANSACTION ISOLATION LEVEL SERIALIZABLE");
        started.unwrap();
        assert!(sess.in_transaction());
        sess.execute("ROLLBACK").unwrap();
    }

    /// A session command returns a result with no rows and no columns.
    #[test]
    fn test_session_commands_return_empty_result() {
        let graph_db = GrafeoDB::new_in_memory();
        let sess = graph_db.session();

        sess.execute("CREATE GRAPH test").unwrap();
        let reply = sess.execute("SESSION SET GRAPH test").unwrap();
        assert_eq!(reply.row_count(), 0);
        assert_eq!(reply.column_count(), 0);
    }

    /// A fresh session has no current graph.
    #[test]
    fn test_current_graph_default_is_none() {
        let graph_db = GrafeoDB::new_in_memory();
        let sess = graph_db.session();

        let selected = sess.current_graph();
        assert_eq!(selected, None);
    }

    /// A fresh session has no time zone.
    #[test]
    fn test_time_zone_default_is_none() {
        let graph_db = GrafeoDB::new_in_memory();
        let sess = graph_db.session();

        let zone = sess.time_zone();
        assert_eq!(zone, None);
    }

    /// Each session keeps its own current graph, independent of other
    /// sessions on the same database.
    #[test]
    fn test_session_state_independent_across_sessions() {
        let graph_db = GrafeoDB::new_in_memory();
        let first_session = graph_db.session();
        let second_session = graph_db.session();

        // Both graphs are created through the first session.
        for command in ["CREATE GRAPH first", "CREATE GRAPH second"] {
            first_session.execute(command).unwrap();
        }
        first_session.execute("SESSION SET GRAPH first").unwrap();
        second_session.execute("SESSION SET GRAPH second").unwrap();

        assert_eq!(first_session.current_graph().as_deref(), Some("first"));
        assert_eq!(second_session.current_graph().as_deref(), Some("second"));
    }

    /// `SHOW NODE TYPES` lists a created node type under the expected columns.
    #[test]
    fn test_show_node_types() {
        let graph_db = GrafeoDB::new_in_memory();
        let sess = graph_db.session();

        sess.execute("CREATE NODE TYPE Person (name STRING NOT NULL, age INTEGER)")
            .unwrap();

        let listing = sess.execute("SHOW NODE TYPES").unwrap();
        assert_eq!(
            listing.columns,
            ["name", "properties", "constraints", "parents"]
        );
        assert_eq!(listing.rows.len(), 1);
        // Only the name cell of the single row is checked.
        assert_eq!(listing.rows[0][0], Value::from("Person"));
    }

    /// `SHOW EDGE TYPES` lists a created edge type under the expected columns.
    #[test]
    fn test_show_edge_types() {
        let graph_db = GrafeoDB::new_in_memory();
        let sess = graph_db.session();

        sess.execute("CREATE EDGE TYPE KNOWS CONNECTING (Person) TO (Person) (since INTEGER)")
            .unwrap();

        let listing = sess.execute("SHOW EDGE TYPES").unwrap();
        assert_eq!(
            listing.columns,
            ["name", "properties", "source_types", "target_types"]
        );
        assert_eq!(listing.rows.len(), 1);
        assert_eq!(listing.rows[0][0], Value::from("KNOWS"));
    }

    /// `SHOW GRAPH TYPES` lists a created graph type under the expected
    /// columns.
    #[test]
    fn test_show_graph_types() {
        let graph_db = GrafeoDB::new_in_memory();
        let sess = graph_db.session();

        sess.execute("CREATE NODE TYPE Person (name STRING)")
            .unwrap();
        let create_graph_type = "CREATE GRAPH TYPE social (\
                NODE TYPE Person (name STRING)\
            )";
        sess.execute(create_graph_type).unwrap();

        let listing = sess.execute("SHOW GRAPH TYPES").unwrap();
        assert_eq!(listing.columns, ["name", "open", "node_types", "edge_types"]);
        assert_eq!(listing.rows.len(), 1);
        assert_eq!(listing.rows[0][0], Value::from("social"));
    }

    /// `SHOW GRAPH TYPE <name>` returns a single row for that graph type.
    #[test]
    fn test_show_graph_type_named() {
        let graph_db = GrafeoDB::new_in_memory();
        let sess = graph_db.session();

        sess.execute("CREATE NODE TYPE Person (name STRING)")
            .unwrap();
        let create_graph_type = "CREATE GRAPH TYPE social (\
                NODE TYPE Person (name STRING)\
            )";
        sess.execute(create_graph_type).unwrap();

        let listing = sess.execute("SHOW GRAPH TYPE social").unwrap();
        assert_eq!(listing.rows.len(), 1);
        assert_eq!(listing.rows[0][0], Value::from("social"));
    }

    /// `SHOW GRAPH TYPE` for an unknown name is an error.
    #[test]
    fn test_show_graph_type_not_found() {
        let graph_db = GrafeoDB::new_in_memory();
        let sess = graph_db.session();

        assert!(sess.execute("SHOW GRAPH TYPE nonexistent").is_err());
    }

    /// `SHOW INDEXES` succeeds on an empty database and reports the expected
    /// columns.
    #[test]
    fn test_show_indexes_via_gql() {
        let graph_db = GrafeoDB::new_in_memory();
        let sess = graph_db.session();

        let listing = sess.execute("SHOW INDEXES").unwrap();
        assert_eq!(listing.columns, ["name", "type", "label", "property"]);
    }

    /// `SHOW CONSTRAINTS` succeeds on an empty database and reports the
    /// expected columns.
    #[test]
    fn test_show_constraints_via_gql() {
        let graph_db = GrafeoDB::new_in_memory();
        let sess = graph_db.session();

        let listing = sess.execute("SHOW CONSTRAINTS").unwrap();
        assert_eq!(listing.columns, ["name", "type", "label", "properties"]);
    }

    /// A graph type declared in pattern form can be created and then shown
    /// by name.
    #[test]
    fn test_pattern_form_graph_type_roundtrip() {
        let graph_db = GrafeoDB::new_in_memory();
        let sess = graph_db.session();

        // Register the node and edge types the patterns refer to.
        for ddl in [
            "CREATE NODE TYPE Person (name STRING NOT NULL)",
            "CREATE NODE TYPE City (name STRING)",
            "CREATE EDGE TYPE KNOWS (since INTEGER)",
            "CREATE EDGE TYPE LIVES_IN",
        ] {
            sess.execute(ddl).unwrap();
        }

        // Declare the graph type using path patterns instead of type lists.
        let create_graph_type = "CREATE GRAPH TYPE social (\
                (:Person {name STRING NOT NULL})-[:KNOWS {since INTEGER}]->(:Person),\
                (:Person)-[:LIVES_IN]->(:City)\
            )";
        sess.execute(create_graph_type).unwrap();

        // The graph type must be retrievable under its name.
        let listing = sess.execute("SHOW GRAPH TYPE social").unwrap();
        assert_eq!(listing.rows.len(), 1);
        assert_eq!(listing.rows[0][0], Value::from("social"));
    }
}
6195}