1#[cfg(feature = "triple-store")]
8mod rdf;
9
10use std::sync::Arc;
11use std::sync::atomic::AtomicUsize;
12#[cfg(feature = "lpg")]
13use std::sync::atomic::Ordering;
14use std::time::{Duration, Instant};
15
16#[cfg(feature = "lpg")]
17use grafeo_common::grafeo_debug_span;
18#[cfg(feature = "lpg")]
19use grafeo_common::types::{EdgeId, NodeId};
20use grafeo_common::types::{EpochId, TransactionId, Value};
21use grafeo_common::utils::error::Result;
22use grafeo_common::{grafeo_info_span, grafeo_warn};
23#[cfg(feature = "lpg")]
24use grafeo_core::graph::Direction;
25#[cfg(feature = "lpg")]
26use grafeo_core::graph::lpg::LpgStore;
27#[cfg(feature = "lpg")]
28use grafeo_core::graph::lpg::{Edge, Node};
29#[cfg(feature = "triple-store")]
30use grafeo_core::graph::rdf::RdfStore;
31use grafeo_core::graph::{GraphStore, GraphStoreMut, GraphStoreSearch};
32
33use crate::catalog::{Catalog, CatalogConstraintValidator};
34use crate::config::{AdaptiveConfig, GraphModel};
35use crate::database::QueryResult;
36use crate::query::Executor;
37use crate::query::cache::QueryCache;
38use crate::transaction::TransactionManager;
39
/// Graph-name segment used in storage keys when a session schema is set but
/// no named graph is selected, or the selected graph is `default`
/// (see `Session::active_graph_storage_key`, which builds `"{schema}/__default__"`).
const SCHEMA_DEFAULT_GRAPH: &str = "__default__";
43
44fn parse_default_literal(text: &str) -> Value {
49 if text.eq_ignore_ascii_case("null") {
50 return Value::Null;
51 }
52 if text.eq_ignore_ascii_case("true") {
53 return Value::Bool(true);
54 }
55 if text.eq_ignore_ascii_case("false") {
56 return Value::Bool(false);
57 }
58 if (text.starts_with('\'') && text.ends_with('\''))
60 || (text.starts_with('"') && text.ends_with('"'))
61 {
62 return Value::String(text[1..text.len() - 1].into());
63 }
64 if let Ok(i) = text.parse::<i64>() {
66 return Value::Int64(i);
67 }
68 if let Ok(f) = text.parse::<f64>() {
69 return Value::Float64(f);
70 }
71 Value::String(text.into())
73}
74
/// Bundle of shared handles and settings consumed by the `Session`
/// constructors (`Session::with_adaptive`, `Session::with_external_store`).
///
/// Each field is moved into the `Session` field of the same name, except
/// `read_only`, which initialises both `read_only_tx` and `db_read_only`.
pub(crate) struct SessionConfig {
    /// Shared transaction manager handle.
    pub transaction_manager: Arc<TransactionManager>,
    /// Shared query cache handle.
    pub query_cache: Arc<QueryCache>,
    /// Shared catalog handle.
    pub catalog: Arc<Catalog>,
    /// Adaptive-execution settings stored on the session.
    pub adaptive_config: AdaptiveConfig,
    /// Flag stored on the session as `factorized_execution`.
    pub factorized_execution: bool,
    /// Graph model of the database (e.g. checked against `GraphModel::Rdf`
    /// by `Session::require_lpg`).
    pub graph_model: GraphModel,
    /// Optional query timeout.
    pub query_timeout: Option<Duration>,
    /// Optional property-size limit — presumably in bytes; TODO confirm.
    pub max_property_size: Option<usize>,
    /// Optional shared buffer manager.
    pub buffer_manager: Option<Arc<grafeo_common::memory::buffer::BufferManager>>,
    /// Shared commit counter.
    pub commit_counter: Arc<AtomicUsize>,
    /// GC interval — presumably measured in commits against `commit_counter`;
    /// TODO confirm.
    pub gc_interval: usize,
    /// Whether the database is read-only; also the initial value of the
    /// session's read-only transaction flag.
    pub read_only: bool,
    /// Identity the session runs as (used for permission checks).
    pub identity: crate::auth::Identity,
    /// Shared registry of named graph projections.
    #[cfg(feature = "lpg")]
    pub projections: Arc<
        parking_lot::RwLock<
            std::collections::HashMap<String, Arc<grafeo_core::graph::GraphProjection>>,
        >,
    >,
}
104
/// A database session: per-connection state (current graph/schema, time zone,
/// parameters, transaction bookkeeping) plus handles to the shared stores,
/// catalog, transaction manager and query cache.
///
/// Mutable session state lives behind `parking_lot` mutexes or atomics so the
/// methods in this file can take `&self`.
pub struct Session {
    /// Root LPG store. `with_adaptive` stores the store it was given;
    /// `with_external_store` stores a freshly created `LpgStore`.
    #[cfg(feature = "lpg")]
    store: Arc<LpgStore>,
    /// Records which constructor populated `store` (see `LpgBackend`).
    #[cfg(feature = "lpg")]
    lpg_backend: LpgBackend,
    /// Read-side store used when no named graph is active (see `active_store`).
    graph_store: Arc<dyn GraphStoreSearch>,
    /// Write-side store used when no named graph is active, if any
    /// (see `active_write_store`).
    graph_store_mut: Option<Arc<dyn GraphStoreMut>>,
    /// Shared catalog (schemas, type definitions, graph-type bindings).
    catalog: Arc<Catalog>,
    /// RDF store; both constructors in this file create a fresh one.
    #[cfg(feature = "triple-store")]
    rdf_store: Arc<RdfStore>,
    /// Shared transaction manager.
    transaction_manager: Arc<TransactionManager>,
    /// Shared query cache.
    query_cache: Arc<QueryCache>,
    /// Id of the transaction currently open on this session, if any.
    current_transaction: parking_lot::Mutex<Option<TransactionId>>,
    /// When `true`, graph/projection DDL session commands are rejected with
    /// `TransactionError::ReadOnly`. Initialised from `SessionConfig::read_only`.
    read_only_tx: parking_lot::Mutex<bool>,
    /// Copy of `SessionConfig::read_only`.
    db_read_only: bool,
    /// Identity used for permission and grant checks.
    identity: crate::auth::Identity,
    /// Initialised to `true` by both constructors in this file.
    auto_commit: bool,
    /// Adaptive-execution settings (currently allowed to be unused).
    #[allow(dead_code)]
    adaptive_config: AdaptiveConfig,
    /// Copied from `SessionConfig::factorized_execution`.
    factorized_execution: bool,
    /// Graph model of the database.
    graph_model: GraphModel,
    /// Optional query timeout.
    query_timeout: Option<Duration>,
    /// Optional property-size limit — presumably in bytes; TODO confirm.
    max_property_size: Option<usize>,
    /// Optional shared buffer manager.
    buffer_manager: Option<Arc<grafeo_common::memory::buffer::BufferManager>>,
    /// Shared commit counter.
    commit_counter: Arc<AtomicUsize>,
    /// GC interval — presumably in commits; TODO confirm.
    gc_interval: usize,
    /// Initialised to 0 — presumably the node count captured at transaction
    /// start; TODO confirm.
    transaction_start_node_count: AtomicUsize,
    /// Initialised to 0 — presumably the edge count captured at transaction
    /// start; TODO confirm.
    transaction_start_edge_count: AtomicUsize,
    /// Write-ahead log, installed by `set_wal`.
    #[cfg(feature = "wal")]
    wal: Option<Arc<grafeo_storage::wal::LpgWal>>,
    /// Shared graph-context cell passed to WAL-wrapping stores, installed by
    /// `set_wal`.
    #[cfg(feature = "wal")]
    wal_graph_context: Option<Arc<parking_lot::Mutex<Option<String>>>>,
    /// Change-data-capture log; replaced by `set_cdc_log`.
    #[cfg(feature = "cdc")]
    cdc_log: Arc<crate::cdc::CdcLog>,
    /// Pending CDC event buffer shared with the CDC-wrapping write store;
    /// set by `set_cdc_log` when a write store exists.
    #[cfg(feature = "cdc")]
    cdc_pending_events: Option<Arc<parking_lot::Mutex<Vec<crate::cdc::ChangeEvent>>>>,
    /// Name of the currently selected graph (`None` = default graph).
    current_graph: parking_lot::Mutex<Option<String>>,
    /// Name of the currently selected schema, if any.
    current_schema: parking_lot::Mutex<Option<String>>,
    /// Session time zone, if set.
    time_zone: parking_lot::Mutex<Option<String>>,
    /// Session parameters keyed by name.
    session_params:
        parking_lot::Mutex<std::collections::HashMap<String, grafeo_common::types::Value>>,
    /// Viewing-epoch override set via `set_viewing_epoch`.
    viewing_epoch_override: parking_lot::Mutex<Option<EpochId>>,
    /// Stack of savepoints for the current transaction.
    savepoints: parking_lot::Mutex<Vec<SavepointState>>,
    /// Transaction nesting depth; starts at 0.
    transaction_nesting_depth: parking_lot::Mutex<u32>,
    /// Distinct storage keys recorded by `track_graph_touch` while a
    /// transaction is open (`None` = default graph).
    touched_graphs: parking_lot::Mutex<Vec<Option<String>>>,
    /// Starts at 0 — NOTE(review): presumably the number of open result
    /// streams; confirm against its users.
    active_streams: AtomicUsize,
    /// Metrics registry, installed by `set_metrics`.
    #[cfg(feature = "metrics")]
    pub(crate) metrics: Option<Arc<crate::metrics::MetricsRegistry>>,
    /// Starts as `None` — presumably the start instant of the current
    /// transaction for metrics; TODO confirm.
    #[cfg(feature = "metrics")]
    tx_start_time: parking_lot::Mutex<Option<Instant>>,
    /// Shared registry of named graph projections.
    #[cfg(feature = "lpg")]
    projections: Arc<
        parking_lot::RwLock<
            std::collections::HashMap<String, Arc<grafeo_core::graph::GraphProjection>>,
        >,
    >,
}
220
/// Records how the session's `store` field was populated.
#[cfg(feature = "lpg")]
#[derive(Clone, Copy)]
enum LpgBackend {
    /// Set by `Session::with_adaptive`: `store` is the LPG store the session
    /// was created with and also backs `graph_store` / `graph_store_mut`.
    Active,
    /// Set by `Session::with_external_store`: `store` is a freshly created
    /// `LpgStore`, while reads and writes go through the externally supplied
    /// stores.
    Placeholder,
}
234
/// Per-graph state captured as part of a savepoint.
///
/// NOTE(review): field meanings below are inferred from names; the code that
/// fills and consumes them is outside this view — confirm against
/// `savepoint` / `rollback_to_savepoint`.
#[derive(Clone)]
struct GraphSavepoint {
    /// Graph this snapshot belongs to (`None` presumably = default graph).
    graph_name: Option<String>,
    /// Presumably the next node id at savepoint time.
    next_node_id: u64,
    /// Presumably the next edge id at savepoint time.
    next_edge_id: u64,
    /// Presumably the undo-log length at savepoint time.
    undo_log_position: usize,
}
243
/// One entry of the session's savepoint stack (`Session::savepoints`).
#[derive(Clone)]
struct SavepointState {
    /// Savepoint name.
    name: String,
    /// Per-graph snapshots taken for this savepoint.
    graph_snapshots: Vec<GraphSavepoint>,
    /// Graph recorded with the savepoint (currently allowed to be unused) —
    /// presumably the graph that was active when it was created; TODO confirm.
    #[allow(dead_code)]
    active_graph: Option<String>,
    /// Presumably the length of the pending CDC event buffer at savepoint
    /// time; TODO confirm.
    #[cfg(feature = "cdc")]
    cdc_event_position: usize,
}
258
259impl Session {
260 #[cfg(feature = "lpg")]
262 #[allow(dead_code)] pub(crate) fn with_adaptive(store: Arc<LpgStore>, cfg: SessionConfig) -> Self {
264 let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreSearch>;
265 let graph_store_mut = Some(Arc::clone(&store) as Arc<dyn GraphStoreMut>);
266 Self {
267 store,
268 lpg_backend: LpgBackend::Active,
269 graph_store,
270 graph_store_mut,
271 catalog: cfg.catalog,
272 #[cfg(feature = "triple-store")]
273 rdf_store: Arc::new(RdfStore::new()),
274 transaction_manager: cfg.transaction_manager,
275 query_cache: cfg.query_cache,
276 current_transaction: parking_lot::Mutex::new(None),
277 read_only_tx: parking_lot::Mutex::new(cfg.read_only),
278 db_read_only: cfg.read_only,
279 identity: cfg.identity,
280 auto_commit: true,
281 adaptive_config: cfg.adaptive_config,
282 factorized_execution: cfg.factorized_execution,
283 graph_model: cfg.graph_model,
284 query_timeout: cfg.query_timeout,
285 max_property_size: cfg.max_property_size,
286 buffer_manager: cfg.buffer_manager,
287 commit_counter: cfg.commit_counter,
288 gc_interval: cfg.gc_interval,
289 transaction_start_node_count: AtomicUsize::new(0),
290 transaction_start_edge_count: AtomicUsize::new(0),
291 #[cfg(feature = "wal")]
292 wal: None,
293 #[cfg(feature = "wal")]
294 wal_graph_context: None,
295 #[cfg(feature = "cdc")]
296 cdc_log: Arc::new(crate::cdc::CdcLog::new()),
297 #[cfg(feature = "cdc")]
298 cdc_pending_events: None,
299 current_graph: parking_lot::Mutex::new(None),
300 current_schema: parking_lot::Mutex::new(None),
301 time_zone: parking_lot::Mutex::new(None),
302 session_params: parking_lot::Mutex::new(std::collections::HashMap::new()),
303 viewing_epoch_override: parking_lot::Mutex::new(None),
304 savepoints: parking_lot::Mutex::new(Vec::new()),
305 transaction_nesting_depth: parking_lot::Mutex::new(0),
306 touched_graphs: parking_lot::Mutex::new(Vec::new()),
307 active_streams: AtomicUsize::new(0),
308 #[cfg(feature = "metrics")]
309 metrics: None,
310 #[cfg(feature = "metrics")]
311 tx_start_time: parking_lot::Mutex::new(None),
312 projections: cfg.projections,
313 }
314 }
315
    /// Replaces the session's default read store and (optional) write store.
    ///
    /// Only `graph_store` / `graph_store_mut` change; the root LPG `store`
    /// used for named-graph lookups is left untouched.
    #[cfg(all(feature = "compact-store", feature = "lpg"))]
    pub(crate) fn override_stores(
        &mut self,
        read_store: Arc<dyn GraphStoreSearch>,
        write_store: Option<Arc<dyn GraphStoreMut>>,
    ) {
        self.graph_store = read_store;
        self.graph_store_mut = write_store;
    }
330
331 #[cfg(all(feature = "wal", feature = "lpg"))]
336 pub(crate) fn set_wal(
337 &mut self,
338 wal: Arc<grafeo_storage::wal::LpgWal>,
339 wal_graph_context: Arc<parking_lot::Mutex<Option<String>>>,
340 ) {
341 let wal_store = Arc::new(crate::database::wal_store::WalGraphStore::new(
343 Arc::clone(&self.store),
344 Arc::clone(&wal),
345 Arc::clone(&wal_graph_context),
346 ));
347 self.graph_store = Arc::clone(&wal_store) as Arc<dyn GraphStoreSearch>;
348 self.graph_store_mut = Some(wal_store as Arc<dyn GraphStoreMut>);
349 self.wal = Some(wal);
350 self.wal_graph_context = Some(wal_graph_context);
351 }
352
353 #[cfg(feature = "cdc")]
360 pub(crate) fn set_cdc_log(&mut self, cdc_log: Arc<crate::cdc::CdcLog>) {
361 if let Some(ref write_store) = self.graph_store_mut {
364 let cdc_store = Arc::new(crate::database::cdc_store::CdcGraphStore::new(
365 Arc::clone(write_store),
366 Arc::clone(&cdc_log),
367 ));
368 self.cdc_pending_events = Some(cdc_store.pending_events());
369 self.graph_store_mut = Some(cdc_store as Arc<dyn grafeo_core::graph::GraphStoreMut>);
370 }
371 self.cdc_log = cdc_log;
372 }
373
    /// Attaches a metrics registry to the session, replacing any previous one.
    #[cfg(feature = "metrics")]
    pub(crate) fn set_metrics(&mut self, metrics: Arc<crate::metrics::MetricsRegistry>) {
        self.metrics = Some(metrics);
    }
379
380 pub(crate) fn with_external_store(
389 read_store: Arc<dyn GraphStoreSearch>,
390 write_store: Option<Arc<dyn GraphStoreMut>>,
391 cfg: SessionConfig,
392 ) -> Result<Self> {
393 Ok(Self {
394 #[cfg(feature = "lpg")]
395 store: Arc::new(LpgStore::new()?),
396 #[cfg(feature = "lpg")]
397 lpg_backend: LpgBackend::Placeholder,
398 graph_store: read_store,
399 graph_store_mut: write_store,
400 catalog: cfg.catalog,
401 #[cfg(feature = "triple-store")]
402 rdf_store: Arc::new(RdfStore::new()),
403 transaction_manager: cfg.transaction_manager,
404 query_cache: cfg.query_cache,
405 current_transaction: parking_lot::Mutex::new(None),
406 read_only_tx: parking_lot::Mutex::new(cfg.read_only),
407 db_read_only: cfg.read_only,
408 identity: cfg.identity,
409 auto_commit: true,
410 adaptive_config: cfg.adaptive_config,
411 factorized_execution: cfg.factorized_execution,
412 graph_model: cfg.graph_model,
413 query_timeout: cfg.query_timeout,
414 max_property_size: cfg.max_property_size,
415 buffer_manager: cfg.buffer_manager,
416 commit_counter: cfg.commit_counter,
417 gc_interval: cfg.gc_interval,
418 transaction_start_node_count: AtomicUsize::new(0),
419 transaction_start_edge_count: AtomicUsize::new(0),
420 #[cfg(feature = "wal")]
421 wal: None,
422 #[cfg(feature = "wal")]
423 wal_graph_context: None,
424 #[cfg(feature = "cdc")]
425 cdc_log: Arc::new(crate::cdc::CdcLog::new()),
426 #[cfg(feature = "cdc")]
427 cdc_pending_events: None,
428 current_graph: parking_lot::Mutex::new(None),
429 current_schema: parking_lot::Mutex::new(None),
430 time_zone: parking_lot::Mutex::new(None),
431 session_params: parking_lot::Mutex::new(std::collections::HashMap::new()),
432 viewing_epoch_override: parking_lot::Mutex::new(None),
433 savepoints: parking_lot::Mutex::new(Vec::new()),
434 transaction_nesting_depth: parking_lot::Mutex::new(0),
435 touched_graphs: parking_lot::Mutex::new(Vec::new()),
436 active_streams: AtomicUsize::new(0),
437 #[cfg(feature = "metrics")]
438 metrics: None,
439 #[cfg(feature = "metrics")]
440 tx_start_time: parking_lot::Mutex::new(None),
441 #[cfg(feature = "lpg")]
442 projections: cfg.projections,
443 })
444 }
445
    /// Returns the graph model this session was configured with.
    #[must_use]
    pub fn graph_model(&self) -> GraphModel {
        self.graph_model
    }
451
    /// Returns the identity this session runs as.
    #[must_use]
    pub fn identity(&self) -> &crate::auth::Identity {
        &self.identity
    }
457
    /// Selects `name` as the session's current graph.
    ///
    /// No existence check is performed here. If a transaction is open, the
    /// newly active storage key is recorded as touched.
    pub fn use_graph(&self, name: &str) {
        // The lock guard is a temporary dropped at the end of this statement,
        // so `track_graph_touch` can lock `current_graph` again.
        *self.current_graph.lock() = Some(name.to_string());
        self.track_graph_touch();
    }
465
    /// Returns a copy of the currently selected graph name, or `None` if no
    /// graph has been selected.
    #[must_use]
    pub fn current_graph(&self) -> Option<String> {
        self.current_graph.lock().clone()
    }
471
    /// Selects `name` as the session's current schema.
    ///
    /// No existence check is performed here. If a transaction is open, the
    /// newly active storage key is recorded as touched.
    pub fn set_schema(&self, name: &str) {
        // The lock guard is a temporary dropped at the end of this statement,
        // so `track_graph_touch` can lock `current_schema` again.
        *self.current_schema.lock() = Some(name.to_string());
        self.track_graph_touch();
    }
479
    /// Returns a copy of the currently selected schema name, or `None` if no
    /// schema has been selected.
    #[must_use]
    pub fn current_schema(&self) -> Option<String> {
        self.current_schema.lock().clone()
    }
487
488 fn effective_graph_key(&self, graph_name: &str) -> String {
493 let schema = self.current_schema.lock().clone();
494 match schema {
495 Some(s) => format!("{s}/{graph_name}"),
496 None => graph_name.to_string(),
497 }
498 }
499
500 fn effective_type_key(&self, type_name: &str) -> String {
504 let schema = self.current_schema.lock().clone();
505 match schema {
506 Some(s) => format!("{s}/{type_name}"),
507 None => type_name.to_string(),
508 }
509 }
510
511 fn active_graph_storage_key(&self) -> Option<String> {
515 let graph = self.current_graph.lock().clone();
516 let schema = self.current_schema.lock().clone();
517 match (&schema, &graph) {
518 (None, None) => None,
519 (Some(s), None) => Some(format!("{s}/{SCHEMA_DEFAULT_GRAPH}")),
520 (None, Some(name)) if name.eq_ignore_ascii_case("default") => None,
521 (Some(s), Some(name)) if name.eq_ignore_ascii_case("default") => {
522 Some(format!("{s}/{SCHEMA_DEFAULT_GRAPH}"))
523 }
524 (None, Some(name)) => Some(name.clone()),
525 (Some(s), Some(g)) => Some(format!("{s}/{g}")),
526 }
527 }
528
529 fn active_store(&self) -> Arc<dyn GraphStoreSearch> {
537 let key = self.active_graph_storage_key();
538 match key {
539 None => Arc::clone(&self.graph_store),
540 #[cfg(feature = "lpg")]
541 Some(ref name) => match self.store.graph(name) {
542 Some(named_store) => {
543 #[cfg(feature = "wal")]
544 if let (Some(wal), Some(ctx)) = (&self.wal, &self.wal_graph_context) {
545 return Arc::new(crate::database::wal_store::WalGraphStore::new_for_graph(
546 named_store,
547 Arc::clone(wal),
548 name.clone(),
549 Arc::clone(ctx),
550 )) as Arc<dyn GraphStoreSearch>;
551 }
552 named_store as Arc<dyn GraphStoreSearch>
553 }
554 None => Arc::clone(&self.graph_store),
555 },
556 #[cfg(not(feature = "lpg"))]
557 Some(_) => Arc::clone(&self.graph_store),
558 }
559 }
560
561 fn active_write_store(&self) -> Option<Arc<dyn GraphStoreMut>> {
566 let key = self.active_graph_storage_key();
567 match key {
568 None => self.graph_store_mut.as_ref().map(Arc::clone),
569 #[cfg(feature = "lpg")]
570 Some(ref name) => match self.store.graph(name) {
571 Some(named_store) => {
572 let mut store: Arc<dyn GraphStoreMut> = named_store;
573
574 #[cfg(feature = "wal")]
575 if let (Some(wal), Some(ctx)) = (&self.wal, &self.wal_graph_context) {
576 store = Arc::new(crate::database::wal_store::WalGraphStore::new_for_graph(
577 self.store
579 .graph(name)
580 .unwrap_or_else(|| Arc::clone(&self.store)),
581 Arc::clone(wal),
582 name.clone(),
583 Arc::clone(ctx),
584 ));
585 }
586
587 #[cfg(feature = "cdc")]
588 if let Some(ref pending) = self.cdc_pending_events {
589 store = Arc::new(crate::database::cdc_store::CdcGraphStore::wrap(
590 store,
591 Arc::clone(&self.cdc_log),
592 Arc::clone(pending),
593 ));
594 }
595
596 Some(store)
597 }
598 None => self.graph_store_mut.as_ref().map(Arc::clone),
599 },
600 #[cfg(not(feature = "lpg"))]
601 Some(_) => self.graph_store_mut.as_ref().map(Arc::clone),
602 }
603 }
604
605 #[cfg(feature = "lpg")]
610 fn active_lpg_store(&self) -> Arc<LpgStore> {
611 let key = self.active_graph_storage_key();
612 match key {
613 None => Arc::clone(&self.store),
614 Some(ref name) => self
615 .store
616 .graph(name)
617 .unwrap_or_else(|| Arc::clone(&self.store)),
618 }
619 }
620
621 #[cfg(feature = "lpg")]
624 fn resolve_store(&self, graph_name: &Option<String>) -> Arc<LpgStore> {
625 match graph_name {
626 None => Arc::clone(&self.store),
627 Some(name) if name.eq_ignore_ascii_case("default") => Arc::clone(&self.store),
628 Some(name) => self
629 .store
630 .graph(name)
631 .unwrap_or_else(|| Arc::clone(&self.store)),
632 }
633 }
634
635 fn track_graph_touch(&self) {
644 if self.current_transaction.lock().is_some() {
645 let key = self.active_graph_storage_key();
646 let mut touched = self.touched_graphs.lock();
647 if !touched.contains(&key) {
648 touched.push(key);
649 }
650 }
651 }
652
    /// Sets the session time zone to `tz`; the string is stored as given,
    /// without validation.
    pub fn set_time_zone(&self, tz: &str) {
        *self.time_zone.lock() = Some(tz.to_string());
    }
657
    /// Returns a copy of the session time zone, or `None` if none is set.
    #[must_use]
    pub fn time_zone(&self) -> Option<String> {
        self.time_zone.lock().clone()
    }
663
    /// Stores `value` under `key` in the session parameters, overwriting any
    /// existing entry for that key.
    pub fn set_parameter(&self, key: &str, value: grafeo_common::types::Value) {
        self.session_params.lock().insert(key.to_string(), value);
    }
668
    /// Returns a copy of the session parameter stored under `key`, or `None`
    /// if there is no such parameter.
    #[must_use]
    pub fn get_parameter(&self, key: &str) -> Option<grafeo_common::types::Value> {
        self.session_params.lock().get(key).cloned()
    }
674
    /// Resets all session state handled here: schema, graph, time zone,
    /// parameters and the viewing-epoch override.
    ///
    /// If a transaction is open, the storage key active after the reset is
    /// recorded as touched.
    pub fn reset_session(&self) {
        *self.current_schema.lock() = None;
        *self.current_graph.lock() = None;
        *self.time_zone.lock() = None;
        self.session_params.lock().clear();
        *self.viewing_epoch_override.lock() = None;
        self.track_graph_touch();
    }
684
    /// Clears the session schema. If a transaction is open, the storage key
    /// active after the reset is recorded as touched.
    pub fn reset_schema(&self) {
        *self.current_schema.lock() = None;
        self.track_graph_touch();
    }
690
    /// Clears the selected graph. If a transaction is open, the storage key
    /// active after the reset is recorded as touched.
    pub fn reset_graph(&self) {
        *self.current_graph.lock() = None;
        self.track_graph_touch();
    }
696
    /// Clears the session time zone.
    pub fn reset_time_zone(&self) {
        *self.time_zone.lock() = None;
    }
701
    /// Removes all session parameters.
    pub fn reset_parameters(&self) {
        self.session_params.lock().clear();
    }
