1#[cfg(feature = "rdf")]
8mod rdf;
9
10use std::sync::Arc;
11use std::sync::atomic::{AtomicUsize, Ordering};
12use std::time::{Duration, Instant};
13
14use grafeo_common::types::{EdgeId, EpochId, NodeId, TransactionId, Value};
15use grafeo_common::utils::error::Result;
16use grafeo_core::graph::Direction;
17use grafeo_core::graph::GraphStoreMut;
18use grafeo_core::graph::lpg::{Edge, LpgStore, Node};
19#[cfg(feature = "rdf")]
20use grafeo_core::graph::rdf::RdfStore;
21
22use crate::catalog::{Catalog, CatalogConstraintValidator};
23use crate::config::{AdaptiveConfig, GraphModel};
24use crate::database::QueryResult;
25use crate::query::cache::QueryCache;
26use crate::transaction::TransactionManager;
27
28fn parse_default_literal(text: &str) -> Value {
33 if text.eq_ignore_ascii_case("null") {
34 return Value::Null;
35 }
36 if text.eq_ignore_ascii_case("true") {
37 return Value::Bool(true);
38 }
39 if text.eq_ignore_ascii_case("false") {
40 return Value::Bool(false);
41 }
42 if (text.starts_with('\'') && text.ends_with('\''))
44 || (text.starts_with('"') && text.ends_with('"'))
45 {
46 return Value::String(text[1..text.len() - 1].into());
47 }
48 if let Ok(i) = text.parse::<i64>() {
50 return Value::Int64(i);
51 }
52 if let Ok(f) = text.parse::<f64>() {
53 return Value::Float64(f);
54 }
55 Value::String(text.into())
57}
58
/// Bundle of shared handles and settings handed to the `Session` constructors.
///
/// Every field is moved into the field of the same name on `Session`.
pub(crate) struct SessionConfig {
    /// Shared transaction manager.
    pub transaction_manager: Arc<TransactionManager>,
    /// Shared query cache.
    pub query_cache: Arc<QueryCache>,
    /// Shared catalog handle.
    pub catalog: Arc<Catalog>,
    /// Adaptive execution settings.
    pub adaptive_config: AdaptiveConfig,
    /// Whether factorized execution is enabled for the session.
    pub factorized_execution: bool,
    /// Graph model of the database (the session rejects LPG-only operations
    /// when this is `GraphModel::Rdf`).
    pub graph_model: GraphModel,
    /// Optional per-query time limit; `None` presumably means "no limit" (TODO confirm).
    pub query_timeout: Option<Duration>,
    /// Shared commit counter.
    // NOTE(review): presumably used together with `gc_interval` to schedule GC — confirm.
    pub commit_counter: Arc<AtomicUsize>,
    /// Garbage-collection interval; the unit (likely "commits") is not visible here.
    pub gc_interval: usize,
}
74
/// A database session: holds the store handles, the shared services, and all
/// per-session state (active graph/schema, time zone, parameters, transaction
/// bookkeeping).
///
/// Mutable per-session state is wrapped in `parking_lot::Mutex` or atomics so
/// it can be updated through `&self`.
pub struct Session {
    /// Concrete LPG store. Named graphs are looked up through it
    /// (`store.graph(name)`), and it is the fallback when no named graph is active.
    store: Arc<LpgStore>,
    /// Trait-object store handle used when no named graph is active. `set_wal`
    /// replaces it with a WAL-wrapping store; `with_external_store` sets it to
    /// the externally supplied store.
    graph_store: Arc<dyn GraphStoreMut>,
    /// Catalog handle (schemas, type definitions, graph-type bindings).
    catalog: Arc<Catalog>,
    /// RDF store; only present with the `rdf` feature. Both constructors create
    /// a fresh one.
    #[cfg(feature = "rdf")]
    rdf_store: Arc<RdfStore>,
    /// Shared transaction manager.
    transaction_manager: Arc<TransactionManager>,
    /// Shared query cache.
    query_cache: Arc<QueryCache>,
    /// Id of the transaction currently open on this session, if any.
    current_transaction: parking_lot::Mutex<Option<TransactionId>>,
    /// When set, `CREATE GRAPH` / `DROP GRAPH` session commands are rejected
    /// with a read-only transaction error.
    read_only_tx: parking_lot::Mutex<bool>,
    /// Initialised to `true` by both constructors.
    // NOTE(review): presumably controls implicit commit of standalone statements — confirm.
    auto_commit: bool,
    /// Adaptive execution settings; not read in the code visible here (hence
    /// the `dead_code` allowance).
    #[allow(dead_code)] adaptive_config: AdaptiveConfig,
    /// Whether factorized execution is enabled.
    factorized_execution: bool,
    /// Graph model of the database; returned by `graph_model()` and checked by
    /// `require_lpg`.
    graph_model: GraphModel,
    /// Optional per-query time limit.
    query_timeout: Option<Duration>,
    /// Shared commit counter.
    commit_counter: Arc<AtomicUsize>,
    /// Garbage-collection interval (unit not visible here).
    gc_interval: usize,
    /// Initialised to 0.
    // NOTE(review): name suggests the node count captured at transaction start — confirm.
    transaction_start_node_count: AtomicUsize,
    /// Initialised to 0.
    // NOTE(review): name suggests the edge count captured at transaction start — confirm.
    transaction_start_edge_count: AtomicUsize,
    /// Write-ahead log, installed by `set_wal`; `None` until then.
    #[cfg(feature = "wal")]
    wal: Option<Arc<grafeo_adapters::storage::wal::LpgWal>>,
    /// Shared WAL graph-context slot, installed by `set_wal` and passed to the
    /// WAL-wrapping stores.
    #[cfg(feature = "wal")]
    wal_graph_context: Option<Arc<parking_lot::Mutex<Option<String>>>>,
    /// Change-data-capture log; replaced via `set_cdc_log`.
    #[cfg(feature = "cdc")]
    cdc_log: Arc<crate::cdc::CdcLog>,
    /// Name of the graph selected with `use_graph`; `None` means no named graph.
    current_graph: parking_lot::Mutex<Option<String>>,
    /// Name of the schema selected with `set_schema`; when set, graph storage
    /// keys are formed as `"{schema}/{graph}"`.
    current_schema: parking_lot::Mutex<Option<String>>,
    /// Session time zone string, stored verbatim.
    time_zone: parking_lot::Mutex<Option<String>>,
    /// Session parameters keyed by name.
    session_params:
        parking_lot::Mutex<std::collections::HashMap<String, grafeo_common::types::Value>>,
    /// Epoch override set by `set_viewing_epoch`; `None` means no override.
    viewing_epoch_override: parking_lot::Mutex<Option<EpochId>>,
    /// Stack of savepoints created in the current transaction.
    savepoints: parking_lot::Mutex<Vec<SavepointState>>,
    /// Nesting depth counter for transactions; starts at 0.
    transaction_nesting_depth: parking_lot::Mutex<u32>,
    /// Distinct storage keys of graphs touched while a transaction is open
    /// (`None` stands for the default graph); filled by `track_graph_touch`.
    touched_graphs: parking_lot::Mutex<Vec<Option<String>>>,
    /// Metrics registry, installed by `set_metrics`; `None` until then.
    #[cfg(feature = "metrics")]
    pub(crate) metrics: Option<Arc<crate::metrics::MetricsRegistry>>,
    /// Start instant of the current transaction, presumably for metrics (TODO confirm).
    #[cfg(feature = "metrics")]
    tx_start_time: parking_lot::Mutex<Option<Instant>>,
}
157
/// Per-graph snapshot recorded as part of a savepoint.
// NOTE(review): field meanings below are inferred from names and types only;
// the code that fills and restores them is outside this view — confirm.
#[derive(Clone)]
struct GraphSavepoint {
    /// Graph this snapshot belongs to; `None` presumably denotes the default graph.
    graph_name: Option<String>,
    /// Presumably the store's next node id at savepoint time.
    next_node_id: u64,
    /// Presumably the store's next edge id at savepoint time.
    next_edge_id: u64,
    /// Presumably the undo-log length at savepoint time.
    undo_log_position: usize,
}
166
/// A named savepoint: the savepoint name plus one `GraphSavepoint` per
/// captured graph.
#[derive(Clone)]
struct SavepointState {
    /// User-visible savepoint name.
    name: String,
    /// Snapshots of the graphs captured for this savepoint.
    graph_snapshots: Vec<GraphSavepoint>,
    /// Graph recorded with the savepoint; not read in the code visible here
    /// (hence the `dead_code` allowance).
    #[allow(dead_code)]
    active_graph: Option<String>,
}
177
178impl Session {
179 #[allow(dead_code)]
181 pub(crate) fn with_adaptive(store: Arc<LpgStore>, cfg: SessionConfig) -> Self {
182 let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
183 Self {
184 store,
185 graph_store,
186 catalog: cfg.catalog,
187 #[cfg(feature = "rdf")]
188 rdf_store: Arc::new(RdfStore::new()),
189 transaction_manager: cfg.transaction_manager,
190 query_cache: cfg.query_cache,
191 current_transaction: parking_lot::Mutex::new(None),
192 read_only_tx: parking_lot::Mutex::new(false),
193 auto_commit: true,
194 adaptive_config: cfg.adaptive_config,
195 factorized_execution: cfg.factorized_execution,
196 graph_model: cfg.graph_model,
197 query_timeout: cfg.query_timeout,
198 commit_counter: cfg.commit_counter,
199 gc_interval: cfg.gc_interval,
200 transaction_start_node_count: AtomicUsize::new(0),
201 transaction_start_edge_count: AtomicUsize::new(0),
202 #[cfg(feature = "wal")]
203 wal: None,
204 #[cfg(feature = "wal")]
205 wal_graph_context: None,
206 #[cfg(feature = "cdc")]
207 cdc_log: Arc::new(crate::cdc::CdcLog::new()),
208 current_graph: parking_lot::Mutex::new(None),
209 current_schema: parking_lot::Mutex::new(None),
210 time_zone: parking_lot::Mutex::new(None),
211 session_params: parking_lot::Mutex::new(std::collections::HashMap::new()),
212 viewing_epoch_override: parking_lot::Mutex::new(None),
213 savepoints: parking_lot::Mutex::new(Vec::new()),
214 transaction_nesting_depth: parking_lot::Mutex::new(0),
215 touched_graphs: parking_lot::Mutex::new(Vec::new()),
216 #[cfg(feature = "metrics")]
217 metrics: None,
218 #[cfg(feature = "metrics")]
219 tx_start_time: parking_lot::Mutex::new(None),
220 }
221 }
222
/// Installs a write-ahead log on this session.
///
/// Replaces `graph_store` with a `WalGraphStore` wrapping the session's LPG
/// store, then remembers the WAL handle and the shared graph-context slot.
#[cfg(feature = "wal")]
pub(crate) fn set_wal(
    &mut self,
    wal: Arc<grafeo_adapters::storage::wal::LpgWal>,
    wal_graph_context: Arc<parking_lot::Mutex<Option<String>>>,
) {
    let wal_backed = crate::database::wal_store::WalGraphStore::new(
        Arc::clone(&self.store),
        Arc::clone(&wal),
        Arc::clone(&wal_graph_context),
    );
    self.graph_store = Arc::new(wal_backed);
    self.wal = Some(wal);
    self.wal_graph_context = Some(wal_graph_context);
}
242
/// Replaces the session's change-data-capture log with `cdc_log`.
#[cfg(feature = "cdc")]
pub(crate) fn set_cdc_log(&mut self, cdc_log: Arc<crate::cdc::CdcLog>) {
    let slot = &mut self.cdc_log;
    *slot = cdc_log;
}
248
/// Attaches a metrics registry to the session.
#[cfg(feature = "metrics")]
pub(crate) fn set_metrics(&mut self, metrics: Arc<crate::metrics::MetricsRegistry>) {
    let registry = Some(metrics);
    self.metrics = registry;
}
254
255 pub(crate) fn with_external_store(
264 store: Arc<dyn GraphStoreMut>,
265 cfg: SessionConfig,
266 ) -> Result<Self> {
267 Ok(Self {
268 store: Arc::new(LpgStore::new()?),
269 graph_store: store,
270 catalog: cfg.catalog,
271 #[cfg(feature = "rdf")]
272 rdf_store: Arc::new(RdfStore::new()),
273 transaction_manager: cfg.transaction_manager,
274 query_cache: cfg.query_cache,
275 current_transaction: parking_lot::Mutex::new(None),
276 read_only_tx: parking_lot::Mutex::new(false),
277 auto_commit: true,
278 adaptive_config: cfg.adaptive_config,
279 factorized_execution: cfg.factorized_execution,
280 graph_model: cfg.graph_model,
281 query_timeout: cfg.query_timeout,
282 commit_counter: cfg.commit_counter,
283 gc_interval: cfg.gc_interval,
284 transaction_start_node_count: AtomicUsize::new(0),
285 transaction_start_edge_count: AtomicUsize::new(0),
286 #[cfg(feature = "wal")]
287 wal: None,
288 #[cfg(feature = "wal")]
289 wal_graph_context: None,
290 #[cfg(feature = "cdc")]
291 cdc_log: Arc::new(crate::cdc::CdcLog::new()),
292 current_graph: parking_lot::Mutex::new(None),
293 current_schema: parking_lot::Mutex::new(None),
294 time_zone: parking_lot::Mutex::new(None),
295 session_params: parking_lot::Mutex::new(std::collections::HashMap::new()),
296 viewing_epoch_override: parking_lot::Mutex::new(None),
297 savepoints: parking_lot::Mutex::new(Vec::new()),
298 transaction_nesting_depth: parking_lot::Mutex::new(0),
299 touched_graphs: parking_lot::Mutex::new(Vec::new()),
300 #[cfg(feature = "metrics")]
301 metrics: None,
302 #[cfg(feature = "metrics")]
303 tx_start_time: parking_lot::Mutex::new(None),
304 })
305 }
306
307 #[must_use]
309 pub fn graph_model(&self) -> GraphModel {
310 self.graph_model
311 }
312
313 pub fn use_graph(&self, name: &str) {
317 *self.current_graph.lock() = Some(name.to_string());
318 }
319
320 #[must_use]
322 pub fn current_graph(&self) -> Option<String> {
323 self.current_graph.lock().clone()
324 }
325
326 pub fn set_schema(&self, name: &str) {
330 *self.current_schema.lock() = Some(name.to_string());
331 }
332
333 #[must_use]
337 pub fn current_schema(&self) -> Option<String> {
338 self.current_schema.lock().clone()
339 }
340
341 fn effective_graph_key(&self, graph_name: &str) -> String {
346 let schema = self.current_schema.lock().clone();
347 match schema {
348 Some(s) => format!("{s}/{graph_name}"),
349 None => graph_name.to_string(),
350 }
351 }
352
353 fn active_graph_storage_key(&self) -> Option<String> {
357 let graph = self.current_graph.lock().clone();
358 let schema = self.current_schema.lock().clone();
359 match (schema, graph) {
360 (_, None) => None,
361 (_, Some(ref name)) if name.eq_ignore_ascii_case("default") => None,
362 (None, Some(name)) => Some(name),
363 (Some(s), Some(g)) => Some(format!("{s}/{g}")),
364 }
365 }
366
367 fn active_store(&self) -> Arc<dyn GraphStoreMut> {
375 let key = self.active_graph_storage_key();
376 match key {
377 None => Arc::clone(&self.graph_store),
378 Some(ref name) => match self.store.graph(name) {
379 Some(named_store) => {
380 #[cfg(feature = "wal")]
381 if let (Some(wal), Some(ctx)) = (&self.wal, &self.wal_graph_context) {
382 return Arc::new(crate::database::wal_store::WalGraphStore::new_for_graph(
383 named_store,
384 Arc::clone(wal),
385 name.clone(),
386 Arc::clone(ctx),
387 )) as Arc<dyn GraphStoreMut>;
388 }
389 named_store as Arc<dyn GraphStoreMut>
390 }
391 None => Arc::clone(&self.graph_store),
392 },
393 }
394 }
395
396 fn active_lpg_store(&self) -> Arc<LpgStore> {
401 let key = self.active_graph_storage_key();
402 match key {
403 None => Arc::clone(&self.store),
404 Some(ref name) => self
405 .store
406 .graph(name)
407 .unwrap_or_else(|| Arc::clone(&self.store)),
408 }
409 }
410
411 fn resolve_store(&self, graph_name: &Option<String>) -> Arc<LpgStore> {
414 match graph_name {
415 None => Arc::clone(&self.store),
416 Some(name) if name.eq_ignore_ascii_case("default") => Arc::clone(&self.store),
417 Some(name) => self
418 .store
419 .graph(name)
420 .unwrap_or_else(|| Arc::clone(&self.store)),
421 }
422 }
423
424 fn track_graph_touch(&self) {
429 if self.current_transaction.lock().is_some() {
430 let key = self.active_graph_storage_key();
431 let mut touched = self.touched_graphs.lock();
432 if !touched.contains(&key) {
433 touched.push(key);
434 }
435 }
436 }
437
438 pub fn set_time_zone(&self, tz: &str) {
440 *self.time_zone.lock() = Some(tz.to_string());
441 }
442
443 #[must_use]
445 pub fn time_zone(&self) -> Option<String> {
446 self.time_zone.lock().clone()
447 }
448
449 pub fn set_parameter(&self, key: &str, value: grafeo_common::types::Value) {
451 self.session_params.lock().insert(key.to_string(), value);
452 }
453
454 #[must_use]
456 pub fn get_parameter(&self, key: &str) -> Option<grafeo_common::types::Value> {
457 self.session_params.lock().get(key).cloned()
458 }
459
460 pub fn reset_session(&self) {
462 *self.current_schema.lock() = None;
463 *self.current_graph.lock() = None;
464 *self.time_zone.lock() = None;
465 self.session_params.lock().clear();
466 *self.viewing_epoch_override.lock() = None;
467 }
468
469 pub fn reset_schema(&self) {
471 *self.current_schema.lock() = None;
472 }
473
474 pub fn reset_graph(&self) {
476 *self.current_graph.lock() = None;
477 }
478
479 pub fn reset_time_zone(&self) {
481 *self.time_zone.lock() = None;
482 }
483
484 pub fn reset_parameters(&self) {
486 self.session_params.lock().clear();
487 }
488
489 pub fn set_viewing_epoch(&self, epoch: EpochId) {
497 *self.viewing_epoch_override.lock() = Some(epoch);
498 }
499
500 pub fn clear_viewing_epoch(&self) {
502 *self.viewing_epoch_override.lock() = None;
503 }
504
505 #[must_use]
507 pub fn viewing_epoch(&self) -> Option<EpochId> {
508 *self.viewing_epoch_override.lock()
509 }
510
511 #[must_use]
515 pub fn get_node_history(&self, id: NodeId) -> Vec<(EpochId, Option<EpochId>, Node)> {
516 self.active_lpg_store().get_node_history(id)
517 }
518
519 #[must_use]
523 pub fn get_edge_history(&self, id: EdgeId) -> Vec<(EpochId, Option<EpochId>, Edge)> {
524 self.active_lpg_store().get_edge_history(id)
525 }
526
527 fn require_lpg(&self, language: &str) -> Result<()> {
529 if self.graph_model == GraphModel::Rdf {
530 return Err(grafeo_common::utils::error::Error::Internal(format!(
531 "This is an RDF database. {language} queries require an LPG database."
532 )));
533 }
534 Ok(())
535 }
536
537 #[cfg(feature = "gql")]
539 fn execute_session_command(
540 &self,
541 cmd: grafeo_adapters::query::gql::ast::SessionCommand,
542 ) -> Result<QueryResult> {
543 use grafeo_adapters::query::gql::ast::{SessionCommand, TransactionIsolationLevel};
544 use grafeo_common::utils::error::{Error, QueryError, QueryErrorKind};
545
546 if *self.read_only_tx.lock() {
548 match &cmd {
549 SessionCommand::CreateGraph { .. } | SessionCommand::DropGraph { .. } => {
550 return Err(Error::Transaction(
551 grafeo_common::utils::error::TransactionError::ReadOnly,
552 ));
553 }
554 _ => {} }
556 }
557
558 match cmd {
559 SessionCommand::CreateGraph {
560 name,
561 if_not_exists,
562 typed,
563 like_graph,
564 copy_of,
565 open: _,
566 } => {
567 let storage_key = self.effective_graph_key(&name);
569
570 if let Some(ref src) = like_graph {
572 let src_key = self.effective_graph_key(src);
573 if self.store.graph(&src_key).is_none() {
574 return Err(Error::Query(QueryError::new(
575 QueryErrorKind::Semantic,
576 format!("Source graph '{src}' does not exist"),
577 )));
578 }
579 }
580 if let Some(ref src) = copy_of {
581 let src_key = self.effective_graph_key(src);
582 if self.store.graph(&src_key).is_none() {
583 return Err(Error::Query(QueryError::new(
584 QueryErrorKind::Semantic,
585 format!("Source graph '{src}' does not exist"),
586 )));
587 }
588 }
589
590 let created = self
591 .store
592 .create_graph(&storage_key)
593 .map_err(|e| Error::Internal(e.to_string()))?;
594 if !created && !if_not_exists {
595 return Err(Error::Query(QueryError::new(
596 QueryErrorKind::Semantic,
597 format!("Graph '{name}' already exists"),
598 )));
599 }
600 if created {
601 #[cfg(feature = "wal")]
602 self.log_schema_wal(
603 &grafeo_adapters::storage::wal::WalRecord::CreateNamedGraph {
604 name: storage_key.clone(),
605 },
606 );
607 }
608
609 if let Some(ref src) = copy_of {
611 let src_key = self.effective_graph_key(src);
612 self.store
613 .copy_graph(Some(&src_key), Some(&storage_key))
614 .map_err(|e| Error::Internal(e.to_string()))?;
615 }
616
617 if let Some(type_name) = typed
619 && let Err(e) = self
620 .catalog
621 .bind_graph_type(&storage_key, type_name.clone())
622 {
623 return Err(Error::Query(QueryError::new(
624 QueryErrorKind::Semantic,
625 e.to_string(),
626 )));
627 }
628
629 if let Some(ref src) = like_graph {
631 let src_key = self.effective_graph_key(src);
632 if let Some(src_type) = self.catalog.get_graph_type_binding(&src_key) {
633 let _ = self.catalog.bind_graph_type(&storage_key, src_type);
634 }
635 }
636
637 Ok(QueryResult::empty())
638 }
639 SessionCommand::DropGraph { name, if_exists } => {
640 let storage_key = self.effective_graph_key(&name);
641 let dropped = self.store.drop_graph(&storage_key);
642 if !dropped && !if_exists {
643 return Err(Error::Query(QueryError::new(
644 QueryErrorKind::Semantic,
645 format!("Graph '{name}' does not exist"),
646 )));
647 }
648 if dropped {
649 #[cfg(feature = "wal")]
650 self.log_schema_wal(
651 &grafeo_adapters::storage::wal::WalRecord::DropNamedGraph {
652 name: storage_key.clone(),
653 },
654 );
655 let mut current = self.current_graph.lock();
657 if current
658 .as_deref()
659 .is_some_and(|g| g.eq_ignore_ascii_case(&name))
660 {
661 *current = None;
662 }
663 }
664 Ok(QueryResult::empty())
665 }
666 SessionCommand::UseGraph(name) => {
667 let effective_key = self.effective_graph_key(&name);
669 if !name.eq_ignore_ascii_case("default")
670 && self.store.graph(&effective_key).is_none()
671 {
672 return Err(Error::Query(QueryError::new(
673 QueryErrorKind::Semantic,
674 format!("Graph '{name}' does not exist"),
675 )));
676 }
677 self.use_graph(&name);
678 self.track_graph_touch();
680 Ok(QueryResult::empty())
681 }
682 SessionCommand::SessionSetGraph(name) => {
683 let effective_key = self.effective_graph_key(&name);
685 if !name.eq_ignore_ascii_case("default")
686 && self.store.graph(&effective_key).is_none()
687 {
688 return Err(Error::Query(QueryError::new(
689 QueryErrorKind::Semantic,
690 format!("Graph '{name}' does not exist"),
691 )));
692 }
693 self.use_graph(&name);
694 self.track_graph_touch();
696 Ok(QueryResult::empty())
697 }
698 SessionCommand::SessionSetSchema(name) => {
699 if !self.catalog.schema_exists(&name) {
701 return Err(Error::Query(QueryError::new(
702 QueryErrorKind::Semantic,
703 format!("Schema '{name}' does not exist"),
704 )));
705 }
706 self.set_schema(&name);
707 Ok(QueryResult::empty())
708 }
709 SessionCommand::SessionSetTimeZone(tz) => {
710 self.set_time_zone(&tz);
711 Ok(QueryResult::empty())
712 }
713 SessionCommand::SessionSetParameter(key, expr) => {
714 if key.eq_ignore_ascii_case("viewing_epoch") {
715 match Self::eval_integer_literal(&expr) {
716 Some(n) if n >= 0 => {
717 self.set_viewing_epoch(EpochId::new(n as u64));
718 Ok(QueryResult::status(format!("Set viewing_epoch to {n}")))
719 }
720 _ => Err(Error::Query(QueryError::new(
721 QueryErrorKind::Semantic,
722 "viewing_epoch must be a non-negative integer literal",
723 ))),
724 }
725 } else {
726 self.set_parameter(&key, Value::Null);
729 Ok(QueryResult::empty())
730 }
731 }
732 SessionCommand::SessionReset(target) => {
733 use grafeo_adapters::query::gql::ast::SessionResetTarget;
734 match target {
735 SessionResetTarget::All => self.reset_session(),
736 SessionResetTarget::Schema => self.reset_schema(),
737 SessionResetTarget::Graph => self.reset_graph(),
738 SessionResetTarget::TimeZone => self.reset_time_zone(),
739 SessionResetTarget::Parameters => self.reset_parameters(),
740 }
741 Ok(QueryResult::empty())
742 }
743 SessionCommand::SessionClose => {
744 self.reset_session();
745 Ok(QueryResult::empty())
746 }
747 SessionCommand::StartTransaction {
748 read_only,
749 isolation_level,
750 } => {
751 let engine_level = isolation_level.map(|l| match l {
752 TransactionIsolationLevel::ReadCommitted => {
753 crate::transaction::IsolationLevel::ReadCommitted
754 }
755 TransactionIsolationLevel::SnapshotIsolation => {
756 crate::transaction::IsolationLevel::SnapshotIsolation
757 }
758 TransactionIsolationLevel::Serializable => {
759 crate::transaction::IsolationLevel::Serializable
760 }
761 });
762 self.begin_transaction_inner(read_only, engine_level)?;
763 Ok(QueryResult::status("Transaction started"))
764 }
765 SessionCommand::Commit => {
766 self.commit_inner()?;
767 Ok(QueryResult::status("Transaction committed"))
768 }
769 SessionCommand::Rollback => {
770 self.rollback_inner()?;
771 Ok(QueryResult::status("Transaction rolled back"))
772 }
773 SessionCommand::Savepoint(name) => {
774 self.savepoint(&name)?;
775 Ok(QueryResult::status(format!("Savepoint '{name}' created")))
776 }
777 SessionCommand::RollbackToSavepoint(name) => {
778 self.rollback_to_savepoint(&name)?;
779 Ok(QueryResult::status(format!(
780 "Rolled back to savepoint '{name}'"
781 )))
782 }
783 SessionCommand::ReleaseSavepoint(name) => {
784 self.release_savepoint(&name)?;
785 Ok(QueryResult::status(format!("Savepoint '{name}' released")))
786 }
787 }
788 }
789
/// Appends a schema-change record to the WAL, if one is installed.
///
/// Best effort: a logging failure is reported through `tracing::warn!` and
/// otherwise ignored, so the schema change itself is not rolled back.
#[cfg(feature = "wal")]
fn log_schema_wal(&self, record: &grafeo_adapters::storage::wal::WalRecord) {
    let Some(wal) = self.wal.as_ref() else {
        return;
    };
    if let Err(e) = wal.log(record) {
        tracing::warn!("Failed to log schema change to WAL: {}", e);
    }
}
799
800 #[cfg(feature = "gql")]
802 fn execute_schema_command(
803 &self,
804 cmd: grafeo_adapters::query::gql::ast::SchemaStatement,
805 ) -> Result<QueryResult> {
806 use crate::catalog::{
807 EdgeTypeDefinition, NodeTypeDefinition, PropertyDataType, TypedProperty,
808 };
809 use grafeo_adapters::query::gql::ast::SchemaStatement;
810 #[cfg(feature = "wal")]
811 use grafeo_adapters::storage::wal::WalRecord;
812 use grafeo_common::utils::error::{Error, QueryError, QueryErrorKind};
813
814 macro_rules! wal_log {
816 ($self:expr, $record:expr) => {
817 #[cfg(feature = "wal")]
818 $self.log_schema_wal(&$record);
819 };
820 }
821
822 let result = match cmd {
823 SchemaStatement::CreateNodeType(stmt) => {
824 #[cfg(feature = "wal")]
825 let props_for_wal: Vec<(String, String, bool)> = stmt
826 .properties
827 .iter()
828 .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
829 .collect();
830 let def = NodeTypeDefinition {
831 name: stmt.name.clone(),
832 properties: stmt
833 .properties
834 .iter()
835 .map(|p| TypedProperty {
836 name: p.name.clone(),
837 data_type: PropertyDataType::from_type_name(&p.data_type),
838 nullable: p.nullable,
839 default_value: p
840 .default_value
841 .as_ref()
842 .map(|s| parse_default_literal(s)),
843 })
844 .collect(),
845 constraints: Vec::new(),
846 parent_types: stmt.parent_types.clone(),
847 };
848 let result = if stmt.or_replace {
849 let _ = self.catalog.drop_node_type(&stmt.name);
850 self.catalog.register_node_type(def)
851 } else {
852 self.catalog.register_node_type(def)
853 };
854 match result {
855 Ok(()) => {
856 wal_log!(
857 self,
858 WalRecord::CreateNodeType {
859 name: stmt.name.clone(),
860 properties: props_for_wal,
861 constraints: Vec::new(),
862 }
863 );
864 Ok(QueryResult::status(format!(
865 "Created node type '{}'",
866 stmt.name
867 )))
868 }
869 Err(e) if stmt.if_not_exists => {
870 let _ = e;
871 Ok(QueryResult::status("No change"))
872 }
873 Err(e) => Err(Error::Query(QueryError::new(
874 QueryErrorKind::Semantic,
875 e.to_string(),
876 ))),
877 }
878 }
879 SchemaStatement::CreateEdgeType(stmt) => {
880 #[cfg(feature = "wal")]
881 let props_for_wal: Vec<(String, String, bool)> = stmt
882 .properties
883 .iter()
884 .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
885 .collect();
886 let def = EdgeTypeDefinition {
887 name: stmt.name.clone(),
888 properties: stmt
889 .properties
890 .iter()
891 .map(|p| TypedProperty {
892 name: p.name.clone(),
893 data_type: PropertyDataType::from_type_name(&p.data_type),
894 nullable: p.nullable,
895 default_value: p
896 .default_value
897 .as_ref()
898 .map(|s| parse_default_literal(s)),
899 })
900 .collect(),
901 constraints: Vec::new(),
902 source_node_types: stmt.source_node_types.clone(),
903 target_node_types: stmt.target_node_types.clone(),
904 };
905 let result = if stmt.or_replace {
906 let _ = self.catalog.drop_edge_type_def(&stmt.name);
907 self.catalog.register_edge_type_def(def)
908 } else {
909 self.catalog.register_edge_type_def(def)
910 };
911 match result {
912 Ok(()) => {
913 wal_log!(
914 self,
915 WalRecord::CreateEdgeType {
916 name: stmt.name.clone(),
917 properties: props_for_wal,
918 constraints: Vec::new(),
919 }
920 );
921 Ok(QueryResult::status(format!(
922 "Created edge type '{}'",
923 stmt.name
924 )))
925 }
926 Err(e) if stmt.if_not_exists => {
927 let _ = e;
928 Ok(QueryResult::status("No change"))
929 }
930 Err(e) => Err(Error::Query(QueryError::new(
931 QueryErrorKind::Semantic,
932 e.to_string(),
933 ))),
934 }
935 }
936 SchemaStatement::CreateVectorIndex(stmt) => {
937 Self::create_vector_index_on_store(
938 &self.active_lpg_store(),
939 &stmt.node_label,
940 &stmt.property,
941 stmt.dimensions,
942 stmt.metric.as_deref(),
943 )?;
944 wal_log!(
945 self,
946 WalRecord::CreateIndex {
947 name: stmt.name.clone(),
948 label: stmt.node_label.clone(),
949 property: stmt.property.clone(),
950 index_type: "vector".to_string(),
951 }
952 );
953 Ok(QueryResult::status(format!(
954 "Created vector index '{}'",
955 stmt.name
956 )))
957 }
958 SchemaStatement::DropNodeType { name, if_exists } => {
959 match self.catalog.drop_node_type(&name) {
960 Ok(()) => {
961 wal_log!(self, WalRecord::DropNodeType { name: name.clone() });
962 Ok(QueryResult::status(format!("Dropped node type '{name}'")))
963 }
964 Err(e) if if_exists => {
965 let _ = e;
966 Ok(QueryResult::status("No change"))
967 }
968 Err(e) => Err(Error::Query(QueryError::new(
969 QueryErrorKind::Semantic,
970 e.to_string(),
971 ))),
972 }
973 }
974 SchemaStatement::DropEdgeType { name, if_exists } => {
975 match self.catalog.drop_edge_type_def(&name) {
976 Ok(()) => {
977 wal_log!(self, WalRecord::DropEdgeType { name: name.clone() });
978 Ok(QueryResult::status(format!("Dropped edge type '{name}'")))
979 }
980 Err(e) if if_exists => {
981 let _ = e;
982 Ok(QueryResult::status("No change"))
983 }
984 Err(e) => Err(Error::Query(QueryError::new(
985 QueryErrorKind::Semantic,
986 e.to_string(),
987 ))),
988 }
989 }
990 SchemaStatement::CreateIndex(stmt) => {
991 use grafeo_adapters::query::gql::ast::IndexKind;
992 let active = self.active_lpg_store();
993 let index_type_str = match stmt.index_kind {
994 IndexKind::Property => "property",
995 IndexKind::BTree => "btree",
996 IndexKind::Text => "text",
997 IndexKind::Vector => "vector",
998 };
999 match stmt.index_kind {
1000 IndexKind::Property | IndexKind::BTree => {
1001 for prop in &stmt.properties {
1002 active.create_property_index(prop);
1003 }
1004 }
1005 IndexKind::Text => {
1006 for prop in &stmt.properties {
1007 Self::create_text_index_on_store(&active, &stmt.label, prop)?;
1008 }
1009 }
1010 IndexKind::Vector => {
1011 for prop in &stmt.properties {
1012 Self::create_vector_index_on_store(
1013 &active,
1014 &stmt.label,
1015 prop,
1016 stmt.options.dimensions,
1017 stmt.options.metric.as_deref(),
1018 )?;
1019 }
1020 }
1021 }
1022 #[cfg(feature = "wal")]
1023 for prop in &stmt.properties {
1024 wal_log!(
1025 self,
1026 WalRecord::CreateIndex {
1027 name: stmt.name.clone(),
1028 label: stmt.label.clone(),
1029 property: prop.clone(),
1030 index_type: index_type_str.to_string(),
1031 }
1032 );
1033 }
1034 Ok(QueryResult::status(format!(
1035 "Created {} index '{}'",
1036 index_type_str, stmt.name
1037 )))
1038 }
1039 SchemaStatement::DropIndex { name, if_exists } => {
1040 let dropped = self.active_lpg_store().drop_property_index(&name);
1042 if dropped || if_exists {
1043 if dropped {
1044 wal_log!(self, WalRecord::DropIndex { name: name.clone() });
1045 }
1046 Ok(QueryResult::status(if dropped {
1047 format!("Dropped index '{name}'")
1048 } else {
1049 "No change".to_string()
1050 }))
1051 } else {
1052 Err(Error::Query(QueryError::new(
1053 QueryErrorKind::Semantic,
1054 format!("Index '{name}' does not exist"),
1055 )))
1056 }
1057 }
1058 SchemaStatement::CreateConstraint(stmt) => {
1059 use crate::catalog::TypeConstraint;
1060 use grafeo_adapters::query::gql::ast::ConstraintKind;
1061 let kind_str = match stmt.constraint_kind {
1062 ConstraintKind::Unique => "unique",
1063 ConstraintKind::NodeKey => "node_key",
1064 ConstraintKind::NotNull => "not_null",
1065 ConstraintKind::Exists => "exists",
1066 };
1067 let constraint_name = stmt
1068 .name
1069 .clone()
1070 .unwrap_or_else(|| format!("{}_{kind_str}", stmt.label));
1071
1072 match stmt.constraint_kind {
1074 ConstraintKind::Unique => {
1075 for prop in &stmt.properties {
1076 let label_id = self.catalog.get_or_create_label(&stmt.label);
1077 let prop_id = self.catalog.get_or_create_property_key(prop);
1078 let _ = self.catalog.add_unique_constraint(label_id, prop_id);
1079 }
1080 let _ = self.catalog.add_constraint_to_type(
1081 &stmt.label,
1082 TypeConstraint::Unique(stmt.properties.clone()),
1083 );
1084 }
1085 ConstraintKind::NodeKey => {
1086 for prop in &stmt.properties {
1087 let label_id = self.catalog.get_or_create_label(&stmt.label);
1088 let prop_id = self.catalog.get_or_create_property_key(prop);
1089 let _ = self.catalog.add_unique_constraint(label_id, prop_id);
1090 let _ = self.catalog.add_required_property(label_id, prop_id);
1091 }
1092 let _ = self.catalog.add_constraint_to_type(
1093 &stmt.label,
1094 TypeConstraint::PrimaryKey(stmt.properties.clone()),
1095 );
1096 }
1097 ConstraintKind::NotNull | ConstraintKind::Exists => {
1098 for prop in &stmt.properties {
1099 let label_id = self.catalog.get_or_create_label(&stmt.label);
1100 let prop_id = self.catalog.get_or_create_property_key(prop);
1101 let _ = self.catalog.add_required_property(label_id, prop_id);
1102 let _ = self.catalog.add_constraint_to_type(
1103 &stmt.label,
1104 TypeConstraint::NotNull(prop.clone()),
1105 );
1106 }
1107 }
1108 }
1109
1110 wal_log!(
1111 self,
1112 WalRecord::CreateConstraint {
1113 name: constraint_name.clone(),
1114 label: stmt.label.clone(),
1115 properties: stmt.properties.clone(),
1116 kind: kind_str.to_string(),
1117 }
1118 );
1119 Ok(QueryResult::status(format!(
1120 "Created {kind_str} constraint '{constraint_name}'"
1121 )))
1122 }
1123 SchemaStatement::DropConstraint { name, if_exists } => {
1124 let _ = if_exists;
1125 wal_log!(self, WalRecord::DropConstraint { name: name.clone() });
1126 Ok(QueryResult::status(format!("Dropped constraint '{name}'")))
1127 }
1128 SchemaStatement::CreateGraphType(stmt) => {
1129 use crate::catalog::GraphTypeDefinition;
1130 use grafeo_adapters::query::gql::ast::InlineElementType;
1131
1132 let (mut node_types, mut edge_types, open) =
1134 if let Some(ref like_graph) = stmt.like_graph {
1135 if let Some(type_name) = self.catalog.get_graph_type_binding(like_graph) {
1137 if let Some(existing) = self
1138 .catalog
1139 .schema()
1140 .and_then(|s| s.get_graph_type(&type_name))
1141 {
1142 (
1143 existing.allowed_node_types.clone(),
1144 existing.allowed_edge_types.clone(),
1145 existing.open,
1146 )
1147 } else {
1148 (Vec::new(), Vec::new(), true)
1149 }
1150 } else {
1151 let nt = self.catalog.all_node_type_names();
1153 let et = self.catalog.all_edge_type_names();
1154 if nt.is_empty() && et.is_empty() {
1155 (Vec::new(), Vec::new(), true)
1156 } else {
1157 (nt, et, false)
1158 }
1159 }
1160 } else {
1161 (stmt.node_types.clone(), stmt.edge_types.clone(), stmt.open)
1162 };
1163
1164 for inline in &stmt.inline_types {
1166 match inline {
1167 InlineElementType::Node {
1168 name,
1169 properties,
1170 key_labels,
1171 ..
1172 } => {
1173 let def = NodeTypeDefinition {
1174 name: name.clone(),
1175 properties: properties
1176 .iter()
1177 .map(|p| TypedProperty {
1178 name: p.name.clone(),
1179 data_type: PropertyDataType::from_type_name(&p.data_type),
1180 nullable: p.nullable,
1181 default_value: None,
1182 })
1183 .collect(),
1184 constraints: Vec::new(),
1185 parent_types: key_labels.clone(),
1186 };
1187 self.catalog.register_or_replace_node_type(def);
1189 #[cfg(feature = "wal")]
1190 {
1191 let props_for_wal: Vec<(String, String, bool)> = properties
1192 .iter()
1193 .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
1194 .collect();
1195 self.log_schema_wal(&WalRecord::CreateNodeType {
1196 name: name.clone(),
1197 properties: props_for_wal,
1198 constraints: Vec::new(),
1199 });
1200 }
1201 if !node_types.contains(name) {
1202 node_types.push(name.clone());
1203 }
1204 }
1205 InlineElementType::Edge {
1206 name,
1207 properties,
1208 source_node_types,
1209 target_node_types,
1210 ..
1211 } => {
1212 let def = EdgeTypeDefinition {
1213 name: name.clone(),
1214 properties: properties
1215 .iter()
1216 .map(|p| TypedProperty {
1217 name: p.name.clone(),
1218 data_type: PropertyDataType::from_type_name(&p.data_type),
1219 nullable: p.nullable,
1220 default_value: None,
1221 })
1222 .collect(),
1223 constraints: Vec::new(),
1224 source_node_types: source_node_types.clone(),
1225 target_node_types: target_node_types.clone(),
1226 };
1227 self.catalog.register_or_replace_edge_type_def(def);
1228 #[cfg(feature = "wal")]
1229 {
1230 let props_for_wal: Vec<(String, String, bool)> = properties
1231 .iter()
1232 .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
1233 .collect();
1234 self.log_schema_wal(&WalRecord::CreateEdgeType {
1235 name: name.clone(),
1236 properties: props_for_wal,
1237 constraints: Vec::new(),
1238 });
1239 }
1240 if !edge_types.contains(name) {
1241 edge_types.push(name.clone());
1242 }
1243 }
1244 }
1245 }
1246
1247 let def = GraphTypeDefinition {
1248 name: stmt.name.clone(),
1249 allowed_node_types: node_types.clone(),
1250 allowed_edge_types: edge_types.clone(),
1251 open,
1252 };
1253 let result = if stmt.or_replace {
1254 let _ = self.catalog.drop_graph_type(&stmt.name);
1256 self.catalog.register_graph_type(def)
1257 } else {
1258 self.catalog.register_graph_type(def)
1259 };
1260 match result {
1261 Ok(()) => {
1262 wal_log!(
1263 self,
1264 WalRecord::CreateGraphType {
1265 name: stmt.name.clone(),
1266 node_types,
1267 edge_types,
1268 open,
1269 }
1270 );
1271 Ok(QueryResult::status(format!(
1272 "Created graph type '{}'",
1273 stmt.name
1274 )))
1275 }
1276 Err(e) if stmt.if_not_exists => {
1277 let _ = e;
1278 Ok(QueryResult::status("No change"))
1279 }
1280 Err(e) => Err(Error::Query(QueryError::new(
1281 QueryErrorKind::Semantic,
1282 e.to_string(),
1283 ))),
1284 }
1285 }
1286 SchemaStatement::DropGraphType { name, if_exists } => {
1287 match self.catalog.drop_graph_type(&name) {
1288 Ok(()) => {
1289 wal_log!(self, WalRecord::DropGraphType { name: name.clone() });
1290 Ok(QueryResult::status(format!("Dropped graph type '{name}'")))
1291 }
1292 Err(e) if if_exists => {
1293 let _ = e;
1294 Ok(QueryResult::status("No change"))
1295 }
1296 Err(e) => Err(Error::Query(QueryError::new(
1297 QueryErrorKind::Semantic,
1298 e.to_string(),
1299 ))),
1300 }
1301 }
1302 SchemaStatement::CreateSchema {
1303 name,
1304 if_not_exists,
1305 } => match self.catalog.register_schema_namespace(name.clone()) {
1306 Ok(()) => {
1307 wal_log!(self, WalRecord::CreateSchema { name: name.clone() });
1308 Ok(QueryResult::status(format!("Created schema '{name}'")))
1309 }
1310 Err(e) if if_not_exists => {
1311 let _ = e;
1312 Ok(QueryResult::status("No change"))
1313 }
1314 Err(e) => Err(Error::Query(QueryError::new(
1315 QueryErrorKind::Semantic,
1316 e.to_string(),
1317 ))),
1318 },
1319 SchemaStatement::DropSchema { name, if_exists } => {
1320 let prefix = format!("{name}/");
1322 let has_graphs = self
1323 .store
1324 .graph_names()
1325 .iter()
1326 .any(|g| g.starts_with(&prefix));
1327 if has_graphs {
1328 return Err(Error::Query(QueryError::new(
1329 QueryErrorKind::Semantic,
1330 format!(
1331 "Schema '{name}' is not empty: drop all graphs in the schema first"
1332 ),
1333 )));
1334 }
1335 match self.catalog.drop_schema_namespace(&name) {
1336 Ok(()) => {
1337 wal_log!(self, WalRecord::DropSchema { name: name.clone() });
1338 let mut current = self.current_schema.lock();
1340 if current
1341 .as_deref()
1342 .is_some_and(|s| s.eq_ignore_ascii_case(&name))
1343 {
1344 *current = None;
1345 }
1346 Ok(QueryResult::status(format!("Dropped schema '{name}'")))
1347 }
1348 Err(e) if if_exists => {
1349 let _ = e;
1350 Ok(QueryResult::status("No change"))
1351 }
1352 Err(e) => Err(Error::Query(QueryError::new(
1353 QueryErrorKind::Semantic,
1354 e.to_string(),
1355 ))),
1356 }
1357 }
1358 SchemaStatement::AlterNodeType(stmt) => {
1359 use grafeo_adapters::query::gql::ast::TypeAlteration;
1360 let mut wal_alts = Vec::new();
1361 for alt in &stmt.alterations {
1362 match alt {
1363 TypeAlteration::AddProperty(prop) => {
1364 let typed = TypedProperty {
1365 name: prop.name.clone(),
1366 data_type: PropertyDataType::from_type_name(&prop.data_type),
1367 nullable: prop.nullable,
1368 default_value: prop
1369 .default_value
1370 .as_ref()
1371 .map(|s| parse_default_literal(s)),
1372 };
1373 self.catalog
1374 .alter_node_type_add_property(&stmt.name, typed)
1375 .map_err(|e| {
1376 Error::Query(QueryError::new(
1377 QueryErrorKind::Semantic,
1378 e.to_string(),
1379 ))
1380 })?;
1381 wal_alts.push((
1382 "add".to_string(),
1383 prop.name.clone(),
1384 prop.data_type.clone(),
1385 prop.nullable,
1386 ));
1387 }
1388 TypeAlteration::DropProperty(name) => {
1389 self.catalog
1390 .alter_node_type_drop_property(&stmt.name, name)
1391 .map_err(|e| {
1392 Error::Query(QueryError::new(
1393 QueryErrorKind::Semantic,
1394 e.to_string(),
1395 ))
1396 })?;
1397 wal_alts.push(("drop".to_string(), name.clone(), String::new(), false));
1398 }
1399 }
1400 }
1401 wal_log!(
1402 self,
1403 WalRecord::AlterNodeType {
1404 name: stmt.name.clone(),
1405 alterations: wal_alts,
1406 }
1407 );
1408 Ok(QueryResult::status(format!(
1409 "Altered node type '{}'",
1410 stmt.name
1411 )))
1412 }
1413 SchemaStatement::AlterEdgeType(stmt) => {
1414 use grafeo_adapters::query::gql::ast::TypeAlteration;
1415 let mut wal_alts = Vec::new();
1416 for alt in &stmt.alterations {
1417 match alt {
1418 TypeAlteration::AddProperty(prop) => {
1419 let typed = TypedProperty {
1420 name: prop.name.clone(),
1421 data_type: PropertyDataType::from_type_name(&prop.data_type),
1422 nullable: prop.nullable,
1423 default_value: prop
1424 .default_value
1425 .as_ref()
1426 .map(|s| parse_default_literal(s)),
1427 };
1428 self.catalog
1429 .alter_edge_type_add_property(&stmt.name, typed)
1430 .map_err(|e| {
1431 Error::Query(QueryError::new(
1432 QueryErrorKind::Semantic,
1433 e.to_string(),
1434 ))
1435 })?;
1436 wal_alts.push((
1437 "add".to_string(),
1438 prop.name.clone(),
1439 prop.data_type.clone(),
1440 prop.nullable,
1441 ));
1442 }
1443 TypeAlteration::DropProperty(name) => {
1444 self.catalog
1445 .alter_edge_type_drop_property(&stmt.name, name)
1446 .map_err(|e| {
1447 Error::Query(QueryError::new(
1448 QueryErrorKind::Semantic,
1449 e.to_string(),
1450 ))
1451 })?;
1452 wal_alts.push(("drop".to_string(), name.clone(), String::new(), false));
1453 }
1454 }
1455 }
1456 wal_log!(
1457 self,
1458 WalRecord::AlterEdgeType {
1459 name: stmt.name.clone(),
1460 alterations: wal_alts,
1461 }
1462 );
1463 Ok(QueryResult::status(format!(
1464 "Altered edge type '{}'",
1465 stmt.name
1466 )))
1467 }
1468 SchemaStatement::AlterGraphType(stmt) => {
1469 use grafeo_adapters::query::gql::ast::GraphTypeAlteration;
1470 let mut wal_alts = Vec::new();
1471 for alt in &stmt.alterations {
1472 match alt {
1473 GraphTypeAlteration::AddNodeType(name) => {
1474 self.catalog
1475 .alter_graph_type_add_node_type(&stmt.name, name.clone())
1476 .map_err(|e| {
1477 Error::Query(QueryError::new(
1478 QueryErrorKind::Semantic,
1479 e.to_string(),
1480 ))
1481 })?;
1482 wal_alts.push(("add_node_type".to_string(), name.clone()));
1483 }
1484 GraphTypeAlteration::DropNodeType(name) => {
1485 self.catalog
1486 .alter_graph_type_drop_node_type(&stmt.name, name)
1487 .map_err(|e| {
1488 Error::Query(QueryError::new(
1489 QueryErrorKind::Semantic,
1490 e.to_string(),
1491 ))
1492 })?;
1493 wal_alts.push(("drop_node_type".to_string(), name.clone()));
1494 }
1495 GraphTypeAlteration::AddEdgeType(name) => {
1496 self.catalog
1497 .alter_graph_type_add_edge_type(&stmt.name, name.clone())
1498 .map_err(|e| {
1499 Error::Query(QueryError::new(
1500 QueryErrorKind::Semantic,
1501 e.to_string(),
1502 ))
1503 })?;
1504 wal_alts.push(("add_edge_type".to_string(), name.clone()));
1505 }
1506 GraphTypeAlteration::DropEdgeType(name) => {
1507 self.catalog
1508 .alter_graph_type_drop_edge_type(&stmt.name, name)
1509 .map_err(|e| {
1510 Error::Query(QueryError::new(
1511 QueryErrorKind::Semantic,
1512 e.to_string(),
1513 ))
1514 })?;
1515 wal_alts.push(("drop_edge_type".to_string(), name.clone()));
1516 }
1517 }
1518 }
1519 wal_log!(
1520 self,
1521 WalRecord::AlterGraphType {
1522 name: stmt.name.clone(),
1523 alterations: wal_alts,
1524 }
1525 );
1526 Ok(QueryResult::status(format!(
1527 "Altered graph type '{}'",
1528 stmt.name
1529 )))
1530 }
1531 SchemaStatement::CreateProcedure(stmt) => {
1532 use crate::catalog::ProcedureDefinition;
1533
1534 let def = ProcedureDefinition {
1535 name: stmt.name.clone(),
1536 params: stmt
1537 .params
1538 .iter()
1539 .map(|p| (p.name.clone(), p.param_type.clone()))
1540 .collect(),
1541 returns: stmt
1542 .returns
1543 .iter()
1544 .map(|r| (r.name.clone(), r.return_type.clone()))
1545 .collect(),
1546 body: stmt.body.clone(),
1547 };
1548
1549 if stmt.or_replace {
1550 self.catalog.replace_procedure(def).map_err(|e| {
1551 Error::Query(QueryError::new(QueryErrorKind::Semantic, e.to_string()))
1552 })?;
1553 } else {
1554 match self.catalog.register_procedure(def) {
1555 Ok(()) => {}
1556 Err(_) if stmt.if_not_exists => {
1557 return Ok(QueryResult::empty());
1558 }
1559 Err(e) => {
1560 return Err(Error::Query(QueryError::new(
1561 QueryErrorKind::Semantic,
1562 e.to_string(),
1563 )));
1564 }
1565 }
1566 }
1567
1568 wal_log!(
1569 self,
1570 WalRecord::CreateProcedure {
1571 name: stmt.name.clone(),
1572 params: stmt
1573 .params
1574 .iter()
1575 .map(|p| (p.name.clone(), p.param_type.clone()))
1576 .collect(),
1577 returns: stmt
1578 .returns
1579 .iter()
1580 .map(|r| (r.name.clone(), r.return_type.clone()))
1581 .collect(),
1582 body: stmt.body,
1583 }
1584 );
1585 Ok(QueryResult::status(format!(
1586 "Created procedure '{}'",
1587 stmt.name
1588 )))
1589 }
1590 SchemaStatement::DropProcedure { name, if_exists } => {
1591 match self.catalog.drop_procedure(&name) {
1592 Ok(()) => {}
1593 Err(_) if if_exists => {
1594 return Ok(QueryResult::empty());
1595 }
1596 Err(e) => {
1597 return Err(Error::Query(QueryError::new(
1598 QueryErrorKind::Semantic,
1599 e.to_string(),
1600 )));
1601 }
1602 }
1603 wal_log!(self, WalRecord::DropProcedure { name: name.clone() });
1604 Ok(QueryResult::status(format!("Dropped procedure '{name}'")))
1605 }
1606 SchemaStatement::ShowIndexes => {
1607 return self.execute_show_indexes();
1608 }
1609 SchemaStatement::ShowConstraints => {
1610 return self.execute_show_constraints();
1611 }
1612 SchemaStatement::ShowNodeTypes => {
1613 return self.execute_show_node_types();
1614 }
1615 SchemaStatement::ShowEdgeTypes => {
1616 return self.execute_show_edge_types();
1617 }
1618 SchemaStatement::ShowGraphTypes => {
1619 return self.execute_show_graph_types();
1620 }
1621 SchemaStatement::ShowGraphType(name) => {
1622 return self.execute_show_graph_type(&name);
1623 }
1624 SchemaStatement::ShowCurrentGraphType => {
1625 return self.execute_show_current_graph_type();
1626 }
1627 SchemaStatement::ShowGraphs => {
1628 return self.execute_show_graphs();
1629 }
1630 SchemaStatement::ShowSchemas => {
1631 return self.execute_show_schemas();
1632 }
1633 };
1634
1635 if result.is_ok() {
1638 self.query_cache.clear();
1639 }
1640
1641 result
1642 }
1643
/// Builds an HNSW vector index over `property` for the nodes labelled `label`
/// and registers it on `store`.
///
/// When `dimensions` is `None`, the dimensionality is taken from the first
/// vector encountered; every later vector must have the same length. When
/// `metric` is `None`, cosine distance is used.
///
/// # Errors
///
/// Returns `Error::Internal` when `metric` is not a recognised name, when a
/// stored vector's length differs from the expected dimensionality, or when
/// no dimensionality is known (no vectors found and none specified).
#[cfg(all(feature = "gql", feature = "vector-index"))]
fn create_vector_index_on_store(
    store: &LpgStore,
    label: &str,
    property: &str,
    dimensions: Option<usize>,
    metric: Option<&str>,
) -> Result<()> {
    use grafeo_common::types::{PropertyKey, Value};
    use grafeo_common::utils::error::Error;
    use grafeo_core::index::vector::{DistanceMetric, HnswConfig, HnswIndex};

    // Resolve the metric first so an unknown name fails before any node scan.
    let distance = if let Some(requested) = metric {
        let Some(parsed) = DistanceMetric::from_str(requested) else {
            return Err(Error::Internal(format!(
                "Unknown distance metric '{requested}'. Use: cosine, euclidean, dot_product, manhattan"
            )));
        };
        parsed
    } else {
        DistanceMetric::Cosine
    };

    let key = PropertyKey::new(property);
    let mut dims = dimensions;
    let mut collected: Vec<(grafeo_common::types::NodeId, Vec<f32>)> = Vec::new();

    for node in store.nodes_with_label(label) {
        // Nodes without a vector value under `property` are not indexed.
        let Some(Value::Vector(embedding)) = node.properties.get(&key) else {
            continue;
        };
        match dims {
            Some(expected) if embedding.len() != expected => {
                return Err(Error::Internal(format!(
                    "Vector dimension mismatch: expected {expected}, found {} on node {}",
                    embedding.len(),
                    node.id.0
                )));
            }
            Some(_) => {}
            // First vector seen fixes the dimensionality for the rest.
            None => dims = Some(embedding.len()),
        }
        collected.push((node.id, embedding.to_vec()));
    }

    let Some(dims) = dims else {
        return Err(Error::Internal(format!(
            "No vector properties found on :{label}({property}) and no dimensions specified"
        )));
    };

    let index = HnswIndex::with_capacity(HnswConfig::new(dims, distance), collected.len());
    let accessor = grafeo_core::index::vector::PropertyVectorAccessor::new(store, property);
    for (node_id, embedding) in &collected {
        index.insert(*node_id, embedding, &accessor);
    }

    store.add_vector_index(label, property, Arc::new(index));
    Ok(())
}
1703
/// Fallback used when the `vector-index` feature is disabled: always fails
/// with an `Error::Internal` naming the missing feature.
#[cfg(all(feature = "gql", not(feature = "vector-index")))]
fn create_vector_index_on_store(
    _store: &LpgStore,
    _label: &str,
    _property: &str,
    _dimensions: Option<usize>,
    _metric: Option<&str>,
) -> Result<()> {
    use grafeo_common::utils::error::Error;

    let message = String::from("Vector index support requires the 'vector-index' feature");
    Err(Error::Internal(message))
}
1717
/// Builds an inverted text index (default BM25 configuration) over `property`
/// for the nodes labelled `label` and registers it on `store`.
///
/// Only string-valued properties are inserted; nodes whose property is
/// missing or of another type are skipped.
#[cfg(all(feature = "gql", feature = "text-index"))]
fn create_text_index_on_store(store: &LpgStore, label: &str, property: &str) -> Result<()> {
    use grafeo_common::types::{PropertyKey, Value};
    use grafeo_core::index::text::{BM25Config, InvertedIndex};

    let mut inverted = InvertedIndex::new(BM25Config::default());
    let key = PropertyKey::new(property);

    for node_id in store.nodes_by_label(label) {
        let Some(Value::String(text)) = store.get_node_property(node_id, &key) else {
            continue;
        };
        inverted.insert(node_id, text.as_str());
    }

    let shared = Arc::new(parking_lot::RwLock::new(inverted));
    store.add_text_index(label, property, shared);
    Ok(())
}
1737
/// Fallback used when the `text-index` feature is disabled: always fails with
/// an `Error::Internal` naming the missing feature.
#[cfg(all(feature = "gql", not(feature = "text-index")))]
fn create_text_index_on_store(_store: &LpgStore, _label: &str, _property: &str) -> Result<()> {
    use grafeo_common::utils::error::Error;

    let message = String::from("Text index support requires the 'text-index' feature");
    Err(Error::Internal(message))
}
1745
1746 fn execute_show_indexes(&self) -> Result<QueryResult> {
1748 let indexes = self.catalog.all_indexes();
1749 let columns = vec![
1750 "name".to_string(),
1751 "type".to_string(),
1752 "label".to_string(),
1753 "property".to_string(),
1754 ];
1755 let rows: Vec<Vec<Value>> = indexes
1756 .into_iter()
1757 .map(|def| {
1758 let label_name = self
1759 .catalog
1760 .get_label_name(def.label)
1761 .unwrap_or_else(|| "?".into());
1762 let prop_name = self
1763 .catalog
1764 .get_property_key_name(def.property_key)
1765 .unwrap_or_else(|| "?".into());
1766 vec![
1767 Value::from(format!("idx_{}_{}", label_name, prop_name)),
1768 Value::from(format!("{:?}", def.index_type)),
1769 Value::from(&*label_name),
1770 Value::from(&*prop_name),
1771 ]
1772 })
1773 .collect();
1774 Ok(QueryResult {
1775 columns,
1776 column_types: Vec::new(),
1777 rows,
1778 ..QueryResult::empty()
1779 })
1780 }
1781
1782 fn execute_show_constraints(&self) -> Result<QueryResult> {
1784 Ok(QueryResult {
1787 columns: vec![
1788 "name".to_string(),
1789 "type".to_string(),
1790 "label".to_string(),
1791 "properties".to_string(),
1792 ],
1793 column_types: Vec::new(),
1794 rows: Vec::new(),
1795 ..QueryResult::empty()
1796 })
1797 }
1798
1799 fn execute_show_node_types(&self) -> Result<QueryResult> {
1801 let columns = vec![
1802 "name".to_string(),
1803 "properties".to_string(),
1804 "constraints".to_string(),
1805 "parents".to_string(),
1806 ];
1807 let type_names = self.catalog.all_node_type_names();
1808 let rows: Vec<Vec<Value>> = type_names
1809 .into_iter()
1810 .filter_map(|name| {
1811 let def = self.catalog.get_node_type(&name)?;
1812 let props: Vec<String> = def
1813 .properties
1814 .iter()
1815 .map(|p| {
1816 let nullable = if p.nullable { "" } else { " NOT NULL" };
1817 format!("{} {}{}", p.name, p.data_type, nullable)
1818 })
1819 .collect();
1820 let constraints: Vec<String> =
1821 def.constraints.iter().map(|c| format!("{c:?}")).collect();
1822 let parents = def.parent_types.join(", ");
1823 Some(vec![
1824 Value::from(name),
1825 Value::from(props.join(", ")),
1826 Value::from(constraints.join(", ")),
1827 Value::from(parents),
1828 ])
1829 })
1830 .collect();
1831 Ok(QueryResult {
1832 columns,
1833 column_types: Vec::new(),
1834 rows,
1835 ..QueryResult::empty()
1836 })
1837 }
1838
1839 fn execute_show_edge_types(&self) -> Result<QueryResult> {
1841 let columns = vec![
1842 "name".to_string(),
1843 "properties".to_string(),
1844 "source_types".to_string(),
1845 "target_types".to_string(),
1846 ];
1847 let type_names = self.catalog.all_edge_type_names();
1848 let rows: Vec<Vec<Value>> = type_names
1849 .into_iter()
1850 .filter_map(|name| {
1851 let def = self.catalog.get_edge_type_def(&name)?;
1852 let props: Vec<String> = def
1853 .properties
1854 .iter()
1855 .map(|p| {
1856 let nullable = if p.nullable { "" } else { " NOT NULL" };
1857 format!("{} {}{}", p.name, p.data_type, nullable)
1858 })
1859 .collect();
1860 let src = def.source_node_types.join(", ");
1861 let tgt = def.target_node_types.join(", ");
1862 Some(vec![
1863 Value::from(name),
1864 Value::from(props.join(", ")),
1865 Value::from(src),
1866 Value::from(tgt),
1867 ])
1868 })
1869 .collect();
1870 Ok(QueryResult {
1871 columns,
1872 column_types: Vec::new(),
1873 rows,
1874 ..QueryResult::empty()
1875 })
1876 }
1877
1878 fn execute_show_graph_types(&self) -> Result<QueryResult> {
1880 let columns = vec![
1881 "name".to_string(),
1882 "open".to_string(),
1883 "node_types".to_string(),
1884 "edge_types".to_string(),
1885 ];
1886 let type_names = self.catalog.all_graph_type_names();
1887 let rows: Vec<Vec<Value>> = type_names
1888 .into_iter()
1889 .filter_map(|name| {
1890 let def = self.catalog.get_graph_type_def(&name)?;
1891 Some(vec![
1892 Value::from(name),
1893 Value::from(def.open),
1894 Value::from(def.allowed_node_types.join(", ")),
1895 Value::from(def.allowed_edge_types.join(", ")),
1896 ])
1897 })
1898 .collect();
1899 Ok(QueryResult {
1900 columns,
1901 column_types: Vec::new(),
1902 rows,
1903 ..QueryResult::empty()
1904 })
1905 }
1906
1907 fn execute_show_graphs(&self) -> Result<QueryResult> {
1913 let schema = self.current_schema.lock().clone();
1914 let all_names = self.store.graph_names();
1915
1916 let mut names: Vec<String> = match &schema {
1917 Some(s) => {
1918 let prefix = format!("{s}/");
1919 all_names
1920 .into_iter()
1921 .filter_map(|n| n.strip_prefix(&prefix).map(String::from))
1922 .collect()
1923 }
1924 None => all_names.into_iter().filter(|n| !n.contains('/')).collect(),
1925 };
1926 names.sort();
1927
1928 let rows: Vec<Vec<Value>> = names.into_iter().map(|n| vec![Value::from(n)]).collect();
1929 Ok(QueryResult {
1930 columns: vec!["name".to_string()],
1931 column_types: Vec::new(),
1932 rows,
1933 ..QueryResult::empty()
1934 })
1935 }
1936
1937 fn execute_show_schemas(&self) -> Result<QueryResult> {
1939 let mut names = self.catalog.schema_names();
1940 names.sort();
1941 let rows: Vec<Vec<Value>> = names.into_iter().map(|n| vec![Value::from(n)]).collect();
1942 Ok(QueryResult {
1943 columns: vec!["name".to_string()],
1944 column_types: Vec::new(),
1945 rows,
1946 ..QueryResult::empty()
1947 })
1948 }
1949
1950 fn execute_show_graph_type(&self, name: &str) -> Result<QueryResult> {
1952 use grafeo_common::utils::error::{Error, QueryError, QueryErrorKind};
1953
1954 let def = self.catalog.get_graph_type_def(name).ok_or_else(|| {
1955 Error::Query(QueryError::new(
1956 QueryErrorKind::Semantic,
1957 format!("Graph type '{name}' not found"),
1958 ))
1959 })?;
1960
1961 let columns = vec![
1962 "name".to_string(),
1963 "open".to_string(),
1964 "node_types".to_string(),
1965 "edge_types".to_string(),
1966 ];
1967 let rows = vec![vec![
1968 Value::from(def.name),
1969 Value::from(def.open),
1970 Value::from(def.allowed_node_types.join(", ")),
1971 Value::from(def.allowed_edge_types.join(", ")),
1972 ]];
1973 Ok(QueryResult {
1974 columns,
1975 column_types: Vec::new(),
1976 rows,
1977 ..QueryResult::empty()
1978 })
1979 }
1980
1981 fn execute_show_current_graph_type(&self) -> Result<QueryResult> {
1983 let graph_name = self
1984 .current_graph()
1985 .unwrap_or_else(|| "default".to_string());
1986 let columns = vec![
1987 "graph".to_string(),
1988 "graph_type".to_string(),
1989 "open".to_string(),
1990 "node_types".to_string(),
1991 "edge_types".to_string(),
1992 ];
1993
1994 if let Some(type_name) = self.catalog.get_graph_type_binding(&graph_name)
1995 && let Some(def) = self.catalog.get_graph_type_def(&type_name)
1996 {
1997 let rows = vec![vec![
1998 Value::from(graph_name),
1999 Value::from(type_name),
2000 Value::from(def.open),
2001 Value::from(def.allowed_node_types.join(", ")),
2002 Value::from(def.allowed_edge_types.join(", ")),
2003 ]];
2004 return Ok(QueryResult {
2005 columns,
2006 column_types: Vec::new(),
2007 rows,
2008 ..QueryResult::empty()
2009 });
2010 }
2011
2012 Ok(QueryResult {
2014 columns,
2015 column_types: Vec::new(),
2016 rows: vec![vec![
2017 Value::from(graph_name),
2018 Value::Null,
2019 Value::Null,
2020 Value::Null,
2021 Value::Null,
2022 ]],
2023 ..QueryResult::empty()
2024 })
2025 }
2026
/// Executes a GQL query and returns its result.
///
/// Steps, in order:
/// 1. Run the `require_lpg("GQL")` check.
/// 2. Translate the query. Session commands and schema commands are
///    dispatched immediately. While the session's read-only flag is set,
///    schema commands and plans containing mutations are refused.
/// 3. Fetch the optimized plan from the query cache (keyed by query text,
///    language and current graph); on a miss, bind, optimize and cache it.
/// 4. If the plan's `explain` flag is set, return the annotated plan without
///    executing it. If its `profile` flag is set, execute it and return the
///    profile tree instead of the rows.
/// 5. Otherwise plan and execute inside `with_auto_commit`, recording
///    `execution_time_ms` (non-wasm targets only) and `rows_scanned` (the
///    number of result rows) on the result.
///
/// # Errors
///
/// Returns `TransactionError::ReadOnly` for DDL or mutating plans while the
/// read-only flag is set, and propagates translation, binding, optimization,
/// planning and execution errors.
#[cfg(feature = "gql")]
pub fn execute(&self, query: &str) -> Result<QueryResult> {
    use crate::query::{
        Executor, binder::Binder, cache::CacheKey, optimizer::Optimizer,
        processor::QueryLanguage, translators::gql,
    };
    use grafeo_common::utils::error::{Error, TransactionError};

    self.require_lpg("GQL")?;

    let _span = tracing::info_span!(
        "grafeo::session::execute",
        language = "gql",
        query_len = query.len(),
    )
    .entered();

    #[cfg(not(target_arch = "wasm32"))]
    let started_at = std::time::Instant::now();

    // Anything that is not a plain query plan is handled right here.
    let logical_plan = match gql::translate_full(query)? {
        gql::GqlTranslationResult::SessionCommand(command) => {
            return self.execute_session_command(command);
        }
        gql::GqlTranslationResult::SchemaCommand(command) => {
            // Copy the flag out so the lock is not held while running DDL.
            let read_only = *self.read_only_tx.lock();
            if read_only {
                return Err(Error::Transaction(TransactionError::ReadOnly));
            }
            return self.execute_schema_command(command);
        }
        gql::GqlTranslationResult::Plan(plan) => {
            if *self.read_only_tx.lock() && plan.root.has_mutations() {
                return Err(Error::Transaction(TransactionError::ReadOnly));
            }
            plan
        }
    };

    let cache_key = CacheKey::with_graph(query, QueryLanguage::Gql, self.current_graph());

    let cached = self.query_cache.get_optimized(&cache_key);
    let optimized_plan = match cached {
        Some(hit) => hit,
        None => {
            // Cache miss: validate the plan, optimize it, then remember it.
            let mut binder = Binder::new();
            let _binding_context = binder.bind(&logical_plan)?;

            let stats_store = self.active_store();
            let optimizer = Optimizer::from_graph_store(&*stats_store);
            let fresh = optimizer.optimize(logical_plan)?;

            self.query_cache.put_optimized(cache_key, fresh.clone());

            fresh
        }
    };

    let active = self.active_store();

    if optimized_plan.explain {
        use crate::query::processor::{annotate_pushdown_hints, explain_result};
        let mut annotated = optimized_plan;
        annotate_pushdown_hints(&mut annotated.root, active.as_ref());
        return Ok(explain_result(&annotated));
    }

    if optimized_plan.profile {
        let mutates = optimized_plan.root.has_mutations();
        return self.with_auto_commit(mutates, || {
            let (viewing_epoch, transaction_id) = self.get_transaction_context();
            let planner = self.create_planner_for_store(
                Arc::clone(&active),
                viewing_epoch,
                transaction_id,
            );
            let (mut physical_plan, entries) = planner.plan_profiled(&optimized_plan)?;

            let executor = Executor::with_columns(physical_plan.columns.clone())
                .with_deadline(self.query_deadline());
            // The rows themselves are discarded; only the profile is returned.
            let _rows = executor.execute(physical_plan.operator.as_mut())?;

            #[cfg(not(target_arch = "wasm32"))]
            let total_time_ms = started_at.elapsed().as_secs_f64() * 1000.0;
            #[cfg(target_arch = "wasm32")]
            let total_time_ms = 0.0;

            let profile_tree = crate::query::profile::build_profile_tree(
                &optimized_plan.root,
                &mut entries.into_iter(),
            );
            Ok(crate::query::profile::profile_result(
                &profile_tree,
                total_time_ms,
            ))
        });
    }

    let has_mutations = optimized_plan.root.has_mutations();

    let result = self.with_auto_commit(has_mutations, || {
        let (viewing_epoch, transaction_id) = self.get_transaction_context();

        // The planner is told the query is read-only only when it neither
        // mutates nor runs inside an explicit transaction.
        let in_transaction = self.current_transaction.lock().is_some();
        let read_only = !(has_mutations || in_transaction);
        let planner = self.create_planner_for_store_with_read_only(
            Arc::clone(&active),
            viewing_epoch,
            transaction_id,
            read_only,
        );
        let mut physical_plan = planner.plan(&optimized_plan)?;

        let executor = Executor::with_columns(physical_plan.columns.clone())
            .with_deadline(self.query_deadline());
        let mut outcome = executor.execute(physical_plan.operator.as_mut())?;

        #[cfg(not(target_arch = "wasm32"))]
        {
            outcome.execution_time_ms = Some(started_at.elapsed().as_secs_f64() * 1000.0);
        }
        outcome.rows_scanned = Some(outcome.rows.len() as u64);

        Ok(outcome)
    });

    #[cfg(feature = "metrics")]
    {
        #[cfg(not(target_arch = "wasm32"))]
        let elapsed_ms = Some(started_at.elapsed().as_secs_f64() * 1000.0);
        #[cfg(target_arch = "wasm32")]
        let elapsed_ms = None;
        self.record_query_metrics("gql", elapsed_ms, &result);
    }

    result
}
2217
/// Executes a GQL query with the session's viewing-epoch override set to
/// `epoch` for the duration of the call.
///
/// The previous override value is put back when the call finishes. The
/// restore is performed by a drop guard, so it also happens if `execute`
/// unwinds; otherwise a panic caught further up would leave the override in
/// place for later queries on this session.
///
/// # Errors
///
/// Returns whatever [`Self::execute`] returns for `query`.
#[cfg(feature = "gql")]
pub fn execute_at_epoch(&self, query: &str, epoch: EpochId) -> Result<QueryResult> {
    /// Runs the wrapped closure when dropped (normal return or unwinding).
    struct RunOnDrop<F: FnMut()>(F);

    impl<F: FnMut()> Drop for RunOnDrop<F> {
        fn drop(&mut self) {
            (self.0)();
        }
    }

    // Wrapped in `Some` so the guard can move the saved value out exactly once.
    let mut saved = Some(self.viewing_epoch_override.lock().replace(epoch));
    let _restore = RunOnDrop(|| {
        if let Some(previous) = saved.take() {
            *self.viewing_epoch_override.lock() = previous;
        }
    });

    self.execute(query)
}
2233
/// Executes a GQL query with named parameters.
///
/// Runs the `require_lpg("GQL")` check, then processes the query through a
/// `QueryProcessor` inside `with_auto_commit`. Whether the query counts as a
/// mutation is decided from its text by `Self::query_looks_like_mutation`.
/// When the session has a transaction id, the processor is given the
/// session's viewing epoch and that id.
///
/// # Errors
///
/// Propagates errors from the LPG check, processor construction and query
/// processing.
#[cfg(feature = "gql")]
pub fn execute_with_params(
    &self,
    query: &str,
    params: std::collections::HashMap<String, Value>,
) -> Result<QueryResult> {
    use crate::query::processor::{QueryLanguage, QueryProcessor};

    self.require_lpg("GQL")?;

    let has_mutations = Self::query_looks_like_mutation(query);
    let active = self.active_store();

    self.with_auto_commit(has_mutations, || {
        let (viewing_epoch, transaction_id) = self.get_transaction_context();

        let base = QueryProcessor::for_graph_store_with_transaction(
            Arc::clone(&active),
            Arc::clone(&self.transaction_manager),
        )?;

        // Attach the transaction context only when a transaction id exists.
        let processor = match transaction_id {
            Some(id) => base.with_transaction_context(viewing_epoch, id),
            None => base,
        };

        processor.process(query, QueryLanguage::Gql, Some(&params))
    })
}
2272
2273 #[cfg(not(any(feature = "gql", feature = "cypher")))]
2279 pub fn execute_with_params(
2280 &self,
2281 _query: &str,
2282 _params: std::collections::HashMap<String, Value>,
2283 ) -> Result<QueryResult> {
2284 Err(grafeo_common::utils::error::Error::Internal(
2285 "No query language enabled".to_string(),
2286 ))
2287 }
2288
2289 #[cfg(not(any(feature = "gql", feature = "cypher")))]
2295 pub fn execute(&self, _query: &str) -> Result<QueryResult> {
2296 Err(grafeo_common::utils::error::Error::Internal(
2297 "No query language enabled".to_string(),
2298 ))
2299 }
2300
/// Executes a Cypher query and returns its result.
///
/// Steps, in order:
/// 1. Translate the query. Schema commands and the `SHOW` variants are
///    dispatched immediately; schema commands are refused with a semantic
///    error while the session's read-only flag is set.
/// 2. Fetch the optimized plan from the query cache (keyed by query text,
///    language and current graph); on a miss, translate, bind, optimize and
///    cache it.
/// 3. Refuse plans containing mutations while the read-only flag is set,
///    using the same `TransactionError::ReadOnly` error as the GQL `execute`
///    path.
/// 4. If the plan's `explain` flag is set, return the annotated plan without
///    executing it. If its `profile` flag is set, execute it and return the
///    profile tree instead of the rows.
/// 5. Otherwise plan and execute inside `with_auto_commit`.
///
/// # Errors
///
/// Returns a semantic query error for DDL, or `TransactionError::ReadOnly`
/// for mutating plans, while the read-only flag is set; propagates
/// translation, binding, optimization, planning and execution errors.
#[cfg(feature = "cypher")]
pub fn execute_cypher(&self, query: &str) -> Result<QueryResult> {
    use crate::query::{
        Executor, binder::Binder, cache::CacheKey, optimizer::Optimizer,
        processor::QueryLanguage, translators::cypher,
    };
    use grafeo_common::utils::error::{
        Error as GrafeoError, QueryError, QueryErrorKind, TransactionError,
    };

    // Dispatch everything that is not a regular query plan up front.
    let translation = cypher::translate_full(query)?;
    match translation {
        cypher::CypherTranslationResult::SchemaCommand(cmd) => {
            if *self.read_only_tx.lock() {
                return Err(GrafeoError::Query(QueryError::new(
                    QueryErrorKind::Semantic,
                    "Cannot execute schema DDL in a read-only transaction",
                )));
            }
            return self.execute_schema_command(cmd);
        }
        cypher::CypherTranslationResult::ShowIndexes => {
            return self.execute_show_indexes();
        }
        cypher::CypherTranslationResult::ShowConstraints => {
            return self.execute_show_constraints();
        }
        cypher::CypherTranslationResult::ShowCurrentGraphType => {
            return self.execute_show_current_graph_type();
        }
        cypher::CypherTranslationResult::Plan(_) => {
            // Regular query: continue with the cached-plan pipeline below.
            // NOTE(review): the plan carried by this variant is discarded and
            // the query is translated again on a cache miss.
        }
    }

    #[cfg(not(target_arch = "wasm32"))]
    let start_time = std::time::Instant::now();

    let cache_key = CacheKey::with_graph(query, QueryLanguage::Cypher, self.current_graph());

    let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
        cached_plan
    } else {
        let logical_plan = cypher::translate(query)?;

        let mut binder = Binder::new();
        let _binding_context = binder.bind(&logical_plan)?;

        let active = self.active_store();
        let optimizer = Optimizer::from_graph_store(&*active);
        let plan = optimizer.optimize(logical_plan)?;

        self.query_cache.put_optimized(cache_key, plan.clone());

        plan
    };

    // Mutations are not allowed while the session is marked read-only. The
    // check runs before the explain/profile branches, as in the GQL path.
    if *self.read_only_tx.lock() && optimized_plan.root.has_mutations() {
        return Err(GrafeoError::Transaction(TransactionError::ReadOnly));
    }

    let active = self.active_store();

    if optimized_plan.explain {
        use crate::query::processor::{annotate_pushdown_hints, explain_result};
        let mut plan = optimized_plan;
        annotate_pushdown_hints(&mut plan.root, active.as_ref());
        return Ok(explain_result(&plan));
    }

    if optimized_plan.profile {
        let has_mutations = optimized_plan.root.has_mutations();
        return self.with_auto_commit(has_mutations, || {
            let (viewing_epoch, transaction_id) = self.get_transaction_context();
            let planner = self.create_planner_for_store(
                Arc::clone(&active),
                viewing_epoch,
                transaction_id,
            );
            let (mut physical_plan, entries) = planner.plan_profiled(&optimized_plan)?;

            let executor = Executor::with_columns(physical_plan.columns.clone())
                .with_deadline(self.query_deadline());
            // The rows themselves are discarded; only the profile is returned.
            let _result = executor.execute(physical_plan.operator.as_mut())?;

            #[cfg(not(target_arch = "wasm32"))]
            let total_time_ms = start_time.elapsed().as_secs_f64() * 1000.0;
            #[cfg(target_arch = "wasm32")]
            let total_time_ms = 0.0;

            let profile_tree = crate::query::profile::build_profile_tree(
                &optimized_plan.root,
                &mut entries.into_iter(),
            );
            Ok(crate::query::profile::profile_result(
                &profile_tree,
                total_time_ms,
            ))
        });
    }

    let has_mutations = optimized_plan.root.has_mutations();

    let result = self.with_auto_commit(has_mutations, || {
        let (viewing_epoch, transaction_id) = self.get_transaction_context();

        let planner =
            self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
        let mut physical_plan = planner.plan(&optimized_plan)?;

        let executor = Executor::with_columns(physical_plan.columns.clone())
            .with_deadline(self.query_deadline());
        executor.execute(physical_plan.operator.as_mut())
    });

    #[cfg(feature = "metrics")]
    {
        #[cfg(not(target_arch = "wasm32"))]
        let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
        #[cfg(target_arch = "wasm32")]
        let elapsed_ms = None;
        self.record_query_metrics("cypher", elapsed_ms, &result);
    }

    result
}
2444
/// Executes a Gremlin query and returns its result.
///
/// The query is translated, bound and optimized on every call (no plan cache
/// is consulted here), then planned and executed inside `with_auto_commit`.
/// With the `metrics` feature enabled, the outcome is reported through
/// `record_query_metrics`; the elapsed time is only measured on non-wasm
/// targets.
///
/// # Errors
///
/// Propagates translation, binding, optimization, planning and execution
/// errors.
#[cfg(feature = "gremlin")]
pub fn execute_gremlin(&self, query: &str) -> Result<QueryResult> {
    use crate::query::{Executor, binder::Binder, optimizer::Optimizer, translators::gremlin};

    #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
    let started_at = Instant::now();

    let translated = gremlin::translate(query)?;

    let mut binder = Binder::new();
    let _binding_context = binder.bind(&translated)?;

    let active = self.active_store();
    let optimizer = Optimizer::from_graph_store(&*active);
    let optimized_plan = optimizer.optimize(translated)?;

    let mutates = optimized_plan.root.has_mutations();
    let outcome = self.with_auto_commit(mutates, || {
        let (viewing_epoch, transaction_id) = self.get_transaction_context();

        let planner =
            self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
        let mut physical_plan = planner.plan(&optimized_plan)?;

        let executor = Executor::with_columns(physical_plan.columns.clone())
            .with_deadline(self.query_deadline());
        executor.execute(physical_plan.operator.as_mut())
    });

    #[cfg(feature = "metrics")]
    {
        #[cfg(not(target_arch = "wasm32"))]
        let elapsed_ms = Some(started_at.elapsed().as_secs_f64() * 1000.0);
        #[cfg(target_arch = "wasm32")]
        let elapsed_ms = None;
        self.record_query_metrics("gremlin", elapsed_ms, &outcome);
    }

    outcome
}
2515
#[cfg(feature = "gremlin")]
/// Runs a parameterized Gremlin query through the `QueryProcessor`.
///
/// Whether the query mutates is guessed from its text with
/// `query_looks_like_mutation`; that flag is handed to `with_auto_commit`.
/// If a transaction is active, the processor is given its viewing epoch and
/// transaction id before processing.
///
/// # Errors
///
/// Propagates errors from processor construction, query processing, or the
/// implicit commit.
pub fn execute_gremlin_with_params(
    &self,
    query: &str,
    params: std::collections::HashMap<String, Value>,
) -> Result<QueryResult> {
    use crate::query::processor::{QueryLanguage, QueryProcessor};

    #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
    let start_time = Instant::now();

    let has_mutations = Self::query_looks_like_mutation(query);
    let active = self.active_store();

    let result = self.with_auto_commit(has_mutations, || {
        let (viewing_epoch, transaction_id) = self.get_transaction_context();
        let processor = QueryProcessor::for_graph_store_with_transaction(
            Arc::clone(&active),
            Arc::clone(&self.transaction_manager),
        )?;
        // Only attach a transaction context when one is actually active.
        let processor = match transaction_id {
            Some(transaction_id) => {
                processor.with_transaction_context(viewing_epoch, transaction_id)
            }
            None => processor,
        };
        // The parameter map is borrowed for the duration of the call.
        processor.process(query, QueryLanguage::Gremlin, Some(&params))
    });

    #[cfg(feature = "metrics")]
    {
        #[cfg(not(target_arch = "wasm32"))]
        let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
        #[cfg(target_arch = "wasm32")]
        let elapsed_ms = None;
        self.record_query_metrics("gremlin", elapsed_ms, &result);
    }

    result
}
2560
#[cfg(feature = "graphql")]
/// Runs a GraphQL query against the session's active graph store.
///
/// The query text is translated into a logical plan, bound, optimized, then
/// planned and executed. Execution is routed through `with_auto_commit`
/// together with the optimized plan's `has_mutations()` flag.
///
/// # Errors
///
/// Propagates any error from translation, binding, optimization, planning,
/// execution, or the implicit commit.
pub fn execute_graphql(&self, query: &str) -> Result<QueryResult> {
    use crate::query::{Executor, binder::Binder, optimizer::Optimizer, translators::graphql};

    #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
    let started_at = Instant::now();

    // Front end: text -> logical plan -> semantic binding.
    let translated = graphql::translate(query)?;
    let mut binder = Binder::new();
    let _bound = binder.bind(&translated)?;

    // Optimize against the store that is active for this session.
    let active = self.active_store();
    let optimizer = Optimizer::from_graph_store(&*active);
    let optimized = optimizer.optimize(translated)?;
    let mutating = optimized.root.has_mutations();

    let outcome = self.with_auto_commit(mutating, || {
        let (epoch, tx) = self.get_transaction_context();
        let planner = self.create_planner_for_store(Arc::clone(&active), epoch, tx);
        let mut physical = planner.plan(&optimized)?;
        Executor::with_columns(physical.columns.clone())
            .with_deadline(self.query_deadline())
            .execute(physical.operator.as_mut())
    });

    #[cfg(feature = "metrics")]
    {
        #[cfg(not(target_arch = "wasm32"))]
        let latency_ms = Some(started_at.elapsed().as_secs_f64() * 1000.0);
        #[cfg(target_arch = "wasm32")]
        let latency_ms = None;
        self.record_query_metrics("graphql", latency_ms, &outcome);
    }

    outcome
}
2621
#[cfg(feature = "graphql")]
/// Runs a parameterized GraphQL query through the `QueryProcessor`.
///
/// Whether the query mutates is guessed from its text with
/// `query_looks_like_mutation`; that flag is handed to `with_auto_commit`.
/// If a transaction is active, the processor is given its viewing epoch and
/// transaction id before processing.
///
/// # Errors
///
/// Propagates errors from processor construction, query processing, or the
/// implicit commit.
pub fn execute_graphql_with_params(
    &self,
    query: &str,
    params: std::collections::HashMap<String, Value>,
) -> Result<QueryResult> {
    use crate::query::processor::{QueryLanguage, QueryProcessor};

    #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
    let start_time = Instant::now();

    let has_mutations = Self::query_looks_like_mutation(query);
    let active = self.active_store();

    let result = self.with_auto_commit(has_mutations, || {
        let (viewing_epoch, transaction_id) = self.get_transaction_context();
        let processor = QueryProcessor::for_graph_store_with_transaction(
            Arc::clone(&active),
            Arc::clone(&self.transaction_manager),
        )?;
        // Only attach a transaction context when one is actually active.
        let processor = match transaction_id {
            Some(transaction_id) => {
                processor.with_transaction_context(viewing_epoch, transaction_id)
            }
            None => processor,
        };
        // The parameter map is borrowed for the duration of the call.
        processor.process(query, QueryLanguage::GraphQL, Some(&params))
    });

    #[cfg(feature = "metrics")]
    {
        #[cfg(not(target_arch = "wasm32"))]
        let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
        #[cfg(target_arch = "wasm32")]
        let elapsed_ms = None;
        self.record_query_metrics("graphql", elapsed_ms, &result);
    }

    result
}
2666
#[cfg(feature = "sql-pgq")]
/// Runs a SQL/PGQ query against the session's active graph store.
///
/// A `CREATE PROPERTY GRAPH` statement is answered immediately with a
/// single-row status result and is not planned or executed. Any other
/// statement reuses a cached optimized plan when one exists for
/// (query text, language, current graph); otherwise the plan is bound,
/// optimized and stored in the cache. The plan is then executed through
/// `with_auto_commit`.
///
/// # Errors
///
/// Propagates any error from translation, binding, optimization, planning,
/// execution, or the implicit commit.
pub fn execute_sql(&self, query: &str) -> Result<QueryResult> {
    use crate::query::{
        Executor, binder::Binder, cache::CacheKey, optimizer::Optimizer, plan::LogicalOperator,
        processor::QueryLanguage, translators::sql_pgq,
    };

    #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
    let started_at = Instant::now();

    let translated = sql_pgq::translate(query)?;

    // DDL short-circuit: report what was declared and return without executing.
    if let LogicalOperator::CreatePropertyGraph(cpg) = &translated.root {
        let status = format!(
            "Property graph '{}' created ({} node tables, {} edge tables)",
            cpg.name,
            cpg.node_tables.len(),
            cpg.edge_tables.len()
        );
        return Ok(QueryResult {
            columns: vec!["status".into()],
            column_types: vec![grafeo_common::types::LogicalType::String],
            rows: vec![vec![Value::from(status)]],
            execution_time_ms: None,
            rows_scanned: None,
            status_message: None,
            gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
        });
    }

    let cache_key = CacheKey::with_graph(query, QueryLanguage::SqlPgq, self.current_graph());

    // Cache hit skips binding and optimization; a miss populates the cache.
    let optimized = match self.query_cache.get_optimized(&cache_key) {
        Some(cached) => cached,
        None => {
            let mut binder = Binder::new();
            let _bound = binder.bind(&translated)?;
            let store_for_stats = self.active_store();
            let optimizer = Optimizer::from_graph_store(&*store_for_stats);
            let fresh = optimizer.optimize(translated)?;
            self.query_cache.put_optimized(cache_key, fresh.clone());
            fresh
        }
    };

    let active = self.active_store();
    let mutating = optimized.root.has_mutations();

    let outcome = self.with_auto_commit(mutating, || {
        let (epoch, tx) = self.get_transaction_context();
        let planner = self.create_planner_for_store(Arc::clone(&active), epoch, tx);
        let mut physical = planner.plan(&optimized)?;
        Executor::with_columns(physical.columns.clone())
            .with_deadline(self.query_deadline())
            .execute(physical.operator.as_mut())
    });

    #[cfg(feature = "metrics")]
    {
        #[cfg(not(target_arch = "wasm32"))]
        let latency_ms = Some(started_at.elapsed().as_secs_f64() * 1000.0);
        #[cfg(target_arch = "wasm32")]
        let latency_ms = None;
        self.record_query_metrics("sql", latency_ms, &outcome);
    }

    outcome
}
2760
#[cfg(feature = "sql-pgq")]
/// Runs a parameterized SQL/PGQ query through the `QueryProcessor`.
///
/// Whether the query mutates is guessed from its text with
/// `query_looks_like_mutation`; that flag is handed to `with_auto_commit`.
/// If a transaction is active, the processor is given its viewing epoch and
/// transaction id before processing.
///
/// # Errors
///
/// Propagates errors from processor construction, query processing, or the
/// implicit commit.
pub fn execute_sql_with_params(
    &self,
    query: &str,
    params: std::collections::HashMap<String, Value>,
) -> Result<QueryResult> {
    use crate::query::processor::{QueryLanguage, QueryProcessor};

    #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
    let start_time = Instant::now();

    let has_mutations = Self::query_looks_like_mutation(query);
    let active = self.active_store();

    let result = self.with_auto_commit(has_mutations, || {
        let (viewing_epoch, transaction_id) = self.get_transaction_context();
        let processor = QueryProcessor::for_graph_store_with_transaction(
            Arc::clone(&active),
            Arc::clone(&self.transaction_manager),
        )?;
        // Only attach a transaction context when one is actually active.
        let processor = match transaction_id {
            Some(transaction_id) => {
                processor.with_transaction_context(viewing_epoch, transaction_id)
            }
            None => processor,
        };
        // The parameter map is borrowed for the duration of the call.
        processor.process(query, QueryLanguage::SqlPgq, Some(&params))
    });

    #[cfg(feature = "metrics")]
    {
        #[cfg(not(target_arch = "wasm32"))]
        let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
        #[cfg(target_arch = "wasm32")]
        let elapsed_ms = None;
        self.record_query_metrics("sql", elapsed_ms, &result);
    }

    result
}
2805
2806 pub fn execute_language(
2815 &self,
2816 query: &str,
2817 language: &str,
2818 params: Option<std::collections::HashMap<String, Value>>,
2819 ) -> Result<QueryResult> {
2820 let _span = tracing::info_span!(
2821 "grafeo::session::execute",
2822 language,
2823 query_len = query.len(),
2824 )
2825 .entered();
2826 match language {
2827 "gql" => {
2828 if let Some(p) = params {
2829 self.execute_with_params(query, p)
2830 } else {
2831 self.execute(query)
2832 }
2833 }
2834 #[cfg(feature = "cypher")]
2835 "cypher" => {
2836 if let Some(p) = params {
2837 use crate::query::processor::{QueryLanguage, QueryProcessor};
2838
2839 #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
2840 let start_time = Instant::now();
2841
2842 let has_mutations = Self::query_looks_like_mutation(query);
2843 let active = self.active_store();
2844 let result = self.with_auto_commit(has_mutations, || {
2845 let processor = QueryProcessor::for_graph_store_with_transaction(
2846 Arc::clone(&active),
2847 Arc::clone(&self.transaction_manager),
2848 )?;
2849 let (viewing_epoch, transaction_id) = self.get_transaction_context();
2850 let processor = if let Some(transaction_id) = transaction_id {
2851 processor.with_transaction_context(viewing_epoch, transaction_id)
2852 } else {
2853 processor
2854 };
2855 processor.process(query, QueryLanguage::Cypher, Some(&p))
2856 });
2857
2858 #[cfg(feature = "metrics")]
2859 {
2860 #[cfg(not(target_arch = "wasm32"))]
2861 let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
2862 #[cfg(target_arch = "wasm32")]
2863 let elapsed_ms = None;
2864 self.record_query_metrics("cypher", elapsed_ms, &result);
2865 }
2866
2867 result
2868 } else {
2869 self.execute_cypher(query)
2870 }
2871 }
2872 #[cfg(feature = "gremlin")]
2873 "gremlin" => {
2874 if let Some(p) = params {
2875 self.execute_gremlin_with_params(query, p)
2876 } else {
2877 self.execute_gremlin(query)
2878 }
2879 }
2880 #[cfg(feature = "graphql")]
2881 "graphql" => {
2882 if let Some(p) = params {
2883 self.execute_graphql_with_params(query, p)
2884 } else {
2885 self.execute_graphql(query)
2886 }
2887 }
2888 #[cfg(all(feature = "graphql", feature = "rdf"))]
2889 "graphql-rdf" => {
2890 if let Some(p) = params {
2891 self.execute_graphql_rdf_with_params(query, p)
2892 } else {
2893 self.execute_graphql_rdf(query)
2894 }
2895 }
2896 #[cfg(feature = "sql-pgq")]
2897 "sql" | "sql-pgq" => {
2898 if let Some(p) = params {
2899 self.execute_sql_with_params(query, p)
2900 } else {
2901 self.execute_sql(query)
2902 }
2903 }
2904 #[cfg(all(feature = "sparql", feature = "rdf"))]
2905 "sparql" => {
2906 if let Some(p) = params {
2907 self.execute_sparql_with_params(query, p)
2908 } else {
2909 self.execute_sparql(query)
2910 }
2911 }
2912 other => Err(grafeo_common::utils::error::Error::Query(
2913 grafeo_common::utils::error::QueryError::new(
2914 grafeo_common::utils::error::QueryErrorKind::Semantic,
2915 format!("Unknown query language: '{other}'"),
2916 ),
2917 )),
2918 }
2919 }
2920
2921 pub fn clear_plan_cache(&self) {
2948 self.query_cache.clear();
2949 }
2950
2951 pub fn begin_transaction(&mut self) -> Result<()> {
2959 self.begin_transaction_inner(false, None)
2960 }
2961
2962 pub fn begin_transaction_with_isolation(
2970 &mut self,
2971 isolation_level: crate::transaction::IsolationLevel,
2972 ) -> Result<()> {
2973 self.begin_transaction_inner(false, Some(isolation_level))
2974 }
2975
2976 fn begin_transaction_inner(
2978 &self,
2979 read_only: bool,
2980 isolation_level: Option<crate::transaction::IsolationLevel>,
2981 ) -> Result<()> {
2982 let _span = tracing::debug_span!("grafeo::tx::begin", read_only).entered();
2983 let mut current = self.current_transaction.lock();
2984 if current.is_some() {
2985 drop(current);
2987 let mut depth = self.transaction_nesting_depth.lock();
2988 *depth += 1;
2989 let sp_name = format!("_nested_tx_{}", *depth);
2990 self.savepoint(&sp_name)?;
2991 return Ok(());
2992 }
2993
2994 let active = self.active_lpg_store();
2995 self.transaction_start_node_count
2996 .store(active.node_count(), Ordering::Relaxed);
2997 self.transaction_start_edge_count
2998 .store(active.edge_count(), Ordering::Relaxed);
2999 let transaction_id = if let Some(level) = isolation_level {
3000 self.transaction_manager.begin_with_isolation(level)
3001 } else {
3002 self.transaction_manager.begin()
3003 };
3004 *current = Some(transaction_id);
3005 *self.read_only_tx.lock() = read_only;
3006
3007 let key = self.active_graph_storage_key();
3010 let mut touched = self.touched_graphs.lock();
3011 touched.clear();
3012 touched.push(key);
3013
3014 #[cfg(feature = "metrics")]
3015 {
3016 crate::metrics::record_metric!(self.metrics, tx_active, inc);
3017 #[cfg(not(target_arch = "wasm32"))]
3018 {
3019 *self.tx_start_time.lock() = Some(Instant::now());
3020 }
3021 }
3022
3023 Ok(())
3024 }
3025
3026 pub fn commit(&mut self) -> Result<()> {
3034 self.commit_inner()
3035 }
3036
3037 fn commit_inner(&self) -> Result<()> {
3039 let _span = tracing::debug_span!("grafeo::tx::commit").entered();
3040 {
3042 let mut depth = self.transaction_nesting_depth.lock();
3043 if *depth > 0 {
3044 let sp_name = format!("_nested_tx_{depth}");
3045 *depth -= 1;
3046 drop(depth);
3047 return self.release_savepoint(&sp_name);
3048 }
3049 }
3050
3051 let transaction_id = self.current_transaction.lock().take().ok_or_else(|| {
3052 grafeo_common::utils::error::Error::Transaction(
3053 grafeo_common::utils::error::TransactionError::InvalidState(
3054 "No active transaction".to_string(),
3055 ),
3056 )
3057 })?;
3058
3059 let touched = self.touched_graphs.lock().clone();
3062 let commit_epoch = match self.transaction_manager.commit(transaction_id) {
3063 Ok(epoch) => epoch,
3064 Err(e) => {
3065 for graph_name in &touched {
3067 let store = self.resolve_store(graph_name);
3068 store.rollback_transaction_properties(transaction_id);
3069 }
3070 #[cfg(feature = "rdf")]
3071 self.rollback_rdf_transaction(transaction_id);
3072 *self.read_only_tx.lock() = false;
3073 self.savepoints.lock().clear();
3074 self.touched_graphs.lock().clear();
3075 #[cfg(feature = "metrics")]
3076 {
3077 crate::metrics::record_metric!(self.metrics, tx_active, dec);
3078 crate::metrics::record_metric!(self.metrics, tx_conflicts, inc);
3079 #[cfg(not(target_arch = "wasm32"))]
3080 if let Some(start) = self.tx_start_time.lock().take() {
3081 let duration_ms = start.elapsed().as_secs_f64() * 1000.0;
3082 crate::metrics::record_metric!(
3083 self.metrics,
3084 tx_duration,
3085 observe duration_ms
3086 );
3087 }
3088 }
3089 return Err(e);
3090 }
3091 };
3092
3093 for graph_name in &touched {
3095 let store = self.resolve_store(graph_name);
3096 store.finalize_version_epochs(transaction_id, commit_epoch);
3097 }
3098
3099 #[cfg(feature = "rdf")]
3101 self.commit_rdf_transaction(transaction_id);
3102
3103 for graph_name in &touched {
3104 let store = self.resolve_store(graph_name);
3105 store.commit_transaction_properties(transaction_id);
3106 }
3107
3108 let current_epoch = self.transaction_manager.current_epoch();
3111 for graph_name in &touched {
3112 let store = self.resolve_store(graph_name);
3113 store.sync_epoch(current_epoch);
3114 }
3115
3116 *self.read_only_tx.lock() = false;
3118 self.savepoints.lock().clear();
3119 self.touched_graphs.lock().clear();
3120
3121 if self.gc_interval > 0 {
3123 let count = self.commit_counter.fetch_add(1, Ordering::Relaxed) + 1;
3124 if count.is_multiple_of(self.gc_interval) {
3125 let min_epoch = self.transaction_manager.min_active_epoch();
3126 for graph_name in &touched {
3127 let store = self.resolve_store(graph_name);
3128 store.gc_versions(min_epoch);
3129 }
3130 self.transaction_manager.gc();
3131 #[cfg(feature = "metrics")]
3132 crate::metrics::record_metric!(self.metrics, gc_runs, inc);
3133 }
3134 }
3135
3136 #[cfg(feature = "metrics")]
3137 {
3138 crate::metrics::record_metric!(self.metrics, tx_active, dec);
3139 crate::metrics::record_metric!(self.metrics, tx_committed, inc);
3140 #[cfg(not(target_arch = "wasm32"))]
3141 if let Some(start) = self.tx_start_time.lock().take() {
3142 let duration_ms = start.elapsed().as_secs_f64() * 1000.0;
3143 crate::metrics::record_metric!(self.metrics, tx_duration, observe duration_ms);
3144 }
3145 }
3146
3147 Ok(())
3148 }
3149
3150 pub fn rollback(&mut self) -> Result<()> {
3174 self.rollback_inner()
3175 }
3176
3177 fn rollback_inner(&self) -> Result<()> {
3179 let _span = tracing::debug_span!("grafeo::tx::rollback").entered();
3180 {
3182 let mut depth = self.transaction_nesting_depth.lock();
3183 if *depth > 0 {
3184 let sp_name = format!("_nested_tx_{depth}");
3185 *depth -= 1;
3186 drop(depth);
3187 return self.rollback_to_savepoint(&sp_name);
3188 }
3189 }
3190
3191 let transaction_id = self.current_transaction.lock().take().ok_or_else(|| {
3192 grafeo_common::utils::error::Error::Transaction(
3193 grafeo_common::utils::error::TransactionError::InvalidState(
3194 "No active transaction".to_string(),
3195 ),
3196 )
3197 })?;
3198
3199 *self.read_only_tx.lock() = false;
3201
3202 let touched = self.touched_graphs.lock().clone();
3204 for graph_name in &touched {
3205 let store = self.resolve_store(graph_name);
3206 store.discard_uncommitted_versions(transaction_id);
3207 }
3208
3209 #[cfg(feature = "rdf")]
3211 self.rollback_rdf_transaction(transaction_id);
3212
3213 self.savepoints.lock().clear();
3215 self.touched_graphs.lock().clear();
3216
3217 let result = self.transaction_manager.abort(transaction_id);
3219
3220 #[cfg(feature = "metrics")]
3221 if result.is_ok() {
3222 crate::metrics::record_metric!(self.metrics, tx_active, dec);
3223 crate::metrics::record_metric!(self.metrics, tx_rolled_back, inc);
3224 #[cfg(not(target_arch = "wasm32"))]
3225 if let Some(start) = self.tx_start_time.lock().take() {
3226 let duration_ms = start.elapsed().as_secs_f64() * 1000.0;
3227 crate::metrics::record_metric!(self.metrics, tx_duration, observe duration_ms);
3228 }
3229 }
3230
3231 result
3232 }
3233
3234 pub fn savepoint(&self, name: &str) -> Result<()> {
3244 let tx_id = self.current_transaction.lock().ok_or_else(|| {
3245 grafeo_common::utils::error::Error::Transaction(
3246 grafeo_common::utils::error::TransactionError::InvalidState(
3247 "No active transaction".to_string(),
3248 ),
3249 )
3250 })?;
3251
3252 let touched = self.touched_graphs.lock().clone();
3254 let graph_snapshots: Vec<GraphSavepoint> = touched
3255 .iter()
3256 .map(|graph_name| {
3257 let store = self.resolve_store(graph_name);
3258 GraphSavepoint {
3259 graph_name: graph_name.clone(),
3260 next_node_id: store.peek_next_node_id(),
3261 next_edge_id: store.peek_next_edge_id(),
3262 undo_log_position: store.property_undo_log_position(tx_id),
3263 }
3264 })
3265 .collect();
3266
3267 self.savepoints.lock().push(SavepointState {
3268 name: name.to_string(),
3269 graph_snapshots,
3270 active_graph: self.current_graph.lock().clone(),
3271 });
3272 Ok(())
3273 }
3274
3275 pub fn rollback_to_savepoint(&self, name: &str) -> Result<()> {
3284 let transaction_id = self.current_transaction.lock().ok_or_else(|| {
3285 grafeo_common::utils::error::Error::Transaction(
3286 grafeo_common::utils::error::TransactionError::InvalidState(
3287 "No active transaction".to_string(),
3288 ),
3289 )
3290 })?;
3291
3292 let mut savepoints = self.savepoints.lock();
3293
3294 let pos = savepoints
3296 .iter()
3297 .rposition(|sp| sp.name == name)
3298 .ok_or_else(|| {
3299 grafeo_common::utils::error::Error::Transaction(
3300 grafeo_common::utils::error::TransactionError::InvalidState(format!(
3301 "Savepoint '{name}' not found"
3302 )),
3303 )
3304 })?;
3305
3306 let sp_state = savepoints[pos].clone();
3307
3308 savepoints.truncate(pos);
3310 drop(savepoints);
3311
3312 for gs in &sp_state.graph_snapshots {
3314 let store = self.resolve_store(&gs.graph_name);
3315
3316 store.rollback_transaction_properties_to(transaction_id, gs.undo_log_position);
3318
3319 let current_next_node = store.peek_next_node_id();
3321 let current_next_edge = store.peek_next_edge_id();
3322
3323 let node_ids: Vec<NodeId> = (gs.next_node_id..current_next_node)
3324 .map(NodeId::new)
3325 .collect();
3326 let edge_ids: Vec<EdgeId> = (gs.next_edge_id..current_next_edge)
3327 .map(EdgeId::new)
3328 .collect();
3329
3330 if !node_ids.is_empty() || !edge_ids.is_empty() {
3331 store.discard_entities_by_id(transaction_id, &node_ids, &edge_ids);
3332 }
3333 }
3334
3335 let touched = self.touched_graphs.lock().clone();
3339 for graph_name in &touched {
3340 let already_captured = sp_state
3341 .graph_snapshots
3342 .iter()
3343 .any(|gs| gs.graph_name == *graph_name);
3344 if !already_captured {
3345 let store = self.resolve_store(graph_name);
3346 store.discard_uncommitted_versions(transaction_id);
3347 }
3348 }
3349
3350 let mut touched = self.touched_graphs.lock();
3352 touched.clear();
3353 for gs in &sp_state.graph_snapshots {
3354 if !touched.contains(&gs.graph_name) {
3355 touched.push(gs.graph_name.clone());
3356 }
3357 }
3358
3359 Ok(())
3360 }
3361
3362 pub fn release_savepoint(&self, name: &str) -> Result<()> {
3368 let _tx_id = self.current_transaction.lock().ok_or_else(|| {
3369 grafeo_common::utils::error::Error::Transaction(
3370 grafeo_common::utils::error::TransactionError::InvalidState(
3371 "No active transaction".to_string(),
3372 ),
3373 )
3374 })?;
3375
3376 let mut savepoints = self.savepoints.lock();
3377 let pos = savepoints
3378 .iter()
3379 .rposition(|sp| sp.name == name)
3380 .ok_or_else(|| {
3381 grafeo_common::utils::error::Error::Transaction(
3382 grafeo_common::utils::error::TransactionError::InvalidState(format!(
3383 "Savepoint '{name}' not found"
3384 )),
3385 )
3386 })?;
3387 savepoints.remove(pos);
3388 Ok(())
3389 }
3390
3391 #[must_use]
3393 pub fn in_transaction(&self) -> bool {
3394 self.current_transaction.lock().is_some()
3395 }
3396
3397 #[must_use]
3399 pub(crate) fn current_transaction_id(&self) -> Option<TransactionId> {
3400 *self.current_transaction.lock()
3401 }
3402
3403 #[must_use]
3405 pub(crate) fn transaction_manager(&self) -> &TransactionManager {
3406 &self.transaction_manager
3407 }
3408
3409 #[must_use]
3411 pub(crate) fn node_count_delta(&self) -> (usize, usize) {
3412 (
3413 self.transaction_start_node_count.load(Ordering::Relaxed),
3414 self.active_lpg_store().node_count(),
3415 )
3416 }
3417
3418 #[must_use]
3420 pub(crate) fn edge_count_delta(&self) -> (usize, usize) {
3421 (
3422 self.transaction_start_edge_count.load(Ordering::Relaxed),
3423 self.active_lpg_store().edge_count(),
3424 )
3425 }
3426
3427 pub fn prepare_commit(&mut self) -> Result<crate::transaction::PreparedCommit<'_>> {
3461 crate::transaction::PreparedCommit::new(self)
3462 }
3463
3464 pub fn set_auto_commit(&mut self, auto_commit: bool) {
3466 self.auto_commit = auto_commit;
3467 }
3468
3469 #[must_use]
3471 pub fn auto_commit(&self) -> bool {
3472 self.auto_commit
3473 }
3474
3475 fn needs_auto_commit(&self, has_mutations: bool) -> bool {
3480 self.auto_commit && has_mutations && self.current_transaction.lock().is_none()
3481 }
3482
3483 fn with_auto_commit<F>(&self, has_mutations: bool, body: F) -> Result<QueryResult>
3486 where
3487 F: FnOnce() -> Result<QueryResult>,
3488 {
3489 if self.needs_auto_commit(has_mutations) {
3490 self.begin_transaction_inner(false, None)?;
3491 match body() {
3492 Ok(result) => {
3493 self.commit_inner()?;
3494 Ok(result)
3495 }
3496 Err(e) => {
3497 let _ = self.rollback_inner();
3498 Err(e)
3499 }
3500 }
3501 } else {
3502 body()
3503 }
3504 }
3505
/// Text heuristic: does `query` contain a keyword associated with mutation?
///
/// The check is a case-insensitive (ASCII) substring search, so it also
/// matches keywords embedded in longer words (for example `SET` inside
/// `OFFSET`). Text that uses none of the listed keywords returns `false`.
fn query_looks_like_mutation(query: &str) -> bool {
    const MUTATION_KEYWORDS: [&str; 8] = [
        "INSERT", "CREATE", "DELETE", "MERGE", "SET", "REMOVE", "DROP", "ALTER",
    ];

    let normalized = query.to_ascii_uppercase();
    MUTATION_KEYWORDS
        .iter()
        .any(|keyword| normalized.contains(keyword))
}
3522
3523 #[must_use]
3525 fn query_deadline(&self) -> Option<Instant> {
3526 #[cfg(not(target_arch = "wasm32"))]
3527 {
3528 self.query_timeout.map(|d| Instant::now() + d)
3529 }
3530 #[cfg(target_arch = "wasm32")]
3531 {
3532 let _ = &self.query_timeout;
3533 None
3534 }
3535 }
3536
#[cfg(feature = "metrics")]
/// Records metrics for one finished query.
///
/// Always bumps the total and per-language query counters. Latency is
/// observed only when `elapsed_ms` is provided. A successful result adds its
/// row count (and scanned-row count, when present); a failed one bumps the
/// error counter and, if the error text contains "exceeded timeout", the
/// timeout counter as well.
fn record_query_metrics(
    &self,
    language: &str,
    elapsed_ms: Option<f64>,
    result: &Result<crate::database::QueryResult>,
) {
    use crate::metrics::record_metric;

    record_metric!(self.metrics, query_count, inc);
    if let Some(registry) = &self.metrics {
        registry.query_count_by_language.increment(language);
    }
    if let Some(latency) = elapsed_ms {
        record_metric!(self.metrics, query_latency, observe latency);
    }

    let error = match result {
        Ok(output) => {
            let returned = output.rows.len() as u64;
            record_metric!(self.metrics, rows_returned, add returned);
            if let Some(scanned) = output.rows_scanned {
                record_metric!(self.metrics, rows_scanned, add scanned);
            }
            return;
        }
        Err(error) => error,
    };

    record_metric!(self.metrics, query_errors, inc);
    // Timeouts are recognized by their message text.
    if error.to_string().contains("exceeded timeout") {
        record_metric!(self.metrics, query_timeouts, inc);
    }
}
3576
3577 fn eval_integer_literal(expr: &grafeo_adapters::query::gql::ast::Expression) -> Option<i64> {
3579 use grafeo_adapters::query::gql::ast::{Expression, Literal};
3580 match expr {
3581 Expression::Literal(Literal::Integer(n)) => Some(*n),
3582 _ => None,
3583 }
3584 }
3585
3586 #[must_use]
3592 fn get_transaction_context(&self) -> (EpochId, Option<TransactionId>) {
3593 if let Some(epoch) = *self.viewing_epoch_override.lock() {
3595 return (epoch, None);
3596 }
3597
3598 if let Some(transaction_id) = *self.current_transaction.lock() {
3599 let epoch = self
3601 .transaction_manager
3602 .start_epoch(transaction_id)
3603 .unwrap_or_else(|| self.transaction_manager.current_epoch());
3604 (epoch, Some(transaction_id))
3605 } else {
3606 (self.transaction_manager.current_epoch(), None)
3608 }
3609 }
3610
3611 fn create_planner_for_store(
3616 &self,
3617 store: Arc<dyn GraphStoreMut>,
3618 viewing_epoch: EpochId,
3619 transaction_id: Option<TransactionId>,
3620 ) -> crate::query::Planner {
3621 self.create_planner_for_store_with_read_only(store, viewing_epoch, transaction_id, false)
3622 }
3623
3624 fn create_planner_for_store_with_read_only(
3625 &self,
3626 store: Arc<dyn GraphStoreMut>,
3627 viewing_epoch: EpochId,
3628 transaction_id: Option<TransactionId>,
3629 read_only: bool,
3630 ) -> crate::query::Planner {
3631 use crate::query::Planner;
3632 use grafeo_core::execution::operators::{LazyValue, SessionContext};
3633
3634 let info_store = Arc::clone(&store);
3636 let schema_store = Arc::clone(&store);
3637
3638 let session_context = SessionContext {
3639 current_schema: self.current_schema(),
3640 current_graph: self.current_graph(),
3641 db_info: LazyValue::new(move || Self::build_info_value(&*info_store)),
3642 schema_info: LazyValue::new(move || Self::build_schema_value(&*schema_store)),
3643 };
3644
3645 let mut planner = Planner::with_context(
3646 Arc::clone(&store),
3647 Arc::clone(&self.transaction_manager),
3648 transaction_id,
3649 viewing_epoch,
3650 )
3651 .with_factorized_execution(self.factorized_execution)
3652 .with_catalog(Arc::clone(&self.catalog))
3653 .with_session_context(session_context)
3654 .with_read_only(read_only);
3655
3656 let validator =
3658 CatalogConstraintValidator::new(Arc::clone(&self.catalog)).with_store(store);
3659 planner = planner.with_validator(Arc::new(validator));
3660
3661 planner
3662 }
3663
3664 fn build_info_value(store: &dyn GraphStoreMut) -> Value {
3666 use grafeo_common::types::PropertyKey;
3667 use std::collections::BTreeMap;
3668
3669 let mut map = BTreeMap::new();
3670 map.insert(PropertyKey::from("mode"), Value::String("lpg".into()));
3671 map.insert(
3672 PropertyKey::from("node_count"),
3673 Value::Int64(store.node_count() as i64),
3674 );
3675 map.insert(
3676 PropertyKey::from("edge_count"),
3677 Value::Int64(store.edge_count() as i64),
3678 );
3679 map.insert(
3680 PropertyKey::from("version"),
3681 Value::String(env!("CARGO_PKG_VERSION").into()),
3682 );
3683 Value::Map(map.into())
3684 }
3685
3686 fn build_schema_value(store: &dyn GraphStoreMut) -> Value {
3688 use grafeo_common::types::PropertyKey;
3689 use std::collections::BTreeMap;
3690
3691 let labels: Vec<Value> = store
3692 .all_labels()
3693 .into_iter()
3694 .map(|l| Value::String(l.into()))
3695 .collect();
3696 let edge_types: Vec<Value> = store
3697 .all_edge_types()
3698 .into_iter()
3699 .map(|t| Value::String(t.into()))
3700 .collect();
3701 let property_keys: Vec<Value> = store
3702 .all_property_keys()
3703 .into_iter()
3704 .map(|k| Value::String(k.into()))
3705 .collect();
3706
3707 let mut map = BTreeMap::new();
3708 map.insert(PropertyKey::from("labels"), Value::List(labels.into()));
3709 map.insert(
3710 PropertyKey::from("edge_types"),
3711 Value::List(edge_types.into()),
3712 );
3713 map.insert(
3714 PropertyKey::from("property_keys"),
3715 Value::List(property_keys.into()),
3716 );
3717 Value::Map(map.into())
3718 }
3719
3720 pub fn create_node(&self, labels: &[&str]) -> NodeId {
3725 let (epoch, transaction_id) = self.get_transaction_context();
3726 self.active_lpg_store().create_node_versioned(
3727 labels,
3728 epoch,
3729 transaction_id.unwrap_or(TransactionId::SYSTEM),
3730 )
3731 }
3732
3733 pub fn create_node_with_props<'a>(
3737 &self,
3738 labels: &[&str],
3739 properties: impl IntoIterator<Item = (&'a str, Value)>,
3740 ) -> NodeId {
3741 let (epoch, transaction_id) = self.get_transaction_context();
3742 self.active_lpg_store().create_node_with_props_versioned(
3743 labels,
3744 properties,
3745 epoch,
3746 transaction_id.unwrap_or(TransactionId::SYSTEM),
3747 )
3748 }
3749
3750 pub fn create_edge(
3755 &self,
3756 src: NodeId,
3757 dst: NodeId,
3758 edge_type: &str,
3759 ) -> grafeo_common::types::EdgeId {
3760 let (epoch, transaction_id) = self.get_transaction_context();
3761 self.active_lpg_store().create_edge_versioned(
3762 src,
3763 dst,
3764 edge_type,
3765 epoch,
3766 transaction_id.unwrap_or(TransactionId::SYSTEM),
3767 )
3768 }
3769
3770 #[must_use]
3798 pub fn get_node(&self, id: NodeId) -> Option<Node> {
3799 let (epoch, transaction_id) = self.get_transaction_context();
3800 self.active_lpg_store().get_node_versioned(
3801 id,
3802 epoch,
3803 transaction_id.unwrap_or(TransactionId::SYSTEM),
3804 )
3805 }
3806
3807 #[must_use]
3831 pub fn get_node_property(&self, id: NodeId, key: &str) -> Option<Value> {
3832 self.get_node(id)
3833 .and_then(|node| node.get_property(key).cloned())
3834 }
3835
3836 #[must_use]
3843 pub fn get_edge(&self, id: EdgeId) -> Option<Edge> {
3844 let (epoch, transaction_id) = self.get_transaction_context();
3845 self.active_lpg_store().get_edge_versioned(
3846 id,
3847 epoch,
3848 transaction_id.unwrap_or(TransactionId::SYSTEM),
3849 )
3850 }
3851
3852 #[must_use]
3878 pub fn get_neighbors_outgoing(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
3879 self.active_lpg_store()
3880 .edges_from(node, Direction::Outgoing)
3881 .collect()
3882 }
3883
3884 #[must_use]
3893 pub fn get_neighbors_incoming(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
3894 self.active_lpg_store()
3895 .edges_from(node, Direction::Incoming)
3896 .collect()
3897 }
3898
3899 #[must_use]
3911 pub fn get_neighbors_outgoing_by_type(
3912 &self,
3913 node: NodeId,
3914 edge_type: &str,
3915 ) -> Vec<(NodeId, EdgeId)> {
3916 self.active_lpg_store()
3917 .edges_from(node, Direction::Outgoing)
3918 .filter(|(_, edge_id)| {
3919 self.get_edge(*edge_id)
3920 .is_some_and(|e| e.edge_type.as_str() == edge_type)
3921 })
3922 .collect()
3923 }
3924
3925 #[must_use]
3932 pub fn node_exists(&self, id: NodeId) -> bool {
3933 self.get_node(id).is_some()
3934 }
3935
3936 #[must_use]
3938 pub fn edge_exists(&self, id: EdgeId) -> bool {
3939 self.get_edge(id).is_some()
3940 }
3941
3942 #[must_use]
3946 pub fn get_degree(&self, node: NodeId) -> (usize, usize) {
3947 let active = self.active_lpg_store();
3948 let out = active.out_degree(node);
3949 let in_degree = active.in_degree(node);
3950 (out, in_degree)
3951 }
3952
3953 #[must_use]
3963 pub fn get_nodes_batch(&self, ids: &[NodeId]) -> Vec<Option<Node>> {
3964 let (epoch, transaction_id) = self.get_transaction_context();
3965 let tx = transaction_id.unwrap_or(TransactionId::SYSTEM);
3966 let active = self.active_lpg_store();
3967 ids.iter()
3968 .map(|&id| active.get_node_versioned(id, epoch, tx))
3969 .collect()
3970 }
3971
/// Returns the change events the CDC log holds for `entity_id`.
///
/// # Errors
///
/// The visible implementation always returns `Ok`.
#[cfg(feature = "cdc")]
pub fn history(
    &self,
    entity_id: impl Into<crate::cdc::EntityId>,
) -> Result<Vec<crate::cdc::ChangeEvent>> {
    let entity = entity_id.into();
    let events = self.cdc_log.history(entity);
    Ok(events)
}
3982
/// Returns the change events the CDC log holds for `entity_id`, restricted
/// by `since_epoch` as implemented by the log's `history_since`.
///
/// # Errors
///
/// The visible implementation always returns `Ok`.
#[cfg(feature = "cdc")]
pub fn history_since(
    &self,
    entity_id: impl Into<crate::cdc::EntityId>,
    since_epoch: EpochId,
) -> Result<Vec<crate::cdc::ChangeEvent>> {
    let entity = entity_id.into();
    let events = self.cdc_log.history_since(entity, since_epoch);
    Ok(events)
}
3992
/// Returns the change events the CDC log reports for the epoch range given
/// by `start_epoch` and `end_epoch`.
///
/// # Errors
///
/// The visible implementation always returns `Ok`.
#[cfg(feature = "cdc")]
pub fn changes_between(
    &self,
    start_epoch: EpochId,
    end_epoch: EpochId,
) -> Result<Vec<crate::cdc::ChangeEvent>> {
    let events = self.cdc_log.changes_between(start_epoch, end_epoch);
    Ok(events)
}
4002}
4003
4004impl Drop for Session {
4005 fn drop(&mut self) {
4006 if self.in_transaction() {
4009 let _ = self.rollback_inner();
4010 }
4011
4012 #[cfg(feature = "metrics")]
4013 if let Some(ref reg) = self.metrics {
4014 reg.session_active
4015 .fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
4016 }
4017 }
4018}
4019
4020#[cfg(test)]
4021mod tests {
4022 use super::parse_default_literal;
4023 use crate::database::GrafeoDB;
4024 use grafeo_common::types::Value;
4025
/// `null` is recognised regardless of letter case.
#[test]
fn parse_default_literal_null() {
    for spelling in ["null", "NULL", "Null"] {
        assert_eq!(parse_default_literal(spelling), Value::Null);
    }
}
4036
/// Boolean keywords are recognised in lower and upper case.
#[test]
fn parse_default_literal_bool() {
    let cases = [
        ("true", true),
        ("TRUE", true),
        ("false", false),
        ("FALSE", false),
    ];
    for (text, expected) in cases {
        assert_eq!(parse_default_literal(text), Value::Bool(expected));
    }
}
4044
/// Single quotes are stripped from a quoted string literal.
#[test]
fn parse_default_literal_string_single_quoted() {
    let parsed = parse_default_literal("'hello'");
    assert_eq!(parsed, Value::String("hello".into()));
}
4052
/// Double quotes are stripped from a quoted string literal.
#[test]
fn parse_default_literal_string_double_quoted() {
    let parsed = parse_default_literal("\"world\"");
    assert_eq!(parsed, Value::String("world".into()));
}
4060
/// Positive, negative and zero integers parse to `Value::Int64`.
#[test]
fn parse_default_literal_integer() {
    let cases = [("42", 42_i64), ("-7", -7), ("0", 0)];
    for (text, expected) in cases {
        assert_eq!(parse_default_literal(text), Value::Int64(expected));
    }
}
4067
/// Decimal literals parse to `Value::Float64`.
#[test]
fn parse_default_literal_float() {
    let cases = [("9.81", 9.81_f64), ("-0.5", -0.5)];
    for (text, expected) in cases {
        assert_eq!(parse_default_literal(text), Value::Float64(expected));
    }
}
4073
/// Text that matches no other literal form is kept verbatim as a string.
#[test]
fn parse_default_literal_fallback_string() {
    let parsed = parse_default_literal("some_identifier");
    assert_eq!(parsed, Value::String("some_identifier".into()));
}
4082
/// A node created through a session outside any transaction is counted by
/// the database.
#[test]
fn test_session_create_node() {
    let database = GrafeoDB::new_in_memory();
    let session = database.session();

    let person = session.create_node(&["Person"]);

    assert!(person.is_valid());
    assert_eq!(database.node_count(), 1);
}
4092
/// `in_transaction` follows the begin/commit lifecycle.
#[test]
fn test_session_transaction() {
    let database = GrafeoDB::new_in_memory();
    let mut session = database.session();

    // A fresh session has no open transaction.
    assert!(!session.in_transaction());

    session.begin_transaction().unwrap();
    assert!(session.in_transaction());

    // Committing closes it again.
    session.commit().unwrap();
    assert!(!session.in_transaction());
}
4106
/// The transaction context carries an id only while a transaction is open,
/// and the epoch never moves backwards across a commit.
#[test]
fn test_session_transaction_context() {
    let database = GrafeoDB::new_in_memory();
    let mut session = database.session();

    // Before any transaction: no transaction id.
    let (_, tx_before) = session.get_transaction_context();
    assert!(tx_before.is_none());

    // Inside a transaction: an id is present.
    session.begin_transaction().unwrap();
    let (epoch_during, tx_during) = session.get_transaction_context();
    assert!(tx_during.is_some());

    // After commit: the id is gone and the epoch has not decreased.
    session.commit().unwrap();
    let (epoch_after, tx_after) = session.get_transaction_context();
    assert!(tx_after.is_none());
    assert!(epoch_after.as_u64() >= epoch_during.as_u64());
}
4130
/// Rolling back closes the open transaction.
#[test]
fn test_session_rollback() {
    let database = GrafeoDB::new_in_memory();
    let mut session = database.session();

    session.begin_transaction().unwrap();
    session.rollback().unwrap();

    assert!(!session.in_transaction());
}
4140
/// A node written directly to the store under the session's transaction id
/// disappears once the session rolls back, while earlier data survives.
#[test]
fn test_session_rollback_discards_versions() {
    use grafeo_common::types::TransactionId;

    let db = GrafeoDB::new_in_memory();

    // Baseline: one node created outside any transaction.
    let baseline_node = db.store().create_node(&["Person"]);
    assert!(baseline_node.is_valid());
    assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");

    // Open a transaction and capture its id.
    let mut session = db.session();
    session.begin_transaction().unwrap();
    let open_tx = session.current_transaction.lock().unwrap();

    // Write against the store, tagged with that transaction.
    let store_epoch = db.store().current_epoch();
    let pending_node = db
        .store()
        .create_node_versioned(&["Person"], store_epoch, open_tx);
    assert!(pending_node.is_valid());

    // The pending node is hidden from the plain count but visible to its
    // own transaction.
    assert_eq!(
        db.node_count(),
        1,
        "PENDING nodes should be invisible to non-versioned node_count()"
    );
    let visible_to_owner = db
        .store()
        .get_node_versioned(pending_node, store_epoch, open_tx)
        .is_some();
    assert!(
        visible_to_owner,
        "Transaction node should be visible to its own transaction"
    );

    session.rollback().unwrap();
    assert!(!session.in_transaction());

    let count_after = db.node_count();
    assert_eq!(
        count_after, 1,
        "Rollback should discard uncommitted node, but got {count_after}"
    );

    // Seen as the system transaction at the current epoch, only the
    // baseline node remains.
    let current_epoch = db.store().current_epoch();
    let baseline_still_there = db
        .store()
        .get_node_versioned(baseline_node, current_epoch, TransactionId::SYSTEM)
        .is_some();
    assert!(baseline_still_there, "Original node should still exist");

    let pending_gone = db
        .store()
        .get_node_versioned(pending_node, current_epoch, TransactionId::SYSTEM)
        .is_none();
    assert!(pending_gone, "Transaction node should be gone");
}
4208
/// A node created with `Session::create_node` inside a transaction is
/// discarded by rollback.
#[test]
fn test_session_create_node_in_transaction() {
    let db = GrafeoDB::new_in_memory();

    // Baseline: one node created outside any transaction.
    let baseline_node = db.create_node(&["Person"]);
    assert!(baseline_node.is_valid());
    assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");

    let mut session = db.session();
    session.begin_transaction().unwrap();
    let open_tx = session.current_transaction.lock().unwrap();

    // Created through the session, so it belongs to the open transaction.
    let pending_node = session.create_node(&["Person"]);
    assert!(pending_node.is_valid());

    assert_eq!(
        db.node_count(),
        1,
        "PENDING nodes should be invisible to non-versioned node_count()"
    );
    let store_epoch = db.store().current_epoch();
    let visible_to_owner = db
        .store()
        .get_node_versioned(pending_node, store_epoch, open_tx)
        .is_some();
    assert!(
        visible_to_owner,
        "Transaction node should be visible to its own transaction"
    );

    session.rollback().unwrap();

    let count_after = db.node_count();
    assert_eq!(
        count_after, 1,
        "Rollback should discard node created via session.create_node(), but got {count_after}"
    );
}
4253
/// A node created with `Session::create_node_with_props` inside a
/// transaction is discarded by rollback.
#[test]
fn test_session_create_node_with_props_in_transaction() {
    use grafeo_common::types::Value;

    let db = GrafeoDB::new_in_memory();

    // Baseline: one node created outside any transaction.
    db.create_node(&["Person"]);
    assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");

    let mut session = db.session();
    session.begin_transaction().unwrap();
    let open_tx = session.current_transaction.lock().unwrap();

    let name_prop = ("name", Value::String("Alix".into()));
    let pending_node = session.create_node_with_props(&["Person"], [name_prop]);
    assert!(pending_node.is_valid());

    assert_eq!(
        db.node_count(),
        1,
        "PENDING nodes should be invisible to non-versioned node_count()"
    );
    let store_epoch = db.store().current_epoch();
    let visible_to_owner = db
        .store()
        .get_node_versioned(pending_node, store_epoch, open_tx)
        .is_some();
    assert!(
        visible_to_owner,
        "Transaction node should be visible to its own transaction"
    );

    session.rollback().unwrap();

    let count_after = db.node_count();
    assert_eq!(
        count_after, 1,
        "Rollback should discard node created via session.create_node_with_props()"
    );
}
4299
/// Checks for GQL statements run through `Session::execute`.
#[cfg(feature = "gql")]
mod gql_tests {
    use super::*;

    /// A label-filtered `MATCH` returns only nodes carrying that label.
    #[test]
    fn test_gql_query_execution() {
        let database = GrafeoDB::new_in_memory();
        let session = database.session();

        // Two people and one node with a different label.
        for label in ["Person", "Person", "Animal"] {
            session.create_node(&[label]);
        }

        let people = session.execute("MATCH (n:Person) RETURN n").unwrap();

        assert_eq!(people.row_count(), 2);
        assert_eq!(people.column_count(), 1);
        assert_eq!(people.columns[0], "n");
    }

    /// Matching against an empty database yields zero rows.
    #[test]
    fn test_gql_empty_result() {
        let database = GrafeoDB::new_in_memory();
        let session = database.session();

        let people = session.execute("MATCH (n:Person) RETURN n").unwrap();

        assert_eq!(people.row_count(), 0);
    }

    /// A query with an unbalanced parenthesis is rejected.
    #[test]
    fn test_gql_parse_error() {
        let database = GrafeoDB::new_in_memory();
        let session = database.session();

        let outcome = session.execute("MATCH (n RETURN n");

        assert!(outcome.is_err());
    }

    /// A one-hop pattern returns one row per matching edge.
    #[test]
    fn test_gql_relationship_traversal() {
        let database = GrafeoDB::new_in_memory();
        let session = database.session();

        let source = session.create_node(&["Person"]);
        let first_friend = session.create_node(&["Person"]);
        let second_friend = session.create_node(&["Person"]);

        session.create_edge(source, first_friend, "KNOWS");
        session.create_edge(source, second_friend, "KNOWS");

        let pairs = session
            .execute("MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b")
            .unwrap();

        assert_eq!(pairs.row_count(), 2);
        assert_eq!(pairs.column_count(), 2);
        assert_eq!(pairs.columns[0], "a");
        assert_eq!(pairs.columns[1], "b");
    }

    /// The edge type in the pattern filters out edges of other types.
    #[test]
    fn test_gql_relationship_with_type_filter() {
        let database = GrafeoDB::new_in_memory();
        let session = database.session();

        let source = session.create_node(&["Person"]);
        let friend = session.create_node(&["Person"]);
        let colleague = session.create_node(&["Person"]);

        session.create_edge(source, friend, "KNOWS");
        session.create_edge(source, colleague, "WORKS_WITH");

        let pairs = session
            .execute("MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b")
            .unwrap();

        // Only the KNOWS edge qualifies.
        assert_eq!(pairs.row_count(), 1);
    }

    /// Returning a variable that was never bound is a semantic error.
    #[test]
    fn test_gql_semantic_error_undefined_variable() {
        let database = GrafeoDB::new_in_memory();
        let session = database.session();

        let outcome = session.execute("MATCH (n:Person) RETURN x");

        assert!(outcome.is_err());
        let Err(err) = outcome else {
            panic!("Expected error")
        };
        assert!(
            err.to_string().contains("Undefined variable"),
            "Expected undefined variable error, got: {}",
            err
        );
    }

    /// A numeric comparison in `WHERE` keeps only the matching nodes.
    #[test]
    fn test_gql_where_clause_property_filter() {
        use grafeo_common::types::Value;

        let database = GrafeoDB::new_in_memory();
        let session = database.session();

        for age in [25, 35, 45] {
            session.create_node_with_props(&["Person"], [("age", Value::Int64(age))]);
        }

        let older = session
            .execute("MATCH (n:Person) WHERE n.age > 30 RETURN n")
            .unwrap();

        // 35 and 45 pass the filter.
        assert_eq!(older.row_count(), 2);
    }

    /// A string equality in `WHERE` keeps only the matching nodes.
    #[test]
    fn test_gql_where_clause_equality() {
        use grafeo_common::types::Value;

        let database = GrafeoDB::new_in_memory();
        let session = database.session();

        for name in ["Alix", "Gus", "Alix"] {
            session.create_node_with_props(&["Person"], [("name", Value::String(name.into()))]);
        }

        let matches = session
            .execute("MATCH (n:Person) WHERE n.name = \"Alix\" RETURN n")
            .unwrap();

        // Two of the three nodes are named Alix.
        assert_eq!(matches.row_count(), 2);
    }

    /// Property accesses in `RETURN` become columns named after the expression.
    #[test]
    fn test_gql_return_property_access() {
        use grafeo_common::types::Value;

        let database = GrafeoDB::new_in_memory();
        let session = database.session();

        let alix_props = [
            ("name", Value::String("Alix".into())),
            ("age", Value::Int64(30)),
        ];
        session.create_node_with_props(&["Person"], alix_props);

        let gus_props = [
            ("name", Value::String("Gus".into())),
            ("age", Value::Int64(25)),
        ];
        session.create_node_with_props(&["Person"], gus_props);

        let projected = session
            .execute("MATCH (n:Person) RETURN n.name, n.age")
            .unwrap();

        assert_eq!(projected.row_count(), 2);
        assert_eq!(projected.column_count(), 2);
        assert_eq!(projected.columns[0], "n.name");
        assert_eq!(projected.columns[1], "n.age");

        // Row order is not asserted, only that both names are present.
        let returned_names: Vec<&Value> = projected.rows.iter().map(|row| &row[0]).collect();
        assert!(returned_names.contains(&&Value::String("Alix".into())));
        assert!(returned_names.contains(&&Value::String("Gus".into())));
    }

    /// A whole node and one of its properties can be returned side by side.
    #[test]
    fn test_gql_return_mixed_expressions() {
        use grafeo_common::types::Value;

        let database = GrafeoDB::new_in_memory();
        let session = database.session();

        session.create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))]);

        let projected = session
            .execute("MATCH (n:Person) RETURN n, n.name")
            .unwrap();

        assert_eq!(projected.row_count(), 1);
        assert_eq!(projected.column_count(), 2);
        assert_eq!(projected.columns[0], "n");
        assert_eq!(projected.columns[1], "n.name");

        // The second column holds the property value.
        assert_eq!(projected.rows[0][1], Value::String("Alix".into()));
    }
}
4518
/// Checks for Cypher statements run through `Session::execute_cypher`.
#[cfg(feature = "cypher")]
mod cypher_tests {
    use super::*;

