1#[cfg(feature = "rdf")]
8mod rdf;
9
10use std::sync::Arc;
11use std::sync::atomic::{AtomicUsize, Ordering};
12use std::time::{Duration, Instant};
13
14use grafeo_common::types::{EdgeId, EpochId, NodeId, TransactionId, Value};
15use grafeo_common::utils::error::Result;
16use grafeo_common::{grafeo_debug_span, grafeo_info_span, grafeo_warn};
17use grafeo_core::graph::Direction;
18use grafeo_core::graph::lpg::{Edge, LpgStore, Node};
19#[cfg(feature = "rdf")]
20use grafeo_core::graph::rdf::RdfStore;
21use grafeo_core::graph::{GraphStore, GraphStoreMut};
22
23use crate::catalog::{Catalog, CatalogConstraintValidator};
24use crate::config::{AdaptiveConfig, GraphModel};
25use crate::database::QueryResult;
26use crate::query::cache::QueryCache;
27use crate::transaction::TransactionManager;
28
29fn parse_default_literal(text: &str) -> Value {
34 if text.eq_ignore_ascii_case("null") {
35 return Value::Null;
36 }
37 if text.eq_ignore_ascii_case("true") {
38 return Value::Bool(true);
39 }
40 if text.eq_ignore_ascii_case("false") {
41 return Value::Bool(false);
42 }
43 if (text.starts_with('\'') && text.ends_with('\''))
45 || (text.starts_with('"') && text.ends_with('"'))
46 {
47 return Value::String(text[1..text.len() - 1].into());
48 }
49 if let Ok(i) = text.parse::<i64>() {
51 return Value::Int64(i);
52 }
53 if let Ok(f) = text.parse::<f64>() {
54 return Value::Float64(f);
55 }
56 Value::String(text.into())
58}
59
/// Shared handles and settings handed to a [`Session`] constructor.
///
/// Both `Session::with_adaptive` and `Session::with_external_store` move
/// every field of this struct into the session they build.
pub(crate) struct SessionConfig {
    /// Transaction manager shared with the session.
    pub transaction_manager: Arc<TransactionManager>,
    /// Query cache shared with the session.
    pub query_cache: Arc<QueryCache>,
    /// Schema catalog shared with the session.
    pub catalog: Arc<Catalog>,
    /// Adaptive-execution settings; stored on the session as-is.
    pub adaptive_config: AdaptiveConfig,
    /// Stored on the session as `factorized_execution`.
    pub factorized_execution: bool,
    /// Graph model of the database (compared against `GraphModel::Rdf` by
    /// `Session::require_lpg`).
    pub graph_model: GraphModel,
    /// Optional query timeout; `None` presumably means "no limit" — confirm
    /// against the execution path.
    pub query_timeout: Option<Duration>,
    /// Shared commit counter.
    // NOTE(review): presumably used together with `gc_interval` to trigger
    // periodic garbage collection — confirm in the commit path.
    pub commit_counter: Arc<AtomicUsize>,
    /// Stored on the session as `gc_interval`.
    pub gc_interval: usize,
    /// Seeds both `Session::db_read_only` and the initial value of
    /// `Session::read_only_tx`.
    pub read_only: bool,
}
77
/// A single client session: store handles plus per-session state (active
/// graph/schema, time zone, parameters, transaction bookkeeping).
///
/// All mutable per-session state sits behind `parking_lot::Mutex` or atomics,
/// so the session API works through `&self`.
pub struct Session {
    /// Concrete LPG store. Named graphs are looked up, created and dropped
    /// through it; `with_external_store` fills it with a fresh placeholder.
    store: Arc<LpgStore>,
    /// Read-side store used when no named graph is active.
    graph_store: Arc<dyn GraphStore>,
    /// Write-side store used when no named graph is active; may be `None`
    /// when the session was built via `with_external_store`.
    graph_store_mut: Option<Arc<dyn GraphStoreMut>>,
    /// Schema catalog (graph-type bindings, type definitions, indexes, ...).
    catalog: Arc<Catalog>,
    /// RDF store; both constructors create a fresh one.
    #[cfg(feature = "rdf")]
    rdf_store: Arc<RdfStore>,
    /// Transaction manager shared via `SessionConfig`.
    transaction_manager: Arc<TransactionManager>,
    /// Query cache shared via `SessionConfig`.
    query_cache: Arc<QueryCache>,
    /// Id of the open transaction, or `None` when no transaction is active.
    current_transaction: parking_lot::Mutex<Option<TransactionId>>,
    /// When `true`, `CREATE GRAPH` / `DROP GRAPH` session commands are
    /// rejected with a read-only transaction error. Initialised from
    /// `SessionConfig::read_only`.
    read_only_tx: parking_lot::Mutex<bool>,
    /// Copy of `SessionConfig::read_only`.
    db_read_only: bool,
    /// Set to `true` by both constructors.
    auto_commit: bool,
    /// Adaptive-execution settings (currently allowed to be unused).
    #[allow(dead_code)] adaptive_config: AdaptiveConfig,
    /// Copy of `SessionConfig::factorized_execution`.
    factorized_execution: bool,
    /// Graph model of the database; see `graph_model()` / `require_lpg()`.
    graph_model: GraphModel,
    /// Copy of `SessionConfig::query_timeout`.
    query_timeout: Option<Duration>,
    /// Commit counter shared via `SessionConfig`.
    commit_counter: Arc<AtomicUsize>,
    /// Copy of `SessionConfig::gc_interval`.
    gc_interval: usize,
    /// Starts at 0.
    // NOTE(review): presumably the node count captured when a transaction
    // begins — confirm in the transaction code.
    transaction_start_node_count: AtomicUsize,
    /// Starts at 0; edge counterpart of `transaction_start_node_count`.
    transaction_start_edge_count: AtomicUsize,
    /// Write-ahead log, installed by `set_wal`.
    #[cfg(feature = "wal")]
    wal: Option<Arc<grafeo_adapters::storage::wal::LpgWal>>,
    /// Graph-context cell passed to every WAL store wrapper; installed by
    /// `set_wal`.
    #[cfg(feature = "wal")]
    wal_graph_context: Option<Arc<parking_lot::Mutex<Option<String>>>>,
    /// Change-data-capture log; replaced by `set_cdc_log`.
    #[cfg(feature = "cdc")]
    cdc_log: Arc<crate::cdc::CdcLog>,
    /// Pending-event buffer taken from the CDC store wrapper in
    /// `set_cdc_log` and reused when wrapping named-graph write stores.
    #[cfg(feature = "cdc")]
    cdc_pending_events: Option<Arc<parking_lot::Mutex<Vec<crate::cdc::ChangeEvent>>>>,
    /// Name of the active graph as set by `use_graph`; `None` or "default"
    /// (case-insensitive) selects the default graph.
    current_graph: parking_lot::Mutex<Option<String>>,
    /// Active schema; when set, graph and type names are prefixed with
    /// `"{schema}/"` to form their storage keys.
    current_schema: parking_lot::Mutex<Option<String>>,
    /// Session time zone as set by `set_time_zone`.
    time_zone: parking_lot::Mutex<Option<String>>,
    /// Session parameters keyed by name.
    session_params:
        parking_lot::Mutex<std::collections::HashMap<String, grafeo_common::types::Value>>,
    /// Epoch override set by `set_viewing_epoch`; `None` means no override.
    viewing_epoch_override: parking_lot::Mutex<Option<EpochId>>,
    /// Savepoints recorded in the current transaction.
    savepoints: parking_lot::Mutex<Vec<SavepointState>>,
    /// Starts at 0.
    // NOTE(review): presumably the depth of nested transaction begins —
    // confirm in the transaction code.
    transaction_nesting_depth: parking_lot::Mutex<u32>,
    /// Distinct storage keys of graphs selected while a transaction was
    /// open (`None` is the default graph); filled by `track_graph_touch`.
    touched_graphs: parking_lot::Mutex<Vec<Option<String>>>,
    /// Metrics registry, installed by `set_metrics`.
    #[cfg(feature = "metrics")]
    pub(crate) metrics: Option<Arc<crate::metrics::MetricsRegistry>>,
    /// Starts as `None`.
    // NOTE(review): presumably the start instant of the open transaction,
    // used for duration metrics — confirm.
    #[cfg(feature = "metrics")]
    tx_start_time: parking_lot::Mutex<Option<Instant>>,
}
169
/// Per-graph portion of a savepoint.
// NOTE(review): the code that fills and consumes these fields is outside
// this view; the field notes below are inferred from names and types and
// should be confirmed against savepoint() / rollback_to_savepoint().
#[derive(Clone)]
struct GraphSavepoint {
    /// Graph this snapshot belongs to (presumably `None` for the default
    /// graph, matching the keys kept in `Session::touched_graphs`).
    graph_name: Option<String>,
    /// Presumably the next node id the graph would have allocated.
    next_node_id: u64,
    /// Presumably the next edge id the graph would have allocated.
    next_edge_id: u64,
    /// Presumably the length of the graph's undo log at savepoint time.
    undo_log_position: usize,
}
178
/// A named savepoint as stored in `Session::savepoints`.
#[derive(Clone)]
struct SavepointState {
    /// Savepoint name.
    name: String,
    /// One snapshot per graph captured for this savepoint.
    graph_snapshots: Vec<GraphSavepoint>,
    /// Currently allowed to be unused.
    // NOTE(review): presumably the graph that was active when the savepoint
    // was taken — confirm.
    #[allow(dead_code)]
    active_graph: Option<String>,
    /// Presumably the length of the pending CDC event buffer at savepoint
    /// time — confirm against rollback_to_savepoint().
    #[cfg(feature = "cdc")]
    cdc_event_position: usize,
}
193
194impl Session {
195 #[allow(dead_code)]
197 pub(crate) fn with_adaptive(store: Arc<LpgStore>, cfg: SessionConfig) -> Self {
198 let graph_store = Arc::clone(&store) as Arc<dyn GraphStore>;
199 let graph_store_mut = Some(Arc::clone(&store) as Arc<dyn GraphStoreMut>);
200 Self {
201 store,
202 graph_store,
203 graph_store_mut,
204 catalog: cfg.catalog,
205 #[cfg(feature = "rdf")]
206 rdf_store: Arc::new(RdfStore::new()),
207 transaction_manager: cfg.transaction_manager,
208 query_cache: cfg.query_cache,
209 current_transaction: parking_lot::Mutex::new(None),
210 read_only_tx: parking_lot::Mutex::new(cfg.read_only),
211 db_read_only: cfg.read_only,
212 auto_commit: true,
213 adaptive_config: cfg.adaptive_config,
214 factorized_execution: cfg.factorized_execution,
215 graph_model: cfg.graph_model,
216 query_timeout: cfg.query_timeout,
217 commit_counter: cfg.commit_counter,
218 gc_interval: cfg.gc_interval,
219 transaction_start_node_count: AtomicUsize::new(0),
220 transaction_start_edge_count: AtomicUsize::new(0),
221 #[cfg(feature = "wal")]
222 wal: None,
223 #[cfg(feature = "wal")]
224 wal_graph_context: None,
225 #[cfg(feature = "cdc")]
226 cdc_log: Arc::new(crate::cdc::CdcLog::new()),
227 #[cfg(feature = "cdc")]
228 cdc_pending_events: None,
229 current_graph: parking_lot::Mutex::new(None),
230 current_schema: parking_lot::Mutex::new(None),
231 time_zone: parking_lot::Mutex::new(None),
232 session_params: parking_lot::Mutex::new(std::collections::HashMap::new()),
233 viewing_epoch_override: parking_lot::Mutex::new(None),
234 savepoints: parking_lot::Mutex::new(Vec::new()),
235 transaction_nesting_depth: parking_lot::Mutex::new(0),
236 touched_graphs: parking_lot::Mutex::new(Vec::new()),
237 #[cfg(feature = "metrics")]
238 metrics: None,
239 #[cfg(feature = "metrics")]
240 tx_start_time: parking_lot::Mutex::new(None),
241 }
242 }
243
244 #[cfg(feature = "wal")]
249 pub(crate) fn set_wal(
250 &mut self,
251 wal: Arc<grafeo_adapters::storage::wal::LpgWal>,
252 wal_graph_context: Arc<parking_lot::Mutex<Option<String>>>,
253 ) {
254 let wal_store = Arc::new(crate::database::wal_store::WalGraphStore::new(
256 Arc::clone(&self.store),
257 Arc::clone(&wal),
258 Arc::clone(&wal_graph_context),
259 ));
260 self.graph_store = Arc::clone(&wal_store) as Arc<dyn GraphStore>;
261 self.graph_store_mut = Some(wal_store as Arc<dyn GraphStoreMut>);
262 self.wal = Some(wal);
263 self.wal_graph_context = Some(wal_graph_context);
264 }
265
266 #[cfg(feature = "cdc")]
273 pub(crate) fn set_cdc_log(&mut self, cdc_log: Arc<crate::cdc::CdcLog>) {
274 if let Some(ref write_store) = self.graph_store_mut {
277 let cdc_store = Arc::new(crate::database::cdc_store::CdcGraphStore::new(
278 Arc::clone(write_store),
279 Arc::clone(&cdc_log),
280 ));
281 self.cdc_pending_events = Some(cdc_store.pending_events());
282 self.graph_store_mut = Some(cdc_store as Arc<dyn grafeo_core::graph::GraphStoreMut>);
283 }
284 self.cdc_log = cdc_log;
285 }
286
    /// Installs the metrics registry for this session, replacing any
    /// registry set earlier.
    #[cfg(feature = "metrics")]
    pub(crate) fn set_metrics(&mut self, metrics: Arc<crate::metrics::MetricsRegistry>) {
        self.metrics = Some(metrics);
    }
292
293 pub(crate) fn with_external_store(
302 read_store: Arc<dyn GraphStore>,
303 write_store: Option<Arc<dyn GraphStoreMut>>,
304 cfg: SessionConfig,
305 ) -> Result<Self> {
306 Ok(Self {
307 store: Arc::new(LpgStore::new()?),
308 graph_store: read_store,
309 graph_store_mut: write_store,
310 catalog: cfg.catalog,
311 #[cfg(feature = "rdf")]
312 rdf_store: Arc::new(RdfStore::new()),
313 transaction_manager: cfg.transaction_manager,
314 query_cache: cfg.query_cache,
315 current_transaction: parking_lot::Mutex::new(None),
316 read_only_tx: parking_lot::Mutex::new(cfg.read_only),
317 db_read_only: cfg.read_only,
318 auto_commit: true,
319 adaptive_config: cfg.adaptive_config,
320 factorized_execution: cfg.factorized_execution,
321 graph_model: cfg.graph_model,
322 query_timeout: cfg.query_timeout,
323 commit_counter: cfg.commit_counter,
324 gc_interval: cfg.gc_interval,
325 transaction_start_node_count: AtomicUsize::new(0),
326 transaction_start_edge_count: AtomicUsize::new(0),
327 #[cfg(feature = "wal")]
328 wal: None,
329 #[cfg(feature = "wal")]
330 wal_graph_context: None,
331 #[cfg(feature = "cdc")]
332 cdc_log: Arc::new(crate::cdc::CdcLog::new()),
333 #[cfg(feature = "cdc")]
334 cdc_pending_events: None,
335 current_graph: parking_lot::Mutex::new(None),
336 current_schema: parking_lot::Mutex::new(None),
337 time_zone: parking_lot::Mutex::new(None),
338 session_params: parking_lot::Mutex::new(std::collections::HashMap::new()),
339 viewing_epoch_override: parking_lot::Mutex::new(None),
340 savepoints: parking_lot::Mutex::new(Vec::new()),
341 transaction_nesting_depth: parking_lot::Mutex::new(0),
342 touched_graphs: parking_lot::Mutex::new(Vec::new()),
343 #[cfg(feature = "metrics")]
344 metrics: None,
345 #[cfg(feature = "metrics")]
346 tx_start_time: parking_lot::Mutex::new(None),
347 })
348 }
349
    /// Returns the graph model this session was configured with.
    #[must_use]
    pub fn graph_model(&self) -> GraphModel {
        self.graph_model
    }
355
356 pub fn use_graph(&self, name: &str) {
360 *self.current_graph.lock() = Some(name.to_string());
361 }
362
363 #[must_use]
365 pub fn current_graph(&self) -> Option<String> {
366 self.current_graph.lock().clone()
367 }
368
369 pub fn set_schema(&self, name: &str) {
373 *self.current_schema.lock() = Some(name.to_string());
374 }
375
376 #[must_use]
380 pub fn current_schema(&self) -> Option<String> {
381 self.current_schema.lock().clone()
382 }
383
384 fn effective_graph_key(&self, graph_name: &str) -> String {
389 let schema = self.current_schema.lock().clone();
390 match schema {
391 Some(s) => format!("{s}/{graph_name}"),
392 None => graph_name.to_string(),
393 }
394 }
395
396 fn effective_type_key(&self, type_name: &str) -> String {
400 let schema = self.current_schema.lock().clone();
401 match schema {
402 Some(s) => format!("{s}/{type_name}"),
403 None => type_name.to_string(),
404 }
405 }
406
407 fn active_graph_storage_key(&self) -> Option<String> {
411 let graph = self.current_graph.lock().clone();
412 let schema = self.current_schema.lock().clone();
413 match (schema, graph) {
414 (_, None) => None,
415 (_, Some(ref name)) if name.eq_ignore_ascii_case("default") => None,
416 (None, Some(name)) => Some(name),
417 (Some(s), Some(g)) => Some(format!("{s}/{g}")),
418 }
419 }
420
421 fn active_store(&self) -> Arc<dyn GraphStore> {
429 let key = self.active_graph_storage_key();
430 match key {
431 None => Arc::clone(&self.graph_store),
432 Some(ref name) => match self.store.graph(name) {
433 Some(named_store) => {
434 #[cfg(feature = "wal")]
435 if let (Some(wal), Some(ctx)) = (&self.wal, &self.wal_graph_context) {
436 return Arc::new(crate::database::wal_store::WalGraphStore::new_for_graph(
437 named_store,
438 Arc::clone(wal),
439 name.clone(),
440 Arc::clone(ctx),
441 )) as Arc<dyn GraphStore>;
442 }
443 named_store as Arc<dyn GraphStore>
444 }
445 None => Arc::clone(&self.graph_store),
446 },
447 }
448 }
449
450 fn active_write_store(&self) -> Option<Arc<dyn GraphStoreMut>> {
455 let key = self.active_graph_storage_key();
456 match key {
457 None => self.graph_store_mut.as_ref().map(Arc::clone),
458 Some(ref name) => match self.store.graph(name) {
459 Some(named_store) => {
460 let mut store: Arc<dyn GraphStoreMut> = named_store;
461
462 #[cfg(feature = "wal")]
463 if let (Some(wal), Some(ctx)) = (&self.wal, &self.wal_graph_context) {
464 store = Arc::new(crate::database::wal_store::WalGraphStore::new_for_graph(
465 self.store
467 .graph(name)
468 .unwrap_or_else(|| Arc::clone(&self.store)),
469 Arc::clone(wal),
470 name.clone(),
471 Arc::clone(ctx),
472 ));
473 }
474
475 #[cfg(feature = "cdc")]
476 if let Some(ref pending) = self.cdc_pending_events {
477 store = Arc::new(crate::database::cdc_store::CdcGraphStore::wrap(
478 store,
479 Arc::clone(&self.cdc_log),
480 Arc::clone(pending),
481 ));
482 }
483
484 Some(store)
485 }
486 None => self.graph_store_mut.as_ref().map(Arc::clone),
487 },
488 }
489 }
490
491 fn active_lpg_store(&self) -> Arc<LpgStore> {
496 let key = self.active_graph_storage_key();
497 match key {
498 None => Arc::clone(&self.store),
499 Some(ref name) => self
500 .store
501 .graph(name)
502 .unwrap_or_else(|| Arc::clone(&self.store)),
503 }
504 }
505
506 fn resolve_store(&self, graph_name: &Option<String>) -> Arc<LpgStore> {
509 match graph_name {
510 None => Arc::clone(&self.store),
511 Some(name) if name.eq_ignore_ascii_case("default") => Arc::clone(&self.store),
512 Some(name) => self
513 .store
514 .graph(name)
515 .unwrap_or_else(|| Arc::clone(&self.store)),
516 }
517 }
518
519 fn track_graph_touch(&self) {
524 if self.current_transaction.lock().is_some() {
525 let key = self.active_graph_storage_key();
526 let mut touched = self.touched_graphs.lock();
527 if !touched.contains(&key) {
528 touched.push(key);
529 }
530 }
531 }
532
533 pub fn set_time_zone(&self, tz: &str) {
535 *self.time_zone.lock() = Some(tz.to_string());
536 }
537
538 #[must_use]
540 pub fn time_zone(&self) -> Option<String> {
541 self.time_zone.lock().clone()
542 }
543
544 pub fn set_parameter(&self, key: &str, value: grafeo_common::types::Value) {
546 self.session_params.lock().insert(key.to_string(), value);
547 }
548
549 #[must_use]
551 pub fn get_parameter(&self, key: &str) -> Option<grafeo_common::types::Value> {
552 self.session_params.lock().get(key).cloned()
553 }
554
555 pub fn reset_session(&self) {
557 *self.current_schema.lock() = None;
558 *self.current_graph.lock() = None;
559 *self.time_zone.lock() = None;
560 self.session_params.lock().clear();
561 *self.viewing_epoch_override.lock() = None;
562 }
563
564 pub fn reset_schema(&self) {
566 *self.current_schema.lock() = None;
567 }
568
569 pub fn reset_graph(&self) {
571 *self.current_graph.lock() = None;
572 }
573
574 pub fn reset_time_zone(&self) {
576 *self.time_zone.lock() = None;
577 }
578
579 pub fn reset_parameters(&self) {
581 self.session_params.lock().clear();
582 }
583
584 pub fn set_viewing_epoch(&self, epoch: EpochId) {
592 *self.viewing_epoch_override.lock() = Some(epoch);
593 }
594
595 pub fn clear_viewing_epoch(&self) {
597 *self.viewing_epoch_override.lock() = None;
598 }
599
600 #[must_use]
602 pub fn viewing_epoch(&self) -> Option<EpochId> {
603 *self.viewing_epoch_override.lock()
604 }
605
606 #[must_use]
610 pub fn get_node_history(&self, id: NodeId) -> Vec<(EpochId, Option<EpochId>, Node)> {
611 self.active_lpg_store().get_node_history(id)
612 }
613
614 #[must_use]
618 pub fn get_edge_history(&self, id: EdgeId) -> Vec<(EpochId, Option<EpochId>, Edge)> {
619 self.active_lpg_store().get_edge_history(id)
620 }
621
622 fn require_lpg(&self, language: &str) -> Result<()> {
624 if self.graph_model == GraphModel::Rdf {
625 return Err(grafeo_common::utils::error::Error::Internal(format!(
626 "This is an RDF database. {language} queries require an LPG database."
627 )));
628 }
629 Ok(())
630 }
631
632 #[cfg(feature = "gql")]
634 fn execute_session_command(
635 &self,
636 cmd: grafeo_adapters::query::gql::ast::SessionCommand,
637 ) -> Result<QueryResult> {
638 use grafeo_adapters::query::gql::ast::{SessionCommand, TransactionIsolationLevel};
639 use grafeo_common::utils::error::{Error, QueryError, QueryErrorKind};
640
641 if *self.read_only_tx.lock() {
643 match &cmd {
644 SessionCommand::CreateGraph { .. } | SessionCommand::DropGraph { .. } => {
645 return Err(Error::Transaction(
646 grafeo_common::utils::error::TransactionError::ReadOnly,
647 ));
648 }
649 _ => {} }
651 }
652
653 match cmd {
654 SessionCommand::CreateGraph {
655 name,
656 if_not_exists,
657 typed,
658 like_graph,
659 copy_of,
660 open: _,
661 } => {
662 let storage_key = self.effective_graph_key(&name);
664
665 if let Some(ref src) = like_graph {
667 let src_key = self.effective_graph_key(src);
668 if self.store.graph(&src_key).is_none() {
669 return Err(Error::Query(QueryError::new(
670 QueryErrorKind::Semantic,
671 format!("Source graph '{src}' does not exist"),
672 )));
673 }
674 }
675 if let Some(ref src) = copy_of {
676 let src_key = self.effective_graph_key(src);
677 if self.store.graph(&src_key).is_none() {
678 return Err(Error::Query(QueryError::new(
679 QueryErrorKind::Semantic,
680 format!("Source graph '{src}' does not exist"),
681 )));
682 }
683 }
684
685 let created = self
686 .store
687 .create_graph(&storage_key)
688 .map_err(|e| Error::Internal(e.to_string()))?;
689 if !created && !if_not_exists {
690 return Err(Error::Query(QueryError::new(
691 QueryErrorKind::Semantic,
692 format!("Graph '{name}' already exists"),
693 )));
694 }
695 if created {
696 #[cfg(feature = "wal")]
697 self.log_schema_wal(
698 &grafeo_adapters::storage::wal::WalRecord::CreateNamedGraph {
699 name: storage_key.clone(),
700 },
701 );
702 }
703
704 if let Some(ref src) = copy_of {
706 let src_key = self.effective_graph_key(src);
707 self.store
708 .copy_graph(Some(&src_key), Some(&storage_key))
709 .map_err(|e| Error::Internal(e.to_string()))?;
710 }
711
712 if let Some(type_name) = typed
716 && let Err(e) = self.catalog.bind_graph_type(
717 &storage_key,
718 if type_name.contains('/') {
719 type_name.clone()
720 } else {
721 self.effective_type_key(&type_name)
722 },
723 )
724 {
725 return Err(Error::Query(QueryError::new(
726 QueryErrorKind::Semantic,
727 e.to_string(),
728 )));
729 }
730
731 if let Some(ref src) = like_graph {
733 let src_key = self.effective_graph_key(src);
734 if let Some(src_type) = self.catalog.get_graph_type_binding(&src_key) {
735 let _ = self.catalog.bind_graph_type(&storage_key, src_type);
736 }
737 }
738
739 Ok(QueryResult::empty())
740 }
741 SessionCommand::DropGraph { name, if_exists } => {
742 let storage_key = self.effective_graph_key(&name);
743 let dropped = self.store.drop_graph(&storage_key);
744 if !dropped && !if_exists {
745 return Err(Error::Query(QueryError::new(
746 QueryErrorKind::Semantic,
747 format!("Graph '{name}' does not exist"),
748 )));
749 }
750 if dropped {
751 #[cfg(feature = "wal")]
752 self.log_schema_wal(
753 &grafeo_adapters::storage::wal::WalRecord::DropNamedGraph {
754 name: storage_key.clone(),
755 },
756 );
757 let mut current = self.current_graph.lock();
759 if current
760 .as_deref()
761 .is_some_and(|g| g.eq_ignore_ascii_case(&name))
762 {
763 *current = None;
764 }
765 }
766 Ok(QueryResult::empty())
767 }
768 SessionCommand::UseGraph(name) => {
769 let effective_key = self.effective_graph_key(&name);
771 if !name.eq_ignore_ascii_case("default")
772 && self.store.graph(&effective_key).is_none()
773 {
774 return Err(Error::Query(QueryError::new(
775 QueryErrorKind::Semantic,
776 format!("Graph '{name}' does not exist"),
777 )));
778 }
779 self.use_graph(&name);
780 self.track_graph_touch();
782 Ok(QueryResult::empty())
783 }
784 SessionCommand::SessionSetGraph(name) => {
785 let effective_key = self.effective_graph_key(&name);
787 if !name.eq_ignore_ascii_case("default")
788 && self.store.graph(&effective_key).is_none()
789 {
790 return Err(Error::Query(QueryError::new(
791 QueryErrorKind::Semantic,
792 format!("Graph '{name}' does not exist"),
793 )));
794 }
795 self.use_graph(&name);
796 self.track_graph_touch();
798 Ok(QueryResult::empty())
799 }
800 SessionCommand::SessionSetSchema(name) => {
801 if !self.catalog.schema_exists(&name) {
803 return Err(Error::Query(QueryError::new(
804 QueryErrorKind::Semantic,
805 format!("Schema '{name}' does not exist"),
806 )));
807 }
808 self.set_schema(&name);
809 Ok(QueryResult::empty())
810 }
811 SessionCommand::SessionSetTimeZone(tz) => {
812 self.set_time_zone(&tz);
813 Ok(QueryResult::empty())
814 }
815 SessionCommand::SessionSetParameter(key, expr) => {
816 if key.eq_ignore_ascii_case("viewing_epoch") {
817 match Self::eval_integer_literal(&expr) {
818 Some(n) if n >= 0 => {
819 self.set_viewing_epoch(EpochId::new(n as u64));
820 Ok(QueryResult::status(format!("Set viewing_epoch to {n}")))
821 }
822 _ => Err(Error::Query(QueryError::new(
823 QueryErrorKind::Semantic,
824 "viewing_epoch must be a non-negative integer literal",
825 ))),
826 }
827 } else {
828 self.set_parameter(&key, Value::Null);
831 Ok(QueryResult::empty())
832 }
833 }
834 SessionCommand::SessionReset(target) => {
835 use grafeo_adapters::query::gql::ast::SessionResetTarget;
836 match target {
837 SessionResetTarget::All => self.reset_session(),
838 SessionResetTarget::Schema => self.reset_schema(),
839 SessionResetTarget::Graph => self.reset_graph(),
840 SessionResetTarget::TimeZone => self.reset_time_zone(),
841 SessionResetTarget::Parameters => self.reset_parameters(),
842 }
843 Ok(QueryResult::empty())
844 }
845 SessionCommand::SessionClose => {
846 self.reset_session();
847 Ok(QueryResult::empty())
848 }
849 SessionCommand::StartTransaction {
850 read_only,
851 isolation_level,
852 } => {
853 let engine_level = isolation_level.map(|l| match l {
854 TransactionIsolationLevel::ReadCommitted => {
855 crate::transaction::IsolationLevel::ReadCommitted
856 }
857 TransactionIsolationLevel::SnapshotIsolation => {
858 crate::transaction::IsolationLevel::SnapshotIsolation
859 }
860 TransactionIsolationLevel::Serializable => {
861 crate::transaction::IsolationLevel::Serializable
862 }
863 });
864 self.begin_transaction_inner(read_only, engine_level)?;
865 Ok(QueryResult::status("Transaction started"))
866 }
867 SessionCommand::Commit => {
868 self.commit_inner()?;
869 Ok(QueryResult::status("Transaction committed"))
870 }
871 SessionCommand::Rollback => {
872 self.rollback_inner()?;
873 Ok(QueryResult::status("Transaction rolled back"))
874 }
875 SessionCommand::Savepoint(name) => {
876 self.savepoint(&name)?;
877 Ok(QueryResult::status(format!("Savepoint '{name}' created")))
878 }
879 SessionCommand::RollbackToSavepoint(name) => {
880 self.rollback_to_savepoint(&name)?;
881 Ok(QueryResult::status(format!(
882 "Rolled back to savepoint '{name}'"
883 )))
884 }
885 SessionCommand::ReleaseSavepoint(name) => {
886 self.release_savepoint(&name)?;
887 Ok(QueryResult::status(format!("Savepoint '{name}' released")))
888 }
889 }
890 }
891
892 #[cfg(feature = "wal")]
894 fn log_schema_wal(&self, record: &grafeo_adapters::storage::wal::WalRecord) {
895 if let Some(ref wal) = self.wal
896 && let Err(e) = wal.log(record)
897 {
898 grafeo_warn!("Failed to log schema change to WAL: {}", e);
899 }
900 }
901
902 #[cfg(feature = "gql")]
904 fn execute_schema_command(
905 &self,
906 cmd: grafeo_adapters::query::gql::ast::SchemaStatement,
907 ) -> Result<QueryResult> {
908 use crate::catalog::{
909 EdgeTypeDefinition, NodeTypeDefinition, PropertyDataType, TypedProperty,
910 };
911 use grafeo_adapters::query::gql::ast::SchemaStatement;
912 #[cfg(feature = "wal")]
913 use grafeo_adapters::storage::wal::WalRecord;
914 use grafeo_common::utils::error::{Error, QueryError, QueryErrorKind};
915
916 macro_rules! wal_log {
918 ($self:expr, $record:expr) => {
919 #[cfg(feature = "wal")]
920 $self.log_schema_wal(&$record);
921 };
922 }
923
924 let result = match cmd {
925 SchemaStatement::CreateNodeType(stmt) => {
926 let effective_name = self.effective_type_key(&stmt.name);
927 #[cfg(feature = "wal")]
928 let props_for_wal: Vec<(String, String, bool)> = stmt
929 .properties
930 .iter()
931 .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
932 .collect();
933 let def = NodeTypeDefinition {
934 name: effective_name.clone(),
935 properties: stmt
936 .properties
937 .iter()
938 .map(|p| TypedProperty {
939 name: p.name.clone(),
940 data_type: PropertyDataType::from_type_name(&p.data_type),
941 nullable: p.nullable,
942 default_value: p
943 .default_value
944 .as_ref()
945 .map(|s| parse_default_literal(s)),
946 })
947 .collect(),
948 constraints: Vec::new(),
949 parent_types: stmt.parent_types.clone(),
950 };
951 let result = if stmt.or_replace {
952 let _ = self.catalog.drop_node_type(&effective_name);
953 self.catalog.register_node_type(def)
954 } else {
955 self.catalog.register_node_type(def)
956 };
957 match result {
958 Ok(()) => {
959 wal_log!(
960 self,
961 WalRecord::CreateNodeType {
962 name: effective_name.clone(),
963 properties: props_for_wal,
964 constraints: Vec::new(),
965 }
966 );
967 Ok(QueryResult::status(format!(
968 "Created node type '{}'",
969 stmt.name
970 )))
971 }
972 Err(e) if stmt.if_not_exists => {
973 let _ = e;
974 Ok(QueryResult::status("No change"))
975 }
976 Err(e) => Err(Error::Query(QueryError::new(
977 QueryErrorKind::Semantic,
978 e.to_string(),
979 ))),
980 }
981 }
982 SchemaStatement::CreateEdgeType(stmt) => {
983 let effective_name = self.effective_type_key(&stmt.name);
984 #[cfg(feature = "wal")]
985 let props_for_wal: Vec<(String, String, bool)> = stmt
986 .properties
987 .iter()
988 .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
989 .collect();
990 let def = EdgeTypeDefinition {
991 name: effective_name.clone(),
992 properties: stmt
993 .properties
994 .iter()
995 .map(|p| TypedProperty {
996 name: p.name.clone(),
997 data_type: PropertyDataType::from_type_name(&p.data_type),
998 nullable: p.nullable,
999 default_value: p
1000 .default_value
1001 .as_ref()
1002 .map(|s| parse_default_literal(s)),
1003 })
1004 .collect(),
1005 constraints: Vec::new(),
1006 source_node_types: stmt.source_node_types.clone(),
1007 target_node_types: stmt.target_node_types.clone(),
1008 };
1009 let result = if stmt.or_replace {
1010 let _ = self.catalog.drop_edge_type_def(&effective_name);
1011 self.catalog.register_edge_type_def(def)
1012 } else {
1013 self.catalog.register_edge_type_def(def)
1014 };
1015 match result {
1016 Ok(()) => {
1017 wal_log!(
1018 self,
1019 WalRecord::CreateEdgeType {
1020 name: effective_name.clone(),
1021 properties: props_for_wal,
1022 constraints: Vec::new(),
1023 }
1024 );
1025 Ok(QueryResult::status(format!(
1026 "Created edge type '{}'",
1027 stmt.name
1028 )))
1029 }
1030 Err(e) if stmt.if_not_exists => {
1031 let _ = e;
1032 Ok(QueryResult::status("No change"))
1033 }
1034 Err(e) => Err(Error::Query(QueryError::new(
1035 QueryErrorKind::Semantic,
1036 e.to_string(),
1037 ))),
1038 }
1039 }
1040 SchemaStatement::CreateVectorIndex(stmt) => {
1041 Self::create_vector_index_on_store(
1042 &self.active_lpg_store(),
1043 &stmt.node_label,
1044 &stmt.property,
1045 stmt.dimensions,
1046 stmt.metric.as_deref(),
1047 )?;
1048 wal_log!(
1049 self,
1050 WalRecord::CreateIndex {
1051 name: stmt.name.clone(),
1052 label: stmt.node_label.clone(),
1053 property: stmt.property.clone(),
1054 index_type: "vector".to_string(),
1055 }
1056 );
1057 Ok(QueryResult::status(format!(
1058 "Created vector index '{}'",
1059 stmt.name
1060 )))
1061 }
1062 SchemaStatement::DropNodeType { name, if_exists } => {
1063 let effective_name = self.effective_type_key(&name);
1064 match self.catalog.drop_node_type(&effective_name) {
1065 Ok(()) => {
1066 wal_log!(
1067 self,
1068 WalRecord::DropNodeType {
1069 name: effective_name
1070 }
1071 );
1072 Ok(QueryResult::status(format!("Dropped node type '{name}'")))
1073 }
1074 Err(e) if if_exists => {
1075 let _ = e;
1076 Ok(QueryResult::status("No change"))
1077 }
1078 Err(e) => Err(Error::Query(QueryError::new(
1079 QueryErrorKind::Semantic,
1080 e.to_string(),
1081 ))),
1082 }
1083 }
1084 SchemaStatement::DropEdgeType { name, if_exists } => {
1085 let effective_name = self.effective_type_key(&name);
1086 match self.catalog.drop_edge_type_def(&effective_name) {
1087 Ok(()) => {
1088 wal_log!(
1089 self,
1090 WalRecord::DropEdgeType {
1091 name: effective_name
1092 }
1093 );
1094 Ok(QueryResult::status(format!("Dropped edge type '{name}'")))
1095 }
1096 Err(e) if if_exists => {
1097 let _ = e;
1098 Ok(QueryResult::status("No change"))
1099 }
1100 Err(e) => Err(Error::Query(QueryError::new(
1101 QueryErrorKind::Semantic,
1102 e.to_string(),
1103 ))),
1104 }
1105 }
1106 SchemaStatement::CreateIndex(stmt) => {
1107 use crate::catalog::IndexType as CatalogIndexType;
1108 use grafeo_adapters::query::gql::ast::IndexKind;
1109 let active = self.active_lpg_store();
1110 let index_type_str = match stmt.index_kind {
1111 IndexKind::Property => "property",
1112 IndexKind::BTree => "btree",
1113 IndexKind::Text => "text",
1114 IndexKind::Vector => "vector",
1115 };
1116 match stmt.index_kind {
1117 IndexKind::Property | IndexKind::BTree => {
1118 for prop in &stmt.properties {
1119 active.create_property_index(prop);
1120 }
1121 }
1122 IndexKind::Text => {
1123 for prop in &stmt.properties {
1124 Self::create_text_index_on_store(&active, &stmt.label, prop)?;
1125 }
1126 }
1127 IndexKind::Vector => {
1128 for prop in &stmt.properties {
1129 Self::create_vector_index_on_store(
1130 &active,
1131 &stmt.label,
1132 prop,
1133 stmt.options.dimensions,
1134 stmt.options.metric.as_deref(),
1135 )?;
1136 }
1137 }
1138 }
1139 let catalog_index_type = match stmt.index_kind {
1142 IndexKind::Property => CatalogIndexType::Hash,
1143 IndexKind::BTree => CatalogIndexType::BTree,
1144 IndexKind::Text => CatalogIndexType::FullText,
1145 IndexKind::Vector => CatalogIndexType::Hash,
1146 };
1147 let label_id = self.catalog.get_or_create_label(&stmt.label);
1148 for prop in &stmt.properties {
1149 let prop_id = self.catalog.get_or_create_property_key(prop);
1150 self.catalog
1151 .create_index(&stmt.name, label_id, prop_id, catalog_index_type);
1152 }
1153 #[cfg(feature = "wal")]
1154 for prop in &stmt.properties {
1155 wal_log!(
1156 self,
1157 WalRecord::CreateIndex {
1158 name: stmt.name.clone(),
1159 label: stmt.label.clone(),
1160 property: prop.clone(),
1161 index_type: index_type_str.to_string(),
1162 }
1163 );
1164 }
1165 Ok(QueryResult::status(format!(
1166 "Created {} index '{}'",
1167 index_type_str, stmt.name
1168 )))
1169 }
1170 SchemaStatement::DropIndex { name, if_exists } => {
1171 if let Some(index_id) = self.catalog.find_index_by_name(&name) {
1174 let def = self.catalog.get_index(index_id);
1175 self.catalog.drop_index(index_id);
1176 if let Some(def) = def
1177 && let Some(prop_name) =
1178 self.catalog.get_property_key_name(def.property_key)
1179 {
1180 self.active_lpg_store().drop_property_index(&prop_name);
1181 }
1182 wal_log!(self, WalRecord::DropIndex { name: name.clone() });
1183 Ok(QueryResult::status(format!("Dropped index '{name}'")))
1184 } else if if_exists {
1185 Ok(QueryResult::status("No change".to_string()))
1186 } else {
1187 Err(Error::Query(QueryError::new(
1188 QueryErrorKind::Semantic,
1189 format!("Index '{name}' does not exist"),
1190 )))
1191 }
1192 }
1193 SchemaStatement::CreateConstraint(stmt) => {
1194 use crate::catalog::TypeConstraint;
1195 use grafeo_adapters::query::gql::ast::ConstraintKind;
1196 let kind_str = match stmt.constraint_kind {
1197 ConstraintKind::Unique => "unique",
1198 ConstraintKind::NodeKey => "node_key",
1199 ConstraintKind::NotNull => "not_null",
1200 ConstraintKind::Exists => "exists",
1201 };
1202 let constraint_name = stmt
1203 .name
1204 .clone()
1205 .unwrap_or_else(|| format!("{}_{kind_str}", stmt.label));
1206
1207 match stmt.constraint_kind {
1209 ConstraintKind::Unique => {
1210 for prop in &stmt.properties {
1211 let label_id = self.catalog.get_or_create_label(&stmt.label);
1212 let prop_id = self.catalog.get_or_create_property_key(prop);
1213 let _ = self.catalog.add_unique_constraint(label_id, prop_id);
1214 }
1215 let _ = self.catalog.add_constraint_to_type(
1216 &stmt.label,
1217 TypeConstraint::Unique(stmt.properties.clone()),
1218 );
1219 }
1220 ConstraintKind::NodeKey => {
1221 for prop in &stmt.properties {
1222 let label_id = self.catalog.get_or_create_label(&stmt.label);
1223 let prop_id = self.catalog.get_or_create_property_key(prop);
1224 let _ = self.catalog.add_unique_constraint(label_id, prop_id);
1225 let _ = self.catalog.add_required_property(label_id, prop_id);
1226 }
1227 let _ = self.catalog.add_constraint_to_type(
1228 &stmt.label,
1229 TypeConstraint::PrimaryKey(stmt.properties.clone()),
1230 );
1231 }
1232 ConstraintKind::NotNull | ConstraintKind::Exists => {
1233 for prop in &stmt.properties {
1234 let label_id = self.catalog.get_or_create_label(&stmt.label);
1235 let prop_id = self.catalog.get_or_create_property_key(prop);
1236 let _ = self.catalog.add_required_property(label_id, prop_id);
1237 let _ = self.catalog.add_constraint_to_type(
1238 &stmt.label,
1239 TypeConstraint::NotNull(prop.clone()),
1240 );
1241 }
1242 }
1243 }
1244
1245 wal_log!(
1246 self,
1247 WalRecord::CreateConstraint {
1248 name: constraint_name.clone(),
1249 label: stmt.label.clone(),
1250 properties: stmt.properties.clone(),
1251 kind: kind_str.to_string(),
1252 }
1253 );
1254 Ok(QueryResult::status(format!(
1255 "Created {kind_str} constraint '{constraint_name}'"
1256 )))
1257 }
1258 SchemaStatement::DropConstraint { name, if_exists } => {
1259 let _ = if_exists;
1260 wal_log!(self, WalRecord::DropConstraint { name: name.clone() });
1261 Ok(QueryResult::status(format!("Dropped constraint '{name}'")))
1262 }
1263 SchemaStatement::CreateGraphType(stmt) => {
1264 use crate::catalog::GraphTypeDefinition;
1265 use grafeo_adapters::query::gql::ast::InlineElementType;
1266
1267 let effective_name = self.effective_type_key(&stmt.name);
1268
1269 let (mut node_types, mut edge_types, open) =
1271 if let Some(ref like_graph) = stmt.like_graph {
1272 if let Some(type_name) = self.catalog.get_graph_type_binding(like_graph) {
1274 if let Some(existing) = self
1275 .catalog
1276 .schema()
1277 .and_then(|s| s.get_graph_type(&type_name))
1278 {
1279 (
1280 existing.allowed_node_types.clone(),
1281 existing.allowed_edge_types.clone(),
1282 existing.open,
1283 )
1284 } else {
1285 (Vec::new(), Vec::new(), true)
1286 }
1287 } else {
1288 let nt = self.catalog.all_node_type_names();
1290 let et = self.catalog.all_edge_type_names();
1291 if nt.is_empty() && et.is_empty() {
1292 (Vec::new(), Vec::new(), true)
1293 } else {
1294 (nt, et, false)
1295 }
1296 }
1297 } else {
1298 let nt = stmt
1300 .node_types
1301 .iter()
1302 .map(|n| self.effective_type_key(n))
1303 .collect();
1304 let et = stmt
1305 .edge_types
1306 .iter()
1307 .map(|n| self.effective_type_key(n))
1308 .collect();
1309 (nt, et, stmt.open)
1310 };
1311
1312 for inline in &stmt.inline_types {
1314 match inline {
1315 InlineElementType::Node {
1316 name,
1317 properties,
1318 key_labels,
1319 ..
1320 } => {
1321 let inline_effective = self.effective_type_key(name);
1322 let def = NodeTypeDefinition {
1323 name: inline_effective.clone(),
1324 properties: properties
1325 .iter()
1326 .map(|p| TypedProperty {
1327 name: p.name.clone(),
1328 data_type: PropertyDataType::from_type_name(&p.data_type),
1329 nullable: p.nullable,
1330 default_value: None,
1331 })
1332 .collect(),
1333 constraints: Vec::new(),
1334 parent_types: key_labels.clone(),
1335 };
1336 self.catalog.register_or_replace_node_type(def);
1338 #[cfg(feature = "wal")]
1339 {
1340 let props_for_wal: Vec<(String, String, bool)> = properties
1341 .iter()
1342 .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
1343 .collect();
1344 self.log_schema_wal(&WalRecord::CreateNodeType {
1345 name: inline_effective.clone(),
1346 properties: props_for_wal,
1347 constraints: Vec::new(),
1348 });
1349 }
1350 if !node_types.contains(&inline_effective) {
1351 node_types.push(inline_effective);
1352 }
1353 }
1354 InlineElementType::Edge {
1355 name,
1356 properties,
1357 source_node_types,
1358 target_node_types,
1359 ..
1360 } => {
1361 let inline_effective = self.effective_type_key(name);
1362 let def = EdgeTypeDefinition {
1363 name: inline_effective.clone(),
1364 properties: properties
1365 .iter()
1366 .map(|p| TypedProperty {
1367 name: p.name.clone(),
1368 data_type: PropertyDataType::from_type_name(&p.data_type),
1369 nullable: p.nullable,
1370 default_value: None,
1371 })
1372 .collect(),
1373 constraints: Vec::new(),
1374 source_node_types: source_node_types.clone(),
1375 target_node_types: target_node_types.clone(),
1376 };
1377 self.catalog.register_or_replace_edge_type_def(def);
1378 #[cfg(feature = "wal")]
1379 {
1380 let props_for_wal: Vec<(String, String, bool)> = properties
1381 .iter()
1382 .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
1383 .collect();
1384 self.log_schema_wal(&WalRecord::CreateEdgeType {
1385 name: inline_effective.clone(),
1386 properties: props_for_wal,
1387 constraints: Vec::new(),
1388 });
1389 }
1390 if !edge_types.contains(&inline_effective) {
1391 edge_types.push(inline_effective);
1392 }
1393 }
1394 }
1395 }
1396
1397 let def = GraphTypeDefinition {
1398 name: effective_name.clone(),
1399 allowed_node_types: node_types.clone(),
1400 allowed_edge_types: edge_types.clone(),
1401 open,
1402 };
1403 let result = if stmt.or_replace {
1404 let _ = self.catalog.drop_graph_type(&effective_name);
1406 self.catalog.register_graph_type(def)
1407 } else {
1408 self.catalog.register_graph_type(def)
1409 };
1410 match result {
1411 Ok(()) => {
1412 wal_log!(
1413 self,
1414 WalRecord::CreateGraphType {
1415 name: effective_name.clone(),
1416 node_types,
1417 edge_types,
1418 open,
1419 }
1420 );
1421 Ok(QueryResult::status(format!(
1422 "Created graph type '{}'",
1423 stmt.name
1424 )))
1425 }
1426 Err(e) if stmt.if_not_exists => {
1427 let _ = e;
1428 Ok(QueryResult::status("No change"))
1429 }
1430 Err(e) => Err(Error::Query(QueryError::new(
1431 QueryErrorKind::Semantic,
1432 e.to_string(),
1433 ))),
1434 }
1435 }
1436 SchemaStatement::DropGraphType { name, if_exists } => {
1437 let effective_name = self.effective_type_key(&name);
1438 match self.catalog.drop_graph_type(&effective_name) {
1439 Ok(()) => {
1440 wal_log!(
1441 self,
1442 WalRecord::DropGraphType {
1443 name: effective_name
1444 }
1445 );
1446 Ok(QueryResult::status(format!("Dropped graph type '{name}'")))
1447 }
1448 Err(e) if if_exists => {
1449 let _ = e;
1450 Ok(QueryResult::status("No change"))
1451 }
1452 Err(e) => Err(Error::Query(QueryError::new(
1453 QueryErrorKind::Semantic,
1454 e.to_string(),
1455 ))),
1456 }
1457 }
1458 SchemaStatement::CreateSchema {
1459 name,
1460 if_not_exists,
1461 } => match self.catalog.register_schema_namespace(name.clone()) {
1462 Ok(()) => {
1463 wal_log!(self, WalRecord::CreateSchema { name: name.clone() });
1464 Ok(QueryResult::status(format!("Created schema '{name}'")))
1465 }
1466 Err(e) if if_not_exists => {
1467 let _ = e;
1468 Ok(QueryResult::status("No change"))
1469 }
1470 Err(e) => Err(Error::Query(QueryError::new(
1471 QueryErrorKind::Semantic,
1472 e.to_string(),
1473 ))),
1474 },
1475 SchemaStatement::DropSchema { name, if_exists } => {
1476 let prefix = format!("{name}/");
1478 let has_graphs = self
1479 .store
1480 .graph_names()
1481 .iter()
1482 .any(|g| g.starts_with(&prefix));
1483 let has_types = self
1484 .catalog
1485 .all_node_type_names()
1486 .iter()
1487 .any(|n| n.starts_with(&prefix))
1488 || self
1489 .catalog
1490 .all_edge_type_names()
1491 .iter()
1492 .any(|n| n.starts_with(&prefix))
1493 || self
1494 .catalog
1495 .all_graph_type_names()
1496 .iter()
1497 .any(|n| n.starts_with(&prefix));
1498 if has_graphs || has_types {
1499 return Err(Error::Query(QueryError::new(
1500 QueryErrorKind::Semantic,
1501 format!("Schema '{name}' is not empty: drop all graphs and types first"),
1502 )));
1503 }
1504 match self.catalog.drop_schema_namespace(&name) {
1505 Ok(()) => {
1506 wal_log!(self, WalRecord::DropSchema { name: name.clone() });
1507 let mut current = self.current_schema.lock();
1509 if current
1510 .as_deref()
1511 .is_some_and(|s| s.eq_ignore_ascii_case(&name))
1512 {
1513 *current = None;
1514 }
1515 Ok(QueryResult::status(format!("Dropped schema '{name}'")))
1516 }
1517 Err(e) if if_exists => {
1518 let _ = e;
1519 Ok(QueryResult::status("No change"))
1520 }
1521 Err(e) => Err(Error::Query(QueryError::new(
1522 QueryErrorKind::Semantic,
1523 e.to_string(),
1524 ))),
1525 }
1526 }
1527 SchemaStatement::AlterNodeType(stmt) => {
1528 use grafeo_adapters::query::gql::ast::TypeAlteration;
1529 let effective_name = self.effective_type_key(&stmt.name);
1530 let mut wal_alts = Vec::new();
1531 for alt in &stmt.alterations {
1532 match alt {
1533 TypeAlteration::AddProperty(prop) => {
1534 let typed = TypedProperty {
1535 name: prop.name.clone(),
1536 data_type: PropertyDataType::from_type_name(&prop.data_type),
1537 nullable: prop.nullable,
1538 default_value: prop
1539 .default_value
1540 .as_ref()
1541 .map(|s| parse_default_literal(s)),
1542 };
1543 self.catalog
1544 .alter_node_type_add_property(&effective_name, typed)
1545 .map_err(|e| {
1546 Error::Query(QueryError::new(
1547 QueryErrorKind::Semantic,
1548 e.to_string(),
1549 ))
1550 })?;
1551 wal_alts.push((
1552 "add".to_string(),
1553 prop.name.clone(),
1554 prop.data_type.clone(),
1555 prop.nullable,
1556 ));
1557 }
1558 TypeAlteration::DropProperty(name) => {
1559 self.catalog
1560 .alter_node_type_drop_property(&effective_name, name)
1561 .map_err(|e| {
1562 Error::Query(QueryError::new(
1563 QueryErrorKind::Semantic,
1564 e.to_string(),
1565 ))
1566 })?;
1567 wal_alts.push(("drop".to_string(), name.clone(), String::new(), false));
1568 }
1569 }
1570 }
1571 wal_log!(
1572 self,
1573 WalRecord::AlterNodeType {
1574 name: effective_name,
1575 alterations: wal_alts,
1576 }
1577 );
1578 Ok(QueryResult::status(format!(
1579 "Altered node type '{}'",
1580 stmt.name
1581 )))
1582 }
1583 SchemaStatement::AlterEdgeType(stmt) => {
1584 use grafeo_adapters::query::gql::ast::TypeAlteration;
1585 let effective_name = self.effective_type_key(&stmt.name);
1586 let mut wal_alts = Vec::new();
1587 for alt in &stmt.alterations {
1588 match alt {
1589 TypeAlteration::AddProperty(prop) => {
1590 let typed = TypedProperty {
1591 name: prop.name.clone(),
1592 data_type: PropertyDataType::from_type_name(&prop.data_type),
1593 nullable: prop.nullable,
1594 default_value: prop
1595 .default_value
1596 .as_ref()
1597 .map(|s| parse_default_literal(s)),
1598 };
1599 self.catalog
1600 .alter_edge_type_add_property(&effective_name, typed)
1601 .map_err(|e| {
1602 Error::Query(QueryError::new(
1603 QueryErrorKind::Semantic,
1604 e.to_string(),
1605 ))
1606 })?;
1607 wal_alts.push((
1608 "add".to_string(),
1609 prop.name.clone(),
1610 prop.data_type.clone(),
1611 prop.nullable,
1612 ));
1613 }
1614 TypeAlteration::DropProperty(name) => {
1615 self.catalog
1616 .alter_edge_type_drop_property(&effective_name, name)
1617 .map_err(|e| {
1618 Error::Query(QueryError::new(
1619 QueryErrorKind::Semantic,
1620 e.to_string(),
1621 ))
1622 })?;
1623 wal_alts.push(("drop".to_string(), name.clone(), String::new(), false));
1624 }
1625 }
1626 }
1627 wal_log!(
1628 self,
1629 WalRecord::AlterEdgeType {
1630 name: effective_name,
1631 alterations: wal_alts,
1632 }
1633 );
1634 Ok(QueryResult::status(format!(
1635 "Altered edge type '{}'",
1636 stmt.name
1637 )))
1638 }
1639 SchemaStatement::AlterGraphType(stmt) => {
1640 use grafeo_adapters::query::gql::ast::GraphTypeAlteration;
1641 let effective_name = self.effective_type_key(&stmt.name);
1642 let mut wal_alts = Vec::new();
1643 for alt in &stmt.alterations {
1644 match alt {
1645 GraphTypeAlteration::AddNodeType(name) => {
1646 self.catalog
1647 .alter_graph_type_add_node_type(&effective_name, name.clone())
1648 .map_err(|e| {
1649 Error::Query(QueryError::new(
1650 QueryErrorKind::Semantic,
1651 e.to_string(),
1652 ))
1653 })?;
1654 wal_alts.push(("add_node_type".to_string(), name.clone()));
1655 }
1656 GraphTypeAlteration::DropNodeType(name) => {
1657 self.catalog
1658 .alter_graph_type_drop_node_type(&effective_name, name)
1659 .map_err(|e| {
1660 Error::Query(QueryError::new(
1661 QueryErrorKind::Semantic,
1662 e.to_string(),
1663 ))
1664 })?;
1665 wal_alts.push(("drop_node_type".to_string(), name.clone()));
1666 }
1667 GraphTypeAlteration::AddEdgeType(name) => {
1668 self.catalog
1669 .alter_graph_type_add_edge_type(&effective_name, name.clone())
1670 .map_err(|e| {
1671 Error::Query(QueryError::new(
1672 QueryErrorKind::Semantic,
1673 e.to_string(),
1674 ))
1675 })?;
1676 wal_alts.push(("add_edge_type".to_string(), name.clone()));
1677 }
1678 GraphTypeAlteration::DropEdgeType(name) => {
1679 self.catalog
1680 .alter_graph_type_drop_edge_type(&effective_name, name)
1681 .map_err(|e| {
1682 Error::Query(QueryError::new(
1683 QueryErrorKind::Semantic,
1684 e.to_string(),
1685 ))
1686 })?;
1687 wal_alts.push(("drop_edge_type".to_string(), name.clone()));
1688 }
1689 }
1690 }
1691 wal_log!(
1692 self,
1693 WalRecord::AlterGraphType {
1694 name: effective_name,
1695 alterations: wal_alts,
1696 }
1697 );
1698 Ok(QueryResult::status(format!(
1699 "Altered graph type '{}'",
1700 stmt.name
1701 )))
1702 }
1703 SchemaStatement::CreateProcedure(stmt) => {
1704 use crate::catalog::ProcedureDefinition;
1705
1706 let def = ProcedureDefinition {
1707 name: stmt.name.clone(),
1708 params: stmt
1709 .params
1710 .iter()
1711 .map(|p| (p.name.clone(), p.param_type.clone()))
1712 .collect(),
1713 returns: stmt
1714 .returns
1715 .iter()
1716 .map(|r| (r.name.clone(), r.return_type.clone()))
1717 .collect(),
1718 body: stmt.body.clone(),
1719 };
1720
1721 if stmt.or_replace {
1722 self.catalog.replace_procedure(def).map_err(|e| {
1723 Error::Query(QueryError::new(QueryErrorKind::Semantic, e.to_string()))
1724 })?;
1725 } else {
1726 match self.catalog.register_procedure(def) {
1727 Ok(()) => {}
1728 Err(_) if stmt.if_not_exists => {
1729 return Ok(QueryResult::empty());
1730 }
1731 Err(e) => {
1732 return Err(Error::Query(QueryError::new(
1733 QueryErrorKind::Semantic,
1734 e.to_string(),
1735 )));
1736 }
1737 }
1738 }
1739
1740 wal_log!(
1741 self,
1742 WalRecord::CreateProcedure {
1743 name: stmt.name.clone(),
1744 params: stmt
1745 .params
1746 .iter()
1747 .map(|p| (p.name.clone(), p.param_type.clone()))
1748 .collect(),
1749 returns: stmt
1750 .returns
1751 .iter()
1752 .map(|r| (r.name.clone(), r.return_type.clone()))
1753 .collect(),
1754 body: stmt.body,
1755 }
1756 );
1757 Ok(QueryResult::status(format!(
1758 "Created procedure '{}'",
1759 stmt.name
1760 )))
1761 }
1762 SchemaStatement::DropProcedure { name, if_exists } => {
1763 match self.catalog.drop_procedure(&name) {
1764 Ok(()) => {}
1765 Err(_) if if_exists => {
1766 return Ok(QueryResult::empty());
1767 }
1768 Err(e) => {
1769 return Err(Error::Query(QueryError::new(
1770 QueryErrorKind::Semantic,
1771 e.to_string(),
1772 )));
1773 }
1774 }
1775 wal_log!(self, WalRecord::DropProcedure { name: name.clone() });
1776 Ok(QueryResult::status(format!("Dropped procedure '{name}'")))
1777 }
1778 SchemaStatement::ShowIndexes => {
1779 return self.execute_show_indexes();
1780 }
1781 SchemaStatement::ShowConstraints => {
1782 return self.execute_show_constraints();
1783 }
1784 SchemaStatement::ShowNodeTypes => {
1785 return self.execute_show_node_types();
1786 }
1787 SchemaStatement::ShowEdgeTypes => {
1788 return self.execute_show_edge_types();
1789 }
1790 SchemaStatement::ShowGraphTypes => {
1791 return self.execute_show_graph_types();
1792 }
1793 SchemaStatement::ShowGraphType(name) => {
1794 return self.execute_show_graph_type(&name);
1795 }
1796 SchemaStatement::ShowCurrentGraphType => {
1797 return self.execute_show_current_graph_type();
1798 }
1799 SchemaStatement::ShowGraphs => {
1800 return self.execute_show_graphs();
1801 }
1802 SchemaStatement::ShowSchemas => {
1803 return self.execute_show_schemas();
1804 }
1805 };
1806
1807 if result.is_ok() {
1810 self.query_cache.clear();
1811 }
1812
1813 result
1814 }
1815
#[cfg(all(feature = "gql", feature = "vector-index"))]
/// Builds an HNSW vector index over `property` for the nodes carrying `label`
/// and registers it on `store`.
///
/// `metric` falls back to cosine distance when it is `None`. When `dimensions`
/// is `None`, the dimensionality is taken from the first vector encountered;
/// every vector must then have that length.
///
/// # Errors
///
/// Returns `Error::Internal` when the metric name is not recognised, when a
/// vector's length differs from the expected dimensionality, or when no
/// dimensionality could be determined (none given and no vectors found).
fn create_vector_index_on_store(
    store: &LpgStore,
    label: &str,
    property: &str,
    dimensions: Option<usize>,
    metric: Option<&str>,
) -> Result<()> {
    use grafeo_common::types::{PropertyKey, Value};
    use grafeo_common::utils::error::Error;
    use grafeo_core::index::vector::{
        DistanceMetric, HnswConfig, HnswIndex, PropertyVectorAccessor,
    };

    // Resolve the distance metric up front so a bad name fails before any scan.
    let distance = if let Some(m) = metric {
        let Some(parsed) = DistanceMetric::from_str(m) else {
            return Err(Error::Internal(format!(
                "Unknown distance metric '{m}'. Use: cosine, euclidean, dot_product, manhattan"
            )));
        };
        parsed
    } else {
        DistanceMetric::Cosine
    };

    let key = PropertyKey::new(property);
    let mut expected_dims: Option<usize> = dimensions;
    let mut collected: Vec<(grafeo_common::types::NodeId, Vec<f32>)> = Vec::new();

    for node in store.nodes_with_label(label) {
        // Nodes without a vector value under this property are ignored.
        let Some(Value::Vector(v)) = node.properties.get(&key) else {
            continue;
        };
        match expected_dims {
            Some(expected) if v.len() != expected => {
                return Err(Error::Internal(format!(
                    "Vector dimension mismatch: expected {expected}, found {} on node {}",
                    v.len(),
                    node.id.0
                )));
            }
            Some(_) => {}
            // First vector seen fixes the dimensionality for the rest.
            None => expected_dims = Some(v.len()),
        }
        collected.push((node.id, v.to_vec()));
    }

    let dims = match expected_dims {
        Some(d) => d,
        None => {
            return Err(Error::Internal(format!(
                "No vector properties found on :{label}({property}) and no dimensions specified"
            )));
        }
    };

    let index = HnswIndex::with_capacity(HnswConfig::new(dims, distance), collected.len());
    let accessor = PropertyVectorAccessor::new(store, property);
    for (node_id, vector) in &collected {
        index.insert(*node_id, vector, &accessor);
    }

    store.add_vector_index(label, property, Arc::new(index));
    Ok(())
}
1875
#[cfg(all(feature = "gql", not(feature = "vector-index")))]
/// Fallback used when the crate is built without the `vector-index` feature.
///
/// # Errors
///
/// Always returns `Error::Internal` explaining that the feature is required.
fn create_vector_index_on_store(
    _store: &LpgStore,
    _label: &str,
    _property: &str,
    _dimensions: Option<usize>,
    _metric: Option<&str>,
) -> Result<()> {
    use grafeo_common::utils::error::Error;

    let reason = String::from("Vector index support requires the 'vector-index' feature");
    Err(Error::Internal(reason))
}
1889
#[cfg(all(feature = "gql", feature = "text-index"))]
/// Builds an inverted text index (default BM25 configuration) over `property`
/// for the nodes carrying `label` and registers it on `store`.
///
/// Only nodes whose property value is a string are indexed; all others are
/// skipped.
fn create_text_index_on_store(store: &LpgStore, label: &str, property: &str) -> Result<()> {
    use grafeo_common::types::{PropertyKey, Value};
    use grafeo_core::index::text::{BM25Config, InvertedIndex};

    let key = PropertyKey::new(property);
    let mut inverted = InvertedIndex::new(BM25Config::default());

    for id in store.nodes_by_label(label) {
        let Some(Value::String(body)) = store.get_node_property(id, &key) else {
            continue;
        };
        inverted.insert(id, body.as_str());
    }

    let shared = Arc::new(parking_lot::RwLock::new(inverted));
    store.add_text_index(label, property, shared);
    Ok(())
}
1909
#[cfg(all(feature = "gql", not(feature = "text-index")))]
/// Fallback used when the crate is built without the `text-index` feature.
///
/// # Errors
///
/// Always returns `Error::Internal` explaining that the feature is required.
fn create_text_index_on_store(_store: &LpgStore, _label: &str, _property: &str) -> Result<()> {
    use grafeo_common::utils::error::Error;

    let reason = String::from("Text index support requires the 'text-index' feature");
    Err(Error::Internal(reason))
}
1917
1918 fn execute_show_indexes(&self) -> Result<QueryResult> {
1920 let indexes = self.catalog.all_indexes();
1921 let columns = vec![
1922 "name".to_string(),
1923 "type".to_string(),
1924 "label".to_string(),
1925 "property".to_string(),
1926 ];
1927 let rows: Vec<Vec<Value>> = indexes
1928 .into_iter()
1929 .map(|def| {
1930 let label_name = self
1931 .catalog
1932 .get_label_name(def.label)
1933 .unwrap_or_else(|| "?".into());
1934 let prop_name = self
1935 .catalog
1936 .get_property_key_name(def.property_key)
1937 .unwrap_or_else(|| "?".into());
1938 vec![
1939 Value::from(def.name),
1940 Value::from(format!("{:?}", def.index_type)),
1941 Value::from(&*label_name),
1942 Value::from(&*prop_name),
1943 ]
1944 })
1945 .collect();
1946 Ok(QueryResult {
1947 columns,
1948 column_types: Vec::new(),
1949 rows,
1950 ..QueryResult::empty()
1951 })
1952 }
1953
1954 fn execute_show_constraints(&self) -> Result<QueryResult> {
1956 Ok(QueryResult {
1959 columns: vec![
1960 "name".to_string(),
1961 "type".to_string(),
1962 "label".to_string(),
1963 "properties".to_string(),
1964 ],
1965 column_types: Vec::new(),
1966 rows: Vec::new(),
1967 ..QueryResult::empty()
1968 })
1969 }
1970
1971 fn execute_show_node_types(&self) -> Result<QueryResult> {
1973 let columns = vec![
1974 "name".to_string(),
1975 "properties".to_string(),
1976 "constraints".to_string(),
1977 "parents".to_string(),
1978 ];
1979 let schema = self.current_schema.lock().clone();
1980 let all_names = self.catalog.all_node_type_names();
1981 let type_names: Vec<String> = match &schema {
1982 Some(s) => {
1983 let prefix = format!("{s}/");
1984 all_names
1985 .into_iter()
1986 .filter_map(|n| n.strip_prefix(&prefix).map(String::from))
1987 .collect()
1988 }
1989 None => all_names.into_iter().filter(|n| !n.contains('/')).collect(),
1990 };
1991 let rows: Vec<Vec<Value>> = type_names
1992 .into_iter()
1993 .filter_map(|name| {
1994 let lookup = match &schema {
1995 Some(s) => format!("{s}/{name}"),
1996 None => name.clone(),
1997 };
1998 let def = self.catalog.get_node_type(&lookup)?;
1999 let props: Vec<String> = def
2000 .properties
2001 .iter()
2002 .map(|p| {
2003 let nullable = if p.nullable { "" } else { " NOT NULL" };
2004 format!("{} {}{}", p.name, p.data_type, nullable)
2005 })
2006 .collect();
2007 let constraints: Vec<String> =
2008 def.constraints.iter().map(|c| format!("{c:?}")).collect();
2009 let parents = def.parent_types.join(", ");
2010 Some(vec![
2011 Value::from(name),
2012 Value::from(props.join(", ")),
2013 Value::from(constraints.join(", ")),
2014 Value::from(parents),
2015 ])
2016 })
2017 .collect();
2018 Ok(QueryResult {
2019 columns,
2020 column_types: Vec::new(),
2021 rows,
2022 ..QueryResult::empty()
2023 })
2024 }
2025
2026 fn execute_show_edge_types(&self) -> Result<QueryResult> {
2028 let columns = vec![
2029 "name".to_string(),
2030 "properties".to_string(),
2031 "source_types".to_string(),
2032 "target_types".to_string(),
2033 ];
2034 let schema = self.current_schema.lock().clone();
2035 let all_names = self.catalog.all_edge_type_names();
2036 let type_names: Vec<String> = match &schema {
2037 Some(s) => {
2038 let prefix = format!("{s}/");
2039 all_names
2040 .into_iter()
2041 .filter_map(|n| n.strip_prefix(&prefix).map(String::from))
2042 .collect()
2043 }
2044 None => all_names.into_iter().filter(|n| !n.contains('/')).collect(),
2045 };
2046 let rows: Vec<Vec<Value>> = type_names
2047 .into_iter()
2048 .filter_map(|name| {
2049 let lookup = match &schema {
2050 Some(s) => format!("{s}/{name}"),
2051 None => name.clone(),
2052 };
2053 let def = self.catalog.get_edge_type_def(&lookup)?;
2054 let props: Vec<String> = def
2055 .properties
2056 .iter()
2057 .map(|p| {
2058 let nullable = if p.nullable { "" } else { " NOT NULL" };
2059 format!("{} {}{}", p.name, p.data_type, nullable)
2060 })
2061 .collect();
2062 let src = def.source_node_types.join(", ");
2063 let tgt = def.target_node_types.join(", ");
2064 Some(vec![
2065 Value::from(name),
2066 Value::from(props.join(", ")),
2067 Value::from(src),
2068 Value::from(tgt),
2069 ])
2070 })
2071 .collect();
2072 Ok(QueryResult {
2073 columns,
2074 column_types: Vec::new(),
2075 rows,
2076 ..QueryResult::empty()
2077 })
2078 }
2079
2080 fn execute_show_graph_types(&self) -> Result<QueryResult> {
2082 let columns = vec![
2083 "name".to_string(),
2084 "open".to_string(),
2085 "node_types".to_string(),
2086 "edge_types".to_string(),
2087 ];
2088 let schema = self.current_schema.lock().clone();
2089 let all_names = self.catalog.all_graph_type_names();
2090 let type_names: Vec<String> = match &schema {
2091 Some(s) => {
2092 let prefix = format!("{s}/");
2093 all_names
2094 .into_iter()
2095 .filter_map(|n| n.strip_prefix(&prefix).map(String::from))
2096 .collect()
2097 }
2098 None => all_names.into_iter().filter(|n| !n.contains('/')).collect(),
2099 };
2100 let rows: Vec<Vec<Value>> = type_names
2101 .into_iter()
2102 .filter_map(|name| {
2103 let lookup = match &schema {
2104 Some(s) => format!("{s}/{name}"),
2105 None => name.clone(),
2106 };
2107 let def = self.catalog.get_graph_type_def(&lookup)?;
2108 let strip = |n: &String| -> String {
2110 match &schema {
2111 Some(s) => n.strip_prefix(&format!("{s}/")).unwrap_or(n).to_string(),
2112 None => n.clone(),
2113 }
2114 };
2115 let node_types: Vec<String> = def.allowed_node_types.iter().map(strip).collect();
2116 let edge_types: Vec<String> = def.allowed_edge_types.iter().map(strip).collect();
2117 Some(vec![
2118 Value::from(name),
2119 Value::from(def.open),
2120 Value::from(node_types.join(", ")),
2121 Value::from(edge_types.join(", ")),
2122 ])
2123 })
2124 .collect();
2125 Ok(QueryResult {
2126 columns,
2127 column_types: Vec::new(),
2128 rows,
2129 ..QueryResult::empty()
2130 })
2131 }
2132
2133 fn execute_show_graphs(&self) -> Result<QueryResult> {
2139 let schema = self.current_schema.lock().clone();
2140 let all_names = self.store.graph_names();
2141
2142 let mut names: Vec<String> = match &schema {
2143 Some(s) => {
2144 let prefix = format!("{s}/");
2145 all_names
2146 .into_iter()
2147 .filter_map(|n| n.strip_prefix(&prefix).map(String::from))
2148 .collect()
2149 }
2150 None => all_names.into_iter().filter(|n| !n.contains('/')).collect(),
2151 };
2152 names.sort();
2153
2154 let rows: Vec<Vec<Value>> = names.into_iter().map(|n| vec![Value::from(n)]).collect();
2155 Ok(QueryResult {
2156 columns: vec!["name".to_string()],
2157 column_types: Vec::new(),
2158 rows,
2159 ..QueryResult::empty()
2160 })
2161 }
2162
2163 fn execute_show_schemas(&self) -> Result<QueryResult> {
2165 let mut names = self.catalog.schema_names();
2166 names.sort();
2167 let rows: Vec<Vec<Value>> = names.into_iter().map(|n| vec![Value::from(n)]).collect();
2168 Ok(QueryResult {
2169 columns: vec!["name".to_string()],
2170 column_types: Vec::new(),
2171 rows,
2172 ..QueryResult::empty()
2173 })
2174 }
2175
2176 fn execute_show_graph_type(&self, name: &str) -> Result<QueryResult> {
2178 use grafeo_common::utils::error::{Error, QueryError, QueryErrorKind};
2179
2180 let def = self.catalog.get_graph_type_def(name).ok_or_else(|| {
2181 Error::Query(QueryError::new(
2182 QueryErrorKind::Semantic,
2183 format!("Graph type '{name}' not found"),
2184 ))
2185 })?;
2186
2187 let columns = vec![
2188 "name".to_string(),
2189 "open".to_string(),
2190 "node_types".to_string(),
2191 "edge_types".to_string(),
2192 ];
2193 let rows = vec![vec![
2194 Value::from(def.name),
2195 Value::from(def.open),
2196 Value::from(def.allowed_node_types.join(", ")),
2197 Value::from(def.allowed_edge_types.join(", ")),
2198 ]];
2199 Ok(QueryResult {
2200 columns,
2201 column_types: Vec::new(),
2202 rows,
2203 ..QueryResult::empty()
2204 })
2205 }
2206
2207 fn execute_show_current_graph_type(&self) -> Result<QueryResult> {
2209 let graph_name = self
2210 .current_graph()
2211 .unwrap_or_else(|| "default".to_string());
2212 let columns = vec![
2213 "graph".to_string(),
2214 "graph_type".to_string(),
2215 "open".to_string(),
2216 "node_types".to_string(),
2217 "edge_types".to_string(),
2218 ];
2219
2220 if let Some(type_name) = self.catalog.get_graph_type_binding(&graph_name)
2221 && let Some(def) = self.catalog.get_graph_type_def(&type_name)
2222 {
2223 let rows = vec![vec![
2224 Value::from(graph_name),
2225 Value::from(type_name),
2226 Value::from(def.open),
2227 Value::from(def.allowed_node_types.join(", ")),
2228 Value::from(def.allowed_edge_types.join(", ")),
2229 ]];
2230 return Ok(QueryResult {
2231 columns,
2232 column_types: Vec::new(),
2233 rows,
2234 ..QueryResult::empty()
2235 });
2236 }
2237
2238 Ok(QueryResult {
2240 columns,
2241 column_types: Vec::new(),
2242 rows: vec![vec![
2243 Value::from(graph_name),
2244 Value::Null,
2245 Value::Null,
2246 Value::Null,
2247 Value::Null,
2248 ]],
2249 ..QueryResult::empty()
2250 })
2251 }
2252
#[cfg(feature = "gql")]
/// Executes a GQL query string against this session.
///
/// The query is translated first. Session commands and schema commands are
/// dispatched to their dedicated handlers and return immediately. Ordinary
/// queries are bound and optimized (or served from the query cache), then
/// either explained, profiled, or planned and executed inside
/// `with_auto_commit`.
///
/// # Errors
///
/// Returns an error when `require_lpg` rejects the session, when translation,
/// binding, optimization, planning, or execution fails, and a
/// `TransactionError::ReadOnly` transaction error when the session's
/// `read_only_tx` flag is set and the statement is a schema command or a plan
/// containing mutations.
pub fn execute(&self, query: &str) -> Result<QueryResult> {
    self.require_lpg("GQL")?;

    use crate::query::{
        Executor, binder::Binder, cache::CacheKey, optimizer::Optimizer,
        processor::QueryLanguage, translators::gql,
    };

    let _span = grafeo_info_span!(
        "grafeo::session::execute",
        language = "gql",
        query_len = query.len(),
    );

    // Wall-clock timing is only captured on non-wasm32 targets.
    #[cfg(not(target_arch = "wasm32"))]
    let start_time = std::time::Instant::now();

    let translation = gql::translate_full(query)?;
    // Session and schema commands short-circuit here; only a logical plan
    // continues on to the cache/optimizer/executor path below.
    let logical_plan = match translation {
        gql::GqlTranslationResult::SessionCommand(cmd) => {
            return self.execute_session_command(cmd);
        }
        gql::GqlTranslationResult::SchemaCommand(cmd) => {
            // Schema commands are rejected outright while `read_only_tx` is set.
            if *self.read_only_tx.lock() {
                return Err(grafeo_common::utils::error::Error::Transaction(
                    grafeo_common::utils::error::TransactionError::ReadOnly,
                ));
            }
            return self.execute_schema_command(cmd);
        }
        gql::GqlTranslationResult::Plan(plan) => {
            // Plans are rejected only when they actually contain mutations.
            if *self.read_only_tx.lock() && plan.root.has_mutations() {
                return Err(grafeo_common::utils::error::Error::Transaction(
                    grafeo_common::utils::error::TransactionError::ReadOnly,
                ));
            }
            plan
        }
    };

    // The cache key combines the query text, the language, and the current graph.
    let cache_key = CacheKey::with_graph(query, QueryLanguage::Gql, self.current_graph());

    let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
        // Cache hit: binding and optimization are skipped.
        cached_plan
    } else {
        // Cache miss: bind, then optimize against the active store.
        let mut binder = Binder::new();
        let _binding_context = binder.bind(&logical_plan)?;

        let active = self.active_store();
        let optimizer = Optimizer::from_graph_store(&*active);
        let plan = optimizer.optimize(logical_plan)?;

        // Store the optimized plan for later executions of the same key.
        self.query_cache.put_optimized(cache_key, plan.clone());

        plan
    };

    let active = self.active_store();

    // Explain path: return a rendering of the annotated plan without running it.
    if optimized_plan.explain {
        use crate::query::processor::{annotate_pushdown_hints, explain_result};
        let mut plan = optimized_plan;
        annotate_pushdown_hints(&mut plan.root, active.as_ref());
        return Ok(explain_result(&plan));
    }

    // Profile path: the query is executed, but its rows are discarded and the
    // profile tree is returned instead.
    if optimized_plan.profile {
        let has_mutations = optimized_plan.root.has_mutations();
        return self.with_auto_commit(has_mutations, || {
            let (viewing_epoch, transaction_id) = self.get_transaction_context();
            let planner = self.create_planner_for_store(
                Arc::clone(&active),
                viewing_epoch,
                transaction_id,
            );
            let (mut physical_plan, entries) = planner.plan_profiled(&optimized_plan)?;

            let executor = Executor::with_columns(physical_plan.columns.clone())
                .with_deadline(self.query_deadline());
            let _result = executor.execute(physical_plan.operator.as_mut())?;

            // Total time is reported as 0.0 on wasm32, where no start time exists.
            let total_time_ms;
            #[cfg(not(target_arch = "wasm32"))]
            {
                total_time_ms = start_time.elapsed().as_secs_f64() * 1000.0;
            }
            #[cfg(target_arch = "wasm32")]
            {
                total_time_ms = 0.0;
            }

            let profile_tree = crate::query::profile::build_profile_tree(
                &optimized_plan.root,
                &mut entries.into_iter(),
            );
            Ok(crate::query::profile::profile_result(
                &profile_tree,
                total_time_ms,
            ))
        });
    }

    let has_mutations = optimized_plan.root.has_mutations();

    // Normal path: plan and execute inside `with_auto_commit`.
    let result = self.with_auto_commit(has_mutations, || {
        let (viewing_epoch, transaction_id) = self.get_transaction_context();

        let has_active_tx = self.current_transaction.lock().is_some();
        // The planner is told the query is read-only only when the plan has
        // no mutations and no transaction is currently open on the session.
        let read_only = !has_mutations && !has_active_tx;
        let planner = self.create_planner_for_store_with_read_only(
            Arc::clone(&active),
            viewing_epoch,
            transaction_id,
            read_only,
        );
        let mut physical_plan = planner.plan(&optimized_plan)?;

        let executor = Executor::with_columns(physical_plan.columns.clone())
            .with_deadline(self.query_deadline());
        let mut result = executor.execute(physical_plan.operator.as_mut())?;

        // NOTE(review): `rows_scanned` is set to the number of rows in the
        // result, not a count of rows read during scanning.
        let rows_scanned = result.rows.len() as u64;
        #[cfg(not(target_arch = "wasm32"))]
        {
            let elapsed_ms = start_time.elapsed().as_secs_f64() * 1000.0;
            result.execution_time_ms = Some(elapsed_ms);
        }
        result.rows_scanned = Some(rows_scanned);

        Ok(result)
    });

    // Metrics are recorded for the normal path whether it succeeded or failed;
    // the earlier early-return paths do not reach this point.
    #[cfg(feature = "metrics")]
    {
        #[cfg(not(target_arch = "wasm32"))]
        let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
        #[cfg(target_arch = "wasm32")]
        let elapsed_ms = None;
        self.record_query_metrics("gql", elapsed_ms, &result);
    }

    result
}
2442
/// Executes a GQL query with the session's viewing epoch temporarily set to
/// `epoch`.
///
/// This is the parameter-less form of
/// [`Self::execute_at_epoch_with_params`]: the viewing-epoch override is set
/// to `epoch` for the duration of the call and put back to its previous value
/// afterwards.
///
/// # Errors
///
/// Returns whatever error the underlying query execution produces.
#[cfg(feature = "gql")]
pub fn execute_at_epoch(&self, query: &str, epoch: EpochId) -> Result<QueryResult> {
    self.execute_at_epoch_with_params(query, epoch, None)
}
2458
/// Executes a GQL query, optionally with parameters, with the session's
/// viewing epoch temporarily set to `epoch`.
///
/// The previous override value is saved before the query runs and is put
/// back when this function exits, on the normal path as well as when the
/// query execution unwinds. Without that guarantee a panic during execution
/// would leave the override set, and every later query on this session would
/// read at `epoch`.
///
/// When `params` is `Some`, the query is run through `execute_with_params`;
/// otherwise through `execute`.
///
/// # Errors
///
/// Returns whatever error the underlying query execution produces.
#[cfg(feature = "gql")]
pub fn execute_at_epoch_with_params(
    &self,
    query: &str,
    epoch: EpochId,
    params: Option<std::collections::HashMap<String, Value>>,
) -> Result<QueryResult> {
    /// Runs the wrapped closure exactly once when dropped.
    struct RestoreOnDrop<F: FnOnce()>(Option<F>);

    impl<F: FnOnce()> Drop for RestoreOnDrop<F> {
        fn drop(&mut self) {
            if let Some(restore) = self.0.take() {
                restore();
            }
        }
    }

    let slot = &self.viewing_epoch_override;
    let previous = slot.lock().replace(epoch);
    // Dropped after the result below has been computed (or during unwinding),
    // so the override never outlives this call.
    let _restore = RestoreOnDrop(Some(move || {
        *slot.lock() = previous;
    }));

    match params {
        Some(p) => self.execute_with_params(query, p),
        None => self.execute(query),
    }
}
2482
/// Executes a GQL query with bound parameters.
///
/// The call first goes through `require_lpg("GQL")`. Whether the query
/// mutates is guessed from its text by `query_looks_like_mutation`, and that
/// guess decides whether `with_auto_commit` wraps the execution in an
/// implicit transaction. If the session already has a transaction, the
/// processor is bound to its viewing epoch and transaction id.
///
/// # Errors
///
/// Returns the error from `require_lpg`, from building the query processor,
/// from processing the query, or from the implicit commit.
#[cfg(feature = "gql")]
pub fn execute_with_params(
    &self,
    query: &str,
    params: std::collections::HashMap<String, Value>,
) -> Result<QueryResult> {
    use crate::query::processor::{QueryLanguage, QueryProcessor};

    self.require_lpg("GQL")?;

    let has_mutations = Self::query_looks_like_mutation(query);
    let active = self.active_store();

    self.with_auto_commit(has_mutations, || {
        let (viewing_epoch, transaction_id) = self.get_transaction_context();

        let processor = QueryProcessor::for_stores_with_transaction(
            Arc::clone(&active),
            self.active_write_store(),
            Arc::clone(&self.transaction_manager),
        )?;

        // Only attach a transaction context when a transaction is active.
        let processor = match transaction_id {
            Some(id) => processor.with_transaction_context(viewing_epoch, id),
            None => processor,
        };

        processor.process(query, QueryLanguage::Gql, Some(&params))
    })
}
2522
2523 #[cfg(not(any(feature = "gql", feature = "cypher")))]
2529 pub fn execute_with_params(
2530 &self,
2531 _query: &str,
2532 _params: std::collections::HashMap<String, Value>,
2533 ) -> Result<QueryResult> {
2534 Err(grafeo_common::utils::error::Error::Internal(
2535 "No query language enabled".to_string(),
2536 ))
2537 }
2538
2539 #[cfg(not(any(feature = "gql", feature = "cypher")))]
2545 pub fn execute(&self, _query: &str) -> Result<QueryResult> {
2546 Err(grafeo_common::utils::error::Error::Internal(
2547 "No query language enabled".to_string(),
2548 ))
2549 }
2550
/// Executes a Cypher query.
///
/// The query is translated first. Schema commands and the `SHOW ...`
/// variants are dispatched to their dedicated handlers and return
/// immediately. Ordinary plans go through the optimized-plan cache (keyed by
/// query text, language and current graph), then the binder and optimizer on
/// a cache miss, and finally the planner and executor. `EXPLAIN` and
/// `PROFILE` plans take their own early-return paths.
///
/// # Errors
///
/// Propagates translation, binding, optimization, planning and execution
/// errors. Returns a semantic query error when a schema command is issued
/// while `read_only_tx` is set.
#[cfg(feature = "cypher")]
pub fn execute_cypher(&self, query: &str) -> Result<QueryResult> {
    use crate::query::{
        Executor, binder::Binder, cache::CacheKey, optimizer::Optimizer,
        processor::QueryLanguage, translators::cypher,
    };
    use grafeo_common::utils::error::{Error as GrafeoError, QueryError, QueryErrorKind};

    // Full translation distinguishes schema / SHOW commands from query plans.
    let translation = cypher::translate_full(query)?;
    match translation {
        cypher::CypherTranslationResult::SchemaCommand(cmd) => {
            // Schema DDL is refused while the read-only flag is set.
            if *self.read_only_tx.lock() {
                return Err(GrafeoError::Query(QueryError::new(
                    QueryErrorKind::Semantic,
                    "Cannot execute schema DDL in a read-only transaction",
                )));
            }
            return self.execute_schema_command(cmd);
        }
        cypher::CypherTranslationResult::ShowIndexes => {
            return self.execute_show_indexes();
        }
        cypher::CypherTranslationResult::ShowConstraints => {
            return self.execute_show_constraints();
        }
        cypher::CypherTranslationResult::ShowCurrentGraphType => {
            return self.execute_show_current_graph_type();
        }
        cypher::CypherTranslationResult::Plan(_) => {
            // Ordinary query: fall through to the cached-plan path below.
            // NOTE(review): the plan carried by this variant is discarded and
            // the query is translated again on a cache miss — confirm whether
            // it could be reused instead.
        }
    }

    // Timing starts after translation; used for PROFILE output and metrics.
    #[cfg(not(target_arch = "wasm32"))]
    let start_time = std::time::Instant::now();

    let cache_key = CacheKey::with_graph(query, QueryLanguage::Cypher, self.current_graph());

    let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
        cached_plan
    } else {
        // Cache miss: translate, bind, optimize, then store a clone in the cache.
        let logical_plan = cypher::translate(query)?;

        let mut binder = Binder::new();
        // Only the success of binding matters here; the context is unused.
        let _binding_context = binder.bind(&logical_plan)?;

        let active = self.active_store();
        let optimizer = Optimizer::from_graph_store(&*active);
        let plan = optimizer.optimize(logical_plan)?;

        self.query_cache.put_optimized(cache_key, plan.clone());

        plan
    };

    let active = self.active_store();

    if optimized_plan.explain {
        // EXPLAIN: annotate the plan and return its description without executing it.
        use crate::query::processor::{annotate_pushdown_hints, explain_result};
        let mut plan = optimized_plan;
        annotate_pushdown_hints(&mut plan.root, active.as_ref());
        return Ok(explain_result(&plan));
    }

    if optimized_plan.profile {
        // PROFILE: execute with a profiled physical plan and return the
        // profile tree instead of the query rows.
        let has_mutations = optimized_plan.root.has_mutations();
        return self.with_auto_commit(has_mutations, || {
            let (viewing_epoch, transaction_id) = self.get_transaction_context();
            let planner = self.create_planner_for_store(
                Arc::clone(&active),
                viewing_epoch,
                transaction_id,
            );
            let (mut physical_plan, entries) = planner.plan_profiled(&optimized_plan)?;

            let executor = Executor::with_columns(physical_plan.columns.clone())
                .with_deadline(self.query_deadline());
            // The rows are discarded; only errors from execution are propagated.
            let _result = executor.execute(physical_plan.operator.as_mut())?;

            // Wall-clock total is reported as 0.0 on wasm32.
            let total_time_ms;
            #[cfg(not(target_arch = "wasm32"))]
            {
                total_time_ms = start_time.elapsed().as_secs_f64() * 1000.0;
            }
            #[cfg(target_arch = "wasm32")]
            {
                total_time_ms = 0.0;
            }

            let profile_tree = crate::query::profile::build_profile_tree(
                &optimized_plan.root,
                &mut entries.into_iter(),
            );
            Ok(crate::query::profile::profile_result(
                &profile_tree,
                total_time_ms,
            ))
        });
    }

    let has_mutations = optimized_plan.root.has_mutations();

    // Regular execution; `with_auto_commit` opens an implicit transaction
    // when the plan mutates, auto-commit is on and none is active.
    let result = self.with_auto_commit(has_mutations, || {
        let (viewing_epoch, transaction_id) = self.get_transaction_context();

        let planner =
            self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
        let mut physical_plan = planner.plan(&optimized_plan)?;

        let executor = Executor::with_columns(physical_plan.columns.clone())
            .with_deadline(self.query_deadline());
        executor.execute(physical_plan.operator.as_mut())
    });

    // Metrics are recorded for both successful and failed executions.
    #[cfg(feature = "metrics")]
    {
        #[cfg(not(target_arch = "wasm32"))]
        let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
        #[cfg(target_arch = "wasm32")]
        let elapsed_ms = None;
        self.record_query_metrics("cypher", elapsed_ms, &result);
    }

    result
}
2694
/// Executes a Gremlin query.
///
/// The query is translated, bound and optimized against the active store,
/// then planned and executed inside `with_auto_commit`. No plan cache is
/// consulted on this path.
///
/// # Errors
///
/// Propagates translation, binding, optimization, planning and execution
/// errors, as well as errors from the implicit commit.
#[cfg(feature = "gremlin")]
pub fn execute_gremlin(&self, query: &str) -> Result<QueryResult> {
    use crate::query::{Executor, binder::Binder, optimizer::Optimizer, translators::gremlin};

    #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
    let start_time = Instant::now();

    let parsed = gremlin::translate(query)?;

    // Binding is run for validation; the resulting context is not needed.
    let mut binder = Binder::new();
    let _binding_context = binder.bind(&parsed)?;

    let active = self.active_store();
    let plan = Optimizer::from_graph_store(&*active).optimize(parsed)?;
    let mutates = plan.root.has_mutations();

    let outcome = self.with_auto_commit(mutates, || {
        let (viewing_epoch, transaction_id) = self.get_transaction_context();
        let store = Arc::clone(&active);

        let mut physical = self
            .create_planner_for_store(store, viewing_epoch, transaction_id)
            .plan(&plan)?;

        let columns = physical.columns.clone();
        Executor::with_columns(columns)
            .with_deadline(self.query_deadline())
            .execute(physical.operator.as_mut())
    });

    #[cfg(feature = "metrics")]
    {
        #[cfg(not(target_arch = "wasm32"))]
        let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
        #[cfg(target_arch = "wasm32")]
        let elapsed_ms = None;
        self.record_query_metrics("gremlin", elapsed_ms, &outcome);
    }

    outcome
}
2765
/// Executes a Gremlin query with bound parameters.
///
/// Whether the query mutates is guessed from its text by
/// `query_looks_like_mutation`; that guess decides whether
/// `with_auto_commit` wraps the execution in an implicit transaction. If the
/// session already has a transaction, the processor is bound to its viewing
/// epoch and transaction id.
///
/// # Errors
///
/// Returns the error from building the query processor, from processing the
/// query, or from the implicit commit.
#[cfg(feature = "gremlin")]
pub fn execute_gremlin_with_params(
    &self,
    query: &str,
    params: std::collections::HashMap<String, Value>,
) -> Result<QueryResult> {
    use crate::query::processor::{QueryLanguage, QueryProcessor};

    #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
    let start_time = Instant::now();

    let has_mutations = Self::query_looks_like_mutation(query);
    let active = self.active_store();

    let result = self.with_auto_commit(has_mutations, || {
        let (viewing_epoch, transaction_id) = self.get_transaction_context();
        let processor = QueryProcessor::for_stores_with_transaction(
            Arc::clone(&active),
            self.active_write_store(),
            Arc::clone(&self.transaction_manager),
        )?;
        // Only attach a transaction context when a transaction is active.
        let processor = match transaction_id {
            Some(id) => processor.with_transaction_context(viewing_epoch, id),
            None => processor,
        };
        processor.process(query, QueryLanguage::Gremlin, Some(&params))
    });

    // Metrics are recorded for both successful and failed executions.
    #[cfg(feature = "metrics")]
    {
        #[cfg(not(target_arch = "wasm32"))]
        let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
        #[cfg(target_arch = "wasm32")]
        let elapsed_ms = None;
        self.record_query_metrics("gremlin", elapsed_ms, &result);
    }

    result
}
2811
/// Executes a GraphQL query.
///
/// After translation, any default parameter values carried by the plan are
/// substituted into it. The plan is then bound and optimized against the
/// active store, and planned and executed inside `with_auto_commit`.
///
/// # Errors
///
/// Propagates translation, parameter substitution, binding, optimization,
/// planning and execution errors, as well as errors from the implicit commit.
#[cfg(feature = "graphql")]
pub fn execute_graphql(&self, query: &str) -> Result<QueryResult> {
    use crate::query::{
        Executor, binder::Binder, optimizer::Optimizer, processor::substitute_params,
        translators::graphql,
    };

    #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
    let start_time = Instant::now();

    let mut parsed = graphql::translate(query)?;

    // Apply the query's own default parameter values, if it declares any.
    if !parsed.default_params.is_empty() {
        let defaults = parsed.default_params.clone();
        substitute_params(&mut parsed, &defaults)?;
    }

    // Binding is run for validation; the resulting context is not needed.
    let mut binder = Binder::new();
    let _binding_context = binder.bind(&parsed)?;

    let active = self.active_store();
    let plan = Optimizer::from_graph_store(&*active).optimize(parsed)?;
    let mutates = plan.root.has_mutations();

    let outcome = self.with_auto_commit(mutates, || {
        let (viewing_epoch, transaction_id) = self.get_transaction_context();
        let store = Arc::clone(&active);

        let mut physical = self
            .create_planner_for_store(store, viewing_epoch, transaction_id)
            .plan(&plan)?;

        let columns = physical.columns.clone();
        Executor::with_columns(columns)
            .with_deadline(self.query_deadline())
            .execute(physical.operator.as_mut())
    });

    #[cfg(feature = "metrics")]
    {
        #[cfg(not(target_arch = "wasm32"))]
        let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
        #[cfg(target_arch = "wasm32")]
        let elapsed_ms = None;
        self.record_query_metrics("graphql", elapsed_ms, &outcome);
    }

    outcome
}
2882
/// Executes a GraphQL query with bound parameters.
///
/// Whether the query mutates is guessed from its text by
/// `query_looks_like_mutation`; that guess decides whether
/// `with_auto_commit` wraps the execution in an implicit transaction. If the
/// session already has a transaction, the processor is bound to its viewing
/// epoch and transaction id.
///
/// # Errors
///
/// Returns the error from building the query processor, from processing the
/// query, or from the implicit commit.
#[cfg(feature = "graphql")]
pub fn execute_graphql_with_params(
    &self,
    query: &str,
    params: std::collections::HashMap<String, Value>,
) -> Result<QueryResult> {
    use crate::query::processor::{QueryLanguage, QueryProcessor};

    #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
    let start_time = Instant::now();

    let has_mutations = Self::query_looks_like_mutation(query);
    let active = self.active_store();

    let result = self.with_auto_commit(has_mutations, || {
        let (viewing_epoch, transaction_id) = self.get_transaction_context();
        let processor = QueryProcessor::for_stores_with_transaction(
            Arc::clone(&active),
            self.active_write_store(),
            Arc::clone(&self.transaction_manager),
        )?;
        // Only attach a transaction context when a transaction is active.
        let processor = match transaction_id {
            Some(id) => processor.with_transaction_context(viewing_epoch, id),
            None => processor,
        };
        processor.process(query, QueryLanguage::GraphQL, Some(&params))
    });

    // Metrics are recorded for both successful and failed executions.
    #[cfg(feature = "metrics")]
    {
        #[cfg(not(target_arch = "wasm32"))]
        let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
        #[cfg(target_arch = "wasm32")]
        let elapsed_ms = None;
        self.record_query_metrics("graphql", elapsed_ms, &result);
    }

    result
}
2928
/// Executes a SQL/PGQ query.
///
/// A `CREATE PROPERTY GRAPH` statement is answered straight away with a
/// single-row status result. Any other statement is looked up in the
/// optimized-plan cache (keyed by query text, language and current graph),
/// bound and optimized on a miss, then planned and executed inside
/// `with_auto_commit`.
///
/// # Errors
///
/// Propagates translation, binding, optimization, planning and execution
/// errors, as well as errors from the implicit commit.
#[cfg(feature = "sql-pgq")]
pub fn execute_sql(&self, query: &str) -> Result<QueryResult> {
    use crate::query::{
        Executor, binder::Binder, cache::CacheKey, optimizer::Optimizer, plan::LogicalOperator,
        processor::QueryLanguage, translators::sql_pgq,
    };

    #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
    let start_time = Instant::now();

    let parsed = sql_pgq::translate(query)?;

    // DDL short-circuit: report what was declared and return.
    if let LogicalOperator::CreatePropertyGraph(ref cpg) = parsed.root {
        let message = format!(
            "Property graph '{}' created ({} node tables, {} edge tables)",
            cpg.name,
            cpg.node_tables.len(),
            cpg.edge_tables.len()
        );
        return Ok(QueryResult {
            columns: vec!["status".into()],
            column_types: vec![grafeo_common::types::LogicalType::String],
            rows: vec![vec![Value::from(message)]],
            execution_time_ms: None,
            rows_scanned: None,
            status_message: None,
            gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
        });
    }

    let key = CacheKey::with_graph(query, QueryLanguage::SqlPgq, self.current_graph());

    let cached = self.query_cache.get_optimized(&key);
    let plan = match cached {
        Some(hit) => hit,
        None => {
            // Cache miss: bind, optimize, and remember a clone of the plan.
            let mut binder = Binder::new();
            let _binding_context = binder.bind(&parsed)?;
            let store = self.active_store();
            let fresh = Optimizer::from_graph_store(&*store).optimize(parsed)?;
            self.query_cache.put_optimized(key, fresh.clone());
            fresh
        }
    };

    let active = self.active_store();
    let mutates = plan.root.has_mutations();

    let outcome = self.with_auto_commit(mutates, || {
        let (viewing_epoch, transaction_id) = self.get_transaction_context();
        let store = Arc::clone(&active);

        let mut physical = self
            .create_planner_for_store(store, viewing_epoch, transaction_id)
            .plan(&plan)?;

        let columns = physical.columns.clone();
        Executor::with_columns(columns)
            .with_deadline(self.query_deadline())
            .execute(physical.operator.as_mut())
    });

    #[cfg(feature = "metrics")]
    {
        #[cfg(not(target_arch = "wasm32"))]
        let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
        #[cfg(target_arch = "wasm32")]
        let elapsed_ms = None;
        self.record_query_metrics("sql", elapsed_ms, &outcome);
    }

    outcome
}
3022
/// Executes a SQL/PGQ query with bound parameters.
///
/// Whether the query mutates is guessed from its text by
/// `query_looks_like_mutation`; that guess decides whether
/// `with_auto_commit` wraps the execution in an implicit transaction. If the
/// session already has a transaction, the processor is bound to its viewing
/// epoch and transaction id.
///
/// # Errors
///
/// Returns the error from building the query processor, from processing the
/// query, or from the implicit commit.
#[cfg(feature = "sql-pgq")]
pub fn execute_sql_with_params(
    &self,
    query: &str,
    params: std::collections::HashMap<String, Value>,
) -> Result<QueryResult> {
    use crate::query::processor::{QueryLanguage, QueryProcessor};

    #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
    let start_time = Instant::now();

    let has_mutations = Self::query_looks_like_mutation(query);
    let active = self.active_store();

    let result = self.with_auto_commit(has_mutations, || {
        let (viewing_epoch, transaction_id) = self.get_transaction_context();
        let processor = QueryProcessor::for_stores_with_transaction(
            Arc::clone(&active),
            self.active_write_store(),
            Arc::clone(&self.transaction_manager),
        )?;
        // Only attach a transaction context when a transaction is active.
        let processor = match transaction_id {
            Some(id) => processor.with_transaction_context(viewing_epoch, id),
            None => processor,
        };
        processor.process(query, QueryLanguage::SqlPgq, Some(&params))
    });

    // Metrics are recorded for both successful and failed executions.
    #[cfg(feature = "metrics")]
    {
        #[cfg(not(target_arch = "wasm32"))]
        let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
        #[cfg(target_arch = "wasm32")]
        let elapsed_ms = None;
        self.record_query_metrics("sql", elapsed_ms, &result);
    }

    result
}
3068
/// Executes `query` using the query language named by `language`.
///
/// Recognised names are `"gql"` plus, depending on enabled crate features,
/// `"cypher"`, `"gremlin"`, `"graphql"`, `"graphql-rdf"`, `"sql"` /
/// `"sql-pgq"` and `"sparql"`. When `params` is `Some`, the parameterised
/// entry point of the chosen language is used; otherwise the plain one.
///
/// # Errors
///
/// Returns a semantic query error for an unrecognised `language` and
/// otherwise propagates the error of the selected executor.
pub fn execute_language(
    &self,
    query: &str,
    language: &str,
    params: Option<std::collections::HashMap<String, Value>>,
) -> Result<QueryResult> {
    // The span records the language name and the query length, not the query text.
    let _span = grafeo_info_span!(
        "grafeo::session::execute",
        language,
        query_len = query.len(),
    );
    match language {
        "gql" => {
            if let Some(p) = params {
                self.execute_with_params(query, p)
            } else {
                self.execute(query)
            }
        }
        #[cfg(feature = "cypher")]
        "cypher" => {
            if let Some(p) = params {
                // Parameterised Cypher is handled inline here, following the
                // same steps as the other `*_with_params` methods.
                use crate::query::processor::{QueryLanguage, QueryProcessor};

                #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
                let start_time = Instant::now();

                // Text-based guess; decides whether an implicit transaction is opened.
                let has_mutations = Self::query_looks_like_mutation(query);
                let active = self.active_store();
                let result = self.with_auto_commit(has_mutations, || {
                    let processor = QueryProcessor::for_stores_with_transaction(
                        Arc::clone(&active),
                        self.active_write_store(),
                        Arc::clone(&self.transaction_manager),
                    )?;
                    let (viewing_epoch, transaction_id) = self.get_transaction_context();
                    // Only attach a transaction context when a transaction is active.
                    let processor = if let Some(transaction_id) = transaction_id {
                        processor.with_transaction_context(viewing_epoch, transaction_id)
                    } else {
                        processor
                    };
                    processor.process(query, QueryLanguage::Cypher, Some(&p))
                });

                // Metrics are recorded for both successful and failed executions.
                #[cfg(feature = "metrics")]
                {
                    #[cfg(not(target_arch = "wasm32"))]
                    let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
                    #[cfg(target_arch = "wasm32")]
                    let elapsed_ms = None;
                    self.record_query_metrics("cypher", elapsed_ms, &result);
                }

                result
            } else {
                self.execute_cypher(query)
            }
        }
        #[cfg(feature = "gremlin")]
        "gremlin" => {
            if let Some(p) = params {
                self.execute_gremlin_with_params(query, p)
            } else {
                self.execute_gremlin(query)
            }
        }
        #[cfg(feature = "graphql")]
        "graphql" => {
            if let Some(p) = params {
                self.execute_graphql_with_params(query, p)
            } else {
                self.execute_graphql(query)
            }
        }
        #[cfg(all(feature = "graphql", feature = "rdf"))]
        "graphql-rdf" => {
            if let Some(p) = params {
                self.execute_graphql_rdf_with_params(query, p)
            } else {
                self.execute_graphql_rdf(query)
            }
        }
        #[cfg(feature = "sql-pgq")]
        "sql" | "sql-pgq" => {
            if let Some(p) = params {
                self.execute_sql_with_params(query, p)
            } else {
                self.execute_sql(query)
            }
        }
        #[cfg(all(feature = "sparql", feature = "rdf"))]
        "sparql" => {
            if let Some(p) = params {
                self.execute_sparql_with_params(query, p)
            } else {
                self.execute_sparql(query)
            }
        }
        // Unknown names, and names whose feature is compiled out, end up here.
        other => Err(grafeo_common::utils::error::Error::Query(
            grafeo_common::utils::error::QueryError::new(
                grafeo_common::utils::error::QueryErrorKind::Semantic,
                format!("Unknown query language: '{other}'"),
            ),
        )),
    }
}
3183
/// Clears the query cache used by this session.
///
/// NOTE(review): `query_cache` is an `Arc<QueryCache>` in `SessionConfig`, so
/// the cache is presumably shared with other sessions — confirm before
/// relying on this being session-local.
pub fn clear_plan_cache(&self) {
    self.query_cache.clear();
}
3213
/// Begins a read-write transaction without an explicit isolation level.
///
/// If a transaction is already active, `begin_transaction_inner` opens a
/// nested level backed by a savepoint instead.
///
/// # Errors
///
/// Propagates any error from `begin_transaction_inner`.
pub fn begin_transaction(&mut self) -> Result<()> {
    self.begin_transaction_inner(false, None)
}
3224
/// Begins a read-write transaction with the given isolation level.
///
/// If a transaction is already active, `begin_transaction_inner` opens a
/// nested level backed by a savepoint instead and `isolation_level` is not
/// consulted.
///
/// # Errors
///
/// Propagates any error from `begin_transaction_inner`.
pub fn begin_transaction_with_isolation(
    &mut self,
    isolation_level: crate::transaction::IsolationLevel,
) -> Result<()> {
    self.begin_transaction_inner(false, Some(isolation_level))
}
3238
3239 fn begin_transaction_inner(
3241 &self,
3242 read_only: bool,
3243 isolation_level: Option<crate::transaction::IsolationLevel>,
3244 ) -> Result<()> {
3245 let _span = grafeo_debug_span!("grafeo::tx::begin", read_only);
3246 let mut current = self.current_transaction.lock();
3247 if current.is_some() {
3248 drop(current);
3250 let mut depth = self.transaction_nesting_depth.lock();
3251 *depth += 1;
3252 let sp_name = format!("_nested_tx_{}", *depth);
3253 self.savepoint(&sp_name)?;
3254 return Ok(());
3255 }
3256
3257 let active = self.active_lpg_store();
3258 self.transaction_start_node_count
3259 .store(active.node_count(), Ordering::Relaxed);
3260 self.transaction_start_edge_count
3261 .store(active.edge_count(), Ordering::Relaxed);
3262 let transaction_id = if let Some(level) = isolation_level {
3263 self.transaction_manager.begin_with_isolation(level)
3264 } else {
3265 self.transaction_manager.begin()
3266 };
3267 *current = Some(transaction_id);
3268 *self.read_only_tx.lock() = read_only || self.db_read_only;
3269
3270 let key = self.active_graph_storage_key();
3273 let mut touched = self.touched_graphs.lock();
3274 touched.clear();
3275 touched.push(key);
3276
3277 #[cfg(feature = "metrics")]
3278 {
3279 crate::metrics::record_metric!(self.metrics, tx_active, inc);
3280 #[cfg(not(target_arch = "wasm32"))]
3281 {
3282 *self.tx_start_time.lock() = Some(Instant::now());
3283 }
3284 }
3285
3286 Ok(())
3287 }
3288
/// Commits the current transaction, or closes the innermost nested level.
///
/// # Errors
///
/// Propagates any error from `commit_inner`, including the "No active
/// transaction" error when nothing is in progress.
pub fn commit(&mut self) -> Result<()> {
    self.commit_inner()
}
3299
/// Commits the current transaction, or closes one nesting level.
///
/// When the nesting depth is greater than zero, only the matching
/// `_nested_tx_<depth>` savepoint is released and the depth is lowered.
/// Otherwise the transaction is committed through the transaction manager
/// and every touched graph store is finalized, synced to the current epoch,
/// and occasionally garbage-collected.
///
/// # Errors
///
/// Returns a transaction error when no transaction is active, the error from
/// `release_savepoint` on the nested path, or the error returned by the
/// transaction manager's `commit` (after session state has been cleaned up).
fn commit_inner(&self) -> Result<()> {
    let _span = grafeo_debug_span!("grafeo::tx::commit");
    {
        // Nested level: release its savepoint instead of committing for real.
        let mut depth = self.transaction_nesting_depth.lock();
        if *depth > 0 {
            let sp_name = format!("_nested_tx_{depth}");
            *depth -= 1;
            // Drop the depth lock before calling into `release_savepoint`.
            drop(depth);
            return self.release_savepoint(&sp_name);
        }
    }

    // Taking the id clears the session's current transaction, whether or not
    // the commit below succeeds.
    let transaction_id = self.current_transaction.lock().take().ok_or_else(|| {
        grafeo_common::utils::error::Error::Transaction(
            grafeo_common::utils::error::TransactionError::InvalidState(
                "No active transaction".to_string(),
            ),
        )
    })?;

    let touched = self.touched_graphs.lock().clone();
    let commit_epoch = match self.transaction_manager.commit(transaction_id) {
        Ok(epoch) => epoch,
        Err(e) => {
            // Commit was rejected: undo property changes in every touched graph.
            // NOTE(review): unlike `rollback_inner`, this path does not call
            // `discard_uncommitted_versions` — confirm that is intended.
            for graph_name in &touched {
                let store = self.resolve_store(graph_name);
                store.rollback_transaction_properties(transaction_id);
            }
            #[cfg(feature = "rdf")]
            self.rollback_rdf_transaction(transaction_id);
            // Buffered change events of the failed transaction are dropped.
            #[cfg(feature = "cdc")]
            if let Some(ref pending) = self.cdc_pending_events {
                pending.lock().clear();
            }
            // Reset per-transaction session state.
            *self.read_only_tx.lock() = self.db_read_only;
            self.savepoints.lock().clear();
            self.touched_graphs.lock().clear();
            #[cfg(feature = "metrics")]
            {
                crate::metrics::record_metric!(self.metrics, tx_active, dec);
                crate::metrics::record_metric!(self.metrics, tx_conflicts, inc);
                #[cfg(not(target_arch = "wasm32"))]
                if let Some(start) = self.tx_start_time.lock().take() {
                    let duration_ms = start.elapsed().as_secs_f64() * 1000.0;
                    crate::metrics::record_metric!(
                        self.metrics,
                        tx_duration,
                        observe duration_ms
                    );
                }
            }
            return Err(e);
        }
    };

    // Finalize this transaction's versions with the commit epoch.
    for graph_name in &touched {
        let store = self.resolve_store(graph_name);
        store.finalize_version_epochs(transaction_id, commit_epoch);
    }

    #[cfg(feature = "rdf")]
    self.commit_rdf_transaction(transaction_id);

    for graph_name in &touched {
        let store = self.resolve_store(graph_name);
        store.commit_transaction_properties(transaction_id);
    }

    // Flush buffered change events to the CDC log, stamped with the commit epoch.
    #[cfg(feature = "cdc")]
    if let Some(ref pending) = self.cdc_pending_events {
        let events: Vec<crate::cdc::ChangeEvent> = pending.lock().drain(..).collect();
        self.cdc_log.record_batch(events.into_iter().map(|mut e| {
            e.epoch = commit_epoch;
            e
        }));
    }

    // Bring every touched store up to the manager's current epoch.
    let current_epoch = self.transaction_manager.current_epoch();
    for graph_name in &touched {
        let store = self.resolve_store(graph_name);
        store.sync_epoch(current_epoch);
    }

    // Reset per-transaction session state.
    *self.read_only_tx.lock() = self.db_read_only;
    self.savepoints.lock().clear();
    self.touched_graphs.lock().clear();

    // Periodic GC: runs on every `gc_interval`-th commit counted by the
    // `commit_counter`; an interval of 0 disables it.
    if self.gc_interval > 0 {
        let count = self.commit_counter.fetch_add(1, Ordering::Relaxed) + 1;
        if count.is_multiple_of(self.gc_interval) {
            let min_epoch = self.transaction_manager.min_active_epoch();
            // Only the graphs touched by this transaction are collected here.
            for graph_name in &touched {
                let store = self.resolve_store(graph_name);
                store.gc_versions(min_epoch);
            }
            self.transaction_manager.gc();
            #[cfg(feature = "metrics")]
            crate::metrics::record_metric!(self.metrics, gc_runs, inc);
        }
    }

    #[cfg(feature = "metrics")]
    {
        crate::metrics::record_metric!(self.metrics, tx_active, dec);
        crate::metrics::record_metric!(self.metrics, tx_committed, inc);
        #[cfg(not(target_arch = "wasm32"))]
        if let Some(start) = self.tx_start_time.lock().take() {
            let duration_ms = start.elapsed().as_secs_f64() * 1000.0;
            crate::metrics::record_metric!(self.metrics, tx_duration, observe duration_ms);
        }
    }

    Ok(())
}
3429
/// Rolls back the current transaction, or the innermost nested level.
///
/// # Errors
///
/// Propagates any error from `rollback_inner`, including the "No active
/// transaction" error when nothing is in progress.
pub fn rollback(&mut self) -> Result<()> {
    self.rollback_inner()
}
3456
3457 fn rollback_inner(&self) -> Result<()> {
3459 let _span = grafeo_debug_span!("grafeo::tx::rollback");
3460 {
3462 let mut depth = self.transaction_nesting_depth.lock();
3463 if *depth > 0 {
3464 let sp_name = format!("_nested_tx_{depth}");
3465 *depth -= 1;
3466 drop(depth);
3467 return self.rollback_to_savepoint(&sp_name);
3468 }
3469 }
3470
3471 let transaction_id = self.current_transaction.lock().take().ok_or_else(|| {
3472 grafeo_common::utils::error::Error::Transaction(
3473 grafeo_common::utils::error::TransactionError::InvalidState(
3474 "No active transaction".to_string(),
3475 ),
3476 )
3477 })?;
3478
3479 *self.read_only_tx.lock() = self.db_read_only;
3481
3482 let touched = self.touched_graphs.lock().clone();
3484 for graph_name in &touched {
3485 let store = self.resolve_store(graph_name);
3486 store.discard_uncommitted_versions(transaction_id);
3487 }
3488
3489 #[cfg(feature = "rdf")]
3491 self.rollback_rdf_transaction(transaction_id);
3492
3493 #[cfg(feature = "cdc")]
3495 if let Some(ref pending) = self.cdc_pending_events {
3496 pending.lock().clear();
3497 }
3498
3499 self.savepoints.lock().clear();
3501 self.touched_graphs.lock().clear();
3502
3503 let result = self.transaction_manager.abort(transaction_id);
3505
3506 #[cfg(feature = "metrics")]
3507 if result.is_ok() {
3508 crate::metrics::record_metric!(self.metrics, tx_active, dec);
3509 crate::metrics::record_metric!(self.metrics, tx_rolled_back, inc);
3510 #[cfg(not(target_arch = "wasm32"))]
3511 if let Some(start) = self.tx_start_time.lock().take() {
3512 let duration_ms = start.elapsed().as_secs_f64() * 1000.0;
3513 crate::metrics::record_metric!(self.metrics, tx_duration, observe duration_ms);
3514 }
3515 }
3516
3517 result
3518 }
3519
3520 pub fn savepoint(&self, name: &str) -> Result<()> {
3530 let tx_id = self.current_transaction.lock().ok_or_else(|| {
3531 grafeo_common::utils::error::Error::Transaction(
3532 grafeo_common::utils::error::TransactionError::InvalidState(
3533 "No active transaction".to_string(),
3534 ),
3535 )
3536 })?;
3537
3538 let touched = self.touched_graphs.lock().clone();
3540 let graph_snapshots: Vec<GraphSavepoint> = touched
3541 .iter()
3542 .map(|graph_name| {
3543 let store = self.resolve_store(graph_name);
3544 GraphSavepoint {
3545 graph_name: graph_name.clone(),
3546 next_node_id: store.peek_next_node_id(),
3547 next_edge_id: store.peek_next_edge_id(),
3548 undo_log_position: store.property_undo_log_position(tx_id),
3549 }
3550 })
3551 .collect();
3552
3553 self.savepoints.lock().push(SavepointState {
3554 name: name.to_string(),
3555 graph_snapshots,
3556 active_graph: self.current_graph.lock().clone(),
3557 #[cfg(feature = "cdc")]
3558 cdc_event_position: self
3559 .cdc_pending_events
3560 .as_ref()
3561 .map_or(0, |p| p.lock().len()),
3562 });
3563 Ok(())
3564 }
3565
/// Rolls the active transaction back to the most recent savepoint named
/// `name`.
///
/// The named savepoint and every savepoint created after it are removed from
/// the savepoint stack. For each graph captured in the savepoint, property
/// changes are undone back to the recorded undo-log position and entities
/// whose ids were allocated after the savepoint are discarded. Graphs that
/// were first touched after the savepoint have all of the transaction's
/// uncommitted versions discarded.
///
/// # Errors
///
/// Returns a transaction error when no transaction is active or when no
/// savepoint with the given name exists.
pub fn rollback_to_savepoint(&self, name: &str) -> Result<()> {
    let transaction_id = self.current_transaction.lock().ok_or_else(|| {
        grafeo_common::utils::error::Error::Transaction(
            grafeo_common::utils::error::TransactionError::InvalidState(
                "No active transaction".to_string(),
            ),
        )
    })?;

    let mut savepoints = self.savepoints.lock();

    // Search from the end so the latest savepoint with this name wins.
    let pos = savepoints
        .iter()
        .rposition(|sp| sp.name == name)
        .ok_or_else(|| {
            grafeo_common::utils::error::Error::Transaction(
                grafeo_common::utils::error::TransactionError::InvalidState(format!(
                    "Savepoint '{name}' not found"
                )),
            )
        })?;

    let sp_state = savepoints[pos].clone();

    // Removes the named savepoint itself along with all later ones.
    savepoints.truncate(pos);
    // Release the savepoint lock before touching the stores.
    drop(savepoints);

    for gs in &sp_state.graph_snapshots {
        let store = self.resolve_store(&gs.graph_name);

        // Undo property writes made after the savepoint.
        store.rollback_transaction_properties_to(transaction_id, gs.undo_log_position);

        let current_next_node = store.peek_next_node_id();
        let current_next_edge = store.peek_next_edge_id();

        // Ids from the recorded "next id" up to the current one were
        // allocated after the savepoint.
        let node_ids: Vec<NodeId> = (gs.next_node_id..current_next_node)
            .map(NodeId::new)
            .collect();
        let edge_ids: Vec<EdgeId> = (gs.next_edge_id..current_next_edge)
            .map(EdgeId::new)
            .collect();

        if !node_ids.is_empty() || !edge_ids.is_empty() {
            store.discard_entities_by_id(transaction_id, &node_ids, &edge_ids);
        }
    }

    // Graphs touched only after the savepoint have no snapshot: discard all
    // of this transaction's uncommitted versions in them.
    let touched = self.touched_graphs.lock().clone();
    for graph_name in &touched {
        let already_captured = sp_state
            .graph_snapshots
            .iter()
            .any(|gs| gs.graph_name == *graph_name);
        if !already_captured {
            let store = self.resolve_store(graph_name);
            store.discard_uncommitted_versions(transaction_id);
        }
    }

    // Drop change events buffered after the savepoint.
    #[cfg(feature = "cdc")]
    if let Some(ref pending) = self.cdc_pending_events {
        pending.lock().truncate(sp_state.cdc_event_position);
    }

    // Reset the touched list to the graphs captured by the savepoint.
    let mut touched = self.touched_graphs.lock();
    touched.clear();
    for gs in &sp_state.graph_snapshots {
        if !touched.contains(&gs.graph_name) {
            touched.push(gs.graph_name.clone());
        }
    }

    Ok(())
}
3658
3659 pub fn release_savepoint(&self, name: &str) -> Result<()> {
3665 let _tx_id = self.current_transaction.lock().ok_or_else(|| {
3666 grafeo_common::utils::error::Error::Transaction(
3667 grafeo_common::utils::error::TransactionError::InvalidState(
3668 "No active transaction".to_string(),
3669 ),
3670 )
3671 })?;
3672
3673 let mut savepoints = self.savepoints.lock();
3674 let pos = savepoints
3675 .iter()
3676 .rposition(|sp| sp.name == name)
3677 .ok_or_else(|| {
3678 grafeo_common::utils::error::Error::Transaction(
3679 grafeo_common::utils::error::TransactionError::InvalidState(format!(
3680 "Savepoint '{name}' not found"
3681 )),
3682 )
3683 })?;
3684 savepoints.remove(pos);
3685 Ok(())
3686 }
3687
/// Returns `true` while the session has a current transaction.
#[must_use]
pub fn in_transaction(&self) -> bool {
    self.current_transaction.lock().is_some()
}
3693
/// Returns the id of the current transaction, or `None` when there is none.
#[must_use]
pub(crate) fn current_transaction_id(&self) -> Option<TransactionId> {
    *self.current_transaction.lock()
}
3699
/// Returns a reference to the transaction manager used by this session.
#[must_use]
pub(crate) fn transaction_manager(&self) -> &TransactionManager {
    &self.transaction_manager
}
3705
3706 #[must_use]
3708 pub(crate) fn node_count_delta(&self) -> (usize, usize) {
3709 (
3710 self.transaction_start_node_count.load(Ordering::Relaxed),
3711 self.active_lpg_store().node_count(),
3712 )
3713 }
3714
3715 #[must_use]
3717 pub(crate) fn edge_count_delta(&self) -> (usize, usize) {
3718 (
3719 self.transaction_start_edge_count.load(Ordering::Relaxed),
3720 self.active_lpg_store().edge_count(),
3721 )
3722 }
3723
3724 pub fn prepare_commit(&mut self) -> Result<crate::transaction::PreparedCommit<'_>> {
3758 crate::transaction::PreparedCommit::new(self)
3759 }
3760
3761 pub fn set_auto_commit(&mut self, auto_commit: bool) {
3763 self.auto_commit = auto_commit;
3764 }
3765
3766 #[must_use]
3768 pub fn auto_commit(&self) -> bool {
3769 self.auto_commit
3770 }
3771
3772 fn needs_auto_commit(&self, has_mutations: bool) -> bool {
3777 self.auto_commit && has_mutations && self.current_transaction.lock().is_none()
3778 }
3779
3780 fn with_auto_commit<F>(&self, has_mutations: bool, body: F) -> Result<QueryResult>
3783 where
3784 F: FnOnce() -> Result<QueryResult>,
3785 {
3786 if self.needs_auto_commit(has_mutations) {
3787 self.begin_transaction_inner(false, None)?;
3788 match body() {
3789 Ok(result) => {
3790 self.commit_inner()?;
3791 Ok(result)
3792 }
3793 Err(e) => {
3794 let _ = self.rollback_inner();
3795 Err(e)
3796 }
3797 }
3798 } else {
3799 body()
3800 }
3801 }
3802
3803 fn query_looks_like_mutation(query: &str) -> bool {
3809 let upper = query.to_ascii_uppercase();
3810 upper.contains("INSERT")
3811 || upper.contains("CREATE")
3812 || upper.contains("DELETE")
3813 || upper.contains("MERGE")
3814 || upper.contains("SET")
3815 || upper.contains("REMOVE")
3816 || upper.contains("DROP")
3817 || upper.contains("ALTER")
3818 }
3819
3820 #[must_use]
3822 fn query_deadline(&self) -> Option<Instant> {
3823 #[cfg(not(target_arch = "wasm32"))]
3824 {
3825 self.query_timeout.map(|d| Instant::now() + d)
3826 }
3827 #[cfg(target_arch = "wasm32")]
3828 {
3829 let _ = &self.query_timeout;
3830 None
3831 }
3832 }
3833
3834 #[cfg(feature = "metrics")]
3840 fn record_query_metrics(
3841 &self,
3842 language: &str,
3843 elapsed_ms: Option<f64>,
3844 result: &Result<crate::database::QueryResult>,
3845 ) {
3846 use crate::metrics::record_metric;
3847
3848 record_metric!(self.metrics, query_count, inc);
3849 if let Some(ref reg) = self.metrics {
3850 reg.query_count_by_language.increment(language);
3851 }
3852 if let Some(ms) = elapsed_ms {
3853 record_metric!(self.metrics, query_latency, observe ms);
3854 }
3855 match result {
3856 Ok(r) => {
3857 let returned = r.rows.len() as u64;
3858 record_metric!(self.metrics, rows_returned, add returned);
3859 if let Some(scanned) = r.rows_scanned {
3860 record_metric!(self.metrics, rows_scanned, add scanned);
3861 }
3862 }
3863 Err(e) => {
3864 record_metric!(self.metrics, query_errors, inc);
3865 let msg = e.to_string();
3867 if msg.contains("exceeded timeout") {
3868 record_metric!(self.metrics, query_timeouts, inc);
3869 }
3870 }
3871 }
3872 }
3873
3874 fn eval_integer_literal(expr: &grafeo_adapters::query::gql::ast::Expression) -> Option<i64> {
3876 use grafeo_adapters::query::gql::ast::{Expression, Literal};
3877 match expr {
3878 Expression::Literal(Literal::Integer(n)) => Some(*n),
3879 _ => None,
3880 }
3881 }
3882
3883 #[must_use]
3889 fn get_transaction_context(&self) -> (EpochId, Option<TransactionId>) {
3890 if let Some(epoch) = *self.viewing_epoch_override.lock() {
3892 return (epoch, None);
3893 }
3894
3895 if let Some(transaction_id) = *self.current_transaction.lock() {
3896 let epoch = self
3898 .transaction_manager
3899 .start_epoch(transaction_id)
3900 .unwrap_or_else(|| self.transaction_manager.current_epoch());
3901 (epoch, Some(transaction_id))
3902 } else {
3903 (self.transaction_manager.current_epoch(), None)
3905 }
3906 }
3907
3908 fn create_planner_for_store(
3913 &self,
3914 store: Arc<dyn GraphStore>,
3915 viewing_epoch: EpochId,
3916 transaction_id: Option<TransactionId>,
3917 ) -> crate::query::Planner {
3918 self.create_planner_for_store_with_read_only(store, viewing_epoch, transaction_id, false)
3919 }
3920
3921 fn create_planner_for_store_with_read_only(
3922 &self,
3923 store: Arc<dyn GraphStore>,
3924 viewing_epoch: EpochId,
3925 transaction_id: Option<TransactionId>,
3926 read_only: bool,
3927 ) -> crate::query::Planner {
3928 use crate::query::Planner;
3929 use grafeo_core::execution::operators::{LazyValue, SessionContext};
3930
3931 let info_store = Arc::clone(&store);
3933 let schema_store = Arc::clone(&store);
3934
3935 let session_context = SessionContext {
3936 current_schema: self.current_schema(),
3937 current_graph: self.current_graph(),
3938 db_info: LazyValue::new(move || Self::build_info_value(&*info_store)),
3939 schema_info: LazyValue::new(move || Self::build_schema_value(&*schema_store)),
3940 };
3941
3942 let write_store = self.active_write_store();
3943
3944 let mut planner = Planner::with_context(
3945 Arc::clone(&store),
3946 write_store,
3947 Arc::clone(&self.transaction_manager),
3948 transaction_id,
3949 viewing_epoch,
3950 )
3951 .with_factorized_execution(self.factorized_execution)
3952 .with_catalog(Arc::clone(&self.catalog))
3953 .with_session_context(session_context)
3954 .with_read_only(read_only);
3955
3956 let validator =
3958 CatalogConstraintValidator::new(Arc::clone(&self.catalog)).with_store(store);
3959 planner = planner.with_validator(Arc::new(validator));
3960
3961 planner
3962 }
3963
3964 fn build_info_value(store: &dyn GraphStore) -> Value {
3966 use grafeo_common::types::PropertyKey;
3967 use std::collections::BTreeMap;
3968
3969 let mut map = BTreeMap::new();
3970 map.insert(PropertyKey::from("mode"), Value::String("lpg".into()));
3971 map.insert(
3972 PropertyKey::from("node_count"),
3973 Value::Int64(store.node_count() as i64),
3974 );
3975 map.insert(
3976 PropertyKey::from("edge_count"),
3977 Value::Int64(store.edge_count() as i64),
3978 );
3979 map.insert(
3980 PropertyKey::from("version"),
3981 Value::String(env!("CARGO_PKG_VERSION").into()),
3982 );
3983 Value::Map(map.into())
3984 }
3985
3986 fn build_schema_value(store: &dyn GraphStore) -> Value {
3988 use grafeo_common::types::PropertyKey;
3989 use std::collections::BTreeMap;
3990
3991 let labels: Vec<Value> = store
3992 .all_labels()
3993 .into_iter()
3994 .map(|l| Value::String(l.into()))
3995 .collect();
3996 let edge_types: Vec<Value> = store
3997 .all_edge_types()
3998 .into_iter()
3999 .map(|t| Value::String(t.into()))
4000 .collect();
4001 let property_keys: Vec<Value> = store
4002 .all_property_keys()
4003 .into_iter()
4004 .map(|k| Value::String(k.into()))
4005 .collect();
4006
4007 let mut map = BTreeMap::new();
4008 map.insert(PropertyKey::from("labels"), Value::List(labels.into()));
4009 map.insert(
4010 PropertyKey::from("edge_types"),
4011 Value::List(edge_types.into()),
4012 );
4013 map.insert(
4014 PropertyKey::from("property_keys"),
4015 Value::List(property_keys.into()),
4016 );
4017 Value::Map(map.into())
4018 }
4019
4020 pub fn create_node(&self, labels: &[&str]) -> NodeId {
4025 let (epoch, transaction_id) = self.get_transaction_context();
4026 self.active_lpg_store().create_node_versioned(
4027 labels,
4028 epoch,
4029 transaction_id.unwrap_or(TransactionId::SYSTEM),
4030 )
4031 }
4032
4033 pub fn create_node_with_props<'a>(
4037 &self,
4038 labels: &[&str],
4039 properties: impl IntoIterator<Item = (&'a str, Value)>,
4040 ) -> NodeId {
4041 let (epoch, transaction_id) = self.get_transaction_context();
4042 self.active_lpg_store().create_node_with_props_versioned(
4043 labels,
4044 properties,
4045 epoch,
4046 transaction_id.unwrap_or(TransactionId::SYSTEM),
4047 )
4048 }
4049
4050 pub fn create_edge(
4055 &self,
4056 src: NodeId,
4057 dst: NodeId,
4058 edge_type: &str,
4059 ) -> grafeo_common::types::EdgeId {
4060 let (epoch, transaction_id) = self.get_transaction_context();
4061 self.active_lpg_store().create_edge_versioned(
4062 src,
4063 dst,
4064 edge_type,
4065 epoch,
4066 transaction_id.unwrap_or(TransactionId::SYSTEM),
4067 )
4068 }
4069
4070 pub fn create_edge_with_props<'a>(
4072 &self,
4073 src: NodeId,
4074 dst: NodeId,
4075 edge_type: &str,
4076 properties: impl IntoIterator<Item = (&'a str, Value)>,
4077 ) -> grafeo_common::types::EdgeId {
4078 let (epoch, transaction_id) = self.get_transaction_context();
4079 let tid = transaction_id.unwrap_or(TransactionId::SYSTEM);
4080 let store = self.active_lpg_store();
4081 let eid = store.create_edge_versioned(src, dst, edge_type, epoch, tid);
4082 for (key, value) in properties {
4083 store.set_edge_property_versioned(eid, key, value, tid);
4084 }
4085 eid
4086 }
4087
4088 pub fn set_node_property(&self, id: NodeId, key: &str, value: Value) {
4090 let (_, transaction_id) = self.get_transaction_context();
4091 if let Some(tid) = transaction_id {
4092 self.active_lpg_store()
4093 .set_node_property_versioned(id, key, value, tid);
4094 } else {
4095 self.active_lpg_store().set_node_property(id, key, value);
4096 }
4097 }
4098
4099 pub fn set_edge_property(&self, id: grafeo_common::types::EdgeId, key: &str, value: Value) {
4101 let (_, transaction_id) = self.get_transaction_context();
4102 if let Some(tid) = transaction_id {
4103 self.active_lpg_store()
4104 .set_edge_property_versioned(id, key, value, tid);
4105 } else {
4106 self.active_lpg_store().set_edge_property(id, key, value);
4107 }
4108 }
4109
4110 pub fn delete_node(&self, id: NodeId) -> bool {
4112 let (epoch, transaction_id) = self.get_transaction_context();
4113 if let Some(tid) = transaction_id {
4114 self.active_lpg_store()
4115 .delete_node_versioned(id, epoch, tid)
4116 } else {
4117 self.active_lpg_store().delete_node(id)
4118 }
4119 }
4120
4121 pub fn delete_edge(&self, id: grafeo_common::types::EdgeId) -> bool {
4123 let (epoch, transaction_id) = self.get_transaction_context();
4124 if let Some(tid) = transaction_id {
4125 self.active_lpg_store()
4126 .delete_edge_versioned(id, epoch, tid)
4127 } else {
4128 self.active_lpg_store().delete_edge(id)
4129 }
4130 }
4131
4132 #[must_use]
4160 pub fn get_node(&self, id: NodeId) -> Option<Node> {
4161 let (epoch, transaction_id) = self.get_transaction_context();
4162 self.active_lpg_store().get_node_versioned(
4163 id,
4164 epoch,
4165 transaction_id.unwrap_or(TransactionId::SYSTEM),
4166 )
4167 }
4168
4169 #[must_use]
4193 pub fn get_node_property(&self, id: NodeId, key: &str) -> Option<Value> {
4194 self.get_node(id)
4195 .and_then(|node| node.get_property(key).cloned())
4196 }
4197
4198 #[must_use]
4205 pub fn get_edge(&self, id: EdgeId) -> Option<Edge> {
4206 let (epoch, transaction_id) = self.get_transaction_context();
4207 self.active_lpg_store().get_edge_versioned(
4208 id,
4209 epoch,
4210 transaction_id.unwrap_or(TransactionId::SYSTEM),
4211 )
4212 }
4213
4214 #[must_use]
4240 pub fn get_neighbors_outgoing(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
4241 self.active_lpg_store()
4242 .edges_from(node, Direction::Outgoing)
4243 .collect()
4244 }
4245
4246 #[must_use]
4255 pub fn get_neighbors_incoming(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
4256 self.active_lpg_store()
4257 .edges_from(node, Direction::Incoming)
4258 .collect()
4259 }
4260
4261 #[must_use]
4273 pub fn get_neighbors_outgoing_by_type(
4274 &self,
4275 node: NodeId,
4276 edge_type: &str,
4277 ) -> Vec<(NodeId, EdgeId)> {
4278 self.active_lpg_store()
4279 .edges_from(node, Direction::Outgoing)
4280 .filter(|(_, edge_id)| {
4281 self.get_edge(*edge_id)
4282 .is_some_and(|e| e.edge_type.as_str() == edge_type)
4283 })
4284 .collect()
4285 }
4286
4287 #[must_use]
4294 pub fn node_exists(&self, id: NodeId) -> bool {
4295 self.get_node(id).is_some()
4296 }
4297
4298 #[must_use]
4300 pub fn edge_exists(&self, id: EdgeId) -> bool {
4301 self.get_edge(id).is_some()
4302 }
4303
4304 #[must_use]
4308 pub fn get_degree(&self, node: NodeId) -> (usize, usize) {
4309 let active = self.active_lpg_store();
4310 let out = active.out_degree(node);
4311 let in_degree = active.in_degree(node);
4312 (out, in_degree)
4313 }
4314
4315 #[must_use]
4325 pub fn get_nodes_batch(&self, ids: &[NodeId]) -> Vec<Option<Node>> {
4326 let (epoch, transaction_id) = self.get_transaction_context();
4327 let tx = transaction_id.unwrap_or(TransactionId::SYSTEM);
4328 let active = self.active_lpg_store();
4329 ids.iter()
4330 .map(|&id| active.get_node_versioned(id, epoch, tx))
4331 .collect()
4332 }
4333
4334 #[cfg(feature = "cdc")]
4338 pub fn history(
4339 &self,
4340 entity_id: impl Into<crate::cdc::EntityId>,
4341 ) -> Result<Vec<crate::cdc::ChangeEvent>> {
4342 Ok(self.cdc_log.history(entity_id.into()))
4343 }
4344
4345 #[cfg(feature = "cdc")]
4347 pub fn history_since(
4348 &self,
4349 entity_id: impl Into<crate::cdc::EntityId>,
4350 since_epoch: EpochId,
4351 ) -> Result<Vec<crate::cdc::ChangeEvent>> {
4352 Ok(self.cdc_log.history_since(entity_id.into(), since_epoch))
4353 }
4354
4355 #[cfg(feature = "cdc")]
4357 pub fn changes_between(
4358 &self,
4359 start_epoch: EpochId,
4360 end_epoch: EpochId,
4361 ) -> Result<Vec<crate::cdc::ChangeEvent>> {
4362 Ok(self.cdc_log.changes_between(start_epoch, end_epoch))
4363 }
4364}
4365
4366impl Drop for Session {
4367 fn drop(&mut self) {
4368 if self.in_transaction() {
4371 let _ = self.rollback_inner();
4372 }
4373
4374 #[cfg(feature = "metrics")]
4375 if let Some(ref reg) = self.metrics {
4376 reg.session_active
4377 .fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
4378 }
4379 }
4380}
4381
4382#[cfg(test)]
4383mod tests {
4384 use super::parse_default_literal;
4385 use crate::database::GrafeoDB;
4386 use grafeo_common::types::Value;
4387
4388 #[test]
4393 fn parse_default_literal_null() {
4394 assert_eq!(parse_default_literal("null"), Value::Null);
4395 assert_eq!(parse_default_literal("NULL"), Value::Null);
4396 assert_eq!(parse_default_literal("Null"), Value::Null);
4397 }
4398
4399 #[test]
4400 fn parse_default_literal_bool() {
4401 assert_eq!(parse_default_literal("true"), Value::Bool(true));
4402 assert_eq!(parse_default_literal("TRUE"), Value::Bool(true));
4403 assert_eq!(parse_default_literal("false"), Value::Bool(false));
4404 assert_eq!(parse_default_literal("FALSE"), Value::Bool(false));
4405 }
4406
4407 #[test]
4408 fn parse_default_literal_string_single_quoted() {
4409 assert_eq!(
4410 parse_default_literal("'hello'"),
4411 Value::String("hello".into())
4412 );
4413 }
4414
4415 #[test]
4416 fn parse_default_literal_string_double_quoted() {
4417 assert_eq!(
4418 parse_default_literal("\"world\""),
4419 Value::String("world".into())
4420 );
4421 }
4422
4423 #[test]
4424 fn parse_default_literal_integer() {
4425 assert_eq!(parse_default_literal("42"), Value::Int64(42));
4426 assert_eq!(parse_default_literal("-7"), Value::Int64(-7));
4427 assert_eq!(parse_default_literal("0"), Value::Int64(0));
4428 }
4429
4430 #[test]
4431 fn parse_default_literal_float() {
4432 assert_eq!(parse_default_literal("9.81"), Value::Float64(9.81_f64));
4433 assert_eq!(parse_default_literal("-0.5"), Value::Float64(-0.5));
4434 }
4435
4436 #[test]
4437 fn parse_default_literal_fallback_string() {
4438 assert_eq!(
4440 parse_default_literal("some_identifier"),
4441 Value::String("some_identifier".into())
4442 );
4443 }
4444
4445 #[test]
4446 fn test_session_create_node() {
4447 let db = GrafeoDB::new_in_memory();
4448 let session = db.session();
4449
4450 let id = session.create_node(&["Person"]);
4451 assert!(id.is_valid());
4452 assert_eq!(db.node_count(), 1);
4453 }
4454
4455 #[test]
4456 fn test_session_transaction() {
4457 let db = GrafeoDB::new_in_memory();
4458 let mut session = db.session();
4459
4460 assert!(!session.in_transaction());
4461
4462 session.begin_transaction().unwrap();
4463 assert!(session.in_transaction());
4464
4465 session.commit().unwrap();
4466 assert!(!session.in_transaction());
4467 }
4468
4469 #[test]
4470 fn test_session_transaction_context() {
4471 let db = GrafeoDB::new_in_memory();
4472 let mut session = db.session();
4473
4474 let (_epoch1, transaction_id1) = session.get_transaction_context();
4476 assert!(transaction_id1.is_none());
4477
4478 session.begin_transaction().unwrap();
4480 let (epoch2, transaction_id2) = session.get_transaction_context();
4481 assert!(transaction_id2.is_some());
4482 let _ = epoch2; session.commit().unwrap();
4487 let (epoch3, tx_id3) = session.get_transaction_context();
4488 assert!(tx_id3.is_none());
4489 assert!(epoch3.as_u64() >= epoch2.as_u64());
4491 }
4492
4493 #[test]
4494 fn test_session_rollback() {
4495 let db = GrafeoDB::new_in_memory();
4496 let mut session = db.session();
4497
4498 session.begin_transaction().unwrap();
4499 session.rollback().unwrap();
4500 assert!(!session.in_transaction());
4501 }
4502
4503 #[test]
4504 fn test_session_rollback_discards_versions() {
4505 use grafeo_common::types::TransactionId;
4506
4507 let db = GrafeoDB::new_in_memory();
4508
4509 let node_before = db.store().create_node(&["Person"]);
4511 assert!(node_before.is_valid());
4512 assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");
4513
4514 let mut session = db.session();
4516 session.begin_transaction().unwrap();
4517 let transaction_id = session.current_transaction.lock().unwrap();
4518
4519 let epoch = db.store().current_epoch();
4521 let node_in_tx = db
4522 .store()
4523 .create_node_versioned(&["Person"], epoch, transaction_id);
4524 assert!(node_in_tx.is_valid());
4525
4526 assert_eq!(
4530 db.node_count(),
4531 1,
4532 "PENDING nodes should be invisible to non-versioned node_count()"
4533 );
4534 assert!(
4535 db.store()
4536 .get_node_versioned(node_in_tx, epoch, transaction_id)
4537 .is_some(),
4538 "Transaction node should be visible to its own transaction"
4539 );
4540
4541 session.rollback().unwrap();
4543 assert!(!session.in_transaction());
4544
4545 let count_after = db.node_count();
4548 assert_eq!(
4549 count_after, 1,
4550 "Rollback should discard uncommitted node, but got {count_after}"
4551 );
4552
4553 let current_epoch = db.store().current_epoch();
4555 assert!(
4556 db.store()
4557 .get_node_versioned(node_before, current_epoch, TransactionId::SYSTEM)
4558 .is_some(),
4559 "Original node should still exist"
4560 );
4561
4562 assert!(
4564 db.store()
4565 .get_node_versioned(node_in_tx, current_epoch, TransactionId::SYSTEM)
4566 .is_none(),
4567 "Transaction node should be gone"
4568 );
4569 }
4570
4571 #[test]
4572 fn test_session_create_node_in_transaction() {
4573 let db = GrafeoDB::new_in_memory();
4575
4576 let node_before = db.create_node(&["Person"]);
4578 assert!(node_before.is_valid());
4579 assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");
4580
4581 let mut session = db.session();
4583 session.begin_transaction().unwrap();
4584 let transaction_id = session.current_transaction.lock().unwrap();
4585
4586 let node_in_tx = session.create_node(&["Person"]);
4588 assert!(node_in_tx.is_valid());
4589
4590 assert_eq!(
4593 db.node_count(),
4594 1,
4595 "PENDING nodes should be invisible to non-versioned node_count()"
4596 );
4597 let epoch = db.store().current_epoch();
4598 assert!(
4599 db.store()
4600 .get_node_versioned(node_in_tx, epoch, transaction_id)
4601 .is_some(),
4602 "Transaction node should be visible to its own transaction"
4603 );
4604
4605 session.rollback().unwrap();
4607
4608 let count_after = db.node_count();
4610 assert_eq!(
4611 count_after, 1,
4612 "Rollback should discard node created via session.create_node(), but got {count_after}"
4613 );
4614 }
4615
4616 #[test]
4617 fn test_session_create_node_with_props_in_transaction() {
4618 use grafeo_common::types::Value;
4619
4620 let db = GrafeoDB::new_in_memory();
4622
4623 db.create_node(&["Person"]);
4625 assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");
4626
4627 let mut session = db.session();
4629 session.begin_transaction().unwrap();
4630 let transaction_id = session.current_transaction.lock().unwrap();
4631
4632 let node_in_tx =
4633 session.create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))]);
4634 assert!(node_in_tx.is_valid());
4635
4636 assert_eq!(
4639 db.node_count(),
4640 1,
4641 "PENDING nodes should be invisible to non-versioned node_count()"
4642 );
4643 let epoch = db.store().current_epoch();
4644 assert!(
4645 db.store()
4646 .get_node_versioned(node_in_tx, epoch, transaction_id)
4647 .is_some(),
4648 "Transaction node should be visible to its own transaction"
4649 );
4650
4651 session.rollback().unwrap();
4653
4654 let count_after = db.node_count();
4656 assert_eq!(
4657 count_after, 1,
4658 "Rollback should discard node created via session.create_node_with_props()"
4659 );
4660 }
4661
4662 #[cfg(feature = "gql")]
4663 mod gql_tests {
4664 use super::*;
4665
4666 #[test]
4667 fn test_gql_query_execution() {
4668 let db = GrafeoDB::new_in_memory();
4669 let session = db.session();
4670
4671 session.create_node(&["Person"]);
4673 session.create_node(&["Person"]);
4674 session.create_node(&["Animal"]);
4675
4676 let result = session.execute("MATCH (n:Person) RETURN n").unwrap();
4678
4679 assert_eq!(result.row_count(), 2);
4681 assert_eq!(result.column_count(), 1);
4682 assert_eq!(result.columns[0], "n");
4683 }
4684
4685 #[test]
4686 fn test_gql_empty_result() {
4687 let db = GrafeoDB::new_in_memory();
4688 let session = db.session();
4689
4690 let result = session.execute("MATCH (n:Person) RETURN n").unwrap();
4692
4693 assert_eq!(result.row_count(), 0);
4694 }
4695
4696 #[test]
4697 fn test_gql_parse_error() {
4698 let db = GrafeoDB::new_in_memory();
4699 let session = db.session();
4700
4701 let result = session.execute("MATCH (n RETURN n");
4703
4704 assert!(result.is_err());
4705 }
4706
4707 #[test]
4708 fn test_gql_relationship_traversal() {
4709 let db = GrafeoDB::new_in_memory();
4710 let session = db.session();
4711
4712 let alix = session.create_node(&["Person"]);
4714 let gus = session.create_node(&["Person"]);
4715 let vincent = session.create_node(&["Person"]);
4716
4717 session.create_edge(alix, gus, "KNOWS");
4718 session.create_edge(alix, vincent, "KNOWS");
4719
4720 let result = session
4722 .execute("MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b")
4723 .unwrap();
4724
4725 assert_eq!(result.row_count(), 2);
4727 assert_eq!(result.column_count(), 2);
4728 assert_eq!(result.columns[0], "a");
4729 assert_eq!(result.columns[1], "b");
4730 }
4731
4732 #[test]
4733 fn test_gql_relationship_with_type_filter() {
4734 let db = GrafeoDB::new_in_memory();
4735 let session = db.session();
4736
4737 let alix = session.create_node(&["Person"]);
4739 let gus = session.create_node(&["Person"]);
4740 let vincent = session.create_node(&["Person"]);
4741
4742 session.create_edge(alix, gus, "KNOWS");
4743 session.create_edge(alix, vincent, "WORKS_WITH");
4744
4745 let result = session
4747 .execute("MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b")
4748 .unwrap();
4749
4750 assert_eq!(result.row_count(), 1);
4752 }
4753
4754 #[test]
4755 fn test_gql_semantic_error_undefined_variable() {
4756 let db = GrafeoDB::new_in_memory();
4757 let session = db.session();
4758
4759 let result = session.execute("MATCH (n:Person) RETURN x");
4761
4762 assert!(result.is_err());
4764 let Err(err) = result else {
4765 panic!("Expected error")
4766 };
4767 assert!(
4768 err.to_string().contains("Undefined variable"),
4769 "Expected undefined variable error, got: {}",
4770 err
4771 );
4772 }
4773
4774 #[test]
4775 fn test_gql_where_clause_property_filter() {
4776 use grafeo_common::types::Value;
4777
4778 let db = GrafeoDB::new_in_memory();
4779 let session = db.session();
4780
4781 session.create_node_with_props(&["Person"], [("age", Value::Int64(25))]);
4783 session.create_node_with_props(&["Person"], [("age", Value::Int64(35))]);
4784 session.create_node_with_props(&["Person"], [("age", Value::Int64(45))]);
4785
4786 let result = session
4788 .execute("MATCH (n:Person) WHERE n.age > 30 RETURN n")
4789 .unwrap();
4790
4791 assert_eq!(result.row_count(), 2);
4793 }
4794
4795 #[test]
4796 fn test_gql_where_clause_equality() {
4797 use grafeo_common::types::Value;
4798
4799 let db = GrafeoDB::new_in_memory();
4800 let session = db.session();
4801
4802 session.create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))]);
4804 session.create_node_with_props(&["Person"], [("name", Value::String("Gus".into()))]);
4805 session.create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))]);
4806
4807 let result = session
4809 .execute("MATCH (n:Person) WHERE n.name = \"Alix\" RETURN n")
4810 .unwrap();
4811
4812 assert_eq!(result.row_count(), 2);
4814 }
4815
4816 #[test]
4817 fn test_gql_return_property_access() {
4818 use grafeo_common::types::Value;
4819
4820 let db = GrafeoDB::new_in_memory();
4821 let session = db.session();
4822
4823 session.create_node_with_props(
4825 &["Person"],
4826 [
4827 ("name", Value::String("Alix".into())),
4828 ("age", Value::Int64(30)),
4829 ],
4830 );
4831 session.create_node_with_props(
4832 &["Person"],
4833 [
4834 ("name", Value::String("Gus".into())),
4835 ("age", Value::Int64(25)),
4836 ],
4837 );
4838
4839 let result = session
4841 .execute("MATCH (n:Person) RETURN n.name, n.age")
4842 .unwrap();
4843
4844 assert_eq!(result.row_count(), 2);
4846 assert_eq!(result.column_count(), 2);
4847 assert_eq!(result.columns[0], "n.name");
4848 assert_eq!(result.columns[1], "n.age");
4849
4850 let names: Vec<&Value> = result.rows.iter().map(|r| &r[0]).collect();
4852 assert!(names.contains(&&Value::String("Alix".into())));
4853 assert!(names.contains(&&Value::String("Gus".into())));
4854 }
4855
4856 #[test]
4857 fn test_gql_return_mixed_expressions() {
4858 use grafeo_common::types::Value;
4859
4860 let db = GrafeoDB::new_in_memory();
4861 let session = db.session();
4862
4863 session.create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))]);
4865
4866 let result = session
4868 .execute("MATCH (n:Person) RETURN n, n.name")
4869 .unwrap();
4870
4871 assert_eq!(result.row_count(), 1);
4872 assert_eq!(result.column_count(), 2);
4873 assert_eq!(result.columns[0], "n");
4874 assert_eq!(result.columns[1], "n.name");
4875
4876 assert_eq!(result.rows[0][1], Value::String("Alix".into()));
4878 }
4879 }
4880
4881 #[cfg(feature = "cypher")]
4882 mod cypher_tests {
4883 use super::*;
4884
4885 #[test]
4886 fn test_cypher_query_execution() {
4887 let db = GrafeoDB::new_in_memory();
4888 let session = db.session();
4889
4890 session.create_node(&["Person"]);
4892 session.create_node(&["Person"]);
4893 session.create_node(&["Animal"]);
4894
4895 let result = session.execute_cypher("MATCH (n:Person) RETURN n").unwrap();
4897
4898 assert_eq!(result.row_count(), 2);
4900 assert_eq!(result.column_count(), 1);
4901 assert_eq!(result.columns[0], "n");
4902 }
4903
4904 #[test]
4905 fn test_cypher_empty_result() {
4906 let db = GrafeoDB::new_in_memory();
4907 let session = db.session();
4908
4909 let result = session.execute_cypher("MATCH (n:Person) RETURN n").unwrap();
4911
4912 assert_eq!(result.row_count(), 0);
4913 }
4914
4915 #[test]
4916 fn test_cypher_parse_error() {
4917 let db = GrafeoDB::new_in_memory();
4918 let session = db.session();
4919
4920 let result = session.execute_cypher("MATCH (n RETURN n");
4922
4923 assert!(result.is_err());
4924 }
4925 }
4926
4927 mod direct_lookup_tests {
4930 use super::*;
4931 use grafeo_common::types::Value;
4932
4933 #[test]
4934 fn test_get_node() {
4935 let db = GrafeoDB::new_in_memory();
4936 let session = db.session();
4937
4938 let id = session.create_node(&["Person"]);
4939 let node = session.get_node(id);
4940
4941 assert!(node.is_some());
4942 let node = node.unwrap();
4943 assert_eq!(node.id, id);
4944 }
4945
4946 #[test]
4947 fn test_get_node_not_found() {
4948 use grafeo_common::types::NodeId;
4949
4950 let db = GrafeoDB::new_in_memory();
4951 let session = db.session();
4952
4953 let node = session.get_node(NodeId::new(9999));
4955 assert!(node.is_none());
4956 }
4957
4958 #[test]
4959 fn test_get_node_property() {
4960 let db = GrafeoDB::new_in_memory();
4961 let session = db.session();
4962
4963 let id = session
4964 .create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))]);
4965
4966 let name = session.get_node_property(id, "name");
4967 assert_eq!(name, Some(Value::String("Alix".into())));
4968
4969 let missing = session.get_node_property(id, "missing");
4971 assert!(missing.is_none());
4972 }
4973
4974 #[test]
4975 fn test_get_edge() {
4976 let db = GrafeoDB::new_in_memory();
4977 let session = db.session();
4978
4979 let alix = session.create_node(&["Person"]);
4980 let gus = session.create_node(&["Person"]);
4981 let edge_id = session.create_edge(alix, gus, "KNOWS");
4982
4983 let edge = session.get_edge(edge_id);
4984 assert!(edge.is_some());
4985 let edge = edge.unwrap();
4986 assert_eq!(edge.id, edge_id);
4987 assert_eq!(edge.src, alix);
4988 assert_eq!(edge.dst, gus);
4989 }
4990
4991 #[test]
4992 fn test_get_edge_not_found() {
4993 use grafeo_common::types::EdgeId;
4994
4995 let db = GrafeoDB::new_in_memory();
4996 let session = db.session();
4997
4998 let edge = session.get_edge(EdgeId::new(9999));
4999 assert!(edge.is_none());
5000 }
5001
5002 #[test]
5003 fn test_get_neighbors_outgoing() {
5004 let db = GrafeoDB::new_in_memory();
5005 let session = db.session();
5006
5007 let alix = session.create_node(&["Person"]);
5008 let gus = session.create_node(&["Person"]);
5009 let harm = session.create_node(&["Person"]);
5010
5011 session.create_edge(alix, gus, "KNOWS");
5012 session.create_edge(alix, harm, "KNOWS");
5013
5014 let neighbors = session.get_neighbors_outgoing(alix);
5015 assert_eq!(neighbors.len(), 2);
5016
5017 let neighbor_ids: Vec<_> = neighbors.iter().map(|(node_id, _)| *node_id).collect();
5018 assert!(neighbor_ids.contains(&gus));
5019 assert!(neighbor_ids.contains(&harm));
5020 }
5021
5022 #[test]
5023 fn test_get_neighbors_incoming() {
5024 let db = GrafeoDB::new_in_memory();
5025 let session = db.session();
5026
5027 let alix = session.create_node(&["Person"]);
5028 let gus = session.create_node(&["Person"]);
5029 let harm = session.create_node(&["Person"]);
5030
5031 session.create_edge(gus, alix, "KNOWS");
5032 session.create_edge(harm, alix, "KNOWS");
5033
5034 let neighbors = session.get_neighbors_incoming(alix);
5035 assert_eq!(neighbors.len(), 2);
5036
5037 let neighbor_ids: Vec<_> = neighbors.iter().map(|(node_id, _)| *node_id).collect();
5038 assert!(neighbor_ids.contains(&gus));
5039 assert!(neighbor_ids.contains(&harm));
5040 }
5041
5042 #[test]
5043 fn test_get_neighbors_outgoing_by_type() {
5044 let db = GrafeoDB::new_in_memory();
5045 let session = db.session();
5046
5047 let alix = session.create_node(&["Person"]);
5048 let gus = session.create_node(&["Person"]);
5049 let company = session.create_node(&["Company"]);
5050
5051 session.create_edge(alix, gus, "KNOWS");
5052 session.create_edge(alix, company, "WORKS_AT");
5053
5054 let knows_neighbors = session.get_neighbors_outgoing_by_type(alix, "KNOWS");
5055 assert_eq!(knows_neighbors.len(), 1);
5056 assert_eq!(knows_neighbors[0].0, gus);
5057
5058 let works_neighbors = session.get_neighbors_outgoing_by_type(alix, "WORKS_AT");
5059 assert_eq!(works_neighbors.len(), 1);
5060 assert_eq!(works_neighbors[0].0, company);
5061
5062 let no_neighbors = session.get_neighbors_outgoing_by_type(alix, "LIKES");
5064 assert!(no_neighbors.is_empty());
5065 }
5066
5067 #[test]
5068 fn test_node_exists() {
5069 use grafeo_common::types::NodeId;
5070
5071 let db = GrafeoDB::new_in_memory();
5072 let session = db.session();
5073
5074 let id = session.create_node(&["Person"]);
5075
5076 assert!(session.node_exists(id));
5077 assert!(!session.node_exists(NodeId::new(9999)));
5078 }
5079
5080 #[test]
5081 fn test_edge_exists() {
5082 use grafeo_common::types::EdgeId;
5083
5084 let db = GrafeoDB::new_in_memory();
5085 let session = db.session();
5086
5087 let alix = session.create_node(&["Person"]);
5088 let gus = session.create_node(&["Person"]);
5089 let edge_id = session.create_edge(alix, gus, "KNOWS");
5090
5091 assert!(session.edge_exists(edge_id));
5092 assert!(!session.edge_exists(EdgeId::new(9999)));
5093 }
5094
/// Checks the `(out, in)` pair returned by `get_degree` for a connected
/// node and for a node without any edges.
#[test]
fn test_get_degree() {
    let db = GrafeoDB::new_in_memory();
    let session = db.session();

    let alix = session.create_node(&["Person"]);
    let gus = session.create_node(&["Person"]);
    let harm = session.create_node(&["Person"]);

    // Two edges leave `alix`, one edge points back at it.
    for (source, target) in [(alix, gus), (alix, harm), (gus, alix)] {
        session.create_edge(source, target, "KNOWS");
    }

    assert_eq!(session.get_degree(alix), (2, 1));

    // A node created after all edges has no connections at all.
    let lonely = session.create_node(&["Person"]);
    assert_eq!(session.get_degree(lonely), (0, 0));
}
5120
/// `get_nodes_batch` returns one slot per requested id, in request order,
/// with `None` for an id that does not exist.
#[test]
fn test_get_nodes_batch() {
    let db = GrafeoDB::new_in_memory();
    let session = db.session();

    let alix = session.create_node(&["Person"]);
    let gus = session.create_node(&["Person"]);
    let harm = session.create_node(&["Person"]);

    // All three ids exist, so every slot must be populated.
    let all_present = session.get_nodes_batch(&[alix, gus, harm]);
    assert_eq!(all_present.len(), 3);
    assert!(all_present.iter().all(|slot| slot.is_some()));

    // The middle id was never created; only that slot may be empty.
    let missing = grafeo_common::types::NodeId::new(9999);
    let with_gap = session.get_nodes_batch(&[alix, missing, harm]);
    assert_eq!(with_gap.len(), 3);
    let presence: Vec<bool> = with_gap.iter().map(|slot| slot.is_some()).collect();
    assert_eq!(presence, [true, false, true]);
}
5144
/// A new session starts with auto-commit enabled, and `set_auto_commit`
/// is reflected by the `auto_commit` accessor.
#[test]
fn test_auto_commit_setting() {
    let db = GrafeoDB::new_in_memory();
    let mut session = db.session();

    // Initial state of a fresh session.
    assert!(session.auto_commit());

    // Switch it off, then back on, checking the accessor after each change.
    for enabled in [false, true] {
        session.set_auto_commit(enabled);
        assert_eq!(session.auto_commit(), enabled);
    }
}
5159
/// Beginning a transaction while one is already open succeeds, and both
/// levels can then be committed one after the other.
#[test]
fn test_transaction_double_begin_nests() {
    let db = GrafeoDB::new_in_memory();
    let mut session = db.session();

    session.begin_transaction().unwrap();

    // The second begin must not be rejected.
    assert!(session.begin_transaction().is_ok());

    // One commit per begin.
    for _ in 0..2 {
        session.commit().unwrap();
    }
}
5174
/// Committing when no transaction was begun yields an error.
#[test]
fn test_commit_without_transaction_error() {
    let db = GrafeoDB::new_in_memory();
    let mut session = db.session();

    assert!(session.commit().is_err());
}
5183
/// Rolling back when no transaction was begun yields an error.
#[test]
fn test_rollback_without_transaction_error() {
    let db = GrafeoDB::new_in_memory();
    let mut session = db.session();

    assert!(session.rollback().is_err());
}
5192
/// An edge created inside an explicit transaction is visible to the same
/// session both before and after the commit.
#[test]
fn test_create_edge_in_transaction() {
    let db = GrafeoDB::new_in_memory();
    let mut session = db.session();

    // Endpoints are created outside of the explicit transaction.
    let alix = session.create_node(&["Person"]);
    let gus = session.create_node(&["Person"]);

    session.begin_transaction().unwrap();
    let knows = session.create_edge(alix, gus, "KNOWS");

    let visible_before_commit = session.edge_exists(knows);
    assert!(visible_before_commit);

    session.commit().unwrap();

    let visible_after_commit = session.edge_exists(knows);
    assert!(visible_after_commit);
}
5215
/// A node without edges has no outgoing, incoming, or type-filtered
/// outgoing neighbors.
#[test]
fn test_neighbors_empty_node() {
    let db = GrafeoDB::new_in_memory();
    let session = db.session();

    let lonely = session.create_node(&["Person"]);

    let outgoing = session.get_neighbors_outgoing(lonely);
    assert!(outgoing.is_empty());

    let incoming = session.get_neighbors_incoming(lonely);
    assert!(incoming.is_empty());

    let outgoing_knows = session.get_neighbors_outgoing_by_type(lonely, "KNOWS");
    assert!(outgoing_knows.is_empty());
}
5231 }
5232
/// With a GC interval of 2 configured, two committed transactions that
/// each insert one node leave both nodes counted by the database.
#[test]
fn test_auto_gc_triggers_on_commit_interval() {
    let config = crate::config::Config::in_memory().with_gc_interval(2);
    let db = GrafeoDB::with_config(config).unwrap();
    let mut session = db.session();

    // One transaction per label, so the commit count reaches the interval.
    for label in ["A", "B"] {
        session.begin_transaction().unwrap();
        session.create_node(&[label]);
        session.commit().unwrap();
    }

    assert_eq!(db.node_count(), 2);
}
5254
/// A query timeout set on the database config results in sessions that
/// report a query deadline.
#[test]
fn test_query_timeout_config_propagates_to_session() {
    let five_seconds = std::time::Duration::from_secs(5);
    let config = crate::config::Config::in_memory().with_query_timeout(five_seconds);
    let db = GrafeoDB::with_config(config).unwrap();
    let session = db.session();

    let deadline = session.query_deadline();
    assert!(deadline.is_some());
}
5267
/// A session of a default in-memory database reports no query deadline.
#[test]
fn test_no_query_timeout_returns_no_deadline() {
    let db = GrafeoDB::new_in_memory();
    let session = db.session();

    let deadline = session.query_deadline();
    assert!(deadline.is_none());
}
5276
/// A session of a default in-memory database reports the LPG graph model.
#[test]
fn test_graph_model_accessor() {
    let db = GrafeoDB::new_in_memory();
    let session = db.session();

    let model = session.graph_model();
    assert_eq!(model, crate::config::GraphModel::Lpg);
}
5286
/// A database built on an externally supplied `GraphStoreMut` can insert
/// and read back data inside an explicit transaction.
#[cfg(feature = "gql")]
#[test]
fn test_external_store_session() {
    use grafeo_core::graph::GraphStoreMut;
    use std::sync::Arc;

    let config = crate::config::Config::in_memory();
    // The typed binding performs the unsizing to the trait object.
    let external: Arc<dyn GraphStoreMut> =
        Arc::new(grafeo_core::graph::lpg::LpgStore::new().unwrap());
    let db = GrafeoDB::with_store(external, config).unwrap();

    let mut session = db.session();
    session.begin_transaction().unwrap();

    session.execute("INSERT (:Test {name: 'hello'})").unwrap();

    // The inserted node must be visible within the same transaction.
    let matched = session.execute("MATCH (n:Test) RETURN n.name").unwrap();
    assert_eq!(matched.row_count(), 1);

    session.commit().unwrap();
}
5312
/// Tests for statements executed through `Session::execute`: graph
/// selection, session settings, graph DDL, transaction control and the
/// `SHOW` introspection commands.
#[cfg(feature = "gql")]
mod session_command_tests {
    use super::*;
    use grafeo_common::types::Value;

