1#[cfg(feature = "rdf")]
8mod rdf;
9
10use std::sync::Arc;
11use std::sync::atomic::{AtomicUsize, Ordering};
12use std::time::{Duration, Instant};
13
14use grafeo_common::types::{EdgeId, EpochId, NodeId, TransactionId, Value};
15use grafeo_common::utils::error::Result;
16use grafeo_common::{grafeo_debug_span, grafeo_info_span, grafeo_warn};
17use grafeo_core::graph::Direction;
18use grafeo_core::graph::lpg::{Edge, LpgStore, Node};
19#[cfg(feature = "rdf")]
20use grafeo_core::graph::rdf::RdfStore;
21use grafeo_core::graph::{GraphStore, GraphStoreMut};
22
23use crate::catalog::{Catalog, CatalogConstraintValidator};
24use crate::config::{AdaptiveConfig, GraphModel};
25use crate::database::QueryResult;
26use crate::query::cache::QueryCache;
27use crate::transaction::TransactionManager;
28
/// Graph-name segment that stands for a schema's default graph.
///
/// When a schema is selected and either no graph is selected or the selected graph is
/// `default`, the storage key becomes `{schema}/__default__`.
const SCHEMA_DEFAULT_GRAPH: &str = "__default__";
32
33fn parse_default_literal(text: &str) -> Value {
38 if text.eq_ignore_ascii_case("null") {
39 return Value::Null;
40 }
41 if text.eq_ignore_ascii_case("true") {
42 return Value::Bool(true);
43 }
44 if text.eq_ignore_ascii_case("false") {
45 return Value::Bool(false);
46 }
47 if (text.starts_with('\'') && text.ends_with('\''))
49 || (text.starts_with('"') && text.ends_with('"'))
50 {
51 return Value::String(text[1..text.len() - 1].into());
52 }
53 if let Ok(i) = text.parse::<i64>() {
55 return Value::Int64(i);
56 }
57 if let Ok(f) = text.parse::<f64>() {
58 return Value::Float64(f);
59 }
60 Value::String(text.into())
62}
63
/// Shared handles and settings consumed by the [`Session`] constructors.
///
/// Every field is moved or copied into the corresponding `Session` field.
pub(crate) struct SessionConfig {
    /// Transaction manager handle stored on the session.
    pub transaction_manager: Arc<TransactionManager>,
    /// Query cache handle stored on the session.
    pub query_cache: Arc<QueryCache>,
    /// Catalog handle stored on the session.
    pub catalog: Arc<Catalog>,
    /// Adaptive execution settings stored on the session.
    pub adaptive_config: AdaptiveConfig,
    /// Stored as the session's `factorized_execution` flag.
    pub factorized_execution: bool,
    /// Graph model of the database; the session refuses LPG-only languages when this is
    /// `GraphModel::Rdf`.
    pub graph_model: GraphModel,
    /// Optional query timeout stored on the session.
    pub query_timeout: Option<Duration>,
    /// Commit counter handle stored on the session.
    /// NOTE(review): presumably shared across sessions to schedule GC, confirm.
    pub commit_counter: Arc<AtomicUsize>,
    /// Stored as the session's `gc_interval`.
    pub gc_interval: usize,
    /// Initial value of both the session's `db_read_only` and `read_only_tx` flags.
    pub read_only: bool,
}
81
/// A database session.
///
/// Holds the store handles, the catalog and all per-session state: selected graph and
/// schema, time zone, session parameters, and transaction/savepoint bookkeeping.
/// Mutable session state sits behind `parking_lot::Mutex` so it can be changed through
/// `&self`.
pub struct Session {
    /// Root LPG store; named graphs are resolved on it with `graph(name)`.
    store: Arc<LpgStore>,
    /// Read-side store handed out when no named graph is active.
    graph_store: Arc<dyn GraphStore>,
    /// Write-side store handed out when no named graph is active; may be `None`.
    graph_store_mut: Option<Arc<dyn GraphStoreMut>>,
    /// Catalog used for schema existence checks, type definitions and graph-type bindings.
    catalog: Arc<Catalog>,
    /// RDF store; the constructors visible here always create a fresh one.
    #[cfg(feature = "rdf")]
    rdf_store: Arc<RdfStore>,
    /// Transaction manager handle taken from `SessionConfig`.
    transaction_manager: Arc<TransactionManager>,
    /// Query cache handle taken from `SessionConfig`.
    query_cache: Arc<QueryCache>,
    /// Current transaction id; `Some` is treated as "a transaction is open" by
    /// `track_graph_touch`.
    current_transaction: parking_lot::Mutex<Option<TransactionId>>,
    /// When `true`, `CreateGraph` / `DropGraph` session commands are rejected with
    /// `TransactionError::ReadOnly`. Initialised from `SessionConfig::read_only`.
    read_only_tx: parking_lot::Mutex<bool>,
    /// Copy of `SessionConfig::read_only`.
    db_read_only: bool,
    /// Initialised to `true` by the constructors.
    /// NOTE(review): its consumers are outside this part of the file.
    auto_commit: bool,
    /// Adaptive execution settings taken from `SessionConfig`.
    #[allow(dead_code)] adaptive_config: AdaptiveConfig,
    /// Copy of `SessionConfig::factorized_execution`.
    factorized_execution: bool,
    /// Graph model of the database (returned by `graph_model()`, checked by `require_lpg`).
    graph_model: GraphModel,
    /// Optional query timeout taken from `SessionConfig`.
    query_timeout: Option<Duration>,
    /// Commit counter handle taken from `SessionConfig`.
    commit_counter: Arc<AtomicUsize>,
    /// Copy of `SessionConfig::gc_interval`.
    gc_interval: usize,
    /// Initialised to 0; presumably the node count captured when a transaction starts.
    transaction_start_node_count: AtomicUsize,
    /// Initialised to 0; presumably the edge count captured when a transaction starts.
    transaction_start_edge_count: AtomicUsize,
    /// Write-ahead log, set by `set_wal`; `None` until then.
    #[cfg(feature = "wal")]
    wal: Option<Arc<grafeo_adapters::storage::wal::LpgWal>>,
    /// Graph-context cell passed to every WAL-wrapping store, set by `set_wal`.
    #[cfg(feature = "wal")]
    wal_graph_context: Option<Arc<parking_lot::Mutex<Option<String>>>>,
    /// Change-data-capture log; replaced by `set_cdc_log`.
    #[cfg(feature = "cdc")]
    cdc_log: Arc<crate::cdc::CdcLog>,
    /// Pending CDC events buffer obtained from the CDC store in `set_cdc_log`; passed to
    /// the CDC wrappers created for named graphs.
    #[cfg(feature = "cdc")]
    cdc_pending_events: Option<Arc<parking_lot::Mutex<Vec<crate::cdc::ChangeEvent>>>>,
    /// Name of the selected graph, or `None` when none is selected.
    current_graph: parking_lot::Mutex<Option<String>>,
    /// Name of the selected schema; used as a `{schema}/` prefix for graph and type keys.
    current_schema: parking_lot::Mutex<Option<String>>,
    /// Session time zone as set by `set_time_zone`, if any.
    time_zone: parking_lot::Mutex<Option<String>>,
    /// Session parameters keyed by name.
    session_params:
        parking_lot::Mutex<std::collections::HashMap<String, grafeo_common::types::Value>>,
    /// Viewing-epoch override set by `set_viewing_epoch`, if any.
    viewing_epoch_override: parking_lot::Mutex<Option<EpochId>>,
    /// Savepoints recorded in this session; starts empty.
    savepoints: parking_lot::Mutex<Vec<SavepointState>>,
    /// Initialised to 0; presumably counts nested transaction begins.
    transaction_nesting_depth: parking_lot::Mutex<u32>,
    /// Distinct active-graph storage keys recorded by `track_graph_touch` while a
    /// transaction is open (`None` is the key used when no graph or schema is selected).
    touched_graphs: parking_lot::Mutex<Vec<Option<String>>>,
    /// Metrics registry, set by `set_metrics`; `None` until then.
    #[cfg(feature = "metrics")]
    pub(crate) metrics: Option<Arc<crate::metrics::MetricsRegistry>>,
    /// Initialised to `None`; presumably the start instant of the open transaction.
    #[cfg(feature = "metrics")]
    tx_start_time: parking_lot::Mutex<Option<Instant>>,
}
173
/// Per-graph state captured as part of a savepoint.
/// NOTE(review): the code that fills and restores these fields is outside this part of
/// the file; the field descriptions below are inferred from the names.
#[derive(Clone)]
struct GraphSavepoint {
    /// Graph this snapshot belongs to; `None` presumably denotes the default graph.
    graph_name: Option<String>,
    /// Presumably the store's next node id at savepoint time.
    next_node_id: u64,
    /// Presumably the store's next edge id at savepoint time.
    next_edge_id: u64,
    /// Presumably the length of the undo log at savepoint time.
    undo_log_position: usize,
}
182
/// A named savepoint, stored in `Session::savepoints`.
#[derive(Clone)]
struct SavepointState {
    /// Savepoint name.
    name: String,
    /// One snapshot per graph captured for this savepoint.
    graph_snapshots: Vec<GraphSavepoint>,
    /// Presumably the graph that was active when the savepoint was taken (currently
    /// marked as allowed-dead code).
    #[allow(dead_code)]
    active_graph: Option<String>,
    /// Presumably the number of pending CDC events at savepoint time.
    #[cfg(feature = "cdc")]
    cdc_event_position: usize,
}
197
198impl Session {
199 #[allow(dead_code)]
201 pub(crate) fn with_adaptive(store: Arc<LpgStore>, cfg: SessionConfig) -> Self {
202 let graph_store = Arc::clone(&store) as Arc<dyn GraphStore>;
203 let graph_store_mut = Some(Arc::clone(&store) as Arc<dyn GraphStoreMut>);
204 Self {
205 store,
206 graph_store,
207 graph_store_mut,
208 catalog: cfg.catalog,
209 #[cfg(feature = "rdf")]
210 rdf_store: Arc::new(RdfStore::new()),
211 transaction_manager: cfg.transaction_manager,
212 query_cache: cfg.query_cache,
213 current_transaction: parking_lot::Mutex::new(None),
214 read_only_tx: parking_lot::Mutex::new(cfg.read_only),
215 db_read_only: cfg.read_only,
216 auto_commit: true,
217 adaptive_config: cfg.adaptive_config,
218 factorized_execution: cfg.factorized_execution,
219 graph_model: cfg.graph_model,
220 query_timeout: cfg.query_timeout,
221 commit_counter: cfg.commit_counter,
222 gc_interval: cfg.gc_interval,
223 transaction_start_node_count: AtomicUsize::new(0),
224 transaction_start_edge_count: AtomicUsize::new(0),
225 #[cfg(feature = "wal")]
226 wal: None,
227 #[cfg(feature = "wal")]
228 wal_graph_context: None,
229 #[cfg(feature = "cdc")]
230 cdc_log: Arc::new(crate::cdc::CdcLog::new()),
231 #[cfg(feature = "cdc")]
232 cdc_pending_events: None,
233 current_graph: parking_lot::Mutex::new(None),
234 current_schema: parking_lot::Mutex::new(None),
235 time_zone: parking_lot::Mutex::new(None),
236 session_params: parking_lot::Mutex::new(std::collections::HashMap::new()),
237 viewing_epoch_override: parking_lot::Mutex::new(None),
238 savepoints: parking_lot::Mutex::new(Vec::new()),
239 transaction_nesting_depth: parking_lot::Mutex::new(0),
240 touched_graphs: parking_lot::Mutex::new(Vec::new()),
241 #[cfg(feature = "metrics")]
242 metrics: None,
243 #[cfg(feature = "metrics")]
244 tx_start_time: parking_lot::Mutex::new(None),
245 }
246 }
247
248 #[cfg(feature = "wal")]
253 pub(crate) fn set_wal(
254 &mut self,
255 wal: Arc<grafeo_adapters::storage::wal::LpgWal>,
256 wal_graph_context: Arc<parking_lot::Mutex<Option<String>>>,
257 ) {
258 let wal_store = Arc::new(crate::database::wal_store::WalGraphStore::new(
260 Arc::clone(&self.store),
261 Arc::clone(&wal),
262 Arc::clone(&wal_graph_context),
263 ));
264 self.graph_store = Arc::clone(&wal_store) as Arc<dyn GraphStore>;
265 self.graph_store_mut = Some(wal_store as Arc<dyn GraphStoreMut>);
266 self.wal = Some(wal);
267 self.wal_graph_context = Some(wal_graph_context);
268 }
269
270 #[cfg(feature = "cdc")]
277 pub(crate) fn set_cdc_log(&mut self, cdc_log: Arc<crate::cdc::CdcLog>) {
278 if let Some(ref write_store) = self.graph_store_mut {
281 let cdc_store = Arc::new(crate::database::cdc_store::CdcGraphStore::new(
282 Arc::clone(write_store),
283 Arc::clone(&cdc_log),
284 ));
285 self.cdc_pending_events = Some(cdc_store.pending_events());
286 self.graph_store_mut = Some(cdc_store as Arc<dyn grafeo_core::graph::GraphStoreMut>);
287 }
288 self.cdc_log = cdc_log;
289 }
290
291 #[cfg(feature = "metrics")]
293 pub(crate) fn set_metrics(&mut self, metrics: Arc<crate::metrics::MetricsRegistry>) {
294 self.metrics = Some(metrics);
295 }
296
297 pub(crate) fn with_external_store(
306 read_store: Arc<dyn GraphStore>,
307 write_store: Option<Arc<dyn GraphStoreMut>>,
308 cfg: SessionConfig,
309 ) -> Result<Self> {
310 Ok(Self {
311 store: Arc::new(LpgStore::new()?),
312 graph_store: read_store,
313 graph_store_mut: write_store,
314 catalog: cfg.catalog,
315 #[cfg(feature = "rdf")]
316 rdf_store: Arc::new(RdfStore::new()),
317 transaction_manager: cfg.transaction_manager,
318 query_cache: cfg.query_cache,
319 current_transaction: parking_lot::Mutex::new(None),
320 read_only_tx: parking_lot::Mutex::new(cfg.read_only),
321 db_read_only: cfg.read_only,
322 auto_commit: true,
323 adaptive_config: cfg.adaptive_config,
324 factorized_execution: cfg.factorized_execution,
325 graph_model: cfg.graph_model,
326 query_timeout: cfg.query_timeout,
327 commit_counter: cfg.commit_counter,
328 gc_interval: cfg.gc_interval,
329 transaction_start_node_count: AtomicUsize::new(0),
330 transaction_start_edge_count: AtomicUsize::new(0),
331 #[cfg(feature = "wal")]
332 wal: None,
333 #[cfg(feature = "wal")]
334 wal_graph_context: None,
335 #[cfg(feature = "cdc")]
336 cdc_log: Arc::new(crate::cdc::CdcLog::new()),
337 #[cfg(feature = "cdc")]
338 cdc_pending_events: None,
339 current_graph: parking_lot::Mutex::new(None),
340 current_schema: parking_lot::Mutex::new(None),
341 time_zone: parking_lot::Mutex::new(None),
342 session_params: parking_lot::Mutex::new(std::collections::HashMap::new()),
343 viewing_epoch_override: parking_lot::Mutex::new(None),
344 savepoints: parking_lot::Mutex::new(Vec::new()),
345 transaction_nesting_depth: parking_lot::Mutex::new(0),
346 touched_graphs: parking_lot::Mutex::new(Vec::new()),
347 #[cfg(feature = "metrics")]
348 metrics: None,
349 #[cfg(feature = "metrics")]
350 tx_start_time: parking_lot::Mutex::new(None),
351 })
352 }
353
354 #[must_use]
356 pub fn graph_model(&self) -> GraphModel {
357 self.graph_model
358 }
359
360 pub fn use_graph(&self, name: &str) {
364 *self.current_graph.lock() = Some(name.to_string());
365 }
366
367 #[must_use]
369 pub fn current_graph(&self) -> Option<String> {
370 self.current_graph.lock().clone()
371 }
372
373 pub fn set_schema(&self, name: &str) {
377 *self.current_schema.lock() = Some(name.to_string());
378 }
379
380 #[must_use]
384 pub fn current_schema(&self) -> Option<String> {
385 self.current_schema.lock().clone()
386 }
387
388 fn effective_graph_key(&self, graph_name: &str) -> String {
393 let schema = self.current_schema.lock().clone();
394 match schema {
395 Some(s) => format!("{s}/{graph_name}"),
396 None => graph_name.to_string(),
397 }
398 }
399
400 fn effective_type_key(&self, type_name: &str) -> String {
404 let schema = self.current_schema.lock().clone();
405 match schema {
406 Some(s) => format!("{s}/{type_name}"),
407 None => type_name.to_string(),
408 }
409 }
410
411 fn active_graph_storage_key(&self) -> Option<String> {
415 let graph = self.current_graph.lock().clone();
416 let schema = self.current_schema.lock().clone();
417 match (&schema, &graph) {
418 (None, None) => None,
419 (Some(s), None) => Some(format!("{s}/{SCHEMA_DEFAULT_GRAPH}")),
420 (None, Some(name)) if name.eq_ignore_ascii_case("default") => None,
421 (Some(s), Some(name)) if name.eq_ignore_ascii_case("default") => {
422 Some(format!("{s}/{SCHEMA_DEFAULT_GRAPH}"))
423 }
424 (None, Some(name)) => Some(name.clone()),
425 (Some(s), Some(g)) => Some(format!("{s}/{g}")),
426 }
427 }
428
429 fn active_store(&self) -> Arc<dyn GraphStore> {
437 let key = self.active_graph_storage_key();
438 match key {
439 None => Arc::clone(&self.graph_store),
440 Some(ref name) => match self.store.graph(name) {
441 Some(named_store) => {
442 #[cfg(feature = "wal")]
443 if let (Some(wal), Some(ctx)) = (&self.wal, &self.wal_graph_context) {
444 return Arc::new(crate::database::wal_store::WalGraphStore::new_for_graph(
445 named_store,
446 Arc::clone(wal),
447 name.clone(),
448 Arc::clone(ctx),
449 )) as Arc<dyn GraphStore>;
450 }
451 named_store as Arc<dyn GraphStore>
452 }
453 None => Arc::clone(&self.graph_store),
454 },
455 }
456 }
457
458 fn active_write_store(&self) -> Option<Arc<dyn GraphStoreMut>> {
463 let key = self.active_graph_storage_key();
464 match key {
465 None => self.graph_store_mut.as_ref().map(Arc::clone),
466 Some(ref name) => match self.store.graph(name) {
467 Some(named_store) => {
468 let mut store: Arc<dyn GraphStoreMut> = named_store;
469
470 #[cfg(feature = "wal")]
471 if let (Some(wal), Some(ctx)) = (&self.wal, &self.wal_graph_context) {
472 store = Arc::new(crate::database::wal_store::WalGraphStore::new_for_graph(
473 self.store
475 .graph(name)
476 .unwrap_or_else(|| Arc::clone(&self.store)),
477 Arc::clone(wal),
478 name.clone(),
479 Arc::clone(ctx),
480 ));
481 }
482
483 #[cfg(feature = "cdc")]
484 if let Some(ref pending) = self.cdc_pending_events {
485 store = Arc::new(crate::database::cdc_store::CdcGraphStore::wrap(
486 store,
487 Arc::clone(&self.cdc_log),
488 Arc::clone(pending),
489 ));
490 }
491
492 Some(store)
493 }
494 None => self.graph_store_mut.as_ref().map(Arc::clone),
495 },
496 }
497 }
498
499 fn active_lpg_store(&self) -> Arc<LpgStore> {
504 let key = self.active_graph_storage_key();
505 match key {
506 None => Arc::clone(&self.store),
507 Some(ref name) => self
508 .store
509 .graph(name)
510 .unwrap_or_else(|| Arc::clone(&self.store)),
511 }
512 }
513
514 fn resolve_store(&self, graph_name: &Option<String>) -> Arc<LpgStore> {
517 match graph_name {
518 None => Arc::clone(&self.store),
519 Some(name) if name.eq_ignore_ascii_case("default") => Arc::clone(&self.store),
520 Some(name) => self
521 .store
522 .graph(name)
523 .unwrap_or_else(|| Arc::clone(&self.store)),
524 }
525 }
526
527 fn track_graph_touch(&self) {
532 if self.current_transaction.lock().is_some() {
533 let key = self.active_graph_storage_key();
534 let mut touched = self.touched_graphs.lock();
535 if !touched.contains(&key) {
536 touched.push(key);
537 }
538 }
539 }
540
541 pub fn set_time_zone(&self, tz: &str) {
543 *self.time_zone.lock() = Some(tz.to_string());
544 }
545
546 #[must_use]
548 pub fn time_zone(&self) -> Option<String> {
549 self.time_zone.lock().clone()
550 }
551
552 pub fn set_parameter(&self, key: &str, value: grafeo_common::types::Value) {
554 self.session_params.lock().insert(key.to_string(), value);
555 }
556
557 #[must_use]
559 pub fn get_parameter(&self, key: &str) -> Option<grafeo_common::types::Value> {
560 self.session_params.lock().get(key).cloned()
561 }
562
563 pub fn reset_session(&self) {
565 *self.current_schema.lock() = None;
566 *self.current_graph.lock() = None;
567 *self.time_zone.lock() = None;
568 self.session_params.lock().clear();
569 *self.viewing_epoch_override.lock() = None;
570 }
571
572 pub fn reset_schema(&self) {
574 *self.current_schema.lock() = None;
575 }
576
577 pub fn reset_graph(&self) {
579 *self.current_graph.lock() = None;
580 }
581
582 pub fn reset_time_zone(&self) {
584 *self.time_zone.lock() = None;
585 }
586
587 pub fn reset_parameters(&self) {
589 self.session_params.lock().clear();
590 }
591
592 pub fn set_viewing_epoch(&self, epoch: EpochId) {
600 *self.viewing_epoch_override.lock() = Some(epoch);
601 }
602
603 pub fn clear_viewing_epoch(&self) {
605 *self.viewing_epoch_override.lock() = None;
606 }
607
608 #[must_use]
610 pub fn viewing_epoch(&self) -> Option<EpochId> {
611 *self.viewing_epoch_override.lock()
612 }
613
614 #[must_use]
618 pub fn get_node_history(&self, id: NodeId) -> Vec<(EpochId, Option<EpochId>, Node)> {
619 self.active_lpg_store().get_node_history(id)
620 }
621
622 #[must_use]
626 pub fn get_edge_history(&self, id: EdgeId) -> Vec<(EpochId, Option<EpochId>, Edge)> {
627 self.active_lpg_store().get_edge_history(id)
628 }
629
630 fn require_lpg(&self, language: &str) -> Result<()> {
632 if self.graph_model == GraphModel::Rdf {
633 return Err(grafeo_common::utils::error::Error::Internal(format!(
634 "This is an RDF database. {language} queries require an LPG database."
635 )));
636 }
637 Ok(())
638 }
639
    /// Executes a GQL session-level command against this session.
    ///
    /// Covers graph management (`CreateGraph`, `DropGraph`), graph/schema/time-zone/
    /// parameter selection, session reset/close, transaction control and savepoints.
    ///
    /// # Errors
    ///
    /// * `Error::Transaction(TransactionError::ReadOnly)` for `CreateGraph` / `DropGraph`
    ///   while `read_only_tx` is set.
    /// * `Error::Query` with `QueryErrorKind::Semantic` for missing or already existing
    ///   graphs, a missing schema, a failed graph-type binding, or an invalid
    ///   `viewing_epoch` value.
    /// * `Error::Internal` when the store fails to create or copy a graph.
    /// * Any error returned by the transaction and savepoint helpers.
    #[cfg(feature = "gql")]
    fn execute_session_command(
        &self,
        cmd: grafeo_adapters::query::gql::ast::SessionCommand,
    ) -> Result<QueryResult> {
        use grafeo_adapters::query::gql::ast::{SessionCommand, TransactionIsolationLevel};
        use grafeo_common::utils::error::{Error, QueryError, QueryErrorKind};

        // Graph creation and removal are refused while the read-only flag is set; every
        // other command passes through.
        if *self.read_only_tx.lock() {
            match &cmd {
                SessionCommand::CreateGraph { .. } | SessionCommand::DropGraph { .. } => {
                    return Err(Error::Transaction(
                        grafeo_common::utils::error::TransactionError::ReadOnly,
                    ));
                }
                _ => {} }
        }

        match cmd {
            SessionCommand::CreateGraph {
                name,
                if_not_exists,
                typed,
                like_graph,
                copy_of,
                open: _,
            } => {
                // Graphs are stored under a schema-qualified key.
                let storage_key = self.effective_graph_key(&name);

                // Validate the LIKE / COPY OF source graphs before creating anything.
                if let Some(ref src) = like_graph {
                    let src_key = self.effective_graph_key(src);
                    if self.store.graph(&src_key).is_none() {
                        return Err(Error::Query(QueryError::new(
                            QueryErrorKind::Semantic,
                            format!("Source graph '{src}' does not exist"),
                        )));
                    }
                }
                if let Some(ref src) = copy_of {
                    let src_key = self.effective_graph_key(src);
                    if self.store.graph(&src_key).is_none() {
                        return Err(Error::Query(QueryError::new(
                            QueryErrorKind::Semantic,
                            format!("Source graph '{src}' does not exist"),
                        )));
                    }
                }

                let created = self
                    .store
                    .create_graph(&storage_key)
                    .map_err(|e| Error::Internal(e.to_string()))?;
                // `created == false` is reported as "already exists" unless
                // `if_not_exists` was given.
                if !created && !if_not_exists {
                    return Err(Error::Query(QueryError::new(
                        QueryErrorKind::Semantic,
                        format!("Graph '{name}' already exists"),
                    )));
                }
                // Only a newly created graph is written to the WAL.
                if created {
                    #[cfg(feature = "wal")]
                    self.log_schema_wal(
                        &grafeo_adapters::storage::wal::WalRecord::CreateNamedGraph {
                            name: storage_key.clone(),
                        },
                    );
                }

                // COPY OF: copy the source graph's contents into the target graph.
                if let Some(ref src) = copy_of {
                    let src_key = self.effective_graph_key(src);
                    self.store
                        .copy_graph(Some(&src_key), Some(&storage_key))
                        .map_err(|e| Error::Internal(e.to_string()))?;
                }

                // TYPED: bind the graph to a graph type. A type name containing '/' is
                // used as given; otherwise it is qualified with the current schema.
                if let Some(type_name) = typed
                    && let Err(e) = self.catalog.bind_graph_type(
                        &storage_key,
                        if type_name.contains('/') {
                            type_name.clone()
                        } else {
                            self.effective_type_key(&type_name)
                        },
                    )
                {
                    return Err(Error::Query(QueryError::new(
                        QueryErrorKind::Semantic,
                        e.to_string(),
                    )));
                }

                // LIKE: reuse the source graph's type binding, if it has one. A binding
                // failure here is deliberately ignored.
                if let Some(ref src) = like_graph {
                    let src_key = self.effective_graph_key(src);
                    if let Some(src_type) = self.catalog.get_graph_type_binding(&src_key) {
                        let _ = self.catalog.bind_graph_type(&storage_key, src_type);
                    }
                }

                Ok(QueryResult::empty())
            }
            SessionCommand::DropGraph { name, if_exists } => {
                let storage_key = self.effective_graph_key(&name);
                let dropped = self.store.drop_graph(&storage_key);
                if !dropped && !if_exists {
                    return Err(Error::Query(QueryError::new(
                        QueryErrorKind::Semantic,
                        format!("Graph '{name}' does not exist"),
                    )));
                }
                if dropped {
                    #[cfg(feature = "wal")]
                    self.log_schema_wal(
                        &grafeo_adapters::storage::wal::WalRecord::DropNamedGraph {
                            name: storage_key.clone(),
                        },
                    );
                    // If the dropped graph was the selected one, clear the selection.
                    let mut current = self.current_graph.lock();
                    if current
                        .as_deref()
                        .is_some_and(|g| g.eq_ignore_ascii_case(&name))
                    {
                        *current = None;
                    }
                }
                Ok(QueryResult::empty())
            }
            SessionCommand::UseGraph(name) => {
                let effective_key = self.effective_graph_key(&name);
                // `default` is always accepted; any other graph must exist.
                if !name.eq_ignore_ascii_case("default")
                    && self.store.graph(&effective_key).is_none()
                {
                    return Err(Error::Query(QueryError::new(
                        QueryErrorKind::Semantic,
                        format!("Graph '{name}' does not exist"),
                    )));
                }
                self.use_graph(&name);
                // Record the newly active graph if a transaction is open.
                self.track_graph_touch();
                Ok(QueryResult::empty())
            }
            SessionCommand::SessionSetGraph(name) => {
                let effective_key = self.effective_graph_key(&name);
                // `default` is always accepted; any other graph must exist.
                if !name.eq_ignore_ascii_case("default")
                    && self.store.graph(&effective_key).is_none()
                {
                    return Err(Error::Query(QueryError::new(
                        QueryErrorKind::Semantic,
                        format!("Graph '{name}' does not exist"),
                    )));
                }
                self.use_graph(&name);
                // Record the newly active graph if a transaction is open.
                self.track_graph_touch();
                Ok(QueryResult::empty())
            }
            SessionCommand::SessionSetSchema(name) => {
                // The schema must already be registered in the catalog.
                if !self.catalog.schema_exists(&name) {
                    return Err(Error::Query(QueryError::new(
                        QueryErrorKind::Semantic,
                        format!("Schema '{name}' does not exist"),
                    )));
                }
                self.set_schema(&name);
                Ok(QueryResult::empty())
            }
            SessionCommand::SessionSetTimeZone(tz) => {
                self.set_time_zone(&tz);
                Ok(QueryResult::empty())
            }
            SessionCommand::SessionSetParameter(key, expr) => {
                // `viewing_epoch` is special-cased: it sets the viewing-epoch override
                // instead of an ordinary session parameter.
                if key.eq_ignore_ascii_case("viewing_epoch") {
                    match Self::eval_integer_literal(&expr) {
                        Some(n) if n >= 0 => {
                            self.set_viewing_epoch(EpochId::new(n as u64));
                            Ok(QueryResult::status(format!("Set viewing_epoch to {n}")))
                        }
                        _ => Err(Error::Query(QueryError::new(
                            QueryErrorKind::Semantic,
                            "viewing_epoch must be a non-negative integer literal",
                        ))),
                    }
                } else {
                    // The expression is not evaluated here: the parameter is recorded
                    // with a `Null` value.
                    self.set_parameter(&key, Value::Null);
                    Ok(QueryResult::empty())
                }
            }
            SessionCommand::SessionReset(target) => {
                use grafeo_adapters::query::gql::ast::SessionResetTarget;
                match target {
                    SessionResetTarget::All => self.reset_session(),
                    SessionResetTarget::Schema => self.reset_schema(),
                    SessionResetTarget::Graph => self.reset_graph(),
                    SessionResetTarget::TimeZone => self.reset_time_zone(),
                    SessionResetTarget::Parameters => self.reset_parameters(),
                }
                Ok(QueryResult::empty())
            }
            SessionCommand::SessionClose => {
                // Closing resets the session state; nothing else is released here.
                self.reset_session();
                Ok(QueryResult::empty())
            }
            SessionCommand::StartTransaction {
                read_only,
                isolation_level,
            } => {
                // Map the AST isolation level onto the engine's isolation level.
                let engine_level = isolation_level.map(|l| match l {
                    TransactionIsolationLevel::ReadCommitted => {
                        crate::transaction::IsolationLevel::ReadCommitted
                    }
                    TransactionIsolationLevel::SnapshotIsolation => {
                        crate::transaction::IsolationLevel::SnapshotIsolation
                    }
                    TransactionIsolationLevel::Serializable => {
                        crate::transaction::IsolationLevel::Serializable
                    }
                });
                self.begin_transaction_inner(read_only, engine_level)?;
                Ok(QueryResult::status("Transaction started"))
            }
            SessionCommand::Commit => {
                self.commit_inner()?;
                Ok(QueryResult::status("Transaction committed"))
            }
            SessionCommand::Rollback => {
                self.rollback_inner()?;
                Ok(QueryResult::status("Transaction rolled back"))
            }
            SessionCommand::Savepoint(name) => {
                self.savepoint(&name)?;
                Ok(QueryResult::status(format!("Savepoint '{name}' created")))
            }
            SessionCommand::RollbackToSavepoint(name) => {
                self.rollback_to_savepoint(&name)?;
                Ok(QueryResult::status(format!(
                    "Rolled back to savepoint '{name}'"
                )))
            }
            SessionCommand::ReleaseSavepoint(name) => {
                self.release_savepoint(&name)?;
                Ok(QueryResult::status(format!("Savepoint '{name}' released")))
            }
        }
    }
899
900 #[cfg(feature = "wal")]
902 fn log_schema_wal(&self, record: &grafeo_adapters::storage::wal::WalRecord) {
903 if let Some(ref wal) = self.wal
904 && let Err(e) = wal.log(record)
905 {
906 grafeo_warn!("Failed to log schema change to WAL: {}", e);
907 }
908 }
909
910 #[cfg(feature = "gql")]
912 fn execute_schema_command(
913 &self,
914 cmd: grafeo_adapters::query::gql::ast::SchemaStatement,
915 ) -> Result<QueryResult> {
916 use crate::catalog::{
917 EdgeTypeDefinition, NodeTypeDefinition, PropertyDataType, TypedProperty,
918 };
919 use grafeo_adapters::query::gql::ast::SchemaStatement;
920 #[cfg(feature = "wal")]
921 use grafeo_adapters::storage::wal::WalRecord;
922 use grafeo_common::utils::error::{Error, QueryError, QueryErrorKind};
923
924 macro_rules! wal_log {
926 ($self:expr, $record:expr) => {
927 #[cfg(feature = "wal")]
928 $self.log_schema_wal(&$record);
929 };
930 }
931
932 let result = match cmd {
933 SchemaStatement::CreateNodeType(stmt) => {
934 let effective_name = self.effective_type_key(&stmt.name);
935 #[cfg(feature = "wal")]
936 let props_for_wal: Vec<(String, String, bool)> = stmt
937 .properties
938 .iter()
939 .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
940 .collect();
941 let def = NodeTypeDefinition {
942 name: effective_name.clone(),
943 properties: stmt
944 .properties
945 .iter()
946 .map(|p| TypedProperty {
947 name: p.name.clone(),
948 data_type: PropertyDataType::from_type_name(&p.data_type),
949 nullable: p.nullable,
950 default_value: p
951 .default_value
952 .as_ref()
953 .map(|s| parse_default_literal(s)),
954 })
955 .collect(),
956 constraints: Vec::new(),
957 parent_types: stmt.parent_types.clone(),
958 };
959 let result = if stmt.or_replace {
960 let _ = self.catalog.drop_node_type(&effective_name);
961 self.catalog.register_node_type(def)
962 } else {
963 self.catalog.register_node_type(def)
964 };
965 match result {
966 Ok(()) => {
967 wal_log!(
968 self,
969 WalRecord::CreateNodeType {
970 name: effective_name.clone(),
971 properties: props_for_wal,
972 constraints: Vec::new(),
973 }
974 );
975 Ok(QueryResult::status(format!(
976 "Created node type '{}'",
977 stmt.name
978 )))
979 }
980 Err(e) if stmt.if_not_exists => {
981 let _ = e;
982 Ok(QueryResult::status("No change"))
983 }
984 Err(e) => Err(Error::Query(QueryError::new(
985 QueryErrorKind::Semantic,
986 e.to_string(),
987 ))),
988 }
989 }
990 SchemaStatement::CreateEdgeType(stmt) => {
991 let effective_name = self.effective_type_key(&stmt.name);
992 #[cfg(feature = "wal")]
993 let props_for_wal: Vec<(String, String, bool)> = stmt
994 .properties
995 .iter()
996 .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
997 .collect();
998 let def = EdgeTypeDefinition {
999 name: effective_name.clone(),
1000 properties: stmt
1001 .properties
1002 .iter()
1003 .map(|p| TypedProperty {
1004 name: p.name.clone(),
1005 data_type: PropertyDataType::from_type_name(&p.data_type),
1006 nullable: p.nullable,
1007 default_value: p
1008 .default_value
1009 .as_ref()
1010 .map(|s| parse_default_literal(s)),
1011 })
1012 .collect(),
1013 constraints: Vec::new(),
1014 source_node_types: stmt.source_node_types.clone(),
1015 target_node_types: stmt.target_node_types.clone(),
1016 };
1017 let result = if stmt.or_replace {
1018 let _ = self.catalog.drop_edge_type_def(&effective_name);
1019 self.catalog.register_edge_type_def(def)
1020 } else {
1021 self.catalog.register_edge_type_def(def)
1022 };
1023 match result {
1024 Ok(()) => {
1025 wal_log!(
1026 self,
1027 WalRecord::CreateEdgeType {
1028 name: effective_name.clone(),
1029 properties: props_for_wal,
1030 constraints: Vec::new(),
1031 }
1032 );
1033 Ok(QueryResult::status(format!(
1034 "Created edge type '{}'",
1035 stmt.name
1036 )))
1037 }
1038 Err(e) if stmt.if_not_exists => {
1039 let _ = e;
1040 Ok(QueryResult::status("No change"))
1041 }
1042 Err(e) => Err(Error::Query(QueryError::new(
1043 QueryErrorKind::Semantic,
1044 e.to_string(),
1045 ))),
1046 }
1047 }
1048 SchemaStatement::CreateVectorIndex(stmt) => {
1049 Self::create_vector_index_on_store(
1050 &self.active_lpg_store(),
1051 &stmt.node_label,
1052 &stmt.property,
1053 stmt.dimensions,
1054 stmt.metric.as_deref(),
1055 )?;
1056 wal_log!(
1057 self,
1058 WalRecord::CreateIndex {
1059 name: stmt.name.clone(),
1060 label: stmt.node_label.clone(),
1061 property: stmt.property.clone(),
1062 index_type: "vector".to_string(),
1063 }
1064 );
1065 Ok(QueryResult::status(format!(
1066 "Created vector index '{}'",
1067 stmt.name
1068 )))
1069 }
1070 SchemaStatement::DropNodeType { name, if_exists } => {
1071 let effective_name = self.effective_type_key(&name);
1072 match self.catalog.drop_node_type(&effective_name) {
1073 Ok(()) => {
1074 wal_log!(
1075 self,
1076 WalRecord::DropNodeType {
1077 name: effective_name
1078 }
1079 );
1080 Ok(QueryResult::status(format!("Dropped node type '{name}'")))
1081 }
1082 Err(e) if if_exists => {
1083 let _ = e;
1084 Ok(QueryResult::status("No change"))
1085 }
1086 Err(e) => Err(Error::Query(QueryError::new(
1087 QueryErrorKind::Semantic,
1088 e.to_string(),
1089 ))),
1090 }
1091 }
1092 SchemaStatement::DropEdgeType { name, if_exists } => {
1093 let effective_name = self.effective_type_key(&name);
1094 match self.catalog.drop_edge_type_def(&effective_name) {
1095 Ok(()) => {
1096 wal_log!(
1097 self,
1098 WalRecord::DropEdgeType {
1099 name: effective_name
1100 }
1101 );
1102 Ok(QueryResult::status(format!("Dropped edge type '{name}'")))
1103 }
1104 Err(e) if if_exists => {
1105 let _ = e;
1106 Ok(QueryResult::status("No change"))
1107 }
1108 Err(e) => Err(Error::Query(QueryError::new(
1109 QueryErrorKind::Semantic,
1110 e.to_string(),
1111 ))),
1112 }
1113 }
1114 SchemaStatement::CreateIndex(stmt) => {
1115 use crate::catalog::IndexType as CatalogIndexType;
1116 use grafeo_adapters::query::gql::ast::IndexKind;
1117 let active = self.active_lpg_store();
1118 let index_type_str = match stmt.index_kind {
1119 IndexKind::Property => "property",
1120 IndexKind::BTree => "btree",
1121 IndexKind::Text => "text",
1122 IndexKind::Vector => "vector",
1123 };
1124 match stmt.index_kind {
1125 IndexKind::Property | IndexKind::BTree => {
1126 for prop in &stmt.properties {
1127 active.create_property_index(prop);
1128 }
1129 }
1130 IndexKind::Text => {
1131 for prop in &stmt.properties {
1132 Self::create_text_index_on_store(&active, &stmt.label, prop)?;
1133 }
1134 }
1135 IndexKind::Vector => {
1136 for prop in &stmt.properties {
1137 Self::create_vector_index_on_store(
1138 &active,
1139 &stmt.label,
1140 prop,
1141 stmt.options.dimensions,
1142 stmt.options.metric.as_deref(),
1143 )?;
1144 }
1145 }
1146 }
1147 let catalog_index_type = match stmt.index_kind {
1150 IndexKind::Property => CatalogIndexType::Hash,
1151 IndexKind::BTree => CatalogIndexType::BTree,
1152 IndexKind::Text => CatalogIndexType::FullText,
1153 IndexKind::Vector => CatalogIndexType::Hash,
1154 };
1155 let label_id = self.catalog.get_or_create_label(&stmt.label);
1156 for prop in &stmt.properties {
1157 let prop_id = self.catalog.get_or_create_property_key(prop);
1158 self.catalog
1159 .create_index(&stmt.name, label_id, prop_id, catalog_index_type);
1160 }
1161 #[cfg(feature = "wal")]
1162 for prop in &stmt.properties {
1163 wal_log!(
1164 self,
1165 WalRecord::CreateIndex {
1166 name: stmt.name.clone(),
1167 label: stmt.label.clone(),
1168 property: prop.clone(),
1169 index_type: index_type_str.to_string(),
1170 }
1171 );
1172 }
1173 Ok(QueryResult::status(format!(
1174 "Created {} index '{}'",
1175 index_type_str, stmt.name
1176 )))
1177 }
1178 SchemaStatement::DropIndex { name, if_exists } => {
1179 if let Some(index_id) = self.catalog.find_index_by_name(&name) {
1182 let def = self.catalog.get_index(index_id);
1183 self.catalog.drop_index(index_id);
1184 if let Some(def) = def
1185 && let Some(prop_name) =
1186 self.catalog.get_property_key_name(def.property_key)
1187 {
1188 self.active_lpg_store().drop_property_index(&prop_name);
1189 }
1190 wal_log!(self, WalRecord::DropIndex { name: name.clone() });
1191 Ok(QueryResult::status(format!("Dropped index '{name}'")))
1192 } else if if_exists {
1193 Ok(QueryResult::status("No change".to_string()))
1194 } else {
1195 Err(Error::Query(QueryError::new(
1196 QueryErrorKind::Semantic,
1197 format!("Index '{name}' does not exist"),
1198 )))
1199 }
1200 }
1201 SchemaStatement::CreateConstraint(stmt) => {
1202 use crate::catalog::TypeConstraint;
1203 use grafeo_adapters::query::gql::ast::ConstraintKind;
1204 let kind_str = match stmt.constraint_kind {
1205 ConstraintKind::Unique => "unique",
1206 ConstraintKind::NodeKey => "node_key",
1207 ConstraintKind::NotNull => "not_null",
1208 ConstraintKind::Exists => "exists",
1209 };
1210 let constraint_name = stmt
1211 .name
1212 .clone()
1213 .unwrap_or_else(|| format!("{}_{kind_str}", stmt.label));
1214
1215 match stmt.constraint_kind {
1217 ConstraintKind::Unique => {
1218 for prop in &stmt.properties {
1219 let label_id = self.catalog.get_or_create_label(&stmt.label);
1220 let prop_id = self.catalog.get_or_create_property_key(prop);
1221 let _ = self.catalog.add_unique_constraint(label_id, prop_id);
1222 }
1223 let _ = self.catalog.add_constraint_to_type(
1224 &stmt.label,
1225 TypeConstraint::Unique(stmt.properties.clone()),
1226 );
1227 }
1228 ConstraintKind::NodeKey => {
1229 for prop in &stmt.properties {
1230 let label_id = self.catalog.get_or_create_label(&stmt.label);
1231 let prop_id = self.catalog.get_or_create_property_key(prop);
1232 let _ = self.catalog.add_unique_constraint(label_id, prop_id);
1233 let _ = self.catalog.add_required_property(label_id, prop_id);
1234 }
1235 let _ = self.catalog.add_constraint_to_type(
1236 &stmt.label,
1237 TypeConstraint::PrimaryKey(stmt.properties.clone()),
1238 );
1239 }
1240 ConstraintKind::NotNull | ConstraintKind::Exists => {
1241 for prop in &stmt.properties {
1242 let label_id = self.catalog.get_or_create_label(&stmt.label);
1243 let prop_id = self.catalog.get_or_create_property_key(prop);
1244 let _ = self.catalog.add_required_property(label_id, prop_id);
1245 let _ = self.catalog.add_constraint_to_type(
1246 &stmt.label,
1247 TypeConstraint::NotNull(prop.clone()),
1248 );
1249 }
1250 }
1251 }
1252
1253 wal_log!(
1254 self,
1255 WalRecord::CreateConstraint {
1256 name: constraint_name.clone(),
1257 label: stmt.label.clone(),
1258 properties: stmt.properties.clone(),
1259 kind: kind_str.to_string(),
1260 }
1261 );
1262 Ok(QueryResult::status(format!(
1263 "Created {kind_str} constraint '{constraint_name}'"
1264 )))
1265 }
1266 SchemaStatement::DropConstraint { name, if_exists } => {
1267 let _ = if_exists;
1268 wal_log!(self, WalRecord::DropConstraint { name: name.clone() });
1269 Ok(QueryResult::status(format!("Dropped constraint '{name}'")))
1270 }
1271 SchemaStatement::CreateGraphType(stmt) => {
1272 use crate::catalog::GraphTypeDefinition;
1273 use grafeo_adapters::query::gql::ast::InlineElementType;
1274
1275 let effective_name = self.effective_type_key(&stmt.name);
1276
1277 let (mut node_types, mut edge_types, open) =
1279 if let Some(ref like_graph) = stmt.like_graph {
1280 if let Some(type_name) = self.catalog.get_graph_type_binding(like_graph) {
1282 if let Some(existing) = self
1283 .catalog
1284 .schema()
1285 .and_then(|s| s.get_graph_type(&type_name))
1286 {
1287 (
1288 existing.allowed_node_types.clone(),
1289 existing.allowed_edge_types.clone(),
1290 existing.open,
1291 )
1292 } else {
1293 (Vec::new(), Vec::new(), true)
1294 }
1295 } else {
1296 let nt = self.catalog.all_node_type_names();
1298 let et = self.catalog.all_edge_type_names();
1299 if nt.is_empty() && et.is_empty() {
1300 (Vec::new(), Vec::new(), true)
1301 } else {
1302 (nt, et, false)
1303 }
1304 }
1305 } else {
1306 let nt = stmt
1308 .node_types
1309 .iter()
1310 .map(|n| self.effective_type_key(n))
1311 .collect();
1312 let et = stmt
1313 .edge_types
1314 .iter()
1315 .map(|n| self.effective_type_key(n))
1316 .collect();
1317 (nt, et, stmt.open)
1318 };
1319
1320 for inline in &stmt.inline_types {
1322 match inline {
1323 InlineElementType::Node {
1324 name,
1325 properties,
1326 key_labels,
1327 ..
1328 } => {
1329 let inline_effective = self.effective_type_key(name);
1330 let def = NodeTypeDefinition {
1331 name: inline_effective.clone(),
1332 properties: properties
1333 .iter()
1334 .map(|p| TypedProperty {
1335 name: p.name.clone(),
1336 data_type: PropertyDataType::from_type_name(&p.data_type),
1337 nullable: p.nullable,
1338 default_value: None,
1339 })
1340 .collect(),
1341 constraints: Vec::new(),
1342 parent_types: key_labels.clone(),
1343 };
1344 self.catalog.register_or_replace_node_type(def);
1346 #[cfg(feature = "wal")]
1347 {
1348 let props_for_wal: Vec<(String, String, bool)> = properties
1349 .iter()
1350 .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
1351 .collect();
1352 self.log_schema_wal(&WalRecord::CreateNodeType {
1353 name: inline_effective.clone(),
1354 properties: props_for_wal,
1355 constraints: Vec::new(),
1356 });
1357 }
1358 if !node_types.contains(&inline_effective) {
1359 node_types.push(inline_effective);
1360 }
1361 }
1362 InlineElementType::Edge {
1363 name,
1364 properties,
1365 source_node_types,
1366 target_node_types,
1367 ..
1368 } => {
1369 let inline_effective = self.effective_type_key(name);
1370 let def = EdgeTypeDefinition {
1371 name: inline_effective.clone(),
1372 properties: properties
1373 .iter()
1374 .map(|p| TypedProperty {
1375 name: p.name.clone(),
1376 data_type: PropertyDataType::from_type_name(&p.data_type),
1377 nullable: p.nullable,
1378 default_value: None,
1379 })
1380 .collect(),
1381 constraints: Vec::new(),
1382 source_node_types: source_node_types.clone(),
1383 target_node_types: target_node_types.clone(),
1384 };
1385 self.catalog.register_or_replace_edge_type_def(def);
1386 #[cfg(feature = "wal")]
1387 {
1388 let props_for_wal: Vec<(String, String, bool)> = properties
1389 .iter()
1390 .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
1391 .collect();
1392 self.log_schema_wal(&WalRecord::CreateEdgeType {
1393 name: inline_effective.clone(),
1394 properties: props_for_wal,
1395 constraints: Vec::new(),
1396 });
1397 }
1398 if !edge_types.contains(&inline_effective) {
1399 edge_types.push(inline_effective);
1400 }
1401 }
1402 }
1403 }
1404
1405 let def = GraphTypeDefinition {
1406 name: effective_name.clone(),
1407 allowed_node_types: node_types.clone(),
1408 allowed_edge_types: edge_types.clone(),
1409 open,
1410 };
1411 let result = if stmt.or_replace {
1412 let _ = self.catalog.drop_graph_type(&effective_name);
1414 self.catalog.register_graph_type(def)
1415 } else {
1416 self.catalog.register_graph_type(def)
1417 };
1418 match result {
1419 Ok(()) => {
1420 wal_log!(
1421 self,
1422 WalRecord::CreateGraphType {
1423 name: effective_name.clone(),
1424 node_types,
1425 edge_types,
1426 open,
1427 }
1428 );
1429 Ok(QueryResult::status(format!(
1430 "Created graph type '{}'",
1431 stmt.name
1432 )))
1433 }
1434 Err(e) if stmt.if_not_exists => {
1435 let _ = e;
1436 Ok(QueryResult::status("No change"))
1437 }
1438 Err(e) => Err(Error::Query(QueryError::new(
1439 QueryErrorKind::Semantic,
1440 e.to_string(),
1441 ))),
1442 }
1443 }
1444 SchemaStatement::DropGraphType { name, if_exists } => {
1445 let effective_name = self.effective_type_key(&name);
1446 match self.catalog.drop_graph_type(&effective_name) {
1447 Ok(()) => {
1448 wal_log!(
1449 self,
1450 WalRecord::DropGraphType {
1451 name: effective_name
1452 }
1453 );
1454 Ok(QueryResult::status(format!("Dropped graph type '{name}'")))
1455 }
1456 Err(e) if if_exists => {
1457 let _ = e;
1458 Ok(QueryResult::status("No change"))
1459 }
1460 Err(e) => Err(Error::Query(QueryError::new(
1461 QueryErrorKind::Semantic,
1462 e.to_string(),
1463 ))),
1464 }
1465 }
1466 SchemaStatement::CreateSchema {
1467 name,
1468 if_not_exists,
1469 } => match self.catalog.register_schema_namespace(name.clone()) {
1470 Ok(()) => {
1471 wal_log!(self, WalRecord::CreateSchema { name: name.clone() });
1472 let default_key = format!("{name}/{SCHEMA_DEFAULT_GRAPH}");
1475 if self.store.create_graph(&default_key).unwrap_or(false) {
1476 wal_log!(self, WalRecord::CreateNamedGraph { name: default_key });
1477 }
1478 Ok(QueryResult::status(format!("Created schema '{name}'")))
1479 }
1480 Err(e) if if_not_exists => {
1481 let _ = e;
1482 Ok(QueryResult::status("No change"))
1483 }
1484 Err(e) => Err(Error::Query(QueryError::new(
1485 QueryErrorKind::Semantic,
1486 e.to_string(),
1487 ))),
1488 },
1489 SchemaStatement::DropSchema { name, if_exists } => {
1490 let prefix = format!("{name}/");
1493 let default_graph_key = format!("{name}/{SCHEMA_DEFAULT_GRAPH}");
1494 let has_graphs = self
1495 .store
1496 .graph_names()
1497 .iter()
1498 .any(|g| g.starts_with(&prefix) && *g != default_graph_key);
1499 let has_types = self
1500 .catalog
1501 .all_node_type_names()
1502 .iter()
1503 .any(|n| n.starts_with(&prefix))
1504 || self
1505 .catalog
1506 .all_edge_type_names()
1507 .iter()
1508 .any(|n| n.starts_with(&prefix))
1509 || self
1510 .catalog
1511 .all_graph_type_names()
1512 .iter()
1513 .any(|n| n.starts_with(&prefix));
1514 if has_graphs || has_types {
1515 return Err(Error::Query(QueryError::new(
1516 QueryErrorKind::Semantic,
1517 format!("Schema '{name}' is not empty: drop all graphs and types first"),
1518 )));
1519 }
1520 match self.catalog.drop_schema_namespace(&name) {
1521 Ok(()) => {
1522 wal_log!(self, WalRecord::DropSchema { name: name.clone() });
1523 if self.store.drop_graph(&default_graph_key) {
1525 wal_log!(
1526 self,
1527 WalRecord::DropNamedGraph {
1528 name: default_graph_key,
1529 }
1530 );
1531 }
1532 let mut current = self.current_schema.lock();
1534 if current
1535 .as_deref()
1536 .is_some_and(|s| s.eq_ignore_ascii_case(&name))
1537 {
1538 *current = None;
1539 }
1540 Ok(QueryResult::status(format!("Dropped schema '{name}'")))
1541 }
1542 Err(e) if if_exists => {
1543 let _ = e;
1544 Ok(QueryResult::status("No change"))
1545 }
1546 Err(e) => Err(Error::Query(QueryError::new(
1547 QueryErrorKind::Semantic,
1548 e.to_string(),
1549 ))),
1550 }
1551 }
1552 SchemaStatement::AlterNodeType(stmt) => {
1553 use grafeo_adapters::query::gql::ast::TypeAlteration;
1554 let effective_name = self.effective_type_key(&stmt.name);
1555 let mut wal_alts = Vec::new();
1556 for alt in &stmt.alterations {
1557 match alt {
1558 TypeAlteration::AddProperty(prop) => {
1559 let typed = TypedProperty {
1560 name: prop.name.clone(),
1561 data_type: PropertyDataType::from_type_name(&prop.data_type),
1562 nullable: prop.nullable,
1563 default_value: prop
1564 .default_value
1565 .as_ref()
1566 .map(|s| parse_default_literal(s)),
1567 };
1568 self.catalog
1569 .alter_node_type_add_property(&effective_name, typed)
1570 .map_err(|e| {
1571 Error::Query(QueryError::new(
1572 QueryErrorKind::Semantic,
1573 e.to_string(),
1574 ))
1575 })?;
1576 wal_alts.push((
1577 "add".to_string(),
1578 prop.name.clone(),
1579 prop.data_type.clone(),
1580 prop.nullable,
1581 ));
1582 }
1583 TypeAlteration::DropProperty(name) => {
1584 self.catalog
1585 .alter_node_type_drop_property(&effective_name, name)
1586 .map_err(|e| {
1587 Error::Query(QueryError::new(
1588 QueryErrorKind::Semantic,
1589 e.to_string(),
1590 ))
1591 })?;
1592 wal_alts.push(("drop".to_string(), name.clone(), String::new(), false));
1593 }
1594 }
1595 }
1596 wal_log!(
1597 self,
1598 WalRecord::AlterNodeType {
1599 name: effective_name,
1600 alterations: wal_alts,
1601 }
1602 );
1603 Ok(QueryResult::status(format!(
1604 "Altered node type '{}'",
1605 stmt.name
1606 )))
1607 }
1608 SchemaStatement::AlterEdgeType(stmt) => {
1609 use grafeo_adapters::query::gql::ast::TypeAlteration;
1610 let effective_name = self.effective_type_key(&stmt.name);
1611 let mut wal_alts = Vec::new();
1612 for alt in &stmt.alterations {
1613 match alt {
1614 TypeAlteration::AddProperty(prop) => {
1615 let typed = TypedProperty {
1616 name: prop.name.clone(),
1617 data_type: PropertyDataType::from_type_name(&prop.data_type),
1618 nullable: prop.nullable,
1619 default_value: prop
1620 .default_value
1621 .as_ref()
1622 .map(|s| parse_default_literal(s)),
1623 };
1624 self.catalog
1625 .alter_edge_type_add_property(&effective_name, typed)
1626 .map_err(|e| {
1627 Error::Query(QueryError::new(
1628 QueryErrorKind::Semantic,
1629 e.to_string(),
1630 ))
1631 })?;
1632 wal_alts.push((
1633 "add".to_string(),
1634 prop.name.clone(),
1635 prop.data_type.clone(),
1636 prop.nullable,
1637 ));
1638 }
1639 TypeAlteration::DropProperty(name) => {
1640 self.catalog
1641 .alter_edge_type_drop_property(&effective_name, name)
1642 .map_err(|e| {
1643 Error::Query(QueryError::new(
1644 QueryErrorKind::Semantic,
1645 e.to_string(),
1646 ))
1647 })?;
1648 wal_alts.push(("drop".to_string(), name.clone(), String::new(), false));
1649 }
1650 }
1651 }
1652 wal_log!(
1653 self,
1654 WalRecord::AlterEdgeType {
1655 name: effective_name,
1656 alterations: wal_alts,
1657 }
1658 );
1659 Ok(QueryResult::status(format!(
1660 "Altered edge type '{}'",
1661 stmt.name
1662 )))
1663 }
1664 SchemaStatement::AlterGraphType(stmt) => {
1665 use grafeo_adapters::query::gql::ast::GraphTypeAlteration;
1666 let effective_name = self.effective_type_key(&stmt.name);
1667 let mut wal_alts = Vec::new();
1668 for alt in &stmt.alterations {
1669 match alt {
1670 GraphTypeAlteration::AddNodeType(name) => {
1671 self.catalog
1672 .alter_graph_type_add_node_type(&effective_name, name.clone())
1673 .map_err(|e| {
1674 Error::Query(QueryError::new(
1675 QueryErrorKind::Semantic,
1676 e.to_string(),
1677 ))
1678 })?;
1679 wal_alts.push(("add_node_type".to_string(), name.clone()));
1680 }
1681 GraphTypeAlteration::DropNodeType(name) => {
1682 self.catalog
1683 .alter_graph_type_drop_node_type(&effective_name, name)
1684 .map_err(|e| {
1685 Error::Query(QueryError::new(
1686 QueryErrorKind::Semantic,
1687 e.to_string(),
1688 ))
1689 })?;
1690 wal_alts.push(("drop_node_type".to_string(), name.clone()));
1691 }
1692 GraphTypeAlteration::AddEdgeType(name) => {
1693 self.catalog
1694 .alter_graph_type_add_edge_type(&effective_name, name.clone())
1695 .map_err(|e| {
1696 Error::Query(QueryError::new(
1697 QueryErrorKind::Semantic,
1698 e.to_string(),
1699 ))
1700 })?;
1701 wal_alts.push(("add_edge_type".to_string(), name.clone()));
1702 }
1703 GraphTypeAlteration::DropEdgeType(name) => {
1704 self.catalog
1705 .alter_graph_type_drop_edge_type(&effective_name, name)
1706 .map_err(|e| {
1707 Error::Query(QueryError::new(
1708 QueryErrorKind::Semantic,
1709 e.to_string(),
1710 ))
1711 })?;
1712 wal_alts.push(("drop_edge_type".to_string(), name.clone()));
1713 }
1714 }
1715 }
1716 wal_log!(
1717 self,
1718 WalRecord::AlterGraphType {
1719 name: effective_name,
1720 alterations: wal_alts,
1721 }
1722 );
1723 Ok(QueryResult::status(format!(
1724 "Altered graph type '{}'",
1725 stmt.name
1726 )))
1727 }
1728 SchemaStatement::CreateProcedure(stmt) => {
1729 use crate::catalog::ProcedureDefinition;
1730
1731 let def = ProcedureDefinition {
1732 name: stmt.name.clone(),
1733 params: stmt
1734 .params
1735 .iter()
1736 .map(|p| (p.name.clone(), p.param_type.clone()))
1737 .collect(),
1738 returns: stmt
1739 .returns
1740 .iter()
1741 .map(|r| (r.name.clone(), r.return_type.clone()))
1742 .collect(),
1743 body: stmt.body.clone(),
1744 };
1745
1746 if stmt.or_replace {
1747 self.catalog.replace_procedure(def).map_err(|e| {
1748 Error::Query(QueryError::new(QueryErrorKind::Semantic, e.to_string()))
1749 })?;
1750 } else {
1751 match self.catalog.register_procedure(def) {
1752 Ok(()) => {}
1753 Err(_) if stmt.if_not_exists => {
1754 return Ok(QueryResult::empty());
1755 }
1756 Err(e) => {
1757 return Err(Error::Query(QueryError::new(
1758 QueryErrorKind::Semantic,
1759 e.to_string(),
1760 )));
1761 }
1762 }
1763 }
1764
1765 wal_log!(
1766 self,
1767 WalRecord::CreateProcedure {
1768 name: stmt.name.clone(),
1769 params: stmt
1770 .params
1771 .iter()
1772 .map(|p| (p.name.clone(), p.param_type.clone()))
1773 .collect(),
1774 returns: stmt
1775 .returns
1776 .iter()
1777 .map(|r| (r.name.clone(), r.return_type.clone()))
1778 .collect(),
1779 body: stmt.body,
1780 }
1781 );
1782 Ok(QueryResult::status(format!(
1783 "Created procedure '{}'",
1784 stmt.name
1785 )))
1786 }
1787 SchemaStatement::DropProcedure { name, if_exists } => {
1788 match self.catalog.drop_procedure(&name) {
1789 Ok(()) => {}
1790 Err(_) if if_exists => {
1791 return Ok(QueryResult::empty());
1792 }
1793 Err(e) => {
1794 return Err(Error::Query(QueryError::new(
1795 QueryErrorKind::Semantic,
1796 e.to_string(),
1797 )));
1798 }
1799 }
1800 wal_log!(self, WalRecord::DropProcedure { name: name.clone() });
1801 Ok(QueryResult::status(format!("Dropped procedure '{name}'")))
1802 }
1803 SchemaStatement::ShowIndexes => {
1804 return self.execute_show_indexes();
1805 }
1806 SchemaStatement::ShowConstraints => {
1807 return self.execute_show_constraints();
1808 }
1809 SchemaStatement::ShowNodeTypes => {
1810 return self.execute_show_node_types();
1811 }
1812 SchemaStatement::ShowEdgeTypes => {
1813 return self.execute_show_edge_types();
1814 }
1815 SchemaStatement::ShowGraphTypes => {
1816 return self.execute_show_graph_types();
1817 }
1818 SchemaStatement::ShowGraphType(name) => {
1819 return self.execute_show_graph_type(&name);
1820 }
1821 SchemaStatement::ShowCurrentGraphType => {
1822 return self.execute_show_current_graph_type();
1823 }
1824 SchemaStatement::ShowGraphs => {
1825 return self.execute_show_graphs();
1826 }
1827 SchemaStatement::ShowSchemas => {
1828 return self.execute_show_schemas();
1829 }
1830 };
1831
1832 if result.is_ok() {
1835 self.query_cache.clear();
1836 }
1837
1838 result
1839 }
1840
/// Builds an HNSW vector index over `property` for every node carrying
/// `label` and registers it on `store`.
///
/// * `dimensions` - expected vector length; when `None`, the length of the
///   first vector encountered becomes the expected length.
/// * `metric` - distance metric name; `None` selects cosine distance.
///
/// # Errors
///
/// Returns `Error::Internal` when the metric name is not recognised, when a
/// vector's length differs from the expected dimension, or when no vectors
/// were found and no dimension was supplied.
#[cfg(all(feature = "gql", feature = "vector-index"))]
fn create_vector_index_on_store(
    store: &LpgStore,
    label: &str,
    property: &str,
    dimensions: Option<usize>,
    metric: Option<&str>,
) -> Result<()> {
    use grafeo_common::types::{PropertyKey, Value};
    use grafeo_common::utils::error::Error;
    use grafeo_core::index::vector::{
        DistanceMetric, HnswConfig, HnswIndex, PropertyVectorAccessor,
    };

    // Resolve the metric up front so a bad name fails before any scanning.
    let distance = match metric {
        None => DistanceMetric::Cosine,
        Some(m) => match DistanceMetric::from_str(m) {
            Some(parsed) => parsed,
            None => {
                return Err(Error::Internal(format!(
                    "Unknown distance metric '{m}'. Use: cosine, euclidean, dot_product, manhattan"
                )));
            }
        },
    };

    let key = PropertyKey::new(property);
    let mut expected_dims: Option<usize> = dimensions;
    let mut collected: Vec<(grafeo_common::types::NodeId, Vec<f32>)> = Vec::new();

    // Gather every vector value, validating its length as we go.
    for node in store.nodes_with_label(label) {
        if let Some(Value::Vector(v)) = node.properties.get(&key) {
            match expected_dims {
                Some(expected) if v.len() != expected => {
                    return Err(Error::Internal(format!(
                        "Vector dimension mismatch: expected {expected}, found {} on node {}",
                        v.len(),
                        node.id.0
                    )));
                }
                Some(_) => {}
                // First vector seen fixes the dimension when none was given.
                None => expected_dims = Some(v.len()),
            }
            collected.push((node.id, v.to_vec()));
        }
    }

    let dims = match expected_dims {
        Some(d) => d,
        None => {
            return Err(Error::Internal(format!(
                "No vector properties found on :{label}({property}) and no dimensions specified"
            )));
        }
    };

    let hnsw = HnswIndex::with_capacity(HnswConfig::new(dims, distance), collected.len());
    let accessor = PropertyVectorAccessor::new(store, property);
    for (node_id, vector) in &collected {
        hnsw.insert(*node_id, vector, &accessor);
    }

    store.add_vector_index(label, property, Arc::new(hnsw));
    Ok(())
}
1900
/// Stand-in compiled when the `vector-index` feature is disabled.
///
/// # Errors
///
/// Always returns `Error::Internal` explaining that the feature is required.
#[cfg(all(feature = "gql", not(feature = "vector-index")))]
fn create_vector_index_on_store(
    _store: &LpgStore,
    _label: &str,
    _property: &str,
    _dimensions: Option<usize>,
    _metric: Option<&str>,
) -> Result<()> {
    use grafeo_common::utils::error::Error;

    let reason = String::from("Vector index support requires the 'vector-index' feature");
    Err(Error::Internal(reason))
}
1914
/// Builds an inverted text index (default BM25 configuration) over the
/// string values of `property` on nodes labelled `label`, then registers it
/// on `store`.
///
/// Nodes whose property is missing or is not a string are skipped.
#[cfg(all(feature = "gql", feature = "text-index"))]
fn create_text_index_on_store(store: &LpgStore, label: &str, property: &str) -> Result<()> {
    use grafeo_common::types::{PropertyKey, Value};
    use grafeo_core::index::text::{BM25Config, InvertedIndex};

    let mut inverted = InvertedIndex::new(BM25Config::default());
    let key = PropertyKey::new(property);

    // Keep only the nodes that actually hold a string under `property`.
    let documents = store
        .nodes_by_label(label)
        .into_iter()
        .filter_map(|node_id| match store.get_node_property(node_id, &key) {
            Some(Value::String(text)) => Some((node_id, text)),
            _ => None,
        });
    for (node_id, text) in documents {
        inverted.insert(node_id, text.as_str());
    }

    let shared = Arc::new(parking_lot::RwLock::new(inverted));
    store.add_text_index(label, property, shared);
    Ok(())
}
1934
/// Stand-in compiled when the `text-index` feature is disabled.
///
/// # Errors
///
/// Always returns `Error::Internal` explaining that the feature is required.
#[cfg(all(feature = "gql", not(feature = "text-index")))]
fn create_text_index_on_store(_store: &LpgStore, _label: &str, _property: &str) -> Result<()> {
    use grafeo_common::utils::error::Error;

    let reason = String::from("Text index support requires the 'text-index' feature");
    Err(Error::Internal(reason))
}
1942
1943 fn execute_show_indexes(&self) -> Result<QueryResult> {
1945 let indexes = self.catalog.all_indexes();
1946 let columns = vec![
1947 "name".to_string(),
1948 "type".to_string(),
1949 "label".to_string(),
1950 "property".to_string(),
1951 ];
1952 let rows: Vec<Vec<Value>> = indexes
1953 .into_iter()
1954 .map(|def| {
1955 let label_name = self
1956 .catalog
1957 .get_label_name(def.label)
1958 .unwrap_or_else(|| "?".into());
1959 let prop_name = self
1960 .catalog
1961 .get_property_key_name(def.property_key)
1962 .unwrap_or_else(|| "?".into());
1963 vec![
1964 Value::from(def.name),
1965 Value::from(format!("{:?}", def.index_type)),
1966 Value::from(&*label_name),
1967 Value::from(&*prop_name),
1968 ]
1969 })
1970 .collect();
1971 Ok(QueryResult {
1972 columns,
1973 column_types: Vec::new(),
1974 rows,
1975 ..QueryResult::empty()
1976 })
1977 }
1978
1979 fn execute_show_constraints(&self) -> Result<QueryResult> {
1981 Ok(QueryResult {
1984 columns: vec![
1985 "name".to_string(),
1986 "type".to_string(),
1987 "label".to_string(),
1988 "properties".to_string(),
1989 ],
1990 column_types: Vec::new(),
1991 rows: Vec::new(),
1992 ..QueryResult::empty()
1993 })
1994 }
1995
1996 fn execute_show_node_types(&self) -> Result<QueryResult> {
1998 let columns = vec![
1999 "name".to_string(),
2000 "properties".to_string(),
2001 "constraints".to_string(),
2002 "parents".to_string(),
2003 ];
2004 let schema = self.current_schema.lock().clone();
2005 let all_names = self.catalog.all_node_type_names();
2006 let type_names: Vec<String> = match &schema {
2007 Some(s) => {
2008 let prefix = format!("{s}/");
2009 all_names
2010 .into_iter()
2011 .filter_map(|n| n.strip_prefix(&prefix).map(String::from))
2012 .collect()
2013 }
2014 None => all_names.into_iter().filter(|n| !n.contains('/')).collect(),
2015 };
2016 let rows: Vec<Vec<Value>> = type_names
2017 .into_iter()
2018 .filter_map(|name| {
2019 let lookup = match &schema {
2020 Some(s) => format!("{s}/{name}"),
2021 None => name.clone(),
2022 };
2023 let def = self.catalog.get_node_type(&lookup)?;
2024 let props: Vec<String> = def
2025 .properties
2026 .iter()
2027 .map(|p| {
2028 let nullable = if p.nullable { "" } else { " NOT NULL" };
2029 format!("{} {}{}", p.name, p.data_type, nullable)
2030 })
2031 .collect();
2032 let constraints: Vec<String> =
2033 def.constraints.iter().map(|c| format!("{c:?}")).collect();
2034 let parents = def.parent_types.join(", ");
2035 Some(vec![
2036 Value::from(name),
2037 Value::from(props.join(", ")),
2038 Value::from(constraints.join(", ")),
2039 Value::from(parents),
2040 ])
2041 })
2042 .collect();
2043 Ok(QueryResult {
2044 columns,
2045 column_types: Vec::new(),
2046 rows,
2047 ..QueryResult::empty()
2048 })
2049 }
2050
2051 fn execute_show_edge_types(&self) -> Result<QueryResult> {
2053 let columns = vec![
2054 "name".to_string(),
2055 "properties".to_string(),
2056 "source_types".to_string(),
2057 "target_types".to_string(),
2058 ];
2059 let schema = self.current_schema.lock().clone();
2060 let all_names = self.catalog.all_edge_type_names();
2061 let type_names: Vec<String> = match &schema {
2062 Some(s) => {
2063 let prefix = format!("{s}/");
2064 all_names
2065 .into_iter()
2066 .filter_map(|n| n.strip_prefix(&prefix).map(String::from))
2067 .collect()
2068 }
2069 None => all_names.into_iter().filter(|n| !n.contains('/')).collect(),
2070 };
2071 let rows: Vec<Vec<Value>> = type_names
2072 .into_iter()
2073 .filter_map(|name| {
2074 let lookup = match &schema {
2075 Some(s) => format!("{s}/{name}"),
2076 None => name.clone(),
2077 };
2078 let def = self.catalog.get_edge_type_def(&lookup)?;
2079 let props: Vec<String> = def
2080 .properties
2081 .iter()
2082 .map(|p| {
2083 let nullable = if p.nullable { "" } else { " NOT NULL" };
2084 format!("{} {}{}", p.name, p.data_type, nullable)
2085 })
2086 .collect();
2087 let src = def.source_node_types.join(", ");
2088 let tgt = def.target_node_types.join(", ");
2089 Some(vec![
2090 Value::from(name),
2091 Value::from(props.join(", ")),
2092 Value::from(src),
2093 Value::from(tgt),
2094 ])
2095 })
2096 .collect();
2097 Ok(QueryResult {
2098 columns,
2099 column_types: Vec::new(),
2100 rows,
2101 ..QueryResult::empty()
2102 })
2103 }
2104
2105 fn execute_show_graph_types(&self) -> Result<QueryResult> {
2107 let columns = vec![
2108 "name".to_string(),
2109 "open".to_string(),
2110 "node_types".to_string(),
2111 "edge_types".to_string(),
2112 ];
2113 let schema = self.current_schema.lock().clone();
2114 let all_names = self.catalog.all_graph_type_names();
2115 let type_names: Vec<String> = match &schema {
2116 Some(s) => {
2117 let prefix = format!("{s}/");
2118 all_names
2119 .into_iter()
2120 .filter_map(|n| n.strip_prefix(&prefix).map(String::from))
2121 .collect()
2122 }
2123 None => all_names.into_iter().filter(|n| !n.contains('/')).collect(),
2124 };
2125 let rows: Vec<Vec<Value>> = type_names
2126 .into_iter()
2127 .filter_map(|name| {
2128 let lookup = match &schema {
2129 Some(s) => format!("{s}/{name}"),
2130 None => name.clone(),
2131 };
2132 let def = self.catalog.get_graph_type_def(&lookup)?;
2133 let strip = |n: &String| -> String {
2135 match &schema {
2136 Some(s) => n.strip_prefix(&format!("{s}/")).unwrap_or(n).to_string(),
2137 None => n.clone(),
2138 }
2139 };
2140 let node_types: Vec<String> = def.allowed_node_types.iter().map(strip).collect();
2141 let edge_types: Vec<String> = def.allowed_edge_types.iter().map(strip).collect();
2142 Some(vec![
2143 Value::from(name),
2144 Value::from(def.open),
2145 Value::from(node_types.join(", ")),
2146 Value::from(edge_types.join(", ")),
2147 ])
2148 })
2149 .collect();
2150 Ok(QueryResult {
2151 columns,
2152 column_types: Vec::new(),
2153 rows,
2154 ..QueryResult::empty()
2155 })
2156 }
2157
2158 fn execute_show_graphs(&self) -> Result<QueryResult> {
2164 let schema = self.current_schema.lock().clone();
2165 let all_names = self.store.graph_names();
2166
2167 let mut names: Vec<String> = match &schema {
2168 Some(s) => {
2169 let prefix = format!("{s}/");
2170 all_names
2171 .into_iter()
2172 .filter_map(|n| n.strip_prefix(&prefix).map(String::from))
2173 .filter(|n| n != SCHEMA_DEFAULT_GRAPH)
2174 .collect()
2175 }
2176 None => all_names.into_iter().filter(|n| !n.contains('/')).collect(),
2177 };
2178 names.sort();
2179
2180 let rows: Vec<Vec<Value>> = names.into_iter().map(|n| vec![Value::from(n)]).collect();
2181 Ok(QueryResult {
2182 columns: vec!["name".to_string()],
2183 column_types: Vec::new(),
2184 rows,
2185 ..QueryResult::empty()
2186 })
2187 }
2188
2189 fn execute_show_schemas(&self) -> Result<QueryResult> {
2191 let mut names = self.catalog.schema_names();
2192 names.sort();
2193 let rows: Vec<Vec<Value>> = names.into_iter().map(|n| vec![Value::from(n)]).collect();
2194 Ok(QueryResult {
2195 columns: vec!["name".to_string()],
2196 column_types: Vec::new(),
2197 rows,
2198 ..QueryResult::empty()
2199 })
2200 }
2201
2202 fn execute_show_graph_type(&self, name: &str) -> Result<QueryResult> {
2204 use grafeo_common::utils::error::{Error, QueryError, QueryErrorKind};
2205
2206 let def = self.catalog.get_graph_type_def(name).ok_or_else(|| {
2207 Error::Query(QueryError::new(
2208 QueryErrorKind::Semantic,
2209 format!("Graph type '{name}' not found"),
2210 ))
2211 })?;
2212
2213 let columns = vec![
2214 "name".to_string(),
2215 "open".to_string(),
2216 "node_types".to_string(),
2217 "edge_types".to_string(),
2218 ];
2219 let rows = vec![vec![
2220 Value::from(def.name),
2221 Value::from(def.open),
2222 Value::from(def.allowed_node_types.join(", ")),
2223 Value::from(def.allowed_edge_types.join(", ")),
2224 ]];
2225 Ok(QueryResult {
2226 columns,
2227 column_types: Vec::new(),
2228 rows,
2229 ..QueryResult::empty()
2230 })
2231 }
2232
2233 fn execute_show_current_graph_type(&self) -> Result<QueryResult> {
2235 let graph_name = self
2236 .current_graph()
2237 .unwrap_or_else(|| "default".to_string());
2238 let columns = vec![
2239 "graph".to_string(),
2240 "graph_type".to_string(),
2241 "open".to_string(),
2242 "node_types".to_string(),
2243 "edge_types".to_string(),
2244 ];
2245
2246 if let Some(type_name) = self.catalog.get_graph_type_binding(&graph_name)
2247 && let Some(def) = self.catalog.get_graph_type_def(&type_name)
2248 {
2249 let rows = vec![vec![
2250 Value::from(graph_name),
2251 Value::from(type_name),
2252 Value::from(def.open),
2253 Value::from(def.allowed_node_types.join(", ")),
2254 Value::from(def.allowed_edge_types.join(", ")),
2255 ]];
2256 return Ok(QueryResult {
2257 columns,
2258 column_types: Vec::new(),
2259 rows,
2260 ..QueryResult::empty()
2261 });
2262 }
2263
2264 Ok(QueryResult {
2266 columns,
2267 column_types: Vec::new(),
2268 rows: vec![vec![
2269 Value::from(graph_name),
2270 Value::Null,
2271 Value::Null,
2272 Value::Null,
2273 Value::Null,
2274 ]],
2275 ..QueryResult::empty()
2276 })
2277 }
2278
/// Executes a GQL query string in this session.
///
/// The text is translated first. Session commands and schema commands are
/// dispatched to their own handlers; anything else becomes a logical plan
/// that is bound, optimized (or fetched from the query cache, keyed by query
/// text, language and current graph), planned and executed inside
/// `with_auto_commit`.
///
/// Plans flagged `explain` return the annotated plan without executing.
/// Plans flagged `profile` are executed but return a profile tree instead of
/// the query rows.
///
/// # Errors
///
/// Returns `TransactionError::ReadOnly` when the session's read-only
/// transaction flag is set and the statement is a schema command or a plan
/// containing mutations. Errors from `require_lpg`, translation, binding,
/// optimization, planning and execution are propagated unchanged.
#[cfg(feature = "gql")]
pub fn execute(&self, query: &str) -> Result<QueryResult> {
    use crate::query::{
        Executor, binder::Binder, cache::CacheKey, optimizer::Optimizer,
        processor::QueryLanguage, translators::gql,
    };
    use grafeo_common::utils::error::{Error, TransactionError};

    self.require_lpg("GQL")?;

    let _span = grafeo_info_span!(
        "grafeo::session::execute",
        language = "gql",
        query_len = query.len(),
    );

    #[cfg(not(target_arch = "wasm32"))]
    let start_time = std::time::Instant::now();

    // Commands are handled right here; only real queries yield a plan.
    let logical_plan = match gql::translate_full(query)? {
        gql::GqlTranslationResult::SessionCommand(command) => {
            return self.execute_session_command(command);
        }
        gql::GqlTranslationResult::SchemaCommand(command) => {
            let tx_is_read_only = *self.read_only_tx.lock();
            if tx_is_read_only {
                return Err(Error::Transaction(TransactionError::ReadOnly));
            }
            return self.execute_schema_command(command);
        }
        gql::GqlTranslationResult::Plan(translated) => {
            let tx_is_read_only = *self.read_only_tx.lock();
            if tx_is_read_only && translated.root.has_mutations() {
                return Err(Error::Transaction(TransactionError::ReadOnly));
            }
            translated
        }
    };

    let cache_key = CacheKey::with_graph(query, QueryLanguage::Gql, self.current_graph());

    // Binding and optimization only run on a cache miss.
    let cached = self.query_cache.get_optimized(&cache_key);
    let optimized_plan = match cached {
        Some(hit) => hit,
        None => {
            let mut binder = Binder::new();
            let _binding_context = binder.bind(&logical_plan)?;

            let stats_store = self.active_store();
            let optimizer = Optimizer::from_graph_store(&*stats_store);
            let fresh = optimizer.optimize(logical_plan)?;

            self.query_cache.put_optimized(cache_key, fresh.clone());
            fresh
        }
    };

    let active = self.active_store();

    // EXPLAIN: describe the plan, never run it.
    if optimized_plan.explain {
        use crate::query::processor::{annotate_pushdown_hints, explain_result};
        let mut annotated = optimized_plan;
        annotate_pushdown_hints(&mut annotated.root, active.as_ref());
        return Ok(explain_result(&annotated));
    }

    let has_mutations = optimized_plan.root.has_mutations();

    // PROFILE: run the query, then report per-operator entries instead of rows.
    if optimized_plan.profile {
        return self.with_auto_commit(has_mutations, || {
            let (viewing_epoch, transaction_id) = self.get_transaction_context();
            let planner = self.create_planner_for_store(
                Arc::clone(&active),
                viewing_epoch,
                transaction_id,
            );
            let (mut physical_plan, entries) = planner.plan_profiled(&optimized_plan)?;

            let executor = Executor::with_columns(physical_plan.columns.clone())
                .with_deadline(self.query_deadline());
            let _discarded_rows = executor.execute(physical_plan.operator.as_mut())?;

            // No wall clock is read on wasm32, so the total is reported as zero.
            #[cfg(not(target_arch = "wasm32"))]
            let total_time_ms = start_time.elapsed().as_secs_f64() * 1000.0;
            #[cfg(target_arch = "wasm32")]
            let total_time_ms = 0.0;

            let profile_tree = crate::query::profile::build_profile_tree(
                &optimized_plan.root,
                &mut entries.into_iter(),
            );
            Ok(crate::query::profile::profile_result(
                &profile_tree,
                total_time_ms,
            ))
        });
    }

    let outcome = self.with_auto_commit(has_mutations, || {
        let (viewing_epoch, transaction_id) = self.get_transaction_context();

        // Read-only planning applies only to mutation-free plans outside an
        // explicit transaction.
        let has_active_tx = self.current_transaction.lock().is_some();
        let read_only = !(has_mutations || has_active_tx);
        let planner = self.create_planner_for_store_with_read_only(
            Arc::clone(&active),
            viewing_epoch,
            transaction_id,
            read_only,
        );
        let mut physical_plan = planner.plan(&optimized_plan)?;

        let executor = Executor::with_columns(physical_plan.columns.clone())
            .with_deadline(self.query_deadline());
        let mut query_result = executor.execute(physical_plan.operator.as_mut())?;

        // NOTE(review): `rows_scanned` is filled with the number of rows in
        // the result, not a count of rows visited by the operators.
        query_result.rows_scanned = Some(query_result.rows.len() as u64);
        #[cfg(not(target_arch = "wasm32"))]
        {
            query_result.execution_time_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
        }

        Ok(query_result)
    });

    #[cfg(feature = "metrics")]
    {
        #[cfg(not(target_arch = "wasm32"))]
        let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
        #[cfg(target_arch = "wasm32")]
        let elapsed_ms = None;
        self.record_query_metrics("gql", elapsed_ms, &outcome);
    }

    outcome
}
2468
#[cfg(feature = "gql")]
/// Executes `query` while the session's viewing-epoch override is set to `epoch`.
///
/// The override that was active before the call is put back once `execute`
/// finishes. The restore is performed by a drop guard, so it also happens
/// when `execute` unwinds instead of returning.
///
/// # Errors
///
/// Propagates any error returned by `execute`.
pub fn execute_at_epoch(&self, query: &str, epoch: EpochId) -> Result<QueryResult> {
    // Runs the wrapped closure when dropped (normal return or unwinding).
    struct RestoreOnDrop<F: FnMut()>(F);

    impl<F: FnMut()> Drop for RestoreOnDrop<F> {
        fn drop(&mut self) {
            (self.0)();
        }
    }

    let mut previous = self.viewing_epoch_override.lock().replace(epoch);
    let _restore = RestoreOnDrop(|| {
        *self.viewing_epoch_override.lock() = previous.take();
    });

    self.execute(query)
}
2484
#[cfg(feature = "gql")]
/// Executes `query` while the session's viewing-epoch override is set to `epoch`,
/// optionally binding `params`.
///
/// With `Some(params)` the query goes through `execute_with_params`; with
/// `None` it goes through `execute`. The previously active override is put
/// back by a drop guard, so it is restored on success, on error, and when
/// the execution unwinds.
///
/// # Errors
///
/// Propagates any error returned by the underlying execute call.
pub fn execute_at_epoch_with_params(
    &self,
    query: &str,
    epoch: EpochId,
    params: Option<std::collections::HashMap<String, Value>>,
) -> Result<QueryResult> {
    // Runs the wrapped closure when dropped (normal return or unwinding).
    struct RestoreOnDrop<F: FnMut()>(F);

    impl<F: FnMut()> Drop for RestoreOnDrop<F> {
        fn drop(&mut self) {
            (self.0)();
        }
    }

    let mut previous = self.viewing_epoch_override.lock().replace(epoch);
    let _restore = RestoreOnDrop(|| {
        *self.viewing_epoch_override.lock() = previous.take();
    });

    match params {
        Some(p) => self.execute_with_params(query, p),
        None => self.execute(query),
    }
}
2508
#[cfg(feature = "gql")]
/// Executes a GQL query with bound parameters.
///
/// The query is handed to a `QueryProcessor` built over the active read and
/// write stores. When the session has an open transaction, the processor is
/// given that transaction's viewing epoch and id. Whether the call is wrapped
/// in an auto-commit transaction is decided from the query text via
/// `query_looks_like_mutation`.
///
/// With the `metrics` feature enabled the outcome is reported through
/// `record_query_metrics` under the `"gql"` label, matching the other
/// parameterized entry points in this file.
///
/// # Errors
///
/// Returns an error when `require_lpg` rejects the session, when the
/// processor cannot be constructed, or when processing the query fails.
pub fn execute_with_params(
    &self,
    query: &str,
    params: std::collections::HashMap<String, Value>,
) -> Result<QueryResult> {
    self.require_lpg("GQL")?;

    use crate::query::processor::{QueryLanguage, QueryProcessor};

    #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
    let start_time = Instant::now();

    let has_mutations = Self::query_looks_like_mutation(query);
    let active = self.active_store();

    let result = self.with_auto_commit(has_mutations, || {
        let (viewing_epoch, transaction_id) = self.get_transaction_context();

        let processor = QueryProcessor::for_stores_with_transaction(
            Arc::clone(&active),
            self.active_write_store(),
            Arc::clone(&self.transaction_manager),
        )?;

        // Only attach a transaction context when a transaction is actually open.
        let processor = if let Some(transaction_id) = transaction_id {
            processor.with_transaction_context(viewing_epoch, transaction_id)
        } else {
            processor
        };

        processor.process(query, QueryLanguage::Gql, Some(&params))
    });

    #[cfg(feature = "metrics")]
    {
        #[cfg(not(target_arch = "wasm32"))]
        let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
        #[cfg(target_arch = "wasm32")]
        let elapsed_ms = None;
        self.record_query_metrics("gql", elapsed_ms, &result);
    }

    result
}
2548
2549 #[cfg(not(any(feature = "gql", feature = "cypher")))]
2555 pub fn execute_with_params(
2556 &self,
2557 _query: &str,
2558 _params: std::collections::HashMap<String, Value>,
2559 ) -> Result<QueryResult> {
2560 Err(grafeo_common::utils::error::Error::Internal(
2561 "No query language enabled".to_string(),
2562 ))
2563 }
2564
2565 #[cfg(not(any(feature = "gql", feature = "cypher")))]
2571 pub fn execute(&self, _query: &str) -> Result<QueryResult> {
2572 Err(grafeo_common::utils::error::Error::Internal(
2573 "No query language enabled".to_string(),
2574 ))
2575 }
2576
#[cfg(feature = "cypher")]
/// Executes a Cypher query.
///
/// Steps, in order:
/// 1. `cypher::translate_full` classifies the statement. Schema commands and
///    the `SHOW ...` variants are dispatched to their dedicated handlers and
///    return immediately; ordinary plans fall through.
/// 2. The optimized plan is looked up in the query cache (keyed by query
///    text, language and current graph) or built via translate, bind and
///    optimize, then cached.
/// 3. `EXPLAIN` plans are annotated and returned without execution.
/// 4. `PROFILE` plans are executed with a profiled physical plan and the
///    profile tree is returned instead of the rows.
/// 5. Everything else is planned and executed inside `with_auto_commit`.
///
/// # Errors
///
/// Returns a semantic query error for schema DDL while the read-only
/// transaction flag is set, and propagates translation, binding,
/// optimization, planning, execution and auto-commit errors.
pub fn execute_cypher(&self, query: &str) -> Result<QueryResult> {
    use crate::query::{
        Executor, binder::Binder, cache::CacheKey, optimizer::Optimizer,
        processor::QueryLanguage, translators::cypher,
    };
    use grafeo_common::utils::error::{Error as GrafeoError, QueryError, QueryErrorKind};

    let translation = cypher::translate_full(query)?;
    // Non-plan statements are handled here and never reach the planner.
    match translation {
        cypher::CypherTranslationResult::SchemaCommand(cmd) => {
            // Schema DDL is refused while the read-only flag is set.
            if *self.read_only_tx.lock() {
                return Err(GrafeoError::Query(QueryError::new(
                    QueryErrorKind::Semantic,
                    "Cannot execute schema DDL in a read-only transaction",
                )));
            }
            return self.execute_schema_command(cmd);
        }
        cypher::CypherTranslationResult::ShowIndexes => {
            return self.execute_show_indexes();
        }
        cypher::CypherTranslationResult::ShowConstraints => {
            return self.execute_show_constraints();
        }
        cypher::CypherTranslationResult::ShowCurrentGraphType => {
            return self.execute_show_current_graph_type();
        }
        cypher::CypherTranslationResult::Plan(_) => {
            // Regular query: the plan carried here is not used; on a cache
            // miss the query is translated again below.
        }
    }

    // Timing is unavailable on wasm32, so the timer only exists elsewhere.
    #[cfg(not(target_arch = "wasm32"))]
    let start_time = std::time::Instant::now();

    let cache_key = CacheKey::with_graph(query, QueryLanguage::Cypher, self.current_graph());

    let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
        cached_plan
    } else {
        // Cache miss: translate, bind, optimize, then store the result.
        let logical_plan = cypher::translate(query)?;

        let mut binder = Binder::new();
        // Binding is run for validation; its context is not used afterwards.
        let _binding_context = binder.bind(&logical_plan)?;

        let active = self.active_store();
        let optimizer = Optimizer::from_graph_store(&*active);
        let plan = optimizer.optimize(logical_plan)?;

        self.query_cache.put_optimized(cache_key, plan.clone());

        plan
    };

    let active = self.active_store();

    // EXPLAIN: describe the plan, do not execute it.
    if optimized_plan.explain {
        use crate::query::processor::{annotate_pushdown_hints, explain_result};
        let mut plan = optimized_plan;
        annotate_pushdown_hints(&mut plan.root, active.as_ref());
        return Ok(explain_result(&plan));
    }

    // PROFILE: execute with profiling entries and return the profile tree.
    if optimized_plan.profile {
        let has_mutations = optimized_plan.root.has_mutations();
        return self.with_auto_commit(has_mutations, || {
            let (viewing_epoch, transaction_id) = self.get_transaction_context();
            let planner = self.create_planner_for_store(
                Arc::clone(&active),
                viewing_epoch,
                transaction_id,
            );
            let (mut physical_plan, entries) = planner.plan_profiled(&optimized_plan)?;

            let executor = Executor::with_columns(physical_plan.columns.clone())
                .with_deadline(self.query_deadline());
            // The rows themselves are discarded; only the profile is reported.
            let _result = executor.execute(physical_plan.operator.as_mut())?;

            let total_time_ms;
            #[cfg(not(target_arch = "wasm32"))]
            {
                total_time_ms = start_time.elapsed().as_secs_f64() * 1000.0;
            }
            #[cfg(target_arch = "wasm32")]
            {
                // No timer on wasm32: report zero.
                total_time_ms = 0.0;
            }

            let profile_tree = crate::query::profile::build_profile_tree(
                &optimized_plan.root,
                &mut entries.into_iter(),
            );
            Ok(crate::query::profile::profile_result(
                &profile_tree,
                total_time_ms,
            ))
        });
    }

    let has_mutations = optimized_plan.root.has_mutations();

    // Regular execution path.
    let result = self.with_auto_commit(has_mutations, || {
        let (viewing_epoch, transaction_id) = self.get_transaction_context();

        let planner =
            self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
        let mut physical_plan = planner.plan(&optimized_plan)?;

        let executor = Executor::with_columns(physical_plan.columns.clone())
            .with_deadline(self.query_deadline());
        executor.execute(physical_plan.operator.as_mut())
    });

    #[cfg(feature = "metrics")]
    {
        #[cfg(not(target_arch = "wasm32"))]
        let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
        #[cfg(target_arch = "wasm32")]
        let elapsed_ms = None;
        self.record_query_metrics("cypher", elapsed_ms, &result);
    }

    result
}
2720
#[cfg(feature = "gremlin")]
/// Executes a Gremlin query.
///
/// The query is translated, bound, and optimized against the active store,
/// then planned and executed inside `with_auto_commit`. Whether an
/// auto-commit transaction is needed is decided from the optimized plan's
/// `has_mutations()`. With the `metrics` feature the outcome is reported
/// under the `"gremlin"` label.
///
/// # Errors
///
/// Propagates translation, binding, optimization, planning, execution and
/// auto-commit errors.
pub fn execute_gremlin(&self, query: &str) -> Result<QueryResult> {
    use crate::query::{Executor, binder::Binder, optimizer::Optimizer, translators::gremlin};

    #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
    let start_time = Instant::now();

    // Front end: translate, then bind for validation.
    let translated = gremlin::translate(query)?;
    let mut name_binder = Binder::new();
    let _bound = name_binder.bind(&translated)?;

    // Optimize against the store that is currently active.
    let store = self.active_store();
    let optimized = Optimizer::from_graph_store(&*store).optimize(translated)?;
    let mutates = optimized.root.has_mutations();

    let outcome = self.with_auto_commit(mutates, || {
        let (epoch, tx) = self.get_transaction_context();
        let planner = self.create_planner_for_store(Arc::clone(&store), epoch, tx);
        let mut physical = planner.plan(&optimized)?;

        Executor::with_columns(physical.columns.clone())
            .with_deadline(self.query_deadline())
            .execute(physical.operator.as_mut())
    });

    #[cfg(feature = "metrics")]
    {
        #[cfg(not(target_arch = "wasm32"))]
        let elapsed = Some(start_time.elapsed().as_secs_f64() * 1000.0);
        #[cfg(target_arch = "wasm32")]
        let elapsed = None;
        self.record_query_metrics("gremlin", elapsed, &outcome);
    }

    outcome
}
2791
#[cfg(feature = "gremlin")]
/// Executes a Gremlin query with bound parameters.
///
/// The query is handed to a `QueryProcessor` built over the active read and
/// write stores; an open transaction's viewing epoch and id are attached to
/// the processor when present. Auto-commit wrapping is decided from the
/// query text via `query_looks_like_mutation`. With the `metrics` feature
/// the outcome is reported under the `"gremlin"` label.
///
/// # Errors
///
/// Propagates processor construction, query processing and auto-commit errors.
pub fn execute_gremlin_with_params(
    &self,
    query: &str,
    params: std::collections::HashMap<String, Value>,
) -> Result<QueryResult> {
    use crate::query::processor::{QueryLanguage, QueryProcessor};

    #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
    let start_time = Instant::now();

    let has_mutations = Self::query_looks_like_mutation(query);
    let active = self.active_store();

    let result = self.with_auto_commit(has_mutations, || {
        let (viewing_epoch, transaction_id) = self.get_transaction_context();
        let processor = QueryProcessor::for_stores_with_transaction(
            Arc::clone(&active),
            self.active_write_store(),
            Arc::clone(&self.transaction_manager),
        )?;
        // Only attach a transaction context when a transaction is actually open.
        let processor = if let Some(transaction_id) = transaction_id {
            processor.with_transaction_context(viewing_epoch, transaction_id)
        } else {
            processor
        };
        processor.process(query, QueryLanguage::Gremlin, Some(&params))
    });

    #[cfg(feature = "metrics")]
    {
        #[cfg(not(target_arch = "wasm32"))]
        let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
        #[cfg(target_arch = "wasm32")]
        let elapsed_ms = None;
        self.record_query_metrics("gremlin", elapsed_ms, &result);
    }

    result
}
2837
#[cfg(feature = "graphql")]
/// Executes a GraphQL query.
///
/// After translation, any default parameters carried by the logical plan are
/// substituted into it. The plan is then bound, optimized against the active
/// store, planned and executed inside `with_auto_commit`. With the `metrics`
/// feature the outcome is reported under the `"graphql"` label.
///
/// # Errors
///
/// Propagates translation, parameter substitution, binding, optimization,
/// planning, execution and auto-commit errors.
pub fn execute_graphql(&self, query: &str) -> Result<QueryResult> {
    use crate::query::{
        Executor, binder::Binder, optimizer::Optimizer, processor::substitute_params,
        translators::graphql,
    };

    #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
    let start_time = Instant::now();

    let mut translated = graphql::translate(query)?;

    // Apply the plan's own default parameter values, if it declares any.
    if !translated.default_params.is_empty() {
        let fallback_values = translated.default_params.clone();
        substitute_params(&mut translated, &fallback_values)?;
    }

    let mut name_binder = Binder::new();
    let _bound = name_binder.bind(&translated)?;

    let store = self.active_store();
    let optimized = Optimizer::from_graph_store(&*store).optimize(translated)?;
    let mutates = optimized.root.has_mutations();

    let outcome = self.with_auto_commit(mutates, || {
        let (epoch, tx) = self.get_transaction_context();
        let planner = self.create_planner_for_store(Arc::clone(&store), epoch, tx);
        let mut physical = planner.plan(&optimized)?;

        Executor::with_columns(physical.columns.clone())
            .with_deadline(self.query_deadline())
            .execute(physical.operator.as_mut())
    });

    #[cfg(feature = "metrics")]
    {
        #[cfg(not(target_arch = "wasm32"))]
        let elapsed = Some(start_time.elapsed().as_secs_f64() * 1000.0);
        #[cfg(target_arch = "wasm32")]
        let elapsed = None;
        self.record_query_metrics("graphql", elapsed, &outcome);
    }

    outcome
}
2908
#[cfg(feature = "graphql")]
/// Executes a GraphQL query with bound parameters.
///
/// The query is handed to a `QueryProcessor` built over the active read and
/// write stores; an open transaction's viewing epoch and id are attached to
/// the processor when present. Auto-commit wrapping is decided from the
/// query text via `query_looks_like_mutation`. With the `metrics` feature
/// the outcome is reported under the `"graphql"` label.
///
/// # Errors
///
/// Propagates processor construction, query processing and auto-commit errors.
pub fn execute_graphql_with_params(
    &self,
    query: &str,
    params: std::collections::HashMap<String, Value>,
) -> Result<QueryResult> {
    use crate::query::processor::{QueryLanguage, QueryProcessor};

    #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
    let start_time = Instant::now();

    let has_mutations = Self::query_looks_like_mutation(query);
    let active = self.active_store();

    let result = self.with_auto_commit(has_mutations, || {
        let (viewing_epoch, transaction_id) = self.get_transaction_context();
        let processor = QueryProcessor::for_stores_with_transaction(
            Arc::clone(&active),
            self.active_write_store(),
            Arc::clone(&self.transaction_manager),
        )?;
        // Only attach a transaction context when a transaction is actually open.
        let processor = if let Some(transaction_id) = transaction_id {
            processor.with_transaction_context(viewing_epoch, transaction_id)
        } else {
            processor
        };
        processor.process(query, QueryLanguage::GraphQL, Some(&params))
    });

    #[cfg(feature = "metrics")]
    {
        #[cfg(not(target_arch = "wasm32"))]
        let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
        #[cfg(target_arch = "wasm32")]
        let elapsed_ms = None;
        self.record_query_metrics("graphql", elapsed_ms, &result);
    }

    result
}
2954
#[cfg(feature = "sql-pgq")]
/// Executes a SQL/PGQ query.
///
/// The query is always translated first. A `CreatePropertyGraph` root is
/// answered directly with a one-row status result and is neither cached,
/// optimized, nor executed. Any other plan is served from the query cache
/// (keyed by query text, language and current graph) or bound, optimized and
/// cached, then planned and executed inside `with_auto_commit`. With the
/// `metrics` feature the outcome is reported under the `"sql"` label.
///
/// # Errors
///
/// Propagates translation, binding, optimization, planning, execution and
/// auto-commit errors.
pub fn execute_sql(&self, query: &str) -> Result<QueryResult> {
    use crate::query::{
        Executor, binder::Binder, cache::CacheKey, optimizer::Optimizer, plan::LogicalOperator,
        processor::QueryLanguage, translators::sql_pgq,
    };

    #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
    let start_time = Instant::now();

    let translated = sql_pgq::translate(query)?;

    // CREATE PROPERTY GRAPH: reply with a status row and stop here.
    if let LogicalOperator::CreatePropertyGraph(cpg) = &translated.root {
        let summary = format!(
            "Property graph '{}' created ({} node tables, {} edge tables)",
            cpg.name,
            cpg.node_tables.len(),
            cpg.edge_tables.len()
        );
        return Ok(QueryResult {
            columns: vec!["status".into()],
            column_types: vec![grafeo_common::types::LogicalType::String],
            rows: vec![vec![Value::from(summary)]],
            execution_time_ms: None,
            rows_scanned: None,
            status_message: None,
            gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
        });
    }

    let key = CacheKey::with_graph(query, QueryLanguage::SqlPgq, self.current_graph());

    let optimized = match self.query_cache.get_optimized(&key) {
        Some(hit) => hit,
        None => {
            // Cache miss: bind, optimize, then remember the optimized plan.
            let mut name_binder = Binder::new();
            let _bound = name_binder.bind(&translated)?;
            let store = self.active_store();
            let fresh = Optimizer::from_graph_store(&*store).optimize(translated)?;
            self.query_cache.put_optimized(key, fresh.clone());
            fresh
        }
    };

    let store = self.active_store();
    let mutates = optimized.root.has_mutations();

    let outcome = self.with_auto_commit(mutates, || {
        let (epoch, tx) = self.get_transaction_context();
        let planner = self.create_planner_for_store(Arc::clone(&store), epoch, tx);
        let mut physical = planner.plan(&optimized)?;

        Executor::with_columns(physical.columns.clone())
            .with_deadline(self.query_deadline())
            .execute(physical.operator.as_mut())
    });

    #[cfg(feature = "metrics")]
    {
        #[cfg(not(target_arch = "wasm32"))]
        let elapsed = Some(start_time.elapsed().as_secs_f64() * 1000.0);
        #[cfg(target_arch = "wasm32")]
        let elapsed = None;
        self.record_query_metrics("sql", elapsed, &outcome);
    }

    outcome
}
3048
#[cfg(feature = "sql-pgq")]
/// Executes a SQL/PGQ query with bound parameters.
///
/// The query is handed to a `QueryProcessor` built over the active read and
/// write stores; an open transaction's viewing epoch and id are attached to
/// the processor when present. Auto-commit wrapping is decided from the
/// query text via `query_looks_like_mutation`. With the `metrics` feature
/// the outcome is reported under the `"sql"` label.
///
/// # Errors
///
/// Propagates processor construction, query processing and auto-commit errors.
pub fn execute_sql_with_params(
    &self,
    query: &str,
    params: std::collections::HashMap<String, Value>,
) -> Result<QueryResult> {
    use crate::query::processor::{QueryLanguage, QueryProcessor};

    #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
    let start_time = Instant::now();

    let has_mutations = Self::query_looks_like_mutation(query);
    let active = self.active_store();

    let result = self.with_auto_commit(has_mutations, || {
        let (viewing_epoch, transaction_id) = self.get_transaction_context();
        let processor = QueryProcessor::for_stores_with_transaction(
            Arc::clone(&active),
            self.active_write_store(),
            Arc::clone(&self.transaction_manager),
        )?;
        // Only attach a transaction context when a transaction is actually open.
        let processor = if let Some(transaction_id) = transaction_id {
            processor.with_transaction_context(viewing_epoch, transaction_id)
        } else {
            processor
        };
        processor.process(query, QueryLanguage::SqlPgq, Some(&params))
    });

    #[cfg(feature = "metrics")]
    {
        #[cfg(not(target_arch = "wasm32"))]
        let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
        #[cfg(target_arch = "wasm32")]
        let elapsed_ms = None;
        self.record_query_metrics("sql", elapsed_ms, &result);
    }

    result
}
3094
3095 pub fn execute_language(
3104 &self,
3105 query: &str,
3106 language: &str,
3107 params: Option<std::collections::HashMap<String, Value>>,
3108 ) -> Result<QueryResult> {
3109 let _span = grafeo_info_span!(
3110 "grafeo::session::execute",
3111 language,
3112 query_len = query.len(),
3113 );
3114 match language {
3115 "gql" => {
3116 if let Some(p) = params {
3117 self.execute_with_params(query, p)
3118 } else {
3119 self.execute(query)
3120 }
3121 }
3122 #[cfg(feature = "cypher")]
3123 "cypher" => {
3124 if let Some(p) = params {
3125 use crate::query::processor::{QueryLanguage, QueryProcessor};
3126
3127 #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
3128 let start_time = Instant::now();
3129
3130 let has_mutations = Self::query_looks_like_mutation(query);
3131 let active = self.active_store();
3132 let result = self.with_auto_commit(has_mutations, || {
3133 let processor = QueryProcessor::for_stores_with_transaction(
3134 Arc::clone(&active),
3135 self.active_write_store(),
3136 Arc::clone(&self.transaction_manager),
3137 )?;
3138 let (viewing_epoch, transaction_id) = self.get_transaction_context();
3139 let processor = if let Some(transaction_id) = transaction_id {
3140 processor.with_transaction_context(viewing_epoch, transaction_id)
3141 } else {
3142 processor
3143 };
3144 processor.process(query, QueryLanguage::Cypher, Some(&p))
3145 });
3146
3147 #[cfg(feature = "metrics")]
3148 {
3149 #[cfg(not(target_arch = "wasm32"))]
3150 let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
3151 #[cfg(target_arch = "wasm32")]
3152 let elapsed_ms = None;
3153 self.record_query_metrics("cypher", elapsed_ms, &result);
3154 }
3155
3156 result
3157 } else {
3158 self.execute_cypher(query)
3159 }
3160 }
3161 #[cfg(feature = "gremlin")]
3162 "gremlin" => {
3163 if let Some(p) = params {
3164 self.execute_gremlin_with_params(query, p)
3165 } else {
3166 self.execute_gremlin(query)
3167 }
3168 }
3169 #[cfg(feature = "graphql")]
3170 "graphql" => {
3171 if let Some(p) = params {
3172 self.execute_graphql_with_params(query, p)
3173 } else {
3174 self.execute_graphql(query)
3175 }
3176 }
3177 #[cfg(all(feature = "graphql", feature = "rdf"))]
3178 "graphql-rdf" => {
3179 if let Some(p) = params {
3180 self.execute_graphql_rdf_with_params(query, p)
3181 } else {
3182 self.execute_graphql_rdf(query)
3183 }
3184 }
3185 #[cfg(feature = "sql-pgq")]
3186 "sql" | "sql-pgq" => {
3187 if let Some(p) = params {
3188 self.execute_sql_with_params(query, p)
3189 } else {
3190 self.execute_sql(query)
3191 }
3192 }
3193 #[cfg(all(feature = "sparql", feature = "rdf"))]
3194 "sparql" => {
3195 if let Some(p) = params {
3196 self.execute_sparql_with_params(query, p)
3197 } else {
3198 self.execute_sparql(query)
3199 }
3200 }
3201 other => Err(grafeo_common::utils::error::Error::Query(
3202 grafeo_common::utils::error::QueryError::new(
3203 grafeo_common::utils::error::QueryErrorKind::Semantic,
3204 format!("Unknown query language: '{other}'"),
3205 ),
3206 )),
3207 }
3208 }
3209
/// Clears the query cache used by this session.
///
/// The cache is held behind an `Arc` (see `SessionConfig::query_cache`), so
/// it may be shared beyond this session.
pub fn clear_plan_cache(&self) {
    self.query_cache.clear();
}
3239
/// Begins a read-write transaction with no explicit isolation level.
///
/// If a transaction is already open, the inner routine records a nested
/// level backed by a savepoint instead of starting a new transaction.
///
/// # Errors
///
/// Propagates any error from `begin_transaction_inner`.
pub fn begin_transaction(&mut self) -> Result<()> {
    self.begin_transaction_inner(false, None)
}
3250
/// Begins a read-write transaction using the given isolation level.
///
/// If a transaction is already open, the inner routine records a nested
/// level backed by a savepoint and `isolation_level` is not applied.
///
/// # Errors
///
/// Propagates any error from `begin_transaction_inner`.
pub fn begin_transaction_with_isolation(
    &mut self,
    isolation_level: crate::transaction::IsolationLevel,
) -> Result<()> {
    self.begin_transaction_inner(false, Some(isolation_level))
}
3264
3265 fn begin_transaction_inner(
3267 &self,
3268 read_only: bool,
3269 isolation_level: Option<crate::transaction::IsolationLevel>,
3270 ) -> Result<()> {
3271 let _span = grafeo_debug_span!("grafeo::tx::begin", read_only);
3272 let mut current = self.current_transaction.lock();
3273 if current.is_some() {
3274 drop(current);
3276 let mut depth = self.transaction_nesting_depth.lock();
3277 *depth += 1;
3278 let sp_name = format!("_nested_tx_{}", *depth);
3279 self.savepoint(&sp_name)?;
3280 return Ok(());
3281 }
3282
3283 let active = self.active_lpg_store();
3284 self.transaction_start_node_count
3285 .store(active.node_count(), Ordering::Relaxed);
3286 self.transaction_start_edge_count
3287 .store(active.edge_count(), Ordering::Relaxed);
3288 let transaction_id = if let Some(level) = isolation_level {
3289 self.transaction_manager.begin_with_isolation(level)
3290 } else {
3291 self.transaction_manager.begin()
3292 };
3293 *current = Some(transaction_id);
3294 *self.read_only_tx.lock() = read_only || self.db_read_only;
3295
3296 let key = self.active_graph_storage_key();
3299 let mut touched = self.touched_graphs.lock();
3300 touched.clear();
3301 touched.push(key);
3302
3303 #[cfg(feature = "metrics")]
3304 {
3305 crate::metrics::record_metric!(self.metrics, tx_active, inc);
3306 #[cfg(not(target_arch = "wasm32"))]
3307 {
3308 *self.tx_start_time.lock() = Some(Instant::now());
3309 }
3310 }
3311
3312 Ok(())
3313 }
3314
/// Commits the current transaction, or closes the innermost nested level.
///
/// # Errors
///
/// Propagates any error from `commit_inner`, including the "No active
/// transaction" state error.
pub fn commit(&mut self) -> Result<()> {
    self.commit_inner()
}
3325
/// Shared commit implementation used by `commit` and auto-commit.
///
/// With a nesting depth above zero this only decrements the depth and
/// releases the savepoint `_nested_tx_<depth>`. Otherwise the current
/// transaction is taken and committed through the transaction manager.
///
/// On a failed manager commit, property changes are rolled back on every
/// touched graph, session transaction state is reset, and the error is
/// returned. On success, version epochs and property changes are finalized
/// on every touched graph, pending CDC events are stamped with the commit
/// epoch and recorded, store epochs are synced, and a version GC pass may run.
///
/// # Errors
///
/// Returns an `InvalidState` transaction error when no transaction is
/// active, the error from `release_savepoint` for nested levels, or the
/// error from the transaction manager's commit.
fn commit_inner(&self) -> Result<()> {
    let _span = grafeo_debug_span!("grafeo::tx::commit");
    {
        // Nested level: release its savepoint instead of committing.
        let mut depth = self.transaction_nesting_depth.lock();
        if *depth > 0 {
            let sp_name = format!("_nested_tx_{depth}");
            *depth -= 1;
            drop(depth);
            return self.release_savepoint(&sp_name);
        }
    }

    // Taking the id marks the session as no longer being in a transaction.
    let transaction_id = self.current_transaction.lock().take().ok_or_else(|| {
        grafeo_common::utils::error::Error::Transaction(
            grafeo_common::utils::error::TransactionError::InvalidState(
                "No active transaction".to_string(),
            ),
        )
    })?;

    // Work on a snapshot so the lock is not held during store operations.
    let touched = self.touched_graphs.lock().clone();
    let commit_epoch = match self.transaction_manager.commit(transaction_id) {
        Ok(epoch) => epoch,
        Err(e) => {
            // Commit rejected: undo property changes and reset session state.
            for graph_name in &touched {
                let store = self.resolve_store(graph_name);
                store.rollback_transaction_properties(transaction_id);
            }
            #[cfg(feature = "rdf")]
            self.rollback_rdf_transaction(transaction_id);
            #[cfg(feature = "cdc")]
            if let Some(ref pending) = self.cdc_pending_events {
                pending.lock().clear();
            }
            *self.read_only_tx.lock() = self.db_read_only;
            self.savepoints.lock().clear();
            self.touched_graphs.lock().clear();
            #[cfg(feature = "metrics")]
            {
                crate::metrics::record_metric!(self.metrics, tx_active, dec);
                crate::metrics::record_metric!(self.metrics, tx_conflicts, inc);
                #[cfg(not(target_arch = "wasm32"))]
                if let Some(start) = self.tx_start_time.lock().take() {
                    let duration_ms = start.elapsed().as_secs_f64() * 1000.0;
                    crate::metrics::record_metric!(
                        self.metrics,
                        tx_duration,
                        observe duration_ms
                    );
                }
            }
            return Err(e);
        }
    };

    // Stamp the versions written by this transaction with the commit epoch.
    for graph_name in &touched {
        let store = self.resolve_store(graph_name);
        store.finalize_version_epochs(transaction_id, commit_epoch);
    }

    #[cfg(feature = "rdf")]
    self.commit_rdf_transaction(transaction_id);

    for graph_name in &touched {
        let store = self.resolve_store(graph_name);
        store.commit_transaction_properties(transaction_id);
    }

    // Flush buffered CDC events, tagging each with the commit epoch.
    #[cfg(feature = "cdc")]
    if let Some(ref pending) = self.cdc_pending_events {
        let events: Vec<crate::cdc::ChangeEvent> = pending.lock().drain(..).collect();
        self.cdc_log.record_batch(events.into_iter().map(|mut e| {
            e.epoch = commit_epoch;
            e
        }));
    }

    let current_epoch = self.transaction_manager.current_epoch();
    for graph_name in &touched {
        let store = self.resolve_store(graph_name);
        store.sync_epoch(current_epoch);
    }

    // Reset per-transaction session state.
    *self.read_only_tx.lock() = self.db_read_only;
    self.savepoints.lock().clear();
    self.touched_graphs.lock().clear();

    // Every `gc_interval` commits (counter shared via `commit_counter`),
    // garbage-collect old versions on the graphs this transaction touched.
    if self.gc_interval > 0 {
        let count = self.commit_counter.fetch_add(1, Ordering::Relaxed) + 1;
        if count.is_multiple_of(self.gc_interval) {
            let min_epoch = self.transaction_manager.min_active_epoch();
            for graph_name in &touched {
                let store = self.resolve_store(graph_name);
                store.gc_versions(min_epoch);
            }
            self.transaction_manager.gc();
            #[cfg(feature = "metrics")]
            crate::metrics::record_metric!(self.metrics, gc_runs, inc);
        }
    }

    #[cfg(feature = "metrics")]
    {
        crate::metrics::record_metric!(self.metrics, tx_active, dec);
        crate::metrics::record_metric!(self.metrics, tx_committed, inc);
        #[cfg(not(target_arch = "wasm32"))]
        if let Some(start) = self.tx_start_time.lock().take() {
            let duration_ms = start.elapsed().as_secs_f64() * 1000.0;
            crate::metrics::record_metric!(self.metrics, tx_duration, observe duration_ms);
        }
    }

    Ok(())
}
3455
/// Rolls back the current transaction, or the innermost nested level.
///
/// # Errors
///
/// Propagates any error from `rollback_inner`, including the "No active
/// transaction" state error.
pub fn rollback(&mut self) -> Result<()> {
    self.rollback_inner()
}
3482
3483 fn rollback_inner(&self) -> Result<()> {
3485 let _span = grafeo_debug_span!("grafeo::tx::rollback");
3486 {
3488 let mut depth = self.transaction_nesting_depth.lock();
3489 if *depth > 0 {
3490 let sp_name = format!("_nested_tx_{depth}");
3491 *depth -= 1;
3492 drop(depth);
3493 return self.rollback_to_savepoint(&sp_name);
3494 }
3495 }
3496
3497 let transaction_id = self.current_transaction.lock().take().ok_or_else(|| {
3498 grafeo_common::utils::error::Error::Transaction(
3499 grafeo_common::utils::error::TransactionError::InvalidState(
3500 "No active transaction".to_string(),
3501 ),
3502 )
3503 })?;
3504
3505 *self.read_only_tx.lock() = self.db_read_only;
3507
3508 let touched = self.touched_graphs.lock().clone();
3510 for graph_name in &touched {
3511 let store = self.resolve_store(graph_name);
3512 store.discard_uncommitted_versions(transaction_id);
3513 }
3514
3515 #[cfg(feature = "rdf")]
3517 self.rollback_rdf_transaction(transaction_id);
3518
3519 #[cfg(feature = "cdc")]
3521 if let Some(ref pending) = self.cdc_pending_events {
3522 pending.lock().clear();
3523 }
3524
3525 self.savepoints.lock().clear();
3527 self.touched_graphs.lock().clear();
3528
3529 let result = self.transaction_manager.abort(transaction_id);
3531
3532 #[cfg(feature = "metrics")]
3533 if result.is_ok() {
3534 crate::metrics::record_metric!(self.metrics, tx_active, dec);
3535 crate::metrics::record_metric!(self.metrics, tx_rolled_back, inc);
3536 #[cfg(not(target_arch = "wasm32"))]
3537 if let Some(start) = self.tx_start_time.lock().take() {
3538 let duration_ms = start.elapsed().as_secs_f64() * 1000.0;
3539 crate::metrics::record_metric!(self.metrics, tx_duration, observe duration_ms);
3540 }
3541 }
3542
3543 result
3544 }
3545
3546 pub fn savepoint(&self, name: &str) -> Result<()> {
3556 let tx_id = self.current_transaction.lock().ok_or_else(|| {
3557 grafeo_common::utils::error::Error::Transaction(
3558 grafeo_common::utils::error::TransactionError::InvalidState(
3559 "No active transaction".to_string(),
3560 ),
3561 )
3562 })?;
3563
3564 let touched = self.touched_graphs.lock().clone();
3566 let graph_snapshots: Vec<GraphSavepoint> = touched
3567 .iter()
3568 .map(|graph_name| {
3569 let store = self.resolve_store(graph_name);
3570 GraphSavepoint {
3571 graph_name: graph_name.clone(),
3572 next_node_id: store.peek_next_node_id(),
3573 next_edge_id: store.peek_next_edge_id(),
3574 undo_log_position: store.property_undo_log_position(tx_id),
3575 }
3576 })
3577 .collect();
3578
3579 self.savepoints.lock().push(SavepointState {
3580 name: name.to_string(),
3581 graph_snapshots,
3582 active_graph: self.current_graph.lock().clone(),
3583 #[cfg(feature = "cdc")]
3584 cdc_event_position: self
3585 .cdc_pending_events
3586 .as_ref()
3587 .map_or(0, |p| p.lock().len()),
3588 });
3589 Ok(())
3590 }
3591
/// Rolls the current transaction back to the savepoint called `name`.
///
/// The most recently created savepoint with that name is used. That
/// savepoint and every savepoint created after it are removed. For each
/// graph captured in the savepoint, property changes are undone back to the
/// recorded undo-log position and entities whose ids were allocated after
/// the savepoint are discarded. Graphs touched only after the savepoint have
/// all their uncommitted versions discarded. The touched-graph list is reset
/// to the graphs captured in the savepoint.
///
/// # Errors
///
/// Returns an `InvalidState` transaction error when no transaction is
/// active or when no savepoint named `name` exists.
pub fn rollback_to_savepoint(&self, name: &str) -> Result<()> {
    let transaction_id = self.current_transaction.lock().ok_or_else(|| {
        grafeo_common::utils::error::Error::Transaction(
            grafeo_common::utils::error::TransactionError::InvalidState(
                "No active transaction".to_string(),
            ),
        )
    })?;

    let mut savepoints = self.savepoints.lock();

    // Search from the end so the newest savepoint with this name wins.
    let pos = savepoints
        .iter()
        .rposition(|sp| sp.name == name)
        .ok_or_else(|| {
            grafeo_common::utils::error::Error::Transaction(
                grafeo_common::utils::error::TransactionError::InvalidState(format!(
                    "Savepoint '{name}' not found"
                )),
            )
        })?;

    let sp_state = savepoints[pos].clone();

    // Drops the target savepoint itself together with all later ones.
    savepoints.truncate(pos);
    drop(savepoints);

    for gs in &sp_state.graph_snapshots {
        let store = self.resolve_store(&gs.graph_name);

        // Undo property writes made after the savepoint.
        store.rollback_transaction_properties_to(transaction_id, gs.undo_log_position);

        let current_next_node = store.peek_next_node_id();
        let current_next_edge = store.peek_next_edge_id();

        // Ids in [saved next id, current next id) were allocated after the savepoint.
        let node_ids: Vec<NodeId> = (gs.next_node_id..current_next_node)
            .map(NodeId::new)
            .collect();
        let edge_ids: Vec<EdgeId> = (gs.next_edge_id..current_next_edge)
            .map(EdgeId::new)
            .collect();

        if !node_ids.is_empty() || !edge_ids.is_empty() {
            store.discard_entities_by_id(transaction_id, &node_ids, &edge_ids);
        }
    }

    // Graphs first touched after the savepoint have no snapshot: discard
    // everything this transaction wrote to them.
    let touched = self.touched_graphs.lock().clone();
    for graph_name in &touched {
        let already_captured = sp_state
            .graph_snapshots
            .iter()
            .any(|gs| gs.graph_name == *graph_name);
        if !already_captured {
            let store = self.resolve_store(graph_name);
            store.discard_uncommitted_versions(transaction_id);
        }
    }

    // Forget CDC events buffered after the savepoint.
    #[cfg(feature = "cdc")]
    if let Some(ref pending) = self.cdc_pending_events {
        pending.lock().truncate(sp_state.cdc_event_position);
    }

    // Restore the touched-graph list to what it was at the savepoint.
    let mut touched = self.touched_graphs.lock();
    touched.clear();
    for gs in &sp_state.graph_snapshots {
        if !touched.contains(&gs.graph_name) {
            touched.push(gs.graph_name.clone());
        }
    }

    Ok(())
}
3684
3685 pub fn release_savepoint(&self, name: &str) -> Result<()> {
3691 let _tx_id = self.current_transaction.lock().ok_or_else(|| {
3692 grafeo_common::utils::error::Error::Transaction(
3693 grafeo_common::utils::error::TransactionError::InvalidState(
3694 "No active transaction".to_string(),
3695 ),
3696 )
3697 })?;
3698
3699 let mut savepoints = self.savepoints.lock();
3700 let pos = savepoints
3701 .iter()
3702 .rposition(|sp| sp.name == name)
3703 .ok_or_else(|| {
3704 grafeo_common::utils::error::Error::Transaction(
3705 grafeo_common::utils::error::TransactionError::InvalidState(format!(
3706 "Savepoint '{name}' not found"
3707 )),
3708 )
3709 })?;
3710 savepoints.remove(pos);
3711 Ok(())
3712 }
3713
#[must_use]
/// Returns `true` while the session has a current transaction id set.
pub fn in_transaction(&self) -> bool {
    self.current_transaction.lock().is_some()
}
3719
3720 #[must_use]
3722 pub(crate) fn current_transaction_id(&self) -> Option<TransactionId> {
3723 *self.current_transaction.lock()
3724 }
3725
3726 #[must_use]
3728 pub(crate) fn transaction_manager(&self) -> &TransactionManager {
3729 &self.transaction_manager
3730 }
3731
3732 #[must_use]
3734 pub(crate) fn node_count_delta(&self) -> (usize, usize) {
3735 (
3736 self.transaction_start_node_count.load(Ordering::Relaxed),
3737 self.active_lpg_store().node_count(),
3738 )
3739 }
3740
3741 #[must_use]
3743 pub(crate) fn edge_count_delta(&self) -> (usize, usize) {
3744 (
3745 self.transaction_start_edge_count.load(Ordering::Relaxed),
3746 self.active_lpg_store().edge_count(),
3747 )
3748 }
3749
3750 pub fn prepare_commit(&mut self) -> Result<crate::transaction::PreparedCommit<'_>> {
3784 crate::transaction::PreparedCommit::new(self)
3785 }
3786
3787 pub fn set_auto_commit(&mut self, auto_commit: bool) {
3789 self.auto_commit = auto_commit;
3790 }
3791
3792 #[must_use]
3794 pub fn auto_commit(&self) -> bool {
3795 self.auto_commit
3796 }
3797
3798 fn needs_auto_commit(&self, has_mutations: bool) -> bool {
3803 self.auto_commit && has_mutations && self.current_transaction.lock().is_none()
3804 }
3805
3806 fn with_auto_commit<F>(&self, has_mutations: bool, body: F) -> Result<QueryResult>
3809 where
3810 F: FnOnce() -> Result<QueryResult>,
3811 {
3812 if self.needs_auto_commit(has_mutations) {
3813 self.begin_transaction_inner(false, None)?;
3814 match body() {
3815 Ok(result) => {
3816 self.commit_inner()?;
3817 Ok(result)
3818 }
3819 Err(e) => {
3820 let _ = self.rollback_inner();
3821 Err(e)
3822 }
3823 }
3824 } else {
3825 body()
3826 }
3827 }
3828
/// Cheap textual heuristic: does `query` contain a mutation keyword?
///
/// The query is upper-cased (ASCII only) and searched for each keyword as a
/// plain substring, so words that merely contain a keyword (for example
/// `OFFSET`, which contains `SET`) also count as a match.
fn query_looks_like_mutation(query: &str) -> bool {
    const MUTATION_KEYWORDS: [&str; 8] = [
        "INSERT", "CREATE", "DELETE", "MERGE", "SET", "REMOVE", "DROP", "ALTER",
    ];

    let normalized = query.to_ascii_uppercase();
    MUTATION_KEYWORDS
        .iter()
        .any(|keyword| normalized.contains(keyword))
}
3845
3846 #[must_use]
3848 fn query_deadline(&self) -> Option<Instant> {
3849 #[cfg(not(target_arch = "wasm32"))]
3850 {
3851 self.query_timeout.map(|d| Instant::now() + d)
3852 }
3853 #[cfg(target_arch = "wasm32")]
3854 {
3855 let _ = &self.query_timeout;
3856 None
3857 }
3858 }
3859
/// Records metrics for one finished query.
///
/// Always bumps the total and per-language query counters and, when
/// `elapsed_ms` is given, observes it as latency. A successful result adds
/// its row count (and scanned-row count, if present); a failed result bumps
/// the error counter and, when the error text contains `"exceeded timeout"`,
/// the timeout counter too.
#[cfg(feature = "metrics")]
fn record_query_metrics(
    &self,
    language: &str,
    elapsed_ms: Option<f64>,
    result: &Result<crate::database::QueryResult>,
) {
    use crate::metrics::record_metric;

    record_metric!(self.metrics, query_count, inc);
    if let Some(reg) = &self.metrics {
        reg.query_count_by_language.increment(language);
    }
    if let Some(ms) = elapsed_ms {
        record_metric!(self.metrics, query_latency, observe ms);
    }

    match result {
        Err(e) => {
            record_metric!(self.metrics, query_errors, inc);
            // Timeouts are recognised by their error message.
            if e.to_string().contains("exceeded timeout") {
                record_metric!(self.metrics, query_timeouts, inc);
            }
        }
        Ok(r) => {
            let returned = r.rows.len() as u64;
            record_metric!(self.metrics, rows_returned, add returned);
            if let Some(scanned) = r.rows_scanned {
                record_metric!(self.metrics, rows_scanned, add scanned);
            }
        }
    }
}
3899
3900 fn eval_integer_literal(expr: &grafeo_adapters::query::gql::ast::Expression) -> Option<i64> {
3902 use grafeo_adapters::query::gql::ast::{Expression, Literal};
3903 match expr {
3904 Expression::Literal(Literal::Integer(n)) => Some(*n),
3905 _ => None,
3906 }
3907 }
3908
3909 #[must_use]
3915 fn get_transaction_context(&self) -> (EpochId, Option<TransactionId>) {
3916 if let Some(epoch) = *self.viewing_epoch_override.lock() {
3918 return (epoch, None);
3919 }
3920
3921 if let Some(transaction_id) = *self.current_transaction.lock() {
3922 let epoch = self
3924 .transaction_manager
3925 .start_epoch(transaction_id)
3926 .unwrap_or_else(|| self.transaction_manager.current_epoch());
3927 (epoch, Some(transaction_id))
3928 } else {
3929 (self.transaction_manager.current_epoch(), None)
3931 }
3932 }
3933
3934 fn create_planner_for_store(
3939 &self,
3940 store: Arc<dyn GraphStore>,
3941 viewing_epoch: EpochId,
3942 transaction_id: Option<TransactionId>,
3943 ) -> crate::query::Planner {
3944 self.create_planner_for_store_with_read_only(store, viewing_epoch, transaction_id, false)
3945 }
3946
3947 fn create_planner_for_store_with_read_only(
3948 &self,
3949 store: Arc<dyn GraphStore>,
3950 viewing_epoch: EpochId,
3951 transaction_id: Option<TransactionId>,
3952 read_only: bool,
3953 ) -> crate::query::Planner {
3954 use crate::query::Planner;
3955 use grafeo_core::execution::operators::{LazyValue, SessionContext};
3956
3957 let info_store = Arc::clone(&store);
3959 let schema_store = Arc::clone(&store);
3960
3961 let session_context = SessionContext {
3962 current_schema: self.current_schema(),
3963 current_graph: self.current_graph(),
3964 db_info: LazyValue::new(move || Self::build_info_value(&*info_store)),
3965 schema_info: LazyValue::new(move || Self::build_schema_value(&*schema_store)),
3966 };
3967
3968 let write_store = self.active_write_store();
3969
3970 let mut planner = Planner::with_context(
3971 Arc::clone(&store),
3972 write_store,
3973 Arc::clone(&self.transaction_manager),
3974 transaction_id,
3975 viewing_epoch,
3976 )
3977 .with_factorized_execution(self.factorized_execution)
3978 .with_catalog(Arc::clone(&self.catalog))
3979 .with_session_context(session_context)
3980 .with_read_only(read_only);
3981
3982 let validator =
3984 CatalogConstraintValidator::new(Arc::clone(&self.catalog)).with_store(store);
3985 planner = planner.with_validator(Arc::new(validator));
3986
3987 planner
3988 }
3989
3990 fn build_info_value(store: &dyn GraphStore) -> Value {
3992 use grafeo_common::types::PropertyKey;
3993 use std::collections::BTreeMap;
3994
3995 let mut map = BTreeMap::new();
3996 map.insert(PropertyKey::from("mode"), Value::String("lpg".into()));
3997 map.insert(
3998 PropertyKey::from("node_count"),
3999 Value::Int64(store.node_count() as i64),
4000 );
4001 map.insert(
4002 PropertyKey::from("edge_count"),
4003 Value::Int64(store.edge_count() as i64),
4004 );
4005 map.insert(
4006 PropertyKey::from("version"),
4007 Value::String(env!("CARGO_PKG_VERSION").into()),
4008 );
4009 Value::Map(map.into())
4010 }
4011
4012 fn build_schema_value(store: &dyn GraphStore) -> Value {
4014 use grafeo_common::types::PropertyKey;
4015 use std::collections::BTreeMap;
4016
4017 let labels: Vec<Value> = store
4018 .all_labels()
4019 .into_iter()
4020 .map(|l| Value::String(l.into()))
4021 .collect();
4022 let edge_types: Vec<Value> = store
4023 .all_edge_types()
4024 .into_iter()
4025 .map(|t| Value::String(t.into()))
4026 .collect();
4027 let property_keys: Vec<Value> = store
4028 .all_property_keys()
4029 .into_iter()
4030 .map(|k| Value::String(k.into()))
4031 .collect();
4032
4033 let mut map = BTreeMap::new();
4034 map.insert(PropertyKey::from("labels"), Value::List(labels.into()));
4035 map.insert(
4036 PropertyKey::from("edge_types"),
4037 Value::List(edge_types.into()),
4038 );
4039 map.insert(
4040 PropertyKey::from("property_keys"),
4041 Value::List(property_keys.into()),
4042 );
4043 Value::Map(map.into())
4044 }
4045
4046 pub fn create_node(&self, labels: &[&str]) -> NodeId {
4051 let (epoch, transaction_id) = self.get_transaction_context();
4052 self.active_lpg_store().create_node_versioned(
4053 labels,
4054 epoch,
4055 transaction_id.unwrap_or(TransactionId::SYSTEM),
4056 )
4057 }
4058
4059 pub fn create_node_with_props<'a>(
4063 &self,
4064 labels: &[&str],
4065 properties: impl IntoIterator<Item = (&'a str, Value)>,
4066 ) -> NodeId {
4067 let (epoch, transaction_id) = self.get_transaction_context();
4068 self.active_lpg_store().create_node_with_props_versioned(
4069 labels,
4070 properties,
4071 epoch,
4072 transaction_id.unwrap_or(TransactionId::SYSTEM),
4073 )
4074 }
4075
4076 pub fn create_edge(
4081 &self,
4082 src: NodeId,
4083 dst: NodeId,
4084 edge_type: &str,
4085 ) -> grafeo_common::types::EdgeId {
4086 let (epoch, transaction_id) = self.get_transaction_context();
4087 self.active_lpg_store().create_edge_versioned(
4088 src,
4089 dst,
4090 edge_type,
4091 epoch,
4092 transaction_id.unwrap_or(TransactionId::SYSTEM),
4093 )
4094 }
4095
4096 pub fn create_edge_with_props<'a>(
4098 &self,
4099 src: NodeId,
4100 dst: NodeId,
4101 edge_type: &str,
4102 properties: impl IntoIterator<Item = (&'a str, Value)>,
4103 ) -> grafeo_common::types::EdgeId {
4104 let (epoch, transaction_id) = self.get_transaction_context();
4105 let tid = transaction_id.unwrap_or(TransactionId::SYSTEM);
4106 let store = self.active_lpg_store();
4107 let eid = store.create_edge_versioned(src, dst, edge_type, epoch, tid);
4108 for (key, value) in properties {
4109 store.set_edge_property_versioned(eid, key, value, tid);
4110 }
4111 eid
4112 }
4113
4114 pub fn set_node_property(&self, id: NodeId, key: &str, value: Value) {
4116 let (_, transaction_id) = self.get_transaction_context();
4117 if let Some(tid) = transaction_id {
4118 self.active_lpg_store()
4119 .set_node_property_versioned(id, key, value, tid);
4120 } else {
4121 self.active_lpg_store().set_node_property(id, key, value);
4122 }
4123 }
4124
4125 pub fn set_edge_property(&self, id: grafeo_common::types::EdgeId, key: &str, value: Value) {
4127 let (_, transaction_id) = self.get_transaction_context();
4128 if let Some(tid) = transaction_id {
4129 self.active_lpg_store()
4130 .set_edge_property_versioned(id, key, value, tid);
4131 } else {
4132 self.active_lpg_store().set_edge_property(id, key, value);
4133 }
4134 }
4135
4136 pub fn delete_node(&self, id: NodeId) -> bool {
4138 let (epoch, transaction_id) = self.get_transaction_context();
4139 if let Some(tid) = transaction_id {
4140 self.active_lpg_store()
4141 .delete_node_versioned(id, epoch, tid)
4142 } else {
4143 self.active_lpg_store().delete_node(id)
4144 }
4145 }
4146
4147 pub fn delete_edge(&self, id: grafeo_common::types::EdgeId) -> bool {
4149 let (epoch, transaction_id) = self.get_transaction_context();
4150 if let Some(tid) = transaction_id {
4151 self.active_lpg_store()
4152 .delete_edge_versioned(id, epoch, tid)
4153 } else {
4154 self.active_lpg_store().delete_edge(id)
4155 }
4156 }
4157
4158 #[must_use]
4186 pub fn get_node(&self, id: NodeId) -> Option<Node> {
4187 let (epoch, transaction_id) = self.get_transaction_context();
4188 self.active_lpg_store().get_node_versioned(
4189 id,
4190 epoch,
4191 transaction_id.unwrap_or(TransactionId::SYSTEM),
4192 )
4193 }
4194
4195 #[must_use]
4219 pub fn get_node_property(&self, id: NodeId, key: &str) -> Option<Value> {
4220 self.get_node(id)
4221 .and_then(|node| node.get_property(key).cloned())
4222 }
4223
4224 #[must_use]
4231 pub fn get_edge(&self, id: EdgeId) -> Option<Edge> {
4232 let (epoch, transaction_id) = self.get_transaction_context();
4233 self.active_lpg_store().get_edge_versioned(
4234 id,
4235 epoch,
4236 transaction_id.unwrap_or(TransactionId::SYSTEM),
4237 )
4238 }
4239
4240 #[must_use]
4266 pub fn get_neighbors_outgoing(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
4267 self.active_lpg_store()
4268 .edges_from(node, Direction::Outgoing)
4269 .collect()
4270 }
4271
4272 #[must_use]
4281 pub fn get_neighbors_incoming(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
4282 self.active_lpg_store()
4283 .edges_from(node, Direction::Incoming)
4284 .collect()
4285 }
4286
4287 #[must_use]
4299 pub fn get_neighbors_outgoing_by_type(
4300 &self,
4301 node: NodeId,
4302 edge_type: &str,
4303 ) -> Vec<(NodeId, EdgeId)> {
4304 self.active_lpg_store()
4305 .edges_from(node, Direction::Outgoing)
4306 .filter(|(_, edge_id)| {
4307 self.get_edge(*edge_id)
4308 .is_some_and(|e| e.edge_type.as_str() == edge_type)
4309 })
4310 .collect()
4311 }
4312
4313 #[must_use]
4320 pub fn node_exists(&self, id: NodeId) -> bool {
4321 self.get_node(id).is_some()
4322 }
4323
4324 #[must_use]
4326 pub fn edge_exists(&self, id: EdgeId) -> bool {
4327 self.get_edge(id).is_some()
4328 }
4329
4330 #[must_use]
4334 pub fn get_degree(&self, node: NodeId) -> (usize, usize) {
4335 let active = self.active_lpg_store();
4336 let out = active.out_degree(node);
4337 let in_degree = active.in_degree(node);
4338 (out, in_degree)
4339 }
4340
4341 #[must_use]
4351 pub fn get_nodes_batch(&self, ids: &[NodeId]) -> Vec<Option<Node>> {
4352 let (epoch, transaction_id) = self.get_transaction_context();
4353 let tx = transaction_id.unwrap_or(TransactionId::SYSTEM);
4354 let active = self.active_lpg_store();
4355 ids.iter()
4356 .map(|&id| active.get_node_versioned(id, epoch, tx))
4357 .collect()
4358 }
4359
/// Returns the change events the CDC log holds for one entity.
///
/// # Errors
///
/// This implementation always returns `Ok`.
#[cfg(feature = "cdc")]
pub fn history(
    &self,
    entity_id: impl Into<crate::cdc::EntityId>,
) -> Result<Vec<crate::cdc::ChangeEvent>> {
    let target: crate::cdc::EntityId = entity_id.into();
    let events = self.cdc_log.history(target);
    Ok(events)
}
4374
/// Returns the change events the CDC log holds for one entity, queried with
/// `since_epoch`.
///
/// # Errors
///
/// This implementation always returns `Ok`.
#[cfg(feature = "cdc")]
pub fn history_since(
    &self,
    entity_id: impl Into<crate::cdc::EntityId>,
    since_epoch: EpochId,
) -> Result<Vec<crate::cdc::ChangeEvent>> {
    let target: crate::cdc::EntityId = entity_id.into();
    let events = self.cdc_log.history_since(target, since_epoch);
    Ok(events)
}
4388
/// Returns the change events the CDC log reports for the epoch range given
/// by `start_epoch` and `end_epoch`.
///
/// # Errors
///
/// This implementation always returns `Ok`.
#[cfg(feature = "cdc")]
pub fn changes_between(
    &self,
    start_epoch: EpochId,
    end_epoch: EpochId,
) -> Result<Vec<crate::cdc::ChangeEvent>> {
    let events = self.cdc_log.changes_between(start_epoch, end_epoch);
    Ok(events)
}
4402}
4403
4404impl Drop for Session {
4405 fn drop(&mut self) {
4406 if self.in_transaction() {
4409 let _ = self.rollback_inner();
4410 }
4411
4412 #[cfg(feature = "metrics")]
4413 if let Some(ref reg) = self.metrics {
4414 reg.session_active
4415 .fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
4416 }
4417 }
4418}
4419
4420#[cfg(test)]
4421mod tests {
4422 use super::parse_default_literal;
4423 use crate::database::GrafeoDB;
4424 use grafeo_common::types::Value;
4425
/// `null` is recognised regardless of ASCII case.
#[test]
fn parse_default_literal_null() {
    for text in ["null", "NULL", "Null"] {
        assert_eq!(parse_default_literal(text), Value::Null);
    }
}
4436
/// `true` / `false` are recognised in lower and upper case.
#[test]
fn parse_default_literal_bool() {
    let cases = [
        ("true", true),
        ("TRUE", true),
        ("false", false),
        ("FALSE", false),
    ];
    for (text, expected) in cases {
        assert_eq!(parse_default_literal(text), Value::Bool(expected));
    }
}
4444
/// Single quotes are stripped from a quoted string literal.
#[test]
fn parse_default_literal_string_single_quoted() {
    let parsed = parse_default_literal("'hello'");
    assert_eq!(parsed, Value::String("hello".into()));
}
4452
/// Double quotes are stripped from a quoted string literal.
#[test]
fn parse_default_literal_string_double_quoted() {
    let parsed = parse_default_literal("\"world\"");
    assert_eq!(parsed, Value::String("world".into()));
}
4460
/// Integer text parses to `Value::Int64`, including negatives and zero.
#[test]
fn parse_default_literal_integer() {
    for (text, expected) in [("42", 42_i64), ("-7", -7), ("0", 0)] {
        assert_eq!(parse_default_literal(text), Value::Int64(expected));
    }
}
4467
/// Decimal text parses to `Value::Float64`.
#[test]
fn parse_default_literal_float() {
    let positive = parse_default_literal("9.81");
    assert_eq!(positive, Value::Float64(9.81_f64));

    let negative = parse_default_literal("-0.5");
    assert_eq!(negative, Value::Float64(-0.5));
}
4473
/// Text that matches no other literal form is kept as a string.
#[test]
fn parse_default_literal_fallback_string() {
    let parsed = parse_default_literal("some_identifier");
    assert_eq!(parsed, Value::String("some_identifier".into()));
}
4482
/// A node created through a session outside a transaction is counted by the
/// database.
#[test]
fn test_session_create_node() {
    let db = GrafeoDB::new_in_memory();
    let session = db.session();

    let created = session.create_node(&["Person"]);

    assert!(created.is_valid());
    assert_eq!(db.node_count(), 1);
}
4492
/// `in_transaction` tracks begin and commit.
#[test]
fn test_session_transaction() {
    let db = GrafeoDB::new_in_memory();
    let mut session = db.session();
    assert!(!session.in_transaction());

    session.begin_transaction().unwrap();
    assert!(session.in_transaction());

    session.commit().unwrap();
    assert!(!session.in_transaction());
}
4506
/// The transaction context carries an id only while a transaction is open,
/// and the epoch does not go backwards across a commit.
#[test]
fn test_session_transaction_context() {
    let db = GrafeoDB::new_in_memory();
    let mut session = db.session();

    // Before any transaction: no id.
    let (_, id_before) = session.get_transaction_context();
    assert!(id_before.is_none());

    // Inside a transaction: an id is present.
    session.begin_transaction().unwrap();
    let (epoch_during, id_during) = session.get_transaction_context();
    assert!(id_during.is_some());

    // After commit: no id again, epoch at least as large as before.
    session.commit().unwrap();
    let (epoch_after, id_after) = session.get_transaction_context();
    assert!(id_after.is_none());
    assert!(epoch_after.as_u64() >= epoch_during.as_u64());
}
4530
/// Rolling back closes the transaction.
#[test]
fn test_session_rollback() {
    let db = GrafeoDB::new_in_memory();
    let mut session = db.session();

    session.begin_transaction().unwrap();
    session.rollback().unwrap();

    assert!(!session.in_transaction());
}
4540
/// A node created directly on the store under the session's transaction id
/// disappears on rollback, while a previously created node survives.
#[test]
fn test_session_rollback_discards_versions() {
    use grafeo_common::types::TransactionId;

    let db = GrafeoDB::new_in_memory();

    // Baseline: one node that exists before the transaction starts.
    let baseline = db.store().create_node(&["Person"]);
    assert!(baseline.is_valid());
    assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");

    let mut session = db.session();
    session.begin_transaction().unwrap();
    let transaction_id = session.current_transaction_id().unwrap();

    // Create a second node as part of the open transaction.
    let epoch = db.store().current_epoch();
    let pending = db
        .store()
        .create_node_versioned(&["Person"], epoch, transaction_id);
    assert!(pending.is_valid());

    assert_eq!(
        db.node_count(),
        1,
        "PENDING nodes should be invisible to non-versioned node_count()"
    );
    let seen_by_owner = db.store().get_node_versioned(pending, epoch, transaction_id);
    assert!(
        seen_by_owner.is_some(),
        "Transaction node should be visible to its own transaction"
    );

    session.rollback().unwrap();
    assert!(!session.in_transaction());

    let count_after = db.node_count();
    assert_eq!(
        count_after, 1,
        "Rollback should discard uncommitted node, but got {count_after}"
    );

    let current_epoch = db.store().current_epoch();
    let baseline_after =
        db.store()
            .get_node_versioned(baseline, current_epoch, TransactionId::SYSTEM);
    assert!(baseline_after.is_some(), "Original node should still exist");

    let pending_after =
        db.store()
            .get_node_versioned(pending, current_epoch, TransactionId::SYSTEM);
    assert!(pending_after.is_none(), "Transaction node should be gone");
}
4608
/// A node created via `session.create_node` inside a transaction is
/// discarded by rollback.
#[test]
fn test_session_create_node_in_transaction() {
    let db = GrafeoDB::new_in_memory();

    let baseline = db.create_node(&["Person"]);
    assert!(baseline.is_valid());
    assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");

    let mut session = db.session();
    session.begin_transaction().unwrap();
    let transaction_id = session.current_transaction_id().unwrap();

    let pending = session.create_node(&["Person"]);
    assert!(pending.is_valid());

    assert_eq!(
        db.node_count(),
        1,
        "PENDING nodes should be invisible to non-versioned node_count()"
    );
    let epoch = db.store().current_epoch();
    let seen_by_owner = db.store().get_node_versioned(pending, epoch, transaction_id);
    assert!(
        seen_by_owner.is_some(),
        "Transaction node should be visible to its own transaction"
    );

    session.rollback().unwrap();

    let count_after = db.node_count();
    assert_eq!(
        count_after, 1,
        "Rollback should discard node created via session.create_node(), but got {count_after}"
    );
}
4653
/// A node created via `session.create_node_with_props` inside a transaction
/// is discarded by rollback.
#[test]
fn test_session_create_node_with_props_in_transaction() {
    use grafeo_common::types::Value;

    let db = GrafeoDB::new_in_memory();

    db.create_node(&["Person"]);
    assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");

    let mut session = db.session();
    session.begin_transaction().unwrap();
    let transaction_id = session.current_transaction_id().unwrap();

    let props = [("name", Value::String("Alix".into()))];
    let pending = session.create_node_with_props(&["Person"], props);
    assert!(pending.is_valid());

    assert_eq!(
        db.node_count(),
        1,
        "PENDING nodes should be invisible to non-versioned node_count()"
    );
    let epoch = db.store().current_epoch();
    let seen_by_owner = db.store().get_node_versioned(pending, epoch, transaction_id);
    assert!(
        seen_by_owner.is_some(),
        "Transaction node should be visible to its own transaction"
    );

    session.rollback().unwrap();

    let count_after = db.node_count();
    assert_eq!(
        count_after, 1,
        "Rollback should discard node created via session.create_node_with_props()"
    );
}
4699
/// End-to-end checks for GQL queries run through `Session::execute`.
#[cfg(feature = "gql")]
mod gql_tests {
    use super::*;
    use grafeo_common::types::Value;

    /// A label filter returns only the matching nodes, in a single column.
    #[test]
    fn test_gql_query_execution() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        for label in ["Person", "Person", "Animal"] {
            session.create_node(&[label]);
        }

        let people = session.execute("MATCH (n:Person) RETURN n").unwrap();

        assert_eq!(people.row_count(), 2);
        assert_eq!(people.column_count(), 1);
        assert_eq!(people.columns[0], "n");
    }

    /// Matching on an empty database yields no rows.
    #[test]
    fn test_gql_empty_result() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        let people = session.execute("MATCH (n:Person) RETURN n").unwrap();

        assert_eq!(people.row_count(), 0);
    }

    /// Malformed query text is reported as an error.
    #[test]
    fn test_gql_parse_error() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        let outcome = session.execute("MATCH (n RETURN n");

        assert!(outcome.is_err());
    }

    /// A one-hop pattern returns one row per matching edge.
    #[test]
    fn test_gql_relationship_traversal() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        let alix = session.create_node(&["Person"]);
        let gus = session.create_node(&["Person"]);
        let vincent = session.create_node(&["Person"]);
        for target in [gus, vincent] {
            session.create_edge(alix, target, "KNOWS");
        }

        let pairs = session
            .execute("MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b")
            .unwrap();

        assert_eq!(pairs.row_count(), 2);
        assert_eq!(pairs.column_count(), 2);
        assert_eq!(pairs.columns[0], "a");
        assert_eq!(pairs.columns[1], "b");
    }

    /// Edges of a different type are excluded by the type filter.
    #[test]
    fn test_gql_relationship_with_type_filter() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        let alix = session.create_node(&["Person"]);
        let gus = session.create_node(&["Person"]);
        let vincent = session.create_node(&["Person"]);
        session.create_edge(alix, gus, "KNOWS");
        session.create_edge(alix, vincent, "WORKS_WITH");

        let pairs = session
            .execute("MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b")
            .unwrap();

        assert_eq!(pairs.row_count(), 1);
    }

    /// Returning an unbound variable is a semantic error.
    #[test]
    fn test_gql_semantic_error_undefined_variable() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        let outcome = session.execute("MATCH (n:Person) RETURN x");

        assert!(outcome.is_err());
        let message = match outcome {
            Err(err) => err.to_string(),
            Ok(_) => panic!("Expected error"),
        };
        assert!(
            message.contains("Undefined variable"),
            "Expected undefined variable error, got: {}",
            message
        );
    }

    /// A numeric comparison in WHERE filters rows.
    #[test]
    fn test_gql_where_clause_property_filter() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        for age in [25_i64, 35, 45] {
            session.create_node_with_props(&["Person"], [("age", Value::Int64(age))]);
        }

        let older = session
            .execute("MATCH (n:Person) WHERE n.age > 30 RETURN n")
            .unwrap();

        assert_eq!(older.row_count(), 2);
    }

    /// A string equality in WHERE filters rows.
    #[test]
    fn test_gql_where_clause_equality() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        for name in ["Alix", "Gus", "Alix"] {
            session.create_node_with_props(&["Person"], [("name", Value::String(name.into()))]);
        }

        let matches = session
            .execute("MATCH (n:Person) WHERE n.name = \"Alix\" RETURN n")
            .unwrap();

        assert_eq!(matches.row_count(), 2);
    }

    /// Property accesses in RETURN become columns named after the expression.
    #[test]
    fn test_gql_return_property_access() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        for (name, age) in [("Alix", 30_i64), ("Gus", 25)] {
            session.create_node_with_props(
                &["Person"],
                [
                    ("name", Value::String(name.into())),
                    ("age", Value::Int64(age)),
                ],
            );
        }

        let projected = session
            .execute("MATCH (n:Person) RETURN n.name, n.age")
            .unwrap();

        assert_eq!(projected.row_count(), 2);
        assert_eq!(projected.column_count(), 2);
        assert_eq!(projected.columns[0], "n.name");
        assert_eq!(projected.columns[1], "n.age");

        // Row order is not asserted; both names must appear in column 0.
        for expected in ["Alix", "Gus"] {
            let wanted = Value::String(expected.into());
            assert!(projected.rows.iter().any(|row| row[0] == wanted));
        }
    }

    /// A variable and a property access can be returned side by side.
    #[test]
    fn test_gql_return_mixed_expressions() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        session.create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))]);

        let projected = session
            .execute("MATCH (n:Person) RETURN n, n.name")
            .unwrap();

        assert_eq!(projected.row_count(), 1);
        assert_eq!(projected.column_count(), 2);
        assert_eq!(projected.columns[0], "n");
        assert_eq!(projected.columns[1], "n.name");

        assert_eq!(projected.rows[0][1], Value::String("Alix".into()));
    }
}
4918
/// End-to-end checks for Cypher queries run through `Session::execute_cypher`.
#[cfg(feature = "cypher")]
mod cypher_tests {
    use super::*;