    /// A label-filtered `MATCH` returns only nodes carrying that label.
    #[test]
    fn test_cypher_query_execution() {
        let database = GrafeoDB::new_in_memory();
        let session = database.session();

        // Two people and one node with a different label.
        for label in ["Person", "Person", "Animal"] {
            session.create_node(&[label]);
        }

        let people = session.execute_cypher("MATCH (n:Person) RETURN n").unwrap();

        assert_eq!(people.row_count(), 2);
        assert_eq!(people.column_count(), 1);
        assert_eq!(people.columns[0], "n");
    }

    /// Matching against an empty database yields zero rows.
    #[test]
    fn test_cypher_empty_result() {
        let database = GrafeoDB::new_in_memory();
        let session = database.session();

        let people = session.execute_cypher("MATCH (n:Person) RETURN n").unwrap();

        assert_eq!(people.row_count(), 0);
    }

    /// A query with an unbalanced parenthesis is rejected.
    #[test]
    fn test_cypher_parse_error() {
        let database = GrafeoDB::new_in_memory();
        let session = database.session();

        let outcome = session.execute_cypher("MATCH (n RETURN n");

        assert!(outcome.is_err());
    }
}
4564
4565 mod direct_lookup_tests {
4568 use super::*;
4569 use grafeo_common::types::Value;
4570
4571 #[test]
4572 fn test_get_node() {
4573 let db = GrafeoDB::new_in_memory();
4574 let session = db.session();
4575
4576 let id = session.create_node(&["Person"]);
4577 let node = session.get_node(id);
4578
4579 assert!(node.is_some());
4580 let node = node.unwrap();
4581 assert_eq!(node.id, id);
4582 }
4583
4584 #[test]
4585 fn test_get_node_not_found() {
4586 use grafeo_common::types::NodeId;
4587
4588 let db = GrafeoDB::new_in_memory();
4589 let session = db.session();
4590
4591 let node = session.get_node(NodeId::new(9999));
4593 assert!(node.is_none());
4594 }
4595
4596 #[test]
4597 fn test_get_node_property() {
4598 let db = GrafeoDB::new_in_memory();
4599 let session = db.session();
4600
4601 let id = session
4602 .create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))]);
4603
4604 let name = session.get_node_property(id, "name");
4605 assert_eq!(name, Some(Value::String("Alix".into())));
4606
4607 let missing = session.get_node_property(id, "missing");
4609 assert!(missing.is_none());
4610 }
4611
4612 #[test]
4613 fn test_get_edge() {
4614 let db = GrafeoDB::new_in_memory();
4615 let session = db.session();
4616
4617 let alix = session.create_node(&["Person"]);
4618 let gus = session.create_node(&["Person"]);
4619 let edge_id = session.create_edge(alix, gus, "KNOWS");
4620
4621 let edge = session.get_edge(edge_id);
4622 assert!(edge.is_some());
4623 let edge = edge.unwrap();
4624 assert_eq!(edge.id, edge_id);
4625 assert_eq!(edge.src, alix);
4626 assert_eq!(edge.dst, gus);
4627 }
4628
4629 #[test]
4630 fn test_get_edge_not_found() {
4631 use grafeo_common::types::EdgeId;
4632
4633 let db = GrafeoDB::new_in_memory();
4634 let session = db.session();
4635
4636 let edge = session.get_edge(EdgeId::new(9999));
4637 assert!(edge.is_none());
4638 }
4639
4640 #[test]
4641 fn test_get_neighbors_outgoing() {
4642 let db = GrafeoDB::new_in_memory();
4643 let session = db.session();
4644
4645 let alix = session.create_node(&["Person"]);
4646 let gus = session.create_node(&["Person"]);
4647 let harm = session.create_node(&["Person"]);
4648
4649 session.create_edge(alix, gus, "KNOWS");
4650 session.create_edge(alix, harm, "KNOWS");
4651
4652 let neighbors = session.get_neighbors_outgoing(alix);
4653 assert_eq!(neighbors.len(), 2);
4654
4655 let neighbor_ids: Vec<_> = neighbors.iter().map(|(node_id, _)| *node_id).collect();
4656 assert!(neighbor_ids.contains(&gus));
4657 assert!(neighbor_ids.contains(&harm));
4658 }
4659
4660 #[test]
4661 fn test_get_neighbors_incoming() {
4662 let db = GrafeoDB::new_in_memory();
4663 let session = db.session();
4664
4665 let alix = session.create_node(&["Person"]);
4666 let gus = session.create_node(&["Person"]);
4667 let harm = session.create_node(&["Person"]);
4668
4669 session.create_edge(gus, alix, "KNOWS");
4670 session.create_edge(harm, alix, "KNOWS");
4671
4672 let neighbors = session.get_neighbors_incoming(alix);
4673 assert_eq!(neighbors.len(), 2);
4674
4675 let neighbor_ids: Vec<_> = neighbors.iter().map(|(node_id, _)| *node_id).collect();
4676 assert!(neighbor_ids.contains(&gus));
4677 assert!(neighbor_ids.contains(&harm));
4678 }
4679
4680 #[test]
4681 fn test_get_neighbors_outgoing_by_type() {
4682 let db = GrafeoDB::new_in_memory();
4683 let session = db.session();
4684
4685 let alix = session.create_node(&["Person"]);
4686 let gus = session.create_node(&["Person"]);
4687 let company = session.create_node(&["Company"]);
4688
4689 session.create_edge(alix, gus, "KNOWS");
4690 session.create_edge(alix, company, "WORKS_AT");
4691
4692 let knows_neighbors = session.get_neighbors_outgoing_by_type(alix, "KNOWS");
4693 assert_eq!(knows_neighbors.len(), 1);
4694 assert_eq!(knows_neighbors[0].0, gus);
4695
4696 let works_neighbors = session.get_neighbors_outgoing_by_type(alix, "WORKS_AT");
4697 assert_eq!(works_neighbors.len(), 1);
4698 assert_eq!(works_neighbors[0].0, company);
4699
4700 let no_neighbors = session.get_neighbors_outgoing_by_type(alix, "LIKES");
4702 assert!(no_neighbors.is_empty());
4703 }
4704
4705 #[test]
4706 fn test_node_exists() {
4707 use grafeo_common::types::NodeId;
4708
4709 let db = GrafeoDB::new_in_memory();
4710 let session = db.session();
4711
4712 let id = session.create_node(&["Person"]);
4713
4714 assert!(session.node_exists(id));
4715 assert!(!session.node_exists(NodeId::new(9999)));
4716 }
4717
4718 #[test]
4719 fn test_edge_exists() {
4720 use grafeo_common::types::EdgeId;
4721
4722 let db = GrafeoDB::new_in_memory();
4723 let session = db.session();
4724
4725 let alix = session.create_node(&["Person"]);
4726 let gus = session.create_node(&["Person"]);
4727 let edge_id = session.create_edge(alix, gus, "KNOWS");
4728
4729 assert!(session.edge_exists(edge_id));
4730 assert!(!session.edge_exists(EdgeId::new(9999)));
4731 }
4732
4733 #[test]
4734 fn test_get_degree() {
4735 let db = GrafeoDB::new_in_memory();
4736 let session = db.session();
4737
4738 let alix = session.create_node(&["Person"]);
4739 let gus = session.create_node(&["Person"]);
4740 let harm = session.create_node(&["Person"]);
4741
4742 session.create_edge(alix, gus, "KNOWS");
4744 session.create_edge(alix, harm, "KNOWS");
4745 session.create_edge(gus, alix, "KNOWS");
4747
4748 let (out_degree, in_degree) = session.get_degree(alix);
4749 assert_eq!(out_degree, 2);
4750 assert_eq!(in_degree, 1);
4751
4752 let lonely = session.create_node(&["Person"]);
4754 let (out, in_deg) = session.get_degree(lonely);
4755 assert_eq!(out, 0);
4756 assert_eq!(in_deg, 0);
4757 }
4758
4759 #[test]
4760 fn test_get_nodes_batch() {
4761 let db = GrafeoDB::new_in_memory();
4762 let session = db.session();
4763
4764 let alix = session.create_node(&["Person"]);
4765 let gus = session.create_node(&["Person"]);
4766 let harm = session.create_node(&["Person"]);
4767
4768 let nodes = session.get_nodes_batch(&[alix, gus, harm]);
4769 assert_eq!(nodes.len(), 3);
4770 assert!(nodes[0].is_some());
4771 assert!(nodes[1].is_some());
4772 assert!(nodes[2].is_some());
4773
4774 use grafeo_common::types::NodeId;
4776 let nodes_with_missing = session.get_nodes_batch(&[alix, NodeId::new(9999), harm]);
4777 assert_eq!(nodes_with_missing.len(), 3);
4778 assert!(nodes_with_missing[0].is_some());
4779 assert!(nodes_with_missing[1].is_none()); assert!(nodes_with_missing[2].is_some());
4781 }
4782
4783 #[test]
4784 fn test_auto_commit_setting() {
4785 let db = GrafeoDB::new_in_memory();
4786 let mut session = db.session();
4787
4788 assert!(session.auto_commit());
4790
4791 session.set_auto_commit(false);
4792 assert!(!session.auto_commit());
4793
4794 session.set_auto_commit(true);
4795 assert!(session.auto_commit());
4796 }
4797
4798 #[test]
4799 fn test_transaction_double_begin_nests() {
4800 let db = GrafeoDB::new_in_memory();
4801 let mut session = db.session();
4802
4803 session.begin_transaction().unwrap();
4804 let result = session.begin_transaction();
4806 assert!(result.is_ok());
4807 session.commit().unwrap();
4809 session.commit().unwrap();
4811 }
4812
4813 #[test]
4814 fn test_commit_without_transaction_error() {
4815 let db = GrafeoDB::new_in_memory();
4816 let mut session = db.session();
4817
4818 let result = session.commit();
4819 assert!(result.is_err());
4820 }
4821
4822 #[test]
4823 fn test_rollback_without_transaction_error() {
4824 let db = GrafeoDB::new_in_memory();
4825 let mut session = db.session();
4826
4827 let result = session.rollback();
4828 assert!(result.is_err());
4829 }
4830
4831 #[test]
4832 fn test_create_edge_in_transaction() {
4833 let db = GrafeoDB::new_in_memory();
4834 let mut session = db.session();
4835
4836 let alix = session.create_node(&["Person"]);
4838 let gus = session.create_node(&["Person"]);
4839
4840 session.begin_transaction().unwrap();
4842 let edge_id = session.create_edge(alix, gus, "KNOWS");
4843
4844 assert!(session.edge_exists(edge_id));
4846
4847 session.commit().unwrap();
4849
4850 assert!(session.edge_exists(edge_id));
4852 }
4853
4854 #[test]
4855 fn test_neighbors_empty_node() {
4856 let db = GrafeoDB::new_in_memory();
4857 let session = db.session();
4858
4859 let lonely = session.create_node(&["Person"]);
4860
4861 assert!(session.get_neighbors_outgoing(lonely).is_empty());
4862 assert!(session.get_neighbors_incoming(lonely).is_empty());
4863 assert!(
4864 session
4865 .get_neighbors_outgoing_by_type(lonely, "KNOWS")
4866 .is_empty()
4867 );
4868 }
4869 }
4870
/// With a GC interval of 2, two committed transactions complete and both
/// nodes remain counted.
#[test]
fn test_auto_gc_triggers_on_commit_interval() {
    use crate::config::Config;

    let gc_config = Config::in_memory().with_gc_interval(2);
    let database = GrafeoDB::with_config(gc_config).unwrap();
    let mut session = database.session();

    // One committed transaction per label; the second commit reaches the
    // configured interval.
    for label in ["A", "B"] {
        session.begin_transaction().unwrap();
        session.create_node(&[label]);
        session.commit().unwrap();
    }

    assert_eq!(database.node_count(), 2);
}
4892
/// A configured query timeout gives sessions a query deadline.
#[test]
fn test_query_timeout_config_propagates_to_session() {
    use crate::config::Config;
    use std::time::Duration;

    let timeout = Duration::from_secs(5);
    let database = GrafeoDB::with_config(Config::in_memory().with_query_timeout(timeout)).unwrap();
    let session = database.session();

    assert!(session.query_deadline().is_some());
}
4905
/// Without a configured timeout a session has no query deadline.
#[test]
fn test_no_query_timeout_returns_no_deadline() {
    let database = GrafeoDB::new_in_memory();
    let session = database.session();

    let deadline = session.query_deadline();

    assert!(deadline.is_none());
}
4914
/// A session on a default in-memory database reports the LPG graph model.
#[test]
fn test_graph_model_accessor() {
    use crate::config::GraphModel;

    let database = GrafeoDB::new_in_memory();
    let session = database.session();

    let model = session.graph_model();

    assert_eq!(model, GraphModel::Lpg);
}
4924
/// A database built over an externally supplied store can insert and read
/// back data inside a transaction.
#[cfg(feature = "gql")]
#[test]
fn test_external_store_session() {
    use grafeo_core::graph::GraphStoreMut;
    use std::sync::Arc;

    let config = crate::config::Config::in_memory();
    let external_store =
        Arc::new(grafeo_core::graph::lpg::LpgStore::new().unwrap()) as Arc<dyn GraphStoreMut>;
    let database = GrafeoDB::with_store(external_store, config).unwrap();

    let mut session = database.session();
    session.begin_transaction().unwrap();

    session.execute("INSERT (:Test {name: 'hello'})").unwrap();

    // The insert is visible to a read in the same transaction.
    let inserted = session.execute("MATCH (n:Test) RETURN n.name").unwrap();
    assert_eq!(inserted.row_count(), 1);

    session.commit().unwrap();
}
4950
4951 #[cfg(feature = "gql")]
4954 mod session_command_tests {
4955 use super::*;
4956 use grafeo_common::types::Value;
4957
/// `USE GRAPH` on an existing graph makes it the session's current graph.
#[test]
fn test_use_graph_sets_current_graph() {
    let database = GrafeoDB::new_in_memory();
    let session = database.session();

    // The graph has to exist before it can be selected.
    session.execute("CREATE GRAPH mydb").unwrap();
    session.execute("USE GRAPH mydb").unwrap();

    let selected = session.current_graph();
    assert_eq!(selected, Some(String::from("mydb")));
}
4969
/// `USE GRAPH` on an unknown graph fails with a "does not exist" message.
#[test]
fn test_use_graph_nonexistent_errors() {
    let database = GrafeoDB::new_in_memory();
    let session = database.session();

    let outcome = session.execute("USE GRAPH doesnotexist");

    assert!(outcome.is_err());
    let err = outcome.unwrap_err().to_string();
    assert!(
        err.contains("does not exist"),
        "Expected 'does not exist' error, got: {err}"
    );
}
4983
/// `USE GRAPH default` succeeds without creating a graph first.
#[test]
fn test_use_graph_default_always_valid() {
    let database = GrafeoDB::new_in_memory();
    let session = database.session();

    session.execute("USE GRAPH default").unwrap();

    let selected = session.current_graph();
    assert_eq!(selected, Some(String::from("default")));
}
4993
/// `SESSION SET GRAPH` on an existing graph makes it the current graph.
#[test]
fn test_session_set_graph() {
    let database = GrafeoDB::new_in_memory();
    let session = database.session();

    session.execute("CREATE GRAPH analytics").unwrap();
    session.execute("SESSION SET GRAPH analytics").unwrap();

    let selected = session.current_graph();
    assert_eq!(selected, Some(String::from("analytics")));
}
5003
/// `SESSION SET GRAPH` on an unknown graph is rejected.
#[test]
fn test_session_set_graph_nonexistent_errors() {
    let database = GrafeoDB::new_in_memory();
    let session = database.session();

    let outcome = session.execute("SESSION SET GRAPH nosuchgraph");

    assert!(outcome.is_err());
}
5012
/// The time zone starts unset and follows each `SESSION SET TIME ZONE`.
#[test]
fn test_session_set_time_zone() {
    let database = GrafeoDB::new_in_memory();
    let session = database.session();

    assert_eq!(session.time_zone(), None);

    session.execute("SESSION SET TIME ZONE 'UTC'").unwrap();
    assert_eq!(session.time_zone(), Some(String::from("UTC")));

    // A later statement replaces the earlier value.
    session
        .execute("SESSION SET TIME ZONE 'America/New_York'")
        .unwrap();
    assert_eq!(session.time_zone(), Some(String::from("America/New_York")));
}
5028
/// `SESSION SET PARAMETER` stores a value retrievable by parameter name.
#[test]
fn test_session_set_parameter() {
    let database = GrafeoDB::new_in_memory();
    let session = database.session();

    session
        .execute("SESSION SET PARAMETER $timeout = 30")
        .unwrap();

    // Looked up without the leading `$`.
    let stored = session.get_parameter("timeout");
    assert!(stored.is_some());
}
5042
/// `SESSION RESET` clears the current graph, time zone and parameters.
#[test]
fn test_session_reset_clears_all_state() {
    let database = GrafeoDB::new_in_memory();
    let session = database.session();

    // Populate every piece of session state.
    session.execute("CREATE GRAPH analytics").unwrap();
    session.execute("SESSION SET GRAPH analytics").unwrap();
    session.execute("SESSION SET TIME ZONE 'UTC'").unwrap();
    session
        .execute("SESSION SET PARAMETER $limit = 100")
        .unwrap();

    // Confirm it is set before resetting.
    assert!(session.current_graph().is_some());
    assert!(session.time_zone().is_some());
    assert!(session.get_parameter("limit").is_some());

    session.execute("SESSION RESET").unwrap();

    assert_eq!(session.current_graph(), None);
    assert_eq!(session.time_zone(), None);
    assert!(session.get_parameter("limit").is_none());
}
5068
/// `SESSION CLOSE` clears the current graph and the time zone.
#[test]
fn test_session_close_clears_state() {
    let database = GrafeoDB::new_in_memory();
    let session = database.session();

    let setup = [
        "CREATE GRAPH analytics",
        "SESSION SET GRAPH analytics",
        "SESSION SET TIME ZONE 'UTC'",
    ];
    for statement in setup {
        session.execute(statement).unwrap();
    }

    session.execute("SESSION CLOSE").unwrap();

    assert_eq!(session.current_graph(), None);
    assert_eq!(session.time_zone(), None);
}
5083
/// A graph made with `CREATE GRAPH` can be selected afterwards.
#[test]
fn test_create_graph() {
    let database = GrafeoDB::new_in_memory();
    let session = database.session();

    session.execute("CREATE GRAPH mydb").unwrap();

    // Selecting it succeeds only if the graph exists.
    session.execute("USE GRAPH mydb").unwrap();
    let selected = session.current_graph();
    assert_eq!(selected, Some(String::from("mydb")));
}
5095
/// Creating the same graph twice fails with an "already exists" message.
#[test]
fn test_create_graph_duplicate_errors() {
    let database = GrafeoDB::new_in_memory();
    let session = database.session();

    session.execute("CREATE GRAPH mydb").unwrap();
    let second_attempt = session.execute("CREATE GRAPH mydb");

    assert!(second_attempt.is_err());
    let err = second_attempt.unwrap_err().to_string();
    assert!(
        err.contains("already exists"),
        "Expected 'already exists' error, got: {err}"
    );
}
5111
/// `CREATE GRAPH IF NOT EXISTS` succeeds when the graph already exists.
#[test]
fn test_create_graph_if_not_exists() {
    let database = GrafeoDB::new_in_memory();
    let session = database.session();

    session.execute("CREATE GRAPH mydb").unwrap();

    // Must not error even though `mydb` is already there.
    session.execute("CREATE GRAPH IF NOT EXISTS mydb").unwrap();
}
5121
/// A dropped graph can no longer be selected.
#[test]
fn test_drop_graph() {
    let database = GrafeoDB::new_in_memory();
    let session = database.session();

    session.execute("CREATE GRAPH mydb").unwrap();
    session.execute("DROP GRAPH mydb").unwrap();

    let select_dropped = session.execute("USE GRAPH mydb");
    assert!(select_dropped.is_err());
}
5134
/// Dropping a graph that was never created must fail with a
/// "does not exist" error.
#[test]
fn test_drop_graph_nonexistent_errors() {
    let db = GrafeoDB::new_in_memory();
    let session = db.session();

    let drop_attempt = session.execute("DROP GRAPH nosuchgraph");
    assert!(drop_attempt.is_err());

    let err = drop_attempt.unwrap_err().to_string();
    let mentions_missing = err.contains("does not exist");
    assert!(
        mentions_missing,
        "Expected 'does not exist' error, got: {err}"
    );
}
5148
/// `DROP GRAPH IF EXISTS` must succeed when the named graph is absent.
#[test]
fn test_drop_graph_if_exists() {
    let db = GrafeoDB::new_in_memory();
    let session = db.session();

    // No graph was created beforehand; the statement must still succeed.
    let outcome = session.execute("DROP GRAPH IF EXISTS nosuchgraph");
    outcome.unwrap();
}
5157
/// A transaction opened and committed through GQL statements toggles
/// `in_transaction` and keeps its insert.
#[test]
fn test_start_transaction_via_gql() {
    let db = GrafeoDB::new_in_memory();
    let session = db.session();
    // Shorthand: execute a statement and panic if it fails.
    let run = |gql: &str| session.execute(gql).unwrap();

    run("START TRANSACTION");
    assert!(session.in_transaction());

    run("INSERT (:Person {name: 'Alix'})");
    run("COMMIT");
    assert!(!session.in_transaction());

    // The inserted node must be returned by a query issued after the commit.
    let people = run("MATCH (n:Person) RETURN n.name");
    assert_eq!(people.rows.len(), 1);
}
5172
/// An insert issued inside a `READ ONLY` transaction must be rejected with a
/// "read-only" error.
#[test]
fn test_start_transaction_read_only_blocks_insert() {
    let db = GrafeoDB::new_in_memory();
    let session = db.session();
    session.execute("START TRANSACTION READ ONLY").unwrap();

    let insert_attempt = session.execute("INSERT (:Person {name: 'Alix'})");
    assert!(insert_attempt.is_err());

    let err = insert_attempt.unwrap_err().to_string();
    let mentions_read_only = err.contains("read-only");
    assert!(mentions_read_only, "Expected read-only error, got: {err}");

    // End the transaction that was opened at the top of the test.
    session.execute("ROLLBACK").unwrap();
}
5188
/// A `MATCH` query must still work inside a `READ ONLY` transaction.
#[test]
fn test_start_transaction_read_only_allows_reads() {
    let db = GrafeoDB::new_in_memory();
    let mut session = db.session();

    // Seed a single Person node via the programmatic transaction API.
    session.begin_transaction().unwrap();
    session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
    session.commit().unwrap();

    session.execute("START TRANSACTION READ ONLY").unwrap();
    let people = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
    let row_total = people.rows.len();
    assert_eq!(row_total, 1);
    session.execute("COMMIT").unwrap();
}
5202
/// An insert made inside a transaction must not be visible after `ROLLBACK`.
#[test]
fn test_rollback_via_gql() {
    let db = GrafeoDB::new_in_memory();
    let session = db.session();

    let script = [
        "START TRANSACTION",
        "INSERT (:Person {name: 'Alix'})",
        "ROLLBACK",
    ];
    for statement in script {
        session.execute(statement).unwrap();
    }

    // The rolled-back insert must leave no Person rows behind.
    let people = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
    assert!(people.rows.is_empty());
}
5215
/// `START TRANSACTION` with an explicit isolation level must be accepted and
/// open a transaction.
#[test]
fn test_start_transaction_with_isolation_level() {
    let db = GrafeoDB::new_in_memory();
    let session = db.session();

    let begin = "START TRANSACTION ISOLATION LEVEL SERIALIZABLE";
    session.execute(begin).unwrap();
    assert!(session.in_transaction());

    // End the transaction that was just opened.
    session.execute("ROLLBACK").unwrap();
}
5227
/// A `SESSION SET GRAPH` command must return a result with no rows and no
/// columns.
#[test]
fn test_session_commands_return_empty_result() {
    let db = GrafeoDB::new_in_memory();
    let session = db.session();
    session.execute("CREATE GRAPH test").unwrap();

    let outcome = session.execute("SESSION SET GRAPH test").unwrap();

    let row_total = outcome.row_count();
    let column_total = outcome.column_count();
    assert_eq!(row_total, 0);
    assert_eq!(column_total, 0);
}
5238
/// A freshly opened session must have no current graph.
#[test]
fn test_current_graph_default_is_none() {
    let db = GrafeoDB::new_in_memory();
    let fresh_session = db.session();

    assert!(fresh_session.current_graph().is_none());
}
5246
/// A freshly opened session must have no time zone set.
#[test]
fn test_time_zone_default_is_none() {
    let db = GrafeoDB::new_in_memory();
    let fresh_session = db.session();

    assert!(fresh_session.time_zone().is_none());
}
5254
/// Two sessions on the same database must each keep their own current graph.
#[test]
fn test_session_state_independent_across_sessions() {
    let db = GrafeoDB::new_in_memory();
    let first_session = db.session();
    let second_session = db.session();

    // Both graphs are created through the first session.
    for statement in ["CREATE GRAPH first", "CREATE GRAPH second"] {
        first_session.execute(statement).unwrap();
    }
    first_session.execute("SESSION SET GRAPH first").unwrap();
    second_session.execute("SESSION SET GRAPH second").unwrap();

    // Each session must report only the graph it selected itself.
    assert_eq!(first_session.current_graph().as_deref(), Some("first"));
    assert_eq!(second_session.current_graph().as_deref(), Some("second"));
}
5269
/// `SHOW NODE TYPES` must list a previously created node type under the
/// expected column headers.
#[test]
fn test_show_node_types() {
    let db = GrafeoDB::new_in_memory();
    let session = db.session();

    let ddl = "CREATE NODE TYPE Person (name STRING NOT NULL, age INTEGER)";
    session.execute(ddl).unwrap();

    let listing = session.execute("SHOW NODE TYPES").unwrap();

    let expected_columns = vec!["name", "properties", "constraints", "parents"];
    assert_eq!(listing.columns, expected_columns);

    // Exactly one type was created; its name is in the first column.
    assert_eq!(listing.rows.len(), 1);
    assert_eq!(listing.rows[0][0], Value::from("Person"));
}
5288
/// `SHOW EDGE TYPES` must list a previously created edge type under the
/// expected column headers.
#[test]
fn test_show_edge_types() {
    let db = GrafeoDB::new_in_memory();
    let session = db.session();

    let ddl = "CREATE EDGE TYPE KNOWS CONNECTING (Person) TO (Person) (since INTEGER)";
    session.execute(ddl).unwrap();

    let listing = session.execute("SHOW EDGE TYPES").unwrap();

    let expected_columns = vec!["name", "properties", "source_types", "target_types"];
    assert_eq!(listing.columns, expected_columns);

    // Exactly one type was created; its name is in the first column.
    assert_eq!(listing.rows.len(), 1);
    assert_eq!(listing.rows[0][0], Value::from("KNOWS"));
}
5306
/// `SHOW GRAPH TYPES` must list a previously created graph type under the
/// expected column headers.
#[test]
fn test_show_graph_types() {
    let db = GrafeoDB::new_in_memory();
    let session = db.session();

    // The node type is created before the graph type that mentions it.
    let node_type_ddl = "CREATE NODE TYPE Person (name STRING)";
    let graph_type_ddl = "CREATE GRAPH TYPE social (\
                          NODE TYPE Person (name STRING)\
                          )";
    for statement in [node_type_ddl, graph_type_ddl] {
        session.execute(statement).unwrap();
    }

    let listing = session.execute("SHOW GRAPH TYPES").unwrap();

    let expected_columns = vec!["name", "open", "node_types", "edge_types"];
    assert_eq!(listing.columns, expected_columns);
    assert_eq!(listing.rows.len(), 1);
    assert_eq!(listing.rows[0][0], Value::from("social"));
}
5331
/// `SHOW GRAPH TYPE <name>` must return a single row whose first cell is the
/// graph type's name.
#[test]
fn test_show_graph_type_named() {
    let db = GrafeoDB::new_in_memory();
    let session = db.session();

    // The node type is created before the graph type that mentions it.
    let node_type_ddl = "CREATE NODE TYPE Person (name STRING)";
    let graph_type_ddl = "CREATE GRAPH TYPE social (\
                          NODE TYPE Person (name STRING)\
                          )";
    for statement in [node_type_ddl, graph_type_ddl] {
        session.execute(statement).unwrap();
    }

    let described = session.execute("SHOW GRAPH TYPE social").unwrap();
    assert_eq!(described.rows.len(), 1);
    assert_eq!(described.rows[0][0], Value::from("social"));
}
5352
/// `SHOW GRAPH TYPE` for a graph type that was never created must return an
/// error.
#[test]
fn test_show_graph_type_not_found() {
    let db = GrafeoDB::new_in_memory();
    let session = db.session();

    let lookup = session.execute("SHOW GRAPH TYPE nonexistent");
    assert!(lookup.is_err());
}
5361
/// `SHOW INDEXES` on an empty database must succeed and expose the expected
/// column headers.
#[test]
fn test_show_indexes_via_gql() {
    let db = GrafeoDB::new_in_memory();
    let session = db.session();

    let listing = session.execute("SHOW INDEXES").unwrap();

    let expected_columns = vec!["name", "type", "label", "property"];
    assert_eq!(listing.columns, expected_columns);
}
5370
/// `SHOW CONSTRAINTS` on an empty database must succeed and expose the
/// expected column headers.
#[test]
fn test_show_constraints_via_gql() {
    let db = GrafeoDB::new_in_memory();
    let session = db.session();

    let listing = session.execute("SHOW CONSTRAINTS").unwrap();

    let expected_columns = vec!["name", "type", "label", "properties"];
    assert_eq!(listing.columns, expected_columns);
}
5379
/// A graph type declared with pattern syntax must be created successfully and
/// then be retrievable by name through `SHOW GRAPH TYPE`.
#[test]
fn test_pattern_form_graph_type_roundtrip() {
    let db = GrafeoDB::new_in_memory();
    let session = db.session();

    // Node and edge types are registered before the graph type below.
    let type_definitions = [
        "CREATE NODE TYPE Person (name STRING NOT NULL)",
        "CREATE NODE TYPE City (name STRING)",
        "CREATE EDGE TYPE KNOWS (since INTEGER)",
        "CREATE EDGE TYPE LIVES_IN",
    ];
    for statement in type_definitions {
        session.execute(statement).unwrap();
    }

    // Graph type written as two edge patterns instead of explicit type lists.
    let pattern_ddl = "CREATE GRAPH TYPE social (\
                       (:Person {name STRING NOT NULL})-[:KNOWS {since INTEGER}]->(:Person),\
                       (:Person)-[:LIVES_IN]->(:City)\
                       )";
    session.execute(pattern_ddl).unwrap();

    let described = session.execute("SHOW GRAPH TYPE social").unwrap();
    assert_eq!(described.rows.len(), 1);
    assert_eq!(described.rows[0][0], Value::from("social"));
}
5412 }
5413}