1#[cfg(feature = "rdf")]
8mod rdf;
9
10use std::sync::Arc;
11use std::sync::atomic::{AtomicUsize, Ordering};
12use std::time::{Duration, Instant};
13
14use grafeo_common::types::{EdgeId, EpochId, NodeId, TransactionId, Value};
15use grafeo_common::utils::error::Result;
16use grafeo_common::{grafeo_debug_span, grafeo_info_span, grafeo_warn};
17use grafeo_core::graph::Direction;
18use grafeo_core::graph::lpg::{Edge, LpgStore, Node};
19#[cfg(feature = "rdf")]
20use grafeo_core::graph::rdf::RdfStore;
21use grafeo_core::graph::{GraphStore, GraphStoreMut};
22
23use crate::catalog::{Catalog, CatalogConstraintValidator};
24use crate::config::{AdaptiveConfig, GraphModel};
25use crate::database::QueryResult;
26use crate::query::cache::QueryCache;
27use crate::transaction::TransactionManager;
28
29fn parse_default_literal(text: &str) -> Value {
34 if text.eq_ignore_ascii_case("null") {
35 return Value::Null;
36 }
37 if text.eq_ignore_ascii_case("true") {
38 return Value::Bool(true);
39 }
40 if text.eq_ignore_ascii_case("false") {
41 return Value::Bool(false);
42 }
43 if (text.starts_with('\'') && text.ends_with('\''))
45 || (text.starts_with('"') && text.ends_with('"'))
46 {
47 return Value::String(text[1..text.len() - 1].into());
48 }
49 if let Ok(i) = text.parse::<i64>() {
51 return Value::Int64(i);
52 }
53 if let Ok(f) = text.parse::<f64>() {
54 return Value::Float64(f);
55 }
56 Value::String(text.into())
58}
59
60pub(crate) struct SessionConfig {
65 pub transaction_manager: Arc<TransactionManager>,
66 pub query_cache: Arc<QueryCache>,
67 pub catalog: Arc<Catalog>,
68 pub adaptive_config: AdaptiveConfig,
69 pub factorized_execution: bool,
70 pub graph_model: GraphModel,
71 pub query_timeout: Option<Duration>,
72 pub commit_counter: Arc<AtomicUsize>,
73 pub gc_interval: usize,
74 pub read_only: bool,
76}
77
78pub struct Session {
84 store: Arc<LpgStore>,
86 graph_store: Arc<dyn GraphStore>,
88 graph_store_mut: Option<Arc<dyn GraphStoreMut>>,
90 catalog: Arc<Catalog>,
92 #[cfg(feature = "rdf")]
94 rdf_store: Arc<RdfStore>,
95 transaction_manager: Arc<TransactionManager>,
97 query_cache: Arc<QueryCache>,
99 current_transaction: parking_lot::Mutex<Option<TransactionId>>,
103 read_only_tx: parking_lot::Mutex<bool>,
105 db_read_only: bool,
108 auto_commit: bool,
110 #[allow(dead_code)] adaptive_config: AdaptiveConfig,
113 factorized_execution: bool,
115 graph_model: GraphModel,
117 query_timeout: Option<Duration>,
119 commit_counter: Arc<AtomicUsize>,
121 gc_interval: usize,
123 transaction_start_node_count: AtomicUsize,
125 transaction_start_edge_count: AtomicUsize,
127 #[cfg(feature = "wal")]
129 wal: Option<Arc<grafeo_adapters::storage::wal::LpgWal>>,
130 #[cfg(feature = "wal")]
132 wal_graph_context: Option<Arc<parking_lot::Mutex<Option<String>>>>,
133 #[cfg(feature = "cdc")]
135 cdc_log: Arc<crate::cdc::CdcLog>,
136 current_graph: parking_lot::Mutex<Option<String>>,
138 current_schema: parking_lot::Mutex<Option<String>>,
141 time_zone: parking_lot::Mutex<Option<String>>,
143 session_params:
145 parking_lot::Mutex<std::collections::HashMap<String, grafeo_common::types::Value>>,
146 viewing_epoch_override: parking_lot::Mutex<Option<EpochId>>,
148 savepoints: parking_lot::Mutex<Vec<SavepointState>>,
150 transaction_nesting_depth: parking_lot::Mutex<u32>,
154 touched_graphs: parking_lot::Mutex<Vec<Option<String>>>,
158 #[cfg(feature = "metrics")]
160 pub(crate) metrics: Option<Arc<crate::metrics::MetricsRegistry>>,
161 #[cfg(feature = "metrics")]
163 tx_start_time: parking_lot::Mutex<Option<Instant>>,
164}
165
/// State recorded for one graph as part of a savepoint.
///
/// NOTE(review): the fields look like the values needed to roll a graph back
/// (id counters and an undo-log offset) — confirm against the savepoint code.
#[derive(Clone)]
struct GraphSavepoint {
    /// Graph this entry belongs to; `None` presumably denotes the default graph.
    graph_name: Option<String>,
    /// Next node id recorded for the graph.
    next_node_id: u64,
    /// Next edge id recorded for the graph.
    next_edge_id: u64,
    /// Position in the undo log recorded for the graph.
    undo_log_position: usize,
}
174
175#[derive(Clone)]
177struct SavepointState {
178 name: String,
179 graph_snapshots: Vec<GraphSavepoint>,
180 #[allow(dead_code)]
183 active_graph: Option<String>,
184}
185
186impl Session {
187 #[allow(dead_code)]
189 pub(crate) fn with_adaptive(store: Arc<LpgStore>, cfg: SessionConfig) -> Self {
190 let graph_store = Arc::clone(&store) as Arc<dyn GraphStore>;
191 let graph_store_mut = Some(Arc::clone(&store) as Arc<dyn GraphStoreMut>);
192 Self {
193 store,
194 graph_store,
195 graph_store_mut,
196 catalog: cfg.catalog,
197 #[cfg(feature = "rdf")]
198 rdf_store: Arc::new(RdfStore::new()),
199 transaction_manager: cfg.transaction_manager,
200 query_cache: cfg.query_cache,
201 current_transaction: parking_lot::Mutex::new(None),
202 read_only_tx: parking_lot::Mutex::new(cfg.read_only),
203 db_read_only: cfg.read_only,
204 auto_commit: true,
205 adaptive_config: cfg.adaptive_config,
206 factorized_execution: cfg.factorized_execution,
207 graph_model: cfg.graph_model,
208 query_timeout: cfg.query_timeout,
209 commit_counter: cfg.commit_counter,
210 gc_interval: cfg.gc_interval,
211 transaction_start_node_count: AtomicUsize::new(0),
212 transaction_start_edge_count: AtomicUsize::new(0),
213 #[cfg(feature = "wal")]
214 wal: None,
215 #[cfg(feature = "wal")]
216 wal_graph_context: None,
217 #[cfg(feature = "cdc")]
218 cdc_log: Arc::new(crate::cdc::CdcLog::new()),
219 current_graph: parking_lot::Mutex::new(None),
220 current_schema: parking_lot::Mutex::new(None),
221 time_zone: parking_lot::Mutex::new(None),
222 session_params: parking_lot::Mutex::new(std::collections::HashMap::new()),
223 viewing_epoch_override: parking_lot::Mutex::new(None),
224 savepoints: parking_lot::Mutex::new(Vec::new()),
225 transaction_nesting_depth: parking_lot::Mutex::new(0),
226 touched_graphs: parking_lot::Mutex::new(Vec::new()),
227 #[cfg(feature = "metrics")]
228 metrics: None,
229 #[cfg(feature = "metrics")]
230 tx_start_time: parking_lot::Mutex::new(None),
231 }
232 }
233
/// Attaches a write-ahead log to the session.
///
/// The default read and write handles are replaced by a single
/// `WalGraphStore` wrapping the session's LPG store, and the WAL plus its
/// graph-context cell are remembered so that named-graph handles can be
/// wrapped the same way later.
#[cfg(feature = "wal")]
pub(crate) fn set_wal(
    &mut self,
    wal: Arc<grafeo_adapters::storage::wal::LpgWal>,
    wal_graph_context: Arc<parking_lot::Mutex<Option<String>>>,
) {
    use crate::database::wal_store::WalGraphStore;

    let logging_store = Arc::new(WalGraphStore::new(
        Arc::clone(&self.store),
        Arc::clone(&wal),
        Arc::clone(&wal_graph_context),
    ));

    // One wrapper instance serves both the read and the write side.
    self.graph_store = Arc::clone(&logging_store) as Arc<dyn GraphStore>;
    self.graph_store_mut = Some(logging_store as Arc<dyn GraphStoreMut>);

    self.wal = Some(wal);
    self.wal_graph_context = Some(wal_graph_context);
}
255
/// Replaces the session's change-data-capture log with `cdc_log`.
#[cfg(feature = "cdc")]
pub(crate) fn set_cdc_log(&mut self, cdc_log: Arc<crate::cdc::CdcLog>) {
    // The constructors install a fresh private log; this swaps in the caller's one.
    let _previous = std::mem::replace(&mut self.cdc_log, cdc_log);
}
261
/// Attaches a metrics registry to the session, replacing any earlier one.
#[cfg(feature = "metrics")]
pub(crate) fn set_metrics(&mut self, metrics: Arc<crate::metrics::MetricsRegistry>) {
    let registry = Some(metrics);
    self.metrics = registry;
}
267
268 pub(crate) fn with_external_store(
277 read_store: Arc<dyn GraphStore>,
278 write_store: Option<Arc<dyn GraphStoreMut>>,
279 cfg: SessionConfig,
280 ) -> Result<Self> {
281 Ok(Self {
282 store: Arc::new(LpgStore::new()?),
283 graph_store: read_store,
284 graph_store_mut: write_store,
285 catalog: cfg.catalog,
286 #[cfg(feature = "rdf")]
287 rdf_store: Arc::new(RdfStore::new()),
288 transaction_manager: cfg.transaction_manager,
289 query_cache: cfg.query_cache,
290 current_transaction: parking_lot::Mutex::new(None),
291 read_only_tx: parking_lot::Mutex::new(cfg.read_only),
292 db_read_only: cfg.read_only,
293 auto_commit: true,
294 adaptive_config: cfg.adaptive_config,
295 factorized_execution: cfg.factorized_execution,
296 graph_model: cfg.graph_model,
297 query_timeout: cfg.query_timeout,
298 commit_counter: cfg.commit_counter,
299 gc_interval: cfg.gc_interval,
300 transaction_start_node_count: AtomicUsize::new(0),
301 transaction_start_edge_count: AtomicUsize::new(0),
302 #[cfg(feature = "wal")]
303 wal: None,
304 #[cfg(feature = "wal")]
305 wal_graph_context: None,
306 #[cfg(feature = "cdc")]
307 cdc_log: Arc::new(crate::cdc::CdcLog::new()),
308 current_graph: parking_lot::Mutex::new(None),
309 current_schema: parking_lot::Mutex::new(None),
310 time_zone: parking_lot::Mutex::new(None),
311 session_params: parking_lot::Mutex::new(std::collections::HashMap::new()),
312 viewing_epoch_override: parking_lot::Mutex::new(None),
313 savepoints: parking_lot::Mutex::new(Vec::new()),
314 transaction_nesting_depth: parking_lot::Mutex::new(0),
315 touched_graphs: parking_lot::Mutex::new(Vec::new()),
316 #[cfg(feature = "metrics")]
317 metrics: None,
318 #[cfg(feature = "metrics")]
319 tx_start_time: parking_lot::Mutex::new(None),
320 })
321 }
322
323 #[must_use]
325 pub fn graph_model(&self) -> GraphModel {
326 self.graph_model
327 }
328
329 pub fn use_graph(&self, name: &str) {
333 *self.current_graph.lock() = Some(name.to_string());
334 }
335
336 #[must_use]
338 pub fn current_graph(&self) -> Option<String> {
339 self.current_graph.lock().clone()
340 }
341
342 pub fn set_schema(&self, name: &str) {
346 *self.current_schema.lock() = Some(name.to_string());
347 }
348
349 #[must_use]
353 pub fn current_schema(&self) -> Option<String> {
354 self.current_schema.lock().clone()
355 }
356
357 fn effective_graph_key(&self, graph_name: &str) -> String {
362 let schema = self.current_schema.lock().clone();
363 match schema {
364 Some(s) => format!("{s}/{graph_name}"),
365 None => graph_name.to_string(),
366 }
367 }
368
369 fn effective_type_key(&self, type_name: &str) -> String {
373 let schema = self.current_schema.lock().clone();
374 match schema {
375 Some(s) => format!("{s}/{type_name}"),
376 None => type_name.to_string(),
377 }
378 }
379
380 fn active_graph_storage_key(&self) -> Option<String> {
384 let graph = self.current_graph.lock().clone();
385 let schema = self.current_schema.lock().clone();
386 match (schema, graph) {
387 (_, None) => None,
388 (_, Some(ref name)) if name.eq_ignore_ascii_case("default") => None,
389 (None, Some(name)) => Some(name),
390 (Some(s), Some(g)) => Some(format!("{s}/{g}")),
391 }
392 }
393
394 fn active_store(&self) -> Arc<dyn GraphStore> {
402 let key = self.active_graph_storage_key();
403 match key {
404 None => Arc::clone(&self.graph_store),
405 Some(ref name) => match self.store.graph(name) {
406 Some(named_store) => {
407 #[cfg(feature = "wal")]
408 if let (Some(wal), Some(ctx)) = (&self.wal, &self.wal_graph_context) {
409 return Arc::new(crate::database::wal_store::WalGraphStore::new_for_graph(
410 named_store,
411 Arc::clone(wal),
412 name.clone(),
413 Arc::clone(ctx),
414 )) as Arc<dyn GraphStore>;
415 }
416 named_store as Arc<dyn GraphStore>
417 }
418 None => Arc::clone(&self.graph_store),
419 },
420 }
421 }
422
423 fn active_write_store(&self) -> Option<Arc<dyn GraphStoreMut>> {
428 let key = self.active_graph_storage_key();
429 match key {
430 None => self.graph_store_mut.as_ref().map(Arc::clone),
431 Some(ref name) => match self.store.graph(name) {
432 Some(named_store) => {
433 #[cfg(feature = "wal")]
434 if let (Some(wal), Some(ctx)) = (&self.wal, &self.wal_graph_context) {
435 return Some(Arc::new(
436 crate::database::wal_store::WalGraphStore::new_for_graph(
437 named_store,
438 Arc::clone(wal),
439 name.clone(),
440 Arc::clone(ctx),
441 ),
442 ) as Arc<dyn GraphStoreMut>);
443 }
444 Some(named_store as Arc<dyn GraphStoreMut>)
445 }
446 None => self.graph_store_mut.as_ref().map(Arc::clone),
447 },
448 }
449 }
450
451 fn active_lpg_store(&self) -> Arc<LpgStore> {
456 let key = self.active_graph_storage_key();
457 match key {
458 None => Arc::clone(&self.store),
459 Some(ref name) => self
460 .store
461 .graph(name)
462 .unwrap_or_else(|| Arc::clone(&self.store)),
463 }
464 }
465
466 fn resolve_store(&self, graph_name: &Option<String>) -> Arc<LpgStore> {
469 match graph_name {
470 None => Arc::clone(&self.store),
471 Some(name) if name.eq_ignore_ascii_case("default") => Arc::clone(&self.store),
472 Some(name) => self
473 .store
474 .graph(name)
475 .unwrap_or_else(|| Arc::clone(&self.store)),
476 }
477 }
478
479 fn track_graph_touch(&self) {
484 if self.current_transaction.lock().is_some() {
485 let key = self.active_graph_storage_key();
486 let mut touched = self.touched_graphs.lock();
487 if !touched.contains(&key) {
488 touched.push(key);
489 }
490 }
491 }
492
493 pub fn set_time_zone(&self, tz: &str) {
495 *self.time_zone.lock() = Some(tz.to_string());
496 }
497
498 #[must_use]
500 pub fn time_zone(&self) -> Option<String> {
501 self.time_zone.lock().clone()
502 }
503
504 pub fn set_parameter(&self, key: &str, value: grafeo_common::types::Value) {
506 self.session_params.lock().insert(key.to_string(), value);
507 }
508
509 #[must_use]
511 pub fn get_parameter(&self, key: &str) -> Option<grafeo_common::types::Value> {
512 self.session_params.lock().get(key).cloned()
513 }
514
515 pub fn reset_session(&self) {
517 *self.current_schema.lock() = None;
518 *self.current_graph.lock() = None;
519 *self.time_zone.lock() = None;
520 self.session_params.lock().clear();
521 *self.viewing_epoch_override.lock() = None;
522 }
523
524 pub fn reset_schema(&self) {
526 *self.current_schema.lock() = None;
527 }
528
529 pub fn reset_graph(&self) {
531 *self.current_graph.lock() = None;
532 }
533
534 pub fn reset_time_zone(&self) {
536 *self.time_zone.lock() = None;
537 }
538
539 pub fn reset_parameters(&self) {
541 self.session_params.lock().clear();
542 }
543
544 pub fn set_viewing_epoch(&self, epoch: EpochId) {
552 *self.viewing_epoch_override.lock() = Some(epoch);
553 }
554
555 pub fn clear_viewing_epoch(&self) {
557 *self.viewing_epoch_override.lock() = None;
558 }
559
560 #[must_use]
562 pub fn viewing_epoch(&self) -> Option<EpochId> {
563 *self.viewing_epoch_override.lock()
564 }
565
566 #[must_use]
570 pub fn get_node_history(&self, id: NodeId) -> Vec<(EpochId, Option<EpochId>, Node)> {
571 self.active_lpg_store().get_node_history(id)
572 }
573
574 #[must_use]
578 pub fn get_edge_history(&self, id: EdgeId) -> Vec<(EpochId, Option<EpochId>, Edge)> {
579 self.active_lpg_store().get_edge_history(id)
580 }
581
582 fn require_lpg(&self, language: &str) -> Result<()> {
584 if self.graph_model == GraphModel::Rdf {
585 return Err(grafeo_common::utils::error::Error::Internal(format!(
586 "This is an RDF database. {language} queries require an LPG database."
587 )));
588 }
589 Ok(())
590 }
591
/// Executes a GQL session-level command against this session.
///
/// Covers graph DDL (`CreateGraph`, `DropGraph`), graph / schema / time zone /
/// parameter selection, session reset and close, and transaction control
/// (start, commit, rollback, savepoints).
///
/// Graph names are resolved through `effective_graph_key`, so they are
/// scoped by the currently selected schema.
///
/// `CREATE GRAPH IF NOT EXISTS` on a graph that already exists is a no-op:
/// the existing graph's contents and type binding are left untouched, even
/// when the statement carries `AS COPY OF`, `LIKE` or a graph type.
///
/// # Errors
///
/// * `Error::Transaction(ReadOnly)` for graph DDL while the session is read-only.
/// * `Error::Query` with kind `Semantic` for unknown graphs or schemas,
///   duplicate graphs, a failed graph-type binding, or an invalid
///   `viewing_epoch` value.
/// * `Error::Internal` when the store fails to create or copy a graph.
/// * Whatever the transaction and savepoint helpers return.
#[cfg(feature = "gql")]
fn execute_session_command(
    &self,
    cmd: grafeo_adapters::query::gql::ast::SessionCommand,
) -> Result<QueryResult> {
    use grafeo_adapters::query::gql::ast::{SessionCommand, TransactionIsolationLevel};
    use grafeo_common::utils::error::{Error, QueryError, QueryErrorKind};

    // All user-facing failures in this function are semantic query errors.
    let semantic_error =
        |message: String| Error::Query(QueryError::new(QueryErrorKind::Semantic, message));

    // Graph DDL is refused while the session is read-only; every other
    // session command is still allowed.
    if *self.read_only_tx.lock()
        && matches!(
            &cmd,
            SessionCommand::CreateGraph { .. } | SessionCommand::DropGraph { .. }
        )
    {
        return Err(Error::Transaction(
            grafeo_common::utils::error::TransactionError::ReadOnly,
        ));
    }

    match cmd {
        SessionCommand::CreateGraph {
            name,
            if_not_exists,
            typed,
            like_graph,
            copy_of,
            open: _,
        } => {
            let storage_key = self.effective_graph_key(&name);

            // LIKE and AS COPY OF both name a source graph that must exist
            // before anything is created.
            for src in like_graph.iter().chain(copy_of.iter()) {
                let src_key = self.effective_graph_key(src);
                if self.store.graph(&src_key).is_none() {
                    return Err(semantic_error(format!(
                        "Source graph '{src}' does not exist"
                    )));
                }
            }

            let created = self
                .store
                .create_graph(&storage_key)
                .map_err(|e| Error::Internal(e.to_string()))?;
            if !created {
                if !if_not_exists {
                    return Err(semantic_error(format!("Graph '{name}' already exists")));
                }
                // IF NOT EXISTS on an existing graph: leave it exactly as it
                // is. Continuing would copy the source data into it and
                // re-bind its graph type.
                return Ok(QueryResult::empty());
            }

            #[cfg(feature = "wal")]
            self.log_schema_wal(
                &grafeo_adapters::storage::wal::WalRecord::CreateNamedGraph {
                    name: storage_key.clone(),
                },
            );

            // AS COPY OF: populate the new graph from the source.
            if let Some(ref src) = copy_of {
                let src_key = self.effective_graph_key(src);
                self.store
                    .copy_graph(Some(&src_key), Some(&storage_key))
                    .map_err(|e| Error::Internal(e.to_string()))?;
            }

            // Explicit graph type: a name containing '/' is treated as
            // already schema-qualified, anything else is resolved against
            // the current schema.
            if let Some(type_name) = typed {
                let type_key = if type_name.contains('/') {
                    type_name
                } else {
                    self.effective_type_key(&type_name)
                };
                if let Err(e) = self.catalog.bind_graph_type(&storage_key, type_key) {
                    return Err(semantic_error(e.to_string()));
                }
            }

            // LIKE: inherit the source graph's type binding, best effort.
            if let Some(ref src) = like_graph {
                let src_key = self.effective_graph_key(src);
                if let Some(src_type) = self.catalog.get_graph_type_binding(&src_key) {
                    let _ = self.catalog.bind_graph_type(&storage_key, src_type);
                }
            }

            Ok(QueryResult::empty())
        }
        SessionCommand::DropGraph { name, if_exists } => {
            let storage_key = self.effective_graph_key(&name);
            if self.store.drop_graph(&storage_key) {
                #[cfg(feature = "wal")]
                self.log_schema_wal(
                    &grafeo_adapters::storage::wal::WalRecord::DropNamedGraph {
                        name: storage_key.clone(),
                    },
                );
                // Do not leave the session pointing at the graph that was
                // just removed.
                let mut current = self.current_graph.lock();
                if current
                    .as_deref()
                    .is_some_and(|g| g.eq_ignore_ascii_case(&name))
                {
                    *current = None;
                }
            } else if !if_exists {
                return Err(semantic_error(format!("Graph '{name}' does not exist")));
            }
            Ok(QueryResult::empty())
        }
        // `USE GRAPH` and `SESSION SET GRAPH` behave identically.
        SessionCommand::UseGraph(name) | SessionCommand::SessionSetGraph(name) => {
            let effective_key = self.effective_graph_key(&name);
            // `default` always exists; any other graph must be present.
            if !name.eq_ignore_ascii_case("default")
                && self.store.graph(&effective_key).is_none()
            {
                return Err(semantic_error(format!("Graph '{name}' does not exist")));
            }
            self.use_graph(&name);
            self.track_graph_touch();
            Ok(QueryResult::empty())
        }
        SessionCommand::SessionSetSchema(name) => {
            if !self.catalog.schema_exists(&name) {
                return Err(semantic_error(format!("Schema '{name}' does not exist")));
            }
            self.set_schema(&name);
            Ok(QueryResult::empty())
        }
        SessionCommand::SessionSetTimeZone(tz) => {
            self.set_time_zone(&tz);
            Ok(QueryResult::empty())
        }
        SessionCommand::SessionSetParameter(key, expr) => {
            if key.eq_ignore_ascii_case("viewing_epoch") {
                match Self::eval_integer_literal(&expr) {
                    // The guard rules out negative values before the cast.
                    Some(n) if n >= 0 => {
                        self.set_viewing_epoch(EpochId::new(n as u64));
                        Ok(QueryResult::status(format!("Set viewing_epoch to {n}")))
                    }
                    _ => Err(semantic_error(
                        "viewing_epoch must be a non-negative integer literal".to_string(),
                    )),
                }
            } else {
                // NOTE(review): `expr` is not evaluated for ordinary
                // parameters; the parameter is recorded with a NULL value.
                self.set_parameter(&key, Value::Null);
                Ok(QueryResult::empty())
            }
        }
        SessionCommand::SessionReset(target) => {
            use grafeo_adapters::query::gql::ast::SessionResetTarget;
            match target {
                SessionResetTarget::All => self.reset_session(),
                SessionResetTarget::Schema => self.reset_schema(),
                SessionResetTarget::Graph => self.reset_graph(),
                SessionResetTarget::TimeZone => self.reset_time_zone(),
                SessionResetTarget::Parameters => self.reset_parameters(),
            }
            Ok(QueryResult::empty())
        }
        SessionCommand::SessionClose => {
            self.reset_session();
            Ok(QueryResult::empty())
        }
        SessionCommand::StartTransaction {
            read_only,
            isolation_level,
        } => {
            // Translate the AST isolation level into the engine's enum.
            let engine_level = isolation_level.map(|level| match level {
                TransactionIsolationLevel::ReadCommitted => {
                    crate::transaction::IsolationLevel::ReadCommitted
                }
                TransactionIsolationLevel::SnapshotIsolation => {
                    crate::transaction::IsolationLevel::SnapshotIsolation
                }
                TransactionIsolationLevel::Serializable => {
                    crate::transaction::IsolationLevel::Serializable
                }
            });
            self.begin_transaction_inner(read_only, engine_level)?;
            Ok(QueryResult::status("Transaction started"))
        }
        SessionCommand::Commit => {
            self.commit_inner()?;
            Ok(QueryResult::status("Transaction committed"))
        }
        SessionCommand::Rollback => {
            self.rollback_inner()?;
            Ok(QueryResult::status("Transaction rolled back"))
        }
        SessionCommand::Savepoint(name) => {
            self.savepoint(&name)?;
            Ok(QueryResult::status(format!("Savepoint '{name}' created")))
        }
        SessionCommand::RollbackToSavepoint(name) => {
            self.rollback_to_savepoint(&name)?;
            Ok(QueryResult::status(format!(
                "Rolled back to savepoint '{name}'"
            )))
        }
        SessionCommand::ReleaseSavepoint(name) => {
            self.release_savepoint(&name)?;
            Ok(QueryResult::status(format!("Savepoint '{name}' released")))
        }
    }
}
851
/// Appends a schema-change record to the WAL, if one is attached.
///
/// Logging is best effort: a failed write is reported through
/// `grafeo_warn!` and otherwise ignored.
#[cfg(feature = "wal")]
fn log_schema_wal(&self, record: &grafeo_adapters::storage::wal::WalRecord) {
    let Some(wal) = self.wal.as_ref() else {
        return;
    };
    if let Err(e) = wal.log(record) {
        grafeo_warn!("Failed to log schema change to WAL: {}", e);
    }
}
861
862 #[cfg(feature = "gql")]
864 fn execute_schema_command(
865 &self,
866 cmd: grafeo_adapters::query::gql::ast::SchemaStatement,
867 ) -> Result<QueryResult> {
868 use crate::catalog::{
869 EdgeTypeDefinition, NodeTypeDefinition, PropertyDataType, TypedProperty,
870 };
871 use grafeo_adapters::query::gql::ast::SchemaStatement;
872 #[cfg(feature = "wal")]
873 use grafeo_adapters::storage::wal::WalRecord;
874 use grafeo_common::utils::error::{Error, QueryError, QueryErrorKind};
875
876 macro_rules! wal_log {
878 ($self:expr, $record:expr) => {
879 #[cfg(feature = "wal")]
880 $self.log_schema_wal(&$record);
881 };
882 }
883
884 let result = match cmd {
885 SchemaStatement::CreateNodeType(stmt) => {
886 let effective_name = self.effective_type_key(&stmt.name);
887 #[cfg(feature = "wal")]
888 let props_for_wal: Vec<(String, String, bool)> = stmt
889 .properties
890 .iter()
891 .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
892 .collect();
893 let def = NodeTypeDefinition {
894 name: effective_name.clone(),
895 properties: stmt
896 .properties
897 .iter()
898 .map(|p| TypedProperty {
899 name: p.name.clone(),
900 data_type: PropertyDataType::from_type_name(&p.data_type),
901 nullable: p.nullable,
902 default_value: p
903 .default_value
904 .as_ref()
905 .map(|s| parse_default_literal(s)),
906 })
907 .collect(),
908 constraints: Vec::new(),
909 parent_types: stmt.parent_types.clone(),
910 };
911 let result = if stmt.or_replace {
912 let _ = self.catalog.drop_node_type(&effective_name);
913 self.catalog.register_node_type(def)
914 } else {
915 self.catalog.register_node_type(def)
916 };
917 match result {
918 Ok(()) => {
919 wal_log!(
920 self,
921 WalRecord::CreateNodeType {
922 name: effective_name.clone(),
923 properties: props_for_wal,
924 constraints: Vec::new(),
925 }
926 );
927 Ok(QueryResult::status(format!(
928 "Created node type '{}'",
929 stmt.name
930 )))
931 }
932 Err(e) if stmt.if_not_exists => {
933 let _ = e;
934 Ok(QueryResult::status("No change"))
935 }
936 Err(e) => Err(Error::Query(QueryError::new(
937 QueryErrorKind::Semantic,
938 e.to_string(),
939 ))),
940 }
941 }
942 SchemaStatement::CreateEdgeType(stmt) => {
943 let effective_name = self.effective_type_key(&stmt.name);
944 #[cfg(feature = "wal")]
945 let props_for_wal: Vec<(String, String, bool)> = stmt
946 .properties
947 .iter()
948 .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
949 .collect();
950 let def = EdgeTypeDefinition {
951 name: effective_name.clone(),
952 properties: stmt
953 .properties
954 .iter()
955 .map(|p| TypedProperty {
956 name: p.name.clone(),
957 data_type: PropertyDataType::from_type_name(&p.data_type),
958 nullable: p.nullable,
959 default_value: p
960 .default_value
961 .as_ref()
962 .map(|s| parse_default_literal(s)),
963 })
964 .collect(),
965 constraints: Vec::new(),
966 source_node_types: stmt.source_node_types.clone(),
967 target_node_types: stmt.target_node_types.clone(),
968 };
969 let result = if stmt.or_replace {
970 let _ = self.catalog.drop_edge_type_def(&effective_name);
971 self.catalog.register_edge_type_def(def)
972 } else {
973 self.catalog.register_edge_type_def(def)
974 };
975 match result {
976 Ok(()) => {
977 wal_log!(
978 self,
979 WalRecord::CreateEdgeType {
980 name: effective_name.clone(),
981 properties: props_for_wal,
982 constraints: Vec::new(),
983 }
984 );
985 Ok(QueryResult::status(format!(
986 "Created edge type '{}'",
987 stmt.name
988 )))
989 }
990 Err(e) if stmt.if_not_exists => {
991 let _ = e;
992 Ok(QueryResult::status("No change"))
993 }
994 Err(e) => Err(Error::Query(QueryError::new(
995 QueryErrorKind::Semantic,
996 e.to_string(),
997 ))),
998 }
999 }
1000 SchemaStatement::CreateVectorIndex(stmt) => {
1001 Self::create_vector_index_on_store(
1002 &self.active_lpg_store(),
1003 &stmt.node_label,
1004 &stmt.property,
1005 stmt.dimensions,
1006 stmt.metric.as_deref(),
1007 )?;
1008 wal_log!(
1009 self,
1010 WalRecord::CreateIndex {
1011 name: stmt.name.clone(),
1012 label: stmt.node_label.clone(),
1013 property: stmt.property.clone(),
1014 index_type: "vector".to_string(),
1015 }
1016 );
1017 Ok(QueryResult::status(format!(
1018 "Created vector index '{}'",
1019 stmt.name
1020 )))
1021 }
1022 SchemaStatement::DropNodeType { name, if_exists } => {
1023 let effective_name = self.effective_type_key(&name);
1024 match self.catalog.drop_node_type(&effective_name) {
1025 Ok(()) => {
1026 wal_log!(
1027 self,
1028 WalRecord::DropNodeType {
1029 name: effective_name
1030 }
1031 );
1032 Ok(QueryResult::status(format!("Dropped node type '{name}'")))
1033 }
1034 Err(e) if if_exists => {
1035 let _ = e;
1036 Ok(QueryResult::status("No change"))
1037 }
1038 Err(e) => Err(Error::Query(QueryError::new(
1039 QueryErrorKind::Semantic,
1040 e.to_string(),
1041 ))),
1042 }
1043 }
1044 SchemaStatement::DropEdgeType { name, if_exists } => {
1045 let effective_name = self.effective_type_key(&name);
1046 match self.catalog.drop_edge_type_def(&effective_name) {
1047 Ok(()) => {
1048 wal_log!(
1049 self,
1050 WalRecord::DropEdgeType {
1051 name: effective_name
1052 }
1053 );
1054 Ok(QueryResult::status(format!("Dropped edge type '{name}'")))
1055 }
1056 Err(e) if if_exists => {
1057 let _ = e;
1058 Ok(QueryResult::status("No change"))
1059 }
1060 Err(e) => Err(Error::Query(QueryError::new(
1061 QueryErrorKind::Semantic,
1062 e.to_string(),
1063 ))),
1064 }
1065 }
1066 SchemaStatement::CreateIndex(stmt) => {
1067 use crate::catalog::IndexType as CatalogIndexType;
1068 use grafeo_adapters::query::gql::ast::IndexKind;
1069 let active = self.active_lpg_store();
1070 let index_type_str = match stmt.index_kind {
1071 IndexKind::Property => "property",
1072 IndexKind::BTree => "btree",
1073 IndexKind::Text => "text",
1074 IndexKind::Vector => "vector",
1075 };
1076 match stmt.index_kind {
1077 IndexKind::Property | IndexKind::BTree => {
1078 for prop in &stmt.properties {
1079 active.create_property_index(prop);
1080 }
1081 }
1082 IndexKind::Text => {
1083 for prop in &stmt.properties {
1084 Self::create_text_index_on_store(&active, &stmt.label, prop)?;
1085 }
1086 }
1087 IndexKind::Vector => {
1088 for prop in &stmt.properties {
1089 Self::create_vector_index_on_store(
1090 &active,
1091 &stmt.label,
1092 prop,
1093 stmt.options.dimensions,
1094 stmt.options.metric.as_deref(),
1095 )?;
1096 }
1097 }
1098 }
1099 let catalog_index_type = match stmt.index_kind {
1102 IndexKind::Property => CatalogIndexType::Hash,
1103 IndexKind::BTree => CatalogIndexType::BTree,
1104 IndexKind::Text => CatalogIndexType::FullText,
1105 IndexKind::Vector => CatalogIndexType::Hash,
1106 };
1107 let label_id = self.catalog.get_or_create_label(&stmt.label);
1108 for prop in &stmt.properties {
1109 let prop_id = self.catalog.get_or_create_property_key(prop);
1110 self.catalog
1111 .create_index(&stmt.name, label_id, prop_id, catalog_index_type);
1112 }
1113 #[cfg(feature = "wal")]
1114 for prop in &stmt.properties {
1115 wal_log!(
1116 self,
1117 WalRecord::CreateIndex {
1118 name: stmt.name.clone(),
1119 label: stmt.label.clone(),
1120 property: prop.clone(),
1121 index_type: index_type_str.to_string(),
1122 }
1123 );
1124 }
1125 Ok(QueryResult::status(format!(
1126 "Created {} index '{}'",
1127 index_type_str, stmt.name
1128 )))
1129 }
1130 SchemaStatement::DropIndex { name, if_exists } => {
1131 if let Some(index_id) = self.catalog.find_index_by_name(&name) {
1134 let def = self.catalog.get_index(index_id);
1135 self.catalog.drop_index(index_id);
1136 if let Some(def) = def
1137 && let Some(prop_name) =
1138 self.catalog.get_property_key_name(def.property_key)
1139 {
1140 self.active_lpg_store().drop_property_index(&prop_name);
1141 }
1142 wal_log!(self, WalRecord::DropIndex { name: name.clone() });
1143 Ok(QueryResult::status(format!("Dropped index '{name}'")))
1144 } else if if_exists {
1145 Ok(QueryResult::status("No change".to_string()))
1146 } else {
1147 Err(Error::Query(QueryError::new(
1148 QueryErrorKind::Semantic,
1149 format!("Index '{name}' does not exist"),
1150 )))
1151 }
1152 }
1153 SchemaStatement::CreateConstraint(stmt) => {
1154 use crate::catalog::TypeConstraint;
1155 use grafeo_adapters::query::gql::ast::ConstraintKind;
1156 let kind_str = match stmt.constraint_kind {
1157 ConstraintKind::Unique => "unique",
1158 ConstraintKind::NodeKey => "node_key",
1159 ConstraintKind::NotNull => "not_null",
1160 ConstraintKind::Exists => "exists",
1161 };
1162 let constraint_name = stmt
1163 .name
1164 .clone()
1165 .unwrap_or_else(|| format!("{}_{kind_str}", stmt.label));
1166
1167 match stmt.constraint_kind {
1169 ConstraintKind::Unique => {
1170 for prop in &stmt.properties {
1171 let label_id = self.catalog.get_or_create_label(&stmt.label);
1172 let prop_id = self.catalog.get_or_create_property_key(prop);
1173 let _ = self.catalog.add_unique_constraint(label_id, prop_id);
1174 }
1175 let _ = self.catalog.add_constraint_to_type(
1176 &stmt.label,
1177 TypeConstraint::Unique(stmt.properties.clone()),
1178 );
1179 }
1180 ConstraintKind::NodeKey => {
1181 for prop in &stmt.properties {
1182 let label_id = self.catalog.get_or_create_label(&stmt.label);
1183 let prop_id = self.catalog.get_or_create_property_key(prop);
1184 let _ = self.catalog.add_unique_constraint(label_id, prop_id);
1185 let _ = self.catalog.add_required_property(label_id, prop_id);
1186 }
1187 let _ = self.catalog.add_constraint_to_type(
1188 &stmt.label,
1189 TypeConstraint::PrimaryKey(stmt.properties.clone()),
1190 );
1191 }
1192 ConstraintKind::NotNull | ConstraintKind::Exists => {
1193 for prop in &stmt.properties {
1194 let label_id = self.catalog.get_or_create_label(&stmt.label);
1195 let prop_id = self.catalog.get_or_create_property_key(prop);
1196 let _ = self.catalog.add_required_property(label_id, prop_id);
1197 let _ = self.catalog.add_constraint_to_type(
1198 &stmt.label,
1199 TypeConstraint::NotNull(prop.clone()),
1200 );
1201 }
1202 }
1203 }
1204
1205 wal_log!(
1206 self,
1207 WalRecord::CreateConstraint {
1208 name: constraint_name.clone(),
1209 label: stmt.label.clone(),
1210 properties: stmt.properties.clone(),
1211 kind: kind_str.to_string(),
1212 }
1213 );
1214 Ok(QueryResult::status(format!(
1215 "Created {kind_str} constraint '{constraint_name}'"
1216 )))
1217 }
1218 SchemaStatement::DropConstraint { name, if_exists } => {
1219 let _ = if_exists;
1220 wal_log!(self, WalRecord::DropConstraint { name: name.clone() });
1221 Ok(QueryResult::status(format!("Dropped constraint '{name}'")))
1222 }
1223 SchemaStatement::CreateGraphType(stmt) => {
1224 use crate::catalog::GraphTypeDefinition;
1225 use grafeo_adapters::query::gql::ast::InlineElementType;
1226
1227 let effective_name = self.effective_type_key(&stmt.name);
1228
1229 let (mut node_types, mut edge_types, open) =
1231 if let Some(ref like_graph) = stmt.like_graph {
1232 if let Some(type_name) = self.catalog.get_graph_type_binding(like_graph) {
1234 if let Some(existing) = self
1235 .catalog
1236 .schema()
1237 .and_then(|s| s.get_graph_type(&type_name))
1238 {
1239 (
1240 existing.allowed_node_types.clone(),
1241 existing.allowed_edge_types.clone(),
1242 existing.open,
1243 )
1244 } else {
1245 (Vec::new(), Vec::new(), true)
1246 }
1247 } else {
1248 let nt = self.catalog.all_node_type_names();
1250 let et = self.catalog.all_edge_type_names();
1251 if nt.is_empty() && et.is_empty() {
1252 (Vec::new(), Vec::new(), true)
1253 } else {
1254 (nt, et, false)
1255 }
1256 }
1257 } else {
1258 let nt = stmt
1260 .node_types
1261 .iter()
1262 .map(|n| self.effective_type_key(n))
1263 .collect();
1264 let et = stmt
1265 .edge_types
1266 .iter()
1267 .map(|n| self.effective_type_key(n))
1268 .collect();
1269 (nt, et, stmt.open)
1270 };
1271
1272 for inline in &stmt.inline_types {
1274 match inline {
1275 InlineElementType::Node {
1276 name,
1277 properties,
1278 key_labels,
1279 ..
1280 } => {
1281 let inline_effective = self.effective_type_key(name);
1282 let def = NodeTypeDefinition {
1283 name: inline_effective.clone(),
1284 properties: properties
1285 .iter()
1286 .map(|p| TypedProperty {
1287 name: p.name.clone(),
1288 data_type: PropertyDataType::from_type_name(&p.data_type),
1289 nullable: p.nullable,
1290 default_value: None,
1291 })
1292 .collect(),
1293 constraints: Vec::new(),
1294 parent_types: key_labels.clone(),
1295 };
1296 self.catalog.register_or_replace_node_type(def);
1298 #[cfg(feature = "wal")]
1299 {
1300 let props_for_wal: Vec<(String, String, bool)> = properties
1301 .iter()
1302 .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
1303 .collect();
1304 self.log_schema_wal(&WalRecord::CreateNodeType {
1305 name: inline_effective.clone(),
1306 properties: props_for_wal,
1307 constraints: Vec::new(),
1308 });
1309 }
1310 if !node_types.contains(&inline_effective) {
1311 node_types.push(inline_effective);
1312 }
1313 }
1314 InlineElementType::Edge {
1315 name,
1316 properties,
1317 source_node_types,
1318 target_node_types,
1319 ..
1320 } => {
1321 let inline_effective = self.effective_type_key(name);
1322 let def = EdgeTypeDefinition {
1323 name: inline_effective.clone(),
1324 properties: properties
1325 .iter()
1326 .map(|p| TypedProperty {
1327 name: p.name.clone(),
1328 data_type: PropertyDataType::from_type_name(&p.data_type),
1329 nullable: p.nullable,
1330 default_value: None,
1331 })
1332 .collect(),
1333 constraints: Vec::new(),
1334 source_node_types: source_node_types.clone(),
1335 target_node_types: target_node_types.clone(),
1336 };
1337 self.catalog.register_or_replace_edge_type_def(def);
1338 #[cfg(feature = "wal")]
1339 {
1340 let props_for_wal: Vec<(String, String, bool)> = properties
1341 .iter()
1342 .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
1343 .collect();
1344 self.log_schema_wal(&WalRecord::CreateEdgeType {
1345 name: inline_effective.clone(),
1346 properties: props_for_wal,
1347 constraints: Vec::new(),
1348 });
1349 }
1350 if !edge_types.contains(&inline_effective) {
1351 edge_types.push(inline_effective);
1352 }
1353 }
1354 }
1355 }
1356
1357 let def = GraphTypeDefinition {
1358 name: effective_name.clone(),
1359 allowed_node_types: node_types.clone(),
1360 allowed_edge_types: edge_types.clone(),
1361 open,
1362 };
1363 let result = if stmt.or_replace {
1364 let _ = self.catalog.drop_graph_type(&effective_name);
1366 self.catalog.register_graph_type(def)
1367 } else {
1368 self.catalog.register_graph_type(def)
1369 };
1370 match result {
1371 Ok(()) => {
1372 wal_log!(
1373 self,
1374 WalRecord::CreateGraphType {
1375 name: effective_name.clone(),
1376 node_types,
1377 edge_types,
1378 open,
1379 }
1380 );
1381 Ok(QueryResult::status(format!(
1382 "Created graph type '{}'",
1383 stmt.name
1384 )))
1385 }
1386 Err(e) if stmt.if_not_exists => {
1387 let _ = e;
1388 Ok(QueryResult::status("No change"))
1389 }
1390 Err(e) => Err(Error::Query(QueryError::new(
1391 QueryErrorKind::Semantic,
1392 e.to_string(),
1393 ))),
1394 }
1395 }
1396 SchemaStatement::DropGraphType { name, if_exists } => {
1397 let effective_name = self.effective_type_key(&name);
1398 match self.catalog.drop_graph_type(&effective_name) {
1399 Ok(()) => {
1400 wal_log!(
1401 self,
1402 WalRecord::DropGraphType {
1403 name: effective_name
1404 }
1405 );
1406 Ok(QueryResult::status(format!("Dropped graph type '{name}'")))
1407 }
1408 Err(e) if if_exists => {
1409 let _ = e;
1410 Ok(QueryResult::status("No change"))
1411 }
1412 Err(e) => Err(Error::Query(QueryError::new(
1413 QueryErrorKind::Semantic,
1414 e.to_string(),
1415 ))),
1416 }
1417 }
1418 SchemaStatement::CreateSchema {
1419 name,
1420 if_not_exists,
1421 } => match self.catalog.register_schema_namespace(name.clone()) {
1422 Ok(()) => {
1423 wal_log!(self, WalRecord::CreateSchema { name: name.clone() });
1424 Ok(QueryResult::status(format!("Created schema '{name}'")))
1425 }
1426 Err(e) if if_not_exists => {
1427 let _ = e;
1428 Ok(QueryResult::status("No change"))
1429 }
1430 Err(e) => Err(Error::Query(QueryError::new(
1431 QueryErrorKind::Semantic,
1432 e.to_string(),
1433 ))),
1434 },
1435 SchemaStatement::DropSchema { name, if_exists } => {
1436 let prefix = format!("{name}/");
1438 let has_graphs = self
1439 .store
1440 .graph_names()
1441 .iter()
1442 .any(|g| g.starts_with(&prefix));
1443 let has_types = self
1444 .catalog
1445 .all_node_type_names()
1446 .iter()
1447 .any(|n| n.starts_with(&prefix))
1448 || self
1449 .catalog
1450 .all_edge_type_names()
1451 .iter()
1452 .any(|n| n.starts_with(&prefix))
1453 || self
1454 .catalog
1455 .all_graph_type_names()
1456 .iter()
1457 .any(|n| n.starts_with(&prefix));
1458 if has_graphs || has_types {
1459 return Err(Error::Query(QueryError::new(
1460 QueryErrorKind::Semantic,
1461 format!("Schema '{name}' is not empty: drop all graphs and types first"),
1462 )));
1463 }
1464 match self.catalog.drop_schema_namespace(&name) {
1465 Ok(()) => {
1466 wal_log!(self, WalRecord::DropSchema { name: name.clone() });
1467 let mut current = self.current_schema.lock();
1469 if current
1470 .as_deref()
1471 .is_some_and(|s| s.eq_ignore_ascii_case(&name))
1472 {
1473 *current = None;
1474 }
1475 Ok(QueryResult::status(format!("Dropped schema '{name}'")))
1476 }
1477 Err(e) if if_exists => {
1478 let _ = e;
1479 Ok(QueryResult::status("No change"))
1480 }
1481 Err(e) => Err(Error::Query(QueryError::new(
1482 QueryErrorKind::Semantic,
1483 e.to_string(),
1484 ))),
1485 }
1486 }
1487 SchemaStatement::AlterNodeType(stmt) => {
1488 use grafeo_adapters::query::gql::ast::TypeAlteration;
1489 let effective_name = self.effective_type_key(&stmt.name);
1490 let mut wal_alts = Vec::new();
1491 for alt in &stmt.alterations {
1492 match alt {
1493 TypeAlteration::AddProperty(prop) => {
1494 let typed = TypedProperty {
1495 name: prop.name.clone(),
1496 data_type: PropertyDataType::from_type_name(&prop.data_type),
1497 nullable: prop.nullable,
1498 default_value: prop
1499 .default_value
1500 .as_ref()
1501 .map(|s| parse_default_literal(s)),
1502 };
1503 self.catalog
1504 .alter_node_type_add_property(&effective_name, typed)
1505 .map_err(|e| {
1506 Error::Query(QueryError::new(
1507 QueryErrorKind::Semantic,
1508 e.to_string(),
1509 ))
1510 })?;
1511 wal_alts.push((
1512 "add".to_string(),
1513 prop.name.clone(),
1514 prop.data_type.clone(),
1515 prop.nullable,
1516 ));
1517 }
1518 TypeAlteration::DropProperty(name) => {
1519 self.catalog
1520 .alter_node_type_drop_property(&effective_name, name)
1521 .map_err(|e| {
1522 Error::Query(QueryError::new(
1523 QueryErrorKind::Semantic,
1524 e.to_string(),
1525 ))
1526 })?;
1527 wal_alts.push(("drop".to_string(), name.clone(), String::new(), false));
1528 }
1529 }
1530 }
1531 wal_log!(
1532 self,
1533 WalRecord::AlterNodeType {
1534 name: effective_name,
1535 alterations: wal_alts,
1536 }
1537 );
1538 Ok(QueryResult::status(format!(
1539 "Altered node type '{}'",
1540 stmt.name
1541 )))
1542 }
1543 SchemaStatement::AlterEdgeType(stmt) => {
1544 use grafeo_adapters::query::gql::ast::TypeAlteration;
1545 let effective_name = self.effective_type_key(&stmt.name);
1546 let mut wal_alts = Vec::new();
1547 for alt in &stmt.alterations {
1548 match alt {
1549 TypeAlteration::AddProperty(prop) => {
1550 let typed = TypedProperty {
1551 name: prop.name.clone(),
1552 data_type: PropertyDataType::from_type_name(&prop.data_type),
1553 nullable: prop.nullable,
1554 default_value: prop
1555 .default_value
1556 .as_ref()
1557 .map(|s| parse_default_literal(s)),
1558 };
1559 self.catalog
1560 .alter_edge_type_add_property(&effective_name, typed)
1561 .map_err(|e| {
1562 Error::Query(QueryError::new(
1563 QueryErrorKind::Semantic,
1564 e.to_string(),
1565 ))
1566 })?;
1567 wal_alts.push((
1568 "add".to_string(),
1569 prop.name.clone(),
1570 prop.data_type.clone(),
1571 prop.nullable,
1572 ));
1573 }
1574 TypeAlteration::DropProperty(name) => {
1575 self.catalog
1576 .alter_edge_type_drop_property(&effective_name, name)
1577 .map_err(|e| {
1578 Error::Query(QueryError::new(
1579 QueryErrorKind::Semantic,
1580 e.to_string(),
1581 ))
1582 })?;
1583 wal_alts.push(("drop".to_string(), name.clone(), String::new(), false));
1584 }
1585 }
1586 }
1587 wal_log!(
1588 self,
1589 WalRecord::AlterEdgeType {
1590 name: effective_name,
1591 alterations: wal_alts,
1592 }
1593 );
1594 Ok(QueryResult::status(format!(
1595 "Altered edge type '{}'",
1596 stmt.name
1597 )))
1598 }
1599 SchemaStatement::AlterGraphType(stmt) => {
1600 use grafeo_adapters::query::gql::ast::GraphTypeAlteration;
1601 let effective_name = self.effective_type_key(&stmt.name);
1602 let mut wal_alts = Vec::new();
1603 for alt in &stmt.alterations {
1604 match alt {
1605 GraphTypeAlteration::AddNodeType(name) => {
1606 self.catalog
1607 .alter_graph_type_add_node_type(&effective_name, name.clone())
1608 .map_err(|e| {
1609 Error::Query(QueryError::new(
1610 QueryErrorKind::Semantic,
1611 e.to_string(),
1612 ))
1613 })?;
1614 wal_alts.push(("add_node_type".to_string(), name.clone()));
1615 }
1616 GraphTypeAlteration::DropNodeType(name) => {
1617 self.catalog
1618 .alter_graph_type_drop_node_type(&effective_name, name)
1619 .map_err(|e| {
1620 Error::Query(QueryError::new(
1621 QueryErrorKind::Semantic,
1622 e.to_string(),
1623 ))
1624 })?;
1625 wal_alts.push(("drop_node_type".to_string(), name.clone()));
1626 }
1627 GraphTypeAlteration::AddEdgeType(name) => {
1628 self.catalog
1629 .alter_graph_type_add_edge_type(&effective_name, name.clone())
1630 .map_err(|e| {
1631 Error::Query(QueryError::new(
1632 QueryErrorKind::Semantic,
1633 e.to_string(),
1634 ))
1635 })?;
1636 wal_alts.push(("add_edge_type".to_string(), name.clone()));
1637 }
1638 GraphTypeAlteration::DropEdgeType(name) => {
1639 self.catalog
1640 .alter_graph_type_drop_edge_type(&effective_name, name)
1641 .map_err(|e| {
1642 Error::Query(QueryError::new(
1643 QueryErrorKind::Semantic,
1644 e.to_string(),
1645 ))
1646 })?;
1647 wal_alts.push(("drop_edge_type".to_string(), name.clone()));
1648 }
1649 }
1650 }
1651 wal_log!(
1652 self,
1653 WalRecord::AlterGraphType {
1654 name: effective_name,
1655 alterations: wal_alts,
1656 }
1657 );
1658 Ok(QueryResult::status(format!(
1659 "Altered graph type '{}'",
1660 stmt.name
1661 )))
1662 }
1663 SchemaStatement::CreateProcedure(stmt) => {
1664 use crate::catalog::ProcedureDefinition;
1665
1666 let def = ProcedureDefinition {
1667 name: stmt.name.clone(),
1668 params: stmt
1669 .params
1670 .iter()
1671 .map(|p| (p.name.clone(), p.param_type.clone()))
1672 .collect(),
1673 returns: stmt
1674 .returns
1675 .iter()
1676 .map(|r| (r.name.clone(), r.return_type.clone()))
1677 .collect(),
1678 body: stmt.body.clone(),
1679 };
1680
1681 if stmt.or_replace {
1682 self.catalog.replace_procedure(def).map_err(|e| {
1683 Error::Query(QueryError::new(QueryErrorKind::Semantic, e.to_string()))
1684 })?;
1685 } else {
1686 match self.catalog.register_procedure(def) {
1687 Ok(()) => {}
1688 Err(_) if stmt.if_not_exists => {
1689 return Ok(QueryResult::empty());
1690 }
1691 Err(e) => {
1692 return Err(Error::Query(QueryError::new(
1693 QueryErrorKind::Semantic,
1694 e.to_string(),
1695 )));
1696 }
1697 }
1698 }
1699
1700 wal_log!(
1701 self,
1702 WalRecord::CreateProcedure {
1703 name: stmt.name.clone(),
1704 params: stmt
1705 .params
1706 .iter()
1707 .map(|p| (p.name.clone(), p.param_type.clone()))
1708 .collect(),
1709 returns: stmt
1710 .returns
1711 .iter()
1712 .map(|r| (r.name.clone(), r.return_type.clone()))
1713 .collect(),
1714 body: stmt.body,
1715 }
1716 );
1717 Ok(QueryResult::status(format!(
1718 "Created procedure '{}'",
1719 stmt.name
1720 )))
1721 }
1722 SchemaStatement::DropProcedure { name, if_exists } => {
1723 match self.catalog.drop_procedure(&name) {
1724 Ok(()) => {}
1725 Err(_) if if_exists => {
1726 return Ok(QueryResult::empty());
1727 }
1728 Err(e) => {
1729 return Err(Error::Query(QueryError::new(
1730 QueryErrorKind::Semantic,
1731 e.to_string(),
1732 )));
1733 }
1734 }
1735 wal_log!(self, WalRecord::DropProcedure { name: name.clone() });
1736 Ok(QueryResult::status(format!("Dropped procedure '{name}'")))
1737 }
1738 SchemaStatement::ShowIndexes => {
1739 return self.execute_show_indexes();
1740 }
1741 SchemaStatement::ShowConstraints => {
1742 return self.execute_show_constraints();
1743 }
1744 SchemaStatement::ShowNodeTypes => {
1745 return self.execute_show_node_types();
1746 }
1747 SchemaStatement::ShowEdgeTypes => {
1748 return self.execute_show_edge_types();
1749 }
1750 SchemaStatement::ShowGraphTypes => {
1751 return self.execute_show_graph_types();
1752 }
1753 SchemaStatement::ShowGraphType(name) => {
1754 return self.execute_show_graph_type(&name);
1755 }
1756 SchemaStatement::ShowCurrentGraphType => {
1757 return self.execute_show_current_graph_type();
1758 }
1759 SchemaStatement::ShowGraphs => {
1760 return self.execute_show_graphs();
1761 }
1762 SchemaStatement::ShowSchemas => {
1763 return self.execute_show_schemas();
1764 }
1765 };
1766
1767 if result.is_ok() {
1770 self.query_cache.clear();
1771 }
1772
1773 result
1774 }
1775
/// Builds an HNSW vector index over `property` for every node carrying `label`
/// and registers it on `store`.
///
/// * `dimensions` - expected vector length; when `None` it is taken from the
///   first vector encountered.
/// * `metric` - distance metric name; `None` selects cosine.
///
/// # Errors
///
/// Returns `Error::Internal` when the metric name is not recognised, when a
/// vector's length differs from the expected dimension, or when no vector is
/// found and no dimension was supplied.
#[cfg(all(feature = "gql", feature = "vector-index"))]
fn create_vector_index_on_store(
    store: &LpgStore,
    label: &str,
    property: &str,
    dimensions: Option<usize>,
    metric: Option<&str>,
) -> Result<()> {
    use grafeo_common::types::{PropertyKey, Value};
    use grafeo_common::utils::error::Error;
    use grafeo_core::index::vector::{DistanceMetric, HnswConfig, HnswIndex};

    // Resolve the metric up front so a bad name fails before any scanning.
    let distance = if let Some(requested) = metric {
        let Some(parsed) = DistanceMetric::from_str(requested) else {
            return Err(Error::Internal(format!(
                "Unknown distance metric '{requested}'. Use: cosine, euclidean, dot_product, manhattan"
            )));
        };
        parsed
    } else {
        DistanceMetric::Cosine
    };

    let key = PropertyKey::new(property);
    let mut expected_dims: Option<usize> = dimensions;
    let mut collected: Vec<(grafeo_common::types::NodeId, Vec<f32>)> = Vec::new();

    for node in store.nodes_with_label(label) {
        // Nodes without a vector value under `property` are skipped.
        let Some(Value::Vector(v)) = node.properties.get(&key) else {
            continue;
        };
        match expected_dims {
            Some(expected) if v.len() != expected => {
                return Err(Error::Internal(format!(
                    "Vector dimension mismatch: expected {expected}, found {} on node {}",
                    v.len(),
                    node.id.0
                )));
            }
            Some(_) => {}
            // The first vector seen fixes the dimension for the rest.
            None => expected_dims = Some(v.len()),
        }
        collected.push((node.id, v.to_vec()));
    }

    let Some(dims) = expected_dims else {
        return Err(Error::Internal(format!(
            "No vector properties found on :{label}({property}) and no dimensions specified"
        )));
    };

    let config = HnswConfig::new(dims, metric_or(distance));
    let index = HnswIndex::with_capacity(config, collected.len());
    let accessor = grafeo_core::index::vector::PropertyVectorAccessor::new(store, property);
    collected
        .iter()
        .for_each(|(id, vector)| {
            index.insert(*id, vector, &accessor);
        });

    store.add_vector_index(label, property, Arc::new(index));
    return Ok(());

    /// Identity helper that keeps the config construction on one line.
    fn metric_or(m: DistanceMetric) -> DistanceMetric {
        m
    }
}
1835
/// Fallback used when the crate is built without the `vector-index` feature.
///
/// # Errors
///
/// Always returns `Error::Internal` explaining that the feature is required.
#[cfg(all(feature = "gql", not(feature = "vector-index")))]
fn create_vector_index_on_store(
    _store: &LpgStore,
    _label: &str,
    _property: &str,
    _dimensions: Option<usize>,
    _metric: Option<&str>,
) -> Result<()> {
    use grafeo_common::utils::error::Error;

    let reason = String::from("Vector index support requires the 'vector-index' feature");
    Err(Error::Internal(reason))
}
1849
/// Builds an inverted text index over `property` for the nodes returned by
/// `store.nodes_by_label(label)` and registers it on `store`.
///
/// Only nodes whose property value is a string are indexed; all others are
/// skipped. The index is created with the default BM25 configuration.
#[cfg(all(feature = "gql", feature = "text-index"))]
fn create_text_index_on_store(store: &LpgStore, label: &str, property: &str) -> Result<()> {
    use grafeo_common::types::{PropertyKey, Value};
    use grafeo_core::index::text::{BM25Config, InvertedIndex};

    let key = PropertyKey::new(property);
    let mut inverted = InvertedIndex::new(BM25Config::default());

    for id in store.nodes_by_label(label) {
        let Some(Value::String(body)) = store.get_node_property(id, &key) else {
            continue;
        };
        inverted.insert(id, body.as_str());
    }

    let shared = Arc::new(parking_lot::RwLock::new(inverted));
    store.add_text_index(label, property, shared);
    Ok(())
}
1869
/// Fallback used when the crate is built without the `text-index` feature.
///
/// # Errors
///
/// Always returns `Error::Internal` explaining that the feature is required.
#[cfg(all(feature = "gql", not(feature = "text-index")))]
fn create_text_index_on_store(_store: &LpgStore, _label: &str, _property: &str) -> Result<()> {
    use grafeo_common::utils::error::Error;

    let reason = String::from("Text index support requires the 'text-index' feature");
    Err(Error::Internal(reason))
}
1877
1878 fn execute_show_indexes(&self) -> Result<QueryResult> {
1880 let indexes = self.catalog.all_indexes();
1881 let columns = vec![
1882 "name".to_string(),
1883 "type".to_string(),
1884 "label".to_string(),
1885 "property".to_string(),
1886 ];
1887 let rows: Vec<Vec<Value>> = indexes
1888 .into_iter()
1889 .map(|def| {
1890 let label_name = self
1891 .catalog
1892 .get_label_name(def.label)
1893 .unwrap_or_else(|| "?".into());
1894 let prop_name = self
1895 .catalog
1896 .get_property_key_name(def.property_key)
1897 .unwrap_or_else(|| "?".into());
1898 vec![
1899 Value::from(def.name),
1900 Value::from(format!("{:?}", def.index_type)),
1901 Value::from(&*label_name),
1902 Value::from(&*prop_name),
1903 ]
1904 })
1905 .collect();
1906 Ok(QueryResult {
1907 columns,
1908 column_types: Vec::new(),
1909 rows,
1910 ..QueryResult::empty()
1911 })
1912 }
1913
1914 fn execute_show_constraints(&self) -> Result<QueryResult> {
1916 Ok(QueryResult {
1919 columns: vec![
1920 "name".to_string(),
1921 "type".to_string(),
1922 "label".to_string(),
1923 "properties".to_string(),
1924 ],
1925 column_types: Vec::new(),
1926 rows: Vec::new(),
1927 ..QueryResult::empty()
1928 })
1929 }
1930
1931 fn execute_show_node_types(&self) -> Result<QueryResult> {
1933 let columns = vec![
1934 "name".to_string(),
1935 "properties".to_string(),
1936 "constraints".to_string(),
1937 "parents".to_string(),
1938 ];
1939 let schema = self.current_schema.lock().clone();
1940 let all_names = self.catalog.all_node_type_names();
1941 let type_names: Vec<String> = match &schema {
1942 Some(s) => {
1943 let prefix = format!("{s}/");
1944 all_names
1945 .into_iter()
1946 .filter_map(|n| n.strip_prefix(&prefix).map(String::from))
1947 .collect()
1948 }
1949 None => all_names.into_iter().filter(|n| !n.contains('/')).collect(),
1950 };
1951 let rows: Vec<Vec<Value>> = type_names
1952 .into_iter()
1953 .filter_map(|name| {
1954 let lookup = match &schema {
1955 Some(s) => format!("{s}/{name}"),
1956 None => name.clone(),
1957 };
1958 let def = self.catalog.get_node_type(&lookup)?;
1959 let props: Vec<String> = def
1960 .properties
1961 .iter()
1962 .map(|p| {
1963 let nullable = if p.nullable { "" } else { " NOT NULL" };
1964 format!("{} {}{}", p.name, p.data_type, nullable)
1965 })
1966 .collect();
1967 let constraints: Vec<String> =
1968 def.constraints.iter().map(|c| format!("{c:?}")).collect();
1969 let parents = def.parent_types.join(", ");
1970 Some(vec![
1971 Value::from(name),
1972 Value::from(props.join(", ")),
1973 Value::from(constraints.join(", ")),
1974 Value::from(parents),
1975 ])
1976 })
1977 .collect();
1978 Ok(QueryResult {
1979 columns,
1980 column_types: Vec::new(),
1981 rows,
1982 ..QueryResult::empty()
1983 })
1984 }
1985
1986 fn execute_show_edge_types(&self) -> Result<QueryResult> {
1988 let columns = vec![
1989 "name".to_string(),
1990 "properties".to_string(),
1991 "source_types".to_string(),
1992 "target_types".to_string(),
1993 ];
1994 let schema = self.current_schema.lock().clone();
1995 let all_names = self.catalog.all_edge_type_names();
1996 let type_names: Vec<String> = match &schema {
1997 Some(s) => {
1998 let prefix = format!("{s}/");
1999 all_names
2000 .into_iter()
2001 .filter_map(|n| n.strip_prefix(&prefix).map(String::from))
2002 .collect()
2003 }
2004 None => all_names.into_iter().filter(|n| !n.contains('/')).collect(),
2005 };
2006 let rows: Vec<Vec<Value>> = type_names
2007 .into_iter()
2008 .filter_map(|name| {
2009 let lookup = match &schema {
2010 Some(s) => format!("{s}/{name}"),
2011 None => name.clone(),
2012 };
2013 let def = self.catalog.get_edge_type_def(&lookup)?;
2014 let props: Vec<String> = def
2015 .properties
2016 .iter()
2017 .map(|p| {
2018 let nullable = if p.nullable { "" } else { " NOT NULL" };
2019 format!("{} {}{}", p.name, p.data_type, nullable)
2020 })
2021 .collect();
2022 let src = def.source_node_types.join(", ");
2023 let tgt = def.target_node_types.join(", ");
2024 Some(vec![
2025 Value::from(name),
2026 Value::from(props.join(", ")),
2027 Value::from(src),
2028 Value::from(tgt),
2029 ])
2030 })
2031 .collect();
2032 Ok(QueryResult {
2033 columns,
2034 column_types: Vec::new(),
2035 rows,
2036 ..QueryResult::empty()
2037 })
2038 }
2039
2040 fn execute_show_graph_types(&self) -> Result<QueryResult> {
2042 let columns = vec![
2043 "name".to_string(),
2044 "open".to_string(),
2045 "node_types".to_string(),
2046 "edge_types".to_string(),
2047 ];
2048 let schema = self.current_schema.lock().clone();
2049 let all_names = self.catalog.all_graph_type_names();
2050 let type_names: Vec<String> = match &schema {
2051 Some(s) => {
2052 let prefix = format!("{s}/");
2053 all_names
2054 .into_iter()
2055 .filter_map(|n| n.strip_prefix(&prefix).map(String::from))
2056 .collect()
2057 }
2058 None => all_names.into_iter().filter(|n| !n.contains('/')).collect(),
2059 };
2060 let rows: Vec<Vec<Value>> = type_names
2061 .into_iter()
2062 .filter_map(|name| {
2063 let lookup = match &schema {
2064 Some(s) => format!("{s}/{name}"),
2065 None => name.clone(),
2066 };
2067 let def = self.catalog.get_graph_type_def(&lookup)?;
2068 let strip = |n: &String| -> String {
2070 match &schema {
2071 Some(s) => n.strip_prefix(&format!("{s}/")).unwrap_or(n).to_string(),
2072 None => n.clone(),
2073 }
2074 };
2075 let node_types: Vec<String> = def.allowed_node_types.iter().map(strip).collect();
2076 let edge_types: Vec<String> = def.allowed_edge_types.iter().map(strip).collect();
2077 Some(vec![
2078 Value::from(name),
2079 Value::from(def.open),
2080 Value::from(node_types.join(", ")),
2081 Value::from(edge_types.join(", ")),
2082 ])
2083 })
2084 .collect();
2085 Ok(QueryResult {
2086 columns,
2087 column_types: Vec::new(),
2088 rows,
2089 ..QueryResult::empty()
2090 })
2091 }
2092
2093 fn execute_show_graphs(&self) -> Result<QueryResult> {
2099 let schema = self.current_schema.lock().clone();
2100 let all_names = self.store.graph_names();
2101
2102 let mut names: Vec<String> = match &schema {
2103 Some(s) => {
2104 let prefix = format!("{s}/");
2105 all_names
2106 .into_iter()
2107 .filter_map(|n| n.strip_prefix(&prefix).map(String::from))
2108 .collect()
2109 }
2110 None => all_names.into_iter().filter(|n| !n.contains('/')).collect(),
2111 };
2112 names.sort();
2113
2114 let rows: Vec<Vec<Value>> = names.into_iter().map(|n| vec![Value::from(n)]).collect();
2115 Ok(QueryResult {
2116 columns: vec!["name".to_string()],
2117 column_types: Vec::new(),
2118 rows,
2119 ..QueryResult::empty()
2120 })
2121 }
2122
2123 fn execute_show_schemas(&self) -> Result<QueryResult> {
2125 let mut names = self.catalog.schema_names();
2126 names.sort();
2127 let rows: Vec<Vec<Value>> = names.into_iter().map(|n| vec![Value::from(n)]).collect();
2128 Ok(QueryResult {
2129 columns: vec!["name".to_string()],
2130 column_types: Vec::new(),
2131 rows,
2132 ..QueryResult::empty()
2133 })
2134 }
2135
2136 fn execute_show_graph_type(&self, name: &str) -> Result<QueryResult> {
2138 use grafeo_common::utils::error::{Error, QueryError, QueryErrorKind};
2139
2140 let def = self.catalog.get_graph_type_def(name).ok_or_else(|| {
2141 Error::Query(QueryError::new(
2142 QueryErrorKind::Semantic,
2143 format!("Graph type '{name}' not found"),
2144 ))
2145 })?;
2146
2147 let columns = vec![
2148 "name".to_string(),
2149 "open".to_string(),
2150 "node_types".to_string(),
2151 "edge_types".to_string(),
2152 ];
2153 let rows = vec![vec![
2154 Value::from(def.name),
2155 Value::from(def.open),
2156 Value::from(def.allowed_node_types.join(", ")),
2157 Value::from(def.allowed_edge_types.join(", ")),
2158 ]];
2159 Ok(QueryResult {
2160 columns,
2161 column_types: Vec::new(),
2162 rows,
2163 ..QueryResult::empty()
2164 })
2165 }
2166
2167 fn execute_show_current_graph_type(&self) -> Result<QueryResult> {
2169 let graph_name = self
2170 .current_graph()
2171 .unwrap_or_else(|| "default".to_string());
2172 let columns = vec![
2173 "graph".to_string(),
2174 "graph_type".to_string(),
2175 "open".to_string(),
2176 "node_types".to_string(),
2177 "edge_types".to_string(),
2178 ];
2179
2180 if let Some(type_name) = self.catalog.get_graph_type_binding(&graph_name)
2181 && let Some(def) = self.catalog.get_graph_type_def(&type_name)
2182 {
2183 let rows = vec![vec![
2184 Value::from(graph_name),
2185 Value::from(type_name),
2186 Value::from(def.open),
2187 Value::from(def.allowed_node_types.join(", ")),
2188 Value::from(def.allowed_edge_types.join(", ")),
2189 ]];
2190 return Ok(QueryResult {
2191 columns,
2192 column_types: Vec::new(),
2193 rows,
2194 ..QueryResult::empty()
2195 });
2196 }
2197
2198 Ok(QueryResult {
2200 columns,
2201 column_types: Vec::new(),
2202 rows: vec![vec![
2203 Value::from(graph_name),
2204 Value::Null,
2205 Value::Null,
2206 Value::Null,
2207 Value::Null,
2208 ]],
2209 ..QueryResult::empty()
2210 })
2211 }
2212
/// Executes a GQL query string against this session.
///
/// The query is translated first. Session commands and schema commands are
/// dispatched to their dedicated handlers and return immediately; ordinary
/// query plans are bound, optimized (or fetched from the query cache),
/// physically planned and executed inside `with_auto_commit`.
///
/// # Errors
///
/// Propagates errors from `require_lpg`, translation, binding, optimization,
/// planning and execution. Returns `TransactionError::ReadOnly` when the
/// `read_only_tx` flag is set and the statement is a schema command or a plan
/// containing mutations.
#[cfg(feature = "gql")]
pub fn execute(&self, query: &str) -> Result<QueryResult> {
    self.require_lpg("GQL")?;

    use crate::query::{
        Executor, binder::Binder, cache::CacheKey, optimizer::Optimizer,
        processor::QueryLanguage, translators::gql,
    };

    let _span = grafeo_info_span!(
        "grafeo::session::execute",
        language = "gql",
        query_len = query.len(),
    );

    // The start timestamp is only taken on non-wasm32 targets.
    #[cfg(not(target_arch = "wasm32"))]
    let start_time = std::time::Instant::now();

    let translation = gql::translate_full(query)?;
    // Session and schema commands short-circuit; only real plans continue.
    let logical_plan = match translation {
        gql::GqlTranslationResult::SessionCommand(cmd) => {
            return self.execute_session_command(cmd);
        }
        gql::GqlTranslationResult::SchemaCommand(cmd) => {
            // Schema commands are rejected while the read-only flag is set.
            if *self.read_only_tx.lock() {
                return Err(grafeo_common::utils::error::Error::Transaction(
                    grafeo_common::utils::error::TransactionError::ReadOnly,
                ));
            }
            return self.execute_schema_command(cmd);
        }
        gql::GqlTranslationResult::Plan(plan) => {
            // Mutating plans are rejected while the read-only flag is set.
            if *self.read_only_tx.lock() && plan.root.has_mutations() {
                return Err(grafeo_common::utils::error::Error::Transaction(
                    grafeo_common::utils::error::TransactionError::ReadOnly,
                ));
            }
            plan
        }
    };

    // The cache key includes the current graph in addition to the query text
    // and language.
    let cache_key = CacheKey::with_graph(query, QueryLanguage::Gql, self.current_graph());

    let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
        // Cache hit: binding and optimization are skipped.
        cached_plan
    } else {
        // Binding runs for its validation effect; the context is not used further.
        let mut binder = Binder::new();
        let _binding_context = binder.bind(&logical_plan)?;

        let active = self.active_store();
        let optimizer = Optimizer::from_graph_store(&*active);
        let plan = optimizer.optimize(logical_plan)?;

        // Store a copy so later identical queries can reuse the optimized plan.
        self.query_cache.put_optimized(cache_key, plan.clone());

        plan
    };

    let active = self.active_store();

    // EXPLAIN: return the annotated plan without executing it.
    if optimized_plan.explain {
        use crate::query::processor::{annotate_pushdown_hints, explain_result};
        let mut plan = optimized_plan;
        annotate_pushdown_hints(&mut plan.root, active.as_ref());
        return Ok(explain_result(&plan));
    }

    // PROFILE: execute the plan, discard its rows and return the profile tree.
    if optimized_plan.profile {
        let has_mutations = optimized_plan.root.has_mutations();
        return self.with_auto_commit(has_mutations, || {
            let (viewing_epoch, transaction_id) = self.get_transaction_context();
            let planner = self.create_planner_for_store(
                Arc::clone(&active),
                viewing_epoch,
                transaction_id,
            );
            let (mut physical_plan, entries) = planner.plan_profiled(&optimized_plan)?;

            let executor = Executor::with_columns(physical_plan.columns.clone())
                .with_deadline(self.query_deadline());
            let _result = executor.execute(physical_plan.operator.as_mut())?;

            // Total time is measured from the start of `execute`; on wasm32 it
            // is reported as 0.0 because no start timestamp is taken there.
            let total_time_ms;
            #[cfg(not(target_arch = "wasm32"))]
            {
                total_time_ms = start_time.elapsed().as_secs_f64() * 1000.0;
            }
            #[cfg(target_arch = "wasm32")]
            {
                total_time_ms = 0.0;
            }

            let profile_tree = crate::query::profile::build_profile_tree(
                &optimized_plan.root,
                &mut entries.into_iter(),
            );
            Ok(crate::query::profile::profile_result(
                &profile_tree,
                total_time_ms,
            ))
        });
    }

    let has_mutations = optimized_plan.root.has_mutations();

    let result = self.with_auto_commit(has_mutations, || {
        let (viewing_epoch, transaction_id) = self.get_transaction_context();

        // The planner is told the query is read-only only when the plan has no
        // mutations and no transaction is currently open on this session.
        let has_active_tx = self.current_transaction.lock().is_some();
        let read_only = !has_mutations && !has_active_tx;
        let planner = self.create_planner_for_store_with_read_only(
            Arc::clone(&active),
            viewing_epoch,
            transaction_id,
            read_only,
        );
        let mut physical_plan = planner.plan(&optimized_plan)?;

        let executor = Executor::with_columns(physical_plan.columns.clone())
            .with_deadline(self.query_deadline());
        let mut result = executor.execute(physical_plan.operator.as_mut())?;

        // NOTE(review): `rows_scanned` is filled with the number of rows in the
        // result, not a count of rows read from storage — confirm this is the
        // intended meaning.
        let rows_scanned = result.rows.len() as u64;
        #[cfg(not(target_arch = "wasm32"))]
        {
            let elapsed_ms = start_time.elapsed().as_secs_f64() * 1000.0;
            result.execution_time_ms = Some(elapsed_ms);
        }
        result.rows_scanned = Some(rows_scanned);

        Ok(result)
    });

    // Metrics are recorded for both successful and failed executions, since
    // the whole `Result` is passed along.
    #[cfg(feature = "metrics")]
    {
        #[cfg(not(target_arch = "wasm32"))]
        let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
        #[cfg(target_arch = "wasm32")]
        let elapsed_ms = None;
        self.record_query_metrics("gql", elapsed_ms, &result);
    }

    result
}
2402
/// Executes a GQL query with the session's viewing-epoch override set to
/// `epoch`, then restores whatever override was in place before the call.
///
/// The override is restored whether the query succeeds or returns an error.
#[cfg(feature = "gql")]
pub fn execute_at_epoch(&self, query: &str, epoch: EpochId) -> Result<QueryResult> {
    // Swap the override in a short scope so the lock is released before the
    // query runs.
    let saved = {
        let mut slot = self.viewing_epoch_override.lock();
        slot.replace(epoch)
    };

    let outcome = self.execute(query);

    {
        let mut slot = self.viewing_epoch_override.lock();
        *slot = saved;
    }
    outcome
}
2418
/// Executes a GQL query, optionally parameterized, with the session's
/// viewing-epoch override set to `epoch`, then restores the previous override.
///
/// With `Some(params)` the query goes through `execute_with_params`; with
/// `None` it goes through `execute`. The override is restored whether the
/// query succeeds or returns an error.
#[cfg(feature = "gql")]
pub fn execute_at_epoch_with_params(
    &self,
    query: &str,
    epoch: EpochId,
    params: Option<std::collections::HashMap<String, Value>>,
) -> Result<QueryResult> {
    // Swap the override in a short scope so the lock is released before the
    // query runs.
    let saved = {
        let mut slot = self.viewing_epoch_override.lock();
        slot.replace(epoch)
    };

    let outcome = match params {
        Some(bindings) => self.execute_with_params(query, bindings),
        None => self.execute(query),
    };

    {
        let mut slot = self.viewing_epoch_override.lock();
        *slot = saved;
    }
    outcome
}
2442
#[cfg(feature = "gql")]
/// Executes a GQL query with bound parameters.
///
/// The query is handed to a `QueryProcessor` built over the active stores.
/// Whether the statement is wrapped in an auto-commit transaction is decided
/// by the textual heuristic `query_looks_like_mutation`, because no logical
/// plan is available before the processor runs.
///
/// When the `metrics` feature is enabled the call is recorded under the
/// `"gql"` language label, in the same way as the other `*_with_params`
/// entry points.
///
/// # Errors
///
/// Returns an error if `require_lpg` rejects the session, if the processor
/// cannot be constructed, or if processing the query fails.
pub fn execute_with_params(
    &self,
    query: &str,
    params: std::collections::HashMap<String, Value>,
) -> Result<QueryResult> {
    self.require_lpg("GQL")?;

    use crate::query::processor::{QueryLanguage, QueryProcessor};

    #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
    let start_time = Instant::now();

    let has_mutations = Self::query_looks_like_mutation(query);
    let active = self.active_store();

    let result = self.with_auto_commit(has_mutations, || {
        // Resolved inside the closure so that a transaction opened by
        // `with_auto_commit` is visible here.
        let (viewing_epoch, transaction_id) = self.get_transaction_context();

        let processor = QueryProcessor::for_stores_with_transaction(
            Arc::clone(&active),
            self.active_write_store(),
            Arc::clone(&self.transaction_manager),
        )?;

        // Only attach a transaction context when a transaction is active.
        let processor = if let Some(transaction_id) = transaction_id {
            processor.with_transaction_context(viewing_epoch, transaction_id)
        } else {
            processor
        };

        processor.process(query, QueryLanguage::Gql, Some(&params))
    });

    #[cfg(feature = "metrics")]
    {
        #[cfg(not(target_arch = "wasm32"))]
        let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
        #[cfg(target_arch = "wasm32")]
        let elapsed_ms = None;
        self.record_query_metrics("gql", elapsed_ms, &result);
    }

    result
}
2482
2483 #[cfg(not(any(feature = "gql", feature = "cypher")))]
2489 pub fn execute_with_params(
2490 &self,
2491 _query: &str,
2492 _params: std::collections::HashMap<String, Value>,
2493 ) -> Result<QueryResult> {
2494 Err(grafeo_common::utils::error::Error::Internal(
2495 "No query language enabled".to_string(),
2496 ))
2497 }
2498
2499 #[cfg(not(any(feature = "gql", feature = "cypher")))]
2505 pub fn execute(&self, _query: &str) -> Result<QueryResult> {
2506 Err(grafeo_common::utils::error::Error::Internal(
2507 "No query language enabled".to_string(),
2508 ))
2509 }
2510
#[cfg(feature = "cypher")]
/// Executes a Cypher query without parameters.
///
/// Flow, as implemented below:
/// 1. The query is translated with `cypher::translate_full`; schema commands
///    and the `SHOW ...` variants are dispatched to dedicated handlers and
///    return early.
/// 2. For ordinary plans an optimized plan is fetched from the query cache
///    (keyed by query text, language and current graph) or built via
///    translate → bind → optimize and then cached.
/// 3. Plans flagged `explain` return an explain result without executing;
///    plans flagged `profile` execute and return a profile tree.
/// 4. Otherwise the plan is executed inside `with_auto_commit`, and query
///    metrics are recorded when the `metrics` feature is enabled.
///
/// # Errors
///
/// Returns a semantic query error when a schema command is issued while the
/// session's read-only flag is set, and propagates any translation, binding,
/// optimization, planning or execution error.
pub fn execute_cypher(&self, query: &str) -> Result<QueryResult> {
    use crate::query::{
        Executor, binder::Binder, cache::CacheKey, optimizer::Optimizer,
        processor::QueryLanguage, translators::cypher,
    };
    use grafeo_common::utils::error::{Error as GrafeoError, QueryError, QueryErrorKind};

    // First pass: classify the statement so non-plan commands can short-circuit.
    let translation = cypher::translate_full(query)?;
    match translation {
        cypher::CypherTranslationResult::SchemaCommand(cmd) => {
            // Schema DDL is refused while the read-only flag is set.
            if *self.read_only_tx.lock() {
                return Err(GrafeoError::Query(QueryError::new(
                    QueryErrorKind::Semantic,
                    "Cannot execute schema DDL in a read-only transaction",
                )));
            }
            return self.execute_schema_command(cmd);
        }
        cypher::CypherTranslationResult::ShowIndexes => {
            return self.execute_show_indexes();
        }
        cypher::CypherTranslationResult::ShowConstraints => {
            return self.execute_show_constraints();
        }
        cypher::CypherTranslationResult::ShowCurrentGraphType => {
            return self.execute_show_current_graph_type();
        }
        cypher::CypherTranslationResult::Plan(_) => {
            // Ordinary query: fall through to the cached-plan pipeline below.
            // NOTE(review): the plan carried by this variant is discarded and the
            // query is translated again on a cache miss — confirm whether it could be reused.
        }
    }

    #[cfg(not(target_arch = "wasm32"))]
    let start_time = std::time::Instant::now();

    // Cache entries are scoped to the current graph as well as the query text.
    let cache_key = CacheKey::with_graph(query, QueryLanguage::Cypher, self.current_graph());

    let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
        cached_plan
    } else {
        // Cache miss: translate, bind, optimize, then store the result.
        let logical_plan = cypher::translate(query)?;

        let mut binder = Binder::new();
        // Binding is run for its validation; the returned context is not used further here.
        let _binding_context = binder.bind(&logical_plan)?;

        let active = self.active_store();
        let optimizer = Optimizer::from_graph_store(&*active);
        let plan = optimizer.optimize(logical_plan)?;

        self.query_cache.put_optimized(cache_key, plan.clone());

        plan
    };

    let active = self.active_store();

    // EXPLAIN: describe the plan without executing it.
    if optimized_plan.explain {
        use crate::query::processor::{annotate_pushdown_hints, explain_result};
        let mut plan = optimized_plan;
        annotate_pushdown_hints(&mut plan.root, active.as_ref());
        return Ok(explain_result(&plan));
    }

    // PROFILE: execute the plan and return the profile tree instead of the rows.
    if optimized_plan.profile {
        let has_mutations = optimized_plan.root.has_mutations();
        return self.with_auto_commit(has_mutations, || {
            let (viewing_epoch, transaction_id) = self.get_transaction_context();
            let planner = self.create_planner_for_store(
                Arc::clone(&active),
                viewing_epoch,
                transaction_id,
            );
            let (mut physical_plan, entries) = planner.plan_profiled(&optimized_plan)?;

            let executor = Executor::with_columns(physical_plan.columns.clone())
                .with_deadline(self.query_deadline());
            // The query rows themselves are discarded; only the profile is returned.
            let _result = executor.execute(physical_plan.operator.as_mut())?;

            // Total time is reported as 0.0 on wasm32, where no start time is taken.
            let total_time_ms;
            #[cfg(not(target_arch = "wasm32"))]
            {
                total_time_ms = start_time.elapsed().as_secs_f64() * 1000.0;
            }
            #[cfg(target_arch = "wasm32")]
            {
                total_time_ms = 0.0;
            }

            let profile_tree = crate::query::profile::build_profile_tree(
                &optimized_plan.root,
                &mut entries.into_iter(),
            );
            Ok(crate::query::profile::profile_result(
                &profile_tree,
                total_time_ms,
            ))
        });
    }

    let has_mutations = optimized_plan.root.has_mutations();

    let result = self.with_auto_commit(has_mutations, || {
        // Resolved inside the closure so that a transaction opened by
        // `with_auto_commit` is visible here.
        let (viewing_epoch, transaction_id) = self.get_transaction_context();

        let planner =
            self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
        let mut physical_plan = planner.plan(&optimized_plan)?;

        let executor = Executor::with_columns(physical_plan.columns.clone())
            .with_deadline(self.query_deadline());
        executor.execute(physical_plan.operator.as_mut())
    });

    #[cfg(feature = "metrics")]
    {
        // No elapsed time is available on wasm32.
        #[cfg(not(target_arch = "wasm32"))]
        let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
        #[cfg(target_arch = "wasm32")]
        let elapsed_ms = None;
        self.record_query_metrics("cypher", elapsed_ms, &result);
    }

    result
}
2654
#[cfg(feature = "gremlin")]
/// Executes a Gremlin query without parameters.
///
/// The query is translated, bound and optimized against the active store,
/// then planned and executed inside `with_auto_commit`. Whether the
/// statement counts as a mutation is taken from the optimized plan. Query
/// metrics are recorded under `"gremlin"` when the `metrics` feature is on.
///
/// # Errors
///
/// Propagates translation, binding, optimization, planning and execution
/// errors.
pub fn execute_gremlin(&self, query: &str) -> Result<QueryResult> {
    use crate::query::{Executor, binder::Binder, optimizer::Optimizer, translators::gremlin};

    #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
    let started_at = Instant::now();

    // Translate and validate.
    let translated = gremlin::translate(query)?;
    let mut plan_binder = Binder::new();
    let _bound = plan_binder.bind(&translated)?;

    // Optimize against the store that is currently active.
    let store = self.active_store();
    let plan = Optimizer::from_graph_store(&*store).optimize(translated)?;
    let mutates = plan.root.has_mutations();

    let outcome = self.with_auto_commit(mutates, || {
        // Resolved inside the closure so an auto-commit transaction is picked up.
        let (epoch, tx) = self.get_transaction_context();
        let mut physical = self
            .create_planner_for_store(Arc::clone(&store), epoch, tx)
            .plan(&plan)?;

        let columns = physical.columns.clone();
        let runner = Executor::with_columns(columns).with_deadline(self.query_deadline());
        runner.execute(physical.operator.as_mut())
    });

    #[cfg(feature = "metrics")]
    {
        #[cfg(not(target_arch = "wasm32"))]
        let latency_ms = Some(started_at.elapsed().as_secs_f64() * 1000.0);
        #[cfg(target_arch = "wasm32")]
        let latency_ms = None;
        self.record_query_metrics("gremlin", latency_ms, &outcome);
    }

    outcome
}
2725
#[cfg(feature = "gremlin")]
/// Executes a Gremlin query with bound parameters.
///
/// The query is handed to a `QueryProcessor` built over the active stores.
/// Whether the statement is wrapped in an auto-commit transaction is decided
/// by the textual heuristic `query_looks_like_mutation`. Query metrics are
/// recorded under `"gremlin"` when the `metrics` feature is enabled.
///
/// # Errors
///
/// Returns an error if the processor cannot be constructed or if processing
/// the query fails.
pub fn execute_gremlin_with_params(
    &self,
    query: &str,
    params: std::collections::HashMap<String, Value>,
) -> Result<QueryResult> {
    use crate::query::processor::{QueryLanguage, QueryProcessor};

    #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
    let start_time = Instant::now();

    let has_mutations = Self::query_looks_like_mutation(query);
    let active = self.active_store();

    let result = self.with_auto_commit(has_mutations, || {
        // Resolved inside the closure so that a transaction opened by
        // `with_auto_commit` is visible here.
        let (viewing_epoch, transaction_id) = self.get_transaction_context();
        let processor = QueryProcessor::for_stores_with_transaction(
            Arc::clone(&active),
            self.active_write_store(),
            Arc::clone(&self.transaction_manager),
        )?;
        // Only attach a transaction context when a transaction is active.
        let processor = if let Some(transaction_id) = transaction_id {
            processor.with_transaction_context(viewing_epoch, transaction_id)
        } else {
            processor
        };
        processor.process(query, QueryLanguage::Gremlin, Some(&params))
    });

    #[cfg(feature = "metrics")]
    {
        #[cfg(not(target_arch = "wasm32"))]
        let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
        #[cfg(target_arch = "wasm32")]
        let elapsed_ms = None;
        self.record_query_metrics("gremlin", elapsed_ms, &result);
    }

    result
}
2771
#[cfg(feature = "graphql")]
/// Executes a GraphQL query without caller-supplied parameters.
///
/// If the translated plan carries default parameters they are substituted
/// into the plan before binding. The plan is then bound, optimized against
/// the active store, and executed inside `with_auto_commit`. Query metrics
/// are recorded under `"graphql"` when the `metrics` feature is on.
///
/// # Errors
///
/// Propagates translation, substitution, binding, optimization, planning
/// and execution errors.
pub fn execute_graphql(&self, query: &str) -> Result<QueryResult> {
    use crate::query::{
        Executor, binder::Binder, optimizer::Optimizer, processor::substitute_params,
        translators::graphql,
    };

    #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
    let started_at = Instant::now();

    let mut translated = graphql::translate(query)?;

    // Apply the defaults declared by the query itself, if there are any.
    if !translated.default_params.is_empty() {
        let declared_defaults = translated.default_params.clone();
        substitute_params(&mut translated, &declared_defaults)?;
    }

    let mut plan_binder = Binder::new();
    let _bound = plan_binder.bind(&translated)?;

    let store = self.active_store();
    let plan = Optimizer::from_graph_store(&*store).optimize(translated)?;
    let mutates = plan.root.has_mutations();

    let outcome = self.with_auto_commit(mutates, || {
        // Resolved inside the closure so an auto-commit transaction is picked up.
        let (epoch, tx) = self.get_transaction_context();
        let mut physical = self
            .create_planner_for_store(Arc::clone(&store), epoch, tx)
            .plan(&plan)?;

        let columns = physical.columns.clone();
        let runner = Executor::with_columns(columns).with_deadline(self.query_deadline());
        runner.execute(physical.operator.as_mut())
    });

    #[cfg(feature = "metrics")]
    {
        #[cfg(not(target_arch = "wasm32"))]
        let latency_ms = Some(started_at.elapsed().as_secs_f64() * 1000.0);
        #[cfg(target_arch = "wasm32")]
        let latency_ms = None;
        self.record_query_metrics("graphql", latency_ms, &outcome);
    }

    outcome
}
2842
#[cfg(feature = "graphql")]
/// Executes a GraphQL query with bound parameters.
///
/// The query is handed to a `QueryProcessor` built over the active stores.
/// Whether the statement is wrapped in an auto-commit transaction is decided
/// by the textual heuristic `query_looks_like_mutation`. Query metrics are
/// recorded under `"graphql"` when the `metrics` feature is enabled.
///
/// # Errors
///
/// Returns an error if the processor cannot be constructed or if processing
/// the query fails.
pub fn execute_graphql_with_params(
    &self,
    query: &str,
    params: std::collections::HashMap<String, Value>,
) -> Result<QueryResult> {
    use crate::query::processor::{QueryLanguage, QueryProcessor};

    #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
    let start_time = Instant::now();

    let has_mutations = Self::query_looks_like_mutation(query);
    let active = self.active_store();

    let result = self.with_auto_commit(has_mutations, || {
        // Resolved inside the closure so that a transaction opened by
        // `with_auto_commit` is visible here.
        let (viewing_epoch, transaction_id) = self.get_transaction_context();
        let processor = QueryProcessor::for_stores_with_transaction(
            Arc::clone(&active),
            self.active_write_store(),
            Arc::clone(&self.transaction_manager),
        )?;
        // Only attach a transaction context when a transaction is active.
        let processor = if let Some(transaction_id) = transaction_id {
            processor.with_transaction_context(viewing_epoch, transaction_id)
        } else {
            processor
        };
        processor.process(query, QueryLanguage::GraphQL, Some(&params))
    });

    #[cfg(feature = "metrics")]
    {
        #[cfg(not(target_arch = "wasm32"))]
        let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
        #[cfg(target_arch = "wasm32")]
        let elapsed_ms = None;
        self.record_query_metrics("graphql", elapsed_ms, &result);
    }

    result
}
2888
#[cfg(feature = "sql-pgq")]
/// Executes a SQL/PGQ query without parameters.
///
/// A `CREATE PROPERTY GRAPH` plan short-circuits with a single-row status
/// result describing the graph. Any other plan is looked up in the query
/// cache (keyed by query text, language and current graph) or bound,
/// optimized and cached, and is then executed inside `with_auto_commit`.
/// Query metrics are recorded under `"sql"` when the `metrics` feature is on;
/// the `CREATE PROPERTY GRAPH` early return happens before that point.
///
/// # Errors
///
/// Propagates translation, binding, optimization, planning and execution
/// errors.
pub fn execute_sql(&self, query: &str) -> Result<QueryResult> {
    use crate::query::{
        Executor, binder::Binder, cache::CacheKey, optimizer::Optimizer, plan::LogicalOperator,
        processor::QueryLanguage, translators::sql_pgq,
    };

    #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
    let started_at = Instant::now();

    let translated = sql_pgq::translate(query)?;

    // DDL for property graphs is answered directly with a status row.
    if let LogicalOperator::CreatePropertyGraph(graph_def) = &translated.root {
        let status = format!(
            "Property graph '{}' created ({} node tables, {} edge tables)",
            graph_def.name,
            graph_def.node_tables.len(),
            graph_def.edge_tables.len()
        );
        return Ok(QueryResult {
            columns: vec!["status".into()],
            column_types: vec![grafeo_common::types::LogicalType::String],
            rows: vec![vec![Value::from(status)]],
            execution_time_ms: None,
            rows_scanned: None,
            status_message: None,
            gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
        });
    }

    let key = CacheKey::with_graph(query, QueryLanguage::SqlPgq, self.current_graph());

    let plan = match self.query_cache.get_optimized(&key) {
        Some(hit) => hit,
        None => {
            // Cache miss: bind, optimize, and remember the optimized plan.
            let mut plan_binder = Binder::new();
            let _bound = plan_binder.bind(&translated)?;
            let store = self.active_store();
            let fresh = Optimizer::from_graph_store(&*store).optimize(translated)?;
            self.query_cache.put_optimized(key, fresh.clone());
            fresh
        }
    };

    let store = self.active_store();
    let mutates = plan.root.has_mutations();

    let outcome = self.with_auto_commit(mutates, || {
        // Resolved inside the closure so an auto-commit transaction is picked up.
        let (epoch, tx) = self.get_transaction_context();
        let mut physical = self
            .create_planner_for_store(Arc::clone(&store), epoch, tx)
            .plan(&plan)?;

        let columns = physical.columns.clone();
        let runner = Executor::with_columns(columns).with_deadline(self.query_deadline());
        runner.execute(physical.operator.as_mut())
    });

    #[cfg(feature = "metrics")]
    {
        #[cfg(not(target_arch = "wasm32"))]
        let latency_ms = Some(started_at.elapsed().as_secs_f64() * 1000.0);
        #[cfg(target_arch = "wasm32")]
        let latency_ms = None;
        self.record_query_metrics("sql", latency_ms, &outcome);
    }

    outcome
}
2982
#[cfg(feature = "sql-pgq")]
/// Executes a SQL/PGQ query with bound parameters.
///
/// The query is handed to a `QueryProcessor` built over the active stores.
/// Whether the statement is wrapped in an auto-commit transaction is decided
/// by the textual heuristic `query_looks_like_mutation`. Query metrics are
/// recorded under `"sql"` when the `metrics` feature is enabled.
///
/// # Errors
///
/// Returns an error if the processor cannot be constructed or if processing
/// the query fails.
pub fn execute_sql_with_params(
    &self,
    query: &str,
    params: std::collections::HashMap<String, Value>,
) -> Result<QueryResult> {
    use crate::query::processor::{QueryLanguage, QueryProcessor};

    #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
    let start_time = Instant::now();

    let has_mutations = Self::query_looks_like_mutation(query);
    let active = self.active_store();

    let result = self.with_auto_commit(has_mutations, || {
        // Resolved inside the closure so that a transaction opened by
        // `with_auto_commit` is visible here.
        let (viewing_epoch, transaction_id) = self.get_transaction_context();
        let processor = QueryProcessor::for_stores_with_transaction(
            Arc::clone(&active),
            self.active_write_store(),
            Arc::clone(&self.transaction_manager),
        )?;
        // Only attach a transaction context when a transaction is active.
        let processor = if let Some(transaction_id) = transaction_id {
            processor.with_transaction_context(viewing_epoch, transaction_id)
        } else {
            processor
        };
        processor.process(query, QueryLanguage::SqlPgq, Some(&params))
    });

    #[cfg(feature = "metrics")]
    {
        #[cfg(not(target_arch = "wasm32"))]
        let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
        #[cfg(target_arch = "wasm32")]
        let elapsed_ms = None;
        self.record_query_metrics("sql", elapsed_ms, &result);
    }

    result
}
3028
/// Dispatches `query` to the executor for the query language named by
/// `language`, optionally binding `params`.
///
/// Recognized names are `"gql"`, and — each only when its Cargo feature(s)
/// are compiled in — `"cypher"`, `"gremlin"`, `"graphql"`, `"graphql-rdf"`,
/// `"sql"` / `"sql-pgq"` and `"sparql"`. Matching is on the exact string.
///
/// # Errors
///
/// Returns a semantic query error (`Unknown query language: '...'`) for any
/// other name, including names whose feature is not compiled in; otherwise
/// propagates whatever the selected executor returns.
pub fn execute_language(
    &self,
    query: &str,
    language: &str,
    params: Option<std::collections::HashMap<String, Value>>,
) -> Result<QueryResult> {
    let _span = grafeo_info_span!(
        "grafeo::session::execute",
        language,
        query_len = query.len(),
    );
    match language {
        "gql" => {
            if let Some(p) = params {
                self.execute_with_params(query, p)
            } else {
                self.execute(query)
            }
        }
        #[cfg(feature = "cypher")]
        "cypher" => {
            if let Some(p) = params {
                // Parameterized Cypher is handled inline via the query processor;
                // the unparameterized case goes through `execute_cypher`.
                use crate::query::processor::{QueryLanguage, QueryProcessor};

                #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
                let start_time = Instant::now();

                // Textual heuristic: no logical plan is available at this point.
                let has_mutations = Self::query_looks_like_mutation(query);
                let active = self.active_store();
                let result = self.with_auto_commit(has_mutations, || {
                    let processor = QueryProcessor::for_stores_with_transaction(
                        Arc::clone(&active),
                        self.active_write_store(),
                        Arc::clone(&self.transaction_manager),
                    )?;
                    let (viewing_epoch, transaction_id) = self.get_transaction_context();
                    // Only attach a transaction context when a transaction is active.
                    let processor = if let Some(transaction_id) = transaction_id {
                        processor.with_transaction_context(viewing_epoch, transaction_id)
                    } else {
                        processor
                    };
                    processor.process(query, QueryLanguage::Cypher, Some(&p))
                });

                #[cfg(feature = "metrics")]
                {
                    // No elapsed time is available on wasm32.
                    #[cfg(not(target_arch = "wasm32"))]
                    let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
                    #[cfg(target_arch = "wasm32")]
                    let elapsed_ms = None;
                    self.record_query_metrics("cypher", elapsed_ms, &result);
                }

                result
            } else {
                self.execute_cypher(query)
            }
        }
        #[cfg(feature = "gremlin")]
        "gremlin" => {
            if let Some(p) = params {
                self.execute_gremlin_with_params(query, p)
            } else {
                self.execute_gremlin(query)
            }
        }
        #[cfg(feature = "graphql")]
        "graphql" => {
            if let Some(p) = params {
                self.execute_graphql_with_params(query, p)
            } else {
                self.execute_graphql(query)
            }
        }
        #[cfg(all(feature = "graphql", feature = "rdf"))]
        "graphql-rdf" => {
            if let Some(p) = params {
                self.execute_graphql_rdf_with_params(query, p)
            } else {
                self.execute_graphql_rdf(query)
            }
        }
        #[cfg(feature = "sql-pgq")]
        "sql" | "sql-pgq" => {
            if let Some(p) = params {
                self.execute_sql_with_params(query, p)
            } else {
                self.execute_sql(query)
            }
        }
        #[cfg(all(feature = "sparql", feature = "rdf"))]
        "sparql" => {
            if let Some(p) = params {
                self.execute_sparql_with_params(query, p)
            } else {
                self.execute_sparql(query)
            }
        }
        // Anything else, including languages whose feature is disabled.
        other => Err(grafeo_common::utils::error::Error::Query(
            grafeo_common::utils::error::QueryError::new(
                grafeo_common::utils::error::QueryErrorKind::Semantic,
                format!("Unknown query language: '{other}'"),
            ),
        )),
    }
}
3143
/// Clears the session's query cache by calling `clear` on it.
pub fn clear_plan_cache(&self) {
    self.query_cache.clear();
}
3173
/// Begins a transaction that is not marked read-only and has no explicit
/// isolation level.
///
/// Delegates to `begin_transaction_inner`; if a transaction is already
/// active that helper records a nested-transaction savepoint instead of
/// opening a new transaction.
///
/// # Errors
///
/// Propagates any error from `begin_transaction_inner`.
pub fn begin_transaction(&mut self) -> Result<()> {
    self.begin_transaction_inner(false, None)
}
3184
/// Begins a transaction that is not marked read-only, using the given
/// `isolation_level`.
///
/// Delegates to `begin_transaction_inner`; if a transaction is already
/// active that helper records a nested-transaction savepoint and the
/// isolation level is not used.
///
/// # Errors
///
/// Propagates any error from `begin_transaction_inner`.
pub fn begin_transaction_with_isolation(
    &mut self,
    isolation_level: crate::transaction::IsolationLevel,
) -> Result<()> {
    self.begin_transaction_inner(false, Some(isolation_level))
}
3198
3199 fn begin_transaction_inner(
3201 &self,
3202 read_only: bool,
3203 isolation_level: Option<crate::transaction::IsolationLevel>,
3204 ) -> Result<()> {
3205 let _span = grafeo_debug_span!("grafeo::tx::begin", read_only);
3206 let mut current = self.current_transaction.lock();
3207 if current.is_some() {
3208 drop(current);
3210 let mut depth = self.transaction_nesting_depth.lock();
3211 *depth += 1;
3212 let sp_name = format!("_nested_tx_{}", *depth);
3213 self.savepoint(&sp_name)?;
3214 return Ok(());
3215 }
3216
3217 let active = self.active_lpg_store();
3218 self.transaction_start_node_count
3219 .store(active.node_count(), Ordering::Relaxed);
3220 self.transaction_start_edge_count
3221 .store(active.edge_count(), Ordering::Relaxed);
3222 let transaction_id = if let Some(level) = isolation_level {
3223 self.transaction_manager.begin_with_isolation(level)
3224 } else {
3225 self.transaction_manager.begin()
3226 };
3227 *current = Some(transaction_id);
3228 *self.read_only_tx.lock() = read_only || self.db_read_only;
3229
3230 let key = self.active_graph_storage_key();
3233 let mut touched = self.touched_graphs.lock();
3234 touched.clear();
3235 touched.push(key);
3236
3237 #[cfg(feature = "metrics")]
3238 {
3239 crate::metrics::record_metric!(self.metrics, tx_active, inc);
3240 #[cfg(not(target_arch = "wasm32"))]
3241 {
3242 *self.tx_start_time.lock() = Some(Instant::now());
3243 }
3244 }
3245
3246 Ok(())
3247 }
3248
/// Commits the current transaction, or releases the innermost nested
/// transaction's savepoint when the nesting depth is non-zero.
///
/// # Errors
///
/// Propagates any error from `commit_inner`, including the "No active
/// transaction" state error.
pub fn commit(&mut self) -> Result<()> {
    self.commit_inner()
}
3259
/// Shared implementation of commit, callable through `&self`.
///
/// * When the nesting depth is greater than zero, the depth is decremented
///   and the savepoint `_nested_tx_<depth>` is released; nothing is committed.
/// * Otherwise the current transaction id is taken out of the session and
///   committed on the transaction manager. On success, every touched graph's
///   store has its version epochs finalized, its transaction properties
///   committed and its epoch synced; session transaction state is cleared;
///   and every `gc_interval` commits a version GC pass is run.
///
/// # Errors
///
/// Returns a transaction state error when no transaction is active. If the
/// transaction manager rejects the commit, per-store property changes are
/// rolled back, session transaction state is cleared, and the manager's
/// error is returned.
fn commit_inner(&self) -> Result<()> {
    let _span = grafeo_debug_span!("grafeo::tx::commit");
    {
        // Nested "transaction": release its savepoint instead of committing.
        let mut depth = self.transaction_nesting_depth.lock();
        if *depth > 0 {
            let sp_name = format!("_nested_tx_{depth}");
            *depth -= 1;
            drop(depth);
            return self.release_savepoint(&sp_name);
        }
    }

    // `take()` clears the session's current transaction whether or not the
    // commit below succeeds.
    let transaction_id = self.current_transaction.lock().take().ok_or_else(|| {
        grafeo_common::utils::error::Error::Transaction(
            grafeo_common::utils::error::TransactionError::InvalidState(
                "No active transaction".to_string(),
            ),
        )
    })?;

    // Work on a copy so the lock is not held while the stores are updated.
    let touched = self.touched_graphs.lock().clone();
    let commit_epoch = match self.transaction_manager.commit(transaction_id) {
        Ok(epoch) => epoch,
        Err(e) => {
            // Commit rejected: undo property changes and reset session state.
            for graph_name in &touched {
                let store = self.resolve_store(graph_name);
                store.rollback_transaction_properties(transaction_id);
            }
            #[cfg(feature = "rdf")]
            self.rollback_rdf_transaction(transaction_id);
            *self.read_only_tx.lock() = self.db_read_only;
            self.savepoints.lock().clear();
            self.touched_graphs.lock().clear();
            #[cfg(feature = "metrics")]
            {
                crate::metrics::record_metric!(self.metrics, tx_active, dec);
                // A failed commit is counted under the `tx_conflicts` metric.
                crate::metrics::record_metric!(self.metrics, tx_conflicts, inc);
                #[cfg(not(target_arch = "wasm32"))]
                if let Some(start) = self.tx_start_time.lock().take() {
                    let duration_ms = start.elapsed().as_secs_f64() * 1000.0;
                    crate::metrics::record_metric!(
                        self.metrics,
                        tx_duration,
                        observe duration_ms
                    );
                }
            }
            return Err(e);
        }
    };

    // Finalize the transaction's versions at the epoch assigned by the manager.
    for graph_name in &touched {
        let store = self.resolve_store(graph_name);
        store.finalize_version_epochs(transaction_id, commit_epoch);
    }

    #[cfg(feature = "rdf")]
    self.commit_rdf_transaction(transaction_id);

    for graph_name in &touched {
        let store = self.resolve_store(graph_name);
        store.commit_transaction_properties(transaction_id);
    }

    // Bring each touched store up to the manager's current epoch.
    let current_epoch = self.transaction_manager.current_epoch();
    for graph_name in &touched {
        let store = self.resolve_store(graph_name);
        store.sync_epoch(current_epoch);
    }

    // Reset per-transaction session state.
    *self.read_only_tx.lock() = self.db_read_only;
    self.savepoints.lock().clear();
    self.touched_graphs.lock().clear();

    // Periodic GC: a `gc_interval` of 0 disables it.
    if self.gc_interval > 0 {
        // `fetch_add` returns the previous value, hence the `+ 1`.
        let count = self.commit_counter.fetch_add(1, Ordering::Relaxed) + 1;
        if count.is_multiple_of(self.gc_interval) {
            let min_epoch = self.transaction_manager.min_active_epoch();
            // Only the graphs touched by this transaction are collected here.
            for graph_name in &touched {
                let store = self.resolve_store(graph_name);
                store.gc_versions(min_epoch);
            }
            self.transaction_manager.gc();
            #[cfg(feature = "metrics")]
            crate::metrics::record_metric!(self.metrics, gc_runs, inc);
        }
    }

    #[cfg(feature = "metrics")]
    {
        crate::metrics::record_metric!(self.metrics, tx_active, dec);
        crate::metrics::record_metric!(self.metrics, tx_committed, inc);
        #[cfg(not(target_arch = "wasm32"))]
        if let Some(start) = self.tx_start_time.lock().take() {
            let duration_ms = start.elapsed().as_secs_f64() * 1000.0;
            crate::metrics::record_metric!(self.metrics, tx_duration, observe duration_ms);
        }
    }

    Ok(())
}
3372
/// Rolls back the current transaction, or rolls back to the innermost nested
/// transaction's savepoint when the nesting depth is non-zero.
///
/// # Errors
///
/// Propagates any error from `rollback_inner`, including the "No active
/// transaction" state error.
pub fn rollback(&mut self) -> Result<()> {
    self.rollback_inner()
}
3399
3400 fn rollback_inner(&self) -> Result<()> {
3402 let _span = grafeo_debug_span!("grafeo::tx::rollback");
3403 {
3405 let mut depth = self.transaction_nesting_depth.lock();
3406 if *depth > 0 {
3407 let sp_name = format!("_nested_tx_{depth}");
3408 *depth -= 1;
3409 drop(depth);
3410 return self.rollback_to_savepoint(&sp_name);
3411 }
3412 }
3413
3414 let transaction_id = self.current_transaction.lock().take().ok_or_else(|| {
3415 grafeo_common::utils::error::Error::Transaction(
3416 grafeo_common::utils::error::TransactionError::InvalidState(
3417 "No active transaction".to_string(),
3418 ),
3419 )
3420 })?;
3421
3422 *self.read_only_tx.lock() = self.db_read_only;
3424
3425 let touched = self.touched_graphs.lock().clone();
3427 for graph_name in &touched {
3428 let store = self.resolve_store(graph_name);
3429 store.discard_uncommitted_versions(transaction_id);
3430 }
3431
3432 #[cfg(feature = "rdf")]
3434 self.rollback_rdf_transaction(transaction_id);
3435
3436 self.savepoints.lock().clear();
3438 self.touched_graphs.lock().clear();
3439
3440 let result = self.transaction_manager.abort(transaction_id);
3442
3443 #[cfg(feature = "metrics")]
3444 if result.is_ok() {
3445 crate::metrics::record_metric!(self.metrics, tx_active, dec);
3446 crate::metrics::record_metric!(self.metrics, tx_rolled_back, inc);
3447 #[cfg(not(target_arch = "wasm32"))]
3448 if let Some(start) = self.tx_start_time.lock().take() {
3449 let duration_ms = start.elapsed().as_secs_f64() * 1000.0;
3450 crate::metrics::record_metric!(self.metrics, tx_duration, observe duration_ms);
3451 }
3452 }
3453
3454 result
3455 }
3456
3457 pub fn savepoint(&self, name: &str) -> Result<()> {
3467 let tx_id = self.current_transaction.lock().ok_or_else(|| {
3468 grafeo_common::utils::error::Error::Transaction(
3469 grafeo_common::utils::error::TransactionError::InvalidState(
3470 "No active transaction".to_string(),
3471 ),
3472 )
3473 })?;
3474
3475 let touched = self.touched_graphs.lock().clone();
3477 let graph_snapshots: Vec<GraphSavepoint> = touched
3478 .iter()
3479 .map(|graph_name| {
3480 let store = self.resolve_store(graph_name);
3481 GraphSavepoint {
3482 graph_name: graph_name.clone(),
3483 next_node_id: store.peek_next_node_id(),
3484 next_edge_id: store.peek_next_edge_id(),
3485 undo_log_position: store.property_undo_log_position(tx_id),
3486 }
3487 })
3488 .collect();
3489
3490 self.savepoints.lock().push(SavepointState {
3491 name: name.to_string(),
3492 graph_snapshots,
3493 active_graph: self.current_graph.lock().clone(),
3494 });
3495 Ok(())
3496 }
3497
/// Rolls the active transaction back to the most recent savepoint named
/// `name`.
///
/// The matching savepoint and every savepoint created after it are removed
/// from the savepoint stack. For each graph captured by the savepoint,
/// property changes are undone back to the recorded undo-log position and
/// entities whose ids were allocated after the savepoint are discarded.
/// Graphs that were touched only after the savepoint have all of the
/// transaction's uncommitted versions discarded. Finally the touched-graph
/// list is reset to the graphs captured by the savepoint.
///
/// # Errors
///
/// Returns a transaction state error when no transaction is active or when
/// no savepoint with that name exists.
pub fn rollback_to_savepoint(&self, name: &str) -> Result<()> {
    let transaction_id = self.current_transaction.lock().ok_or_else(|| {
        grafeo_common::utils::error::Error::Transaction(
            grafeo_common::utils::error::TransactionError::InvalidState(
                "No active transaction".to_string(),
            ),
        )
    })?;

    let mut savepoints = self.savepoints.lock();

    // Search from the end so the most recent savepoint with this name wins.
    let pos = savepoints
        .iter()
        .rposition(|sp| sp.name == name)
        .ok_or_else(|| {
            grafeo_common::utils::error::Error::Transaction(
                grafeo_common::utils::error::TransactionError::InvalidState(format!(
                    "Savepoint '{name}' not found"
                )),
            )
        })?;

    let sp_state = savepoints[pos].clone();

    // Drops the target savepoint itself along with everything created after it.
    savepoints.truncate(pos);
    // Release the lock before touching the stores.
    drop(savepoints);

    for gs in &sp_state.graph_snapshots {
        let store = self.resolve_store(&gs.graph_name);

        // Undo property writes made after the savepoint.
        store.rollback_transaction_properties_to(transaction_id, gs.undo_log_position);

        let current_next_node = store.peek_next_node_id();
        let current_next_edge = store.peek_next_edge_id();

        // Ids in [saved next id, current next id) were handed out after the savepoint.
        let node_ids: Vec<NodeId> = (gs.next_node_id..current_next_node)
            .map(NodeId::new)
            .collect();
        let edge_ids: Vec<EdgeId> = (gs.next_edge_id..current_next_edge)
            .map(EdgeId::new)
            .collect();

        if !node_ids.is_empty() || !edge_ids.is_empty() {
            store.discard_entities_by_id(transaction_id, &node_ids, &edge_ids);
        }
    }

    // Graphs first touched after the savepoint have no snapshot to return to,
    // so all of this transaction's uncommitted versions in them are dropped.
    let touched = self.touched_graphs.lock().clone();
    for graph_name in &touched {
        let already_captured = sp_state
            .graph_snapshots
            .iter()
            .any(|gs| gs.graph_name == *graph_name);
        if !already_captured {
            let store = self.resolve_store(graph_name);
            store.discard_uncommitted_versions(transaction_id);
        }
    }

    // Restore the touched list to the set of graphs the savepoint captured.
    let mut touched = self.touched_graphs.lock();
    touched.clear();
    for gs in &sp_state.graph_snapshots {
        if !touched.contains(&gs.graph_name) {
            touched.push(gs.graph_name.clone());
        }
    }

    Ok(())
}
3584
3585 pub fn release_savepoint(&self, name: &str) -> Result<()> {
3591 let _tx_id = self.current_transaction.lock().ok_or_else(|| {
3592 grafeo_common::utils::error::Error::Transaction(
3593 grafeo_common::utils::error::TransactionError::InvalidState(
3594 "No active transaction".to_string(),
3595 ),
3596 )
3597 })?;
3598
3599 let mut savepoints = self.savepoints.lock();
3600 let pos = savepoints
3601 .iter()
3602 .rposition(|sp| sp.name == name)
3603 .ok_or_else(|| {
3604 grafeo_common::utils::error::Error::Transaction(
3605 grafeo_common::utils::error::TransactionError::InvalidState(format!(
3606 "Savepoint '{name}' not found"
3607 )),
3608 )
3609 })?;
3610 savepoints.remove(pos);
3611 Ok(())
3612 }
3613
3614 #[must_use]
3616 pub fn in_transaction(&self) -> bool {
3617 self.current_transaction.lock().is_some()
3618 }
3619
3620 #[must_use]
3622 pub(crate) fn current_transaction_id(&self) -> Option<TransactionId> {
3623 *self.current_transaction.lock()
3624 }
3625
3626 #[must_use]
3628 pub(crate) fn transaction_manager(&self) -> &TransactionManager {
3629 &self.transaction_manager
3630 }
3631
3632 #[must_use]
3634 pub(crate) fn node_count_delta(&self) -> (usize, usize) {
3635 (
3636 self.transaction_start_node_count.load(Ordering::Relaxed),
3637 self.active_lpg_store().node_count(),
3638 )
3639 }
3640
3641 #[must_use]
3643 pub(crate) fn edge_count_delta(&self) -> (usize, usize) {
3644 (
3645 self.transaction_start_edge_count.load(Ordering::Relaxed),
3646 self.active_lpg_store().edge_count(),
3647 )
3648 }
3649
3650 pub fn prepare_commit(&mut self) -> Result<crate::transaction::PreparedCommit<'_>> {
3684 crate::transaction::PreparedCommit::new(self)
3685 }
3686
/// Enables or disables auto-commit for this session.
///
/// The flag is consulted by `needs_auto_commit` when deciding whether a
/// mutating query outside a transaction gets its own transaction.
pub fn set_auto_commit(&mut self, auto_commit: bool) {
    self.auto_commit = auto_commit;
}
3691
#[must_use]
/// Returns the session's current auto-commit flag.
pub fn auto_commit(&self) -> bool {
    self.auto_commit
}
3697
3698 fn needs_auto_commit(&self, has_mutations: bool) -> bool {
3703 self.auto_commit && has_mutations && self.current_transaction.lock().is_none()
3704 }
3705
3706 fn with_auto_commit<F>(&self, has_mutations: bool, body: F) -> Result<QueryResult>
3709 where
3710 F: FnOnce() -> Result<QueryResult>,
3711 {
3712 if self.needs_auto_commit(has_mutations) {
3713 self.begin_transaction_inner(false, None)?;
3714 match body() {
3715 Ok(result) => {
3716 self.commit_inner()?;
3717 Ok(result)
3718 }
3719 Err(e) => {
3720 let _ = self.rollback_inner();
3721 Err(e)
3722 }
3723 }
3724 } else {
3725 body()
3726 }
3727 }
3728
/// Textual heuristic for "this query probably mutates data".
///
/// Returns `true` when the ASCII-uppercased query text contains any of the
/// keywords below as a substring. The match is not word-aware, so e.g.
/// `OFFSET` matches `SET`.
fn query_looks_like_mutation(query: &str) -> bool {
    const MUTATION_KEYWORDS: [&str; 8] = [
        "INSERT", "CREATE", "DELETE", "MERGE", "SET", "REMOVE", "DROP", "ALTER",
    ];

    let normalized = query.to_ascii_uppercase();
    MUTATION_KEYWORDS
        .iter()
        .any(|&keyword| normalized.contains(keyword))
}
3745
3746 #[must_use]
3748 fn query_deadline(&self) -> Option<Instant> {
3749 #[cfg(not(target_arch = "wasm32"))]
3750 {
3751 self.query_timeout.map(|d| Instant::now() + d)
3752 }
3753 #[cfg(target_arch = "wasm32")]
3754 {
3755 let _ = &self.query_timeout;
3756 None
3757 }
3758 }
3759
#[cfg(feature = "metrics")]
/// Records the metrics for one finished query.
///
/// Always bumps the overall and per-language query counters. Latency is
/// observed only when `elapsed_ms` is present. A successful result
/// contributes its returned-row count and, when available, its scanned-row
/// count; a failed one bumps the error counter, plus the timeout counter when
/// the error text contains `"exceeded timeout"`.
fn record_query_metrics(
    &self,
    language: &str,
    elapsed_ms: Option<f64>,
    result: &Result<crate::database::QueryResult>,
) {
    use crate::metrics::record_metric;

    record_metric!(self.metrics, query_count, inc);
    if let Some(registry) = self.metrics.as_ref() {
        registry.query_count_by_language.increment(language);
    }

    if let Some(latency) = elapsed_ms {
        record_metric!(self.metrics, query_latency, observe latency);
    }

    match result {
        Err(error) => {
            record_metric!(self.metrics, query_errors, inc);
            // Timeouts are recognised by their message text.
            let rendered = error.to_string();
            if rendered.contains("exceeded timeout") {
                record_metric!(self.metrics, query_timeouts, inc);
            }
        }
        Ok(query_result) => {
            let row_total = query_result.rows.len() as u64;
            record_metric!(self.metrics, rows_returned, add row_total);
            if let Some(scan_total) = query_result.rows_scanned {
                record_metric!(self.metrics, rows_scanned, add scan_total);
            }
        }
    }
}
3799
3800 fn eval_integer_literal(expr: &grafeo_adapters::query::gql::ast::Expression) -> Option<i64> {
3802 use grafeo_adapters::query::gql::ast::{Expression, Literal};
3803 match expr {
3804 Expression::Literal(Literal::Integer(n)) => Some(*n),
3805 _ => None,
3806 }
3807 }
3808
    /// Resolves the `(epoch, transaction id)` pair used for versioned store
    /// access.
    ///
    /// Resolution order:
    /// 1. If `viewing_epoch_override` holds an epoch, that epoch is returned
    ///    with no transaction id.
    /// 2. Otherwise, if a transaction is active, its start epoch is returned
    ///    together with its id (falling back to the manager's current epoch
    ///    when `start_epoch` yields `None`).
    /// 3. Otherwise, the transaction manager's current epoch with no
    ///    transaction id.
    #[must_use]
    fn get_transaction_context(&self) -> (EpochId, Option<TransactionId>) {
        // An explicit viewing epoch takes precedence over any transaction.
        if let Some(epoch) = *self.viewing_epoch_override.lock() {
            return (epoch, None);
        }

        if let Some(transaction_id) = *self.current_transaction.lock() {
            let epoch = self
                .transaction_manager
                .start_epoch(transaction_id)
                .unwrap_or_else(|| self.transaction_manager.current_epoch());
            (epoch, Some(transaction_id))
        } else {
            (self.transaction_manager.current_epoch(), None)
        }
    }