706
    /// Sets the session's viewing-epoch override to `epoch`.
    ///
    /// NOTE(review): presumably makes subsequent reads observe the database
    /// as of `epoch`; the consumers of the override are outside this view —
    /// confirm.
    pub fn set_viewing_epoch(&self, epoch: EpochId) {
        *self.viewing_epoch_override.lock() = Some(epoch);
    }
717
    /// Removes the viewing-epoch override.
    pub fn clear_viewing_epoch(&self) {
        *self.viewing_epoch_override.lock() = None;
    }
722
    /// Returns the current viewing-epoch override, or `None` if none is set.
    #[must_use]
    pub fn viewing_epoch(&self) -> Option<EpochId> {
        *self.viewing_epoch_override.lock()
    }
728
    /// Returns the version history of node `id` from the LPG store of the
    /// currently active graph (see `active_lpg_store`).
    ///
    /// NOTE(review): each tuple is presumably `(created epoch, deleted epoch
    /// if any, node as of that version)` — confirm against
    /// `LpgStore::get_node_history`.
    #[cfg(feature = "lpg")]
    #[must_use]
    pub fn get_node_history(&self, id: NodeId) -> Vec<(EpochId, Option<EpochId>, Node)> {
        self.active_lpg_store().get_node_history(id)
    }
737
    /// Returns the version history of edge `id` from the LPG store of the
    /// currently active graph (see `active_lpg_store`).
    ///
    /// NOTE(review): each tuple is presumably `(created epoch, deleted epoch
    /// if any, edge as of that version)` — confirm against
    /// `LpgStore::get_edge_history`.
    #[cfg(feature = "lpg")]
    #[must_use]
    pub fn get_edge_history(&self, id: EdgeId) -> Vec<(EpochId, Option<EpochId>, Edge)> {
        self.active_lpg_store().get_edge_history(id)
    }