    /// A label filter returns only the matching nodes, in a single column.
    #[test]
    fn test_cypher_query_execution() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        for label in ["Person", "Person", "Animal"] {
            session.create_node(&[label]);
        }

        let people = session.execute_cypher("MATCH (n:Person) RETURN n").unwrap();

        assert_eq!(people.row_count(), 2);
        assert_eq!(people.column_count(), 1);
        assert_eq!(people.columns[0], "n");
    }

    /// Matching on an empty database yields no rows.
    #[test]
    fn test_cypher_empty_result() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        let people = session.execute_cypher("MATCH (n:Person) RETURN n").unwrap();

        assert_eq!(people.row_count(), 0);
    }

    /// Malformed query text is reported as an error.
    #[test]
    fn test_cypher_parse_error() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        let outcome = session.execute_cypher("MATCH (n RETURN n");

        assert!(outcome.is_err());
    }
}
4964
4965 mod direct_lookup_tests {
4968 use super::*;
4969 use grafeo_common::types::Value;
4970
/// A freshly created node can be fetched back by id.
#[test]
fn test_get_node() {
    let db = GrafeoDB::new_in_memory();
    let session = db.session();

    let id = session.create_node(&["Person"]);
    let fetched = session.get_node(id);

    assert!(fetched.is_some());
    assert_eq!(fetched.unwrap().id, id);
}
4983
/// Looking up an id that was never created yields `None`.
#[test]
fn test_get_node_not_found() {
    use grafeo_common::types::NodeId;

    let db = GrafeoDB::new_in_memory();
    let session = db.session();

    let missing = session.get_node(NodeId::new(9999));

    assert!(missing.is_none());
}
4995
/// An existing property is returned; an unknown key yields `None`.
#[test]
fn test_get_node_property() {
    let db = GrafeoDB::new_in_memory();
    let session = db.session();

    let props = [("name", Value::String("Alix".into()))];
    let id = session.create_node_with_props(&["Person"], props);

    assert_eq!(
        session.get_node_property(id, "name"),
        Some(Value::String("Alix".into()))
    );
    assert!(session.get_node_property(id, "missing").is_none());
}
5011
/// A freshly created edge can be fetched back with its endpoints intact.
#[test]
fn test_get_edge() {
    let db = GrafeoDB::new_in_memory();
    let session = db.session();

    let alix = session.create_node(&["Person"]);
    let gus = session.create_node(&["Person"]);
    let edge_id = session.create_edge(alix, gus, "KNOWS");

    let fetched = session.get_edge(edge_id);
    assert!(fetched.is_some());

    let fetched = fetched.unwrap();
    assert_eq!(fetched.id, edge_id);
    assert_eq!(fetched.src, alix);
    assert_eq!(fetched.dst, gus);
}
5028
/// Looking up an edge id that was never created yields `None`.
#[test]
fn test_get_edge_not_found() {
    use grafeo_common::types::EdgeId;

    let db = GrafeoDB::new_in_memory();
    let session = db.session();

    let missing = session.get_edge(EdgeId::new(9999));

    assert!(missing.is_none());
}
5039
/// Both targets of a node's outgoing edges are reported as neighbors.
#[test]
fn test_get_neighbors_outgoing() {
    let db = GrafeoDB::new_in_memory();
    let session = db.session();

    let alix = session.create_node(&["Person"]);
    let gus = session.create_node(&["Person"]);
    let harm = session.create_node(&["Person"]);
    for target in [gus, harm] {
        session.create_edge(alix, target, "KNOWS");
    }

    let neighbors = session.get_neighbors_outgoing(alix);
    assert_eq!(neighbors.len(), 2);

    for expected in [gus, harm] {
        assert!(neighbors.iter().any(|(node_id, _)| *node_id == expected));
    }
}
5059
/// Both sources of a node's incoming edges are reported as neighbors.
#[test]
fn test_get_neighbors_incoming() {
    let db = GrafeoDB::new_in_memory();
    let session = db.session();

    let alix = session.create_node(&["Person"]);
    let gus = session.create_node(&["Person"]);
    let harm = session.create_node(&["Person"]);
    for source in [gus, harm] {
        session.create_edge(source, alix, "KNOWS");
    }

    let neighbors = session.get_neighbors_incoming(alix);
    assert_eq!(neighbors.len(), 2);

    for expected in [gus, harm] {
        assert!(neighbors.iter().any(|(node_id, _)| *node_id == expected));
    }
}
5079
/// Outgoing neighbors are filtered by edge type; an unused type yields none.
#[test]
fn test_get_neighbors_outgoing_by_type() {
    let db = GrafeoDB::new_in_memory();
    let session = db.session();

    let alix = session.create_node(&["Person"]);
    let gus = session.create_node(&["Person"]);
    let company = session.create_node(&["Company"]);
    session.create_edge(alix, gus, "KNOWS");
    session.create_edge(alix, company, "WORKS_AT");

    // Each used type leads to exactly one, specific neighbor.
    for (edge_type, expected) in [("KNOWS", gus), ("WORKS_AT", company)] {
        let found = session.get_neighbors_outgoing_by_type(alix, edge_type);
        assert_eq!(found.len(), 1);
        assert_eq!(found[0].0, expected);
    }

    let unrelated = session.get_neighbors_outgoing_by_type(alix, "LIKES");
    assert!(unrelated.is_empty());
}
5104
/// `node_exists` is true for a created node and false for an unknown id.
#[test]
fn test_node_exists() {
    use grafeo_common::types::NodeId;

    let db = GrafeoDB::new_in_memory();
    let session = db.session();

    let known = session.create_node(&["Person"]);
    let unknown = NodeId::new(9999);

    assert!(session.node_exists(known));
    assert!(!session.node_exists(unknown));
}
5117
/// `edge_exists` is true for a created edge and false for an id this test never created.
#[test]
fn test_edge_exists() {
    use grafeo_common::types::EdgeId;

    let db = GrafeoDB::new_in_memory();
    let session = db.session();

    let source = session.create_node(&["Person"]);
    let target = session.create_node(&["Person"]);
    let created = session.create_edge(source, target, "KNOWS");
    let unknown = EdgeId::new(9999);

    assert!(session.edge_exists(created));
    assert!(!session.edge_exists(unknown));
}
5132
/// `get_degree` reports `(outgoing, incoming)` edge counts for a node.
#[test]
fn test_get_degree() {
    let db = GrafeoDB::new_in_memory();
    let session = db.session();

    let hub = session.create_node(&["Person"]);
    let first = session.create_node(&["Person"]);
    let second = session.create_node(&["Person"]);

    // Two edges leave the hub and one edge enters it.
    session.create_edge(hub, first, "KNOWS");
    session.create_edge(hub, second, "KNOWS");
    session.create_edge(first, hub, "KNOWS");
    assert_eq!(session.get_degree(hub), (2, 1));

    // A node without any edges has degree zero in both directions.
    let isolated = session.create_node(&["Person"]);
    assert_eq!(session.get_degree(isolated), (0, 0));
}
5158
/// `get_nodes_batch` returns one slot per requested id, with `None` for an unknown id.
#[test]
fn test_get_nodes_batch() {
    use grafeo_common::types::NodeId;

    let db = GrafeoDB::new_in_memory();
    let session = db.session();

    let first = session.create_node(&["Person"]);
    let second = session.create_node(&["Person"]);
    let third = session.create_node(&["Person"]);

    // Every id exists, so every slot is populated.
    let found = session.get_nodes_batch(&[first, second, third]);
    assert_eq!(found.len(), 3);
    assert!(found.iter().all(|slot| slot.is_some()));

    // An unknown id in the middle leaves a hole at the same position.
    let partial = session.get_nodes_batch(&[first, NodeId::new(9999), third]);
    assert_eq!(partial.len(), 3);
    assert!(partial[0].is_some());
    assert!(partial[1].is_none());
    assert!(partial[2].is_some());
}
5182
/// Auto-commit starts enabled and follows whatever `set_auto_commit` was last given.
#[test]
fn test_auto_commit_setting() {
    let db = GrafeoDB::new_in_memory();
    let mut session = db.session();

    // A fresh session reports auto-commit as on.
    assert!(session.auto_commit());

    // Switching it off and back on is reflected by the getter each time.
    for enabled in [false, true] {
        session.set_auto_commit(enabled);
        assert_eq!(session.auto_commit(), enabled);
    }
}
5197
/// Beginning a transaction while one is already active succeeds instead of erroring.
#[test]
fn test_transaction_double_begin_nests() {
    let db = GrafeoDB::new_in_memory();
    let mut session = db.session();

    session.begin_transaction().unwrap();

    // The second begin happens while the first transaction is still open.
    let nested = session.begin_transaction();
    assert!(nested.is_ok());

    // Two begins were issued, and two commits are expected to succeed.
    for _ in 0..2 {
        session.commit().unwrap();
    }
}
5212
/// Committing on a session that never began a transaction is an error.
#[test]
fn test_commit_without_transaction_error() {
    let db = GrafeoDB::new_in_memory();
    let mut session = db.session();

    assert!(session.commit().is_err());
}
5221
/// Rolling back on a session that never began a transaction is an error.
#[test]
fn test_rollback_without_transaction_error() {
    let db = GrafeoDB::new_in_memory();
    let mut session = db.session();

    assert!(session.rollback().is_err());
}
5230
/// An edge created inside a transaction is visible to the session before and after commit.
#[test]
fn test_create_edge_in_transaction() {
    let db = GrafeoDB::new_in_memory();
    let mut session = db.session();

    // The endpoints are created before the transaction starts.
    let source = session.create_node(&["Person"]);
    let target = session.create_node(&["Person"]);

    session.begin_transaction().unwrap();
    let edge = session.create_edge(source, target, "KNOWS");

    // Visible to the creating session while the transaction is still open...
    assert!(session.edge_exists(edge));

    session.commit().unwrap();

    // ...and still visible once the transaction has been committed.
    assert!(session.edge_exists(edge));
}
5253
/// A node without edges has no neighbors in any of the neighbor lookups.
#[test]
fn test_neighbors_empty_node() {
    let db = GrafeoDB::new_in_memory();
    let session = db.session();

    let isolated = session.create_node(&["Person"]);

    let outgoing = session.get_neighbors_outgoing(isolated);
    assert!(outgoing.is_empty());

    let incoming = session.get_neighbors_incoming(isolated);
    assert!(incoming.is_empty());

    let outgoing_knows = session.get_neighbors_outgoing_by_type(isolated, "KNOWS");
    assert!(outgoing_knows.is_empty());
}
5269 }
5270
/// Commits two single-node transactions on a database configured with
/// `with_gc_interval(2)` and checks that both nodes are still counted afterwards.
#[test]
fn test_auto_gc_triggers_on_commit_interval() {
    use crate::config::Config;

    let db = GrafeoDB::with_config(Config::in_memory().with_gc_interval(2)).unwrap();
    let mut session = db.session();

    // NOTE(review): presumably the second commit reaches the configured interval
    // and triggers GC; this test only observes that the data survives.
    for label in ["A", "B"] {
        session.begin_transaction().unwrap();
        session.create_node(&[label]);
        session.commit().unwrap();
    }

    assert_eq!(db.node_count(), 2);
}
5292
/// A query timeout set on the database config gives sessions a query deadline.
#[test]
fn test_query_timeout_config_propagates_to_session() {
    use crate::config::Config;
    use std::time::Duration;

    let timeout = Duration::from_secs(5);
    let db = GrafeoDB::with_config(Config::in_memory().with_query_timeout(timeout)).unwrap();
    let session = db.session();

    let deadline = session.query_deadline();
    assert!(deadline.is_some());
}
5305
/// Without a configured query timeout a session reports no query deadline.
#[test]
fn test_no_query_timeout_returns_no_deadline() {
    let db = GrafeoDB::new_in_memory();
    let session = db.session();

    let deadline = session.query_deadline();
    assert!(deadline.is_none());
}
5314
/// A session on a default in-memory database reports the LPG graph model.
#[test]
fn test_graph_model_accessor() {
    use crate::config::GraphModel;

    let db = GrafeoDB::new_in_memory();
    let session = db.session();

    let model = session.graph_model();
    assert_eq!(model, GraphModel::Lpg);
}
5324
/// A database built on an externally supplied store supports a transactional
/// insert followed by a read of the inserted row in the same session.
#[cfg(feature = "gql")]
#[test]
fn test_external_store_session() {
    use grafeo_core::graph::GraphStoreMut;
    use std::sync::Arc;

    let config = crate::config::Config::in_memory();
    let lpg = grafeo_core::graph::lpg::LpgStore::new().unwrap();
    // The database takes the store as a trait object, not as the concrete type.
    let store: Arc<dyn GraphStoreMut> = Arc::new(lpg);
    let db = GrafeoDB::with_store(store, config).unwrap();

    let mut session = db.session();
    session.begin_transaction().unwrap();

    session.execute("INSERT (:Test {name: 'hello'})").unwrap();

    // The read runs inside the still-open transaction and must see the insert.
    let matched = session.execute("MATCH (n:Test) RETURN n.name").unwrap();
    assert_eq!(matched.row_count(), 1);

    session.commit().unwrap();
}
5350
#[cfg(feature = "gql")]
/// Tests for statements executed through `Session::execute`: graph selection,
/// session settings, graph DDL, transaction control, and `SHOW` commands.
mod session_command_tests {
    use super::*;
    use grafeo_common::types::Value;