3833
    /// Builds a planner for `store` with the read-only flag set to `false`.
    ///
    /// Delegates to `create_planner_for_store_with_read_only`, forwarding the
    /// store, viewing epoch and optional transaction id unchanged.
    fn create_planner_for_store(
        &self,
        store: Arc<dyn GraphStore>,
        viewing_epoch: EpochId,
        transaction_id: Option<TransactionId>,
    ) -> crate::query::Planner {
        self.create_planner_for_store_with_read_only(store, viewing_epoch, transaction_id, false)
    }
3846
3847 fn create_planner_for_store_with_read_only(
3848 &self,
3849 store: Arc<dyn GraphStore>,
3850 viewing_epoch: EpochId,
3851 transaction_id: Option<TransactionId>,
3852 read_only: bool,
3853 ) -> crate::query::Planner {
3854 use crate::query::Planner;
3855 use grafeo_core::execution::operators::{LazyValue, SessionContext};
3856
3857 let info_store = Arc::clone(&store);
3859 let schema_store = Arc::clone(&store);
3860
3861 let session_context = SessionContext {
3862 current_schema: self.current_schema(),
3863 current_graph: self.current_graph(),
3864 db_info: LazyValue::new(move || Self::build_info_value(&*info_store)),
3865 schema_info: LazyValue::new(move || Self::build_schema_value(&*schema_store)),
3866 };
3867
3868 let write_store = self.active_write_store();
3869
3870 let mut planner = Planner::with_context(
3871 Arc::clone(&store),
3872 write_store,
3873 Arc::clone(&self.transaction_manager),
3874 transaction_id,
3875 viewing_epoch,
3876 )
3877 .with_factorized_execution(self.factorized_execution)
3878 .with_catalog(Arc::clone(&self.catalog))
3879 .with_session_context(session_context)
3880 .with_read_only(read_only);
3881
3882 let validator =
3884 CatalogConstraintValidator::new(Arc::clone(&self.catalog)).with_store(store);
3885 planner = planner.with_validator(Arc::new(validator));
3886
3887 planner
3888 }
3889
3890 fn build_info_value(store: &dyn GraphStore) -> Value {
3892 use grafeo_common::types::PropertyKey;
3893 use std::collections::BTreeMap;
3894
3895 let mut map = BTreeMap::new();
3896 map.insert(PropertyKey::from("mode"), Value::String("lpg".into()));
3897 map.insert(
3898 PropertyKey::from("node_count"),
3899 Value::Int64(store.node_count() as i64),
3900 );
3901 map.insert(
3902 PropertyKey::from("edge_count"),
3903 Value::Int64(store.edge_count() as i64),
3904 );
3905 map.insert(
3906 PropertyKey::from("version"),
3907 Value::String(env!("CARGO_PKG_VERSION").into()),
3908 );
3909 Value::Map(map.into())
3910 }
3911
3912 fn build_schema_value(store: &dyn GraphStore) -> Value {
3914 use grafeo_common::types::PropertyKey;
3915 use std::collections::BTreeMap;
3916
3917 let labels: Vec<Value> = store
3918 .all_labels()
3919 .into_iter()
3920 .map(|l| Value::String(l.into()))
3921 .collect();
3922 let edge_types: Vec<Value> = store
3923 .all_edge_types()
3924 .into_iter()
3925 .map(|t| Value::String(t.into()))
3926 .collect();
3927 let property_keys: Vec<Value> = store
3928 .all_property_keys()
3929 .into_iter()
3930 .map(|k| Value::String(k.into()))
3931 .collect();
3932
3933 let mut map = BTreeMap::new();
3934 map.insert(PropertyKey::from("labels"), Value::List(labels.into()));
3935 map.insert(
3936 PropertyKey::from("edge_types"),
3937 Value::List(edge_types.into()),
3938 );
3939 map.insert(
3940 PropertyKey::from("property_keys"),
3941 Value::List(property_keys.into()),
3942 );
3943 Value::Map(map.into())
3944 }
3945
3946 pub fn create_node(&self, labels: &[&str]) -> NodeId {
3951 let (epoch, transaction_id) = self.get_transaction_context();
3952 self.active_lpg_store().create_node_versioned(
3953 labels,
3954 epoch,
3955 transaction_id.unwrap_or(TransactionId::SYSTEM),
3956 )
3957 }
3958
3959 pub fn create_node_with_props<'a>(
3963 &self,
3964 labels: &[&str],
3965 properties: impl IntoIterator<Item = (&'a str, Value)>,
3966 ) -> NodeId {
3967 let (epoch, transaction_id) = self.get_transaction_context();
3968 self.active_lpg_store().create_node_with_props_versioned(
3969 labels,
3970 properties,
3971 epoch,
3972 transaction_id.unwrap_or(TransactionId::SYSTEM),
3973 )
3974 }
3975
3976 pub fn create_edge(
3981 &self,
3982 src: NodeId,
3983 dst: NodeId,
3984 edge_type: &str,
3985 ) -> grafeo_common::types::EdgeId {
3986 let (epoch, transaction_id) = self.get_transaction_context();
3987 self.active_lpg_store().create_edge_versioned(
3988 src,
3989 dst,
3990 edge_type,
3991 epoch,
3992 transaction_id.unwrap_or(TransactionId::SYSTEM),
3993 )
3994 }
3995
3996 #[must_use]
4024 pub fn get_node(&self, id: NodeId) -> Option<Node> {
4025 let (epoch, transaction_id) = self.get_transaction_context();
4026 self.active_lpg_store().get_node_versioned(
4027 id,
4028 epoch,
4029 transaction_id.unwrap_or(TransactionId::SYSTEM),
4030 )
4031 }
4032
4033 #[must_use]
4057 pub fn get_node_property(&self, id: NodeId, key: &str) -> Option<Value> {
4058 self.get_node(id)
4059 .and_then(|node| node.get_property(key).cloned())
4060 }
4061
4062 #[must_use]
4069 pub fn get_edge(&self, id: EdgeId) -> Option<Edge> {
4070 let (epoch, transaction_id) = self.get_transaction_context();
4071 self.active_lpg_store().get_edge_versioned(
4072 id,
4073 epoch,
4074 transaction_id.unwrap_or(TransactionId::SYSTEM),
4075 )
4076 }
4077
4078 #[must_use]
4104 pub fn get_neighbors_outgoing(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
4105 self.active_lpg_store()
4106 .edges_from(node, Direction::Outgoing)
4107 .collect()
4108 }
4109
4110 #[must_use]
4119 pub fn get_neighbors_incoming(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
4120 self.active_lpg_store()
4121 .edges_from(node, Direction::Incoming)
4122 .collect()
4123 }
4124
4125 #[must_use]
4137 pub fn get_neighbors_outgoing_by_type(
4138 &self,
4139 node: NodeId,
4140 edge_type: &str,
4141 ) -> Vec<(NodeId, EdgeId)> {
4142 self.active_lpg_store()
4143 .edges_from(node, Direction::Outgoing)
4144 .filter(|(_, edge_id)| {
4145 self.get_edge(*edge_id)
4146 .is_some_and(|e| e.edge_type.as_str() == edge_type)
4147 })
4148 .collect()
4149 }
4150
    /// Returns `true` when `get_node(id)` yields a node.
    #[must_use]
    pub fn node_exists(&self, id: NodeId) -> bool {
        self.get_node(id).is_some()
    }