746
747 fn require_lpg(&self, language: &str) -> Result<()> {
749 if self.graph_model == GraphModel::Rdf {
750 return Err(grafeo_common::utils::error::Error::Internal(format!(
751 "This is an RDF database. {language} queries require an LPG database."
752 )));
753 }
754 Ok(())
755 }
756
757 #[inline]
763 fn require_permission(&self, kind: crate::auth::StatementKind) -> Result<()> {
764 if self.identity.can_admin() {
766 return Ok(());
767 }
768 crate::auth::check_permission(&self.identity, kind).map_err(|denied| {
769 grafeo_common::utils::error::Error::Query(grafeo_common::utils::error::QueryError::new(
770 grafeo_common::utils::error::QueryErrorKind::Semantic,
771 denied.to_string(),
772 ))
773 })
774 }
775
776 #[cfg(feature = "gql")]
778 fn execute_session_command(
779 &self,
780 cmd: grafeo_adapters::query::gql::ast::SessionCommand,
781 ) -> Result<QueryResult> {
782 use grafeo_adapters::query::gql::ast::SessionCommand;
783 #[cfg(feature = "lpg")]
784 use grafeo_adapters::query::gql::ast::TransactionIsolationLevel;
785 use grafeo_common::utils::error::{Error, QueryError, QueryErrorKind};
786
787 match &cmd {
789 SessionCommand::CreateGraph { .. }
790 | SessionCommand::DropGraph { .. }
791 | SessionCommand::CreateProjection { .. }
792 | SessionCommand::DropProjection { .. } => {
793 self.require_permission(crate::auth::StatementKind::Write)?;
794 }
795 _ => {} }
797
798 if self.identity.has_grants() {
800 match &cmd {
801 SessionCommand::CreateGraph { name, .. }
802 | SessionCommand::DropGraph { name, .. }
803 if !self
804 .identity
805 .can_access_graph(name, crate::auth::Role::ReadWrite) =>
806 {
807 return Err(Error::Query(QueryError::new(
808 QueryErrorKind::Semantic,
809 format!(
810 "permission denied: no grant for graph '{name}' (user: {})",
811 self.identity.user_id()
812 ),
813 )));
814 }
815 _ => {}
816 }
817 }
818
819 if *self.read_only_tx.lock() {
821 match &cmd {
822 SessionCommand::CreateGraph { .. }
823 | SessionCommand::DropGraph { .. }
824 | SessionCommand::CreateProjection { .. }
825 | SessionCommand::DropProjection { .. } => {
826 return Err(Error::Transaction(
827 grafeo_common::utils::error::TransactionError::ReadOnly,
828 ));
829 }
830 _ => {} }
832 }
833
834 match cmd {
835 #[cfg(feature = "lpg")]
836 SessionCommand::CreateGraph {
837 name,
838 if_not_exists,
839 typed,
840 like_graph,
841 copy_of,
842 open: _,
843 } => {
844 if name.contains('/') {
846 return Err(Error::Query(QueryError::new(
847 QueryErrorKind::Semantic,
848 format!(
849 "Graph name '{name}' must not contain '/' (reserved as schema/graph separator)"
850 ),
851 )));
852 }
853 let storage_key = self.effective_graph_key(&name);
854
855 if let Some(ref src) = like_graph {
857 let src_key = self.effective_graph_key(src);
858 if self.store.graph(&src_key).is_none() {
859 return Err(Error::Query(QueryError::new(
860 QueryErrorKind::Semantic,
861 format!("Source graph '{src}' does not exist"),
862 )));
863 }
864 }
865 if let Some(ref src) = copy_of {
866 let src_key = self.effective_graph_key(src);
867 if self.store.graph(&src_key).is_none() {
868 return Err(Error::Query(QueryError::new(
869 QueryErrorKind::Semantic,
870 format!("Source graph '{src}' does not exist"),
871 )));
872 }
873 }
874
875 let created = self
876 .store
877 .create_graph(&storage_key)
878 .map_err(|e| Error::Internal(e.to_string()))?;
879 if !created && !if_not_exists {
880 return Err(Error::Query(QueryError::new(
881 QueryErrorKind::Semantic,
882 format!("Graph '{name}' already exists"),
883 )));
884 }
885 if created {
886 #[cfg(feature = "wal")]
887 self.log_schema_wal(&grafeo_storage::wal::WalRecord::CreateNamedGraph {
888 name: storage_key.clone(),
889 });
890 }
891
892 if let Some(ref src) = copy_of {
894 let src_key = self.effective_graph_key(src);
895 self.store
896 .copy_graph(Some(&src_key), Some(&storage_key))
897 .map_err(|e| Error::Internal(e.to_string()))?;
898 }
899
900 if let Some(type_name) = typed
904 && let Err(e) = self.catalog.bind_graph_type(
905 &storage_key,
906 if type_name.contains('/') {
907 type_name.clone()
908 } else {
909 self.effective_type_key(&type_name)
910 },
911 )
912 {
913 return Err(Error::Query(QueryError::new(
914 QueryErrorKind::Semantic,
915 e.to_string(),
916 )));
917 }
918
919 if let Some(ref src) = like_graph {
921 let src_key = self.effective_graph_key(src);
922 if let Some(src_type) = self.catalog.get_graph_type_binding(&src_key) {
923 let _ = self.catalog.bind_graph_type(&storage_key, src_type);
924 }
925 }
926
927 Ok(QueryResult::empty())
928 }
929 #[cfg(feature = "lpg")]
930 SessionCommand::DropGraph { name, if_exists } => {
931 let storage_key = self.effective_graph_key(&name);
932 let dropped = self.store.drop_graph(&storage_key);
933 if !dropped && !if_exists {
934 return Err(Error::Query(QueryError::new(
935 QueryErrorKind::Semantic,
936 format!("Graph '{name}' does not exist"),
937 )));
938 }
939 if dropped {
940 #[cfg(feature = "wal")]
941 self.log_schema_wal(&grafeo_storage::wal::WalRecord::DropNamedGraph {
942 name: storage_key.clone(),
943 });
944 let mut current = self.current_graph.lock();
946 if current
947 .as_deref()
948 .is_some_and(|g| g.eq_ignore_ascii_case(&name))
949 {
950 *current = None;
951 }
952 }
953 Ok(QueryResult::empty())
954 }
955 #[cfg(feature = "lpg")]
956 SessionCommand::UseGraph(name) => {
957 if self.identity.has_grants()
959 && !name.eq_ignore_ascii_case("default")
960 && !self
961 .identity
962 .can_access_graph(&name, crate::auth::Role::ReadOnly)
963 {
964 return Err(Error::Query(QueryError::new(
965 QueryErrorKind::Semantic,
966 format!(
967 "permission denied: no grant for graph '{name}' (user: {})",
968 self.identity.user_id()
969 ),
970 )));
971 }
972 let effective_key = self.effective_graph_key(&name);
974 if !name.eq_ignore_ascii_case("default")
975 && self.store.graph(&effective_key).is_none()
976 {
977 return Err(Error::Query(QueryError::new(
978 QueryErrorKind::Semantic,
979 format!("Graph '{name}' does not exist"),
980 )));
981 }
982 self.use_graph(&name);
983 Ok(QueryResult::empty())
984 }
985 #[cfg(feature = "lpg")]
986 SessionCommand::SessionSetGraph(name) => {
987 if self.identity.has_grants()
990 && !name.eq_ignore_ascii_case("default")
991 && !self
992 .identity
993 .can_access_graph(&name, crate::auth::Role::ReadOnly)
994 {
995 return Err(Error::Query(QueryError::new(
996 QueryErrorKind::Semantic,
997 format!(
998 "permission denied: no grant for graph '{name}' (user: {})",
999 self.identity.user_id()
1000 ),
1001 )));
1002 }
1003 let effective_key = self.effective_graph_key(&name);
1004 if !name.eq_ignore_ascii_case("default")
1005 && self.store.graph(&effective_key).is_none()
1006 {
1007 return Err(Error::Query(QueryError::new(
1008 QueryErrorKind::Semantic,
1009 format!("Graph '{name}' does not exist"),
1010 )));
1011 }
1012 self.use_graph(&name);
1013 Ok(QueryResult::empty())
1014 }
1015 SessionCommand::SessionSetSchema(name) => {
1016 if !self.catalog.schema_exists(&name) {
1018 return Err(Error::Query(QueryError::new(
1019 QueryErrorKind::Semantic,
1020 format!("Schema '{name}' does not exist"),
1021 )));
1022 }
1023 self.set_schema(&name);
1024 Ok(QueryResult::empty())
1025 }
1026 SessionCommand::SessionSetTimeZone(tz) => {
1027 self.set_time_zone(&tz);
1028 Ok(QueryResult::empty())
1029 }
1030 #[cfg(feature = "gql")]
1031 SessionCommand::SessionSetParameter(key, expr) => {
1032 if key.eq_ignore_ascii_case("viewing_epoch") {
1033 match Self::eval_integer_literal(&expr) {
1034 Some(n) if n >= 0 => {
1035 #[allow(clippy::cast_sign_loss)]
1037 let epoch = n as u64;
1038 self.set_viewing_epoch(EpochId::new(epoch));
1039 Ok(QueryResult::status(format!("Set viewing_epoch to {n}")))
1040 }
1041 _ => Err(Error::Query(QueryError::new(
1042 QueryErrorKind::Semantic,
1043 "viewing_epoch must be a non-negative integer literal",
1044 ))),
1045 }
1046 } else {
1047 self.set_parameter(&key, Value::Null);
1050 Ok(QueryResult::empty())
1051 }
1052 }
1053 SessionCommand::SessionReset(target) => {
1054 use grafeo_adapters::query::gql::ast::SessionResetTarget;
1055 match target {
1056 SessionResetTarget::All => self.reset_session(),
1057 SessionResetTarget::Schema => self.reset_schema(),
1058 SessionResetTarget::Graph => self.reset_graph(),
1059 SessionResetTarget::TimeZone => self.reset_time_zone(),
1060 SessionResetTarget::Parameters => self.reset_parameters(),
1061 }
1062 Ok(QueryResult::empty())
1063 }
1064 SessionCommand::SessionClose => {
1065 self.reset_session();
1066 Ok(QueryResult::empty())
1067 }
1068 #[cfg(feature = "lpg")]
1069 SessionCommand::StartTransaction {
1070 read_only,
1071 isolation_level,
1072 } => {
1073 let engine_level = isolation_level.map(|l| match l {
1074 TransactionIsolationLevel::ReadCommitted => {
1075 crate::transaction::IsolationLevel::ReadCommitted
1076 }
1077 TransactionIsolationLevel::SnapshotIsolation => {
1078 crate::transaction::IsolationLevel::SnapshotIsolation
1079 }
1080 TransactionIsolationLevel::Serializable => {
1081 crate::transaction::IsolationLevel::Serializable
1082 }
1083 });
1084 self.begin_transaction_inner(read_only, engine_level)?;
1085 Ok(QueryResult::status("Transaction started"))
1086 }
1087 #[cfg(feature = "lpg")]
1088 SessionCommand::Commit => {
1089 self.commit_inner()?;
1090 Ok(QueryResult::status("Transaction committed"))
1091 }
1092 #[cfg(feature = "lpg")]
1093 SessionCommand::Rollback => {
1094 self.rollback_inner()?;
1095 Ok(QueryResult::status("Transaction rolled back"))
1096 }
1097 #[cfg(feature = "lpg")]
1098 SessionCommand::Savepoint(name) => {
1099 self.savepoint(&name)?;
1100 Ok(QueryResult::status(format!("Savepoint '{name}' created")))
1101 }
1102 #[cfg(feature = "lpg")]
1103 SessionCommand::RollbackToSavepoint(name) => {
1104 self.rollback_to_savepoint(&name)?;
1105 Ok(QueryResult::status(format!(
1106 "Rolled back to savepoint '{name}'"
1107 )))
1108 }
1109 #[cfg(feature = "lpg")]
1110 SessionCommand::ReleaseSavepoint(name) => {
1111 self.release_savepoint(&name)?;
1112 Ok(QueryResult::status(format!("Savepoint '{name}' released")))
1113 }
1114 #[cfg(feature = "lpg")]
1115 SessionCommand::CreateProjection {
1116 name,
1117 node_labels,
1118 edge_types,
1119 } => {
1120 use grafeo_core::graph::{GraphProjection, ProjectionSpec};
1121 use std::collections::hash_map::Entry;
1122
1123 let spec = ProjectionSpec::new()
1124 .with_node_labels(node_labels)
1125 .with_edge_types(edge_types);
1126
1127 let store = self.active_store();
1128 let projection = Arc::new(GraphProjection::new(store, spec));
1129 let mut projections = self.projections.write();
1130 match projections.entry(name.clone()) {
1131 Entry::Occupied(_) => Err(Error::Query(QueryError::new(
1132 QueryErrorKind::Semantic,
1133 format!("Projection '{name}' already exists"),
1134 ))),
1135 Entry::Vacant(e) => {
1136 e.insert(projection);
1137 Ok(QueryResult::status(format!("Projection '{name}' created")))
1138 }
1139 }
1140 }
1141 #[cfg(feature = "lpg")]
1142 SessionCommand::DropProjection { name } => {
1143 let removed = self.projections.write().remove(&name).is_some();
1144 if !removed {
1145 return Err(Error::Query(QueryError::new(
1146 QueryErrorKind::Semantic,
1147 format!("Projection '{name}' does not exist"),
1148 )));
1149 }
1150 Ok(QueryResult::status(format!("Projection '{name}' dropped")))
1151 }
1152 #[cfg(feature = "lpg")]
1153 SessionCommand::ShowProjections => {
1154 let mut names: Vec<String> = self.projections.read().keys().cloned().collect();
1155 names.sort();
1156 let rows: Vec<Vec<Value>> =
1157 names.into_iter().map(|n| vec![Value::from(n)]).collect();
1158 Ok(QueryResult {
1159 columns: vec!["name".to_string()],
1160 column_types: Vec::new(),
1161 rows,
1162 ..QueryResult::empty()
1163 })
1164 }
1165 #[cfg(not(feature = "lpg"))]
1166 _ => Err(grafeo_common::utils::error::Error::Internal(
1167 "This command requires the `lpg` feature".to_string(),
1168 )),
1169 }
1170 }
1171
1172 #[cfg(feature = "wal")]
1174 fn log_schema_wal(&self, record: &grafeo_storage::wal::WalRecord) {
1175 if let Some(ref wal) = self.wal
1176 && let Err(e) = wal.log(record)
1177 {
1178 grafeo_warn!("Failed to log schema change to WAL: {}", e);
1179 }
1180 }
1181
1182 #[cfg(all(feature = "lpg", feature = "gql"))]
1184 fn execute_schema_command(
1185 &self,
1186 cmd: grafeo_adapters::query::gql::ast::SchemaStatement,
1187 ) -> Result<QueryResult> {
1188 use crate::catalog::{
1189 EdgeTypeDefinition, NodeTypeDefinition, PropertyDataType, TypedProperty,
1190 };
1191 use grafeo_adapters::query::gql::ast::SchemaStatement;
1192 use grafeo_common::utils::error::{Error, QueryError, QueryErrorKind};
1193 #[cfg(feature = "wal")]
1194 use grafeo_storage::wal::WalRecord;
1195
1196 macro_rules! wal_log {
1198 ($self:expr, $record:expr) => {
1199 #[cfg(feature = "wal")]
1200 $self.log_schema_wal(&$record);
1201 };
1202 }
1203
1204 let result = match cmd {
1205 SchemaStatement::CreateNodeType(stmt) => {
1206 let effective_name = self.effective_type_key(&stmt.name);
1207 #[cfg(feature = "wal")]
1208 let props_for_wal: Vec<(String, String, bool)> = stmt
1209 .properties
1210 .iter()
1211 .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
1212 .collect();
1213 let def = NodeTypeDefinition {
1214 name: effective_name.clone(),
1215 properties: stmt
1216 .properties
1217 .iter()
1218 .map(|p| TypedProperty {
1219 name: p.name.clone(),
1220 data_type: PropertyDataType::from_type_name(&p.data_type),
1221 nullable: p.nullable,
1222 default_value: p
1223 .default_value
1224 .as_ref()
1225 .map(|s| parse_default_literal(s)),
1226 })
1227 .collect(),
1228 constraints: Vec::new(),
1229 parent_types: stmt.parent_types.clone(),
1230 };
1231 let result = if stmt.or_replace {
1232 let _ = self.catalog.drop_node_type(&effective_name);
1233 self.catalog.register_node_type(def)
1234 } else {
1235 self.catalog.register_node_type(def)
1236 };
1237 match result {
1238 Ok(()) => {
1239 wal_log!(
1240 self,
1241 WalRecord::CreateNodeType {
1242 name: effective_name.clone(),
1243 properties: props_for_wal,
1244 constraints: Vec::new(),
1245 }
1246 );
1247 Ok(QueryResult::status(format!(
1248 "Created node type '{}'",
1249 stmt.name
1250 )))
1251 }
1252 Err(e) if stmt.if_not_exists => {
1253 let _ = e;
1254 Ok(QueryResult::status("No change"))
1255 }
1256 Err(e) => Err(Error::Query(QueryError::new(
1257 QueryErrorKind::Semantic,
1258 e.to_string(),
1259 ))),
1260 }
1261 }
1262 SchemaStatement::CreateEdgeType(stmt) => {
1263 let effective_name = self.effective_type_key(&stmt.name);
1264 #[cfg(feature = "wal")]
1265 let props_for_wal: Vec<(String, String, bool)> = stmt
1266 .properties
1267 .iter()
1268 .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
1269 .collect();
1270 let def = EdgeTypeDefinition {
1271 name: effective_name.clone(),
1272 properties: stmt
1273 .properties
1274 .iter()
1275 .map(|p| TypedProperty {
1276 name: p.name.clone(),
1277 data_type: PropertyDataType::from_type_name(&p.data_type),
1278 nullable: p.nullable,
1279 default_value: p
1280 .default_value
1281 .as_ref()
1282 .map(|s| parse_default_literal(s)),
1283 })
1284 .collect(),
1285 constraints: Vec::new(),
1286 source_node_types: stmt.source_node_types.clone(),
1287 target_node_types: stmt.target_node_types.clone(),
1288 };
1289 let result = if stmt.or_replace {
1290 let _ = self.catalog.drop_edge_type_def(&effective_name);
1291 self.catalog.register_edge_type_def(def)
1292 } else {
1293 self.catalog.register_edge_type_def(def)
1294 };
1295 match result {
1296 Ok(()) => {
1297 wal_log!(
1298 self,
1299 WalRecord::CreateEdgeType {
1300 name: effective_name.clone(),
1301 properties: props_for_wal,
1302 constraints: Vec::new(),
1303 }
1304 );
1305 Ok(QueryResult::status(format!(
1306 "Created edge type '{}'",
1307 stmt.name
1308 )))
1309 }
1310 Err(e) if stmt.if_not_exists => {
1311 let _ = e;
1312 Ok(QueryResult::status("No change"))
1313 }
1314 Err(e) => Err(Error::Query(QueryError::new(
1315 QueryErrorKind::Semantic,
1316 e.to_string(),
1317 ))),
1318 }
1319 }
1320 SchemaStatement::CreateVectorIndex(stmt) => {
1321 Self::create_vector_index_on_store(
1322 &self.active_lpg_store(),
1323 &stmt.node_label,
1324 &stmt.property,
1325 stmt.dimensions,
1326 stmt.metric.as_deref(),
1327 )?;
1328 wal_log!(
1329 self,
1330 WalRecord::CreateIndex {
1331 name: stmt.name.clone(),
1332 label: stmt.node_label.clone(),
1333 property: stmt.property.clone(),
1334 index_type: "vector".to_string(),
1335 }
1336 );
1337 Ok(QueryResult::status(format!(
1338 "Created vector index '{}'",
1339 stmt.name
1340 )))
1341 }
1342 SchemaStatement::DropNodeType { name, if_exists } => {
1343 let effective_name = self.effective_type_key(&name);
1344 match self.catalog.drop_node_type(&effective_name) {
1345 Ok(()) => {
1346 wal_log!(
1347 self,
1348 WalRecord::DropNodeType {
1349 name: effective_name
1350 }
1351 );
1352 Ok(QueryResult::status(format!("Dropped node type '{name}'")))
1353 }
1354 Err(e) if if_exists => {
1355 let _ = e;
1356 Ok(QueryResult::status("No change"))
1357 }
1358 Err(e) => Err(Error::Query(QueryError::new(
1359 QueryErrorKind::Semantic,
1360 e.to_string(),
1361 ))),
1362 }
1363 }
1364 SchemaStatement::DropEdgeType { name, if_exists } => {
1365 let effective_name = self.effective_type_key(&name);
1366 match self.catalog.drop_edge_type_def(&effective_name) {
1367 Ok(()) => {
1368 wal_log!(
1369 self,
1370 WalRecord::DropEdgeType {
1371 name: effective_name
1372 }
1373 );
1374 Ok(QueryResult::status(format!("Dropped edge type '{name}'")))
1375 }
1376 Err(e) if if_exists => {
1377 let _ = e;
1378 Ok(QueryResult::status("No change"))
1379 }
1380 Err(e) => Err(Error::Query(QueryError::new(
1381 QueryErrorKind::Semantic,
1382 e.to_string(),
1383 ))),
1384 }
1385 }
1386 SchemaStatement::CreateIndex(stmt) => {
1387 use crate::catalog::IndexType as CatalogIndexType;
1388 use grafeo_adapters::query::gql::ast::IndexKind;
1389 let active = self.active_lpg_store();
1390 let index_type_str = match stmt.index_kind {
1391 IndexKind::Property => "property",
1392 IndexKind::BTree => "btree",
1393 IndexKind::Text => "text",
1394 IndexKind::Vector => "vector",
1395 };
1396 match stmt.index_kind {
1397 IndexKind::Property | IndexKind::BTree => {
1398 for prop in &stmt.properties {
1399 active.create_property_index(prop);
1400 }
1401 }
1402 IndexKind::Text => {
1403 for prop in &stmt.properties {
1404 Self::create_text_index_on_store(&active, &stmt.label, prop)?;
1405 }
1406 }
1407 IndexKind::Vector => {
1408 for prop in &stmt.properties {
1409 Self::create_vector_index_on_store(
1410 &active,
1411 &stmt.label,
1412 prop,
1413 stmt.options.dimensions,
1414 stmt.options.metric.as_deref(),
1415 )?;
1416 }
1417 }
1418 }
1419 let catalog_index_type = match stmt.index_kind {
1422 IndexKind::Property => CatalogIndexType::Hash,
1423 IndexKind::BTree => CatalogIndexType::BTree,
1424 IndexKind::Text => CatalogIndexType::FullText,
1425 IndexKind::Vector => CatalogIndexType::Hash,
1426 };
1427 let label_id = self.catalog.get_or_create_label(&stmt.label);
1428 for prop in &stmt.properties {
1429 let prop_id = self.catalog.get_or_create_property_key(prop);
1430 self.catalog
1431 .create_index(&stmt.name, label_id, prop_id, catalog_index_type);
1432 }
1433 #[cfg(feature = "wal")]
1434 for prop in &stmt.properties {
1435 wal_log!(
1436 self,
1437 WalRecord::CreateIndex {
1438 name: stmt.name.clone(),
1439 label: stmt.label.clone(),
1440 property: prop.clone(),
1441 index_type: index_type_str.to_string(),
1442 }
1443 );
1444 }
1445 Ok(QueryResult::status(format!(
1446 "Created {} index '{}'",
1447 index_type_str, stmt.name
1448 )))
1449 }
1450 SchemaStatement::DropIndex { name, if_exists } => {
1451 if let Some(index_id) = self.catalog.find_index_by_name(&name) {
1454 let def = self.catalog.get_index(index_id);
1455 self.catalog.drop_index(index_id);
1456 if let Some(def) = def
1457 && let Some(prop_name) =
1458 self.catalog.get_property_key_name(def.property_key)
1459 {
1460 self.active_lpg_store().drop_property_index(&prop_name);
1461 }
1462 wal_log!(self, WalRecord::DropIndex { name: name.clone() });
1463 Ok(QueryResult::status(format!("Dropped index '{name}'")))
1464 } else if if_exists {
1465 Ok(QueryResult::status("No change".to_string()))
1466 } else {
1467 Err(Error::Query(QueryError::new(
1468 QueryErrorKind::Semantic,
1469 format!("Index '{name}' does not exist"),
1470 )))
1471 }
1472 }
1473 SchemaStatement::CreateConstraint(stmt) => {
1474 use crate::catalog::TypeConstraint;
1475 use grafeo_adapters::query::gql::ast::ConstraintKind;
1476 let kind_str = match stmt.constraint_kind {
1477 ConstraintKind::Unique => "unique",
1478 ConstraintKind::NodeKey => "node_key",
1479 ConstraintKind::NotNull => "not_null",
1480 ConstraintKind::Exists => "exists",
1481 };
1482 let constraint_name = stmt
1483 .name
1484 .clone()
1485 .unwrap_or_else(|| format!("{}_{kind_str}", stmt.label));
1486
1487 match stmt.constraint_kind {
1489 ConstraintKind::Unique => {
1490 for prop in &stmt.properties {
1491 let label_id = self.catalog.get_or_create_label(&stmt.label);
1492 let prop_id = self.catalog.get_or_create_property_key(prop);
1493 let _ = self.catalog.add_unique_constraint(label_id, prop_id);
1494 }
1495 let _ = self.catalog.add_constraint_to_type(
1496 &stmt.label,
1497 TypeConstraint::Unique(stmt.properties.clone()),
1498 );
1499 }
1500 ConstraintKind::NodeKey => {
1501 for prop in &stmt.properties {
1502 let label_id = self.catalog.get_or_create_label(&stmt.label);
1503 let prop_id = self.catalog.get_or_create_property_key(prop);
1504 let _ = self.catalog.add_unique_constraint(label_id, prop_id);
1505 let _ = self.catalog.add_required_property(label_id, prop_id);
1506 }
1507 let _ = self.catalog.add_constraint_to_type(
1508 &stmt.label,
1509 TypeConstraint::PrimaryKey(stmt.properties.clone()),
1510 );
1511 }
1512 ConstraintKind::NotNull | ConstraintKind::Exists => {
1513 for prop in &stmt.properties {
1514 let label_id = self.catalog.get_or_create_label(&stmt.label);
1515 let prop_id = self.catalog.get_or_create_property_key(prop);
1516 let _ = self.catalog.add_required_property(label_id, prop_id);
1517 let _ = self.catalog.add_constraint_to_type(
1518 &stmt.label,
1519 TypeConstraint::NotNull(prop.clone()),
1520 );
1521 }
1522 }
1523 }
1524
1525 wal_log!(
1526 self,
1527 WalRecord::CreateConstraint {
1528 name: constraint_name.clone(),
1529 label: stmt.label.clone(),
1530 properties: stmt.properties.clone(),
1531 kind: kind_str.to_string(),
1532 }
1533 );
1534 Ok(QueryResult::status(format!(
1535 "Created {kind_str} constraint '{constraint_name}'"
1536 )))
1537 }
1538 SchemaStatement::DropConstraint { name, if_exists } => {
1539 let _ = if_exists;
1540 wal_log!(self, WalRecord::DropConstraint { name: name.clone() });
1541 Ok(QueryResult::status(format!("Dropped constraint '{name}'")))
1542 }
1543 SchemaStatement::CreateGraphType(stmt) => {
1544 use crate::catalog::GraphTypeDefinition;
1545 use grafeo_adapters::query::gql::ast::InlineElementType;
1546
1547 let effective_name = self.effective_type_key(&stmt.name);
1548
1549 let (mut node_types, mut edge_types, open) =
1551 if let Some(ref like_graph) = stmt.like_graph {
1552 if let Some(type_name) = self.catalog.get_graph_type_binding(like_graph) {
1554 if let Some(existing) = self
1555 .catalog
1556 .schema()
1557 .and_then(|s| s.get_graph_type(&type_name))
1558 {
1559 (
1560 existing.allowed_node_types.clone(),
1561 existing.allowed_edge_types.clone(),
1562 existing.open,
1563 )
1564 } else {
1565 (Vec::new(), Vec::new(), true)
1566 }
1567 } else {
1568 let nt = self.catalog.all_node_type_names();
1570 let et = self.catalog.all_edge_type_names();
1571 if nt.is_empty() && et.is_empty() {
1572 (Vec::new(), Vec::new(), true)
1573 } else {
1574 (nt, et, false)
1575 }
1576 }
1577 } else {
1578 let nt = stmt
1580 .node_types
1581 .iter()
1582 .map(|n| self.effective_type_key(n))
1583 .collect();
1584 let et = stmt
1585 .edge_types
1586 .iter()
1587 .map(|n| self.effective_type_key(n))
1588 .collect();
1589 (nt, et, stmt.open)
1590 };
1591
1592 for inline in &stmt.inline_types {
1597 match inline {
1598 InlineElementType::Node {
1599 name,
1600 properties,
1601 key_labels,
1602 is_reference,
1603 ..
1604 } => {
1605 let inline_effective = self.effective_type_key(name);
1606 if *is_reference {
1607 if self.catalog.get_node_type(&inline_effective).is_none() {
1609 return Err(Error::Query(QueryError::new(
1610 QueryErrorKind::Semantic,
1611 format!(
1612 "Referenced node type '{inline_effective}' does not exist"
1613 ),
1614 )));
1615 }
1616 } else {
1617 let def = NodeTypeDefinition {
1618 name: inline_effective.clone(),
1619 properties: properties
1620 .iter()
1621 .map(|p| TypedProperty {
1622 name: p.name.clone(),
1623 data_type: PropertyDataType::from_type_name(
1624 &p.data_type,
1625 ),
1626 nullable: p.nullable,
1627 default_value: None,
1628 })
1629 .collect(),
1630 constraints: Vec::new(),
1631 parent_types: key_labels.clone(),
1632 };
1633 self.catalog.register_or_replace_node_type(def);
1634 #[cfg(feature = "wal")]
1635 {
1636 let props_for_wal: Vec<(String, String, bool)> = properties
1637 .iter()
1638 .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
1639 .collect();
1640 self.log_schema_wal(&WalRecord::CreateNodeType {
1641 name: inline_effective.clone(),
1642 properties: props_for_wal,
1643 constraints: Vec::new(),
1644 });
1645 }
1646 }
1647 if !node_types.contains(&inline_effective) {
1648 node_types.push(inline_effective);
1649 }
1650 }
1651 InlineElementType::Edge {
1652 name,
1653 properties,
1654 source_node_types,
1655 target_node_types,
1656 is_reference,
1657 ..
1658 } => {
1659 let inline_effective = self.effective_type_key(name);
1660 if *is_reference {
1661 if self.catalog.get_edge_type_def(&inline_effective).is_none() {
1662 return Err(Error::Query(QueryError::new(
1663 QueryErrorKind::Semantic,
1664 format!(
1665 "Referenced edge type '{inline_effective}' does not exist"
1666 ),
1667 )));
1668 }
1669 } else {
1670 let def = EdgeTypeDefinition {
1671 name: inline_effective.clone(),
1672 properties: properties
1673 .iter()
1674 .map(|p| TypedProperty {
1675 name: p.name.clone(),
1676 data_type: PropertyDataType::from_type_name(
1677 &p.data_type,
1678 ),
1679 nullable: p.nullable,
1680 default_value: None,
1681 })
1682 .collect(),
1683 constraints: Vec::new(),
1684 source_node_types: source_node_types.clone(),
1685 target_node_types: target_node_types.clone(),
1686 };
1687 self.catalog.register_or_replace_edge_type_def(def);
1688 #[cfg(feature = "wal")]
1689 {
1690 let props_for_wal: Vec<(String, String, bool)> = properties
1691 .iter()
1692 .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
1693 .collect();
1694 self.log_schema_wal(&WalRecord::CreateEdgeType {
1695 name: inline_effective.clone(),
1696 properties: props_for_wal,
1697 constraints: Vec::new(),
1698 });
1699 }
1700 }
1701 if !edge_types.contains(&inline_effective) {
1702 edge_types.push(inline_effective);
1703 }
1704 }
1705 }
1706 }
1707
1708 let def = GraphTypeDefinition {
1709 name: effective_name.clone(),
1710 allowed_node_types: node_types.clone(),
1711 allowed_edge_types: edge_types.clone(),
1712 open,
1713 };
1714 let result = if stmt.or_replace {
1715 let _ = self.catalog.drop_graph_type(&effective_name);
1717 self.catalog.register_graph_type(def)
1718 } else {
1719 self.catalog.register_graph_type(def)
1720 };
1721 match result {
1722 Ok(()) => {
1723 wal_log!(
1724 self,
1725 WalRecord::CreateGraphType {
1726 name: effective_name.clone(),
1727 node_types,
1728 edge_types,
1729 open,
1730 }
1731 );
1732 Ok(QueryResult::status(format!(
1733 "Created graph type '{}'",
1734 stmt.name
1735 )))
1736 }
1737 Err(e) if stmt.if_not_exists => {
1738 let _ = e;
1739 Ok(QueryResult::status("No change"))
1740 }
1741 Err(e) => Err(Error::Query(QueryError::new(
1742 QueryErrorKind::Semantic,
1743 e.to_string(),
1744 ))),
1745 }
1746 }
1747 SchemaStatement::DropGraphType { name, if_exists } => {
1748 let effective_name = self.effective_type_key(&name);
1749 match self.catalog.drop_graph_type(&effective_name) {
1750 Ok(()) => {
1751 wal_log!(
1752 self,
1753 WalRecord::DropGraphType {
1754 name: effective_name
1755 }
1756 );
1757 Ok(QueryResult::status(format!("Dropped graph type '{name}'")))
1758 }
1759 Err(e) if if_exists => {
1760 let _ = e;
1761 Ok(QueryResult::status("No change"))
1762 }
1763 Err(e) => Err(Error::Query(QueryError::new(
1764 QueryErrorKind::Semantic,
1765 e.to_string(),
1766 ))),
1767 }
1768 }
1769 SchemaStatement::CreateSchema {
1770 name,
1771 if_not_exists,
1772 } => {
1773 if name.contains('/') {
1774 return Err(Error::Query(QueryError::new(
1775 QueryErrorKind::Semantic,
1776 format!(
1777 "Schema name '{name}' must not contain '/' (reserved as schema/graph separator)"
1778 ),
1779 )));
1780 }
1781 match self.catalog.register_schema_namespace(name.clone()) {
1782 Ok(()) => {
1783 wal_log!(self, WalRecord::CreateSchema { name: name.clone() });
1784 let default_key = format!("{name}/{SCHEMA_DEFAULT_GRAPH}");
1787 if self.store.create_graph(&default_key).unwrap_or(false) {
1788 wal_log!(self, WalRecord::CreateNamedGraph { name: default_key });
1789 }
1790 Ok(QueryResult::status(format!("Created schema '{name}'")))
1791 }
1792 Err(e) if if_not_exists => {
1793 let _ = e;
1794 Ok(QueryResult::status("No change"))
1795 }
1796 Err(e) => Err(Error::Query(QueryError::new(
1797 QueryErrorKind::Semantic,
1798 e.to_string(),
1799 ))),
1800 }
1801 }
1802 SchemaStatement::DropSchema { name, if_exists } => {
1803 let prefix = format!("{name}/");
1806 let default_graph_key = format!("{name}/{SCHEMA_DEFAULT_GRAPH}");
1807 let has_graphs = self
1808 .store
1809 .graph_names()
1810 .iter()
1811 .any(|g| g.starts_with(&prefix) && *g != default_graph_key);
1812 let has_types = self
1813 .catalog
1814 .all_node_type_names()
1815 .iter()
1816 .any(|n| n.starts_with(&prefix))
1817 || self
1818 .catalog
1819 .all_edge_type_names()
1820 .iter()
1821 .any(|n| n.starts_with(&prefix))
1822 || self
1823 .catalog
1824 .all_graph_type_names()
1825 .iter()
1826 .any(|n| n.starts_with(&prefix));
1827 if has_graphs || has_types {
1828 return Err(Error::Query(QueryError::new(
1829 QueryErrorKind::Semantic,
1830 format!("Schema '{name}' is not empty: drop all graphs and types first"),
1831 )));
1832 }
1833 match self.catalog.drop_schema_namespace(&name) {
1834 Ok(()) => {
1835 wal_log!(self, WalRecord::DropSchema { name: name.clone() });
1836 if self.store.drop_graph(&default_graph_key) {
1838 wal_log!(
1839 self,
1840 WalRecord::DropNamedGraph {
1841 name: default_graph_key,
1842 }
1843 );
1844 }
1845 let mut current = self.current_schema.lock();
1847 if current
1848 .as_deref()
1849 .is_some_and(|s| s.eq_ignore_ascii_case(&name))
1850 {
1851 *current = None;
1852 }
1853 Ok(QueryResult::status(format!("Dropped schema '{name}'")))
1854 }
1855 Err(e) if if_exists => {
1856 let _ = e;
1857 Ok(QueryResult::status("No change"))
1858 }
1859 Err(e) => Err(Error::Query(QueryError::new(
1860 QueryErrorKind::Semantic,
1861 e.to_string(),
1862 ))),
1863 }
1864 }
1865 SchemaStatement::AlterNodeType(stmt) => {
1866 use grafeo_adapters::query::gql::ast::TypeAlteration;
1867 let effective_name = self.effective_type_key(&stmt.name);
1868 let mut wal_alts = Vec::new();
1869 for alt in &stmt.alterations {
1870 match alt {
1871 TypeAlteration::AddProperty(prop) => {
1872 let typed = TypedProperty {
1873 name: prop.name.clone(),
1874 data_type: PropertyDataType::from_type_name(&prop.data_type),
1875 nullable: prop.nullable,
1876 default_value: prop
1877 .default_value
1878 .as_ref()
1879 .map(|s| parse_default_literal(s)),
1880 };
1881 self.catalog
1882 .alter_node_type_add_property(&effective_name, typed)
1883 .map_err(|e| {
1884 Error::Query(QueryError::new(
1885 QueryErrorKind::Semantic,
1886 e.to_string(),
1887 ))
1888 })?;
1889 wal_alts.push((
1890 "add".to_string(),
1891 prop.name.clone(),
1892 prop.data_type.clone(),
1893 prop.nullable,
1894 ));
1895 }
1896 TypeAlteration::DropProperty(name) => {
1897 self.catalog
1898 .alter_node_type_drop_property(&effective_name, name)
1899 .map_err(|e| {
1900 Error::Query(QueryError::new(
1901 QueryErrorKind::Semantic,
1902 e.to_string(),
1903 ))
1904 })?;
1905 wal_alts.push(("drop".to_string(), name.clone(), String::new(), false));
1906 }
1907 }
1908 }
1909 wal_log!(
1910 self,
1911 WalRecord::AlterNodeType {
1912 name: effective_name,
1913 alterations: wal_alts,
1914 }
1915 );
1916 Ok(QueryResult::status(format!(
1917 "Altered node type '{}'",
1918 stmt.name
1919 )))
1920 }
1921 SchemaStatement::AlterEdgeType(stmt) => {
1922 use grafeo_adapters::query::gql::ast::TypeAlteration;
1923 let effective_name = self.effective_type_key(&stmt.name);
1924 let mut wal_alts = Vec::new();
1925 for alt in &stmt.alterations {
1926 match alt {
1927 TypeAlteration::AddProperty(prop) => {
1928 let typed = TypedProperty {
1929 name: prop.name.clone(),
1930 data_type: PropertyDataType::from_type_name(&prop.data_type),
1931 nullable: prop.nullable,
1932 default_value: prop
1933 .default_value
1934 .as_ref()
1935 .map(|s| parse_default_literal(s)),
1936 };
1937 self.catalog
1938 .alter_edge_type_add_property(&effective_name, typed)
1939 .map_err(|e| {
1940 Error::Query(QueryError::new(
1941 QueryErrorKind::Semantic,
1942 e.to_string(),
1943 ))
1944 })?;
1945 wal_alts.push((
1946 "add".to_string(),
1947 prop.name.clone(),
1948 prop.data_type.clone(),
1949 prop.nullable,
1950 ));
1951 }
1952 TypeAlteration::DropProperty(name) => {
1953 self.catalog
1954 .alter_edge_type_drop_property(&effective_name, name)
1955 .map_err(|e| {
1956 Error::Query(QueryError::new(
1957 QueryErrorKind::Semantic,
1958 e.to_string(),
1959 ))
1960 })?;
1961 wal_alts.push(("drop".to_string(), name.clone(), String::new(), false));
1962 }
1963 }
1964 }
1965 wal_log!(
1966 self,
1967 WalRecord::AlterEdgeType {
1968 name: effective_name,
1969 alterations: wal_alts,
1970 }
1971 );
1972 Ok(QueryResult::status(format!(
1973 "Altered edge type '{}'",
1974 stmt.name
1975 )))
1976 }
1977 SchemaStatement::AlterGraphType(stmt) => {
1978 use grafeo_adapters::query::gql::ast::GraphTypeAlteration;
1979 let effective_name = self.effective_type_key(&stmt.name);
1980 let mut wal_alts = Vec::new();
1981 for alt in &stmt.alterations {
1982 match alt {
1983 GraphTypeAlteration::AddNodeType(name) => {
1984 self.catalog
1985 .alter_graph_type_add_node_type(&effective_name, name.clone())
1986 .map_err(|e| {
1987 Error::Query(QueryError::new(
1988 QueryErrorKind::Semantic,
1989 e.to_string(),
1990 ))
1991 })?;
1992 wal_alts.push(("add_node_type".to_string(), name.clone()));
1993 }
1994 GraphTypeAlteration::DropNodeType(name) => {
1995 self.catalog
1996 .alter_graph_type_drop_node_type(&effective_name, name)
1997 .map_err(|e| {
1998 Error::Query(QueryError::new(
1999 QueryErrorKind::Semantic,
2000 e.to_string(),
2001 ))
2002 })?;
2003 wal_alts.push(("drop_node_type".to_string(), name.clone()));
2004 }
2005 GraphTypeAlteration::AddEdgeType(name) => {
2006 self.catalog
2007 .alter_graph_type_add_edge_type(&effective_name, name.clone())
2008 .map_err(|e| {
2009 Error::Query(QueryError::new(
2010 QueryErrorKind::Semantic,
2011 e.to_string(),
2012 ))
2013 })?;
2014 wal_alts.push(("add_edge_type".to_string(), name.clone()));
2015 }
2016 GraphTypeAlteration::DropEdgeType(name) => {
2017 self.catalog
2018 .alter_graph_type_drop_edge_type(&effective_name, name)
2019 .map_err(|e| {
2020 Error::Query(QueryError::new(
2021 QueryErrorKind::Semantic,
2022 e.to_string(),
2023 ))
2024 })?;
2025 wal_alts.push(("drop_edge_type".to_string(), name.clone()));
2026 }
2027 }
2028 }
2029 wal_log!(
2030 self,
2031 WalRecord::AlterGraphType {
2032 name: effective_name,
2033 alterations: wal_alts,
2034 }
2035 );
2036 Ok(QueryResult::status(format!(
2037 "Altered graph type '{}'",
2038 stmt.name
2039 )))
2040 }
2041 SchemaStatement::CreateProcedure(stmt) => {
2042 use crate::catalog::ProcedureDefinition;
2043
2044 let def = ProcedureDefinition {
2045 name: stmt.name.clone(),
2046 params: stmt
2047 .params
2048 .iter()
2049 .map(|p| (p.name.clone(), p.param_type.clone()))
2050 .collect(),
2051 returns: stmt
2052 .returns
2053 .iter()
2054 .map(|r| (r.name.clone(), r.return_type.clone()))
2055 .collect(),
2056 body: stmt.body.clone(),
2057 };
2058
2059 if stmt.or_replace {
2060 self.catalog.replace_procedure(def).map_err(|e| {
2061 Error::Query(QueryError::new(QueryErrorKind::Semantic, e.to_string()))
2062 })?;
2063 } else {
2064 match self.catalog.register_procedure(def) {
2065 Ok(()) => {}
2066 Err(_) if stmt.if_not_exists => {
2067 return Ok(QueryResult::empty());
2068 }
2069 Err(e) => {
2070 return Err(Error::Query(QueryError::new(
2071 QueryErrorKind::Semantic,
2072 e.to_string(),
2073 )));
2074 }
2075 }
2076 }
2077
2078 wal_log!(
2079 self,
2080 WalRecord::CreateProcedure {
2081 name: stmt.name.clone(),
2082 params: stmt
2083 .params
2084 .iter()
2085 .map(|p| (p.name.clone(), p.param_type.clone()))
2086 .collect(),
2087 returns: stmt
2088 .returns
2089 .iter()
2090 .map(|r| (r.name.clone(), r.return_type.clone()))
2091 .collect(),
2092 body: stmt.body,
2093 }
2094 );
2095 Ok(QueryResult::status(format!(
2096 "Created procedure '{}'",
2097 stmt.name
2098 )))
2099 }
2100 SchemaStatement::DropProcedure { name, if_exists } => {
2101 match self.catalog.drop_procedure(&name) {
2102 Ok(()) => {}
2103 Err(_) if if_exists => {
2104 return Ok(QueryResult::empty());
2105 }
2106 Err(e) => {
2107 return Err(Error::Query(QueryError::new(
2108 QueryErrorKind::Semantic,
2109 e.to_string(),
2110 )));
2111 }
2112 }
2113 wal_log!(self, WalRecord::DropProcedure { name: name.clone() });
2114 Ok(QueryResult::status(format!("Dropped procedure '{name}'")))
2115 }
2116 SchemaStatement::ShowIndexes => {
2117 return self.execute_show_indexes();
2118 }
2119 SchemaStatement::ShowConstraints => {
2120 return self.execute_show_constraints();
2121 }
2122 SchemaStatement::ShowNodeTypes => {
2123 return self.execute_show_node_types();
2124 }
2125 SchemaStatement::ShowEdgeTypes => {
2126 return self.execute_show_edge_types();
2127 }
2128 SchemaStatement::ShowGraphTypes => {
2129 return self.execute_show_graph_types();
2130 }
2131 SchemaStatement::ShowGraphType(name) => {
2132 return self.execute_show_graph_type(&name);
2133 }
2134 SchemaStatement::ShowCurrentGraphType => {
2135 return self.execute_show_current_graph_type();
2136 }
2137 SchemaStatement::ShowGraphs => {
2138 return self.execute_show_graphs();
2139 }
2140 SchemaStatement::ShowSchemas => {
2141 return self.execute_show_schemas();
2142 }
2143 };
2144
2145 if result.is_ok() {
2148 self.query_cache.clear();
2149 }
2150
2151 result
2152 }
2153
#[cfg(all(feature = "lpg", feature = "gql", feature = "vector-index"))]
/// Builds an HNSW vector index for `property` on nodes labelled `label` and
/// registers it on `store`.
///
/// Every node with that label whose `property` holds a `Value::Vector` is
/// inserted into the index; nodes without such a value are ignored.
///
/// * `dimensions` - expected vector length. When `None`, the length of the
///   first vector encountered becomes the expected length.
/// * `metric` - distance metric name. When `None`, cosine distance is used.
///
/// # Errors
///
/// Returns `Error::Internal` when the metric name is not recognised, when a
/// vector's length differs from the expected length, or when no vector was
/// found and `dimensions` was not supplied.
fn create_vector_index_on_store(
    store: &LpgStore,
    label: &str,
    property: &str,
    dimensions: Option<usize>,
    metric: Option<&str>,
) -> Result<()> {
    use grafeo_common::types::{PropertyKey, Value};
    use grafeo_common::utils::error::Error;
    use grafeo_core::index::vector::{DistanceMetric, HnswConfig, HnswIndex, VectorIndexKind};

    // Resolve the metric up front so an unknown name fails before any node is scanned.
    let distance = if let Some(requested) = metric {
        match DistanceMetric::from_str(requested) {
            Some(parsed) => parsed,
            None => {
                return Err(Error::Internal(format!(
                    "Unknown distance metric '{requested}'. Use: cosine, euclidean, dot_product, manhattan"
                )));
            }
        }
    } else {
        DistanceMetric::Cosine
    };

    let key = PropertyKey::new(property);
    let mut expected_dims: Option<usize> = dimensions;
    let mut collected: Vec<(grafeo_common::types::NodeId, Vec<f32>)> = Vec::new();

    for node in store.nodes_with_label(label) {
        if let Some(Value::Vector(embedding)) = node.properties.get(&key) {
            match expected_dims {
                Some(expected) if embedding.len() != expected => {
                    return Err(Error::Internal(format!(
                        "Vector dimension mismatch: expected {expected}, found {} on node {}",
                        embedding.len(),
                        node.id.0
                    )));
                }
                Some(_) => {}
                // No explicit dimension: the first vector seen fixes it.
                None => expected_dims = Some(embedding.len()),
            }
            collected.push((node.id, embedding.to_vec()));
        }
    }

    let dims = expected_dims.ok_or_else(|| {
        Error::Internal(format!(
            "No vector properties found on :{label}({property}) and no dimensions specified"
        ))
    })?;

    let index = HnswIndex::with_capacity(HnswConfig::new(dims, distance), collected.len());
    let accessor = grafeo_core::index::vector::PropertyVectorAccessor::new(store, property);
    for (node_id, embedding) in &collected {
        index.insert(*node_id, embedding, &accessor);
    }

    store.add_vector_index(label, property, Arc::new(VectorIndexKind::Hnsw(index)));
    Ok(())
}
2213
#[cfg(all(feature = "lpg", feature = "gql", not(feature = "vector-index")))]
/// Stand-in compiled when the `vector-index` feature is disabled.
///
/// All arguments are ignored.
///
/// # Errors
///
/// Always returns `Error::Internal` stating that the `vector-index` feature
/// is required.
fn create_vector_index_on_store(
    _store: &LpgStore,
    _label: &str,
    _property: &str,
    _dimensions: Option<usize>,
    _metric: Option<&str>,
) -> Result<()> {
    use grafeo_common::utils::error::Error;

    let reason = String::from("Vector index support requires the 'vector-index' feature");
    Err(Error::Internal(reason))
}
2227
#[cfg(all(feature = "lpg", feature = "gql", feature = "text-index"))]
/// Builds an inverted text index for `property` on nodes labelled `label`
/// and registers it on `store`.
///
/// The index is created with the default BM25 configuration. Only nodes whose
/// `property` holds a `Value::String` are indexed; all others are skipped.
fn create_text_index_on_store(store: &LpgStore, label: &str, property: &str) -> Result<()> {
    use grafeo_common::types::{PropertyKey, Value};
    use grafeo_core::index::text::{BM25Config, InvertedIndex};

    let key = PropertyKey::new(property);
    let mut inverted = InvertedIndex::new(BM25Config::default());

    for node_id in store.nodes_by_label(label) {
        // Anything that is missing or not a string contributes nothing to the index.
        let Some(Value::String(text)) = store.get_node_property(node_id, &key) else {
            continue;
        };
        inverted.insert(node_id, text.as_str());
    }

    store.add_text_index(label, property, Arc::new(parking_lot::RwLock::new(inverted)));
    Ok(())
}
2247
#[cfg(all(feature = "lpg", feature = "gql", not(feature = "text-index")))]
/// Stand-in compiled when the `text-index` feature is disabled.
///
/// All arguments are ignored.
///
/// # Errors
///
/// Always returns `Error::Internal` stating that the `text-index` feature is
/// required.
fn create_text_index_on_store(_store: &LpgStore, _label: &str, _property: &str) -> Result<()> {
    use grafeo_common::utils::error::Error;

    let reason = String::from("Text index support requires the 'text-index' feature");
    Err(Error::Internal(reason))
}
2255
2256 fn execute_show_indexes(&self) -> Result<QueryResult> {
2258 let indexes = self.catalog.all_indexes();
2259 let columns = vec![
2260 "name".to_string(),
2261 "type".to_string(),
2262 "label".to_string(),
2263 "property".to_string(),
2264 ];
2265 let rows: Vec<Vec<Value>> = indexes
2266 .into_iter()
2267 .map(|def| {
2268 let label_name = self
2269 .catalog
2270 .get_label_name(def.label)
2271 .unwrap_or_else(|| "?".into());
2272 let prop_name = self
2273 .catalog
2274 .get_property_key_name(def.property_key)
2275 .unwrap_or_else(|| "?".into());
2276 vec![
2277 Value::from(def.name),
2278 Value::from(format!("{:?}", def.index_type)),
2279 Value::from(&*label_name),
2280 Value::from(&*prop_name),
2281 ]
2282 })
2283 .collect();
2284 Ok(QueryResult {
2285 columns,
2286 column_types: Vec::new(),
2287 rows,
2288 ..QueryResult::empty()
2289 })
2290 }
2291
2292 fn execute_show_constraints(&self) -> Result<QueryResult> {
2294 Ok(QueryResult {
2297 columns: vec![
2298 "name".to_string(),
2299 "type".to_string(),
2300 "label".to_string(),
2301 "properties".to_string(),
2302 ],
2303 column_types: Vec::new(),
2304 rows: Vec::new(),
2305 ..QueryResult::empty()
2306 })
2307 }
2308
2309 fn execute_show_node_types(&self) -> Result<QueryResult> {
2311 let columns = vec![
2312 "name".to_string(),
2313 "properties".to_string(),
2314 "constraints".to_string(),
2315 "parents".to_string(),
2316 ];
2317 let schema = self.current_schema.lock().clone();
2318 let all_names = self.catalog.all_node_type_names();
2319 let type_names: Vec<String> = match &schema {
2320 Some(s) => {
2321 let prefix = format!("{s}/");
2322 all_names
2323 .into_iter()
2324 .filter_map(|n| n.strip_prefix(&prefix).map(String::from))
2325 .collect()
2326 }
2327 None => all_names.into_iter().filter(|n| !n.contains('/')).collect(),
2328 };
2329 let rows: Vec<Vec<Value>> = type_names
2330 .into_iter()
2331 .filter_map(|name| {
2332 let lookup = match &schema {
2333 Some(s) => format!("{s}/{name}"),
2334 None => name.clone(),
2335 };
2336 let def = self.catalog.get_node_type(&lookup)?;
2337 let props: Vec<String> = def
2338 .properties
2339 .iter()
2340 .map(|p| {
2341 let nullable = if p.nullable { "" } else { " NOT NULL" };
2342 format!("{} {}{}", p.name, p.data_type, nullable)
2343 })
2344 .collect();
2345 let constraints: Vec<String> =
2346 def.constraints.iter().map(|c| format!("{c:?}")).collect();
2347 let parents = def.parent_types.join(", ");
2348 Some(vec![
2349 Value::from(name),
2350 Value::from(props.join(", ")),
2351 Value::from(constraints.join(", ")),
2352 Value::from(parents),
2353 ])
2354 })
2355 .collect();
2356 Ok(QueryResult {
2357 columns,
2358 column_types: Vec::new(),
2359 rows,
2360 ..QueryResult::empty()
2361 })
2362 }
2363
2364 fn execute_show_edge_types(&self) -> Result<QueryResult> {
2366 let columns = vec![
2367 "name".to_string(),
2368 "properties".to_string(),
2369 "source_types".to_string(),
2370 "target_types".to_string(),
2371 ];
2372 let schema = self.current_schema.lock().clone();
2373 let all_names = self.catalog.all_edge_type_names();
2374 let type_names: Vec<String> = match &schema {
2375 Some(s) => {
2376 let prefix = format!("{s}/");
2377 all_names
2378 .into_iter()
2379 .filter_map(|n| n.strip_prefix(&prefix).map(String::from))
2380 .collect()
2381 }
2382 None => all_names.into_iter().filter(|n| !n.contains('/')).collect(),
2383 };
2384 let rows: Vec<Vec<Value>> = type_names
2385 .into_iter()
2386 .filter_map(|name| {
2387 let lookup = match &schema {
2388 Some(s) => format!("{s}/{name}"),
2389 None => name.clone(),
2390 };
2391 let def = self.catalog.get_edge_type_def(&lookup)?;
2392 let props: Vec<String> = def
2393 .properties
2394 .iter()
2395 .map(|p| {
2396 let nullable = if p.nullable { "" } else { " NOT NULL" };
2397 format!("{} {}{}", p.name, p.data_type, nullable)
2398 })
2399 .collect();
2400 let src = def.source_node_types.join(", ");
2401 let tgt = def.target_node_types.join(", ");
2402 Some(vec![
2403 Value::from(name),
2404 Value::from(props.join(", ")),
2405 Value::from(src),
2406 Value::from(tgt),
2407 ])
2408 })
2409 .collect();
2410 Ok(QueryResult {
2411 columns,
2412 column_types: Vec::new(),
2413 rows,
2414 ..QueryResult::empty()
2415 })
2416 }
2417
2418 fn execute_show_graph_types(&self) -> Result<QueryResult> {
2420 let columns = vec![
2421 "name".to_string(),
2422 "open".to_string(),
2423 "node_types".to_string(),
2424 "edge_types".to_string(),
2425 ];
2426 let schema = self.current_schema.lock().clone();
2427 let all_names = self.catalog.all_graph_type_names();
2428 let type_names: Vec<String> = match &schema {
2429 Some(s) => {
2430 let prefix = format!("{s}/");
2431 all_names
2432 .into_iter()
2433 .filter_map(|n| n.strip_prefix(&prefix).map(String::from))
2434 .collect()
2435 }
2436 None => all_names.into_iter().filter(|n| !n.contains('/')).collect(),
2437 };
2438 let rows: Vec<Vec<Value>> = type_names
2439 .into_iter()
2440 .filter_map(|name| {
2441 let lookup = match &schema {
2442 Some(s) => format!("{s}/{name}"),
2443 None => name.clone(),
2444 };
2445 let def = self.catalog.get_graph_type_def(&lookup)?;
2446 let strip = |n: &String| -> String {
2448 match &schema {
2449 Some(s) => n.strip_prefix(&format!("{s}/")).unwrap_or(n).to_string(),
2450 None => n.clone(),
2451 }
2452 };
2453 let node_types: Vec<String> = def.allowed_node_types.iter().map(strip).collect();
2454 let edge_types: Vec<String> = def.allowed_edge_types.iter().map(strip).collect();
2455 Some(vec![
2456 Value::from(name),
2457 Value::from(def.open),
2458 Value::from(node_types.join(", ")),
2459 Value::from(edge_types.join(", ")),
2460 ])
2461 })
2462 .collect();
2463 Ok(QueryResult {
2464 columns,
2465 column_types: Vec::new(),
2466 rows,
2467 ..QueryResult::empty()
2468 })
2469 }
2470
2471 #[cfg(feature = "lpg")]
2477 fn execute_show_graphs(&self) -> Result<QueryResult> {
2478 let schema = self.current_schema.lock().clone();
2479 let all_names = self.store.graph_names();
2480
2481 let mut names: Vec<String> = match &schema {
2482 Some(s) => {
2483 let prefix = format!("{s}/");
2484 all_names
2485 .into_iter()
2486 .filter_map(|n| n.strip_prefix(&prefix).map(String::from))
2487 .filter(|n| n != SCHEMA_DEFAULT_GRAPH)
2488 .collect()
2489 }
2490 None => all_names.into_iter().filter(|n| !n.contains('/')).collect(),
2491 };
2492 names.sort();
2493
2494 let rows: Vec<Vec<Value>> = names.into_iter().map(|n| vec![Value::from(n)]).collect();
2495 Ok(QueryResult {
2496 columns: vec!["name".to_string()],
2497 column_types: Vec::new(),
2498 rows,
2499 ..QueryResult::empty()
2500 })
2501 }
2502
2503 fn execute_show_schemas(&self) -> Result<QueryResult> {
2505 let mut names = self.catalog.schema_names();
2506 names.sort();
2507 let rows: Vec<Vec<Value>> = names.into_iter().map(|n| vec![Value::from(n)]).collect();
2508 Ok(QueryResult {
2509 columns: vec!["name".to_string()],
2510 column_types: Vec::new(),
2511 rows,
2512 ..QueryResult::empty()
2513 })
2514 }
2515
2516 fn execute_show_graph_type(&self, name: &str) -> Result<QueryResult> {
2518 use grafeo_common::utils::error::{Error, QueryError, QueryErrorKind};
2519
2520 let def = self.catalog.get_graph_type_def(name).ok_or_else(|| {
2521 Error::Query(QueryError::new(
2522 QueryErrorKind::Semantic,
2523 format!("Graph type '{name}' not found"),
2524 ))
2525 })?;
2526
2527 let columns = vec![
2528 "name".to_string(),
2529 "open".to_string(),
2530 "node_types".to_string(),
2531 "edge_types".to_string(),
2532 ];
2533 let rows = vec![vec![
2534 Value::from(def.name),
2535 Value::from(def.open),
2536 Value::from(def.allowed_node_types.join(", ")),
2537 Value::from(def.allowed_edge_types.join(", ")),
2538 ]];
2539 Ok(QueryResult {
2540 columns,
2541 column_types: Vec::new(),
2542 rows,
2543 ..QueryResult::empty()
2544 })
2545 }
2546
2547 fn execute_show_current_graph_type(&self) -> Result<QueryResult> {
2549 let graph_name = self
2550 .current_graph()
2551 .unwrap_or_else(|| "default".to_string());
2552 let columns = vec![
2553 "graph".to_string(),
2554 "graph_type".to_string(),
2555 "open".to_string(),
2556 "node_types".to_string(),
2557 "edge_types".to_string(),
2558 ];
2559
2560 if let Some(type_name) = self.catalog.get_graph_type_binding(&graph_name)
2561 && let Some(def) = self.catalog.get_graph_type_def(&type_name)
2562 {
2563 let rows = vec![vec![
2564 Value::from(graph_name),
2565 Value::from(type_name),
2566 Value::from(def.open),
2567 Value::from(def.allowed_node_types.join(", ")),
2568 Value::from(def.allowed_edge_types.join(", ")),
2569 ]];
2570 return Ok(QueryResult {
2571 columns,
2572 column_types: Vec::new(),
2573 rows,
2574 ..QueryResult::empty()
2575 });
2576 }
2577
2578 Ok(QueryResult {
2580 columns,
2581 column_types: Vec::new(),
2582 rows: vec![vec![
2583 Value::from(graph_name),
2584 Value::Null,
2585 Value::Null,
2586 Value::Null,
2587 Value::Null,
2588 ]],
2589 ..QueryResult::empty()
2590 })
2591 }
2592
    #[cfg(feature = "gql")]
    /// Executes a GQL statement and returns its result.
    ///
    /// The statement is translated first. Session commands and schema
    /// commands are handed to their dedicated handlers and return early.
    /// Anything else is a query plan that is permission-checked, optimized
    /// (reusing the session's plan cache, keyed by query text, language and
    /// current graph), planned and run through `with_auto_commit`.
    ///
    /// Plans flagged `explain` return the annotated plan without being
    /// executed. Plans flagged `profile` are executed, but the profile tree
    /// is returned instead of the query rows.
    ///
    /// # Errors
    ///
    /// Returns an error when:
    /// - `require_lpg("GQL")` rejects the session,
    /// - translation, binding, optimization, planning or execution fails,
    /// - `require_permission` refuses `Admin` (schema commands) or `Write`
    ///   (mutating plans),
    /// - a schema command or a mutating plan is issued while the session's
    ///   read-only transaction flag is set (`TransactionError::ReadOnly`),
    /// - a schema command is issued in a build without the `lpg` feature.
    pub fn execute(&self, query: &str) -> Result<QueryResult> {
        self.require_lpg("GQL")?;

        // Test-only hook: lets a test force this statement to fail.
        #[cfg(feature = "testing-statement-injection")]
        grafeo_common::testing::statement_failure::maybe_fail_statement().map_err(|e| {
            grafeo_common::utils::error::Error::Internal(format!("injected failure: {e}"))
        })?;

        use crate::query::{
            binder::Binder, cache::CacheKey, optimizer::Optimizer, processor::QueryLanguage,
            translators::gql,
        };

        let _span = grafeo_info_span!(
            "grafeo::session::execute",
            language = "gql",
            query_len = query.len(),
        );

        // No wall-clock timing on wasm32; the timing code below is gated the same way.
        #[cfg(not(target_arch = "wasm32"))]
        let start_time = std::time::Instant::now();

        let translation = gql::translate_full(query)?;
        // Only the `Plan` variant falls through; the other variants return from here.
        let logical_plan = match translation {
            gql::GqlTranslationResult::SessionCommand(cmd) => {
                return self.execute_session_command(cmd);
            }
            #[cfg(feature = "lpg")]
            gql::GqlTranslationResult::SchemaCommand(cmd) => {
                self.require_permission(crate::auth::StatementKind::Admin)?;
                // Schema changes are refused inside a read-only transaction.
                if *self.read_only_tx.lock() {
                    return Err(grafeo_common::utils::error::Error::Transaction(
                        grafeo_common::utils::error::TransactionError::ReadOnly,
                    ));
                }
                return self.execute_schema_command(cmd);
            }
            gql::GqlTranslationResult::Plan(plan) => {
                let read_only = *self.read_only_tx.lock();
                // `has_mutations()` is only evaluated when its answer can change the
                // outcome: in a read-only transaction, or when the identity lacks
                // admin rights. (Presumably admin identities always pass the Write
                // check — confirm.)
                let need_check = read_only || !self.identity.can_admin();
                let is_mutation = need_check && plan.root.has_mutations();
                if is_mutation {
                    self.require_permission(crate::auth::StatementKind::Write)?;
                }
                if read_only && is_mutation {
                    return Err(grafeo_common::utils::error::Error::Transaction(
                        grafeo_common::utils::error::TransactionError::ReadOnly,
                    ));
                }
                plan
            }
            #[cfg(not(feature = "lpg"))]
            gql::GqlTranslationResult::SchemaCommand(_) => {
                return Err(grafeo_common::utils::error::Error::Internal(
                    "Schema commands require the `lpg` feature".to_string(),
                ));
            }
        };

        // The current graph is part of the key, so the same text run against
        // different graphs does not share a cached plan.
        let cache_key = CacheKey::with_graph(query, QueryLanguage::Gql, self.current_graph());

        let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
            // Cache hit: binding and optimization are skipped.
            cached_plan
        } else {
            let mut binder = Binder::new();
            // Binding is run for its validation; the returned context is not used here.
            let _binding_context = binder.bind(&logical_plan)?;

            let active = self.active_store();
            let optimizer = Optimizer::from_graph_store(&*active);
            let plan = optimizer.optimize(logical_plan)?;

            self.query_cache.put_optimized(cache_key, plan.clone());

            plan
        };

        let active = self.active_store();

        if optimized_plan.explain {
            // Explain: annotate and render the plan; nothing is executed.
            use crate::query::processor::{annotate_pushdown_hints, explain_result};
            let mut plan = optimized_plan;
            annotate_pushdown_hints(&mut plan.root, active.as_ref());
            return Ok(explain_result(&plan));
        }

        if optimized_plan.profile {
            // Profile: the query really runs (inside `with_auto_commit`), but only
            // the collected profile is returned.
            let has_mutations = optimized_plan.root.has_mutations();
            return self.with_auto_commit(has_mutations, || {
                let (viewing_epoch, transaction_id) = self.get_transaction_context();
                let planner = self.create_planner_for_store(
                    Arc::clone(&active),
                    viewing_epoch,
                    transaction_id,
                );
                let (mut physical_plan, entries) = planner.plan_profiled(&optimized_plan)?;

                let executor = self.make_executor(physical_plan.columns.clone());
                // The rows themselves are discarded.
                let _result = executor.execute(physical_plan.operator.as_mut())?;

                // Total time is measured from the start of `execute`; it is
                // reported as 0.0 on wasm32.
                let total_time_ms;
                #[cfg(not(target_arch = "wasm32"))]
                {
                    total_time_ms = start_time.elapsed().as_secs_f64() * 1000.0;
                }
                #[cfg(target_arch = "wasm32")]
                {
                    total_time_ms = 0.0;
                }

                let profile_tree = crate::query::profile::build_profile_tree(
                    &optimized_plan.root,
                    &mut entries.into_iter(),
                );
                Ok(crate::query::profile::profile_result(
                    &profile_tree,
                    total_time_ms,
                ))
            });
        }

        let has_mutations = optimized_plan.root.has_mutations();

        let result = self.with_auto_commit(has_mutations, || {
            let (viewing_epoch, transaction_id) = self.get_transaction_context();

            let has_active_tx = self.current_transaction.lock().is_some();
            // The planner's read-only flag is set only for a non-mutating query
            // that runs outside an explicit transaction.
            let read_only = !has_mutations && !has_active_tx;
            let planner = self.create_planner_for_store_with_read_only(
                Arc::clone(&active),
                viewing_epoch,
                transaction_id,
                read_only,
            );
            let physical_plan = planner.plan(&optimized_plan)?;

            let executor = self.make_executor(physical_plan.columns.clone());
            // Split the operator tree into a source plus push-based operators;
            // with the `spill` feature the conversion also receives a memory context.
            let (mut source, push_ops) = {
                #[cfg(feature = "spill")]
                {
                    let memory_ctx = self.make_operator_memory_context();
                    grafeo_core::execution::pipeline_convert::convert_to_pipeline_with_memory(
                        physical_plan.into_operator(),
                        memory_ctx,
                    )
                }
                #[cfg(not(feature = "spill"))]
                {
                    grafeo_core::execution::pipeline_convert::convert_to_pipeline(
                        physical_plan.into_operator(),
                    )
                }
            };
            let mut result = if push_ops.is_empty() {
                // No push operators: execute the source operator directly.
                executor.execute(source.as_mut())?
            } else {
                // Otherwise run the source through the push-based pipeline.
                executor.execute_pipeline(source, push_ops)?
            };

            // NOTE(review): this is the number of rows returned, not a count of
            // rows read from storage — confirm that is the intended meaning.
            let rows_scanned = result.rows.len() as u64;
            #[cfg(not(target_arch = "wasm32"))]
            {
                let elapsed_ms = start_time.elapsed().as_secs_f64() * 1000.0;
                result.execution_time_ms = Some(elapsed_ms);
            }
            result.rows_scanned = Some(rows_scanned);

            Ok(result)
        });

        // Metrics are recorded for successes and failures alike; the whole
        // `Result` is handed to the recorder.
        #[cfg(feature = "metrics")]
        {
            #[cfg(not(target_arch = "wasm32"))]
            let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
            #[cfg(target_arch = "wasm32")]
            let elapsed_ms = None;
            self.record_query_metrics("gql", elapsed_ms, &result);
        }

        result
    }
