1#[cfg(feature = "triple-store")]
8mod rdf;
9
10use std::sync::Arc;
11use std::sync::atomic::AtomicUsize;
12#[cfg(feature = "lpg")]
13use std::sync::atomic::Ordering;
14use std::time::{Duration, Instant};
15
16#[cfg(feature = "lpg")]
17use grafeo_common::grafeo_debug_span;
18#[cfg(feature = "lpg")]
19use grafeo_common::types::{EdgeId, NodeId};
20use grafeo_common::types::{EpochId, TransactionId, Value};
21use grafeo_common::utils::error::Result;
22use grafeo_common::{grafeo_info_span, grafeo_warn};
23#[cfg(feature = "lpg")]
24use grafeo_core::graph::Direction;
25#[cfg(feature = "lpg")]
26use grafeo_core::graph::lpg::LpgStore;
27#[cfg(feature = "lpg")]
28use grafeo_core::graph::lpg::{Edge, Node};
29#[cfg(feature = "triple-store")]
30use grafeo_core::graph::rdf::RdfStore;
31use grafeo_core::graph::{GraphStore, GraphStoreMut, GraphStoreSearch};
32
33use crate::catalog::{Catalog, CatalogConstraintValidator};
34use crate::config::{AdaptiveConfig, GraphModel};
35use crate::database::QueryResult;
36use crate::query::Executor;
37use crate::query::cache::QueryCache;
38use crate::transaction::TransactionManager;
39
/// Graph-name segment used in storage keys for the default graph of a schema.
///
/// `Session::active_graph_storage_key` builds `"{schema}/__default__"` from it
/// whenever a schema is selected but no named graph (or the graph `default`) is.
const SCHEMA_DEFAULT_GRAPH: &str = "__default__";
43
44fn parse_default_literal(text: &str) -> Value {
49 if text.eq_ignore_ascii_case("null") {
50 return Value::Null;
51 }
52 if text.eq_ignore_ascii_case("true") {
53 return Value::Bool(true);
54 }
55 if text.eq_ignore_ascii_case("false") {
56 return Value::Bool(false);
57 }
58 if (text.starts_with('\'') && text.ends_with('\''))
60 || (text.starts_with('"') && text.ends_with('"'))
61 {
62 return Value::String(text[1..text.len() - 1].into());
63 }
64 if let Ok(i) = text.parse::<i64>() {
66 return Value::Int64(i);
67 }
68 if let Ok(f) = text.parse::<f64>() {
69 return Value::Float64(f);
70 }
71 Value::String(text.into())
73}
74
/// Bundle of shared handles and settings passed to the `Session` constructors
/// (`Session::with_adaptive` and `Session::with_external_store`), which move
/// each field into the session they build.
pub(crate) struct SessionConfig {
    /// Transaction manager handle stored on the session.
    pub transaction_manager: Arc<TransactionManager>,
    /// Query cache handle stored on the session.
    pub query_cache: Arc<QueryCache>,
    /// Catalog handle stored on the session.
    pub catalog: Arc<Catalog>,
    /// Copied into the session's `adaptive_config` field.
    pub adaptive_config: AdaptiveConfig,
    /// Copied into the session's `factorized_execution` flag.
    pub factorized_execution: bool,
    /// Graph model of the session (reported by `Session::graph_model`).
    pub graph_model: GraphModel,
    /// Optional query timeout copied into the session.
    pub query_timeout: Option<Duration>,
    /// Optional property-size limit copied into the session.
    pub max_property_size: Option<usize>,
    /// Optional buffer manager handle copied into the session.
    pub buffer_manager: Option<Arc<grafeo_common::memory::buffer::BufferManager>>,
    /// Commit counter handle copied into the session.
    pub commit_counter: Arc<AtomicUsize>,
    /// Copied into the session's `gc_interval`; presumably the number of
    /// commits between GC runs. TODO confirm against the commit path.
    pub gc_interval: usize,
    /// Seeds both the session's `db_read_only` flag and the initial value of
    /// its `read_only_tx` flag.
    pub read_only: bool,
    /// Identity the session runs as (used for its permission checks).
    pub identity: crate::auth::Identity,
    /// Named graph projections, keyed by projection name.
    #[cfg(feature = "lpg")]
    pub projections: Arc<
        parking_lot::RwLock<
            std::collections::HashMap<String, Arc<grafeo_core::graph::GraphProjection>>,
        >,
    >,
}
104
/// A database session: store handles plus per-session state (selected graph
/// and schema, time zone, parameters, transaction bookkeeping).
///
/// Mutable per-session state sits behind `parking_lot::Mutex` or atomics, so
/// the methods that change it take `&self`.
pub struct Session {
    /// LPG store. `with_adaptive` stores the caller's store here;
    /// `with_external_store` stores a freshly created `LpgStore`.
    #[cfg(feature = "lpg")]
    store: Arc<LpgStore>,
    /// Whether `store` is the real backend or a placeholder (see `LpgBackend`).
    #[cfg(feature = "lpg")]
    lpg_backend: LpgBackend,
    /// Read handle returned by `active_store` when no named graph resolves.
    graph_store: Arc<dyn GraphStoreSearch>,
    /// Write handle returned by `active_write_store` when no named graph
    /// resolves; `None` when the session has no write store.
    graph_store_mut: Option<Arc<dyn GraphStoreMut>>,
    /// Catalog handle (schemas, type definitions, graph-type bindings).
    catalog: Arc<Catalog>,
    /// RDF store; both constructors create a new, empty `RdfStore`.
    #[cfg(feature = "triple-store")]
    rdf_store: Arc<RdfStore>,
    /// Transaction manager handle.
    transaction_manager: Arc<TransactionManager>,
    /// Query cache handle.
    query_cache: Arc<QueryCache>,
    /// `Some` while a transaction is open (checked by `track_graph_touch`).
    current_transaction: parking_lot::Mutex<Option<TransactionId>>,
    /// When set, `execute_session_command` rejects graph and projection DDL
    /// with `TransactionError::ReadOnly`. Initialized from `SessionConfig::read_only`.
    read_only_tx: parking_lot::Mutex<bool>,
    /// Copy of `SessionConfig::read_only` taken at construction.
    db_read_only: bool,
    /// Identity used for permission and grant checks.
    identity: crate::auth::Identity,
    /// Initialized to `true` by both constructors.
    auto_commit: bool,
    /// Adaptive configuration carried over from `SessionConfig`.
    #[allow(dead_code)]
    adaptive_config: AdaptiveConfig,
    /// Carried over from `SessionConfig::factorized_execution`.
    factorized_execution: bool,
    /// Graph model of this session; `require_lpg` rejects `GraphModel::Rdf`.
    graph_model: GraphModel,
    /// Optional query timeout carried over from `SessionConfig`.
    query_timeout: Option<Duration>,
    /// Optional property-size limit carried over from `SessionConfig`.
    max_property_size: Option<usize>,
    /// Optional buffer manager handle carried over from `SessionConfig`.
    buffer_manager: Option<Arc<grafeo_common::memory::buffer::BufferManager>>,
    /// Commit counter handle carried over from `SessionConfig`.
    commit_counter: Arc<AtomicUsize>,
    /// Carried over from `SessionConfig::gc_interval`.
    gc_interval: usize,
    /// Starts at 0; presumably the node count recorded when a transaction
    /// begins. TODO confirm against the transaction code.
    transaction_start_node_count: AtomicUsize,
    /// Starts at 0; presumably the edge count recorded when a transaction
    /// begins. TODO confirm against the transaction code.
    transaction_start_edge_count: AtomicUsize,
    /// Write-ahead log, installed by `set_wal`; `None` until then.
    #[cfg(feature = "wal")]
    wal: Option<Arc<grafeo_storage::wal::LpgWal>>,
    /// Shared graph-context slot passed to the WAL store wrappers, installed by `set_wal`.
    #[cfg(feature = "wal")]
    wal_graph_context: Option<Arc<parking_lot::Mutex<Option<String>>>>,
    /// Change-data-capture log; replaced by `set_cdc_log`.
    #[cfg(feature = "cdc")]
    cdc_log: Arc<crate::cdc::CdcLog>,
    /// Pending CDC event buffer shared with the CDC store wrapper; set by
    /// `set_cdc_log` when a write store exists.
    #[cfg(feature = "cdc")]
    cdc_pending_events: Option<Arc<parking_lot::Mutex<Vec<crate::cdc::ChangeEvent>>>>,
    /// Graph selected for this session (`use_graph` / `reset_graph`).
    current_graph: parking_lot::Mutex<Option<String>>,
    /// Schema selected for this session (`set_schema` / `reset_schema`).
    current_schema: parking_lot::Mutex<Option<String>>,
    /// Session time zone (`set_time_zone` / `reset_time_zone`).
    time_zone: parking_lot::Mutex<Option<String>>,
    /// Session parameters (`set_parameter` / `get_parameter`).
    session_params:
        parking_lot::Mutex<std::collections::HashMap<String, grafeo_common::types::Value>>,
    /// Epoch override (`set_viewing_epoch` / `clear_viewing_epoch`).
    viewing_epoch_override: parking_lot::Mutex<Option<EpochId>>,
    /// Savepoint stack; starts empty.
    savepoints: parking_lot::Mutex<Vec<SavepointState>>,
    /// Starts at 0; presumably the nesting depth of `begin` calls. TODO confirm.
    transaction_nesting_depth: parking_lot::Mutex<u32>,
    /// Distinct storage keys recorded by `track_graph_touch` while a
    /// transaction is open (`None` is the key used when no graph or schema is selected).
    touched_graphs: parking_lot::Mutex<Vec<Option<String>>>,
    /// Starts at 0; presumably counts open result streams. TODO confirm.
    active_streams: AtomicUsize,
    /// Metrics registry, installed by `set_metrics`; `None` until then.
    #[cfg(feature = "metrics")]
    pub(crate) metrics: Option<Arc<crate::metrics::MetricsRegistry>>,
    /// Starts as `None`; presumably the start instant of the open transaction. TODO confirm.
    #[cfg(feature = "metrics")]
    tx_start_time: parking_lot::Mutex<Option<Instant>>,
    /// Named graph projections, keyed by projection name.
    #[cfg(feature = "lpg")]
    projections: Arc<
        parking_lot::RwLock<
            std::collections::HashMap<String, Arc<grafeo_core::graph::GraphProjection>>,
        >,
    >,
}
220
/// Records how a session's `store` field was obtained.
#[cfg(feature = "lpg")]
#[derive(Clone, Copy)]
enum LpgBackend {
    /// Set by `Session::with_adaptive`: `store` is the LPG store supplied by
    /// the caller.
    Active,
    /// Set by `Session::with_external_store`: `store` is a freshly created
    /// `LpgStore`, while the session's read and write handles are the
    /// externally supplied stores.
    Placeholder,
}
234
/// Per-graph snapshot held inside a `SavepointState`.
///
/// NOTE(review): the code that fills and consumes these fields is outside this
/// view; the field descriptions are inferred from names and types — confirm.
#[derive(Clone)]
struct GraphSavepoint {
    /// Graph this snapshot belongs to; presumably `None` means the default graph.
    graph_name: Option<String>,
    /// Presumably the store's next node id when the savepoint was taken.
    next_node_id: u64,
    /// Presumably the store's next edge id when the savepoint was taken.
    next_edge_id: u64,
    /// Presumably the undo-log length when the savepoint was taken.
    undo_log_position: usize,
}
243
/// One entry of the session's `savepoints` stack.
///
/// NOTE(review): the savepoint methods that create and consume this state are
/// outside this view; field descriptions are inferred from names — confirm.
#[derive(Clone)]
struct SavepointState {
    /// Savepoint name.
    name: String,
    /// One snapshot per graph captured by this savepoint.
    graph_snapshots: Vec<GraphSavepoint>,
    /// Presumably the graph that was selected when the savepoint was created.
    #[allow(dead_code)]
    active_graph: Option<String>,
    /// Presumably the length of the pending CDC event buffer at savepoint time.
    #[cfg(feature = "cdc")]
    cdc_event_position: usize,
}
258
259impl Session {
260 #[cfg(feature = "lpg")]
262 #[allow(dead_code)] pub(crate) fn with_adaptive(store: Arc<LpgStore>, cfg: SessionConfig) -> Self {
264 let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreSearch>;
265 let graph_store_mut = Some(Arc::clone(&store) as Arc<dyn GraphStoreMut>);
266 Self {
267 store,
268 lpg_backend: LpgBackend::Active,
269 graph_store,
270 graph_store_mut,
271 catalog: cfg.catalog,
272 #[cfg(feature = "triple-store")]
273 rdf_store: Arc::new(RdfStore::new()),
274 transaction_manager: cfg.transaction_manager,
275 query_cache: cfg.query_cache,
276 current_transaction: parking_lot::Mutex::new(None),
277 read_only_tx: parking_lot::Mutex::new(cfg.read_only),
278 db_read_only: cfg.read_only,
279 identity: cfg.identity,
280 auto_commit: true,
281 adaptive_config: cfg.adaptive_config,
282 factorized_execution: cfg.factorized_execution,
283 graph_model: cfg.graph_model,
284 query_timeout: cfg.query_timeout,
285 max_property_size: cfg.max_property_size,
286 buffer_manager: cfg.buffer_manager,
287 commit_counter: cfg.commit_counter,
288 gc_interval: cfg.gc_interval,
289 transaction_start_node_count: AtomicUsize::new(0),
290 transaction_start_edge_count: AtomicUsize::new(0),
291 #[cfg(feature = "wal")]
292 wal: None,
293 #[cfg(feature = "wal")]
294 wal_graph_context: None,
295 #[cfg(feature = "cdc")]
296 cdc_log: Arc::new(crate::cdc::CdcLog::new()),
297 #[cfg(feature = "cdc")]
298 cdc_pending_events: None,
299 current_graph: parking_lot::Mutex::new(None),
300 current_schema: parking_lot::Mutex::new(None),
301 time_zone: parking_lot::Mutex::new(None),
302 session_params: parking_lot::Mutex::new(std::collections::HashMap::new()),
303 viewing_epoch_override: parking_lot::Mutex::new(None),
304 savepoints: parking_lot::Mutex::new(Vec::new()),
305 transaction_nesting_depth: parking_lot::Mutex::new(0),
306 touched_graphs: parking_lot::Mutex::new(Vec::new()),
307 active_streams: AtomicUsize::new(0),
308 #[cfg(feature = "metrics")]
309 metrics: None,
310 #[cfg(feature = "metrics")]
311 tx_start_time: parking_lot::Mutex::new(None),
312 projections: cfg.projections,
313 }
314 }
315
316 #[cfg(all(feature = "compact-store", feature = "lpg"))]
322 pub(crate) fn override_stores(
323 &mut self,
324 read_store: Arc<dyn GraphStoreSearch>,
325 write_store: Option<Arc<dyn GraphStoreMut>>,
326 ) {
327 self.graph_store = read_store;
328 self.graph_store_mut = write_store;
329 }
330
331 #[cfg(all(feature = "wal", feature = "lpg"))]
336 pub(crate) fn set_wal(
337 &mut self,
338 wal: Arc<grafeo_storage::wal::LpgWal>,
339 wal_graph_context: Arc<parking_lot::Mutex<Option<String>>>,
340 ) {
341 let wal_store = Arc::new(crate::database::wal_store::WalGraphStore::new(
343 Arc::clone(&self.store),
344 Arc::clone(&wal),
345 Arc::clone(&wal_graph_context),
346 ));
347 self.graph_store = Arc::clone(&wal_store) as Arc<dyn GraphStoreSearch>;
348 self.graph_store_mut = Some(wal_store as Arc<dyn GraphStoreMut>);
349 self.wal = Some(wal);
350 self.wal_graph_context = Some(wal_graph_context);
351 }
352
353 #[cfg(all(feature = "wal", feature = "lpg"))]
361 pub(crate) fn log_wal_record(&self, record: &grafeo_storage::wal::WalRecord) {
362 if let Some(ref wal) = self.wal
363 && let Err(e) = wal.log(record)
364 {
365 grafeo_warn!("Session: failed to log WAL record: {}", e);
366 }
367 }
368
369 #[cfg(feature = "cdc")]
376 pub(crate) fn set_cdc_log(&mut self, cdc_log: Arc<crate::cdc::CdcLog>) {
377 if let Some(ref write_store) = self.graph_store_mut {
380 let cdc_store = Arc::new(crate::database::cdc_store::CdcGraphStore::new(
381 Arc::clone(write_store),
382 Arc::clone(&cdc_log),
383 ));
384 self.cdc_pending_events = Some(cdc_store.pending_events());
385 self.graph_store_mut = Some(cdc_store as Arc<dyn grafeo_core::graph::GraphStoreMut>);
386 }
387 self.cdc_log = cdc_log;
388 }
389
390 #[cfg(feature = "metrics")]
392 pub(crate) fn set_metrics(&mut self, metrics: Arc<crate::metrics::MetricsRegistry>) {
393 self.metrics = Some(metrics);
394 }
395
396 pub(crate) fn with_external_store(
405 read_store: Arc<dyn GraphStoreSearch>,
406 write_store: Option<Arc<dyn GraphStoreMut>>,
407 cfg: SessionConfig,
408 ) -> Result<Self> {
409 Ok(Self {
410 #[cfg(feature = "lpg")]
411 store: Arc::new(LpgStore::new()?),
412 #[cfg(feature = "lpg")]
413 lpg_backend: LpgBackend::Placeholder,
414 graph_store: read_store,
415 graph_store_mut: write_store,
416 catalog: cfg.catalog,
417 #[cfg(feature = "triple-store")]
418 rdf_store: Arc::new(RdfStore::new()),
419 transaction_manager: cfg.transaction_manager,
420 query_cache: cfg.query_cache,
421 current_transaction: parking_lot::Mutex::new(None),
422 read_only_tx: parking_lot::Mutex::new(cfg.read_only),
423 db_read_only: cfg.read_only,
424 identity: cfg.identity,
425 auto_commit: true,
426 adaptive_config: cfg.adaptive_config,
427 factorized_execution: cfg.factorized_execution,
428 graph_model: cfg.graph_model,
429 query_timeout: cfg.query_timeout,
430 max_property_size: cfg.max_property_size,
431 buffer_manager: cfg.buffer_manager,
432 commit_counter: cfg.commit_counter,
433 gc_interval: cfg.gc_interval,
434 transaction_start_node_count: AtomicUsize::new(0),
435 transaction_start_edge_count: AtomicUsize::new(0),
436 #[cfg(feature = "wal")]
437 wal: None,
438 #[cfg(feature = "wal")]
439 wal_graph_context: None,
440 #[cfg(feature = "cdc")]
441 cdc_log: Arc::new(crate::cdc::CdcLog::new()),
442 #[cfg(feature = "cdc")]
443 cdc_pending_events: None,
444 current_graph: parking_lot::Mutex::new(None),
445 current_schema: parking_lot::Mutex::new(None),
446 time_zone: parking_lot::Mutex::new(None),
447 session_params: parking_lot::Mutex::new(std::collections::HashMap::new()),
448 viewing_epoch_override: parking_lot::Mutex::new(None),
449 savepoints: parking_lot::Mutex::new(Vec::new()),
450 transaction_nesting_depth: parking_lot::Mutex::new(0),
451 touched_graphs: parking_lot::Mutex::new(Vec::new()),
452 active_streams: AtomicUsize::new(0),
453 #[cfg(feature = "metrics")]
454 metrics: None,
455 #[cfg(feature = "metrics")]
456 tx_start_time: parking_lot::Mutex::new(None),
457 #[cfg(feature = "lpg")]
458 projections: cfg.projections,
459 })
460 }
461
462 #[must_use]
464 pub fn graph_model(&self) -> GraphModel {
465 self.graph_model
466 }
467
468 #[must_use]
470 pub fn identity(&self) -> &crate::auth::Identity {
471 &self.identity
472 }
473
474 pub fn use_graph(&self, name: &str) {
478 *self.current_graph.lock() = Some(name.to_string());
479 self.track_graph_touch();
480 }
481
482 #[must_use]
484 pub fn current_graph(&self) -> Option<String> {
485 self.current_graph.lock().clone()
486 }
487
488 pub fn set_schema(&self, name: &str) {
492 *self.current_schema.lock() = Some(name.to_string());
493 self.track_graph_touch();
494 }
495
496 #[must_use]
500 pub fn current_schema(&self) -> Option<String> {
501 self.current_schema.lock().clone()
502 }
503
504 fn effective_graph_key(&self, graph_name: &str) -> String {
509 let schema = self.current_schema.lock().clone();
510 match schema {
511 Some(s) => format!("{s}/{graph_name}"),
512 None => graph_name.to_string(),
513 }
514 }
515
516 fn effective_type_key(&self, type_name: &str) -> String {
520 let schema = self.current_schema.lock().clone();
521 match schema {
522 Some(s) => format!("{s}/{type_name}"),
523 None => type_name.to_string(),
524 }
525 }
526
527 fn active_graph_storage_key(&self) -> Option<String> {
531 let graph = self.current_graph.lock().clone();
532 let schema = self.current_schema.lock().clone();
533 match (&schema, &graph) {
534 (None, None) => None,
535 (Some(s), None) => Some(format!("{s}/{SCHEMA_DEFAULT_GRAPH}")),
536 (None, Some(name)) if name.eq_ignore_ascii_case("default") => None,
537 (Some(s), Some(name)) if name.eq_ignore_ascii_case("default") => {
538 Some(format!("{s}/{SCHEMA_DEFAULT_GRAPH}"))
539 }
540 (None, Some(name)) => Some(name.clone()),
541 (Some(s), Some(g)) => Some(format!("{s}/{g}")),
542 }
543 }
544
545 fn active_store(&self) -> Arc<dyn GraphStoreSearch> {
553 let key = self.active_graph_storage_key();
554 match key {
555 None => Arc::clone(&self.graph_store),
556 #[cfg(feature = "lpg")]
557 Some(ref name) => match self.store.graph(name) {
558 Some(named_store) => {
559 #[cfg(feature = "wal")]
560 if let (Some(wal), Some(ctx)) = (&self.wal, &self.wal_graph_context) {
561 return Arc::new(crate::database::wal_store::WalGraphStore::new_for_graph(
562 named_store,
563 Arc::clone(wal),
564 name.clone(),
565 Arc::clone(ctx),
566 )) as Arc<dyn GraphStoreSearch>;
567 }
568 named_store as Arc<dyn GraphStoreSearch>
569 }
570 None => Arc::clone(&self.graph_store),
571 },
572 #[cfg(not(feature = "lpg"))]
573 Some(_) => Arc::clone(&self.graph_store),
574 }
575 }
576
577 fn active_write_store(&self) -> Option<Arc<dyn GraphStoreMut>> {
582 let key = self.active_graph_storage_key();
583 match key {
584 None => self.graph_store_mut.as_ref().map(Arc::clone),
585 #[cfg(feature = "lpg")]
586 Some(ref name) => match self.store.graph(name) {
587 Some(named_store) => {
588 let mut store: Arc<dyn GraphStoreMut> = named_store;
589
590 #[cfg(feature = "wal")]
591 if let (Some(wal), Some(ctx)) = (&self.wal, &self.wal_graph_context) {
592 store = Arc::new(crate::database::wal_store::WalGraphStore::new_for_graph(
593 self.store
595 .graph(name)
596 .unwrap_or_else(|| Arc::clone(&self.store)),
597 Arc::clone(wal),
598 name.clone(),
599 Arc::clone(ctx),
600 ));
601 }
602
603 #[cfg(feature = "cdc")]
604 if let Some(ref pending) = self.cdc_pending_events {
605 store = Arc::new(crate::database::cdc_store::CdcGraphStore::wrap(
606 store,
607 Arc::clone(&self.cdc_log),
608 Arc::clone(pending),
609 ));
610 }
611
612 Some(store)
613 }
614 None => self.graph_store_mut.as_ref().map(Arc::clone),
615 },
616 #[cfg(not(feature = "lpg"))]
617 Some(_) => self.graph_store_mut.as_ref().map(Arc::clone),
618 }
619 }
620
621 #[cfg(feature = "lpg")]
626 fn active_lpg_store(&self) -> Arc<LpgStore> {
627 let key = self.active_graph_storage_key();
628 match key {
629 None => Arc::clone(&self.store),
630 Some(ref name) => self
631 .store
632 .graph(name)
633 .unwrap_or_else(|| Arc::clone(&self.store)),
634 }
635 }
636
637 #[cfg(feature = "lpg")]
640 fn resolve_store(&self, graph_name: &Option<String>) -> Arc<LpgStore> {
641 match graph_name {
642 None => Arc::clone(&self.store),
643 Some(name) if name.eq_ignore_ascii_case("default") => Arc::clone(&self.store),
644 Some(name) => self
645 .store
646 .graph(name)
647 .unwrap_or_else(|| Arc::clone(&self.store)),
648 }
649 }
650
651 fn track_graph_touch(&self) {
660 if self.current_transaction.lock().is_some() {
661 let key = self.active_graph_storage_key();
662 let mut touched = self.touched_graphs.lock();
663 if !touched.contains(&key) {
664 touched.push(key);
665 }
666 }
667 }
668
669 pub fn set_time_zone(&self, tz: &str) {
671 *self.time_zone.lock() = Some(tz.to_string());
672 }
673
674 #[must_use]
676 pub fn time_zone(&self) -> Option<String> {
677 self.time_zone.lock().clone()
678 }
679
680 pub fn set_parameter(&self, key: &str, value: grafeo_common::types::Value) {
682 self.session_params.lock().insert(key.to_string(), value);
683 }
684
685 #[must_use]
687 pub fn get_parameter(&self, key: &str) -> Option<grafeo_common::types::Value> {
688 self.session_params.lock().get(key).cloned()
689 }
690
691 pub fn reset_session(&self) {
693 *self.current_schema.lock() = None;
694 *self.current_graph.lock() = None;
695 *self.time_zone.lock() = None;
696 self.session_params.lock().clear();
697 *self.viewing_epoch_override.lock() = None;
698 self.track_graph_touch();
699 }
700
701 pub fn reset_schema(&self) {
703 *self.current_schema.lock() = None;
704 self.track_graph_touch();
705 }
706
707 pub fn reset_graph(&self) {
709 *self.current_graph.lock() = None;
710 self.track_graph_touch();
711 }
712
713 pub fn reset_time_zone(&self) {
715 *self.time_zone.lock() = None;
716 }
717
718 pub fn reset_parameters(&self) {
720 self.session_params.lock().clear();
721 }
722
723 pub fn set_viewing_epoch(&self, epoch: EpochId) {
731 *self.viewing_epoch_override.lock() = Some(epoch);
732 }
733
734 pub fn clear_viewing_epoch(&self) {
736 *self.viewing_epoch_override.lock() = None;
737 }
738
739 #[must_use]
741 pub fn viewing_epoch(&self) -> Option<EpochId> {
742 *self.viewing_epoch_override.lock()
743 }
744
745 #[cfg(feature = "lpg")]
749 #[must_use]
750 pub fn get_node_history(&self, id: NodeId) -> Vec<(EpochId, Option<EpochId>, Node)> {
751 self.active_lpg_store().get_node_history(id)
752 }
753
754 #[cfg(feature = "lpg")]
758 #[must_use]
759 pub fn get_edge_history(&self, id: EdgeId) -> Vec<(EpochId, Option<EpochId>, Edge)> {
760 self.active_lpg_store().get_edge_history(id)
761 }
762
763 fn require_lpg(&self, language: &str) -> Result<()> {
765 if self.graph_model == GraphModel::Rdf {
766 return Err(grafeo_common::utils::error::Error::Internal(format!(
767 "This is an RDF database. {language} queries require an LPG database."
768 )));
769 }
770 Ok(())
771 }
772
773 #[inline]
779 fn require_permission(&self, kind: crate::auth::StatementKind) -> Result<()> {
780 if self.identity.can_admin() {
782 return Ok(());
783 }
784 crate::auth::check_permission(&self.identity, kind).map_err(|denied| {
785 grafeo_common::utils::error::Error::Query(grafeo_common::utils::error::QueryError::new(
786 grafeo_common::utils::error::QueryErrorKind::Semantic,
787 denied.to_string(),
788 ))
789 })
790 }
791
792 #[cfg(feature = "gql")]
794 fn execute_session_command(
795 &self,
796 cmd: grafeo_adapters::query::gql::ast::SessionCommand,
797 ) -> Result<QueryResult> {
798 use grafeo_adapters::query::gql::ast::SessionCommand;
799 #[cfg(feature = "lpg")]
800 use grafeo_adapters::query::gql::ast::TransactionIsolationLevel;
801 use grafeo_common::utils::error::{Error, QueryError, QueryErrorKind};
802
803 match &cmd {
805 SessionCommand::CreateGraph { .. }
806 | SessionCommand::DropGraph { .. }
807 | SessionCommand::CreateProjection { .. }
808 | SessionCommand::DropProjection { .. } => {
809 self.require_permission(crate::auth::StatementKind::Write)?;
810 }
811 _ => {} }
813
814 if self.identity.has_grants() {
816 match &cmd {
817 SessionCommand::CreateGraph { name, .. }
818 | SessionCommand::DropGraph { name, .. }
819 if !self
820 .identity
821 .can_access_graph(name, crate::auth::Role::ReadWrite) =>
822 {
823 return Err(Error::Query(QueryError::new(
824 QueryErrorKind::Semantic,
825 format!(
826 "permission denied: no grant for graph '{name}' (user: {})",
827 self.identity.user_id()
828 ),
829 )));
830 }
831 _ => {}
832 }
833 }
834
835 if *self.read_only_tx.lock() {
837 match &cmd {
838 SessionCommand::CreateGraph { .. }
839 | SessionCommand::DropGraph { .. }
840 | SessionCommand::CreateProjection { .. }
841 | SessionCommand::DropProjection { .. } => {
842 return Err(Error::Transaction(
843 grafeo_common::utils::error::TransactionError::ReadOnly,
844 ));
845 }
846 _ => {} }
848 }
849
850 match cmd {
851 #[cfg(feature = "lpg")]
852 SessionCommand::CreateGraph {
853 name,
854 if_not_exists,
855 typed,
856 like_graph,
857 copy_of,
858 open: _,
859 } => {
860 if name.contains('/') {
862 return Err(Error::Query(QueryError::new(
863 QueryErrorKind::Semantic,
864 format!(
865 "Graph name '{name}' must not contain '/' (reserved as schema/graph separator)"
866 ),
867 )));
868 }
869 let storage_key = self.effective_graph_key(&name);
870
871 if let Some(ref src) = like_graph {
873 let src_key = self.effective_graph_key(src);
874 if self.store.graph(&src_key).is_none() {
875 return Err(Error::Query(QueryError::new(
876 QueryErrorKind::Semantic,
877 format!("Source graph '{src}' does not exist"),
878 )));
879 }
880 }
881 if let Some(ref src) = copy_of {
882 let src_key = self.effective_graph_key(src);
883 if self.store.graph(&src_key).is_none() {
884 return Err(Error::Query(QueryError::new(
885 QueryErrorKind::Semantic,
886 format!("Source graph '{src}' does not exist"),
887 )));
888 }
889 }
890
891 let created = self
892 .store
893 .create_graph(&storage_key)
894 .map_err(|e| Error::Internal(e.to_string()))?;
895 if !created && !if_not_exists {
896 return Err(Error::Query(QueryError::new(
897 QueryErrorKind::Semantic,
898 format!("Graph '{name}' already exists"),
899 )));
900 }
901 if created {
902 #[cfg(feature = "wal")]
903 self.log_schema_wal(&grafeo_storage::wal::WalRecord::CreateNamedGraph {
904 name: storage_key.clone(),
905 });
906 }
907
908 if let Some(ref src) = copy_of {
910 let src_key = self.effective_graph_key(src);
911 self.store
912 .copy_graph(Some(&src_key), Some(&storage_key))
913 .map_err(|e| Error::Internal(e.to_string()))?;
914 }
915
916 if let Some(type_name) = typed
920 && let Err(e) = self.catalog.bind_graph_type(
921 &storage_key,
922 if type_name.contains('/') {
923 type_name.clone()
924 } else {
925 self.effective_type_key(&type_name)
926 },
927 )
928 {
929 return Err(Error::Query(QueryError::new(
930 QueryErrorKind::Semantic,
931 e.to_string(),
932 )));
933 }
934
935 if let Some(ref src) = like_graph {
937 let src_key = self.effective_graph_key(src);
938 if let Some(src_type) = self.catalog.get_graph_type_binding(&src_key) {
939 let _ = self.catalog.bind_graph_type(&storage_key, src_type);
940 }
941 }
942
943 Ok(QueryResult::empty())
944 }
945 #[cfg(feature = "lpg")]
946 SessionCommand::DropGraph { name, if_exists } => {
947 let storage_key = self.effective_graph_key(&name);
948 let dropped = self.store.drop_graph(&storage_key);
949 if !dropped && !if_exists {
950 return Err(Error::Query(QueryError::new(
951 QueryErrorKind::Semantic,
952 format!("Graph '{name}' does not exist"),
953 )));
954 }
955 if dropped {
956 #[cfg(feature = "wal")]
957 self.log_schema_wal(&grafeo_storage::wal::WalRecord::DropNamedGraph {
958 name: storage_key.clone(),
959 });
960 let mut current = self.current_graph.lock();
962 if current
963 .as_deref()
964 .is_some_and(|g| g.eq_ignore_ascii_case(&name))
965 {
966 *current = None;
967 }
968 }
969 Ok(QueryResult::empty())
970 }
971 #[cfg(feature = "lpg")]
972 SessionCommand::UseGraph(name) => {
973 if self.identity.has_grants()
975 && !name.eq_ignore_ascii_case("default")
976 && !self
977 .identity
978 .can_access_graph(&name, crate::auth::Role::ReadOnly)
979 {
980 return Err(Error::Query(QueryError::new(
981 QueryErrorKind::Semantic,
982 format!(
983 "permission denied: no grant for graph '{name}' (user: {})",
984 self.identity.user_id()
985 ),
986 )));
987 }
988 let effective_key = self.effective_graph_key(&name);
990 if !name.eq_ignore_ascii_case("default")
991 && self.store.graph(&effective_key).is_none()
992 {
993 return Err(Error::Query(QueryError::new(
994 QueryErrorKind::Semantic,
995 format!("Graph '{name}' does not exist"),
996 )));
997 }
998 self.use_graph(&name);
999 Ok(QueryResult::empty())
1000 }
1001 #[cfg(feature = "lpg")]
1002 SessionCommand::SessionSetGraph(name) => {
1003 if self.identity.has_grants()
1006 && !name.eq_ignore_ascii_case("default")
1007 && !self
1008 .identity
1009 .can_access_graph(&name, crate::auth::Role::ReadOnly)
1010 {
1011 return Err(Error::Query(QueryError::new(
1012 QueryErrorKind::Semantic,
1013 format!(
1014 "permission denied: no grant for graph '{name}' (user: {})",
1015 self.identity.user_id()
1016 ),
1017 )));
1018 }
1019 let effective_key = self.effective_graph_key(&name);
1020 if !name.eq_ignore_ascii_case("default")
1021 && self.store.graph(&effective_key).is_none()
1022 {
1023 return Err(Error::Query(QueryError::new(
1024 QueryErrorKind::Semantic,
1025 format!("Graph '{name}' does not exist"),
1026 )));
1027 }
1028 self.use_graph(&name);
1029 Ok(QueryResult::empty())
1030 }
1031 SessionCommand::SessionSetSchema(name) => {
1032 if !self.catalog.schema_exists(&name) {
1034 return Err(Error::Query(QueryError::new(
1035 QueryErrorKind::Semantic,
1036 format!("Schema '{name}' does not exist"),
1037 )));
1038 }
1039 self.set_schema(&name);
1040 Ok(QueryResult::empty())
1041 }
1042 SessionCommand::SessionSetTimeZone(tz) => {
1043 self.set_time_zone(&tz);
1044 Ok(QueryResult::empty())
1045 }
1046 #[cfg(feature = "gql")]
1047 SessionCommand::SessionSetParameter(key, expr) => {
1048 if key.eq_ignore_ascii_case("viewing_epoch") {
1049 match Self::eval_integer_literal(&expr) {
1050 Some(n) if n >= 0 => {
1051 #[allow(clippy::cast_sign_loss)]
1053 let epoch = n as u64;
1054 self.set_viewing_epoch(EpochId::new(epoch));
1055 Ok(QueryResult::status(format!("Set viewing_epoch to {n}")))
1056 }
1057 _ => Err(Error::Query(QueryError::new(
1058 QueryErrorKind::Semantic,
1059 "viewing_epoch must be a non-negative integer literal",
1060 ))),
1061 }
1062 } else {
1063 self.set_parameter(&key, Value::Null);
1066 Ok(QueryResult::empty())
1067 }
1068 }
1069 SessionCommand::SessionReset(target) => {
1070 use grafeo_adapters::query::gql::ast::SessionResetTarget;
1071 match target {
1072 SessionResetTarget::All => self.reset_session(),
1073 SessionResetTarget::Schema => self.reset_schema(),
1074 SessionResetTarget::Graph => self.reset_graph(),
1075 SessionResetTarget::TimeZone => self.reset_time_zone(),
1076 SessionResetTarget::Parameters => self.reset_parameters(),
1077 }
1078 Ok(QueryResult::empty())
1079 }
1080 SessionCommand::SessionClose => {
1081 self.reset_session();
1082 Ok(QueryResult::empty())
1083 }
1084 #[cfg(feature = "lpg")]
1085 SessionCommand::StartTransaction {
1086 read_only,
1087 isolation_level,
1088 } => {
1089 let engine_level = isolation_level.map(|l| match l {
1090 TransactionIsolationLevel::ReadCommitted => {
1091 crate::transaction::IsolationLevel::ReadCommitted
1092 }
1093 TransactionIsolationLevel::SnapshotIsolation => {
1094 crate::transaction::IsolationLevel::SnapshotIsolation
1095 }
1096 TransactionIsolationLevel::Serializable => {
1097 crate::transaction::IsolationLevel::Serializable
1098 }
1099 });
1100 self.begin_transaction_inner(read_only, engine_level)?;
1101 Ok(QueryResult::status("Transaction started"))
1102 }
1103 #[cfg(feature = "lpg")]
1104 SessionCommand::Commit => {
1105 self.commit_inner()?;
1106 Ok(QueryResult::status("Transaction committed"))
1107 }
1108 #[cfg(feature = "lpg")]
1109 SessionCommand::Rollback => {
1110 self.rollback_inner()?;
1111 Ok(QueryResult::status("Transaction rolled back"))
1112 }
1113 #[cfg(feature = "lpg")]
1114 SessionCommand::Savepoint(name) => {
1115 self.savepoint(&name)?;
1116 Ok(QueryResult::status(format!("Savepoint '{name}' created")))
1117 }
1118 #[cfg(feature = "lpg")]
1119 SessionCommand::RollbackToSavepoint(name) => {
1120 self.rollback_to_savepoint(&name)?;
1121 Ok(QueryResult::status(format!(
1122 "Rolled back to savepoint '{name}'"
1123 )))
1124 }
1125 #[cfg(feature = "lpg")]
1126 SessionCommand::ReleaseSavepoint(name) => {
1127 self.release_savepoint(&name)?;
1128 Ok(QueryResult::status(format!("Savepoint '{name}' released")))
1129 }
1130 #[cfg(feature = "lpg")]
1131 SessionCommand::CreateProjection {
1132 name,
1133 node_labels,
1134 edge_types,
1135 } => {
1136 use grafeo_core::graph::{GraphProjection, ProjectionSpec};
1137 use std::collections::hash_map::Entry;
1138
1139 let spec = ProjectionSpec::new()
1140 .with_node_labels(node_labels)
1141 .with_edge_types(edge_types);
1142
1143 let store = self.active_store();
1144 let projection = Arc::new(GraphProjection::new(store, spec));
1145 let mut projections = self.projections.write();
1146 match projections.entry(name.clone()) {
1147 Entry::Occupied(_) => Err(Error::Query(QueryError::new(
1148 QueryErrorKind::Semantic,
1149 format!("Projection '{name}' already exists"),
1150 ))),
1151 Entry::Vacant(e) => {
1152 e.insert(projection);
1153 Ok(QueryResult::status(format!("Projection '{name}' created")))
1154 }
1155 }
1156 }
1157 #[cfg(feature = "lpg")]
1158 SessionCommand::DropProjection { name } => {
1159 let removed = self.projections.write().remove(&name).is_some();
1160 if !removed {
1161 return Err(Error::Query(QueryError::new(
1162 QueryErrorKind::Semantic,
1163 format!("Projection '{name}' does not exist"),
1164 )));
1165 }
1166 Ok(QueryResult::status(format!("Projection '{name}' dropped")))
1167 }
1168 #[cfg(feature = "lpg")]
1169 SessionCommand::ShowProjections => {
1170 let mut names: Vec<String> = self.projections.read().keys().cloned().collect();
1171 names.sort();
1172 let rows: Vec<Vec<Value>> =
1173 names.into_iter().map(|n| vec![Value::from(n)]).collect();
1174 Ok(QueryResult {
1175 columns: vec!["name".to_string()],
1176 column_types: Vec::new(),
1177 rows,
1178 ..QueryResult::empty()
1179 })
1180 }
1181 #[cfg(not(feature = "lpg"))]
1182 _ => Err(grafeo_common::utils::error::Error::Internal(
1183 "This command requires the `lpg` feature".to_string(),
1184 )),
1185 }
1186 }
1187
1188 #[cfg(feature = "wal")]
1190 fn log_schema_wal(&self, record: &grafeo_storage::wal::WalRecord) {
1191 if let Some(ref wal) = self.wal
1192 && let Err(e) = wal.log(record)
1193 {
1194 grafeo_warn!("Failed to log schema change to WAL: {}", e);
1195 }
1196 }
1197
1198 #[cfg(all(feature = "lpg", feature = "gql"))]
1200 fn execute_schema_command(
1201 &self,
1202 cmd: grafeo_adapters::query::gql::ast::SchemaStatement,
1203 ) -> Result<QueryResult> {
1204 use crate::catalog::{
1205 EdgeTypeDefinition, NodeTypeDefinition, PropertyDataType, TypedProperty,
1206 };
1207 use grafeo_adapters::query::gql::ast::SchemaStatement;
1208 use grafeo_common::utils::error::{Error, QueryError, QueryErrorKind};
1209 #[cfg(feature = "wal")]
1210 use grafeo_storage::wal::WalRecord;
1211
1212 macro_rules! wal_log {
1214 ($self:expr, $record:expr) => {
1215 #[cfg(feature = "wal")]
1216 $self.log_schema_wal(&$record);
1217 };
1218 }
1219
1220 let result = match cmd {
1221 SchemaStatement::CreateNodeType(stmt) => {
1222 let effective_name = self.effective_type_key(&stmt.name);
1223 #[cfg(feature = "wal")]
1224 let props_for_wal: Vec<(String, String, bool)> = stmt
1225 .properties
1226 .iter()
1227 .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
1228 .collect();
1229 let def = NodeTypeDefinition {
1230 name: effective_name.clone(),
1231 properties: stmt
1232 .properties
1233 .iter()
1234 .map(|p| TypedProperty {
1235 name: p.name.clone(),
1236 data_type: PropertyDataType::from_type_name(&p.data_type),
1237 nullable: p.nullable,
1238 default_value: p
1239 .default_value
1240 .as_ref()
1241 .map(|s| parse_default_literal(s)),
1242 })
1243 .collect(),
1244 constraints: Vec::new(),
1245 parent_types: stmt.parent_types.clone(),
1246 };
1247 let result = if stmt.or_replace {
1248 let _ = self.catalog.drop_node_type(&effective_name);
1249 self.catalog.register_node_type(def)
1250 } else {
1251 self.catalog.register_node_type(def)
1252 };
1253 match result {
1254 Ok(()) => {
1255 wal_log!(
1256 self,
1257 WalRecord::CreateNodeType {
1258 name: effective_name.clone(),
1259 properties: props_for_wal,
1260 constraints: Vec::new(),
1261 }
1262 );
1263 Ok(QueryResult::status(format!(
1264 "Created node type '{}'",
1265 stmt.name
1266 )))
1267 }
1268 Err(e) if stmt.if_not_exists => {
1269 let _ = e;
1270 Ok(QueryResult::status("No change"))
1271 }
1272 Err(e) => Err(Error::Query(QueryError::new(
1273 QueryErrorKind::Semantic,
1274 e.to_string(),
1275 ))),
1276 }
1277 }
1278 SchemaStatement::CreateEdgeType(stmt) => {
1279 let effective_name = self.effective_type_key(&stmt.name);
1280 #[cfg(feature = "wal")]
1281 let props_for_wal: Vec<(String, String, bool)> = stmt
1282 .properties
1283 .iter()
1284 .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
1285 .collect();
1286 let def = EdgeTypeDefinition {
1287 name: effective_name.clone(),
1288 properties: stmt
1289 .properties
1290 .iter()
1291 .map(|p| TypedProperty {
1292 name: p.name.clone(),
1293 data_type: PropertyDataType::from_type_name(&p.data_type),
1294 nullable: p.nullable,
1295 default_value: p
1296 .default_value
1297 .as_ref()
1298 .map(|s| parse_default_literal(s)),
1299 })
1300 .collect(),
1301 constraints: Vec::new(),
1302 source_node_types: stmt.source_node_types.clone(),
1303 target_node_types: stmt.target_node_types.clone(),
1304 };
1305 let result = if stmt.or_replace {
1306 let _ = self.catalog.drop_edge_type_def(&effective_name);
1307 self.catalog.register_edge_type_def(def)
1308 } else {
1309 self.catalog.register_edge_type_def(def)
1310 };
1311 match result {
1312 Ok(()) => {
1313 wal_log!(
1314 self,
1315 WalRecord::CreateEdgeType {
1316 name: effective_name.clone(),
1317 properties: props_for_wal,
1318 constraints: Vec::new(),
1319 }
1320 );
1321 Ok(QueryResult::status(format!(
1322 "Created edge type '{}'",
1323 stmt.name
1324 )))
1325 }
1326 Err(e) if stmt.if_not_exists => {
1327 let _ = e;
1328 Ok(QueryResult::status("No change"))
1329 }
1330 Err(e) => Err(Error::Query(QueryError::new(
1331 QueryErrorKind::Semantic,
1332 e.to_string(),
1333 ))),
1334 }
1335 }
1336 SchemaStatement::CreateVectorIndex(stmt) => {
1337 Self::create_vector_index_on_store(
1338 &self.active_lpg_store(),
1339 &stmt.node_label,
1340 &stmt.property,
1341 stmt.dimensions,
1342 stmt.metric.as_deref(),
1343 )?;
1344 wal_log!(
1345 self,
1346 WalRecord::CreateIndex {
1347 name: stmt.name.clone(),
1348 label: stmt.node_label.clone(),
1349 property: stmt.property.clone(),
1350 index_type: "vector".to_string(),
1351 }
1352 );
1353 Ok(QueryResult::status(format!(
1354 "Created vector index '{}'",
1355 stmt.name
1356 )))
1357 }
1358 SchemaStatement::DropNodeType { name, if_exists } => {
1359 let effective_name = self.effective_type_key(&name);
1360 match self.catalog.drop_node_type(&effective_name) {
1361 Ok(()) => {
1362 wal_log!(
1363 self,
1364 WalRecord::DropNodeType {
1365 name: effective_name
1366 }
1367 );
1368 Ok(QueryResult::status(format!("Dropped node type '{name}'")))
1369 }
1370 Err(e) if if_exists => {
1371 let _ = e;
1372 Ok(QueryResult::status("No change"))
1373 }
1374 Err(e) => Err(Error::Query(QueryError::new(
1375 QueryErrorKind::Semantic,
1376 e.to_string(),
1377 ))),
1378 }
1379 }
1380 SchemaStatement::DropEdgeType { name, if_exists } => {
1381 let effective_name = self.effective_type_key(&name);
1382 match self.catalog.drop_edge_type_def(&effective_name) {
1383 Ok(()) => {
1384 wal_log!(
1385 self,
1386 WalRecord::DropEdgeType {
1387 name: effective_name
1388 }
1389 );
1390 Ok(QueryResult::status(format!("Dropped edge type '{name}'")))
1391 }
1392 Err(e) if if_exists => {
1393 let _ = e;
1394 Ok(QueryResult::status("No change"))
1395 }
1396 Err(e) => Err(Error::Query(QueryError::new(
1397 QueryErrorKind::Semantic,
1398 e.to_string(),
1399 ))),
1400 }
1401 }
1402 SchemaStatement::CreateIndex(stmt) => {
1403 use crate::catalog::IndexType as CatalogIndexType;
1404 use grafeo_adapters::query::gql::ast::IndexKind;
1405 let active = self.active_lpg_store();
1406 let index_type_str = match stmt.index_kind {
1407 IndexKind::Property => "property",
1408 IndexKind::BTree => "btree",
1409 IndexKind::Text => "text",
1410 IndexKind::Vector => "vector",
1411 };
1412 match stmt.index_kind {
1413 IndexKind::Property | IndexKind::BTree => {
1414 for prop in &stmt.properties {
1415 active.create_property_index(prop);
1416 }
1417 }
1418 IndexKind::Text => {
1419 for prop in &stmt.properties {
1420 Self::create_text_index_on_store(&active, &stmt.label, prop)?;
1421 }
1422 }
1423 IndexKind::Vector => {
1424 for prop in &stmt.properties {
1425 Self::create_vector_index_on_store(
1426 &active,
1427 &stmt.label,
1428 prop,
1429 stmt.options.dimensions,
1430 stmt.options.metric.as_deref(),
1431 )?;
1432 }
1433 }
1434 }
1435 let catalog_index_type = match stmt.index_kind {
1438 IndexKind::Property => CatalogIndexType::Hash,
1439 IndexKind::BTree => CatalogIndexType::BTree,
1440 IndexKind::Text => CatalogIndexType::FullText,
1441 IndexKind::Vector => CatalogIndexType::Hash,
1442 };
1443 let label_id = self.catalog.get_or_create_label(&stmt.label);
1444 for prop in &stmt.properties {
1445 let prop_id = self.catalog.get_or_create_property_key(prop);
1446 self.catalog
1447 .create_index(&stmt.name, label_id, prop_id, catalog_index_type);
1448 }
1449 #[cfg(feature = "wal")]
1450 for prop in &stmt.properties {
1451 wal_log!(
1452 self,
1453 WalRecord::CreateIndex {
1454 name: stmt.name.clone(),
1455 label: stmt.label.clone(),
1456 property: prop.clone(),
1457 index_type: index_type_str.to_string(),
1458 }
1459 );
1460 }
1461 Ok(QueryResult::status(format!(
1462 "Created {} index '{}'",
1463 index_type_str, stmt.name
1464 )))
1465 }
1466 SchemaStatement::DropIndex { name, if_exists } => {
1467 if let Some(index_id) = self.catalog.find_index_by_name(&name) {
1470 let def = self.catalog.get_index(index_id);
1471 self.catalog.drop_index(index_id);
1472 if let Some(def) = def
1473 && let Some(prop_name) =
1474 self.catalog.get_property_key_name(def.property_key)
1475 {
1476 self.active_lpg_store().drop_property_index(&prop_name);
1477 }
1478 wal_log!(self, WalRecord::DropIndex { name: name.clone() });
1479 Ok(QueryResult::status(format!("Dropped index '{name}'")))
1480 } else if if_exists {
1481 Ok(QueryResult::status("No change".to_string()))
1482 } else {
1483 Err(Error::Query(QueryError::new(
1484 QueryErrorKind::Semantic,
1485 format!("Index '{name}' does not exist"),
1486 )))
1487 }
1488 }
1489 SchemaStatement::CreateConstraint(stmt) => {
1490 use crate::catalog::TypeConstraint;
1491 use grafeo_adapters::query::gql::ast::ConstraintKind;
1492 let kind_str = match stmt.constraint_kind {
1493 ConstraintKind::Unique => "unique",
1494 ConstraintKind::NodeKey => "node_key",
1495 ConstraintKind::NotNull => "not_null",
1496 ConstraintKind::Exists => "exists",
1497 };
1498 let constraint_name = stmt
1499 .name
1500 .clone()
1501 .unwrap_or_else(|| format!("{}_{kind_str}", stmt.label));
1502
1503 match stmt.constraint_kind {
1505 ConstraintKind::Unique => {
1506 for prop in &stmt.properties {
1507 let label_id = self.catalog.get_or_create_label(&stmt.label);
1508 let prop_id = self.catalog.get_or_create_property_key(prop);
1509 let _ = self.catalog.add_unique_constraint(label_id, prop_id);
1510 }
1511 let _ = self.catalog.add_constraint_to_type(
1512 &stmt.label,
1513 TypeConstraint::Unique(stmt.properties.clone()),
1514 );
1515 }
1516 ConstraintKind::NodeKey => {
1517 for prop in &stmt.properties {
1518 let label_id = self.catalog.get_or_create_label(&stmt.label);
1519 let prop_id = self.catalog.get_or_create_property_key(prop);
1520 let _ = self.catalog.add_unique_constraint(label_id, prop_id);
1521 let _ = self.catalog.add_required_property(label_id, prop_id);
1522 }
1523 let _ = self.catalog.add_constraint_to_type(
1524 &stmt.label,
1525 TypeConstraint::PrimaryKey(stmt.properties.clone()),
1526 );
1527 }
1528 ConstraintKind::NotNull | ConstraintKind::Exists => {
1529 for prop in &stmt.properties {
1530 let label_id = self.catalog.get_or_create_label(&stmt.label);
1531 let prop_id = self.catalog.get_or_create_property_key(prop);
1532 let _ = self.catalog.add_required_property(label_id, prop_id);
1533 let _ = self.catalog.add_constraint_to_type(
1534 &stmt.label,
1535 TypeConstraint::NotNull(prop.clone()),
1536 );
1537 }
1538 }
1539 }
1540
1541 wal_log!(
1542 self,
1543 WalRecord::CreateConstraint {
1544 name: constraint_name.clone(),
1545 label: stmt.label.clone(),
1546 properties: stmt.properties.clone(),
1547 kind: kind_str.to_string(),
1548 }
1549 );
1550 Ok(QueryResult::status(format!(
1551 "Created {kind_str} constraint '{constraint_name}'"
1552 )))
1553 }
1554 SchemaStatement::DropConstraint { name, if_exists } => {
1555 let _ = if_exists;
1556 wal_log!(self, WalRecord::DropConstraint { name: name.clone() });
1557 Ok(QueryResult::status(format!("Dropped constraint '{name}'")))
1558 }
1559 SchemaStatement::CreateGraphType(stmt) => {
1560 use crate::catalog::GraphTypeDefinition;
1561 use grafeo_adapters::query::gql::ast::InlineElementType;
1562
1563 let effective_name = self.effective_type_key(&stmt.name);
1564
1565 let (mut node_types, mut edge_types, open) =
1567 if let Some(ref like_graph) = stmt.like_graph {
1568 if let Some(type_name) = self.catalog.get_graph_type_binding(like_graph) {
1570 if let Some(existing) = self
1571 .catalog
1572 .schema()
1573 .and_then(|s| s.get_graph_type(&type_name))
1574 {
1575 (
1576 existing.allowed_node_types.clone(),
1577 existing.allowed_edge_types.clone(),
1578 existing.open,
1579 )
1580 } else {
1581 (Vec::new(), Vec::new(), true)
1582 }
1583 } else {
1584 let nt = self.catalog.all_node_type_names();
1586 let et = self.catalog.all_edge_type_names();
1587 if nt.is_empty() && et.is_empty() {
1588 (Vec::new(), Vec::new(), true)
1589 } else {
1590 (nt, et, false)
1591 }
1592 }
1593 } else {
1594 let nt = stmt
1596 .node_types
1597 .iter()
1598 .map(|n| self.effective_type_key(n))
1599 .collect();
1600 let et = stmt
1601 .edge_types
1602 .iter()
1603 .map(|n| self.effective_type_key(n))
1604 .collect();
1605 (nt, et, stmt.open)
1606 };
1607
1608 for inline in &stmt.inline_types {
1613 match inline {
1614 InlineElementType::Node {
1615 name,
1616 properties,
1617 key_labels,
1618 is_reference,
1619 ..
1620 } => {
1621 let inline_effective = self.effective_type_key(name);
1622 if *is_reference {
1623 if self.catalog.get_node_type(&inline_effective).is_none() {
1625 return Err(Error::Query(QueryError::new(
1626 QueryErrorKind::Semantic,
1627 format!(
1628 "Referenced node type '{inline_effective}' does not exist"
1629 ),
1630 )));
1631 }
1632 } else {
1633 let def = NodeTypeDefinition {
1634 name: inline_effective.clone(),
1635 properties: properties
1636 .iter()
1637 .map(|p| TypedProperty {
1638 name: p.name.clone(),
1639 data_type: PropertyDataType::from_type_name(
1640 &p.data_type,
1641 ),
1642 nullable: p.nullable,
1643 default_value: None,
1644 })
1645 .collect(),
1646 constraints: Vec::new(),
1647 parent_types: key_labels.clone(),
1648 };
1649 self.catalog.register_or_replace_node_type(def);
1650 #[cfg(feature = "wal")]
1651 {
1652 let props_for_wal: Vec<(String, String, bool)> = properties
1653 .iter()
1654 .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
1655 .collect();
1656 self.log_schema_wal(&WalRecord::CreateNodeType {
1657 name: inline_effective.clone(),
1658 properties: props_for_wal,
1659 constraints: Vec::new(),
1660 });
1661 }
1662 }
1663 if !node_types.contains(&inline_effective) {
1664 node_types.push(inline_effective);
1665 }
1666 }
1667 InlineElementType::Edge {
1668 name,
1669 properties,
1670 source_node_types,
1671 target_node_types,
1672 is_reference,
1673 ..
1674 } => {
1675 let inline_effective = self.effective_type_key(name);
1676 if *is_reference {
1677 if self.catalog.get_edge_type_def(&inline_effective).is_none() {
1678 return Err(Error::Query(QueryError::new(
1679 QueryErrorKind::Semantic,
1680 format!(
1681 "Referenced edge type '{inline_effective}' does not exist"
1682 ),
1683 )));
1684 }
1685 } else {
1686 let def = EdgeTypeDefinition {
1687 name: inline_effective.clone(),
1688 properties: properties
1689 .iter()
1690 .map(|p| TypedProperty {
1691 name: p.name.clone(),
1692 data_type: PropertyDataType::from_type_name(
1693 &p.data_type,
1694 ),
1695 nullable: p.nullable,
1696 default_value: None,
1697 })
1698 .collect(),
1699 constraints: Vec::new(),
1700 source_node_types: source_node_types.clone(),
1701 target_node_types: target_node_types.clone(),
1702 };
1703 self.catalog.register_or_replace_edge_type_def(def);
1704 #[cfg(feature = "wal")]
1705 {
1706 let props_for_wal: Vec<(String, String, bool)> = properties
1707 .iter()
1708 .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
1709 .collect();
1710 self.log_schema_wal(&WalRecord::CreateEdgeType {
1711 name: inline_effective.clone(),
1712 properties: props_for_wal,
1713 constraints: Vec::new(),
1714 });
1715 }
1716 }
1717 if !edge_types.contains(&inline_effective) {
1718 edge_types.push(inline_effective);
1719 }
1720 }
1721 }
1722 }
1723
1724 let def = GraphTypeDefinition {
1725 name: effective_name.clone(),
1726 allowed_node_types: node_types.clone(),
1727 allowed_edge_types: edge_types.clone(),
1728 open,
1729 };
1730 let result = if stmt.or_replace {
1731 let _ = self.catalog.drop_graph_type(&effective_name);
1733 self.catalog.register_graph_type(def)
1734 } else {
1735 self.catalog.register_graph_type(def)
1736 };
1737 match result {
1738 Ok(()) => {
1739 wal_log!(
1740 self,
1741 WalRecord::CreateGraphType {
1742 name: effective_name.clone(),
1743 node_types,
1744 edge_types,
1745 open,
1746 }
1747 );
1748 Ok(QueryResult::status(format!(
1749 "Created graph type '{}'",
1750 stmt.name
1751 )))
1752 }
1753 Err(e) if stmt.if_not_exists => {
1754 let _ = e;
1755 Ok(QueryResult::status("No change"))
1756 }
1757 Err(e) => Err(Error::Query(QueryError::new(
1758 QueryErrorKind::Semantic,
1759 e.to_string(),
1760 ))),
1761 }
1762 }
1763 SchemaStatement::DropGraphType { name, if_exists } => {
1764 let effective_name = self.effective_type_key(&name);
1765 match self.catalog.drop_graph_type(&effective_name) {
1766 Ok(()) => {
1767 wal_log!(
1768 self,
1769 WalRecord::DropGraphType {
1770 name: effective_name
1771 }
1772 );
1773 Ok(QueryResult::status(format!("Dropped graph type '{name}'")))
1774 }
1775 Err(e) if if_exists => {
1776 let _ = e;
1777 Ok(QueryResult::status("No change"))
1778 }
1779 Err(e) => Err(Error::Query(QueryError::new(
1780 QueryErrorKind::Semantic,
1781 e.to_string(),
1782 ))),
1783 }
1784 }
1785 SchemaStatement::CreateSchema {
1786 name,
1787 if_not_exists,
1788 } => {
1789 if name.contains('/') {
1790 return Err(Error::Query(QueryError::new(
1791 QueryErrorKind::Semantic,
1792 format!(
1793 "Schema name '{name}' must not contain '/' (reserved as schema/graph separator)"
1794 ),
1795 )));
1796 }
1797 match self.catalog.register_schema_namespace(name.clone()) {
1798 Ok(()) => {
1799 wal_log!(self, WalRecord::CreateSchema { name: name.clone() });
1800 let default_key = format!("{name}/{SCHEMA_DEFAULT_GRAPH}");
1803 if self.store.create_graph(&default_key).unwrap_or(false) {
1804 wal_log!(self, WalRecord::CreateNamedGraph { name: default_key });
1805 }
1806 Ok(QueryResult::status(format!("Created schema '{name}'")))
1807 }
1808 Err(e) if if_not_exists => {
1809 let _ = e;
1810 Ok(QueryResult::status("No change"))
1811 }
1812 Err(e) => Err(Error::Query(QueryError::new(
1813 QueryErrorKind::Semantic,
1814 e.to_string(),
1815 ))),
1816 }
1817 }
1818 SchemaStatement::DropSchema { name, if_exists } => {
1819 let prefix = format!("{name}/");
1822 let default_graph_key = format!("{name}/{SCHEMA_DEFAULT_GRAPH}");
1823 let has_graphs = self
1824 .store
1825 .graph_names()
1826 .iter()
1827 .any(|g| g.starts_with(&prefix) && *g != default_graph_key);
1828 let has_types = self
1829 .catalog
1830 .all_node_type_names()
1831 .iter()
1832 .any(|n| n.starts_with(&prefix))
1833 || self
1834 .catalog
1835 .all_edge_type_names()
1836 .iter()
1837 .any(|n| n.starts_with(&prefix))
1838 || self
1839 .catalog
1840 .all_graph_type_names()
1841 .iter()
1842 .any(|n| n.starts_with(&prefix));
1843 if has_graphs || has_types {
1844 return Err(Error::Query(QueryError::new(
1845 QueryErrorKind::Semantic,
1846 format!("Schema '{name}' is not empty: drop all graphs and types first"),
1847 )));
1848 }
1849 match self.catalog.drop_schema_namespace(&name) {
1850 Ok(()) => {
1851 wal_log!(self, WalRecord::DropSchema { name: name.clone() });
1852 if self.store.drop_graph(&default_graph_key) {
1854 wal_log!(
1855 self,
1856 WalRecord::DropNamedGraph {
1857 name: default_graph_key,
1858 }
1859 );
1860 }
1861 let mut current = self.current_schema.lock();
1863 if current
1864 .as_deref()
1865 .is_some_and(|s| s.eq_ignore_ascii_case(&name))
1866 {
1867 *current = None;
1868 }
1869 Ok(QueryResult::status(format!("Dropped schema '{name}'")))
1870 }
1871 Err(e) if if_exists => {
1872 let _ = e;
1873 Ok(QueryResult::status("No change"))
1874 }
1875 Err(e) => Err(Error::Query(QueryError::new(
1876 QueryErrorKind::Semantic,
1877 e.to_string(),
1878 ))),
1879 }
1880 }
1881 SchemaStatement::AlterNodeType(stmt) => {
1882 use grafeo_adapters::query::gql::ast::TypeAlteration;
1883 let effective_name = self.effective_type_key(&stmt.name);
1884 let mut wal_alts = Vec::new();
1885 for alt in &stmt.alterations {
1886 match alt {
1887 TypeAlteration::AddProperty(prop) => {
1888 let typed = TypedProperty {
1889 name: prop.name.clone(),
1890 data_type: PropertyDataType::from_type_name(&prop.data_type),
1891 nullable: prop.nullable,
1892 default_value: prop
1893 .default_value
1894 .as_ref()
1895 .map(|s| parse_default_literal(s)),
1896 };
1897 self.catalog
1898 .alter_node_type_add_property(&effective_name, typed)
1899 .map_err(|e| {
1900 Error::Query(QueryError::new(
1901 QueryErrorKind::Semantic,
1902 e.to_string(),
1903 ))
1904 })?;
1905 wal_alts.push((
1906 "add".to_string(),
1907 prop.name.clone(),
1908 prop.data_type.clone(),
1909 prop.nullable,
1910 ));
1911 }
1912 TypeAlteration::DropProperty(name) => {
1913 self.catalog
1914 .alter_node_type_drop_property(&effective_name, name)
1915 .map_err(|e| {
1916 Error::Query(QueryError::new(
1917 QueryErrorKind::Semantic,
1918 e.to_string(),
1919 ))
1920 })?;
1921 wal_alts.push(("drop".to_string(), name.clone(), String::new(), false));
1922 }
1923 }
1924 }
1925 wal_log!(
1926 self,
1927 WalRecord::AlterNodeType {
1928 name: effective_name,
1929 alterations: wal_alts,
1930 }
1931 );
1932 Ok(QueryResult::status(format!(
1933 "Altered node type '{}'",
1934 stmt.name
1935 )))
1936 }
1937 SchemaStatement::AlterEdgeType(stmt) => {
1938 use grafeo_adapters::query::gql::ast::TypeAlteration;
1939 let effective_name = self.effective_type_key(&stmt.name);
1940 let mut wal_alts = Vec::new();
1941 for alt in &stmt.alterations {
1942 match alt {
1943 TypeAlteration::AddProperty(prop) => {
1944 let typed = TypedProperty {
1945 name: prop.name.clone(),
1946 data_type: PropertyDataType::from_type_name(&prop.data_type),
1947 nullable: prop.nullable,
1948 default_value: prop
1949 .default_value
1950 .as_ref()
1951 .map(|s| parse_default_literal(s)),
1952 };
1953 self.catalog
1954 .alter_edge_type_add_property(&effective_name, typed)
1955 .map_err(|e| {
1956 Error::Query(QueryError::new(
1957 QueryErrorKind::Semantic,
1958 e.to_string(),
1959 ))
1960 })?;
1961 wal_alts.push((
1962 "add".to_string(),
1963 prop.name.clone(),
1964 prop.data_type.clone(),
1965 prop.nullable,
1966 ));
1967 }
1968 TypeAlteration::DropProperty(name) => {
1969 self.catalog
1970 .alter_edge_type_drop_property(&effective_name, name)
1971 .map_err(|e| {
1972 Error::Query(QueryError::new(
1973 QueryErrorKind::Semantic,
1974 e.to_string(),
1975 ))
1976 })?;
1977 wal_alts.push(("drop".to_string(), name.clone(), String::new(), false));
1978 }
1979 }
1980 }
1981 wal_log!(
1982 self,
1983 WalRecord::AlterEdgeType {
1984 name: effective_name,
1985 alterations: wal_alts,
1986 }
1987 );
1988 Ok(QueryResult::status(format!(
1989 "Altered edge type '{}'",
1990 stmt.name
1991 )))
1992 }
1993 SchemaStatement::AlterGraphType(stmt) => {
1994 use grafeo_adapters::query::gql::ast::GraphTypeAlteration;
1995 let effective_name = self.effective_type_key(&stmt.name);
1996 let mut wal_alts = Vec::new();
1997 for alt in &stmt.alterations {
1998 match alt {
1999 GraphTypeAlteration::AddNodeType(name) => {
2000 self.catalog
2001 .alter_graph_type_add_node_type(&effective_name, name.clone())
2002 .map_err(|e| {
2003 Error::Query(QueryError::new(
2004 QueryErrorKind::Semantic,
2005 e.to_string(),
2006 ))
2007 })?;
2008 wal_alts.push(("add_node_type".to_string(), name.clone()));
2009 }
2010 GraphTypeAlteration::DropNodeType(name) => {
2011 self.catalog
2012 .alter_graph_type_drop_node_type(&effective_name, name)
2013 .map_err(|e| {
2014 Error::Query(QueryError::new(
2015 QueryErrorKind::Semantic,
2016 e.to_string(),
2017 ))
2018 })?;
2019 wal_alts.push(("drop_node_type".to_string(), name.clone()));
2020 }
2021 GraphTypeAlteration::AddEdgeType(name) => {
2022 self.catalog
2023 .alter_graph_type_add_edge_type(&effective_name, name.clone())
2024 .map_err(|e| {
2025 Error::Query(QueryError::new(
2026 QueryErrorKind::Semantic,
2027 e.to_string(),
2028 ))
2029 })?;
2030 wal_alts.push(("add_edge_type".to_string(), name.clone()));
2031 }
2032 GraphTypeAlteration::DropEdgeType(name) => {
2033 self.catalog
2034 .alter_graph_type_drop_edge_type(&effective_name, name)
2035 .map_err(|e| {
2036 Error::Query(QueryError::new(
2037 QueryErrorKind::Semantic,
2038 e.to_string(),
2039 ))
2040 })?;
2041 wal_alts.push(("drop_edge_type".to_string(), name.clone()));
2042 }
2043 }
2044 }
2045 wal_log!(
2046 self,
2047 WalRecord::AlterGraphType {
2048 name: effective_name,
2049 alterations: wal_alts,
2050 }
2051 );
2052 Ok(QueryResult::status(format!(
2053 "Altered graph type '{}'",
2054 stmt.name
2055 )))
2056 }
2057 SchemaStatement::CreateProcedure(stmt) => {
2058 use crate::catalog::ProcedureDefinition;
2059
2060 let def = ProcedureDefinition {
2061 name: stmt.name.clone(),
2062 params: stmt
2063 .params
2064 .iter()
2065 .map(|p| (p.name.clone(), p.param_type.clone()))
2066 .collect(),
2067 returns: stmt
2068 .returns
2069 .iter()
2070 .map(|r| (r.name.clone(), r.return_type.clone()))
2071 .collect(),
2072 body: stmt.body.clone(),
2073 };
2074
2075 if stmt.or_replace {
2076 self.catalog.replace_procedure(def).map_err(|e| {
2077 Error::Query(QueryError::new(QueryErrorKind::Semantic, e.to_string()))
2078 })?;
2079 } else {
2080 match self.catalog.register_procedure(def) {
2081 Ok(()) => {}
2082 Err(_) if stmt.if_not_exists => {
2083 return Ok(QueryResult::empty());
2084 }
2085 Err(e) => {
2086 return Err(Error::Query(QueryError::new(
2087 QueryErrorKind::Semantic,
2088 e.to_string(),
2089 )));
2090 }
2091 }
2092 }
2093
2094 wal_log!(
2095 self,
2096 WalRecord::CreateProcedure {
2097 name: stmt.name.clone(),
2098 params: stmt
2099 .params
2100 .iter()
2101 .map(|p| (p.name.clone(), p.param_type.clone()))
2102 .collect(),
2103 returns: stmt
2104 .returns
2105 .iter()
2106 .map(|r| (r.name.clone(), r.return_type.clone()))
2107 .collect(),
2108 body: stmt.body,
2109 }
2110 );
2111 Ok(QueryResult::status(format!(
2112 "Created procedure '{}'",
2113 stmt.name
2114 )))
2115 }
2116 SchemaStatement::DropProcedure { name, if_exists } => {
2117 match self.catalog.drop_procedure(&name) {
2118 Ok(()) => {}
2119 Err(_) if if_exists => {
2120 return Ok(QueryResult::empty());
2121 }
2122 Err(e) => {
2123 return Err(Error::Query(QueryError::new(
2124 QueryErrorKind::Semantic,
2125 e.to_string(),
2126 )));
2127 }
2128 }
2129 wal_log!(self, WalRecord::DropProcedure { name: name.clone() });
2130 Ok(QueryResult::status(format!("Dropped procedure '{name}'")))
2131 }
2132 SchemaStatement::ShowIndexes => {
2133 return self.execute_show_indexes();
2134 }
2135 SchemaStatement::ShowConstraints => {
2136 return self.execute_show_constraints();
2137 }
2138 SchemaStatement::ShowNodeTypes => {
2139 return self.execute_show_node_types();
2140 }
2141 SchemaStatement::ShowEdgeTypes => {
2142 return self.execute_show_edge_types();
2143 }
2144 SchemaStatement::ShowGraphTypes => {
2145 return self.execute_show_graph_types();
2146 }
2147 SchemaStatement::ShowGraphType(name) => {
2148 return self.execute_show_graph_type(&name);
2149 }
2150 SchemaStatement::ShowCurrentGraphType => {
2151 return self.execute_show_current_graph_type();
2152 }
2153 SchemaStatement::ShowGraphs => {
2154 return self.execute_show_graphs();
2155 }
2156 SchemaStatement::ShowSchemas => {
2157 return self.execute_show_schemas();
2158 }
2159 };
2160
2161 if result.is_ok() {
2164 self.query_cache.clear();
2165 }
2166
2167 result
2168 }
2169
2170 #[cfg(all(feature = "lpg", feature = "gql", feature = "vector-index"))]
2172 fn create_vector_index_on_store(
2173 store: &LpgStore,
2174 label: &str,
2175 property: &str,
2176 dimensions: Option<usize>,
2177 metric: Option<&str>,
2178 ) -> Result<()> {
2179 use grafeo_common::types::{PropertyKey, Value};
2180 use grafeo_common::utils::error::Error;
2181 use grafeo_core::index::vector::{DistanceMetric, HnswConfig, HnswIndex, VectorIndexKind};
2182
2183 let metric = match metric {
2184 Some(m) => DistanceMetric::from_str(m).ok_or_else(|| {
2185 Error::Internal(format!(
2186 "Unknown distance metric '{m}'. Use: cosine, euclidean, dot_product, manhattan"
2187 ))
2188 })?,
2189 None => DistanceMetric::Cosine,
2190 };
2191
2192 let prop_key = PropertyKey::new(property);
2193 let mut found_dims: Option<usize> = dimensions;
2194 let mut vectors: Vec<(grafeo_common::types::NodeId, Vec<f32>)> = Vec::new();
2195
2196 for node in store.nodes_with_label(label) {
2197 if let Some(Value::Vector(v)) = node.properties.get(&prop_key) {
2198 if let Some(expected) = found_dims {
2199 if v.len() != expected {
2200 return Err(Error::Internal(format!(
2201 "Vector dimension mismatch: expected {expected}, found {} on node {}",
2202 v.len(),
2203 node.id.0
2204 )));
2205 }
2206 } else {
2207 found_dims = Some(v.len());
2208 }
2209 vectors.push((node.id, v.to_vec()));
2210 }
2211 }
2212
2213 let Some(dims) = found_dims else {
2214 return Err(Error::Internal(format!(
2215 "No vector properties found on :{label}({property}) and no dimensions specified"
2216 )));
2217 };
2218
2219 let config = HnswConfig::new(dims, metric);
2220 let index = HnswIndex::with_capacity(config, vectors.len());
2221 let accessor = grafeo_core::index::vector::PropertyVectorAccessor::new(store, property);
2222 for (node_id, vec) in &vectors {
2223 index.insert(*node_id, vec, &accessor);
2224 }
2225
2226 store.add_vector_index(label, property, Arc::new(VectorIndexKind::Hnsw(index)));
2227 Ok(())
2228 }
2229
2230 #[cfg(all(feature = "lpg", feature = "gql", not(feature = "vector-index")))]
2232 fn create_vector_index_on_store(
2233 _store: &LpgStore,
2234 _label: &str,
2235 _property: &str,
2236 _dimensions: Option<usize>,
2237 _metric: Option<&str>,
2238 ) -> Result<()> {
2239 Err(grafeo_common::utils::error::Error::Internal(
2240 "Vector index support requires the 'vector-index' feature".to_string(),
2241 ))
2242 }
2243
2244 #[cfg(all(feature = "lpg", feature = "gql", feature = "text-index"))]
2246 fn create_text_index_on_store(store: &LpgStore, label: &str, property: &str) -> Result<()> {
2247 use grafeo_common::types::{PropertyKey, Value};
2248 use grafeo_core::index::text::{BM25Config, InvertedIndex};
2249
2250 let mut index = InvertedIndex::new(BM25Config::default());
2251 let prop_key = PropertyKey::new(property);
2252
2253 let nodes = store.nodes_by_label(label);
2254 for node_id in nodes {
2255 if let Some(Value::String(text)) = store.get_node_property(node_id, &prop_key) {
2256 index.insert(node_id, text.as_str());
2257 }
2258 }
2259
2260 store.add_text_index(label, property, Arc::new(parking_lot::RwLock::new(index)));
2261 Ok(())
2262 }
2263
2264 #[cfg(all(feature = "lpg", feature = "gql", not(feature = "text-index")))]
2266 fn create_text_index_on_store(_store: &LpgStore, _label: &str, _property: &str) -> Result<()> {
2267 Err(grafeo_common::utils::error::Error::Internal(
2268 "Text index support requires the 'text-index' feature".to_string(),
2269 ))
2270 }
2271
2272 fn execute_show_indexes(&self) -> Result<QueryResult> {
2274 let indexes = self.catalog.all_indexes();
2275 let columns = vec![
2276 "name".to_string(),
2277 "type".to_string(),
2278 "label".to_string(),
2279 "property".to_string(),
2280 ];
2281 let rows: Vec<Vec<Value>> = indexes
2282 .into_iter()
2283 .map(|def| {
2284 let label_name = self
2285 .catalog
2286 .get_label_name(def.label)
2287 .unwrap_or_else(|| "?".into());
2288 let prop_name = self
2289 .catalog
2290 .get_property_key_name(def.property_key)
2291 .unwrap_or_else(|| "?".into());
2292 vec![
2293 Value::from(def.name),
2294 Value::from(format!("{:?}", def.index_type)),
2295 Value::from(&*label_name),
2296 Value::from(&*prop_name),
2297 ]
2298 })
2299 .collect();
2300 Ok(QueryResult {
2301 columns,
2302 column_types: Vec::new(),
2303 rows,
2304 ..QueryResult::empty()
2305 })
2306 }
2307
2308 fn execute_show_constraints(&self) -> Result<QueryResult> {
2310 Ok(QueryResult {
2313 columns: vec![
2314 "name".to_string(),
2315 "type".to_string(),
2316 "label".to_string(),
2317 "properties".to_string(),
2318 ],
2319 column_types: Vec::new(),
2320 rows: Vec::new(),
2321 ..QueryResult::empty()
2322 })
2323 }
2324
2325 fn execute_show_node_types(&self) -> Result<QueryResult> {
2327 let columns = vec![
2328 "name".to_string(),
2329 "properties".to_string(),
2330 "constraints".to_string(),
2331 "parents".to_string(),
2332 ];
2333 let schema = self.current_schema.lock().clone();
2334 let all_names = self.catalog.all_node_type_names();
2335 let type_names: Vec<String> = match &schema {
2336 Some(s) => {
2337 let prefix = format!("{s}/");
2338 all_names
2339 .into_iter()
2340 .filter_map(|n| n.strip_prefix(&prefix).map(String::from))
2341 .collect()
2342 }
2343 None => all_names.into_iter().filter(|n| !n.contains('/')).collect(),
2344 };
2345 let rows: Vec<Vec<Value>> = type_names
2346 .into_iter()
2347 .filter_map(|name| {
2348 let lookup = match &schema {
2349 Some(s) => format!("{s}/{name}"),
2350 None => name.clone(),
2351 };
2352 let def = self.catalog.get_node_type(&lookup)?;
2353 let props: Vec<String> = def
2354 .properties
2355 .iter()
2356 .map(|p| {
2357 let nullable = if p.nullable { "" } else { " NOT NULL" };
2358 format!("{} {}{}", p.name, p.data_type, nullable)
2359 })
2360 .collect();
2361 let constraints: Vec<String> =
2362 def.constraints.iter().map(|c| format!("{c:?}")).collect();
2363 let parents = def.parent_types.join(", ");
2364 Some(vec![
2365 Value::from(name),
2366 Value::from(props.join(", ")),
2367 Value::from(constraints.join(", ")),
2368 Value::from(parents),
2369 ])
2370 })
2371 .collect();
2372 Ok(QueryResult {
2373 columns,
2374 column_types: Vec::new(),
2375 rows,
2376 ..QueryResult::empty()
2377 })
2378 }
2379
2380 fn execute_show_edge_types(&self) -> Result<QueryResult> {
2382 let columns = vec![
2383 "name".to_string(),
2384 "properties".to_string(),
2385 "source_types".to_string(),
2386 "target_types".to_string(),
2387 ];
2388 let schema = self.current_schema.lock().clone();
2389 let all_names = self.catalog.all_edge_type_names();
2390 let type_names: Vec<String> = match &schema {
2391 Some(s) => {
2392 let prefix = format!("{s}/");
2393 all_names
2394 .into_iter()
2395 .filter_map(|n| n.strip_prefix(&prefix).map(String::from))
2396 .collect()
2397 }
2398 None => all_names.into_iter().filter(|n| !n.contains('/')).collect(),
2399 };
2400 let rows: Vec<Vec<Value>> = type_names
2401 .into_iter()
2402 .filter_map(|name| {
2403 let lookup = match &schema {
2404 Some(s) => format!("{s}/{name}"),
2405 None => name.clone(),
2406 };
2407 let def = self.catalog.get_edge_type_def(&lookup)?;
2408 let props: Vec<String> = def
2409 .properties
2410 .iter()
2411 .map(|p| {
2412 let nullable = if p.nullable { "" } else { " NOT NULL" };
2413 format!("{} {}{}", p.name, p.data_type, nullable)
2414 })
2415 .collect();
2416 let src = def.source_node_types.join(", ");
2417 let tgt = def.target_node_types.join(", ");
2418 Some(vec![
2419 Value::from(name),
2420 Value::from(props.join(", ")),
2421 Value::from(src),
2422 Value::from(tgt),
2423 ])
2424 })
2425 .collect();
2426 Ok(QueryResult {
2427 columns,
2428 column_types: Vec::new(),
2429 rows,
2430 ..QueryResult::empty()
2431 })
2432 }
2433
2434 fn execute_show_graph_types(&self) -> Result<QueryResult> {
2436 let columns = vec![
2437 "name".to_string(),
2438 "open".to_string(),
2439 "node_types".to_string(),
2440 "edge_types".to_string(),
2441 ];
2442 let schema = self.current_schema.lock().clone();
2443 let all_names = self.catalog.all_graph_type_names();
2444 let type_names: Vec<String> = match &schema {
2445 Some(s) => {
2446 let prefix = format!("{s}/");
2447 all_names
2448 .into_iter()
2449 .filter_map(|n| n.strip_prefix(&prefix).map(String::from))
2450 .collect()
2451 }
2452 None => all_names.into_iter().filter(|n| !n.contains('/')).collect(),
2453 };
2454 let rows: Vec<Vec<Value>> = type_names
2455 .into_iter()
2456 .filter_map(|name| {
2457 let lookup = match &schema {
2458 Some(s) => format!("{s}/{name}"),
2459 None => name.clone(),
2460 };
2461 let def = self.catalog.get_graph_type_def(&lookup)?;
2462 let strip = |n: &String| -> String {
2464 match &schema {
2465 Some(s) => n.strip_prefix(&format!("{s}/")).unwrap_or(n).to_string(),
2466 None => n.clone(),
2467 }
2468 };
2469 let node_types: Vec<String> = def.allowed_node_types.iter().map(strip).collect();
2470 let edge_types: Vec<String> = def.allowed_edge_types.iter().map(strip).collect();
2471 Some(vec![
2472 Value::from(name),
2473 Value::from(def.open),
2474 Value::from(node_types.join(", ")),
2475 Value::from(edge_types.join(", ")),
2476 ])
2477 })
2478 .collect();
2479 Ok(QueryResult {
2480 columns,
2481 column_types: Vec::new(),
2482 rows,
2483 ..QueryResult::empty()
2484 })
2485 }
2486
#[cfg(feature = "lpg")]
/// Lists the named graphs visible in the session's current schema.
///
/// With a schema selected, store graph names of the form `<schema>/<name>`
/// are reported with the prefix removed, and the entry whose short name
/// equals `SCHEMA_DEFAULT_GRAPH` is hidden. Without a schema, only names
/// containing no `/` are reported.
///
/// # Returns
///
/// A single `name` column, sorted ascending.
fn execute_show_graphs(&self) -> Result<QueryResult> {
    let schema_prefix = self
        .current_schema
        .lock()
        .clone()
        .map(|s| format!("{s}/"));

    let mut names: Vec<String> = self
        .store
        .graph_names()
        .into_iter()
        .filter_map(|qualified| match &schema_prefix {
            Some(prefix) => qualified
                .strip_prefix(prefix.as_str())
                .filter(|short| *short != SCHEMA_DEFAULT_GRAPH)
                .map(String::from),
            None => (!qualified.contains('/')).then_some(qualified),
        })
        .collect();
    names.sort();

    let mut rows: Vec<Vec<Value>> = Vec::with_capacity(names.len());
    for graph_name in names {
        rows.push(vec![Value::from(graph_name)]);
    }

    Ok(QueryResult {
        columns: vec![String::from("name")],
        column_types: Vec::new(),
        rows,
        ..QueryResult::empty()
    })
}
2518
2519 fn execute_show_schemas(&self) -> Result<QueryResult> {
2521 let mut names = self.catalog.schema_names();
2522 names.sort();
2523 let rows: Vec<Vec<Value>> = names.into_iter().map(|n| vec![Value::from(n)]).collect();
2524 Ok(QueryResult {
2525 columns: vec!["name".to_string()],
2526 column_types: Vec::new(),
2527 rows,
2528 ..QueryResult::empty()
2529 })
2530 }
2531
2532 fn execute_show_graph_type(&self, name: &str) -> Result<QueryResult> {
2534 use grafeo_common::utils::error::{Error, QueryError, QueryErrorKind};
2535
2536 let def = self.catalog.get_graph_type_def(name).ok_or_else(|| {
2537 Error::Query(QueryError::new(
2538 QueryErrorKind::Semantic,
2539 format!("Graph type '{name}' not found"),
2540 ))
2541 })?;
2542
2543 let columns = vec![
2544 "name".to_string(),
2545 "open".to_string(),
2546 "node_types".to_string(),
2547 "edge_types".to_string(),
2548 ];
2549 let rows = vec![vec![
2550 Value::from(def.name),
2551 Value::from(def.open),
2552 Value::from(def.allowed_node_types.join(", ")),
2553 Value::from(def.allowed_edge_types.join(", ")),
2554 ]];
2555 Ok(QueryResult {
2556 columns,
2557 column_types: Vec::new(),
2558 rows,
2559 ..QueryResult::empty()
2560 })
2561 }
2562
2563 fn execute_show_current_graph_type(&self) -> Result<QueryResult> {
2565 let graph_name = self
2566 .current_graph()
2567 .unwrap_or_else(|| "default".to_string());
2568 let columns = vec![
2569 "graph".to_string(),
2570 "graph_type".to_string(),
2571 "open".to_string(),
2572 "node_types".to_string(),
2573 "edge_types".to_string(),
2574 ];
2575
2576 if let Some(type_name) = self.catalog.get_graph_type_binding(&graph_name)
2577 && let Some(def) = self.catalog.get_graph_type_def(&type_name)
2578 {
2579 let rows = vec![vec![
2580 Value::from(graph_name),
2581 Value::from(type_name),
2582 Value::from(def.open),
2583 Value::from(def.allowed_node_types.join(", ")),
2584 Value::from(def.allowed_edge_types.join(", ")),
2585 ]];
2586 return Ok(QueryResult {
2587 columns,
2588 column_types: Vec::new(),
2589 rows,
2590 ..QueryResult::empty()
2591 });
2592 }
2593
2594 Ok(QueryResult {
2596 columns,
2597 column_types: Vec::new(),
2598 rows: vec![vec![
2599 Value::from(graph_name),
2600 Value::Null,
2601 Value::Null,
2602 Value::Null,
2603 Value::Null,
2604 ]],
2605 ..QueryResult::empty()
2606 })
2607 }
2608
#[cfg(feature = "gql")]
/// Executes a GQL statement.
///
/// Processing steps:
///
/// 1. The text is translated. Session commands and (with the `lpg` feature)
///    schema commands are dispatched immediately and never reach the planner.
///    Schema commands need `Admin` permission and are rejected inside a
///    read-only transaction.
/// 2. For ordinary plans, mutations are only inspected when the transaction
///    is read-only or the identity is not an admin. A mutating plan then
///    needs `Write` permission and is rejected in a read-only transaction.
/// 3. The optimized plan is taken from the query cache, keyed by query text,
///    language and current graph, or bound, optimized and cached.
/// 4. `EXPLAIN` returns the annotated plan without executing. `PROFILE`
///    executes and returns the profile tree instead of the rows.
/// 5. Otherwise the plan runs inside `with_auto_commit`. `rows_scanned` is
///    set to the number of result rows and, except on `wasm32`,
///    `execution_time_ms` is filled in.
///
/// # Errors
///
/// Propagates translation, binding, optimization, planning and execution
/// errors, permission failures, and `TransactionError::ReadOnly` for writes
/// or schema DDL inside a read-only transaction.
pub fn execute(&self, query: &str) -> Result<QueryResult> {
    use crate::query::{
        binder::Binder, cache::CacheKey, optimizer::Optimizer, processor::QueryLanguage,
        translators::gql,
    };
    use grafeo_common::utils::error::{Error as GrafeoError, TransactionError};

    self.require_lpg("GQL")?;

    #[cfg(feature = "testing-statement-injection")]
    grafeo_common::testing::statement_failure::maybe_fail_statement()
        .map_err(|e| GrafeoError::Internal(format!("injected failure: {e}")))?;

    let _span = grafeo_info_span!(
        "grafeo::session::execute",
        language = "gql",
        query_len = query.len(),
    );

    #[cfg(not(target_arch = "wasm32"))]
    let started_at = std::time::Instant::now();

    let translated = gql::translate_full(query)?;
    let logical_plan = match translated {
        gql::GqlTranslationResult::SessionCommand(cmd) => {
            return self.execute_session_command(cmd);
        }
        #[cfg(feature = "lpg")]
        gql::GqlTranslationResult::SchemaCommand(cmd) => {
            self.require_permission(crate::auth::StatementKind::Admin)?;
            if *self.read_only_tx.lock() {
                return Err(GrafeoError::Transaction(TransactionError::ReadOnly));
            }
            return self.execute_schema_command(cmd);
        }
        gql::GqlTranslationResult::Plan(plan) => {
            let tx_read_only = *self.read_only_tx.lock();
            // Admins outside a read-only transaction skip the plan walk.
            let must_inspect = tx_read_only || !self.identity.can_admin();
            if must_inspect && plan.root.has_mutations() {
                self.require_permission(crate::auth::StatementKind::Write)?;
                if tx_read_only {
                    return Err(GrafeoError::Transaction(TransactionError::ReadOnly));
                }
            }
            plan
        }
        #[cfg(not(feature = "lpg"))]
        gql::GqlTranslationResult::SchemaCommand(_) => {
            return Err(GrafeoError::Internal(
                "Schema commands require the `lpg` feature".to_string(),
            ));
        }
    };

    let cache_key = CacheKey::with_graph(query, QueryLanguage::Gql, self.current_graph());
    let cached = self.query_cache.get_optimized(&cache_key);
    let optimized_plan = match cached {
        Some(hit) => hit,
        None => {
            let mut binder = Binder::new();
            let _binding_context = binder.bind(&logical_plan)?;

            let stats_store = self.active_store();
            let optimizer = Optimizer::from_graph_store(&*stats_store);
            let fresh = optimizer.optimize(logical_plan)?;

            self.query_cache.put_optimized(cache_key, fresh.clone());
            fresh
        }
    };

    let active = self.active_store();

    if optimized_plan.explain {
        use crate::query::processor::{annotate_pushdown_hints, explain_result};
        let mut annotated = optimized_plan;
        annotate_pushdown_hints(&mut annotated.root, active.as_ref());
        return Ok(explain_result(&annotated));
    }

    let has_mutations = optimized_plan.root.has_mutations();

    if optimized_plan.profile {
        return self.with_auto_commit(has_mutations, || {
            let (viewing_epoch, transaction_id) = self.get_transaction_context();
            let planner =
                self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
            let (mut physical_plan, entries) = planner.plan_profiled(&optimized_plan)?;

            // The rows are discarded; only the per-operator entries are reported.
            let executor = self.make_executor(physical_plan.columns.clone());
            let _result = executor.execute(physical_plan.operator.as_mut())?;

            #[cfg(not(target_arch = "wasm32"))]
            let total_time_ms = started_at.elapsed().as_secs_f64() * 1000.0;
            #[cfg(target_arch = "wasm32")]
            let total_time_ms = 0.0;

            let profile_tree = crate::query::profile::build_profile_tree(
                &optimized_plan.root,
                &mut entries.into_iter(),
            );
            Ok(crate::query::profile::profile_result(
                &profile_tree,
                total_time_ms,
            ))
        });
    }

    let result = self.with_auto_commit(has_mutations, || {
        let (viewing_epoch, transaction_id) = self.get_transaction_context();

        // The planner is told the query is read-only only when it neither
        // mutates nor runs inside an explicit transaction.
        let in_transaction = self.current_transaction.lock().is_some();
        let read_only = !(has_mutations || in_transaction);
        let planner = self.create_planner_for_store_with_read_only(
            Arc::clone(&active),
            viewing_epoch,
            transaction_id,
            read_only,
        );
        let physical_plan = planner.plan(&optimized_plan)?;

        let executor = self.make_executor(physical_plan.columns.clone());

        #[cfg(feature = "spill")]
        let (mut source, push_ops) = {
            let memory_ctx = self.make_operator_memory_context();
            grafeo_core::execution::pipeline_convert::convert_to_pipeline_with_memory(
                physical_plan.into_operator(),
                memory_ctx,
            )
        };
        #[cfg(not(feature = "spill"))]
        let (mut source, push_ops) = grafeo_core::execution::pipeline_convert::convert_to_pipeline(
            physical_plan.into_operator(),
        );

        // Without push operators the source is pulled directly.
        let mut result = if push_ops.is_empty() {
            executor.execute(source.as_mut())?
        } else {
            executor.execute_pipeline(source, push_ops)?
        };

        #[cfg(not(target_arch = "wasm32"))]
        {
            result.execution_time_ms = Some(started_at.elapsed().as_secs_f64() * 1000.0);
        }
        result.rows_scanned = Some(result.rows.len() as u64);

        Ok(result)
    });

    #[cfg(feature = "metrics")]
    {
        #[cfg(not(target_arch = "wasm32"))]
        let elapsed_ms = Some(started_at.elapsed().as_secs_f64() * 1000.0);
        #[cfg(target_arch = "wasm32")]
        let elapsed_ms = None;
        self.record_query_metrics("gql", elapsed_ms, &result);
    }

    result
}
2840
#[cfg(all(feature = "gql", feature = "lpg"))]
/// Executes a read-only GQL query and returns a stream over its results.
///
/// The plan is built by `build_streaming_plan`. The returned stream carries
/// a `StreamGuard` created from the session's `active_streams`.
///
/// # Errors
///
/// Whatever `build_streaming_plan` returns, for example for statements that
/// cannot be streamed.
pub fn execute_streaming(
    &self,
    query: &str,
) -> Result<crate::query::executor::stream::ResultStream<'_>> {
    use crate::query::executor::stream::{ResultStream, StreamGuard};

    self.build_streaming_plan(query)
        .map(|(source, columns, deadline)| {
            // The guard is only created once planning has succeeded.
            let guard = StreamGuard::new(&self.active_streams);
            ResultStream::new(source, columns, deadline, guard)
        })
}
2872
#[cfg(all(feature = "gql", feature = "lpg"))]
/// Builds the pull-based operator tree for a streamed GQL query.
///
/// # Returns
///
/// The source operator, the result column names and the session's query
/// deadline as returned by `query_deadline()`.
///
/// # Errors
///
/// A semantic query error is returned for anything that cannot be streamed:
/// session commands, schema DDL, mutating queries, `EXPLAIN` / `PROFILE`,
/// and plans that convert to a pipeline with push operators. Non-admin
/// identities additionally need `Read` permission. Translation, binding,
/// optimization and planning errors are propagated.
pub(crate) fn build_streaming_plan(
    &self,
    query: &str,
) -> Result<(
    Box<dyn grafeo_core::execution::operators::Operator>,
    Vec<String>,
    Option<Instant>,
)> {
    use crate::query::{
        binder::Binder, cache::CacheKey, optimizer::Optimizer, processor::QueryLanguage,
        translators::gql,
    };
    use grafeo_common::utils::error::{Error as GrafeoError, QueryError, QueryErrorKind};

    // Every rejection below is a semantic error that differs only in its text.
    let not_streamable = |message: &'static str| {
        GrafeoError::Query(QueryError::new(QueryErrorKind::Semantic, message))
    };

    self.require_lpg("GQL")?;

    let _span = grafeo_info_span!(
        "grafeo::session::execute_streaming",
        language = "gql",
        query_len = query.len(),
    );

    let translated = gql::translate_full(query)?;
    let logical_plan = match translated {
        gql::GqlTranslationResult::SessionCommand(_) => {
            return Err(not_streamable(
                "session commands cannot be streamed; use execute() instead",
            ));
        }
        gql::GqlTranslationResult::SchemaCommand(_) => {
            return Err(not_streamable(
                "schema DDL cannot be streamed; use execute() instead",
            ));
        }
        gql::GqlTranslationResult::Plan(plan) => {
            if plan.root.has_mutations() {
                return Err(not_streamable(
                    "mutating queries cannot be streamed; use execute() instead",
                ));
            }
            if !self.identity.can_admin() {
                self.require_permission(crate::auth::StatementKind::Read)?;
            }
            plan
        }
    };

    let cache_key = CacheKey::with_graph(query, QueryLanguage::Gql, self.current_graph());
    let cached = self.query_cache.get_optimized(&cache_key);
    let optimized_plan = match cached {
        Some(hit) => hit,
        None => {
            let mut binder = Binder::new();
            let _binding_context = binder.bind(&logical_plan)?;
            let stats_store = self.active_store();
            let optimizer = Optimizer::from_graph_store(&*stats_store);
            let fresh = optimizer.optimize(logical_plan)?;
            self.query_cache.put_optimized(cache_key, fresh.clone());
            fresh
        }
    };

    if optimized_plan.explain || optimized_plan.profile {
        return Err(not_streamable(
            "EXPLAIN and PROFILE cannot be streamed; use execute() instead",
        ));
    }

    let active = self.active_store();
    let in_transaction = self.current_transaction.lock().is_some();
    let (viewing_epoch, transaction_id) = self.get_transaction_context();
    // Mutations were rejected above, so read-only-ness depends only on
    // whether an explicit transaction is open.
    let planner = self.create_planner_for_store_with_read_only(
        Arc::clone(&active),
        viewing_epoch,
        transaction_id,
        !in_transaction,
    );
    let physical_plan = planner.plan(&optimized_plan)?;
    let columns = physical_plan.columns.clone();

    #[cfg(feature = "spill")]
    let (source, push_ops) = {
        let memory_ctx = self.make_operator_memory_context();
        grafeo_core::execution::pipeline_convert::convert_to_pipeline_with_memory(
            physical_plan.into_operator(),
            memory_ctx,
        )
    };
    #[cfg(not(feature = "spill"))]
    let (source, push_ops) = grafeo_core::execution::pipeline_convert::convert_to_pipeline(
        physical_plan.into_operator(),
    );

    if !push_ops.is_empty() {
        return Err(not_streamable(
            "query requires a push-based pipeline (ORDER BY / aggregate / DISTINCT) \
             which cannot be streamed; use execute() instead",
        ));
    }

    Ok((source, columns, self.query_deadline()))
}
3004
#[cfg(feature = "gql")]
/// Executes a GQL query with the session's viewing-epoch override set to
/// `epoch` for the duration of the call.
///
/// The previous override is put back when the call finishes. The restore
/// runs from a drop guard, so it also happens when execution panics and the
/// unwind is caught further up; otherwise the session would keep reading at
/// `epoch` for every later query.
///
/// # Errors
///
/// Whatever `execute` returns for `query`.
pub fn execute_at_epoch(&self, query: &str, epoch: EpochId) -> Result<QueryResult> {
    /// Runs the wrapped closure when dropped, including during unwinding.
    struct RestoreOnDrop<F: FnMut()>(F);

    impl<F: FnMut()> Drop for RestoreOnDrop<F> {
        fn drop(&mut self) {
            (self.0)();
        }
    }

    // The lock guard is a temporary: it is released before `execute` runs.
    let mut previous = self.viewing_epoch_override.lock().replace(epoch);
    let _restore = RestoreOnDrop(move || {
        *self.viewing_epoch_override.lock() = previous.take();
    });

    self.execute(query)
}
3020
#[cfg(feature = "gql")]
/// Executes a GQL query, optionally parameterized, with the session's
/// viewing-epoch override set to `epoch` for the duration of the call.
///
/// With `Some(params)` the query runs through `execute_with_params`,
/// otherwise through `execute`. The previous override is restored from a
/// drop guard, so the session is not left at `epoch` if execution panics and
/// the unwind is caught further up.
///
/// # Errors
///
/// Whatever the selected execution method returns.
pub fn execute_at_epoch_with_params(
    &self,
    query: &str,
    epoch: EpochId,
    params: Option<std::collections::HashMap<String, Value>>,
) -> Result<QueryResult> {
    /// Runs the wrapped closure when dropped, including during unwinding.
    struct RestoreOnDrop<F: FnMut()>(F);

    impl<F: FnMut()> Drop for RestoreOnDrop<F> {
        fn drop(&mut self) {
            (self.0)();
        }
    }

    // The lock guard is a temporary: it is released before execution starts.
    let mut previous = self.viewing_epoch_override.lock().replace(epoch);
    let _restore = RestoreOnDrop(move || {
        *self.viewing_epoch_override.lock() = previous.take();
    });

    match params {
        Some(bound) => self.execute_with_params(query, bound),
        None => self.execute(query),
    }
}
3044
#[cfg(feature = "gql")]
/// Executes a GQL query with bound parameters.
///
/// Whether the statement mutates, which decides the auto-commit behavior, is
/// determined in one of two ways:
///
/// * identities that can write use the `query_looks_like_mutation` check on
///   the query text;
/// * other identities have the query translated up front. A plan with
///   mutations must pass the `Write` permission check. If translation fails,
///   the text check is used instead and the query is still handed to the
///   processor.
///
/// The query itself is run by a `QueryProcessor` bound to the active stores.
/// It is given the session's transaction context when a transaction id is
/// present.
///
/// # Errors
///
/// Permission failures and any error from the processor.
pub fn execute_with_params(
    &self,
    query: &str,
    params: std::collections::HashMap<String, Value>,
) -> Result<QueryResult> {
    use crate::query::processor::{QueryLanguage, QueryProcessor};

    self.require_lpg("GQL")?;

    let has_mutations = if self.identity.can_write() {
        Self::query_looks_like_mutation(query)
    } else {
        use crate::query::translators::gql;

        match gql::translate(query) {
            Ok(plan) if plan.root.has_mutations() => {
                self.require_permission(crate::auth::StatementKind::Write)?;
                true
            }
            Ok(_) => false,
            Err(_) => Self::query_looks_like_mutation(query),
        }
    };
    let active = self.active_store();

    self.with_auto_commit(has_mutations, || {
        let (viewing_epoch, transaction_id) = self.get_transaction_context();

        let processor = QueryProcessor::for_stores_with_transaction(
            Arc::clone(&active),
            self.active_write_store(),
            Arc::clone(&self.transaction_manager),
        )?;
        let processor = match transaction_id {
            Some(transaction_id) => {
                processor.with_transaction_context(viewing_epoch, transaction_id)
            }
            None => processor,
        };

        processor.process(query, QueryLanguage::Gql, Some(&params))
    })
}
3102
3103 #[cfg(not(any(feature = "gql", feature = "cypher")))]
3109 pub fn execute_with_params(
3110 &self,
3111 _query: &str,
3112 _params: std::collections::HashMap<String, Value>,
3113 ) -> Result<QueryResult> {
3114 Err(grafeo_common::utils::error::Error::Internal(
3115 "No query language enabled".to_string(),
3116 ))
3117 }
3118
3119 #[cfg(not(any(feature = "gql", feature = "cypher")))]
3125 pub fn execute(&self, _query: &str) -> Result<QueryResult> {
3126 Err(grafeo_common::utils::error::Error::Internal(
3127 "No query language enabled".to_string(),
3128 ))
3129 }
3130
#[cfg(feature = "cypher")]
/// Executes a Cypher statement.
///
/// Schema commands and the `SHOW INDEXES` / `SHOW CONSTRAINTS` /
/// `SHOW CURRENT GRAPH TYPE` forms are dispatched directly. Schema commands
/// need `Admin` permission and are rejected inside a read-only transaction.
/// Ordinary queries use the query cache, keyed by text, language and current
/// graph. Mutating plans need `Write` permission. `EXPLAIN` returns the
/// annotated plan without executing and `PROFILE` returns the profile tree.
///
/// NOTE(review): the plan carried by the `Plan` translation result is
/// discarded and the text is translated a second time on a cache miss.
/// Reusing the first plan would avoid the extra parse; confirm the payload
/// type before changing this.
///
/// # Errors
///
/// Propagates translation, binding, optimization, planning and execution
/// errors as well as permission failures.
pub fn execute_cypher(&self, query: &str) -> Result<QueryResult> {
    use crate::query::{
        binder::Binder, cache::CacheKey, optimizer::Optimizer, processor::QueryLanguage,
        translators::cypher,
    };

    let translated = cypher::translate_full(query)?;
    match translated {
        #[cfg(feature = "lpg")]
        cypher::CypherTranslationResult::SchemaCommand(cmd) => {
            use grafeo_common::utils::error::{
                Error as GrafeoError, QueryError, QueryErrorKind,
            };

            self.require_permission(crate::auth::StatementKind::Admin)?;
            if *self.read_only_tx.lock() {
                return Err(GrafeoError::Query(QueryError::new(
                    QueryErrorKind::Semantic,
                    "Cannot execute schema DDL in a read-only transaction",
                )));
            }
            return self.execute_schema_command(cmd);
        }
        #[cfg(not(feature = "lpg"))]
        cypher::CypherTranslationResult::SchemaCommand(_) => {
            return Err(grafeo_common::utils::error::Error::Internal(
                "Schema DDL requires the `lpg` feature".to_string(),
            ));
        }
        cypher::CypherTranslationResult::ShowIndexes => {
            return self.execute_show_indexes();
        }
        cypher::CypherTranslationResult::ShowConstraints => {
            return self.execute_show_constraints();
        }
        cypher::CypherTranslationResult::ShowCurrentGraphType => {
            return self.execute_show_current_graph_type();
        }
        // Ordinary queries continue below.
        cypher::CypherTranslationResult::Plan(_) => {}
    }

    #[cfg(not(target_arch = "wasm32"))]
    let started_at = std::time::Instant::now();

    let cache_key = CacheKey::with_graph(query, QueryLanguage::Cypher, self.current_graph());
    let cached = self.query_cache.get_optimized(&cache_key);
    let optimized_plan = match cached {
        Some(hit) => hit,
        None => {
            let logical_plan = cypher::translate(query)?;

            let mut binder = Binder::new();
            let _binding_context = binder.bind(&logical_plan)?;

            let stats_store = self.active_store();
            let optimizer = Optimizer::from_graph_store(&*stats_store);
            let fresh = optimizer.optimize(logical_plan)?;

            self.query_cache.put_optimized(cache_key, fresh.clone());
            fresh
        }
    };

    let has_mutations = optimized_plan.root.has_mutations();
    if has_mutations {
        self.require_permission(crate::auth::StatementKind::Write)?;
    }

    let active = self.active_store();

    if optimized_plan.explain {
        use crate::query::processor::{annotate_pushdown_hints, explain_result};
        let mut annotated = optimized_plan;
        annotate_pushdown_hints(&mut annotated.root, active.as_ref());
        return Ok(explain_result(&annotated));
    }

    if optimized_plan.profile {
        return self.with_auto_commit(has_mutations, || {
            let (viewing_epoch, transaction_id) = self.get_transaction_context();
            let planner =
                self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
            let (mut physical_plan, entries) = planner.plan_profiled(&optimized_plan)?;

            // The rows are discarded; only the per-operator entries are reported.
            let executor = self.make_executor(physical_plan.columns.clone());
            let _result = executor.execute(physical_plan.operator.as_mut())?;

            #[cfg(not(target_arch = "wasm32"))]
            let total_time_ms = started_at.elapsed().as_secs_f64() * 1000.0;
            #[cfg(target_arch = "wasm32")]
            let total_time_ms = 0.0;

            let profile_tree = crate::query::profile::build_profile_tree(
                &optimized_plan.root,
                &mut entries.into_iter(),
            );
            Ok(crate::query::profile::profile_result(
                &profile_tree,
                total_time_ms,
            ))
        });
    }

    let result = self.with_auto_commit(has_mutations, || {
        let (viewing_epoch, transaction_id) = self.get_transaction_context();
        let planner =
            self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
        let mut physical_plan = planner.plan(&optimized_plan)?;

        let executor = self.make_executor(physical_plan.columns.clone());
        executor.execute(physical_plan.operator.as_mut())
    });

    #[cfg(feature = "metrics")]
    {
        #[cfg(not(target_arch = "wasm32"))]
        let elapsed_ms = Some(started_at.elapsed().as_secs_f64() * 1000.0);
        #[cfg(target_arch = "wasm32")]
        let elapsed_ms = None;
        self.record_query_metrics("cypher", elapsed_ms, &result);
    }

    result
}
3287
#[cfg(feature = "gremlin")]
/// Executes a Gremlin traversal.
///
/// The query is translated, bound and optimized on every call; no query
/// cache is consulted here. Mutating plans need `Write` permission. The plan
/// runs inside `with_auto_commit`.
///
/// # Errors
///
/// Propagates translation, binding, optimization, planning and execution
/// errors as well as permission failures.
pub fn execute_gremlin(&self, query: &str) -> Result<QueryResult> {
    use crate::query::{binder::Binder, optimizer::Optimizer, translators::gremlin};

    #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
    let started_at = Instant::now();

    let logical_plan = gremlin::translate(query)?;

    let mut binder = Binder::new();
    let _binding_context = binder.bind(&logical_plan)?;

    let active = self.active_store();
    let optimizer = Optimizer::from_graph_store(&*active);
    let optimized_plan = optimizer.optimize(logical_plan)?;

    let mutates = optimized_plan.root.has_mutations();
    if mutates {
        self.require_permission(crate::auth::StatementKind::Write)?;
    }

    let outcome = self.with_auto_commit(mutates, || {
        let (viewing_epoch, transaction_id) = self.get_transaction_context();
        let planner =
            self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
        let mut physical_plan = planner.plan(&optimized_plan)?;

        let executor = self.make_executor(physical_plan.columns.clone());
        executor.execute(physical_plan.operator.as_mut())
    });

    #[cfg(feature = "metrics")]
    {
        #[cfg(not(target_arch = "wasm32"))]
        let elapsed_ms = Some(started_at.elapsed().as_secs_f64() * 1000.0);
        #[cfg(target_arch = "wasm32")]
        let elapsed_ms = None;
        self.record_query_metrics("gremlin", elapsed_ms, &outcome);
    }

    outcome
}
3360
#[cfg(feature = "gremlin")]
/// Executes a Gremlin traversal with bound parameters.
///
/// Parameters are substituted into the translated plan before binding and
/// optimization. Mutating plans need `Write` permission. The plan runs
/// inside `with_auto_commit`.
///
/// # Errors
///
/// Propagates translation, parameter substitution, binding, optimization,
/// planning and execution errors as well as permission failures.
pub fn execute_gremlin_with_params(
    &self,
    query: &str,
    params: std::collections::HashMap<String, Value>,
) -> Result<QueryResult> {
    use crate::query::{
        binder::Binder, optimizer::Optimizer, processor::substitute_params,
        translators::gremlin,
    };

    #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
    let started_at = Instant::now();

    let mut logical_plan = gremlin::translate(query)?;
    substitute_params(&mut logical_plan, &params)?;

    let mut binder = Binder::new();
    let _binding_context = binder.bind(&logical_plan)?;

    let active = self.active_store();
    let optimizer = Optimizer::from_graph_store(&*active);
    let optimized_plan = optimizer.optimize(logical_plan)?;

    let mutates = optimized_plan.root.has_mutations();
    if mutates {
        self.require_permission(crate::auth::StatementKind::Write)?;
    }

    let outcome = self.with_auto_commit(mutates, || {
        let (viewing_epoch, transaction_id) = self.get_transaction_context();
        let planner =
            self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
        let mut physical_plan = planner.plan(&optimized_plan)?;
        let executor = self.make_executor(physical_plan.columns.clone());
        executor.execute(physical_plan.operator.as_mut())
    });

    #[cfg(feature = "metrics")]
    {
        #[cfg(not(target_arch = "wasm32"))]
        let elapsed_ms = Some(started_at.elapsed().as_secs_f64() * 1000.0);
        #[cfg(target_arch = "wasm32")]
        let elapsed_ms = None;
        self.record_query_metrics("gremlin", elapsed_ms, &outcome);
    }

    outcome
}
3420
#[cfg(feature = "graphql")]
/// Executes a GraphQL query.
///
/// When the translated plan declares default parameters, they are
/// substituted into the plan before binding. Mutating plans need `Write`
/// permission. The plan runs inside `with_auto_commit`.
///
/// # Errors
///
/// Propagates translation, parameter substitution, binding, optimization,
/// planning and execution errors as well as permission failures.
pub fn execute_graphql(&self, query: &str) -> Result<QueryResult> {
    use crate::query::{
        binder::Binder, optimizer::Optimizer, processor::substitute_params,
        translators::graphql,
    };

    #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
    let started_at = Instant::now();

    let mut logical_plan = graphql::translate(query)?;

    if !logical_plan.default_params.is_empty() {
        // Cloned because the plan itself is mutably borrowed by the substitution.
        let declared_defaults = logical_plan.default_params.clone();
        substitute_params(&mut logical_plan, &declared_defaults)?;
    }

    let mut binder = Binder::new();
    let _binding_context = binder.bind(&logical_plan)?;

    let active = self.active_store();
    let optimizer = Optimizer::from_graph_store(&*active);
    let optimized_plan = optimizer.optimize(logical_plan)?;

    let mutates = optimized_plan.root.has_mutations();
    if mutates {
        self.require_permission(crate::auth::StatementKind::Write)?;
    }

    let outcome = self.with_auto_commit(mutates, || {
        let (viewing_epoch, transaction_id) = self.get_transaction_context();
        let planner =
            self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
        let mut physical_plan = planner.plan(&optimized_plan)?;
        let executor = self.make_executor(physical_plan.columns.clone());
        executor.execute(physical_plan.operator.as_mut())
    });

    #[cfg(feature = "metrics")]
    {
        #[cfg(not(target_arch = "wasm32"))]
        let elapsed_ms = Some(started_at.elapsed().as_secs_f64() * 1000.0);
        #[cfg(target_arch = "wasm32")]
        let elapsed_ms = None;
        self.record_query_metrics("graphql", elapsed_ms, &outcome);
    }

    outcome
}
3493
#[cfg(feature = "graphql")]
/// Executes a GraphQL query with bound parameters.
///
/// When the translated plan declares default parameters, they are merged
/// with `params`, and an entry in `params` overrides the default of the same
/// name. Otherwise `params` is substituted as-is. Mutating plans need
/// `Write` permission. The plan runs inside `with_auto_commit`.
///
/// # Errors
///
/// Propagates translation, parameter substitution, binding, optimization,
/// planning and execution errors as well as permission failures.
pub fn execute_graphql_with_params(
    &self,
    query: &str,
    params: std::collections::HashMap<String, Value>,
) -> Result<QueryResult> {
    use crate::query::{
        binder::Binder, optimizer::Optimizer, processor::substitute_params,
        translators::graphql,
    };

    #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
    let started_at = Instant::now();

    let mut logical_plan = graphql::translate(query)?;

    if logical_plan.default_params.is_empty() {
        substitute_params(&mut logical_plan, &params)?;
    } else {
        // Defaults first, then caller values on top. `params` is owned and
        // not used afterwards, so its entries are moved rather than cloned.
        let mut merged = logical_plan.default_params.clone();
        merged.extend(params);
        substitute_params(&mut logical_plan, &merged)?;
    }

    let mut binder = Binder::new();
    let _binding_context = binder.bind(&logical_plan)?;

    let active = self.active_store();
    let optimizer = Optimizer::from_graph_store(&*active);
    let optimized_plan = optimizer.optimize(logical_plan)?;

    let mutates = optimized_plan.root.has_mutations();
    if mutates {
        self.require_permission(crate::auth::StatementKind::Write)?;
    }

    let outcome = self.with_auto_commit(mutates, || {
        let (viewing_epoch, transaction_id) = self.get_transaction_context();
        let planner =
            self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
        let mut physical_plan = planner.plan(&optimized_plan)?;
        let executor = self.make_executor(physical_plan.columns.clone());
        executor.execute(physical_plan.operator.as_mut())
    });

    #[cfg(feature = "metrics")]
    {
        #[cfg(not(target_arch = "wasm32"))]
        let elapsed_ms = Some(started_at.elapsed().as_secs_f64() * 1000.0);
        #[cfg(target_arch = "wasm32")]
        let elapsed_ms = None;
        self.record_query_metrics("graphql", elapsed_ms, &outcome);
    }

    outcome
}
3559
#[cfg(feature = "sql-pgq")]
/// Executes a SQL/PGQ statement.
///
/// A plan whose root is `CreatePropertyGraph` needs `Admin` permission and
/// is answered directly with a one-row `status` result naming the graph and
/// its node and edge table counts. All other statements use the query cache,
/// keyed by text, language and current graph. Mutating plans need `Write`
/// permission. The plan runs inside `with_auto_commit`.
///
/// # Errors
///
/// Propagates translation, binding, optimization, planning and execution
/// errors as well as permission failures.
pub fn execute_sql(&self, query: &str) -> Result<QueryResult> {
    use crate::query::{
        binder::Binder, cache::CacheKey, optimizer::Optimizer, plan::LogicalOperator,
        processor::QueryLanguage, translators::sql_pgq,
    };

    #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
    let started_at = Instant::now();

    let logical_plan = sql_pgq::translate(query)?;

    if let LogicalOperator::CreatePropertyGraph(cpg) = &logical_plan.root {
        self.require_permission(crate::auth::StatementKind::Admin)?;
        let status = format!(
            "Property graph '{}' created ({} node tables, {} edge tables)",
            cpg.name,
            cpg.node_tables.len(),
            cpg.edge_tables.len()
        );
        return Ok(QueryResult {
            columns: vec!["status".into()],
            column_types: vec![grafeo_common::types::LogicalType::String],
            rows: vec![vec![Value::from(status)]],
            execution_time_ms: None,
            rows_scanned: None,
            status_message: None,
            gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
        });
    }

    let cache_key = CacheKey::with_graph(query, QueryLanguage::SqlPgq, self.current_graph());
    let cached = self.query_cache.get_optimized(&cache_key);
    let optimized_plan = match cached {
        Some(hit) => hit,
        None => {
            let mut binder = Binder::new();
            let _binding_context = binder.bind(&logical_plan)?;
            let stats_store = self.active_store();
            let optimizer = Optimizer::from_graph_store(&*stats_store);
            let fresh = optimizer.optimize(logical_plan)?;
            self.query_cache.put_optimized(cache_key, fresh.clone());
            fresh
        }
    };

    let active = self.active_store();
    let mutates = optimized_plan.root.has_mutations();
    if mutates {
        self.require_permission(crate::auth::StatementKind::Write)?;
    }

    let outcome = self.with_auto_commit(mutates, || {
        let (viewing_epoch, transaction_id) = self.get_transaction_context();
        let planner =
            self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
        let mut physical_plan = planner.plan(&optimized_plan)?;
        let executor = self.make_executor(physical_plan.columns.clone());
        executor.execute(physical_plan.operator.as_mut())
    });

    #[cfg(feature = "metrics")]
    {
        #[cfg(not(target_arch = "wasm32"))]
        let elapsed_ms = Some(started_at.elapsed().as_secs_f64() * 1000.0);
        #[cfg(target_arch = "wasm32")]
        let elapsed_ms = None;
        self.record_query_metrics("sql", elapsed_ms, &outcome);
    }

    outcome
}
3656
#[cfg(feature = "sql-pgq")]
/// Executes a SQL/PGQ query with named parameters.
///
/// The query is classified as mutating or read-only, the write permission is
/// checked for mutating queries, and the query is then run through a
/// `QueryProcessor` built over the active read and write stores. Mutating
/// queries go through `with_auto_commit`, which may wrap them in an implicit
/// transaction.
///
/// # Errors
///
/// Returns an error if a mutating query is not permitted, if the processor
/// cannot be constructed, or if processing the query fails.
pub fn execute_sql_with_params(
    &self,
    query: &str,
    params: std::collections::HashMap<String, Value>,
) -> Result<QueryResult> {
    use crate::query::processor::{QueryLanguage, QueryProcessor};

    #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
    let start_time = Instant::now();

    // Identities that may write only need the keyword heuristic. For all
    // others the query is translated so the permission check is based on the
    // actual plan; the heuristic is the fallback when translation fails.
    let has_mutations = if self.identity.can_write() {
        Self::query_looks_like_mutation(query)
    } else {
        match crate::query::translators::sql_pgq::translate(query) {
            Ok(plan) => {
                let mutates = plan.root.has_mutations();
                if mutates {
                    self.require_permission(crate::auth::StatementKind::Write)?;
                }
                mutates
            }
            Err(_) => Self::query_looks_like_mutation(query),
        }
    };
    if has_mutations {
        self.require_permission(crate::auth::StatementKind::Write)?;
    }
    let read_store = self.active_store();

    let result = self.with_auto_commit(has_mutations, || {
        let (viewing_epoch, transaction_id) = self.get_transaction_context();
        let mut processor = QueryProcessor::for_stores_with_transaction(
            Arc::clone(&read_store),
            self.active_write_store(),
            Arc::clone(&self.transaction_manager),
        )?;
        // Attach the transaction context only when a transaction is active.
        if let Some(transaction_id) = transaction_id {
            processor = processor.with_transaction_context(viewing_epoch, transaction_id);
        }
        processor.process(query, QueryLanguage::SqlPgq, Some(&params))
    });

    #[cfg(feature = "metrics")]
    {
        // Wall-clock latency is not measured on wasm32.
        #[cfg(not(target_arch = "wasm32"))]
        let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
        #[cfg(target_arch = "wasm32")]
        let elapsed_ms = None;
        self.record_query_metrics("sql", elapsed_ms, &result);
    }

    result
}
3717
/// Executes `query` in the query language named by `language`.
///
/// Dispatches on the language name to the matching `execute_*` method and
/// uses the parameterized variant when `params` is `Some`. `"gql"` is always
/// recognized; `"cypher"`, `"gremlin"`, `"graphql"`, `"graphql-rdf"`,
/// `"sql"` / `"sql-pgq"` and `"sparql"` are recognized only when the
/// corresponding cargo features are enabled.
///
/// # Errors
///
/// Returns a semantic query error for an unrecognized language name;
/// otherwise propagates the error of the selected execution path.
pub fn execute_language(
    &self,
    query: &str,
    language: &str,
    params: Option<std::collections::HashMap<String, Value>>,
) -> Result<QueryResult> {
    let _span = grafeo_info_span!(
        "grafeo::session::execute",
        language,
        query_len = query.len(),
    );
    match language {
        "gql" => {
            if let Some(p) = params {
                self.execute_with_params(query, p)
            } else {
                self.execute(query)
            }
        }
        #[cfg(feature = "cypher")]
        "cypher" => {
            if let Some(p) = params {
                // Parameterized Cypher is handled inline here rather than by
                // a dedicated `*_with_params` method.
                use crate::query::processor::{QueryLanguage, QueryProcessor};

                #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
                let start_time = Instant::now();

                // Writers use the keyword heuristic; other identities have
                // the query translated so the write-permission check is made
                // against the real plan (heuristic fallback on parse failure).
                let has_mutations = if self.identity.can_write() {
                    Self::query_looks_like_mutation(query)
                } else {
                    use crate::query::translators::cypher;
                    match cypher::translate(query) {
                        Ok(plan) if plan.root.has_mutations() => {
                            self.require_permission(crate::auth::StatementKind::Write)?;
                            true
                        }
                        Ok(_) => false,
                        Err(_) => Self::query_looks_like_mutation(query),
                    }
                };
                let active = self.active_store();
                let result = self.with_auto_commit(has_mutations, || {
                    let processor = QueryProcessor::for_stores_with_transaction(
                        Arc::clone(&active),
                        self.active_write_store(),
                        Arc::clone(&self.transaction_manager),
                    )?;
                    let (viewing_epoch, transaction_id) = self.get_transaction_context();
                    let processor = if let Some(transaction_id) = transaction_id {
                        processor.with_transaction_context(viewing_epoch, transaction_id)
                    } else {
                        processor
                    };
                    processor.process(query, QueryLanguage::Cypher, Some(&p))
                });

                #[cfg(feature = "metrics")]
                {
                    // Wall-clock latency is not measured on wasm32.
                    #[cfg(not(target_arch = "wasm32"))]
                    let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
                    #[cfg(target_arch = "wasm32")]
                    let elapsed_ms = None;
                    self.record_query_metrics("cypher", elapsed_ms, &result);
                }

                result
            } else {
                self.execute_cypher(query)
            }
        }
        #[cfg(feature = "gremlin")]
        "gremlin" => {
            if let Some(p) = params {
                self.execute_gremlin_with_params(query, p)
            } else {
                self.execute_gremlin(query)
            }
        }
        #[cfg(feature = "graphql")]
        "graphql" => {
            if let Some(p) = params {
                self.execute_graphql_with_params(query, p)
            } else {
                self.execute_graphql(query)
            }
        }
        #[cfg(all(feature = "graphql", feature = "triple-store"))]
        "graphql-rdf" => {
            if let Some(p) = params {
                self.execute_graphql_rdf_with_params(query, p)
            } else {
                self.execute_graphql_rdf(query)
            }
        }
        #[cfg(feature = "sql-pgq")]
        "sql" | "sql-pgq" => {
            if let Some(p) = params {
                self.execute_sql_with_params(query, p)
            } else {
                self.execute_sql(query)
            }
        }
        #[cfg(all(feature = "sparql", feature = "triple-store"))]
        "sparql" => {
            if let Some(p) = params {
                self.execute_sparql_with_params(query, p)
            } else {
                self.execute_sparql(query)
            }
        }
        // Also reached for languages whose feature is compiled out.
        other => Err(grafeo_common::utils::error::Error::Query(
            grafeo_common::utils::error::QueryError::new(
                grafeo_common::utils::error::QueryErrorKind::Semantic,
                format!("Unknown query language: '{other}'"),
            ),
        )),
    }
}
3844
/// Clears the query cache used by this session.
///
/// NOTE(review): the cache is handed to the session as an `Arc<QueryCache>`,
/// so clearing it presumably affects every holder of that handle — confirm.
pub fn clear_plan_cache(&self) {
    self.query_cache.clear();
}
3874
#[cfg(feature = "lpg")]
/// Begins a read-write transaction without requesting a specific isolation
/// level.
///
/// If a transaction is already active on this session, a nested level backed
/// by a savepoint is opened instead of a new transaction.
///
/// # Errors
///
/// Propagates any error from `begin_transaction_inner`.
pub fn begin_transaction(&mut self) -> Result<()> {
    self.begin_transaction_inner(false, None)
}
3886
#[cfg(feature = "lpg")]
/// Begins a read-write transaction with the given isolation level.
///
/// If a transaction is already active on this session, a nested level backed
/// by a savepoint is opened and `isolation_level` is not applied.
///
/// # Errors
///
/// Propagates any error from `begin_transaction_inner`.
pub fn begin_transaction_with_isolation(
    &mut self,
    isolation_level: crate::transaction::IsolationLevel,
) -> Result<()> {
    self.begin_transaction_inner(false, Some(isolation_level))
}
3901
#[cfg(feature = "lpg")]
/// Starts a transaction, or opens a nested level if one is already active.
///
/// * With no active transaction, a new one is started in the transaction
///   manager (using `isolation_level` when given), the node/edge counts of the
///   active store are recorded, and the touched-graph list is reset to the
///   active graph.
/// * With an active transaction, a savepoint named `_nested_tx_<depth>` is
///   created and the nesting depth is increased; `read_only` and
///   `isolation_level` are not applied to nested levels.
///
/// # Errors
///
/// Returns an error if creating the nested savepoint fails. In that case the
/// nesting depth is left unchanged.
fn begin_transaction_inner(
    &self,
    read_only: bool,
    isolation_level: Option<crate::transaction::IsolationLevel>,
) -> Result<()> {
    let _span = grafeo_debug_span!("grafeo::tx::begin", read_only);
    let mut current = self.current_transaction.lock();
    if current.is_some() {
        // `savepoint` locks `current_transaction` itself, so release it first.
        drop(current);
        let mut depth = self.transaction_nesting_depth.lock();
        // Only record the new depth once the savepoint exists; otherwise a
        // failed savepoint would leave the depth pointing at a savepoint name
        // that a later commit/rollback could never find.
        let next_depth = *depth + 1;
        let sp_name = format!("_nested_tx_{next_depth}");
        self.savepoint(&sp_name)?;
        *depth = next_depth;
        return Ok(());
    }

    // Record the entity counts of the active store at transaction start.
    let active = self.active_lpg_store();
    self.transaction_start_node_count
        .store(active.node_count(), Ordering::Relaxed);
    self.transaction_start_edge_count
        .store(active.edge_count(), Ordering::Relaxed);
    let transaction_id = if let Some(level) = isolation_level {
        self.transaction_manager.begin_with_isolation(level)
    } else {
        self.transaction_manager.begin()
    };
    *current = Some(transaction_id);
    // A read-only database forces every transaction to be read-only.
    *self.read_only_tx.lock() = read_only || self.db_read_only;

    // The transaction starts out having touched only the active graph.
    let key = self.active_graph_storage_key();
    let mut touched = self.touched_graphs.lock();
    touched.clear();
    touched.push(key);

    #[cfg(feature = "metrics")]
    {
        crate::metrics::record_metric!(self.metrics, tx_active, inc);
        #[cfg(not(target_arch = "wasm32"))]
        {
            *self.tx_start_time.lock() = Some(Instant::now());
        }
    }

    Ok(())
}
3952
#[cfg(feature = "lpg")]
/// Commits the current transaction, or the innermost nested level if the
/// session is inside a nested `begin`.
///
/// # Errors
///
/// Propagates any error from `commit_inner`, including the "No active
/// transaction" invalid-state error.
pub fn commit(&mut self) -> Result<()> {
    self.commit_inner()
}
3964
#[cfg(feature = "lpg")]
/// Commit implementation shared by `commit` and the auto-commit path.
///
/// For a nested level this only releases the level's `_nested_tx_<depth>`
/// savepoint. For the outermost level it commits through the transaction
/// manager, finalizes versions and property changes on every touched graph,
/// publishes buffered CDC events, writes WAL records, syncs store epochs and
/// periodically runs version garbage collection.
///
/// # Errors
///
/// Returns an error if streaming results are still active, if there is no
/// active transaction, if releasing a nested savepoint fails, or if the
/// transaction manager rejects the commit. In the last case the session's
/// per-transaction state is reset before the error is returned.
fn commit_inner(&self) -> Result<()> {
    let _span = grafeo_debug_span!("grafeo::tx::commit");

    // Test-only hook: an injected commit failure rolls the transaction back
    // (best effort) and surfaces as an internal error.
    #[cfg(feature = "testing-statement-injection")]
    if let Err(e) = grafeo_common::testing::statement_failure::maybe_fail_commit() {
        let _ = self.rollback_inner();
        return Err(grafeo_common::utils::error::Error::Internal(format!(
            "injected commit failure: {e}"
        )));
    }

    self.check_no_active_streams("commit")?;
    {
        // Inside a nested `begin`: committing only releases that level's
        // savepoint and leaves the real transaction open.
        let mut depth = self.transaction_nesting_depth.lock();
        if *depth > 0 {
            let sp_name = format!("_nested_tx_{depth}");
            *depth -= 1;
            drop(depth);
            return self.release_savepoint(&sp_name);
        }
    }

    // From here on the session no longer has a current transaction.
    let transaction_id = self.current_transaction.lock().take().ok_or_else(|| {
        grafeo_common::utils::error::Error::Transaction(
            grafeo_common::utils::error::TransactionError::InvalidState(
                "No active transaction".to_string(),
            ),
        )
    })?;

    // Take ownership of the touched-graph list, leaving it empty.
    let touched = std::mem::take(&mut *self.touched_graphs.lock());
    let commit_epoch = match self.transaction_manager.commit(transaction_id) {
        Ok(epoch) => epoch,
        Err(e) => {
            // The manager rejected the commit: undo property changes on every
            // touched graph and reset the per-transaction session state.
            for graph_name in &touched {
                let store = self.resolve_store(graph_name);
                store.rollback_transaction_properties(transaction_id);
            }
            #[cfg(feature = "triple-store")]
            self.rollback_rdf_transaction(transaction_id);
            #[cfg(feature = "cdc")]
            if let Some(ref pending) = self.cdc_pending_events {
                pending.lock().clear();
            }
            *self.read_only_tx.lock() = self.db_read_only;
            self.savepoints.lock().clear();
            self.touched_graphs.lock().clear();
            #[cfg(feature = "metrics")]
            {
                // A failed commit is counted as a conflict.
                crate::metrics::record_metric!(self.metrics, tx_active, dec);
                crate::metrics::record_metric!(self.metrics, tx_conflicts, inc);
                #[cfg(not(target_arch = "wasm32"))]
                if let Some(start) = self.tx_start_time.lock().take() {
                    let duration_ms = start.elapsed().as_secs_f64() * 1000.0;
                    crate::metrics::record_metric!(
                        self.metrics,
                        tx_duration,
                        observe duration_ms
                    );
                }
            }
            return Err(e);
        }
    };

    // Stamp the versions written by this transaction with the commit epoch.
    for graph_name in &touched {
        let store = self.resolve_store(graph_name);
        store.finalize_version_epochs(transaction_id, commit_epoch);
    }

    #[cfg(feature = "triple-store")]
    self.commit_rdf_transaction(transaction_id);

    for graph_name in &touched {
        let store = self.resolve_store(graph_name);
        store.commit_transaction_properties(transaction_id);
    }

    // Publish buffered CDC events, tagging each with the commit epoch.
    #[cfg(feature = "cdc")]
    if let Some(ref pending) = self.cdc_pending_events {
        let events: Vec<crate::cdc::ChangeEvent> = pending.lock().drain(..).collect();
        self.cdc_log.record_batch(events.into_iter().map(|mut e| {
            e.epoch = commit_epoch;
            e
        }));
    }

    // WAL failures are logged as warnings and do not fail the commit.
    #[cfg(feature = "wal")]
    if let Some(ref wal) = self.wal {
        use grafeo_storage::wal::WalRecord;
        if let Err(e) = wal.log(&WalRecord::TransactionCommit { transaction_id }) {
            grafeo_warn!("Failed to log transaction commit to WAL: {}", e);
        }
        if let Err(e) = wal.log(&WalRecord::EpochAdvance {
            epoch: commit_epoch,
        }) {
            grafeo_warn!("Failed to log epoch advance to WAL: {}", e);
        }
    }

    // Bring every touched store up to the manager's current epoch.
    let current_epoch = self.transaction_manager.current_epoch();
    for graph_name in &touched {
        let store = self.resolve_store(graph_name);
        store.sync_epoch(current_epoch);
    }

    *self.read_only_tx.lock() = self.db_read_only;
    self.savepoints.lock().clear();

    // Run version GC on the touched graphs every `gc_interval` commits;
    // an interval of 0 disables it.
    if self.gc_interval > 0 {
        let count = self.commit_counter.fetch_add(1, Ordering::Relaxed) + 1;
        if count.is_multiple_of(self.gc_interval) {
            #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
            let gc_start = std::time::Instant::now();

            let min_epoch = self.transaction_manager.min_active_epoch();
            for graph_name in &touched {
                let store = self.resolve_store(graph_name);
                store.gc_versions(min_epoch);
            }
            self.transaction_manager.gc();

            #[cfg(feature = "metrics")]
            {
                crate::metrics::record_metric!(self.metrics, gc_runs, inc);
                #[cfg(not(target_arch = "wasm32"))]
                {
                    let gc_duration_ms = gc_start.elapsed().as_secs_f64() * 1000.0;
                    crate::metrics::record_metric!(
                        self.metrics,
                        gc_duration,
                        observe gc_duration_ms
                    );
                }
            }
        }
    }

    #[cfg(feature = "metrics")]
    {
        crate::metrics::record_metric!(self.metrics, tx_active, dec);
        crate::metrics::record_metric!(self.metrics, tx_committed, inc);
        #[cfg(not(target_arch = "wasm32"))]
        if let Some(start) = self.tx_start_time.lock().take() {
            let duration_ms = start.elapsed().as_secs_f64() * 1000.0;
            crate::metrics::record_metric!(self.metrics, tx_duration, observe duration_ms);
        }
    }

    Ok(())
}
4147
#[cfg(feature = "lpg")]
/// Rolls back the current transaction, or the innermost nested level if the
/// session is inside a nested `begin`.
///
/// # Errors
///
/// Propagates any error from `rollback_inner`, including the "No active
/// transaction" invalid-state error.
pub fn rollback(&mut self) -> Result<()> {
    self.rollback_inner()
}
4175
#[cfg(feature = "lpg")]
/// Rollback implementation shared by `rollback` and the auto-commit path.
///
/// For a nested level this rolls back to the level's `_nested_tx_<depth>`
/// savepoint. For the outermost level it discards the transaction's
/// uncommitted versions on every touched graph, clears buffered CDC events,
/// savepoints and the touched-graph list, aborts the transaction in the
/// transaction manager and writes an abort record to the WAL.
///
/// # Errors
///
/// Returns an error if streaming results are still active, if there is no
/// active transaction, if rolling back to a nested savepoint fails, or if
/// the transaction manager fails to abort the transaction.
fn rollback_inner(&self) -> Result<()> {
    let _span = grafeo_debug_span!("grafeo::tx::rollback");
    self.check_no_active_streams("rollback")?;
    {
        // Inside a nested `begin`: only undo back to that level's savepoint
        // and keep the real transaction open.
        let mut depth = self.transaction_nesting_depth.lock();
        if *depth > 0 {
            let sp_name = format!("_nested_tx_{depth}");
            *depth -= 1;
            drop(depth);
            return self.rollback_to_savepoint(&sp_name);
        }
    }

    // From here on the session no longer has a current transaction.
    let transaction_id = self.current_transaction.lock().take().ok_or_else(|| {
        grafeo_common::utils::error::Error::Transaction(
            grafeo_common::utils::error::TransactionError::InvalidState(
                "No active transaction".to_string(),
            ),
        )
    })?;

    *self.read_only_tx.lock() = self.db_read_only;

    // Drop everything this transaction wrote on each graph it touched.
    let touched = self.touched_graphs.lock().clone();
    for graph_name in &touched {
        let store = self.resolve_store(graph_name);
        store.discard_uncommitted_versions(transaction_id);
    }

    #[cfg(feature = "triple-store")]
    self.rollback_rdf_transaction(transaction_id);

    // Buffered CDC events belong to the aborted transaction; drop them.
    #[cfg(feature = "cdc")]
    if let Some(ref pending) = self.cdc_pending_events {
        pending.lock().clear();
    }

    self.savepoints.lock().clear();
    self.touched_graphs.lock().clear();

    let result = self.transaction_manager.abort(transaction_id);

    // The abort record is written regardless of `result`; a WAL failure is
    // only logged as a warning.
    #[cfg(feature = "wal")]
    if let Some(ref wal) = self.wal {
        use grafeo_storage::wal::WalRecord;
        if let Err(e) = wal.log(&WalRecord::TransactionAbort { transaction_id }) {
            grafeo_warn!("Failed to log transaction abort to WAL: {}", e);
        }
    }

    // Metrics are updated only when the manager-level abort succeeded.
    #[cfg(feature = "metrics")]
    if result.is_ok() {
        crate::metrics::record_metric!(self.metrics, tx_active, dec);
        crate::metrics::record_metric!(self.metrics, tx_rolled_back, inc);
        #[cfg(not(target_arch = "wasm32"))]
        if let Some(start) = self.tx_start_time.lock().take() {
            let duration_ms = start.elapsed().as_secs_f64() * 1000.0;
            crate::metrics::record_metric!(self.metrics, tx_duration, observe duration_ms);
        }
    }

    result
}
4253
#[cfg(feature = "lpg")]
/// Creates a savepoint named `name` inside the current transaction.
///
/// For every graph touched so far, the savepoint records the next node ID,
/// the next edge ID and the transaction's property undo-log position. It also
/// records the currently selected graph and, with the `cdc` feature, the
/// number of buffered CDC events.
///
/// # Errors
///
/// Returns an invalid-state transaction error if no transaction is active.
pub fn savepoint(&self, name: &str) -> Result<()> {
    let tx_id = self.current_transaction.lock().ok_or_else(|| {
        grafeo_common::utils::error::Error::Transaction(
            grafeo_common::utils::error::TransactionError::InvalidState(
                "No active transaction".to_string(),
            ),
        )
    })?;

    // Work on a copy so the touched-graph lock is not held while snapshotting.
    let touched = self.touched_graphs.lock().clone();
    let mut graph_snapshots: Vec<GraphSavepoint> = Vec::with_capacity(touched.len());
    for graph_name in &touched {
        let store = self.resolve_store(graph_name);
        graph_snapshots.push(GraphSavepoint {
            graph_name: graph_name.clone(),
            next_node_id: store.peek_next_node_id(),
            next_edge_id: store.peek_next_edge_id(),
            undo_log_position: store.property_undo_log_position(tx_id),
        });
    }

    self.savepoints.lock().push(SavepointState {
        name: name.to_string(),
        graph_snapshots,
        active_graph: self.current_graph.lock().clone(),
        // Position 0 when CDC buffering is not enabled for this session.
        #[cfg(feature = "cdc")]
        cdc_event_position: self
            .cdc_pending_events
            .as_ref()
            .map_or(0, |p| p.lock().len()),
    });
    Ok(())
}
4300
#[cfg(feature = "lpg")]
/// Rolls the current transaction back to the most recent savepoint named
/// `name`.
///
/// The savepoint itself and every savepoint created after it are removed.
/// For each graph captured in the savepoint, property changes are undone back
/// to the recorded undo-log position and nodes/edges whose IDs were allocated
/// after the savepoint are discarded. Graphs first touched after the
/// savepoint have all of the transaction's uncommitted versions discarded.
///
/// NOTE(review): the `active_graph` recorded in the savepoint is not
/// consulted in this function — confirm whether it is restored elsewhere.
///
/// # Errors
///
/// Returns an invalid-state transaction error if no transaction is active or
/// if no savepoint with that name exists.
pub fn rollback_to_savepoint(&self, name: &str) -> Result<()> {
    let transaction_id = self.current_transaction.lock().ok_or_else(|| {
        grafeo_common::utils::error::Error::Transaction(
            grafeo_common::utils::error::TransactionError::InvalidState(
                "No active transaction".to_string(),
            ),
        )
    })?;

    let mut savepoints = self.savepoints.lock();

    // Search from the end so the latest savepoint with this name wins.
    let pos = savepoints
        .iter()
        .rposition(|sp| sp.name == name)
        .ok_or_else(|| {
            grafeo_common::utils::error::Error::Transaction(
                grafeo_common::utils::error::TransactionError::InvalidState(format!(
                    "Savepoint '{name}' not found"
                )),
            )
        })?;

    let sp_state = savepoints[pos].clone();

    // Drop the target savepoint and everything created after it, then
    // release the lock before touching the stores.
    savepoints.truncate(pos);
    drop(savepoints);

    for gs in &sp_state.graph_snapshots {
        let store = self.resolve_store(&gs.graph_name);

        store.rollback_transaction_properties_to(transaction_id, gs.undo_log_position);

        let current_next_node = store.peek_next_node_id();
        let current_next_edge = store.peek_next_edge_id();

        // IDs in [recorded next ID, current next ID) were allocated after
        // the savepoint was taken.
        let node_ids: Vec<NodeId> = (gs.next_node_id..current_next_node)
            .map(NodeId::new)
            .collect();
        let edge_ids: Vec<EdgeId> = (gs.next_edge_id..current_next_edge)
            .map(EdgeId::new)
            .collect();

        if !node_ids.is_empty() || !edge_ids.is_empty() {
            store.discard_entities_by_id(transaction_id, &node_ids, &edge_ids);
        }
    }

    // Graphs touched only after the savepoint have no snapshot: discard all
    // of this transaction's uncommitted versions on them.
    let touched = self.touched_graphs.lock().clone();
    for graph_name in &touched {
        let already_captured = sp_state
            .graph_snapshots
            .iter()
            .any(|gs| gs.graph_name == *graph_name);
        if !already_captured {
            let store = self.resolve_store(graph_name);
            store.discard_uncommitted_versions(transaction_id);
        }
    }

    // Drop CDC events buffered after the savepoint.
    #[cfg(feature = "cdc")]
    if let Some(ref pending) = self.cdc_pending_events {
        pending.lock().truncate(sp_state.cdc_event_position);
    }

    // Reset the touched-graph list to the graphs captured in the savepoint.
    let mut touched = self.touched_graphs.lock();
    touched.clear();
    for gs in &sp_state.graph_snapshots {
        if !touched.contains(&gs.graph_name) {
            touched.push(gs.graph_name.clone());
        }
    }

    Ok(())
}
4394
4395 pub fn release_savepoint(&self, name: &str) -> Result<()> {
4401 let _tx_id = self.current_transaction.lock().ok_or_else(|| {
4402 grafeo_common::utils::error::Error::Transaction(
4403 grafeo_common::utils::error::TransactionError::InvalidState(
4404 "No active transaction".to_string(),
4405 ),
4406 )
4407 })?;
4408
4409 let mut savepoints = self.savepoints.lock();
4410 let pos = savepoints
4411 .iter()
4412 .rposition(|sp| sp.name == name)
4413 .ok_or_else(|| {
4414 grafeo_common::utils::error::Error::Transaction(
4415 grafeo_common::utils::error::TransactionError::InvalidState(format!(
4416 "Savepoint '{name}' not found"
4417 )),
4418 )
4419 })?;
4420 savepoints.remove(pos);
4421 Ok(())
4422 }
4423
/// Returns `true` while this session has a current transaction.
#[must_use]
pub fn in_transaction(&self) -> bool {
    self.current_transaction.lock().is_some()
}
4429
/// Returns the ID of the session's current transaction, or `None` if no
/// transaction is active.
#[must_use]
pub(crate) fn current_transaction_id(&self) -> Option<TransactionId> {
    *self.current_transaction.lock()
}
4435
/// Returns a reference to the transaction manager used by this session.
#[must_use]
pub(crate) fn transaction_manager(&self) -> &TransactionManager {
    &self.transaction_manager
}
4441
#[cfg(feature = "lpg")]
/// Returns `(count_at_transaction_start, current_count)` for nodes.
///
/// The first element is the value recorded when the transaction began; the
/// second is the active LPG store's node count right now. The caller
/// computes the difference.
#[must_use]
pub(crate) fn node_count_delta(&self) -> (usize, usize) {
    let at_start = self.transaction_start_node_count.load(Ordering::Relaxed);
    let current = self.active_lpg_store().node_count();
    (at_start, current)
}
4451
#[cfg(feature = "lpg")]
/// Returns `(count_at_transaction_start, current_count)` for edges.
///
/// The first element is the value recorded when the transaction began; the
/// second is the active LPG store's edge count right now. The caller
/// computes the difference.
#[must_use]
pub(crate) fn edge_count_delta(&self) -> (usize, usize) {
    let at_start = self.transaction_start_edge_count.load(Ordering::Relaxed);
    let current = self.active_lpg_store().edge_count();
    (at_start, current)
}
4461
#[cfg(feature = "lpg")]
/// Creates a `PreparedCommit` that mutably borrows this session.
///
/// # Errors
///
/// Propagates any error returned by `PreparedCommit::new`.
pub fn prepare_commit(&mut self) -> Result<crate::transaction::PreparedCommit<'_>> {
    crate::transaction::PreparedCommit::new(self)
}
4499
/// Enables or disables auto-commit for this session.
///
/// When enabled, mutating statements executed outside an explicit
/// transaction are wrapped in an implicit one (see `with_auto_commit`).
pub fn set_auto_commit(&mut self, auto_commit: bool) {
    self.auto_commit = auto_commit;
}
4504
/// Returns whether auto-commit is enabled for this session.
#[must_use]
pub fn auto_commit(&self) -> bool {
    self.auto_commit
}
4510
4511 fn needs_auto_commit(&self, has_mutations: bool) -> bool {
4516 self.auto_commit && has_mutations && self.current_transaction.lock().is_none()
4517 }
4518
#[cfg(feature = "lpg")]
/// Runs `body`, wrapping it in an implicit transaction when
/// `needs_auto_commit(has_mutations)` says one is required.
///
/// With an implicit transaction, a successful body is followed by a commit
/// and a failing body by a best-effort rollback. Otherwise `body` is simply
/// run.
///
/// # Errors
///
/// Returns the error from beginning the transaction, from `body`, or from
/// the commit. If the commit fails, the body's result is discarded.
fn with_auto_commit<F>(&self, has_mutations: bool, body: F) -> Result<QueryResult>
where
    F: FnOnce() -> Result<QueryResult>,
{
    if !self.needs_auto_commit(has_mutations) {
        return body();
    }

    self.begin_transaction_inner(false, None)?;
    match body() {
        Ok(result) => self.commit_inner().map(|()| result),
        Err(e) => {
            // The body's error is what the caller needs to see; a failure
            // of the rollback itself is deliberately ignored.
            let _ = self.rollback_inner();
            Err(e)
        }
    }
}
4542
#[cfg(not(feature = "lpg"))]
/// Variant used without the `lpg` feature: no implicit transaction is
/// created and `body` is simply run.
fn with_auto_commit<F>(&self, _has_mutations: bool, body: F) -> Result<QueryResult>
where
    F: FnOnce() -> Result<QueryResult>,
{
    body()
}
4551
/// Heuristically decides whether `query` might modify data.
///
/// Returns `true` if the query text contains any of the keywords `INSERT`,
/// `CREATE`, `DELETE`, `MERGE`, `SET`, `REMOVE`, `DROP` or `ALTER`, compared
/// ASCII-case-insensitively. This is a plain substring test, so a keyword
/// embedded in a longer word (for example `SET` in `OFFSET`) also counts.
fn query_looks_like_mutation(query: &str) -> bool {
    const MUTATION_KEYWORDS: [&str; 8] = [
        "INSERT", "CREATE", "DELETE", "MERGE", "SET", "REMOVE", "DROP", "ALTER",
    ];

    let normalized = query.to_ascii_uppercase();
    MUTATION_KEYWORDS
        .iter()
        .any(|&keyword| normalized.contains(keyword))
}
4568
#[cfg(feature = "lpg")]
/// Refuses the transaction operation `op` while streaming results are active.
///
/// `op` is only used in the error message (for example `"commit"`).
///
/// # Errors
///
/// Returns an invalid-state transaction error if the active-stream counter
/// is greater than zero.
fn check_no_active_streams(&self, op: &str) -> Result<()> {
    let open_streams = self.active_streams.load(Ordering::Acquire);
    if open_streams > 0 {
        let message =
            format!("Cannot {op} while streaming results are active; drop the stream first");
        return Err(grafeo_common::utils::error::Error::Transaction(
            grafeo_common::utils::error::TransactionError::InvalidState(message),
        ));
    }
    Ok(())
}
4582
/// Computes the deadline for a query that starts now.
///
/// Returns `now + query_timeout` when a timeout is configured and `None`
/// otherwise. On `wasm32` this always returns `None`.
#[must_use]
fn query_deadline(&self) -> Option<Instant> {
    #[cfg(not(target_arch = "wasm32"))]
    {
        self.query_timeout.map(|d| Instant::now() + d)
    }
    #[cfg(target_arch = "wasm32")]
    {
        // Touch the field so it is not reported as unused on this target.
        let _ = &self.query_timeout;
        None
    }
}
4596
4597 fn make_executor(&self, columns: Vec<String>) -> Executor {
4599 Executor::with_columns(columns)
4600 .with_deadline(self.query_deadline())
4601 .with_timeout_duration(self.query_timeout)
4602 }
4603
#[cfg(feature = "spill")]
/// Builds the memory context that lets operators spill to disk.
///
/// Returns `None` when the session has no buffer manager, when the buffer
/// manager has no spill path configured, or when the spill manager for the
/// per-query directory cannot be created.
///
/// NOTE(review): the per-query directory name is derived from
/// `commit_counter`, so every call also advances that counter — confirm this
/// sharing with the commit/GC bookkeeping is intended.
fn make_operator_memory_context(
    &self,
) -> Option<grafeo_core::execution::OperatorMemoryContext> {
    use grafeo_core::execution::{OperatorMemoryContext, SpillManager};

    let buffer_manager = self.buffer_manager.as_ref()?;
    let spill_root = buffer_manager.config().spill_path.as_ref()?;

    // Each call gets its own `query_<n>` directory below the spill root.
    let query_id = self
        .commit_counter
        .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
    let query_dir = spill_root.join(format!("query_{query_id}"));

    let spill_manager = SpillManager::new(&query_dir).ok()?.with_owned_dir();
    Some(OperatorMemoryContext::new(
        Arc::clone(buffer_manager),
        Arc::new(spill_manager),
    ))
}
4629
4630 fn check_property_size(&self, key: &str, value: &Value) -> Result<()> {
4632 if let Some(limit) = self.max_property_size {
4633 let size = value.estimated_size_bytes();
4634 if size > limit {
4635 let limit_display = if limit >= 1024 * 1024 && limit % (1024 * 1024) == 0 {
4636 format!("{} MiB", limit / (1024 * 1024))
4637 } else if limit >= 1024 && limit % 1024 == 0 {
4638 format!("{} KiB", limit / 1024)
4639 } else {
4640 format!("{limit} bytes")
4641 };
4642 return Err(grafeo_common::utils::error::Error::Query(
4643 grafeo_common::utils::error::QueryError::new(
4644 grafeo_common::utils::error::QueryErrorKind::Execution,
4645 format!(
4646 "Property '{key}' value exceeds maximum size of {limit_display} ({size} bytes)"
4647 ),
4648 )
4649 .with_hint(
4650 "Increase with Config::with_max_property_size() or disable with Config::without_max_property_size()".to_string(),
4651 ),
4652 ));
4653 }
4654 }
4655 Ok(())
4656 }
4657
#[cfg(feature = "metrics")]
/// Records metrics for one executed query.
///
/// Always counts the query (overall and per `language`). Records latency
/// when `elapsed_ms` is given. On success it adds the returned row count and,
/// when available, the scanned row count. On failure it counts an error, and
/// additionally a timeout when the error message contains
/// `"exceeded timeout"`.
fn record_query_metrics(
    &self,
    language: &str,
    elapsed_ms: Option<f64>,
    result: &Result<crate::database::QueryResult>,
) {
    use crate::metrics::record_metric;

    record_metric!(self.metrics, query_count, inc);
    if let Some(registry) = &self.metrics {
        registry.query_count_by_language.increment(language);
    }
    if let Some(ms) = elapsed_ms {
        record_metric!(self.metrics, query_latency, observe ms);
    }

    let outcome = match result {
        Ok(outcome) => outcome,
        Err(e) => {
            record_metric!(self.metrics, query_errors, inc);
            // Timeouts are recognized by their error message text.
            if e.to_string().contains("exceeded timeout") {
                record_metric!(self.metrics, query_timeouts, inc);
            }
            return;
        }
    };

    let returned = outcome.rows.len() as u64;
    record_metric!(self.metrics, rows_returned, add returned);
    if let Some(scanned) = outcome.rows_scanned {
        record_metric!(self.metrics, rows_scanned, add scanned);
    }
}
4697
#[cfg(feature = "gql")]
/// Returns the value of `expr` if it is an integer literal, and `None` for
/// any other expression.
fn eval_integer_literal(expr: &grafeo_adapters::query::gql::ast::Expression) -> Option<i64> {
    use grafeo_adapters::query::gql::ast::{Expression, Literal};

    if let Expression::Literal(Literal::Integer(n)) = expr {
        Some(*n)
    } else {
        None
    }
}
4707
4708 #[must_use]
4714 fn get_transaction_context(&self) -> (EpochId, Option<TransactionId>) {
4715 if let Some(epoch) = *self.viewing_epoch_override.lock() {
4717 return (epoch, None);
4718 }
4719
4720 if let Some(transaction_id) = *self.current_transaction.lock() {
4721 let epoch = self
4723 .transaction_manager
4724 .start_epoch(transaction_id)
4725 .unwrap_or_else(|| self.transaction_manager.current_epoch());
4726 (epoch, Some(transaction_id))
4727 } else {
4728 (self.transaction_manager.current_epoch(), None)
4730 }
4731 }
4732
/// Creates a planner over `store` with the read-only flag set to `false`.
///
/// Delegates to `create_planner_for_store_with_read_only`.
fn create_planner_for_store(
    &self,
    store: Arc<dyn GraphStoreSearch>,
    viewing_epoch: EpochId,
    transaction_id: Option<TransactionId>,
) -> crate::query::Planner {
    self.create_planner_for_store_with_read_only(store, viewing_epoch, transaction_id, false)
}
4745
4746 fn create_planner_for_store_with_read_only(
4747 &self,
4748 store: Arc<dyn GraphStoreSearch>,
4749 viewing_epoch: EpochId,
4750 transaction_id: Option<TransactionId>,
4751 read_only: bool,
4752 ) -> crate::query::Planner {
4753 use crate::query::Planner;
4754 use grafeo_core::execution::operators::{LazyValue, SessionContext};
4755
4756 let info_store = Arc::clone(&store);
4758 let schema_store = Arc::clone(&store);
4759
4760 let session_context = SessionContext {
4761 current_schema: self.current_schema(),
4762 current_graph: self.current_graph(),
4763 db_info: LazyValue::new(move || Self::build_info_value(&*info_store)),
4764 schema_info: LazyValue::new(move || Self::build_schema_value(&*schema_store)),
4765 };
4766
4767 let write_store = self.active_write_store();
4768
4769 let mut planner = Planner::with_context(
4770 Arc::clone(&store),
4771 write_store,
4772 Arc::clone(&self.transaction_manager),
4773 transaction_id,
4774 viewing_epoch,
4775 )
4776 .with_factorized_execution(self.factorized_execution)
4777 .with_catalog(Arc::clone(&self.catalog))
4778 .with_session_context(session_context)
4779 .with_read_only(read_only);
4780
4781 #[cfg(feature = "lpg")]
4786 if matches!(self.lpg_backend, LpgBackend::Active) {
4787 planner = planner.with_lpg_store(Arc::clone(&self.store));
4788 }
4789
4790 let validator = CatalogConstraintValidator::new(Arc::clone(&self.catalog))
4792 .with_store(store)
4793 .with_max_property_size(self.max_property_size);
4794 planner = planner.with_validator(Arc::new(validator));
4795
4796 planner
4797 }
4798
4799 fn build_info_value(store: &dyn GraphStore) -> Value {
4801 use grafeo_common::types::PropertyKey;
4802 use std::collections::BTreeMap;
4803
4804 let mut map = BTreeMap::new();
4805 map.insert(PropertyKey::from("mode"), Value::String("lpg".into()));
4806 #[allow(clippy::cast_possible_wrap)]
4808 let node_count = store.node_count() as i64;
4809 #[allow(clippy::cast_possible_wrap)]
4811 let edge_count = store.edge_count() as i64;
4812 map.insert(PropertyKey::from("node_count"), Value::Int64(node_count));
4813 map.insert(PropertyKey::from("edge_count"), Value::Int64(edge_count));
4814 map.insert(
4815 PropertyKey::from("version"),
4816 Value::String(env!("CARGO_PKG_VERSION").into()),
4817 );
4818 Value::Map(map.into())
4819 }
4820
4821 fn build_schema_value(store: &dyn GraphStore) -> Value {
4823 use grafeo_common::types::PropertyKey;
4824 use std::collections::BTreeMap;
4825
4826 let labels: Vec<Value> = store
4827 .all_labels()
4828 .into_iter()
4829 .map(|l| Value::String(l.into()))
4830 .collect();
4831 let edge_types: Vec<Value> = store
4832 .all_edge_types()
4833 .into_iter()
4834 .map(|t| Value::String(t.into()))
4835 .collect();
4836 let property_keys: Vec<Value> = store
4837 .all_property_keys()
4838 .into_iter()
4839 .map(|k| Value::String(k.into()))
4840 .collect();
4841
4842 let mut map = BTreeMap::new();
4843 map.insert(PropertyKey::from("labels"), Value::List(labels.into()));
4844 map.insert(
4845 PropertyKey::from("edge_types"),
4846 Value::List(edge_types.into()),
4847 );
4848 map.insert(
4849 PropertyKey::from("property_keys"),
4850 Value::List(property_keys.into()),
4851 );
4852 Value::Map(map.into())
4853 }
4854
#[cfg(feature = "lpg")]
/// Creates a node with the given labels in the active LPG store and returns
/// its ID.
///
/// The node is created under the current transaction, or under
/// `TransactionId::SYSTEM` when none is active. With the `wal` feature a
/// `CreateNode` record is logged afterwards.
pub fn create_node(&self, labels: &[&str]) -> NodeId {
    let (epoch, transaction_id) = self.get_transaction_context();
    let tid = transaction_id.unwrap_or(TransactionId::SYSTEM);
    let id = self
        .active_lpg_store()
        .create_node_versioned(labels, epoch, tid);

    #[cfg(feature = "wal")]
    {
        let record = grafeo_storage::wal::WalRecord::CreateNode {
            id,
            labels: labels.iter().copied().map(str::to_owned).collect(),
        };
        self.log_wal_record(&record);
    }

    id
}
4876
#[cfg(feature = "lpg")]
/// Creates a node with the given labels and properties and returns its ID.
///
/// Every property value is checked against the maximum property size before
/// anything is written. The node is created under the current transaction,
/// or under `TransactionId::SYSTEM` when none is active. With the `wal`
/// feature a `CreateNode` record and one `SetNodeProperty` record per
/// property are logged afterwards.
///
/// # Errors
///
/// Returns an error if any property value exceeds the configured maximum
/// size; no node is created in that case.
pub fn create_node_with_props<'a>(
    &self,
    labels: &[&str],
    properties: impl IntoIterator<Item = (&'a str, Value)>,
) -> Result<NodeId> {
    let props: Vec<(&str, Value)> = properties.into_iter().collect();
    props
        .iter()
        .try_for_each(|(key, value)| self.check_property_size(key, value))?;

    // `props` is moved into the store below, so keep an owned copy for the
    // WAL records.
    #[cfg(feature = "wal")]
    let wal_props: Vec<(String, Value)> = props
        .iter()
        .map(|(key, value)| ((*key).to_owned(), value.clone()))
        .collect();

    let (epoch, transaction_id) = self.get_transaction_context();
    let tid = transaction_id.unwrap_or(TransactionId::SYSTEM);
    let id = self
        .active_lpg_store()
        .create_node_with_props_versioned(labels, props, epoch, tid);

    #[cfg(feature = "wal")]
    {
        self.log_wal_record(&grafeo_storage::wal::WalRecord::CreateNode {
            id,
            labels: labels.iter().copied().map(str::to_owned).collect(),
        });
        for (key, value) in wal_props {
            let record = grafeo_storage::wal::WalRecord::SetNodeProperty { id, key, value };
            self.log_wal_record(&record);
        }
    }

    Ok(id)
}
4929
#[cfg(feature = "lpg")]
/// Creates an edge of type `edge_type` from `src` to `dst` in the active LPG
/// store and returns its ID.
///
/// The edge is created under the current transaction, or under
/// `TransactionId::SYSTEM` when none is active. With the `wal` feature a
/// `CreateEdge` record is logged afterwards.
pub fn create_edge(
    &self,
    src: NodeId,
    dst: NodeId,
    edge_type: &str,
) -> grafeo_common::types::EdgeId {
    let (epoch, transaction_id) = self.get_transaction_context();
    let tid = transaction_id.unwrap_or(TransactionId::SYSTEM);
    let eid = self
        .active_lpg_store()
        .create_edge_versioned(src, dst, edge_type, epoch, tid);

    #[cfg(feature = "wal")]
    {
        let record = grafeo_storage::wal::WalRecord::CreateEdge {
            id: eid,
            src,
            dst,
            edge_type: edge_type.to_string(),
        };
        self.log_wal_record(&record);
    }

    eid
}
4960
#[cfg(feature = "lpg")]
/// Creates an edge of type `edge_type` from `src` to `dst` with the given
/// properties and returns its ID.
///
/// Every property value is checked against the maximum property size before
/// anything is written. The edge and its properties are written under the
/// current transaction, or under `TransactionId::SYSTEM` when none is
/// active. With the `wal` feature a `CreateEdge` record and one
/// `SetEdgeProperty` record per property are logged afterwards.
///
/// # Errors
///
/// Returns an error if any property value exceeds the configured maximum
/// size; no edge is created in that case.
pub fn create_edge_with_props<'a>(
    &self,
    src: NodeId,
    dst: NodeId,
    edge_type: &str,
    properties: impl IntoIterator<Item = (&'a str, Value)>,
) -> Result<grafeo_common::types::EdgeId> {
    let props: Vec<(&str, Value)> = properties.into_iter().collect();
    props
        .iter()
        .try_for_each(|(key, value)| self.check_property_size(key, value))?;

    let (epoch, transaction_id) = self.get_transaction_context();
    let tid = transaction_id.unwrap_or(TransactionId::SYSTEM);
    let store = self.active_lpg_store();
    let eid = store.create_edge_versioned(src, dst, edge_type, epoch, tid);
    // Values are cloned here because `props` is consumed by the WAL records
    // below.
    props.iter().for_each(|(key, value)| {
        store.set_edge_property_versioned(eid, key, value.clone(), tid);
    });

    #[cfg(feature = "wal")]
    {
        self.log_wal_record(&grafeo_storage::wal::WalRecord::CreateEdge {
            id: eid,
            src,
            dst,
            edge_type: edge_type.to_string(),
        });
        for (key, value) in props {
            let record = grafeo_storage::wal::WalRecord::SetEdgeProperty {
                id: eid,
                key: key.to_owned(),
                value,
            };
            self.log_wal_record(&record);
        }
    }

    Ok(eid)
}
5005
/// Sets property `key` on node `id` to `value`.
///
/// Inside a transaction the write goes through the versioned store API with
/// that transaction's id; otherwise the non-versioned setter is used. With
/// the `wal` feature enabled, a `SetNodeProperty` record is logged afterwards.
///
/// # Errors
///
/// Returns the error from `check_property_size` if the property is rejected;
/// in that case nothing is written.
#[cfg(feature = "lpg")]
pub fn set_node_property(&self, id: NodeId, key: &str, value: Value) -> Result<()> {
    self.check_property_size(key, &value)?;
    let (_, transaction_id) = self.get_transaction_context();

    // `value` is moved into the store below, so keep a copy for the WAL.
    #[cfg(feature = "wal")]
    let wal_value = value.clone();

    match transaction_id {
        Some(tid) => {
            self.active_lpg_store()
                .set_node_property_versioned(id, key, value, tid);
        }
        None => {
            self.active_lpg_store().set_node_property(id, key, value);
        }
    }

    #[cfg(feature = "wal")]
    {
        let record = grafeo_storage::wal::WalRecord::SetNodeProperty {
            id,
            key: key.to_string(),
            value: wal_value,
        };
        self.log_wal_record(&record);
    }

    Ok(())
}
5035
/// Sets property `key` on edge `id` to `value`.
///
/// Inside a transaction the write goes through the versioned store API with
/// that transaction's id; otherwise the non-versioned setter is used. With
/// the `wal` feature enabled, a `SetEdgeProperty` record is logged afterwards.
///
/// # Errors
///
/// Returns the error from `check_property_size` if the property is rejected;
/// in that case nothing is written.
#[cfg(feature = "lpg")]
pub fn set_edge_property(
    &self,
    id: grafeo_common::types::EdgeId,
    key: &str,
    value: Value,
) -> Result<()> {
    self.check_property_size(key, &value)?;
    let (_, transaction_id) = self.get_transaction_context();

    // `value` is moved into the store below, so keep a copy for the WAL.
    #[cfg(feature = "wal")]
    let wal_value = value.clone();

    match transaction_id {
        Some(tid) => {
            self.active_lpg_store()
                .set_edge_property_versioned(id, key, value, tid);
        }
        None => {
            self.active_lpg_store().set_edge_property(id, key, value);
        }
    }

    #[cfg(feature = "wal")]
    {
        let record = grafeo_storage::wal::WalRecord::SetEdgeProperty {
            id,
            key: key.to_string(),
            value: wal_value,
        };
        self.log_wal_record(&record);
    }

    Ok(())
}
5070
/// Deletes node `id` from the active LPG store.
///
/// Inside a transaction the versioned delete is used with the current epoch
/// and transaction id; otherwise the non-versioned delete is used. With the
/// `wal` feature enabled, a `DeleteNode` record is logged only when the
/// store reports that the delete happened.
///
/// Returns the boolean reported by the store's delete call.
#[cfg(feature = "lpg")]
pub fn delete_node(&self, id: NodeId) -> bool {
    let (epoch, transaction_id) = self.get_transaction_context();
    let removed = match transaction_id {
        Some(tid) => self
            .active_lpg_store()
            .delete_node_versioned(id, epoch, tid),
        None => self.active_lpg_store().delete_node(id),
    };

    #[cfg(feature = "wal")]
    {
        if removed {
            self.log_wal_record(&grafeo_storage::wal::WalRecord::DeleteNode { id });
        }
    }

    removed
}
5089
/// Deletes edge `id` from the active LPG store.
///
/// Inside a transaction the versioned delete is used with the current epoch
/// and transaction id; otherwise the non-versioned delete is used. With the
/// `wal` feature enabled, a `DeleteEdge` record is logged only when the
/// store reports that the delete happened.
///
/// Returns the boolean reported by the store's delete call.
#[cfg(feature = "lpg")]
pub fn delete_edge(&self, id: grafeo_common::types::EdgeId) -> bool {
    let (epoch, transaction_id) = self.get_transaction_context();
    let removed = match transaction_id {
        Some(tid) => self
            .active_lpg_store()
            .delete_edge_versioned(id, epoch, tid),
        None => self.active_lpg_store().delete_edge(id),
    };

    #[cfg(feature = "wal")]
    {
        if removed {
            self.log_wal_record(&grafeo_storage::wal::WalRecord::DeleteEdge { id });
        }
    }

    removed
}
5108
/// Looks up node `id` as seen from the session's transaction context.
///
/// The lookup uses the current epoch and transaction id, falling back to
/// `TransactionId::SYSTEM` when there is no transaction id. Returns `None`
/// when the versioned store lookup yields nothing.
#[cfg(feature = "lpg")]
#[must_use]
pub fn get_node(&self, id: NodeId) -> Option<Node> {
    let (epoch, transaction_id) = self.get_transaction_context();
    let viewer = transaction_id.unwrap_or(TransactionId::SYSTEM);
    self.active_lpg_store().get_node_versioned(id, epoch, viewer)
}
5146
/// Returns a clone of property `key` on node `id`.
///
/// Yields `None` when [`Self::get_node`] finds no node or the node has no
/// property under `key`.
#[cfg(feature = "lpg")]
#[must_use]
pub fn get_node_property(&self, id: NodeId, key: &str) -> Option<Value> {
    let node = self.get_node(id)?;
    node.get_property(key).cloned()
}
5176
/// Looks up edge `id` as seen from the session's transaction context.
///
/// The lookup uses the current epoch and transaction id, falling back to
/// `TransactionId::SYSTEM` when there is no transaction id. Returns `None`
/// when the versioned store lookup yields nothing.
#[cfg(feature = "lpg")]
#[must_use]
pub fn get_edge(&self, id: EdgeId) -> Option<Edge> {
    let (epoch, transaction_id) = self.get_transaction_context();
    let viewer = transaction_id.unwrap_or(TransactionId::SYSTEM);
    self.active_lpg_store().get_edge_versioned(id, epoch, viewer)
}
5193
/// Collects the `(NodeId, EdgeId)` pairs the active LPG store yields for
/// `node` in the `Direction::Outgoing` direction.
#[cfg(feature = "lpg")]
#[must_use]
pub fn get_neighbors_outgoing(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
    let mut neighbors = Vec::new();
    neighbors.extend(
        self.active_lpg_store()
            .edges_from(node, Direction::Outgoing),
    );
    neighbors
}
5226
/// Collects the `(NodeId, EdgeId)` pairs the active LPG store yields for
/// `node` in the `Direction::Incoming` direction.
#[cfg(feature = "lpg")]
#[must_use]
pub fn get_neighbors_incoming(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
    let mut neighbors = Vec::new();
    neighbors.extend(
        self.active_lpg_store()
            .edges_from(node, Direction::Incoming),
    );
    neighbors
}
5242
/// Returns the outgoing `(NodeId, EdgeId)` pairs of `node` whose edge has type `edge_type`.
///
/// Each candidate edge is looked up through the versioned store API using the
/// session's transaction context (falling back to `TransactionId::SYSTEM`
/// when there is no transaction id); edges that cannot be resolved, or whose
/// type differs from `edge_type`, are dropped.
///
/// The transaction context and the active store are resolved once for the
/// whole call rather than once per edge, so every edge is checked against the
/// same snapshot and the per-edge cost is a single store lookup.
#[cfg(feature = "lpg")]
#[must_use]
pub fn get_neighbors_outgoing_by_type(
    &self,
    node: NodeId,
    edge_type: &str,
) -> Vec<(NodeId, EdgeId)> {
    let (epoch, transaction_id) = self.get_transaction_context();
    let viewer = transaction_id.unwrap_or(TransactionId::SYSTEM);
    let store = self.active_lpg_store();

    // Bound to a local (not returned as a tail expression) so the iterator
    // borrowing `store` is dropped before `store` itself goes out of scope.
    let matching: Vec<(NodeId, EdgeId)> = store
        .edges_from(node, Direction::Outgoing)
        .filter(|(_, edge_id)| {
            store
                .get_edge_versioned(*edge_id, epoch, viewer)
                .is_some_and(|edge| edge.edge_type.as_str() == edge_type)
        })
        .collect();
    matching
}
5269
/// Reports whether [`Self::get_node`] finds a node for `id`.
#[cfg(feature = "lpg")]
#[must_use]
pub fn node_exists(&self, id: NodeId) -> bool {
    matches!(self.get_node(id), Some(_))
}
5281
/// Reports whether [`Self::get_edge`] finds an edge for `id`.
#[cfg(feature = "lpg")]
#[must_use]
pub fn edge_exists(&self, id: EdgeId) -> bool {
    matches!(self.get_edge(id), Some(_))
}
5288
/// Returns `(out_degree, in_degree)` for `node` as reported by the active LPG store.
#[cfg(feature = "lpg")]
#[must_use]
pub fn get_degree(&self, node: NodeId) -> (usize, usize) {
    let store = self.active_lpg_store();
    (store.out_degree(node), store.in_degree(node))
}
5300
/// Looks up every id in `ids` and returns the results in the same order.
///
/// The transaction context and active store are resolved once and reused for
/// all lookups. An entry is `None` when the versioned lookup for that id
/// yields nothing, so the output always has `ids.len()` elements.
#[cfg(feature = "lpg")]
#[must_use]
pub fn get_nodes_batch(&self, ids: &[NodeId]) -> Vec<Option<Node>> {
    let (epoch, transaction_id) = self.get_transaction_context();
    let viewer = transaction_id.unwrap_or(TransactionId::SYSTEM);
    let store = self.active_lpg_store();

    let mut found = Vec::with_capacity(ids.len());
    for &id in ids {
        found.push(store.get_node_versioned(id, epoch, viewer));
    }
    found
}
5320
/// Returns the change events the session's CDC log holds for `entity_id`.
///
/// # Errors
///
/// Returns the error from `require_permission` when the session may not
/// perform `StatementKind::Read`.
#[cfg(feature = "cdc")]
pub fn history(
    &self,
    entity_id: impl Into<crate::cdc::EntityId>,
) -> Result<Vec<crate::cdc::ChangeEvent>> {
    self.require_permission(crate::auth::StatementKind::Read)?;
    let events = self.cdc_log.history(entity_id.into());
    Ok(events)
}
5336
/// Returns the change events the session's CDC log holds for `entity_id`,
/// queried with `since_epoch` as the lower bound passed to the log.
///
/// # Errors
///
/// Returns the error from `require_permission` when the session may not
/// perform `StatementKind::Read`.
#[cfg(feature = "cdc")]
pub fn history_since(
    &self,
    entity_id: impl Into<crate::cdc::EntityId>,
    since_epoch: EpochId,
) -> Result<Vec<crate::cdc::ChangeEvent>> {
    self.require_permission(crate::auth::StatementKind::Read)?;
    let events = self.cdc_log.history_since(entity_id.into(), since_epoch);
    Ok(events)
}
5351
/// Returns the change events the session's CDC log reports for the epoch
/// range given by `start_epoch` and `end_epoch`.
///
/// # Errors
///
/// Returns the error from `require_permission` when the session may not
/// perform `StatementKind::Read`.
#[cfg(feature = "cdc")]
pub fn changes_between(
    &self,
    start_epoch: EpochId,
    end_epoch: EpochId,
) -> Result<Vec<crate::cdc::ChangeEvent>> {
    self.require_permission(crate::auth::StatementKind::Read)?;
    let events = self.cdc_log.changes_between(start_epoch, end_epoch);
    Ok(events)
}
5366}
5367
5368impl Drop for Session {
5369 fn drop(&mut self) {
5370 #[cfg(feature = "lpg")]
5373 if self.in_transaction() {
5374 let _ = self.rollback_inner();
5375 }
5376
5377 #[cfg(feature = "metrics")]
5378 if let Some(ref reg) = self.metrics {
5379 reg.session_active
5380 .fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
5381 }
5382 }
5383}
5384
5385#[cfg(test)]
5386mod tests {
5387 use super::parse_default_literal;
5388 use crate::database::GrafeoDB;
5389 use grafeo_common::types::Value;
5390
/// `null` is recognised in any ASCII case.
#[test]
fn parse_default_literal_null() {
    for spelling in ["null", "NULL", "Null"] {
        assert_eq!(parse_default_literal(spelling), Value::Null);
    }
}
5401
/// `true` / `false` are recognised in lower and upper case.
#[test]
fn parse_default_literal_bool() {
    let cases = [
        ("true", true),
        ("TRUE", true),
        ("false", false),
        ("FALSE", false),
    ];
    for (text, expected) in cases {
        assert_eq!(parse_default_literal(text), Value::Bool(expected));
    }
}
5409
/// A single-quoted literal yields the text between the quotes.
#[test]
fn parse_default_literal_string_single_quoted() {
    let parsed = parse_default_literal("'hello'");
    assert_eq!(parsed, Value::String("hello".into()));
}
5417
/// A double-quoted literal yields the text between the quotes.
#[test]
fn parse_default_literal_string_double_quoted() {
    let parsed = parse_default_literal("\"world\"");
    assert_eq!(parsed, Value::String("world".into()));
}
5425
/// Positive, negative and zero integers parse as `Value::Int64`.
#[test]
fn parse_default_literal_integer() {
    let cases = [("42", 42_i64), ("-7", -7), ("0", 0)];
    for (text, expected) in cases {
        assert_eq!(parse_default_literal(text), Value::Int64(expected));
    }
}
5432
/// Decimal literals that are not integers parse as `Value::Float64`.
#[test]
fn parse_default_literal_float() {
    let positive = parse_default_literal("9.81");
    let negative = parse_default_literal("-0.5");
    assert_eq!(positive, Value::Float64(9.81_f64));
    assert_eq!(negative, Value::Float64(-0.5));
}
5438
/// Text matching no other literal form is returned unchanged as a string.
#[test]
fn parse_default_literal_fallback_string() {
    let parsed = parse_default_literal("some_identifier");
    assert_eq!(parsed, Value::String("some_identifier".into()));
}
5447
/// A node created through a session gets a valid id and is counted by the database.
#[test]
fn test_session_create_node() {
    let db = GrafeoDB::new_in_memory();
    let session = db.session();

    let person = session.create_node(&["Person"]);

    assert!(person.is_valid());
    assert_eq!(db.node_count(), 1);
}
5457
/// `in_transaction` flips to true on begin and back to false on commit.
#[test]
fn test_session_transaction() {
    let db = GrafeoDB::new_in_memory();
    let mut session = db.session();

    // Fresh session: no transaction yet.
    assert!(!session.in_transaction());

    session.begin_transaction().unwrap();
    assert!(session.in_transaction());

    session.commit().unwrap();
    assert!(!session.in_transaction());
}
5471
/// The transaction context carries a transaction id only while a transaction
/// is open, and the epoch observed after commit is not older than the one
/// observed inside the transaction.
#[test]
fn test_session_transaction_context() {
    let db = GrafeoDB::new_in_memory();
    let mut session = db.session();

    // Before begin: no transaction id.
    let (_, id_before) = session.get_transaction_context();
    assert!(id_before.is_none());

    // Inside the transaction: an id is present.
    session.begin_transaction().unwrap();
    let (epoch_during, id_during) = session.get_transaction_context();
    assert!(id_during.is_some());
    session.commit().unwrap();

    // After commit: the id is gone and the epoch has not gone backwards.
    let (epoch_after, id_after) = session.get_transaction_context();
    assert!(id_after.is_none());
    assert!(epoch_after.as_u64() >= epoch_during.as_u64());
}
5495
/// Rolling back an open transaction leaves the session outside any transaction.
#[test]
fn test_session_rollback() {
    let db = GrafeoDB::new_in_memory();
    let mut session = db.session();

    session.begin_transaction().unwrap();
    session.rollback().unwrap();

    assert!(!session.in_transaction());
}
5505
/// A node written directly to the store under the session's transaction id
/// is visible only to that transaction and disappears on rollback, while a
/// node created before the transaction survives.
#[test]
fn test_session_rollback_discards_versions() {
    use grafeo_common::types::TransactionId;

    let db = GrafeoDB::new_in_memory();

    // Baseline node created outside any transaction.
    let committed_node = db.store().create_node(&["Person"]);
    assert!(committed_node.is_valid());
    assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");

    let mut session = db.session();
    session.begin_transaction().unwrap();
    let transaction_id = session.current_transaction.lock().unwrap();

    // Write a second node straight into the store under the open transaction.
    let tx_epoch = db.store().current_epoch();
    let pending_node = db
        .store()
        .create_node_versioned(&["Person"], tx_epoch, transaction_id);
    assert!(pending_node.is_valid());

    assert_eq!(
        db.node_count(),
        1,
        "PENDING nodes should be invisible to non-versioned node_count()"
    );
    let seen_by_owner = db
        .store()
        .get_node_versioned(pending_node, tx_epoch, transaction_id);
    assert!(
        seen_by_owner.is_some(),
        "Transaction node should be visible to its own transaction"
    );

    session.rollback().unwrap();
    assert!(!session.in_transaction());

    let count_after = db.node_count();
    assert_eq!(
        count_after, 1,
        "Rollback should discard uncommitted node, but got {count_after}"
    );

    // Re-check both nodes from the system view at the current epoch.
    let current_epoch = db.store().current_epoch();
    let survivor =
        db.store()
            .get_node_versioned(committed_node, current_epoch, TransactionId::SYSTEM);
    assert!(survivor.is_some(), "Original node should still exist");

    let discarded =
        db.store()
            .get_node_versioned(pending_node, current_epoch, TransactionId::SYSTEM);
    assert!(discarded.is_none(), "Transaction node should be gone");
}
5573
/// A node created via `session.create_node` inside a transaction is visible
/// to that transaction only and is discarded by rollback.
#[test]
fn test_session_create_node_in_transaction() {
    let db = GrafeoDB::new_in_memory();

    // Baseline node created outside any transaction.
    let committed_node = db.create_node(&["Person"]);
    assert!(committed_node.is_valid());
    assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");

    let mut session = db.session();
    session.begin_transaction().unwrap();
    let transaction_id = session.current_transaction.lock().unwrap();

    let pending_node = session.create_node(&["Person"]);
    assert!(pending_node.is_valid());

    assert_eq!(
        db.node_count(),
        1,
        "PENDING nodes should be invisible to non-versioned node_count()"
    );
    let epoch = db.store().current_epoch();
    let seen_by_owner = db
        .store()
        .get_node_versioned(pending_node, epoch, transaction_id);
    assert!(
        seen_by_owner.is_some(),
        "Transaction node should be visible to its own transaction"
    );

    session.rollback().unwrap();

    let count_after = db.node_count();
    assert_eq!(
        count_after, 1,
        "Rollback should discard node created via session.create_node(), but got {count_after}"
    );
}
5618
/// A node created via `session.create_node_with_props` inside a transaction
/// is visible to that transaction only and is discarded by rollback.
#[test]
fn test_session_create_node_with_props_in_transaction() {
    use grafeo_common::types::Value;

    let db = GrafeoDB::new_in_memory();

    // Baseline node created outside any transaction.
    db.create_node(&["Person"]);
    assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");

    let mut session = db.session();
    session.begin_transaction().unwrap();
    let transaction_id = session.current_transaction.lock().unwrap();

    let properties = [("name", Value::String("Alix".into()))];
    let pending_node = session
        .create_node_with_props(&["Person"], properties)
        .unwrap();
    assert!(pending_node.is_valid());

    assert_eq!(
        db.node_count(),
        1,
        "PENDING nodes should be invisible to non-versioned node_count()"
    );
    let epoch = db.store().current_epoch();
    let seen_by_owner = db
        .store()
        .get_node_versioned(pending_node, epoch, transaction_id);
    assert!(
        seen_by_owner.is_some(),
        "Transaction node should be visible to its own transaction"
    );

    session.rollback().unwrap();

    let count_after = db.node_count();
    assert_eq!(
        count_after, 1,
        "Rollback should discard node created via session.create_node_with_props()"
    );
}
5665
/// Tests that run GQL text through `Session::execute`.
#[cfg(feature = "gql")]
mod gql_tests {
    use super::*;

    /// A label match returns only the nodes carrying that label.
    #[test]
    fn test_gql_query_execution() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        // Two Person nodes and one unrelated Animal node.
        for labels in [["Person"], ["Person"], ["Animal"]] {
            session.create_node(&labels);
        }

        let people = session.execute("MATCH (n:Person) RETURN n").unwrap();

        assert_eq!(people.row_count(), 2);
        assert_eq!(people.column_count(), 1);
        assert_eq!(people.columns[0], "n");
    }

    /// Matching on an empty database yields zero rows.
    #[test]
    fn test_gql_empty_result() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        let people = session.execute("MATCH (n:Person) RETURN n").unwrap();

        assert_eq!(people.row_count(), 0);
    }

    /// A query with an unclosed node pattern is rejected.
    #[test]
    fn test_gql_parse_error() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        let outcome = session.execute("MATCH (n RETURN n");

        assert!(outcome.is_err());
    }

    /// A one-hop pattern returns one row per matching edge.
    #[test]
    fn test_gql_relationship_traversal() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        let alix = session.create_node(&["Person"]);
        let gus = session.create_node(&["Person"]);
        let vincent = session.create_node(&["Person"]);

        // alix knows both of the others.
        session.create_edge(alix, gus, "KNOWS");
        session.create_edge(alix, vincent, "KNOWS");

        let pairs = session
            .execute("MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b")
            .unwrap();

        assert_eq!(pairs.row_count(), 2);
        assert_eq!(pairs.column_count(), 2);
        assert_eq!(pairs.columns[0], "a");
        assert_eq!(pairs.columns[1], "b");
    }

    /// The edge type in the pattern filters out edges of other types.
    #[test]
    fn test_gql_relationship_with_type_filter() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        let alix = session.create_node(&["Person"]);
        let gus = session.create_node(&["Person"]);
        let vincent = session.create_node(&["Person"]);

        // Only one of the two edges has the queried type.
        session.create_edge(alix, gus, "KNOWS");
        session.create_edge(alix, vincent, "WORKS_WITH");

        let pairs = session
            .execute("MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b")
            .unwrap();

        assert_eq!(pairs.row_count(), 1);
    }

    /// Returning a variable that was never bound produces an
    /// "Undefined variable" error.
    #[test]
    fn test_gql_semantic_error_undefined_variable() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        let outcome = session.execute("MATCH (n:Person) RETURN x");

        assert!(outcome.is_err());
        let err = match outcome {
            Err(err) => err,
            Ok(_) => panic!("Expected error"),
        };
        let mentions_undefined = err.to_string().contains("Undefined variable");
        assert!(
            mentions_undefined,
            "Expected undefined variable error, got: {}",
            err
        );
    }

    /// A numeric comparison in `WHERE` keeps only the matching nodes.
    #[test]
    fn test_gql_where_clause_property_filter() {
        use grafeo_common::types::Value;

        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        // Ages 25, 35 and 45: two of them exceed 30.
        for age in [25_i64, 35, 45] {
            session
                .create_node_with_props(&["Person"], [("age", Value::Int64(age))])
                .unwrap();
        }

        let older = session
            .execute("MATCH (n:Person) WHERE n.age > 30 RETURN n")
            .unwrap();

        assert_eq!(older.row_count(), 2);
    }

    /// A string equality in `WHERE` keeps only the matching nodes.
    #[test]
    fn test_gql_where_clause_equality() {
        use grafeo_common::types::Value;

        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        // Two nodes named "Alix" and one named "Gus".
        session
            .create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))])
            .unwrap();
        session
            .create_node_with_props(&["Person"], [("name", Value::String("Gus".into()))])
            .unwrap();
        session
            .create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))])
            .unwrap();

        let matches = session
            .execute("MATCH (n:Person) WHERE n.name = \"Alix\" RETURN n")
            .unwrap();

        assert_eq!(matches.row_count(), 2);
    }

    /// Property projections become columns named after the expression and
    /// carry the stored values.
    #[test]
    fn test_gql_return_property_access() {
        use grafeo_common::types::Value;

        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        let alix_props = [
            ("name", Value::String("Alix".into())),
            ("age", Value::Int64(30)),
        ];
        session
            .create_node_with_props(&["Person"], alix_props)
            .unwrap();
        let gus_props = [
            ("name", Value::String("Gus".into())),
            ("age", Value::Int64(25)),
        ];
        session
            .create_node_with_props(&["Person"], gus_props)
            .unwrap();

        let projected = session
            .execute("MATCH (n:Person) RETURN n.name, n.age")
            .unwrap();

        assert_eq!(projected.row_count(), 2);
        assert_eq!(projected.column_count(), 2);
        assert_eq!(projected.columns[0], "n.name");
        assert_eq!(projected.columns[1], "n.age");

        // Row order is not asserted; only that both names appear in column 0.
        let alix = Value::String("Alix".into());
        let gus = Value::String("Gus".into());
        assert!(projected.rows.iter().any(|row| row[0] == alix));
        assert!(projected.rows.iter().any(|row| row[0] == gus));
    }

    /// A whole-node column and a property column can be returned together.
    #[test]
    fn test_gql_return_mixed_expressions() {
        use grafeo_common::types::Value;

        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        session
            .create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))])
            .unwrap();

        let mixed = session
            .execute("MATCH (n:Person) RETURN n, n.name")
            .unwrap();

        assert_eq!(mixed.row_count(), 1);
        assert_eq!(mixed.column_count(), 2);
        assert_eq!(mixed.columns[0], "n");
        assert_eq!(mixed.columns[1], "n.name");

        let name_cell = &mixed.rows[0][1];
        assert_eq!(*name_cell, Value::String("Alix".into()));
    }
}
5902
/// Tests that run Cypher text through `Session::execute_cypher`.
#[cfg(feature = "cypher")]
mod cypher_tests {
    use super::*;