4161
    /// Returns `true` when `get_edge(id)` yields an edge.
    #[must_use]
    pub fn edge_exists(&self, id: EdgeId) -> bool {
        self.get_edge(id).is_some()
    }
4167
4168 #[must_use]
4172 pub fn get_degree(&self, node: NodeId) -> (usize, usize) {
4173 let active = self.active_lpg_store();
4174 let out = active.out_degree(node);
4175 let in_degree = active.in_degree(node);
4176 (out, in_degree)
4177 }
4178
4179 #[must_use]
4189 pub fn get_nodes_batch(&self, ids: &[NodeId]) -> Vec<Option<Node>> {
4190 let (epoch, transaction_id) = self.get_transaction_context();
4191 let tx = transaction_id.unwrap_or(TransactionId::SYSTEM);
4192 let active = self.active_lpg_store();
4193 ids.iter()
4194 .map(|&id| active.get_node_versioned(id, epoch, tx))
4195 .collect()
4196 }
4197
4198 #[cfg(feature = "cdc")]
4202 pub fn history(
4203 &self,
4204 entity_id: impl Into<crate::cdc::EntityId>,
4205 ) -> Result<Vec<crate::cdc::ChangeEvent>> {
4206 Ok(self.cdc_log.history(entity_id.into()))
4207 }
4208
4209 #[cfg(feature = "cdc")]
4211 pub fn history_since(
4212 &self,
4213 entity_id: impl Into<crate::cdc::EntityId>,
4214 since_epoch: EpochId,
4215 ) -> Result<Vec<crate::cdc::ChangeEvent>> {
4216 Ok(self.cdc_log.history_since(entity_id.into(), since_epoch))
4217 }
4218
4219 #[cfg(feature = "cdc")]
4221 pub fn changes_between(
4222 &self,
4223 start_epoch: EpochId,
4224 end_epoch: EpochId,
4225 ) -> Result<Vec<crate::cdc::ChangeEvent>> {
4226 Ok(self.cdc_log.changes_between(start_epoch, end_epoch))
4227 }
4228}
4229
4230impl Drop for Session {
4231 fn drop(&mut self) {
4232 if self.in_transaction() {
4235 let _ = self.rollback_inner();
4236 }
4237
4238 #[cfg(feature = "metrics")]
4239 if let Some(ref reg) = self.metrics {
4240 reg.session_active
4241 .fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
4242 }
4243 }
4244}
4245
4246#[cfg(test)]
4247mod tests {
4248 use super::parse_default_literal;
4249 use crate::database::GrafeoDB;
4250 use grafeo_common::types::Value;
4251
4252 #[test]
4257 fn parse_default_literal_null() {
4258 assert_eq!(parse_default_literal("null"), Value::Null);
4259 assert_eq!(parse_default_literal("NULL"), Value::Null);
4260 assert_eq!(parse_default_literal("Null"), Value::Null);
4261 }
4262
4263 #[test]
4264 fn parse_default_literal_bool() {
4265 assert_eq!(parse_default_literal("true"), Value::Bool(true));
4266 assert_eq!(parse_default_literal("TRUE"), Value::Bool(true));
4267 assert_eq!(parse_default_literal("false"), Value::Bool(false));
4268 assert_eq!(parse_default_literal("FALSE"), Value::Bool(false));
4269 }
4270
4271 #[test]
4272 fn parse_default_literal_string_single_quoted() {
4273 assert_eq!(
4274 parse_default_literal("'hello'"),
4275 Value::String("hello".into())
4276 );
4277 }
4278
4279 #[test]
4280 fn parse_default_literal_string_double_quoted() {
4281 assert_eq!(
4282 parse_default_literal("\"world\""),
4283 Value::String("world".into())
4284 );
4285 }
4286
4287 #[test]
4288 fn parse_default_literal_integer() {
4289 assert_eq!(parse_default_literal("42"), Value::Int64(42));
4290 assert_eq!(parse_default_literal("-7"), Value::Int64(-7));
4291 assert_eq!(parse_default_literal("0"), Value::Int64(0));
4292 }
4293
4294 #[test]
4295 fn parse_default_literal_float() {
4296 assert_eq!(parse_default_literal("9.81"), Value::Float64(9.81_f64));
4297 assert_eq!(parse_default_literal("-0.5"), Value::Float64(-0.5));
4298 }
4299
4300 #[test]
4301 fn parse_default_literal_fallback_string() {
4302 assert_eq!(
4304 parse_default_literal("some_identifier"),
4305 Value::String("some_identifier".into())
4306 );
4307 }
4308
    // Creating a node through a session yields a valid id and is counted by
    // the database.
    #[test]
    fn test_session_create_node() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        let id = session.create_node(&["Person"]);
        assert!(id.is_valid());
        assert_eq!(db.node_count(), 1);
    }