    /// `USE GRAPH` on an existing graph makes it the session's current graph.
    #[test]
    fn test_use_graph_sets_current_graph() {
        let database = GrafeoDB::new_in_memory();
        let session = database.session();

        for statement in ["CREATE GRAPH mydb", "USE GRAPH mydb"] {
            session.execute(statement).unwrap();
        }

        assert_eq!(session.current_graph().as_deref(), Some("mydb"));
    }

    /// `USE GRAPH` on an unknown graph fails with a "does not exist" error.
    #[test]
    fn test_use_graph_nonexistent_errors() {
        let database = GrafeoDB::new_in_memory();
        let session = database.session();

        let outcome = session.execute("USE GRAPH doesnotexist");
        assert!(outcome.is_err());

        let err = outcome.unwrap_err().to_string();
        assert!(
            err.contains("does not exist"),
            "Expected 'does not exist' error, got: {err}"
        );
    }

    /// `USE GRAPH default` works without creating any graph first.
    #[test]
    fn test_use_graph_default_always_valid() {
        let database = GrafeoDB::new_in_memory();
        let session = database.session();

        session.execute("USE GRAPH default").unwrap();

        assert_eq!(session.current_graph().as_deref(), Some("default"));
    }

    /// `SESSION SET GRAPH` on an existing graph makes it the current graph.
    #[test]
    fn test_session_set_graph() {
        let database = GrafeoDB::new_in_memory();
        let session = database.session();

        for statement in ["CREATE GRAPH analytics", "SESSION SET GRAPH analytics"] {
            session.execute(statement).unwrap();
        }

        assert_eq!(session.current_graph().as_deref(), Some("analytics"));
    }