2824
2825 #[cfg(all(feature = "gql", feature = "lpg"))]
2846 pub fn execute_streaming(
2847 &self,
2848 query: &str,
2849 ) -> Result<crate::query::executor::stream::ResultStream<'_>> {
2850 use crate::query::executor::stream::{ResultStream, StreamGuard};
2851
2852 let (source, columns, deadline) = self.build_streaming_plan(query)?;
2853 let guard = StreamGuard::new(&self.active_streams);
2854 Ok(ResultStream::new(source, columns, deadline, guard))
2855 }
2856
2857 #[cfg(all(feature = "gql", feature = "lpg"))]
2865 pub(crate) fn build_streaming_plan(
2866 &self,
2867 query: &str,
2868 ) -> Result<(
2869 Box<dyn grafeo_core::execution::operators::Operator>,
2870 Vec<String>,
2871 Option<Instant>,
2872 )> {
2873 use crate::query::{
2874 binder::Binder, cache::CacheKey, optimizer::Optimizer, processor::QueryLanguage,
2875 translators::gql,
2876 };
2877
2878 self.require_lpg("GQL")?;
2879
2880 let _span = grafeo_info_span!(
2881 "grafeo::session::execute_streaming",
2882 language = "gql",
2883 query_len = query.len(),
2884 );
2885
2886 let translation = gql::translate_full(query)?;
2888 let logical_plan = match translation {
2889 gql::GqlTranslationResult::SessionCommand(_) => {
2890 return Err(grafeo_common::utils::error::Error::Query(
2891 grafeo_common::utils::error::QueryError::new(
2892 grafeo_common::utils::error::QueryErrorKind::Semantic,
2893 "session commands cannot be streamed; use execute() instead",
2894 ),
2895 ));
2896 }
2897 gql::GqlTranslationResult::SchemaCommand(_) => {
2898 return Err(grafeo_common::utils::error::Error::Query(
2899 grafeo_common::utils::error::QueryError::new(
2900 grafeo_common::utils::error::QueryErrorKind::Semantic,
2901 "schema DDL cannot be streamed; use execute() instead",
2902 ),
2903 ));
2904 }
2905 gql::GqlTranslationResult::Plan(plan) => {
2906 if plan.root.has_mutations() {
2907 return Err(grafeo_common::utils::error::Error::Query(
2908 grafeo_common::utils::error::QueryError::new(
2909 grafeo_common::utils::error::QueryErrorKind::Semantic,
2910 "mutating queries cannot be streamed; use execute() instead",
2911 ),
2912 ));
2913 }
2914 if !self.identity.can_admin() {
2915 self.require_permission(crate::auth::StatementKind::Read)?;
2916 }
2917 plan
2918 }
2919 };
2920
2921 let cache_key = CacheKey::with_graph(query, QueryLanguage::Gql, self.current_graph());
2923 let optimized_plan = if let Some(cached) = self.query_cache.get_optimized(&cache_key) {
2924 cached
2925 } else {
2926 let mut binder = Binder::new();
2927 let _binding_context = binder.bind(&logical_plan)?;
2928 let active = self.active_store();
2929 let optimizer = Optimizer::from_graph_store(&*active);
2930 let plan = optimizer.optimize(logical_plan)?;
2931 self.query_cache.put_optimized(cache_key, plan.clone());
2932 plan
2933 };
2934
2935 if optimized_plan.explain || optimized_plan.profile {
2936 return Err(grafeo_common::utils::error::Error::Query(
2937 grafeo_common::utils::error::QueryError::new(
2938 grafeo_common::utils::error::QueryErrorKind::Semantic,
2939 "EXPLAIN and PROFILE cannot be streamed; use execute() instead",
2940 ),
2941 ));
2942 }
2943
2944 let active = self.active_store();
2946 let has_active_tx = self.current_transaction.lock().is_some();
2947 let (viewing_epoch, transaction_id) = self.get_transaction_context();
2948 let planner = self.create_planner_for_store_with_read_only(
2949 Arc::clone(&active),
2950 viewing_epoch,
2951 transaction_id,
2952 !has_active_tx,
2953 );
2954 let physical_plan = planner.plan(&optimized_plan)?;
2955 let columns = physical_plan.columns.clone();
2956
2957 let (source, push_ops) = {
2961 #[cfg(feature = "spill")]
2962 {
2963 let memory_ctx = self.make_operator_memory_context();
2964 grafeo_core::execution::pipeline_convert::convert_to_pipeline_with_memory(
2965 physical_plan.into_operator(),
2966 memory_ctx,
2967 )
2968 }
2969 #[cfg(not(feature = "spill"))]
2970 {
2971 grafeo_core::execution::pipeline_convert::convert_to_pipeline(
2972 physical_plan.into_operator(),
2973 )
2974 }
2975 };
2976 if !push_ops.is_empty() {
2977 return Err(grafeo_common::utils::error::Error::Query(
2978 grafeo_common::utils::error::QueryError::new(
2979 grafeo_common::utils::error::QueryErrorKind::Semantic,
2980 "query requires a push-based pipeline (ORDER BY / aggregate / DISTINCT) \
2981 which cannot be streamed; use execute() instead",
2982 ),
2983 ));
2984 }
2985
2986 Ok((source, columns, self.query_deadline()))
2987 }
2988
2989 #[cfg(feature = "gql")]
2998 pub fn execute_at_epoch(&self, query: &str, epoch: EpochId) -> Result<QueryResult> {
2999 let previous = self.viewing_epoch_override.lock().replace(epoch);
3000 let result = self.execute(query);
3001 *self.viewing_epoch_override.lock() = previous;
3002 result
3003 }
3004
3005 #[cfg(feature = "gql")]
3013 pub fn execute_at_epoch_with_params(
3014 &self,
3015 query: &str,
3016 epoch: EpochId,
3017 params: Option<std::collections::HashMap<String, Value>>,
3018 ) -> Result<QueryResult> {
3019 let previous = self.viewing_epoch_override.lock().replace(epoch);
3020 let result = if let Some(p) = params {
3021 self.execute_with_params(query, p)
3022 } else {
3023 self.execute(query)
3024 };
3025 *self.viewing_epoch_override.lock() = previous;
3026 result
3027 }
3028
3029 #[cfg(feature = "gql")]
3035 pub fn execute_with_params(
3036 &self,
3037 query: &str,
3038 params: std::collections::HashMap<String, Value>,
3039 ) -> Result<QueryResult> {
3040 self.require_lpg("GQL")?;
3041
3042 use crate::query::processor::{QueryLanguage, QueryProcessor};
3043
3044 let has_mutations = if self.identity.can_write() {
3048 Self::query_looks_like_mutation(query)
3050 } else {
3051 use crate::query::translators::gql;
3053 match gql::translate(query) {
3054 Ok(plan) if plan.root.has_mutations() => {
3055 self.require_permission(crate::auth::StatementKind::Write)?;
3056 true
3057 }
3058 Ok(_) => false,
3059 Err(_) => Self::query_looks_like_mutation(query),
3061 }
3062 };
3063 let active = self.active_store();
3064
3065 self.with_auto_commit(has_mutations, || {
3066 let (viewing_epoch, transaction_id) = self.get_transaction_context();
3068
3069 let processor = QueryProcessor::for_stores_with_transaction(
3071 Arc::clone(&active),
3072 self.active_write_store(),
3073 Arc::clone(&self.transaction_manager),
3074 )?;
3075
3076 let processor = if let Some(transaction_id) = transaction_id {
3078 processor.with_transaction_context(viewing_epoch, transaction_id)
3079 } else {
3080 processor
3081 };
3082
3083 processor.process(query, QueryLanguage::Gql, Some(¶ms))
3084 })
3085 }
3086
3087 #[cfg(not(any(feature = "gql", feature = "cypher")))]
3093 pub fn execute_with_params(
3094 &self,
3095 _query: &str,
3096 _params: std::collections::HashMap<String, Value>,
3097 ) -> Result<QueryResult> {
3098 Err(grafeo_common::utils::error::Error::Internal(
3099 "No query language enabled".to_string(),
3100 ))
3101 }
3102
3103 #[cfg(not(any(feature = "gql", feature = "cypher")))]
3109 pub fn execute(&self, _query: &str) -> Result<QueryResult> {
3110 Err(grafeo_common::utils::error::Error::Internal(
3111 "No query language enabled".to_string(),
3112 ))
3113 }
3114
    #[cfg(feature = "cypher")]
    /// Executes a Cypher statement and returns its result.
    ///
    /// The statement is translated first. Schema commands and the
    /// `ShowIndexes`, `ShowConstraints` and `ShowCurrentGraphType` results
    /// are handed to their dedicated handlers and return early. A regular
    /// plan is optimized (reusing the session's plan cache, keyed by query
    /// text, language and current graph), permission-checked, planned and
    /// run through `with_auto_commit`.
    ///
    /// Plans flagged `explain` return the annotated plan without being
    /// executed. Plans flagged `profile` are executed, but the profile tree
    /// is returned instead of the query rows.
    ///
    /// # Errors
    ///
    /// Returns an error when:
    /// - translation, binding, optimization, planning or execution fails,
    /// - `require_permission` refuses `Admin` (schema commands) or `Write`
    ///   (mutating plans),
    /// - a schema command is issued while the session's read-only transaction
    ///   flag is set (semantic query error),
    /// - a schema command is issued in a build without the `lpg` feature.
    pub fn execute_cypher(&self, query: &str) -> Result<QueryResult> {
        use crate::query::{
            binder::Binder, cache::CacheKey, optimizer::Optimizer, processor::QueryLanguage,
            translators::cypher,
        };

        let translation = cypher::translate_full(query)?;
        // Every variant except `Plan` returns from inside this match.
        match translation {
            #[cfg(feature = "lpg")]
            cypher::CypherTranslationResult::SchemaCommand(cmd) => {
                use grafeo_common::utils::error::{
                    Error as GrafeoError, QueryError, QueryErrorKind,
                };
                self.require_permission(crate::auth::StatementKind::Admin)?;
                // NOTE(review): the GQL path reports this condition as
                // `TransactionError::ReadOnly`; here it is a semantic query error.
                if *self.read_only_tx.lock() {
                    return Err(GrafeoError::Query(QueryError::new(
                        QueryErrorKind::Semantic,
                        "Cannot execute schema DDL in a read-only transaction",
                    )));
                }
                return self.execute_schema_command(cmd);
            }
            #[cfg(not(feature = "lpg"))]
            cypher::CypherTranslationResult::SchemaCommand(_) => {
                return Err(grafeo_common::utils::error::Error::Internal(
                    "Schema DDL requires the `lpg` feature".to_string(),
                ));
            }
            cypher::CypherTranslationResult::ShowIndexes => {
                return self.execute_show_indexes();
            }
            cypher::CypherTranslationResult::ShowConstraints => {
                return self.execute_show_constraints();
            }
            cypher::CypherTranslationResult::ShowCurrentGraphType => {
                return self.execute_show_current_graph_type();
            }
            cypher::CypherTranslationResult::Plan(_) => {
                // Regular query: continue below. The plan carried by this variant
                // is dropped; on a cache miss the query is translated again.
                // NOTE(review): that looks like an avoidable second parse — confirm
                // whether the carried plan could be reused.
            }
        }

        // No wall-clock timing on wasm32; the timing code below is gated the same way.
        #[cfg(not(target_arch = "wasm32"))]
        let start_time = std::time::Instant::now();

        // The current graph is part of the key, so the same text run against
        // different graphs does not share a cached plan.
        let cache_key = CacheKey::with_graph(query, QueryLanguage::Cypher, self.current_graph());

        let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
            // Cache hit: translation, binding and optimization are skipped.
            cached_plan
        } else {
            let logical_plan = cypher::translate(query)?;

            let mut binder = Binder::new();
            // Binding is run for its validation; the returned context is not used here.
            let _binding_context = binder.bind(&logical_plan)?;

            let active = self.active_store();
            let optimizer = Optimizer::from_graph_store(&*active);
            let plan = optimizer.optimize(logical_plan)?;

            self.query_cache.put_optimized(cache_key, plan.clone());

            plan
        };

        // The permission check sits after the cache lookup, so cached plans are
        // checked as well.
        // NOTE(review): unlike the GQL path, there is no explicit read-only
        // transaction check for mutating plans here — confirm it is enforced
        // elsewhere.
        if optimized_plan.root.has_mutations() {
            self.require_permission(crate::auth::StatementKind::Write)?;
        }

        let active = self.active_store();

        if optimized_plan.explain {
            // Explain: annotate and render the plan; nothing is executed.
            use crate::query::processor::{annotate_pushdown_hints, explain_result};
            let mut plan = optimized_plan;
            annotate_pushdown_hints(&mut plan.root, active.as_ref());
            return Ok(explain_result(&plan));
        }

        if optimized_plan.profile {
            // Profile: the query really runs (inside `with_auto_commit`), but only
            // the collected profile is returned.
            let has_mutations = optimized_plan.root.has_mutations();
            return self.with_auto_commit(has_mutations, || {
                let (viewing_epoch, transaction_id) = self.get_transaction_context();
                let planner = self.create_planner_for_store(
                    Arc::clone(&active),
                    viewing_epoch,
                    transaction_id,
                );
                let (mut physical_plan, entries) = planner.plan_profiled(&optimized_plan)?;

                let executor = self.make_executor(physical_plan.columns.clone());
                // The rows themselves are discarded.
                let _result = executor.execute(physical_plan.operator.as_mut())?;

                // Total time is measured from just after translation; it is
                // reported as 0.0 on wasm32.
                let total_time_ms;
                #[cfg(not(target_arch = "wasm32"))]
                {
                    total_time_ms = start_time.elapsed().as_secs_f64() * 1000.0;
                }
                #[cfg(target_arch = "wasm32")]
                {
                    total_time_ms = 0.0;
                }

                let profile_tree = crate::query::profile::build_profile_tree(
                    &optimized_plan.root,
                    &mut entries.into_iter(),
                );
                Ok(crate::query::profile::profile_result(
                    &profile_tree,
                    total_time_ms,
                ))
            });
        }

        let has_mutations = optimized_plan.root.has_mutations();

        let result = self.with_auto_commit(has_mutations, || {
            let (viewing_epoch, transaction_id) = self.get_transaction_context();

            let planner =
                self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
            let mut physical_plan = planner.plan(&optimized_plan)?;

            let executor = self.make_executor(physical_plan.columns.clone());
            // The physical operator is executed directly; no pipeline conversion here.
            executor.execute(physical_plan.operator.as_mut())
        });

        // Metrics are recorded for successes and failures alike; the whole
        // `Result` is handed to the recorder.
        #[cfg(feature = "metrics")]
        {
            #[cfg(not(target_arch = "wasm32"))]
            let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
            #[cfg(target_arch = "wasm32")]
            let elapsed_ms = None;
            self.record_query_metrics("cypher", elapsed_ms, &result);
        }

        result
    }