4318
    // `in_transaction` flips to true after begin and back to false after commit.
    #[test]
    fn test_session_transaction() {
        let db = GrafeoDB::new_in_memory();
        let mut session = db.session();

        assert!(!session.in_transaction());

        session.begin_transaction().unwrap();
        assert!(session.in_transaction());

        session.commit().unwrap();
        assert!(!session.in_transaction());
    }
4332
4333 #[test]
4334 fn test_session_transaction_context() {
4335 let db = GrafeoDB::new_in_memory();
4336 let mut session = db.session();
4337
4338 let (_epoch1, transaction_id1) = session.get_transaction_context();
4340 assert!(transaction_id1.is_none());
4341
4342 session.begin_transaction().unwrap();
4344 let (epoch2, transaction_id2) = session.get_transaction_context();
4345 assert!(transaction_id2.is_some());
4346 let _ = epoch2; session.commit().unwrap();
4351 let (epoch3, tx_id3) = session.get_transaction_context();
4352 assert!(tx_id3.is_none());
4353 assert!(epoch3.as_u64() >= epoch2.as_u64());
4355 }
4356
    // Rolling back an open transaction leaves the session outside a transaction.
    #[test]
    fn test_session_rollback() {
        let db = GrafeoDB::new_in_memory();
        let mut session = db.session();

        session.begin_transaction().unwrap();
        session.rollback().unwrap();
        assert!(!session.in_transaction());
    }