    /// `SESSION SET GRAPH` on an unknown graph is rejected.
    #[test]
    fn test_session_set_graph_nonexistent_errors() {
        let database = GrafeoDB::new_in_memory();
        let session = database.session();

        let outcome = session.execute("SESSION SET GRAPH nosuchgraph");
        assert!(outcome.is_err());
    }

    /// `SESSION SET TIME ZONE` stores the given zone and later calls overwrite it.
    #[test]
    fn test_session_set_time_zone() {
        let database = GrafeoDB::new_in_memory();
        let session = database.session();

        // No time zone is set on a fresh session.
        assert!(session.time_zone().is_none());

        session.execute("SESSION SET TIME ZONE 'UTC'").unwrap();
        assert_eq!(session.time_zone().as_deref(), Some("UTC"));

        session
            .execute("SESSION SET TIME ZONE 'America/New_York'")
            .unwrap();
        assert_eq!(session.time_zone().as_deref(), Some("America/New_York"));
    }

    /// `SESSION SET PARAMETER` makes the parameter retrievable by its bare name.
    #[test]
    fn test_session_set_parameter() {
        let database = GrafeoDB::new_in_memory();
        let session = database.session();

        session
            .execute("SESSION SET PARAMETER $timeout = 30")
            .unwrap();

        // The lookup key omits the `$` sigil used in the statement.
        let stored = session.get_parameter("timeout");
        assert!(stored.is_some());
    }

