1#[cfg(feature = "triple-store")]
8mod rdf;
9
10use std::sync::Arc;
11use std::sync::atomic::AtomicUsize;
12#[cfg(feature = "lpg")]
13use std::sync::atomic::Ordering;
14use std::time::{Duration, Instant};
15
16#[cfg(feature = "lpg")]
17use grafeo_common::grafeo_debug_span;
18#[cfg(feature = "lpg")]
19use grafeo_common::types::{EdgeId, NodeId};
20use grafeo_common::types::{EpochId, TransactionId, Value};
21use grafeo_common::utils::error::Result;
22use grafeo_common::{grafeo_info_span, grafeo_warn};
23#[cfg(feature = "lpg")]
24use grafeo_core::graph::Direction;
25#[cfg(feature = "lpg")]
26use grafeo_core::graph::lpg::LpgStore;
27#[cfg(feature = "lpg")]
28use grafeo_core::graph::lpg::{Edge, Node};
29#[cfg(feature = "triple-store")]
30use grafeo_core::graph::rdf::RdfStore;
31use grafeo_core::graph::{GraphStore, GraphStoreMut};
32
33use crate::catalog::{Catalog, CatalogConstraintValidator};
34use crate::config::{AdaptiveConfig, GraphModel};
35use crate::database::QueryResult;
36use crate::query::cache::QueryCache;
37use crate::transaction::TransactionManager;
38
39const SCHEMA_DEFAULT_GRAPH: &str = "__default__";
42
43fn parse_default_literal(text: &str) -> Value {
48 if text.eq_ignore_ascii_case("null") {
49 return Value::Null;
50 }
51 if text.eq_ignore_ascii_case("true") {
52 return Value::Bool(true);
53 }
54 if text.eq_ignore_ascii_case("false") {
55 return Value::Bool(false);
56 }
57 if (text.starts_with('\'') && text.ends_with('\''))
59 || (text.starts_with('"') && text.ends_with('"'))
60 {
61 return Value::String(text[1..text.len() - 1].into());
62 }
63 if let Ok(i) = text.parse::<i64>() {
65 return Value::Int64(i);
66 }
67 if let Ok(f) = text.parse::<f64>() {
68 return Value::Float64(f);
69 }
70 Value::String(text.into())
72}
73
74pub(crate) struct SessionConfig {
79 pub transaction_manager: Arc<TransactionManager>,
80 pub query_cache: Arc<QueryCache>,
81 pub catalog: Arc<Catalog>,
82 pub adaptive_config: AdaptiveConfig,
83 pub factorized_execution: bool,
84 pub graph_model: GraphModel,
85 pub query_timeout: Option<Duration>,
86 pub commit_counter: Arc<AtomicUsize>,
87 pub gc_interval: usize,
88 pub read_only: bool,
90}
91
92pub struct Session {
98 #[cfg(feature = "lpg")]
100 store: Arc<LpgStore>,
101 graph_store: Arc<dyn GraphStore>,
103 graph_store_mut: Option<Arc<dyn GraphStoreMut>>,
105 catalog: Arc<Catalog>,
107 #[cfg(feature = "triple-store")]
109 rdf_store: Arc<RdfStore>,
110 transaction_manager: Arc<TransactionManager>,
112 query_cache: Arc<QueryCache>,
114 current_transaction: parking_lot::Mutex<Option<TransactionId>>,
118 read_only_tx: parking_lot::Mutex<bool>,
120 db_read_only: bool,
123 auto_commit: bool,
125 #[allow(dead_code)] adaptive_config: AdaptiveConfig,
128 factorized_execution: bool,
130 graph_model: GraphModel,
132 query_timeout: Option<Duration>,
134 commit_counter: Arc<AtomicUsize>,
136 gc_interval: usize,
138 transaction_start_node_count: AtomicUsize,
140 transaction_start_edge_count: AtomicUsize,
142 #[cfg(feature = "wal")]
144 wal: Option<Arc<grafeo_storage::wal::LpgWal>>,
145 #[cfg(feature = "wal")]
147 wal_graph_context: Option<Arc<parking_lot::Mutex<Option<String>>>>,
148 #[cfg(feature = "cdc")]
150 cdc_log: Arc<crate::cdc::CdcLog>,
151 #[cfg(feature = "cdc")]
154 cdc_pending_events: Option<Arc<parking_lot::Mutex<Vec<crate::cdc::ChangeEvent>>>>,
155 current_graph: parking_lot::Mutex<Option<String>>,
157 current_schema: parking_lot::Mutex<Option<String>>,
160 time_zone: parking_lot::Mutex<Option<String>>,
162 session_params:
164 parking_lot::Mutex<std::collections::HashMap<String, grafeo_common::types::Value>>,
165 viewing_epoch_override: parking_lot::Mutex<Option<EpochId>>,
167 savepoints: parking_lot::Mutex<Vec<SavepointState>>,
169 transaction_nesting_depth: parking_lot::Mutex<u32>,
173 touched_graphs: parking_lot::Mutex<Vec<Option<String>>>,
177 #[cfg(feature = "metrics")]
179 pub(crate) metrics: Option<Arc<crate::metrics::MetricsRegistry>>,
180 #[cfg(feature = "metrics")]
182 tx_start_time: parking_lot::Mutex<Option<Instant>>,
183}
184
185#[derive(Clone)]
187struct GraphSavepoint {
188 graph_name: Option<String>,
189 next_node_id: u64,
190 next_edge_id: u64,
191 undo_log_position: usize,
192}
193
194#[derive(Clone)]
196struct SavepointState {
197 name: String,
198 graph_snapshots: Vec<GraphSavepoint>,
199 #[allow(dead_code)]
202 active_graph: Option<String>,
203 #[cfg(feature = "cdc")]
206 cdc_event_position: usize,
207}
208
209impl Session {
210 #[cfg(feature = "lpg")]
212 #[allow(dead_code)] pub(crate) fn with_adaptive(store: Arc<LpgStore>, cfg: SessionConfig) -> Self {
214 let graph_store = Arc::clone(&store) as Arc<dyn GraphStore>;
215 let graph_store_mut = Some(Arc::clone(&store) as Arc<dyn GraphStoreMut>);
216 Self {
217 store,
218 graph_store,
219 graph_store_mut,
220 catalog: cfg.catalog,
221 #[cfg(feature = "triple-store")]
222 rdf_store: Arc::new(RdfStore::new()),
223 transaction_manager: cfg.transaction_manager,
224 query_cache: cfg.query_cache,
225 current_transaction: parking_lot::Mutex::new(None),
226 read_only_tx: parking_lot::Mutex::new(cfg.read_only),
227 db_read_only: cfg.read_only,
228 auto_commit: true,
229 adaptive_config: cfg.adaptive_config,
230 factorized_execution: cfg.factorized_execution,
231 graph_model: cfg.graph_model,
232 query_timeout: cfg.query_timeout,
233 commit_counter: cfg.commit_counter,
234 gc_interval: cfg.gc_interval,
235 transaction_start_node_count: AtomicUsize::new(0),
236 transaction_start_edge_count: AtomicUsize::new(0),
237 #[cfg(feature = "wal")]
238 wal: None,
239 #[cfg(feature = "wal")]
240 wal_graph_context: None,
241 #[cfg(feature = "cdc")]
242 cdc_log: Arc::new(crate::cdc::CdcLog::new()),
243 #[cfg(feature = "cdc")]
244 cdc_pending_events: None,
245 current_graph: parking_lot::Mutex::new(None),
246 current_schema: parking_lot::Mutex::new(None),
247 time_zone: parking_lot::Mutex::new(None),
248 session_params: parking_lot::Mutex::new(std::collections::HashMap::new()),
249 viewing_epoch_override: parking_lot::Mutex::new(None),
250 savepoints: parking_lot::Mutex::new(Vec::new()),
251 transaction_nesting_depth: parking_lot::Mutex::new(0),
252 touched_graphs: parking_lot::Mutex::new(Vec::new()),
253 #[cfg(feature = "metrics")]
254 metrics: None,
255 #[cfg(feature = "metrics")]
256 tx_start_time: parking_lot::Mutex::new(None),
257 }
258 }
259
260 #[cfg(all(feature = "wal", feature = "lpg"))]
265 pub(crate) fn set_wal(
266 &mut self,
267 wal: Arc<grafeo_storage::wal::LpgWal>,
268 wal_graph_context: Arc<parking_lot::Mutex<Option<String>>>,
269 ) {
270 let wal_store = Arc::new(crate::database::wal_store::WalGraphStore::new(
272 Arc::clone(&self.store),
273 Arc::clone(&wal),
274 Arc::clone(&wal_graph_context),
275 ));
276 self.graph_store = Arc::clone(&wal_store) as Arc<dyn GraphStore>;
277 self.graph_store_mut = Some(wal_store as Arc<dyn GraphStoreMut>);
278 self.wal = Some(wal);
279 self.wal_graph_context = Some(wal_graph_context);
280 }
281
282 #[cfg(feature = "cdc")]
289 pub(crate) fn set_cdc_log(&mut self, cdc_log: Arc<crate::cdc::CdcLog>) {
290 if let Some(ref write_store) = self.graph_store_mut {
293 let cdc_store = Arc::new(crate::database::cdc_store::CdcGraphStore::new(
294 Arc::clone(write_store),
295 Arc::clone(&cdc_log),
296 ));
297 self.cdc_pending_events = Some(cdc_store.pending_events());
298 self.graph_store_mut = Some(cdc_store as Arc<dyn grafeo_core::graph::GraphStoreMut>);
299 }
300 self.cdc_log = cdc_log;
301 }
302
303 #[cfg(feature = "metrics")]
305 pub(crate) fn set_metrics(&mut self, metrics: Arc<crate::metrics::MetricsRegistry>) {
306 self.metrics = Some(metrics);
307 }
308
309 pub(crate) fn with_external_store(
318 read_store: Arc<dyn GraphStore>,
319 write_store: Option<Arc<dyn GraphStoreMut>>,
320 cfg: SessionConfig,
321 ) -> Result<Self> {
322 Ok(Self {
323 #[cfg(feature = "lpg")]
324 store: Arc::new(LpgStore::new()?),
325 graph_store: read_store,
326 graph_store_mut: write_store,
327 catalog: cfg.catalog,
328 #[cfg(feature = "triple-store")]
329 rdf_store: Arc::new(RdfStore::new()),
330 transaction_manager: cfg.transaction_manager,
331 query_cache: cfg.query_cache,
332 current_transaction: parking_lot::Mutex::new(None),
333 read_only_tx: parking_lot::Mutex::new(cfg.read_only),
334 db_read_only: cfg.read_only,
335 auto_commit: true,
336 adaptive_config: cfg.adaptive_config,
337 factorized_execution: cfg.factorized_execution,
338 graph_model: cfg.graph_model,
339 query_timeout: cfg.query_timeout,
340 commit_counter: cfg.commit_counter,
341 gc_interval: cfg.gc_interval,
342 transaction_start_node_count: AtomicUsize::new(0),
343 transaction_start_edge_count: AtomicUsize::new(0),
344 #[cfg(feature = "wal")]
345 wal: None,
346 #[cfg(feature = "wal")]
347 wal_graph_context: None,
348 #[cfg(feature = "cdc")]
349 cdc_log: Arc::new(crate::cdc::CdcLog::new()),
350 #[cfg(feature = "cdc")]
351 cdc_pending_events: None,
352 current_graph: parking_lot::Mutex::new(None),
353 current_schema: parking_lot::Mutex::new(None),
354 time_zone: parking_lot::Mutex::new(None),
355 session_params: parking_lot::Mutex::new(std::collections::HashMap::new()),
356 viewing_epoch_override: parking_lot::Mutex::new(None),
357 savepoints: parking_lot::Mutex::new(Vec::new()),
358 transaction_nesting_depth: parking_lot::Mutex::new(0),
359 touched_graphs: parking_lot::Mutex::new(Vec::new()),
360 #[cfg(feature = "metrics")]
361 metrics: None,
362 #[cfg(feature = "metrics")]
363 tx_start_time: parking_lot::Mutex::new(None),
364 })
365 }
366
367 #[must_use]
369 pub fn graph_model(&self) -> GraphModel {
370 self.graph_model
371 }
372
373 pub fn use_graph(&self, name: &str) {
377 *self.current_graph.lock() = Some(name.to_string());
378 }
379
380 #[must_use]
382 pub fn current_graph(&self) -> Option<String> {
383 self.current_graph.lock().clone()
384 }
385
386 pub fn set_schema(&self, name: &str) {
390 *self.current_schema.lock() = Some(name.to_string());
391 }
392
393 #[must_use]
397 pub fn current_schema(&self) -> Option<String> {
398 self.current_schema.lock().clone()
399 }
400
401 fn effective_graph_key(&self, graph_name: &str) -> String {
406 let schema = self.current_schema.lock().clone();
407 match schema {
408 Some(s) => format!("{s}/{graph_name}"),
409 None => graph_name.to_string(),
410 }
411 }
412
413 fn effective_type_key(&self, type_name: &str) -> String {
417 let schema = self.current_schema.lock().clone();
418 match schema {
419 Some(s) => format!("{s}/{type_name}"),
420 None => type_name.to_string(),
421 }
422 }
423
424 fn active_graph_storage_key(&self) -> Option<String> {
428 let graph = self.current_graph.lock().clone();
429 let schema = self.current_schema.lock().clone();
430 match (&schema, &graph) {
431 (None, None) => None,
432 (Some(s), None) => Some(format!("{s}/{SCHEMA_DEFAULT_GRAPH}")),
433 (None, Some(name)) if name.eq_ignore_ascii_case("default") => None,
434 (Some(s), Some(name)) if name.eq_ignore_ascii_case("default") => {
435 Some(format!("{s}/{SCHEMA_DEFAULT_GRAPH}"))
436 }
437 (None, Some(name)) => Some(name.clone()),
438 (Some(s), Some(g)) => Some(format!("{s}/{g}")),
439 }
440 }
441
442 fn active_store(&self) -> Arc<dyn GraphStore> {
450 let key = self.active_graph_storage_key();
451 match key {
452 None => Arc::clone(&self.graph_store),
453 #[cfg(feature = "lpg")]
454 Some(ref name) => match self.store.graph(name) {
455 Some(named_store) => {
456 #[cfg(feature = "wal")]
457 if let (Some(wal), Some(ctx)) = (&self.wal, &self.wal_graph_context) {
458 return Arc::new(crate::database::wal_store::WalGraphStore::new_for_graph(
459 named_store,
460 Arc::clone(wal),
461 name.clone(),
462 Arc::clone(ctx),
463 )) as Arc<dyn GraphStore>;
464 }
465 named_store as Arc<dyn GraphStore>
466 }
467 None => Arc::clone(&self.graph_store),
468 },
469 #[cfg(not(feature = "lpg"))]
470 Some(_) => Arc::clone(&self.graph_store),
471 }
472 }
473
474 fn active_write_store(&self) -> Option<Arc<dyn GraphStoreMut>> {
479 let key = self.active_graph_storage_key();
480 match key {
481 None => self.graph_store_mut.as_ref().map(Arc::clone),
482 #[cfg(feature = "lpg")]
483 Some(ref name) => match self.store.graph(name) {
484 Some(named_store) => {
485 let mut store: Arc<dyn GraphStoreMut> = named_store;
486
487 #[cfg(feature = "wal")]
488 if let (Some(wal), Some(ctx)) = (&self.wal, &self.wal_graph_context) {
489 store = Arc::new(crate::database::wal_store::WalGraphStore::new_for_graph(
490 self.store
492 .graph(name)
493 .unwrap_or_else(|| Arc::clone(&self.store)),
494 Arc::clone(wal),
495 name.clone(),
496 Arc::clone(ctx),
497 ));
498 }
499
500 #[cfg(feature = "cdc")]
501 if let Some(ref pending) = self.cdc_pending_events {
502 store = Arc::new(crate::database::cdc_store::CdcGraphStore::wrap(
503 store,
504 Arc::clone(&self.cdc_log),
505 Arc::clone(pending),
506 ));
507 }
508
509 Some(store)
510 }
511 None => self.graph_store_mut.as_ref().map(Arc::clone),
512 },
513 #[cfg(not(feature = "lpg"))]
514 Some(_) => self.graph_store_mut.as_ref().map(Arc::clone),
515 }
516 }
517
518 #[cfg(feature = "lpg")]
523 fn active_lpg_store(&self) -> Arc<LpgStore> {
524 let key = self.active_graph_storage_key();
525 match key {
526 None => Arc::clone(&self.store),
527 Some(ref name) => self
528 .store
529 .graph(name)
530 .unwrap_or_else(|| Arc::clone(&self.store)),
531 }
532 }
533
534 #[cfg(feature = "lpg")]
537 fn resolve_store(&self, graph_name: &Option<String>) -> Arc<LpgStore> {
538 match graph_name {
539 None => Arc::clone(&self.store),
540 Some(name) if name.eq_ignore_ascii_case("default") => Arc::clone(&self.store),
541 Some(name) => self
542 .store
543 .graph(name)
544 .unwrap_or_else(|| Arc::clone(&self.store)),
545 }
546 }
547
548 fn track_graph_touch(&self) {
553 if self.current_transaction.lock().is_some() {
554 let key = self.active_graph_storage_key();
555 let mut touched = self.touched_graphs.lock();
556 if !touched.contains(&key) {
557 touched.push(key);
558 }
559 }
560 }
561
562 pub fn set_time_zone(&self, tz: &str) {
564 *self.time_zone.lock() = Some(tz.to_string());
565 }
566
567 #[must_use]
569 pub fn time_zone(&self) -> Option<String> {
570 self.time_zone.lock().clone()
571 }
572
573 pub fn set_parameter(&self, key: &str, value: grafeo_common::types::Value) {
575 self.session_params.lock().insert(key.to_string(), value);
576 }
577
578 #[must_use]
580 pub fn get_parameter(&self, key: &str) -> Option<grafeo_common::types::Value> {
581 self.session_params.lock().get(key).cloned()
582 }
583
584 pub fn reset_session(&self) {
586 *self.current_schema.lock() = None;
587 *self.current_graph.lock() = None;
588 *self.time_zone.lock() = None;
589 self.session_params.lock().clear();
590 *self.viewing_epoch_override.lock() = None;
591 }
592
593 pub fn reset_schema(&self) {
595 *self.current_schema.lock() = None;
596 }
597
598 pub fn reset_graph(&self) {
600 *self.current_graph.lock() = None;
601 }
602
603 pub fn reset_time_zone(&self) {
605 *self.time_zone.lock() = None;
606 }
607
608 pub fn reset_parameters(&self) {
610 self.session_params.lock().clear();
611 }
612
613 pub fn set_viewing_epoch(&self, epoch: EpochId) {
621 *self.viewing_epoch_override.lock() = Some(epoch);
622 }
623
624 pub fn clear_viewing_epoch(&self) {
626 *self.viewing_epoch_override.lock() = None;
627 }
628
629 #[must_use]
631 pub fn viewing_epoch(&self) -> Option<EpochId> {
632 *self.viewing_epoch_override.lock()
633 }
634
635 #[cfg(feature = "lpg")]
639 #[must_use]
640 pub fn get_node_history(&self, id: NodeId) -> Vec<(EpochId, Option<EpochId>, Node)> {
641 self.active_lpg_store().get_node_history(id)
642 }
643
644 #[cfg(feature = "lpg")]
648 #[must_use]
649 pub fn get_edge_history(&self, id: EdgeId) -> Vec<(EpochId, Option<EpochId>, Edge)> {
650 self.active_lpg_store().get_edge_history(id)
651 }
652
653 fn require_lpg(&self, language: &str) -> Result<()> {
655 if self.graph_model == GraphModel::Rdf {
656 return Err(grafeo_common::utils::error::Error::Internal(format!(
657 "This is an RDF database. {language} queries require an LPG database."
658 )));
659 }
660 Ok(())
661 }
662
663 #[cfg(feature = "gql")]
665 fn execute_session_command(
666 &self,
667 cmd: grafeo_adapters::query::gql::ast::SessionCommand,
668 ) -> Result<QueryResult> {
669 use grafeo_adapters::query::gql::ast::SessionCommand;
670 #[cfg(feature = "lpg")]
671 use grafeo_adapters::query::gql::ast::TransactionIsolationLevel;
672 use grafeo_common::utils::error::{Error, QueryError, QueryErrorKind};
673
674 if *self.read_only_tx.lock() {
676 match &cmd {
677 SessionCommand::CreateGraph { .. } | SessionCommand::DropGraph { .. } => {
678 return Err(Error::Transaction(
679 grafeo_common::utils::error::TransactionError::ReadOnly,
680 ));
681 }
682 _ => {} }
684 }
685
686 match cmd {
687 #[cfg(feature = "lpg")]
688 SessionCommand::CreateGraph {
689 name,
690 if_not_exists,
691 typed,
692 like_graph,
693 copy_of,
694 open: _,
695 } => {
696 let storage_key = self.effective_graph_key(&name);
698
699 if let Some(ref src) = like_graph {
701 let src_key = self.effective_graph_key(src);
702 if self.store.graph(&src_key).is_none() {
703 return Err(Error::Query(QueryError::new(
704 QueryErrorKind::Semantic,
705 format!("Source graph '{src}' does not exist"),
706 )));
707 }
708 }
709 if let Some(ref src) = copy_of {
710 let src_key = self.effective_graph_key(src);
711 if self.store.graph(&src_key).is_none() {
712 return Err(Error::Query(QueryError::new(
713 QueryErrorKind::Semantic,
714 format!("Source graph '{src}' does not exist"),
715 )));
716 }
717 }
718
719 let created = self
720 .store
721 .create_graph(&storage_key)
722 .map_err(|e| Error::Internal(e.to_string()))?;
723 if !created && !if_not_exists {
724 return Err(Error::Query(QueryError::new(
725 QueryErrorKind::Semantic,
726 format!("Graph '{name}' already exists"),
727 )));
728 }
729 if created {
730 #[cfg(feature = "wal")]
731 self.log_schema_wal(&grafeo_storage::wal::WalRecord::CreateNamedGraph {
732 name: storage_key.clone(),
733 });
734 }
735
736 if let Some(ref src) = copy_of {
738 let src_key = self.effective_graph_key(src);
739 self.store
740 .copy_graph(Some(&src_key), Some(&storage_key))
741 .map_err(|e| Error::Internal(e.to_string()))?;
742 }
743
744 if let Some(type_name) = typed
748 && let Err(e) = self.catalog.bind_graph_type(
749 &storage_key,
750 if type_name.contains('/') {
751 type_name.clone()
752 } else {
753 self.effective_type_key(&type_name)
754 },
755 )
756 {
757 return Err(Error::Query(QueryError::new(
758 QueryErrorKind::Semantic,
759 e.to_string(),
760 )));
761 }
762
763 if let Some(ref src) = like_graph {
765 let src_key = self.effective_graph_key(src);
766 if let Some(src_type) = self.catalog.get_graph_type_binding(&src_key) {
767 let _ = self.catalog.bind_graph_type(&storage_key, src_type);
768 }
769 }
770
771 Ok(QueryResult::empty())
772 }
773 #[cfg(feature = "lpg")]
774 SessionCommand::DropGraph { name, if_exists } => {
775 let storage_key = self.effective_graph_key(&name);
776 let dropped = self.store.drop_graph(&storage_key);
777 if !dropped && !if_exists {
778 return Err(Error::Query(QueryError::new(
779 QueryErrorKind::Semantic,
780 format!("Graph '{name}' does not exist"),
781 )));
782 }
783 if dropped {
784 #[cfg(feature = "wal")]
785 self.log_schema_wal(&grafeo_storage::wal::WalRecord::DropNamedGraph {
786 name: storage_key.clone(),
787 });
788 let mut current = self.current_graph.lock();
790 if current
791 .as_deref()
792 .is_some_and(|g| g.eq_ignore_ascii_case(&name))
793 {
794 *current = None;
795 }
796 }
797 Ok(QueryResult::empty())
798 }
799 #[cfg(feature = "lpg")]
800 SessionCommand::UseGraph(name) => {
801 let effective_key = self.effective_graph_key(&name);
803 if !name.eq_ignore_ascii_case("default")
804 && self.store.graph(&effective_key).is_none()
805 {
806 return Err(Error::Query(QueryError::new(
807 QueryErrorKind::Semantic,
808 format!("Graph '{name}' does not exist"),
809 )));
810 }
811 self.use_graph(&name);
812 self.track_graph_touch();
814 Ok(QueryResult::empty())
815 }
816 #[cfg(feature = "lpg")]
817 SessionCommand::SessionSetGraph(name) => {
818 let effective_key = self.effective_graph_key(&name);
820 if !name.eq_ignore_ascii_case("default")
821 && self.store.graph(&effective_key).is_none()
822 {
823 return Err(Error::Query(QueryError::new(
824 QueryErrorKind::Semantic,
825 format!("Graph '{name}' does not exist"),
826 )));
827 }
828 self.use_graph(&name);
829 self.track_graph_touch();
831 Ok(QueryResult::empty())
832 }
833 SessionCommand::SessionSetSchema(name) => {
834 if !self.catalog.schema_exists(&name) {
836 return Err(Error::Query(QueryError::new(
837 QueryErrorKind::Semantic,
838 format!("Schema '{name}' does not exist"),
839 )));
840 }
841 self.set_schema(&name);
842 Ok(QueryResult::empty())
843 }
844 SessionCommand::SessionSetTimeZone(tz) => {
845 self.set_time_zone(&tz);
846 Ok(QueryResult::empty())
847 }
848 SessionCommand::SessionSetParameter(key, expr) => {
849 if key.eq_ignore_ascii_case("viewing_epoch") {
850 match Self::eval_integer_literal(&expr) {
851 Some(n) if n >= 0 => {
852 self.set_viewing_epoch(EpochId::new(n as u64));
853 Ok(QueryResult::status(format!("Set viewing_epoch to {n}")))
854 }
855 _ => Err(Error::Query(QueryError::new(
856 QueryErrorKind::Semantic,
857 "viewing_epoch must be a non-negative integer literal",
858 ))),
859 }
860 } else {
861 self.set_parameter(&key, Value::Null);
864 Ok(QueryResult::empty())
865 }
866 }
867 SessionCommand::SessionReset(target) => {
868 use grafeo_adapters::query::gql::ast::SessionResetTarget;
869 match target {
870 SessionResetTarget::All => self.reset_session(),
871 SessionResetTarget::Schema => self.reset_schema(),
872 SessionResetTarget::Graph => self.reset_graph(),
873 SessionResetTarget::TimeZone => self.reset_time_zone(),
874 SessionResetTarget::Parameters => self.reset_parameters(),
875 }
876 Ok(QueryResult::empty())
877 }
878 SessionCommand::SessionClose => {
879 self.reset_session();
880 Ok(QueryResult::empty())
881 }
882 #[cfg(feature = "lpg")]
883 SessionCommand::StartTransaction {
884 read_only,
885 isolation_level,
886 } => {
887 let engine_level = isolation_level.map(|l| match l {
888 TransactionIsolationLevel::ReadCommitted => {
889 crate::transaction::IsolationLevel::ReadCommitted
890 }
891 TransactionIsolationLevel::SnapshotIsolation => {
892 crate::transaction::IsolationLevel::SnapshotIsolation
893 }
894 TransactionIsolationLevel::Serializable => {
895 crate::transaction::IsolationLevel::Serializable
896 }
897 });
898 self.begin_transaction_inner(read_only, engine_level)?;
899 Ok(QueryResult::status("Transaction started"))
900 }
901 #[cfg(feature = "lpg")]
902 SessionCommand::Commit => {
903 self.commit_inner()?;
904 Ok(QueryResult::status("Transaction committed"))
905 }
906 #[cfg(feature = "lpg")]
907 SessionCommand::Rollback => {
908 self.rollback_inner()?;
909 Ok(QueryResult::status("Transaction rolled back"))
910 }
911 #[cfg(feature = "lpg")]
912 SessionCommand::Savepoint(name) => {
913 self.savepoint(&name)?;
914 Ok(QueryResult::status(format!("Savepoint '{name}' created")))
915 }
916 #[cfg(feature = "lpg")]
917 SessionCommand::RollbackToSavepoint(name) => {
918 self.rollback_to_savepoint(&name)?;
919 Ok(QueryResult::status(format!(
920 "Rolled back to savepoint '{name}'"
921 )))
922 }
923 #[cfg(feature = "lpg")]
924 SessionCommand::ReleaseSavepoint(name) => {
925 self.release_savepoint(&name)?;
926 Ok(QueryResult::status(format!("Savepoint '{name}' released")))
927 }
928 #[cfg(not(feature = "lpg"))]
929 _ => Err(grafeo_common::utils::error::Error::Internal(
930 "This command requires the `lpg` feature".to_string(),
931 )),
932 }
933 }
934
935 #[cfg(feature = "wal")]
937 fn log_schema_wal(&self, record: &grafeo_storage::wal::WalRecord) {
938 if let Some(ref wal) = self.wal
939 && let Err(e) = wal.log(record)
940 {
941 grafeo_warn!("Failed to log schema change to WAL: {}", e);
942 }
943 }
944
945 #[cfg(all(feature = "lpg", feature = "gql"))]
947 fn execute_schema_command(
948 &self,
949 cmd: grafeo_adapters::query::gql::ast::SchemaStatement,
950 ) -> Result<QueryResult> {
951 use crate::catalog::{
952 EdgeTypeDefinition, NodeTypeDefinition, PropertyDataType, TypedProperty,
953 };
954 use grafeo_adapters::query::gql::ast::SchemaStatement;
955 use grafeo_common::utils::error::{Error, QueryError, QueryErrorKind};
956 #[cfg(feature = "wal")]
957 use grafeo_storage::wal::WalRecord;
958
959 macro_rules! wal_log {
961 ($self:expr, $record:expr) => {
962 #[cfg(feature = "wal")]
963 $self.log_schema_wal(&$record);
964 };
965 }
966
967 let result = match cmd {
968 SchemaStatement::CreateNodeType(stmt) => {
969 let effective_name = self.effective_type_key(&stmt.name);
970 #[cfg(feature = "wal")]
971 let props_for_wal: Vec<(String, String, bool)> = stmt
972 .properties
973 .iter()
974 .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
975 .collect();
976 let def = NodeTypeDefinition {
977 name: effective_name.clone(),
978 properties: stmt
979 .properties
980 .iter()
981 .map(|p| TypedProperty {
982 name: p.name.clone(),
983 data_type: PropertyDataType::from_type_name(&p.data_type),
984 nullable: p.nullable,
985 default_value: p
986 .default_value
987 .as_ref()
988 .map(|s| parse_default_literal(s)),
989 })
990 .collect(),
991 constraints: Vec::new(),
992 parent_types: stmt.parent_types.clone(),
993 };
994 let result = if stmt.or_replace {
995 let _ = self.catalog.drop_node_type(&effective_name);
996 self.catalog.register_node_type(def)
997 } else {
998 self.catalog.register_node_type(def)
999 };
1000 match result {
1001 Ok(()) => {
1002 wal_log!(
1003 self,
1004 WalRecord::CreateNodeType {
1005 name: effective_name.clone(),
1006 properties: props_for_wal,
1007 constraints: Vec::new(),
1008 }
1009 );
1010 Ok(QueryResult::status(format!(
1011 "Created node type '{}'",
1012 stmt.name
1013 )))
1014 }
1015 Err(e) if stmt.if_not_exists => {
1016 let _ = e;
1017 Ok(QueryResult::status("No change"))
1018 }
1019 Err(e) => Err(Error::Query(QueryError::new(
1020 QueryErrorKind::Semantic,
1021 e.to_string(),
1022 ))),
1023 }
1024 }
1025 SchemaStatement::CreateEdgeType(stmt) => {
1026 let effective_name = self.effective_type_key(&stmt.name);
1027 #[cfg(feature = "wal")]
1028 let props_for_wal: Vec<(String, String, bool)> = stmt
1029 .properties
1030 .iter()
1031 .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
1032 .collect();
1033 let def = EdgeTypeDefinition {
1034 name: effective_name.clone(),
1035 properties: stmt
1036 .properties
1037 .iter()
1038 .map(|p| TypedProperty {
1039 name: p.name.clone(),
1040 data_type: PropertyDataType::from_type_name(&p.data_type),
1041 nullable: p.nullable,
1042 default_value: p
1043 .default_value
1044 .as_ref()
1045 .map(|s| parse_default_literal(s)),
1046 })
1047 .collect(),
1048 constraints: Vec::new(),
1049 source_node_types: stmt.source_node_types.clone(),
1050 target_node_types: stmt.target_node_types.clone(),
1051 };
1052 let result = if stmt.or_replace {
1053 let _ = self.catalog.drop_edge_type_def(&effective_name);
1054 self.catalog.register_edge_type_def(def)
1055 } else {
1056 self.catalog.register_edge_type_def(def)
1057 };
1058 match result {
1059 Ok(()) => {
1060 wal_log!(
1061 self,
1062 WalRecord::CreateEdgeType {
1063 name: effective_name.clone(),
1064 properties: props_for_wal,
1065 constraints: Vec::new(),
1066 }
1067 );
1068 Ok(QueryResult::status(format!(
1069 "Created edge type '{}'",
1070 stmt.name
1071 )))
1072 }
1073 Err(e) if stmt.if_not_exists => {
1074 let _ = e;
1075 Ok(QueryResult::status("No change"))
1076 }
1077 Err(e) => Err(Error::Query(QueryError::new(
1078 QueryErrorKind::Semantic,
1079 e.to_string(),
1080 ))),
1081 }
1082 }
1083 SchemaStatement::CreateVectorIndex(stmt) => {
1084 Self::create_vector_index_on_store(
1085 &self.active_lpg_store(),
1086 &stmt.node_label,
1087 &stmt.property,
1088 stmt.dimensions,
1089 stmt.metric.as_deref(),
1090 )?;
1091 wal_log!(
1092 self,
1093 WalRecord::CreateIndex {
1094 name: stmt.name.clone(),
1095 label: stmt.node_label.clone(),
1096 property: stmt.property.clone(),
1097 index_type: "vector".to_string(),
1098 }
1099 );
1100 Ok(QueryResult::status(format!(
1101 "Created vector index '{}'",
1102 stmt.name
1103 )))
1104 }
1105 SchemaStatement::DropNodeType { name, if_exists } => {
1106 let effective_name = self.effective_type_key(&name);
1107 match self.catalog.drop_node_type(&effective_name) {
1108 Ok(()) => {
1109 wal_log!(
1110 self,
1111 WalRecord::DropNodeType {
1112 name: effective_name
1113 }
1114 );
1115 Ok(QueryResult::status(format!("Dropped node type '{name}'")))
1116 }
1117 Err(e) if if_exists => {
1118 let _ = e;
1119 Ok(QueryResult::status("No change"))
1120 }
1121 Err(e) => Err(Error::Query(QueryError::new(
1122 QueryErrorKind::Semantic,
1123 e.to_string(),
1124 ))),
1125 }
1126 }
1127 SchemaStatement::DropEdgeType { name, if_exists } => {
1128 let effective_name = self.effective_type_key(&name);
1129 match self.catalog.drop_edge_type_def(&effective_name) {
1130 Ok(()) => {
1131 wal_log!(
1132 self,
1133 WalRecord::DropEdgeType {
1134 name: effective_name
1135 }
1136 );
1137 Ok(QueryResult::status(format!("Dropped edge type '{name}'")))
1138 }
1139 Err(e) if if_exists => {
1140 let _ = e;
1141 Ok(QueryResult::status("No change"))
1142 }
1143 Err(e) => Err(Error::Query(QueryError::new(
1144 QueryErrorKind::Semantic,
1145 e.to_string(),
1146 ))),
1147 }
1148 }
1149 SchemaStatement::CreateIndex(stmt) => {
1150 use crate::catalog::IndexType as CatalogIndexType;
1151 use grafeo_adapters::query::gql::ast::IndexKind;
1152 let active = self.active_lpg_store();
1153 let index_type_str = match stmt.index_kind {
1154 IndexKind::Property => "property",
1155 IndexKind::BTree => "btree",
1156 IndexKind::Text => "text",
1157 IndexKind::Vector => "vector",
1158 };
1159 match stmt.index_kind {
1160 IndexKind::Property | IndexKind::BTree => {
1161 for prop in &stmt.properties {
1162 active.create_property_index(prop);
1163 }
1164 }
1165 IndexKind::Text => {
1166 for prop in &stmt.properties {
1167 Self::create_text_index_on_store(&active, &stmt.label, prop)?;
1168 }
1169 }
1170 IndexKind::Vector => {
1171 for prop in &stmt.properties {
1172 Self::create_vector_index_on_store(
1173 &active,
1174 &stmt.label,
1175 prop,
1176 stmt.options.dimensions,
1177 stmt.options.metric.as_deref(),
1178 )?;
1179 }
1180 }
1181 }
1182 let catalog_index_type = match stmt.index_kind {
1185 IndexKind::Property => CatalogIndexType::Hash,
1186 IndexKind::BTree => CatalogIndexType::BTree,
1187 IndexKind::Text => CatalogIndexType::FullText,
1188 IndexKind::Vector => CatalogIndexType::Hash,
1189 };
1190 let label_id = self.catalog.get_or_create_label(&stmt.label);
1191 for prop in &stmt.properties {
1192 let prop_id = self.catalog.get_or_create_property_key(prop);
1193 self.catalog
1194 .create_index(&stmt.name, label_id, prop_id, catalog_index_type);
1195 }
1196 #[cfg(feature = "wal")]
1197 for prop in &stmt.properties {
1198 wal_log!(
1199 self,
1200 WalRecord::CreateIndex {
1201 name: stmt.name.clone(),
1202 label: stmt.label.clone(),
1203 property: prop.clone(),
1204 index_type: index_type_str.to_string(),
1205 }
1206 );
1207 }
1208 Ok(QueryResult::status(format!(
1209 "Created {} index '{}'",
1210 index_type_str, stmt.name
1211 )))
1212 }
1213 SchemaStatement::DropIndex { name, if_exists } => {
1214 if let Some(index_id) = self.catalog.find_index_by_name(&name) {
1217 let def = self.catalog.get_index(index_id);
1218 self.catalog.drop_index(index_id);
1219 if let Some(def) = def
1220 && let Some(prop_name) =
1221 self.catalog.get_property_key_name(def.property_key)
1222 {
1223 self.active_lpg_store().drop_property_index(&prop_name);
1224 }
1225 wal_log!(self, WalRecord::DropIndex { name: name.clone() });
1226 Ok(QueryResult::status(format!("Dropped index '{name}'")))
1227 } else if if_exists {
1228 Ok(QueryResult::status("No change".to_string()))
1229 } else {
1230 Err(Error::Query(QueryError::new(
1231 QueryErrorKind::Semantic,
1232 format!("Index '{name}' does not exist"),
1233 )))
1234 }
1235 }
1236 SchemaStatement::CreateConstraint(stmt) => {
1237 use crate::catalog::TypeConstraint;
1238 use grafeo_adapters::query::gql::ast::ConstraintKind;
1239 let kind_str = match stmt.constraint_kind {
1240 ConstraintKind::Unique => "unique",
1241 ConstraintKind::NodeKey => "node_key",
1242 ConstraintKind::NotNull => "not_null",
1243 ConstraintKind::Exists => "exists",
1244 };
1245 let constraint_name = stmt
1246 .name
1247 .clone()
1248 .unwrap_or_else(|| format!("{}_{kind_str}", stmt.label));
1249
1250 match stmt.constraint_kind {
1252 ConstraintKind::Unique => {
1253 for prop in &stmt.properties {
1254 let label_id = self.catalog.get_or_create_label(&stmt.label);
1255 let prop_id = self.catalog.get_or_create_property_key(prop);
1256 let _ = self.catalog.add_unique_constraint(label_id, prop_id);
1257 }
1258 let _ = self.catalog.add_constraint_to_type(
1259 &stmt.label,
1260 TypeConstraint::Unique(stmt.properties.clone()),
1261 );
1262 }
1263 ConstraintKind::NodeKey => {
1264 for prop in &stmt.properties {
1265 let label_id = self.catalog.get_or_create_label(&stmt.label);
1266 let prop_id = self.catalog.get_or_create_property_key(prop);
1267 let _ = self.catalog.add_unique_constraint(label_id, prop_id);
1268 let _ = self.catalog.add_required_property(label_id, prop_id);
1269 }
1270 let _ = self.catalog.add_constraint_to_type(
1271 &stmt.label,
1272 TypeConstraint::PrimaryKey(stmt.properties.clone()),
1273 );
1274 }
1275 ConstraintKind::NotNull | ConstraintKind::Exists => {
1276 for prop in &stmt.properties {
1277 let label_id = self.catalog.get_or_create_label(&stmt.label);
1278 let prop_id = self.catalog.get_or_create_property_key(prop);
1279 let _ = self.catalog.add_required_property(label_id, prop_id);
1280 let _ = self.catalog.add_constraint_to_type(
1281 &stmt.label,
1282 TypeConstraint::NotNull(prop.clone()),
1283 );
1284 }
1285 }
1286 }
1287
1288 wal_log!(
1289 self,
1290 WalRecord::CreateConstraint {
1291 name: constraint_name.clone(),
1292 label: stmt.label.clone(),
1293 properties: stmt.properties.clone(),
1294 kind: kind_str.to_string(),
1295 }
1296 );
1297 Ok(QueryResult::status(format!(
1298 "Created {kind_str} constraint '{constraint_name}'"
1299 )))
1300 }
1301 SchemaStatement::DropConstraint { name, if_exists } => {
1302 let _ = if_exists;
1303 wal_log!(self, WalRecord::DropConstraint { name: name.clone() });
1304 Ok(QueryResult::status(format!("Dropped constraint '{name}'")))
1305 }
1306 SchemaStatement::CreateGraphType(stmt) => {
1307 use crate::catalog::GraphTypeDefinition;
1308 use grafeo_adapters::query::gql::ast::InlineElementType;
1309
1310 let effective_name = self.effective_type_key(&stmt.name);
1311
1312 let (mut node_types, mut edge_types, open) =
1314 if let Some(ref like_graph) = stmt.like_graph {
1315 if let Some(type_name) = self.catalog.get_graph_type_binding(like_graph) {
1317 if let Some(existing) = self
1318 .catalog
1319 .schema()
1320 .and_then(|s| s.get_graph_type(&type_name))
1321 {
1322 (
1323 existing.allowed_node_types.clone(),
1324 existing.allowed_edge_types.clone(),
1325 existing.open,
1326 )
1327 } else {
1328 (Vec::new(), Vec::new(), true)
1329 }
1330 } else {
1331 let nt = self.catalog.all_node_type_names();
1333 let et = self.catalog.all_edge_type_names();
1334 if nt.is_empty() && et.is_empty() {
1335 (Vec::new(), Vec::new(), true)
1336 } else {
1337 (nt, et, false)
1338 }
1339 }
1340 } else {
1341 let nt = stmt
1343 .node_types
1344 .iter()
1345 .map(|n| self.effective_type_key(n))
1346 .collect();
1347 let et = stmt
1348 .edge_types
1349 .iter()
1350 .map(|n| self.effective_type_key(n))
1351 .collect();
1352 (nt, et, stmt.open)
1353 };
1354
1355 for inline in &stmt.inline_types {
1357 match inline {
1358 InlineElementType::Node {
1359 name,
1360 properties,
1361 key_labels,
1362 ..
1363 } => {
1364 let inline_effective = self.effective_type_key(name);
1365 let def = NodeTypeDefinition {
1366 name: inline_effective.clone(),
1367 properties: properties
1368 .iter()
1369 .map(|p| TypedProperty {
1370 name: p.name.clone(),
1371 data_type: PropertyDataType::from_type_name(&p.data_type),
1372 nullable: p.nullable,
1373 default_value: None,
1374 })
1375 .collect(),
1376 constraints: Vec::new(),
1377 parent_types: key_labels.clone(),
1378 };
1379 self.catalog.register_or_replace_node_type(def);
1381 #[cfg(feature = "wal")]
1382 {
1383 let props_for_wal: Vec<(String, String, bool)> = properties
1384 .iter()
1385 .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
1386 .collect();
1387 self.log_schema_wal(&WalRecord::CreateNodeType {
1388 name: inline_effective.clone(),
1389 properties: props_for_wal,
1390 constraints: Vec::new(),
1391 });
1392 }
1393 if !node_types.contains(&inline_effective) {
1394 node_types.push(inline_effective);
1395 }
1396 }
1397 InlineElementType::Edge {
1398 name,
1399 properties,
1400 source_node_types,
1401 target_node_types,
1402 ..
1403 } => {
1404 let inline_effective = self.effective_type_key(name);
1405 let def = EdgeTypeDefinition {
1406 name: inline_effective.clone(),
1407 properties: properties
1408 .iter()
1409 .map(|p| TypedProperty {
1410 name: p.name.clone(),
1411 data_type: PropertyDataType::from_type_name(&p.data_type),
1412 nullable: p.nullable,
1413 default_value: None,
1414 })
1415 .collect(),
1416 constraints: Vec::new(),
1417 source_node_types: source_node_types.clone(),
1418 target_node_types: target_node_types.clone(),
1419 };
1420 self.catalog.register_or_replace_edge_type_def(def);
1421 #[cfg(feature = "wal")]
1422 {
1423 let props_for_wal: Vec<(String, String, bool)> = properties
1424 .iter()
1425 .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
1426 .collect();
1427 self.log_schema_wal(&WalRecord::CreateEdgeType {
1428 name: inline_effective.clone(),
1429 properties: props_for_wal,
1430 constraints: Vec::new(),
1431 });
1432 }
1433 if !edge_types.contains(&inline_effective) {
1434 edge_types.push(inline_effective);
1435 }
1436 }
1437 }
1438 }
1439
1440 let def = GraphTypeDefinition {
1441 name: effective_name.clone(),
1442 allowed_node_types: node_types.clone(),
1443 allowed_edge_types: edge_types.clone(),
1444 open,
1445 };
1446 let result = if stmt.or_replace {
1447 let _ = self.catalog.drop_graph_type(&effective_name);
1449 self.catalog.register_graph_type(def)
1450 } else {
1451 self.catalog.register_graph_type(def)
1452 };
1453 match result {
1454 Ok(()) => {
1455 wal_log!(
1456 self,
1457 WalRecord::CreateGraphType {
1458 name: effective_name.clone(),
1459 node_types,
1460 edge_types,
1461 open,
1462 }
1463 );
1464 Ok(QueryResult::status(format!(
1465 "Created graph type '{}'",
1466 stmt.name
1467 )))
1468 }
1469 Err(e) if stmt.if_not_exists => {
1470 let _ = e;
1471 Ok(QueryResult::status("No change"))
1472 }
1473 Err(e) => Err(Error::Query(QueryError::new(
1474 QueryErrorKind::Semantic,
1475 e.to_string(),
1476 ))),
1477 }
1478 }
1479 SchemaStatement::DropGraphType { name, if_exists } => {
1480 let effective_name = self.effective_type_key(&name);
1481 match self.catalog.drop_graph_type(&effective_name) {
1482 Ok(()) => {
1483 wal_log!(
1484 self,
1485 WalRecord::DropGraphType {
1486 name: effective_name
1487 }
1488 );
1489 Ok(QueryResult::status(format!("Dropped graph type '{name}'")))
1490 }
1491 Err(e) if if_exists => {
1492 let _ = e;
1493 Ok(QueryResult::status("No change"))
1494 }
1495 Err(e) => Err(Error::Query(QueryError::new(
1496 QueryErrorKind::Semantic,
1497 e.to_string(),
1498 ))),
1499 }
1500 }
1501 SchemaStatement::CreateSchema {
1502 name,
1503 if_not_exists,
1504 } => match self.catalog.register_schema_namespace(name.clone()) {
1505 Ok(()) => {
1506 wal_log!(self, WalRecord::CreateSchema { name: name.clone() });
1507 let default_key = format!("{name}/{SCHEMA_DEFAULT_GRAPH}");
1510 if self.store.create_graph(&default_key).unwrap_or(false) {
1511 wal_log!(self, WalRecord::CreateNamedGraph { name: default_key });
1512 }
1513 Ok(QueryResult::status(format!("Created schema '{name}'")))
1514 }
1515 Err(e) if if_not_exists => {
1516 let _ = e;
1517 Ok(QueryResult::status("No change"))
1518 }
1519 Err(e) => Err(Error::Query(QueryError::new(
1520 QueryErrorKind::Semantic,
1521 e.to_string(),
1522 ))),
1523 },
1524 SchemaStatement::DropSchema { name, if_exists } => {
1525 let prefix = format!("{name}/");
1528 let default_graph_key = format!("{name}/{SCHEMA_DEFAULT_GRAPH}");
1529 let has_graphs = self
1530 .store
1531 .graph_names()
1532 .iter()
1533 .any(|g| g.starts_with(&prefix) && *g != default_graph_key);
1534 let has_types = self
1535 .catalog
1536 .all_node_type_names()
1537 .iter()
1538 .any(|n| n.starts_with(&prefix))
1539 || self
1540 .catalog
1541 .all_edge_type_names()
1542 .iter()
1543 .any(|n| n.starts_with(&prefix))
1544 || self
1545 .catalog
1546 .all_graph_type_names()
1547 .iter()
1548 .any(|n| n.starts_with(&prefix));
1549 if has_graphs || has_types {
1550 return Err(Error::Query(QueryError::new(
1551 QueryErrorKind::Semantic,
1552 format!("Schema '{name}' is not empty: drop all graphs and types first"),
1553 )));
1554 }
1555 match self.catalog.drop_schema_namespace(&name) {
1556 Ok(()) => {
1557 wal_log!(self, WalRecord::DropSchema { name: name.clone() });
1558 if self.store.drop_graph(&default_graph_key) {
1560 wal_log!(
1561 self,
1562 WalRecord::DropNamedGraph {
1563 name: default_graph_key,
1564 }
1565 );
1566 }
1567 let mut current = self.current_schema.lock();
1569 if current
1570 .as_deref()
1571 .is_some_and(|s| s.eq_ignore_ascii_case(&name))
1572 {
1573 *current = None;
1574 }
1575 Ok(QueryResult::status(format!("Dropped schema '{name}'")))
1576 }
1577 Err(e) if if_exists => {
1578 let _ = e;
1579 Ok(QueryResult::status("No change"))
1580 }
1581 Err(e) => Err(Error::Query(QueryError::new(
1582 QueryErrorKind::Semantic,
1583 e.to_string(),
1584 ))),
1585 }
1586 }
1587 SchemaStatement::AlterNodeType(stmt) => {
1588 use grafeo_adapters::query::gql::ast::TypeAlteration;
1589 let effective_name = self.effective_type_key(&stmt.name);
1590 let mut wal_alts = Vec::new();
1591 for alt in &stmt.alterations {
1592 match alt {
1593 TypeAlteration::AddProperty(prop) => {
1594 let typed = TypedProperty {
1595 name: prop.name.clone(),
1596 data_type: PropertyDataType::from_type_name(&prop.data_type),
1597 nullable: prop.nullable,
1598 default_value: prop
1599 .default_value
1600 .as_ref()
1601 .map(|s| parse_default_literal(s)),
1602 };
1603 self.catalog
1604 .alter_node_type_add_property(&effective_name, typed)
1605 .map_err(|e| {
1606 Error::Query(QueryError::new(
1607 QueryErrorKind::Semantic,
1608 e.to_string(),
1609 ))
1610 })?;
1611 wal_alts.push((
1612 "add".to_string(),
1613 prop.name.clone(),
1614 prop.data_type.clone(),
1615 prop.nullable,
1616 ));
1617 }
1618 TypeAlteration::DropProperty(name) => {
1619 self.catalog
1620 .alter_node_type_drop_property(&effective_name, name)
1621 .map_err(|e| {
1622 Error::Query(QueryError::new(
1623 QueryErrorKind::Semantic,
1624 e.to_string(),
1625 ))
1626 })?;
1627 wal_alts.push(("drop".to_string(), name.clone(), String::new(), false));
1628 }
1629 }
1630 }
1631 wal_log!(
1632 self,
1633 WalRecord::AlterNodeType {
1634 name: effective_name,
1635 alterations: wal_alts,
1636 }
1637 );
1638 Ok(QueryResult::status(format!(
1639 "Altered node type '{}'",
1640 stmt.name
1641 )))
1642 }
1643 SchemaStatement::AlterEdgeType(stmt) => {
1644 use grafeo_adapters::query::gql::ast::TypeAlteration;
1645 let effective_name = self.effective_type_key(&stmt.name);
1646 let mut wal_alts = Vec::new();
1647 for alt in &stmt.alterations {
1648 match alt {
1649 TypeAlteration::AddProperty(prop) => {
1650 let typed = TypedProperty {
1651 name: prop.name.clone(),
1652 data_type: PropertyDataType::from_type_name(&prop.data_type),
1653 nullable: prop.nullable,
1654 default_value: prop
1655 .default_value
1656 .as_ref()
1657 .map(|s| parse_default_literal(s)),
1658 };
1659 self.catalog
1660 .alter_edge_type_add_property(&effective_name, typed)
1661 .map_err(|e| {
1662 Error::Query(QueryError::new(
1663 QueryErrorKind::Semantic,
1664 e.to_string(),
1665 ))
1666 })?;
1667 wal_alts.push((
1668 "add".to_string(),
1669 prop.name.clone(),
1670 prop.data_type.clone(),
1671 prop.nullable,
1672 ));
1673 }
1674 TypeAlteration::DropProperty(name) => {
1675 self.catalog
1676 .alter_edge_type_drop_property(&effective_name, name)
1677 .map_err(|e| {
1678 Error::Query(QueryError::new(
1679 QueryErrorKind::Semantic,
1680 e.to_string(),
1681 ))
1682 })?;
1683 wal_alts.push(("drop".to_string(), name.clone(), String::new(), false));
1684 }
1685 }
1686 }
1687 wal_log!(
1688 self,
1689 WalRecord::AlterEdgeType {
1690 name: effective_name,
1691 alterations: wal_alts,
1692 }
1693 );
1694 Ok(QueryResult::status(format!(
1695 "Altered edge type '{}'",
1696 stmt.name
1697 )))
1698 }
1699 SchemaStatement::AlterGraphType(stmt) => {
1700 use grafeo_adapters::query::gql::ast::GraphTypeAlteration;
1701 let effective_name = self.effective_type_key(&stmt.name);
1702 let mut wal_alts = Vec::new();
1703 for alt in &stmt.alterations {
1704 match alt {
1705 GraphTypeAlteration::AddNodeType(name) => {
1706 self.catalog
1707 .alter_graph_type_add_node_type(&effective_name, name.clone())
1708 .map_err(|e| {
1709 Error::Query(QueryError::new(
1710 QueryErrorKind::Semantic,
1711 e.to_string(),
1712 ))
1713 })?;
1714 wal_alts.push(("add_node_type".to_string(), name.clone()));
1715 }
1716 GraphTypeAlteration::DropNodeType(name) => {
1717 self.catalog
1718 .alter_graph_type_drop_node_type(&effective_name, name)
1719 .map_err(|e| {
1720 Error::Query(QueryError::new(
1721 QueryErrorKind::Semantic,
1722 e.to_string(),
1723 ))
1724 })?;
1725 wal_alts.push(("drop_node_type".to_string(), name.clone()));
1726 }
1727 GraphTypeAlteration::AddEdgeType(name) => {
1728 self.catalog
1729 .alter_graph_type_add_edge_type(&effective_name, name.clone())
1730 .map_err(|e| {
1731 Error::Query(QueryError::new(
1732 QueryErrorKind::Semantic,
1733 e.to_string(),
1734 ))
1735 })?;
1736 wal_alts.push(("add_edge_type".to_string(), name.clone()));
1737 }
1738 GraphTypeAlteration::DropEdgeType(name) => {
1739 self.catalog
1740 .alter_graph_type_drop_edge_type(&effective_name, name)
1741 .map_err(|e| {
1742 Error::Query(QueryError::new(
1743 QueryErrorKind::Semantic,
1744 e.to_string(),
1745 ))
1746 })?;
1747 wal_alts.push(("drop_edge_type".to_string(), name.clone()));
1748 }
1749 }
1750 }
1751 wal_log!(
1752 self,
1753 WalRecord::AlterGraphType {
1754 name: effective_name,
1755 alterations: wal_alts,
1756 }
1757 );
1758 Ok(QueryResult::status(format!(
1759 "Altered graph type '{}'",
1760 stmt.name
1761 )))
1762 }
1763 SchemaStatement::CreateProcedure(stmt) => {
1764 use crate::catalog::ProcedureDefinition;
1765
1766 let def = ProcedureDefinition {
1767 name: stmt.name.clone(),
1768 params: stmt
1769 .params
1770 .iter()
1771 .map(|p| (p.name.clone(), p.param_type.clone()))
1772 .collect(),
1773 returns: stmt
1774 .returns
1775 .iter()
1776 .map(|r| (r.name.clone(), r.return_type.clone()))
1777 .collect(),
1778 body: stmt.body.clone(),
1779 };
1780
1781 if stmt.or_replace {
1782 self.catalog.replace_procedure(def).map_err(|e| {
1783 Error::Query(QueryError::new(QueryErrorKind::Semantic, e.to_string()))
1784 })?;
1785 } else {
1786 match self.catalog.register_procedure(def) {
1787 Ok(()) => {}
1788 Err(_) if stmt.if_not_exists => {
1789 return Ok(QueryResult::empty());
1790 }
1791 Err(e) => {
1792 return Err(Error::Query(QueryError::new(
1793 QueryErrorKind::Semantic,
1794 e.to_string(),
1795 )));
1796 }
1797 }
1798 }
1799
1800 wal_log!(
1801 self,
1802 WalRecord::CreateProcedure {
1803 name: stmt.name.clone(),
1804 params: stmt
1805 .params
1806 .iter()
1807 .map(|p| (p.name.clone(), p.param_type.clone()))
1808 .collect(),
1809 returns: stmt
1810 .returns
1811 .iter()
1812 .map(|r| (r.name.clone(), r.return_type.clone()))
1813 .collect(),
1814 body: stmt.body,
1815 }
1816 );
1817 Ok(QueryResult::status(format!(
1818 "Created procedure '{}'",
1819 stmt.name
1820 )))
1821 }
1822 SchemaStatement::DropProcedure { name, if_exists } => {
1823 match self.catalog.drop_procedure(&name) {
1824 Ok(()) => {}
1825 Err(_) if if_exists => {
1826 return Ok(QueryResult::empty());
1827 }
1828 Err(e) => {
1829 return Err(Error::Query(QueryError::new(
1830 QueryErrorKind::Semantic,
1831 e.to_string(),
1832 )));
1833 }
1834 }
1835 wal_log!(self, WalRecord::DropProcedure { name: name.clone() });
1836 Ok(QueryResult::status(format!("Dropped procedure '{name}'")))
1837 }
1838 SchemaStatement::ShowIndexes => {
1839 return self.execute_show_indexes();
1840 }
1841 SchemaStatement::ShowConstraints => {
1842 return self.execute_show_constraints();
1843 }
1844 SchemaStatement::ShowNodeTypes => {
1845 return self.execute_show_node_types();
1846 }
1847 SchemaStatement::ShowEdgeTypes => {
1848 return self.execute_show_edge_types();
1849 }
1850 SchemaStatement::ShowGraphTypes => {
1851 return self.execute_show_graph_types();
1852 }
1853 SchemaStatement::ShowGraphType(name) => {
1854 return self.execute_show_graph_type(&name);
1855 }
1856 SchemaStatement::ShowCurrentGraphType => {
1857 return self.execute_show_current_graph_type();
1858 }
1859 SchemaStatement::ShowGraphs => {
1860 return self.execute_show_graphs();
1861 }
1862 SchemaStatement::ShowSchemas => {
1863 return self.execute_show_schemas();
1864 }
1865 };
1866
1867 if result.is_ok() {
1870 self.query_cache.clear();
1871 }
1872
1873 result
1874 }
1875
1876 #[cfg(all(feature = "lpg", feature = "gql", feature = "vector-index"))]
1878 fn create_vector_index_on_store(
1879 store: &LpgStore,
1880 label: &str,
1881 property: &str,
1882 dimensions: Option<usize>,
1883 metric: Option<&str>,
1884 ) -> Result<()> {
1885 use grafeo_common::types::{PropertyKey, Value};
1886 use grafeo_common::utils::error::Error;
1887 use grafeo_core::index::vector::{DistanceMetric, HnswConfig, HnswIndex};
1888
1889 let metric = match metric {
1890 Some(m) => DistanceMetric::from_str(m).ok_or_else(|| {
1891 Error::Internal(format!(
1892 "Unknown distance metric '{m}'. Use: cosine, euclidean, dot_product, manhattan"
1893 ))
1894 })?,
1895 None => DistanceMetric::Cosine,
1896 };
1897
1898 let prop_key = PropertyKey::new(property);
1899 let mut found_dims: Option<usize> = dimensions;
1900 let mut vectors: Vec<(grafeo_common::types::NodeId, Vec<f32>)> = Vec::new();
1901
1902 for node in store.nodes_with_label(label) {
1903 if let Some(Value::Vector(v)) = node.properties.get(&prop_key) {
1904 if let Some(expected) = found_dims {
1905 if v.len() != expected {
1906 return Err(Error::Internal(format!(
1907 "Vector dimension mismatch: expected {expected}, found {} on node {}",
1908 v.len(),
1909 node.id.0
1910 )));
1911 }
1912 } else {
1913 found_dims = Some(v.len());
1914 }
1915 vectors.push((node.id, v.to_vec()));
1916 }
1917 }
1918
1919 let Some(dims) = found_dims else {
1920 return Err(Error::Internal(format!(
1921 "No vector properties found on :{label}({property}) and no dimensions specified"
1922 )));
1923 };
1924
1925 let config = HnswConfig::new(dims, metric);
1926 let index = HnswIndex::with_capacity(config, vectors.len());
1927 let accessor = grafeo_core::index::vector::PropertyVectorAccessor::new(store, property);
1928 for (node_id, vec) in &vectors {
1929 index.insert(*node_id, vec, &accessor);
1930 }
1931
1932 store.add_vector_index(label, property, Arc::new(index));
1933 Ok(())
1934 }
1935
1936 #[cfg(all(feature = "lpg", feature = "gql", not(feature = "vector-index")))]
1938 fn create_vector_index_on_store(
1939 _store: &LpgStore,
1940 _label: &str,
1941 _property: &str,
1942 _dimensions: Option<usize>,
1943 _metric: Option<&str>,
1944 ) -> Result<()> {
1945 Err(grafeo_common::utils::error::Error::Internal(
1946 "Vector index support requires the 'vector-index' feature".to_string(),
1947 ))
1948 }
1949
1950 #[cfg(all(feature = "lpg", feature = "gql", feature = "text-index"))]
1952 fn create_text_index_on_store(store: &LpgStore, label: &str, property: &str) -> Result<()> {
1953 use grafeo_common::types::{PropertyKey, Value};
1954 use grafeo_core::index::text::{BM25Config, InvertedIndex};
1955
1956 let mut index = InvertedIndex::new(BM25Config::default());
1957 let prop_key = PropertyKey::new(property);
1958
1959 let nodes = store.nodes_by_label(label);
1960 for node_id in nodes {
1961 if let Some(Value::String(text)) = store.get_node_property(node_id, &prop_key) {
1962 index.insert(node_id, text.as_str());
1963 }
1964 }
1965
1966 store.add_text_index(label, property, Arc::new(parking_lot::RwLock::new(index)));
1967 Ok(())
1968 }
1969
1970 #[cfg(all(feature = "lpg", feature = "gql", not(feature = "text-index")))]
1972 fn create_text_index_on_store(_store: &LpgStore, _label: &str, _property: &str) -> Result<()> {
1973 Err(grafeo_common::utils::error::Error::Internal(
1974 "Text index support requires the 'text-index' feature".to_string(),
1975 ))
1976 }
1977
1978 fn execute_show_indexes(&self) -> Result<QueryResult> {
1980 let indexes = self.catalog.all_indexes();
1981 let columns = vec![
1982 "name".to_string(),
1983 "type".to_string(),
1984 "label".to_string(),
1985 "property".to_string(),
1986 ];
1987 let rows: Vec<Vec<Value>> = indexes
1988 .into_iter()
1989 .map(|def| {
1990 let label_name = self
1991 .catalog
1992 .get_label_name(def.label)
1993 .unwrap_or_else(|| "?".into());
1994 let prop_name = self
1995 .catalog
1996 .get_property_key_name(def.property_key)
1997 .unwrap_or_else(|| "?".into());
1998 vec![
1999 Value::from(def.name),
2000 Value::from(format!("{:?}", def.index_type)),
2001 Value::from(&*label_name),
2002 Value::from(&*prop_name),
2003 ]
2004 })
2005 .collect();
2006 Ok(QueryResult {
2007 columns,
2008 column_types: Vec::new(),
2009 rows,
2010 ..QueryResult::empty()
2011 })
2012 }
2013
2014 fn execute_show_constraints(&self) -> Result<QueryResult> {
2016 Ok(QueryResult {
2019 columns: vec![
2020 "name".to_string(),
2021 "type".to_string(),
2022 "label".to_string(),
2023 "properties".to_string(),
2024 ],
2025 column_types: Vec::new(),
2026 rows: Vec::new(),
2027 ..QueryResult::empty()
2028 })
2029 }
2030
2031 fn execute_show_node_types(&self) -> Result<QueryResult> {
2033 let columns = vec![
2034 "name".to_string(),
2035 "properties".to_string(),
2036 "constraints".to_string(),
2037 "parents".to_string(),
2038 ];
2039 let schema = self.current_schema.lock().clone();
2040 let all_names = self.catalog.all_node_type_names();
2041 let type_names: Vec<String> = match &schema {
2042 Some(s) => {
2043 let prefix = format!("{s}/");
2044 all_names
2045 .into_iter()
2046 .filter_map(|n| n.strip_prefix(&prefix).map(String::from))
2047 .collect()
2048 }
2049 None => all_names.into_iter().filter(|n| !n.contains('/')).collect(),
2050 };
2051 let rows: Vec<Vec<Value>> = type_names
2052 .into_iter()
2053 .filter_map(|name| {
2054 let lookup = match &schema {
2055 Some(s) => format!("{s}/{name}"),
2056 None => name.clone(),
2057 };
2058 let def = self.catalog.get_node_type(&lookup)?;
2059 let props: Vec<String> = def
2060 .properties
2061 .iter()
2062 .map(|p| {
2063 let nullable = if p.nullable { "" } else { " NOT NULL" };
2064 format!("{} {}{}", p.name, p.data_type, nullable)
2065 })
2066 .collect();
2067 let constraints: Vec<String> =
2068 def.constraints.iter().map(|c| format!("{c:?}")).collect();
2069 let parents = def.parent_types.join(", ");
2070 Some(vec![
2071 Value::from(name),
2072 Value::from(props.join(", ")),
2073 Value::from(constraints.join(", ")),
2074 Value::from(parents),
2075 ])
2076 })
2077 .collect();
2078 Ok(QueryResult {
2079 columns,
2080 column_types: Vec::new(),
2081 rows,
2082 ..QueryResult::empty()
2083 })
2084 }
2085
2086 fn execute_show_edge_types(&self) -> Result<QueryResult> {
2088 let columns = vec![
2089 "name".to_string(),
2090 "properties".to_string(),
2091 "source_types".to_string(),
2092 "target_types".to_string(),
2093 ];
2094 let schema = self.current_schema.lock().clone();
2095 let all_names = self.catalog.all_edge_type_names();
2096 let type_names: Vec<String> = match &schema {
2097 Some(s) => {
2098 let prefix = format!("{s}/");
2099 all_names
2100 .into_iter()
2101 .filter_map(|n| n.strip_prefix(&prefix).map(String::from))
2102 .collect()
2103 }
2104 None => all_names.into_iter().filter(|n| !n.contains('/')).collect(),
2105 };
2106 let rows: Vec<Vec<Value>> = type_names
2107 .into_iter()
2108 .filter_map(|name| {
2109 let lookup = match &schema {
2110 Some(s) => format!("{s}/{name}"),
2111 None => name.clone(),
2112 };
2113 let def = self.catalog.get_edge_type_def(&lookup)?;
2114 let props: Vec<String> = def
2115 .properties
2116 .iter()
2117 .map(|p| {
2118 let nullable = if p.nullable { "" } else { " NOT NULL" };
2119 format!("{} {}{}", p.name, p.data_type, nullable)
2120 })
2121 .collect();
2122 let src = def.source_node_types.join(", ");
2123 let tgt = def.target_node_types.join(", ");
2124 Some(vec![
2125 Value::from(name),
2126 Value::from(props.join(", ")),
2127 Value::from(src),
2128 Value::from(tgt),
2129 ])
2130 })
2131 .collect();
2132 Ok(QueryResult {
2133 columns,
2134 column_types: Vec::new(),
2135 rows,
2136 ..QueryResult::empty()
2137 })
2138 }
2139
2140 fn execute_show_graph_types(&self) -> Result<QueryResult> {
2142 let columns = vec![
2143 "name".to_string(),
2144 "open".to_string(),
2145 "node_types".to_string(),
2146 "edge_types".to_string(),
2147 ];
2148 let schema = self.current_schema.lock().clone();
2149 let all_names = self.catalog.all_graph_type_names();
2150 let type_names: Vec<String> = match &schema {
2151 Some(s) => {
2152 let prefix = format!("{s}/");
2153 all_names
2154 .into_iter()
2155 .filter_map(|n| n.strip_prefix(&prefix).map(String::from))
2156 .collect()
2157 }
2158 None => all_names.into_iter().filter(|n| !n.contains('/')).collect(),
2159 };
2160 let rows: Vec<Vec<Value>> = type_names
2161 .into_iter()
2162 .filter_map(|name| {
2163 let lookup = match &schema {
2164 Some(s) => format!("{s}/{name}"),
2165 None => name.clone(),
2166 };
2167 let def = self.catalog.get_graph_type_def(&lookup)?;
2168 let strip = |n: &String| -> String {
2170 match &schema {
2171 Some(s) => n.strip_prefix(&format!("{s}/")).unwrap_or(n).to_string(),
2172 None => n.clone(),
2173 }
2174 };
2175 let node_types: Vec<String> = def.allowed_node_types.iter().map(strip).collect();
2176 let edge_types: Vec<String> = def.allowed_edge_types.iter().map(strip).collect();
2177 Some(vec![
2178 Value::from(name),
2179 Value::from(def.open),
2180 Value::from(node_types.join(", ")),
2181 Value::from(edge_types.join(", ")),
2182 ])
2183 })
2184 .collect();
2185 Ok(QueryResult {
2186 columns,
2187 column_types: Vec::new(),
2188 rows,
2189 ..QueryResult::empty()
2190 })
2191 }
2192
2193 #[cfg(feature = "lpg")]
2199 fn execute_show_graphs(&self) -> Result<QueryResult> {
2200 let schema = self.current_schema.lock().clone();
2201 let all_names = self.store.graph_names();
2202
2203 let mut names: Vec<String> = match &schema {
2204 Some(s) => {
2205 let prefix = format!("{s}/");
2206 all_names
2207 .into_iter()
2208 .filter_map(|n| n.strip_prefix(&prefix).map(String::from))
2209 .filter(|n| n != SCHEMA_DEFAULT_GRAPH)
2210 .collect()
2211 }
2212 None => all_names.into_iter().filter(|n| !n.contains('/')).collect(),
2213 };
2214 names.sort();
2215
2216 let rows: Vec<Vec<Value>> = names.into_iter().map(|n| vec![Value::from(n)]).collect();
2217 Ok(QueryResult {
2218 columns: vec!["name".to_string()],
2219 column_types: Vec::new(),
2220 rows,
2221 ..QueryResult::empty()
2222 })
2223 }
2224
2225 fn execute_show_schemas(&self) -> Result<QueryResult> {
2227 let mut names = self.catalog.schema_names();
2228 names.sort();
2229 let rows: Vec<Vec<Value>> = names.into_iter().map(|n| vec![Value::from(n)]).collect();
2230 Ok(QueryResult {
2231 columns: vec!["name".to_string()],
2232 column_types: Vec::new(),
2233 rows,
2234 ..QueryResult::empty()
2235 })
2236 }
2237
2238 fn execute_show_graph_type(&self, name: &str) -> Result<QueryResult> {
2240 use grafeo_common::utils::error::{Error, QueryError, QueryErrorKind};
2241
2242 let def = self.catalog.get_graph_type_def(name).ok_or_else(|| {
2243 Error::Query(QueryError::new(
2244 QueryErrorKind::Semantic,
2245 format!("Graph type '{name}' not found"),
2246 ))
2247 })?;
2248
2249 let columns = vec![
2250 "name".to_string(),
2251 "open".to_string(),
2252 "node_types".to_string(),
2253 "edge_types".to_string(),
2254 ];
2255 let rows = vec![vec![
2256 Value::from(def.name),
2257 Value::from(def.open),
2258 Value::from(def.allowed_node_types.join(", ")),
2259 Value::from(def.allowed_edge_types.join(", ")),
2260 ]];
2261 Ok(QueryResult {
2262 columns,
2263 column_types: Vec::new(),
2264 rows,
2265 ..QueryResult::empty()
2266 })
2267 }
2268
2269 fn execute_show_current_graph_type(&self) -> Result<QueryResult> {
2271 let graph_name = self
2272 .current_graph()
2273 .unwrap_or_else(|| "default".to_string());
2274 let columns = vec![
2275 "graph".to_string(),
2276 "graph_type".to_string(),
2277 "open".to_string(),
2278 "node_types".to_string(),
2279 "edge_types".to_string(),
2280 ];
2281
2282 if let Some(type_name) = self.catalog.get_graph_type_binding(&graph_name)
2283 && let Some(def) = self.catalog.get_graph_type_def(&type_name)
2284 {
2285 let rows = vec![vec![
2286 Value::from(graph_name),
2287 Value::from(type_name),
2288 Value::from(def.open),
2289 Value::from(def.allowed_node_types.join(", ")),
2290 Value::from(def.allowed_edge_types.join(", ")),
2291 ]];
2292 return Ok(QueryResult {
2293 columns,
2294 column_types: Vec::new(),
2295 rows,
2296 ..QueryResult::empty()
2297 });
2298 }
2299
2300 Ok(QueryResult {
2302 columns,
2303 column_types: Vec::new(),
2304 rows: vec![vec![
2305 Value::from(graph_name),
2306 Value::Null,
2307 Value::Null,
2308 Value::Null,
2309 Value::Null,
2310 ]],
2311 ..QueryResult::empty()
2312 })
2313 }
2314
2315 #[cfg(feature = "gql")]
2342 pub fn execute(&self, query: &str) -> Result<QueryResult> {
2343 self.require_lpg("GQL")?;
2344
2345 use crate::query::{
2346 Executor, binder::Binder, cache::CacheKey, optimizer::Optimizer,
2347 processor::QueryLanguage, translators::gql,
2348 };
2349
2350 let _span = grafeo_info_span!(
2351 "grafeo::session::execute",
2352 language = "gql",
2353 query_len = query.len(),
2354 );
2355
2356 #[cfg(not(target_arch = "wasm32"))]
2357 let start_time = std::time::Instant::now();
2358
2359 let translation = gql::translate_full(query)?;
2361 let logical_plan = match translation {
2362 gql::GqlTranslationResult::SessionCommand(cmd) => {
2363 return self.execute_session_command(cmd);
2364 }
2365 #[cfg(feature = "lpg")]
2366 gql::GqlTranslationResult::SchemaCommand(cmd) => {
2367 if *self.read_only_tx.lock() {
2369 return Err(grafeo_common::utils::error::Error::Transaction(
2370 grafeo_common::utils::error::TransactionError::ReadOnly,
2371 ));
2372 }
2373 return self.execute_schema_command(cmd);
2374 }
2375 gql::GqlTranslationResult::Plan(plan) => {
2376 if *self.read_only_tx.lock() && plan.root.has_mutations() {
2378 return Err(grafeo_common::utils::error::Error::Transaction(
2379 grafeo_common::utils::error::TransactionError::ReadOnly,
2380 ));
2381 }
2382 plan
2383 }
2384 #[cfg(not(feature = "lpg"))]
2385 gql::GqlTranslationResult::SchemaCommand(_) => {
2386 return Err(grafeo_common::utils::error::Error::Internal(
2387 "Schema commands require the `lpg` feature".to_string(),
2388 ));
2389 }
2390 };
2391
2392 let cache_key = CacheKey::with_graph(query, QueryLanguage::Gql, self.current_graph());
2394
2395 let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
2397 cached_plan
2398 } else {
2399 let mut binder = Binder::new();
2401 let _binding_context = binder.bind(&logical_plan)?;
2402
2403 let active = self.active_store();
2405 let optimizer = Optimizer::from_graph_store(&*active);
2406 let plan = optimizer.optimize(logical_plan)?;
2407
2408 self.query_cache.put_optimized(cache_key, plan.clone());
2410
2411 plan
2412 };
2413
2414 let active = self.active_store();
2416
2417 if optimized_plan.explain {
2419 use crate::query::processor::{annotate_pushdown_hints, explain_result};
2420 let mut plan = optimized_plan;
2421 annotate_pushdown_hints(&mut plan.root, active.as_ref());
2422 return Ok(explain_result(&plan));
2423 }
2424
2425 if optimized_plan.profile {
2427 let has_mutations = optimized_plan.root.has_mutations();
2428 return self.with_auto_commit(has_mutations, || {
2429 let (viewing_epoch, transaction_id) = self.get_transaction_context();
2430 let planner = self.create_planner_for_store(
2431 Arc::clone(&active),
2432 viewing_epoch,
2433 transaction_id,
2434 );
2435 let (mut physical_plan, entries) = planner.plan_profiled(&optimized_plan)?;
2436
2437 let executor = Executor::with_columns(physical_plan.columns.clone())
2438 .with_deadline(self.query_deadline());
2439 let _result = executor.execute(physical_plan.operator.as_mut())?;
2440
2441 let total_time_ms;
2442 #[cfg(not(target_arch = "wasm32"))]
2443 {
2444 total_time_ms = start_time.elapsed().as_secs_f64() * 1000.0;
2445 }
2446 #[cfg(target_arch = "wasm32")]
2447 {
2448 total_time_ms = 0.0;
2449 }
2450
2451 let profile_tree = crate::query::profile::build_profile_tree(
2452 &optimized_plan.root,
2453 &mut entries.into_iter(),
2454 );
2455 Ok(crate::query::profile::profile_result(
2456 &profile_tree,
2457 total_time_ms,
2458 ))
2459 });
2460 }
2461
2462 let has_mutations = optimized_plan.root.has_mutations();
2463
2464 let result = self.with_auto_commit(has_mutations, || {
2465 let (viewing_epoch, transaction_id) = self.get_transaction_context();
2467
2468 let has_active_tx = self.current_transaction.lock().is_some();
2473 let read_only = !has_mutations && !has_active_tx;
2474 let planner = self.create_planner_for_store_with_read_only(
2475 Arc::clone(&active),
2476 viewing_epoch,
2477 transaction_id,
2478 read_only,
2479 );
2480 let mut physical_plan = planner.plan(&optimized_plan)?;
2481
2482 let executor = Executor::with_columns(physical_plan.columns.clone())
2484 .with_deadline(self.query_deadline());
2485 let mut result = executor.execute(physical_plan.operator.as_mut())?;
2486
2487 let rows_scanned = result.rows.len() as u64;
2489 #[cfg(not(target_arch = "wasm32"))]
2490 {
2491 let elapsed_ms = start_time.elapsed().as_secs_f64() * 1000.0;
2492 result.execution_time_ms = Some(elapsed_ms);
2493 }
2494 result.rows_scanned = Some(rows_scanned);
2495
2496 Ok(result)
2497 });
2498
2499 #[cfg(feature = "metrics")]
2501 {
2502 #[cfg(not(target_arch = "wasm32"))]
2503 let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
2504 #[cfg(target_arch = "wasm32")]
2505 let elapsed_ms = None;
2506 self.record_query_metrics("gql", elapsed_ms, &result);
2507 }
2508
2509 result
2510 }
2511
2512 #[cfg(feature = "gql")]
2521 pub fn execute_at_epoch(&self, query: &str, epoch: EpochId) -> Result<QueryResult> {
2522 let previous = self.viewing_epoch_override.lock().replace(epoch);
2523 let result = self.execute(query);
2524 *self.viewing_epoch_override.lock() = previous;
2525 result
2526 }
2527
2528 #[cfg(feature = "gql")]
2536 pub fn execute_at_epoch_with_params(
2537 &self,
2538 query: &str,
2539 epoch: EpochId,
2540 params: Option<std::collections::HashMap<String, Value>>,
2541 ) -> Result<QueryResult> {
2542 let previous = self.viewing_epoch_override.lock().replace(epoch);
2543 let result = if let Some(p) = params {
2544 self.execute_with_params(query, p)
2545 } else {
2546 self.execute(query)
2547 };
2548 *self.viewing_epoch_override.lock() = previous;
2549 result
2550 }
2551
2552 #[cfg(feature = "gql")]
2558 pub fn execute_with_params(
2559 &self,
2560 query: &str,
2561 params: std::collections::HashMap<String, Value>,
2562 ) -> Result<QueryResult> {
2563 self.require_lpg("GQL")?;
2564
2565 use crate::query::processor::{QueryLanguage, QueryProcessor};
2566
2567 let has_mutations = Self::query_looks_like_mutation(query);
2568 let active = self.active_store();
2569
2570 self.with_auto_commit(has_mutations, || {
2571 let (viewing_epoch, transaction_id) = self.get_transaction_context();
2573
2574 let processor = QueryProcessor::for_stores_with_transaction(
2576 Arc::clone(&active),
2577 self.active_write_store(),
2578 Arc::clone(&self.transaction_manager),
2579 )?;
2580
2581 let processor = if let Some(transaction_id) = transaction_id {
2583 processor.with_transaction_context(viewing_epoch, transaction_id)
2584 } else {
2585 processor
2586 };
2587
2588 processor.process(query, QueryLanguage::Gql, Some(¶ms))
2589 })
2590 }
2591
2592 #[cfg(not(any(feature = "gql", feature = "cypher")))]
2598 pub fn execute_with_params(
2599 &self,
2600 _query: &str,
2601 _params: std::collections::HashMap<String, Value>,
2602 ) -> Result<QueryResult> {
2603 Err(grafeo_common::utils::error::Error::Internal(
2604 "No query language enabled".to_string(),
2605 ))
2606 }
2607
2608 #[cfg(not(any(feature = "gql", feature = "cypher")))]
2614 pub fn execute(&self, _query: &str) -> Result<QueryResult> {
2615 Err(grafeo_common::utils::error::Error::Internal(
2616 "No query language enabled".to_string(),
2617 ))
2618 }
2619
2620 #[cfg(feature = "cypher")]
2626 pub fn execute_cypher(&self, query: &str) -> Result<QueryResult> {
2627 use crate::query::{
2628 Executor, binder::Binder, cache::CacheKey, optimizer::Optimizer,
2629 processor::QueryLanguage, translators::cypher,
2630 };
2631
2632 let translation = cypher::translate_full(query)?;
2634 match translation {
2635 #[cfg(feature = "lpg")]
2636 cypher::CypherTranslationResult::SchemaCommand(cmd) => {
2637 use grafeo_common::utils::error::{
2638 Error as GrafeoError, QueryError, QueryErrorKind,
2639 };
2640 if *self.read_only_tx.lock() {
2641 return Err(GrafeoError::Query(QueryError::new(
2642 QueryErrorKind::Semantic,
2643 "Cannot execute schema DDL in a read-only transaction",
2644 )));
2645 }
2646 return self.execute_schema_command(cmd);
2647 }
2648 #[cfg(not(feature = "lpg"))]
2649 cypher::CypherTranslationResult::SchemaCommand(_) => {
2650 return Err(grafeo_common::utils::error::Error::Internal(
2651 "Schema DDL requires the `lpg` feature".to_string(),
2652 ));
2653 }
2654 cypher::CypherTranslationResult::ShowIndexes => {
2655 return self.execute_show_indexes();
2656 }
2657 cypher::CypherTranslationResult::ShowConstraints => {
2658 return self.execute_show_constraints();
2659 }
2660 cypher::CypherTranslationResult::ShowCurrentGraphType => {
2661 return self.execute_show_current_graph_type();
2662 }
2663 cypher::CypherTranslationResult::Plan(_) => {
2664 }
2666 }
2667
2668 #[cfg(not(target_arch = "wasm32"))]
2669 let start_time = std::time::Instant::now();
2670
2671 let cache_key = CacheKey::with_graph(query, QueryLanguage::Cypher, self.current_graph());
2673
2674 let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
2676 cached_plan
2677 } else {
2678 let logical_plan = cypher::translate(query)?;
2680
2681 let mut binder = Binder::new();
2683 let _binding_context = binder.bind(&logical_plan)?;
2684
2685 let active = self.active_store();
2687 let optimizer = Optimizer::from_graph_store(&*active);
2688 let plan = optimizer.optimize(logical_plan)?;
2689
2690 self.query_cache.put_optimized(cache_key, plan.clone());
2692
2693 plan
2694 };
2695
2696 let active = self.active_store();
2698
2699 if optimized_plan.explain {
2701 use crate::query::processor::{annotate_pushdown_hints, explain_result};
2702 let mut plan = optimized_plan;
2703 annotate_pushdown_hints(&mut plan.root, active.as_ref());
2704 return Ok(explain_result(&plan));
2705 }
2706
2707 if optimized_plan.profile {
2709 let has_mutations = optimized_plan.root.has_mutations();
2710 return self.with_auto_commit(has_mutations, || {
2711 let (viewing_epoch, transaction_id) = self.get_transaction_context();
2712 let planner = self.create_planner_for_store(
2713 Arc::clone(&active),
2714 viewing_epoch,
2715 transaction_id,
2716 );
2717 let (mut physical_plan, entries) = planner.plan_profiled(&optimized_plan)?;
2718
2719 let executor = Executor::with_columns(physical_plan.columns.clone())
2720 .with_deadline(self.query_deadline());
2721 let _result = executor.execute(physical_plan.operator.as_mut())?;
2722
2723 let total_time_ms;
2724 #[cfg(not(target_arch = "wasm32"))]
2725 {
2726 total_time_ms = start_time.elapsed().as_secs_f64() * 1000.0;
2727 }
2728 #[cfg(target_arch = "wasm32")]
2729 {
2730 total_time_ms = 0.0;
2731 }
2732
2733 let profile_tree = crate::query::profile::build_profile_tree(
2734 &optimized_plan.root,
2735 &mut entries.into_iter(),
2736 );
2737 Ok(crate::query::profile::profile_result(
2738 &profile_tree,
2739 total_time_ms,
2740 ))
2741 });
2742 }
2743
2744 let has_mutations = optimized_plan.root.has_mutations();
2745
2746 let result = self.with_auto_commit(has_mutations, || {
2747 let (viewing_epoch, transaction_id) = self.get_transaction_context();
2749
2750 let planner =
2752 self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
2753 let mut physical_plan = planner.plan(&optimized_plan)?;
2754
2755 let executor = Executor::with_columns(physical_plan.columns.clone())
2757 .with_deadline(self.query_deadline());
2758 executor.execute(physical_plan.operator.as_mut())
2759 });
2760
2761 #[cfg(feature = "metrics")]
2762 {
2763 #[cfg(not(target_arch = "wasm32"))]
2764 let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
2765 #[cfg(target_arch = "wasm32")]
2766 let elapsed_ms = None;
2767 self.record_query_metrics("cypher", elapsed_ms, &result);
2768 }
2769
2770 result
2771 }
2772
2773 #[cfg(feature = "gremlin")]
2797 pub fn execute_gremlin(&self, query: &str) -> Result<QueryResult> {
2798 use crate::query::{Executor, binder::Binder, optimizer::Optimizer, translators::gremlin};
2799
2800 #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
2801 let start_time = Instant::now();
2802
2803 let logical_plan = gremlin::translate(query)?;
2805
2806 let mut binder = Binder::new();
2808 let _binding_context = binder.bind(&logical_plan)?;
2809
2810 let active = self.active_store();
2812 let optimizer = Optimizer::from_graph_store(&*active);
2813 let optimized_plan = optimizer.optimize(logical_plan)?;
2814
2815 let has_mutations = optimized_plan.root.has_mutations();
2816
2817 let result = self.with_auto_commit(has_mutations, || {
2818 let (viewing_epoch, transaction_id) = self.get_transaction_context();
2820
2821 let planner =
2823 self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
2824 let mut physical_plan = planner.plan(&optimized_plan)?;
2825
2826 let executor = Executor::with_columns(physical_plan.columns.clone())
2828 .with_deadline(self.query_deadline());
2829 executor.execute(physical_plan.operator.as_mut())
2830 });
2831
2832 #[cfg(feature = "metrics")]
2833 {
2834 #[cfg(not(target_arch = "wasm32"))]
2835 let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
2836 #[cfg(target_arch = "wasm32")]
2837 let elapsed_ms = None;
2838 self.record_query_metrics("gremlin", elapsed_ms, &result);
2839 }
2840
2841 result
2842 }
2843
2844 #[cfg(feature = "gremlin")]
2850 pub fn execute_gremlin_with_params(
2851 &self,
2852 query: &str,
2853 params: std::collections::HashMap<String, Value>,
2854 ) -> Result<QueryResult> {
2855 use crate::query::processor::{QueryLanguage, QueryProcessor};
2856
2857 #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
2858 let start_time = Instant::now();
2859
2860 let has_mutations = Self::query_looks_like_mutation(query);
2861 let active = self.active_store();
2862
2863 let result = self.with_auto_commit(has_mutations, || {
2864 let (viewing_epoch, transaction_id) = self.get_transaction_context();
2865 let processor = QueryProcessor::for_stores_with_transaction(
2866 Arc::clone(&active),
2867 self.active_write_store(),
2868 Arc::clone(&self.transaction_manager),
2869 )?;
2870 let processor = if let Some(transaction_id) = transaction_id {
2871 processor.with_transaction_context(viewing_epoch, transaction_id)
2872 } else {
2873 processor
2874 };
2875 processor.process(query, QueryLanguage::Gremlin, Some(¶ms))
2876 });
2877
2878 #[cfg(feature = "metrics")]
2879 {
2880 #[cfg(not(target_arch = "wasm32"))]
2881 let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
2882 #[cfg(target_arch = "wasm32")]
2883 let elapsed_ms = None;
2884 self.record_query_metrics("gremlin", elapsed_ms, &result);
2885 }
2886
2887 result
2888 }
2889
2890 #[cfg(feature = "graphql")]
2914 pub fn execute_graphql(&self, query: &str) -> Result<QueryResult> {
2915 use crate::query::{
2916 Executor, binder::Binder, optimizer::Optimizer, processor::substitute_params,
2917 translators::graphql,
2918 };
2919
2920 #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
2921 let start_time = Instant::now();
2922
2923 let mut logical_plan = graphql::translate(query)?;
2924
2925 if !logical_plan.default_params.is_empty() {
2927 let defaults = logical_plan.default_params.clone();
2928 substitute_params(&mut logical_plan, &defaults)?;
2929 }
2930
2931 let mut binder = Binder::new();
2932 let _binding_context = binder.bind(&logical_plan)?;
2933
2934 let active = self.active_store();
2935 let optimizer = Optimizer::from_graph_store(&*active);
2936 let optimized_plan = optimizer.optimize(logical_plan)?;
2937 let has_mutations = optimized_plan.root.has_mutations();
2938
2939 let result = self.with_auto_commit(has_mutations, || {
2940 let (viewing_epoch, transaction_id) = self.get_transaction_context();
2941 let planner =
2942 self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
2943 let mut physical_plan = planner.plan(&optimized_plan)?;
2944 let executor = Executor::with_columns(physical_plan.columns.clone())
2945 .with_deadline(self.query_deadline());
2946 executor.execute(physical_plan.operator.as_mut())
2947 });
2948
2949 #[cfg(feature = "metrics")]
2950 {
2951 #[cfg(not(target_arch = "wasm32"))]
2952 let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
2953 #[cfg(target_arch = "wasm32")]
2954 let elapsed_ms = None;
2955 self.record_query_metrics("graphql", elapsed_ms, &result);
2956 }
2957
2958 result
2959 }
2960
2961 #[cfg(feature = "graphql")]
2967 pub fn execute_graphql_with_params(
2968 &self,
2969 query: &str,
2970 params: std::collections::HashMap<String, Value>,
2971 ) -> Result<QueryResult> {
2972 use crate::query::processor::{QueryLanguage, QueryProcessor};
2973
2974 #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
2975 let start_time = Instant::now();
2976
2977 let has_mutations = Self::query_looks_like_mutation(query);
2978 let active = self.active_store();
2979
2980 let result = self.with_auto_commit(has_mutations, || {
2981 let (viewing_epoch, transaction_id) = self.get_transaction_context();
2982 let processor = QueryProcessor::for_stores_with_transaction(
2983 Arc::clone(&active),
2984 self.active_write_store(),
2985 Arc::clone(&self.transaction_manager),
2986 )?;
2987 let processor = if let Some(transaction_id) = transaction_id {
2988 processor.with_transaction_context(viewing_epoch, transaction_id)
2989 } else {
2990 processor
2991 };
2992 processor.process(query, QueryLanguage::GraphQL, Some(¶ms))
2993 });
2994
2995 #[cfg(feature = "metrics")]
2996 {
2997 #[cfg(not(target_arch = "wasm32"))]
2998 let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
2999 #[cfg(target_arch = "wasm32")]
3000 let elapsed_ms = None;
3001 self.record_query_metrics("graphql", elapsed_ms, &result);
3002 }
3003
3004 result
3005 }
3006
3007 #[cfg(feature = "sql-pgq")]
3032 pub fn execute_sql(&self, query: &str) -> Result<QueryResult> {
3033 use crate::query::{
3034 Executor, binder::Binder, cache::CacheKey, optimizer::Optimizer, plan::LogicalOperator,
3035 processor::QueryLanguage, translators::sql_pgq,
3036 };
3037
3038 #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
3039 let start_time = Instant::now();
3040
3041 let logical_plan = sql_pgq::translate(query)?;
3043
3044 if let LogicalOperator::CreatePropertyGraph(ref cpg) = logical_plan.root {
3046 return Ok(QueryResult {
3047 columns: vec!["status".into()],
3048 column_types: vec![grafeo_common::types::LogicalType::String],
3049 rows: vec![vec![Value::from(format!(
3050 "Property graph '{}' created ({} node tables, {} edge tables)",
3051 cpg.name,
3052 cpg.node_tables.len(),
3053 cpg.edge_tables.len()
3054 ))]],
3055 execution_time_ms: None,
3056 rows_scanned: None,
3057 status_message: None,
3058 gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
3059 });
3060 }
3061
3062 let cache_key = CacheKey::with_graph(query, QueryLanguage::SqlPgq, self.current_graph());
3063
3064 let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
3065 cached_plan
3066 } else {
3067 let mut binder = Binder::new();
3068 let _binding_context = binder.bind(&logical_plan)?;
3069 let active = self.active_store();
3070 let optimizer = Optimizer::from_graph_store(&*active);
3071 let plan = optimizer.optimize(logical_plan)?;
3072 self.query_cache.put_optimized(cache_key, plan.clone());
3073 plan
3074 };
3075
3076 let active = self.active_store();
3077 let has_mutations = optimized_plan.root.has_mutations();
3078
3079 let result = self.with_auto_commit(has_mutations, || {
3080 let (viewing_epoch, transaction_id) = self.get_transaction_context();
3081 let planner =
3082 self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
3083 let mut physical_plan = planner.plan(&optimized_plan)?;
3084 let executor = Executor::with_columns(physical_plan.columns.clone())
3085 .with_deadline(self.query_deadline());
3086 executor.execute(physical_plan.operator.as_mut())
3087 });
3088
3089 #[cfg(feature = "metrics")]
3090 {
3091 #[cfg(not(target_arch = "wasm32"))]
3092 let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
3093 #[cfg(target_arch = "wasm32")]
3094 let elapsed_ms = None;
3095 self.record_query_metrics("sql", elapsed_ms, &result);
3096 }
3097
3098 result
3099 }
3100
3101 #[cfg(feature = "sql-pgq")]
3107 pub fn execute_sql_with_params(
3108 &self,
3109 query: &str,
3110 params: std::collections::HashMap<String, Value>,
3111 ) -> Result<QueryResult> {
3112 use crate::query::processor::{QueryLanguage, QueryProcessor};
3113
3114 #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
3115 let start_time = Instant::now();
3116
3117 let has_mutations = Self::query_looks_like_mutation(query);
3118 let active = self.active_store();
3119
3120 let result = self.with_auto_commit(has_mutations, || {
3121 let (viewing_epoch, transaction_id) = self.get_transaction_context();
3122 let processor = QueryProcessor::for_stores_with_transaction(
3123 Arc::clone(&active),
3124 self.active_write_store(),
3125 Arc::clone(&self.transaction_manager),
3126 )?;
3127 let processor = if let Some(transaction_id) = transaction_id {
3128 processor.with_transaction_context(viewing_epoch, transaction_id)
3129 } else {
3130 processor
3131 };
3132 processor.process(query, QueryLanguage::SqlPgq, Some(¶ms))
3133 });
3134
3135 #[cfg(feature = "metrics")]
3136 {
3137 #[cfg(not(target_arch = "wasm32"))]
3138 let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
3139 #[cfg(target_arch = "wasm32")]
3140 let elapsed_ms = None;
3141 self.record_query_metrics("sql", elapsed_ms, &result);
3142 }
3143
3144 result
3145 }
3146
3147 pub fn execute_language(
3156 &self,
3157 query: &str,
3158 language: &str,
3159 params: Option<std::collections::HashMap<String, Value>>,
3160 ) -> Result<QueryResult> {
3161 let _span = grafeo_info_span!(
3162 "grafeo::session::execute",
3163 language,
3164 query_len = query.len(),
3165 );
3166 match language {
3167 "gql" => {
3168 if let Some(p) = params {
3169 self.execute_with_params(query, p)
3170 } else {
3171 self.execute(query)
3172 }
3173 }
3174 #[cfg(feature = "cypher")]
3175 "cypher" => {
3176 if let Some(p) = params {
3177 use crate::query::processor::{QueryLanguage, QueryProcessor};
3178
3179 #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
3180 let start_time = Instant::now();
3181
3182 let has_mutations = Self::query_looks_like_mutation(query);
3183 let active = self.active_store();
3184 let result = self.with_auto_commit(has_mutations, || {
3185 let processor = QueryProcessor::for_stores_with_transaction(
3186 Arc::clone(&active),
3187 self.active_write_store(),
3188 Arc::clone(&self.transaction_manager),
3189 )?;
3190 let (viewing_epoch, transaction_id) = self.get_transaction_context();
3191 let processor = if let Some(transaction_id) = transaction_id {
3192 processor.with_transaction_context(viewing_epoch, transaction_id)
3193 } else {
3194 processor
3195 };
3196 processor.process(query, QueryLanguage::Cypher, Some(&p))
3197 });
3198
3199 #[cfg(feature = "metrics")]
3200 {
3201 #[cfg(not(target_arch = "wasm32"))]
3202 let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
3203 #[cfg(target_arch = "wasm32")]
3204 let elapsed_ms = None;
3205 self.record_query_metrics("cypher", elapsed_ms, &result);
3206 }
3207
3208 result
3209 } else {
3210 self.execute_cypher(query)
3211 }
3212 }
3213 #[cfg(feature = "gremlin")]
3214 "gremlin" => {
3215 if let Some(p) = params {
3216 self.execute_gremlin_with_params(query, p)
3217 } else {
3218 self.execute_gremlin(query)
3219 }
3220 }
3221 #[cfg(feature = "graphql")]
3222 "graphql" => {
3223 if let Some(p) = params {
3224 self.execute_graphql_with_params(query, p)
3225 } else {
3226 self.execute_graphql(query)
3227 }
3228 }
3229 #[cfg(all(feature = "graphql", feature = "triple-store"))]
3230 "graphql-rdf" => {
3231 if let Some(p) = params {
3232 self.execute_graphql_rdf_with_params(query, p)
3233 } else {
3234 self.execute_graphql_rdf(query)
3235 }
3236 }
3237 #[cfg(feature = "sql-pgq")]
3238 "sql" | "sql-pgq" => {
3239 if let Some(p) = params {
3240 self.execute_sql_with_params(query, p)
3241 } else {
3242 self.execute_sql(query)
3243 }
3244 }
3245 #[cfg(all(feature = "sparql", feature = "triple-store"))]
3246 "sparql" => {
3247 if let Some(p) = params {
3248 self.execute_sparql_with_params(query, p)
3249 } else {
3250 self.execute_sparql(query)
3251 }
3252 }
3253 other => Err(grafeo_common::utils::error::Error::Query(
3254 grafeo_common::utils::error::QueryError::new(
3255 grafeo_common::utils::error::QueryErrorKind::Semantic,
3256 format!("Unknown query language: '{other}'"),
3257 ),
3258 )),
3259 }
3260 }
3261
3262 pub fn clear_plan_cache(&self) {
3289 self.query_cache.clear();
3290 }
3291
3292 #[cfg(feature = "lpg")]
3300 pub fn begin_transaction(&mut self) -> Result<()> {
3301 self.begin_transaction_inner(false, None)
3302 }
3303
3304 #[cfg(feature = "lpg")]
3312 pub fn begin_transaction_with_isolation(
3313 &mut self,
3314 isolation_level: crate::transaction::IsolationLevel,
3315 ) -> Result<()> {
3316 self.begin_transaction_inner(false, Some(isolation_level))
3317 }
3318
3319 #[cfg(feature = "lpg")]
3321 fn begin_transaction_inner(
3322 &self,
3323 read_only: bool,
3324 isolation_level: Option<crate::transaction::IsolationLevel>,
3325 ) -> Result<()> {
3326 let _span = grafeo_debug_span!("grafeo::tx::begin", read_only);
3327 let mut current = self.current_transaction.lock();
3328 if current.is_some() {
3329 drop(current);
3331 let mut depth = self.transaction_nesting_depth.lock();
3332 *depth += 1;
3333 let sp_name = format!("_nested_tx_{}", *depth);
3334 self.savepoint(&sp_name)?;
3335 return Ok(());
3336 }
3337
3338 let active = self.active_lpg_store();
3339 self.transaction_start_node_count
3340 .store(active.node_count(), Ordering::Relaxed);
3341 self.transaction_start_edge_count
3342 .store(active.edge_count(), Ordering::Relaxed);
3343 let transaction_id = if let Some(level) = isolation_level {
3344 self.transaction_manager.begin_with_isolation(level)
3345 } else {
3346 self.transaction_manager.begin()
3347 };
3348 *current = Some(transaction_id);
3349 *self.read_only_tx.lock() = read_only || self.db_read_only;
3350
3351 let key = self.active_graph_storage_key();
3354 let mut touched = self.touched_graphs.lock();
3355 touched.clear();
3356 touched.push(key);
3357
3358 #[cfg(feature = "metrics")]
3359 {
3360 crate::metrics::record_metric!(self.metrics, tx_active, inc);
3361 #[cfg(not(target_arch = "wasm32"))]
3362 {
3363 *self.tx_start_time.lock() = Some(Instant::now());
3364 }
3365 }
3366
3367 Ok(())
3368 }
3369
3370 #[cfg(feature = "lpg")]
3378 pub fn commit(&mut self) -> Result<()> {
3379 self.commit_inner()
3380 }
3381
3382 #[cfg(feature = "lpg")]
3384 fn commit_inner(&self) -> Result<()> {
3385 let _span = grafeo_debug_span!("grafeo::tx::commit");
3386 {
3388 let mut depth = self.transaction_nesting_depth.lock();
3389 if *depth > 0 {
3390 let sp_name = format!("_nested_tx_{depth}");
3391 *depth -= 1;
3392 drop(depth);
3393 return self.release_savepoint(&sp_name);
3394 }
3395 }
3396
3397 let transaction_id = self.current_transaction.lock().take().ok_or_else(|| {
3398 grafeo_common::utils::error::Error::Transaction(
3399 grafeo_common::utils::error::TransactionError::InvalidState(
3400 "No active transaction".to_string(),
3401 ),
3402 )
3403 })?;
3404
3405 let touched = self.touched_graphs.lock().clone();
3408 let commit_epoch = match self.transaction_manager.commit(transaction_id) {
3409 Ok(epoch) => epoch,
3410 Err(e) => {
3411 for graph_name in &touched {
3413 let store = self.resolve_store(graph_name);
3414 store.rollback_transaction_properties(transaction_id);
3415 }
3416 #[cfg(feature = "triple-store")]
3417 self.rollback_rdf_transaction(transaction_id);
3418 #[cfg(feature = "cdc")]
3420 if let Some(ref pending) = self.cdc_pending_events {
3421 pending.lock().clear();
3422 }
3423 *self.read_only_tx.lock() = self.db_read_only;
3424 self.savepoints.lock().clear();
3425 self.touched_graphs.lock().clear();
3426 #[cfg(feature = "metrics")]
3427 {
3428 crate::metrics::record_metric!(self.metrics, tx_active, dec);
3429 crate::metrics::record_metric!(self.metrics, tx_conflicts, inc);
3430 #[cfg(not(target_arch = "wasm32"))]
3431 if let Some(start) = self.tx_start_time.lock().take() {
3432 let duration_ms = start.elapsed().as_secs_f64() * 1000.0;
3433 crate::metrics::record_metric!(
3434 self.metrics,
3435 tx_duration,
3436 observe duration_ms
3437 );
3438 }
3439 }
3440 return Err(e);
3441 }
3442 };
3443
3444 for graph_name in &touched {
3446 let store = self.resolve_store(graph_name);
3447 store.finalize_version_epochs(transaction_id, commit_epoch);
3448 }
3449
3450 #[cfg(feature = "triple-store")]
3452 self.commit_rdf_transaction(transaction_id);
3453
3454 for graph_name in &touched {
3455 let store = self.resolve_store(graph_name);
3456 store.commit_transaction_properties(transaction_id);
3457 }
3458
3459 #[cfg(feature = "cdc")]
3463 if let Some(ref pending) = self.cdc_pending_events {
3464 let events: Vec<crate::cdc::ChangeEvent> = pending.lock().drain(..).collect();
3465 self.cdc_log.record_batch(events.into_iter().map(|mut e| {
3466 e.epoch = commit_epoch;
3467 e
3468 }));
3469 }
3470
3471 #[cfg(feature = "wal")]
3476 if let Some(ref wal) = self.wal {
3477 use grafeo_storage::wal::WalRecord;
3478 if let Err(e) = wal.log(&WalRecord::TransactionCommit { transaction_id }) {
3479 grafeo_warn!("Failed to log transaction commit to WAL: {}", e);
3480 }
3481 if let Err(e) = wal.log(&WalRecord::EpochAdvance {
3482 epoch: commit_epoch,
3483 }) {
3484 grafeo_warn!("Failed to log epoch advance to WAL: {}", e);
3485 }
3486 }
3487
3488 let current_epoch = self.transaction_manager.current_epoch();
3491 for graph_name in &touched {
3492 let store = self.resolve_store(graph_name);
3493 store.sync_epoch(current_epoch);
3494 }
3495
3496 *self.read_only_tx.lock() = self.db_read_only;
3498 self.savepoints.lock().clear();
3499 self.touched_graphs.lock().clear();
3500
3501 if self.gc_interval > 0 {
3503 let count = self.commit_counter.fetch_add(1, Ordering::Relaxed) + 1;
3504 if count.is_multiple_of(self.gc_interval) {
3505 let min_epoch = self.transaction_manager.min_active_epoch();
3506 for graph_name in &touched {
3507 let store = self.resolve_store(graph_name);
3508 store.gc_versions(min_epoch);
3509 }
3510 self.transaction_manager.gc();
3511 #[cfg(feature = "metrics")]
3512 crate::metrics::record_metric!(self.metrics, gc_runs, inc);
3513 }
3514 }
3515
3516 #[cfg(feature = "metrics")]
3517 {
3518 crate::metrics::record_metric!(self.metrics, tx_active, dec);
3519 crate::metrics::record_metric!(self.metrics, tx_committed, inc);
3520 #[cfg(not(target_arch = "wasm32"))]
3521 if let Some(start) = self.tx_start_time.lock().take() {
3522 let duration_ms = start.elapsed().as_secs_f64() * 1000.0;
3523 crate::metrics::record_metric!(self.metrics, tx_duration, observe duration_ms);
3524 }
3525 }
3526
3527 Ok(())
3528 }
3529
3530 #[cfg(feature = "lpg")]
3554 pub fn rollback(&mut self) -> Result<()> {
3555 self.rollback_inner()
3556 }
3557
3558 #[cfg(feature = "lpg")]
3560 fn rollback_inner(&self) -> Result<()> {
3561 let _span = grafeo_debug_span!("grafeo::tx::rollback");
3562 {
3564 let mut depth = self.transaction_nesting_depth.lock();
3565 if *depth > 0 {
3566 let sp_name = format!("_nested_tx_{depth}");
3567 *depth -= 1;
3568 drop(depth);
3569 return self.rollback_to_savepoint(&sp_name);
3570 }
3571 }
3572
3573 let transaction_id = self.current_transaction.lock().take().ok_or_else(|| {
3574 grafeo_common::utils::error::Error::Transaction(
3575 grafeo_common::utils::error::TransactionError::InvalidState(
3576 "No active transaction".to_string(),
3577 ),
3578 )
3579 })?;
3580
3581 *self.read_only_tx.lock() = self.db_read_only;
3583
3584 let touched = self.touched_graphs.lock().clone();
3586 for graph_name in &touched {
3587 let store = self.resolve_store(graph_name);
3588 store.discard_uncommitted_versions(transaction_id);
3589 }
3590
3591 #[cfg(feature = "triple-store")]
3593 self.rollback_rdf_transaction(transaction_id);
3594
3595 #[cfg(feature = "cdc")]
3597 if let Some(ref pending) = self.cdc_pending_events {
3598 pending.lock().clear();
3599 }
3600
3601 self.savepoints.lock().clear();
3603 self.touched_graphs.lock().clear();
3604
3605 let result = self.transaction_manager.abort(transaction_id);
3607
3608 #[cfg(feature = "metrics")]
3609 if result.is_ok() {
3610 crate::metrics::record_metric!(self.metrics, tx_active, dec);
3611 crate::metrics::record_metric!(self.metrics, tx_rolled_back, inc);
3612 #[cfg(not(target_arch = "wasm32"))]
3613 if let Some(start) = self.tx_start_time.lock().take() {
3614 let duration_ms = start.elapsed().as_secs_f64() * 1000.0;
3615 crate::metrics::record_metric!(self.metrics, tx_duration, observe duration_ms);
3616 }
3617 }
3618
3619 result
3620 }
3621
3622 #[cfg(feature = "lpg")]
3632 pub fn savepoint(&self, name: &str) -> Result<()> {
3633 let tx_id = self.current_transaction.lock().ok_or_else(|| {
3634 grafeo_common::utils::error::Error::Transaction(
3635 grafeo_common::utils::error::TransactionError::InvalidState(
3636 "No active transaction".to_string(),
3637 ),
3638 )
3639 })?;
3640
3641 let touched = self.touched_graphs.lock().clone();
3643 let graph_snapshots: Vec<GraphSavepoint> = touched
3644 .iter()
3645 .map(|graph_name| {
3646 let store = self.resolve_store(graph_name);
3647 GraphSavepoint {
3648 graph_name: graph_name.clone(),
3649 next_node_id: store.peek_next_node_id(),
3650 next_edge_id: store.peek_next_edge_id(),
3651 undo_log_position: store.property_undo_log_position(tx_id),
3652 }
3653 })
3654 .collect();
3655
3656 self.savepoints.lock().push(SavepointState {
3657 name: name.to_string(),
3658 graph_snapshots,
3659 active_graph: self.current_graph.lock().clone(),
3660 #[cfg(feature = "cdc")]
3661 cdc_event_position: self
3662 .cdc_pending_events
3663 .as_ref()
3664 .map_or(0, |p| p.lock().len()),
3665 });
3666 Ok(())
3667 }
3668
3669 #[cfg(feature = "lpg")]
3678 pub fn rollback_to_savepoint(&self, name: &str) -> Result<()> {
3679 let transaction_id = self.current_transaction.lock().ok_or_else(|| {
3680 grafeo_common::utils::error::Error::Transaction(
3681 grafeo_common::utils::error::TransactionError::InvalidState(
3682 "No active transaction".to_string(),
3683 ),
3684 )
3685 })?;
3686
3687 let mut savepoints = self.savepoints.lock();
3688
3689 let pos = savepoints
3691 .iter()
3692 .rposition(|sp| sp.name == name)
3693 .ok_or_else(|| {
3694 grafeo_common::utils::error::Error::Transaction(
3695 grafeo_common::utils::error::TransactionError::InvalidState(format!(
3696 "Savepoint '{name}' not found"
3697 )),
3698 )
3699 })?;
3700
3701 let sp_state = savepoints[pos].clone();
3702
3703 savepoints.truncate(pos);
3705 drop(savepoints);
3706
3707 for gs in &sp_state.graph_snapshots {
3709 let store = self.resolve_store(&gs.graph_name);
3710
3711 store.rollback_transaction_properties_to(transaction_id, gs.undo_log_position);
3713
3714 let current_next_node = store.peek_next_node_id();
3716 let current_next_edge = store.peek_next_edge_id();
3717
3718 let node_ids: Vec<NodeId> = (gs.next_node_id..current_next_node)
3719 .map(NodeId::new)
3720 .collect();
3721 let edge_ids: Vec<EdgeId> = (gs.next_edge_id..current_next_edge)
3722 .map(EdgeId::new)
3723 .collect();
3724
3725 if !node_ids.is_empty() || !edge_ids.is_empty() {
3726 store.discard_entities_by_id(transaction_id, &node_ids, &edge_ids);
3727 }
3728 }
3729
3730 let touched = self.touched_graphs.lock().clone();
3734 for graph_name in &touched {
3735 let already_captured = sp_state
3736 .graph_snapshots
3737 .iter()
3738 .any(|gs| gs.graph_name == *graph_name);
3739 if !already_captured {
3740 let store = self.resolve_store(graph_name);
3741 store.discard_uncommitted_versions(transaction_id);
3742 }
3743 }
3744
3745 #[cfg(feature = "cdc")]
3747 if let Some(ref pending) = self.cdc_pending_events {
3748 pending.lock().truncate(sp_state.cdc_event_position);
3749 }
3750
3751 let mut touched = self.touched_graphs.lock();
3753 touched.clear();
3754 for gs in &sp_state.graph_snapshots {
3755 if !touched.contains(&gs.graph_name) {
3756 touched.push(gs.graph_name.clone());
3757 }
3758 }
3759
3760 Ok(())
3761 }
3762
3763 pub fn release_savepoint(&self, name: &str) -> Result<()> {
3769 let _tx_id = self.current_transaction.lock().ok_or_else(|| {
3770 grafeo_common::utils::error::Error::Transaction(
3771 grafeo_common::utils::error::TransactionError::InvalidState(
3772 "No active transaction".to_string(),
3773 ),
3774 )
3775 })?;
3776
3777 let mut savepoints = self.savepoints.lock();
3778 let pos = savepoints
3779 .iter()
3780 .rposition(|sp| sp.name == name)
3781 .ok_or_else(|| {
3782 grafeo_common::utils::error::Error::Transaction(
3783 grafeo_common::utils::error::TransactionError::InvalidState(format!(
3784 "Savepoint '{name}' not found"
3785 )),
3786 )
3787 })?;
3788 savepoints.remove(pos);
3789 Ok(())
3790 }
3791
3792 #[must_use]
3794 pub fn in_transaction(&self) -> bool {
3795 self.current_transaction.lock().is_some()
3796 }
3797
3798 #[must_use]
3800 pub(crate) fn current_transaction_id(&self) -> Option<TransactionId> {
3801 *self.current_transaction.lock()
3802 }
3803
3804 #[must_use]
3806 pub(crate) fn transaction_manager(&self) -> &TransactionManager {
3807 &self.transaction_manager
3808 }
3809
3810 #[cfg(feature = "lpg")]
3812 #[must_use]
3813 pub(crate) fn node_count_delta(&self) -> (usize, usize) {
3814 (
3815 self.transaction_start_node_count.load(Ordering::Relaxed),
3816 self.active_lpg_store().node_count(),
3817 )
3818 }
3819
3820 #[cfg(feature = "lpg")]
3822 #[must_use]
3823 pub(crate) fn edge_count_delta(&self) -> (usize, usize) {
3824 (
3825 self.transaction_start_edge_count.load(Ordering::Relaxed),
3826 self.active_lpg_store().edge_count(),
3827 )
3828 }
3829
3830 #[cfg(feature = "lpg")]
3864 pub fn prepare_commit(&mut self) -> Result<crate::transaction::PreparedCommit<'_>> {
3865 crate::transaction::PreparedCommit::new(self)
3866 }
3867
3868 pub fn set_auto_commit(&mut self, auto_commit: bool) {
3870 self.auto_commit = auto_commit;
3871 }
3872
3873 #[must_use]
3875 pub fn auto_commit(&self) -> bool {
3876 self.auto_commit
3877 }
3878
3879 fn needs_auto_commit(&self, has_mutations: bool) -> bool {
3884 self.auto_commit && has_mutations && self.current_transaction.lock().is_none()
3885 }
3886
3887 #[cfg(feature = "lpg")]
3890 fn with_auto_commit<F>(&self, has_mutations: bool, body: F) -> Result<QueryResult>
3891 where
3892 F: FnOnce() -> Result<QueryResult>,
3893 {
3894 if self.needs_auto_commit(has_mutations) {
3895 self.begin_transaction_inner(false, None)?;
3896 match body() {
3897 Ok(result) => {
3898 self.commit_inner()?;
3899 Ok(result)
3900 }
3901 Err(e) => {
3902 let _ = self.rollback_inner();
3903 Err(e)
3904 }
3905 }
3906 } else {
3907 body()
3908 }
3909 }
3910
3911 #[cfg(not(feature = "lpg"))]
3913 fn with_auto_commit<F>(&self, _has_mutations: bool, body: F) -> Result<QueryResult>
3914 where
3915 F: FnOnce() -> Result<QueryResult>,
3916 {
3917 body()
3918 }
3919
3920 fn query_looks_like_mutation(query: &str) -> bool {
3926 let upper = query.to_ascii_uppercase();
3927 upper.contains("INSERT")
3928 || upper.contains("CREATE")
3929 || upper.contains("DELETE")
3930 || upper.contains("MERGE")
3931 || upper.contains("SET")
3932 || upper.contains("REMOVE")
3933 || upper.contains("DROP")
3934 || upper.contains("ALTER")
3935 }
3936
3937 #[must_use]
3939 fn query_deadline(&self) -> Option<Instant> {
3940 #[cfg(not(target_arch = "wasm32"))]
3941 {
3942 self.query_timeout.map(|d| Instant::now() + d)
3943 }
3944 #[cfg(target_arch = "wasm32")]
3945 {
3946 let _ = &self.query_timeout;
3947 None
3948 }
3949 }
3950
3951 #[cfg(feature = "metrics")]
3957 fn record_query_metrics(
3958 &self,
3959 language: &str,
3960 elapsed_ms: Option<f64>,
3961 result: &Result<crate::database::QueryResult>,
3962 ) {
3963 use crate::metrics::record_metric;
3964
3965 record_metric!(self.metrics, query_count, inc);
3966 if let Some(ref reg) = self.metrics {
3967 reg.query_count_by_language.increment(language);
3968 }
3969 if let Some(ms) = elapsed_ms {
3970 record_metric!(self.metrics, query_latency, observe ms);
3971 }
3972 match result {
3973 Ok(r) => {
3974 let returned = r.rows.len() as u64;
3975 record_metric!(self.metrics, rows_returned, add returned);
3976 if let Some(scanned) = r.rows_scanned {
3977 record_metric!(self.metrics, rows_scanned, add scanned);
3978 }
3979 }
3980 Err(e) => {
3981 record_metric!(self.metrics, query_errors, inc);
3982 let msg = e.to_string();
3984 if msg.contains("exceeded timeout") {
3985 record_metric!(self.metrics, query_timeouts, inc);
3986 }
3987 }
3988 }
3989 }
3990
3991 fn eval_integer_literal(expr: &grafeo_adapters::query::gql::ast::Expression) -> Option<i64> {
3993 use grafeo_adapters::query::gql::ast::{Expression, Literal};
3994 match expr {
3995 Expression::Literal(Literal::Integer(n)) => Some(*n),
3996 _ => None,
3997 }
3998 }
3999
4000 #[must_use]
4006 fn get_transaction_context(&self) -> (EpochId, Option<TransactionId>) {
4007 if let Some(epoch) = *self.viewing_epoch_override.lock() {
4009 return (epoch, None);
4010 }
4011
4012 if let Some(transaction_id) = *self.current_transaction.lock() {
4013 let epoch = self
4015 .transaction_manager
4016 .start_epoch(transaction_id)
4017 .unwrap_or_else(|| self.transaction_manager.current_epoch());
4018 (epoch, Some(transaction_id))
4019 } else {
4020 (self.transaction_manager.current_epoch(), None)
4022 }
4023 }
4024
4025 fn create_planner_for_store(
4030 &self,
4031 store: Arc<dyn GraphStore>,
4032 viewing_epoch: EpochId,
4033 transaction_id: Option<TransactionId>,
4034 ) -> crate::query::Planner {
4035 self.create_planner_for_store_with_read_only(store, viewing_epoch, transaction_id, false)
4036 }
4037
4038 fn create_planner_for_store_with_read_only(
4039 &self,
4040 store: Arc<dyn GraphStore>,
4041 viewing_epoch: EpochId,
4042 transaction_id: Option<TransactionId>,
4043 read_only: bool,
4044 ) -> crate::query::Planner {
4045 use crate::query::Planner;
4046 use grafeo_core::execution::operators::{LazyValue, SessionContext};
4047
4048 let info_store = Arc::clone(&store);
4050 let schema_store = Arc::clone(&store);
4051
4052 let session_context = SessionContext {
4053 current_schema: self.current_schema(),
4054 current_graph: self.current_graph(),
4055 db_info: LazyValue::new(move || Self::build_info_value(&*info_store)),
4056 schema_info: LazyValue::new(move || Self::build_schema_value(&*schema_store)),
4057 };
4058
4059 let write_store = self.active_write_store();
4060
4061 let mut planner = Planner::with_context(
4062 Arc::clone(&store),
4063 write_store,
4064 Arc::clone(&self.transaction_manager),
4065 transaction_id,
4066 viewing_epoch,
4067 )
4068 .with_factorized_execution(self.factorized_execution)
4069 .with_catalog(Arc::clone(&self.catalog))
4070 .with_session_context(session_context)
4071 .with_read_only(read_only);
4072
4073 let validator =
4075 CatalogConstraintValidator::new(Arc::clone(&self.catalog)).with_store(store);
4076 planner = planner.with_validator(Arc::new(validator));
4077
4078 planner
4079 }
4080
4081 fn build_info_value(store: &dyn GraphStore) -> Value {
4083 use grafeo_common::types::PropertyKey;
4084 use std::collections::BTreeMap;
4085
4086 let mut map = BTreeMap::new();
4087 map.insert(PropertyKey::from("mode"), Value::String("lpg".into()));
4088 map.insert(
4089 PropertyKey::from("node_count"),
4090 Value::Int64(store.node_count() as i64),
4091 );
4092 map.insert(
4093 PropertyKey::from("edge_count"),
4094 Value::Int64(store.edge_count() as i64),
4095 );
4096 map.insert(
4097 PropertyKey::from("version"),
4098 Value::String(env!("CARGO_PKG_VERSION").into()),
4099 );
4100 Value::Map(map.into())
4101 }
4102
4103 fn build_schema_value(store: &dyn GraphStore) -> Value {
4105 use grafeo_common::types::PropertyKey;
4106 use std::collections::BTreeMap;
4107
4108 let labels: Vec<Value> = store
4109 .all_labels()
4110 .into_iter()
4111 .map(|l| Value::String(l.into()))
4112 .collect();
4113 let edge_types: Vec<Value> = store
4114 .all_edge_types()
4115 .into_iter()
4116 .map(|t| Value::String(t.into()))
4117 .collect();
4118 let property_keys: Vec<Value> = store
4119 .all_property_keys()
4120 .into_iter()
4121 .map(|k| Value::String(k.into()))
4122 .collect();
4123
4124 let mut map = BTreeMap::new();
4125 map.insert(PropertyKey::from("labels"), Value::List(labels.into()));
4126 map.insert(
4127 PropertyKey::from("edge_types"),
4128 Value::List(edge_types.into()),
4129 );
4130 map.insert(
4131 PropertyKey::from("property_keys"),
4132 Value::List(property_keys.into()),
4133 );
4134 Value::Map(map.into())
4135 }
4136
4137 #[cfg(feature = "lpg")]
4142 pub fn create_node(&self, labels: &[&str]) -> NodeId {
4143 let (epoch, transaction_id) = self.get_transaction_context();
4144 self.active_lpg_store().create_node_versioned(
4145 labels,
4146 epoch,
4147 transaction_id.unwrap_or(TransactionId::SYSTEM),
4148 )
4149 }
4150
4151 #[cfg(feature = "lpg")]
4155 pub fn create_node_with_props<'a>(
4156 &self,
4157 labels: &[&str],
4158 properties: impl IntoIterator<Item = (&'a str, Value)>,
4159 ) -> NodeId {
4160 let (epoch, transaction_id) = self.get_transaction_context();
4161 self.active_lpg_store().create_node_with_props_versioned(
4162 labels,
4163 properties,
4164 epoch,
4165 transaction_id.unwrap_or(TransactionId::SYSTEM),
4166 )
4167 }
4168
4169 #[cfg(feature = "lpg")]
4174 pub fn create_edge(
4175 &self,
4176 src: NodeId,
4177 dst: NodeId,
4178 edge_type: &str,
4179 ) -> grafeo_common::types::EdgeId {
4180 let (epoch, transaction_id) = self.get_transaction_context();
4181 self.active_lpg_store().create_edge_versioned(
4182 src,
4183 dst,
4184 edge_type,
4185 epoch,
4186 transaction_id.unwrap_or(TransactionId::SYSTEM),
4187 )
4188 }
4189
4190 #[cfg(feature = "lpg")]
4192 pub fn create_edge_with_props<'a>(
4193 &self,
4194 src: NodeId,
4195 dst: NodeId,
4196 edge_type: &str,
4197 properties: impl IntoIterator<Item = (&'a str, Value)>,
4198 ) -> grafeo_common::types::EdgeId {
4199 let (epoch, transaction_id) = self.get_transaction_context();
4200 let tid = transaction_id.unwrap_or(TransactionId::SYSTEM);
4201 let store = self.active_lpg_store();
4202 let eid = store.create_edge_versioned(src, dst, edge_type, epoch, tid);
4203 for (key, value) in properties {
4204 store.set_edge_property_versioned(eid, key, value, tid);
4205 }
4206 eid
4207 }
4208
4209 #[cfg(feature = "lpg")]
4211 pub fn set_node_property(&self, id: NodeId, key: &str, value: Value) {
4212 let (_, transaction_id) = self.get_transaction_context();
4213 if let Some(tid) = transaction_id {
4214 self.active_lpg_store()
4215 .set_node_property_versioned(id, key, value, tid);
4216 } else {
4217 self.active_lpg_store().set_node_property(id, key, value);
4218 }
4219 }
4220
4221 #[cfg(feature = "lpg")]
4223 pub fn set_edge_property(&self, id: grafeo_common::types::EdgeId, key: &str, value: Value) {
4224 let (_, transaction_id) = self.get_transaction_context();
4225 if let Some(tid) = transaction_id {
4226 self.active_lpg_store()
4227 .set_edge_property_versioned(id, key, value, tid);
4228 } else {
4229 self.active_lpg_store().set_edge_property(id, key, value);
4230 }
4231 }
4232
4233 #[cfg(feature = "lpg")]
4235 pub fn delete_node(&self, id: NodeId) -> bool {
4236 let (epoch, transaction_id) = self.get_transaction_context();
4237 if let Some(tid) = transaction_id {
4238 self.active_lpg_store()
4239 .delete_node_versioned(id, epoch, tid)
4240 } else {
4241 self.active_lpg_store().delete_node(id)
4242 }
4243 }
4244
4245 #[cfg(feature = "lpg")]
4247 pub fn delete_edge(&self, id: grafeo_common::types::EdgeId) -> bool {
4248 let (epoch, transaction_id) = self.get_transaction_context();
4249 if let Some(tid) = transaction_id {
4250 self.active_lpg_store()
4251 .delete_edge_versioned(id, epoch, tid)
4252 } else {
4253 self.active_lpg_store().delete_edge(id)
4254 }
4255 }
4256
4257 #[cfg(feature = "lpg")]
4285 #[must_use]
4286 pub fn get_node(&self, id: NodeId) -> Option<Node> {
4287 let (epoch, transaction_id) = self.get_transaction_context();
4288 self.active_lpg_store().get_node_versioned(
4289 id,
4290 epoch,
4291 transaction_id.unwrap_or(TransactionId::SYSTEM),
4292 )
4293 }
4294
4295 #[cfg(feature = "lpg")]
4319 #[must_use]
4320 pub fn get_node_property(&self, id: NodeId, key: &str) -> Option<Value> {
4321 self.get_node(id)
4322 .and_then(|node| node.get_property(key).cloned())
4323 }
4324
4325 #[cfg(feature = "lpg")]
4332 #[must_use]
4333 pub fn get_edge(&self, id: EdgeId) -> Option<Edge> {
4334 let (epoch, transaction_id) = self.get_transaction_context();
4335 self.active_lpg_store().get_edge_versioned(
4336 id,
4337 epoch,
4338 transaction_id.unwrap_or(TransactionId::SYSTEM),
4339 )
4340 }
4341
4342 #[cfg(feature = "lpg")]
4368 #[must_use]
4369 pub fn get_neighbors_outgoing(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
4370 self.active_lpg_store()
4371 .edges_from(node, Direction::Outgoing)
4372 .collect()
4373 }
4374
4375 #[cfg(feature = "lpg")]
4384 #[must_use]
4385 pub fn get_neighbors_incoming(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
4386 self.active_lpg_store()
4387 .edges_from(node, Direction::Incoming)
4388 .collect()
4389 }
4390
4391 #[cfg(feature = "lpg")]
4403 #[must_use]
4404 pub fn get_neighbors_outgoing_by_type(
4405 &self,
4406 node: NodeId,
4407 edge_type: &str,
4408 ) -> Vec<(NodeId, EdgeId)> {
4409 self.active_lpg_store()
4410 .edges_from(node, Direction::Outgoing)
4411 .filter(|(_, edge_id)| {
4412 self.get_edge(*edge_id)
4413 .is_some_and(|e| e.edge_type.as_str() == edge_type)
4414 })
4415 .collect()
4416 }
4417
4418 #[cfg(feature = "lpg")]
4425 #[must_use]
4426 pub fn node_exists(&self, id: NodeId) -> bool {
4427 self.get_node(id).is_some()
4428 }
4429
4430 #[cfg(feature = "lpg")]
4432 #[must_use]
4433 pub fn edge_exists(&self, id: EdgeId) -> bool {
4434 self.get_edge(id).is_some()
4435 }
4436
4437 #[cfg(feature = "lpg")]
4441 #[must_use]
4442 pub fn get_degree(&self, node: NodeId) -> (usize, usize) {
4443 let active = self.active_lpg_store();
4444 let out = active.out_degree(node);
4445 let in_degree = active.in_degree(node);
4446 (out, in_degree)
4447 }
4448
4449 #[cfg(feature = "lpg")]
4459 #[must_use]
4460 pub fn get_nodes_batch(&self, ids: &[NodeId]) -> Vec<Option<Node>> {
4461 let (epoch, transaction_id) = self.get_transaction_context();
4462 let tx = transaction_id.unwrap_or(TransactionId::SYSTEM);
4463 let active = self.active_lpg_store();
4464 ids.iter()
4465 .map(|&id| active.get_node_versioned(id, epoch, tx))
4466 .collect()
4467 }
4468
4469 #[cfg(feature = "cdc")]
4477 pub fn history(
4478 &self,
4479 entity_id: impl Into<crate::cdc::EntityId>,
4480 ) -> Result<Vec<crate::cdc::ChangeEvent>> {
4481 Ok(self.cdc_log.history(entity_id.into()))
4482 }
4483
4484 #[cfg(feature = "cdc")]
4490 pub fn history_since(
4491 &self,
4492 entity_id: impl Into<crate::cdc::EntityId>,
4493 since_epoch: EpochId,
4494 ) -> Result<Vec<crate::cdc::ChangeEvent>> {
4495 Ok(self.cdc_log.history_since(entity_id.into(), since_epoch))
4496 }
4497
4498 #[cfg(feature = "cdc")]
4504 pub fn changes_between(
4505 &self,
4506 start_epoch: EpochId,
4507 end_epoch: EpochId,
4508 ) -> Result<Vec<crate::cdc::ChangeEvent>> {
4509 Ok(self.cdc_log.changes_between(start_epoch, end_epoch))
4510 }
4511}
4512
4513impl Drop for Session {
4514 fn drop(&mut self) {
4515 #[cfg(feature = "lpg")]
4518 if self.in_transaction() {
4519 let _ = self.rollback_inner();
4520 }
4521
4522 #[cfg(feature = "metrics")]
4523 if let Some(ref reg) = self.metrics {
4524 reg.session_active
4525 .fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
4526 }
4527 }
4528}
4529
4530#[cfg(test)]
4531mod tests {
4532 use super::parse_default_literal;
4533 use crate::database::GrafeoDB;
4534 use grafeo_common::types::Value;
4535
4536 #[test]
4541 fn parse_default_literal_null() {
4542 assert_eq!(parse_default_literal("null"), Value::Null);
4543 assert_eq!(parse_default_literal("NULL"), Value::Null);
4544 assert_eq!(parse_default_literal("Null"), Value::Null);
4545 }
4546
4547 #[test]
4548 fn parse_default_literal_bool() {
4549 assert_eq!(parse_default_literal("true"), Value::Bool(true));
4550 assert_eq!(parse_default_literal("TRUE"), Value::Bool(true));
4551 assert_eq!(parse_default_literal("false"), Value::Bool(false));
4552 assert_eq!(parse_default_literal("FALSE"), Value::Bool(false));
4553 }
4554
4555 #[test]
4556 fn parse_default_literal_string_single_quoted() {
4557 assert_eq!(
4558 parse_default_literal("'hello'"),
4559 Value::String("hello".into())
4560 );
4561 }
4562
4563 #[test]
4564 fn parse_default_literal_string_double_quoted() {
4565 assert_eq!(
4566 parse_default_literal("\"world\""),
4567 Value::String("world".into())
4568 );
4569 }
4570
4571 #[test]
4572 fn parse_default_literal_integer() {
4573 assert_eq!(parse_default_literal("42"), Value::Int64(42));
4574 assert_eq!(parse_default_literal("-7"), Value::Int64(-7));
4575 assert_eq!(parse_default_literal("0"), Value::Int64(0));
4576 }
4577
4578 #[test]
4579 fn parse_default_literal_float() {
4580 assert_eq!(parse_default_literal("9.81"), Value::Float64(9.81_f64));
4581 assert_eq!(parse_default_literal("-0.5"), Value::Float64(-0.5));
4582 }
4583
4584 #[test]
4585 fn parse_default_literal_fallback_string() {
4586 assert_eq!(
4588 parse_default_literal("some_identifier"),
4589 Value::String("some_identifier".into())
4590 );
4591 }
4592
4593 #[test]
4594 fn test_session_create_node() {
4595 let db = GrafeoDB::new_in_memory();
4596 let session = db.session();
4597
4598 let id = session.create_node(&["Person"]);
4599 assert!(id.is_valid());
4600 assert_eq!(db.node_count(), 1);
4601 }
4602
4603 #[test]
4604 fn test_session_transaction() {
4605 let db = GrafeoDB::new_in_memory();
4606 let mut session = db.session();
4607
4608 assert!(!session.in_transaction());
4609
4610 session.begin_transaction().unwrap();
4611 assert!(session.in_transaction());
4612
4613 session.commit().unwrap();
4614 assert!(!session.in_transaction());
4615 }
4616
4617 #[test]
4618 fn test_session_transaction_context() {
4619 let db = GrafeoDB::new_in_memory();
4620 let mut session = db.session();
4621
4622 let (_epoch1, transaction_id1) = session.get_transaction_context();
4624 assert!(transaction_id1.is_none());
4625
4626 session.begin_transaction().unwrap();
4628 let (epoch2, transaction_id2) = session.get_transaction_context();
4629 assert!(transaction_id2.is_some());
4630 let _ = epoch2; session.commit().unwrap();
4635 let (epoch3, tx_id3) = session.get_transaction_context();
4636 assert!(tx_id3.is_none());
4637 assert!(epoch3.as_u64() >= epoch2.as_u64());
4639 }
4640
4641 #[test]
4642 fn test_session_rollback() {
4643 let db = GrafeoDB::new_in_memory();
4644 let mut session = db.session();
4645
4646 session.begin_transaction().unwrap();
4647 session.rollback().unwrap();
4648 assert!(!session.in_transaction());
4649 }
4650
4651 #[test]
4652 fn test_session_rollback_discards_versions() {
4653 use grafeo_common::types::TransactionId;
4654
4655 let db = GrafeoDB::new_in_memory();
4656
4657 let node_before = db.store().create_node(&["Person"]);
4659 assert!(node_before.is_valid());
4660 assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");
4661
4662 let mut session = db.session();
4664 session.begin_transaction().unwrap();
4665 let transaction_id = session.current_transaction.lock().unwrap();
4666
4667 let epoch = db.store().current_epoch();
4669 let node_in_tx = db
4670 .store()
4671 .create_node_versioned(&["Person"], epoch, transaction_id);
4672 assert!(node_in_tx.is_valid());
4673
4674 assert_eq!(
4678 db.node_count(),
4679 1,
4680 "PENDING nodes should be invisible to non-versioned node_count()"
4681 );
4682 assert!(
4683 db.store()
4684 .get_node_versioned(node_in_tx, epoch, transaction_id)
4685 .is_some(),
4686 "Transaction node should be visible to its own transaction"
4687 );
4688
4689 session.rollback().unwrap();
4691 assert!(!session.in_transaction());
4692
4693 let count_after = db.node_count();
4696 assert_eq!(
4697 count_after, 1,
4698 "Rollback should discard uncommitted node, but got {count_after}"
4699 );
4700
4701 let current_epoch = db.store().current_epoch();
4703 assert!(
4704 db.store()
4705 .get_node_versioned(node_before, current_epoch, TransactionId::SYSTEM)
4706 .is_some(),
4707 "Original node should still exist"
4708 );
4709
4710 assert!(
4712 db.store()
4713 .get_node_versioned(node_in_tx, current_epoch, TransactionId::SYSTEM)
4714 .is_none(),
4715 "Transaction node should be gone"
4716 );
4717 }
4718
4719 #[test]
4720 fn test_session_create_node_in_transaction() {
4721 let db = GrafeoDB::new_in_memory();
4723
4724 let node_before = db.create_node(&["Person"]);
4726 assert!(node_before.is_valid());
4727 assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");
4728
4729 let mut session = db.session();
4731 session.begin_transaction().unwrap();
4732 let transaction_id = session.current_transaction.lock().unwrap();
4733
4734 let node_in_tx = session.create_node(&["Person"]);
4736 assert!(node_in_tx.is_valid());
4737
4738 assert_eq!(
4741 db.node_count(),
4742 1,
4743 "PENDING nodes should be invisible to non-versioned node_count()"
4744 );
4745 let epoch = db.store().current_epoch();
4746 assert!(
4747 db.store()
4748 .get_node_versioned(node_in_tx, epoch, transaction_id)
4749 .is_some(),
4750 "Transaction node should be visible to its own transaction"
4751 );
4752
4753 session.rollback().unwrap();
4755
4756 let count_after = db.node_count();
4758 assert_eq!(
4759 count_after, 1,
4760 "Rollback should discard node created via session.create_node(), but got {count_after}"
4761 );
4762 }
4763
4764 #[test]
4765 fn test_session_create_node_with_props_in_transaction() {
4766 use grafeo_common::types::Value;
4767
4768 let db = GrafeoDB::new_in_memory();
4770
4771 db.create_node(&["Person"]);
4773 assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");
4774
4775 let mut session = db.session();
4777 session.begin_transaction().unwrap();
4778 let transaction_id = session.current_transaction.lock().unwrap();
4779
4780 let node_in_tx =
4781 session.create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))]);
4782 assert!(node_in_tx.is_valid());
4783
4784 assert_eq!(
4787 db.node_count(),
4788 1,
4789 "PENDING nodes should be invisible to non-versioned node_count()"
4790 );
4791 let epoch = db.store().current_epoch();
4792 assert!(
4793 db.store()
4794 .get_node_versioned(node_in_tx, epoch, transaction_id)
4795 .is_some(),
4796 "Transaction node should be visible to its own transaction"
4797 );
4798
4799 session.rollback().unwrap();
4801
4802 let count_after = db.node_count();
4804 assert_eq!(
4805 count_after, 1,
4806 "Rollback should discard node created via session.create_node_with_props()"
4807 );
4808 }
4809
4810 #[cfg(feature = "gql")]
4811 mod gql_tests {
4812 use super::*;
4813
4814 #[test]
4815 fn test_gql_query_execution() {
4816 let db = GrafeoDB::new_in_memory();
4817 let session = db.session();
4818
4819 session.create_node(&["Person"]);
4821 session.create_node(&["Person"]);
4822 session.create_node(&["Animal"]);
4823
4824 let result = session.execute("MATCH (n:Person) RETURN n").unwrap();
4826
4827 assert_eq!(result.row_count(), 2);
4829 assert_eq!(result.column_count(), 1);
4830 assert_eq!(result.columns[0], "n");
4831 }
4832
4833 #[test]
4834 fn test_gql_empty_result() {
4835 let db = GrafeoDB::new_in_memory();
4836 let session = db.session();
4837
4838 let result = session.execute("MATCH (n:Person) RETURN n").unwrap();
4840
4841 assert_eq!(result.row_count(), 0);
4842 }
4843
4844 #[test]
4845 fn test_gql_parse_error() {
4846 let db = GrafeoDB::new_in_memory();
4847 let session = db.session();
4848
4849 let result = session.execute("MATCH (n RETURN n");
4851
4852 assert!(result.is_err());
4853 }
4854
4855 #[test]
4856 fn test_gql_relationship_traversal() {
4857 let db = GrafeoDB::new_in_memory();
4858 let session = db.session();
4859
4860 let alix = session.create_node(&["Person"]);
4862 let gus = session.create_node(&["Person"]);
4863 let vincent = session.create_node(&["Person"]);
4864
4865 session.create_edge(alix, gus, "KNOWS");
4866 session.create_edge(alix, vincent, "KNOWS");
4867
4868 let result = session
4870 .execute("MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b")
4871 .unwrap();
4872
4873 assert_eq!(result.row_count(), 2);
4875 assert_eq!(result.column_count(), 2);
4876 assert_eq!(result.columns[0], "a");
4877 assert_eq!(result.columns[1], "b");
4878 }
4879
4880 #[test]
4881 fn test_gql_relationship_with_type_filter() {
4882 let db = GrafeoDB::new_in_memory();
4883 let session = db.session();
4884
4885 let alix = session.create_node(&["Person"]);
4887 let gus = session.create_node(&["Person"]);
4888 let vincent = session.create_node(&["Person"]);
4889
4890 session.create_edge(alix, gus, "KNOWS");
4891 session.create_edge(alix, vincent, "WORKS_WITH");
4892
4893 let result = session
4895 .execute("MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b")
4896 .unwrap();
4897
4898 assert_eq!(result.row_count(), 1);
4900 }
4901
4902 #[test]
4903 fn test_gql_semantic_error_undefined_variable() {
4904 let db = GrafeoDB::new_in_memory();
4905 let session = db.session();
4906
4907 let result = session.execute("MATCH (n:Person) RETURN x");
4909
4910 assert!(result.is_err());
4912 let Err(err) = result else {
4913 panic!("Expected error")
4914 };
4915 assert!(
4916 err.to_string().contains("Undefined variable"),
4917 "Expected undefined variable error, got: {}",
4918 err
4919 );
4920 }
4921
4922 #[test]
4923 fn test_gql_where_clause_property_filter() {
4924 use grafeo_common::types::Value;
4925
4926 let db = GrafeoDB::new_in_memory();
4927 let session = db.session();
4928
4929 session.create_node_with_props(&["Person"], [("age", Value::Int64(25))]);
4931 session.create_node_with_props(&["Person"], [("age", Value::Int64(35))]);
4932 session.create_node_with_props(&["Person"], [("age", Value::Int64(45))]);
4933
4934 let result = session
4936 .execute("MATCH (n:Person) WHERE n.age > 30 RETURN n")
4937 .unwrap();
4938
4939 assert_eq!(result.row_count(), 2);
4941 }
4942
4943 #[test]
4944 fn test_gql_where_clause_equality() {
4945 use grafeo_common::types::Value;
4946
4947 let db = GrafeoDB::new_in_memory();
4948 let session = db.session();
4949
4950 session.create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))]);
4952 session.create_node_with_props(&["Person"], [("name", Value::String("Gus".into()))]);
4953 session.create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))]);
4954
4955 let result = session
4957 .execute("MATCH (n:Person) WHERE n.name = \"Alix\" RETURN n")
4958 .unwrap();
4959
4960 assert_eq!(result.row_count(), 2);
4962 }
4963
4964 #[test]
4965 fn test_gql_return_property_access() {
4966 use grafeo_common::types::Value;
4967
4968 let db = GrafeoDB::new_in_memory();
4969 let session = db.session();
4970
4971 session.create_node_with_props(
4973 &["Person"],
4974 [
4975 ("name", Value::String("Alix".into())),
4976 ("age", Value::Int64(30)),
4977 ],
4978 );
4979 session.create_node_with_props(
4980 &["Person"],
4981 [
4982 ("name", Value::String("Gus".into())),
4983 ("age", Value::Int64(25)),
4984 ],
4985 );
4986
4987 let result = session
4989 .execute("MATCH (n:Person) RETURN n.name, n.age")
4990 .unwrap();
4991
4992 assert_eq!(result.row_count(), 2);
4994 assert_eq!(result.column_count(), 2);
4995 assert_eq!(result.columns[0], "n.name");
4996 assert_eq!(result.columns[1], "n.age");
4997
4998 let names: Vec<&Value> = result.rows.iter().map(|r| &r[0]).collect();
5000 assert!(names.contains(&&Value::String("Alix".into())));
5001 assert!(names.contains(&&Value::String("Gus".into())));
5002 }
5003
5004 #[test]
5005 fn test_gql_return_mixed_expressions() {
5006 use grafeo_common::types::Value;
5007
5008 let db = GrafeoDB::new_in_memory();
5009 let session = db.session();
5010
5011 session.create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))]);
5013
5014 let result = session
5016 .execute("MATCH (n:Person) RETURN n, n.name")
5017 .unwrap();
5018
5019 assert_eq!(result.row_count(), 1);
5020 assert_eq!(result.column_count(), 2);
5021 assert_eq!(result.columns[0], "n");
5022 assert_eq!(result.columns[1], "n.name");
5023
5024 assert_eq!(result.rows[0][1], Value::String("Alix".into()));
5026 }
5027 }
5028
5029 #[cfg(feature = "cypher")]
5030 mod cypher_tests {
5031 use super::*;
5032
5033 #[test]
5034 fn test_cypher_query_execution() {
5035 let db = GrafeoDB::new_in_memory();
5036 let session = db.session();
5037
5038 session.create_node(&["Person"]);
5040 session.create_node(&["Person"]);
5041 session.create_node(&["Animal"]);
5042
5043 let result = session.execute_cypher("MATCH (n:Person) RETURN n").unwrap();
5045
5046 assert_eq!(result.row_count(), 2);
5048 assert_eq!(result.column_count(), 1);
5049 assert_eq!(result.columns[0], "n");
5050 }
5051
5052 #[test]
5053 fn test_cypher_empty_result() {
5054 let db = GrafeoDB::new_in_memory();
5055 let session = db.session();
5056
5057 let result = session.execute_cypher("MATCH (n:Person) RETURN n").unwrap();
5059
5060 assert_eq!(result.row_count(), 0);
5061 }
5062
5063 #[test]
5064 fn test_cypher_parse_error() {
5065 let db = GrafeoDB::new_in_memory();
5066 let session = db.session();
5067
5068 let result = session.execute_cypher("MATCH (n RETURN n");
5070
5071 assert!(result.is_err());
5072 }
5073 }
5074
5075 mod direct_lookup_tests {
5078 use super::*;
5079 use grafeo_common::types::Value;
5080
5081 #[test]
5082 fn test_get_node() {
5083 let db = GrafeoDB::new_in_memory();
5084 let session = db.session();
5085
5086 let id = session.create_node(&["Person"]);
5087 let node = session.get_node(id);
5088
5089 assert!(node.is_some());
5090 let node = node.unwrap();
5091 assert_eq!(node.id, id);
5092 }
5093
5094 #[test]
5095 fn test_get_node_not_found() {
5096 use grafeo_common::types::NodeId;
5097
5098 let db = GrafeoDB::new_in_memory();
5099 let session = db.session();
5100
5101 let node = session.get_node(NodeId::new(9999));
5103 assert!(node.is_none());
5104 }
5105
5106 #[test]
5107 fn test_get_node_property() {
5108 let db = GrafeoDB::new_in_memory();
5109 let session = db.session();
5110
5111 let id = session
5112 .create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))]);
5113
5114 let name = session.get_node_property(id, "name");
5115 assert_eq!(name, Some(Value::String("Alix".into())));
5116
5117 let missing = session.get_node_property(id, "missing");
5119 assert!(missing.is_none());
5120 }
5121
5122 #[test]
5123 fn test_get_edge() {
5124 let db = GrafeoDB::new_in_memory();
5125 let session = db.session();
5126
5127 let alix = session.create_node(&["Person"]);
5128 let gus = session.create_node(&["Person"]);
5129 let edge_id = session.create_edge(alix, gus, "KNOWS");
5130
5131 let edge = session.get_edge(edge_id);
5132 assert!(edge.is_some());
5133 let edge = edge.unwrap();
5134 assert_eq!(edge.id, edge_id);
5135 assert_eq!(edge.src, alix);
5136 assert_eq!(edge.dst, gus);
5137 }
5138
5139 #[test]
5140 fn test_get_edge_not_found() {
5141 use grafeo_common::types::EdgeId;
5142
5143 let db = GrafeoDB::new_in_memory();
5144 let session = db.session();
5145
5146 let edge = session.get_edge(EdgeId::new(9999));
5147 assert!(edge.is_none());
5148 }
5149
5150 #[test]
5151 fn test_get_neighbors_outgoing() {
5152 let db = GrafeoDB::new_in_memory();
5153 let session = db.session();
5154
5155 let alix = session.create_node(&["Person"]);
5156 let gus = session.create_node(&["Person"]);
5157 let harm = session.create_node(&["Person"]);
5158
5159 session.create_edge(alix, gus, "KNOWS");
5160 session.create_edge(alix, harm, "KNOWS");
5161
5162 let neighbors = session.get_neighbors_outgoing(alix);
5163 assert_eq!(neighbors.len(), 2);
5164
5165 let neighbor_ids: Vec<_> = neighbors.iter().map(|(node_id, _)| *node_id).collect();
5166 assert!(neighbor_ids.contains(&gus));
5167 assert!(neighbor_ids.contains(&harm));
5168 }
5169
5170 #[test]
5171 fn test_get_neighbors_incoming() {
5172 let db = GrafeoDB::new_in_memory();
5173 let session = db.session();
5174
5175 let alix = session.create_node(&["Person"]);
5176 let gus = session.create_node(&["Person"]);
5177 let harm = session.create_node(&["Person"]);
5178
5179 session.create_edge(gus, alix, "KNOWS");
5180 session.create_edge(harm, alix, "KNOWS");
5181
5182 let neighbors = session.get_neighbors_incoming(alix);
5183 assert_eq!(neighbors.len(), 2);
5184
5185 let neighbor_ids: Vec<_> = neighbors.iter().map(|(node_id, _)| *node_id).collect();
5186 assert!(neighbor_ids.contains(&gus));
5187 assert!(neighbor_ids.contains(&harm));
5188 }
5189
5190 #[test]
5191 fn test_get_neighbors_outgoing_by_type() {
5192 let db = GrafeoDB::new_in_memory();
5193 let session = db.session();
5194
5195 let alix = session.create_node(&["Person"]);
5196 let gus = session.create_node(&["Person"]);
5197 let company = session.create_node(&["Company"]);
5198
5199 session.create_edge(alix, gus, "KNOWS");
5200 session.create_edge(alix, company, "WORKS_AT");
5201
5202 let knows_neighbors = session.get_neighbors_outgoing_by_type(alix, "KNOWS");
5203 assert_eq!(knows_neighbors.len(), 1);
5204 assert_eq!(knows_neighbors[0].0, gus);
5205
5206 let works_neighbors = session.get_neighbors_outgoing_by_type(alix, "WORKS_AT");
5207 assert_eq!(works_neighbors.len(), 1);
5208 assert_eq!(works_neighbors[0].0, company);
5209
5210 let no_neighbors = session.get_neighbors_outgoing_by_type(alix, "LIKES");
5212 assert!(no_neighbors.is_empty());
5213 }
5214
5215 #[test]
5216 fn test_node_exists() {
5217 use grafeo_common::types::NodeId;
5218
5219 let db = GrafeoDB::new_in_memory();
5220 let session = db.session();
5221
5222 let id = session.create_node(&["Person"]);
5223
5224 assert!(session.node_exists(id));
5225 assert!(!session.node_exists(NodeId::new(9999)));
5226 }
5227
5228 #[test]
5229 fn test_edge_exists() {
5230 use grafeo_common::types::EdgeId;
5231
5232 let db = GrafeoDB::new_in_memory();
5233 let session = db.session();
5234
5235 let alix = session.create_node(&["Person"]);
5236 let gus = session.create_node(&["Person"]);
5237 let edge_id = session.create_edge(alix, gus, "KNOWS");
5238
5239 assert!(session.edge_exists(edge_id));
5240 assert!(!session.edge_exists(EdgeId::new(9999)));
5241 }
5242
5243 #[test]
5244 fn test_get_degree() {
5245 let db = GrafeoDB::new_in_memory();
5246 let session = db.session();
5247
5248 let alix = session.create_node(&["Person"]);
5249 let gus = session.create_node(&["Person"]);
5250 let harm = session.create_node(&["Person"]);
5251
5252 session.create_edge(alix, gus, "KNOWS");
5254 session.create_edge(alix, harm, "KNOWS");
5255 session.create_edge(gus, alix, "KNOWS");
5257
5258 let (out_degree, in_degree) = session.get_degree(alix);
5259 assert_eq!(out_degree, 2);
5260 assert_eq!(in_degree, 1);
5261
5262 let lonely = session.create_node(&["Person"]);
5264 let (out, in_deg) = session.get_degree(lonely);
5265 assert_eq!(out, 0);
5266 assert_eq!(in_deg, 0);
5267 }
5268
5269 #[test]
5270 fn test_get_nodes_batch() {
5271 let db = GrafeoDB::new_in_memory();
5272 let session = db.session();
5273
5274 let alix = session.create_node(&["Person"]);
5275 let gus = session.create_node(&["Person"]);
5276 let harm = session.create_node(&["Person"]);
5277
5278 let nodes = session.get_nodes_batch(&[alix, gus, harm]);
5279 assert_eq!(nodes.len(), 3);
5280 assert!(nodes[0].is_some());
5281 assert!(nodes[1].is_some());
5282 assert!(nodes[2].is_some());
5283
5284 use grafeo_common::types::NodeId;
5286 let nodes_with_missing = session.get_nodes_batch(&[alix, NodeId::new(9999), harm]);
5287 assert_eq!(nodes_with_missing.len(), 3);
5288 assert!(nodes_with_missing[0].is_some());
5289 assert!(nodes_with_missing[1].is_none()); assert!(nodes_with_missing[2].is_some());
5291 }
5292
5293 #[test]
5294 fn test_auto_commit_setting() {
5295 let db = GrafeoDB::new_in_memory();
5296 let mut session = db.session();
5297
5298 assert!(session.auto_commit());
5300
5301 session.set_auto_commit(false);
5302 assert!(!session.auto_commit());
5303
5304 session.set_auto_commit(true);
5305 assert!(session.auto_commit());
5306 }
5307
5308 #[test]
5309 fn test_transaction_double_begin_nests() {
5310 let db = GrafeoDB::new_in_memory();
5311 let mut session = db.session();
5312
5313 session.begin_transaction().unwrap();
5314 let result = session.begin_transaction();
5316 assert!(result.is_ok());
5317 session.commit().unwrap();
5319 session.commit().unwrap();
5321 }
5322
5323 #[test]
5324 fn test_commit_without_transaction_error() {
5325 let db = GrafeoDB::new_in_memory();
5326 let mut session = db.session();
5327
5328 let result = session.commit();
5329 assert!(result.is_err());
5330 }
5331
5332 #[test]
5333 fn test_rollback_without_transaction_error() {
5334 let db = GrafeoDB::new_in_memory();
5335 let mut session = db.session();
5336
5337 let result = session.rollback();
5338 assert!(result.is_err());
5339 }
5340
5341 #[test]
5342 fn test_create_edge_in_transaction() {
5343 let db = GrafeoDB::new_in_memory();
5344 let mut session = db.session();
5345
5346 let alix = session.create_node(&["Person"]);
5348 let gus = session.create_node(&["Person"]);
5349
5350 session.begin_transaction().unwrap();
5352 let edge_id = session.create_edge(alix, gus, "KNOWS");
5353
5354 assert!(session.edge_exists(edge_id));
5356
5357 session.commit().unwrap();
5359
5360 assert!(session.edge_exists(edge_id));
5362 }
5363
5364 #[test]
5365 fn test_neighbors_empty_node() {
5366 let db = GrafeoDB::new_in_memory();
5367 let session = db.session();
5368
5369 let lonely = session.create_node(&["Person"]);
5370
5371 assert!(session.get_neighbors_outgoing(lonely).is_empty());
5372 assert!(session.get_neighbors_incoming(lonely).is_empty());
5373 assert!(
5374 session
5375 .get_neighbors_outgoing_by_type(lonely, "KNOWS")
5376 .is_empty()
5377 );
5378 }
5379 }
5380
5381 #[test]
5382 fn test_auto_gc_triggers_on_commit_interval() {
5383 use crate::config::Config;
5384
5385 let config = Config::in_memory().with_gc_interval(2);
5386 let db = GrafeoDB::with_config(config).unwrap();
5387 let mut session = db.session();
5388
5389 session.begin_transaction().unwrap();
5391 session.create_node(&["A"]);
5392 session.commit().unwrap();
5393
5394 session.begin_transaction().unwrap();
5396 session.create_node(&["B"]);
5397 session.commit().unwrap();
5398
5399 assert_eq!(db.node_count(), 2);
5401 }
5402
5403 #[test]
5404 fn test_query_timeout_config_propagates_to_session() {
5405 use crate::config::Config;
5406 use std::time::Duration;
5407
5408 let config = Config::in_memory().with_query_timeout(Duration::from_secs(5));
5409 let db = GrafeoDB::with_config(config).unwrap();
5410 let session = db.session();
5411
5412 assert!(session.query_deadline().is_some());
5414 }
5415
5416 #[test]
5417 fn test_no_query_timeout_returns_no_deadline() {
5418 let db = GrafeoDB::new_in_memory();
5419 let session = db.session();
5420
5421 assert!(session.query_deadline().is_none());
5423 }
5424
5425 #[test]
5426 fn test_graph_model_accessor() {
5427 use crate::config::GraphModel;
5428
5429 let db = GrafeoDB::new_in_memory();
5430 let session = db.session();
5431
5432 assert_eq!(session.graph_model(), GraphModel::Lpg);
5433 }
5434
5435 #[cfg(feature = "gql")]
5436 #[test]
5437 fn test_external_store_session() {
5438 use grafeo_core::graph::GraphStoreMut;
5439 use std::sync::Arc;
5440
5441 let config = crate::config::Config::in_memory();
5442 let store =
5443 Arc::new(grafeo_core::graph::lpg::LpgStore::new().unwrap()) as Arc<dyn GraphStoreMut>;
5444 let db = GrafeoDB::with_store(store, config).unwrap();
5445
5446 let mut session = db.session();
5447
5448 session.begin_transaction().unwrap();
5452 session.execute("INSERT (:Test {name: 'hello'})").unwrap();
5453
5454 let result = session.execute("MATCH (n:Test) RETURN n.name").unwrap();
5456 assert_eq!(result.row_count(), 1);
5457
5458 session.commit().unwrap();
5459 }
5460
5461 #[cfg(feature = "gql")]
5464 mod session_command_tests {
5465 use super::*;
5466 use grafeo_common::types::Value;
5467
5468 #[test]
5469 fn test_use_graph_sets_current_graph() {
5470 let db = GrafeoDB::new_in_memory();
5471 let session = db.session();
5472
5473 session.execute("CREATE GRAPH mydb").unwrap();
5475 session.execute("USE GRAPH mydb").unwrap();
5476
5477 assert_eq!(session.current_graph(), Some("mydb".to_string()));
5478 }
5479
5480 #[test]
5481 fn test_use_graph_nonexistent_errors() {
5482 let db = GrafeoDB::new_in_memory();
5483 let session = db.session();
5484
5485 let result = session.execute("USE GRAPH doesnotexist");
5486 assert!(result.is_err());
5487 let err = result.unwrap_err().to_string();
5488 assert!(
5489 err.contains("does not exist"),
5490 "Expected 'does not exist' error, got: {err}"
5491 );
5492 }
5493
5494 #[test]
5495 fn test_use_graph_default_always_valid() {
5496 let db = GrafeoDB::new_in_memory();
5497 let session = db.session();
5498
5499 session.execute("USE GRAPH default").unwrap();
5501 assert_eq!(session.current_graph(), Some("default".to_string()));
5502 }
5503
5504 #[test]
5505 fn test_session_set_graph() {
5506 let db = GrafeoDB::new_in_memory();
5507 let session = db.session();
5508
5509 session.execute("CREATE GRAPH analytics").unwrap();
5510 session.execute("SESSION SET GRAPH analytics").unwrap();
5511 assert_eq!(session.current_graph(), Some("analytics".to_string()));
5512 }
5513
5514 #[test]
5515 fn test_session_set_graph_nonexistent_errors() {
5516 let db = GrafeoDB::new_in_memory();
5517 let session = db.session();
5518
5519 let result = session.execute("SESSION SET GRAPH nosuchgraph");
5520 assert!(result.is_err());
5521 }
5522
5523 #[test]
5524 fn test_session_set_time_zone() {
5525 let db = GrafeoDB::new_in_memory();
5526 let session = db.session();
5527
5528 assert_eq!(session.time_zone(), None);
5529
5530 session.execute("SESSION SET TIME ZONE 'UTC'").unwrap();
5531 assert_eq!(session.time_zone(), Some("UTC".to_string()));
5532
5533 session
5534 .execute("SESSION SET TIME ZONE 'America/New_York'")
5535 .unwrap();
5536 assert_eq!(session.time_zone(), Some("America/New_York".to_string()));
5537 }
5538
5539 #[test]
5540 fn test_session_set_parameter() {
5541 let db = GrafeoDB::new_in_memory();
5542 let session = db.session();
5543
5544 session
5545 .execute("SESSION SET PARAMETER $timeout = 30")
5546 .unwrap();
5547
5548 assert!(session.get_parameter("timeout").is_some());
5551 }
5552
5553 #[test]
5554 fn test_session_reset_clears_all_state() {
5555 let db = GrafeoDB::new_in_memory();
5556 let session = db.session();
5557
5558 session.execute("CREATE GRAPH analytics").unwrap();
5560 session.execute("SESSION SET GRAPH analytics").unwrap();
5561 session.execute("SESSION SET TIME ZONE 'UTC'").unwrap();
5562 session
5563 .execute("SESSION SET PARAMETER $limit = 100")
5564 .unwrap();
5565
5566 assert!(session.current_graph().is_some());
5568 assert!(session.time_zone().is_some());
5569 assert!(session.get_parameter("limit").is_some());
5570
5571 session.execute("SESSION RESET").unwrap();
5573
5574 assert_eq!(session.current_graph(), None);
5575 assert_eq!(session.time_zone(), None);
5576 assert!(session.get_parameter("limit").is_none());
5577 }
5578
5579 #[test]
5580 fn test_session_close_clears_state() {
5581 let db = GrafeoDB::new_in_memory();
5582 let session = db.session();
5583
5584 session.execute("CREATE GRAPH analytics").unwrap();
5585 session.execute("SESSION SET GRAPH analytics").unwrap();
5586 session.execute("SESSION SET TIME ZONE 'UTC'").unwrap();
5587
5588 session.execute("SESSION CLOSE").unwrap();
5589
5590 assert_eq!(session.current_graph(), None);
5591 assert_eq!(session.time_zone(), None);
5592 }
5593
5594 #[test]
5595 fn test_create_graph() {
5596 let db = GrafeoDB::new_in_memory();
5597 let session = db.session();
5598
5599 session.execute("CREATE GRAPH mydb").unwrap();
5600
5601 session.execute("USE GRAPH mydb").unwrap();
5603 assert_eq!(session.current_graph(), Some("mydb".to_string()));
5604 }
5605
5606 #[test]
5607 fn test_create_graph_duplicate_errors() {
5608 let db = GrafeoDB::new_in_memory();
5609 let session = db.session();
5610
5611 session.execute("CREATE GRAPH mydb").unwrap();
5612 let result = session.execute("CREATE GRAPH mydb");
5613
5614 assert!(result.is_err());
5615 let err = result.unwrap_err().to_string();
5616 assert!(
5617 err.contains("already exists"),
5618 "Expected 'already exists' error, got: {err}"
5619 );
5620 }
5621
5622 #[test]
5623 fn test_create_graph_if_not_exists() {
5624 let db = GrafeoDB::new_in_memory();
5625 let session = db.session();
5626
5627 session.execute("CREATE GRAPH mydb").unwrap();
5628 session.execute("CREATE GRAPH IF NOT EXISTS mydb").unwrap();
5630 }
5631
5632 #[test]
5633 fn test_drop_graph() {
5634 let db = GrafeoDB::new_in_memory();
5635 let session = db.session();
5636
5637 session.execute("CREATE GRAPH mydb").unwrap();
5638 session.execute("DROP GRAPH mydb").unwrap();
5639
5640 let result = session.execute("USE GRAPH mydb");
5642 assert!(result.is_err());
5643 }
5644
5645 #[test]
5646 fn test_drop_graph_nonexistent_errors() {
5647 let db = GrafeoDB::new_in_memory();
5648 let session = db.session();
5649
5650 let result = session.execute("DROP GRAPH nosuchgraph");
5651 assert!(result.is_err());
5652 let err = result.unwrap_err().to_string();
5653 assert!(
5654 err.contains("does not exist"),
5655 "Expected 'does not exist' error, got: {err}"
5656 );
5657 }
5658
5659 #[test]
5660 fn test_drop_graph_if_exists() {
5661 let db = GrafeoDB::new_in_memory();
5662 let session = db.session();
5663
5664 session.execute("DROP GRAPH IF EXISTS nosuchgraph").unwrap();
5666 }
5667
5668 #[test]
5669 fn test_start_transaction_via_gql() {
5670 let db = GrafeoDB::new_in_memory();
5671 let session = db.session();
5672
5673 session.execute("START TRANSACTION").unwrap();
5674 assert!(session.in_transaction());
5675 session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
5676 session.execute("COMMIT").unwrap();
5677 assert!(!session.in_transaction());
5678
5679 let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
5680 assert_eq!(result.rows.len(), 1);
5681 }
5682
5683 #[test]
5684 fn test_start_transaction_read_only_blocks_insert() {
5685 let db = GrafeoDB::new_in_memory();
5686 let session = db.session();
5687
5688 session.execute("START TRANSACTION READ ONLY").unwrap();
5689 let result = session.execute("INSERT (:Person {name: 'Alix'})");
5690 assert!(result.is_err());
5691 let err = result.unwrap_err().to_string();
5692 assert!(
5693 err.contains("read-only"),
5694 "Expected read-only error, got: {err}"
5695 );
5696 session.execute("ROLLBACK").unwrap();
5697 }
5698
5699 #[test]
5700 fn test_start_transaction_read_only_allows_reads() {
5701 let db = GrafeoDB::new_in_memory();
5702 let mut session = db.session();
5703 session.begin_transaction().unwrap();
5704 session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
5705 session.commit().unwrap();
5706
5707 session.execute("START TRANSACTION READ ONLY").unwrap();
5708 let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
5709 assert_eq!(result.rows.len(), 1);
5710 session.execute("COMMIT").unwrap();
5711 }
5712
5713 #[test]
5714 fn test_rollback_via_gql() {
5715 let db = GrafeoDB::new_in_memory();
5716 let session = db.session();
5717
5718 session.execute("START TRANSACTION").unwrap();
5719 session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
5720 session.execute("ROLLBACK").unwrap();
5721
5722 let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
5723 assert!(result.rows.is_empty());
5724 }
5725
5726 #[test]
5727 fn test_start_transaction_with_isolation_level() {
5728 let db = GrafeoDB::new_in_memory();
5729 let session = db.session();
5730
5731 session
5732 .execute("START TRANSACTION ISOLATION LEVEL SERIALIZABLE")
5733 .unwrap();
5734 assert!(session.in_transaction());
5735 session.execute("ROLLBACK").unwrap();
5736 }
5737
5738 #[test]
5739 fn test_session_commands_return_empty_result() {
5740 let db = GrafeoDB::new_in_memory();
5741 let session = db.session();
5742
5743 session.execute("CREATE GRAPH test").unwrap();
5744 let result = session.execute("SESSION SET GRAPH test").unwrap();
5745 assert_eq!(result.row_count(), 0);
5746 assert_eq!(result.column_count(), 0);
5747 }
5748
5749 #[test]
5750 fn test_current_graph_default_is_none() {
5751 let db = GrafeoDB::new_in_memory();
5752 let session = db.session();
5753
5754 assert_eq!(session.current_graph(), None);
5755 }
5756
5757 #[test]
5758 fn test_time_zone_default_is_none() {
5759 let db = GrafeoDB::new_in_memory();
5760 let session = db.session();
5761
5762 assert_eq!(session.time_zone(), None);
5763 }
5764
5765 #[test]
5766 fn test_session_state_independent_across_sessions() {
5767 let db = GrafeoDB::new_in_memory();
5768 let session1 = db.session();
5769 let session2 = db.session();
5770
5771 session1.execute("CREATE GRAPH first").unwrap();
5772 session1.execute("CREATE GRAPH second").unwrap();
5773 session1.execute("SESSION SET GRAPH first").unwrap();
5774 session2.execute("SESSION SET GRAPH second").unwrap();
5775
5776 assert_eq!(session1.current_graph(), Some("first".to_string()));
5777 assert_eq!(session2.current_graph(), Some("second".to_string()));
5778 }
5779
5780 #[test]
5781 fn test_show_node_types() {
5782 let db = GrafeoDB::new_in_memory();
5783 let session = db.session();
5784
5785 session
5786 .execute("CREATE NODE TYPE Person (name STRING NOT NULL, age INTEGER)")
5787 .unwrap();
5788
5789 let result = session.execute("SHOW NODE TYPES").unwrap();
5790 assert_eq!(
5791 result.columns,
5792 vec!["name", "properties", "constraints", "parents"]
5793 );
5794 assert_eq!(result.rows.len(), 1);
5795 assert_eq!(result.rows[0][0], Value::from("Person"));
5797 }
5798
5799 #[test]
5800 fn test_show_edge_types() {
5801 let db = GrafeoDB::new_in_memory();
5802 let session = db.session();
5803
5804 session
5805 .execute("CREATE EDGE TYPE KNOWS CONNECTING (Person) TO (Person) (since INTEGER)")
5806 .unwrap();
5807
5808 let result = session.execute("SHOW EDGE TYPES").unwrap();
5809 assert_eq!(
5810 result.columns,
5811 vec!["name", "properties", "source_types", "target_types"]
5812 );
5813 assert_eq!(result.rows.len(), 1);
5814 assert_eq!(result.rows[0][0], Value::from("KNOWS"));
5815 }
5816
5817 #[test]
5818 fn test_show_graph_types() {
5819 let db = GrafeoDB::new_in_memory();
5820 let session = db.session();
5821
5822 session
5823 .execute("CREATE NODE TYPE Person (name STRING)")
5824 .unwrap();
5825 session
5826 .execute(
5827 "CREATE GRAPH TYPE social (\
5828 NODE TYPE Person (name STRING)\
5829 )",
5830 )
5831 .unwrap();
5832
5833 let result = session.execute("SHOW GRAPH TYPES").unwrap();
5834 assert_eq!(
5835 result.columns,
5836 vec!["name", "open", "node_types", "edge_types"]
5837 );
5838 assert_eq!(result.rows.len(), 1);
5839 assert_eq!(result.rows[0][0], Value::from("social"));
5840 }
5841
5842 #[test]
5843 fn test_show_graph_type_named() {
5844 let db = GrafeoDB::new_in_memory();
5845 let session = db.session();
5846
5847 session
5848 .execute("CREATE NODE TYPE Person (name STRING)")
5849 .unwrap();
5850 session
5851 .execute(
5852 "CREATE GRAPH TYPE social (\
5853 NODE TYPE Person (name STRING)\
5854 )",
5855 )
5856 .unwrap();
5857
5858 let result = session.execute("SHOW GRAPH TYPE social").unwrap();
5859 assert_eq!(result.rows.len(), 1);
5860 assert_eq!(result.rows[0][0], Value::from("social"));
5861 }
5862
5863 #[test]
5864 fn test_show_graph_type_not_found() {
5865 let db = GrafeoDB::new_in_memory();
5866 let session = db.session();
5867
5868 let result = session.execute("SHOW GRAPH TYPE nonexistent");
5869 assert!(result.is_err());
5870 }
5871
5872 #[test]
5873 fn test_show_indexes_via_gql() {
5874 let db = GrafeoDB::new_in_memory();
5875 let session = db.session();
5876
5877 let result = session.execute("SHOW INDEXES").unwrap();
5878 assert_eq!(result.columns, vec!["name", "type", "label", "property"]);
5879 }
5880
5881 #[test]
5882 fn test_show_constraints_via_gql() {
5883 let db = GrafeoDB::new_in_memory();
5884 let session = db.session();
5885
5886 let result = session.execute("SHOW CONSTRAINTS").unwrap();
5887 assert_eq!(result.columns, vec!["name", "type", "label", "properties"]);
5888 }
5889
5890 #[test]
5891 fn test_pattern_form_graph_type_roundtrip() {
5892 let db = GrafeoDB::new_in_memory();
5893 let session = db.session();
5894
5895 session
5897 .execute("CREATE NODE TYPE Person (name STRING NOT NULL)")
5898 .unwrap();
5899 session
5900 .execute("CREATE NODE TYPE City (name STRING)")
5901 .unwrap();
5902 session
5903 .execute("CREATE EDGE TYPE KNOWS (since INTEGER)")
5904 .unwrap();
5905 session.execute("CREATE EDGE TYPE LIVES_IN").unwrap();
5906
5907 session
5909 .execute(
5910 "CREATE GRAPH TYPE social (\
5911 (:Person {name STRING NOT NULL})-[:KNOWS {since INTEGER}]->(:Person),\
5912 (:Person)-[:LIVES_IN]->(:City)\
5913 )",
5914 )
5915 .unwrap();
5916
5917 let result = session.execute("SHOW GRAPH TYPE social").unwrap();
5919 assert_eq!(result.rows.len(), 1);
5920 assert_eq!(result.rows[0][0], Value::from("social"));
5921 }
5922 }
5923}