4366
    // A node written directly to the store under the session's transaction id
    // must disappear when the session rolls back, while a node created
    // beforehand must survive.
    #[test]
    fn test_session_rollback_discards_versions() {
        use grafeo_common::types::TransactionId;

        let db = GrafeoDB::new_in_memory();

        // Baseline node created outside any session transaction.
        let node_before = db.store().create_node(&["Person"]);
        assert!(node_before.is_valid());
        assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");

        let mut session = db.session();
        session.begin_transaction().unwrap();
        // Copy the active transaction id out of the session's lock.
        let transaction_id = session.current_transaction.lock().unwrap();

        // Write straight to the store, tagged with the session's transaction.
        let epoch = db.store().current_epoch();
        let node_in_tx = db
            .store()
            .create_node_versioned(&["Person"], epoch, transaction_id);
        assert!(node_in_tx.is_valid());

        assert_eq!(
            db.node_count(),
            1,
            "PENDING nodes should be invisible to non-versioned node_count()"
        );
        assert!(
            db.store()
                .get_node_versioned(node_in_tx, epoch, transaction_id)
                .is_some(),
            "Transaction node should be visible to its own transaction"
        );

        session.rollback().unwrap();
        assert!(!session.in_transaction());

        let count_after = db.node_count();
        assert_eq!(
            count_after, 1,
            "Rollback should discard uncommitted node, but got {count_after}"
        );

        // Re-read both nodes as the system transaction at the current epoch.
        let current_epoch = db.store().current_epoch();
        assert!(
            db.store()
                .get_node_versioned(node_before, current_epoch, TransactionId::SYSTEM)
                .is_some(),
            "Original node should still exist"
        );

        assert!(
            db.store()
                .get_node_versioned(node_in_tx, current_epoch, TransactionId::SYSTEM)
                .is_none(),
            "Transaction node should be gone"
        );
    }