    /// `SESSION RESET` clears the current graph, the time zone, and parameters.
    #[test]
    fn test_session_reset_clears_all_state() {
        let database = GrafeoDB::new_in_memory();
        let session = database.session();

        // Populate every piece of session state the reset is expected to clear.
        for statement in [
            "CREATE GRAPH analytics",
            "SESSION SET GRAPH analytics",
            "SESSION SET TIME ZONE 'UTC'",
            "SESSION SET PARAMETER $limit = 100",
        ] {
            session.execute(statement).unwrap();
        }

        // Sanity check: the state really was set before resetting.
        assert!(session.current_graph().is_some());
        assert!(session.time_zone().is_some());
        assert!(session.get_parameter("limit").is_some());

        session.execute("SESSION RESET").unwrap();

        assert!(session.current_graph().is_none());
        assert!(session.time_zone().is_none());
        assert!(session.get_parameter("limit").is_none());
    }

    /// `SESSION CLOSE` clears the current graph and the time zone.
    #[test]
    fn test_session_close_clears_state() {
        let database = GrafeoDB::new_in_memory();
        let session = database.session();

        for statement in [
            "CREATE GRAPH analytics",
            "SESSION SET GRAPH analytics",
            "SESSION SET TIME ZONE 'UTC'",
        ] {
            session.execute(statement).unwrap();
        }

        session.execute("SESSION CLOSE").unwrap();

        assert!(session.current_graph().is_none());
        assert!(session.time_zone().is_none());
    }