3271
3272 #[cfg(feature = "gremlin")]
3296 pub fn execute_gremlin(&self, query: &str) -> Result<QueryResult> {
3297 use crate::query::{binder::Binder, optimizer::Optimizer, translators::gremlin};
3298
3299 #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
3300 let start_time = Instant::now();
3301
3302 let logical_plan = gremlin::translate(query)?;
3304
3305 let mut binder = Binder::new();
3307 let _binding_context = binder.bind(&logical_plan)?;
3308
3309 let active = self.active_store();
3311 let optimizer = Optimizer::from_graph_store(&*active);
3312 let optimized_plan = optimizer.optimize(logical_plan)?;
3313
3314 let has_mutations = optimized_plan.root.has_mutations();
3315 if has_mutations {
3316 self.require_permission(crate::auth::StatementKind::Write)?;
3317 }
3318
3319 let result = self.with_auto_commit(has_mutations, || {
3320 let (viewing_epoch, transaction_id) = self.get_transaction_context();
3322
3323 let planner =
3325 self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
3326 let mut physical_plan = planner.plan(&optimized_plan)?;
3327
3328 let executor = self.make_executor(physical_plan.columns.clone());
3330 executor.execute(physical_plan.operator.as_mut())
3331 });
3332
3333 #[cfg(feature = "metrics")]
3334 {
3335 #[cfg(not(target_arch = "wasm32"))]
3336 let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
3337 #[cfg(target_arch = "wasm32")]
3338 let elapsed_ms = None;
3339 self.record_query_metrics("gremlin", elapsed_ms, &result);
3340 }
3341
3342 result
3343 }
3344
3345 #[cfg(feature = "gremlin")]
3351 pub fn execute_gremlin_with_params(
3352 &self,
3353 query: &str,
3354 params: std::collections::HashMap<String, Value>,
3355 ) -> Result<QueryResult> {
3356 use crate::query::{
3357 binder::Binder, optimizer::Optimizer, processor::substitute_params,
3358 translators::gremlin,
3359 };
3360
3361 #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
3362 let start_time = Instant::now();
3363
3364 let mut logical_plan = gremlin::translate(query)?;
3366
3367 substitute_params(&mut logical_plan, ¶ms)?;
3369
3370 let mut binder = Binder::new();
3372 let _binding_context = binder.bind(&logical_plan)?;
3373
3374 let active = self.active_store();
3376 let optimizer = Optimizer::from_graph_store(&*active);
3377 let optimized_plan = optimizer.optimize(logical_plan)?;
3378
3379 let has_mutations = optimized_plan.root.has_mutations();
3380 if has_mutations {
3381 self.require_permission(crate::auth::StatementKind::Write)?;
3382 }
3383
3384 let result = self.with_auto_commit(has_mutations, || {
3385 let (viewing_epoch, transaction_id) = self.get_transaction_context();
3386 let planner =
3387 self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
3388 let mut physical_plan = planner.plan(&optimized_plan)?;
3389 let executor = self.make_executor(physical_plan.columns.clone());
3390 executor.execute(physical_plan.operator.as_mut())
3391 });
3392
3393 #[cfg(feature = "metrics")]
3394 {
3395 #[cfg(not(target_arch = "wasm32"))]
3396 let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
3397 #[cfg(target_arch = "wasm32")]
3398 let elapsed_ms = None;
3399 self.record_query_metrics("gremlin", elapsed_ms, &result);
3400 }
3401
3402 result
3403 }
3404
3405 #[cfg(feature = "graphql")]
3429 pub fn execute_graphql(&self, query: &str) -> Result<QueryResult> {
3430 use crate::query::{
3431 binder::Binder, optimizer::Optimizer, processor::substitute_params,
3432 translators::graphql,
3433 };
3434
3435 #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
3436 let start_time = Instant::now();
3437
3438 let mut logical_plan = graphql::translate(query)?;
3439
3440 if !logical_plan.default_params.is_empty() {
3442 let defaults = logical_plan.default_params.clone();
3443 substitute_params(&mut logical_plan, &defaults)?;
3444 }
3445
3446 let mut binder = Binder::new();
3447 let _binding_context = binder.bind(&logical_plan)?;
3448
3449 let active = self.active_store();
3450 let optimizer = Optimizer::from_graph_store(&*active);
3451 let optimized_plan = optimizer.optimize(logical_plan)?;
3452 let has_mutations = optimized_plan.root.has_mutations();
3453 if has_mutations {
3454 self.require_permission(crate::auth::StatementKind::Write)?;
3455 }
3456
3457 let result = self.with_auto_commit(has_mutations, || {
3458 let (viewing_epoch, transaction_id) = self.get_transaction_context();
3459 let planner =
3460 self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
3461 let mut physical_plan = planner.plan(&optimized_plan)?;
3462 let executor = self.make_executor(physical_plan.columns.clone());
3463 executor.execute(physical_plan.operator.as_mut())
3464 });
3465
3466 #[cfg(feature = "metrics")]
3467 {
3468 #[cfg(not(target_arch = "wasm32"))]
3469 let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
3470 #[cfg(target_arch = "wasm32")]
3471 let elapsed_ms = None;
3472 self.record_query_metrics("graphql", elapsed_ms, &result);
3473 }
3474
3475 result
3476 }
3477
3478 #[cfg(feature = "graphql")]
3484 pub fn execute_graphql_with_params(
3485 &self,
3486 query: &str,
3487 params: std::collections::HashMap<String, Value>,
3488 ) -> Result<QueryResult> {
3489 use crate::query::{
3490 binder::Binder, optimizer::Optimizer, processor::substitute_params,
3491 translators::graphql,
3492 };
3493
3494 #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
3495 let start_time = Instant::now();
3496
3497 let mut logical_plan = graphql::translate(query)?;
3499
3500 if !logical_plan.default_params.is_empty() {
3502 let mut merged = logical_plan.default_params.clone();
3503 merged.extend(params.iter().map(|(k, v)| (k.clone(), v.clone())));
3504 substitute_params(&mut logical_plan, &merged)?;
3505 } else {
3506 substitute_params(&mut logical_plan, ¶ms)?;
3507 }
3508
3509 let mut binder = Binder::new();
3511 let _binding_context = binder.bind(&logical_plan)?;
3512
3513 let active = self.active_store();
3515 let optimizer = Optimizer::from_graph_store(&*active);
3516 let optimized_plan = optimizer.optimize(logical_plan)?;
3517
3518 let has_mutations = optimized_plan.root.has_mutations();
3519 if has_mutations {
3520 self.require_permission(crate::auth::StatementKind::Write)?;
3521 }
3522
3523 let result = self.with_auto_commit(has_mutations, || {
3524 let (viewing_epoch, transaction_id) = self.get_transaction_context();
3525 let planner =
3526 self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
3527 let mut physical_plan = planner.plan(&optimized_plan)?;
3528 let executor = self.make_executor(physical_plan.columns.clone());
3529 executor.execute(physical_plan.operator.as_mut())
3530 });
3531
3532 #[cfg(feature = "metrics")]
3533 {
3534 #[cfg(not(target_arch = "wasm32"))]
3535 let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
3536 #[cfg(target_arch = "wasm32")]
3537 let elapsed_ms = None;
3538 self.record_query_metrics("graphql", elapsed_ms, &result);
3539 }
3540
3541 result
3542 }
3543
3544 #[cfg(feature = "sql-pgq")]
3569 pub fn execute_sql(&self, query: &str) -> Result<QueryResult> {
3570 use crate::query::{
3571 binder::Binder, cache::CacheKey, optimizer::Optimizer, plan::LogicalOperator,
3572 processor::QueryLanguage, translators::sql_pgq,
3573 };
3574
3575 #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
3576 let start_time = Instant::now();
3577
3578 let logical_plan = sql_pgq::translate(query)?;
3580
3581 if let LogicalOperator::CreatePropertyGraph(ref cpg) = logical_plan.root {
3583 self.require_permission(crate::auth::StatementKind::Admin)?;
3584 return Ok(QueryResult {
3585 columns: vec!["status".into()],
3586 column_types: vec![grafeo_common::types::LogicalType::String],
3587 rows: vec![vec![Value::from(format!(
3588 "Property graph '{}' created ({} node tables, {} edge tables)",
3589 cpg.name,
3590 cpg.node_tables.len(),
3591 cpg.edge_tables.len()
3592 ))]],
3593 execution_time_ms: None,
3594 rows_scanned: None,
3595 status_message: None,
3596 gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
3597 });
3598 }
3599
3600 let cache_key = CacheKey::with_graph(query, QueryLanguage::SqlPgq, self.current_graph());
3601
3602 let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
3603 cached_plan
3604 } else {
3605 let mut binder = Binder::new();
3606 let _binding_context = binder.bind(&logical_plan)?;
3607 let active = self.active_store();
3608 let optimizer = Optimizer::from_graph_store(&*active);
3609 let plan = optimizer.optimize(logical_plan)?;
3610 self.query_cache.put_optimized(cache_key, plan.clone());
3611 plan
3612 };
3613
3614 let active = self.active_store();
3615 let has_mutations = optimized_plan.root.has_mutations();
3616 if has_mutations {
3617 self.require_permission(crate::auth::StatementKind::Write)?;
3618 }
3619
3620 let result = self.with_auto_commit(has_mutations, || {
3621 let (viewing_epoch, transaction_id) = self.get_transaction_context();
3622 let planner =
3623 self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
3624 let mut physical_plan = planner.plan(&optimized_plan)?;
3625 let executor = self.make_executor(physical_plan.columns.clone());
3626 executor.execute(physical_plan.operator.as_mut())
3627 });
3628
3629 #[cfg(feature = "metrics")]
3630 {
3631 #[cfg(not(target_arch = "wasm32"))]
3632 let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
3633 #[cfg(target_arch = "wasm32")]
3634 let elapsed_ms = None;
3635 self.record_query_metrics("sql", elapsed_ms, &result);
3636 }
3637
3638 result
3639 }
3640
3641 #[cfg(feature = "sql-pgq")]
3647 pub fn execute_sql_with_params(
3648 &self,
3649 query: &str,
3650 params: std::collections::HashMap<String, Value>,
3651 ) -> Result<QueryResult> {
3652 use crate::query::processor::{QueryLanguage, QueryProcessor};
3653
3654 #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
3655 let start_time = Instant::now();
3656
3657 let has_mutations = if self.identity.can_write() {
3658 Self::query_looks_like_mutation(query)
3659 } else {
3660 use crate::query::translators::sql_pgq;
3661 match sql_pgq::translate(query) {
3662 Ok(plan) if plan.root.has_mutations() => {
3663 self.require_permission(crate::auth::StatementKind::Write)?;
3664 true
3665 }
3666 Ok(_) => false,
3667 Err(_) => Self::query_looks_like_mutation(query),
3668 }
3669 };
3670 if has_mutations {
3671 self.require_permission(crate::auth::StatementKind::Write)?;
3672 }
3673 let active = self.active_store();
3674
3675 let result = self.with_auto_commit(has_mutations, || {
3676 let (viewing_epoch, transaction_id) = self.get_transaction_context();
3677 let processor = QueryProcessor::for_stores_with_transaction(
3678 Arc::clone(&active),
3679 self.active_write_store(),
3680 Arc::clone(&self.transaction_manager),
3681 )?;
3682 let processor = if let Some(transaction_id) = transaction_id {
3683 processor.with_transaction_context(viewing_epoch, transaction_id)
3684 } else {
3685 processor
3686 };
3687 processor.process(query, QueryLanguage::SqlPgq, Some(¶ms))
3688 });
3689
3690 #[cfg(feature = "metrics")]
3691 {
3692 #[cfg(not(target_arch = "wasm32"))]
3693 let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
3694 #[cfg(target_arch = "wasm32")]
3695 let elapsed_ms = None;
3696 self.record_query_metrics("sql", elapsed_ms, &result);
3697 }
3698
3699 result
3700 }
3701
3702 pub fn execute_language(
3711 &self,
3712 query: &str,
3713 language: &str,
3714 params: Option<std::collections::HashMap<String, Value>>,
3715 ) -> Result<QueryResult> {
3716 let _span = grafeo_info_span!(
3717 "grafeo::session::execute",
3718 language,
3719 query_len = query.len(),
3720 );
3721 match language {
3722 "gql" => {
3723 if let Some(p) = params {
3724 self.execute_with_params(query, p)
3725 } else {
3726 self.execute(query)
3727 }
3728 }
3729 #[cfg(feature = "cypher")]
3730 "cypher" => {
3731 if let Some(p) = params {
3732 use crate::query::processor::{QueryLanguage, QueryProcessor};
3733
3734 #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
3735 let start_time = Instant::now();
3736
3737 let has_mutations = if self.identity.can_write() {
3738 Self::query_looks_like_mutation(query)
3739 } else {
3740 use crate::query::translators::cypher;
3741 match cypher::translate(query) {
3742 Ok(plan) if plan.root.has_mutations() => {
3743 self.require_permission(crate::auth::StatementKind::Write)?;
3744 true
3745 }
3746 Ok(_) => false,
3747 Err(_) => Self::query_looks_like_mutation(query),
3748 }
3749 };
3750 let active = self.active_store();
3751 let result = self.with_auto_commit(has_mutations, || {
3752 let processor = QueryProcessor::for_stores_with_transaction(
3753 Arc::clone(&active),
3754 self.active_write_store(),
3755 Arc::clone(&self.transaction_manager),
3756 )?;
3757 let (viewing_epoch, transaction_id) = self.get_transaction_context();
3758 let processor = if let Some(transaction_id) = transaction_id {
3759 processor.with_transaction_context(viewing_epoch, transaction_id)
3760 } else {
3761 processor
3762 };
3763 processor.process(query, QueryLanguage::Cypher, Some(&p))
3764 });
3765
3766 #[cfg(feature = "metrics")]
3767 {
3768 #[cfg(not(target_arch = "wasm32"))]
3769 let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
3770 #[cfg(target_arch = "wasm32")]
3771 let elapsed_ms = None;
3772 self.record_query_metrics("cypher", elapsed_ms, &result);
3773 }
3774
3775 result
3776 } else {
3777 self.execute_cypher(query)
3778 }
3779 }
3780 #[cfg(feature = "gremlin")]
3781 "gremlin" => {
3782 if let Some(p) = params {
3783 self.execute_gremlin_with_params(query, p)
3784 } else {
3785 self.execute_gremlin(query)
3786 }
3787 }
3788 #[cfg(feature = "graphql")]
3789 "graphql" => {
3790 if let Some(p) = params {
3791 self.execute_graphql_with_params(query, p)
3792 } else {
3793 self.execute_graphql(query)
3794 }
3795 }
3796 #[cfg(all(feature = "graphql", feature = "triple-store"))]
3797 "graphql-rdf" => {
3798 if let Some(p) = params {
3799 self.execute_graphql_rdf_with_params(query, p)
3800 } else {
3801 self.execute_graphql_rdf(query)
3802 }
3803 }
3804 #[cfg(feature = "sql-pgq")]
3805 "sql" | "sql-pgq" => {
3806 if let Some(p) = params {
3807 self.execute_sql_with_params(query, p)
3808 } else {
3809 self.execute_sql(query)
3810 }
3811 }
3812 #[cfg(all(feature = "sparql", feature = "triple-store"))]
3813 "sparql" => {
3814 if let Some(p) = params {
3815 self.execute_sparql_with_params(query, p)
3816 } else {
3817 self.execute_sparql(query)
3818 }
3819 }
3820 other => Err(grafeo_common::utils::error::Error::Query(
3821 grafeo_common::utils::error::QueryError::new(
3822 grafeo_common::utils::error::QueryErrorKind::Semantic,
3823 format!("Unknown query language: '{other}'"),
3824 ),
3825 )),
3826 }
3827 }
3828
    /// Clears the query cache used by this session.
    ///
    /// The cache is held behind an `Arc`, so it may be shared with other
    /// holders of the same cache — NOTE(review): confirm the sharing scope.
    pub fn clear_plan_cache(&self) {
        self.query_cache.clear();
    }
3858
    #[cfg(feature = "lpg")]
    /// Begins a read-write transaction without an explicit isolation level.
    ///
    /// If a transaction is already active, this opens a nested level instead
    /// (see `begin_transaction_inner`).
    ///
    /// # Errors
    ///
    /// Propagates any error from `begin_transaction_inner`.
    pub fn begin_transaction(&mut self) -> Result<()> {
        self.begin_transaction_inner(false, None)
    }
3870
    #[cfg(feature = "lpg")]
    /// Begins a read-write transaction with the given isolation level.
    ///
    /// # Errors
    ///
    /// Propagates any error from `begin_transaction_inner`.
    pub fn begin_transaction_with_isolation(
        &mut self,
        isolation_level: crate::transaction::IsolationLevel,
    ) -> Result<()> {
        self.begin_transaction_inner(false, Some(isolation_level))
    }
3885
3886 #[cfg(feature = "lpg")]
3888 fn begin_transaction_inner(
3889 &self,
3890 read_only: bool,
3891 isolation_level: Option<crate::transaction::IsolationLevel>,
3892 ) -> Result<()> {
3893 let _span = grafeo_debug_span!("grafeo::tx::begin", read_only);
3894 let mut current = self.current_transaction.lock();
3895 if current.is_some() {
3896 drop(current);
3898 let mut depth = self.transaction_nesting_depth.lock();
3899 *depth += 1;
3900 let sp_name = format!("_nested_tx_{}", *depth);
3901 self.savepoint(&sp_name)?;
3902 return Ok(());
3903 }
3904
3905 let active = self.active_lpg_store();
3906 self.transaction_start_node_count
3907 .store(active.node_count(), Ordering::Relaxed);
3908 self.transaction_start_edge_count
3909 .store(active.edge_count(), Ordering::Relaxed);
3910 let transaction_id = if let Some(level) = isolation_level {
3911 self.transaction_manager.begin_with_isolation(level)
3912 } else {
3913 self.transaction_manager.begin()
3914 };
3915 *current = Some(transaction_id);
3916 *self.read_only_tx.lock() = read_only || self.db_read_only;
3917
3918 let key = self.active_graph_storage_key();
3921 let mut touched = self.touched_graphs.lock();
3922 touched.clear();
3923 touched.push(key);
3924
3925 #[cfg(feature = "metrics")]
3926 {
3927 crate::metrics::record_metric!(self.metrics, tx_active, inc);
3928 #[cfg(not(target_arch = "wasm32"))]
3929 {
3930 *self.tx_start_time.lock() = Some(Instant::now());
3931 }
3932 }
3933
3934 Ok(())
3935 }
3936
    #[cfg(feature = "lpg")]
    /// Commits the current transaction, or the innermost nested level.
    ///
    /// # Errors
    ///
    /// Propagates any error from `commit_inner`.
    pub fn commit(&mut self) -> Result<()> {
        self.commit_inner()
    }
3948
    #[cfg(feature = "lpg")]
    /// Commits the innermost nested level, or the whole transaction.
    ///
    /// When the nesting depth is non-zero, only the savepoint of the innermost
    /// level is released and the depth is decremented. Otherwise the active
    /// transaction is taken out of the session and committed through the
    /// transaction manager; on success the touched graph stores are finalised
    /// and the session's per-transaction state is reset.
    ///
    /// # Errors
    ///
    /// * Streaming results are still active (`check_no_active_streams`).
    /// * No transaction is active (`"No active transaction"`).
    /// * The transaction manager rejects the commit; per-transaction state is
    ///   cleaned up before the error is returned.
    /// * With `testing-statement-injection`, an injected failure (after a
    ///   best-effort rollback).
    fn commit_inner(&self) -> Result<()> {
        let _span = grafeo_debug_span!("grafeo::tx::commit");

        #[cfg(feature = "testing-statement-injection")]
        if let Err(e) = grafeo_common::testing::statement_failure::maybe_fail_commit() {
            // Best effort: the injected failure is what gets reported, so a
            // rollback error is deliberately ignored here.
            let _ = self.rollback_inner();
            return Err(grafeo_common::utils::error::Error::Internal(format!(
                "injected commit failure: {e}"
            )));
        }

        self.check_no_active_streams("commit")?;
        {
            // Nested level: release its savepoint instead of committing.
            let mut depth = self.transaction_nesting_depth.lock();
            if *depth > 0 {
                let sp_name = format!("_nested_tx_{depth}");
                *depth -= 1;
                // Release the depth lock before calling into savepoint code.
                drop(depth);
                return self.release_savepoint(&sp_name);
            }
        }

        // `take()` clears the session's active transaction whether or not the
        // commit below succeeds.
        let transaction_id = self.current_transaction.lock().take().ok_or_else(|| {
            grafeo_common::utils::error::Error::Transaction(
                grafeo_common::utils::error::TransactionError::InvalidState(
                    "No active transaction".to_string(),
                ),
            )
        })?;

        // Take ownership of the touched-graph list, leaving it empty.
        let touched = std::mem::take(&mut *self.touched_graphs.lock());
        let commit_epoch = match self.transaction_manager.commit(transaction_id) {
            Ok(epoch) => epoch,
            Err(e) => {
                // Commit rejected: undo property changes on every touched
                // graph and reset per-transaction session state.
                for graph_name in &touched {
                    let store = self.resolve_store(graph_name);
                    store.rollback_transaction_properties(transaction_id);
                }
                #[cfg(feature = "triple-store")]
                self.rollback_rdf_transaction(transaction_id);
                #[cfg(feature = "cdc")]
                if let Some(ref pending) = self.cdc_pending_events {
                    pending.lock().clear();
                }
                *self.read_only_tx.lock() = self.db_read_only;
                self.savepoints.lock().clear();
                self.touched_graphs.lock().clear();
                #[cfg(feature = "metrics")]
                {
                    crate::metrics::record_metric!(self.metrics, tx_active, dec);
                    // Every failed commit is counted as a conflict here.
                    crate::metrics::record_metric!(self.metrics, tx_conflicts, inc);
                    #[cfg(not(target_arch = "wasm32"))]
                    if let Some(start) = self.tx_start_time.lock().take() {
                        let duration_ms = start.elapsed().as_secs_f64() * 1000.0;
                        crate::metrics::record_metric!(
                            self.metrics,
                            tx_duration,
                            observe duration_ms
                        );
                    }
                }
                return Err(e);
            }
        };

        // Finalise version epochs on each touched graph with the commit epoch.
        for graph_name in &touched {
            let store = self.resolve_store(graph_name);
            store.finalize_version_epochs(transaction_id, commit_epoch);
        }

        #[cfg(feature = "triple-store")]
        self.commit_rdf_transaction(transaction_id);

        for graph_name in &touched {
            let store = self.resolve_store(graph_name);
            store.commit_transaction_properties(transaction_id);
        }

        #[cfg(feature = "cdc")]
        if let Some(ref pending) = self.cdc_pending_events {
            // Drain buffered change events and record them stamped with the
            // commit epoch.
            let events: Vec<crate::cdc::ChangeEvent> = pending.lock().drain(..).collect();
            self.cdc_log.record_batch(events.into_iter().map(|mut e| {
                e.epoch = commit_epoch;
                e
            }));
        }

        #[cfg(feature = "wal")]
        if let Some(ref wal) = self.wal {
            use grafeo_storage::wal::WalRecord;
            // WAL logging failures are only warned about; they do not fail
            // the commit.
            if let Err(e) = wal.log(&WalRecord::TransactionCommit { transaction_id }) {
                grafeo_warn!("Failed to log transaction commit to WAL: {}", e);
            }
            if let Err(e) = wal.log(&WalRecord::EpochAdvance {
                epoch: commit_epoch,
            }) {
                grafeo_warn!("Failed to log epoch advance to WAL: {}", e);
            }
        }

        // Bring each touched store up to the manager's current epoch.
        let current_epoch = self.transaction_manager.current_epoch();
        for graph_name in &touched {
            let store = self.resolve_store(graph_name);
            store.sync_epoch(current_epoch);
        }

        *self.read_only_tx.lock() = self.db_read_only;
        self.savepoints.lock().clear();

        // Periodic garbage collection: runs on every `gc_interval`-th value of
        // the commit counter; an interval of 0 disables it.
        if self.gc_interval > 0 {
            let count = self.commit_counter.fetch_add(1, Ordering::Relaxed) + 1;
            if count.is_multiple_of(self.gc_interval) {
                #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
                let gc_start = std::time::Instant::now();

                let min_epoch = self.transaction_manager.min_active_epoch();
                // Only the graphs touched by this transaction are collected.
                for graph_name in &touched {
                    let store = self.resolve_store(graph_name);
                    store.gc_versions(min_epoch);
                }
                self.transaction_manager.gc();

                #[cfg(feature = "metrics")]
                {
                    crate::metrics::record_metric!(self.metrics, gc_runs, inc);
                    #[cfg(not(target_arch = "wasm32"))]
                    {
                        let gc_duration_ms = gc_start.elapsed().as_secs_f64() * 1000.0;
                        crate::metrics::record_metric!(
                            self.metrics,
                            gc_duration,
                            observe gc_duration_ms
                        );
                    }
                }
            }
        }

        #[cfg(feature = "metrics")]
        {
            crate::metrics::record_metric!(self.metrics, tx_active, dec);
            crate::metrics::record_metric!(self.metrics, tx_committed, inc);
            #[cfg(not(target_arch = "wasm32"))]
            if let Some(start) = self.tx_start_time.lock().take() {
                let duration_ms = start.elapsed().as_secs_f64() * 1000.0;
                crate::metrics::record_metric!(self.metrics, tx_duration, observe duration_ms);
            }
        }

        Ok(())
    }