4434
    // A node created via `session.create_node` inside a transaction is visible
    // to that transaction, hidden from `node_count`, and discarded on rollback.
    #[test]
    fn test_session_create_node_in_transaction() {
        let db = GrafeoDB::new_in_memory();

        // Baseline node created outside any session transaction.
        let node_before = db.create_node(&["Person"]);
        assert!(node_before.is_valid());
        assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");

        let mut session = db.session();
        session.begin_transaction().unwrap();
        // Copy the active transaction id out of the session's lock.
        let transaction_id = session.current_transaction.lock().unwrap();

        let node_in_tx = session.create_node(&["Person"]);
        assert!(node_in_tx.is_valid());

        assert_eq!(
            db.node_count(),
            1,
            "PENDING nodes should be invisible to non-versioned node_count()"
        );
        let epoch = db.store().current_epoch();
        assert!(
            db.store()
                .get_node_versioned(node_in_tx, epoch, transaction_id)
                .is_some(),
            "Transaction node should be visible to its own transaction"
        );

        session.rollback().unwrap();

        let count_after = db.node_count();
        assert_eq!(
            count_after, 1,
            "Rollback should discard node created via session.create_node(), but got {count_after}"
        );
    }
4479
    // Same rollback check as for `create_node`, but for a node created with
    // properties via `session.create_node_with_props`.
    #[test]
    fn test_session_create_node_with_props_in_transaction() {
        use grafeo_common::types::Value;

        let db = GrafeoDB::new_in_memory();

        // Baseline node created outside any session transaction.
        db.create_node(&["Person"]);
        assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");

        let mut session = db.session();
        session.begin_transaction().unwrap();
        // Copy the active transaction id out of the session's lock.
        let transaction_id = session.current_transaction.lock().unwrap();

        let node_in_tx =
            session.create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))]);
        assert!(node_in_tx.is_valid());

        assert_eq!(
            db.node_count(),
            1,
            "PENDING nodes should be invisible to non-versioned node_count()"
        );
        let epoch = db.store().current_epoch();
        assert!(
            db.store()
                .get_node_versioned(node_in_tx, epoch, transaction_id)
                .is_some(),
            "Transaction node should be visible to its own transaction"
        );

        session.rollback().unwrap();

        let count_after = db.node_count();
        assert_eq!(
            count_after, 1,
            "Rollback should discard node created via session.create_node_with_props()"
        );
    }
