1use std::sync::Arc;
8use std::sync::atomic::{AtomicUsize, Ordering};
9use std::time::{Duration, Instant};
10
11use grafeo_common::types::{EdgeId, EpochId, NodeId, TransactionId, Value};
12use grafeo_common::utils::error::Result;
13use grafeo_core::graph::Direction;
14use grafeo_core::graph::GraphStoreMut;
15use grafeo_core::graph::lpg::{Edge, LpgStore, Node};
16#[cfg(feature = "rdf")]
17use grafeo_core::graph::rdf::RdfStore;
18
19use crate::catalog::{Catalog, CatalogConstraintValidator};
20use crate::config::{AdaptiveConfig, GraphModel};
21use crate::database::QueryResult;
22use crate::query::cache::QueryCache;
23use crate::transaction::TransactionManager;
24
25fn parse_default_literal(text: &str) -> Value {
30 if text.eq_ignore_ascii_case("null") {
31 return Value::Null;
32 }
33 if text.eq_ignore_ascii_case("true") {
34 return Value::Bool(true);
35 }
36 if text.eq_ignore_ascii_case("false") {
37 return Value::Bool(false);
38 }
39 if (text.starts_with('\'') && text.ends_with('\''))
41 || (text.starts_with('"') && text.ends_with('"'))
42 {
43 return Value::String(text[1..text.len() - 1].into());
44 }
45 if let Ok(i) = text.parse::<i64>() {
47 return Value::Int64(i);
48 }
49 if let Ok(f) = text.parse::<f64>() {
50 return Value::Float64(f);
51 }
52 Value::String(text.into())
54}
55
56pub(crate) struct SessionConfig {
61 pub transaction_manager: Arc<TransactionManager>,
62 pub query_cache: Arc<QueryCache>,
63 pub catalog: Arc<Catalog>,
64 pub adaptive_config: AdaptiveConfig,
65 pub factorized_execution: bool,
66 pub graph_model: GraphModel,
67 pub query_timeout: Option<Duration>,
68 pub commit_counter: Arc<AtomicUsize>,
69 pub gc_interval: usize,
70}
71
72pub struct Session {
78 store: Arc<LpgStore>,
80 graph_store: Arc<dyn GraphStoreMut>,
82 catalog: Arc<Catalog>,
84 #[cfg(feature = "rdf")]
86 rdf_store: Arc<RdfStore>,
87 transaction_manager: Arc<TransactionManager>,
89 query_cache: Arc<QueryCache>,
91 current_transaction: parking_lot::Mutex<Option<TransactionId>>,
95 read_only_tx: parking_lot::Mutex<bool>,
97 auto_commit: bool,
99 #[allow(dead_code)] adaptive_config: AdaptiveConfig,
102 factorized_execution: bool,
104 graph_model: GraphModel,
106 query_timeout: Option<Duration>,
108 commit_counter: Arc<AtomicUsize>,
110 gc_interval: usize,
112 transaction_start_node_count: AtomicUsize,
114 transaction_start_edge_count: AtomicUsize,
116 #[cfg(feature = "wal")]
118 wal: Option<Arc<grafeo_adapters::storage::wal::LpgWal>>,
119 #[cfg(feature = "wal")]
121 wal_graph_context: Option<Arc<parking_lot::Mutex<Option<String>>>>,
122 #[cfg(feature = "cdc")]
124 cdc_log: Arc<crate::cdc::CdcLog>,
125 current_graph: parking_lot::Mutex<Option<String>>,
127 current_schema: parking_lot::Mutex<Option<String>>,
130 time_zone: parking_lot::Mutex<Option<String>>,
132 session_params:
134 parking_lot::Mutex<std::collections::HashMap<String, grafeo_common::types::Value>>,
135 viewing_epoch_override: parking_lot::Mutex<Option<EpochId>>,
137 savepoints: parking_lot::Mutex<Vec<SavepointState>>,
139 transaction_nesting_depth: parking_lot::Mutex<u32>,
143 touched_graphs: parking_lot::Mutex<Vec<Option<String>>>,
147}
148
149#[derive(Clone)]
151struct GraphSavepoint {
152 graph_name: Option<String>,
153 next_node_id: u64,
154 next_edge_id: u64,
155 undo_log_position: usize,
156}
157
158#[derive(Clone)]
160struct SavepointState {
161 name: String,
162 graph_snapshots: Vec<GraphSavepoint>,
163 #[allow(dead_code)]
166 active_graph: Option<String>,
167}
168
169impl Session {
170 #[allow(dead_code)]
172 pub(crate) fn with_adaptive(store: Arc<LpgStore>, cfg: SessionConfig) -> Self {
173 let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
174 Self {
175 store,
176 graph_store,
177 catalog: cfg.catalog,
178 #[cfg(feature = "rdf")]
179 rdf_store: Arc::new(RdfStore::new()),
180 transaction_manager: cfg.transaction_manager,
181 query_cache: cfg.query_cache,
182 current_transaction: parking_lot::Mutex::new(None),
183 read_only_tx: parking_lot::Mutex::new(false),
184 auto_commit: true,
185 adaptive_config: cfg.adaptive_config,
186 factorized_execution: cfg.factorized_execution,
187 graph_model: cfg.graph_model,
188 query_timeout: cfg.query_timeout,
189 commit_counter: cfg.commit_counter,
190 gc_interval: cfg.gc_interval,
191 transaction_start_node_count: AtomicUsize::new(0),
192 transaction_start_edge_count: AtomicUsize::new(0),
193 #[cfg(feature = "wal")]
194 wal: None,
195 #[cfg(feature = "wal")]
196 wal_graph_context: None,
197 #[cfg(feature = "cdc")]
198 cdc_log: Arc::new(crate::cdc::CdcLog::new()),
199 current_graph: parking_lot::Mutex::new(None),
200 current_schema: parking_lot::Mutex::new(None),
201 time_zone: parking_lot::Mutex::new(None),
202 session_params: parking_lot::Mutex::new(std::collections::HashMap::new()),
203 viewing_epoch_override: parking_lot::Mutex::new(None),
204 savepoints: parking_lot::Mutex::new(Vec::new()),
205 transaction_nesting_depth: parking_lot::Mutex::new(0),
206 touched_graphs: parking_lot::Mutex::new(Vec::new()),
207 }
208 }
209
210 #[cfg(feature = "wal")]
215 pub(crate) fn set_wal(
216 &mut self,
217 wal: Arc<grafeo_adapters::storage::wal::LpgWal>,
218 wal_graph_context: Arc<parking_lot::Mutex<Option<String>>>,
219 ) {
220 self.graph_store = Arc::new(crate::database::wal_store::WalGraphStore::new(
222 Arc::clone(&self.store),
223 Arc::clone(&wal),
224 Arc::clone(&wal_graph_context),
225 ));
226 self.wal = Some(wal);
227 self.wal_graph_context = Some(wal_graph_context);
228 }
229
230 #[cfg(feature = "cdc")]
232 pub(crate) fn set_cdc_log(&mut self, cdc_log: Arc<crate::cdc::CdcLog>) {
233 self.cdc_log = cdc_log;
234 }
235
236 #[cfg(feature = "rdf")]
238 pub(crate) fn with_rdf_store_and_adaptive(
239 store: Arc<LpgStore>,
240 rdf_store: Arc<RdfStore>,
241 cfg: SessionConfig,
242 ) -> Self {
243 let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
244 Self {
245 store,
246 graph_store,
247 catalog: cfg.catalog,
248 rdf_store,
249 transaction_manager: cfg.transaction_manager,
250 query_cache: cfg.query_cache,
251 current_transaction: parking_lot::Mutex::new(None),
252 read_only_tx: parking_lot::Mutex::new(false),
253 auto_commit: true,
254 adaptive_config: cfg.adaptive_config,
255 factorized_execution: cfg.factorized_execution,
256 graph_model: cfg.graph_model,
257 query_timeout: cfg.query_timeout,
258 commit_counter: cfg.commit_counter,
259 gc_interval: cfg.gc_interval,
260 transaction_start_node_count: AtomicUsize::new(0),
261 transaction_start_edge_count: AtomicUsize::new(0),
262 #[cfg(feature = "wal")]
263 wal: None,
264 #[cfg(feature = "wal")]
265 wal_graph_context: None,
266 #[cfg(feature = "cdc")]
267 cdc_log: Arc::new(crate::cdc::CdcLog::new()),
268 current_graph: parking_lot::Mutex::new(None),
269 current_schema: parking_lot::Mutex::new(None),
270 time_zone: parking_lot::Mutex::new(None),
271 session_params: parking_lot::Mutex::new(std::collections::HashMap::new()),
272 viewing_epoch_override: parking_lot::Mutex::new(None),
273 savepoints: parking_lot::Mutex::new(Vec::new()),
274 transaction_nesting_depth: parking_lot::Mutex::new(0),
275 touched_graphs: parking_lot::Mutex::new(Vec::new()),
276 }
277 }
278
279 pub(crate) fn with_external_store(
288 store: Arc<dyn GraphStoreMut>,
289 cfg: SessionConfig,
290 ) -> Result<Self> {
291 Ok(Self {
292 store: Arc::new(LpgStore::new()?),
293 graph_store: store,
294 catalog: cfg.catalog,
295 #[cfg(feature = "rdf")]
296 rdf_store: Arc::new(RdfStore::new()),
297 transaction_manager: cfg.transaction_manager,
298 query_cache: cfg.query_cache,
299 current_transaction: parking_lot::Mutex::new(None),
300 read_only_tx: parking_lot::Mutex::new(false),
301 auto_commit: true,
302 adaptive_config: cfg.adaptive_config,
303 factorized_execution: cfg.factorized_execution,
304 graph_model: cfg.graph_model,
305 query_timeout: cfg.query_timeout,
306 commit_counter: cfg.commit_counter,
307 gc_interval: cfg.gc_interval,
308 transaction_start_node_count: AtomicUsize::new(0),
309 transaction_start_edge_count: AtomicUsize::new(0),
310 #[cfg(feature = "wal")]
311 wal: None,
312 #[cfg(feature = "wal")]
313 wal_graph_context: None,
314 #[cfg(feature = "cdc")]
315 cdc_log: Arc::new(crate::cdc::CdcLog::new()),
316 current_graph: parking_lot::Mutex::new(None),
317 current_schema: parking_lot::Mutex::new(None),
318 time_zone: parking_lot::Mutex::new(None),
319 session_params: parking_lot::Mutex::new(std::collections::HashMap::new()),
320 viewing_epoch_override: parking_lot::Mutex::new(None),
321 savepoints: parking_lot::Mutex::new(Vec::new()),
322 transaction_nesting_depth: parking_lot::Mutex::new(0),
323 touched_graphs: parking_lot::Mutex::new(Vec::new()),
324 })
325 }
326
327 #[must_use]
329 pub fn graph_model(&self) -> GraphModel {
330 self.graph_model
331 }
332
333 pub fn use_graph(&self, name: &str) {
337 *self.current_graph.lock() = Some(name.to_string());
338 }
339
340 #[must_use]
342 pub fn current_graph(&self) -> Option<String> {
343 self.current_graph.lock().clone()
344 }
345
346 pub fn set_schema(&self, name: &str) {
350 *self.current_schema.lock() = Some(name.to_string());
351 }
352
353 #[must_use]
357 pub fn current_schema(&self) -> Option<String> {
358 self.current_schema.lock().clone()
359 }
360
361 fn effective_graph_key(&self, graph_name: &str) -> String {
366 let schema = self.current_schema.lock().clone();
367 match schema {
368 Some(s) => format!("{s}/{graph_name}"),
369 None => graph_name.to_string(),
370 }
371 }
372
373 fn active_graph_storage_key(&self) -> Option<String> {
377 let graph = self.current_graph.lock().clone();
378 let schema = self.current_schema.lock().clone();
379 match (schema, graph) {
380 (_, None) => None,
381 (_, Some(ref name)) if name.eq_ignore_ascii_case("default") => None,
382 (None, Some(name)) => Some(name),
383 (Some(s), Some(g)) => Some(format!("{s}/{g}")),
384 }
385 }
386
387 fn active_store(&self) -> Arc<dyn GraphStoreMut> {
395 let key = self.active_graph_storage_key();
396 match key {
397 None => Arc::clone(&self.graph_store),
398 Some(ref name) => match self.store.graph(name) {
399 Some(named_store) => {
400 #[cfg(feature = "wal")]
401 if let (Some(wal), Some(ctx)) = (&self.wal, &self.wal_graph_context) {
402 return Arc::new(crate::database::wal_store::WalGraphStore::new_for_graph(
403 named_store,
404 Arc::clone(wal),
405 name.clone(),
406 Arc::clone(ctx),
407 )) as Arc<dyn GraphStoreMut>;
408 }
409 named_store as Arc<dyn GraphStoreMut>
410 }
411 None => Arc::clone(&self.graph_store),
412 },
413 }
414 }
415
416 fn active_lpg_store(&self) -> Arc<LpgStore> {
421 let key = self.active_graph_storage_key();
422 match key {
423 None => Arc::clone(&self.store),
424 Some(ref name) => self
425 .store
426 .graph(name)
427 .unwrap_or_else(|| Arc::clone(&self.store)),
428 }
429 }
430
431 fn resolve_store(&self, graph_name: &Option<String>) -> Arc<LpgStore> {
434 match graph_name {
435 None => Arc::clone(&self.store),
436 Some(name) if name.eq_ignore_ascii_case("default") => Arc::clone(&self.store),
437 Some(name) => self
438 .store
439 .graph(name)
440 .unwrap_or_else(|| Arc::clone(&self.store)),
441 }
442 }
443
444 fn track_graph_touch(&self) {
449 if self.current_transaction.lock().is_some() {
450 let key = self.active_graph_storage_key();
451 let mut touched = self.touched_graphs.lock();
452 if !touched.contains(&key) {
453 touched.push(key);
454 }
455 }
456 }
457
458 pub fn set_time_zone(&self, tz: &str) {
460 *self.time_zone.lock() = Some(tz.to_string());
461 }
462
463 #[must_use]
465 pub fn time_zone(&self) -> Option<String> {
466 self.time_zone.lock().clone()
467 }
468
469 pub fn set_parameter(&self, key: &str, value: grafeo_common::types::Value) {
471 self.session_params.lock().insert(key.to_string(), value);
472 }
473
474 #[must_use]
476 pub fn get_parameter(&self, key: &str) -> Option<grafeo_common::types::Value> {
477 self.session_params.lock().get(key).cloned()
478 }
479
480 pub fn reset_session(&self) {
482 *self.current_schema.lock() = None;
483 *self.current_graph.lock() = None;
484 *self.time_zone.lock() = None;
485 self.session_params.lock().clear();
486 *self.viewing_epoch_override.lock() = None;
487 }
488
489 pub fn reset_schema(&self) {
491 *self.current_schema.lock() = None;
492 }
493
494 pub fn reset_graph(&self) {
496 *self.current_graph.lock() = None;
497 }
498
499 pub fn reset_time_zone(&self) {
501 *self.time_zone.lock() = None;
502 }
503
504 pub fn reset_parameters(&self) {
506 self.session_params.lock().clear();
507 }
508
509 pub fn set_viewing_epoch(&self, epoch: EpochId) {
517 *self.viewing_epoch_override.lock() = Some(epoch);
518 }
519
520 pub fn clear_viewing_epoch(&self) {
522 *self.viewing_epoch_override.lock() = None;
523 }
524
525 #[must_use]
527 pub fn viewing_epoch(&self) -> Option<EpochId> {
528 *self.viewing_epoch_override.lock()
529 }
530
531 #[must_use]
535 pub fn get_node_history(&self, id: NodeId) -> Vec<(EpochId, Option<EpochId>, Node)> {
536 self.active_lpg_store().get_node_history(id)
537 }
538
539 #[must_use]
543 pub fn get_edge_history(&self, id: EdgeId) -> Vec<(EpochId, Option<EpochId>, Edge)> {
544 self.active_lpg_store().get_edge_history(id)
545 }
546
547 fn require_lpg(&self, language: &str) -> Result<()> {
549 if self.graph_model == GraphModel::Rdf {
550 return Err(grafeo_common::utils::error::Error::Internal(format!(
551 "This is an RDF database. {language} queries require an LPG database."
552 )));
553 }
554 Ok(())
555 }
556
557 #[cfg(feature = "gql")]
559 fn execute_session_command(
560 &self,
561 cmd: grafeo_adapters::query::gql::ast::SessionCommand,
562 ) -> Result<QueryResult> {
563 use grafeo_adapters::query::gql::ast::{SessionCommand, TransactionIsolationLevel};
564 use grafeo_common::utils::error::{Error, QueryError, QueryErrorKind};
565
566 if *self.read_only_tx.lock() {
568 match &cmd {
569 SessionCommand::CreateGraph { .. } | SessionCommand::DropGraph { .. } => {
570 return Err(Error::Transaction(
571 grafeo_common::utils::error::TransactionError::ReadOnly,
572 ));
573 }
574 _ => {} }
576 }
577
578 match cmd {
579 SessionCommand::CreateGraph {
580 name,
581 if_not_exists,
582 typed,
583 like_graph,
584 copy_of,
585 open: _,
586 } => {
587 let storage_key = self.effective_graph_key(&name);
589
590 if let Some(ref src) = like_graph {
592 let src_key = self.effective_graph_key(src);
593 if self.store.graph(&src_key).is_none() {
594 return Err(Error::Query(QueryError::new(
595 QueryErrorKind::Semantic,
596 format!("Source graph '{src}' does not exist"),
597 )));
598 }
599 }
600 if let Some(ref src) = copy_of {
601 let src_key = self.effective_graph_key(src);
602 if self.store.graph(&src_key).is_none() {
603 return Err(Error::Query(QueryError::new(
604 QueryErrorKind::Semantic,
605 format!("Source graph '{src}' does not exist"),
606 )));
607 }
608 }
609
610 let created = self
611 .store
612 .create_graph(&storage_key)
613 .map_err(|e| Error::Internal(e.to_string()))?;
614 if !created && !if_not_exists {
615 return Err(Error::Query(QueryError::new(
616 QueryErrorKind::Semantic,
617 format!("Graph '{name}' already exists"),
618 )));
619 }
620 if created {
621 #[cfg(feature = "wal")]
622 self.log_schema_wal(
623 &grafeo_adapters::storage::wal::WalRecord::CreateNamedGraph {
624 name: storage_key.clone(),
625 },
626 );
627 }
628
629 if let Some(ref src) = copy_of {
631 let src_key = self.effective_graph_key(src);
632 self.store
633 .copy_graph(Some(&src_key), Some(&storage_key))
634 .map_err(|e| Error::Internal(e.to_string()))?;
635 }
636
637 if let Some(type_name) = typed
639 && let Err(e) = self
640 .catalog
641 .bind_graph_type(&storage_key, type_name.clone())
642 {
643 return Err(Error::Query(QueryError::new(
644 QueryErrorKind::Semantic,
645 e.to_string(),
646 )));
647 }
648
649 if let Some(ref src) = like_graph {
651 let src_key = self.effective_graph_key(src);
652 if let Some(src_type) = self.catalog.get_graph_type_binding(&src_key) {
653 let _ = self.catalog.bind_graph_type(&storage_key, src_type);
654 }
655 }
656
657 Ok(QueryResult::empty())
658 }
659 SessionCommand::DropGraph { name, if_exists } => {
660 let storage_key = self.effective_graph_key(&name);
661 let dropped = self.store.drop_graph(&storage_key);
662 if !dropped && !if_exists {
663 return Err(Error::Query(QueryError::new(
664 QueryErrorKind::Semantic,
665 format!("Graph '{name}' does not exist"),
666 )));
667 }
668 if dropped {
669 #[cfg(feature = "wal")]
670 self.log_schema_wal(
671 &grafeo_adapters::storage::wal::WalRecord::DropNamedGraph {
672 name: storage_key.clone(),
673 },
674 );
675 let mut current = self.current_graph.lock();
677 if current
678 .as_deref()
679 .is_some_and(|g| g.eq_ignore_ascii_case(&name))
680 {
681 *current = None;
682 }
683 }
684 Ok(QueryResult::empty())
685 }
686 SessionCommand::UseGraph(name) => {
687 let effective_key = self.effective_graph_key(&name);
689 if !name.eq_ignore_ascii_case("default")
690 && self.store.graph(&effective_key).is_none()
691 {
692 return Err(Error::Query(QueryError::new(
693 QueryErrorKind::Semantic,
694 format!("Graph '{name}' does not exist"),
695 )));
696 }
697 self.use_graph(&name);
698 self.track_graph_touch();
700 Ok(QueryResult::empty())
701 }
702 SessionCommand::SessionSetGraph(name) => {
703 let effective_key = self.effective_graph_key(&name);
705 if !name.eq_ignore_ascii_case("default")
706 && self.store.graph(&effective_key).is_none()
707 {
708 return Err(Error::Query(QueryError::new(
709 QueryErrorKind::Semantic,
710 format!("Graph '{name}' does not exist"),
711 )));
712 }
713 self.use_graph(&name);
714 self.track_graph_touch();
716 Ok(QueryResult::empty())
717 }
718 SessionCommand::SessionSetSchema(name) => {
719 if !self.catalog.schema_exists(&name) {
721 return Err(Error::Query(QueryError::new(
722 QueryErrorKind::Semantic,
723 format!("Schema '{name}' does not exist"),
724 )));
725 }
726 self.set_schema(&name);
727 Ok(QueryResult::empty())
728 }
729 SessionCommand::SessionSetTimeZone(tz) => {
730 self.set_time_zone(&tz);
731 Ok(QueryResult::empty())
732 }
733 SessionCommand::SessionSetParameter(key, expr) => {
734 if key.eq_ignore_ascii_case("viewing_epoch") {
735 match Self::eval_integer_literal(&expr) {
736 Some(n) if n >= 0 => {
737 self.set_viewing_epoch(EpochId::new(n as u64));
738 Ok(QueryResult::status(format!("Set viewing_epoch to {n}")))
739 }
740 _ => Err(Error::Query(QueryError::new(
741 QueryErrorKind::Semantic,
742 "viewing_epoch must be a non-negative integer literal",
743 ))),
744 }
745 } else {
746 self.set_parameter(&key, Value::Null);
749 Ok(QueryResult::empty())
750 }
751 }
752 SessionCommand::SessionReset(target) => {
753 use grafeo_adapters::query::gql::ast::SessionResetTarget;
754 match target {
755 SessionResetTarget::All => self.reset_session(),
756 SessionResetTarget::Schema => self.reset_schema(),
757 SessionResetTarget::Graph => self.reset_graph(),
758 SessionResetTarget::TimeZone => self.reset_time_zone(),
759 SessionResetTarget::Parameters => self.reset_parameters(),
760 }
761 Ok(QueryResult::empty())
762 }
763 SessionCommand::SessionClose => {
764 self.reset_session();
765 Ok(QueryResult::empty())
766 }
767 SessionCommand::StartTransaction {
768 read_only,
769 isolation_level,
770 } => {
771 let engine_level = isolation_level.map(|l| match l {
772 TransactionIsolationLevel::ReadCommitted => {
773 crate::transaction::IsolationLevel::ReadCommitted
774 }
775 TransactionIsolationLevel::SnapshotIsolation => {
776 crate::transaction::IsolationLevel::SnapshotIsolation
777 }
778 TransactionIsolationLevel::Serializable => {
779 crate::transaction::IsolationLevel::Serializable
780 }
781 });
782 self.begin_transaction_inner(read_only, engine_level)?;
783 Ok(QueryResult::status("Transaction started"))
784 }
785 SessionCommand::Commit => {
786 self.commit_inner()?;
787 Ok(QueryResult::status("Transaction committed"))
788 }
789 SessionCommand::Rollback => {
790 self.rollback_inner()?;
791 Ok(QueryResult::status("Transaction rolled back"))
792 }
793 SessionCommand::Savepoint(name) => {
794 self.savepoint(&name)?;
795 Ok(QueryResult::status(format!("Savepoint '{name}' created")))
796 }
797 SessionCommand::RollbackToSavepoint(name) => {
798 self.rollback_to_savepoint(&name)?;
799 Ok(QueryResult::status(format!(
800 "Rolled back to savepoint '{name}'"
801 )))
802 }
803 SessionCommand::ReleaseSavepoint(name) => {
804 self.release_savepoint(&name)?;
805 Ok(QueryResult::status(format!("Savepoint '{name}' released")))
806 }
807 }
808 }
809
810 #[cfg(feature = "wal")]
812 fn log_schema_wal(&self, record: &grafeo_adapters::storage::wal::WalRecord) {
813 if let Some(ref wal) = self.wal
814 && let Err(e) = wal.log(record)
815 {
816 tracing::warn!("Failed to log schema change to WAL: {}", e);
817 }
818 }
819
820 #[cfg(feature = "gql")]
822 fn execute_schema_command(
823 &self,
824 cmd: grafeo_adapters::query::gql::ast::SchemaStatement,
825 ) -> Result<QueryResult> {
826 use crate::catalog::{
827 EdgeTypeDefinition, NodeTypeDefinition, PropertyDataType, TypedProperty,
828 };
829 use grafeo_adapters::query::gql::ast::SchemaStatement;
830 #[cfg(feature = "wal")]
831 use grafeo_adapters::storage::wal::WalRecord;
832 use grafeo_common::utils::error::{Error, QueryError, QueryErrorKind};
833
834 macro_rules! wal_log {
836 ($self:expr, $record:expr) => {
837 #[cfg(feature = "wal")]
838 $self.log_schema_wal(&$record);
839 };
840 }
841
842 let result = match cmd {
843 SchemaStatement::CreateNodeType(stmt) => {
844 #[cfg(feature = "wal")]
845 let props_for_wal: Vec<(String, String, bool)> = stmt
846 .properties
847 .iter()
848 .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
849 .collect();
850 let def = NodeTypeDefinition {
851 name: stmt.name.clone(),
852 properties: stmt
853 .properties
854 .iter()
855 .map(|p| TypedProperty {
856 name: p.name.clone(),
857 data_type: PropertyDataType::from_type_name(&p.data_type),
858 nullable: p.nullable,
859 default_value: p
860 .default_value
861 .as_ref()
862 .map(|s| parse_default_literal(s)),
863 })
864 .collect(),
865 constraints: Vec::new(),
866 parent_types: stmt.parent_types.clone(),
867 };
868 let result = if stmt.or_replace {
869 let _ = self.catalog.drop_node_type(&stmt.name);
870 self.catalog.register_node_type(def)
871 } else {
872 self.catalog.register_node_type(def)
873 };
874 match result {
875 Ok(()) => {
876 wal_log!(
877 self,
878 WalRecord::CreateNodeType {
879 name: stmt.name.clone(),
880 properties: props_for_wal,
881 constraints: Vec::new(),
882 }
883 );
884 Ok(QueryResult::status(format!(
885 "Created node type '{}'",
886 stmt.name
887 )))
888 }
889 Err(e) if stmt.if_not_exists => {
890 let _ = e;
891 Ok(QueryResult::status("No change"))
892 }
893 Err(e) => Err(Error::Query(QueryError::new(
894 QueryErrorKind::Semantic,
895 e.to_string(),
896 ))),
897 }
898 }
899 SchemaStatement::CreateEdgeType(stmt) => {
900 #[cfg(feature = "wal")]
901 let props_for_wal: Vec<(String, String, bool)> = stmt
902 .properties
903 .iter()
904 .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
905 .collect();
906 let def = EdgeTypeDefinition {
907 name: stmt.name.clone(),
908 properties: stmt
909 .properties
910 .iter()
911 .map(|p| TypedProperty {
912 name: p.name.clone(),
913 data_type: PropertyDataType::from_type_name(&p.data_type),
914 nullable: p.nullable,
915 default_value: p
916 .default_value
917 .as_ref()
918 .map(|s| parse_default_literal(s)),
919 })
920 .collect(),
921 constraints: Vec::new(),
922 source_node_types: stmt.source_node_types.clone(),
923 target_node_types: stmt.target_node_types.clone(),
924 };
925 let result = if stmt.or_replace {
926 let _ = self.catalog.drop_edge_type_def(&stmt.name);
927 self.catalog.register_edge_type_def(def)
928 } else {
929 self.catalog.register_edge_type_def(def)
930 };
931 match result {
932 Ok(()) => {
933 wal_log!(
934 self,
935 WalRecord::CreateEdgeType {
936 name: stmt.name.clone(),
937 properties: props_for_wal,
938 constraints: Vec::new(),
939 }
940 );
941 Ok(QueryResult::status(format!(
942 "Created edge type '{}'",
943 stmt.name
944 )))
945 }
946 Err(e) if stmt.if_not_exists => {
947 let _ = e;
948 Ok(QueryResult::status("No change"))
949 }
950 Err(e) => Err(Error::Query(QueryError::new(
951 QueryErrorKind::Semantic,
952 e.to_string(),
953 ))),
954 }
955 }
956 SchemaStatement::CreateVectorIndex(stmt) => {
957 Self::create_vector_index_on_store(
958 &self.active_lpg_store(),
959 &stmt.node_label,
960 &stmt.property,
961 stmt.dimensions,
962 stmt.metric.as_deref(),
963 )?;
964 wal_log!(
965 self,
966 WalRecord::CreateIndex {
967 name: stmt.name.clone(),
968 label: stmt.node_label.clone(),
969 property: stmt.property.clone(),
970 index_type: "vector".to_string(),
971 }
972 );
973 Ok(QueryResult::status(format!(
974 "Created vector index '{}'",
975 stmt.name
976 )))
977 }
978 SchemaStatement::DropNodeType { name, if_exists } => {
979 match self.catalog.drop_node_type(&name) {
980 Ok(()) => {
981 wal_log!(self, WalRecord::DropNodeType { name: name.clone() });
982 Ok(QueryResult::status(format!("Dropped node type '{name}'")))
983 }
984 Err(e) if if_exists => {
985 let _ = e;
986 Ok(QueryResult::status("No change"))
987 }
988 Err(e) => Err(Error::Query(QueryError::new(
989 QueryErrorKind::Semantic,
990 e.to_string(),
991 ))),
992 }
993 }
994 SchemaStatement::DropEdgeType { name, if_exists } => {
995 match self.catalog.drop_edge_type_def(&name) {
996 Ok(()) => {
997 wal_log!(self, WalRecord::DropEdgeType { name: name.clone() });
998 Ok(QueryResult::status(format!("Dropped edge type '{name}'")))
999 }
1000 Err(e) if if_exists => {
1001 let _ = e;
1002 Ok(QueryResult::status("No change"))
1003 }
1004 Err(e) => Err(Error::Query(QueryError::new(
1005 QueryErrorKind::Semantic,
1006 e.to_string(),
1007 ))),
1008 }
1009 }
1010 SchemaStatement::CreateIndex(stmt) => {
1011 use grafeo_adapters::query::gql::ast::IndexKind;
1012 let active = self.active_lpg_store();
1013 let index_type_str = match stmt.index_kind {
1014 IndexKind::Property => "property",
1015 IndexKind::BTree => "btree",
1016 IndexKind::Text => "text",
1017 IndexKind::Vector => "vector",
1018 };
1019 match stmt.index_kind {
1020 IndexKind::Property | IndexKind::BTree => {
1021 for prop in &stmt.properties {
1022 active.create_property_index(prop);
1023 }
1024 }
1025 IndexKind::Text => {
1026 for prop in &stmt.properties {
1027 Self::create_text_index_on_store(&active, &stmt.label, prop)?;
1028 }
1029 }
1030 IndexKind::Vector => {
1031 for prop in &stmt.properties {
1032 Self::create_vector_index_on_store(
1033 &active,
1034 &stmt.label,
1035 prop,
1036 stmt.options.dimensions,
1037 stmt.options.metric.as_deref(),
1038 )?;
1039 }
1040 }
1041 }
1042 #[cfg(feature = "wal")]
1043 for prop in &stmt.properties {
1044 wal_log!(
1045 self,
1046 WalRecord::CreateIndex {
1047 name: stmt.name.clone(),
1048 label: stmt.label.clone(),
1049 property: prop.clone(),
1050 index_type: index_type_str.to_string(),
1051 }
1052 );
1053 }
1054 Ok(QueryResult::status(format!(
1055 "Created {} index '{}'",
1056 index_type_str, stmt.name
1057 )))
1058 }
1059 SchemaStatement::DropIndex { name, if_exists } => {
1060 let dropped = self.active_lpg_store().drop_property_index(&name);
1062 if dropped || if_exists {
1063 if dropped {
1064 wal_log!(self, WalRecord::DropIndex { name: name.clone() });
1065 }
1066 Ok(QueryResult::status(if dropped {
1067 format!("Dropped index '{name}'")
1068 } else {
1069 "No change".to_string()
1070 }))
1071 } else {
1072 Err(Error::Query(QueryError::new(
1073 QueryErrorKind::Semantic,
1074 format!("Index '{name}' does not exist"),
1075 )))
1076 }
1077 }
1078 SchemaStatement::CreateConstraint(stmt) => {
1079 use crate::catalog::TypeConstraint;
1080 use grafeo_adapters::query::gql::ast::ConstraintKind;
1081 let kind_str = match stmt.constraint_kind {
1082 ConstraintKind::Unique => "unique",
1083 ConstraintKind::NodeKey => "node_key",
1084 ConstraintKind::NotNull => "not_null",
1085 ConstraintKind::Exists => "exists",
1086 };
1087 let constraint_name = stmt
1088 .name
1089 .clone()
1090 .unwrap_or_else(|| format!("{}_{kind_str}", stmt.label));
1091
1092 match stmt.constraint_kind {
1094 ConstraintKind::Unique => {
1095 for prop in &stmt.properties {
1096 let label_id = self.catalog.get_or_create_label(&stmt.label);
1097 let prop_id = self.catalog.get_or_create_property_key(prop);
1098 let _ = self.catalog.add_unique_constraint(label_id, prop_id);
1099 }
1100 let _ = self.catalog.add_constraint_to_type(
1101 &stmt.label,
1102 TypeConstraint::Unique(stmt.properties.clone()),
1103 );
1104 }
1105 ConstraintKind::NodeKey => {
1106 for prop in &stmt.properties {
1107 let label_id = self.catalog.get_or_create_label(&stmt.label);
1108 let prop_id = self.catalog.get_or_create_property_key(prop);
1109 let _ = self.catalog.add_unique_constraint(label_id, prop_id);
1110 let _ = self.catalog.add_required_property(label_id, prop_id);
1111 }
1112 let _ = self.catalog.add_constraint_to_type(
1113 &stmt.label,
1114 TypeConstraint::PrimaryKey(stmt.properties.clone()),
1115 );
1116 }
1117 ConstraintKind::NotNull | ConstraintKind::Exists => {
1118 for prop in &stmt.properties {
1119 let label_id = self.catalog.get_or_create_label(&stmt.label);
1120 let prop_id = self.catalog.get_or_create_property_key(prop);
1121 let _ = self.catalog.add_required_property(label_id, prop_id);
1122 let _ = self.catalog.add_constraint_to_type(
1123 &stmt.label,
1124 TypeConstraint::NotNull(prop.clone()),
1125 );
1126 }
1127 }
1128 }
1129
1130 wal_log!(
1131 self,
1132 WalRecord::CreateConstraint {
1133 name: constraint_name.clone(),
1134 label: stmt.label.clone(),
1135 properties: stmt.properties.clone(),
1136 kind: kind_str.to_string(),
1137 }
1138 );
1139 Ok(QueryResult::status(format!(
1140 "Created {kind_str} constraint '{constraint_name}'"
1141 )))
1142 }
1143 SchemaStatement::DropConstraint { name, if_exists } => {
1144 let _ = if_exists;
1145 wal_log!(self, WalRecord::DropConstraint { name: name.clone() });
1146 Ok(QueryResult::status(format!("Dropped constraint '{name}'")))
1147 }
1148 SchemaStatement::CreateGraphType(stmt) => {
1149 use crate::catalog::GraphTypeDefinition;
1150 use grafeo_adapters::query::gql::ast::InlineElementType;
1151
1152 let (mut node_types, mut edge_types, open) =
1154 if let Some(ref like_graph) = stmt.like_graph {
1155 if let Some(type_name) = self.catalog.get_graph_type_binding(like_graph) {
1157 if let Some(existing) = self
1158 .catalog
1159 .schema()
1160 .and_then(|s| s.get_graph_type(&type_name))
1161 {
1162 (
1163 existing.allowed_node_types.clone(),
1164 existing.allowed_edge_types.clone(),
1165 existing.open,
1166 )
1167 } else {
1168 (Vec::new(), Vec::new(), true)
1169 }
1170 } else {
1171 let nt = self.catalog.all_node_type_names();
1173 let et = self.catalog.all_edge_type_names();
1174 if nt.is_empty() && et.is_empty() {
1175 (Vec::new(), Vec::new(), true)
1176 } else {
1177 (nt, et, false)
1178 }
1179 }
1180 } else {
1181 (stmt.node_types.clone(), stmt.edge_types.clone(), stmt.open)
1182 };
1183
1184 for inline in &stmt.inline_types {
1186 match inline {
1187 InlineElementType::Node {
1188 name,
1189 properties,
1190 key_labels,
1191 ..
1192 } => {
1193 let def = NodeTypeDefinition {
1194 name: name.clone(),
1195 properties: properties
1196 .iter()
1197 .map(|p| TypedProperty {
1198 name: p.name.clone(),
1199 data_type: PropertyDataType::from_type_name(&p.data_type),
1200 nullable: p.nullable,
1201 default_value: None,
1202 })
1203 .collect(),
1204 constraints: Vec::new(),
1205 parent_types: key_labels.clone(),
1206 };
1207 self.catalog.register_or_replace_node_type(def);
1209 #[cfg(feature = "wal")]
1210 {
1211 let props_for_wal: Vec<(String, String, bool)> = properties
1212 .iter()
1213 .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
1214 .collect();
1215 self.log_schema_wal(&WalRecord::CreateNodeType {
1216 name: name.clone(),
1217 properties: props_for_wal,
1218 constraints: Vec::new(),
1219 });
1220 }
1221 if !node_types.contains(name) {
1222 node_types.push(name.clone());
1223 }
1224 }
1225 InlineElementType::Edge {
1226 name,
1227 properties,
1228 source_node_types,
1229 target_node_types,
1230 ..
1231 } => {
1232 let def = EdgeTypeDefinition {
1233 name: name.clone(),
1234 properties: properties
1235 .iter()
1236 .map(|p| TypedProperty {
1237 name: p.name.clone(),
1238 data_type: PropertyDataType::from_type_name(&p.data_type),
1239 nullable: p.nullable,
1240 default_value: None,
1241 })
1242 .collect(),
1243 constraints: Vec::new(),
1244 source_node_types: source_node_types.clone(),
1245 target_node_types: target_node_types.clone(),
1246 };
1247 self.catalog.register_or_replace_edge_type_def(def);
1248 #[cfg(feature = "wal")]
1249 {
1250 let props_for_wal: Vec<(String, String, bool)> = properties
1251 .iter()
1252 .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
1253 .collect();
1254 self.log_schema_wal(&WalRecord::CreateEdgeType {
1255 name: name.clone(),
1256 properties: props_for_wal,
1257 constraints: Vec::new(),
1258 });
1259 }
1260 if !edge_types.contains(name) {
1261 edge_types.push(name.clone());
1262 }
1263 }
1264 }
1265 }
1266
1267 let def = GraphTypeDefinition {
1268 name: stmt.name.clone(),
1269 allowed_node_types: node_types.clone(),
1270 allowed_edge_types: edge_types.clone(),
1271 open,
1272 };
1273 let result = if stmt.or_replace {
1274 let _ = self.catalog.drop_graph_type(&stmt.name);
1276 self.catalog.register_graph_type(def)
1277 } else {
1278 self.catalog.register_graph_type(def)
1279 };
1280 match result {
1281 Ok(()) => {
1282 wal_log!(
1283 self,
1284 WalRecord::CreateGraphType {
1285 name: stmt.name.clone(),
1286 node_types,
1287 edge_types,
1288 open,
1289 }
1290 );
1291 Ok(QueryResult::status(format!(
1292 "Created graph type '{}'",
1293 stmt.name
1294 )))
1295 }
1296 Err(e) if stmt.if_not_exists => {
1297 let _ = e;
1298 Ok(QueryResult::status("No change"))
1299 }
1300 Err(e) => Err(Error::Query(QueryError::new(
1301 QueryErrorKind::Semantic,
1302 e.to_string(),
1303 ))),
1304 }
1305 }
1306 SchemaStatement::DropGraphType { name, if_exists } => {
1307 match self.catalog.drop_graph_type(&name) {
1308 Ok(()) => {
1309 wal_log!(self, WalRecord::DropGraphType { name: name.clone() });
1310 Ok(QueryResult::status(format!("Dropped graph type '{name}'")))
1311 }
1312 Err(e) if if_exists => {
1313 let _ = e;
1314 Ok(QueryResult::status("No change"))
1315 }
1316 Err(e) => Err(Error::Query(QueryError::new(
1317 QueryErrorKind::Semantic,
1318 e.to_string(),
1319 ))),
1320 }
1321 }
1322 SchemaStatement::CreateSchema {
1323 name,
1324 if_not_exists,
1325 } => match self.catalog.register_schema_namespace(name.clone()) {
1326 Ok(()) => {
1327 wal_log!(self, WalRecord::CreateSchema { name: name.clone() });
1328 Ok(QueryResult::status(format!("Created schema '{name}'")))
1329 }
1330 Err(e) if if_not_exists => {
1331 let _ = e;
1332 Ok(QueryResult::status("No change"))
1333 }
1334 Err(e) => Err(Error::Query(QueryError::new(
1335 QueryErrorKind::Semantic,
1336 e.to_string(),
1337 ))),
1338 },
1339 SchemaStatement::DropSchema { name, if_exists } => {
1340 let prefix = format!("{name}/");
1342 let has_graphs = self
1343 .store
1344 .graph_names()
1345 .iter()
1346 .any(|g| g.starts_with(&prefix));
1347 if has_graphs {
1348 return Err(Error::Query(QueryError::new(
1349 QueryErrorKind::Semantic,
1350 format!(
1351 "Schema '{name}' is not empty: drop all graphs in the schema first"
1352 ),
1353 )));
1354 }
1355 match self.catalog.drop_schema_namespace(&name) {
1356 Ok(()) => {
1357 wal_log!(self, WalRecord::DropSchema { name: name.clone() });
1358 let mut current = self.current_schema.lock();
1360 if current
1361 .as_deref()
1362 .is_some_and(|s| s.eq_ignore_ascii_case(&name))
1363 {
1364 *current = None;
1365 }
1366 Ok(QueryResult::status(format!("Dropped schema '{name}'")))
1367 }
1368 Err(e) if if_exists => {
1369 let _ = e;
1370 Ok(QueryResult::status("No change"))
1371 }
1372 Err(e) => Err(Error::Query(QueryError::new(
1373 QueryErrorKind::Semantic,
1374 e.to_string(),
1375 ))),
1376 }
1377 }
1378 SchemaStatement::AlterNodeType(stmt) => {
1379 use grafeo_adapters::query::gql::ast::TypeAlteration;
1380 let mut wal_alts = Vec::new();
1381 for alt in &stmt.alterations {
1382 match alt {
1383 TypeAlteration::AddProperty(prop) => {
1384 let typed = TypedProperty {
1385 name: prop.name.clone(),
1386 data_type: PropertyDataType::from_type_name(&prop.data_type),
1387 nullable: prop.nullable,
1388 default_value: prop
1389 .default_value
1390 .as_ref()
1391 .map(|s| parse_default_literal(s)),
1392 };
1393 self.catalog
1394 .alter_node_type_add_property(&stmt.name, typed)
1395 .map_err(|e| {
1396 Error::Query(QueryError::new(
1397 QueryErrorKind::Semantic,
1398 e.to_string(),
1399 ))
1400 })?;
1401 wal_alts.push((
1402 "add".to_string(),
1403 prop.name.clone(),
1404 prop.data_type.clone(),
1405 prop.nullable,
1406 ));
1407 }
1408 TypeAlteration::DropProperty(name) => {
1409 self.catalog
1410 .alter_node_type_drop_property(&stmt.name, name)
1411 .map_err(|e| {
1412 Error::Query(QueryError::new(
1413 QueryErrorKind::Semantic,
1414 e.to_string(),
1415 ))
1416 })?;
1417 wal_alts.push(("drop".to_string(), name.clone(), String::new(), false));
1418 }
1419 }
1420 }
1421 wal_log!(
1422 self,
1423 WalRecord::AlterNodeType {
1424 name: stmt.name.clone(),
1425 alterations: wal_alts,
1426 }
1427 );
1428 Ok(QueryResult::status(format!(
1429 "Altered node type '{}'",
1430 stmt.name
1431 )))
1432 }
1433 SchemaStatement::AlterEdgeType(stmt) => {
1434 use grafeo_adapters::query::gql::ast::TypeAlteration;
1435 let mut wal_alts = Vec::new();
1436 for alt in &stmt.alterations {
1437 match alt {
1438 TypeAlteration::AddProperty(prop) => {
1439 let typed = TypedProperty {
1440 name: prop.name.clone(),
1441 data_type: PropertyDataType::from_type_name(&prop.data_type),
1442 nullable: prop.nullable,
1443 default_value: prop
1444 .default_value
1445 .as_ref()
1446 .map(|s| parse_default_literal(s)),
1447 };
1448 self.catalog
1449 .alter_edge_type_add_property(&stmt.name, typed)
1450 .map_err(|e| {
1451 Error::Query(QueryError::new(
1452 QueryErrorKind::Semantic,
1453 e.to_string(),
1454 ))
1455 })?;
1456 wal_alts.push((
1457 "add".to_string(),
1458 prop.name.clone(),
1459 prop.data_type.clone(),
1460 prop.nullable,
1461 ));
1462 }
1463 TypeAlteration::DropProperty(name) => {
1464 self.catalog
1465 .alter_edge_type_drop_property(&stmt.name, name)
1466 .map_err(|e| {
1467 Error::Query(QueryError::new(
1468 QueryErrorKind::Semantic,
1469 e.to_string(),
1470 ))
1471 })?;
1472 wal_alts.push(("drop".to_string(), name.clone(), String::new(), false));
1473 }
1474 }
1475 }
1476 wal_log!(
1477 self,
1478 WalRecord::AlterEdgeType {
1479 name: stmt.name.clone(),
1480 alterations: wal_alts,
1481 }
1482 );
1483 Ok(QueryResult::status(format!(
1484 "Altered edge type '{}'",
1485 stmt.name
1486 )))
1487 }
1488 SchemaStatement::AlterGraphType(stmt) => {
1489 use grafeo_adapters::query::gql::ast::GraphTypeAlteration;
1490 let mut wal_alts = Vec::new();
1491 for alt in &stmt.alterations {
1492 match alt {
1493 GraphTypeAlteration::AddNodeType(name) => {
1494 self.catalog
1495 .alter_graph_type_add_node_type(&stmt.name, name.clone())
1496 .map_err(|e| {
1497 Error::Query(QueryError::new(
1498 QueryErrorKind::Semantic,
1499 e.to_string(),
1500 ))
1501 })?;
1502 wal_alts.push(("add_node_type".to_string(), name.clone()));
1503 }
1504 GraphTypeAlteration::DropNodeType(name) => {
1505 self.catalog
1506 .alter_graph_type_drop_node_type(&stmt.name, name)
1507 .map_err(|e| {
1508 Error::Query(QueryError::new(
1509 QueryErrorKind::Semantic,
1510 e.to_string(),
1511 ))
1512 })?;
1513 wal_alts.push(("drop_node_type".to_string(), name.clone()));
1514 }
1515 GraphTypeAlteration::AddEdgeType(name) => {
1516 self.catalog
1517 .alter_graph_type_add_edge_type(&stmt.name, name.clone())
1518 .map_err(|e| {
1519 Error::Query(QueryError::new(
1520 QueryErrorKind::Semantic,
1521 e.to_string(),
1522 ))
1523 })?;
1524 wal_alts.push(("add_edge_type".to_string(), name.clone()));
1525 }
1526 GraphTypeAlteration::DropEdgeType(name) => {
1527 self.catalog
1528 .alter_graph_type_drop_edge_type(&stmt.name, name)
1529 .map_err(|e| {
1530 Error::Query(QueryError::new(
1531 QueryErrorKind::Semantic,
1532 e.to_string(),
1533 ))
1534 })?;
1535 wal_alts.push(("drop_edge_type".to_string(), name.clone()));
1536 }
1537 }
1538 }
1539 wal_log!(
1540 self,
1541 WalRecord::AlterGraphType {
1542 name: stmt.name.clone(),
1543 alterations: wal_alts,
1544 }
1545 );
1546 Ok(QueryResult::status(format!(
1547 "Altered graph type '{}'",
1548 stmt.name
1549 )))
1550 }
1551 SchemaStatement::CreateProcedure(stmt) => {
1552 use crate::catalog::ProcedureDefinition;
1553
1554 let def = ProcedureDefinition {
1555 name: stmt.name.clone(),
1556 params: stmt
1557 .params
1558 .iter()
1559 .map(|p| (p.name.clone(), p.param_type.clone()))
1560 .collect(),
1561 returns: stmt
1562 .returns
1563 .iter()
1564 .map(|r| (r.name.clone(), r.return_type.clone()))
1565 .collect(),
1566 body: stmt.body.clone(),
1567 };
1568
1569 if stmt.or_replace {
1570 self.catalog.replace_procedure(def).map_err(|e| {
1571 Error::Query(QueryError::new(QueryErrorKind::Semantic, e.to_string()))
1572 })?;
1573 } else {
1574 match self.catalog.register_procedure(def) {
1575 Ok(()) => {}
1576 Err(_) if stmt.if_not_exists => {
1577 return Ok(QueryResult::empty());
1578 }
1579 Err(e) => {
1580 return Err(Error::Query(QueryError::new(
1581 QueryErrorKind::Semantic,
1582 e.to_string(),
1583 )));
1584 }
1585 }
1586 }
1587
1588 wal_log!(
1589 self,
1590 WalRecord::CreateProcedure {
1591 name: stmt.name.clone(),
1592 params: stmt
1593 .params
1594 .iter()
1595 .map(|p| (p.name.clone(), p.param_type.clone()))
1596 .collect(),
1597 returns: stmt
1598 .returns
1599 .iter()
1600 .map(|r| (r.name.clone(), r.return_type.clone()))
1601 .collect(),
1602 body: stmt.body,
1603 }
1604 );
1605 Ok(QueryResult::status(format!(
1606 "Created procedure '{}'",
1607 stmt.name
1608 )))
1609 }
1610 SchemaStatement::DropProcedure { name, if_exists } => {
1611 match self.catalog.drop_procedure(&name) {
1612 Ok(()) => {}
1613 Err(_) if if_exists => {
1614 return Ok(QueryResult::empty());
1615 }
1616 Err(e) => {
1617 return Err(Error::Query(QueryError::new(
1618 QueryErrorKind::Semantic,
1619 e.to_string(),
1620 )));
1621 }
1622 }
1623 wal_log!(self, WalRecord::DropProcedure { name: name.clone() });
1624 Ok(QueryResult::status(format!("Dropped procedure '{name}'")))
1625 }
1626 SchemaStatement::ShowIndexes => {
1627 return self.execute_show_indexes();
1628 }
1629 SchemaStatement::ShowConstraints => {
1630 return self.execute_show_constraints();
1631 }
1632 SchemaStatement::ShowNodeTypes => {
1633 return self.execute_show_node_types();
1634 }
1635 SchemaStatement::ShowEdgeTypes => {
1636 return self.execute_show_edge_types();
1637 }
1638 SchemaStatement::ShowGraphTypes => {
1639 return self.execute_show_graph_types();
1640 }
1641 SchemaStatement::ShowGraphType(name) => {
1642 return self.execute_show_graph_type(&name);
1643 }
1644 SchemaStatement::ShowCurrentGraphType => {
1645 return self.execute_show_current_graph_type();
1646 }
1647 SchemaStatement::ShowGraphs => {
1648 return self.execute_show_graphs();
1649 }
1650 SchemaStatement::ShowSchemas => {
1651 return self.execute_show_schemas();
1652 }
1653 };
1654
1655 if result.is_ok() {
1658 self.query_cache.clear();
1659 }
1660
1661 result
1662 }
1663
1664 #[cfg(all(feature = "gql", feature = "vector-index"))]
1666 fn create_vector_index_on_store(
1667 store: &LpgStore,
1668 label: &str,
1669 property: &str,
1670 dimensions: Option<usize>,
1671 metric: Option<&str>,
1672 ) -> Result<()> {
1673 use grafeo_common::types::{PropertyKey, Value};
1674 use grafeo_common::utils::error::Error;
1675 use grafeo_core::index::vector::{DistanceMetric, HnswConfig, HnswIndex};
1676
1677 let metric = match metric {
1678 Some(m) => DistanceMetric::from_str(m).ok_or_else(|| {
1679 Error::Internal(format!(
1680 "Unknown distance metric '{m}'. Use: cosine, euclidean, dot_product, manhattan"
1681 ))
1682 })?,
1683 None => DistanceMetric::Cosine,
1684 };
1685
1686 let prop_key = PropertyKey::new(property);
1687 let mut found_dims: Option<usize> = dimensions;
1688 let mut vectors: Vec<(grafeo_common::types::NodeId, Vec<f32>)> = Vec::new();
1689
1690 for node in store.nodes_with_label(label) {
1691 if let Some(Value::Vector(v)) = node.properties.get(&prop_key) {
1692 if let Some(expected) = found_dims {
1693 if v.len() != expected {
1694 return Err(Error::Internal(format!(
1695 "Vector dimension mismatch: expected {expected}, found {} on node {}",
1696 v.len(),
1697 node.id.0
1698 )));
1699 }
1700 } else {
1701 found_dims = Some(v.len());
1702 }
1703 vectors.push((node.id, v.to_vec()));
1704 }
1705 }
1706
1707 let Some(dims) = found_dims else {
1708 return Err(Error::Internal(format!(
1709 "No vector properties found on :{label}({property}) and no dimensions specified"
1710 )));
1711 };
1712
1713 let config = HnswConfig::new(dims, metric);
1714 let index = HnswIndex::with_capacity(config, vectors.len());
1715 let accessor = grafeo_core::index::vector::PropertyVectorAccessor::new(store, property);
1716 for (node_id, vec) in &vectors {
1717 index.insert(*node_id, vec, &accessor);
1718 }
1719
1720 store.add_vector_index(label, property, Arc::new(index));
1721 Ok(())
1722 }
1723
1724 #[cfg(all(feature = "gql", not(feature = "vector-index")))]
1726 fn create_vector_index_on_store(
1727 _store: &LpgStore,
1728 _label: &str,
1729 _property: &str,
1730 _dimensions: Option<usize>,
1731 _metric: Option<&str>,
1732 ) -> Result<()> {
1733 Err(grafeo_common::utils::error::Error::Internal(
1734 "Vector index support requires the 'vector-index' feature".to_string(),
1735 ))
1736 }
1737
1738 #[cfg(all(feature = "gql", feature = "text-index"))]
1740 fn create_text_index_on_store(store: &LpgStore, label: &str, property: &str) -> Result<()> {
1741 use grafeo_common::types::{PropertyKey, Value};
1742 use grafeo_core::index::text::{BM25Config, InvertedIndex};
1743
1744 let mut index = InvertedIndex::new(BM25Config::default());
1745 let prop_key = PropertyKey::new(property);
1746
1747 let nodes = store.nodes_by_label(label);
1748 for node_id in nodes {
1749 if let Some(Value::String(text)) = store.get_node_property(node_id, &prop_key) {
1750 index.insert(node_id, text.as_str());
1751 }
1752 }
1753
1754 store.add_text_index(label, property, Arc::new(parking_lot::RwLock::new(index)));
1755 Ok(())
1756 }
1757
1758 #[cfg(all(feature = "gql", not(feature = "text-index")))]
1760 fn create_text_index_on_store(_store: &LpgStore, _label: &str, _property: &str) -> Result<()> {
1761 Err(grafeo_common::utils::error::Error::Internal(
1762 "Text index support requires the 'text-index' feature".to_string(),
1763 ))
1764 }
1765
1766 fn execute_show_indexes(&self) -> Result<QueryResult> {
1768 let indexes = self.catalog.all_indexes();
1769 let columns = vec![
1770 "name".to_string(),
1771 "type".to_string(),
1772 "label".to_string(),
1773 "property".to_string(),
1774 ];
1775 let rows: Vec<Vec<Value>> = indexes
1776 .into_iter()
1777 .map(|def| {
1778 let label_name = self
1779 .catalog
1780 .get_label_name(def.label)
1781 .unwrap_or_else(|| "?".into());
1782 let prop_name = self
1783 .catalog
1784 .get_property_key_name(def.property_key)
1785 .unwrap_or_else(|| "?".into());
1786 vec![
1787 Value::from(format!("idx_{}_{}", label_name, prop_name)),
1788 Value::from(format!("{:?}", def.index_type)),
1789 Value::from(&*label_name),
1790 Value::from(&*prop_name),
1791 ]
1792 })
1793 .collect();
1794 Ok(QueryResult {
1795 columns,
1796 column_types: Vec::new(),
1797 rows,
1798 ..QueryResult::empty()
1799 })
1800 }
1801
1802 fn execute_show_constraints(&self) -> Result<QueryResult> {
1804 Ok(QueryResult {
1807 columns: vec![
1808 "name".to_string(),
1809 "type".to_string(),
1810 "label".to_string(),
1811 "properties".to_string(),
1812 ],
1813 column_types: Vec::new(),
1814 rows: Vec::new(),
1815 ..QueryResult::empty()
1816 })
1817 }
1818
1819 fn execute_show_node_types(&self) -> Result<QueryResult> {
1821 let columns = vec![
1822 "name".to_string(),
1823 "properties".to_string(),
1824 "constraints".to_string(),
1825 "parents".to_string(),
1826 ];
1827 let type_names = self.catalog.all_node_type_names();
1828 let rows: Vec<Vec<Value>> = type_names
1829 .into_iter()
1830 .filter_map(|name| {
1831 let def = self.catalog.get_node_type(&name)?;
1832 let props: Vec<String> = def
1833 .properties
1834 .iter()
1835 .map(|p| {
1836 let nullable = if p.nullable { "" } else { " NOT NULL" };
1837 format!("{} {}{}", p.name, p.data_type, nullable)
1838 })
1839 .collect();
1840 let constraints: Vec<String> =
1841 def.constraints.iter().map(|c| format!("{c:?}")).collect();
1842 let parents = def.parent_types.join(", ");
1843 Some(vec![
1844 Value::from(name),
1845 Value::from(props.join(", ")),
1846 Value::from(constraints.join(", ")),
1847 Value::from(parents),
1848 ])
1849 })
1850 .collect();
1851 Ok(QueryResult {
1852 columns,
1853 column_types: Vec::new(),
1854 rows,
1855 ..QueryResult::empty()
1856 })
1857 }
1858
1859 fn execute_show_edge_types(&self) -> Result<QueryResult> {
1861 let columns = vec![
1862 "name".to_string(),
1863 "properties".to_string(),
1864 "source_types".to_string(),
1865 "target_types".to_string(),
1866 ];
1867 let type_names = self.catalog.all_edge_type_names();
1868 let rows: Vec<Vec<Value>> = type_names
1869 .into_iter()
1870 .filter_map(|name| {
1871 let def = self.catalog.get_edge_type_def(&name)?;
1872 let props: Vec<String> = def
1873 .properties
1874 .iter()
1875 .map(|p| {
1876 let nullable = if p.nullable { "" } else { " NOT NULL" };
1877 format!("{} {}{}", p.name, p.data_type, nullable)
1878 })
1879 .collect();
1880 let src = def.source_node_types.join(", ");
1881 let tgt = def.target_node_types.join(", ");
1882 Some(vec![
1883 Value::from(name),
1884 Value::from(props.join(", ")),
1885 Value::from(src),
1886 Value::from(tgt),
1887 ])
1888 })
1889 .collect();
1890 Ok(QueryResult {
1891 columns,
1892 column_types: Vec::new(),
1893 rows,
1894 ..QueryResult::empty()
1895 })
1896 }
1897
1898 fn execute_show_graph_types(&self) -> Result<QueryResult> {
1900 let columns = vec![
1901 "name".to_string(),
1902 "open".to_string(),
1903 "node_types".to_string(),
1904 "edge_types".to_string(),
1905 ];
1906 let type_names = self.catalog.all_graph_type_names();
1907 let rows: Vec<Vec<Value>> = type_names
1908 .into_iter()
1909 .filter_map(|name| {
1910 let def = self.catalog.get_graph_type_def(&name)?;
1911 Some(vec![
1912 Value::from(name),
1913 Value::from(def.open),
1914 Value::from(def.allowed_node_types.join(", ")),
1915 Value::from(def.allowed_edge_types.join(", ")),
1916 ])
1917 })
1918 .collect();
1919 Ok(QueryResult {
1920 columns,
1921 column_types: Vec::new(),
1922 rows,
1923 ..QueryResult::empty()
1924 })
1925 }
1926
1927 fn execute_show_graphs(&self) -> Result<QueryResult> {
1933 let schema = self.current_schema.lock().clone();
1934 let all_names = self.store.graph_names();
1935
1936 let mut names: Vec<String> = match &schema {
1937 Some(s) => {
1938 let prefix = format!("{s}/");
1939 all_names
1940 .into_iter()
1941 .filter_map(|n| n.strip_prefix(&prefix).map(String::from))
1942 .collect()
1943 }
1944 None => all_names.into_iter().filter(|n| !n.contains('/')).collect(),
1945 };
1946 names.sort();
1947
1948 let rows: Vec<Vec<Value>> = names.into_iter().map(|n| vec![Value::from(n)]).collect();
1949 Ok(QueryResult {
1950 columns: vec!["name".to_string()],
1951 column_types: Vec::new(),
1952 rows,
1953 ..QueryResult::empty()
1954 })
1955 }
1956
1957 fn execute_show_schemas(&self) -> Result<QueryResult> {
1959 let mut names = self.catalog.schema_names();
1960 names.sort();
1961 let rows: Vec<Vec<Value>> = names.into_iter().map(|n| vec![Value::from(n)]).collect();
1962 Ok(QueryResult {
1963 columns: vec!["name".to_string()],
1964 column_types: Vec::new(),
1965 rows,
1966 ..QueryResult::empty()
1967 })
1968 }
1969
1970 fn execute_show_graph_type(&self, name: &str) -> Result<QueryResult> {
1972 use grafeo_common::utils::error::{Error, QueryError, QueryErrorKind};
1973
1974 let def = self.catalog.get_graph_type_def(name).ok_or_else(|| {
1975 Error::Query(QueryError::new(
1976 QueryErrorKind::Semantic,
1977 format!("Graph type '{name}' not found"),
1978 ))
1979 })?;
1980
1981 let columns = vec![
1982 "name".to_string(),
1983 "open".to_string(),
1984 "node_types".to_string(),
1985 "edge_types".to_string(),
1986 ];
1987 let rows = vec![vec![
1988 Value::from(def.name),
1989 Value::from(def.open),
1990 Value::from(def.allowed_node_types.join(", ")),
1991 Value::from(def.allowed_edge_types.join(", ")),
1992 ]];
1993 Ok(QueryResult {
1994 columns,
1995 column_types: Vec::new(),
1996 rows,
1997 ..QueryResult::empty()
1998 })
1999 }
2000
2001 fn execute_show_current_graph_type(&self) -> Result<QueryResult> {
2003 let graph_name = self
2004 .current_graph()
2005 .unwrap_or_else(|| "default".to_string());
2006 let columns = vec![
2007 "graph".to_string(),
2008 "graph_type".to_string(),
2009 "open".to_string(),
2010 "node_types".to_string(),
2011 "edge_types".to_string(),
2012 ];
2013
2014 if let Some(type_name) = self.catalog.get_graph_type_binding(&graph_name)
2015 && let Some(def) = self.catalog.get_graph_type_def(&type_name)
2016 {
2017 let rows = vec![vec![
2018 Value::from(graph_name),
2019 Value::from(type_name),
2020 Value::from(def.open),
2021 Value::from(def.allowed_node_types.join(", ")),
2022 Value::from(def.allowed_edge_types.join(", ")),
2023 ]];
2024 return Ok(QueryResult {
2025 columns,
2026 column_types: Vec::new(),
2027 rows,
2028 ..QueryResult::empty()
2029 });
2030 }
2031
2032 Ok(QueryResult {
2034 columns,
2035 column_types: Vec::new(),
2036 rows: vec![vec![
2037 Value::from(graph_name),
2038 Value::Null,
2039 Value::Null,
2040 Value::Null,
2041 Value::Null,
2042 ]],
2043 ..QueryResult::empty()
2044 })
2045 }
2046
2047 #[cfg(feature = "gql")]
2074 pub fn execute(&self, query: &str) -> Result<QueryResult> {
2075 self.require_lpg("GQL")?;
2076
2077 use crate::query::{
2078 Executor, binder::Binder, cache::CacheKey, optimizer::Optimizer,
2079 processor::QueryLanguage, translators::gql,
2080 };
2081
2082 #[cfg(not(target_arch = "wasm32"))]
2083 let start_time = std::time::Instant::now();
2084
2085 let translation = gql::translate_full(query)?;
2087 let logical_plan = match translation {
2088 gql::GqlTranslationResult::SessionCommand(cmd) => {
2089 return self.execute_session_command(cmd);
2090 }
2091 gql::GqlTranslationResult::SchemaCommand(cmd) => {
2092 if *self.read_only_tx.lock() {
2094 return Err(grafeo_common::utils::error::Error::Transaction(
2095 grafeo_common::utils::error::TransactionError::ReadOnly,
2096 ));
2097 }
2098 return self.execute_schema_command(cmd);
2099 }
2100 gql::GqlTranslationResult::Plan(plan) => {
2101 if *self.read_only_tx.lock() && plan.root.has_mutations() {
2103 return Err(grafeo_common::utils::error::Error::Transaction(
2104 grafeo_common::utils::error::TransactionError::ReadOnly,
2105 ));
2106 }
2107 plan
2108 }
2109 };
2110
2111 let cache_key = CacheKey::with_graph(query, QueryLanguage::Gql, self.current_graph());
2113
2114 let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
2116 cached_plan
2117 } else {
2118 let mut binder = Binder::new();
2120 let _binding_context = binder.bind(&logical_plan)?;
2121
2122 let active = self.active_store();
2124 let optimizer = Optimizer::from_graph_store(&*active);
2125 let plan = optimizer.optimize(logical_plan)?;
2126
2127 self.query_cache.put_optimized(cache_key, plan.clone());
2129
2130 plan
2131 };
2132
2133 let active = self.active_store();
2135
2136 if optimized_plan.explain {
2138 use crate::query::processor::{annotate_pushdown_hints, explain_result};
2139 let mut plan = optimized_plan;
2140 annotate_pushdown_hints(&mut plan.root, active.as_ref());
2141 return Ok(explain_result(&plan));
2142 }
2143
2144 if optimized_plan.profile {
2146 let has_mutations = optimized_plan.root.has_mutations();
2147 return self.with_auto_commit(has_mutations, || {
2148 let (viewing_epoch, transaction_id) = self.get_transaction_context();
2149 let planner = self.create_planner_for_store(
2150 Arc::clone(&active),
2151 viewing_epoch,
2152 transaction_id,
2153 );
2154 let (mut physical_plan, entries) = planner.plan_profiled(&optimized_plan)?;
2155
2156 let executor = Executor::with_columns(physical_plan.columns.clone())
2157 .with_deadline(self.query_deadline());
2158 let _result = executor.execute(physical_plan.operator.as_mut())?;
2159
2160 let total_time_ms;
2161 #[cfg(not(target_arch = "wasm32"))]
2162 {
2163 total_time_ms = start_time.elapsed().as_secs_f64() * 1000.0;
2164 }
2165 #[cfg(target_arch = "wasm32")]
2166 {
2167 total_time_ms = 0.0;
2168 }
2169
2170 let profile_tree = crate::query::profile::build_profile_tree(
2171 &optimized_plan.root,
2172 &mut entries.into_iter(),
2173 );
2174 Ok(crate::query::profile::profile_result(
2175 &profile_tree,
2176 total_time_ms,
2177 ))
2178 });
2179 }
2180
2181 let has_mutations = optimized_plan.root.has_mutations();
2182
2183 self.with_auto_commit(has_mutations, || {
2184 let (viewing_epoch, transaction_id) = self.get_transaction_context();
2186
2187 let planner =
2190 self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
2191 let mut physical_plan = planner.plan(&optimized_plan)?;
2192
2193 let executor = Executor::with_columns(physical_plan.columns.clone())
2195 .with_deadline(self.query_deadline());
2196 let mut result = executor.execute(physical_plan.operator.as_mut())?;
2197
2198 let rows_scanned = result.rows.len() as u64;
2200 #[cfg(not(target_arch = "wasm32"))]
2201 {
2202 let elapsed_ms = start_time.elapsed().as_secs_f64() * 1000.0;
2203 result.execution_time_ms = Some(elapsed_ms);
2204 }
2205 result.rows_scanned = Some(rows_scanned);
2206
2207 Ok(result)
2208 })
2209 }
2210
2211 #[cfg(feature = "gql")]
2220 pub fn execute_at_epoch(&self, query: &str, epoch: EpochId) -> Result<QueryResult> {
2221 let previous = self.viewing_epoch_override.lock().replace(epoch);
2222 let result = self.execute(query);
2223 *self.viewing_epoch_override.lock() = previous;
2224 result
2225 }
2226
2227 #[cfg(feature = "gql")]
2233 pub fn execute_with_params(
2234 &self,
2235 query: &str,
2236 params: std::collections::HashMap<String, Value>,
2237 ) -> Result<QueryResult> {
2238 self.require_lpg("GQL")?;
2239
2240 use crate::query::processor::{QueryLanguage, QueryProcessor};
2241
2242 let has_mutations = Self::query_looks_like_mutation(query);
2243 let active = self.active_store();
2244
2245 self.with_auto_commit(has_mutations, || {
2246 let (viewing_epoch, transaction_id) = self.get_transaction_context();
2248
2249 let processor = QueryProcessor::for_graph_store_with_transaction(
2251 Arc::clone(&active),
2252 Arc::clone(&self.transaction_manager),
2253 )?;
2254
2255 let processor = if let Some(transaction_id) = transaction_id {
2257 processor.with_transaction_context(viewing_epoch, transaction_id)
2258 } else {
2259 processor
2260 };
2261
2262 processor.process(query, QueryLanguage::Gql, Some(¶ms))
2263 })
2264 }
2265
2266 #[cfg(not(any(feature = "gql", feature = "cypher")))]
2272 pub fn execute_with_params(
2273 &self,
2274 _query: &str,
2275 _params: std::collections::HashMap<String, Value>,
2276 ) -> Result<QueryResult> {
2277 Err(grafeo_common::utils::error::Error::Internal(
2278 "No query language enabled".to_string(),
2279 ))
2280 }
2281
2282 #[cfg(not(any(feature = "gql", feature = "cypher")))]
2288 pub fn execute(&self, _query: &str) -> Result<QueryResult> {
2289 Err(grafeo_common::utils::error::Error::Internal(
2290 "No query language enabled".to_string(),
2291 ))
2292 }
2293
2294 #[cfg(feature = "cypher")]
2300 pub fn execute_cypher(&self, query: &str) -> Result<QueryResult> {
2301 use crate::query::{
2302 Executor, binder::Binder, cache::CacheKey, optimizer::Optimizer,
2303 processor::QueryLanguage, translators::cypher,
2304 };
2305 use grafeo_common::utils::error::{Error as GrafeoError, QueryError, QueryErrorKind};
2306
2307 let translation = cypher::translate_full(query)?;
2309 match translation {
2310 cypher::CypherTranslationResult::SchemaCommand(cmd) => {
2311 if *self.read_only_tx.lock() {
2312 return Err(GrafeoError::Query(QueryError::new(
2313 QueryErrorKind::Semantic,
2314 "Cannot execute schema DDL in a read-only transaction",
2315 )));
2316 }
2317 return self.execute_schema_command(cmd);
2318 }
2319 cypher::CypherTranslationResult::ShowIndexes => {
2320 return self.execute_show_indexes();
2321 }
2322 cypher::CypherTranslationResult::ShowConstraints => {
2323 return self.execute_show_constraints();
2324 }
2325 cypher::CypherTranslationResult::ShowCurrentGraphType => {
2326 return self.execute_show_current_graph_type();
2327 }
2328 cypher::CypherTranslationResult::Plan(_) => {
2329 }
2331 }
2332
2333 #[cfg(not(target_arch = "wasm32"))]
2334 let start_time = std::time::Instant::now();
2335
2336 let cache_key = CacheKey::with_graph(query, QueryLanguage::Cypher, self.current_graph());
2338
2339 let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
2341 cached_plan
2342 } else {
2343 let logical_plan = cypher::translate(query)?;
2345
2346 let mut binder = Binder::new();
2348 let _binding_context = binder.bind(&logical_plan)?;
2349
2350 let active = self.active_store();
2352 let optimizer = Optimizer::from_graph_store(&*active);
2353 let plan = optimizer.optimize(logical_plan)?;
2354
2355 self.query_cache.put_optimized(cache_key, plan.clone());
2357
2358 plan
2359 };
2360
2361 let active = self.active_store();
2363
2364 if optimized_plan.explain {
2366 use crate::query::processor::{annotate_pushdown_hints, explain_result};
2367 let mut plan = optimized_plan;
2368 annotate_pushdown_hints(&mut plan.root, active.as_ref());
2369 return Ok(explain_result(&plan));
2370 }
2371
2372 if optimized_plan.profile {
2374 let has_mutations = optimized_plan.root.has_mutations();
2375 return self.with_auto_commit(has_mutations, || {
2376 let (viewing_epoch, transaction_id) = self.get_transaction_context();
2377 let planner = self.create_planner_for_store(
2378 Arc::clone(&active),
2379 viewing_epoch,
2380 transaction_id,
2381 );
2382 let (mut physical_plan, entries) = planner.plan_profiled(&optimized_plan)?;
2383
2384 let executor = Executor::with_columns(physical_plan.columns.clone())
2385 .with_deadline(self.query_deadline());
2386 let _result = executor.execute(physical_plan.operator.as_mut())?;
2387
2388 let total_time_ms;
2389 #[cfg(not(target_arch = "wasm32"))]
2390 {
2391 total_time_ms = start_time.elapsed().as_secs_f64() * 1000.0;
2392 }
2393 #[cfg(target_arch = "wasm32")]
2394 {
2395 total_time_ms = 0.0;
2396 }
2397
2398 let profile_tree = crate::query::profile::build_profile_tree(
2399 &optimized_plan.root,
2400 &mut entries.into_iter(),
2401 );
2402 Ok(crate::query::profile::profile_result(
2403 &profile_tree,
2404 total_time_ms,
2405 ))
2406 });
2407 }
2408
2409 let has_mutations = optimized_plan.root.has_mutations();
2410
2411 self.with_auto_commit(has_mutations, || {
2412 let (viewing_epoch, transaction_id) = self.get_transaction_context();
2414
2415 let planner =
2417 self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
2418 let mut physical_plan = planner.plan(&optimized_plan)?;
2419
2420 let executor = Executor::with_columns(physical_plan.columns.clone())
2422 .with_deadline(self.query_deadline());
2423 executor.execute(physical_plan.operator.as_mut())
2424 })
2425 }
2426
2427 #[cfg(feature = "gremlin")]
2451 pub fn execute_gremlin(&self, query: &str) -> Result<QueryResult> {
2452 use crate::query::{Executor, binder::Binder, optimizer::Optimizer, translators::gremlin};
2453
2454 let logical_plan = gremlin::translate(query)?;
2456
2457 let mut binder = Binder::new();
2459 let _binding_context = binder.bind(&logical_plan)?;
2460
2461 let active = self.active_store();
2463 let optimizer = Optimizer::from_graph_store(&*active);
2464 let optimized_plan = optimizer.optimize(logical_plan)?;
2465
2466 let has_mutations = optimized_plan.root.has_mutations();
2467
2468 self.with_auto_commit(has_mutations, || {
2469 let (viewing_epoch, transaction_id) = self.get_transaction_context();
2471
2472 let planner =
2474 self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
2475 let mut physical_plan = planner.plan(&optimized_plan)?;
2476
2477 let executor = Executor::with_columns(physical_plan.columns.clone())
2479 .with_deadline(self.query_deadline());
2480 executor.execute(physical_plan.operator.as_mut())
2481 })
2482 }
2483
2484 #[cfg(feature = "gremlin")]
2490 pub fn execute_gremlin_with_params(
2491 &self,
2492 query: &str,
2493 params: std::collections::HashMap<String, Value>,
2494 ) -> Result<QueryResult> {
2495 use crate::query::processor::{QueryLanguage, QueryProcessor};
2496
2497 let has_mutations = Self::query_looks_like_mutation(query);
2498 let active = self.active_store();
2499
2500 self.with_auto_commit(has_mutations, || {
2501 let (viewing_epoch, transaction_id) = self.get_transaction_context();
2503
2504 let processor = QueryProcessor::for_graph_store_with_transaction(
2506 Arc::clone(&active),
2507 Arc::clone(&self.transaction_manager),
2508 )?;
2509
2510 let processor = if let Some(transaction_id) = transaction_id {
2512 processor.with_transaction_context(viewing_epoch, transaction_id)
2513 } else {
2514 processor
2515 };
2516
2517 processor.process(query, QueryLanguage::Gremlin, Some(¶ms))
2518 })
2519 }
2520
2521 #[cfg(feature = "graphql")]
2545 pub fn execute_graphql(&self, query: &str) -> Result<QueryResult> {
2546 use crate::query::{Executor, binder::Binder, optimizer::Optimizer, translators::graphql};
2547
2548 let logical_plan = graphql::translate(query)?;
2550
2551 let mut binder = Binder::new();
2553 let _binding_context = binder.bind(&logical_plan)?;
2554
2555 let active = self.active_store();
2557 let optimizer = Optimizer::from_graph_store(&*active);
2558 let optimized_plan = optimizer.optimize(logical_plan)?;
2559
2560 let has_mutations = optimized_plan.root.has_mutations();
2561
2562 self.with_auto_commit(has_mutations, || {
2563 let (viewing_epoch, transaction_id) = self.get_transaction_context();
2565
2566 let planner =
2568 self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
2569 let mut physical_plan = planner.plan(&optimized_plan)?;
2570
2571 let executor = Executor::with_columns(physical_plan.columns.clone())
2573 .with_deadline(self.query_deadline());
2574 executor.execute(physical_plan.operator.as_mut())
2575 })
2576 }
2577
2578 #[cfg(feature = "graphql")]
2584 pub fn execute_graphql_with_params(
2585 &self,
2586 query: &str,
2587 params: std::collections::HashMap<String, Value>,
2588 ) -> Result<QueryResult> {
2589 use crate::query::processor::{QueryLanguage, QueryProcessor};
2590
2591 let has_mutations = Self::query_looks_like_mutation(query);
2592 let active = self.active_store();
2593
2594 self.with_auto_commit(has_mutations, || {
2595 let (viewing_epoch, transaction_id) = self.get_transaction_context();
2597
2598 let processor = QueryProcessor::for_graph_store_with_transaction(
2600 Arc::clone(&active),
2601 Arc::clone(&self.transaction_manager),
2602 )?;
2603
2604 let processor = if let Some(transaction_id) = transaction_id {
2606 processor.with_transaction_context(viewing_epoch, transaction_id)
2607 } else {
2608 processor
2609 };
2610
2611 processor.process(query, QueryLanguage::GraphQL, Some(¶ms))
2612 })
2613 }
2614
2615 #[cfg(all(feature = "graphql", feature = "rdf"))]
2621 pub fn execute_graphql_rdf(&self, query: &str) -> Result<QueryResult> {
2622 use crate::query::{
2623 Executor, optimizer::Optimizer, planner::rdf::RdfPlanner, translators::graphql_rdf,
2624 };
2625
2626 let logical_plan = graphql_rdf::translate(query, "http://example.org/")?;
2627
2628 let active = self.active_store();
2629 let optimizer = Optimizer::from_graph_store(&*active);
2630 let optimized_plan = optimizer.optimize(logical_plan)?;
2631
2632 let planner = RdfPlanner::new(Arc::clone(&self.rdf_store))
2633 .with_transaction_id(*self.current_transaction.lock());
2634 #[cfg(feature = "wal")]
2635 let planner = planner.with_wal(self.wal.clone());
2636 let mut physical_plan = planner.plan(&optimized_plan)?;
2637
2638 let executor = Executor::with_columns(physical_plan.columns.clone())
2639 .with_deadline(self.query_deadline());
2640 executor.execute(physical_plan.operator.as_mut())
2641 }
2642
2643 #[cfg(all(feature = "graphql", feature = "rdf"))]
2649 pub fn execute_graphql_rdf_with_params(
2650 &self,
2651 query: &str,
2652 params: std::collections::HashMap<String, Value>,
2653 ) -> Result<QueryResult> {
2654 use crate::query::processor::{QueryLanguage, QueryProcessor};
2655
2656 let has_mutations = Self::query_looks_like_mutation(query);
2657 let active = self.active_store();
2658
2659 self.with_auto_commit(has_mutations, || {
2660 let (viewing_epoch, transaction_id) = self.get_transaction_context();
2661
2662 let processor = QueryProcessor::for_graph_store_with_transaction(
2663 Arc::clone(&active),
2664 Arc::clone(&self.transaction_manager),
2665 )?;
2666
2667 let processor = if let Some(transaction_id) = transaction_id {
2668 processor.with_transaction_context(viewing_epoch, transaction_id)
2669 } else {
2670 processor
2671 };
2672
2673 processor.process(query, QueryLanguage::GraphQLRdf, Some(¶ms))
2674 })
2675 }
2676
2677 #[cfg(feature = "sql-pgq")]
2702 pub fn execute_sql(&self, query: &str) -> Result<QueryResult> {
2703 use crate::query::{
2704 Executor, binder::Binder, cache::CacheKey, optimizer::Optimizer, plan::LogicalOperator,
2705 processor::QueryLanguage, translators::sql_pgq,
2706 };
2707
2708 let logical_plan = sql_pgq::translate(query)?;
2710
2711 if let LogicalOperator::CreatePropertyGraph(ref cpg) = logical_plan.root {
2713 return Ok(QueryResult {
2714 columns: vec!["status".into()],
2715 column_types: vec![grafeo_common::types::LogicalType::String],
2716 rows: vec![vec![Value::from(format!(
2717 "Property graph '{}' created ({} node tables, {} edge tables)",
2718 cpg.name,
2719 cpg.node_tables.len(),
2720 cpg.edge_tables.len()
2721 ))]],
2722 execution_time_ms: None,
2723 rows_scanned: None,
2724 status_message: None,
2725 gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
2726 });
2727 }
2728
2729 let cache_key = CacheKey::with_graph(query, QueryLanguage::SqlPgq, self.current_graph());
2731
2732 let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
2734 cached_plan
2735 } else {
2736 let mut binder = Binder::new();
2738 let _binding_context = binder.bind(&logical_plan)?;
2739
2740 let active = self.active_store();
2742 let optimizer = Optimizer::from_graph_store(&*active);
2743 let plan = optimizer.optimize(logical_plan)?;
2744
2745 self.query_cache.put_optimized(cache_key, plan.clone());
2747
2748 plan
2749 };
2750
2751 let active = self.active_store();
2752 let has_mutations = optimized_plan.root.has_mutations();
2753
2754 self.with_auto_commit(has_mutations, || {
2755 let (viewing_epoch, transaction_id) = self.get_transaction_context();
2757
2758 let planner =
2760 self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
2761 let mut physical_plan = planner.plan(&optimized_plan)?;
2762
2763 let executor = Executor::with_columns(physical_plan.columns.clone())
2765 .with_deadline(self.query_deadline());
2766 executor.execute(physical_plan.operator.as_mut())
2767 })
2768 }
2769
2770 #[cfg(feature = "sql-pgq")]
2776 pub fn execute_sql_with_params(
2777 &self,
2778 query: &str,
2779 params: std::collections::HashMap<String, Value>,
2780 ) -> Result<QueryResult> {
2781 use crate::query::processor::{QueryLanguage, QueryProcessor};
2782
2783 let has_mutations = Self::query_looks_like_mutation(query);
2784 let active = self.active_store();
2785
2786 self.with_auto_commit(has_mutations, || {
2787 let (viewing_epoch, transaction_id) = self.get_transaction_context();
2789
2790 let processor = QueryProcessor::for_graph_store_with_transaction(
2792 Arc::clone(&active),
2793 Arc::clone(&self.transaction_manager),
2794 )?;
2795
2796 let processor = if let Some(transaction_id) = transaction_id {
2798 processor.with_transaction_context(viewing_epoch, transaction_id)
2799 } else {
2800 processor
2801 };
2802
2803 processor.process(query, QueryLanguage::SqlPgq, Some(¶ms))
2804 })
2805 }
2806
2807 #[cfg(all(feature = "sparql", feature = "rdf"))]
2813 pub fn execute_sparql(&self, query: &str) -> Result<QueryResult> {
2814 use crate::query::{
2815 Executor, optimizer::Optimizer, planner::rdf::RdfPlanner, translators::sparql,
2816 };
2817
2818 let logical_plan = sparql::translate(query)?;
2820
2821 let active = self.active_store();
2823 let optimizer = Optimizer::from_graph_store(&*active);
2824 let optimized_plan = optimizer.optimize(logical_plan)?;
2825
2826 let planner = RdfPlanner::new(Arc::clone(&self.rdf_store))
2828 .with_transaction_id(*self.current_transaction.lock());
2829 #[cfg(feature = "wal")]
2830 let planner = planner.with_wal(self.wal.clone());
2831 let mut physical_plan = planner.plan(&optimized_plan)?;
2832
2833 let executor = Executor::with_columns(physical_plan.columns.clone())
2835 .with_deadline(self.query_deadline());
2836 executor.execute(physical_plan.operator.as_mut())
2837 }
2838
2839 #[cfg(all(feature = "sparql", feature = "rdf"))]
2845 pub fn execute_sparql_with_params(
2846 &self,
2847 query: &str,
2848 params: std::collections::HashMap<String, Value>,
2849 ) -> Result<QueryResult> {
2850 use crate::query::{
2851 Executor, optimizer::Optimizer, planner::rdf::RdfPlanner, processor::substitute_params,
2852 translators::sparql,
2853 };
2854
2855 let mut logical_plan = sparql::translate(query)?;
2856
2857 substitute_params(&mut logical_plan, ¶ms)?;
2858
2859 let active = self.active_store();
2860 let optimizer = Optimizer::from_graph_store(&*active);
2861 let optimized_plan = optimizer.optimize(logical_plan)?;
2862
2863 let planner = RdfPlanner::new(Arc::clone(&self.rdf_store))
2864 .with_transaction_id(*self.current_transaction.lock());
2865 #[cfg(feature = "wal")]
2866 let planner = planner.with_wal(self.wal.clone());
2867 let mut physical_plan = planner.plan(&optimized_plan)?;
2868
2869 let executor = Executor::with_columns(physical_plan.columns.clone())
2870 .with_deadline(self.query_deadline());
2871 executor.execute(physical_plan.operator.as_mut())
2872 }
2873
2874 pub fn execute_language(
2883 &self,
2884 query: &str,
2885 language: &str,
2886 params: Option<std::collections::HashMap<String, Value>>,
2887 ) -> Result<QueryResult> {
2888 match language {
2889 "gql" => {
2890 if let Some(p) = params {
2891 self.execute_with_params(query, p)
2892 } else {
2893 self.execute(query)
2894 }
2895 }
2896 #[cfg(feature = "cypher")]
2897 "cypher" => {
2898 if let Some(p) = params {
2899 use crate::query::processor::{QueryLanguage, QueryProcessor};
2900 let has_mutations = Self::query_looks_like_mutation(query);
2901 let active = self.active_store();
2902 self.with_auto_commit(has_mutations, || {
2903 let processor = QueryProcessor::for_graph_store_with_transaction(
2904 Arc::clone(&active),
2905 Arc::clone(&self.transaction_manager),
2906 )?;
2907 let (viewing_epoch, transaction_id) = self.get_transaction_context();
2908 let processor = if let Some(transaction_id) = transaction_id {
2909 processor.with_transaction_context(viewing_epoch, transaction_id)
2910 } else {
2911 processor
2912 };
2913 processor.process(query, QueryLanguage::Cypher, Some(&p))
2914 })
2915 } else {
2916 self.execute_cypher(query)
2917 }
2918 }
2919 #[cfg(feature = "gremlin")]
2920 "gremlin" => {
2921 if let Some(p) = params {
2922 self.execute_gremlin_with_params(query, p)
2923 } else {
2924 self.execute_gremlin(query)
2925 }
2926 }
2927 #[cfg(feature = "graphql")]
2928 "graphql" => {
2929 if let Some(p) = params {
2930 self.execute_graphql_with_params(query, p)
2931 } else {
2932 self.execute_graphql(query)
2933 }
2934 }
2935 #[cfg(all(feature = "graphql", feature = "rdf"))]
2936 "graphql-rdf" => {
2937 if let Some(p) = params {
2938 self.execute_graphql_rdf_with_params(query, p)
2939 } else {
2940 self.execute_graphql_rdf(query)
2941 }
2942 }
2943 #[cfg(feature = "sql-pgq")]
2944 "sql" | "sql-pgq" => {
2945 if let Some(p) = params {
2946 self.execute_sql_with_params(query, p)
2947 } else {
2948 self.execute_sql(query)
2949 }
2950 }
2951 #[cfg(all(feature = "sparql", feature = "rdf"))]
2952 "sparql" => {
2953 if let Some(p) = params {
2954 self.execute_sparql_with_params(query, p)
2955 } else {
2956 self.execute_sparql(query)
2957 }
2958 }
2959 other => Err(grafeo_common::utils::error::Error::Query(
2960 grafeo_common::utils::error::QueryError::new(
2961 grafeo_common::utils::error::QueryErrorKind::Semantic,
2962 format!("Unknown query language: '{other}'"),
2963 ),
2964 )),
2965 }
2966 }
2967
2968 pub fn clear_plan_cache(&self) {
2995 self.query_cache.clear();
2996 }
2997
2998 pub fn begin_transaction(&mut self) -> Result<()> {
3006 self.begin_transaction_inner(false, None)
3007 }
3008
3009 pub fn begin_transaction_with_isolation(
3017 &mut self,
3018 isolation_level: crate::transaction::IsolationLevel,
3019 ) -> Result<()> {
3020 self.begin_transaction_inner(false, Some(isolation_level))
3021 }
3022
3023 fn begin_transaction_inner(
3025 &self,
3026 read_only: bool,
3027 isolation_level: Option<crate::transaction::IsolationLevel>,
3028 ) -> Result<()> {
3029 let mut current = self.current_transaction.lock();
3030 if current.is_some() {
3031 drop(current);
3033 let mut depth = self.transaction_nesting_depth.lock();
3034 *depth += 1;
3035 let sp_name = format!("_nested_tx_{}", *depth);
3036 self.savepoint(&sp_name)?;
3037 return Ok(());
3038 }
3039
3040 let active = self.active_lpg_store();
3041 self.transaction_start_node_count
3042 .store(active.node_count(), Ordering::Relaxed);
3043 self.transaction_start_edge_count
3044 .store(active.edge_count(), Ordering::Relaxed);
3045 let transaction_id = if let Some(level) = isolation_level {
3046 self.transaction_manager.begin_with_isolation(level)
3047 } else {
3048 self.transaction_manager.begin()
3049 };
3050 *current = Some(transaction_id);
3051 *self.read_only_tx.lock() = read_only;
3052
3053 let key = self.active_graph_storage_key();
3056 let mut touched = self.touched_graphs.lock();
3057 touched.clear();
3058 touched.push(key);
3059
3060 Ok(())
3061 }
3062
3063 pub fn commit(&mut self) -> Result<()> {
3071 self.commit_inner()
3072 }
3073
3074 fn commit_inner(&self) -> Result<()> {
3076 {
3078 let mut depth = self.transaction_nesting_depth.lock();
3079 if *depth > 0 {
3080 let sp_name = format!("_nested_tx_{depth}");
3081 *depth -= 1;
3082 drop(depth);
3083 return self.release_savepoint(&sp_name);
3084 }
3085 }
3086
3087 let transaction_id = self.current_transaction.lock().take().ok_or_else(|| {
3088 grafeo_common::utils::error::Error::Transaction(
3089 grafeo_common::utils::error::TransactionError::InvalidState(
3090 "No active transaction".to_string(),
3091 ),
3092 )
3093 })?;
3094
3095 let touched = self.touched_graphs.lock().clone();
3098 let commit_epoch = match self.transaction_manager.commit(transaction_id) {
3099 Ok(epoch) => epoch,
3100 Err(e) => {
3101 for graph_name in &touched {
3103 let store = self.resolve_store(graph_name);
3104 store.rollback_transaction_properties(transaction_id);
3105 }
3106 #[cfg(feature = "rdf")]
3107 self.rdf_store.rollback_transaction(transaction_id);
3108 *self.read_only_tx.lock() = false;
3109 self.savepoints.lock().clear();
3110 self.touched_graphs.lock().clear();
3111 return Err(e);
3112 }
3113 };
3114
3115 for graph_name in &touched {
3117 let store = self.resolve_store(graph_name);
3118 store.finalize_version_epochs(transaction_id, commit_epoch);
3119 }
3120
3121 #[cfg(feature = "rdf")]
3123 self.rdf_store.commit_transaction(transaction_id);
3124
3125 for graph_name in &touched {
3126 let store = self.resolve_store(graph_name);
3127 store.commit_transaction_properties(transaction_id);
3128 }
3129
3130 let current_epoch = self.transaction_manager.current_epoch();
3133 for graph_name in &touched {
3134 let store = self.resolve_store(graph_name);
3135 store.sync_epoch(current_epoch);
3136 }
3137
3138 *self.read_only_tx.lock() = false;
3140 self.savepoints.lock().clear();
3141 self.touched_graphs.lock().clear();
3142
3143 if self.gc_interval > 0 {
3145 let count = self.commit_counter.fetch_add(1, Ordering::Relaxed) + 1;
3146 if count.is_multiple_of(self.gc_interval) {
3147 let min_epoch = self.transaction_manager.min_active_epoch();
3148 for graph_name in &touched {
3149 let store = self.resolve_store(graph_name);
3150 store.gc_versions(min_epoch);
3151 }
3152 self.transaction_manager.gc();
3153 }
3154 }
3155
3156 Ok(())
3157 }
3158
3159 pub fn rollback(&mut self) -> Result<()> {
3183 self.rollback_inner()
3184 }
3185
3186 fn rollback_inner(&self) -> Result<()> {
3188 {
3190 let mut depth = self.transaction_nesting_depth.lock();
3191 if *depth > 0 {
3192 let sp_name = format!("_nested_tx_{depth}");
3193 *depth -= 1;
3194 drop(depth);
3195 return self.rollback_to_savepoint(&sp_name);
3196 }
3197 }
3198
3199 let transaction_id = self.current_transaction.lock().take().ok_or_else(|| {
3200 grafeo_common::utils::error::Error::Transaction(
3201 grafeo_common::utils::error::TransactionError::InvalidState(
3202 "No active transaction".to_string(),
3203 ),
3204 )
3205 })?;
3206
3207 *self.read_only_tx.lock() = false;
3209
3210 let touched = self.touched_graphs.lock().clone();
3212 for graph_name in &touched {
3213 let store = self.resolve_store(graph_name);
3214 store.discard_uncommitted_versions(transaction_id);
3215 }
3216
3217 #[cfg(feature = "rdf")]
3219 self.rdf_store.rollback_transaction(transaction_id);
3220
3221 self.savepoints.lock().clear();
3223 self.touched_graphs.lock().clear();
3224
3225 self.transaction_manager.abort(transaction_id)
3227 }
3228
3229 pub fn savepoint(&self, name: &str) -> Result<()> {
3239 let tx_id = self.current_transaction.lock().ok_or_else(|| {
3240 grafeo_common::utils::error::Error::Transaction(
3241 grafeo_common::utils::error::TransactionError::InvalidState(
3242 "No active transaction".to_string(),
3243 ),
3244 )
3245 })?;
3246
3247 let touched = self.touched_graphs.lock().clone();
3249 let graph_snapshots: Vec<GraphSavepoint> = touched
3250 .iter()
3251 .map(|graph_name| {
3252 let store = self.resolve_store(graph_name);
3253 GraphSavepoint {
3254 graph_name: graph_name.clone(),
3255 next_node_id: store.peek_next_node_id(),
3256 next_edge_id: store.peek_next_edge_id(),
3257 undo_log_position: store.property_undo_log_position(tx_id),
3258 }
3259 })
3260 .collect();
3261
3262 self.savepoints.lock().push(SavepointState {
3263 name: name.to_string(),
3264 graph_snapshots,
3265 active_graph: self.current_graph.lock().clone(),
3266 });
3267 Ok(())
3268 }
3269
3270 pub fn rollback_to_savepoint(&self, name: &str) -> Result<()> {
3279 let transaction_id = self.current_transaction.lock().ok_or_else(|| {
3280 grafeo_common::utils::error::Error::Transaction(
3281 grafeo_common::utils::error::TransactionError::InvalidState(
3282 "No active transaction".to_string(),
3283 ),
3284 )
3285 })?;
3286
3287 let mut savepoints = self.savepoints.lock();
3288
3289 let pos = savepoints
3291 .iter()
3292 .rposition(|sp| sp.name == name)
3293 .ok_or_else(|| {
3294 grafeo_common::utils::error::Error::Transaction(
3295 grafeo_common::utils::error::TransactionError::InvalidState(format!(
3296 "Savepoint '{name}' not found"
3297 )),
3298 )
3299 })?;
3300
3301 let sp_state = savepoints[pos].clone();
3302
3303 savepoints.truncate(pos);
3305 drop(savepoints);
3306
3307 for gs in &sp_state.graph_snapshots {
3309 let store = self.resolve_store(&gs.graph_name);
3310
3311 store.rollback_transaction_properties_to(transaction_id, gs.undo_log_position);
3313
3314 let current_next_node = store.peek_next_node_id();
3316 let current_next_edge = store.peek_next_edge_id();
3317
3318 let node_ids: Vec<NodeId> = (gs.next_node_id..current_next_node)
3319 .map(NodeId::new)
3320 .collect();
3321 let edge_ids: Vec<EdgeId> = (gs.next_edge_id..current_next_edge)
3322 .map(EdgeId::new)
3323 .collect();
3324
3325 if !node_ids.is_empty() || !edge_ids.is_empty() {
3326 store.discard_entities_by_id(transaction_id, &node_ids, &edge_ids);
3327 }
3328 }
3329
3330 let touched = self.touched_graphs.lock().clone();
3334 for graph_name in &touched {
3335 let already_captured = sp_state
3336 .graph_snapshots
3337 .iter()
3338 .any(|gs| gs.graph_name == *graph_name);
3339 if !already_captured {
3340 let store = self.resolve_store(graph_name);
3341 store.discard_uncommitted_versions(transaction_id);
3342 }
3343 }
3344
3345 let mut touched = self.touched_graphs.lock();
3347 touched.clear();
3348 for gs in &sp_state.graph_snapshots {
3349 if !touched.contains(&gs.graph_name) {
3350 touched.push(gs.graph_name.clone());
3351 }
3352 }
3353
3354 Ok(())
3355 }
3356
3357 pub fn release_savepoint(&self, name: &str) -> Result<()> {
3363 let _tx_id = self.current_transaction.lock().ok_or_else(|| {
3364 grafeo_common::utils::error::Error::Transaction(
3365 grafeo_common::utils::error::TransactionError::InvalidState(
3366 "No active transaction".to_string(),
3367 ),
3368 )
3369 })?;
3370
3371 let mut savepoints = self.savepoints.lock();
3372 let pos = savepoints
3373 .iter()
3374 .rposition(|sp| sp.name == name)
3375 .ok_or_else(|| {
3376 grafeo_common::utils::error::Error::Transaction(
3377 grafeo_common::utils::error::TransactionError::InvalidState(format!(
3378 "Savepoint '{name}' not found"
3379 )),
3380 )
3381 })?;
3382 savepoints.remove(pos);
3383 Ok(())
3384 }
3385
3386 #[must_use]
3388 pub fn in_transaction(&self) -> bool {
3389 self.current_transaction.lock().is_some()
3390 }
3391
3392 #[must_use]
3394 pub(crate) fn current_transaction_id(&self) -> Option<TransactionId> {
3395 *self.current_transaction.lock()
3396 }
3397
3398 #[must_use]
3400 pub(crate) fn transaction_manager(&self) -> &TransactionManager {
3401 &self.transaction_manager
3402 }
3403
3404 #[must_use]
3406 pub(crate) fn node_count_delta(&self) -> (usize, usize) {
3407 (
3408 self.transaction_start_node_count.load(Ordering::Relaxed),
3409 self.active_lpg_store().node_count(),
3410 )
3411 }
3412
3413 #[must_use]
3415 pub(crate) fn edge_count_delta(&self) -> (usize, usize) {
3416 (
3417 self.transaction_start_edge_count.load(Ordering::Relaxed),
3418 self.active_lpg_store().edge_count(),
3419 )
3420 }
3421
3422 pub fn prepare_commit(&mut self) -> Result<crate::transaction::PreparedCommit<'_>> {
3456 crate::transaction::PreparedCommit::new(self)
3457 }
3458
3459 pub fn set_auto_commit(&mut self, auto_commit: bool) {
3461 self.auto_commit = auto_commit;
3462 }
3463
3464 #[must_use]
3466 pub fn auto_commit(&self) -> bool {
3467 self.auto_commit
3468 }
3469
3470 fn needs_auto_commit(&self, has_mutations: bool) -> bool {
3475 self.auto_commit && has_mutations && self.current_transaction.lock().is_none()
3476 }
3477
3478 fn with_auto_commit<F>(&self, has_mutations: bool, body: F) -> Result<QueryResult>
3481 where
3482 F: FnOnce() -> Result<QueryResult>,
3483 {
3484 if self.needs_auto_commit(has_mutations) {
3485 self.begin_transaction_inner(false, None)?;
3486 match body() {
3487 Ok(result) => {
3488 self.commit_inner()?;
3489 Ok(result)
3490 }
3491 Err(e) => {
3492 let _ = self.rollback_inner();
3493 Err(e)
3494 }
3495 }
3496 } else {
3497 body()
3498 }
3499 }
3500
3501 fn query_looks_like_mutation(query: &str) -> bool {
3507 let upper = query.to_ascii_uppercase();
3508 upper.contains("INSERT")
3509 || upper.contains("CREATE")
3510 || upper.contains("DELETE")
3511 || upper.contains("MERGE")
3512 || upper.contains("SET")
3513 || upper.contains("REMOVE")
3514 || upper.contains("DROP")
3515 || upper.contains("ALTER")
3516 }
3517
3518 #[must_use]
3520 fn query_deadline(&self) -> Option<Instant> {
3521 #[cfg(not(target_arch = "wasm32"))]
3522 {
3523 self.query_timeout.map(|d| Instant::now() + d)
3524 }
3525 #[cfg(target_arch = "wasm32")]
3526 {
3527 let _ = &self.query_timeout;
3528 None
3529 }
3530 }
3531
3532 fn eval_integer_literal(expr: &grafeo_adapters::query::gql::ast::Expression) -> Option<i64> {
3534 use grafeo_adapters::query::gql::ast::{Expression, Literal};
3535 match expr {
3536 Expression::Literal(Literal::Integer(n)) => Some(*n),
3537 _ => None,
3538 }
3539 }
3540
3541 #[must_use]
3547 fn get_transaction_context(&self) -> (EpochId, Option<TransactionId>) {
3548 if let Some(epoch) = *self.viewing_epoch_override.lock() {
3550 return (epoch, None);
3551 }
3552
3553 if let Some(transaction_id) = *self.current_transaction.lock() {
3554 let epoch = self
3556 .transaction_manager
3557 .start_epoch(transaction_id)
3558 .unwrap_or_else(|| self.transaction_manager.current_epoch());
3559 (epoch, Some(transaction_id))
3560 } else {
3561 (self.transaction_manager.current_epoch(), None)
3563 }
3564 }
3565
3566 fn create_planner_for_store(
3571 &self,
3572 store: Arc<dyn GraphStoreMut>,
3573 viewing_epoch: EpochId,
3574 transaction_id: Option<TransactionId>,
3575 ) -> crate::query::Planner {
3576 use crate::query::Planner;
3577 use grafeo_core::execution::operators::{LazyValue, SessionContext};
3578
3579 let info_store = Arc::clone(&store);
3581 let schema_store = Arc::clone(&store);
3582
3583 let session_context = SessionContext {
3584 current_schema: self.current_schema(),
3585 current_graph: self.current_graph(),
3586 db_info: LazyValue::new(move || Self::build_info_value(&*info_store)),
3587 schema_info: LazyValue::new(move || Self::build_schema_value(&*schema_store)),
3588 };
3589
3590 let mut planner = Planner::with_context(
3591 Arc::clone(&store),
3592 Arc::clone(&self.transaction_manager),
3593 transaction_id,
3594 viewing_epoch,
3595 )
3596 .with_factorized_execution(self.factorized_execution)
3597 .with_catalog(Arc::clone(&self.catalog))
3598 .with_session_context(session_context);
3599
3600 let validator =
3602 CatalogConstraintValidator::new(Arc::clone(&self.catalog)).with_store(store);
3603 planner = planner.with_validator(Arc::new(validator));
3604
3605 planner
3606 }
3607
3608 fn build_info_value(store: &dyn GraphStoreMut) -> Value {
3610 use grafeo_common::types::PropertyKey;
3611 use std::collections::BTreeMap;
3612
3613 let mut map = BTreeMap::new();
3614 map.insert(PropertyKey::from("mode"), Value::String("lpg".into()));
3615 map.insert(
3616 PropertyKey::from("node_count"),
3617 Value::Int64(store.node_count() as i64),
3618 );
3619 map.insert(
3620 PropertyKey::from("edge_count"),
3621 Value::Int64(store.edge_count() as i64),
3622 );
3623 map.insert(
3624 PropertyKey::from("version"),
3625 Value::String(env!("CARGO_PKG_VERSION").into()),
3626 );
3627 Value::Map(map.into())
3628 }
3629
3630 fn build_schema_value(store: &dyn GraphStoreMut) -> Value {
3632 use grafeo_common::types::PropertyKey;
3633 use std::collections::BTreeMap;
3634
3635 let labels: Vec<Value> = store
3636 .all_labels()
3637 .into_iter()
3638 .map(|l| Value::String(l.into()))
3639 .collect();
3640 let edge_types: Vec<Value> = store
3641 .all_edge_types()
3642 .into_iter()
3643 .map(|t| Value::String(t.into()))
3644 .collect();
3645 let property_keys: Vec<Value> = store
3646 .all_property_keys()
3647 .into_iter()
3648 .map(|k| Value::String(k.into()))
3649 .collect();
3650
3651 let mut map = BTreeMap::new();
3652 map.insert(PropertyKey::from("labels"), Value::List(labels.into()));
3653 map.insert(
3654 PropertyKey::from("edge_types"),
3655 Value::List(edge_types.into()),
3656 );
3657 map.insert(
3658 PropertyKey::from("property_keys"),
3659 Value::List(property_keys.into()),
3660 );
3661 Value::Map(map.into())
3662 }
3663
3664 pub fn create_node(&self, labels: &[&str]) -> NodeId {
3669 let (epoch, transaction_id) = self.get_transaction_context();
3670 self.active_lpg_store().create_node_versioned(
3671 labels,
3672 epoch,
3673 transaction_id.unwrap_or(TransactionId::SYSTEM),
3674 )
3675 }
3676
3677 pub fn create_node_with_props<'a>(
3681 &self,
3682 labels: &[&str],
3683 properties: impl IntoIterator<Item = (&'a str, Value)>,
3684 ) -> NodeId {
3685 let (epoch, transaction_id) = self.get_transaction_context();
3686 self.active_lpg_store().create_node_with_props_versioned(
3687 labels,
3688 properties,
3689 epoch,
3690 transaction_id.unwrap_or(TransactionId::SYSTEM),
3691 )
3692 }
3693
3694 pub fn create_edge(
3699 &self,
3700 src: NodeId,
3701 dst: NodeId,
3702 edge_type: &str,
3703 ) -> grafeo_common::types::EdgeId {
3704 let (epoch, transaction_id) = self.get_transaction_context();
3705 self.active_lpg_store().create_edge_versioned(
3706 src,
3707 dst,
3708 edge_type,
3709 epoch,
3710 transaction_id.unwrap_or(TransactionId::SYSTEM),
3711 )
3712 }
3713
3714 #[must_use]
3742 pub fn get_node(&self, id: NodeId) -> Option<Node> {
3743 let (epoch, transaction_id) = self.get_transaction_context();
3744 self.active_lpg_store().get_node_versioned(
3745 id,
3746 epoch,
3747 transaction_id.unwrap_or(TransactionId::SYSTEM),
3748 )
3749 }
3750
3751 #[must_use]
3775 pub fn get_node_property(&self, id: NodeId, key: &str) -> Option<Value> {
3776 self.get_node(id)
3777 .and_then(|node| node.get_property(key).cloned())
3778 }
3779
3780 #[must_use]
3787 pub fn get_edge(&self, id: EdgeId) -> Option<Edge> {
3788 let (epoch, transaction_id) = self.get_transaction_context();
3789 self.active_lpg_store().get_edge_versioned(
3790 id,
3791 epoch,
3792 transaction_id.unwrap_or(TransactionId::SYSTEM),
3793 )
3794 }
3795
3796 #[must_use]
3822 pub fn get_neighbors_outgoing(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
3823 self.active_lpg_store()
3824 .edges_from(node, Direction::Outgoing)
3825 .collect()
3826 }
3827
3828 #[must_use]
3837 pub fn get_neighbors_incoming(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
3838 self.active_lpg_store()
3839 .edges_from(node, Direction::Incoming)
3840 .collect()
3841 }
3842
3843 #[must_use]
3855 pub fn get_neighbors_outgoing_by_type(
3856 &self,
3857 node: NodeId,
3858 edge_type: &str,
3859 ) -> Vec<(NodeId, EdgeId)> {
3860 self.active_lpg_store()
3861 .edges_from(node, Direction::Outgoing)
3862 .filter(|(_, edge_id)| {
3863 self.get_edge(*edge_id)
3864 .is_some_and(|e| e.edge_type.as_str() == edge_type)
3865 })
3866 .collect()
3867 }
3868
3869 #[must_use]
3876 pub fn node_exists(&self, id: NodeId) -> bool {
3877 self.get_node(id).is_some()
3878 }
3879
3880 #[must_use]
3882 pub fn edge_exists(&self, id: EdgeId) -> bool {
3883 self.get_edge(id).is_some()
3884 }
3885
3886 #[must_use]
3890 pub fn get_degree(&self, node: NodeId) -> (usize, usize) {
3891 let active = self.active_lpg_store();
3892 let out = active.out_degree(node);
3893 let in_degree = active.in_degree(node);
3894 (out, in_degree)
3895 }
3896
3897 #[must_use]
3907 pub fn get_nodes_batch(&self, ids: &[NodeId]) -> Vec<Option<Node>> {
3908 let (epoch, transaction_id) = self.get_transaction_context();
3909 let tx = transaction_id.unwrap_or(TransactionId::SYSTEM);
3910 let active = self.active_lpg_store();
3911 ids.iter()
3912 .map(|&id| active.get_node_versioned(id, epoch, tx))
3913 .collect()
3914 }
3915
3916 #[cfg(feature = "cdc")]
3920 pub fn history(
3921 &self,
3922 entity_id: impl Into<crate::cdc::EntityId>,
3923 ) -> Result<Vec<crate::cdc::ChangeEvent>> {
3924 Ok(self.cdc_log.history(entity_id.into()))
3925 }
3926
3927 #[cfg(feature = "cdc")]
3929 pub fn history_since(
3930 &self,
3931 entity_id: impl Into<crate::cdc::EntityId>,
3932 since_epoch: EpochId,
3933 ) -> Result<Vec<crate::cdc::ChangeEvent>> {
3934 Ok(self.cdc_log.history_since(entity_id.into(), since_epoch))
3935 }
3936
3937 #[cfg(feature = "cdc")]
3939 pub fn changes_between(
3940 &self,
3941 start_epoch: EpochId,
3942 end_epoch: EpochId,
3943 ) -> Result<Vec<crate::cdc::ChangeEvent>> {
3944 Ok(self.cdc_log.changes_between(start_epoch, end_epoch))
3945 }
3946}
3947
3948impl Drop for Session {
3949 fn drop(&mut self) {
3950 if self.in_transaction() {
3953 let _ = self.rollback_inner();
3954 }
3955 }
3956}
3957
3958#[cfg(test)]
3959mod tests {
3960 use crate::database::GrafeoDB;
3961
3962 #[test]
3963 fn test_session_create_node() {
3964 let db = GrafeoDB::new_in_memory();
3965 let session = db.session();
3966
3967 let id = session.create_node(&["Person"]);
3968 assert!(id.is_valid());
3969 assert_eq!(db.node_count(), 1);
3970 }
3971
3972 #[test]
3973 fn test_session_transaction() {
3974 let db = GrafeoDB::new_in_memory();
3975 let mut session = db.session();
3976
3977 assert!(!session.in_transaction());
3978
3979 session.begin_transaction().unwrap();
3980 assert!(session.in_transaction());
3981
3982 session.commit().unwrap();
3983 assert!(!session.in_transaction());
3984 }
3985
3986 #[test]
3987 fn test_session_transaction_context() {
3988 let db = GrafeoDB::new_in_memory();
3989 let mut session = db.session();
3990
3991 let (_epoch1, transaction_id1) = session.get_transaction_context();
3993 assert!(transaction_id1.is_none());
3994
3995 session.begin_transaction().unwrap();
3997 let (epoch2, transaction_id2) = session.get_transaction_context();
3998 assert!(transaction_id2.is_some());
3999 let _ = epoch2; session.commit().unwrap();
4004 let (epoch3, tx_id3) = session.get_transaction_context();
4005 assert!(tx_id3.is_none());
4006 assert!(epoch3.as_u64() >= epoch2.as_u64());
4008 }
4009
4010 #[test]
4011 fn test_session_rollback() {
4012 let db = GrafeoDB::new_in_memory();
4013 let mut session = db.session();
4014
4015 session.begin_transaction().unwrap();
4016 session.rollback().unwrap();
4017 assert!(!session.in_transaction());
4018 }
4019
4020 #[test]
4021 fn test_session_rollback_discards_versions() {
4022 use grafeo_common::types::TransactionId;
4023
4024 let db = GrafeoDB::new_in_memory();
4025
4026 let node_before = db.store().create_node(&["Person"]);
4028 assert!(node_before.is_valid());
4029 assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");
4030
4031 let mut session = db.session();
4033 session.begin_transaction().unwrap();
4034 let transaction_id = session.current_transaction.lock().unwrap();
4035
4036 let epoch = db.store().current_epoch();
4038 let node_in_tx = db
4039 .store()
4040 .create_node_versioned(&["Person"], epoch, transaction_id);
4041 assert!(node_in_tx.is_valid());
4042
4043 assert_eq!(
4047 db.node_count(),
4048 1,
4049 "PENDING nodes should be invisible to non-versioned node_count()"
4050 );
4051 assert!(
4052 db.store()
4053 .get_node_versioned(node_in_tx, epoch, transaction_id)
4054 .is_some(),
4055 "Transaction node should be visible to its own transaction"
4056 );
4057
4058 session.rollback().unwrap();
4060 assert!(!session.in_transaction());
4061
4062 let count_after = db.node_count();
4065 assert_eq!(
4066 count_after, 1,
4067 "Rollback should discard uncommitted node, but got {count_after}"
4068 );
4069
4070 let current_epoch = db.store().current_epoch();
4072 assert!(
4073 db.store()
4074 .get_node_versioned(node_before, current_epoch, TransactionId::SYSTEM)
4075 .is_some(),
4076 "Original node should still exist"
4077 );
4078
4079 assert!(
4081 db.store()
4082 .get_node_versioned(node_in_tx, current_epoch, TransactionId::SYSTEM)
4083 .is_none(),
4084 "Transaction node should be gone"
4085 );
4086 }
4087
4088 #[test]
4089 fn test_session_create_node_in_transaction() {
4090 let db = GrafeoDB::new_in_memory();
4092
4093 let node_before = db.create_node(&["Person"]);
4095 assert!(node_before.is_valid());
4096 assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");
4097
4098 let mut session = db.session();
4100 session.begin_transaction().unwrap();
4101 let transaction_id = session.current_transaction.lock().unwrap();
4102
4103 let node_in_tx = session.create_node(&["Person"]);
4105 assert!(node_in_tx.is_valid());
4106
4107 assert_eq!(
4110 db.node_count(),
4111 1,
4112 "PENDING nodes should be invisible to non-versioned node_count()"
4113 );
4114 let epoch = db.store().current_epoch();
4115 assert!(
4116 db.store()
4117 .get_node_versioned(node_in_tx, epoch, transaction_id)
4118 .is_some(),
4119 "Transaction node should be visible to its own transaction"
4120 );
4121
4122 session.rollback().unwrap();
4124
4125 let count_after = db.node_count();
4127 assert_eq!(
4128 count_after, 1,
4129 "Rollback should discard node created via session.create_node(), but got {count_after}"
4130 );
4131 }
4132
4133 #[test]
4134 fn test_session_create_node_with_props_in_transaction() {
4135 use grafeo_common::types::Value;
4136
4137 let db = GrafeoDB::new_in_memory();
4139
4140 db.create_node(&["Person"]);
4142 assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");
4143
4144 let mut session = db.session();
4146 session.begin_transaction().unwrap();
4147 let transaction_id = session.current_transaction.lock().unwrap();
4148
4149 let node_in_tx =
4150 session.create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))]);
4151 assert!(node_in_tx.is_valid());
4152
4153 assert_eq!(
4156 db.node_count(),
4157 1,
4158 "PENDING nodes should be invisible to non-versioned node_count()"
4159 );
4160 let epoch = db.store().current_epoch();
4161 assert!(
4162 db.store()
4163 .get_node_versioned(node_in_tx, epoch, transaction_id)
4164 .is_some(),
4165 "Transaction node should be visible to its own transaction"
4166 );
4167
4168 session.rollback().unwrap();
4170
4171 let count_after = db.node_count();
4173 assert_eq!(
4174 count_after, 1,
4175 "Rollback should discard node created via session.create_node_with_props()"
4176 );
4177 }
4178
4179 #[cfg(feature = "gql")]
4180 mod gql_tests {
4181 use super::*;
4182
4183 #[test]
4184 fn test_gql_query_execution() {
4185 let db = GrafeoDB::new_in_memory();
4186 let session = db.session();
4187
4188 session.create_node(&["Person"]);
4190 session.create_node(&["Person"]);
4191 session.create_node(&["Animal"]);
4192
4193 let result = session.execute("MATCH (n:Person) RETURN n").unwrap();
4195
4196 assert_eq!(result.row_count(), 2);
4198 assert_eq!(result.column_count(), 1);
4199 assert_eq!(result.columns[0], "n");
4200 }
4201
4202 #[test]
4203 fn test_gql_empty_result() {
4204 let db = GrafeoDB::new_in_memory();
4205 let session = db.session();
4206
4207 let result = session.execute("MATCH (n:Person) RETURN n").unwrap();
4209
4210 assert_eq!(result.row_count(), 0);
4211 }
4212
4213 #[test]
4214 fn test_gql_parse_error() {
4215 let db = GrafeoDB::new_in_memory();
4216 let session = db.session();
4217
4218 let result = session.execute("MATCH (n RETURN n");
4220
4221 assert!(result.is_err());
4222 }
4223
4224 #[test]
4225 fn test_gql_relationship_traversal() {
4226 let db = GrafeoDB::new_in_memory();
4227 let session = db.session();
4228
4229 let alix = session.create_node(&["Person"]);
4231 let gus = session.create_node(&["Person"]);
4232 let vincent = session.create_node(&["Person"]);
4233
4234 session.create_edge(alix, gus, "KNOWS");
4235 session.create_edge(alix, vincent, "KNOWS");
4236
4237 let result = session
4239 .execute("MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b")
4240 .unwrap();
4241
4242 assert_eq!(result.row_count(), 2);
4244 assert_eq!(result.column_count(), 2);
4245 assert_eq!(result.columns[0], "a");
4246 assert_eq!(result.columns[1], "b");
4247 }
4248
4249 #[test]
4250 fn test_gql_relationship_with_type_filter() {
4251 let db = GrafeoDB::new_in_memory();
4252 let session = db.session();
4253
4254 let alix = session.create_node(&["Person"]);
4256 let gus = session.create_node(&["Person"]);
4257 let vincent = session.create_node(&["Person"]);
4258
4259 session.create_edge(alix, gus, "KNOWS");
4260 session.create_edge(alix, vincent, "WORKS_WITH");
4261
4262 let result = session
4264 .execute("MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b")
4265 .unwrap();
4266
4267 assert_eq!(result.row_count(), 1);
4269 }
4270
4271 #[test]
4272 fn test_gql_semantic_error_undefined_variable() {
4273 let db = GrafeoDB::new_in_memory();
4274 let session = db.session();
4275
4276 let result = session.execute("MATCH (n:Person) RETURN x");
4278
4279 assert!(result.is_err());
4281 let Err(err) = result else {
4282 panic!("Expected error")
4283 };
4284 assert!(
4285 err.to_string().contains("Undefined variable"),
4286 "Expected undefined variable error, got: {}",
4287 err
4288 );
4289 }
4290
4291 #[test]
4292 fn test_gql_where_clause_property_filter() {
4293 use grafeo_common::types::Value;
4294
4295 let db = GrafeoDB::new_in_memory();
4296 let session = db.session();
4297
4298 session.create_node_with_props(&["Person"], [("age", Value::Int64(25))]);
4300 session.create_node_with_props(&["Person"], [("age", Value::Int64(35))]);
4301 session.create_node_with_props(&["Person"], [("age", Value::Int64(45))]);
4302
4303 let result = session
4305 .execute("MATCH (n:Person) WHERE n.age > 30 RETURN n")
4306 .unwrap();
4307
4308 assert_eq!(result.row_count(), 2);
4310 }
4311
4312 #[test]
4313 fn test_gql_where_clause_equality() {
4314 use grafeo_common::types::Value;
4315
4316 let db = GrafeoDB::new_in_memory();
4317 let session = db.session();
4318
4319 session.create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))]);
4321 session.create_node_with_props(&["Person"], [("name", Value::String("Gus".into()))]);
4322 session.create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))]);
4323
4324 let result = session
4326 .execute("MATCH (n:Person) WHERE n.name = \"Alix\" RETURN n")
4327 .unwrap();
4328
4329 assert_eq!(result.row_count(), 2);
4331 }
4332
4333 #[test]
4334 fn test_gql_return_property_access() {
4335 use grafeo_common::types::Value;
4336
4337 let db = GrafeoDB::new_in_memory();
4338 let session = db.session();
4339
4340 session.create_node_with_props(
4342 &["Person"],
4343 [
4344 ("name", Value::String("Alix".into())),
4345 ("age", Value::Int64(30)),
4346 ],
4347 );
4348 session.create_node_with_props(
4349 &["Person"],
4350 [
4351 ("name", Value::String("Gus".into())),
4352 ("age", Value::Int64(25)),
4353 ],
4354 );
4355
4356 let result = session
4358 .execute("MATCH (n:Person) RETURN n.name, n.age")
4359 .unwrap();
4360
4361 assert_eq!(result.row_count(), 2);
4363 assert_eq!(result.column_count(), 2);
4364 assert_eq!(result.columns[0], "n.name");
4365 assert_eq!(result.columns[1], "n.age");
4366
4367 let names: Vec<&Value> = result.rows.iter().map(|r| &r[0]).collect();
4369 assert!(names.contains(&&Value::String("Alix".into())));
4370 assert!(names.contains(&&Value::String("Gus".into())));
4371 }
4372
4373 #[test]
4374 fn test_gql_return_mixed_expressions() {
4375 use grafeo_common::types::Value;
4376
4377 let db = GrafeoDB::new_in_memory();
4378 let session = db.session();
4379
4380 session.create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))]);
4382
4383 let result = session
4385 .execute("MATCH (n:Person) RETURN n, n.name")
4386 .unwrap();
4387
4388 assert_eq!(result.row_count(), 1);
4389 assert_eq!(result.column_count(), 2);
4390 assert_eq!(result.columns[0], "n");
4391 assert_eq!(result.columns[1], "n.name");
4392
4393 assert_eq!(result.rows[0][1], Value::String("Alix".into()));
4395 }
4396 }
4397
4398 #[cfg(feature = "cypher")]
4399 mod cypher_tests {
4400 use super::*;
4401
4402 #[test]
4403 fn test_cypher_query_execution() {
4404 let db = GrafeoDB::new_in_memory();
4405 let session = db.session();
4406
4407 session.create_node(&["Person"]);
4409 session.create_node(&["Person"]);
4410 session.create_node(&["Animal"]);
4411
4412 let result = session.execute_cypher("MATCH (n:Person) RETURN n").unwrap();
4414
4415 assert_eq!(result.row_count(), 2);
4417 assert_eq!(result.column_count(), 1);
4418 assert_eq!(result.columns[0], "n");
4419 }
4420
4421 #[test]
4422 fn test_cypher_empty_result() {
4423 let db = GrafeoDB::new_in_memory();
4424 let session = db.session();
4425
4426 let result = session.execute_cypher("MATCH (n:Person) RETURN n").unwrap();
4428
4429 assert_eq!(result.row_count(), 0);
4430 }
4431
4432 #[test]
4433 fn test_cypher_parse_error() {
4434 let db = GrafeoDB::new_in_memory();
4435 let session = db.session();
4436
4437 let result = session.execute_cypher("MATCH (n RETURN n");
4439
4440 assert!(result.is_err());
4441 }
4442 }
4443
4444 mod direct_lookup_tests {
4447 use super::*;
4448 use grafeo_common::types::Value;
4449
4450 #[test]
4451 fn test_get_node() {
4452 let db = GrafeoDB::new_in_memory();
4453 let session = db.session();
4454
4455 let id = session.create_node(&["Person"]);
4456 let node = session.get_node(id);
4457
4458 assert!(node.is_some());
4459 let node = node.unwrap();
4460 assert_eq!(node.id, id);
4461 }
4462
4463 #[test]
4464 fn test_get_node_not_found() {
4465 use grafeo_common::types::NodeId;
4466
4467 let db = GrafeoDB::new_in_memory();
4468 let session = db.session();
4469
4470 let node = session.get_node(NodeId::new(9999));
4472 assert!(node.is_none());
4473 }
4474
4475 #[test]
4476 fn test_get_node_property() {
4477 let db = GrafeoDB::new_in_memory();
4478 let session = db.session();
4479
4480 let id = session
4481 .create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))]);
4482
4483 let name = session.get_node_property(id, "name");
4484 assert_eq!(name, Some(Value::String("Alix".into())));
4485
4486 let missing = session.get_node_property(id, "missing");
4488 assert!(missing.is_none());
4489 }
4490
4491 #[test]
4492 fn test_get_edge() {
4493 let db = GrafeoDB::new_in_memory();
4494 let session = db.session();
4495
4496 let alix = session.create_node(&["Person"]);
4497 let gus = session.create_node(&["Person"]);
4498 let edge_id = session.create_edge(alix, gus, "KNOWS");
4499
4500 let edge = session.get_edge(edge_id);
4501 assert!(edge.is_some());
4502 let edge = edge.unwrap();
4503 assert_eq!(edge.id, edge_id);
4504 assert_eq!(edge.src, alix);
4505 assert_eq!(edge.dst, gus);
4506 }
4507
4508 #[test]
4509 fn test_get_edge_not_found() {
4510 use grafeo_common::types::EdgeId;
4511
4512 let db = GrafeoDB::new_in_memory();
4513 let session = db.session();
4514
4515 let edge = session.get_edge(EdgeId::new(9999));
4516 assert!(edge.is_none());
4517 }
4518
4519 #[test]
4520 fn test_get_neighbors_outgoing() {
4521 let db = GrafeoDB::new_in_memory();
4522 let session = db.session();
4523
4524 let alix = session.create_node(&["Person"]);
4525 let gus = session.create_node(&["Person"]);
4526 let harm = session.create_node(&["Person"]);
4527
4528 session.create_edge(alix, gus, "KNOWS");
4529 session.create_edge(alix, harm, "KNOWS");
4530
4531 let neighbors = session.get_neighbors_outgoing(alix);
4532 assert_eq!(neighbors.len(), 2);
4533
4534 let neighbor_ids: Vec<_> = neighbors.iter().map(|(node_id, _)| *node_id).collect();
4535 assert!(neighbor_ids.contains(&gus));
4536 assert!(neighbor_ids.contains(&harm));
4537 }
4538
4539 #[test]
4540 fn test_get_neighbors_incoming() {
4541 let db = GrafeoDB::new_in_memory();
4542 let session = db.session();
4543
4544 let alix = session.create_node(&["Person"]);
4545 let gus = session.create_node(&["Person"]);
4546 let harm = session.create_node(&["Person"]);
4547
4548 session.create_edge(gus, alix, "KNOWS");
4549 session.create_edge(harm, alix, "KNOWS");
4550
4551 let neighbors = session.get_neighbors_incoming(alix);
4552 assert_eq!(neighbors.len(), 2);
4553
4554 let neighbor_ids: Vec<_> = neighbors.iter().map(|(node_id, _)| *node_id).collect();
4555 assert!(neighbor_ids.contains(&gus));
4556 assert!(neighbor_ids.contains(&harm));
4557 }
4558
4559 #[test]
4560 fn test_get_neighbors_outgoing_by_type() {
4561 let db = GrafeoDB::new_in_memory();
4562 let session = db.session();
4563
4564 let alix = session.create_node(&["Person"]);
4565 let gus = session.create_node(&["Person"]);
4566 let company = session.create_node(&["Company"]);
4567
4568 session.create_edge(alix, gus, "KNOWS");
4569 session.create_edge(alix, company, "WORKS_AT");
4570
4571 let knows_neighbors = session.get_neighbors_outgoing_by_type(alix, "KNOWS");
4572 assert_eq!(knows_neighbors.len(), 1);
4573 assert_eq!(knows_neighbors[0].0, gus);
4574
4575 let works_neighbors = session.get_neighbors_outgoing_by_type(alix, "WORKS_AT");
4576 assert_eq!(works_neighbors.len(), 1);
4577 assert_eq!(works_neighbors[0].0, company);
4578
4579 let no_neighbors = session.get_neighbors_outgoing_by_type(alix, "LIKES");
4581 assert!(no_neighbors.is_empty());
4582 }
4583
4584 #[test]
4585 fn test_node_exists() {
4586 use grafeo_common::types::NodeId;
4587
4588 let db = GrafeoDB::new_in_memory();
4589 let session = db.session();
4590
4591 let id = session.create_node(&["Person"]);
4592
4593 assert!(session.node_exists(id));
4594 assert!(!session.node_exists(NodeId::new(9999)));
4595 }
4596
4597 #[test]
4598 fn test_edge_exists() {
4599 use grafeo_common::types::EdgeId;
4600
4601 let db = GrafeoDB::new_in_memory();
4602 let session = db.session();
4603
4604 let alix = session.create_node(&["Person"]);
4605 let gus = session.create_node(&["Person"]);
4606 let edge_id = session.create_edge(alix, gus, "KNOWS");
4607
4608 assert!(session.edge_exists(edge_id));
4609 assert!(!session.edge_exists(EdgeId::new(9999)));
4610 }
4611
4612 #[test]
4613 fn test_get_degree() {
4614 let db = GrafeoDB::new_in_memory();
4615 let session = db.session();
4616
4617 let alix = session.create_node(&["Person"]);
4618 let gus = session.create_node(&["Person"]);
4619 let harm = session.create_node(&["Person"]);
4620
4621 session.create_edge(alix, gus, "KNOWS");
4623 session.create_edge(alix, harm, "KNOWS");
4624 session.create_edge(gus, alix, "KNOWS");
4626
4627 let (out_degree, in_degree) = session.get_degree(alix);
4628 assert_eq!(out_degree, 2);
4629 assert_eq!(in_degree, 1);
4630
4631 let lonely = session.create_node(&["Person"]);
4633 let (out, in_deg) = session.get_degree(lonely);
4634 assert_eq!(out, 0);
4635 assert_eq!(in_deg, 0);
4636 }
4637
4638 #[test]
4639 fn test_get_nodes_batch() {
4640 let db = GrafeoDB::new_in_memory();
4641 let session = db.session();
4642
4643 let alix = session.create_node(&["Person"]);
4644 let gus = session.create_node(&["Person"]);
4645 let harm = session.create_node(&["Person"]);
4646
4647 let nodes = session.get_nodes_batch(&[alix, gus, harm]);
4648 assert_eq!(nodes.len(), 3);
4649 assert!(nodes[0].is_some());
4650 assert!(nodes[1].is_some());
4651 assert!(nodes[2].is_some());
4652
4653 use grafeo_common::types::NodeId;
4655 let nodes_with_missing = session.get_nodes_batch(&[alix, NodeId::new(9999), harm]);
4656 assert_eq!(nodes_with_missing.len(), 3);
4657 assert!(nodes_with_missing[0].is_some());
4658 assert!(nodes_with_missing[1].is_none()); assert!(nodes_with_missing[2].is_some());
4660 }
4661
4662 #[test]
4663 fn test_auto_commit_setting() {
4664 let db = GrafeoDB::new_in_memory();
4665 let mut session = db.session();
4666
4667 assert!(session.auto_commit());
4669
4670 session.set_auto_commit(false);
4671 assert!(!session.auto_commit());
4672
4673 session.set_auto_commit(true);
4674 assert!(session.auto_commit());
4675 }
4676
4677 #[test]
4678 fn test_transaction_double_begin_nests() {
4679 let db = GrafeoDB::new_in_memory();
4680 let mut session = db.session();
4681
4682 session.begin_transaction().unwrap();
4683 let result = session.begin_transaction();
4685 assert!(result.is_ok());
4686 session.commit().unwrap();
4688 session.commit().unwrap();
4690 }
4691
4692 #[test]
4693 fn test_commit_without_transaction_error() {
4694 let db = GrafeoDB::new_in_memory();
4695 let mut session = db.session();
4696
4697 let result = session.commit();
4698 assert!(result.is_err());
4699 }
4700
4701 #[test]
4702 fn test_rollback_without_transaction_error() {
4703 let db = GrafeoDB::new_in_memory();
4704 let mut session = db.session();
4705
4706 let result = session.rollback();
4707 assert!(result.is_err());
4708 }
4709
4710 #[test]
4711 fn test_create_edge_in_transaction() {
4712 let db = GrafeoDB::new_in_memory();
4713 let mut session = db.session();
4714
4715 let alix = session.create_node(&["Person"]);
4717 let gus = session.create_node(&["Person"]);
4718
4719 session.begin_transaction().unwrap();
4721 let edge_id = session.create_edge(alix, gus, "KNOWS");
4722
4723 assert!(session.edge_exists(edge_id));
4725
4726 session.commit().unwrap();
4728
4729 assert!(session.edge_exists(edge_id));
4731 }
4732
4733 #[test]
4734 fn test_neighbors_empty_node() {
4735 let db = GrafeoDB::new_in_memory();
4736 let session = db.session();
4737
4738 let lonely = session.create_node(&["Person"]);
4739
4740 assert!(session.get_neighbors_outgoing(lonely).is_empty());
4741 assert!(session.get_neighbors_incoming(lonely).is_empty());
4742 assert!(
4743 session
4744 .get_neighbors_outgoing_by_type(lonely, "KNOWS")
4745 .is_empty()
4746 );
4747 }
4748 }
4749
4750 #[test]
4751 fn test_auto_gc_triggers_on_commit_interval() {
4752 use crate::config::Config;
4753
4754 let config = Config::in_memory().with_gc_interval(2);
4755 let db = GrafeoDB::with_config(config).unwrap();
4756 let mut session = db.session();
4757
4758 session.begin_transaction().unwrap();
4760 session.create_node(&["A"]);
4761 session.commit().unwrap();
4762
4763 session.begin_transaction().unwrap();
4765 session.create_node(&["B"]);
4766 session.commit().unwrap();
4767
4768 assert_eq!(db.node_count(), 2);
4770 }
4771
4772 #[test]
4773 fn test_query_timeout_config_propagates_to_session() {
4774 use crate::config::Config;
4775 use std::time::Duration;
4776
4777 let config = Config::in_memory().with_query_timeout(Duration::from_secs(5));
4778 let db = GrafeoDB::with_config(config).unwrap();
4779 let session = db.session();
4780
4781 assert!(session.query_deadline().is_some());
4783 }
4784
4785 #[test]
4786 fn test_no_query_timeout_returns_no_deadline() {
4787 let db = GrafeoDB::new_in_memory();
4788 let session = db.session();
4789
4790 assert!(session.query_deadline().is_none());
4792 }
4793
4794 #[test]
4795 fn test_graph_model_accessor() {
4796 use crate::config::GraphModel;
4797
4798 let db = GrafeoDB::new_in_memory();
4799 let session = db.session();
4800
4801 assert_eq!(session.graph_model(), GraphModel::Lpg);
4802 }
4803
4804 #[cfg(feature = "gql")]
4805 #[test]
4806 fn test_external_store_session() {
4807 use grafeo_core::graph::GraphStoreMut;
4808 use std::sync::Arc;
4809
4810 let config = crate::config::Config::in_memory();
4811 let store =
4812 Arc::new(grafeo_core::graph::lpg::LpgStore::new().unwrap()) as Arc<dyn GraphStoreMut>;
4813 let db = GrafeoDB::with_store(store, config).unwrap();
4814
4815 let mut session = db.session();
4816
4817 session.begin_transaction().unwrap();
4821 session.execute("INSERT (:Test {name: 'hello'})").unwrap();
4822
4823 let result = session.execute("MATCH (n:Test) RETURN n.name").unwrap();
4825 assert_eq!(result.row_count(), 1);
4826
4827 session.commit().unwrap();
4828 }
4829
4830 #[cfg(feature = "gql")]
4833 mod session_command_tests {
4834 use super::*;
4835 use grafeo_common::types::Value;
4836
4837 #[test]
4838 fn test_use_graph_sets_current_graph() {
4839 let db = GrafeoDB::new_in_memory();
4840 let session = db.session();
4841
4842 session.execute("CREATE GRAPH mydb").unwrap();
4844 session.execute("USE GRAPH mydb").unwrap();
4845
4846 assert_eq!(session.current_graph(), Some("mydb".to_string()));
4847 }
4848
4849 #[test]
4850 fn test_use_graph_nonexistent_errors() {
4851 let db = GrafeoDB::new_in_memory();
4852 let session = db.session();
4853
4854 let result = session.execute("USE GRAPH doesnotexist");
4855 assert!(result.is_err());
4856 let err = result.unwrap_err().to_string();
4857 assert!(
4858 err.contains("does not exist"),
4859 "Expected 'does not exist' error, got: {err}"
4860 );
4861 }
4862
4863 #[test]
4864 fn test_use_graph_default_always_valid() {
4865 let db = GrafeoDB::new_in_memory();
4866 let session = db.session();
4867
4868 session.execute("USE GRAPH default").unwrap();
4870 assert_eq!(session.current_graph(), Some("default".to_string()));
4871 }
4872
4873 #[test]
4874 fn test_session_set_graph() {
4875 let db = GrafeoDB::new_in_memory();
4876 let session = db.session();
4877
4878 session.execute("CREATE GRAPH analytics").unwrap();
4879 session.execute("SESSION SET GRAPH analytics").unwrap();
4880 assert_eq!(session.current_graph(), Some("analytics".to_string()));
4881 }
4882
4883 #[test]
4884 fn test_session_set_graph_nonexistent_errors() {
4885 let db = GrafeoDB::new_in_memory();
4886 let session = db.session();
4887
4888 let result = session.execute("SESSION SET GRAPH nosuchgraph");
4889 assert!(result.is_err());
4890 }
4891
4892 #[test]
4893 fn test_session_set_time_zone() {
4894 let db = GrafeoDB::new_in_memory();
4895 let session = db.session();
4896
4897 assert_eq!(session.time_zone(), None);
4898
4899 session.execute("SESSION SET TIME ZONE 'UTC'").unwrap();
4900 assert_eq!(session.time_zone(), Some("UTC".to_string()));
4901
4902 session
4903 .execute("SESSION SET TIME ZONE 'America/New_York'")
4904 .unwrap();
4905 assert_eq!(session.time_zone(), Some("America/New_York".to_string()));
4906 }
4907
4908 #[test]
4909 fn test_session_set_parameter() {
4910 let db = GrafeoDB::new_in_memory();
4911 let session = db.session();
4912
4913 session
4914 .execute("SESSION SET PARAMETER $timeout = 30")
4915 .unwrap();
4916
4917 assert!(session.get_parameter("timeout").is_some());
4920 }
4921
4922 #[test]
4923 fn test_session_reset_clears_all_state() {
4924 let db = GrafeoDB::new_in_memory();
4925 let session = db.session();
4926
4927 session.execute("CREATE GRAPH analytics").unwrap();
4929 session.execute("SESSION SET GRAPH analytics").unwrap();
4930 session.execute("SESSION SET TIME ZONE 'UTC'").unwrap();
4931 session
4932 .execute("SESSION SET PARAMETER $limit = 100")
4933 .unwrap();
4934
4935 assert!(session.current_graph().is_some());
4937 assert!(session.time_zone().is_some());
4938 assert!(session.get_parameter("limit").is_some());
4939
4940 session.execute("SESSION RESET").unwrap();
4942
4943 assert_eq!(session.current_graph(), None);
4944 assert_eq!(session.time_zone(), None);
4945 assert!(session.get_parameter("limit").is_none());
4946 }
4947
4948 #[test]
4949 fn test_session_close_clears_state() {
4950 let db = GrafeoDB::new_in_memory();
4951 let session = db.session();
4952
4953 session.execute("CREATE GRAPH analytics").unwrap();
4954 session.execute("SESSION SET GRAPH analytics").unwrap();
4955 session.execute("SESSION SET TIME ZONE 'UTC'").unwrap();
4956
4957 session.execute("SESSION CLOSE").unwrap();
4958
4959 assert_eq!(session.current_graph(), None);
4960 assert_eq!(session.time_zone(), None);
4961 }
4962
4963 #[test]
4964 fn test_create_graph() {
4965 let db = GrafeoDB::new_in_memory();
4966 let session = db.session();
4967
4968 session.execute("CREATE GRAPH mydb").unwrap();
4969
4970 session.execute("USE GRAPH mydb").unwrap();
4972 assert_eq!(session.current_graph(), Some("mydb".to_string()));
4973 }
4974
4975 #[test]
4976 fn test_create_graph_duplicate_errors() {
4977 let db = GrafeoDB::new_in_memory();
4978 let session = db.session();
4979
4980 session.execute("CREATE GRAPH mydb").unwrap();
4981 let result = session.execute("CREATE GRAPH mydb");
4982
4983 assert!(result.is_err());
4984 let err = result.unwrap_err().to_string();
4985 assert!(
4986 err.contains("already exists"),
4987 "Expected 'already exists' error, got: {err}"
4988 );
4989 }
4990
4991 #[test]
4992 fn test_create_graph_if_not_exists() {
4993 let db = GrafeoDB::new_in_memory();
4994 let session = db.session();
4995
4996 session.execute("CREATE GRAPH mydb").unwrap();
4997 session.execute("CREATE GRAPH IF NOT EXISTS mydb").unwrap();
4999 }
5000
5001 #[test]
5002 fn test_drop_graph() {
5003 let db = GrafeoDB::new_in_memory();
5004 let session = db.session();
5005
5006 session.execute("CREATE GRAPH mydb").unwrap();
5007 session.execute("DROP GRAPH mydb").unwrap();
5008
5009 let result = session.execute("USE GRAPH mydb");
5011 assert!(result.is_err());
5012 }
5013
5014 #[test]
5015 fn test_drop_graph_nonexistent_errors() {
5016 let db = GrafeoDB::new_in_memory();
5017 let session = db.session();
5018
5019 let result = session.execute("DROP GRAPH nosuchgraph");
5020 assert!(result.is_err());
5021 let err = result.unwrap_err().to_string();
5022 assert!(
5023 err.contains("does not exist"),
5024 "Expected 'does not exist' error, got: {err}"
5025 );
5026 }
5027
5028 #[test]
5029 fn test_drop_graph_if_exists() {
5030 let db = GrafeoDB::new_in_memory();
5031 let session = db.session();
5032
5033 session.execute("DROP GRAPH IF EXISTS nosuchgraph").unwrap();
5035 }
5036
5037 #[test]
5038 fn test_start_transaction_via_gql() {
5039 let db = GrafeoDB::new_in_memory();
5040 let session = db.session();
5041
5042 session.execute("START TRANSACTION").unwrap();
5043 assert!(session.in_transaction());
5044 session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
5045 session.execute("COMMIT").unwrap();
5046 assert!(!session.in_transaction());
5047
5048 let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
5049 assert_eq!(result.rows.len(), 1);
5050 }
5051
5052 #[test]
5053 fn test_start_transaction_read_only_blocks_insert() {
5054 let db = GrafeoDB::new_in_memory();
5055 let session = db.session();
5056
5057 session.execute("START TRANSACTION READ ONLY").unwrap();
5058 let result = session.execute("INSERT (:Person {name: 'Alix'})");
5059 assert!(result.is_err());
5060 let err = result.unwrap_err().to_string();
5061 assert!(
5062 err.contains("read-only"),
5063 "Expected read-only error, got: {err}"
5064 );
5065 session.execute("ROLLBACK").unwrap();
5066 }
5067
5068 #[test]
5069 fn test_start_transaction_read_only_allows_reads() {
5070 let db = GrafeoDB::new_in_memory();
5071 let mut session = db.session();
5072 session.begin_transaction().unwrap();
5073 session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
5074 session.commit().unwrap();
5075
5076 session.execute("START TRANSACTION READ ONLY").unwrap();
5077 let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
5078 assert_eq!(result.rows.len(), 1);
5079 session.execute("COMMIT").unwrap();
5080 }
5081
5082 #[test]
5083 fn test_rollback_via_gql() {
5084 let db = GrafeoDB::new_in_memory();
5085 let session = db.session();
5086
5087 session.execute("START TRANSACTION").unwrap();
5088 session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
5089 session.execute("ROLLBACK").unwrap();
5090
5091 let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
5092 assert!(result.rows.is_empty());
5093 }
5094
5095 #[test]
5096 fn test_start_transaction_with_isolation_level() {
5097 let db = GrafeoDB::new_in_memory();
5098 let session = db.session();
5099
5100 session
5101 .execute("START TRANSACTION ISOLATION LEVEL SERIALIZABLE")
5102 .unwrap();
5103 assert!(session.in_transaction());
5104 session.execute("ROLLBACK").unwrap();
5105 }
5106
5107 #[test]
5108 fn test_session_commands_return_empty_result() {
5109 let db = GrafeoDB::new_in_memory();
5110 let session = db.session();
5111
5112 session.execute("CREATE GRAPH test").unwrap();
5113 let result = session.execute("SESSION SET GRAPH test").unwrap();
5114 assert_eq!(result.row_count(), 0);
5115 assert_eq!(result.column_count(), 0);
5116 }
5117
5118 #[test]
5119 fn test_current_graph_default_is_none() {
5120 let db = GrafeoDB::new_in_memory();
5121 let session = db.session();
5122
5123 assert_eq!(session.current_graph(), None);
5124 }
5125
5126 #[test]
5127 fn test_time_zone_default_is_none() {
5128 let db = GrafeoDB::new_in_memory();
5129 let session = db.session();
5130
5131 assert_eq!(session.time_zone(), None);
5132 }
5133
5134 #[test]
5135 fn test_session_state_independent_across_sessions() {
5136 let db = GrafeoDB::new_in_memory();
5137 let session1 = db.session();
5138 let session2 = db.session();
5139
5140 session1.execute("CREATE GRAPH first").unwrap();
5141 session1.execute("CREATE GRAPH second").unwrap();
5142 session1.execute("SESSION SET GRAPH first").unwrap();
5143 session2.execute("SESSION SET GRAPH second").unwrap();
5144
5145 assert_eq!(session1.current_graph(), Some("first".to_string()));
5146 assert_eq!(session2.current_graph(), Some("second".to_string()));
5147 }
5148
5149 #[test]
5150 fn test_show_node_types() {
5151 let db = GrafeoDB::new_in_memory();
5152 let session = db.session();
5153
5154 session
5155 .execute("CREATE NODE TYPE Person (name STRING NOT NULL, age INTEGER)")
5156 .unwrap();
5157
5158 let result = session.execute("SHOW NODE TYPES").unwrap();
5159 assert_eq!(
5160 result.columns,
5161 vec!["name", "properties", "constraints", "parents"]
5162 );
5163 assert_eq!(result.rows.len(), 1);
5164 assert_eq!(result.rows[0][0], Value::from("Person"));
5166 }
5167
5168 #[test]
5169 fn test_show_edge_types() {
5170 let db = GrafeoDB::new_in_memory();
5171 let session = db.session();
5172
5173 session
5174 .execute("CREATE EDGE TYPE KNOWS CONNECTING (Person) TO (Person) (since INTEGER)")
5175 .unwrap();
5176
5177 let result = session.execute("SHOW EDGE TYPES").unwrap();
5178 assert_eq!(
5179 result.columns,
5180 vec!["name", "properties", "source_types", "target_types"]
5181 );
5182 assert_eq!(result.rows.len(), 1);
5183 assert_eq!(result.rows[0][0], Value::from("KNOWS"));
5184 }
5185
5186 #[test]
5187 fn test_show_graph_types() {
5188 let db = GrafeoDB::new_in_memory();
5189 let session = db.session();
5190
5191 session
5192 .execute("CREATE NODE TYPE Person (name STRING)")
5193 .unwrap();
5194 session
5195 .execute(
5196 "CREATE GRAPH TYPE social (\
5197 NODE TYPE Person (name STRING)\
5198 )",
5199 )
5200 .unwrap();
5201
5202 let result = session.execute("SHOW GRAPH TYPES").unwrap();
5203 assert_eq!(
5204 result.columns,
5205 vec!["name", "open", "node_types", "edge_types"]
5206 );
5207 assert_eq!(result.rows.len(), 1);
5208 assert_eq!(result.rows[0][0], Value::from("social"));
5209 }
5210
5211 #[test]
5212 fn test_show_graph_type_named() {
5213 let db = GrafeoDB::new_in_memory();
5214 let session = db.session();
5215
5216 session
5217 .execute("CREATE NODE TYPE Person (name STRING)")
5218 .unwrap();
5219 session
5220 .execute(
5221 "CREATE GRAPH TYPE social (\
5222 NODE TYPE Person (name STRING)\
5223 )",
5224 )
5225 .unwrap();
5226
5227 let result = session.execute("SHOW GRAPH TYPE social").unwrap();
5228 assert_eq!(result.rows.len(), 1);
5229 assert_eq!(result.rows[0][0], Value::from("social"));
5230 }
5231
5232 #[test]
5233 fn test_show_graph_type_not_found() {
5234 let db = GrafeoDB::new_in_memory();
5235 let session = db.session();
5236
5237 let result = session.execute("SHOW GRAPH TYPE nonexistent");
5238 assert!(result.is_err());
5239 }
5240
5241 #[test]
5242 fn test_show_indexes_via_gql() {
5243 let db = GrafeoDB::new_in_memory();
5244 let session = db.session();
5245
5246 let result = session.execute("SHOW INDEXES").unwrap();
5247 assert_eq!(result.columns, vec!["name", "type", "label", "property"]);
5248 }
5249
5250 #[test]
5251 fn test_show_constraints_via_gql() {
5252 let db = GrafeoDB::new_in_memory();
5253 let session = db.session();
5254
5255 let result = session.execute("SHOW CONSTRAINTS").unwrap();
5256 assert_eq!(result.columns, vec!["name", "type", "label", "properties"]);
5257 }
5258
5259 #[test]
5260 fn test_pattern_form_graph_type_roundtrip() {
5261 let db = GrafeoDB::new_in_memory();
5262 let session = db.session();
5263
5264 session
5266 .execute("CREATE NODE TYPE Person (name STRING NOT NULL)")
5267 .unwrap();
5268 session
5269 .execute("CREATE NODE TYPE City (name STRING)")
5270 .unwrap();
5271 session
5272 .execute("CREATE EDGE TYPE KNOWS (since INTEGER)")
5273 .unwrap();
5274 session.execute("CREATE EDGE TYPE LIVES_IN").unwrap();
5275
5276 session
5278 .execute(
5279 "CREATE GRAPH TYPE social (\
5280 (:Person {name STRING NOT NULL})-[:KNOWS {since INTEGER}]->(:Person),\
5281 (:Person)-[:LIVES_IN]->(:City)\
5282 )",
5283 )
5284 .unwrap();
5285
5286 let result = session.execute("SHOW GRAPH TYPE social").unwrap();
5288 assert_eq!(result.rows.len(), 1);
5289 assert_eq!(result.rows[0][0], Value::from("social"));
5290 }
5291 }
5292}