4131
    #[cfg(feature = "lpg")]
    /// Rolls back the current transaction, or the innermost nested level.
    ///
    /// # Errors
    ///
    /// Propagates any error from `rollback_inner`.
    pub fn rollback(&mut self) -> Result<()> {
        self.rollback_inner()
    }
4159
    #[cfg(feature = "lpg")]
    /// Rolls back the innermost nested level, or the whole transaction.
    ///
    /// When the nesting depth is non-zero, the session rolls back to the
    /// savepoint of the innermost level and decrements the depth. Otherwise
    /// the active transaction is taken out of the session, uncommitted
    /// versions are discarded on every touched graph, per-transaction state is
    /// cleared, and the transaction is aborted in the transaction manager.
    ///
    /// # Errors
    ///
    /// * Streaming results are still active (`check_no_active_streams`).
    /// * No transaction is active (`"No active transaction"`).
    /// * The transaction manager's `abort` fails.
    fn rollback_inner(&self) -> Result<()> {
        let _span = grafeo_debug_span!("grafeo::tx::rollback");
        self.check_no_active_streams("rollback")?;
        {
            // Nested level: roll back to its savepoint only.
            let mut depth = self.transaction_nesting_depth.lock();
            if *depth > 0 {
                let sp_name = format!("_nested_tx_{depth}");
                *depth -= 1;
                // Release the depth lock before calling into savepoint code.
                drop(depth);
                return self.rollback_to_savepoint(&sp_name);
            }
        }

        let transaction_id = self.current_transaction.lock().take().ok_or_else(|| {
            grafeo_common::utils::error::Error::Transaction(
                grafeo_common::utils::error::TransactionError::InvalidState(
                    "No active transaction".to_string(),
                ),
            )
        })?;

        // Reset the read-only flag to the database-level setting.
        *self.read_only_tx.lock() = self.db_read_only;

        // Work on a copy so the lock is not held while stores are modified.
        let touched = self.touched_graphs.lock().clone();
        for graph_name in &touched {
            let store = self.resolve_store(graph_name);
            store.discard_uncommitted_versions(transaction_id);
        }

        #[cfg(feature = "triple-store")]
        self.rollback_rdf_transaction(transaction_id);

        #[cfg(feature = "cdc")]
        if let Some(ref pending) = self.cdc_pending_events {
            pending.lock().clear();
        }

        self.savepoints.lock().clear();
        self.touched_graphs.lock().clear();

        let result = self.transaction_manager.abort(transaction_id);

        // Metrics are updated only when the abort itself succeeded.
        #[cfg(feature = "metrics")]
        if result.is_ok() {
            crate::metrics::record_metric!(self.metrics, tx_active, dec);
            crate::metrics::record_metric!(self.metrics, tx_rolled_back, inc);
            #[cfg(not(target_arch = "wasm32"))]
            if let Some(start) = self.tx_start_time.lock().take() {
                let duration_ms = start.elapsed().as_secs_f64() * 1000.0;
                crate::metrics::record_metric!(self.metrics, tx_duration, observe duration_ms);
            }
        }

        result
    }
4224
4225 #[cfg(feature = "lpg")]
4235 pub fn savepoint(&self, name: &str) -> Result<()> {
4236 let tx_id = self.current_transaction.lock().ok_or_else(|| {
4237 grafeo_common::utils::error::Error::Transaction(
4238 grafeo_common::utils::error::TransactionError::InvalidState(
4239 "No active transaction".to_string(),
4240 ),
4241 )
4242 })?;
4243
4244 let touched = self.touched_graphs.lock().clone();
4246 let graph_snapshots: Vec<GraphSavepoint> = touched
4247 .iter()
4248 .map(|graph_name| {
4249 let store = self.resolve_store(graph_name);
4250 GraphSavepoint {
4251 graph_name: graph_name.clone(),
4252 next_node_id: store.peek_next_node_id(),
4253 next_edge_id: store.peek_next_edge_id(),
4254 undo_log_position: store.property_undo_log_position(tx_id),
4255 }
4256 })
4257 .collect();
4258
4259 self.savepoints.lock().push(SavepointState {
4260 name: name.to_string(),
4261 graph_snapshots,
4262 active_graph: self.current_graph.lock().clone(),
4263 #[cfg(feature = "cdc")]
4264 cdc_event_position: self
4265 .cdc_pending_events
4266 .as_ref()
4267 .map_or(0, |p| p.lock().len()),
4268 });
4269 Ok(())
4270 }
4271
    #[cfg(feature = "lpg")]
    /// Rolls the active transaction back to the most recent savepoint named
    /// `name`.
    ///
    /// The matching savepoint and every savepoint created after it are
    /// removed. For each graph captured in the savepoint, property changes are
    /// undone back to the recorded undo-log position and entities whose ids
    /// were allocated after the savepoint are discarded. Graphs touched only
    /// after the savepoint have all uncommitted versions of the transaction
    /// discarded. Finally the touched-graph list is reset to the graphs in the
    /// savepoint.
    ///
    /// # Errors
    ///
    /// Returns an invalid-state transaction error when no transaction is
    /// active or no savepoint with that name exists.
    pub fn rollback_to_savepoint(&self, name: &str) -> Result<()> {
        let transaction_id = self.current_transaction.lock().ok_or_else(|| {
            grafeo_common::utils::error::Error::Transaction(
                grafeo_common::utils::error::TransactionError::InvalidState(
                    "No active transaction".to_string(),
                ),
            )
        })?;

        let mut savepoints = self.savepoints.lock();

        // Search from the end so the latest savepoint with this name wins.
        let pos = savepoints
            .iter()
            .rposition(|sp| sp.name == name)
            .ok_or_else(|| {
                grafeo_common::utils::error::Error::Transaction(
                    grafeo_common::utils::error::TransactionError::InvalidState(format!(
                        "Savepoint '{name}' not found"
                    )),
                )
            })?;

        let sp_state = savepoints[pos].clone();

        // Drops the target savepoint itself along with all later ones.
        savepoints.truncate(pos);
        drop(savepoints);

        for gs in &sp_state.graph_snapshots {
            let store = self.resolve_store(&gs.graph_name);

            store.rollback_transaction_properties_to(transaction_id, gs.undo_log_position);

            let current_next_node = store.peek_next_node_id();
            let current_next_edge = store.peek_next_edge_id();

            // Ids in [saved next id, current next id) were allocated after the
            // savepoint was taken.
            let node_ids: Vec<NodeId> = (gs.next_node_id..current_next_node)
                .map(NodeId::new)
                .collect();
            let edge_ids: Vec<EdgeId> = (gs.next_edge_id..current_next_edge)
                .map(EdgeId::new)
                .collect();

            if !node_ids.is_empty() || !edge_ids.is_empty() {
                store.discard_entities_by_id(transaction_id, &node_ids, &edge_ids);
            }
        }

        // Graphs first touched after the savepoint have no snapshot; discard
        // everything this transaction did to them.
        let touched = self.touched_graphs.lock().clone();
        for graph_name in &touched {
            let already_captured = sp_state
                .graph_snapshots
                .iter()
                .any(|gs| gs.graph_name == *graph_name);
            if !already_captured {
                let store = self.resolve_store(graph_name);
                store.discard_uncommitted_versions(transaction_id);
            }
        }

        #[cfg(feature = "cdc")]
        if let Some(ref pending) = self.cdc_pending_events {
            // Drop change events buffered after the savepoint.
            pending.lock().truncate(sp_state.cdc_event_position);
        }

        // Restore the touched-graph list to what it was at the savepoint.
        let mut touched = self.touched_graphs.lock();
        touched.clear();
        for gs in &sp_state.graph_snapshots {
            if !touched.contains(&gs.graph_name) {
                touched.push(gs.graph_name.clone());
            }
        }

        Ok(())
    }
4365
4366 pub fn release_savepoint(&self, name: &str) -> Result<()> {
4372 let _tx_id = self.current_transaction.lock().ok_or_else(|| {
4373 grafeo_common::utils::error::Error::Transaction(
4374 grafeo_common::utils::error::TransactionError::InvalidState(
4375 "No active transaction".to_string(),
4376 ),
4377 )
4378 })?;
4379
4380 let mut savepoints = self.savepoints.lock();
4381 let pos = savepoints
4382 .iter()
4383 .rposition(|sp| sp.name == name)
4384 .ok_or_else(|| {
4385 grafeo_common::utils::error::Error::Transaction(
4386 grafeo_common::utils::error::TransactionError::InvalidState(format!(
4387 "Savepoint '{name}' not found"
4388 )),
4389 )
4390 })?;
4391 savepoints.remove(pos);
4392 Ok(())
4393 }
4394
    #[must_use]
    /// Returns `true` while the session has an active transaction.
    pub fn in_transaction(&self) -> bool {
        self.current_transaction.lock().is_some()
    }
4400
    #[must_use]
    /// Returns the id of the active transaction, or `None` if there is none.
    pub(crate) fn current_transaction_id(&self) -> Option<TransactionId> {
        *self.current_transaction.lock()
    }
4406
    #[must_use]
    /// Borrows the transaction manager used by this session.
    pub(crate) fn transaction_manager(&self) -> &TransactionManager {
        &self.transaction_manager
    }
4412
4413 #[cfg(feature = "lpg")]
4415 #[must_use]
4416 pub(crate) fn node_count_delta(&self) -> (usize, usize) {
4417 (
4418 self.transaction_start_node_count.load(Ordering::Relaxed),
4419 self.active_lpg_store().node_count(),
4420 )
4421 }
4422
4423 #[cfg(feature = "lpg")]
4425 #[must_use]
4426 pub(crate) fn edge_count_delta(&self) -> (usize, usize) {
4427 (
4428 self.transaction_start_edge_count.load(Ordering::Relaxed),
4429 self.active_lpg_store().edge_count(),
4430 )
4431 }
4432
    #[cfg(feature = "lpg")]
    /// Creates a `PreparedCommit` that mutably borrows this session.
    ///
    /// # Errors
    ///
    /// Propagates any error from `PreparedCommit::new`.
    pub fn prepare_commit(&mut self) -> Result<crate::transaction::PreparedCommit<'_>> {
        crate::transaction::PreparedCommit::new(self)
    }
4470
    /// Enables or disables auto-commit for this session.
    pub fn set_auto_commit(&mut self, auto_commit: bool) {
        self.auto_commit = auto_commit;
    }
4475
    #[must_use]
    /// Returns whether auto-commit is enabled for this session.
    pub fn auto_commit(&self) -> bool {
        self.auto_commit
    }
4481
4482 fn needs_auto_commit(&self, has_mutations: bool) -> bool {
4487 self.auto_commit && has_mutations && self.current_transaction.lock().is_none()
4488 }
4489
4490 #[cfg(feature = "lpg")]
4493 fn with_auto_commit<F>(&self, has_mutations: bool, body: F) -> Result<QueryResult>
4494 where
4495 F: FnOnce() -> Result<QueryResult>,
4496 {
4497 if self.needs_auto_commit(has_mutations) {
4498 self.begin_transaction_inner(false, None)?;
4499 match body() {
4500 Ok(result) => {
4501 self.commit_inner()?;
4502 Ok(result)
4503 }
4504 Err(e) => {
4505 let _ = self.rollback_inner();
4506 Err(e)
4507 }
4508 }
4509 } else {
4510 body()
4511 }
4512 }
4513
    #[cfg(not(feature = "lpg"))]
    /// Variant used when the `lpg` feature is disabled: runs `body` directly,
    /// without any implicit transaction. `_has_mutations` is ignored.
    fn with_auto_commit<F>(&self, _has_mutations: bool, body: F) -> Result<QueryResult>
    where
        F: FnOnce() -> Result<QueryResult>,
    {
        body()
    }
4522
4523 fn query_looks_like_mutation(query: &str) -> bool {
4529 let upper = query.to_ascii_uppercase();
4530 upper.contains("INSERT")
4531 || upper.contains("CREATE")
4532 || upper.contains("DELETE")
4533 || upper.contains("MERGE")
4534 || upper.contains("SET")
4535 || upper.contains("REMOVE")
4536 || upper.contains("DROP")
4537 || upper.contains("ALTER")
4538 }
4539
4540 #[cfg(feature = "lpg")]
4543 fn check_no_active_streams(&self, op: &str) -> Result<()> {
4544 if self.active_streams.load(Ordering::Acquire) > 0 {
4545 return Err(grafeo_common::utils::error::Error::Transaction(
4546 grafeo_common::utils::error::TransactionError::InvalidState(format!(
4547 "Cannot {op} while streaming results are active; drop the stream first"
4548 )),
4549 ));
4550 }
4551 Ok(())
4552 }
4553
    #[must_use]
    /// Computes the deadline for a query starting now.
    ///
    /// On non-wasm targets this is `now + query_timeout` when a timeout is
    /// configured, otherwise `None`. On `wasm32` it is always `None`.
    fn query_deadline(&self) -> Option<Instant> {
        #[cfg(not(target_arch = "wasm32"))]
        {
            self.query_timeout.map(|d| Instant::now() + d)
        }
        #[cfg(target_arch = "wasm32")]
        {
            // Touch the field so it does not count as unused on this target.
            let _ = &self.query_timeout;
            None
        }
    }
4567
    /// Builds an `Executor` for the given output columns, configured with the
    /// session's query deadline and timeout duration.
    fn make_executor(&self, columns: Vec<String>) -> Executor {
        Executor::with_columns(columns)
            .with_deadline(self.query_deadline())
            .with_timeout_duration(self.query_timeout)
    }
4574
    #[cfg(feature = "spill")]
    /// Builds an operator memory context backed by a per-query spill
    /// directory.
    ///
    /// Returns `None` when there is no buffer manager, when its configuration
    /// has no spill path, or when the `SpillManager` cannot be created for the
    /// query directory.
    fn make_operator_memory_context(
        &self,
    ) -> Option<grafeo_core::execution::OperatorMemoryContext> {
        let bm = self.buffer_manager.as_ref()?;
        let spill_path = bm.config().spill_path.as_ref()?;
        // The directory name is derived from `commit_counter`.
        // NOTE(review): this increments the same counter that `commit_inner`
        // uses to decide when to run GC — confirm that sharing is intended.
        let query_id = self
            .commit_counter
            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
        let query_dir = spill_path.join(format!("query_{query_id}"));
        let sm = std::sync::Arc::new(grafeo_core::execution::SpillManager::new(&query_dir).ok()?);
        Some(grafeo_core::execution::OperatorMemoryContext::new(
            std::sync::Arc::clone(bm),
            sm,
        ))
    }
4596
4597 fn check_property_size(&self, key: &str, value: &Value) -> Result<()> {
4599 if let Some(limit) = self.max_property_size {
4600 let size = value.estimated_size_bytes();
4601 if size > limit {
4602 let limit_display = if limit >= 1024 * 1024 && limit % (1024 * 1024) == 0 {
4603 format!("{} MiB", limit / (1024 * 1024))
4604 } else if limit >= 1024 && limit % 1024 == 0 {
4605 format!("{} KiB", limit / 1024)
4606 } else {
4607 format!("{limit} bytes")
4608 };
4609 return Err(grafeo_common::utils::error::Error::Query(
4610 grafeo_common::utils::error::QueryError::new(
4611 grafeo_common::utils::error::QueryErrorKind::Execution,
4612 format!(
4613 "Property '{key}' value exceeds maximum size of {limit_display} ({size} bytes)"
4614 ),
4615 )
4616 .with_hint(
4617 "Increase with Config::with_max_property_size() or disable with Config::without_max_property_size()".to_string(),
4618 ),
4619 ));
4620 }
4621 }
4622 Ok(())
4623 }
4624
    #[cfg(feature = "metrics")]
    /// Records metrics for one executed query.
    ///
    /// Always counts the query (overall and per `language`). Latency is
    /// observed only when `elapsed_ms` is `Some`. On success the returned row
    /// count and, when available, the scanned row count are added; on failure
    /// the error counter is incremented.
    fn record_query_metrics(
        &self,
        language: &str,
        elapsed_ms: Option<f64>,
        result: &Result<crate::database::QueryResult>,
    ) {
        use crate::metrics::record_metric;

        record_metric!(self.metrics, query_count, inc);
        if let Some(ref reg) = self.metrics {
            reg.query_count_by_language.increment(language);
        }
        if let Some(ms) = elapsed_ms {
            record_metric!(self.metrics, query_latency, observe ms);
        }
        match result {
            Ok(r) => {
                let returned = r.rows.len() as u64;
                record_metric!(self.metrics, rows_returned, add returned);
                if let Some(scanned) = r.rows_scanned {
                    record_metric!(self.metrics, rows_scanned, add scanned);
                }
            }
            Err(e) => {
                record_metric!(self.metrics, query_errors, inc);
                let msg = e.to_string();
                // Timeouts are recognised by their message text, so this
                // depends on the wording of the timeout error.
                if msg.contains("exceeded timeout") {
                    record_metric!(self.metrics, query_timeouts, inc);
                }
            }
        }
    }
4664
4665 #[cfg(feature = "gql")]
4667 fn eval_integer_literal(expr: &grafeo_adapters::query::gql::ast::Expression) -> Option<i64> {
4668 use grafeo_adapters::query::gql::ast::{Expression, Literal};
4669 match expr {
4670 Expression::Literal(Literal::Integer(n)) => Some(*n),
4671 _ => None,
4672 }
4673 }
4674
4675 #[must_use]
4681 fn get_transaction_context(&self) -> (EpochId, Option<TransactionId>) {
4682 if let Some(epoch) = *self.viewing_epoch_override.lock() {
4684 return (epoch, None);
4685 }
4686
4687 if let Some(transaction_id) = *self.current_transaction.lock() {
4688 let epoch = self
4690 .transaction_manager
4691 .start_epoch(transaction_id)
4692 .unwrap_or_else(|| self.transaction_manager.current_epoch());
4693 (epoch, Some(transaction_id))
4694 } else {
4695 (self.transaction_manager.current_epoch(), None)
4697 }
4698 }
4699
    /// Creates a planner for `store` with the read-only flag set to `false`.
    ///
    /// Shorthand for `create_planner_for_store_with_read_only(.., false)`.
    fn create_planner_for_store(
        &self,
        store: Arc<dyn GraphStoreSearch>,
        viewing_epoch: EpochId,
        transaction_id: Option<TransactionId>,
    ) -> crate::query::Planner {
        self.create_planner_for_store_with_read_only(store, viewing_epoch, transaction_id, false)
    }
4712
    /// Creates a fully configured planner for `store`.
    ///
    /// The planner reads from `store`, writes through the session's active
    /// write store, and is bound to the given transaction id and viewing
    /// epoch. It also receives the session context (current schema/graph plus
    /// lazily built info and schema values), the catalog, the factorized
    /// execution setting, the `read_only` flag, and a catalog-backed
    /// constraint validator.
    fn create_planner_for_store_with_read_only(
        &self,
        store: Arc<dyn GraphStoreSearch>,
        viewing_epoch: EpochId,
        transaction_id: Option<TransactionId>,
        read_only: bool,
    ) -> crate::query::Planner {
        use crate::query::Planner;
        use grafeo_core::execution::operators::{LazyValue, SessionContext};

        // Each lazy value owns its own handle to the store, since the
        // closures below are `move`.
        let info_store = Arc::clone(&store);
        let schema_store = Arc::clone(&store);

        let session_context = SessionContext {
            current_schema: self.current_schema(),
            current_graph: self.current_graph(),
            db_info: LazyValue::new(move || Self::build_info_value(&*info_store)),
            schema_info: LazyValue::new(move || Self::build_schema_value(&*schema_store)),
        };

        let write_store = self.active_write_store();

        let mut planner = Planner::with_context(
            Arc::clone(&store),
            write_store,
            Arc::clone(&self.transaction_manager),
            transaction_id,
            viewing_epoch,
        )
        .with_factorized_execution(self.factorized_execution)
        .with_catalog(Arc::clone(&self.catalog))
        .with_session_context(session_context)
        .with_read_only(read_only);

        // Hand the concrete LPG store to the planner only when the LPG
        // backend is the active one.
        #[cfg(feature = "lpg")]
        if matches!(self.lpg_backend, LpgBackend::Active) {
            planner = planner.with_lpg_store(Arc::clone(&self.store));
        }

        // `store` is moved into the validator here, so this must come after
        // the last use of `store` above.
        let validator = CatalogConstraintValidator::new(Arc::clone(&self.catalog))
            .with_store(store)
            .with_max_property_size(self.max_property_size);
        planner = planner.with_validator(Arc::new(validator));

        planner
    }