    /// A graph created with `CREATE GRAPH` can afterwards be selected.
    #[test]
    fn test_create_graph() {
        let database = GrafeoDB::new_in_memory();
        let session = database.session();

        session.execute("CREATE GRAPH mydb").unwrap();

        // Selecting the graph only succeeds if the creation took effect.
        session.execute("USE GRAPH mydb").unwrap();
        assert_eq!(session.current_graph().as_deref(), Some("mydb"));
    }

    /// Creating the same graph twice fails with an "already exists" error.
    #[test]
    fn test_create_graph_duplicate_errors() {
        let database = GrafeoDB::new_in_memory();
        let session = database.session();

        session.execute("CREATE GRAPH mydb").unwrap();
        let second_attempt = session.execute("CREATE GRAPH mydb");

        assert!(second_attempt.is_err());
        let err = second_attempt.unwrap_err().to_string();
        assert!(
            err.contains("already exists"),
            "Expected 'already exists' error, got: {err}"
        );
    }

    /// `CREATE GRAPH IF NOT EXISTS` succeeds even when the graph already exists.
    #[test]
    fn test_create_graph_if_not_exists() {
        let database = GrafeoDB::new_in_memory();
        let session = database.session();

        for statement in ["CREATE GRAPH mydb", "CREATE GRAPH IF NOT EXISTS mydb"] {
            session.execute(statement).unwrap();
        }
    }