4525
    // GQL tests: each test builds an in-memory database, seeds it through the
    // session's direct API, and runs a query via `session.execute`.
    #[cfg(feature = "gql")]
    mod gql_tests {
        use super::*;

        // A label-filtered MATCH returns only the nodes carrying that label.
        #[test]
        fn test_gql_query_execution() {
            let db = GrafeoDB::new_in_memory();
            let session = db.session();

            // Two `Person` nodes and one `Animal` node.
            session.create_node(&["Person"]);
            session.create_node(&["Person"]);
            session.create_node(&["Animal"]);

            let result = session.execute("MATCH (n:Person) RETURN n").unwrap();

            assert_eq!(result.row_count(), 2);
            assert_eq!(result.column_count(), 1);
            assert_eq!(result.columns[0], "n");
        }

        // Querying an empty database succeeds with zero rows.
        #[test]
        fn test_gql_empty_result() {
            let db = GrafeoDB::new_in_memory();
            let session = db.session();

            let result = session.execute("MATCH (n:Person) RETURN n").unwrap();

            assert_eq!(result.row_count(), 0);
        }

        // A query with an unclosed node pattern is rejected.
        #[test]
        fn test_gql_parse_error() {
            let db = GrafeoDB::new_in_memory();
            let session = db.session();

            let result = session.execute("MATCH (n RETURN n");

            assert!(result.is_err());
        }

        // A one-hop pattern returns one row per matching edge.
        #[test]
        fn test_gql_relationship_traversal() {
            let db = GrafeoDB::new_in_memory();
            let session = db.session();

            let alix = session.create_node(&["Person"]);
            let gus = session.create_node(&["Person"]);
            let vincent = session.create_node(&["Person"]);

            session.create_edge(alix, gus, "KNOWS");
            session.create_edge(alix, vincent, "KNOWS");

            let result = session
                .execute("MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b")
                .unwrap();

            assert_eq!(result.row_count(), 2);
            assert_eq!(result.column_count(), 2);
            assert_eq!(result.columns[0], "a");
            assert_eq!(result.columns[1], "b");
        }

        // Only edges of the requested type are matched.
        #[test]
        fn test_gql_relationship_with_type_filter() {
            let db = GrafeoDB::new_in_memory();
            let session = db.session();

            let alix = session.create_node(&["Person"]);
            let gus = session.create_node(&["Person"]);
            let vincent = session.create_node(&["Person"]);

            // One `KNOWS` edge and one edge of a different type.
            session.create_edge(alix, gus, "KNOWS");
            session.create_edge(alix, vincent, "WORKS_WITH");

            let result = session
                .execute("MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b")
                .unwrap();

            assert_eq!(result.row_count(), 1);
        }

        // Returning a variable that was never bound is reported as an error
        // whose message mentions "Undefined variable".
        #[test]
        fn test_gql_semantic_error_undefined_variable() {
            let db = GrafeoDB::new_in_memory();
            let session = db.session();

            let result = session.execute("MATCH (n:Person) RETURN x");

            assert!(result.is_err());
            let Err(err) = result else {
                panic!("Expected error")
            };
            assert!(
                err.to_string().contains("Undefined variable"),
                "Expected undefined variable error, got: {}",
                err
            );
        }

        // A numeric comparison in WHERE keeps only the matching nodes.
        #[test]
        fn test_gql_where_clause_property_filter() {
            use grafeo_common::types::Value;

            let db = GrafeoDB::new_in_memory();
            let session = db.session();

            // Ages 25, 35 and 45; two of them are greater than 30.
            session.create_node_with_props(&["Person"], [("age", Value::Int64(25))]);
            session.create_node_with_props(&["Person"], [("age", Value::Int64(35))]);
            session.create_node_with_props(&["Person"], [("age", Value::Int64(45))]);

            let result = session
                .execute("MATCH (n:Person) WHERE n.age > 30 RETURN n")
                .unwrap();

            assert_eq!(result.row_count(), 2);
        }

        // A string equality in WHERE keeps only the matching nodes.
        #[test]
        fn test_gql_where_clause_equality() {
            use grafeo_common::types::Value;

            let db = GrafeoDB::new_in_memory();
            let session = db.session();

            // Two nodes named "Alix" and one named "Gus".
            session.create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))]);
            session.create_node_with_props(&["Person"], [("name", Value::String("Gus".into()))]);
            session.create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))]);

            let result = session
                .execute("MATCH (n:Person) WHERE n.name = \"Alix\" RETURN n")
                .unwrap();

            assert_eq!(result.row_count(), 2);
        }

        // Property access in RETURN produces one column per expression, named
        // after the expression text.
        #[test]
        fn test_gql_return_property_access() {
            use grafeo_common::types::Value;

            let db = GrafeoDB::new_in_memory();
            let session = db.session();

            session.create_node_with_props(
                &["Person"],
                [
                    ("name", Value::String("Alix".into())),
                    ("age", Value::Int64(30)),
                ],
            );
            session.create_node_with_props(
                &["Person"],
                [
                    ("name", Value::String("Gus".into())),
                    ("age", Value::Int64(25)),
                ],
            );

            let result = session
                .execute("MATCH (n:Person) RETURN n.name, n.age")
                .unwrap();

            assert_eq!(result.row_count(), 2);
            assert_eq!(result.column_count(), 2);
            assert_eq!(result.columns[0], "n.name");
            assert_eq!(result.columns[1], "n.age");

            // Row order is not asserted; only membership of both names.
            let names: Vec<&Value> = result.rows.iter().map(|r| &r[0]).collect();
            assert!(names.contains(&&Value::String("Alix".into())));
            assert!(names.contains(&&Value::String("Gus".into())));
        }

        // A whole node and one of its properties can be returned side by side.
        #[test]
        fn test_gql_return_mixed_expressions() {
            use grafeo_common::types::Value;

            let db = GrafeoDB::new_in_memory();
            let session = db.session();

            session.create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))]);

            let result = session
                .execute("MATCH (n:Person) RETURN n, n.name")
                .unwrap();

            assert_eq!(result.row_count(), 1);
            assert_eq!(result.column_count(), 2);
            assert_eq!(result.columns[0], "n");
            assert_eq!(result.columns[1], "n.name");

            assert_eq!(result.rows[0][1], Value::String("Alix".into()));
        }
    }
4744
    // Cypher tests: the same basic scenarios as the GQL tests, run through
    // `session.execute_cypher`.
    #[cfg(feature = "cypher")]
    mod cypher_tests {
        use super::*;

        // A label-filtered MATCH returns only the nodes carrying that label.
        #[test]
        fn test_cypher_query_execution() {
            let db = GrafeoDB::new_in_memory();
            let session = db.session();

            // Two `Person` nodes and one `Animal` node.
            session.create_node(&["Person"]);
            session.create_node(&["Person"]);
            session.create_node(&["Animal"]);

            let result = session.execute_cypher("MATCH (n:Person) RETURN n").unwrap();

            assert_eq!(result.row_count(), 2);
            assert_eq!(result.column_count(), 1);
            assert_eq!(result.columns[0], "n");
        }

        // Querying an empty database succeeds with zero rows.
        #[test]
        fn test_cypher_empty_result() {
            let db = GrafeoDB::new_in_memory();
            let session = db.session();

            let result = session.execute_cypher("MATCH (n:Person) RETURN n").unwrap();

            assert_eq!(result.row_count(), 0);
        }

        // A query with an unclosed node pattern is rejected.
        #[test]
        fn test_cypher_parse_error() {
            let db = GrafeoDB::new_in_memory();
            let session = db.session();

            let result = session.execute_cypher("MATCH (n RETURN n");

            assert!(result.is_err());
        }
    }
4790
    // Tests for the session's direct (non-query) lookup and mutation API,
    // plus auto-commit and transaction begin/commit/rollback edge cases.
    mod direct_lookup_tests {
        use super::*;
        use grafeo_common::types::Value;

        // A node created through the session can be fetched back by id.
        #[test]
        fn test_get_node() {
            let db = GrafeoDB::new_in_memory();
            let session = db.session();

            let id = session.create_node(&["Person"]);
            let node = session.get_node(id);

            assert!(node.is_some());
            let node = node.unwrap();
            assert_eq!(node.id, id);
        }

        // Looking up an id that was never created yields `None`.
        #[test]
        fn test_get_node_not_found() {
            use grafeo_common::types::NodeId;

            let db = GrafeoDB::new_in_memory();
            let session = db.session();

            let node = session.get_node(NodeId::new(9999));
            assert!(node.is_none());
        }

        // Existing properties are returned; unknown keys yield `None`.
        #[test]
        fn test_get_node_property() {
            let db = GrafeoDB::new_in_memory();
            let session = db.session();

            let id = session
                .create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))]);

            let name = session.get_node_property(id, "name");
            assert_eq!(name, Some(Value::String("Alix".into())));

            let missing = session.get_node_property(id, "missing");
            assert!(missing.is_none());
        }

        // A fetched edge reports the id and endpoints it was created with.
        #[test]
        fn test_get_edge() {
            let db = GrafeoDB::new_in_memory();
            let session = db.session();

            let alix = session.create_node(&["Person"]);
            let gus = session.create_node(&["Person"]);
            let edge_id = session.create_edge(alix, gus, "KNOWS");

            let edge = session.get_edge(edge_id);
            assert!(edge.is_some());
            let edge = edge.unwrap();
            assert_eq!(edge.id, edge_id);
            assert_eq!(edge.src, alix);
            assert_eq!(edge.dst, gus);
        }

        // Looking up an edge id that was never created yields `None`.
        #[test]
        fn test_get_edge_not_found() {
            use grafeo_common::types::EdgeId;

            let db = GrafeoDB::new_in_memory();
            let session = db.session();

            let edge = session.get_edge(EdgeId::new(9999));
            assert!(edge.is_none());
        }

        // Outgoing neighbors are the targets of edges leaving the node.
        #[test]
        fn test_get_neighbors_outgoing() {
            let db = GrafeoDB::new_in_memory();
            let session = db.session();

            let alix = session.create_node(&["Person"]);
            let gus = session.create_node(&["Person"]);
            let harm = session.create_node(&["Person"]);

            session.create_edge(alix, gus, "KNOWS");
            session.create_edge(alix, harm, "KNOWS");

            let neighbors = session.get_neighbors_outgoing(alix);
            assert_eq!(neighbors.len(), 2);

            // Order is not asserted; only membership.
            let neighbor_ids: Vec<_> = neighbors.iter().map(|(node_id, _)| *node_id).collect();
            assert!(neighbor_ids.contains(&gus));
            assert!(neighbor_ids.contains(&harm));
        }

        // Incoming neighbors are the sources of edges arriving at the node.
        #[test]
        fn test_get_neighbors_incoming() {
            let db = GrafeoDB::new_in_memory();
            let session = db.session();

            let alix = session.create_node(&["Person"]);
            let gus = session.create_node(&["Person"]);
            let harm = session.create_node(&["Person"]);

            session.create_edge(gus, alix, "KNOWS");
            session.create_edge(harm, alix, "KNOWS");

            let neighbors = session.get_neighbors_incoming(alix);
            assert_eq!(neighbors.len(), 2);

            // Order is not asserted; only membership.
            let neighbor_ids: Vec<_> = neighbors.iter().map(|(node_id, _)| *node_id).collect();
            assert!(neighbor_ids.contains(&gus));
            assert!(neighbor_ids.contains(&harm));
        }

        // Filtering by edge type returns only neighbors reached via that type.
        #[test]
        fn test_get_neighbors_outgoing_by_type() {
            let db = GrafeoDB::new_in_memory();
            let session = db.session();

            let alix = session.create_node(&["Person"]);
            let gus = session.create_node(&["Person"]);
            let company = session.create_node(&["Company"]);

            session.create_edge(alix, gus, "KNOWS");
            session.create_edge(alix, company, "WORKS_AT");

            let knows_neighbors = session.get_neighbors_outgoing_by_type(alix, "KNOWS");
            assert_eq!(knows_neighbors.len(), 1);
            assert_eq!(knows_neighbors[0].0, gus);

            let works_neighbors = session.get_neighbors_outgoing_by_type(alix, "WORKS_AT");
            assert_eq!(works_neighbors.len(), 1);
            assert_eq!(works_neighbors[0].0, company);

            // An edge type with no edges gives an empty list.
            let no_neighbors = session.get_neighbors_outgoing_by_type(alix, "LIKES");
            assert!(no_neighbors.is_empty());
        }

        // `node_exists` is true for a created node and false for an unknown id.
        #[test]
        fn test_node_exists() {
            use grafeo_common::types::NodeId;

            let db = GrafeoDB::new_in_memory();
            let session = db.session();

            let id = session.create_node(&["Person"]);

            assert!(session.node_exists(id));
            assert!(!session.node_exists(NodeId::new(9999)));
        }

        // `edge_exists` is true for a created edge and false for an unknown id.
        #[test]
        fn test_edge_exists() {
            use grafeo_common::types::EdgeId;

            let db = GrafeoDB::new_in_memory();
            let session = db.session();

            let alix = session.create_node(&["Person"]);
            let gus = session.create_node(&["Person"]);
            let edge_id = session.create_edge(alix, gus, "KNOWS");

            assert!(session.edge_exists(edge_id));
            assert!(!session.edge_exists(EdgeId::new(9999)));
        }

        // Degrees are reported as `(out, in)`; an isolated node has `(0, 0)`.
        #[test]
        fn test_get_degree() {
            let db = GrafeoDB::new_in_memory();
            let session = db.session();

            let alix = session.create_node(&["Person"]);
            let gus = session.create_node(&["Person"]);
            let harm = session.create_node(&["Person"]);

            // Two edges leave `alix`, one edge arrives at it.
            session.create_edge(alix, gus, "KNOWS");
            session.create_edge(alix, harm, "KNOWS");
            session.create_edge(gus, alix, "KNOWS");

            let (out_degree, in_degree) = session.get_degree(alix);
            assert_eq!(out_degree, 2);
            assert_eq!(in_degree, 1);

            let lonely = session.create_node(&["Person"]);
            let (out, in_deg) = session.get_degree(lonely);
            assert_eq!(out, 0);
            assert_eq!(in_deg, 0);
        }

        // Batch lookup returns one slot per requested id, `None` for misses.
        #[test]
        fn test_get_nodes_batch() {
            let db = GrafeoDB::new_in_memory();
            let session = db.session();

            let alix = session.create_node(&["Person"]);
            let gus = session.create_node(&["Person"]);
            let harm = session.create_node(&["Person"]);

            let nodes = session.get_nodes_batch(&[alix, gus, harm]);
            assert_eq!(nodes.len(), 3);
            assert!(nodes[0].is_some());
            assert!(nodes[1].is_some());
            assert!(nodes[2].is_some());

            // An unknown id in the middle keeps its position as `None`.
            use grafeo_common::types::NodeId;
            let nodes_with_missing = session.get_nodes_batch(&[alix, NodeId::new(9999), harm]);
            assert_eq!(nodes_with_missing.len(), 3);
            assert!(nodes_with_missing[0].is_some());
            assert!(nodes_with_missing[1].is_none());
            assert!(nodes_with_missing[2].is_some());
        }

        // Auto-commit is on for a fresh session and can be toggled.
        #[test]
        fn test_auto_commit_setting() {
            let db = GrafeoDB::new_in_memory();
            let mut session = db.session();

            assert!(session.auto_commit());

            session.set_auto_commit(false);
            assert!(!session.auto_commit());

            session.set_auto_commit(true);
            assert!(session.auto_commit());
        }

        // A second `begin_transaction` succeeds, and each begin is matched by
        // its own successful commit.
        #[test]
        fn test_transaction_double_begin_nests() {
            let db = GrafeoDB::new_in_memory();
            let mut session = db.session();

            session.begin_transaction().unwrap();
            let result = session.begin_transaction();
            assert!(result.is_ok());
            session.commit().unwrap();
            session.commit().unwrap();
        }

        // Committing with no open transaction is an error.
        #[test]
        fn test_commit_without_transaction_error() {
            let db = GrafeoDB::new_in_memory();
            let mut session = db.session();

            let result = session.commit();
            assert!(result.is_err());
        }

        // Rolling back with no open transaction is an error.
        #[test]
        fn test_rollback_without_transaction_error() {
            let db = GrafeoDB::new_in_memory();
            let mut session = db.session();

            let result = session.rollback();
            assert!(result.is_err());
        }

        // An edge created inside a transaction is visible to the session both
        // before and after the commit.
        #[test]
        fn test_create_edge_in_transaction() {
            let db = GrafeoDB::new_in_memory();
            let mut session = db.session();

            // The endpoint nodes are created before the transaction begins.
            let alix = session.create_node(&["Person"]);
            let gus = session.create_node(&["Person"]);

            session.begin_transaction().unwrap();
            let edge_id = session.create_edge(alix, gus, "KNOWS");

            assert!(session.edge_exists(edge_id));

            session.commit().unwrap();

            assert!(session.edge_exists(edge_id));
        }

        // A node with no edges has no neighbors in any direction or type.
        #[test]
        fn test_neighbors_empty_node() {
            let db = GrafeoDB::new_in_memory();
            let session = db.session();

            let lonely = session.create_node(&["Person"]);

            assert!(session.get_neighbors_outgoing(lonely).is_empty());
            assert!(session.get_neighbors_incoming(lonely).is_empty());
            assert!(
                session
                    .get_neighbors_outgoing_by_type(lonely, "KNOWS")
                    .is_empty()
            );
        }
    }