    /// A label match returns only the nodes carrying that label.
    #[test]
    fn test_cypher_query_execution() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        // Two Person nodes and one unrelated Animal node.
        for labels in [["Person"], ["Person"], ["Animal"]] {
            session.create_node(&labels);
        }

        let people = session.execute_cypher("MATCH (n:Person) RETURN n").unwrap();

        assert_eq!(people.row_count(), 2);
        assert_eq!(people.column_count(), 1);
        assert_eq!(people.columns[0], "n");
    }

    /// Matching on an empty database yields zero rows.
    #[test]
    fn test_cypher_empty_result() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        let people = session.execute_cypher("MATCH (n:Person) RETURN n").unwrap();

        assert_eq!(people.row_count(), 0);
    }

    /// A query with an unclosed node pattern is rejected.
    #[test]
    fn test_cypher_parse_error() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        let outcome = session.execute_cypher("MATCH (n RETURN n");

        assert!(outcome.is_err());
    }
}
5948
5949 mod direct_lookup_tests {
5952 use super::*;
5953 use grafeo_common::types::Value;
5954
5955 #[test]
5956 fn test_get_node() {
5957 let db = GrafeoDB::new_in_memory();
5958 let session = db.session();
5959
5960 let id = session.create_node(&["Person"]);
5961 let node = session.get_node(id);
5962
5963 assert!(node.is_some());
5964 let node = node.unwrap();
5965 assert_eq!(node.id, id);
5966 }
5967
5968 #[test]
5969 fn test_get_node_not_found() {
5970 use grafeo_common::types::NodeId;
5971
5972 let db = GrafeoDB::new_in_memory();
5973 let session = db.session();
5974
5975 let node = session.get_node(NodeId::new(9999));
5977 assert!(node.is_none());
5978 }
5979
5980 #[test]
5981 fn test_get_node_property() {
5982 let db = GrafeoDB::new_in_memory();
5983 let session = db.session();
5984
5985 let id = session
5986 .create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))])
5987 .unwrap();
5988
5989 let name = session.get_node_property(id, "name");
5990 assert_eq!(name, Some(Value::String("Alix".into())));
5991
5992 let missing = session.get_node_property(id, "missing");
5994 assert!(missing.is_none());
5995 }
5996
5997 #[test]
5998 fn test_get_edge() {
5999 let db = GrafeoDB::new_in_memory();
6000 let session = db.session();
6001
6002 let alix = session.create_node(&["Person"]);
6003 let gus = session.create_node(&["Person"]);
6004 let edge_id = session.create_edge(alix, gus, "KNOWS");
6005
6006 let edge = session.get_edge(edge_id);
6007 assert!(edge.is_some());
6008 let edge = edge.unwrap();
6009 assert_eq!(edge.id, edge_id);
6010 assert_eq!(edge.src, alix);
6011 assert_eq!(edge.dst, gus);
6012 }
6013
6014 #[test]
6015 fn test_get_edge_not_found() {
6016 use grafeo_common::types::EdgeId;
6017
6018 let db = GrafeoDB::new_in_memory();
6019 let session = db.session();
6020
6021 let edge = session.get_edge(EdgeId::new(9999));
6022 assert!(edge.is_none());
6023 }
6024
6025 #[test]
6026 fn test_get_neighbors_outgoing() {
6027 let db = GrafeoDB::new_in_memory();
6028 let session = db.session();
6029
6030 let alix = session.create_node(&["Person"]);
6031 let gus = session.create_node(&["Person"]);
6032 let harm = session.create_node(&["Person"]);
6033
6034 session.create_edge(alix, gus, "KNOWS");
6035 session.create_edge(alix, harm, "KNOWS");
6036
6037 let neighbors = session.get_neighbors_outgoing(alix);
6038 assert_eq!(neighbors.len(), 2);
6039
6040 let neighbor_ids: Vec<_> = neighbors.iter().map(|(node_id, _)| *node_id).collect();
6041 assert!(neighbor_ids.contains(&gus));
6042 assert!(neighbor_ids.contains(&harm));
6043 }
6044
6045 #[test]
6046 fn test_get_neighbors_incoming() {
6047 let db = GrafeoDB::new_in_memory();
6048 let session = db.session();
6049
6050 let alix = session.create_node(&["Person"]);
6051 let gus = session.create_node(&["Person"]);
6052 let harm = session.create_node(&["Person"]);
6053
6054 session.create_edge(gus, alix, "KNOWS");
6055 session.create_edge(harm, alix, "KNOWS");
6056
6057 let neighbors = session.get_neighbors_incoming(alix);
6058 assert_eq!(neighbors.len(), 2);
6059
6060 let neighbor_ids: Vec<_> = neighbors.iter().map(|(node_id, _)| *node_id).collect();
6061 assert!(neighbor_ids.contains(&gus));
6062 assert!(neighbor_ids.contains(&harm));
6063 }
6064
6065 #[test]
6066 fn test_get_neighbors_outgoing_by_type() {
6067 let db = GrafeoDB::new_in_memory();
6068 let session = db.session();
6069
6070 let alix = session.create_node(&["Person"]);
6071 let gus = session.create_node(&["Person"]);
6072 let company = session.create_node(&["Company"]);
6073
6074 session.create_edge(alix, gus, "KNOWS");
6075 session.create_edge(alix, company, "WORKS_AT");
6076
6077 let knows_neighbors = session.get_neighbors_outgoing_by_type(alix, "KNOWS");
6078 assert_eq!(knows_neighbors.len(), 1);
6079 assert_eq!(knows_neighbors[0].0, gus);
6080
6081 let works_neighbors = session.get_neighbors_outgoing_by_type(alix, "WORKS_AT");
6082 assert_eq!(works_neighbors.len(), 1);
6083 assert_eq!(works_neighbors[0].0, company);
6084
6085 let no_neighbors = session.get_neighbors_outgoing_by_type(alix, "LIKES");
6087 assert!(no_neighbors.is_empty());
6088 }
6089
6090 #[test]
6091 fn test_node_exists() {
6092 use grafeo_common::types::NodeId;
6093
6094 let db = GrafeoDB::new_in_memory();
6095 let session = db.session();
6096
6097 let id = session.create_node(&["Person"]);
6098
6099 assert!(session.node_exists(id));
6100 assert!(!session.node_exists(NodeId::new(9999)));
6101 }
6102
6103 #[test]
6104 fn test_edge_exists() {
6105 use grafeo_common::types::EdgeId;
6106
6107 let db = GrafeoDB::new_in_memory();
6108 let session = db.session();
6109
6110 let alix = session.create_node(&["Person"]);
6111 let gus = session.create_node(&["Person"]);
6112 let edge_id = session.create_edge(alix, gus, "KNOWS");
6113
6114 assert!(session.edge_exists(edge_id));
6115 assert!(!session.edge_exists(EdgeId::new(9999)));
6116 }
6117
6118 #[test]
6119 fn test_get_degree() {
6120 let db = GrafeoDB::new_in_memory();
6121 let session = db.session();
6122
6123 let alix = session.create_node(&["Person"]);
6124 let gus = session.create_node(&["Person"]);
6125 let harm = session.create_node(&["Person"]);
6126
6127 session.create_edge(alix, gus, "KNOWS");
6129 session.create_edge(alix, harm, "KNOWS");
6130 session.create_edge(gus, alix, "KNOWS");
6132
6133 let (out_degree, in_degree) = session.get_degree(alix);
6134 assert_eq!(out_degree, 2);
6135 assert_eq!(in_degree, 1);
6136
6137 let lonely = session.create_node(&["Person"]);
6139 let (out, in_deg) = session.get_degree(lonely);
6140 assert_eq!(out, 0);
6141 assert_eq!(in_deg, 0);
6142 }
6143
6144 #[test]
6145 fn test_get_nodes_batch() {
6146 let db = GrafeoDB::new_in_memory();
6147 let session = db.session();
6148
6149 let alix = session.create_node(&["Person"]);
6150 let gus = session.create_node(&["Person"]);
6151 let harm = session.create_node(&["Person"]);
6152
6153 let nodes = session.get_nodes_batch(&[alix, gus, harm]);
6154 assert_eq!(nodes.len(), 3);
6155 assert!(nodes[0].is_some());
6156 assert!(nodes[1].is_some());
6157 assert!(nodes[2].is_some());
6158
6159 use grafeo_common::types::NodeId;
6161 let nodes_with_missing = session.get_nodes_batch(&[alix, NodeId::new(9999), harm]);
6162 assert_eq!(nodes_with_missing.len(), 3);
6163 assert!(nodes_with_missing[0].is_some());
6164 assert!(nodes_with_missing[1].is_none()); assert!(nodes_with_missing[2].is_some());
6166 }
6167
6168 #[test]
6169 fn test_auto_commit_setting() {
6170 let db = GrafeoDB::new_in_memory();
6171 let mut session = db.session();
6172
6173 assert!(session.auto_commit());
6175
6176 session.set_auto_commit(false);
6177 assert!(!session.auto_commit());
6178
6179 session.set_auto_commit(true);
6180 assert!(session.auto_commit());
6181 }
6182
6183 #[test]
6184 fn test_transaction_double_begin_nests() {
6185 let db = GrafeoDB::new_in_memory();
6186 let mut session = db.session();
6187
6188 session.begin_transaction().unwrap();
6189 let result = session.begin_transaction();
6191 assert!(result.is_ok());
6192 session.commit().unwrap();
6194 session.commit().unwrap();
6196 }
6197
6198 #[test]
6199 fn test_commit_without_transaction_error() {
6200 let db = GrafeoDB::new_in_memory();
6201 let mut session = db.session();
6202
6203 let result = session.commit();
6204 assert!(result.is_err());
6205 }
6206
6207 #[test]
6208 fn test_rollback_without_transaction_error() {
6209 let db = GrafeoDB::new_in_memory();
6210 let mut session = db.session();
6211
6212 let result = session.rollback();
6213 assert!(result.is_err());
6214 }
6215
6216 #[test]
6217 fn test_create_edge_in_transaction() {
6218 let db = GrafeoDB::new_in_memory();
6219 let mut session = db.session();
6220
6221 let alix = session.create_node(&["Person"]);
6223 let gus = session.create_node(&["Person"]);
6224
6225 session.begin_transaction().unwrap();
6227 let edge_id = session.create_edge(alix, gus, "KNOWS");
6228
6229 assert!(session.edge_exists(edge_id));
6231
6232 session.commit().unwrap();
6234
6235 assert!(session.edge_exists(edge_id));
6237 }
6238
6239 #[test]
6240 fn test_neighbors_empty_node() {
6241 let db = GrafeoDB::new_in_memory();
6242 let session = db.session();
6243
6244 let lonely = session.create_node(&["Person"]);
6245
6246 assert!(session.get_neighbors_outgoing(lonely).is_empty());
6247 assert!(session.get_neighbors_incoming(lonely).is_empty());
6248 assert!(
6249 session
6250 .get_neighbors_outgoing_by_type(lonely, "KNOWS")
6251 .is_empty()
6252 );
6253 }
6254 }
6255
/// With a GC interval of 2, two committed transactions leave both nodes in place.
#[test]
fn test_auto_gc_triggers_on_commit_interval() {
    use crate::config::Config;

    let db = GrafeoDB::with_config(Config::in_memory().with_gc_interval(2)).unwrap();
    let mut session = db.session();

    // One node per transaction, committed one after the other.
    for label in ["A", "B"] {
        session.begin_transaction().unwrap();
        session.create_node(&[label]);
        session.commit().unwrap();
    }

    assert_eq!(db.node_count(), 2);
}
6277
/// A query timeout set on the config gives the session a query deadline.
#[test]
fn test_query_timeout_config_propagates_to_session() {
    use crate::config::Config;
    use std::time::Duration;

    let timeout = Duration::from_secs(5);
    let db = GrafeoDB::with_config(Config::in_memory().with_query_timeout(timeout)).unwrap();
    let session = db.session();

    let deadline = session.query_deadline();
    assert!(deadline.is_some());
}
6290
/// A default in-memory database gives its sessions a query deadline.
#[test]
fn test_default_query_timeout_returns_deadline() {
    let db = GrafeoDB::new_in_memory();
    let session = db.session();

    let deadline = session.query_deadline();
    assert!(deadline.is_some());
}
6299
/// Disabling the query timeout on the config leaves the session without a deadline.
#[test]
fn test_no_query_timeout_returns_no_deadline() {
    use crate::config::Config;

    let db = GrafeoDB::with_config(Config::in_memory().without_query_timeout()).unwrap();
    let session = db.session();

    let deadline = session.query_deadline();
    assert!(deadline.is_none());
}
6310
/// A session on a default in-memory database reports the LPG graph model.
#[test]
fn test_graph_model_accessor() {
    use crate::config::GraphModel;

    let db = GrafeoDB::new_in_memory();
    let session = db.session();

    let model = session.graph_model();
    assert_eq!(model, GraphModel::Lpg);
}
6320
/// With a 100-unit property size limit, a short value is accepted and a
/// 200-character value is rejected with a size error.
#[test]
fn test_reject_oversized_property() {
    use crate::config::Config;

    let db = GrafeoDB::with_config(Config::in_memory().with_max_property_size(100)).unwrap();
    let session = db.session();
    let node = session.create_node(&["Test"]);

    // Well under the limit.
    session
        .set_node_property(node, "small", Value::from("hello"))
        .unwrap();

    // Twice the configured limit.
    let oversized = "x".repeat(200);
    let outcome = session.set_node_property(node, "big", Value::from(oversized.as_str()));
    assert!(outcome.is_err());

    let err = outcome.unwrap_err().to_string();
    let mentions_size = err.contains("exceeds maximum size");
    assert!(mentions_size, "Expected size error, got: {err}");
}
6346
/// With the property size limit removed, a 10,000-character string can be
/// stored on a node without error.
#[test]
fn test_no_property_size_limit() {
    let cfg = crate::config::Config::in_memory().without_max_property_size();
    let database = GrafeoDB::with_config(cfg).unwrap();
    let sess = database.session();
    let node_id = sess.create_node(&["Test"]);

    let large_text = "x".repeat(10_000);
    let payload = Value::from(large_text.as_str());
    sess.set_node_property(node_id, "big", payload).unwrap();
}
6363
/// A database wrapping an externally supplied store can run an insert and
/// read the row back inside one explicit transaction.
#[cfg(feature = "gql")]
#[test]
fn test_external_store_session() {
    use grafeo_core::graph::GraphStoreMut;
    use std::sync::Arc;

    let cfg = crate::config::Config::in_memory();
    let backing: Arc<dyn GraphStoreMut> =
        Arc::new(grafeo_core::graph::lpg::LpgStore::new().unwrap());
    let database = GrafeoDB::with_store(backing, cfg).unwrap();

    let mut sess = database.session();
    sess.begin_transaction().unwrap();

    // Write one node, then read it back before committing.
    sess.execute("INSERT (:Test {name: 'hello'})").unwrap();
    let fetched = sess.execute("MATCH (n:Test) RETURN n.name").unwrap();
    assert_eq!(fetched.row_count(), 1);

    sess.commit().unwrap();
}
6389
6390 #[cfg(feature = "gql")]
6393 mod session_command_tests {
6394 use super::*;
6395 use grafeo_common::types::Value;
6396
/// `USE GRAPH <name>` on a previously created graph makes it the session's
/// current graph.
#[test]
fn test_use_graph_sets_current_graph() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    // Create the graph first, then switch to it.
    for statement in ["CREATE GRAPH mydb", "USE GRAPH mydb"] {
        sess.execute(statement).unwrap();
    }

    let expected = Some(String::from("mydb"));
    assert_eq!(sess.current_graph(), expected);
}
6408
/// `USE GRAPH` on an unknown graph fails with a "does not exist" error.
#[test]
fn test_use_graph_nonexistent_errors() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    let outcome = sess.execute("USE GRAPH doesnotexist");
    assert!(outcome.is_err());

    let err = outcome.unwrap_err().to_string();
    assert!(
        err.contains("does not exist"),
        "Expected 'does not exist' error, got: {err}"
    );
}
6422
/// `USE GRAPH default` succeeds without any prior `CREATE GRAPH` and is
/// reported as the current graph.
#[test]
fn test_use_graph_default_always_valid() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    sess.execute("USE GRAPH default").unwrap();

    let expected = Some(String::from("default"));
    assert_eq!(sess.current_graph(), expected);
}
6432
/// `SESSION SET GRAPH <name>` selects an existing graph as the current graph.
#[test]
fn test_session_set_graph() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    for statement in ["CREATE GRAPH analytics", "SESSION SET GRAPH analytics"] {
        sess.execute(statement).unwrap();
    }

    let expected = Some(String::from("analytics"));
    assert_eq!(sess.current_graph(), expected);
}
6442
/// `SESSION SET GRAPH` naming a graph that was never created returns an error.
#[test]
fn test_session_set_graph_nonexistent_errors() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    let outcome = sess.execute("SESSION SET GRAPH nosuchgraph");
    assert!(outcome.is_err());
}
6451
/// The session time zone starts unset and follows each successive
/// `SESSION SET TIME ZONE` statement.
#[test]
fn test_session_set_time_zone() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    // Nothing is configured on a fresh session.
    assert_eq!(sess.time_zone(), None);

    sess.execute("SESSION SET TIME ZONE 'UTC'").unwrap();
    let after_utc = sess.time_zone();
    assert_eq!(after_utc, Some(String::from("UTC")));

    // A second statement replaces the first value.
    sess.execute("SESSION SET TIME ZONE 'America/New_York'")
        .unwrap();
    let after_new_york = sess.time_zone();
    assert_eq!(after_new_york, Some(String::from("America/New_York")));
}
6467
/// After `SESSION SET PARAMETER $timeout = 30`, the parameter can be looked
/// up by name (without the `$` sigil).
#[test]
fn test_session_set_parameter() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    let statement = "SESSION SET PARAMETER $timeout = 30";
    sess.execute(statement).unwrap();

    // Only presence is checked here, not the stored value.
    let stored = sess.get_parameter("timeout");
    assert!(stored.is_some());
}
6481
/// `SESSION RESET` clears the current graph, the time zone, and a previously
/// set parameter.
#[test]
fn test_session_reset_clears_all_state() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    // Populate every piece of session state this test inspects.
    let setup = [
        "CREATE GRAPH analytics",
        "SESSION SET GRAPH analytics",
        "SESSION SET TIME ZONE 'UTC'",
        "SESSION SET PARAMETER $limit = 100",
    ];
    for statement in setup {
        sess.execute(statement).unwrap();
    }

    // Sanity check: the state is really there before the reset.
    assert!(sess.current_graph().is_some());
    assert!(sess.time_zone().is_some());
    assert!(sess.get_parameter("limit").is_some());

    sess.execute("SESSION RESET").unwrap();

    // Everything must be back to its unset value.
    assert_eq!(sess.current_graph(), None);
    assert_eq!(sess.time_zone(), None);
    assert!(sess.get_parameter("limit").is_none());
}
6507
/// `SESSION CLOSE` leaves the session with no current graph and no time zone.
#[test]
fn test_session_close_clears_state() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    // Set a graph and a time zone, then close the session.
    let script = [
        "CREATE GRAPH analytics",
        "SESSION SET GRAPH analytics",
        "SESSION SET TIME ZONE 'UTC'",
        "SESSION CLOSE",
    ];
    for statement in script {
        sess.execute(statement).unwrap();
    }

    assert_eq!(sess.current_graph(), None);
    assert_eq!(sess.time_zone(), None);
}
6522
/// A graph created with `CREATE GRAPH` can afterwards be selected with
/// `USE GRAPH`.
#[test]
fn test_create_graph() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    sess.execute("CREATE GRAPH mydb").unwrap();

    // Switching to the graph proves that it now exists.
    sess.execute("USE GRAPH mydb").unwrap();
    let selected = sess.current_graph();
    assert_eq!(selected, Some(String::from("mydb")));
}
6534
/// Creating the same graph twice fails the second time with an
/// "already exists" error.
#[test]
fn test_create_graph_duplicate_errors() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    sess.execute("CREATE GRAPH mydb").unwrap();
    let second_attempt = sess.execute("CREATE GRAPH mydb");
    assert!(second_attempt.is_err());

    let err = second_attempt.unwrap_err().to_string();
    assert!(
        err.contains("already exists"),
        "Expected 'already exists' error, got: {err}"
    );
}
6550
/// `CREATE GRAPH IF NOT EXISTS` succeeds even when the graph is already there.
#[test]
fn test_create_graph_if_not_exists() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    for statement in ["CREATE GRAPH mydb", "CREATE GRAPH IF NOT EXISTS mydb"] {
        sess.execute(statement).unwrap();
    }
}
6560
/// After `DROP GRAPH`, switching to the dropped graph is an error.
#[test]
fn test_drop_graph() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    for statement in ["CREATE GRAPH mydb", "DROP GRAPH mydb"] {
        sess.execute(statement).unwrap();
    }

    // The graph is gone, so it can no longer be selected.
    let outcome = sess.execute("USE GRAPH mydb");
    assert!(outcome.is_err());
}
6573
/// `DROP GRAPH` on an unknown graph fails with a "does not exist" error.
#[test]
fn test_drop_graph_nonexistent_errors() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    let outcome = sess.execute("DROP GRAPH nosuchgraph");
    assert!(outcome.is_err());

    let err = outcome.unwrap_err().to_string();
    assert!(
        err.contains("does not exist"),
        "Expected 'does not exist' error, got: {err}"
    );
}
6587
/// `DROP GRAPH IF EXISTS` succeeds even when the graph was never created.
#[test]
fn test_drop_graph_if_exists() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    let statement = "DROP GRAPH IF EXISTS nosuchgraph";
    sess.execute(statement).unwrap();
}
6596
/// `START TRANSACTION` / `COMMIT` issued as GQL text toggle the session's
/// transaction state, and the inserted node is visible after the commit.
#[test]
fn test_start_transaction_via_gql() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    sess.execute("START TRANSACTION").unwrap();
    let open_after_start = sess.in_transaction();
    assert!(open_after_start);

    sess.execute("INSERT (:Person {name: 'Alix'})").unwrap();

    sess.execute("COMMIT").unwrap();
    let open_after_commit = sess.in_transaction();
    assert!(!open_after_commit);

    // The committed node can be read back outside the transaction.
    let people = sess.execute("MATCH (n:Person) RETURN n.name").unwrap();
    assert_eq!(people.rows.len(), 1);
}
6611
/// An `INSERT` inside a `READ ONLY` transaction fails with a "read-only"
/// error. The transaction is rolled back afterwards.
#[test]
fn test_start_transaction_read_only_blocks_insert() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    sess.execute("START TRANSACTION READ ONLY").unwrap();

    let write_attempt = sess.execute("INSERT (:Person {name: 'Alix'})");
    assert!(write_attempt.is_err());

    let err = write_attempt.unwrap_err().to_string();
    assert!(
        err.contains("read-only"),
        "Expected read-only error, got: {err}"
    );

    sess.execute("ROLLBACK").unwrap();
}
6627
/// A `READ ONLY` transaction can still run a `MATCH` over data committed
/// earlier.
#[test]
fn test_start_transaction_read_only_allows_reads() {
    let database = GrafeoDB::new_in_memory();
    let mut sess = database.session();

    // Seed one person through a regular read-write transaction.
    sess.begin_transaction().unwrap();
    sess.execute("INSERT (:Person {name: 'Alix'})").unwrap();
    sess.commit().unwrap();

    // Read the seeded row back inside a read-only transaction.
    sess.execute("START TRANSACTION READ ONLY").unwrap();
    let people = sess.execute("MATCH (n:Person) RETURN n.name").unwrap();
    assert_eq!(people.rows.len(), 1);
    sess.execute("COMMIT").unwrap();
}
6641
/// A node inserted inside a transaction is not visible once that transaction
/// has been ended with `ROLLBACK`.
#[test]
fn test_rollback_via_gql() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    let script = [
        "START TRANSACTION",
        "INSERT (:Person {name: 'Alix'})",
        "ROLLBACK",
    ];
    for statement in script {
        sess.execute(statement).unwrap();
    }

    let people = sess.execute("MATCH (n:Person) RETURN n.name").unwrap();
    assert!(people.rows.is_empty());
}
6654
/// `START TRANSACTION ISOLATION LEVEL SERIALIZABLE` is accepted and opens a
/// transaction.
#[test]
fn test_start_transaction_with_isolation_level() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    let statement = "START TRANSACTION ISOLATION LEVEL SERIALIZABLE";
    sess.execute(statement).unwrap();

    let is_open = sess.in_transaction();
    assert!(is_open);

    sess.execute("ROLLBACK").unwrap();
}
6666
/// A session command (`SESSION SET GRAPH`) returns a result with zero rows
/// and zero columns.
#[test]
fn test_session_commands_return_empty_result() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    sess.execute("CREATE GRAPH test").unwrap();
    let outcome = sess.execute("SESSION SET GRAPH test").unwrap();

    let row_total = outcome.row_count();
    let column_total = outcome.column_count();
    assert_eq!(row_total, 0);
    assert_eq!(column_total, 0);
}
6677
/// A freshly opened session has no current graph selected.
#[test]
fn test_current_graph_default_is_none() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    let selected = sess.current_graph();
    assert_eq!(selected, None);
}
6685
/// A freshly opened session has no time zone configured.
#[test]
fn test_time_zone_default_is_none() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    let zone = sess.time_zone();
    assert_eq!(zone, None);
}
6693
/// Two sessions on the same database each keep their own current graph.
#[test]
fn test_session_state_independent_across_sessions() {
    let database = GrafeoDB::new_in_memory();
    let first_session = database.session();
    let second_session = database.session();

    // Both graphs are created through the first session only.
    for statement in ["CREATE GRAPH first", "CREATE GRAPH second"] {
        first_session.execute(statement).unwrap();
    }

    // Each session then selects a different graph.
    first_session.execute("SESSION SET GRAPH first").unwrap();
    second_session.execute("SESSION SET GRAPH second").unwrap();

    assert_eq!(first_session.current_graph(), Some(String::from("first")));
    assert_eq!(second_session.current_graph(), Some(String::from("second")));
}
6708
/// `SHOW NODE TYPES` returns the expected column layout and one row for the
/// single declared node type.
#[test]
fn test_show_node_types() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    let ddl = "CREATE NODE TYPE Person (name STRING NOT NULL, age INTEGER)";
    sess.execute(ddl).unwrap();

    let listing = sess.execute("SHOW NODE TYPES").unwrap();

    let expected_columns = vec!["name", "properties", "constraints", "parents"];
    assert_eq!(listing.columns, expected_columns);
    assert_eq!(listing.rows.len(), 1);

    // The first cell of the only row is the type name.
    let type_name = &listing.rows[0][0];
    assert_eq!(*type_name, Value::from("Person"));
}
6727
/// `SHOW EDGE TYPES` returns the expected column layout and one row for the
/// single declared edge type.
#[test]
fn test_show_edge_types() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    let ddl = "CREATE EDGE TYPE KNOWS CONNECTING (Person) TO (Person) (since INTEGER)";
    sess.execute(ddl).unwrap();

    let listing = sess.execute("SHOW EDGE TYPES").unwrap();

    let expected_columns = vec!["name", "properties", "source_types", "target_types"];
    assert_eq!(listing.columns, expected_columns);
    assert_eq!(listing.rows.len(), 1);

    // The first cell of the only row is the type name.
    let type_name = &listing.rows[0][0];
    assert_eq!(*type_name, Value::from("KNOWS"));
}
6745
/// `SHOW GRAPH TYPES` returns the expected column layout and one row for the
/// single declared graph type.
#[test]
fn test_show_graph_types() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    let node_type_ddl = "CREATE NODE TYPE Person (name STRING)";
    // The trailing backslashes join the pieces into one line with no added whitespace.
    let graph_type_ddl = "CREATE GRAPH TYPE social (\
        NODE TYPE Person (name STRING)\
        )";
    sess.execute(node_type_ddl).unwrap();
    sess.execute(graph_type_ddl).unwrap();

    let listing = sess.execute("SHOW GRAPH TYPES").unwrap();

    let expected_columns = vec!["name", "open", "node_types", "edge_types"];
    assert_eq!(listing.columns, expected_columns);
    assert_eq!(listing.rows.len(), 1);

    let type_name = &listing.rows[0][0];
    assert_eq!(*type_name, Value::from("social"));
}
6770
/// `SHOW GRAPH TYPE <name>` returns exactly one row whose first cell is the
/// requested graph type's name.
#[test]
fn test_show_graph_type_named() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    let node_type_ddl = "CREATE NODE TYPE Person (name STRING)";
    // The trailing backslashes join the pieces into one line with no added whitespace.
    let graph_type_ddl = "CREATE GRAPH TYPE social (\
        NODE TYPE Person (name STRING)\
        )";
    sess.execute(node_type_ddl).unwrap();
    sess.execute(graph_type_ddl).unwrap();

    let described = sess.execute("SHOW GRAPH TYPE social").unwrap();
    assert_eq!(described.rows.len(), 1);

    let type_name = &described.rows[0][0];
    assert_eq!(*type_name, Value::from("social"));
}
6791
/// `SHOW GRAPH TYPE` for a name that was never declared returns an error.
#[test]
fn test_show_graph_type_not_found() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    let outcome = sess.execute("SHOW GRAPH TYPE nonexistent");
    assert!(outcome.is_err());
}
6800
/// `SHOW INDEXES` on an empty database succeeds and reports the expected
/// column names.
#[test]
fn test_show_indexes_via_gql() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    let listing = sess.execute("SHOW INDEXES").unwrap();

    let expected_columns = vec!["name", "type", "label", "property"];
    assert_eq!(listing.columns, expected_columns);
}
6809
/// `SHOW CONSTRAINTS` on an empty database succeeds and reports the expected
/// column names.
#[test]
fn test_show_constraints_via_gql() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    let listing = sess.execute("SHOW CONSTRAINTS").unwrap();

    let expected_columns = vec!["name", "type", "label", "properties"];
    assert_eq!(listing.columns, expected_columns);
}
6818
/// A graph type declared with pattern syntax (node/edge patterns rather than
/// `NODE TYPE` clauses) can be created and then shown by name.
#[test]
fn test_pattern_form_graph_type_roundtrip() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    // Declare the node and edge types that the patterns below refer to.
    let type_declarations = [
        "CREATE NODE TYPE Person (name STRING NOT NULL)",
        "CREATE NODE TYPE City (name STRING)",
        "CREATE EDGE TYPE KNOWS (since INTEGER)",
        "CREATE EDGE TYPE LIVES_IN",
    ];
    for ddl in type_declarations {
        sess.execute(ddl).unwrap();
    }

    // The trailing backslashes join the pieces into one line with no added whitespace.
    let graph_type_ddl = "CREATE GRAPH TYPE social (\
        (:Person {name STRING NOT NULL})-[:KNOWS {since INTEGER}]->(:Person),\
        (:Person)-[:LIVES_IN]->(:City)\
        )";
    sess.execute(graph_type_ddl).unwrap();

    let described = sess.execute("SHOW GRAPH TYPE social").unwrap();
    assert_eq!(described.rows.len(), 1);

    let type_name = &described.rows[0][0];
    assert_eq!(*type_name, Value::from("social"));
}
6851 }
6852}