    /// After `DROP GRAPH` the graph can no longer be selected.
    #[test]
    fn test_drop_graph() {
        let database = GrafeoDB::new_in_memory();
        let session = database.session();

        for statement in ["CREATE GRAPH mydb", "DROP GRAPH mydb"] {
            session.execute(statement).unwrap();
        }

        let outcome = session.execute("USE GRAPH mydb");
        assert!(outcome.is_err());
    }

    /// Dropping an unknown graph fails with a "does not exist" error.
    #[test]
    fn test_drop_graph_nonexistent_errors() {
        let database = GrafeoDB::new_in_memory();
        let session = database.session();

        let outcome = session.execute("DROP GRAPH nosuchgraph");
        assert!(outcome.is_err());

        let err = outcome.unwrap_err().to_string();
        assert!(
            err.contains("does not exist"),
            "Expected 'does not exist' error, got: {err}"
        );
    }

    /// `DROP GRAPH IF EXISTS` succeeds even when the graph is unknown.
    #[test]
    fn test_drop_graph_if_exists() {
        let database = GrafeoDB::new_in_memory();
        let session = database.session();

        session.execute("DROP GRAPH IF EXISTS nosuchgraph").unwrap();
    }

    /// `START TRANSACTION` / `COMMIT` statements open and close a transaction,
    /// and data inserted in between is readable afterwards.
    #[test]
    fn test_start_transaction_via_gql() {
        let database = GrafeoDB::new_in_memory();
        let session = database.session();

        session.execute("START TRANSACTION").unwrap();
        assert!(session.in_transaction());

        session.execute("INSERT (:Person {name: 'Alix'})").unwrap();

        session.execute("COMMIT").unwrap();
        assert!(!session.in_transaction());

        let people = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
        assert_eq!(people.rows.len(), 1);
    }