5096
/// Commits two single-node transactions against a database configured with a
/// GC interval of 2 and checks that both nodes are still counted afterwards.
///
/// NOTE(review): presumably the second commit reaches the auto-GC threshold;
/// the test itself only observes the node count.
#[test]
fn test_auto_gc_triggers_on_commit_interval() {
    let database =
        GrafeoDB::with_config(crate::config::Config::in_memory().with_gc_interval(2)).unwrap();
    let mut sess = database.session();

    // One transaction per label, each committed on its own.
    for label in ["A", "B"].iter().copied() {
        sess.begin_transaction().unwrap();
        sess.create_node(&[label]);
        sess.commit().unwrap();
    }

    assert_eq!(database.node_count(), 2);
}
5118
/// A database configured with a 5-second query timeout hands out sessions
/// whose `query_deadline()` is populated.
#[test]
fn test_query_timeout_config_propagates_to_session() {
    let timeout = std::time::Duration::from_secs(5);
    let settings = crate::config::Config::in_memory().with_query_timeout(timeout);
    let database = GrafeoDB::with_config(settings).unwrap();
    let sess = database.session();

    let deadline = sess.query_deadline();
    assert!(deadline.is_some());
}
5131
/// A default in-memory database hands out sessions with no query deadline.
#[test]
fn test_no_query_timeout_returns_no_deadline() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    let deadline = sess.query_deadline();
    assert!(deadline.is_none());
}
5140
/// A session from a default in-memory database reports the LPG graph model.
#[test]
fn test_graph_model_accessor() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    let model = sess.graph_model();
    assert_eq!(model, crate::config::GraphModel::Lpg);
}
5150
/// A database built on an externally supplied `GraphStoreMut` supports an
/// insert followed by a read inside one explicit transaction.
#[cfg(feature = "gql")]
#[test]
fn test_external_store_session() {
    use grafeo_core::graph::GraphStoreMut;
    use grafeo_core::graph::lpg::LpgStore;
    use std::sync::Arc;

    let settings = crate::config::Config::in_memory();
    // The annotation performs the same unsizing to a trait object as an `as` cast.
    let external: Arc<dyn GraphStoreMut> = Arc::new(LpgStore::new().unwrap());
    let database = GrafeoDB::with_store(external, settings).unwrap();

    let mut sess = database.session();
    sess.begin_transaction().unwrap();

    sess.execute("INSERT (:Test {name: 'hello'})").unwrap();

    // The freshly inserted node must be readable before the commit.
    let matched = sess.execute("MATCH (n:Test) RETURN n.name").unwrap();
    assert_eq!(matched.row_count(), 1);

    sess.commit().unwrap();
}
5176
5177 #[cfg(feature = "gql")]
5180 mod session_command_tests {
5181 use super::*;
5182 use grafeo_common::types::Value;
5183
/// `USE GRAPH <name>` on an existing graph makes it the session's current graph.
#[test]
fn test_use_graph_sets_current_graph() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    // The graph has to exist before it can be selected.
    for statement in ["CREATE GRAPH mydb", "USE GRAPH mydb"].iter().copied() {
        sess.execute(statement).unwrap();
    }

    assert_eq!(sess.current_graph().as_deref(), Some("mydb"));
}
5195
/// `USE GRAPH` on a graph that was never created fails with a
/// "does not exist" message.
#[test]
fn test_use_graph_nonexistent_errors() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    let outcome = sess.execute("USE GRAPH doesnotexist");
    assert!(outcome.is_err());

    let err = outcome.unwrap_err().to_string();
    let mentions_missing = err.contains("does not exist");
    assert!(
        mentions_missing,
        "Expected 'does not exist' error, got: {err}"
    );
}
5209
/// `USE GRAPH default` succeeds without a prior `CREATE GRAPH` and is
/// reported as the current graph.
#[test]
fn test_use_graph_default_always_valid() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    sess.execute("USE GRAPH default").unwrap();

    let active = sess.current_graph();
    assert_eq!(active.as_deref(), Some("default"));
}
5219
/// `SESSION SET GRAPH <name>` on an existing graph makes it the current graph.
#[test]
fn test_session_set_graph() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    let statements = ["CREATE GRAPH analytics", "SESSION SET GRAPH analytics"];
    for statement in statements.iter().copied() {
        sess.execute(statement).unwrap();
    }

    assert_eq!(sess.current_graph().as_deref(), Some("analytics"));
}
5229
/// `SESSION SET GRAPH` on a graph that was never created returns an error.
#[test]
fn test_session_set_graph_nonexistent_errors() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    assert!(sess.execute("SESSION SET GRAPH nosuchgraph").is_err());
}
5238
/// The session time zone starts unset and follows each
/// `SESSION SET TIME ZONE` statement in turn.
#[test]
fn test_session_set_time_zone() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    assert!(sess.time_zone().is_none());

    // Each statement is paired with the zone name it is expected to store.
    let cases = [
        ("SESSION SET TIME ZONE 'UTC'", "UTC"),
        (
            "SESSION SET TIME ZONE 'America/New_York'",
            "America/New_York",
        ),
    ];
    for (statement, expected_zone) in cases.iter().copied() {
        sess.execute(statement).unwrap();
        assert_eq!(sess.time_zone().as_deref(), Some(expected_zone));
    }
}
5254
/// `SESSION SET PARAMETER $timeout = 30` makes a `timeout` parameter
/// retrievable from the session (only presence is checked, not the value).
#[test]
fn test_session_set_parameter() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    sess.execute("SESSION SET PARAMETER $timeout = 30").unwrap();

    let stored = sess.get_parameter("timeout");
    assert!(stored.is_some());
}
5268
/// `SESSION RESET` clears the session's current graph, time zone, and
/// previously set parameter.
#[test]
fn test_session_reset_clears_all_state() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    // Populate every piece of session state this test later expects to be gone.
    let setup = [
        "CREATE GRAPH analytics",
        "SESSION SET GRAPH analytics",
        "SESSION SET TIME ZONE 'UTC'",
        "SESSION SET PARAMETER $limit = 100",
    ];
    for statement in setup.iter().copied() {
        sess.execute(statement).unwrap();
    }

    // Precondition: all three settings are in place before the reset.
    assert!(sess.current_graph().is_some());
    assert!(sess.time_zone().is_some());
    assert!(sess.get_parameter("limit").is_some());

    sess.execute("SESSION RESET").unwrap();

    assert!(sess.current_graph().is_none());
    assert!(sess.time_zone().is_none());
    assert!(sess.get_parameter("limit").is_none());
}
5294
/// `SESSION CLOSE` leaves the session with no current graph and no time zone.
#[test]
fn test_session_close_clears_state() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    let setup = [
        "CREATE GRAPH analytics",
        "SESSION SET GRAPH analytics",
        "SESSION SET TIME ZONE 'UTC'",
    ];
    for statement in setup.iter().copied() {
        sess.execute(statement).unwrap();
    }

    sess.execute("SESSION CLOSE").unwrap();

    assert!(sess.current_graph().is_none());
    assert!(sess.time_zone().is_none());
}
5309
/// A graph created with `CREATE GRAPH` can subsequently be selected with
/// `USE GRAPH`.
#[test]
fn test_create_graph() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    sess.execute("CREATE GRAPH mydb").unwrap();

    // Selecting the graph succeeds only if the creation took effect.
    sess.execute("USE GRAPH mydb").unwrap();
    let active = sess.current_graph();
    assert_eq!(active.as_deref(), Some("mydb"));
}
5321
/// Creating the same graph twice fails the second time with an
/// "already exists" message.
#[test]
fn test_create_graph_duplicate_errors() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    sess.execute("CREATE GRAPH mydb").unwrap();
    let second_attempt = sess.execute("CREATE GRAPH mydb");

    assert!(second_attempt.is_err());
    let err = second_attempt.unwrap_err().to_string();
    let mentions_duplicate = err.contains("already exists");
    assert!(
        mentions_duplicate,
        "Expected 'already exists' error, got: {err}"
    );
}
5337
/// `CREATE GRAPH IF NOT EXISTS` succeeds even when the graph already exists.
#[test]
fn test_create_graph_if_not_exists() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    let statements = ["CREATE GRAPH mydb", "CREATE GRAPH IF NOT EXISTS mydb"];
    for statement in statements.iter().copied() {
        // Both statements must succeed; the second is the one under test.
        sess.execute(statement).unwrap();
    }
}
5347
/// After `DROP GRAPH`, the dropped graph can no longer be selected.
#[test]
fn test_drop_graph() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    for statement in ["CREATE GRAPH mydb", "DROP GRAPH mydb"].iter().copied() {
        sess.execute(statement).unwrap();
    }

    // Selecting the dropped graph is expected to fail.
    assert!(sess.execute("USE GRAPH mydb").is_err());
}
5360
/// Dropping a graph that was never created fails with a
/// "does not exist" message.
#[test]
fn test_drop_graph_nonexistent_errors() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    let outcome = sess.execute("DROP GRAPH nosuchgraph");
    assert!(outcome.is_err());

    let err = outcome.unwrap_err().to_string();
    let mentions_missing = err.contains("does not exist");
    assert!(
        mentions_missing,
        "Expected 'does not exist' error, got: {err}"
    );
}
5374
/// `DROP GRAPH IF EXISTS` succeeds even when the named graph was never created.
#[test]
fn test_drop_graph_if_exists() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    // The unwrap is the assertion: the statement must not return an error.
    let statement = "DROP GRAPH IF EXISTS nosuchgraph";
    sess.execute(statement).unwrap();
}
5383
/// `START TRANSACTION` / `COMMIT` issued as GQL statements open and close a
/// transaction, and the inserted node is readable afterwards.
#[test]
fn test_start_transaction_via_gql() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    sess.execute("START TRANSACTION").unwrap();
    let open_after_start = sess.in_transaction();
    assert!(open_after_start);

    sess.execute("INSERT (:Person {name: 'Alix'})").unwrap();
    sess.execute("COMMIT").unwrap();
    let open_after_commit = sess.in_transaction();
    assert!(!open_after_commit);

    // The committed node is returned by a follow-up query.
    let people = sess.execute("MATCH (n:Person) RETURN n.name").unwrap();
    assert_eq!(people.rows.len(), 1);
}
5398
/// An `INSERT` inside a `READ ONLY` transaction is rejected with a
/// "read-only" message; the transaction can still be rolled back.
#[test]
fn test_start_transaction_read_only_blocks_insert() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    sess.execute("START TRANSACTION READ ONLY").unwrap();

    let write_attempt = sess.execute("INSERT (:Person {name: 'Alix'})");
    assert!(write_attempt.is_err());
    let err = write_attempt.unwrap_err().to_string();
    let mentions_read_only = err.contains("read-only");
    assert!(mentions_read_only, "Expected read-only error, got: {err}");

    sess.execute("ROLLBACK").unwrap();
}
5414
/// A `READ ONLY` transaction can still run a `MATCH` query and see data
/// committed beforehand.
#[test]
fn test_start_transaction_read_only_allows_reads() {
    let database = GrafeoDB::new_in_memory();
    let mut sess = database.session();

    // Seed one node through a regular read-write transaction.
    sess.begin_transaction().unwrap();
    sess.execute("INSERT (:Person {name: 'Alix'})").unwrap();
    sess.commit().unwrap();

    sess.execute("START TRANSACTION READ ONLY").unwrap();
    let people = sess.execute("MATCH (n:Person) RETURN n.name").unwrap();
    let matched_rows = people.rows.len();
    assert_eq!(matched_rows, 1);
    sess.execute("COMMIT").unwrap();
}
5428
/// A node inserted inside a transaction that is rolled back via GQL is not
/// returned by a later query.
#[test]
fn test_rollback_via_gql() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    let statements = [
        "START TRANSACTION",
        "INSERT (:Person {name: 'Alix'})",
        "ROLLBACK",
    ];
    for statement in statements.iter().copied() {
        sess.execute(statement).unwrap();
    }

    let people = sess.execute("MATCH (n:Person) RETURN n.name").unwrap();
    assert!(people.rows.is_empty());
}
5441
/// `START TRANSACTION ISOLATION LEVEL SERIALIZABLE` is accepted and leaves
/// the session inside a transaction.
#[test]
fn test_start_transaction_with_isolation_level() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    let begin = "START TRANSACTION ISOLATION LEVEL SERIALIZABLE";
    sess.execute(begin).unwrap();

    let open = sess.in_transaction();
    assert!(open);

    sess.execute("ROLLBACK").unwrap();
}
5453
/// A session command (`SESSION SET GRAPH`) returns a result with zero rows
/// and zero columns.
#[test]
fn test_session_commands_return_empty_result() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    sess.execute("CREATE GRAPH test").unwrap();
    let outcome = sess.execute("SESSION SET GRAPH test").unwrap();

    let rows = outcome.row_count();
    assert_eq!(rows, 0);
    let columns = outcome.column_count();
    assert_eq!(columns, 0);
}
5464
/// A fresh session has no current graph selected.
#[test]
fn test_current_graph_default_is_none() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    let active = sess.current_graph();
    assert!(active.is_none());
}
5472
/// A fresh session has no time zone configured.
#[test]
fn test_time_zone_default_is_none() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    let zone = sess.time_zone();
    assert!(zone.is_none());
}
5480
/// Two sessions on the same database keep separate current-graph settings.
#[test]
fn test_session_state_independent_across_sessions() {
    let database = GrafeoDB::new_in_memory();
    let left = database.session();
    let right = database.session();

    // Both graphs are created through the first session.
    for statement in ["CREATE GRAPH first", "CREATE GRAPH second"].iter().copied() {
        left.execute(statement).unwrap();
    }
    left.execute("SESSION SET GRAPH first").unwrap();
    right.execute("SESSION SET GRAPH second").unwrap();

    // Each session reports only its own selection.
    assert_eq!(left.current_graph().as_deref(), Some("first"));
    assert_eq!(right.current_graph().as_deref(), Some("second"));
}
5495
/// `SHOW NODE TYPES` lists a created node type under the expected column
/// headers, with the type name in the first cell.
#[test]
fn test_show_node_types() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    let ddl = "CREATE NODE TYPE Person (name STRING NOT NULL, age INTEGER)";
    sess.execute(ddl).unwrap();

    let listing = sess.execute("SHOW NODE TYPES").unwrap();

    let expected_columns = vec!["name", "properties", "constraints", "parents"];
    assert_eq!(listing.columns, expected_columns);

    assert_eq!(listing.rows.len(), 1);
    let first_cell = &listing.rows[0][0];
    assert_eq!(first_cell, &Value::from("Person"));
}
5514
/// `SHOW EDGE TYPES` lists a created edge type under the expected column
/// headers, with the type name in the first cell.
#[test]
fn test_show_edge_types() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    let ddl = "CREATE EDGE TYPE KNOWS CONNECTING (Person) TO (Person) (since INTEGER)";
    sess.execute(ddl).unwrap();

    let listing = sess.execute("SHOW EDGE TYPES").unwrap();

    let expected_columns = vec!["name", "properties", "source_types", "target_types"];
    assert_eq!(listing.columns, expected_columns);

    assert_eq!(listing.rows.len(), 1);
    let first_cell = &listing.rows[0][0];
    assert_eq!(first_cell, &Value::from("KNOWS"));
}
5532
/// `SHOW GRAPH TYPES` lists a created graph type under the expected column
/// headers, with the type name in the first cell.
#[test]
fn test_show_graph_types() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    sess.execute("CREATE NODE TYPE Person (name STRING)").unwrap();

    // Line continuations keep the statement on a single logical line.
    let graph_type_ddl = "CREATE GRAPH TYPE social (\
        NODE TYPE Person (name STRING)\
        )";
    sess.execute(graph_type_ddl).unwrap();

    let listing = sess.execute("SHOW GRAPH TYPES").unwrap();

    let expected_columns = vec!["name", "open", "node_types", "edge_types"];
    assert_eq!(listing.columns, expected_columns);

    assert_eq!(listing.rows.len(), 1);
    let first_cell = &listing.rows[0][0];
    assert_eq!(first_cell, &Value::from("social"));
}
5557
/// `SHOW GRAPH TYPE <name>` returns a single row whose first cell is the
/// requested graph type's name.
#[test]
fn test_show_graph_type_named() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    sess.execute("CREATE NODE TYPE Person (name STRING)").unwrap();

    // Line continuations keep the statement on a single logical line.
    let graph_type_ddl = "CREATE GRAPH TYPE social (\
        NODE TYPE Person (name STRING)\
        )";
    sess.execute(graph_type_ddl).unwrap();

    let described = sess.execute("SHOW GRAPH TYPE social").unwrap();
    assert_eq!(described.rows.len(), 1);
    let first_cell = &described.rows[0][0];
    assert_eq!(first_cell, &Value::from("social"));
}
5578
/// `SHOW GRAPH TYPE` for a name that was never defined returns an error.
#[test]
fn test_show_graph_type_not_found() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    assert!(sess.execute("SHOW GRAPH TYPE nonexistent").is_err());
}
5587
/// `SHOW INDEXES` on an empty database succeeds and exposes the expected
/// column headers.
#[test]
fn test_show_indexes_via_gql() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    let listing = sess.execute("SHOW INDEXES").unwrap();

    let expected_columns = vec!["name", "type", "label", "property"];
    assert_eq!(listing.columns, expected_columns);
}
5596
/// `SHOW CONSTRAINTS` on an empty database succeeds and exposes the expected
/// column headers.
#[test]
fn test_show_constraints_via_gql() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    let listing = sess.execute("SHOW CONSTRAINTS").unwrap();

    let expected_columns = vec!["name", "type", "label", "properties"];
    assert_eq!(listing.columns, expected_columns);
}
5605
/// A graph type declared in pattern form (node/edge path patterns) can be
/// created and then looked up by name with `SHOW GRAPH TYPE`.
#[test]
fn test_pattern_form_graph_type_roundtrip() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    // Register the node and edge types the patterns below refer to.
    let type_ddl = [
        "CREATE NODE TYPE Person (name STRING NOT NULL)",
        "CREATE NODE TYPE City (name STRING)",
        "CREATE EDGE TYPE KNOWS (since INTEGER)",
        "CREATE EDGE TYPE LIVES_IN",
    ];
    for statement in type_ddl.iter().copied() {
        sess.execute(statement).unwrap();
    }

    // Line continuations keep the statement on a single logical line.
    let graph_type_ddl = "CREATE GRAPH TYPE social (\
        (:Person {name STRING NOT NULL})-[:KNOWS {since INTEGER}]->(:Person),\
        (:Person)-[:LIVES_IN]->(:City)\
        )";
    sess.execute(graph_type_ddl).unwrap();

    let described = sess.execute("SHOW GRAPH TYPE social").unwrap();
    assert_eq!(described.rows.len(), 1);
    let first_cell = &described.rows[0][0];
    assert_eq!(first_cell, &Value::from("social"));
}
5638 }
5639}