4765
4766 fn build_info_value(store: &dyn GraphStore) -> Value {
4768 use grafeo_common::types::PropertyKey;
4769 use std::collections::BTreeMap;
4770
4771 let mut map = BTreeMap::new();
4772 map.insert(PropertyKey::from("mode"), Value::String("lpg".into()));
4773 #[allow(clippy::cast_possible_wrap)]
4775 let node_count = store.node_count() as i64;
4776 #[allow(clippy::cast_possible_wrap)]
4778 let edge_count = store.edge_count() as i64;
4779 map.insert(PropertyKey::from("node_count"), Value::Int64(node_count));
4780 map.insert(PropertyKey::from("edge_count"), Value::Int64(edge_count));
4781 map.insert(
4782 PropertyKey::from("version"),
4783 Value::String(env!("CARGO_PKG_VERSION").into()),
4784 );
4785 Value::Map(map.into())
4786 }
4787
4788 fn build_schema_value(store: &dyn GraphStore) -> Value {
4790 use grafeo_common::types::PropertyKey;
4791 use std::collections::BTreeMap;
4792
4793 let labels: Vec<Value> = store
4794 .all_labels()
4795 .into_iter()
4796 .map(|l| Value::String(l.into()))
4797 .collect();
4798 let edge_types: Vec<Value> = store
4799 .all_edge_types()
4800 .into_iter()
4801 .map(|t| Value::String(t.into()))
4802 .collect();
4803 let property_keys: Vec<Value> = store
4804 .all_property_keys()
4805 .into_iter()
4806 .map(|k| Value::String(k.into()))
4807 .collect();
4808
4809 let mut map = BTreeMap::new();
4810 map.insert(PropertyKey::from("labels"), Value::List(labels.into()));
4811 map.insert(
4812 PropertyKey::from("edge_types"),
4813 Value::List(edge_types.into()),
4814 );
4815 map.insert(
4816 PropertyKey::from("property_keys"),
4817 Value::List(property_keys.into()),
4818 );
4819 Value::Map(map.into())
4820 }
4821
4822 #[cfg(feature = "lpg")]
4827 pub fn create_node(&self, labels: &[&str]) -> NodeId {
4828 let (epoch, transaction_id) = self.get_transaction_context();
4829 self.active_lpg_store().create_node_versioned(
4830 labels,
4831 epoch,
4832 transaction_id.unwrap_or(TransactionId::SYSTEM),
4833 )
4834 }
4835
4836 #[cfg(feature = "lpg")]
4844 pub fn create_node_with_props<'a>(
4845 &self,
4846 labels: &[&str],
4847 properties: impl IntoIterator<Item = (&'a str, Value)>,
4848 ) -> Result<NodeId> {
4849 let props: Vec<(&str, Value)> = properties.into_iter().collect();
4850 for (key, value) in &props {
4851 self.check_property_size(key, value)?;
4852 }
4853 let (epoch, transaction_id) = self.get_transaction_context();
4854 Ok(self.active_lpg_store().create_node_with_props_versioned(
4855 labels,
4856 props,
4857 epoch,
4858 transaction_id.unwrap_or(TransactionId::SYSTEM),
4859 ))
4860 }
4861
4862 #[cfg(feature = "lpg")]
4867 pub fn create_edge(
4868 &self,
4869 src: NodeId,
4870 dst: NodeId,
4871 edge_type: &str,
4872 ) -> grafeo_common::types::EdgeId {
4873 let (epoch, transaction_id) = self.get_transaction_context();
4874 self.active_lpg_store().create_edge_versioned(
4875 src,
4876 dst,
4877 edge_type,
4878 epoch,
4879 transaction_id.unwrap_or(TransactionId::SYSTEM),
4880 )
4881 }
4882
4883 #[cfg(feature = "lpg")]
4889 pub fn create_edge_with_props<'a>(
4890 &self,
4891 src: NodeId,
4892 dst: NodeId,
4893 edge_type: &str,
4894 properties: impl IntoIterator<Item = (&'a str, Value)>,
4895 ) -> Result<grafeo_common::types::EdgeId> {
4896 let props: Vec<(&str, Value)> = properties.into_iter().collect();
4897 for (key, value) in &props {
4898 self.check_property_size(key, value)?;
4899 }
4900 let (epoch, transaction_id) = self.get_transaction_context();
4901 let tid = transaction_id.unwrap_or(TransactionId::SYSTEM);
4902 let store = self.active_lpg_store();
4903 let eid = store.create_edge_versioned(src, dst, edge_type, epoch, tid);
4904 for (key, value) in props {
4905 store.set_edge_property_versioned(eid, key, value, tid);
4906 }
4907 Ok(eid)
4908 }
4909
4910 #[cfg(feature = "lpg")]
4916 pub fn set_node_property(&self, id: NodeId, key: &str, value: Value) -> Result<()> {
4917 self.check_property_size(key, &value)?;
4918 let (_, transaction_id) = self.get_transaction_context();
4919 if let Some(tid) = transaction_id {
4920 self.active_lpg_store()
4921 .set_node_property_versioned(id, key, value, tid);
4922 } else {
4923 self.active_lpg_store().set_node_property(id, key, value);
4924 }
4925 Ok(())
4926 }
4927
4928 #[cfg(feature = "lpg")]
4934 pub fn set_edge_property(
4935 &self,
4936 id: grafeo_common::types::EdgeId,
4937 key: &str,
4938 value: Value,
4939 ) -> Result<()> {
4940 self.check_property_size(key, &value)?;
4941 let (_, transaction_id) = self.get_transaction_context();
4942 if let Some(tid) = transaction_id {
4943 self.active_lpg_store()
4944 .set_edge_property_versioned(id, key, value, tid);
4945 } else {
4946 self.active_lpg_store().set_edge_property(id, key, value);
4947 }
4948 Ok(())
4949 }
4950
4951 #[cfg(feature = "lpg")]
4953 pub fn delete_node(&self, id: NodeId) -> bool {
4954 let (epoch, transaction_id) = self.get_transaction_context();
4955 if let Some(tid) = transaction_id {
4956 self.active_lpg_store()
4957 .delete_node_versioned(id, epoch, tid)
4958 } else {
4959 self.active_lpg_store().delete_node(id)
4960 }
4961 }
4962
#[cfg(feature = "lpg")]
/// Deletes edge `id` from the session's active LPG store.
///
/// With a transaction id present the versioned delete is used (passing the
/// context's epoch and the transaction id); otherwise the plain delete is used.
/// The store's boolean result is returned unchanged.
pub fn delete_edge(&self, id: grafeo_common::types::EdgeId) -> bool {
    match self.get_transaction_context() {
        (epoch, Some(tid)) => self
            .active_lpg_store()
            .delete_edge_versioned(id, epoch, tid),
        (_, None) => self.active_lpg_store().delete_edge(id),
    }
}
4974
#[cfg(feature = "lpg")]
/// Looks up node `id` in the active LPG store as seen from this session.
///
/// The lookup is versioned: it uses the epoch from the session's transaction
/// context and the session's transaction id, falling back to
/// `TransactionId::SYSTEM` when there is no transaction id.
///
/// Returns `None` when the store reports no node for that id and context.
#[must_use]
pub fn get_node(&self, id: NodeId) -> Option<Node> {
    let (epoch, transaction_id) = self.get_transaction_context();
    let reader = transaction_id.unwrap_or(TransactionId::SYSTEM);
    self.active_lpg_store().get_node_versioned(id, epoch, reader)
}
5012
#[cfg(feature = "lpg")]
/// Returns a clone of property `key` from node `id`.
///
/// The node is fetched with [`Self::get_node`], so the same versioned visibility
/// applies. Yields `None` when the node is not found or it has no such property.
#[must_use]
pub fn get_node_property(&self, id: NodeId, key: &str) -> Option<Value> {
    let node = self.get_node(id)?;
    node.get_property(key).cloned()
}
5042
#[cfg(feature = "lpg")]
/// Looks up edge `id` in the active LPG store as seen from this session.
///
/// The lookup is versioned: it uses the epoch from the session's transaction
/// context and the session's transaction id, falling back to
/// `TransactionId::SYSTEM` when there is no transaction id.
///
/// Returns `None` when the store reports no edge for that id and context.
#[must_use]
pub fn get_edge(&self, id: EdgeId) -> Option<Edge> {
    let (epoch, transaction_id) = self.get_transaction_context();
    let reader = transaction_id.unwrap_or(TransactionId::SYSTEM);
    self.active_lpg_store().get_edge_versioned(id, epoch, reader)
}
5059
#[cfg(feature = "lpg")]
/// Collects the `(neighbor, edge)` pairs the active LPG store yields for `node`
/// in the `Direction::Outgoing` direction.
///
/// The pairs come straight from the store's `edges_from` iterator; no transaction
/// context is consulted here.
#[must_use]
pub fn get_neighbors_outgoing(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
    let store = self.active_lpg_store();
    store
        .edges_from(node, Direction::Outgoing)
        .collect::<Vec<_>>()
}
5092
#[cfg(feature = "lpg")]
/// Collects the `(neighbor, edge)` pairs the active LPG store yields for `node`
/// in the `Direction::Incoming` direction.
///
/// The pairs come straight from the store's `edges_from` iterator; no transaction
/// context is consulted here.
#[must_use]
pub fn get_neighbors_incoming(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
    let store = self.active_lpg_store();
    store
        .edges_from(node, Direction::Incoming)
        .collect::<Vec<_>>()
}
5108
#[cfg(feature = "lpg")]
/// Collects the outgoing `(neighbor, edge)` pairs of `node` whose edge has type
/// `edge_type`.
///
/// Candidate pairs come from the active LPG store's `edges_from` iterator in the
/// `Direction::Outgoing` direction. Each candidate edge is then fetched with a
/// versioned lookup (session epoch plus the session's transaction id, or
/// `TransactionId::SYSTEM` when there is none); pairs whose edge cannot be
/// fetched or whose type differs are dropped.
///
/// The transaction context and the store handle are resolved once up front
/// rather than once per candidate edge, since neither depends on the edge being
/// examined.
#[must_use]
pub fn get_neighbors_outgoing_by_type(
    &self,
    node: NodeId,
    edge_type: &str,
) -> Vec<(NodeId, EdgeId)> {
    // Loop-invariant lookups: same values `get_edge` would compute for every edge.
    let (epoch, transaction_id) = self.get_transaction_context();
    let reader = transaction_id.unwrap_or(TransactionId::SYSTEM);
    let store = self.active_lpg_store();

    store
        .edges_from(node, Direction::Outgoing)
        .filter(|(_, edge_id)| {
            store
                .get_edge_versioned(*edge_id, epoch, reader)
                .is_some_and(|edge| edge.edge_type.as_str() == edge_type)
        })
        .collect()
}
5135
#[cfg(feature = "lpg")]
/// Reports whether [`Self::get_node`] finds a node for `id`.
#[must_use]
pub fn node_exists(&self, id: NodeId) -> bool {
    let found = self.get_node(id);
    found.is_some()
}
5147
#[cfg(feature = "lpg")]
/// Reports whether [`Self::get_edge`] finds an edge for `id`.
#[must_use]
pub fn edge_exists(&self, id: EdgeId) -> bool {
    let found = self.get_edge(id);
    found.is_some()
}
5154
#[cfg(feature = "lpg")]
/// Returns `(out_degree, in_degree)` for `node` as reported by the active LPG
/// store.
#[must_use]
pub fn get_degree(&self, node: NodeId) -> (usize, usize) {
    let store = self.active_lpg_store();
    // Tuple fields evaluate left to right: outgoing count first, then incoming.
    (store.out_degree(node), store.in_degree(node))
}
5166
#[cfg(feature = "lpg")]
/// Looks up every id in `ids` and returns the results in the same order.
///
/// The transaction context and store handle are resolved once, then each id is
/// fetched with a versioned lookup (session epoch plus the session's transaction
/// id, or `TransactionId::SYSTEM` when there is none). The entry for an id that
/// the store does not find is `None`, so the output always has `ids.len()`
/// elements.
#[must_use]
pub fn get_nodes_batch(&self, ids: &[NodeId]) -> Vec<Option<Node>> {
    let (epoch, transaction_id) = self.get_transaction_context();
    let reader = transaction_id.unwrap_or(TransactionId::SYSTEM);
    let store = self.active_lpg_store();

    let mut found = Vec::with_capacity(ids.len());
    for &id in ids {
        found.push(store.get_node_versioned(id, epoch, reader));
    }
    found
}
5186
#[cfg(feature = "cdc")]
/// Returns the change events the CDC log holds for `entity_id`.
///
/// # Errors
///
/// Fails when `require_permission` rejects a `StatementKind::Read`; the log is
/// not consulted in that case.
pub fn history(
    &self,
    entity_id: impl Into<crate::cdc::EntityId>,
) -> Result<Vec<crate::cdc::ChangeEvent>> {
    self.require_permission(crate::auth::StatementKind::Read)?;
    let entity = entity_id.into();
    let events = self.cdc_log.history(entity);
    Ok(events)
}
5202
#[cfg(feature = "cdc")]
/// Returns the change events the CDC log holds for `entity_id`, restricted by
/// the log's `history_since` query with `since_epoch` as its lower bound
/// argument.
///
/// # Errors
///
/// Fails when `require_permission` rejects a `StatementKind::Read`; the log is
/// not consulted in that case.
pub fn history_since(
    &self,
    entity_id: impl Into<crate::cdc::EntityId>,
    since_epoch: EpochId,
) -> Result<Vec<crate::cdc::ChangeEvent>> {
    self.require_permission(crate::auth::StatementKind::Read)?;
    let entity = entity_id.into();
    let events = self.cdc_log.history_since(entity, since_epoch);
    Ok(events)
}
5217
#[cfg(feature = "cdc")]
/// Returns the change events the CDC log reports for the range described by
/// `start_epoch` and `end_epoch`.
///
/// Whether the bounds are inclusive is decided by the log's `changes_between`
/// and is not visible here.
///
/// # Errors
///
/// Fails when `require_permission` rejects a `StatementKind::Read`; the log is
/// not consulted in that case.
pub fn changes_between(
    &self,
    start_epoch: EpochId,
    end_epoch: EpochId,
) -> Result<Vec<crate::cdc::ChangeEvent>> {
    self.require_permission(crate::auth::StatementKind::Read)?;
    let events = self.cdc_log.changes_between(start_epoch, end_epoch);
    Ok(events)
}
5232}
5233
5234impl Drop for Session {
5235 fn drop(&mut self) {
5236 #[cfg(feature = "lpg")]
5239 if self.in_transaction() {
5240 let _ = self.rollback_inner();
5241 }
5242
5243 #[cfg(feature = "metrics")]
5244 if let Some(ref reg) = self.metrics {
5245 reg.session_active
5246 .fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
5247 }
5248 }
5249}
5250
5251#[cfg(test)]
5252mod tests {
5253 use super::parse_default_literal;
5254 use crate::database::GrafeoDB;
5255 use grafeo_common::types::Value;
5256
#[test]
/// `null` is recognised regardless of letter case.
fn parse_default_literal_null() {
    for text in ["null", "NULL", "Null"] {
        assert_eq!(parse_default_literal(text), Value::Null);
    }
}
5267
#[test]
/// Boolean keywords are recognised in lower and upper case.
fn parse_default_literal_bool() {
    let cases = [
        ("true", true),
        ("TRUE", true),
        ("false", false),
        ("FALSE", false),
    ];
    for (text, expected) in cases {
        assert_eq!(parse_default_literal(text), Value::Bool(expected));
    }
}
5275
#[test]
/// Single quotes are stripped and the inner text becomes a string value.
fn parse_default_literal_string_single_quoted() {
    let parsed = parse_default_literal("'hello'");
    let expected = Value::String("hello".into());
    assert_eq!(parsed, expected);
}
5283
#[test]
/// Double quotes are stripped and the inner text becomes a string value.
fn parse_default_literal_string_double_quoted() {
    let parsed = parse_default_literal("\"world\"");
    let expected = Value::String("world".into());
    assert_eq!(parsed, expected);
}
5291
#[test]
/// Integer text (positive, negative, zero) parses to `Value::Int64`.
fn parse_default_literal_integer() {
    let cases: [(&str, i64); 3] = [("42", 42), ("-7", -7), ("0", 0)];
    for (text, expected) in cases {
        assert_eq!(parse_default_literal(text), Value::Int64(expected));
    }
}
5298
#[test]
/// Decimal text that is not an integer parses to `Value::Float64`.
fn parse_default_literal_float() {
    let cases: [(&str, f64); 2] = [("9.81", 9.81_f64), ("-0.5", -0.5)];
    for (text, expected) in cases {
        assert_eq!(parse_default_literal(text), Value::Float64(expected));
    }
}
5304
#[test]
/// Text matching no other literal form is kept verbatim as a string value.
fn parse_default_literal_fallback_string() {
    let parsed = parse_default_literal("some_identifier");
    let expected = Value::String("some_identifier".into());
    assert_eq!(parsed, expected);
}
5313
#[test]
/// A node created through a session gets a valid id and is counted by the database.
fn test_session_create_node() {
    let database = GrafeoDB::new_in_memory();
    let session = database.session();

    let node_id = session.create_node(&["Person"]);

    assert!(node_id.is_valid());
    assert_eq!(database.node_count(), 1);
}
5323
#[test]
/// `in_transaction` flips to true on begin and back to false on commit.
fn test_session_transaction() {
    let database = GrafeoDB::new_in_memory();
    let mut session = database.session();

    // Fresh session: nothing open yet.
    assert!(!session.in_transaction());

    session.begin_transaction().unwrap();
    assert!(session.in_transaction());

    session.commit().unwrap();
    assert!(!session.in_transaction());
}
5337
#[test]
/// The transaction context carries an id only while a transaction is open, and
/// the epoch observed after commit is not older than the one seen inside it.
fn test_session_transaction_context() {
    let database = GrafeoDB::new_in_memory();
    let mut session = database.session();

    // Before begin: no transaction id.
    let (_, id_before) = session.get_transaction_context();
    assert!(id_before.is_none());

    // Inside the transaction: an id is present.
    session.begin_transaction().unwrap();
    let (epoch_inside, id_inside) = session.get_transaction_context();
    assert!(id_inside.is_some());
    session.commit().unwrap();

    // After commit: id is gone and the epoch did not go backwards.
    let (epoch_after, id_after) = session.get_transaction_context();
    assert!(id_after.is_none());
    assert!(epoch_after.as_u64() >= epoch_inside.as_u64());
}
5361
#[test]
/// Rolling back an open transaction leaves the session outside any transaction.
fn test_session_rollback() {
    let database = GrafeoDB::new_in_memory();
    let mut session = database.session();

    session.begin_transaction().unwrap();
    session.rollback().unwrap();

    assert!(!session.in_transaction());
}
5371
#[test]
/// A node written directly to the store under the session's transaction id is
/// visible to that transaction, hidden from the plain node count, and gone
/// after rollback, while a node created beforehand survives.
fn test_session_rollback_discards_versions() {
    use grafeo_common::types::TransactionId;

    let db = GrafeoDB::new_in_memory();

    // Baseline node created outside any transaction.
    let baseline = db.store().create_node(&["Person"]);
    assert!(baseline.is_valid());
    assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");

    let mut session = db.session();
    session.begin_transaction().unwrap();
    let tid = session.current_transaction.lock().unwrap();

    // Write straight to the store, tagged with the session's transaction id.
    let tx_epoch = db.store().current_epoch();
    let pending = db
        .store()
        .create_node_versioned(&["Person"], tx_epoch, tid);
    assert!(pending.is_valid());

    assert_eq!(
        db.node_count(),
        1,
        "PENDING nodes should be invisible to non-versioned node_count()"
    );
    let seen_by_owner = db.store().get_node_versioned(pending, tx_epoch, tid);
    assert!(
        seen_by_owner.is_some(),
        "Transaction node should be visible to its own transaction"
    );

    session.rollback().unwrap();
    assert!(!session.in_transaction());

    let count_after = db.node_count();
    assert_eq!(
        count_after, 1,
        "Rollback should discard uncommitted node, but got {count_after}"
    );

    // Re-read both nodes as the system reader at the current epoch.
    let current_epoch = db.store().current_epoch();
    let baseline_now =
        db.store()
            .get_node_versioned(baseline, current_epoch, TransactionId::SYSTEM);
    assert!(baseline_now.is_some(), "Original node should still exist");

    let pending_now =
        db.store()
            .get_node_versioned(pending, current_epoch, TransactionId::SYSTEM);
    assert!(pending_now.is_none(), "Transaction node should be gone");
}
5439
#[test]
/// A node created via `session.create_node` inside a transaction is visible to
/// that transaction, hidden from the plain node count, and discarded on rollback.
fn test_session_create_node_in_transaction() {
    let db = GrafeoDB::new_in_memory();

    // Baseline node created outside any transaction.
    let baseline = db.create_node(&["Person"]);
    assert!(baseline.is_valid());
    assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");

    let mut session = db.session();
    session.begin_transaction().unwrap();
    let tid = session.current_transaction.lock().unwrap();

    let pending = session.create_node(&["Person"]);
    assert!(pending.is_valid());

    assert_eq!(
        db.node_count(),
        1,
        "PENDING nodes should be invisible to non-versioned node_count()"
    );
    let epoch = db.store().current_epoch();
    let seen_by_owner = db.store().get_node_versioned(pending, epoch, tid);
    assert!(
        seen_by_owner.is_some(),
        "Transaction node should be visible to its own transaction"
    );

    session.rollback().unwrap();

    let count_after = db.node_count();
    assert_eq!(
        count_after, 1,
        "Rollback should discard node created via session.create_node(), but got {count_after}"
    );
}
5484
#[test]
/// A node created via `session.create_node_with_props` inside a transaction is
/// visible to that transaction, hidden from the plain node count, and discarded
/// on rollback.
fn test_session_create_node_with_props_in_transaction() {
    use grafeo_common::types::Value;

    let db = GrafeoDB::new_in_memory();

    // Baseline node created outside any transaction.
    db.create_node(&["Person"]);
    assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");

    let mut session = db.session();
    session.begin_transaction().unwrap();
    let tid = session.current_transaction.lock().unwrap();

    let name_prop = ("name", Value::String("Alix".into()));
    let pending = session
        .create_node_with_props(&["Person"], [name_prop])
        .unwrap();
    assert!(pending.is_valid());

    assert_eq!(
        db.node_count(),
        1,
        "PENDING nodes should be invisible to non-versioned node_count()"
    );
    let epoch = db.store().current_epoch();
    let seen_by_owner = db.store().get_node_versioned(pending, epoch, tid);
    assert!(
        seen_by_owner.is_some(),
        "Transaction node should be visible to its own transaction"
    );

    session.rollback().unwrap();

    let count_after = db.node_count();
    assert_eq!(
        count_after, 1,
        "Rollback should discard node created via session.create_node_with_props()"
    );
}
5531
#[cfg(feature = "gql")]
/// End-to-end checks that drive GQL text through `Session::execute`.
mod gql_tests {
    use super::*;

    #[test]
    /// A label pattern returns only the nodes carrying that label.
    fn test_gql_query_execution() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        // Two matches plus one decoy label that the pattern must skip.
        for label in ["Person", "Person", "Animal"] {
            session.create_node(&[label]);
        }

        let people = session.execute("MATCH (n:Person) RETURN n").unwrap();

        assert_eq!(people.row_count(), 2);
        assert_eq!(people.column_count(), 1);
        assert_eq!(people.columns[0], "n");
    }

    #[test]
    /// Matching against an empty database yields zero rows rather than an error.
    fn test_gql_empty_result() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        let people = session.execute("MATCH (n:Person) RETURN n").unwrap();

        assert_eq!(people.row_count(), 0);
    }

    #[test]
    /// Syntactically broken GQL (unclosed node pattern) is reported as an error.
    fn test_gql_parse_error() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        let outcome = session.execute("MATCH (n RETURN n");

        assert!(outcome.is_err());
    }

    #[test]
    /// A one-hop pattern returns one row per matching edge, with both endpoints.
    fn test_gql_relationship_traversal() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        let alix = session.create_node(&["Person"]);
        let gus = session.create_node(&["Person"]);
        let vincent = session.create_node(&["Person"]);

        for friend in [gus, vincent] {
            session.create_edge(alix, friend, "KNOWS");
        }

        let pairs = session
            .execute("MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b")
            .unwrap();

        assert_eq!(pairs.row_count(), 2);
        assert_eq!(pairs.column_count(), 2);
        assert_eq!(pairs.columns[0], "a");
        assert_eq!(pairs.columns[1], "b");
    }

    #[test]
    /// An edge-type filter in the pattern excludes edges of other types.
    fn test_gql_relationship_with_type_filter() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        let alix = session.create_node(&["Person"]);
        let gus = session.create_node(&["Person"]);
        let vincent = session.create_node(&["Person"]);

        for (target, edge_type) in [(gus, "KNOWS"), (vincent, "WORKS_WITH")] {
            session.create_edge(alix, target, edge_type);
        }

        let pairs = session
            .execute("MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b")
            .unwrap();

        assert_eq!(pairs.row_count(), 1);
    }

    #[test]
    /// Returning a variable that was never bound fails with an
    /// "Undefined variable" message.
    fn test_gql_semantic_error_undefined_variable() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        let outcome = session.execute("MATCH (n:Person) RETURN x");

        assert!(outcome.is_err());
        let Err(err) = outcome else {
            panic!("Expected error")
        };
        let message = err.to_string();
        assert!(
            message.contains("Undefined variable"),
            "Expected undefined variable error, got: {}",
            err
        );
    }

    #[test]
    /// A numeric comparison in `WHERE` keeps only the nodes that satisfy it.
    fn test_gql_where_clause_property_filter() {
        use grafeo_common::types::Value;

        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        for age in [25, 35, 45] {
            session
                .create_node_with_props(&["Person"], [("age", Value::Int64(age))])
                .unwrap();
        }

        let older = session
            .execute("MATCH (n:Person) WHERE n.age > 30 RETURN n")
            .unwrap();

        assert_eq!(older.row_count(), 2);
    }

    #[test]
    /// A string equality in `WHERE` keeps only the nodes with that exact value.
    fn test_gql_where_clause_equality() {
        use grafeo_common::types::Value;

        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        for name in ["Alix", "Gus", "Alix"] {
            session
                .create_node_with_props(&["Person"], [("name", Value::String(name.into()))])
                .unwrap();
        }

        let matches = session
            .execute("MATCH (n:Person) WHERE n.name = \"Alix\" RETURN n")
            .unwrap();

        assert_eq!(matches.row_count(), 2);
    }

    #[test]
    /// Property projections become columns named after the expression and carry
    /// the stored values.
    fn test_gql_return_property_access() {
        use grafeo_common::types::Value;

        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        for (name, age) in [("Alix", 30), ("Gus", 25)] {
            session
                .create_node_with_props(
                    &["Person"],
                    [
                        ("name", Value::String(name.into())),
                        ("age", Value::Int64(age)),
                    ],
                )
                .unwrap();
        }

        let projected = session
            .execute("MATCH (n:Person) RETURN n.name, n.age")
            .unwrap();

        assert_eq!(projected.row_count(), 2);
        assert_eq!(projected.column_count(), 2);
        assert_eq!(projected.columns[0], "n.name");
        assert_eq!(projected.columns[1], "n.age");

        // Row order is not asserted; each name only has to appear in column 0.
        for expected in ["Alix", "Gus"] {
            let wanted = Value::String(expected.into());
            assert!(projected.rows.iter().any(|row| row[0] == wanted));
        }
    }

    #[test]
    /// A whole-node column and a property column can be returned side by side.
    fn test_gql_return_mixed_expressions() {
        use grafeo_common::types::Value;

        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        session
            .create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))])
            .unwrap();

        let mixed = session
            .execute("MATCH (n:Person) RETURN n, n.name")
            .unwrap();

        assert_eq!(mixed.row_count(), 1);
        assert_eq!(mixed.column_count(), 2);
        assert_eq!(mixed.columns[0], "n");
        assert_eq!(mixed.columns[1], "n.name");

        assert_eq!(mixed.rows[0][1], Value::String("Alix".into()));
    }
}
5768
#[cfg(feature = "cypher")]
/// End-to-end checks that drive Cypher text through `Session::execute_cypher`.
mod cypher_tests {
    use super::*;

    #[test]
    /// A label pattern returns only the nodes carrying that label.
    fn test_cypher_query_execution() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        // Two matches plus one decoy label that the pattern must skip.
        for label in ["Person", "Person", "Animal"] {
            session.create_node(&[label]);
        }

        let people = session.execute_cypher("MATCH (n:Person) RETURN n").unwrap();

