1#[cfg(feature = "triple-store")]
8mod rdf;
9
10use std::sync::Arc;
11use std::sync::atomic::AtomicUsize;
12#[cfg(feature = "lpg")]
13use std::sync::atomic::Ordering;
14use std::time::{Duration, Instant};
15
16#[cfg(feature = "lpg")]
17use grafeo_common::grafeo_debug_span;
18#[cfg(feature = "lpg")]
19use grafeo_common::types::{EdgeId, NodeId};
20use grafeo_common::types::{EpochId, TransactionId, Value};
21use grafeo_common::utils::error::Result;
22use grafeo_common::{grafeo_info_span, grafeo_warn};
23#[cfg(feature = "lpg")]
24use grafeo_core::graph::Direction;
25#[cfg(feature = "lpg")]
26use grafeo_core::graph::lpg::LpgStore;
27#[cfg(feature = "lpg")]
28use grafeo_core::graph::lpg::{Edge, Node};
29#[cfg(feature = "triple-store")]
30use grafeo_core::graph::rdf::RdfStore;
31use grafeo_core::graph::{GraphStore, GraphStoreMut};
32
33use crate::catalog::{Catalog, CatalogConstraintValidator};
34use crate::config::{AdaptiveConfig, GraphModel};
35use crate::database::QueryResult;
36use crate::query::Executor;
37use crate::query::cache::QueryCache;
38use crate::transaction::TransactionManager;
39
40const SCHEMA_DEFAULT_GRAPH: &str = "__default__";
43
44fn parse_default_literal(text: &str) -> Value {
49 if text.eq_ignore_ascii_case("null") {
50 return Value::Null;
51 }
52 if text.eq_ignore_ascii_case("true") {
53 return Value::Bool(true);
54 }
55 if text.eq_ignore_ascii_case("false") {
56 return Value::Bool(false);
57 }
58 if (text.starts_with('\'') && text.ends_with('\''))
60 || (text.starts_with('"') && text.ends_with('"'))
61 {
62 return Value::String(text[1..text.len() - 1].into());
63 }
64 if let Ok(i) = text.parse::<i64>() {
66 return Value::Int64(i);
67 }
68 if let Ok(f) = text.parse::<f64>() {
69 return Value::Float64(f);
70 }
71 Value::String(text.into())
73}
74
75pub(crate) struct SessionConfig {
80 pub transaction_manager: Arc<TransactionManager>,
81 pub query_cache: Arc<QueryCache>,
82 pub catalog: Arc<Catalog>,
83 pub adaptive_config: AdaptiveConfig,
84 pub factorized_execution: bool,
85 pub graph_model: GraphModel,
86 pub query_timeout: Option<Duration>,
87 pub max_property_size: Option<usize>,
88 pub buffer_manager: Option<Arc<grafeo_common::memory::buffer::BufferManager>>,
90 pub commit_counter: Arc<AtomicUsize>,
91 pub gc_interval: usize,
92 pub read_only: bool,
94 pub identity: crate::auth::Identity,
96 #[cfg(feature = "lpg")]
98 pub projections: Arc<
99 parking_lot::RwLock<
100 std::collections::HashMap<String, Arc<grafeo_core::graph::GraphProjection>>,
101 >,
102 >,
103}
104
105pub struct Session {
111 #[cfg(feature = "lpg")]
113 store: Arc<LpgStore>,
114 graph_store: Arc<dyn GraphStore>,
116 graph_store_mut: Option<Arc<dyn GraphStoreMut>>,
118 catalog: Arc<Catalog>,
120 #[cfg(feature = "triple-store")]
122 rdf_store: Arc<RdfStore>,
123 transaction_manager: Arc<TransactionManager>,
125 query_cache: Arc<QueryCache>,
127 current_transaction: parking_lot::Mutex<Option<TransactionId>>,
131 read_only_tx: parking_lot::Mutex<bool>,
133 db_read_only: bool,
136 identity: crate::auth::Identity,
138 auto_commit: bool,
140 #[allow(dead_code)] adaptive_config: AdaptiveConfig,
143 factorized_execution: bool,
145 graph_model: GraphModel,
147 query_timeout: Option<Duration>,
149 max_property_size: Option<usize>,
151 buffer_manager: Option<Arc<grafeo_common::memory::buffer::BufferManager>>,
153 commit_counter: Arc<AtomicUsize>,
155 gc_interval: usize,
157 transaction_start_node_count: AtomicUsize,
159 transaction_start_edge_count: AtomicUsize,
161 #[cfg(feature = "wal")]
163 wal: Option<Arc<grafeo_storage::wal::LpgWal>>,
164 #[cfg(feature = "wal")]
166 wal_graph_context: Option<Arc<parking_lot::Mutex<Option<String>>>>,
167 #[cfg(feature = "cdc")]
169 cdc_log: Arc<crate::cdc::CdcLog>,
170 #[cfg(feature = "cdc")]
173 cdc_pending_events: Option<Arc<parking_lot::Mutex<Vec<crate::cdc::ChangeEvent>>>>,
174 current_graph: parking_lot::Mutex<Option<String>>,
176 current_schema: parking_lot::Mutex<Option<String>>,
179 time_zone: parking_lot::Mutex<Option<String>>,
181 session_params:
183 parking_lot::Mutex<std::collections::HashMap<String, grafeo_common::types::Value>>,
184 viewing_epoch_override: parking_lot::Mutex<Option<EpochId>>,
186 savepoints: parking_lot::Mutex<Vec<SavepointState>>,
188 transaction_nesting_depth: parking_lot::Mutex<u32>,
192 touched_graphs: parking_lot::Mutex<Vec<Option<String>>>,
196 #[cfg(feature = "metrics")]
198 pub(crate) metrics: Option<Arc<crate::metrics::MetricsRegistry>>,
199 #[cfg(feature = "metrics")]
201 tx_start_time: parking_lot::Mutex<Option<Instant>>,
202 #[cfg(feature = "lpg")]
204 projections: Arc<
205 parking_lot::RwLock<
206 std::collections::HashMap<String, Arc<grafeo_core::graph::GraphProjection>>,
207 >,
208 >,
209}
210
211#[derive(Clone)]
213struct GraphSavepoint {
214 graph_name: Option<String>,
215 next_node_id: u64,
216 next_edge_id: u64,
217 undo_log_position: usize,
218}
219
220#[derive(Clone)]
222struct SavepointState {
223 name: String,
224 graph_snapshots: Vec<GraphSavepoint>,
225 #[allow(dead_code)]
228 active_graph: Option<String>,
229 #[cfg(feature = "cdc")]
232 cdc_event_position: usize,
233}
234
235impl Session {
236 #[cfg(feature = "lpg")]
238 #[allow(dead_code)] pub(crate) fn with_adaptive(store: Arc<LpgStore>, cfg: SessionConfig) -> Self {
240 let graph_store = Arc::clone(&store) as Arc<dyn GraphStore>;
241 let graph_store_mut = Some(Arc::clone(&store) as Arc<dyn GraphStoreMut>);
242 Self {
243 store,
244 graph_store,
245 graph_store_mut,
246 catalog: cfg.catalog,
247 #[cfg(feature = "triple-store")]
248 rdf_store: Arc::new(RdfStore::new()),
249 transaction_manager: cfg.transaction_manager,
250 query_cache: cfg.query_cache,
251 current_transaction: parking_lot::Mutex::new(None),
252 read_only_tx: parking_lot::Mutex::new(cfg.read_only),
253 db_read_only: cfg.read_only,
254 identity: cfg.identity,
255 auto_commit: true,
256 adaptive_config: cfg.adaptive_config,
257 factorized_execution: cfg.factorized_execution,
258 graph_model: cfg.graph_model,
259 query_timeout: cfg.query_timeout,
260 max_property_size: cfg.max_property_size,
261 buffer_manager: cfg.buffer_manager,
262 commit_counter: cfg.commit_counter,
263 gc_interval: cfg.gc_interval,
264 transaction_start_node_count: AtomicUsize::new(0),
265 transaction_start_edge_count: AtomicUsize::new(0),
266 #[cfg(feature = "wal")]
267 wal: None,
268 #[cfg(feature = "wal")]
269 wal_graph_context: None,
270 #[cfg(feature = "cdc")]
271 cdc_log: Arc::new(crate::cdc::CdcLog::new()),
272 #[cfg(feature = "cdc")]
273 cdc_pending_events: None,
274 current_graph: parking_lot::Mutex::new(None),
275 current_schema: parking_lot::Mutex::new(None),
276 time_zone: parking_lot::Mutex::new(None),
277 session_params: parking_lot::Mutex::new(std::collections::HashMap::new()),
278 viewing_epoch_override: parking_lot::Mutex::new(None),
279 savepoints: parking_lot::Mutex::new(Vec::new()),
280 transaction_nesting_depth: parking_lot::Mutex::new(0),
281 touched_graphs: parking_lot::Mutex::new(Vec::new()),
282 #[cfg(feature = "metrics")]
283 metrics: None,
284 #[cfg(feature = "metrics")]
285 tx_start_time: parking_lot::Mutex::new(None),
286 projections: cfg.projections,
287 }
288 }
289
290 pub(crate) fn override_stores(
296 &mut self,
297 read_store: Arc<dyn GraphStore>,
298 write_store: Option<Arc<dyn GraphStoreMut>>,
299 ) {
300 self.graph_store = read_store;
301 self.graph_store_mut = write_store;
302 }
303
304 #[cfg(all(feature = "wal", feature = "lpg"))]
309 pub(crate) fn set_wal(
310 &mut self,
311 wal: Arc<grafeo_storage::wal::LpgWal>,
312 wal_graph_context: Arc<parking_lot::Mutex<Option<String>>>,
313 ) {
314 let wal_store = Arc::new(crate::database::wal_store::WalGraphStore::new(
316 Arc::clone(&self.store),
317 Arc::clone(&wal),
318 Arc::clone(&wal_graph_context),
319 ));
320 self.graph_store = Arc::clone(&wal_store) as Arc<dyn GraphStore>;
321 self.graph_store_mut = Some(wal_store as Arc<dyn GraphStoreMut>);
322 self.wal = Some(wal);
323 self.wal_graph_context = Some(wal_graph_context);
324 }
325
326 #[cfg(feature = "cdc")]
333 pub(crate) fn set_cdc_log(&mut self, cdc_log: Arc<crate::cdc::CdcLog>) {
334 if let Some(ref write_store) = self.graph_store_mut {
337 let cdc_store = Arc::new(crate::database::cdc_store::CdcGraphStore::new(
338 Arc::clone(write_store),
339 Arc::clone(&cdc_log),
340 ));
341 self.cdc_pending_events = Some(cdc_store.pending_events());
342 self.graph_store_mut = Some(cdc_store as Arc<dyn grafeo_core::graph::GraphStoreMut>);
343 }
344 self.cdc_log = cdc_log;
345 }
346
347 #[cfg(feature = "metrics")]
349 pub(crate) fn set_metrics(&mut self, metrics: Arc<crate::metrics::MetricsRegistry>) {
350 self.metrics = Some(metrics);
351 }
352
353 pub(crate) fn with_external_store(
362 read_store: Arc<dyn GraphStore>,
363 write_store: Option<Arc<dyn GraphStoreMut>>,
364 cfg: SessionConfig,
365 ) -> Result<Self> {
366 Ok(Self {
367 #[cfg(feature = "lpg")]
368 store: Arc::new(LpgStore::new()?),
369 graph_store: read_store,
370 graph_store_mut: write_store,
371 catalog: cfg.catalog,
372 #[cfg(feature = "triple-store")]
373 rdf_store: Arc::new(RdfStore::new()),
374 transaction_manager: cfg.transaction_manager,
375 query_cache: cfg.query_cache,
376 current_transaction: parking_lot::Mutex::new(None),
377 read_only_tx: parking_lot::Mutex::new(cfg.read_only),
378 db_read_only: cfg.read_only,
379 identity: cfg.identity,
380 auto_commit: true,
381 adaptive_config: cfg.adaptive_config,
382 factorized_execution: cfg.factorized_execution,
383 graph_model: cfg.graph_model,
384 query_timeout: cfg.query_timeout,
385 max_property_size: cfg.max_property_size,
386 buffer_manager: cfg.buffer_manager,
387 commit_counter: cfg.commit_counter,
388 gc_interval: cfg.gc_interval,
389 transaction_start_node_count: AtomicUsize::new(0),
390 transaction_start_edge_count: AtomicUsize::new(0),
391 #[cfg(feature = "wal")]
392 wal: None,
393 #[cfg(feature = "wal")]
394 wal_graph_context: None,
395 #[cfg(feature = "cdc")]
396 cdc_log: Arc::new(crate::cdc::CdcLog::new()),
397 #[cfg(feature = "cdc")]
398 cdc_pending_events: None,
399 current_graph: parking_lot::Mutex::new(None),
400 current_schema: parking_lot::Mutex::new(None),
401 time_zone: parking_lot::Mutex::new(None),
402 session_params: parking_lot::Mutex::new(std::collections::HashMap::new()),
403 viewing_epoch_override: parking_lot::Mutex::new(None),
404 savepoints: parking_lot::Mutex::new(Vec::new()),
405 transaction_nesting_depth: parking_lot::Mutex::new(0),
406 touched_graphs: parking_lot::Mutex::new(Vec::new()),
407 #[cfg(feature = "metrics")]
408 metrics: None,
409 #[cfg(feature = "metrics")]
410 tx_start_time: parking_lot::Mutex::new(None),
411 #[cfg(feature = "lpg")]
412 projections: cfg.projections,
413 })
414 }
415
416 #[must_use]
418 pub fn graph_model(&self) -> GraphModel {
419 self.graph_model
420 }
421
422 #[must_use]
424 pub fn identity(&self) -> &crate::auth::Identity {
425 &self.identity
426 }
427
428 pub fn use_graph(&self, name: &str) {
432 *self.current_graph.lock() = Some(name.to_string());
433 }
434
435 #[must_use]
437 pub fn current_graph(&self) -> Option<String> {
438 self.current_graph.lock().clone()
439 }
440
441 pub fn set_schema(&self, name: &str) {
445 *self.current_schema.lock() = Some(name.to_string());
446 }
447
448 #[must_use]
452 pub fn current_schema(&self) -> Option<String> {
453 self.current_schema.lock().clone()
454 }
455
456 fn effective_graph_key(&self, graph_name: &str) -> String {
461 let schema = self.current_schema.lock().clone();
462 match schema {
463 Some(s) => format!("{s}/{graph_name}"),
464 None => graph_name.to_string(),
465 }
466 }
467
468 fn effective_type_key(&self, type_name: &str) -> String {
472 let schema = self.current_schema.lock().clone();
473 match schema {
474 Some(s) => format!("{s}/{type_name}"),
475 None => type_name.to_string(),
476 }
477 }
478
479 fn active_graph_storage_key(&self) -> Option<String> {
483 let graph = self.current_graph.lock().clone();
484 let schema = self.current_schema.lock().clone();
485 match (&schema, &graph) {
486 (None, None) => None,
487 (Some(s), None) => Some(format!("{s}/{SCHEMA_DEFAULT_GRAPH}")),
488 (None, Some(name)) if name.eq_ignore_ascii_case("default") => None,
489 (Some(s), Some(name)) if name.eq_ignore_ascii_case("default") => {
490 Some(format!("{s}/{SCHEMA_DEFAULT_GRAPH}"))
491 }
492 (None, Some(name)) => Some(name.clone()),
493 (Some(s), Some(g)) => Some(format!("{s}/{g}")),
494 }
495 }
496
497 fn active_store(&self) -> Arc<dyn GraphStore> {
505 let key = self.active_graph_storage_key();
506 match key {
507 None => Arc::clone(&self.graph_store),
508 #[cfg(feature = "lpg")]
509 Some(ref name) => match self.store.graph(name) {
510 Some(named_store) => {
511 #[cfg(feature = "wal")]
512 if let (Some(wal), Some(ctx)) = (&self.wal, &self.wal_graph_context) {
513 return Arc::new(crate::database::wal_store::WalGraphStore::new_for_graph(
514 named_store,
515 Arc::clone(wal),
516 name.clone(),
517 Arc::clone(ctx),
518 )) as Arc<dyn GraphStore>;
519 }
520 named_store as Arc<dyn GraphStore>
521 }
522 None => Arc::clone(&self.graph_store),
523 },
524 #[cfg(not(feature = "lpg"))]
525 Some(_) => Arc::clone(&self.graph_store),
526 }
527 }
528
529 fn active_write_store(&self) -> Option<Arc<dyn GraphStoreMut>> {
534 let key = self.active_graph_storage_key();
535 match key {
536 None => self.graph_store_mut.as_ref().map(Arc::clone),
537 #[cfg(feature = "lpg")]
538 Some(ref name) => match self.store.graph(name) {
539 Some(named_store) => {
540 let mut store: Arc<dyn GraphStoreMut> = named_store;
541
542 #[cfg(feature = "wal")]
543 if let (Some(wal), Some(ctx)) = (&self.wal, &self.wal_graph_context) {
544 store = Arc::new(crate::database::wal_store::WalGraphStore::new_for_graph(
545 self.store
547 .graph(name)
548 .unwrap_or_else(|| Arc::clone(&self.store)),
549 Arc::clone(wal),
550 name.clone(),
551 Arc::clone(ctx),
552 ));
553 }
554
555 #[cfg(feature = "cdc")]
556 if let Some(ref pending) = self.cdc_pending_events {
557 store = Arc::new(crate::database::cdc_store::CdcGraphStore::wrap(
558 store,
559 Arc::clone(&self.cdc_log),
560 Arc::clone(pending),
561 ));
562 }
563
564 Some(store)
565 }
566 None => self.graph_store_mut.as_ref().map(Arc::clone),
567 },
568 #[cfg(not(feature = "lpg"))]
569 Some(_) => self.graph_store_mut.as_ref().map(Arc::clone),
570 }
571 }
572
573 #[cfg(feature = "lpg")]
578 fn active_lpg_store(&self) -> Arc<LpgStore> {
579 let key = self.active_graph_storage_key();
580 match key {
581 None => Arc::clone(&self.store),
582 Some(ref name) => self
583 .store
584 .graph(name)
585 .unwrap_or_else(|| Arc::clone(&self.store)),
586 }
587 }
588
589 #[cfg(feature = "lpg")]
592 fn resolve_store(&self, graph_name: &Option<String>) -> Arc<LpgStore> {
593 match graph_name {
594 None => Arc::clone(&self.store),
595 Some(name) if name.eq_ignore_ascii_case("default") => Arc::clone(&self.store),
596 Some(name) => self
597 .store
598 .graph(name)
599 .unwrap_or_else(|| Arc::clone(&self.store)),
600 }
601 }
602
603 fn track_graph_touch(&self) {
608 if self.current_transaction.lock().is_some() {
609 let key = self.active_graph_storage_key();
610 let mut touched = self.touched_graphs.lock();
611 if !touched.contains(&key) {
612 touched.push(key);
613 }
614 }
615 }
616
617 pub fn set_time_zone(&self, tz: &str) {
619 *self.time_zone.lock() = Some(tz.to_string());
620 }
621
622 #[must_use]
624 pub fn time_zone(&self) -> Option<String> {
625 self.time_zone.lock().clone()
626 }
627
628 pub fn set_parameter(&self, key: &str, value: grafeo_common::types::Value) {
630 self.session_params.lock().insert(key.to_string(), value);
631 }
632
633 #[must_use]
635 pub fn get_parameter(&self, key: &str) -> Option<grafeo_common::types::Value> {
636 self.session_params.lock().get(key).cloned()
637 }
638
639 pub fn reset_session(&self) {
641 *self.current_schema.lock() = None;
642 *self.current_graph.lock() = None;
643 *self.time_zone.lock() = None;
644 self.session_params.lock().clear();
645 *self.viewing_epoch_override.lock() = None;
646 }
647
648 pub fn reset_schema(&self) {
650 *self.current_schema.lock() = None;
651 }
652
653 pub fn reset_graph(&self) {
655 *self.current_graph.lock() = None;
656 }
657
658 pub fn reset_time_zone(&self) {
660 *self.time_zone.lock() = None;
661 }
662
663 pub fn reset_parameters(&self) {
665 self.session_params.lock().clear();
666 }
667
668 pub fn set_viewing_epoch(&self, epoch: EpochId) {
676 *self.viewing_epoch_override.lock() = Some(epoch);
677 }
678
679 pub fn clear_viewing_epoch(&self) {
681 *self.viewing_epoch_override.lock() = None;
682 }
683
684 #[must_use]
686 pub fn viewing_epoch(&self) -> Option<EpochId> {
687 *self.viewing_epoch_override.lock()
688 }
689
690 #[cfg(feature = "lpg")]
694 #[must_use]
695 pub fn get_node_history(&self, id: NodeId) -> Vec<(EpochId, Option<EpochId>, Node)> {
696 self.active_lpg_store().get_node_history(id)
697 }
698
699 #[cfg(feature = "lpg")]
703 #[must_use]
704 pub fn get_edge_history(&self, id: EdgeId) -> Vec<(EpochId, Option<EpochId>, Edge)> {
705 self.active_lpg_store().get_edge_history(id)
706 }
707
708 fn require_lpg(&self, language: &str) -> Result<()> {
710 if self.graph_model == GraphModel::Rdf {
711 return Err(grafeo_common::utils::error::Error::Internal(format!(
712 "This is an RDF database. {language} queries require an LPG database."
713 )));
714 }
715 Ok(())
716 }
717
718 #[inline]
724 fn require_permission(&self, kind: crate::auth::StatementKind) -> Result<()> {
725 if self.identity.can_admin() {
727 return Ok(());
728 }
729 crate::auth::check_permission(&self.identity, kind).map_err(|denied| {
730 grafeo_common::utils::error::Error::Query(grafeo_common::utils::error::QueryError::new(
731 grafeo_common::utils::error::QueryErrorKind::Semantic,
732 denied.to_string(),
733 ))
734 })
735 }
736
737 #[cfg(feature = "gql")]
739 fn execute_session_command(
740 &self,
741 cmd: grafeo_adapters::query::gql::ast::SessionCommand,
742 ) -> Result<QueryResult> {
743 use grafeo_adapters::query::gql::ast::SessionCommand;
744 #[cfg(feature = "lpg")]
745 use grafeo_adapters::query::gql::ast::TransactionIsolationLevel;
746 use grafeo_common::utils::error::{Error, QueryError, QueryErrorKind};
747
748 match &cmd {
750 SessionCommand::CreateGraph { .. }
751 | SessionCommand::DropGraph { .. }
752 | SessionCommand::CreateProjection { .. }
753 | SessionCommand::DropProjection { .. } => {
754 self.require_permission(crate::auth::StatementKind::Write)?;
755 }
756 _ => {} }
758
759 if self.identity.has_grants() {
761 match &cmd {
762 SessionCommand::CreateGraph { name, .. }
763 | SessionCommand::DropGraph { name, .. } => {
764 if !self
765 .identity
766 .can_access_graph(name, crate::auth::Role::ReadWrite)
767 {
768 return Err(Error::Query(QueryError::new(
769 QueryErrorKind::Semantic,
770 format!(
771 "permission denied: no grant for graph '{name}' (user: {})",
772 self.identity.user_id()
773 ),
774 )));
775 }
776 }
777 _ => {}
778 }
779 }
780
781 if *self.read_only_tx.lock() {
783 match &cmd {
784 SessionCommand::CreateGraph { .. }
785 | SessionCommand::DropGraph { .. }
786 | SessionCommand::CreateProjection { .. }
787 | SessionCommand::DropProjection { .. } => {
788 return Err(Error::Transaction(
789 grafeo_common::utils::error::TransactionError::ReadOnly,
790 ));
791 }
792 _ => {} }
794 }
795
796 match cmd {
797 #[cfg(feature = "lpg")]
798 SessionCommand::CreateGraph {
799 name,
800 if_not_exists,
801 typed,
802 like_graph,
803 copy_of,
804 open: _,
805 } => {
806 let storage_key = self.effective_graph_key(&name);
808
809 if let Some(ref src) = like_graph {
811 let src_key = self.effective_graph_key(src);
812 if self.store.graph(&src_key).is_none() {
813 return Err(Error::Query(QueryError::new(
814 QueryErrorKind::Semantic,
815 format!("Source graph '{src}' does not exist"),
816 )));
817 }
818 }
819 if let Some(ref src) = copy_of {
820 let src_key = self.effective_graph_key(src);
821 if self.store.graph(&src_key).is_none() {
822 return Err(Error::Query(QueryError::new(
823 QueryErrorKind::Semantic,
824 format!("Source graph '{src}' does not exist"),
825 )));
826 }
827 }
828
829 let created = self
830 .store
831 .create_graph(&storage_key)
832 .map_err(|e| Error::Internal(e.to_string()))?;
833 if !created && !if_not_exists {
834 return Err(Error::Query(QueryError::new(
835 QueryErrorKind::Semantic,
836 format!("Graph '{name}' already exists"),
837 )));
838 }
839 if created {
840 #[cfg(feature = "wal")]
841 self.log_schema_wal(&grafeo_storage::wal::WalRecord::CreateNamedGraph {
842 name: storage_key.clone(),
843 });
844 }
845
846 if let Some(ref src) = copy_of {
848 let src_key = self.effective_graph_key(src);
849 self.store
850 .copy_graph(Some(&src_key), Some(&storage_key))
851 .map_err(|e| Error::Internal(e.to_string()))?;
852 }
853
854 if let Some(type_name) = typed
858 && let Err(e) = self.catalog.bind_graph_type(
859 &storage_key,
860 if type_name.contains('/') {
861 type_name.clone()
862 } else {
863 self.effective_type_key(&type_name)
864 },
865 )
866 {
867 return Err(Error::Query(QueryError::new(
868 QueryErrorKind::Semantic,
869 e.to_string(),
870 )));
871 }
872
873 if let Some(ref src) = like_graph {
875 let src_key = self.effective_graph_key(src);
876 if let Some(src_type) = self.catalog.get_graph_type_binding(&src_key) {
877 let _ = self.catalog.bind_graph_type(&storage_key, src_type);
878 }
879 }
880
881 Ok(QueryResult::empty())
882 }
883 #[cfg(feature = "lpg")]
884 SessionCommand::DropGraph { name, if_exists } => {
885 let storage_key = self.effective_graph_key(&name);
886 let dropped = self.store.drop_graph(&storage_key);
887 if !dropped && !if_exists {
888 return Err(Error::Query(QueryError::new(
889 QueryErrorKind::Semantic,
890 format!("Graph '{name}' does not exist"),
891 )));
892 }
893 if dropped {
894 #[cfg(feature = "wal")]
895 self.log_schema_wal(&grafeo_storage::wal::WalRecord::DropNamedGraph {
896 name: storage_key.clone(),
897 });
898 let mut current = self.current_graph.lock();
900 if current
901 .as_deref()
902 .is_some_and(|g| g.eq_ignore_ascii_case(&name))
903 {
904 *current = None;
905 }
906 }
907 Ok(QueryResult::empty())
908 }
909 #[cfg(feature = "lpg")]
910 SessionCommand::UseGraph(name) => {
911 if self.identity.has_grants()
913 && !name.eq_ignore_ascii_case("default")
914 && !self
915 .identity
916 .can_access_graph(&name, crate::auth::Role::ReadOnly)
917 {
918 return Err(Error::Query(QueryError::new(
919 QueryErrorKind::Semantic,
920 format!(
921 "permission denied: no grant for graph '{name}' (user: {})",
922 self.identity.user_id()
923 ),
924 )));
925 }
926 let effective_key = self.effective_graph_key(&name);
928 if !name.eq_ignore_ascii_case("default")
929 && self.store.graph(&effective_key).is_none()
930 {
931 return Err(Error::Query(QueryError::new(
932 QueryErrorKind::Semantic,
933 format!("Graph '{name}' does not exist"),
934 )));
935 }
936 self.use_graph(&name);
937 self.track_graph_touch();
939 Ok(QueryResult::empty())
940 }
941 #[cfg(feature = "lpg")]
942 SessionCommand::SessionSetGraph(name) => {
943 if self.identity.has_grants()
946 && !name.eq_ignore_ascii_case("default")
947 && !self
948 .identity
949 .can_access_graph(&name, crate::auth::Role::ReadOnly)
950 {
951 return Err(Error::Query(QueryError::new(
952 QueryErrorKind::Semantic,
953 format!(
954 "permission denied: no grant for graph '{name}' (user: {})",
955 self.identity.user_id()
956 ),
957 )));
958 }
959 let effective_key = self.effective_graph_key(&name);
960 if !name.eq_ignore_ascii_case("default")
961 && self.store.graph(&effective_key).is_none()
962 {
963 return Err(Error::Query(QueryError::new(
964 QueryErrorKind::Semantic,
965 format!("Graph '{name}' does not exist"),
966 )));
967 }
968 self.use_graph(&name);
969 self.track_graph_touch();
971 Ok(QueryResult::empty())
972 }
973 SessionCommand::SessionSetSchema(name) => {
974 if !self.catalog.schema_exists(&name) {
976 return Err(Error::Query(QueryError::new(
977 QueryErrorKind::Semantic,
978 format!("Schema '{name}' does not exist"),
979 )));
980 }
981 self.set_schema(&name);
982 Ok(QueryResult::empty())
983 }
984 SessionCommand::SessionSetTimeZone(tz) => {
985 self.set_time_zone(&tz);
986 Ok(QueryResult::empty())
987 }
988 #[cfg(feature = "gql")]
989 SessionCommand::SessionSetParameter(key, expr) => {
990 if key.eq_ignore_ascii_case("viewing_epoch") {
991 match Self::eval_integer_literal(&expr) {
992 Some(n) if n >= 0 => {
993 #[allow(clippy::cast_sign_loss)]
995 let epoch = n as u64;
996 self.set_viewing_epoch(EpochId::new(epoch));
997 Ok(QueryResult::status(format!("Set viewing_epoch to {n}")))
998 }
999 _ => Err(Error::Query(QueryError::new(
1000 QueryErrorKind::Semantic,
1001 "viewing_epoch must be a non-negative integer literal",
1002 ))),
1003 }
1004 } else {
1005 self.set_parameter(&key, Value::Null);
1008 Ok(QueryResult::empty())
1009 }
1010 }
1011 SessionCommand::SessionReset(target) => {
1012 use grafeo_adapters::query::gql::ast::SessionResetTarget;
1013 match target {
1014 SessionResetTarget::All => self.reset_session(),
1015 SessionResetTarget::Schema => self.reset_schema(),
1016 SessionResetTarget::Graph => self.reset_graph(),
1017 SessionResetTarget::TimeZone => self.reset_time_zone(),
1018 SessionResetTarget::Parameters => self.reset_parameters(),
1019 }
1020 Ok(QueryResult::empty())
1021 }
1022 SessionCommand::SessionClose => {
1023 self.reset_session();
1024 Ok(QueryResult::empty())
1025 }
1026 #[cfg(feature = "lpg")]
1027 SessionCommand::StartTransaction {
1028 read_only,
1029 isolation_level,
1030 } => {
1031 let engine_level = isolation_level.map(|l| match l {
1032 TransactionIsolationLevel::ReadCommitted => {
1033 crate::transaction::IsolationLevel::ReadCommitted
1034 }
1035 TransactionIsolationLevel::SnapshotIsolation => {
1036 crate::transaction::IsolationLevel::SnapshotIsolation
1037 }
1038 TransactionIsolationLevel::Serializable => {
1039 crate::transaction::IsolationLevel::Serializable
1040 }
1041 });
1042 self.begin_transaction_inner(read_only, engine_level)?;
1043 Ok(QueryResult::status("Transaction started"))
1044 }
1045 #[cfg(feature = "lpg")]
1046 SessionCommand::Commit => {
1047 self.commit_inner()?;
1048 Ok(QueryResult::status("Transaction committed"))
1049 }
1050 #[cfg(feature = "lpg")]
1051 SessionCommand::Rollback => {
1052 self.rollback_inner()?;
1053 Ok(QueryResult::status("Transaction rolled back"))
1054 }
1055 #[cfg(feature = "lpg")]
1056 SessionCommand::Savepoint(name) => {
1057 self.savepoint(&name)?;
1058 Ok(QueryResult::status(format!("Savepoint '{name}' created")))
1059 }
1060 #[cfg(feature = "lpg")]
1061 SessionCommand::RollbackToSavepoint(name) => {
1062 self.rollback_to_savepoint(&name)?;
1063 Ok(QueryResult::status(format!(
1064 "Rolled back to savepoint '{name}'"
1065 )))
1066 }
1067 #[cfg(feature = "lpg")]
1068 SessionCommand::ReleaseSavepoint(name) => {
1069 self.release_savepoint(&name)?;
1070 Ok(QueryResult::status(format!("Savepoint '{name}' released")))
1071 }
1072 #[cfg(feature = "lpg")]
1073 SessionCommand::CreateProjection {
1074 name,
1075 node_labels,
1076 edge_types,
1077 } => {
1078 use grafeo_core::graph::{GraphProjection, ProjectionSpec};
1079 use std::collections::hash_map::Entry;
1080
1081 let spec = ProjectionSpec::new()
1082 .with_node_labels(node_labels)
1083 .with_edge_types(edge_types);
1084
1085 let store = self.active_store();
1086 let projection = Arc::new(GraphProjection::new(store, spec));
1087 let mut projections = self.projections.write();
1088 match projections.entry(name.clone()) {
1089 Entry::Occupied(_) => Err(Error::Query(QueryError::new(
1090 QueryErrorKind::Semantic,
1091 format!("Projection '{name}' already exists"),
1092 ))),
1093 Entry::Vacant(e) => {
1094 e.insert(projection);
1095 Ok(QueryResult::status(format!("Projection '{name}' created")))
1096 }
1097 }
1098 }
1099 #[cfg(feature = "lpg")]
1100 SessionCommand::DropProjection { name } => {
1101 let removed = self.projections.write().remove(&name).is_some();
1102 if !removed {
1103 return Err(Error::Query(QueryError::new(
1104 QueryErrorKind::Semantic,
1105 format!("Projection '{name}' does not exist"),
1106 )));
1107 }
1108 Ok(QueryResult::status(format!("Projection '{name}' dropped")))
1109 }
1110 #[cfg(feature = "lpg")]
1111 SessionCommand::ShowProjections => {
1112 let mut names: Vec<String> = self.projections.read().keys().cloned().collect();
1113 names.sort();
1114 let rows: Vec<Vec<Value>> =
1115 names.into_iter().map(|n| vec![Value::from(n)]).collect();
1116 Ok(QueryResult {
1117 columns: vec!["name".to_string()],
1118 column_types: Vec::new(),
1119 rows,
1120 ..QueryResult::empty()
1121 })
1122 }
1123 #[cfg(not(feature = "lpg"))]
1124 _ => Err(grafeo_common::utils::error::Error::Internal(
1125 "This command requires the `lpg` feature".to_string(),
1126 )),
1127 }
1128 }
1129
1130 #[cfg(feature = "wal")]
1132 fn log_schema_wal(&self, record: &grafeo_storage::wal::WalRecord) {
1133 if let Some(ref wal) = self.wal
1134 && let Err(e) = wal.log(record)
1135 {
1136 grafeo_warn!("Failed to log schema change to WAL: {}", e);
1137 }
1138 }
1139
1140 #[cfg(all(feature = "lpg", feature = "gql"))]
1142 fn execute_schema_command(
1143 &self,
1144 cmd: grafeo_adapters::query::gql::ast::SchemaStatement,
1145 ) -> Result<QueryResult> {
1146 use crate::catalog::{
1147 EdgeTypeDefinition, NodeTypeDefinition, PropertyDataType, TypedProperty,
1148 };
1149 use grafeo_adapters::query::gql::ast::SchemaStatement;
1150 use grafeo_common::utils::error::{Error, QueryError, QueryErrorKind};
1151 #[cfg(feature = "wal")]
1152 use grafeo_storage::wal::WalRecord;
1153
1154 macro_rules! wal_log {
1156 ($self:expr, $record:expr) => {
1157 #[cfg(feature = "wal")]
1158 $self.log_schema_wal(&$record);
1159 };
1160 }
1161
1162 let result = match cmd {
1163 SchemaStatement::CreateNodeType(stmt) => {
1164 let effective_name = self.effective_type_key(&stmt.name);
1165 #[cfg(feature = "wal")]
1166 let props_for_wal: Vec<(String, String, bool)> = stmt
1167 .properties
1168 .iter()
1169 .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
1170 .collect();
1171 let def = NodeTypeDefinition {
1172 name: effective_name.clone(),
1173 properties: stmt
1174 .properties
1175 .iter()
1176 .map(|p| TypedProperty {
1177 name: p.name.clone(),
1178 data_type: PropertyDataType::from_type_name(&p.data_type),
1179 nullable: p.nullable,
1180 default_value: p
1181 .default_value
1182 .as_ref()
1183 .map(|s| parse_default_literal(s)),
1184 })
1185 .collect(),
1186 constraints: Vec::new(),
1187 parent_types: stmt.parent_types.clone(),
1188 };
1189 let result = if stmt.or_replace {
1190 let _ = self.catalog.drop_node_type(&effective_name);
1191 self.catalog.register_node_type(def)
1192 } else {
1193 self.catalog.register_node_type(def)
1194 };
1195 match result {
1196 Ok(()) => {
1197 wal_log!(
1198 self,
1199 WalRecord::CreateNodeType {
1200 name: effective_name.clone(),
1201 properties: props_for_wal,
1202 constraints: Vec::new(),
1203 }
1204 );
1205 Ok(QueryResult::status(format!(
1206 "Created node type '{}'",
1207 stmt.name
1208 )))
1209 }
1210 Err(e) if stmt.if_not_exists => {
1211 let _ = e;
1212 Ok(QueryResult::status("No change"))
1213 }
1214 Err(e) => Err(Error::Query(QueryError::new(
1215 QueryErrorKind::Semantic,
1216 e.to_string(),
1217 ))),
1218 }
1219 }
1220 SchemaStatement::CreateEdgeType(stmt) => {
1221 let effective_name = self.effective_type_key(&stmt.name);
1222 #[cfg(feature = "wal")]
1223 let props_for_wal: Vec<(String, String, bool)> = stmt
1224 .properties
1225 .iter()
1226 .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
1227 .collect();
1228 let def = EdgeTypeDefinition {
1229 name: effective_name.clone(),
1230 properties: stmt
1231 .properties
1232 .iter()
1233 .map(|p| TypedProperty {
1234 name: p.name.clone(),
1235 data_type: PropertyDataType::from_type_name(&p.data_type),
1236 nullable: p.nullable,
1237 default_value: p
1238 .default_value
1239 .as_ref()
1240 .map(|s| parse_default_literal(s)),
1241 })
1242 .collect(),
1243 constraints: Vec::new(),
1244 source_node_types: stmt.source_node_types.clone(),
1245 target_node_types: stmt.target_node_types.clone(),
1246 };
1247 let result = if stmt.or_replace {
1248 let _ = self.catalog.drop_edge_type_def(&effective_name);
1249 self.catalog.register_edge_type_def(def)
1250 } else {
1251 self.catalog.register_edge_type_def(def)
1252 };
1253 match result {
1254 Ok(()) => {
1255 wal_log!(
1256 self,
1257 WalRecord::CreateEdgeType {
1258 name: effective_name.clone(),
1259 properties: props_for_wal,
1260 constraints: Vec::new(),
1261 }
1262 );
1263 Ok(QueryResult::status(format!(
1264 "Created edge type '{}'",
1265 stmt.name
1266 )))
1267 }
1268 Err(e) if stmt.if_not_exists => {
1269 let _ = e;
1270 Ok(QueryResult::status("No change"))
1271 }
1272 Err(e) => Err(Error::Query(QueryError::new(
1273 QueryErrorKind::Semantic,
1274 e.to_string(),
1275 ))),
1276 }
1277 }
1278 SchemaStatement::CreateVectorIndex(stmt) => {
1279 Self::create_vector_index_on_store(
1280 &self.active_lpg_store(),
1281 &stmt.node_label,
1282 &stmt.property,
1283 stmt.dimensions,
1284 stmt.metric.as_deref(),
1285 )?;
1286 wal_log!(
1287 self,
1288 WalRecord::CreateIndex {
1289 name: stmt.name.clone(),
1290 label: stmt.node_label.clone(),
1291 property: stmt.property.clone(),
1292 index_type: "vector".to_string(),
1293 }
1294 );
1295 Ok(QueryResult::status(format!(
1296 "Created vector index '{}'",
1297 stmt.name
1298 )))
1299 }
1300 SchemaStatement::DropNodeType { name, if_exists } => {
1301 let effective_name = self.effective_type_key(&name);
1302 match self.catalog.drop_node_type(&effective_name) {
1303 Ok(()) => {
1304 wal_log!(
1305 self,
1306 WalRecord::DropNodeType {
1307 name: effective_name
1308 }
1309 );
1310 Ok(QueryResult::status(format!("Dropped node type '{name}'")))
1311 }
1312 Err(e) if if_exists => {
1313 let _ = e;
1314 Ok(QueryResult::status("No change"))
1315 }
1316 Err(e) => Err(Error::Query(QueryError::new(
1317 QueryErrorKind::Semantic,
1318 e.to_string(),
1319 ))),
1320 }
1321 }
1322 SchemaStatement::DropEdgeType { name, if_exists } => {
1323 let effective_name = self.effective_type_key(&name);
1324 match self.catalog.drop_edge_type_def(&effective_name) {
1325 Ok(()) => {
1326 wal_log!(
1327 self,
1328 WalRecord::DropEdgeType {
1329 name: effective_name
1330 }
1331 );
1332 Ok(QueryResult::status(format!("Dropped edge type '{name}'")))
1333 }
1334 Err(e) if if_exists => {
1335 let _ = e;
1336 Ok(QueryResult::status("No change"))
1337 }
1338 Err(e) => Err(Error::Query(QueryError::new(
1339 QueryErrorKind::Semantic,
1340 e.to_string(),
1341 ))),
1342 }
1343 }
1344 SchemaStatement::CreateIndex(stmt) => {
1345 use crate::catalog::IndexType as CatalogIndexType;
1346 use grafeo_adapters::query::gql::ast::IndexKind;
1347 let active = self.active_lpg_store();
1348 let index_type_str = match stmt.index_kind {
1349 IndexKind::Property => "property",
1350 IndexKind::BTree => "btree",
1351 IndexKind::Text => "text",
1352 IndexKind::Vector => "vector",
1353 };
1354 match stmt.index_kind {
1355 IndexKind::Property | IndexKind::BTree => {
1356 for prop in &stmt.properties {
1357 active.create_property_index(prop);
1358 }
1359 }
1360 IndexKind::Text => {
1361 for prop in &stmt.properties {
1362 Self::create_text_index_on_store(&active, &stmt.label, prop)?;
1363 }
1364 }
1365 IndexKind::Vector => {
1366 for prop in &stmt.properties {
1367 Self::create_vector_index_on_store(
1368 &active,
1369 &stmt.label,
1370 prop,
1371 stmt.options.dimensions,
1372 stmt.options.metric.as_deref(),
1373 )?;
1374 }
1375 }
1376 }
1377 let catalog_index_type = match stmt.index_kind {
1380 IndexKind::Property => CatalogIndexType::Hash,
1381 IndexKind::BTree => CatalogIndexType::BTree,
1382 IndexKind::Text => CatalogIndexType::FullText,
1383 IndexKind::Vector => CatalogIndexType::Hash,
1384 };
1385 let label_id = self.catalog.get_or_create_label(&stmt.label);
1386 for prop in &stmt.properties {
1387 let prop_id = self.catalog.get_or_create_property_key(prop);
1388 self.catalog
1389 .create_index(&stmt.name, label_id, prop_id, catalog_index_type);
1390 }
1391 #[cfg(feature = "wal")]
1392 for prop in &stmt.properties {
1393 wal_log!(
1394 self,
1395 WalRecord::CreateIndex {
1396 name: stmt.name.clone(),
1397 label: stmt.label.clone(),
1398 property: prop.clone(),
1399 index_type: index_type_str.to_string(),
1400 }
1401 );
1402 }
1403 Ok(QueryResult::status(format!(
1404 "Created {} index '{}'",
1405 index_type_str, stmt.name
1406 )))
1407 }
1408 SchemaStatement::DropIndex { name, if_exists } => {
1409 if let Some(index_id) = self.catalog.find_index_by_name(&name) {
1412 let def = self.catalog.get_index(index_id);
1413 self.catalog.drop_index(index_id);
1414 if let Some(def) = def
1415 && let Some(prop_name) =
1416 self.catalog.get_property_key_name(def.property_key)
1417 {
1418 self.active_lpg_store().drop_property_index(&prop_name);
1419 }
1420 wal_log!(self, WalRecord::DropIndex { name: name.clone() });
1421 Ok(QueryResult::status(format!("Dropped index '{name}'")))
1422 } else if if_exists {
1423 Ok(QueryResult::status("No change".to_string()))
1424 } else {
1425 Err(Error::Query(QueryError::new(
1426 QueryErrorKind::Semantic,
1427 format!("Index '{name}' does not exist"),
1428 )))
1429 }
1430 }
1431 SchemaStatement::CreateConstraint(stmt) => {
1432 use crate::catalog::TypeConstraint;
1433 use grafeo_adapters::query::gql::ast::ConstraintKind;
1434 let kind_str = match stmt.constraint_kind {
1435 ConstraintKind::Unique => "unique",
1436 ConstraintKind::NodeKey => "node_key",
1437 ConstraintKind::NotNull => "not_null",
1438 ConstraintKind::Exists => "exists",
1439 };
1440 let constraint_name = stmt
1441 .name
1442 .clone()
1443 .unwrap_or_else(|| format!("{}_{kind_str}", stmt.label));
1444
1445 match stmt.constraint_kind {
1447 ConstraintKind::Unique => {
1448 for prop in &stmt.properties {
1449 let label_id = self.catalog.get_or_create_label(&stmt.label);
1450 let prop_id = self.catalog.get_or_create_property_key(prop);
1451 let _ = self.catalog.add_unique_constraint(label_id, prop_id);
1452 }
1453 let _ = self.catalog.add_constraint_to_type(
1454 &stmt.label,
1455 TypeConstraint::Unique(stmt.properties.clone()),
1456 );
1457 }
1458 ConstraintKind::NodeKey => {
1459 for prop in &stmt.properties {
1460 let label_id = self.catalog.get_or_create_label(&stmt.label);
1461 let prop_id = self.catalog.get_or_create_property_key(prop);
1462 let _ = self.catalog.add_unique_constraint(label_id, prop_id);
1463 let _ = self.catalog.add_required_property(label_id, prop_id);
1464 }
1465 let _ = self.catalog.add_constraint_to_type(
1466 &stmt.label,
1467 TypeConstraint::PrimaryKey(stmt.properties.clone()),
1468 );
1469 }
1470 ConstraintKind::NotNull | ConstraintKind::Exists => {
1471 for prop in &stmt.properties {
1472 let label_id = self.catalog.get_or_create_label(&stmt.label);
1473 let prop_id = self.catalog.get_or_create_property_key(prop);
1474 let _ = self.catalog.add_required_property(label_id, prop_id);
1475 let _ = self.catalog.add_constraint_to_type(
1476 &stmt.label,
1477 TypeConstraint::NotNull(prop.clone()),
1478 );
1479 }
1480 }
1481 }
1482
1483 wal_log!(
1484 self,
1485 WalRecord::CreateConstraint {
1486 name: constraint_name.clone(),
1487 label: stmt.label.clone(),
1488 properties: stmt.properties.clone(),
1489 kind: kind_str.to_string(),
1490 }
1491 );
1492 Ok(QueryResult::status(format!(
1493 "Created {kind_str} constraint '{constraint_name}'"
1494 )))
1495 }
1496 SchemaStatement::DropConstraint { name, if_exists } => {
1497 let _ = if_exists;
1498 wal_log!(self, WalRecord::DropConstraint { name: name.clone() });
1499 Ok(QueryResult::status(format!("Dropped constraint '{name}'")))
1500 }
1501 SchemaStatement::CreateGraphType(stmt) => {
1502 use crate::catalog::GraphTypeDefinition;
1503 use grafeo_adapters::query::gql::ast::InlineElementType;
1504
1505 let effective_name = self.effective_type_key(&stmt.name);
1506
1507 let (mut node_types, mut edge_types, open) =
1509 if let Some(ref like_graph) = stmt.like_graph {
1510 if let Some(type_name) = self.catalog.get_graph_type_binding(like_graph) {
1512 if let Some(existing) = self
1513 .catalog
1514 .schema()
1515 .and_then(|s| s.get_graph_type(&type_name))
1516 {
1517 (
1518 existing.allowed_node_types.clone(),
1519 existing.allowed_edge_types.clone(),
1520 existing.open,
1521 )
1522 } else {
1523 (Vec::new(), Vec::new(), true)
1524 }
1525 } else {
1526 let nt = self.catalog.all_node_type_names();
1528 let et = self.catalog.all_edge_type_names();
1529 if nt.is_empty() && et.is_empty() {
1530 (Vec::new(), Vec::new(), true)
1531 } else {
1532 (nt, et, false)
1533 }
1534 }
1535 } else {
1536 let nt = stmt
1538 .node_types
1539 .iter()
1540 .map(|n| self.effective_type_key(n))
1541 .collect();
1542 let et = stmt
1543 .edge_types
1544 .iter()
1545 .map(|n| self.effective_type_key(n))
1546 .collect();
1547 (nt, et, stmt.open)
1548 };
1549
1550 for inline in &stmt.inline_types {
1552 match inline {
1553 InlineElementType::Node {
1554 name,
1555 properties,
1556 key_labels,
1557 ..
1558 } => {
1559 let inline_effective = self.effective_type_key(name);
1560 let def = NodeTypeDefinition {
1561 name: inline_effective.clone(),
1562 properties: properties
1563 .iter()
1564 .map(|p| TypedProperty {
1565 name: p.name.clone(),
1566 data_type: PropertyDataType::from_type_name(&p.data_type),
1567 nullable: p.nullable,
1568 default_value: None,
1569 })
1570 .collect(),
1571 constraints: Vec::new(),
1572 parent_types: key_labels.clone(),
1573 };
1574 self.catalog.register_or_replace_node_type(def);
1576 #[cfg(feature = "wal")]
1577 {
1578 let props_for_wal: Vec<(String, String, bool)> = properties
1579 .iter()
1580 .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
1581 .collect();
1582 self.log_schema_wal(&WalRecord::CreateNodeType {
1583 name: inline_effective.clone(),
1584 properties: props_for_wal,
1585 constraints: Vec::new(),
1586 });
1587 }
1588 if !node_types.contains(&inline_effective) {
1589 node_types.push(inline_effective);
1590 }
1591 }
1592 InlineElementType::Edge {
1593 name,
1594 properties,
1595 source_node_types,
1596 target_node_types,
1597 ..
1598 } => {
1599 let inline_effective = self.effective_type_key(name);
1600 let def = EdgeTypeDefinition {
1601 name: inline_effective.clone(),
1602 properties: properties
1603 .iter()
1604 .map(|p| TypedProperty {
1605 name: p.name.clone(),
1606 data_type: PropertyDataType::from_type_name(&p.data_type),
1607 nullable: p.nullable,
1608 default_value: None,
1609 })
1610 .collect(),
1611 constraints: Vec::new(),
1612 source_node_types: source_node_types.clone(),
1613 target_node_types: target_node_types.clone(),
1614 };
1615 self.catalog.register_or_replace_edge_type_def(def);
1616 #[cfg(feature = "wal")]
1617 {
1618 let props_for_wal: Vec<(String, String, bool)> = properties
1619 .iter()
1620 .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
1621 .collect();
1622 self.log_schema_wal(&WalRecord::CreateEdgeType {
1623 name: inline_effective.clone(),
1624 properties: props_for_wal,
1625 constraints: Vec::new(),
1626 });
1627 }
1628 if !edge_types.contains(&inline_effective) {
1629 edge_types.push(inline_effective);
1630 }
1631 }
1632 }
1633 }
1634
1635 let def = GraphTypeDefinition {
1636 name: effective_name.clone(),
1637 allowed_node_types: node_types.clone(),
1638 allowed_edge_types: edge_types.clone(),
1639 open,
1640 };
1641 let result = if stmt.or_replace {
1642 let _ = self.catalog.drop_graph_type(&effective_name);
1644 self.catalog.register_graph_type(def)
1645 } else {
1646 self.catalog.register_graph_type(def)
1647 };
1648 match result {
1649 Ok(()) => {
1650 wal_log!(
1651 self,
1652 WalRecord::CreateGraphType {
1653 name: effective_name.clone(),
1654 node_types,
1655 edge_types,
1656 open,
1657 }
1658 );
1659 Ok(QueryResult::status(format!(
1660 "Created graph type '{}'",
1661 stmt.name
1662 )))
1663 }
1664 Err(e) if stmt.if_not_exists => {
1665 let _ = e;
1666 Ok(QueryResult::status("No change"))
1667 }
1668 Err(e) => Err(Error::Query(QueryError::new(
1669 QueryErrorKind::Semantic,
1670 e.to_string(),
1671 ))),
1672 }
1673 }
1674 SchemaStatement::DropGraphType { name, if_exists } => {
1675 let effective_name = self.effective_type_key(&name);
1676 match self.catalog.drop_graph_type(&effective_name) {
1677 Ok(()) => {
1678 wal_log!(
1679 self,
1680 WalRecord::DropGraphType {
1681 name: effective_name
1682 }
1683 );
1684 Ok(QueryResult::status(format!("Dropped graph type '{name}'")))
1685 }
1686 Err(e) if if_exists => {
1687 let _ = e;
1688 Ok(QueryResult::status("No change"))
1689 }
1690 Err(e) => Err(Error::Query(QueryError::new(
1691 QueryErrorKind::Semantic,
1692 e.to_string(),
1693 ))),
1694 }
1695 }
1696 SchemaStatement::CreateSchema {
1697 name,
1698 if_not_exists,
1699 } => match self.catalog.register_schema_namespace(name.clone()) {
1700 Ok(()) => {
1701 wal_log!(self, WalRecord::CreateSchema { name: name.clone() });
1702 let default_key = format!("{name}/{SCHEMA_DEFAULT_GRAPH}");
1705 if self.store.create_graph(&default_key).unwrap_or(false) {
1706 wal_log!(self, WalRecord::CreateNamedGraph { name: default_key });
1707 }
1708 Ok(QueryResult::status(format!("Created schema '{name}'")))
1709 }
1710 Err(e) if if_not_exists => {
1711 let _ = e;
1712 Ok(QueryResult::status("No change"))
1713 }
1714 Err(e) => Err(Error::Query(QueryError::new(
1715 QueryErrorKind::Semantic,
1716 e.to_string(),
1717 ))),
1718 },
1719 SchemaStatement::DropSchema { name, if_exists } => {
1720 let prefix = format!("{name}/");
1723 let default_graph_key = format!("{name}/{SCHEMA_DEFAULT_GRAPH}");
1724 let has_graphs = self
1725 .store
1726 .graph_names()
1727 .iter()
1728 .any(|g| g.starts_with(&prefix) && *g != default_graph_key);
1729 let has_types = self
1730 .catalog
1731 .all_node_type_names()
1732 .iter()
1733 .any(|n| n.starts_with(&prefix))
1734 || self
1735 .catalog
1736 .all_edge_type_names()
1737 .iter()
1738 .any(|n| n.starts_with(&prefix))
1739 || self
1740 .catalog
1741 .all_graph_type_names()
1742 .iter()
1743 .any(|n| n.starts_with(&prefix));
1744 if has_graphs || has_types {
1745 return Err(Error::Query(QueryError::new(
1746 QueryErrorKind::Semantic,
1747 format!("Schema '{name}' is not empty: drop all graphs and types first"),
1748 )));
1749 }
1750 match self.catalog.drop_schema_namespace(&name) {
1751 Ok(()) => {
1752 wal_log!(self, WalRecord::DropSchema { name: name.clone() });
1753 if self.store.drop_graph(&default_graph_key) {
1755 wal_log!(
1756 self,
1757 WalRecord::DropNamedGraph {
1758 name: default_graph_key,
1759 }
1760 );
1761 }
1762 let mut current = self.current_schema.lock();
1764 if current
1765 .as_deref()
1766 .is_some_and(|s| s.eq_ignore_ascii_case(&name))
1767 {
1768 *current = None;
1769 }
1770 Ok(QueryResult::status(format!("Dropped schema '{name}'")))
1771 }
1772 Err(e) if if_exists => {
1773 let _ = e;
1774 Ok(QueryResult::status("No change"))
1775 }
1776 Err(e) => Err(Error::Query(QueryError::new(
1777 QueryErrorKind::Semantic,
1778 e.to_string(),
1779 ))),
1780 }
1781 }
1782 SchemaStatement::AlterNodeType(stmt) => {
1783 use grafeo_adapters::query::gql::ast::TypeAlteration;
1784 let effective_name = self.effective_type_key(&stmt.name);
1785 let mut wal_alts = Vec::new();
1786 for alt in &stmt.alterations {
1787 match alt {
1788 TypeAlteration::AddProperty(prop) => {
1789 let typed = TypedProperty {
1790 name: prop.name.clone(),
1791 data_type: PropertyDataType::from_type_name(&prop.data_type),
1792 nullable: prop.nullable,
1793 default_value: prop
1794 .default_value
1795 .as_ref()
1796 .map(|s| parse_default_literal(s)),
1797 };
1798 self.catalog
1799 .alter_node_type_add_property(&effective_name, typed)
1800 .map_err(|e| {
1801 Error::Query(QueryError::new(
1802 QueryErrorKind::Semantic,
1803 e.to_string(),
1804 ))
1805 })?;
1806 wal_alts.push((
1807 "add".to_string(),
1808 prop.name.clone(),
1809 prop.data_type.clone(),
1810 prop.nullable,
1811 ));
1812 }
1813 TypeAlteration::DropProperty(name) => {
1814 self.catalog
1815 .alter_node_type_drop_property(&effective_name, name)
1816 .map_err(|e| {
1817 Error::Query(QueryError::new(
1818 QueryErrorKind::Semantic,
1819 e.to_string(),
1820 ))
1821 })?;
1822 wal_alts.push(("drop".to_string(), name.clone(), String::new(), false));
1823 }
1824 }
1825 }
1826 wal_log!(
1827 self,
1828 WalRecord::AlterNodeType {
1829 name: effective_name,
1830 alterations: wal_alts,
1831 }
1832 );
1833 Ok(QueryResult::status(format!(
1834 "Altered node type '{}'",
1835 stmt.name
1836 )))
1837 }
1838 SchemaStatement::AlterEdgeType(stmt) => {
1839 use grafeo_adapters::query::gql::ast::TypeAlteration;
1840 let effective_name = self.effective_type_key(&stmt.name);
1841 let mut wal_alts = Vec::new();
1842 for alt in &stmt.alterations {
1843 match alt {
1844 TypeAlteration::AddProperty(prop) => {
1845 let typed = TypedProperty {
1846 name: prop.name.clone(),
1847 data_type: PropertyDataType::from_type_name(&prop.data_type),
1848 nullable: prop.nullable,
1849 default_value: prop
1850 .default_value
1851 .as_ref()
1852 .map(|s| parse_default_literal(s)),
1853 };
1854 self.catalog
1855 .alter_edge_type_add_property(&effective_name, typed)
1856 .map_err(|e| {
1857 Error::Query(QueryError::new(
1858 QueryErrorKind::Semantic,
1859 e.to_string(),
1860 ))
1861 })?;
1862 wal_alts.push((
1863 "add".to_string(),
1864 prop.name.clone(),
1865 prop.data_type.clone(),
1866 prop.nullable,
1867 ));
1868 }
1869 TypeAlteration::DropProperty(name) => {
1870 self.catalog
1871 .alter_edge_type_drop_property(&effective_name, name)
1872 .map_err(|e| {
1873 Error::Query(QueryError::new(
1874 QueryErrorKind::Semantic,
1875 e.to_string(),
1876 ))
1877 })?;
1878 wal_alts.push(("drop".to_string(), name.clone(), String::new(), false));
1879 }
1880 }
1881 }
1882 wal_log!(
1883 self,
1884 WalRecord::AlterEdgeType {
1885 name: effective_name,
1886 alterations: wal_alts,
1887 }
1888 );
1889 Ok(QueryResult::status(format!(
1890 "Altered edge type '{}'",
1891 stmt.name
1892 )))
1893 }
1894 SchemaStatement::AlterGraphType(stmt) => {
1895 use grafeo_adapters::query::gql::ast::GraphTypeAlteration;
1896 let effective_name = self.effective_type_key(&stmt.name);
1897 let mut wal_alts = Vec::new();
1898 for alt in &stmt.alterations {
1899 match alt {
1900 GraphTypeAlteration::AddNodeType(name) => {
1901 self.catalog
1902 .alter_graph_type_add_node_type(&effective_name, name.clone())
1903 .map_err(|e| {
1904 Error::Query(QueryError::new(
1905 QueryErrorKind::Semantic,
1906 e.to_string(),
1907 ))
1908 })?;
1909 wal_alts.push(("add_node_type".to_string(), name.clone()));
1910 }
1911 GraphTypeAlteration::DropNodeType(name) => {
1912 self.catalog
1913 .alter_graph_type_drop_node_type(&effective_name, name)
1914 .map_err(|e| {
1915 Error::Query(QueryError::new(
1916 QueryErrorKind::Semantic,
1917 e.to_string(),
1918 ))
1919 })?;
1920 wal_alts.push(("drop_node_type".to_string(), name.clone()));
1921 }
1922 GraphTypeAlteration::AddEdgeType(name) => {
1923 self.catalog
1924 .alter_graph_type_add_edge_type(&effective_name, name.clone())
1925 .map_err(|e| {
1926 Error::Query(QueryError::new(
1927 QueryErrorKind::Semantic,
1928 e.to_string(),
1929 ))
1930 })?;
1931 wal_alts.push(("add_edge_type".to_string(), name.clone()));
1932 }
1933 GraphTypeAlteration::DropEdgeType(name) => {
1934 self.catalog
1935 .alter_graph_type_drop_edge_type(&effective_name, name)
1936 .map_err(|e| {
1937 Error::Query(QueryError::new(
1938 QueryErrorKind::Semantic,
1939 e.to_string(),
1940 ))
1941 })?;
1942 wal_alts.push(("drop_edge_type".to_string(), name.clone()));
1943 }
1944 }
1945 }
1946 wal_log!(
1947 self,
1948 WalRecord::AlterGraphType {
1949 name: effective_name,
1950 alterations: wal_alts,
1951 }
1952 );
1953 Ok(QueryResult::status(format!(
1954 "Altered graph type '{}'",
1955 stmt.name
1956 )))
1957 }
1958 SchemaStatement::CreateProcedure(stmt) => {
1959 use crate::catalog::ProcedureDefinition;
1960
1961 let def = ProcedureDefinition {
1962 name: stmt.name.clone(),
1963 params: stmt
1964 .params
1965 .iter()
1966 .map(|p| (p.name.clone(), p.param_type.clone()))
1967 .collect(),
1968 returns: stmt
1969 .returns
1970 .iter()
1971 .map(|r| (r.name.clone(), r.return_type.clone()))
1972 .collect(),
1973 body: stmt.body.clone(),
1974 };
1975
1976 if stmt.or_replace {
1977 self.catalog.replace_procedure(def).map_err(|e| {
1978 Error::Query(QueryError::new(QueryErrorKind::Semantic, e.to_string()))
1979 })?;
1980 } else {
1981 match self.catalog.register_procedure(def) {
1982 Ok(()) => {}
1983 Err(_) if stmt.if_not_exists => {
1984 return Ok(QueryResult::empty());
1985 }
1986 Err(e) => {
1987 return Err(Error::Query(QueryError::new(
1988 QueryErrorKind::Semantic,
1989 e.to_string(),
1990 )));
1991 }
1992 }
1993 }
1994
1995 wal_log!(
1996 self,
1997 WalRecord::CreateProcedure {
1998 name: stmt.name.clone(),
1999 params: stmt
2000 .params
2001 .iter()
2002 .map(|p| (p.name.clone(), p.param_type.clone()))
2003 .collect(),
2004 returns: stmt
2005 .returns
2006 .iter()
2007 .map(|r| (r.name.clone(), r.return_type.clone()))
2008 .collect(),
2009 body: stmt.body,
2010 }
2011 );
2012 Ok(QueryResult::status(format!(
2013 "Created procedure '{}'",
2014 stmt.name
2015 )))
2016 }
2017 SchemaStatement::DropProcedure { name, if_exists } => {
2018 match self.catalog.drop_procedure(&name) {
2019 Ok(()) => {}
2020 Err(_) if if_exists => {
2021 return Ok(QueryResult::empty());
2022 }
2023 Err(e) => {
2024 return Err(Error::Query(QueryError::new(
2025 QueryErrorKind::Semantic,
2026 e.to_string(),
2027 )));
2028 }
2029 }
2030 wal_log!(self, WalRecord::DropProcedure { name: name.clone() });
2031 Ok(QueryResult::status(format!("Dropped procedure '{name}'")))
2032 }
2033 SchemaStatement::ShowIndexes => {
2034 return self.execute_show_indexes();
2035 }
2036 SchemaStatement::ShowConstraints => {
2037 return self.execute_show_constraints();
2038 }
2039 SchemaStatement::ShowNodeTypes => {
2040 return self.execute_show_node_types();
2041 }
2042 SchemaStatement::ShowEdgeTypes => {
2043 return self.execute_show_edge_types();
2044 }
2045 SchemaStatement::ShowGraphTypes => {
2046 return self.execute_show_graph_types();
2047 }
2048 SchemaStatement::ShowGraphType(name) => {
2049 return self.execute_show_graph_type(&name);
2050 }
2051 SchemaStatement::ShowCurrentGraphType => {
2052 return self.execute_show_current_graph_type();
2053 }
2054 SchemaStatement::ShowGraphs => {
2055 return self.execute_show_graphs();
2056 }
2057 SchemaStatement::ShowSchemas => {
2058 return self.execute_show_schemas();
2059 }
2060 };
2061
2062 if result.is_ok() {
2065 self.query_cache.clear();
2066 }
2067
2068 result
2069 }
2070
2071 #[cfg(all(feature = "lpg", feature = "gql", feature = "vector-index"))]
2073 fn create_vector_index_on_store(
2074 store: &LpgStore,
2075 label: &str,
2076 property: &str,
2077 dimensions: Option<usize>,
2078 metric: Option<&str>,
2079 ) -> Result<()> {
2080 use grafeo_common::types::{PropertyKey, Value};
2081 use grafeo_common::utils::error::Error;
2082 use grafeo_core::index::vector::{DistanceMetric, HnswConfig, HnswIndex, VectorIndexKind};
2083
2084 let metric = match metric {
2085 Some(m) => DistanceMetric::from_str(m).ok_or_else(|| {
2086 Error::Internal(format!(
2087 "Unknown distance metric '{m}'. Use: cosine, euclidean, dot_product, manhattan"
2088 ))
2089 })?,
2090 None => DistanceMetric::Cosine,
2091 };
2092
2093 let prop_key = PropertyKey::new(property);
2094 let mut found_dims: Option<usize> = dimensions;
2095 let mut vectors: Vec<(grafeo_common::types::NodeId, Vec<f32>)> = Vec::new();
2096
2097 for node in store.nodes_with_label(label) {
2098 if let Some(Value::Vector(v)) = node.properties.get(&prop_key) {
2099 if let Some(expected) = found_dims {
2100 if v.len() != expected {
2101 return Err(Error::Internal(format!(
2102 "Vector dimension mismatch: expected {expected}, found {} on node {}",
2103 v.len(),
2104 node.id.0
2105 )));
2106 }
2107 } else {
2108 found_dims = Some(v.len());
2109 }
2110 vectors.push((node.id, v.to_vec()));
2111 }
2112 }
2113
2114 let Some(dims) = found_dims else {
2115 return Err(Error::Internal(format!(
2116 "No vector properties found on :{label}({property}) and no dimensions specified"
2117 )));
2118 };
2119
2120 let config = HnswConfig::new(dims, metric);
2121 let index = HnswIndex::with_capacity(config, vectors.len());
2122 let accessor = grafeo_core::index::vector::PropertyVectorAccessor::new(store, property);
2123 for (node_id, vec) in &vectors {
2124 index.insert(*node_id, vec, &accessor);
2125 }
2126
2127 store.add_vector_index(label, property, Arc::new(VectorIndexKind::Hnsw(index)));
2128 Ok(())
2129 }
2130
2131 #[cfg(all(feature = "lpg", feature = "gql", not(feature = "vector-index")))]
2133 fn create_vector_index_on_store(
2134 _store: &LpgStore,
2135 _label: &str,
2136 _property: &str,
2137 _dimensions: Option<usize>,
2138 _metric: Option<&str>,
2139 ) -> Result<()> {
2140 Err(grafeo_common::utils::error::Error::Internal(
2141 "Vector index support requires the 'vector-index' feature".to_string(),
2142 ))
2143 }
2144
2145 #[cfg(all(feature = "lpg", feature = "gql", feature = "text-index"))]
2147 fn create_text_index_on_store(store: &LpgStore, label: &str, property: &str) -> Result<()> {
2148 use grafeo_common::types::{PropertyKey, Value};
2149 use grafeo_core::index::text::{BM25Config, InvertedIndex};
2150
2151 let mut index = InvertedIndex::new(BM25Config::default());
2152 let prop_key = PropertyKey::new(property);
2153
2154 let nodes = store.nodes_by_label(label);
2155 for node_id in nodes {
2156 if let Some(Value::String(text)) = store.get_node_property(node_id, &prop_key) {
2157 index.insert(node_id, text.as_str());
2158 }
2159 }
2160
2161 store.add_text_index(label, property, Arc::new(parking_lot::RwLock::new(index)));
2162 Ok(())
2163 }
2164
2165 #[cfg(all(feature = "lpg", feature = "gql", not(feature = "text-index")))]
2167 fn create_text_index_on_store(_store: &LpgStore, _label: &str, _property: &str) -> Result<()> {
2168 Err(grafeo_common::utils::error::Error::Internal(
2169 "Text index support requires the 'text-index' feature".to_string(),
2170 ))
2171 }
2172
2173 fn execute_show_indexes(&self) -> Result<QueryResult> {
2175 let indexes = self.catalog.all_indexes();
2176 let columns = vec![
2177 "name".to_string(),
2178 "type".to_string(),
2179 "label".to_string(),
2180 "property".to_string(),
2181 ];
2182 let rows: Vec<Vec<Value>> = indexes
2183 .into_iter()
2184 .map(|def| {
2185 let label_name = self
2186 .catalog
2187 .get_label_name(def.label)
2188 .unwrap_or_else(|| "?".into());
2189 let prop_name = self
2190 .catalog
2191 .get_property_key_name(def.property_key)
2192 .unwrap_or_else(|| "?".into());
2193 vec![
2194 Value::from(def.name),
2195 Value::from(format!("{:?}", def.index_type)),
2196 Value::from(&*label_name),
2197 Value::from(&*prop_name),
2198 ]
2199 })
2200 .collect();
2201 Ok(QueryResult {
2202 columns,
2203 column_types: Vec::new(),
2204 rows,
2205 ..QueryResult::empty()
2206 })
2207 }
2208
2209 fn execute_show_constraints(&self) -> Result<QueryResult> {
2211 Ok(QueryResult {
2214 columns: vec![
2215 "name".to_string(),
2216 "type".to_string(),
2217 "label".to_string(),
2218 "properties".to_string(),
2219 ],
2220 column_types: Vec::new(),
2221 rows: Vec::new(),
2222 ..QueryResult::empty()
2223 })
2224 }
2225
2226 fn execute_show_node_types(&self) -> Result<QueryResult> {
2228 let columns = vec![
2229 "name".to_string(),
2230 "properties".to_string(),
2231 "constraints".to_string(),
2232 "parents".to_string(),
2233 ];
2234 let schema = self.current_schema.lock().clone();
2235 let all_names = self.catalog.all_node_type_names();
2236 let type_names: Vec<String> = match &schema {
2237 Some(s) => {
2238 let prefix = format!("{s}/");
2239 all_names
2240 .into_iter()
2241 .filter_map(|n| n.strip_prefix(&prefix).map(String::from))
2242 .collect()
2243 }
2244 None => all_names.into_iter().filter(|n| !n.contains('/')).collect(),
2245 };
2246 let rows: Vec<Vec<Value>> = type_names
2247 .into_iter()
2248 .filter_map(|name| {
2249 let lookup = match &schema {
2250 Some(s) => format!("{s}/{name}"),
2251 None => name.clone(),
2252 };
2253 let def = self.catalog.get_node_type(&lookup)?;
2254 let props: Vec<String> = def
2255 .properties
2256 .iter()
2257 .map(|p| {
2258 let nullable = if p.nullable { "" } else { " NOT NULL" };
2259 format!("{} {}{}", p.name, p.data_type, nullable)
2260 })
2261 .collect();
2262 let constraints: Vec<String> =
2263 def.constraints.iter().map(|c| format!("{c:?}")).collect();
2264 let parents = def.parent_types.join(", ");
2265 Some(vec![
2266 Value::from(name),
2267 Value::from(props.join(", ")),
2268 Value::from(constraints.join(", ")),
2269 Value::from(parents),
2270 ])
2271 })
2272 .collect();
2273 Ok(QueryResult {
2274 columns,
2275 column_types: Vec::new(),
2276 rows,
2277 ..QueryResult::empty()
2278 })
2279 }
2280
2281 fn execute_show_edge_types(&self) -> Result<QueryResult> {
2283 let columns = vec![
2284 "name".to_string(),
2285 "properties".to_string(),
2286 "source_types".to_string(),
2287 "target_types".to_string(),
2288 ];
2289 let schema = self.current_schema.lock().clone();
2290 let all_names = self.catalog.all_edge_type_names();
2291 let type_names: Vec<String> = match &schema {
2292 Some(s) => {
2293 let prefix = format!("{s}/");
2294 all_names
2295 .into_iter()
2296 .filter_map(|n| n.strip_prefix(&prefix).map(String::from))
2297 .collect()
2298 }
2299 None => all_names.into_iter().filter(|n| !n.contains('/')).collect(),
2300 };
2301 let rows: Vec<Vec<Value>> = type_names
2302 .into_iter()
2303 .filter_map(|name| {
2304 let lookup = match &schema {
2305 Some(s) => format!("{s}/{name}"),
2306 None => name.clone(),
2307 };
2308 let def = self.catalog.get_edge_type_def(&lookup)?;
2309 let props: Vec<String> = def
2310 .properties
2311 .iter()
2312 .map(|p| {
2313 let nullable = if p.nullable { "" } else { " NOT NULL" };
2314 format!("{} {}{}", p.name, p.data_type, nullable)
2315 })
2316 .collect();
2317 let src = def.source_node_types.join(", ");
2318 let tgt = def.target_node_types.join(", ");
2319 Some(vec![
2320 Value::from(name),
2321 Value::from(props.join(", ")),
2322 Value::from(src),
2323 Value::from(tgt),
2324 ])
2325 })
2326 .collect();
2327 Ok(QueryResult {
2328 columns,
2329 column_types: Vec::new(),
2330 rows,
2331 ..QueryResult::empty()
2332 })
2333 }
2334
2335 fn execute_show_graph_types(&self) -> Result<QueryResult> {
2337 let columns = vec![
2338 "name".to_string(),
2339 "open".to_string(),
2340 "node_types".to_string(),
2341 "edge_types".to_string(),
2342 ];
2343 let schema = self.current_schema.lock().clone();
2344 let all_names = self.catalog.all_graph_type_names();
2345 let type_names: Vec<String> = match &schema {
2346 Some(s) => {
2347 let prefix = format!("{s}/");
2348 all_names
2349 .into_iter()
2350 .filter_map(|n| n.strip_prefix(&prefix).map(String::from))
2351 .collect()
2352 }
2353 None => all_names.into_iter().filter(|n| !n.contains('/')).collect(),
2354 };
2355 let rows: Vec<Vec<Value>> = type_names
2356 .into_iter()
2357 .filter_map(|name| {
2358 let lookup = match &schema {
2359 Some(s) => format!("{s}/{name}"),
2360 None => name.clone(),
2361 };
2362 let def = self.catalog.get_graph_type_def(&lookup)?;
2363 let strip = |n: &String| -> String {
2365 match &schema {
2366 Some(s) => n.strip_prefix(&format!("{s}/")).unwrap_or(n).to_string(),
2367 None => n.clone(),
2368 }
2369 };
2370 let node_types: Vec<String> = def.allowed_node_types.iter().map(strip).collect();
2371 let edge_types: Vec<String> = def.allowed_edge_types.iter().map(strip).collect();
2372 Some(vec![
2373 Value::from(name),
2374 Value::from(def.open),
2375 Value::from(node_types.join(", ")),
2376 Value::from(edge_types.join(", ")),
2377 ])
2378 })
2379 .collect();
2380 Ok(QueryResult {
2381 columns,
2382 column_types: Vec::new(),
2383 rows,
2384 ..QueryResult::empty()
2385 })
2386 }
2387
2388 #[cfg(feature = "lpg")]
2394 fn execute_show_graphs(&self) -> Result<QueryResult> {
2395 let schema = self.current_schema.lock().clone();
2396 let all_names = self.store.graph_names();
2397
2398 let mut names: Vec<String> = match &schema {
2399 Some(s) => {
2400 let prefix = format!("{s}/");
2401 all_names
2402 .into_iter()
2403 .filter_map(|n| n.strip_prefix(&prefix).map(String::from))
2404 .filter(|n| n != SCHEMA_DEFAULT_GRAPH)
2405 .collect()
2406 }
2407 None => all_names.into_iter().filter(|n| !n.contains('/')).collect(),
2408 };
2409 names.sort();
2410
2411 let rows: Vec<Vec<Value>> = names.into_iter().map(|n| vec![Value::from(n)]).collect();
2412 Ok(QueryResult {
2413 columns: vec!["name".to_string()],
2414 column_types: Vec::new(),
2415 rows,
2416 ..QueryResult::empty()
2417 })
2418 }
2419
2420 fn execute_show_schemas(&self) -> Result<QueryResult> {
2422 let mut names = self.catalog.schema_names();
2423 names.sort();
2424 let rows: Vec<Vec<Value>> = names.into_iter().map(|n| vec![Value::from(n)]).collect();
2425 Ok(QueryResult {
2426 columns: vec!["name".to_string()],
2427 column_types: Vec::new(),
2428 rows,
2429 ..QueryResult::empty()
2430 })
2431 }
2432
2433 fn execute_show_graph_type(&self, name: &str) -> Result<QueryResult> {
2435 use grafeo_common::utils::error::{Error, QueryError, QueryErrorKind};
2436
2437 let def = self.catalog.get_graph_type_def(name).ok_or_else(|| {
2438 Error::Query(QueryError::new(
2439 QueryErrorKind::Semantic,
2440 format!("Graph type '{name}' not found"),
2441 ))
2442 })?;
2443
2444 let columns = vec![
2445 "name".to_string(),
2446 "open".to_string(),
2447 "node_types".to_string(),
2448 "edge_types".to_string(),
2449 ];
2450 let rows = vec![vec![
2451 Value::from(def.name),
2452 Value::from(def.open),
2453 Value::from(def.allowed_node_types.join(", ")),
2454 Value::from(def.allowed_edge_types.join(", ")),
2455 ]];
2456 Ok(QueryResult {
2457 columns,
2458 column_types: Vec::new(),
2459 rows,
2460 ..QueryResult::empty()
2461 })
2462 }
2463
2464 fn execute_show_current_graph_type(&self) -> Result<QueryResult> {
2466 let graph_name = self
2467 .current_graph()
2468 .unwrap_or_else(|| "default".to_string());
2469 let columns = vec![
2470 "graph".to_string(),
2471 "graph_type".to_string(),
2472 "open".to_string(),
2473 "node_types".to_string(),
2474 "edge_types".to_string(),
2475 ];
2476
2477 if let Some(type_name) = self.catalog.get_graph_type_binding(&graph_name)
2478 && let Some(def) = self.catalog.get_graph_type_def(&type_name)
2479 {
2480 let rows = vec![vec![
2481 Value::from(graph_name),
2482 Value::from(type_name),
2483 Value::from(def.open),
2484 Value::from(def.allowed_node_types.join(", ")),
2485 Value::from(def.allowed_edge_types.join(", ")),
2486 ]];
2487 return Ok(QueryResult {
2488 columns,
2489 column_types: Vec::new(),
2490 rows,
2491 ..QueryResult::empty()
2492 });
2493 }
2494
2495 Ok(QueryResult {
2497 columns,
2498 column_types: Vec::new(),
2499 rows: vec![vec![
2500 Value::from(graph_name),
2501 Value::Null,
2502 Value::Null,
2503 Value::Null,
2504 Value::Null,
2505 ]],
2506 ..QueryResult::empty()
2507 })
2508 }
2509
2510 #[cfg(feature = "gql")]
2537 pub fn execute(&self, query: &str) -> Result<QueryResult> {
2538 self.require_lpg("GQL")?;
2539
2540 use crate::query::{
2541 binder::Binder, cache::CacheKey, optimizer::Optimizer, processor::QueryLanguage,
2542 translators::gql,
2543 };
2544
2545 let _span = grafeo_info_span!(
2546 "grafeo::session::execute",
2547 language = "gql",
2548 query_len = query.len(),
2549 );
2550
2551 #[cfg(not(target_arch = "wasm32"))]
2552 let start_time = std::time::Instant::now();
2553
2554 let translation = gql::translate_full(query)?;
2556 let logical_plan = match translation {
2557 gql::GqlTranslationResult::SessionCommand(cmd) => {
2558 return self.execute_session_command(cmd);
2559 }
2560 #[cfg(feature = "lpg")]
2561 gql::GqlTranslationResult::SchemaCommand(cmd) => {
2562 self.require_permission(crate::auth::StatementKind::Admin)?;
2564 if *self.read_only_tx.lock() {
2565 return Err(grafeo_common::utils::error::Error::Transaction(
2566 grafeo_common::utils::error::TransactionError::ReadOnly,
2567 ));
2568 }
2569 return self.execute_schema_command(cmd);
2570 }
2571 gql::GqlTranslationResult::Plan(plan) => {
2572 let read_only = *self.read_only_tx.lock();
2577 let need_check = read_only || !self.identity.can_admin();
2578 let is_mutation = need_check && plan.root.has_mutations();
2579 if is_mutation {
2580 self.require_permission(crate::auth::StatementKind::Write)?;
2581 }
2582 if read_only && is_mutation {
2583 return Err(grafeo_common::utils::error::Error::Transaction(
2584 grafeo_common::utils::error::TransactionError::ReadOnly,
2585 ));
2586 }
2587 plan
2588 }
2589 #[cfg(not(feature = "lpg"))]
2590 gql::GqlTranslationResult::SchemaCommand(_) => {
2591 return Err(grafeo_common::utils::error::Error::Internal(
2592 "Schema commands require the `lpg` feature".to_string(),
2593 ));
2594 }
2595 };
2596
2597 let cache_key = CacheKey::with_graph(query, QueryLanguage::Gql, self.current_graph());
2599
2600 let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
2602 cached_plan
2603 } else {
2604 let mut binder = Binder::new();
2606 let _binding_context = binder.bind(&logical_plan)?;
2607
2608 let active = self.active_store();
2610 let optimizer = Optimizer::from_graph_store(&*active);
2611 let plan = optimizer.optimize(logical_plan)?;
2612
2613 self.query_cache.put_optimized(cache_key, plan.clone());
2615
2616 plan
2617 };
2618
2619 let active = self.active_store();
2621
2622 if optimized_plan.explain {
2624 use crate::query::processor::{annotate_pushdown_hints, explain_result};
2625 let mut plan = optimized_plan;
2626 annotate_pushdown_hints(&mut plan.root, active.as_ref());
2627 return Ok(explain_result(&plan));
2628 }
2629
2630 if optimized_plan.profile {
2632 let has_mutations = optimized_plan.root.has_mutations();
2633 return self.with_auto_commit(has_mutations, || {
2634 let (viewing_epoch, transaction_id) = self.get_transaction_context();
2635 let planner = self.create_planner_for_store(
2636 Arc::clone(&active),
2637 viewing_epoch,
2638 transaction_id,
2639 );
2640 let (mut physical_plan, entries) = planner.plan_profiled(&optimized_plan)?;
2641
2642 let executor = self.make_executor(physical_plan.columns.clone());
2643 let _result = executor.execute(physical_plan.operator.as_mut())?;
2644
2645 let total_time_ms;
2646 #[cfg(not(target_arch = "wasm32"))]
2647 {
2648 total_time_ms = start_time.elapsed().as_secs_f64() * 1000.0;
2649 }
2650 #[cfg(target_arch = "wasm32")]
2651 {
2652 total_time_ms = 0.0;
2653 }
2654
2655 let profile_tree = crate::query::profile::build_profile_tree(
2656 &optimized_plan.root,
2657 &mut entries.into_iter(),
2658 );
2659 Ok(crate::query::profile::profile_result(
2660 &profile_tree,
2661 total_time_ms,
2662 ))
2663 });
2664 }
2665
2666 let has_mutations = optimized_plan.root.has_mutations();
2667
2668 let result = self.with_auto_commit(has_mutations, || {
2669 let (viewing_epoch, transaction_id) = self.get_transaction_context();
2671
2672 let has_active_tx = self.current_transaction.lock().is_some();
2677 let read_only = !has_mutations && !has_active_tx;
2678 let planner = self.create_planner_for_store_with_read_only(
2679 Arc::clone(&active),
2680 viewing_epoch,
2681 transaction_id,
2682 read_only,
2683 );
2684 let physical_plan = planner.plan(&optimized_plan)?;
2685
2686 let executor = self.make_executor(physical_plan.columns.clone());
2688 let (mut source, push_ops) = {
2689 #[cfg(feature = "spill")]
2690 {
2691 let memory_ctx = self.make_operator_memory_context();
2692 grafeo_core::execution::pipeline_convert::convert_to_pipeline_with_memory(
2693 physical_plan.into_operator(),
2694 memory_ctx,
2695 )
2696 }
2697 #[cfg(not(feature = "spill"))]
2698 {
2699 grafeo_core::execution::pipeline_convert::convert_to_pipeline(
2700 physical_plan.into_operator(),
2701 )
2702 }
2703 };
2704 let mut result = if push_ops.is_empty() {
2705 executor.execute(source.as_mut())?
2707 } else {
2708 executor.execute_pipeline(source, push_ops)?
2710 };
2711
2712 let rows_scanned = result.rows.len() as u64;
2714 #[cfg(not(target_arch = "wasm32"))]
2715 {
2716 let elapsed_ms = start_time.elapsed().as_secs_f64() * 1000.0;
2717 result.execution_time_ms = Some(elapsed_ms);
2718 }
2719 result.rows_scanned = Some(rows_scanned);
2720
2721 Ok(result)
2722 });
2723
2724 #[cfg(feature = "metrics")]
2726 {
2727 #[cfg(not(target_arch = "wasm32"))]
2728 let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
2729 #[cfg(target_arch = "wasm32")]
2730 let elapsed_ms = None;
2731 self.record_query_metrics("gql", elapsed_ms, &result);
2732 }
2733
2734 result
2735 }
2736
2737 #[cfg(feature = "gql")]
2746 pub fn execute_at_epoch(&self, query: &str, epoch: EpochId) -> Result<QueryResult> {
2747 let previous = self.viewing_epoch_override.lock().replace(epoch);
2748 let result = self.execute(query);
2749 *self.viewing_epoch_override.lock() = previous;
2750 result
2751 }
2752
2753 #[cfg(feature = "gql")]
2761 pub fn execute_at_epoch_with_params(
2762 &self,
2763 query: &str,
2764 epoch: EpochId,
2765 params: Option<std::collections::HashMap<String, Value>>,
2766 ) -> Result<QueryResult> {
2767 let previous = self.viewing_epoch_override.lock().replace(epoch);
2768 let result = if let Some(p) = params {
2769 self.execute_with_params(query, p)
2770 } else {
2771 self.execute(query)
2772 };
2773 *self.viewing_epoch_override.lock() = previous;
2774 result
2775 }
2776
2777 #[cfg(feature = "gql")]
2783 pub fn execute_with_params(
2784 &self,
2785 query: &str,
2786 params: std::collections::HashMap<String, Value>,
2787 ) -> Result<QueryResult> {
2788 self.require_lpg("GQL")?;
2789
2790 use crate::query::processor::{QueryLanguage, QueryProcessor};
2791
2792 let has_mutations = if self.identity.can_write() {
2796 Self::query_looks_like_mutation(query)
2798 } else {
2799 use crate::query::translators::gql;
2801 match gql::translate(query) {
2802 Ok(plan) if plan.root.has_mutations() => {
2803 self.require_permission(crate::auth::StatementKind::Write)?;
2804 true
2805 }
2806 Ok(_) => false,
2807 Err(_) => Self::query_looks_like_mutation(query),
2809 }
2810 };
2811 let active = self.active_store();
2812
2813 self.with_auto_commit(has_mutations, || {
2814 let (viewing_epoch, transaction_id) = self.get_transaction_context();
2816
2817 let processor = QueryProcessor::for_stores_with_transaction(
2819 Arc::clone(&active),
2820 self.active_write_store(),
2821 Arc::clone(&self.transaction_manager),
2822 )?;
2823
2824 let processor = if let Some(transaction_id) = transaction_id {
2826 processor.with_transaction_context(viewing_epoch, transaction_id)
2827 } else {
2828 processor
2829 };
2830
2831 processor.process(query, QueryLanguage::Gql, Some(¶ms))
2832 })
2833 }
2834
2835 #[cfg(not(any(feature = "gql", feature = "cypher")))]
2841 pub fn execute_with_params(
2842 &self,
2843 _query: &str,
2844 _params: std::collections::HashMap<String, Value>,
2845 ) -> Result<QueryResult> {
2846 Err(grafeo_common::utils::error::Error::Internal(
2847 "No query language enabled".to_string(),
2848 ))
2849 }
2850
2851 #[cfg(not(any(feature = "gql", feature = "cypher")))]
2857 pub fn execute(&self, _query: &str) -> Result<QueryResult> {
2858 Err(grafeo_common::utils::error::Error::Internal(
2859 "No query language enabled".to_string(),
2860 ))
2861 }
2862
2863 #[cfg(feature = "cypher")]
2869 pub fn execute_cypher(&self, query: &str) -> Result<QueryResult> {
2870 use crate::query::{
2871 binder::Binder, cache::CacheKey, optimizer::Optimizer, processor::QueryLanguage,
2872 translators::cypher,
2873 };
2874
2875 let translation = cypher::translate_full(query)?;
2877 match translation {
2878 #[cfg(feature = "lpg")]
2879 cypher::CypherTranslationResult::SchemaCommand(cmd) => {
2880 use grafeo_common::utils::error::{
2881 Error as GrafeoError, QueryError, QueryErrorKind,
2882 };
2883 self.require_permission(crate::auth::StatementKind::Admin)?;
2884 if *self.read_only_tx.lock() {
2885 return Err(GrafeoError::Query(QueryError::new(
2886 QueryErrorKind::Semantic,
2887 "Cannot execute schema DDL in a read-only transaction",
2888 )));
2889 }
2890 return self.execute_schema_command(cmd);
2891 }
2892 #[cfg(not(feature = "lpg"))]
2893 cypher::CypherTranslationResult::SchemaCommand(_) => {
2894 return Err(grafeo_common::utils::error::Error::Internal(
2895 "Schema DDL requires the `lpg` feature".to_string(),
2896 ));
2897 }
2898 cypher::CypherTranslationResult::ShowIndexes => {
2899 return self.execute_show_indexes();
2900 }
2901 cypher::CypherTranslationResult::ShowConstraints => {
2902 return self.execute_show_constraints();
2903 }
2904 cypher::CypherTranslationResult::ShowCurrentGraphType => {
2905 return self.execute_show_current_graph_type();
2906 }
2907 cypher::CypherTranslationResult::Plan(_) => {
2908 }
2910 }
2911
2912 #[cfg(not(target_arch = "wasm32"))]
2913 let start_time = std::time::Instant::now();
2914
2915 let cache_key = CacheKey::with_graph(query, QueryLanguage::Cypher, self.current_graph());
2917
2918 let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
2920 cached_plan
2921 } else {
2922 let logical_plan = cypher::translate(query)?;
2924
2925 let mut binder = Binder::new();
2927 let _binding_context = binder.bind(&logical_plan)?;
2928
2929 let active = self.active_store();
2931 let optimizer = Optimizer::from_graph_store(&*active);
2932 let plan = optimizer.optimize(logical_plan)?;
2933
2934 self.query_cache.put_optimized(cache_key, plan.clone());
2936
2937 plan
2938 };
2939
2940 if optimized_plan.root.has_mutations() {
2942 self.require_permission(crate::auth::StatementKind::Write)?;
2943 }
2944
2945 let active = self.active_store();
2947
2948 if optimized_plan.explain {
2950 use crate::query::processor::{annotate_pushdown_hints, explain_result};
2951 let mut plan = optimized_plan;
2952 annotate_pushdown_hints(&mut plan.root, active.as_ref());
2953 return Ok(explain_result(&plan));
2954 }
2955
2956 if optimized_plan.profile {
2958 let has_mutations = optimized_plan.root.has_mutations();
2959 return self.with_auto_commit(has_mutations, || {
2960 let (viewing_epoch, transaction_id) = self.get_transaction_context();
2961 let planner = self.create_planner_for_store(
2962 Arc::clone(&active),
2963 viewing_epoch,
2964 transaction_id,
2965 );
2966 let (mut physical_plan, entries) = planner.plan_profiled(&optimized_plan)?;
2967
2968 let executor = self.make_executor(physical_plan.columns.clone());
2969 let _result = executor.execute(physical_plan.operator.as_mut())?;
2970
2971 let total_time_ms;
2972 #[cfg(not(target_arch = "wasm32"))]
2973 {
2974 total_time_ms = start_time.elapsed().as_secs_f64() * 1000.0;
2975 }
2976 #[cfg(target_arch = "wasm32")]
2977 {
2978 total_time_ms = 0.0;
2979 }
2980
2981 let profile_tree = crate::query::profile::build_profile_tree(
2982 &optimized_plan.root,
2983 &mut entries.into_iter(),
2984 );
2985 Ok(crate::query::profile::profile_result(
2986 &profile_tree,
2987 total_time_ms,
2988 ))
2989 });
2990 }
2991
2992 let has_mutations = optimized_plan.root.has_mutations();
2993
2994 let result = self.with_auto_commit(has_mutations, || {
2995 let (viewing_epoch, transaction_id) = self.get_transaction_context();
2997
2998 let planner =
3000 self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
3001 let mut physical_plan = planner.plan(&optimized_plan)?;
3002
3003 let executor = self.make_executor(physical_plan.columns.clone());
3005 executor.execute(physical_plan.operator.as_mut())
3006 });
3007
3008 #[cfg(feature = "metrics")]
3009 {
3010 #[cfg(not(target_arch = "wasm32"))]
3011 let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
3012 #[cfg(target_arch = "wasm32")]
3013 let elapsed_ms = None;
3014 self.record_query_metrics("cypher", elapsed_ms, &result);
3015 }
3016
3017 result
3018 }
3019
3020 #[cfg(feature = "gremlin")]
3044 pub fn execute_gremlin(&self, query: &str) -> Result<QueryResult> {
3045 use crate::query::{binder::Binder, optimizer::Optimizer, translators::gremlin};
3046
3047 #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
3048 let start_time = Instant::now();
3049
3050 let logical_plan = gremlin::translate(query)?;
3052
3053 let mut binder = Binder::new();
3055 let _binding_context = binder.bind(&logical_plan)?;
3056
3057 let active = self.active_store();
3059 let optimizer = Optimizer::from_graph_store(&*active);
3060 let optimized_plan = optimizer.optimize(logical_plan)?;
3061
3062 let has_mutations = optimized_plan.root.has_mutations();
3063 if has_mutations {
3064 self.require_permission(crate::auth::StatementKind::Write)?;
3065 }
3066
3067 let result = self.with_auto_commit(has_mutations, || {
3068 let (viewing_epoch, transaction_id) = self.get_transaction_context();
3070
3071 let planner =
3073 self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
3074 let mut physical_plan = planner.plan(&optimized_plan)?;
3075
3076 let executor = self.make_executor(physical_plan.columns.clone());
3078 executor.execute(physical_plan.operator.as_mut())
3079 });
3080
3081 #[cfg(feature = "metrics")]
3082 {
3083 #[cfg(not(target_arch = "wasm32"))]
3084 let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
3085 #[cfg(target_arch = "wasm32")]
3086 let elapsed_ms = None;
3087 self.record_query_metrics("gremlin", elapsed_ms, &result);
3088 }
3089
3090 result
3091 }
3092
3093 #[cfg(feature = "gremlin")]
3099 pub fn execute_gremlin_with_params(
3100 &self,
3101 query: &str,
3102 params: std::collections::HashMap<String, Value>,
3103 ) -> Result<QueryResult> {
3104 use crate::query::{
3105 binder::Binder, optimizer::Optimizer, processor::substitute_params,
3106 translators::gremlin,
3107 };
3108
3109 #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
3110 let start_time = Instant::now();
3111
3112 let mut logical_plan = gremlin::translate(query)?;
3114
3115 substitute_params(&mut logical_plan, ¶ms)?;
3117
3118 let mut binder = Binder::new();
3120 let _binding_context = binder.bind(&logical_plan)?;
3121
3122 let active = self.active_store();
3124 let optimizer = Optimizer::from_graph_store(&*active);
3125 let optimized_plan = optimizer.optimize(logical_plan)?;
3126
3127 let has_mutations = optimized_plan.root.has_mutations();
3128 if has_mutations {
3129 self.require_permission(crate::auth::StatementKind::Write)?;
3130 }
3131
3132 let result = self.with_auto_commit(has_mutations, || {
3133 let (viewing_epoch, transaction_id) = self.get_transaction_context();
3134 let planner =
3135 self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
3136 let mut physical_plan = planner.plan(&optimized_plan)?;
3137 let executor = self.make_executor(physical_plan.columns.clone());
3138 executor.execute(physical_plan.operator.as_mut())
3139 });
3140
3141 #[cfg(feature = "metrics")]
3142 {
3143 #[cfg(not(target_arch = "wasm32"))]
3144 let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
3145 #[cfg(target_arch = "wasm32")]
3146 let elapsed_ms = None;
3147 self.record_query_metrics("gremlin", elapsed_ms, &result);
3148 }
3149
3150 result
3151 }
3152
3153 #[cfg(feature = "graphql")]
3177 pub fn execute_graphql(&self, query: &str) -> Result<QueryResult> {
3178 use crate::query::{
3179 binder::Binder, optimizer::Optimizer, processor::substitute_params,
3180 translators::graphql,
3181 };
3182
3183 #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
3184 let start_time = Instant::now();
3185
3186 let mut logical_plan = graphql::translate(query)?;
3187
3188 if !logical_plan.default_params.is_empty() {
3190 let defaults = logical_plan.default_params.clone();
3191 substitute_params(&mut logical_plan, &defaults)?;
3192 }
3193
3194 let mut binder = Binder::new();
3195 let _binding_context = binder.bind(&logical_plan)?;
3196
3197 let active = self.active_store();
3198 let optimizer = Optimizer::from_graph_store(&*active);
3199 let optimized_plan = optimizer.optimize(logical_plan)?;
3200 let has_mutations = optimized_plan.root.has_mutations();
3201 if has_mutations {
3202 self.require_permission(crate::auth::StatementKind::Write)?;
3203 }
3204
3205 let result = self.with_auto_commit(has_mutations, || {
3206 let (viewing_epoch, transaction_id) = self.get_transaction_context();
3207 let planner =
3208 self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
3209 let mut physical_plan = planner.plan(&optimized_plan)?;
3210 let executor = self.make_executor(physical_plan.columns.clone());
3211 executor.execute(physical_plan.operator.as_mut())
3212 });
3213
3214 #[cfg(feature = "metrics")]
3215 {
3216 #[cfg(not(target_arch = "wasm32"))]
3217 let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
3218 #[cfg(target_arch = "wasm32")]
3219 let elapsed_ms = None;
3220 self.record_query_metrics("graphql", elapsed_ms, &result);
3221 }
3222
3223 result
3224 }
3225
3226 #[cfg(feature = "graphql")]
3232 pub fn execute_graphql_with_params(
3233 &self,
3234 query: &str,
3235 params: std::collections::HashMap<String, Value>,
3236 ) -> Result<QueryResult> {
3237 use crate::query::{
3238 binder::Binder, optimizer::Optimizer, processor::substitute_params,
3239 translators::graphql,
3240 };
3241
3242 #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
3243 let start_time = Instant::now();
3244
3245 let mut logical_plan = graphql::translate(query)?;
3247
3248 if !logical_plan.default_params.is_empty() {
3250 let mut merged = logical_plan.default_params.clone();
3251 merged.extend(params.iter().map(|(k, v)| (k.clone(), v.clone())));
3252 substitute_params(&mut logical_plan, &merged)?;
3253 } else {
3254 substitute_params(&mut logical_plan, ¶ms)?;
3255 }
3256
3257 let mut binder = Binder::new();
3259 let _binding_context = binder.bind(&logical_plan)?;
3260
3261 let active = self.active_store();
3263 let optimizer = Optimizer::from_graph_store(&*active);
3264 let optimized_plan = optimizer.optimize(logical_plan)?;
3265
3266 let has_mutations = optimized_plan.root.has_mutations();
3267 if has_mutations {
3268 self.require_permission(crate::auth::StatementKind::Write)?;
3269 }
3270
3271 let result = self.with_auto_commit(has_mutations, || {
3272 let (viewing_epoch, transaction_id) = self.get_transaction_context();
3273 let planner =
3274 self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
3275 let mut physical_plan = planner.plan(&optimized_plan)?;
3276 let executor = self.make_executor(physical_plan.columns.clone());
3277 executor.execute(physical_plan.operator.as_mut())
3278 });
3279
3280 #[cfg(feature = "metrics")]
3281 {
3282 #[cfg(not(target_arch = "wasm32"))]
3283 let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
3284 #[cfg(target_arch = "wasm32")]
3285 let elapsed_ms = None;
3286 self.record_query_metrics("graphql", elapsed_ms, &result);
3287 }
3288
3289 result
3290 }
3291
3292 #[cfg(feature = "sql-pgq")]
3317 pub fn execute_sql(&self, query: &str) -> Result<QueryResult> {
3318 use crate::query::{
3319 binder::Binder, cache::CacheKey, optimizer::Optimizer, plan::LogicalOperator,
3320 processor::QueryLanguage, translators::sql_pgq,
3321 };
3322
3323 #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
3324 let start_time = Instant::now();
3325
3326 let logical_plan = sql_pgq::translate(query)?;
3328
3329 if let LogicalOperator::CreatePropertyGraph(ref cpg) = logical_plan.root {
3331 self.require_permission(crate::auth::StatementKind::Admin)?;
3332 return Ok(QueryResult {
3333 columns: vec!["status".into()],
3334 column_types: vec![grafeo_common::types::LogicalType::String],
3335 rows: vec![vec![Value::from(format!(
3336 "Property graph '{}' created ({} node tables, {} edge tables)",
3337 cpg.name,
3338 cpg.node_tables.len(),
3339 cpg.edge_tables.len()
3340 ))]],
3341 execution_time_ms: None,
3342 rows_scanned: None,
3343 status_message: None,
3344 gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
3345 });
3346 }
3347
3348 let cache_key = CacheKey::with_graph(query, QueryLanguage::SqlPgq, self.current_graph());
3349
3350 let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
3351 cached_plan
3352 } else {
3353 let mut binder = Binder::new();
3354 let _binding_context = binder.bind(&logical_plan)?;
3355 let active = self.active_store();
3356 let optimizer = Optimizer::from_graph_store(&*active);
3357 let plan = optimizer.optimize(logical_plan)?;
3358 self.query_cache.put_optimized(cache_key, plan.clone());
3359 plan
3360 };
3361
3362 let active = self.active_store();
3363 let has_mutations = optimized_plan.root.has_mutations();
3364 if has_mutations {
3365 self.require_permission(crate::auth::StatementKind::Write)?;
3366 }
3367
3368 let result = self.with_auto_commit(has_mutations, || {
3369 let (viewing_epoch, transaction_id) = self.get_transaction_context();
3370 let planner =
3371 self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
3372 let mut physical_plan = planner.plan(&optimized_plan)?;
3373 let executor = self.make_executor(physical_plan.columns.clone());
3374 executor.execute(physical_plan.operator.as_mut())
3375 });
3376
3377 #[cfg(feature = "metrics")]
3378 {
3379 #[cfg(not(target_arch = "wasm32"))]
3380 let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
3381 #[cfg(target_arch = "wasm32")]
3382 let elapsed_ms = None;
3383 self.record_query_metrics("sql", elapsed_ms, &result);
3384 }
3385
3386 result
3387 }
3388
3389 #[cfg(feature = "sql-pgq")]
3395 pub fn execute_sql_with_params(
3396 &self,
3397 query: &str,
3398 params: std::collections::HashMap<String, Value>,
3399 ) -> Result<QueryResult> {
3400 use crate::query::processor::{QueryLanguage, QueryProcessor};
3401
3402 #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
3403 let start_time = Instant::now();
3404
3405 let has_mutations = if self.identity.can_write() {
3406 Self::query_looks_like_mutation(query)
3407 } else {
3408 use crate::query::translators::sql_pgq;
3409 match sql_pgq::translate(query) {
3410 Ok(plan) if plan.root.has_mutations() => {
3411 self.require_permission(crate::auth::StatementKind::Write)?;
3412 true
3413 }
3414 Ok(_) => false,
3415 Err(_) => Self::query_looks_like_mutation(query),
3416 }
3417 };
3418 if has_mutations {
3419 self.require_permission(crate::auth::StatementKind::Write)?;
3420 }
3421 let active = self.active_store();
3422
3423 let result = self.with_auto_commit(has_mutations, || {
3424 let (viewing_epoch, transaction_id) = self.get_transaction_context();
3425 let processor = QueryProcessor::for_stores_with_transaction(
3426 Arc::clone(&active),
3427 self.active_write_store(),
3428 Arc::clone(&self.transaction_manager),
3429 )?;
3430 let processor = if let Some(transaction_id) = transaction_id {
3431 processor.with_transaction_context(viewing_epoch, transaction_id)
3432 } else {
3433 processor
3434 };
3435 processor.process(query, QueryLanguage::SqlPgq, Some(¶ms))
3436 });
3437
3438 #[cfg(feature = "metrics")]
3439 {
3440 #[cfg(not(target_arch = "wasm32"))]
3441 let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
3442 #[cfg(target_arch = "wasm32")]
3443 let elapsed_ms = None;
3444 self.record_query_metrics("sql", elapsed_ms, &result);
3445 }
3446
3447 result
3448 }
3449
3450 pub fn execute_language(
3459 &self,
3460 query: &str,
3461 language: &str,
3462 params: Option<std::collections::HashMap<String, Value>>,
3463 ) -> Result<QueryResult> {
3464 let _span = grafeo_info_span!(
3465 "grafeo::session::execute",
3466 language,
3467 query_len = query.len(),
3468 );
3469 match language {
3470 "gql" => {
3471 if let Some(p) = params {
3472 self.execute_with_params(query, p)
3473 } else {
3474 self.execute(query)
3475 }
3476 }
3477 #[cfg(feature = "cypher")]
3478 "cypher" => {
3479 if let Some(p) = params {
3480 use crate::query::processor::{QueryLanguage, QueryProcessor};
3481
3482 #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
3483 let start_time = Instant::now();
3484
3485 let has_mutations = if self.identity.can_write() {
3486 Self::query_looks_like_mutation(query)
3487 } else {
3488 use crate::query::translators::cypher;
3489 match cypher::translate(query) {
3490 Ok(plan) if plan.root.has_mutations() => {
3491 self.require_permission(crate::auth::StatementKind::Write)?;
3492 true
3493 }
3494 Ok(_) => false,
3495 Err(_) => Self::query_looks_like_mutation(query),
3496 }
3497 };
3498 let active = self.active_store();
3499 let result = self.with_auto_commit(has_mutations, || {
3500 let processor = QueryProcessor::for_stores_with_transaction(
3501 Arc::clone(&active),
3502 self.active_write_store(),
3503 Arc::clone(&self.transaction_manager),
3504 )?;
3505 let (viewing_epoch, transaction_id) = self.get_transaction_context();
3506 let processor = if let Some(transaction_id) = transaction_id {
3507 processor.with_transaction_context(viewing_epoch, transaction_id)
3508 } else {
3509 processor
3510 };
3511 processor.process(query, QueryLanguage::Cypher, Some(&p))
3512 });
3513
3514 #[cfg(feature = "metrics")]
3515 {
3516 #[cfg(not(target_arch = "wasm32"))]
3517 let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
3518 #[cfg(target_arch = "wasm32")]
3519 let elapsed_ms = None;
3520 self.record_query_metrics("cypher", elapsed_ms, &result);
3521 }
3522
3523 result
3524 } else {
3525 self.execute_cypher(query)
3526 }
3527 }
3528 #[cfg(feature = "gremlin")]
3529 "gremlin" => {
3530 if let Some(p) = params {
3531 self.execute_gremlin_with_params(query, p)
3532 } else {
3533 self.execute_gremlin(query)
3534 }
3535 }
3536 #[cfg(feature = "graphql")]
3537 "graphql" => {
3538 if let Some(p) = params {
3539 self.execute_graphql_with_params(query, p)
3540 } else {
3541 self.execute_graphql(query)
3542 }
3543 }
3544 #[cfg(all(feature = "graphql", feature = "triple-store"))]
3545 "graphql-rdf" => {
3546 if let Some(p) = params {
3547 self.execute_graphql_rdf_with_params(query, p)
3548 } else {
3549 self.execute_graphql_rdf(query)
3550 }
3551 }
3552 #[cfg(feature = "sql-pgq")]
3553 "sql" | "sql-pgq" => {
3554 if let Some(p) = params {
3555 self.execute_sql_with_params(query, p)
3556 } else {
3557 self.execute_sql(query)
3558 }
3559 }
3560 #[cfg(all(feature = "sparql", feature = "triple-store"))]
3561 "sparql" => {
3562 if let Some(p) = params {
3563 self.execute_sparql_with_params(query, p)
3564 } else {
3565 self.execute_sparql(query)
3566 }
3567 }
3568 other => Err(grafeo_common::utils::error::Error::Query(
3569 grafeo_common::utils::error::QueryError::new(
3570 grafeo_common::utils::error::QueryErrorKind::Semantic,
3571 format!("Unknown query language: '{other}'"),
3572 ),
3573 )),
3574 }
3575 }
3576
3577 pub fn clear_plan_cache(&self) {
3604 self.query_cache.clear();
3605 }
3606
3607 #[cfg(feature = "lpg")]
3615 pub fn begin_transaction(&mut self) -> Result<()> {
3616 self.begin_transaction_inner(false, None)
3617 }
3618
3619 #[cfg(feature = "lpg")]
3627 pub fn begin_transaction_with_isolation(
3628 &mut self,
3629 isolation_level: crate::transaction::IsolationLevel,
3630 ) -> Result<()> {
3631 self.begin_transaction_inner(false, Some(isolation_level))
3632 }
3633
3634 #[cfg(feature = "lpg")]
3636 fn begin_transaction_inner(
3637 &self,
3638 read_only: bool,
3639 isolation_level: Option<crate::transaction::IsolationLevel>,
3640 ) -> Result<()> {
3641 let _span = grafeo_debug_span!("grafeo::tx::begin", read_only);
3642 let mut current = self.current_transaction.lock();
3643 if current.is_some() {
3644 drop(current);
3646 let mut depth = self.transaction_nesting_depth.lock();
3647 *depth += 1;
3648 let sp_name = format!("_nested_tx_{}", *depth);
3649 self.savepoint(&sp_name)?;
3650 return Ok(());
3651 }
3652
3653 let active = self.active_lpg_store();
3654 self.transaction_start_node_count
3655 .store(active.node_count(), Ordering::Relaxed);
3656 self.transaction_start_edge_count
3657 .store(active.edge_count(), Ordering::Relaxed);
3658 let transaction_id = if let Some(level) = isolation_level {
3659 self.transaction_manager.begin_with_isolation(level)
3660 } else {
3661 self.transaction_manager.begin()
3662 };
3663 *current = Some(transaction_id);
3664 *self.read_only_tx.lock() = read_only || self.db_read_only;
3665
3666 let key = self.active_graph_storage_key();
3669 let mut touched = self.touched_graphs.lock();
3670 touched.clear();
3671 touched.push(key);
3672
3673 #[cfg(feature = "metrics")]
3674 {
3675 crate::metrics::record_metric!(self.metrics, tx_active, inc);
3676 #[cfg(not(target_arch = "wasm32"))]
3677 {
3678 *self.tx_start_time.lock() = Some(Instant::now());
3679 }
3680 }
3681
3682 Ok(())
3683 }
3684
3685 #[cfg(feature = "lpg")]
3693 pub fn commit(&mut self) -> Result<()> {
3694 self.commit_inner()
3695 }
3696
3697 #[cfg(feature = "lpg")]
3699 fn commit_inner(&self) -> Result<()> {
3700 let _span = grafeo_debug_span!("grafeo::tx::commit");
3701 {
3703 let mut depth = self.transaction_nesting_depth.lock();
3704 if *depth > 0 {
3705 let sp_name = format!("_nested_tx_{depth}");
3706 *depth -= 1;
3707 drop(depth);
3708 return self.release_savepoint(&sp_name);
3709 }
3710 }
3711
3712 let transaction_id = self.current_transaction.lock().take().ok_or_else(|| {
3713 grafeo_common::utils::error::Error::Transaction(
3714 grafeo_common::utils::error::TransactionError::InvalidState(
3715 "No active transaction".to_string(),
3716 ),
3717 )
3718 })?;
3719
3720 let touched = std::mem::take(&mut *self.touched_graphs.lock());
3728 let commit_epoch = match self.transaction_manager.commit(transaction_id) {
3729 Ok(epoch) => epoch,
3730 Err(e) => {
3731 for graph_name in &touched {
3733 let store = self.resolve_store(graph_name);
3734 store.rollback_transaction_properties(transaction_id);
3735 }
3736 #[cfg(feature = "triple-store")]
3737 self.rollback_rdf_transaction(transaction_id);
3738 #[cfg(feature = "cdc")]
3740 if let Some(ref pending) = self.cdc_pending_events {
3741 pending.lock().clear();
3742 }
3743 *self.read_only_tx.lock() = self.db_read_only;
3744 self.savepoints.lock().clear();
3745 self.touched_graphs.lock().clear();
3746 #[cfg(feature = "metrics")]
3747 {
3748 crate::metrics::record_metric!(self.metrics, tx_active, dec);
3749 crate::metrics::record_metric!(self.metrics, tx_conflicts, inc);
3750 #[cfg(not(target_arch = "wasm32"))]
3751 if let Some(start) = self.tx_start_time.lock().take() {
3752 let duration_ms = start.elapsed().as_secs_f64() * 1000.0;
3753 crate::metrics::record_metric!(
3754 self.metrics,
3755 tx_duration,
3756 observe duration_ms
3757 );
3758 }
3759 }
3760 return Err(e);
3761 }
3762 };
3763
3764 for graph_name in &touched {
3766 let store = self.resolve_store(graph_name);
3767 store.finalize_version_epochs(transaction_id, commit_epoch);
3768 }
3769
3770 #[cfg(feature = "triple-store")]
3772 self.commit_rdf_transaction(transaction_id);
3773
3774 for graph_name in &touched {
3775 let store = self.resolve_store(graph_name);
3776 store.commit_transaction_properties(transaction_id);
3777 }
3778
3779 #[cfg(feature = "cdc")]
3783 if let Some(ref pending) = self.cdc_pending_events {
3784 let events: Vec<crate::cdc::ChangeEvent> = pending.lock().drain(..).collect();
3785 self.cdc_log.record_batch(events.into_iter().map(|mut e| {
3786 e.epoch = commit_epoch;
3787 e
3788 }));
3789 }
3790
3791 #[cfg(feature = "wal")]
3796 if let Some(ref wal) = self.wal {
3797 use grafeo_storage::wal::WalRecord;
3798 if let Err(e) = wal.log(&WalRecord::TransactionCommit { transaction_id }) {
3799 grafeo_warn!("Failed to log transaction commit to WAL: {}", e);
3800 }
3801 if let Err(e) = wal.log(&WalRecord::EpochAdvance {
3802 epoch: commit_epoch,
3803 }) {
3804 grafeo_warn!("Failed to log epoch advance to WAL: {}", e);
3805 }
3806 }
3807
3808 let current_epoch = self.transaction_manager.current_epoch();
3811 for graph_name in &touched {
3812 let store = self.resolve_store(graph_name);
3813 store.sync_epoch(current_epoch);
3814 }
3815
3816 *self.read_only_tx.lock() = self.db_read_only;
3819 self.savepoints.lock().clear();
3820
3821 if self.gc_interval > 0 {
3823 let count = self.commit_counter.fetch_add(1, Ordering::Relaxed) + 1;
3824 if count.is_multiple_of(self.gc_interval) {
3825 #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
3826 let gc_start = std::time::Instant::now();
3827
3828 let min_epoch = self.transaction_manager.min_active_epoch();
3829 for graph_name in &touched {
3830 let store = self.resolve_store(graph_name);
3831 store.gc_versions(min_epoch);
3832 }
3833 self.transaction_manager.gc();
3834
3835 #[cfg(feature = "metrics")]
3836 {
3837 crate::metrics::record_metric!(self.metrics, gc_runs, inc);
3838 #[cfg(not(target_arch = "wasm32"))]
3839 {
3840 let gc_duration_ms = gc_start.elapsed().as_secs_f64() * 1000.0;
3841 crate::metrics::record_metric!(
3842 self.metrics,
3843 gc_duration,
3844 observe gc_duration_ms
3845 );
3846 }
3847 }
3848 }
3849 }
3850
3851 #[cfg(feature = "metrics")]
3852 {
3853 crate::metrics::record_metric!(self.metrics, tx_active, dec);
3854 crate::metrics::record_metric!(self.metrics, tx_committed, inc);
3855 #[cfg(not(target_arch = "wasm32"))]
3856 if let Some(start) = self.tx_start_time.lock().take() {
3857 let duration_ms = start.elapsed().as_secs_f64() * 1000.0;
3858 crate::metrics::record_metric!(self.metrics, tx_duration, observe duration_ms);
3859 }
3860 }
3861
3862 Ok(())
3863 }
3864
3865 #[cfg(feature = "lpg")]
3889 pub fn rollback(&mut self) -> Result<()> {
3890 self.rollback_inner()
3891 }
3892
3893 #[cfg(feature = "lpg")]
3895 fn rollback_inner(&self) -> Result<()> {
3896 let _span = grafeo_debug_span!("grafeo::tx::rollback");
3897 {
3899 let mut depth = self.transaction_nesting_depth.lock();
3900 if *depth > 0 {
3901 let sp_name = format!("_nested_tx_{depth}");
3902 *depth -= 1;
3903 drop(depth);
3904 return self.rollback_to_savepoint(&sp_name);
3905 }
3906 }
3907
3908 let transaction_id = self.current_transaction.lock().take().ok_or_else(|| {
3909 grafeo_common::utils::error::Error::Transaction(
3910 grafeo_common::utils::error::TransactionError::InvalidState(
3911 "No active transaction".to_string(),
3912 ),
3913 )
3914 })?;
3915
3916 *self.read_only_tx.lock() = self.db_read_only;
3918
3919 let touched = self.touched_graphs.lock().clone();
3921 for graph_name in &touched {
3922 let store = self.resolve_store(graph_name);
3923 store.discard_uncommitted_versions(transaction_id);
3924 }
3925
3926 #[cfg(feature = "triple-store")]
3928 self.rollback_rdf_transaction(transaction_id);
3929
3930 #[cfg(feature = "cdc")]
3932 if let Some(ref pending) = self.cdc_pending_events {
3933 pending.lock().clear();
3934 }
3935
3936 self.savepoints.lock().clear();
3938 self.touched_graphs.lock().clear();
3939
3940 let result = self.transaction_manager.abort(transaction_id);
3942
3943 #[cfg(feature = "metrics")]
3944 if result.is_ok() {
3945 crate::metrics::record_metric!(self.metrics, tx_active, dec);
3946 crate::metrics::record_metric!(self.metrics, tx_rolled_back, inc);
3947 #[cfg(not(target_arch = "wasm32"))]
3948 if let Some(start) = self.tx_start_time.lock().take() {
3949 let duration_ms = start.elapsed().as_secs_f64() * 1000.0;
3950 crate::metrics::record_metric!(self.metrics, tx_duration, observe duration_ms);
3951 }
3952 }
3953
3954 result
3955 }
3956
3957 #[cfg(feature = "lpg")]
3967 pub fn savepoint(&self, name: &str) -> Result<()> {
3968 let tx_id = self.current_transaction.lock().ok_or_else(|| {
3969 grafeo_common::utils::error::Error::Transaction(
3970 grafeo_common::utils::error::TransactionError::InvalidState(
3971 "No active transaction".to_string(),
3972 ),
3973 )
3974 })?;
3975
3976 let touched = self.touched_graphs.lock().clone();
3978 let graph_snapshots: Vec<GraphSavepoint> = touched
3979 .iter()
3980 .map(|graph_name| {
3981 let store = self.resolve_store(graph_name);
3982 GraphSavepoint {
3983 graph_name: graph_name.clone(),
3984 next_node_id: store.peek_next_node_id(),
3985 next_edge_id: store.peek_next_edge_id(),
3986 undo_log_position: store.property_undo_log_position(tx_id),
3987 }
3988 })
3989 .collect();
3990
3991 self.savepoints.lock().push(SavepointState {
3992 name: name.to_string(),
3993 graph_snapshots,
3994 active_graph: self.current_graph.lock().clone(),
3995 #[cfg(feature = "cdc")]
3996 cdc_event_position: self
3997 .cdc_pending_events
3998 .as_ref()
3999 .map_or(0, |p| p.lock().len()),
4000 });
4001 Ok(())
4002 }
4003
4004 #[cfg(feature = "lpg")]
4013 pub fn rollback_to_savepoint(&self, name: &str) -> Result<()> {
4014 let transaction_id = self.current_transaction.lock().ok_or_else(|| {
4015 grafeo_common::utils::error::Error::Transaction(
4016 grafeo_common::utils::error::TransactionError::InvalidState(
4017 "No active transaction".to_string(),
4018 ),
4019 )
4020 })?;
4021
4022 let mut savepoints = self.savepoints.lock();
4023
4024 let pos = savepoints
4026 .iter()
4027 .rposition(|sp| sp.name == name)
4028 .ok_or_else(|| {
4029 grafeo_common::utils::error::Error::Transaction(
4030 grafeo_common::utils::error::TransactionError::InvalidState(format!(
4031 "Savepoint '{name}' not found"
4032 )),
4033 )
4034 })?;
4035
4036 let sp_state = savepoints[pos].clone();
4037
4038 savepoints.truncate(pos);
4040 drop(savepoints);
4041
4042 for gs in &sp_state.graph_snapshots {
4044 let store = self.resolve_store(&gs.graph_name);
4045
4046 store.rollback_transaction_properties_to(transaction_id, gs.undo_log_position);
4048
4049 let current_next_node = store.peek_next_node_id();
4051 let current_next_edge = store.peek_next_edge_id();
4052
4053 let node_ids: Vec<NodeId> = (gs.next_node_id..current_next_node)
4054 .map(NodeId::new)
4055 .collect();
4056 let edge_ids: Vec<EdgeId> = (gs.next_edge_id..current_next_edge)
4057 .map(EdgeId::new)
4058 .collect();
4059
4060 if !node_ids.is_empty() || !edge_ids.is_empty() {
4061 store.discard_entities_by_id(transaction_id, &node_ids, &edge_ids);
4062 }
4063 }
4064
4065 let touched = self.touched_graphs.lock().clone();
4069 for graph_name in &touched {
4070 let already_captured = sp_state
4071 .graph_snapshots
4072 .iter()
4073 .any(|gs| gs.graph_name == *graph_name);
4074 if !already_captured {
4075 let store = self.resolve_store(graph_name);
4076 store.discard_uncommitted_versions(transaction_id);
4077 }
4078 }
4079
4080 #[cfg(feature = "cdc")]
4082 if let Some(ref pending) = self.cdc_pending_events {
4083 pending.lock().truncate(sp_state.cdc_event_position);
4084 }
4085
4086 let mut touched = self.touched_graphs.lock();
4088 touched.clear();
4089 for gs in &sp_state.graph_snapshots {
4090 if !touched.contains(&gs.graph_name) {
4091 touched.push(gs.graph_name.clone());
4092 }
4093 }
4094
4095 Ok(())
4096 }
4097
4098 pub fn release_savepoint(&self, name: &str) -> Result<()> {
4104 let _tx_id = self.current_transaction.lock().ok_or_else(|| {
4105 grafeo_common::utils::error::Error::Transaction(
4106 grafeo_common::utils::error::TransactionError::InvalidState(
4107 "No active transaction".to_string(),
4108 ),
4109 )
4110 })?;
4111
4112 let mut savepoints = self.savepoints.lock();
4113 let pos = savepoints
4114 .iter()
4115 .rposition(|sp| sp.name == name)
4116 .ok_or_else(|| {
4117 grafeo_common::utils::error::Error::Transaction(
4118 grafeo_common::utils::error::TransactionError::InvalidState(format!(
4119 "Savepoint '{name}' not found"
4120 )),
4121 )
4122 })?;
4123 savepoints.remove(pos);
4124 Ok(())
4125 }
4126
4127 #[must_use]
4129 pub fn in_transaction(&self) -> bool {
4130 self.current_transaction.lock().is_some()
4131 }
4132
4133 #[must_use]
4135 pub(crate) fn current_transaction_id(&self) -> Option<TransactionId> {
4136 *self.current_transaction.lock()
4137 }
4138
4139 #[must_use]
4141 pub(crate) fn transaction_manager(&self) -> &TransactionManager {
4142 &self.transaction_manager
4143 }
4144
4145 #[cfg(feature = "lpg")]
4147 #[must_use]
4148 pub(crate) fn node_count_delta(&self) -> (usize, usize) {
4149 (
4150 self.transaction_start_node_count.load(Ordering::Relaxed),
4151 self.active_lpg_store().node_count(),
4152 )
4153 }
4154
4155 #[cfg(feature = "lpg")]
4157 #[must_use]
4158 pub(crate) fn edge_count_delta(&self) -> (usize, usize) {
4159 (
4160 self.transaction_start_edge_count.load(Ordering::Relaxed),
4161 self.active_lpg_store().edge_count(),
4162 )
4163 }
4164
4165 #[cfg(feature = "lpg")]
4199 pub fn prepare_commit(&mut self) -> Result<crate::transaction::PreparedCommit<'_>> {
4200 crate::transaction::PreparedCommit::new(self)
4201 }
4202
4203 pub fn set_auto_commit(&mut self, auto_commit: bool) {
4205 self.auto_commit = auto_commit;
4206 }
4207
4208 #[must_use]
4210 pub fn auto_commit(&self) -> bool {
4211 self.auto_commit
4212 }
4213
4214 fn needs_auto_commit(&self, has_mutations: bool) -> bool {
4219 self.auto_commit && has_mutations && self.current_transaction.lock().is_none()
4220 }
4221
4222 #[cfg(feature = "lpg")]
4225 fn with_auto_commit<F>(&self, has_mutations: bool, body: F) -> Result<QueryResult>
4226 where
4227 F: FnOnce() -> Result<QueryResult>,
4228 {
4229 if self.needs_auto_commit(has_mutations) {
4230 self.begin_transaction_inner(false, None)?;
4231 match body() {
4232 Ok(result) => {
4233 self.commit_inner()?;
4234 Ok(result)
4235 }
4236 Err(e) => {
4237 let _ = self.rollback_inner();
4238 Err(e)
4239 }
4240 }
4241 } else {
4242 body()
4243 }
4244 }
4245
4246 #[cfg(not(feature = "lpg"))]
4248 fn with_auto_commit<F>(&self, _has_mutations: bool, body: F) -> Result<QueryResult>
4249 where
4250 F: FnOnce() -> Result<QueryResult>,
4251 {
4252 body()
4253 }
4254
4255 fn query_looks_like_mutation(query: &str) -> bool {
4261 let upper = query.to_ascii_uppercase();
4262 upper.contains("INSERT")
4263 || upper.contains("CREATE")
4264 || upper.contains("DELETE")
4265 || upper.contains("MERGE")
4266 || upper.contains("SET")
4267 || upper.contains("REMOVE")
4268 || upper.contains("DROP")
4269 || upper.contains("ALTER")
4270 }
4271
4272 #[must_use]
4274 fn query_deadline(&self) -> Option<Instant> {
4275 #[cfg(not(target_arch = "wasm32"))]
4276 {
4277 self.query_timeout.map(|d| Instant::now() + d)
4278 }
4279 #[cfg(target_arch = "wasm32")]
4280 {
4281 let _ = &self.query_timeout;
4282 None
4283 }
4284 }
4285
4286 fn make_executor(&self, columns: Vec<String>) -> Executor {
4288 Executor::with_columns(columns)
4289 .with_deadline(self.query_deadline())
4290 .with_timeout_duration(self.query_timeout)
4291 }
4292
4293 #[cfg(feature = "spill")]
4298 fn make_operator_memory_context(
4299 &self,
4300 ) -> Option<grafeo_core::execution::OperatorMemoryContext> {
4301 let bm = self.buffer_manager.as_ref()?;
4302 let spill_path = bm.config().spill_path.as_ref()?;
4303 let query_id = self
4305 .commit_counter
4306 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
4307 let query_dir = spill_path.join(format!("query_{query_id}"));
4308 let sm = std::sync::Arc::new(grafeo_core::execution::SpillManager::new(&query_dir).ok()?);
4309 Some(grafeo_core::execution::OperatorMemoryContext::new(
4310 std::sync::Arc::clone(bm),
4311 sm,
4312 ))
4313 }
4314
4315 fn check_property_size(&self, key: &str, value: &Value) -> Result<()> {
4317 if let Some(limit) = self.max_property_size {
4318 let size = value.estimated_size_bytes();
4319 if size > limit {
4320 let limit_display = if limit >= 1024 * 1024 && limit % (1024 * 1024) == 0 {
4321 format!("{} MiB", limit / (1024 * 1024))
4322 } else if limit >= 1024 && limit % 1024 == 0 {
4323 format!("{} KiB", limit / 1024)
4324 } else {
4325 format!("{limit} bytes")
4326 };
4327 return Err(grafeo_common::utils::error::Error::Query(
4328 grafeo_common::utils::error::QueryError::new(
4329 grafeo_common::utils::error::QueryErrorKind::Execution,
4330 format!(
4331 "Property '{key}' value exceeds maximum size of {limit_display} ({size} bytes)"
4332 ),
4333 )
4334 .with_hint(
4335 "Increase with Config::with_max_property_size() or disable with Config::without_max_property_size()".to_string(),
4336 ),
4337 ));
4338 }
4339 }
4340 Ok(())
4341 }
4342
4343 #[cfg(feature = "metrics")]
4349 fn record_query_metrics(
4350 &self,
4351 language: &str,
4352 elapsed_ms: Option<f64>,
4353 result: &Result<crate::database::QueryResult>,
4354 ) {
4355 use crate::metrics::record_metric;
4356
4357 record_metric!(self.metrics, query_count, inc);
4358 if let Some(ref reg) = self.metrics {
4359 reg.query_count_by_language.increment(language);
4360 }
4361 if let Some(ms) = elapsed_ms {
4362 record_metric!(self.metrics, query_latency, observe ms);
4363 }
4364 match result {
4365 Ok(r) => {
4366 let returned = r.rows.len() as u64;
4367 record_metric!(self.metrics, rows_returned, add returned);
4368 if let Some(scanned) = r.rows_scanned {
4369 record_metric!(self.metrics, rows_scanned, add scanned);
4370 }
4371 }
4372 Err(e) => {
4373 record_metric!(self.metrics, query_errors, inc);
4374 let msg = e.to_string();
4376 if msg.contains("exceeded timeout") {
4377 record_metric!(self.metrics, query_timeouts, inc);
4378 }
4379 }
4380 }
4381 }
4382
4383 #[cfg(feature = "gql")]
4385 fn eval_integer_literal(expr: &grafeo_adapters::query::gql::ast::Expression) -> Option<i64> {
4386 use grafeo_adapters::query::gql::ast::{Expression, Literal};
4387 match expr {
4388 Expression::Literal(Literal::Integer(n)) => Some(*n),
4389 _ => None,
4390 }
4391 }
4392
4393 #[must_use]
4399 fn get_transaction_context(&self) -> (EpochId, Option<TransactionId>) {
4400 if let Some(epoch) = *self.viewing_epoch_override.lock() {
4402 return (epoch, None);
4403 }
4404
4405 if let Some(transaction_id) = *self.current_transaction.lock() {
4406 let epoch = self
4408 .transaction_manager
4409 .start_epoch(transaction_id)
4410 .unwrap_or_else(|| self.transaction_manager.current_epoch());
4411 (epoch, Some(transaction_id))
4412 } else {
4413 (self.transaction_manager.current_epoch(), None)
4415 }
4416 }
4417
4418 fn create_planner_for_store(
4423 &self,
4424 store: Arc<dyn GraphStore>,
4425 viewing_epoch: EpochId,
4426 transaction_id: Option<TransactionId>,
4427 ) -> crate::query::Planner {
4428 self.create_planner_for_store_with_read_only(store, viewing_epoch, transaction_id, false)
4429 }
4430
4431 fn create_planner_for_store_with_read_only(
4432 &self,
4433 store: Arc<dyn GraphStore>,
4434 viewing_epoch: EpochId,
4435 transaction_id: Option<TransactionId>,
4436 read_only: bool,
4437 ) -> crate::query::Planner {
4438 use crate::query::Planner;
4439 use grafeo_core::execution::operators::{LazyValue, SessionContext};
4440
4441 let info_store = Arc::clone(&store);
4443 let schema_store = Arc::clone(&store);
4444
4445 let session_context = SessionContext {
4446 current_schema: self.current_schema(),
4447 current_graph: self.current_graph(),
4448 db_info: LazyValue::new(move || Self::build_info_value(&*info_store)),
4449 schema_info: LazyValue::new(move || Self::build_schema_value(&*schema_store)),
4450 };
4451
4452 let write_store = self.active_write_store();
4453
4454 let mut planner = Planner::with_context(
4455 Arc::clone(&store),
4456 write_store,
4457 Arc::clone(&self.transaction_manager),
4458 transaction_id,
4459 viewing_epoch,
4460 )
4461 .with_factorized_execution(self.factorized_execution)
4462 .with_catalog(Arc::clone(&self.catalog))
4463 .with_session_context(session_context)
4464 .with_read_only(read_only);
4465
4466 let validator = CatalogConstraintValidator::new(Arc::clone(&self.catalog))
4468 .with_store(store)
4469 .with_max_property_size(self.max_property_size);
4470 planner = planner.with_validator(Arc::new(validator));
4471
4472 planner
4473 }
4474
4475 fn build_info_value(store: &dyn GraphStore) -> Value {
4477 use grafeo_common::types::PropertyKey;
4478 use std::collections::BTreeMap;
4479
4480 let mut map = BTreeMap::new();
4481 map.insert(PropertyKey::from("mode"), Value::String("lpg".into()));
4482 #[allow(clippy::cast_possible_wrap)]
4484 let node_count = store.node_count() as i64;
4485 #[allow(clippy::cast_possible_wrap)]
4487 let edge_count = store.edge_count() as i64;
4488 map.insert(PropertyKey::from("node_count"), Value::Int64(node_count));
4489 map.insert(PropertyKey::from("edge_count"), Value::Int64(edge_count));
4490 map.insert(
4491 PropertyKey::from("version"),
4492 Value::String(env!("CARGO_PKG_VERSION").into()),
4493 );
4494 Value::Map(map.into())
4495 }
4496
4497 fn build_schema_value(store: &dyn GraphStore) -> Value {
4499 use grafeo_common::types::PropertyKey;
4500 use std::collections::BTreeMap;
4501
4502 let labels: Vec<Value> = store
4503 .all_labels()
4504 .into_iter()
4505 .map(|l| Value::String(l.into()))
4506 .collect();
4507 let edge_types: Vec<Value> = store
4508 .all_edge_types()
4509 .into_iter()
4510 .map(|t| Value::String(t.into()))
4511 .collect();
4512 let property_keys: Vec<Value> = store
4513 .all_property_keys()
4514 .into_iter()
4515 .map(|k| Value::String(k.into()))
4516 .collect();
4517
4518 let mut map = BTreeMap::new();
4519 map.insert(PropertyKey::from("labels"), Value::List(labels.into()));
4520 map.insert(
4521 PropertyKey::from("edge_types"),
4522 Value::List(edge_types.into()),
4523 );
4524 map.insert(
4525 PropertyKey::from("property_keys"),
4526 Value::List(property_keys.into()),
4527 );
4528 Value::Map(map.into())
4529 }
4530
4531 #[cfg(feature = "lpg")]
4536 pub fn create_node(&self, labels: &[&str]) -> NodeId {
4537 let (epoch, transaction_id) = self.get_transaction_context();
4538 self.active_lpg_store().create_node_versioned(
4539 labels,
4540 epoch,
4541 transaction_id.unwrap_or(TransactionId::SYSTEM),
4542 )
4543 }
4544
4545 #[cfg(feature = "lpg")]
4553 pub fn create_node_with_props<'a>(
4554 &self,
4555 labels: &[&str],
4556 properties: impl IntoIterator<Item = (&'a str, Value)>,
4557 ) -> Result<NodeId> {
4558 let props: Vec<(&str, Value)> = properties.into_iter().collect();
4559 for (key, value) in &props {
4560 self.check_property_size(key, value)?;
4561 }
4562 let (epoch, transaction_id) = self.get_transaction_context();
4563 Ok(self.active_lpg_store().create_node_with_props_versioned(
4564 labels,
4565 props,
4566 epoch,
4567 transaction_id.unwrap_or(TransactionId::SYSTEM),
4568 ))
4569 }
4570
4571 #[cfg(feature = "lpg")]
4576 pub fn create_edge(
4577 &self,
4578 src: NodeId,
4579 dst: NodeId,
4580 edge_type: &str,
4581 ) -> grafeo_common::types::EdgeId {
4582 let (epoch, transaction_id) = self.get_transaction_context();
4583 self.active_lpg_store().create_edge_versioned(
4584 src,
4585 dst,
4586 edge_type,
4587 epoch,
4588 transaction_id.unwrap_or(TransactionId::SYSTEM),
4589 )
4590 }
4591
4592 #[cfg(feature = "lpg")]
4598 pub fn create_edge_with_props<'a>(
4599 &self,
4600 src: NodeId,
4601 dst: NodeId,
4602 edge_type: &str,
4603 properties: impl IntoIterator<Item = (&'a str, Value)>,
4604 ) -> Result<grafeo_common::types::EdgeId> {
4605 let props: Vec<(&str, Value)> = properties.into_iter().collect();
4606 for (key, value) in &props {
4607 self.check_property_size(key, value)?;
4608 }
4609 let (epoch, transaction_id) = self.get_transaction_context();
4610 let tid = transaction_id.unwrap_or(TransactionId::SYSTEM);
4611 let store = self.active_lpg_store();
4612 let eid = store.create_edge_versioned(src, dst, edge_type, epoch, tid);
4613 for (key, value) in props {
4614 store.set_edge_property_versioned(eid, key, value, tid);
4615 }
4616 Ok(eid)
4617 }
4618
4619 #[cfg(feature = "lpg")]
4625 pub fn set_node_property(&self, id: NodeId, key: &str, value: Value) -> Result<()> {
4626 self.check_property_size(key, &value)?;
4627 let (_, transaction_id) = self.get_transaction_context();
4628 if let Some(tid) = transaction_id {
4629 self.active_lpg_store()
4630 .set_node_property_versioned(id, key, value, tid);
4631 } else {
4632 self.active_lpg_store().set_node_property(id, key, value);
4633 }
4634 Ok(())
4635 }
4636
4637 #[cfg(feature = "lpg")]
4643 pub fn set_edge_property(
4644 &self,
4645 id: grafeo_common::types::EdgeId,
4646 key: &str,
4647 value: Value,
4648 ) -> Result<()> {
4649 self.check_property_size(key, &value)?;
4650 let (_, transaction_id) = self.get_transaction_context();
4651 if let Some(tid) = transaction_id {
4652 self.active_lpg_store()
4653 .set_edge_property_versioned(id, key, value, tid);
4654 } else {
4655 self.active_lpg_store().set_edge_property(id, key, value);
4656 }
4657 Ok(())
4658 }
4659
4660 #[cfg(feature = "lpg")]
4662 pub fn delete_node(&self, id: NodeId) -> bool {
4663 let (epoch, transaction_id) = self.get_transaction_context();
4664 if let Some(tid) = transaction_id {
4665 self.active_lpg_store()
4666 .delete_node_versioned(id, epoch, tid)
4667 } else {
4668 self.active_lpg_store().delete_node(id)
4669 }
4670 }
4671
4672 #[cfg(feature = "lpg")]
4674 pub fn delete_edge(&self, id: grafeo_common::types::EdgeId) -> bool {
4675 let (epoch, transaction_id) = self.get_transaction_context();
4676 if let Some(tid) = transaction_id {
4677 self.active_lpg_store()
4678 .delete_edge_versioned(id, epoch, tid)
4679 } else {
4680 self.active_lpg_store().delete_edge(id)
4681 }
4682 }
4683
4684 #[cfg(feature = "lpg")]
4712 #[must_use]
4713 pub fn get_node(&self, id: NodeId) -> Option<Node> {
4714 let (epoch, transaction_id) = self.get_transaction_context();
4715 self.active_lpg_store().get_node_versioned(
4716 id,
4717 epoch,
4718 transaction_id.unwrap_or(TransactionId::SYSTEM),
4719 )
4720 }
4721
4722 #[cfg(feature = "lpg")]
4746 #[must_use]
4747 pub fn get_node_property(&self, id: NodeId, key: &str) -> Option<Value> {
4748 self.get_node(id)
4749 .and_then(|node| node.get_property(key).cloned())
4750 }
4751
4752 #[cfg(feature = "lpg")]
4759 #[must_use]
4760 pub fn get_edge(&self, id: EdgeId) -> Option<Edge> {
4761 let (epoch, transaction_id) = self.get_transaction_context();
4762 self.active_lpg_store().get_edge_versioned(
4763 id,
4764 epoch,
4765 transaction_id.unwrap_or(TransactionId::SYSTEM),
4766 )
4767 }
4768
4769 #[cfg(feature = "lpg")]
4795 #[must_use]
4796 pub fn get_neighbors_outgoing(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
4797 self.active_lpg_store()
4798 .edges_from(node, Direction::Outgoing)
4799 .collect()
4800 }
4801
4802 #[cfg(feature = "lpg")]
4811 #[must_use]
4812 pub fn get_neighbors_incoming(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
4813 self.active_lpg_store()
4814 .edges_from(node, Direction::Incoming)
4815 .collect()
4816 }
4817
4818 #[cfg(feature = "lpg")]
4830 #[must_use]
4831 pub fn get_neighbors_outgoing_by_type(
4832 &self,
4833 node: NodeId,
4834 edge_type: &str,
4835 ) -> Vec<(NodeId, EdgeId)> {
4836 self.active_lpg_store()
4837 .edges_from(node, Direction::Outgoing)
4838 .filter(|(_, edge_id)| {
4839 self.get_edge(*edge_id)
4840 .is_some_and(|e| e.edge_type.as_str() == edge_type)
4841 })
4842 .collect()
4843 }
4844
4845 #[cfg(feature = "lpg")]
4852 #[must_use]
4853 pub fn node_exists(&self, id: NodeId) -> bool {
4854 self.get_node(id).is_some()
4855 }
4856
4857 #[cfg(feature = "lpg")]
4859 #[must_use]
4860 pub fn edge_exists(&self, id: EdgeId) -> bool {
4861 self.get_edge(id).is_some()
4862 }
4863
4864 #[cfg(feature = "lpg")]
4868 #[must_use]
4869 pub fn get_degree(&self, node: NodeId) -> (usize, usize) {
4870 let active = self.active_lpg_store();
4871 let out = active.out_degree(node);
4872 let in_degree = active.in_degree(node);
4873 (out, in_degree)
4874 }
4875
4876 #[cfg(feature = "lpg")]
4886 #[must_use]
4887 pub fn get_nodes_batch(&self, ids: &[NodeId]) -> Vec<Option<Node>> {
4888 let (epoch, transaction_id) = self.get_transaction_context();
4889 let tx = transaction_id.unwrap_or(TransactionId::SYSTEM);
4890 let active = self.active_lpg_store();
4891 ids.iter()
4892 .map(|&id| active.get_node_versioned(id, epoch, tx))
4893 .collect()
4894 }
4895
4896 #[cfg(feature = "cdc")]
4904 pub fn history(
4905 &self,
4906 entity_id: impl Into<crate::cdc::EntityId>,
4907 ) -> Result<Vec<crate::cdc::ChangeEvent>> {
4908 self.require_permission(crate::auth::StatementKind::Read)?;
4909 Ok(self.cdc_log.history(entity_id.into()))
4910 }
4911
4912 #[cfg(feature = "cdc")]
4918 pub fn history_since(
4919 &self,
4920 entity_id: impl Into<crate::cdc::EntityId>,
4921 since_epoch: EpochId,
4922 ) -> Result<Vec<crate::cdc::ChangeEvent>> {
4923 self.require_permission(crate::auth::StatementKind::Read)?;
4924 Ok(self.cdc_log.history_since(entity_id.into(), since_epoch))
4925 }
4926
4927 #[cfg(feature = "cdc")]
4933 pub fn changes_between(
4934 &self,
4935 start_epoch: EpochId,
4936 end_epoch: EpochId,
4937 ) -> Result<Vec<crate::cdc::ChangeEvent>> {
4938 self.require_permission(crate::auth::StatementKind::Read)?;
4939 Ok(self.cdc_log.changes_between(start_epoch, end_epoch))
4940 }
4941}
4942
4943impl Drop for Session {
4944 fn drop(&mut self) {
4945 #[cfg(feature = "lpg")]
4948 if self.in_transaction() {
4949 let _ = self.rollback_inner();
4950 }
4951
4952 #[cfg(feature = "metrics")]
4953 if let Some(ref reg) = self.metrics {
4954 reg.session_active
4955 .fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
4956 }
4957 }
4958}
4959
4960#[cfg(test)]
4961mod tests {
4962 use super::parse_default_literal;
4963 use crate::database::GrafeoDB;
4964 use grafeo_common::types::Value;
4965
4966 #[test]
4971 fn parse_default_literal_null() {
4972 assert_eq!(parse_default_literal("null"), Value::Null);
4973 assert_eq!(parse_default_literal("NULL"), Value::Null);
4974 assert_eq!(parse_default_literal("Null"), Value::Null);
4975 }
4976
4977 #[test]
4978 fn parse_default_literal_bool() {
4979 assert_eq!(parse_default_literal("true"), Value::Bool(true));
4980 assert_eq!(parse_default_literal("TRUE"), Value::Bool(true));
4981 assert_eq!(parse_default_literal("false"), Value::Bool(false));
4982 assert_eq!(parse_default_literal("FALSE"), Value::Bool(false));
4983 }
4984
4985 #[test]
4986 fn parse_default_literal_string_single_quoted() {
4987 assert_eq!(
4988 parse_default_literal("'hello'"),
4989 Value::String("hello".into())
4990 );
4991 }
4992
4993 #[test]
4994 fn parse_default_literal_string_double_quoted() {
4995 assert_eq!(
4996 parse_default_literal("\"world\""),
4997 Value::String("world".into())
4998 );
4999 }
5000
5001 #[test]
5002 fn parse_default_literal_integer() {
5003 assert_eq!(parse_default_literal("42"), Value::Int64(42));
5004 assert_eq!(parse_default_literal("-7"), Value::Int64(-7));
5005 assert_eq!(parse_default_literal("0"), Value::Int64(0));
5006 }
5007
5008 #[test]
5009 fn parse_default_literal_float() {
5010 assert_eq!(parse_default_literal("9.81"), Value::Float64(9.81_f64));
5011 assert_eq!(parse_default_literal("-0.5"), Value::Float64(-0.5));
5012 }
5013
5014 #[test]
5015 fn parse_default_literal_fallback_string() {
5016 assert_eq!(
5018 parse_default_literal("some_identifier"),
5019 Value::String("some_identifier".into())
5020 );
5021 }
5022
5023 #[test]
5024 fn test_session_create_node() {
5025 let db = GrafeoDB::new_in_memory();
5026 let session = db.session();
5027
5028 let id = session.create_node(&["Person"]);
5029 assert!(id.is_valid());
5030 assert_eq!(db.node_count(), 1);
5031 }
5032
5033 #[test]
5034 fn test_session_transaction() {
5035 let db = GrafeoDB::new_in_memory();
5036 let mut session = db.session();
5037
5038 assert!(!session.in_transaction());
5039
5040 session.begin_transaction().unwrap();
5041 assert!(session.in_transaction());
5042
5043 session.commit().unwrap();
5044 assert!(!session.in_transaction());
5045 }
5046
5047 #[test]
5048 fn test_session_transaction_context() {
5049 let db = GrafeoDB::new_in_memory();
5050 let mut session = db.session();
5051
5052 let (_epoch1, transaction_id1) = session.get_transaction_context();
5054 assert!(transaction_id1.is_none());
5055
5056 session.begin_transaction().unwrap();
5058 let (epoch2, transaction_id2) = session.get_transaction_context();
5059 assert!(transaction_id2.is_some());
5060 let _ = epoch2; session.commit().unwrap();
5065 let (epoch3, tx_id3) = session.get_transaction_context();
5066 assert!(tx_id3.is_none());
5067 assert!(epoch3.as_u64() >= epoch2.as_u64());
5069 }
5070
5071 #[test]
5072 fn test_session_rollback() {
5073 let db = GrafeoDB::new_in_memory();
5074 let mut session = db.session();
5075
5076 session.begin_transaction().unwrap();
5077 session.rollback().unwrap();
5078 assert!(!session.in_transaction());
5079 }
5080
5081 #[test]
5082 fn test_session_rollback_discards_versions() {
5083 use grafeo_common::types::TransactionId;
5084
5085 let db = GrafeoDB::new_in_memory();
5086
5087 let node_before = db.store().create_node(&["Person"]);
5089 assert!(node_before.is_valid());
5090 assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");
5091
5092 let mut session = db.session();
5094 session.begin_transaction().unwrap();
5095 let transaction_id = session.current_transaction.lock().unwrap();
5096
5097 let epoch = db.store().current_epoch();
5099 let node_in_tx = db
5100 .store()
5101 .create_node_versioned(&["Person"], epoch, transaction_id);
5102 assert!(node_in_tx.is_valid());
5103
5104 assert_eq!(
5108 db.node_count(),
5109 1,
5110 "PENDING nodes should be invisible to non-versioned node_count()"
5111 );
5112 assert!(
5113 db.store()
5114 .get_node_versioned(node_in_tx, epoch, transaction_id)
5115 .is_some(),
5116 "Transaction node should be visible to its own transaction"
5117 );
5118
5119 session.rollback().unwrap();
5121 assert!(!session.in_transaction());
5122
5123 let count_after = db.node_count();
5126 assert_eq!(
5127 count_after, 1,
5128 "Rollback should discard uncommitted node, but got {count_after}"
5129 );
5130
5131 let current_epoch = db.store().current_epoch();
5133 assert!(
5134 db.store()
5135 .get_node_versioned(node_before, current_epoch, TransactionId::SYSTEM)
5136 .is_some(),
5137 "Original node should still exist"
5138 );
5139
5140 assert!(
5142 db.store()
5143 .get_node_versioned(node_in_tx, current_epoch, TransactionId::SYSTEM)
5144 .is_none(),
5145 "Transaction node should be gone"
5146 );
5147 }
5148
5149 #[test]
5150 fn test_session_create_node_in_transaction() {
5151 let db = GrafeoDB::new_in_memory();
5153
5154 let node_before = db.create_node(&["Person"]);
5156 assert!(node_before.is_valid());
5157 assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");
5158
5159 let mut session = db.session();
5161 session.begin_transaction().unwrap();
5162 let transaction_id = session.current_transaction.lock().unwrap();
5163
5164 let node_in_tx = session.create_node(&["Person"]);
5166 assert!(node_in_tx.is_valid());
5167
5168 assert_eq!(
5171 db.node_count(),
5172 1,
5173 "PENDING nodes should be invisible to non-versioned node_count()"
5174 );
5175 let epoch = db.store().current_epoch();
5176 assert!(
5177 db.store()
5178 .get_node_versioned(node_in_tx, epoch, transaction_id)
5179 .is_some(),
5180 "Transaction node should be visible to its own transaction"
5181 );
5182
5183 session.rollback().unwrap();
5185
5186 let count_after = db.node_count();
5188 assert_eq!(
5189 count_after, 1,
5190 "Rollback should discard node created via session.create_node(), but got {count_after}"
5191 );
5192 }
5193
5194 #[test]
5195 fn test_session_create_node_with_props_in_transaction() {
5196 use grafeo_common::types::Value;
5197
5198 let db = GrafeoDB::new_in_memory();
5200
5201 db.create_node(&["Person"]);
5203 assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");
5204
5205 let mut session = db.session();
5207 session.begin_transaction().unwrap();
5208 let transaction_id = session.current_transaction.lock().unwrap();
5209
5210 let node_in_tx = session
5211 .create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))])
5212 .unwrap();
5213 assert!(node_in_tx.is_valid());
5214
5215 assert_eq!(
5218 db.node_count(),
5219 1,
5220 "PENDING nodes should be invisible to non-versioned node_count()"
5221 );
5222 let epoch = db.store().current_epoch();
5223 assert!(
5224 db.store()
5225 .get_node_versioned(node_in_tx, epoch, transaction_id)
5226 .is_some(),
5227 "Transaction node should be visible to its own transaction"
5228 );
5229
5230 session.rollback().unwrap();
5232
5233 let count_after = db.node_count();
5235 assert_eq!(
5236 count_after, 1,
5237 "Rollback should discard node created via session.create_node_with_props()"
5238 );
5239 }
5240
5241 #[cfg(feature = "gql")]
5242 mod gql_tests {
5243 use super::*;
5244
5245 #[test]
5246 fn test_gql_query_execution() {
5247 let db = GrafeoDB::new_in_memory();
5248 let session = db.session();
5249
5250 session.create_node(&["Person"]);
5252 session.create_node(&["Person"]);
5253 session.create_node(&["Animal"]);
5254
5255 let result = session.execute("MATCH (n:Person) RETURN n").unwrap();
5257
5258 assert_eq!(result.row_count(), 2);
5260 assert_eq!(result.column_count(), 1);
5261 assert_eq!(result.columns[0], "n");
5262 }
5263
5264 #[test]
5265 fn test_gql_empty_result() {
5266 let db = GrafeoDB::new_in_memory();
5267 let session = db.session();
5268
5269 let result = session.execute("MATCH (n:Person) RETURN n").unwrap();
5271
5272 assert_eq!(result.row_count(), 0);
5273 }
5274
5275 #[test]
5276 fn test_gql_parse_error() {
5277 let db = GrafeoDB::new_in_memory();
5278 let session = db.session();
5279
5280 let result = session.execute("MATCH (n RETURN n");
5282
5283 assert!(result.is_err());
5284 }
5285
5286 #[test]
5287 fn test_gql_relationship_traversal() {
5288 let db = GrafeoDB::new_in_memory();
5289 let session = db.session();
5290
5291 let alix = session.create_node(&["Person"]);
5293 let gus = session.create_node(&["Person"]);
5294 let vincent = session.create_node(&["Person"]);
5295
5296 session.create_edge(alix, gus, "KNOWS");
5297 session.create_edge(alix, vincent, "KNOWS");
5298
5299 let result = session
5301 .execute("MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b")
5302 .unwrap();
5303
5304 assert_eq!(result.row_count(), 2);
5306 assert_eq!(result.column_count(), 2);
5307 assert_eq!(result.columns[0], "a");
5308 assert_eq!(result.columns[1], "b");
5309 }
5310
5311 #[test]
5312 fn test_gql_relationship_with_type_filter() {
5313 let db = GrafeoDB::new_in_memory();
5314 let session = db.session();
5315
5316 let alix = session.create_node(&["Person"]);
5318 let gus = session.create_node(&["Person"]);
5319 let vincent = session.create_node(&["Person"]);
5320
5321 session.create_edge(alix, gus, "KNOWS");
5322 session.create_edge(alix, vincent, "WORKS_WITH");
5323
5324 let result = session
5326 .execute("MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b")
5327 .unwrap();
5328
5329 assert_eq!(result.row_count(), 1);
5331 }
5332
5333 #[test]
5334 fn test_gql_semantic_error_undefined_variable() {
5335 let db = GrafeoDB::new_in_memory();
5336 let session = db.session();
5337
5338 let result = session.execute("MATCH (n:Person) RETURN x");
5340
5341 assert!(result.is_err());
5343 let Err(err) = result else {
5344 panic!("Expected error")
5345 };
5346 assert!(
5347 err.to_string().contains("Undefined variable"),
5348 "Expected undefined variable error, got: {}",
5349 err
5350 );
5351 }
5352
5353 #[test]
5354 fn test_gql_where_clause_property_filter() {
5355 use grafeo_common::types::Value;
5356
5357 let db = GrafeoDB::new_in_memory();
5358 let session = db.session();
5359
5360 session
5362 .create_node_with_props(&["Person"], [("age", Value::Int64(25))])
5363 .unwrap();
5364 session
5365 .create_node_with_props(&["Person"], [("age", Value::Int64(35))])
5366 .unwrap();
5367 session
5368 .create_node_with_props(&["Person"], [("age", Value::Int64(45))])
5369 .unwrap();
5370
5371 let result = session
5373 .execute("MATCH (n:Person) WHERE n.age > 30 RETURN n")
5374 .unwrap();
5375
5376 assert_eq!(result.row_count(), 2);
5378 }
5379
5380 #[test]
5381 fn test_gql_where_clause_equality() {
5382 use grafeo_common::types::Value;
5383
5384 let db = GrafeoDB::new_in_memory();
5385 let session = db.session();
5386
5387 session
5389 .create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))])
5390 .unwrap();
5391 session
5392 .create_node_with_props(&["Person"], [("name", Value::String("Gus".into()))])
5393 .unwrap();
5394 session
5395 .create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))])
5396 .unwrap();
5397
5398 let result = session
5400 .execute("MATCH (n:Person) WHERE n.name = \"Alix\" RETURN n")
5401 .unwrap();
5402
5403 assert_eq!(result.row_count(), 2);
5405 }
5406
5407 #[test]
5408 fn test_gql_return_property_access() {
5409 use grafeo_common::types::Value;
5410
5411 let db = GrafeoDB::new_in_memory();
5412 let session = db.session();
5413
5414 session
5416 .create_node_with_props(
5417 &["Person"],
5418 [
5419 ("name", Value::String("Alix".into())),
5420 ("age", Value::Int64(30)),
5421 ],
5422 )
5423 .unwrap();
5424 session
5425 .create_node_with_props(
5426 &["Person"],
5427 [
5428 ("name", Value::String("Gus".into())),
5429 ("age", Value::Int64(25)),
5430 ],
5431 )
5432 .unwrap();
5433
5434 let result = session
5436 .execute("MATCH (n:Person) RETURN n.name, n.age")
5437 .unwrap();
5438
5439 assert_eq!(result.row_count(), 2);
5441 assert_eq!(result.column_count(), 2);
5442 assert_eq!(result.columns[0], "n.name");
5443 assert_eq!(result.columns[1], "n.age");
5444
5445 let names: Vec<&Value> = result.rows.iter().map(|r| &r[0]).collect();
5447 assert!(names.contains(&&Value::String("Alix".into())));
5448 assert!(names.contains(&&Value::String("Gus".into())));
5449 }
5450
5451 #[test]
5452 fn test_gql_return_mixed_expressions() {
5453 use grafeo_common::types::Value;
5454
5455 let db = GrafeoDB::new_in_memory();
5456 let session = db.session();
5457
5458 session
5460 .create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))])
5461 .unwrap();
5462
5463 let result = session
5465 .execute("MATCH (n:Person) RETURN n, n.name")
5466 .unwrap();
5467
5468 assert_eq!(result.row_count(), 1);
5469 assert_eq!(result.column_count(), 2);
5470 assert_eq!(result.columns[0], "n");
5471 assert_eq!(result.columns[1], "n.name");
5472
5473 assert_eq!(result.rows[0][1], Value::String("Alix".into()));
5475 }
5476 }
5477
5478 #[cfg(feature = "cypher")]
5479 mod cypher_tests {
5480 use super::*;
5481
5482 #[test]
5483 fn test_cypher_query_execution() {
5484 let db = GrafeoDB::new_in_memory();
5485 let session = db.session();
5486
5487 session.create_node(&["Person"]);
5489 session.create_node(&["Person"]);
5490 session.create_node(&["Animal"]);
5491
5492 let result = session.execute_cypher("MATCH (n:Person) RETURN n").unwrap();
5494
5495 assert_eq!(result.row_count(), 2);
5497 assert_eq!(result.column_count(), 1);
5498 assert_eq!(result.columns[0], "n");
5499 }
5500
5501 #[test]
5502 fn test_cypher_empty_result() {
5503 let db = GrafeoDB::new_in_memory();
5504 let session = db.session();
5505
5506 let result = session.execute_cypher("MATCH (n:Person) RETURN n").unwrap();
5508
5509 assert_eq!(result.row_count(), 0);
5510 }
5511
5512 #[test]
5513 fn test_cypher_parse_error() {
5514 let db = GrafeoDB::new_in_memory();
5515 let session = db.session();
5516
5517 let result = session.execute_cypher("MATCH (n RETURN n");
5519
5520 assert!(result.is_err());
5521 }
5522 }
5523
5524 mod direct_lookup_tests {
5527 use super::*;
5528 use grafeo_common::types::Value;
5529
5530 #[test]
5531 fn test_get_node() {
5532 let db = GrafeoDB::new_in_memory();
5533 let session = db.session();
5534
5535 let id = session.create_node(&["Person"]);
5536 let node = session.get_node(id);
5537
5538 assert!(node.is_some());
5539 let node = node.unwrap();
5540 assert_eq!(node.id, id);
5541 }
5542
5543 #[test]
5544 fn test_get_node_not_found() {
5545 use grafeo_common::types::NodeId;
5546
5547 let db = GrafeoDB::new_in_memory();
5548 let session = db.session();
5549
5550 let node = session.get_node(NodeId::new(9999));
5552 assert!(node.is_none());
5553 }
5554
5555 #[test]
5556 fn test_get_node_property() {
5557 let db = GrafeoDB::new_in_memory();
5558 let session = db.session();
5559
5560 let id = session
5561 .create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))])
5562 .unwrap();
5563
5564 let name = session.get_node_property(id, "name");
5565 assert_eq!(name, Some(Value::String("Alix".into())));
5566
5567 let missing = session.get_node_property(id, "missing");
5569 assert!(missing.is_none());
5570 }
5571
5572 #[test]
5573 fn test_get_edge() {
5574 let db = GrafeoDB::new_in_memory();
5575 let session = db.session();
5576
5577 let alix = session.create_node(&["Person"]);
5578 let gus = session.create_node(&["Person"]);
5579 let edge_id = session.create_edge(alix, gus, "KNOWS");
5580
5581 let edge = session.get_edge(edge_id);
5582 assert!(edge.is_some());
5583 let edge = edge.unwrap();
5584 assert_eq!(edge.id, edge_id);
5585 assert_eq!(edge.src, alix);
5586 assert_eq!(edge.dst, gus);
5587 }
5588
5589 #[test]
5590 fn test_get_edge_not_found() {
5591 use grafeo_common::types::EdgeId;
5592
5593 let db = GrafeoDB::new_in_memory();
5594 let session = db.session();
5595
5596 let edge = session.get_edge(EdgeId::new(9999));
5597 assert!(edge.is_none());
5598 }
5599
5600 #[test]
5601 fn test_get_neighbors_outgoing() {
5602 let db = GrafeoDB::new_in_memory();
5603 let session = db.session();
5604
5605 let alix = session.create_node(&["Person"]);
5606 let gus = session.create_node(&["Person"]);
5607 let harm = session.create_node(&["Person"]);
5608
5609 session.create_edge(alix, gus, "KNOWS");
5610 session.create_edge(alix, harm, "KNOWS");
5611
5612 let neighbors = session.get_neighbors_outgoing(alix);
5613 assert_eq!(neighbors.len(), 2);
5614
5615 let neighbor_ids: Vec<_> = neighbors.iter().map(|(node_id, _)| *node_id).collect();
5616 assert!(neighbor_ids.contains(&gus));
5617 assert!(neighbor_ids.contains(&harm));
5618 }
5619
5620 #[test]
5621 fn test_get_neighbors_incoming() {
5622 let db = GrafeoDB::new_in_memory();
5623 let session = db.session();
5624
5625 let alix = session.create_node(&["Person"]);
5626 let gus = session.create_node(&["Person"]);
5627 let harm = session.create_node(&["Person"]);
5628
5629 session.create_edge(gus, alix, "KNOWS");
5630 session.create_edge(harm, alix, "KNOWS");
5631
5632 let neighbors = session.get_neighbors_incoming(alix);
5633 assert_eq!(neighbors.len(), 2);
5634
5635 let neighbor_ids: Vec<_> = neighbors.iter().map(|(node_id, _)| *node_id).collect();
5636 assert!(neighbor_ids.contains(&gus));
5637 assert!(neighbor_ids.contains(&harm));
5638 }
5639
5640 #[test]
5641 fn test_get_neighbors_outgoing_by_type() {
5642 let db = GrafeoDB::new_in_memory();
5643 let session = db.session();
5644
5645 let alix = session.create_node(&["Person"]);
5646 let gus = session.create_node(&["Person"]);
5647 let company = session.create_node(&["Company"]);
5648
5649 session.create_edge(alix, gus, "KNOWS");
5650 session.create_edge(alix, company, "WORKS_AT");
5651
5652 let knows_neighbors = session.get_neighbors_outgoing_by_type(alix, "KNOWS");
5653 assert_eq!(knows_neighbors.len(), 1);
5654 assert_eq!(knows_neighbors[0].0, gus);
5655
5656 let works_neighbors = session.get_neighbors_outgoing_by_type(alix, "WORKS_AT");
5657 assert_eq!(works_neighbors.len(), 1);
5658 assert_eq!(works_neighbors[0].0, company);
5659
5660 let no_neighbors = session.get_neighbors_outgoing_by_type(alix, "LIKES");
5662 assert!(no_neighbors.is_empty());
5663 }
5664
5665 #[test]
5666 fn test_node_exists() {
5667 use grafeo_common::types::NodeId;
5668
5669 let db = GrafeoDB::new_in_memory();
5670 let session = db.session();
5671
5672 let id = session.create_node(&["Person"]);
5673
5674 assert!(session.node_exists(id));
5675 assert!(!session.node_exists(NodeId::new(9999)));
5676 }
5677
5678 #[test]
5679 fn test_edge_exists() {
5680 use grafeo_common::types::EdgeId;
5681
5682 let db = GrafeoDB::new_in_memory();
5683 let session = db.session();
5684
5685 let alix = session.create_node(&["Person"]);
5686 let gus = session.create_node(&["Person"]);
5687 let edge_id = session.create_edge(alix, gus, "KNOWS");
5688
5689 assert!(session.edge_exists(edge_id));
5690 assert!(!session.edge_exists(EdgeId::new(9999)));
5691 }
5692
5693 #[test]
5694 fn test_get_degree() {
5695 let db = GrafeoDB::new_in_memory();
5696 let session = db.session();
5697
5698 let alix = session.create_node(&["Person"]);
5699 let gus = session.create_node(&["Person"]);
5700 let harm = session.create_node(&["Person"]);
5701
5702 session.create_edge(alix, gus, "KNOWS");
5704 session.create_edge(alix, harm, "KNOWS");
5705 session.create_edge(gus, alix, "KNOWS");
5707
5708 let (out_degree, in_degree) = session.get_degree(alix);
5709 assert_eq!(out_degree, 2);
5710 assert_eq!(in_degree, 1);
5711
5712 let lonely = session.create_node(&["Person"]);
5714 let (out, in_deg) = session.get_degree(lonely);
5715 assert_eq!(out, 0);
5716 assert_eq!(in_deg, 0);
5717 }
5718
5719 #[test]
5720 fn test_get_nodes_batch() {
5721 let db = GrafeoDB::new_in_memory();
5722 let session = db.session();
5723
5724 let alix = session.create_node(&["Person"]);
5725 let gus = session.create_node(&["Person"]);
5726 let harm = session.create_node(&["Person"]);
5727
5728 let nodes = session.get_nodes_batch(&[alix, gus, harm]);
5729 assert_eq!(nodes.len(), 3);
5730 assert!(nodes[0].is_some());
5731 assert!(nodes[1].is_some());
5732 assert!(nodes[2].is_some());
5733
5734 use grafeo_common::types::NodeId;
5736 let nodes_with_missing = session.get_nodes_batch(&[alix, NodeId::new(9999), harm]);
5737 assert_eq!(nodes_with_missing.len(), 3);
5738 assert!(nodes_with_missing[0].is_some());
5739 assert!(nodes_with_missing[1].is_none()); assert!(nodes_with_missing[2].is_some());
5741 }
5742
5743 #[test]
5744 fn test_auto_commit_setting() {
5745 let db = GrafeoDB::new_in_memory();
5746 let mut session = db.session();
5747
5748 assert!(session.auto_commit());
5750
5751 session.set_auto_commit(false);
5752 assert!(!session.auto_commit());
5753
5754 session.set_auto_commit(true);
5755 assert!(session.auto_commit());
5756 }
5757
5758 #[test]
5759 fn test_transaction_double_begin_nests() {
5760 let db = GrafeoDB::new_in_memory();
5761 let mut session = db.session();
5762
5763 session.begin_transaction().unwrap();
5764 let result = session.begin_transaction();
5766 assert!(result.is_ok());
5767 session.commit().unwrap();
5769 session.commit().unwrap();
5771 }
5772
5773 #[test]
5774 fn test_commit_without_transaction_error() {
5775 let db = GrafeoDB::new_in_memory();
5776 let mut session = db.session();
5777
5778 let result = session.commit();
5779 assert!(result.is_err());
5780 }
5781
5782 #[test]
5783 fn test_rollback_without_transaction_error() {
5784 let db = GrafeoDB::new_in_memory();
5785 let mut session = db.session();
5786
5787 let result = session.rollback();
5788 assert!(result.is_err());
5789 }
5790
5791 #[test]
5792 fn test_create_edge_in_transaction() {
5793 let db = GrafeoDB::new_in_memory();
5794 let mut session = db.session();
5795
5796 let alix = session.create_node(&["Person"]);
5798 let gus = session.create_node(&["Person"]);
5799
5800 session.begin_transaction().unwrap();
5802 let edge_id = session.create_edge(alix, gus, "KNOWS");
5803
5804 assert!(session.edge_exists(edge_id));
5806
5807 session.commit().unwrap();
5809
5810 assert!(session.edge_exists(edge_id));
5812 }
5813
5814 #[test]
5815 fn test_neighbors_empty_node() {
5816 let db = GrafeoDB::new_in_memory();
5817 let session = db.session();
5818
5819 let lonely = session.create_node(&["Person"]);
5820
5821 assert!(session.get_neighbors_outgoing(lonely).is_empty());
5822 assert!(session.get_neighbors_incoming(lonely).is_empty());
5823 assert!(
5824 session
5825 .get_neighbors_outgoing_by_type(lonely, "KNOWS")
5826 .is_empty()
5827 );
5828 }
5829 }
5830
5831 #[test]
5832 fn test_auto_gc_triggers_on_commit_interval() {
5833 use crate::config::Config;
5834
5835 let config = Config::in_memory().with_gc_interval(2);
5836 let db = GrafeoDB::with_config(config).unwrap();
5837 let mut session = db.session();
5838
5839 session.begin_transaction().unwrap();
5841 session.create_node(&["A"]);
5842 session.commit().unwrap();
5843
5844 session.begin_transaction().unwrap();
5846 session.create_node(&["B"]);
5847 session.commit().unwrap();
5848
5849 assert_eq!(db.node_count(), 2);
5851 }
5852
5853 #[test]
5854 fn test_query_timeout_config_propagates_to_session() {
5855 use crate::config::Config;
5856 use std::time::Duration;
5857
5858 let config = Config::in_memory().with_query_timeout(Duration::from_secs(5));
5859 let db = GrafeoDB::with_config(config).unwrap();
5860 let session = db.session();
5861
5862 assert!(session.query_deadline().is_some());
5864 }
5865
5866 #[test]
5867 fn test_default_query_timeout_returns_deadline() {
5868 let db = GrafeoDB::new_in_memory();
5869 let session = db.session();
5870
5871 assert!(session.query_deadline().is_some());
5873 }
5874
5875 #[test]
5876 fn test_no_query_timeout_returns_no_deadline() {
5877 use crate::config::Config;
5878
5879 let config = Config::in_memory().without_query_timeout();
5880 let db = GrafeoDB::with_config(config).unwrap();
5881 let session = db.session();
5882
5883 assert!(session.query_deadline().is_none());
5884 }
5885
5886 #[test]
5887 fn test_graph_model_accessor() {
5888 use crate::config::GraphModel;
5889
5890 let db = GrafeoDB::new_in_memory();
5891 let session = db.session();
5892
5893 assert_eq!(session.graph_model(), GraphModel::Lpg);
5894 }
5895
5896 #[test]
5897 fn test_reject_oversized_property() {
5898 use crate::config::Config;
5899
5900 let config = Config::in_memory().with_max_property_size(100);
5901 let db = GrafeoDB::with_config(config).unwrap();
5902 let session = db.session();
5903
5904 let node = session.create_node(&["Test"]);
5905
5906 session
5908 .set_node_property(node, "small", Value::from("hello"))
5909 .unwrap();
5910
5911 let big = "x".repeat(200);
5913 let result = session.set_node_property(node, "big", Value::from(big.as_str()));
5914 assert!(result.is_err());
5915 let err = result.unwrap_err().to_string();
5916 assert!(
5917 err.contains("exceeds maximum size"),
5918 "Expected size error, got: {err}"
5919 );
5920 }
5921
5922 #[test]
5923 fn test_no_property_size_limit() {
5924 use crate::config::Config;
5925
5926 let config = Config::in_memory().without_max_property_size();
5927 let db = GrafeoDB::with_config(config).unwrap();
5928 let session = db.session();
5929
5930 let node = session.create_node(&["Test"]);
5931
5932 let big = "x".repeat(10_000);
5934 session
5935 .set_node_property(node, "big", Value::from(big.as_str()))
5936 .unwrap();
5937 }
5938
5939 #[cfg(feature = "gql")]
5940 #[test]
5941 fn test_external_store_session() {
5942 use grafeo_core::graph::GraphStoreMut;
5943 use std::sync::Arc;
5944
5945 let config = crate::config::Config::in_memory();
5946 let store =
5947 Arc::new(grafeo_core::graph::lpg::LpgStore::new().unwrap()) as Arc<dyn GraphStoreMut>;
5948 let db = GrafeoDB::with_store(store, config).unwrap();
5949
5950 let mut session = db.session();
5951
5952 session.begin_transaction().unwrap();
5956 session.execute("INSERT (:Test {name: 'hello'})").unwrap();
5957
5958 let result = session.execute("MATCH (n:Test) RETURN n.name").unwrap();
5960 assert_eq!(result.row_count(), 1);
5961
5962 session.commit().unwrap();
5963 }
5964
5965 #[cfg(feature = "gql")]
5968 mod session_command_tests {
5969 use super::*;
5970 use grafeo_common::types::Value;
5971
5972 #[test]
5973 fn test_use_graph_sets_current_graph() {
5974 let db = GrafeoDB::new_in_memory();
5975 let session = db.session();
5976
5977 session.execute("CREATE GRAPH mydb").unwrap();
5979 session.execute("USE GRAPH mydb").unwrap();
5980
5981 assert_eq!(session.current_graph(), Some("mydb".to_string()));
5982 }
5983
5984 #[test]
5985 fn test_use_graph_nonexistent_errors() {
5986 let db = GrafeoDB::new_in_memory();
5987 let session = db.session();
5988
5989 let result = session.execute("USE GRAPH doesnotexist");
5990 assert!(result.is_err());
5991 let err = result.unwrap_err().to_string();
5992 assert!(
5993 err.contains("does not exist"),
5994 "Expected 'does not exist' error, got: {err}"
5995 );
5996 }
5997
5998 #[test]
5999 fn test_use_graph_default_always_valid() {
6000 let db = GrafeoDB::new_in_memory();
6001 let session = db.session();
6002
6003 session.execute("USE GRAPH default").unwrap();
6005 assert_eq!(session.current_graph(), Some("default".to_string()));
6006 }
6007
6008 #[test]
6009 fn test_session_set_graph() {
6010 let db = GrafeoDB::new_in_memory();
6011 let session = db.session();
6012
6013 session.execute("CREATE GRAPH analytics").unwrap();
6014 session.execute("SESSION SET GRAPH analytics").unwrap();
6015 assert_eq!(session.current_graph(), Some("analytics".to_string()));
6016 }
6017
6018 #[test]
6019 fn test_session_set_graph_nonexistent_errors() {
6020 let db = GrafeoDB::new_in_memory();
6021 let session = db.session();
6022
6023 let result = session.execute("SESSION SET GRAPH nosuchgraph");
6024 assert!(result.is_err());
6025 }
6026
6027 #[test]
6028 fn test_session_set_time_zone() {
6029 let db = GrafeoDB::new_in_memory();
6030 let session = db.session();
6031
6032 assert_eq!(session.time_zone(), None);
6033
6034 session.execute("SESSION SET TIME ZONE 'UTC'").unwrap();
6035 assert_eq!(session.time_zone(), Some("UTC".to_string()));
6036
6037 session
6038 .execute("SESSION SET TIME ZONE 'America/New_York'")
6039 .unwrap();
6040 assert_eq!(session.time_zone(), Some("America/New_York".to_string()));
6041 }
6042
6043 #[test]
6044 fn test_session_set_parameter() {
6045 let db = GrafeoDB::new_in_memory();
6046 let session = db.session();
6047
6048 session
6049 .execute("SESSION SET PARAMETER $timeout = 30")
6050 .unwrap();
6051
6052 assert!(session.get_parameter("timeout").is_some());
6055 }
6056
6057 #[test]
6058 fn test_session_reset_clears_all_state() {
6059 let db = GrafeoDB::new_in_memory();
6060 let session = db.session();
6061
6062 session.execute("CREATE GRAPH analytics").unwrap();
6064 session.execute("SESSION SET GRAPH analytics").unwrap();
6065 session.execute("SESSION SET TIME ZONE 'UTC'").unwrap();
6066 session
6067 .execute("SESSION SET PARAMETER $limit = 100")
6068 .unwrap();
6069
6070 assert!(session.current_graph().is_some());
6072 assert!(session.time_zone().is_some());
6073 assert!(session.get_parameter("limit").is_some());
6074
6075 session.execute("SESSION RESET").unwrap();
6077
6078 assert_eq!(session.current_graph(), None);
6079 assert_eq!(session.time_zone(), None);
6080 assert!(session.get_parameter("limit").is_none());
6081 }
6082
6083 #[test]
6084 fn test_session_close_clears_state() {
6085 let db = GrafeoDB::new_in_memory();
6086 let session = db.session();
6087
6088 session.execute("CREATE GRAPH analytics").unwrap();
6089 session.execute("SESSION SET GRAPH analytics").unwrap();
6090 session.execute("SESSION SET TIME ZONE 'UTC'").unwrap();
6091
6092 session.execute("SESSION CLOSE").unwrap();
6093
6094 assert_eq!(session.current_graph(), None);
6095 assert_eq!(session.time_zone(), None);
6096 }
6097
6098 #[test]
6099 fn test_create_graph() {
6100 let db = GrafeoDB::new_in_memory();
6101 let session = db.session();
6102
6103 session.execute("CREATE GRAPH mydb").unwrap();
6104
6105 session.execute("USE GRAPH mydb").unwrap();
6107 assert_eq!(session.current_graph(), Some("mydb".to_string()));
6108 }
6109
6110 #[test]
6111 fn test_create_graph_duplicate_errors() {
6112 let db = GrafeoDB::new_in_memory();
6113 let session = db.session();
6114
6115 session.execute("CREATE GRAPH mydb").unwrap();
6116 let result = session.execute("CREATE GRAPH mydb");
6117
6118 assert!(result.is_err());
6119 let err = result.unwrap_err().to_string();
6120 assert!(
6121 err.contains("already exists"),
6122 "Expected 'already exists' error, got: {err}"
6123 );
6124 }
6125
6126 #[test]
6127 fn test_create_graph_if_not_exists() {
6128 let db = GrafeoDB::new_in_memory();
6129 let session = db.session();
6130
6131 session.execute("CREATE GRAPH mydb").unwrap();
6132 session.execute("CREATE GRAPH IF NOT EXISTS mydb").unwrap();
6134 }
6135
6136 #[test]
6137 fn test_drop_graph() {
6138 let db = GrafeoDB::new_in_memory();
6139 let session = db.session();
6140
6141 session.execute("CREATE GRAPH mydb").unwrap();
6142 session.execute("DROP GRAPH mydb").unwrap();
6143
6144 let result = session.execute("USE GRAPH mydb");
6146 assert!(result.is_err());
6147 }
6148
6149 #[test]
6150 fn test_drop_graph_nonexistent_errors() {
6151 let db = GrafeoDB::new_in_memory();
6152 let session = db.session();
6153
6154 let result = session.execute("DROP GRAPH nosuchgraph");
6155 assert!(result.is_err());
6156 let err = result.unwrap_err().to_string();
6157 assert!(
6158 err.contains("does not exist"),
6159 "Expected 'does not exist' error, got: {err}"
6160 );
6161 }
6162
6163 #[test]
6164 fn test_drop_graph_if_exists() {
6165 let db = GrafeoDB::new_in_memory();
6166 let session = db.session();
6167
6168 session.execute("DROP GRAPH IF EXISTS nosuchgraph").unwrap();
6170 }
6171
6172 #[test]
6173 fn test_start_transaction_via_gql() {
6174 let db = GrafeoDB::new_in_memory();
6175 let session = db.session();
6176
6177 session.execute("START TRANSACTION").unwrap();
6178 assert!(session.in_transaction());
6179 session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
6180 session.execute("COMMIT").unwrap();
6181 assert!(!session.in_transaction());
6182
6183 let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
6184 assert_eq!(result.rows.len(), 1);
6185 }
6186
6187 #[test]
6188 fn test_start_transaction_read_only_blocks_insert() {
6189 let db = GrafeoDB::new_in_memory();
6190 let session = db.session();
6191
6192 session.execute("START TRANSACTION READ ONLY").unwrap();
6193 let result = session.execute("INSERT (:Person {name: 'Alix'})");
6194 assert!(result.is_err());
6195 let err = result.unwrap_err().to_string();
6196 assert!(
6197 err.contains("read-only"),
6198 "Expected read-only error, got: {err}"
6199 );
6200 session.execute("ROLLBACK").unwrap();
6201 }
6202
6203 #[test]
6204 fn test_start_transaction_read_only_allows_reads() {
6205 let db = GrafeoDB::new_in_memory();
6206 let mut session = db.session();
6207 session.begin_transaction().unwrap();
6208 session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
6209 session.commit().unwrap();
6210
6211 session.execute("START TRANSACTION READ ONLY").unwrap();
6212 let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
6213 assert_eq!(result.rows.len(), 1);
6214 session.execute("COMMIT").unwrap();
6215 }
6216
6217 #[test]
6218 fn test_rollback_via_gql() {
6219 let db = GrafeoDB::new_in_memory();
6220 let session = db.session();
6221
6222 session.execute("START TRANSACTION").unwrap();
6223 session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
6224 session.execute("ROLLBACK").unwrap();
6225
6226 let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
6227 assert!(result.rows.is_empty());
6228 }
6229
6230 #[test]
6231 fn test_start_transaction_with_isolation_level() {
6232 let db = GrafeoDB::new_in_memory();
6233 let session = db.session();
6234
6235 session
6236 .execute("START TRANSACTION ISOLATION LEVEL SERIALIZABLE")
6237 .unwrap();
6238 assert!(session.in_transaction());
6239 session.execute("ROLLBACK").unwrap();
6240 }
6241
6242 #[test]
6243 fn test_session_commands_return_empty_result() {
6244 let db = GrafeoDB::new_in_memory();
6245 let session = db.session();
6246
6247 session.execute("CREATE GRAPH test").unwrap();
6248 let result = session.execute("SESSION SET GRAPH test").unwrap();
6249 assert_eq!(result.row_count(), 0);
6250 assert_eq!(result.column_count(), 0);
6251 }
6252
6253 #[test]
6254 fn test_current_graph_default_is_none() {
6255 let db = GrafeoDB::new_in_memory();
6256 let session = db.session();
6257
6258 assert_eq!(session.current_graph(), None);
6259 }
6260
6261 #[test]
6262 fn test_time_zone_default_is_none() {
6263 let db = GrafeoDB::new_in_memory();
6264 let session = db.session();
6265
6266 assert_eq!(session.time_zone(), None);
6267 }
6268
6269 #[test]
6270 fn test_session_state_independent_across_sessions() {
6271 let db = GrafeoDB::new_in_memory();
6272 let session1 = db.session();
6273 let session2 = db.session();
6274
6275 session1.execute("CREATE GRAPH first").unwrap();
6276 session1.execute("CREATE GRAPH second").unwrap();
6277 session1.execute("SESSION SET GRAPH first").unwrap();
6278 session2.execute("SESSION SET GRAPH second").unwrap();
6279
6280 assert_eq!(session1.current_graph(), Some("first".to_string()));
6281 assert_eq!(session2.current_graph(), Some("second".to_string()));
6282 }
6283
6284 #[test]
6285 fn test_show_node_types() {
6286 let db = GrafeoDB::new_in_memory();
6287 let session = db.session();
6288
6289 session
6290 .execute("CREATE NODE TYPE Person (name STRING NOT NULL, age INTEGER)")
6291 .unwrap();
6292
6293 let result = session.execute("SHOW NODE TYPES").unwrap();
6294 assert_eq!(
6295 result.columns,
6296 vec!["name", "properties", "constraints", "parents"]
6297 );
6298 assert_eq!(result.rows.len(), 1);
6299 assert_eq!(result.rows[0][0], Value::from("Person"));
6301 }
6302
6303 #[test]
6304 fn test_show_edge_types() {
6305 let db = GrafeoDB::new_in_memory();
6306 let session = db.session();
6307
6308 session
6309 .execute("CREATE EDGE TYPE KNOWS CONNECTING (Person) TO (Person) (since INTEGER)")
6310 .unwrap();
6311
6312 let result = session.execute("SHOW EDGE TYPES").unwrap();
6313 assert_eq!(
6314 result.columns,
6315 vec!["name", "properties", "source_types", "target_types"]
6316 );
6317 assert_eq!(result.rows.len(), 1);
6318 assert_eq!(result.rows[0][0], Value::from("KNOWS"));
6319 }
6320
6321 #[test]
6322 fn test_show_graph_types() {
6323 let db = GrafeoDB::new_in_memory();
6324 let session = db.session();
6325
6326 session
6327 .execute("CREATE NODE TYPE Person (name STRING)")
6328 .unwrap();
6329 session
6330 .execute(
6331 "CREATE GRAPH TYPE social (\
6332 NODE TYPE Person (name STRING)\
6333 )",
6334 )
6335 .unwrap();
6336
6337 let result = session.execute("SHOW GRAPH TYPES").unwrap();
6338 assert_eq!(
6339 result.columns,
6340 vec!["name", "open", "node_types", "edge_types"]
6341 );
6342 assert_eq!(result.rows.len(), 1);
6343 assert_eq!(result.rows[0][0], Value::from("social"));
6344 }
6345
6346 #[test]
6347 fn test_show_graph_type_named() {
6348 let db = GrafeoDB::new_in_memory();
6349 let session = db.session();
6350
6351 session
6352 .execute("CREATE NODE TYPE Person (name STRING)")
6353 .unwrap();
6354 session
6355 .execute(
6356 "CREATE GRAPH TYPE social (\
6357 NODE TYPE Person (name STRING)\
6358 )",
6359 )
6360 .unwrap();
6361
6362 let result = session.execute("SHOW GRAPH TYPE social").unwrap();
6363 assert_eq!(result.rows.len(), 1);
6364 assert_eq!(result.rows[0][0], Value::from("social"));
6365 }
6366
6367 #[test]
6368 fn test_show_graph_type_not_found() {
6369 let db = GrafeoDB::new_in_memory();
6370 let session = db.session();
6371
6372 let result = session.execute("SHOW GRAPH TYPE nonexistent");
6373 assert!(result.is_err());
6374 }
6375
6376 #[test]
6377 fn test_show_indexes_via_gql() {
6378 let db = GrafeoDB::new_in_memory();
6379 let session = db.session();
6380
6381 let result = session.execute("SHOW INDEXES").unwrap();
6382 assert_eq!(result.columns, vec!["name", "type", "label", "property"]);
6383 }
6384
6385 #[test]
6386 fn test_show_constraints_via_gql() {
6387 let db = GrafeoDB::new_in_memory();
6388 let session = db.session();
6389
6390 let result = session.execute("SHOW CONSTRAINTS").unwrap();
6391 assert_eq!(result.columns, vec!["name", "type", "label", "properties"]);
6392 }
6393
6394 #[test]
6395 fn test_pattern_form_graph_type_roundtrip() {
6396 let db = GrafeoDB::new_in_memory();
6397 let session = db.session();
6398
6399 session
6401 .execute("CREATE NODE TYPE Person (name STRING NOT NULL)")
6402 .unwrap();
6403 session
6404 .execute("CREATE NODE TYPE City (name STRING)")
6405 .unwrap();
6406 session
6407 .execute("CREATE EDGE TYPE KNOWS (since INTEGER)")
6408 .unwrap();
6409 session.execute("CREATE EDGE TYPE LIVES_IN").unwrap();
6410
6411 session
6413 .execute(
6414 "CREATE GRAPH TYPE social (\
6415 (:Person {name STRING NOT NULL})-[:KNOWS {since INTEGER}]->(:Person),\
6416 (:Person)-[:LIVES_IN]->(:City)\
6417 )",
6418 )
6419 .unwrap();
6420
6421 let result = session.execute("SHOW GRAPH TYPE social").unwrap();
6423 assert_eq!(result.rows.len(), 1);
6424 assert_eq!(result.rows[0][0], Value::from("social"));
6425 }
6426 }
6427}