    /// `USE GRAPH` on an existing graph makes it the session's current graph.
    #[test]
    fn test_use_graph_sets_current_graph() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        for statement in ["CREATE GRAPH mydb", "USE GRAPH mydb"] {
            session.execute(statement).unwrap();
        }

        let expected = Some(String::from("mydb"));
        assert_eq!(session.current_graph(), expected);
    }

    /// `USE GRAPH` on an unknown graph fails with a "does not exist" error.
    #[test]
    fn test_use_graph_nonexistent_errors() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        let outcome = session.execute("USE GRAPH doesnotexist");
        assert!(outcome.is_err());

        let err = outcome.unwrap_err().to_string();
        assert!(
            err.contains("does not exist"),
            "Expected 'does not exist' error, got: {err}"
        );
    }

    /// `USE GRAPH default` succeeds without creating a graph first.
    #[test]
    fn test_use_graph_default_always_valid() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        session.execute("USE GRAPH default").unwrap();

        let expected = Some(String::from("default"));
        assert_eq!(session.current_graph(), expected);
    }

    /// `SESSION SET GRAPH` on an existing graph makes it the current graph.
    #[test]
    fn test_session_set_graph() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        for statement in ["CREATE GRAPH analytics", "SESSION SET GRAPH analytics"] {
            session.execute(statement).unwrap();
        }

        let expected = Some(String::from("analytics"));
        assert_eq!(session.current_graph(), expected);
    }

    /// `SESSION SET GRAPH` on an unknown graph fails.
    #[test]
    fn test_session_set_graph_nonexistent_errors() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        assert!(session.execute("SESSION SET GRAPH nosuchgraph").is_err());
    }

    /// The time zone starts unset and follows each `SESSION SET TIME ZONE`.
    #[test]
    fn test_session_set_time_zone() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        assert!(session.time_zone().is_none());

        let cases = [
            ("SESSION SET TIME ZONE 'UTC'", "UTC"),
            (
                "SESSION SET TIME ZONE 'America/New_York'",
                "America/New_York",
            ),
        ];
        for (statement, zone) in cases {
            session.execute(statement).unwrap();
            assert_eq!(session.time_zone(), Some(String::from(zone)));
        }
    }

    /// A parameter set via `SESSION SET PARAMETER` can be looked up by name.
    #[test]
    fn test_session_set_parameter() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        session
            .execute("SESSION SET PARAMETER $timeout = 30")
            .unwrap();

        let stored = session.get_parameter("timeout");
        assert!(stored.is_some());
    }

    /// `SESSION RESET` clears the current graph, time zone and parameters.
    #[test]
    fn test_session_reset_clears_all_state() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        let setup = [
            "CREATE GRAPH analytics",
            "SESSION SET GRAPH analytics",
            "SESSION SET TIME ZONE 'UTC'",
            "SESSION SET PARAMETER $limit = 100",
        ];
        for statement in setup {
            session.execute(statement).unwrap();
        }

        // Everything is populated before the reset...
        assert!(session.current_graph().is_some());
        assert!(session.time_zone().is_some());
        assert!(session.get_parameter("limit").is_some());

        session.execute("SESSION RESET").unwrap();

        // ...and cleared afterwards.
        assert!(session.current_graph().is_none());
        assert!(session.time_zone().is_none());
        assert!(session.get_parameter("limit").is_none());
    }

    /// `SESSION CLOSE` clears the current graph and time zone.
    #[test]
    fn test_session_close_clears_state() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        let setup = [
            "CREATE GRAPH analytics",
            "SESSION SET GRAPH analytics",
            "SESSION SET TIME ZONE 'UTC'",
        ];
        for statement in setup {
            session.execute(statement).unwrap();
        }

        session.execute("SESSION CLOSE").unwrap();

        assert!(session.current_graph().is_none());
        assert!(session.time_zone().is_none());
    }

    /// A graph created with `CREATE GRAPH` can afterwards be selected.
    #[test]
    fn test_create_graph() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        session.execute("CREATE GRAPH mydb").unwrap();

        // Selecting the graph only works if it was really created.
        session.execute("USE GRAPH mydb").unwrap();
        let expected = Some(String::from("mydb"));
        assert_eq!(session.current_graph(), expected);
    }

    /// Creating the same graph twice fails with an "already exists" error.
    #[test]
    fn test_create_graph_duplicate_errors() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        session.execute("CREATE GRAPH mydb").unwrap();
        let second_attempt = session.execute("CREATE GRAPH mydb");

        assert!(second_attempt.is_err());
        let err = second_attempt.unwrap_err().to_string();
        assert!(
            err.contains("already exists"),
            "Expected 'already exists' error, got: {err}"
        );
    }

    /// `CREATE GRAPH IF NOT EXISTS` succeeds for an already existing graph.
    #[test]
    fn test_create_graph_if_not_exists() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        for statement in ["CREATE GRAPH mydb", "CREATE GRAPH IF NOT EXISTS mydb"] {
            session.execute(statement).unwrap();
        }
    }

    /// A dropped graph can no longer be selected.
    #[test]
    fn test_drop_graph() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        for statement in ["CREATE GRAPH mydb", "DROP GRAPH mydb"] {
            session.execute(statement).unwrap();
        }

        assert!(session.execute("USE GRAPH mydb").is_err());
    }

    /// Dropping an unknown graph fails with a "does not exist" error.
    #[test]
    fn test_drop_graph_nonexistent_errors() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        let outcome = session.execute("DROP GRAPH nosuchgraph");
        assert!(outcome.is_err());

        let err = outcome.unwrap_err().to_string();
        assert!(
            err.contains("does not exist"),
            "Expected 'does not exist' error, got: {err}"
        );
    }

    /// `DROP GRAPH IF EXISTS` succeeds for an unknown graph.
    #[test]
    fn test_drop_graph_if_exists() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        session.execute("DROP GRAPH IF EXISTS nosuchgraph").unwrap();
    }

    /// `START TRANSACTION` / `COMMIT` toggle `in_transaction`, and data
    /// inserted in between is readable after the commit.
    #[test]
    fn test_start_transaction_via_gql() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        session.execute("START TRANSACTION").unwrap();
        assert!(session.in_transaction());

        session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
        session.execute("COMMIT").unwrap();
        assert!(!session.in_transaction());

        let people = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
        assert_eq!(people.rows.len(), 1);
    }

    /// An `INSERT` inside a read-only transaction fails with a "read-only" error.
    #[test]
    fn test_start_transaction_read_only_blocks_insert() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        session.execute("START TRANSACTION READ ONLY").unwrap();

        let write_attempt = session.execute("INSERT (:Person {name: 'Alix'})");
        assert!(write_attempt.is_err());
        let err = write_attempt.unwrap_err().to_string();
        assert!(
            err.contains("read-only"),
            "Expected read-only error, got: {err}"
        );

        session.execute("ROLLBACK").unwrap();
    }

    /// A `MATCH` inside a read-only transaction sees previously committed data.
    #[test]
    fn test_start_transaction_read_only_allows_reads() {
        let db = GrafeoDB::new_in_memory();
        let mut session = db.session();

        // Seed one person through a regular transaction.
        session.begin_transaction().unwrap();
        session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
        session.commit().unwrap();

        session.execute("START TRANSACTION READ ONLY").unwrap();
        let people = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
        assert_eq!(people.rows.len(), 1);
        session.execute("COMMIT").unwrap();
    }

    /// Data inserted in a transaction is gone after `ROLLBACK`.
    #[test]
    fn test_rollback_via_gql() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        let statements = [
            "START TRANSACTION",
            "INSERT (:Person {name: 'Alix'})",
            "ROLLBACK",
        ];
        for statement in statements {
            session.execute(statement).unwrap();
        }

        let people = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
        assert!(people.rows.is_empty());
    }

    /// `START TRANSACTION` with an explicit isolation level opens a transaction.
    #[test]
    fn test_start_transaction_with_isolation_level() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        session
            .execute("START TRANSACTION ISOLATION LEVEL SERIALIZABLE")
            .unwrap();
        assert!(session.in_transaction());

        session.execute("ROLLBACK").unwrap();
    }

    /// A session command returns a result with no rows and no columns.
    #[test]
    fn test_session_commands_return_empty_result() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        session.execute("CREATE GRAPH test").unwrap();

        let outcome = session.execute("SESSION SET GRAPH test").unwrap();
        assert_eq!(outcome.row_count(), 0);
        assert_eq!(outcome.column_count(), 0);
    }

    /// A fresh session has no current graph.
    #[test]
    fn test_current_graph_default_is_none() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        assert!(session.current_graph().is_none());
    }

    /// A fresh session has no time zone.
    #[test]
    fn test_time_zone_default_is_none() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        assert!(session.time_zone().is_none());
    }

    /// Two sessions of one database keep separate current-graph settings.
    #[test]
    fn test_session_state_independent_across_sessions() {
        let db = GrafeoDB::new_in_memory();
        let session1 = db.session();
        let session2 = db.session();

        // Both graphs are created through the first session.
        for statement in ["CREATE GRAPH first", "CREATE GRAPH second"] {
            session1.execute(statement).unwrap();
        }
        session1.execute("SESSION SET GRAPH first").unwrap();
        session2.execute("SESSION SET GRAPH second").unwrap();

        assert_eq!(session1.current_graph(), Some(String::from("first")));
        assert_eq!(session2.current_graph(), Some(String::from("second")));
    }

    /// `SHOW NODE TYPES` lists a created node type under the expected columns.
    #[test]
    fn test_show_node_types() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        session
            .execute("CREATE NODE TYPE Person (name STRING NOT NULL, age INTEGER)")
            .unwrap();

        let listing = session.execute("SHOW NODE TYPES").unwrap();
        let expected_columns = vec!["name", "properties", "constraints", "parents"];
        assert_eq!(listing.columns, expected_columns);
        assert_eq!(listing.rows.len(), 1);
        assert_eq!(listing.rows[0][0], Value::from("Person"));
    }

    /// `SHOW EDGE TYPES` lists a created edge type under the expected columns.
    #[test]
    fn test_show_edge_types() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        session
            .execute("CREATE EDGE TYPE KNOWS CONNECTING (Person) TO (Person) (since INTEGER)")
            .unwrap();

        let listing = session.execute("SHOW EDGE TYPES").unwrap();
        let expected_columns = vec!["name", "properties", "source_types", "target_types"];
        assert_eq!(listing.columns, expected_columns);
        assert_eq!(listing.rows.len(), 1);
        assert_eq!(listing.rows[0][0], Value::from("KNOWS"));
    }

    /// `SHOW GRAPH TYPES` lists a created graph type under the expected columns.
    #[test]
    fn test_show_graph_types() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        session
            .execute("CREATE NODE TYPE Person (name STRING)")
            .unwrap();
        session
            .execute(
                "CREATE GRAPH TYPE social (\
                NODE TYPE Person (name STRING)\
                )",
            )
            .unwrap();

        let listing = session.execute("SHOW GRAPH TYPES").unwrap();
        let expected_columns = vec!["name", "open", "node_types", "edge_types"];
        assert_eq!(listing.columns, expected_columns);
        assert_eq!(listing.rows.len(), 1);
        assert_eq!(listing.rows[0][0], Value::from("social"));
    }

    /// `SHOW GRAPH TYPE <name>` returns a single row for an existing graph type.
    #[test]
    fn test_show_graph_type_named() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        session
            .execute("CREATE NODE TYPE Person (name STRING)")
            .unwrap();
        session
            .execute(
                "CREATE GRAPH TYPE social (\
                NODE TYPE Person (name STRING)\
                )",
            )
            .unwrap();

        let described = session.execute("SHOW GRAPH TYPE social").unwrap();
        assert_eq!(described.rows.len(), 1);
        assert_eq!(described.rows[0][0], Value::from("social"));
    }

    /// `SHOW GRAPH TYPE <name>` fails for an unknown graph type.
    #[test]
    fn test_show_graph_type_not_found() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        assert!(session.execute("SHOW GRAPH TYPE nonexistent").is_err());
    }

    /// `SHOW INDEXES` returns the expected column headers.
    #[test]
    fn test_show_indexes_via_gql() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        let listing = session.execute("SHOW INDEXES").unwrap();
        let expected_columns = vec!["name", "type", "label", "property"];
        assert_eq!(listing.columns, expected_columns);
    }

    /// `SHOW CONSTRAINTS` returns the expected column headers.
    #[test]
    fn test_show_constraints_via_gql() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        let listing = session.execute("SHOW CONSTRAINTS").unwrap();
        let expected_columns = vec!["name", "type", "label", "properties"];
        assert_eq!(listing.columns, expected_columns);
    }

    /// A graph type declared in pattern form can be created and shown again.
    #[test]
    fn test_pattern_form_graph_type_roundtrip() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        // Node and edge types are registered before the graph type is declared.
        let type_definitions = [
            "CREATE NODE TYPE Person (name STRING NOT NULL)",
            "CREATE NODE TYPE City (name STRING)",
            "CREATE EDGE TYPE KNOWS (since INTEGER)",
            "CREATE EDGE TYPE LIVES_IN",
        ];
        for statement in type_definitions {
            session.execute(statement).unwrap();
        }

        session
            .execute(
                "CREATE GRAPH TYPE social (\
                (:Person {name STRING NOT NULL})-[:KNOWS {since INTEGER}]->(:Person),\
                (:Person)-[:LIVES_IN]->(:City)\
                )",
            )
            .unwrap();

        let described = session.execute("SHOW GRAPH TYPE social").unwrap();
        assert_eq!(described.rows.len(), 1);
        assert_eq!(described.rows[0][0], Value::from("social"));
    }
}
5775}