    /// An insert inside a `READ ONLY` transaction fails with a "read-only" error.
    #[test]
    fn test_start_transaction_read_only_blocks_insert() {
        let database = GrafeoDB::new_in_memory();
        let session = database.session();

        session.execute("START TRANSACTION READ ONLY").unwrap();

        let write_attempt = session.execute("INSERT (:Person {name: 'Alix'})");
        assert!(write_attempt.is_err());
        let err = write_attempt.unwrap_err().to_string();
        assert!(
            err.contains("read-only"),
            "Expected read-only error, got: {err}"
        );

        session.execute("ROLLBACK").unwrap();
    }

    /// Reads inside a `READ ONLY` transaction see previously committed data.
    #[test]
    fn test_start_transaction_read_only_allows_reads() {
        let database = GrafeoDB::new_in_memory();
        let mut session = database.session();

        // Seed one node through a regular read-write transaction.
        session.begin_transaction().unwrap();
        session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
        session.commit().unwrap();

        session.execute("START TRANSACTION READ ONLY").unwrap();
        let people = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
        assert_eq!(people.rows.len(), 1);
        session.execute("COMMIT").unwrap();
    }

    /// A `ROLLBACK` statement discards what the transaction inserted.
    #[test]
    fn test_rollback_via_gql() {
        let database = GrafeoDB::new_in_memory();
        let session = database.session();

        for statement in [
            "START TRANSACTION",
            "INSERT (:Person {name: 'Alix'})",
            "ROLLBACK",
        ] {
            session.execute(statement).unwrap();
        }

        let people = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
        assert!(people.rows.is_empty());
    }

    /// `START TRANSACTION` accepts an `ISOLATION LEVEL` clause and opens a transaction.
    #[test]
    fn test_start_transaction_with_isolation_level() {
        let database = GrafeoDB::new_in_memory();
        let session = database.session();

        session
            .execute("START TRANSACTION ISOLATION LEVEL SERIALIZABLE")
            .unwrap();
        assert!(session.in_transaction());

        session.execute("ROLLBACK").unwrap();
    }

    /// A session command yields a result with no rows and no columns.
    #[test]
    fn test_session_commands_return_empty_result() {
        let database = GrafeoDB::new_in_memory();
        let session = database.session();

        session.execute("CREATE GRAPH test").unwrap();
        let outcome = session.execute("SESSION SET GRAPH test").unwrap();

        assert_eq!(outcome.row_count(), 0);
        assert_eq!(outcome.column_count(), 0);
    }

    /// A fresh session has no current graph.
    #[test]
    fn test_current_graph_default_is_none() {
        let database = GrafeoDB::new_in_memory();
        let session = database.session();

        assert!(session.current_graph().is_none());
    }

    /// A fresh session has no time zone.
    #[test]
    fn test_time_zone_default_is_none() {
        let database = GrafeoDB::new_in_memory();
        let session = database.session();

        assert!(session.time_zone().is_none());
    }

    /// Two sessions on the same database keep separate current-graph settings.
    #[test]
    fn test_session_state_independent_across_sessions() {
        let database = GrafeoDB::new_in_memory();
        let first_session = database.session();
        let second_session = database.session();

        // Both graphs are created through the first session, yet the second
        // session is able to select one of them.
        first_session.execute("CREATE GRAPH first").unwrap();
        first_session.execute("CREATE GRAPH second").unwrap();
        first_session.execute("SESSION SET GRAPH first").unwrap();
        second_session.execute("SESSION SET GRAPH second").unwrap();

        assert_eq!(first_session.current_graph().as_deref(), Some("first"));
        assert_eq!(second_session.current_graph().as_deref(), Some("second"));
    }

    /// `SHOW NODE TYPES` lists a created node type under the expected columns.
    #[test]
    fn test_show_node_types() {
        let database = GrafeoDB::new_in_memory();
        let session = database.session();

        session
            .execute("CREATE NODE TYPE Person (name STRING NOT NULL, age INTEGER)")
            .unwrap();

        let listing = session.execute("SHOW NODE TYPES").unwrap();
        let expected_columns = vec!["name", "properties", "constraints", "parents"];

        assert_eq!(listing.columns, expected_columns);
        assert_eq!(listing.rows.len(), 1);
        assert_eq!(listing.rows[0][0], Value::from("Person"));
    }

    /// `SHOW EDGE TYPES` lists a created edge type under the expected columns.
    #[test]
    fn test_show_edge_types() {
        let database = GrafeoDB::new_in_memory();
        let session = database.session();

        session
            .execute("CREATE EDGE TYPE KNOWS CONNECTING (Person) TO (Person) (since INTEGER)")
            .unwrap();

        let listing = session.execute("SHOW EDGE TYPES").unwrap();
        let expected_columns = vec!["name", "properties", "source_types", "target_types"];

        assert_eq!(listing.columns, expected_columns);
        assert_eq!(listing.rows.len(), 1);
        assert_eq!(listing.rows[0][0], Value::from("KNOWS"));
    }

    /// `SHOW GRAPH TYPES` lists a created graph type under the expected columns.
    #[test]
    fn test_show_graph_types() {
        let database = GrafeoDB::new_in_memory();
        let session = database.session();

        session
            .execute("CREATE NODE TYPE Person (name STRING)")
            .unwrap();
        session
            .execute(
                "CREATE GRAPH TYPE social (\
                NODE TYPE Person (name STRING)\
                )",
            )
            .unwrap();

        let listing = session.execute("SHOW GRAPH TYPES").unwrap();
        let expected_columns = vec!["name", "open", "node_types", "edge_types"];

        assert_eq!(listing.columns, expected_columns);
        assert_eq!(listing.rows.len(), 1);
        assert_eq!(listing.rows[0][0], Value::from("social"));
    }

    /// `SHOW GRAPH TYPE <name>` returns a single row for that graph type.
    #[test]
    fn test_show_graph_type_named() {
        let database = GrafeoDB::new_in_memory();
        let session = database.session();

        session
            .execute("CREATE NODE TYPE Person (name STRING)")
            .unwrap();
        session
            .execute(
                "CREATE GRAPH TYPE social (\
                NODE TYPE Person (name STRING)\
                )",
            )
            .unwrap();

        let described = session.execute("SHOW GRAPH TYPE social").unwrap();

        assert_eq!(described.rows.len(), 1);
        assert_eq!(described.rows[0][0], Value::from("social"));
    }

    /// `SHOW GRAPH TYPE` on an unknown name is an error.
    #[test]
    fn test_show_graph_type_not_found() {
        let database = GrafeoDB::new_in_memory();
        let session = database.session();

        let outcome = session.execute("SHOW GRAPH TYPE nonexistent");
        assert!(outcome.is_err());
    }

    /// `SHOW INDEXES` reports its fixed column set even on an empty database.
    #[test]
    fn test_show_indexes_via_gql() {
        let database = GrafeoDB::new_in_memory();
        let session = database.session();

        let listing = session.execute("SHOW INDEXES").unwrap();
        let expected_columns = vec!["name", "type", "label", "property"];

        assert_eq!(listing.columns, expected_columns);
    }

    /// `SHOW CONSTRAINTS` reports its fixed column set even on an empty database.
    #[test]
    fn test_show_constraints_via_gql() {
        let database = GrafeoDB::new_in_memory();
        let session = database.session();

        let listing = session.execute("SHOW CONSTRAINTS").unwrap();
        let expected_columns = vec!["name", "type", "label", "properties"];

        assert_eq!(listing.columns, expected_columns);
    }

    /// A graph type declared with pattern syntax can be created and shown again.
    #[test]
    fn test_pattern_form_graph_type_roundtrip() {
        let database = GrafeoDB::new_in_memory();
        let session = database.session();

        // Register the node and edge types that the patterns below refer to.
        for ddl in [
            "CREATE NODE TYPE Person (name STRING NOT NULL)",
            "CREATE NODE TYPE City (name STRING)",
            "CREATE EDGE TYPE KNOWS (since INTEGER)",
            "CREATE EDGE TYPE LIVES_IN",
        ] {
            session.execute(ddl).unwrap();
        }

        // The graph type body is written as path patterns instead of type lists.
        session
            .execute(
                "CREATE GRAPH TYPE social (\
                (:Person {name STRING NOT NULL})-[:KNOWS {since INTEGER}]->(:Person),\
                (:Person)-[:LIVES_IN]->(:City)\
                )",
            )
            .unwrap();

        let described = session.execute("SHOW GRAPH TYPE social").unwrap();

        assert_eq!(described.rows.len(), 1);
        assert_eq!(described.rows[0][0], Value::from("social"));
    }
}
5813}