        assert_eq!(people.row_count(), 2);
        assert_eq!(people.column_count(), 1);
        assert_eq!(people.columns[0], "n");
    }

    #[test]
    /// Matching against an empty database yields zero rows rather than an error.
    fn test_cypher_empty_result() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        let people = session.execute_cypher("MATCH (n:Person) RETURN n").unwrap();

        assert_eq!(people.row_count(), 0);
    }

    #[test]
    /// Syntactically broken Cypher (unclosed node pattern) is reported as an error.
    fn test_cypher_parse_error() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        let outcome = session.execute_cypher("MATCH (n RETURN n");

        assert!(outcome.is_err());
    }
}
5814
5815 mod direct_lookup_tests {
5818 use super::*;
5819 use grafeo_common::types::Value;
5820
5821 #[test]
5822 fn test_get_node() {
5823 let db = GrafeoDB::new_in_memory();
5824 let session = db.session();
5825
5826 let id = session.create_node(&["Person"]);
5827 let node = session.get_node(id);
5828
5829 assert!(node.is_some());
5830 let node = node.unwrap();
5831 assert_eq!(node.id, id);
5832 }
5833
5834 #[test]
5835 fn test_get_node_not_found() {
5836 use grafeo_common::types::NodeId;
5837
5838 let db = GrafeoDB::new_in_memory();
5839 let session = db.session();
5840
5841 let node = session.get_node(NodeId::new(9999));
5843 assert!(node.is_none());
5844 }
5845
5846 #[test]
5847 fn test_get_node_property() {
5848 let db = GrafeoDB::new_in_memory();
5849 let session = db.session();
5850
5851 let id = session
5852 .create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))])
5853 .unwrap();
5854
5855 let name = session.get_node_property(id, "name");
5856 assert_eq!(name, Some(Value::String("Alix".into())));
5857
5858 let missing = session.get_node_property(id, "missing");
5860 assert!(missing.is_none());
5861 }
5862
5863 #[test]
5864 fn test_get_edge() {
5865 let db = GrafeoDB::new_in_memory();
5866 let session = db.session();
5867
5868 let alix = session.create_node(&["Person"]);
5869 let gus = session.create_node(&["Person"]);
5870 let edge_id = session.create_edge(alix, gus, "KNOWS");
5871
5872 let edge = session.get_edge(edge_id);
5873 assert!(edge.is_some());
5874 let edge = edge.unwrap();
5875 assert_eq!(edge.id, edge_id);
5876 assert_eq!(edge.src, alix);
5877 assert_eq!(edge.dst, gus);
5878 }
5879
5880 #[test]
5881 fn test_get_edge_not_found() {
5882 use grafeo_common::types::EdgeId;
5883
5884 let db = GrafeoDB::new_in_memory();
5885 let session = db.session();
5886
5887 let edge = session.get_edge(EdgeId::new(9999));
5888 assert!(edge.is_none());
5889 }
5890
5891 #[test]
5892 fn test_get_neighbors_outgoing() {
5893 let db = GrafeoDB::new_in_memory();
5894 let session = db.session();
5895
5896 let alix = session.create_node(&["Person"]);
5897 let gus = session.create_node(&["Person"]);
5898 let harm = session.create_node(&["Person"]);
5899
5900 session.create_edge(alix, gus, "KNOWS");
5901 session.create_edge(alix, harm, "KNOWS");
5902
5903 let neighbors = session.get_neighbors_outgoing(alix);
5904 assert_eq!(neighbors.len(), 2);
5905
5906 let neighbor_ids: Vec<_> = neighbors.iter().map(|(node_id, _)| *node_id).collect();
5907 assert!(neighbor_ids.contains(&gus));
5908 assert!(neighbor_ids.contains(&harm));
5909 }
5910
5911 #[test]
5912 fn test_get_neighbors_incoming() {
5913 let db = GrafeoDB::new_in_memory();
5914 let session = db.session();
5915
5916 let alix = session.create_node(&["Person"]);
5917 let gus = session.create_node(&["Person"]);
5918 let harm = session.create_node(&["Person"]);
5919
5920 session.create_edge(gus, alix, "KNOWS");
5921 session.create_edge(harm, alix, "KNOWS");
5922
5923 let neighbors = session.get_neighbors_incoming(alix);
5924 assert_eq!(neighbors.len(), 2);
5925
5926 let neighbor_ids: Vec<_> = neighbors.iter().map(|(node_id, _)| *node_id).collect();
5927 assert!(neighbor_ids.contains(&gus));
5928 assert!(neighbor_ids.contains(&harm));
5929 }
5930
5931 #[test]
5932 fn test_get_neighbors_outgoing_by_type() {
5933 let db = GrafeoDB::new_in_memory();
5934 let session = db.session();
5935
5936 let alix = session.create_node(&["Person"]);
5937 let gus = session.create_node(&["Person"]);
5938 let company = session.create_node(&["Company"]);
5939
5940 session.create_edge(alix, gus, "KNOWS");
5941 session.create_edge(alix, company, "WORKS_AT");
5942
5943 let knows_neighbors = session.get_neighbors_outgoing_by_type(alix, "KNOWS");
5944 assert_eq!(knows_neighbors.len(), 1);
5945 assert_eq!(knows_neighbors[0].0, gus);
5946
5947 let works_neighbors = session.get_neighbors_outgoing_by_type(alix, "WORKS_AT");
5948 assert_eq!(works_neighbors.len(), 1);
5949 assert_eq!(works_neighbors[0].0, company);
5950
5951 let no_neighbors = session.get_neighbors_outgoing_by_type(alix, "LIKES");
5953 assert!(no_neighbors.is_empty());
5954 }
5955
5956 #[test]
5957 fn test_node_exists() {
5958 use grafeo_common::types::NodeId;
5959
5960 let db = GrafeoDB::new_in_memory();
5961 let session = db.session();
5962
5963 let id = session.create_node(&["Person"]);
5964
5965 assert!(session.node_exists(id));
5966 assert!(!session.node_exists(NodeId::new(9999)));
5967 }
5968
5969 #[test]
5970 fn test_edge_exists() {
5971 use grafeo_common::types::EdgeId;
5972
5973 let db = GrafeoDB::new_in_memory();
5974 let session = db.session();
5975
5976 let alix = session.create_node(&["Person"]);
5977 let gus = session.create_node(&["Person"]);
5978 let edge_id = session.create_edge(alix, gus, "KNOWS");
5979
5980 assert!(session.edge_exists(edge_id));
5981 assert!(!session.edge_exists(EdgeId::new(9999)));
5982 }
5983
5984 #[test]
5985 fn test_get_degree() {
5986 let db = GrafeoDB::new_in_memory();
5987 let session = db.session();
5988
5989 let alix = session.create_node(&["Person"]);
5990 let gus = session.create_node(&["Person"]);
5991 let harm = session.create_node(&["Person"]);
5992
5993 session.create_edge(alix, gus, "KNOWS");
5995 session.create_edge(alix, harm, "KNOWS");
5996 session.create_edge(gus, alix, "KNOWS");
5998
5999 let (out_degree, in_degree) = session.get_degree(alix);
6000 assert_eq!(out_degree, 2);
6001 assert_eq!(in_degree, 1);
6002
6003 let lonely = session.create_node(&["Person"]);
6005 let (out, in_deg) = session.get_degree(lonely);
6006 assert_eq!(out, 0);
6007 assert_eq!(in_deg, 0);
6008 }
6009
6010 #[test]
6011 fn test_get_nodes_batch() {
6012 let db = GrafeoDB::new_in_memory();
6013 let session = db.session();
6014
6015 let alix = session.create_node(&["Person"]);
6016 let gus = session.create_node(&["Person"]);
6017 let harm = session.create_node(&["Person"]);
6018
6019 let nodes = session.get_nodes_batch(&[alix, gus, harm]);
6020 assert_eq!(nodes.len(), 3);
6021 assert!(nodes[0].is_some());
6022 assert!(nodes[1].is_some());
6023 assert!(nodes[2].is_some());
6024
6025 use grafeo_common::types::NodeId;
6027 let nodes_with_missing = session.get_nodes_batch(&[alix, NodeId::new(9999), harm]);
6028 assert_eq!(nodes_with_missing.len(), 3);
6029 assert!(nodes_with_missing[0].is_some());
6030 assert!(nodes_with_missing[1].is_none()); assert!(nodes_with_missing[2].is_some());
6032 }
6033
6034 #[test]
6035 fn test_auto_commit_setting() {
6036 let db = GrafeoDB::new_in_memory();
6037 let mut session = db.session();
6038
6039 assert!(session.auto_commit());
6041
6042 session.set_auto_commit(false);
6043 assert!(!session.auto_commit());
6044
6045 session.set_auto_commit(true);
6046 assert!(session.auto_commit());
6047 }
6048
6049 #[test]
6050 fn test_transaction_double_begin_nests() {
6051 let db = GrafeoDB::new_in_memory();
6052 let mut session = db.session();
6053
6054 session.begin_transaction().unwrap();
6055 let result = session.begin_transaction();
6057 assert!(result.is_ok());
6058 session.commit().unwrap();
6060 session.commit().unwrap();
6062 }
6063
6064 #[test]
6065 fn test_commit_without_transaction_error() {
6066 let db = GrafeoDB::new_in_memory();
6067 let mut session = db.session();
6068
6069 let result = session.commit();
6070 assert!(result.is_err());
6071 }
6072
6073 #[test]
6074 fn test_rollback_without_transaction_error() {
6075 let db = GrafeoDB::new_in_memory();
6076 let mut session = db.session();
6077
6078 let result = session.rollback();
6079 assert!(result.is_err());
6080 }
6081
6082 #[test]
6083 fn test_create_edge_in_transaction() {
6084 let db = GrafeoDB::new_in_memory();
6085 let mut session = db.session();
6086
6087 let alix = session.create_node(&["Person"]);
6089 let gus = session.create_node(&["Person"]);
6090
6091 session.begin_transaction().unwrap();
6093 let edge_id = session.create_edge(alix, gus, "KNOWS");
6094
6095 assert!(session.edge_exists(edge_id));
6097
6098 session.commit().unwrap();
6100
6101 assert!(session.edge_exists(edge_id));
6103 }
6104
6105 #[test]
6106 fn test_neighbors_empty_node() {
6107 let db = GrafeoDB::new_in_memory();
6108 let session = db.session();
6109
6110 let lonely = session.create_node(&["Person"]);
6111
6112 assert!(session.get_neighbors_outgoing(lonely).is_empty());
6113 assert!(session.get_neighbors_incoming(lonely).is_empty());
6114 assert!(
6115 session
6116 .get_neighbors_outgoing_by_type(lonely, "KNOWS")
6117 .is_empty()
6118 );
6119 }
6120 }
6121
#[test]
/// With a GC interval of 2, two committed transactions leave both nodes in place.
fn test_auto_gc_triggers_on_commit_interval() {
    use crate::config::Config;

    let db = GrafeoDB::with_config(Config::in_memory().with_gc_interval(2)).unwrap();
    let mut session = db.session();

    // One node per transaction, so the second commit reaches the interval.
    for label in ["A", "B"] {
        session.begin_transaction().unwrap();
        session.create_node(&[label]);
        session.commit().unwrap();
    }

    assert_eq!(db.node_count(), 2);
}
6143
#[test]
/// A query timeout set on the config shows up as a deadline on new sessions.
fn test_query_timeout_config_propagates_to_session() {
    use crate::config::Config;
    use std::time::Duration;

    let timeout = Duration::from_secs(5);
    let db = GrafeoDB::with_config(Config::in_memory().with_query_timeout(timeout)).unwrap();
    let session = db.session();

    assert!(session.query_deadline().is_some());
}
6156
#[test]
/// A default in-memory database gives its sessions a query deadline.
fn test_default_query_timeout_returns_deadline() {
    let database = GrafeoDB::new_in_memory();
    let session = database.session();

    let deadline = session.query_deadline();
    assert!(deadline.is_some());
}
6165
#[test]
/// Disabling the query timeout on the config leaves sessions without a deadline.
fn test_no_query_timeout_returns_no_deadline() {
    use crate::config::Config;

    let db = GrafeoDB::with_config(Config::in_memory().without_query_timeout()).unwrap();
    let session = db.session();

    let deadline = session.query_deadline();
    assert!(deadline.is_none());
}
6176
#[test]
/// Sessions of a default in-memory database report the LPG graph model.
fn test_graph_model_accessor() {
    use crate::config::GraphModel;

    let database = GrafeoDB::new_in_memory();
    let session = database.session();

    let model = session.graph_model();
    assert_eq!(model, GraphModel::Lpg);
}
6186
#[test]
/// With a maximum property size of 100, a short value is accepted and a
/// 200-character value is rejected with a size error.
fn test_reject_oversized_property() {
    use crate::config::Config;

    let db = GrafeoDB::with_config(Config::in_memory().with_max_property_size(100)).unwrap();
    let session = db.session();
    let node = session.create_node(&["Test"]);

    // Well under the limit: must succeed.
    session
        .set_node_property(node, "small", Value::from("hello"))
        .unwrap();

    // Over the limit: must fail with the size message.
    let oversized = "x".repeat(200);
    let outcome = session.set_node_property(node, "big", Value::from(oversized.as_str()));
    assert!(outcome.is_err());

    let err = outcome.unwrap_err().to_string();
    assert!(
        err.contains("exceeds maximum size"),
        "Expected size error, got: {err}"
    );
}
6212
#[test]
/// With the property size limit disabled, a 10,000-character value is accepted.
fn test_no_property_size_limit() {
    use crate::config::Config;

    let db = GrafeoDB::with_config(Config::in_memory().without_max_property_size()).unwrap();
    let session = db.session();
    let node = session.create_node(&["Test"]);

    let large = "x".repeat(10_000);
    let outcome = session.set_node_property(node, "big", Value::from(large.as_str()));
    outcome.unwrap();
}
6229
#[cfg(feature = "gql")]
#[test]
/// A database built on an externally supplied store supports a full
/// begin / insert / read-back / commit cycle through a session.
fn test_external_store_session() {
    use grafeo_core::graph::GraphStoreMut;
    use std::sync::Arc;

    let external: Arc<dyn GraphStoreMut> =
        Arc::new(grafeo_core::graph::lpg::LpgStore::new().unwrap());
    let db = GrafeoDB::with_store(external, crate::config::Config::in_memory()).unwrap();

    let mut session = db.session();
    session.begin_transaction().unwrap();

    session.execute("INSERT (:Test {name: 'hello'})").unwrap();

    // The inserted node must be readable inside the same transaction.
    let inserted = session.execute("MATCH (n:Test) RETURN n.name").unwrap();
    assert_eq!(inserted.row_count(), 1);

    session.commit().unwrap();
}
6255
6256 #[cfg(feature = "gql")]
6259 mod session_command_tests {
6260 use super::*;
6261 use grafeo_common::types::Value;
6262
#[test]
/// `USE GRAPH` on an existing graph makes it the session's current graph.
fn test_use_graph_sets_current_graph() {
    let db = GrafeoDB::new_in_memory();
    let session = db.session();

    for statement in ["CREATE GRAPH mydb", "USE GRAPH mydb"] {
        session.execute(statement).unwrap();
    }

    assert_eq!(session.current_graph().as_deref(), Some("mydb"));
}
6274
#[test]
/// `USE GRAPH` on an unknown graph fails with a "does not exist" message.
fn test_use_graph_nonexistent_errors() {
    let db = GrafeoDB::new_in_memory();
    let session = db.session();

    let outcome = session.execute("USE GRAPH doesnotexist");
    assert!(outcome.is_err());

    let err = outcome.unwrap_err().to_string();
    assert!(
        err.contains("does not exist"),
        "Expected 'does not exist' error, got: {err}"
    );
}
6288
#[test]
/// `USE GRAPH default` succeeds without creating anything first and is
/// reported as the current graph.
fn test_use_graph_default_always_valid() {
    let db = GrafeoDB::new_in_memory();
    let session = db.session();

    session.execute("USE GRAPH default").unwrap();

    assert_eq!(session.current_graph().as_deref(), Some("default"));
}
6298
#[test]
/// `SESSION SET GRAPH` on an existing graph makes it the session's current graph.
fn test_session_set_graph() {
    let db = GrafeoDB::new_in_memory();
    let session = db.session();

    for statement in ["CREATE GRAPH analytics", "SESSION SET GRAPH analytics"] {
        session.execute(statement).unwrap();
    }

    assert_eq!(session.current_graph().as_deref(), Some("analytics"));
}
6308
#[test]
/// `SESSION SET GRAPH` on an unknown graph is an error.
fn test_session_set_graph_nonexistent_errors() {
    let db = GrafeoDB::new_in_memory();
    let session = db.session();

    let outcome = session.execute("SESSION SET GRAPH nosuchgraph");

    assert!(outcome.is_err());
}
6317
/// `SESSION SET TIME ZONE` stores the given zone name on the session, and a
/// later statement overwrites the earlier value.
#[test]
fn test_session_set_time_zone() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    // A fresh session has no time zone.
    assert!(sess.time_zone().is_none());

    // Each statement is paired with the zone the session should report afterwards.
    let steps = [
        ("SESSION SET TIME ZONE 'UTC'", "UTC"),
        (
            "SESSION SET TIME ZONE 'America/New_York'",
            "America/New_York",
        ),
    ];
    for (statement, expected_zone) in steps {
        sess.execute(statement).unwrap();
        assert_eq!(sess.time_zone().as_deref(), Some(expected_zone));
    }
}
6333
/// `SESSION SET PARAMETER $timeout = 30` makes a `timeout` parameter
/// retrievable from the session (only its presence is checked).
#[test]
fn test_session_set_parameter() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    let set_timeout = "SESSION SET PARAMETER $timeout = 30";
    sess.execute(set_timeout).unwrap();

    let stored = sess.get_parameter("timeout");
    assert!(stored.is_some());
}
6347
/// `SESSION RESET` clears the selected graph, the time zone and a previously
/// set parameter.
#[test]
fn test_session_reset_clears_all_state() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    // Populate every piece of session state this test inspects.
    for statement in [
        "CREATE GRAPH analytics",
        "SESSION SET GRAPH analytics",
        "SESSION SET TIME ZONE 'UTC'",
        "SESSION SET PARAMETER $limit = 100",
    ] {
        sess.execute(statement).unwrap();
    }

    // Sanity check: the state really is set before the reset.
    let graph_before = sess.current_graph();
    let zone_before = sess.time_zone();
    assert!(graph_before.is_some());
    assert!(zone_before.is_some());
    assert!(sess.get_parameter("limit").is_some());

    sess.execute("SESSION RESET").unwrap();

    // Everything is back to the unset state.
    assert!(sess.current_graph().is_none());
    assert!(sess.time_zone().is_none());
    assert!(sess.get_parameter("limit").is_none());
}
6373
/// `SESSION CLOSE` leaves the session with no current graph and no time zone.
#[test]
fn test_session_close_clears_state() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    // Set a graph and a time zone so there is state to clear.
    for statement in [
        "CREATE GRAPH analytics",
        "SESSION SET GRAPH analytics",
        "SESSION SET TIME ZONE 'UTC'",
    ] {
        sess.execute(statement).unwrap();
    }

    sess.execute("SESSION CLOSE").unwrap();

    assert!(sess.current_graph().is_none());
    assert!(sess.time_zone().is_none());
}
6388
/// `CREATE GRAPH` produces a graph that `USE GRAPH` can subsequently select.
#[test]
fn test_create_graph() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    sess.execute("CREATE GRAPH mydb").unwrap();

    // Selecting the graph is how the test observes that it exists.
    sess.execute("USE GRAPH mydb").unwrap();
    let selected = sess.current_graph();
    assert_eq!(selected.as_deref(), Some("mydb"));
}
6400
/// Creating the same graph twice fails the second time with an
/// "already exists" error.
#[test]
fn test_create_graph_duplicate_errors() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    sess.execute("CREATE GRAPH mydb").unwrap();
    let second_attempt = sess.execute("CREATE GRAPH mydb");

    assert!(second_attempt.is_err());
    let err = second_attempt.unwrap_err().to_string();
    let mentions_duplicate = err.contains("already exists");
    assert!(
        mentions_duplicate,
        "Expected 'already exists' error, got: {err}"
    );
}
6416
/// `CREATE GRAPH IF NOT EXISTS` succeeds even when the graph already exists.
#[test]
fn test_create_graph_if_not_exists() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    // The second statement targets the graph the first one created.
    for statement in ["CREATE GRAPH mydb", "CREATE GRAPH IF NOT EXISTS mydb"] {
        sess.execute(statement).unwrap();
    }
}
6426
/// After `DROP GRAPH`, the dropped graph can no longer be selected with
/// `USE GRAPH`.
#[test]
fn test_drop_graph() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    for statement in ["CREATE GRAPH mydb", "DROP GRAPH mydb"] {
        sess.execute(statement).unwrap();
    }

    // Selecting the dropped graph must now be rejected.
    let use_failed = sess.execute("USE GRAPH mydb").is_err();
    assert!(use_failed);
}
6439
/// `DROP GRAPH` on an unknown graph fails with a "does not exist" error.
#[test]
fn test_drop_graph_nonexistent_errors() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    let outcome = sess.execute("DROP GRAPH nosuchgraph");
    assert!(outcome.is_err());

    let err = outcome.unwrap_err().to_string();
    let mentions_missing = err.contains("does not exist");
    assert!(
        mentions_missing,
        "Expected 'does not exist' error, got: {err}"
    );
}
6453
/// `DROP GRAPH IF EXISTS` succeeds even when the named graph was never created.
#[test]
fn test_drop_graph_if_exists() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    let drop_missing = "DROP GRAPH IF EXISTS nosuchgraph";
    sess.execute(drop_missing).unwrap();
}
6462
/// `START TRANSACTION` / `COMMIT` issued as GQL statements open and close a
/// transaction, and the inserted node is visible after the commit.
#[test]
fn test_start_transaction_via_gql() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    sess.execute("START TRANSACTION").unwrap();
    let open_after_start = sess.in_transaction();
    assert!(open_after_start);

    sess.execute("INSERT (:Person {name: 'Alix'})").unwrap();
    sess.execute("COMMIT").unwrap();
    let open_after_commit = sess.in_transaction();
    assert!(!open_after_commit);

    // The committed node is returned by a query outside the transaction.
    let people = sess.execute("MATCH (n:Person) RETURN n.name").unwrap();
    assert_eq!(people.rows.len(), 1);
}
6477
/// Inside a `START TRANSACTION READ ONLY` transaction an `INSERT` is rejected
/// with an error mentioning "read-only".
#[test]
fn test_start_transaction_read_only_blocks_insert() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    sess.execute("START TRANSACTION READ ONLY").unwrap();

    let write_attempt = sess.execute("INSERT (:Person {name: 'Alix'})");
    assert!(write_attempt.is_err());
    let err = write_attempt.unwrap_err().to_string();
    let mentions_read_only = err.contains("read-only");
    assert!(mentions_read_only, "Expected read-only error, got: {err}");

    // End the transaction that was opened above.
    sess.execute("ROLLBACK").unwrap();
}
6493
/// A read-only transaction can still run `MATCH` queries against data that
/// was committed beforehand.
#[test]
fn test_start_transaction_read_only_allows_reads() {
    let database = GrafeoDB::new_in_memory();
    let mut sess = database.session();

    // Seed one node through the programmatic transaction API.
    sess.begin_transaction().unwrap();
    sess.execute("INSERT (:Person {name: 'Alix'})").unwrap();
    sess.commit().unwrap();

    sess.execute("START TRANSACTION READ ONLY").unwrap();
    let people = sess.execute("MATCH (n:Person) RETURN n.name").unwrap();
    let found = people.rows.len();
    assert_eq!(found, 1);
    sess.execute("COMMIT").unwrap();
}
6507
/// A node inserted inside a transaction is gone after a GQL `ROLLBACK`.
#[test]
fn test_rollback_via_gql() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    // Open a transaction, write inside it, then discard it.
    for statement in [
        "START TRANSACTION",
        "INSERT (:Person {name: 'Alix'})",
        "ROLLBACK",
    ] {
        sess.execute(statement).unwrap();
    }

    let people = sess.execute("MATCH (n:Person) RETURN n.name").unwrap();
    let nothing_persisted = people.rows.is_empty();
    assert!(nothing_persisted);
}
6520
/// `START TRANSACTION ISOLATION LEVEL SERIALIZABLE` is accepted and leaves
/// the session inside a transaction.
#[test]
fn test_start_transaction_with_isolation_level() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    let start_serializable = "START TRANSACTION ISOLATION LEVEL SERIALIZABLE";
    sess.execute(start_serializable).unwrap();

    let is_open = sess.in_transaction();
    assert!(is_open);

    // End the transaction that was opened above.
    sess.execute("ROLLBACK").unwrap();
}
6532
/// A session command (`SESSION SET GRAPH`) returns a result with zero rows
/// and zero columns.
#[test]
fn test_session_commands_return_empty_result() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    sess.execute("CREATE GRAPH test").unwrap();
    let outcome = sess.execute("SESSION SET GRAPH test").unwrap();

    let rows = outcome.row_count();
    let columns = outcome.column_count();
    assert_eq!(rows, 0);
    assert_eq!(columns, 0);
}
6543
/// A brand-new session has no current graph selected.
#[test]
fn test_current_graph_default_is_none() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    let selected = sess.current_graph();
    assert!(selected.is_none());
}
6551
/// A brand-new session has no time zone set.
#[test]
fn test_time_zone_default_is_none() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    let zone = sess.time_zone();
    assert!(zone.is_none());
}
6559
/// Two sessions on the same database each keep their own current graph.
#[test]
fn test_session_state_independent_across_sessions() {
    let database = GrafeoDB::new_in_memory();
    let first_session = database.session();
    let second_session = database.session();

    // Both graphs are created through the first session; the second session
    // then selects one of them.
    for statement in ["CREATE GRAPH first", "CREATE GRAPH second"] {
        first_session.execute(statement).unwrap();
    }
    first_session.execute("SESSION SET GRAPH first").unwrap();
    second_session.execute("SESSION SET GRAPH second").unwrap();

    let first_selected = first_session.current_graph();
    let second_selected = second_session.current_graph();
    assert_eq!(first_selected.as_deref(), Some("first"));
    assert_eq!(second_selected.as_deref(), Some("second"));
}
6574
/// `SHOW NODE TYPES` returns the expected four columns and one row naming
/// the node type that was created.
#[test]
fn test_show_node_types() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    let create_person = "CREATE NODE TYPE Person (name STRING NOT NULL, age INTEGER)";
    sess.execute(create_person).unwrap();

    let listing = sess.execute("SHOW NODE TYPES").unwrap();

    let expected_columns = vec!["name", "properties", "constraints", "parents"];
    assert_eq!(listing.columns, expected_columns);
    assert_eq!(listing.rows.len(), 1);

    // Only the first cell (the type name) is checked.
    let type_name = &listing.rows[0][0];
    assert_eq!(type_name, &Value::from("Person"));
}
6593
/// `SHOW EDGE TYPES` returns the expected four columns and one row naming
/// the edge type that was created.
#[test]
fn test_show_edge_types() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    let create_knows = "CREATE EDGE TYPE KNOWS CONNECTING (Person) TO (Person) (since INTEGER)";
    sess.execute(create_knows).unwrap();

    let listing = sess.execute("SHOW EDGE TYPES").unwrap();

    let expected_columns = vec!["name", "properties", "source_types", "target_types"];
    assert_eq!(listing.columns, expected_columns);
    assert_eq!(listing.rows.len(), 1);

    // Only the first cell (the type name) is checked.
    let type_name = &listing.rows[0][0];
    assert_eq!(type_name, &Value::from("KNOWS"));
}
6611
/// `SHOW GRAPH TYPES` returns the expected four columns and one row naming
/// the graph type that was created.
#[test]
fn test_show_graph_types() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    let create_node_type = "CREATE NODE TYPE Person (name STRING)";
    let create_graph_type = "CREATE GRAPH TYPE social (\
        NODE TYPE Person (name STRING)\
        )";
    sess.execute(create_node_type).unwrap();
    sess.execute(create_graph_type).unwrap();

    let listing = sess.execute("SHOW GRAPH TYPES").unwrap();

    let expected_columns = vec!["name", "open", "node_types", "edge_types"];
    assert_eq!(listing.columns, expected_columns);
    assert_eq!(listing.rows.len(), 1);

    // Only the first cell (the graph type name) is checked.
    let type_name = &listing.rows[0][0];
    assert_eq!(type_name, &Value::from("social"));
}
6636
/// `SHOW GRAPH TYPE <name>` returns a single row whose first cell is the
/// requested graph type's name.
#[test]
fn test_show_graph_type_named() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    let create_node_type = "CREATE NODE TYPE Person (name STRING)";
    let create_graph_type = "CREATE GRAPH TYPE social (\
        NODE TYPE Person (name STRING)\
        )";
    sess.execute(create_node_type).unwrap();
    sess.execute(create_graph_type).unwrap();

    let described = sess.execute("SHOW GRAPH TYPE social").unwrap();
    assert_eq!(described.rows.len(), 1);

    let type_name = &described.rows[0][0];
    assert_eq!(type_name, &Value::from("social"));
}
6657
/// `SHOW GRAPH TYPE` for a name that was never created returns an error.
#[test]
fn test_show_graph_type_not_found() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    let lookup_failed = sess.execute("SHOW GRAPH TYPE nonexistent").is_err();
    assert!(lookup_failed);
}
6666
/// `SHOW INDEXES` on an empty database succeeds and exposes the expected
/// column names.
#[test]
fn test_show_indexes_via_gql() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    let listing = sess.execute("SHOW INDEXES").unwrap();

    let expected_columns = vec!["name", "type", "label", "property"];
    assert_eq!(listing.columns, expected_columns);
}
6675
/// `SHOW CONSTRAINTS` on an empty database succeeds and exposes the expected
/// column names.
#[test]
fn test_show_constraints_via_gql() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    let listing = sess.execute("SHOW CONSTRAINTS").unwrap();

    let expected_columns = vec!["name", "type", "label", "properties"];
    assert_eq!(listing.columns, expected_columns);
}
6684
/// A graph type declared in pattern form (node/edge patterns rather than
/// `NODE TYPE` clauses) can be created and then shown by name.
#[test]
fn test_pattern_form_graph_type_roundtrip() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    // Register the node and edge types that the patterns below refer to.
    for statement in [
        "CREATE NODE TYPE Person (name STRING NOT NULL)",
        "CREATE NODE TYPE City (name STRING)",
        "CREATE EDGE TYPE KNOWS (since INTEGER)",
        "CREATE EDGE TYPE LIVES_IN",
    ] {
        sess.execute(statement).unwrap();
    }

    // Pattern-form graph type definition.
    let create_graph_type = "CREATE GRAPH TYPE social (\
        (:Person {name STRING NOT NULL})-[:KNOWS {since INTEGER}]->(:Person),\
        (:Person)-[:LIVES_IN]->(:City)\
        )";
    sess.execute(create_graph_type).unwrap();

    let described = sess.execute("SHOW GRAPH TYPE social").unwrap();
    assert_eq!(described.rows.len(), 1);

    let type_name = &described.rows[0][0];
    assert_eq!(type_name, &Value::from("social"));
}
6717 }
6718}