1#[cfg(feature = "rdf")]
8mod rdf;
9
10use std::sync::Arc;
11use std::sync::atomic::{AtomicUsize, Ordering};
12use std::time::{Duration, Instant};
13
14use grafeo_common::types::{EdgeId, EpochId, NodeId, TransactionId, Value};
15use grafeo_common::utils::error::Result;
16use grafeo_core::graph::Direction;
17use grafeo_core::graph::GraphStoreMut;
18use grafeo_core::graph::lpg::{Edge, LpgStore, Node};
19#[cfg(feature = "rdf")]
20use grafeo_core::graph::rdf::RdfStore;
21
22use crate::catalog::{Catalog, CatalogConstraintValidator};
23use crate::config::{AdaptiveConfig, GraphModel};
24use crate::database::QueryResult;
25use crate::query::cache::QueryCache;
26use crate::transaction::TransactionManager;
27
28fn parse_default_literal(text: &str) -> Value {
33 if text.eq_ignore_ascii_case("null") {
34 return Value::Null;
35 }
36 if text.eq_ignore_ascii_case("true") {
37 return Value::Bool(true);
38 }
39 if text.eq_ignore_ascii_case("false") {
40 return Value::Bool(false);
41 }
42 if (text.starts_with('\'') && text.ends_with('\''))
44 || (text.starts_with('"') && text.ends_with('"'))
45 {
46 return Value::String(text[1..text.len() - 1].into());
47 }
48 if let Ok(i) = text.parse::<i64>() {
50 return Value::Int64(i);
51 }
52 if let Ok(f) = text.parse::<f64>() {
53 return Value::Float64(f);
54 }
55 Value::String(text.into())
57}
58
/// Bundle of settings and shared handles passed to the `Session` constructors.
///
/// `Session::with_adaptive` and `Session::with_external_store` move every
/// field into the `Session` field of the same name.
pub(crate) struct SessionConfig {
    /// Transaction manager handle, shared through `Arc`.
    pub transaction_manager: Arc<TransactionManager>,
    /// Query cache handle, shared through `Arc`.
    pub query_cache: Arc<QueryCache>,
    /// Catalog handle, shared through `Arc`.
    pub catalog: Arc<Catalog>,
    /// Adaptive execution settings (stored on the session as-is).
    pub adaptive_config: AdaptiveConfig,
    /// Flag stored on the session; presumably enables factorized query
    /// execution (TODO confirm against the executor).
    pub factorized_execution: bool,
    /// Graph model of the database; the session rejects LPG query languages
    /// when this is `GraphModel::Rdf`.
    pub graph_model: GraphModel,
    /// Optional query timeout; `None` presumably means no limit (confirm).
    pub query_timeout: Option<Duration>,
    /// Shared atomic counter; presumably counts commits so that garbage
    /// collection can be triggered periodically (confirm against commit path).
    pub commit_counter: Arc<AtomicUsize>,
    /// NOTE(review): looks like the number of commits between GC runs;
    /// verify where `commit_counter` is compared against it.
    pub gc_interval: usize,
}
74
/// A database session: handles to the shared stores, catalog and transaction
/// manager, plus per-session state such as the active graph and schema, time
/// zone, session parameters and transaction bookkeeping.
///
/// All mutable per-session state sits behind `parking_lot::Mutex` or atomics,
/// so the accessor methods take `&self`.
pub struct Session {
    /// LPG store. Named graphs are looked up from it with `store.graph(name)`,
    /// and it is used directly when no named graph is active.
    store: Arc<LpgStore>,
    /// Mutable graph-store trait object used for the default graph. It is the
    /// same store as `store` after `with_adaptive`, a WAL-logging wrapper
    /// after `set_wal`, and the caller-supplied store after
    /// `with_external_store`.
    graph_store: Arc<dyn GraphStoreMut>,
    /// Catalog used for schema, type, constraint and graph-type-binding
    /// operations.
    catalog: Arc<Catalog>,
    /// RDF store; both constructors create a fresh one with `RdfStore::new()`.
    #[cfg(feature = "rdf")]
    rdf_store: Arc<RdfStore>,
    /// Shared transaction manager.
    transaction_manager: Arc<TransactionManager>,
    /// Shared query cache.
    query_cache: Arc<QueryCache>,
    /// Id of the transaction currently open on this session, if any.
    current_transaction: parking_lot::Mutex<Option<TransactionId>>,
    /// When `true`, `CREATE GRAPH` / `DROP GRAPH` session commands are
    /// rejected with a read-only transaction error.
    read_only_tx: parking_lot::Mutex<bool>,
    /// Initialised to `true` by the constructors; presumably makes statements
    /// outside an explicit transaction commit automatically (confirm).
    auto_commit: bool,
    /// Adaptive execution settings copied from `SessionConfig`.
    #[allow(dead_code)]
    adaptive_config: AdaptiveConfig,
    /// Copied from `SessionConfig`; presumably enables factorized execution.
    factorized_execution: bool,
    /// Graph model of the database (LPG or RDF).
    graph_model: GraphModel,
    /// Optional query timeout copied from `SessionConfig`.
    query_timeout: Option<Duration>,
    /// Shared counter copied from `SessionConfig`; presumably counts commits.
    commit_counter: Arc<AtomicUsize>,
    /// Copied from `SessionConfig`; presumably commits between GC runs.
    gc_interval: usize,
    /// Initialised to 0; presumably the node count captured when the current
    /// transaction started (TODO confirm against the transaction code).
    transaction_start_node_count: AtomicUsize,
    /// Initialised to 0; presumably the edge count captured when the current
    /// transaction started (TODO confirm against the transaction code).
    transaction_start_edge_count: AtomicUsize,
    /// Write-ahead log; `None` until `set_wal` is called.
    #[cfg(feature = "wal")]
    wal: Option<Arc<grafeo_adapters::storage::wal::LpgWal>>,
    /// Shared graph-context cell handed to the WAL-logging store wrappers;
    /// `None` until `set_wal` is called.
    #[cfg(feature = "wal")]
    wal_graph_context: Option<Arc<parking_lot::Mutex<Option<String>>>>,
    /// Change-data-capture log; a fresh log until `set_cdc_log` replaces it.
    #[cfg(feature = "cdc")]
    cdc_log: Arc<crate::cdc::CdcLog>,
    /// Name of the graph selected with `use_graph`; `None` means the default
    /// graph.
    current_graph: parking_lot::Mutex<Option<String>>,
    /// Name of the schema selected with `set_schema`; when set it prefixes
    /// graph storage keys as `"{schema}/{graph}"`.
    current_schema: parking_lot::Mutex<Option<String>>,
    /// Session time zone string, stored verbatim by `set_time_zone`.
    time_zone: parking_lot::Mutex<Option<String>>,
    /// Session parameters keyed by name.
    session_params:
        parking_lot::Mutex<std::collections::HashMap<String, grafeo_common::types::Value>>,
    /// Epoch override set by `set_viewing_epoch`; presumably makes reads see
    /// the database as of that epoch (confirm against the query path).
    viewing_epoch_override: parking_lot::Mutex<Option<EpochId>>,
    /// Savepoints recorded for the current transaction.
    savepoints: parking_lot::Mutex<Vec<SavepointState>>,
    /// Initialised to 0; presumably the depth of nested transaction starts.
    transaction_nesting_depth: parking_lot::Mutex<u32>,
    /// Storage keys of graphs made active while a transaction is open
    /// (`None` stands for the default graph); filled by `track_graph_touch`
    /// without duplicates.
    touched_graphs: parking_lot::Mutex<Vec<Option<String>>>,
    /// Metrics registry; `None` until `set_metrics` is called.
    #[cfg(feature = "metrics")]
    pub(crate) metrics: Option<Arc<crate::metrics::MetricsRegistry>>,
    /// Initialised to `None`; presumably the start instant of the current
    /// transaction, used for duration metrics (confirm).
    #[cfg(feature = "metrics")]
    tx_start_time: parking_lot::Mutex<Option<Instant>>,
}
157
/// Per-graph state recorded as part of a savepoint.
///
/// NOTE(review): the field meanings below are inferred from their names; the
/// code that fills and restores them is outside this view.
#[derive(Clone)]
struct GraphSavepoint {
    /// Graph this snapshot belongs to; `None` presumably means the default
    /// graph.
    graph_name: Option<String>,
    /// Presumably the next node id the store would allocate at savepoint time.
    next_node_id: u64,
    /// Presumably the next edge id the store would allocate at savepoint time.
    next_edge_id: u64,
    /// Presumably the length of the undo log at savepoint time.
    undo_log_position: usize,
}
166
/// A named savepoint together with the per-graph snapshots taken for it.
#[derive(Clone)]
struct SavepointState {
    /// Name the savepoint was created under.
    name: String,
    /// One snapshot per graph covered by the savepoint.
    graph_snapshots: Vec<GraphSavepoint>,
    /// Presumably the graph that was active when the savepoint was created
    /// (currently unused, hence the `dead_code` allowance).
    #[allow(dead_code)]
    active_graph: Option<String>,
}
177
178impl Session {
179 #[allow(dead_code)]
181 pub(crate) fn with_adaptive(store: Arc<LpgStore>, cfg: SessionConfig) -> Self {
182 let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
183 Self {
184 store,
185 graph_store,
186 catalog: cfg.catalog,
187 #[cfg(feature = "rdf")]
188 rdf_store: Arc::new(RdfStore::new()),
189 transaction_manager: cfg.transaction_manager,
190 query_cache: cfg.query_cache,
191 current_transaction: parking_lot::Mutex::new(None),
192 read_only_tx: parking_lot::Mutex::new(false),
193 auto_commit: true,
194 adaptive_config: cfg.adaptive_config,
195 factorized_execution: cfg.factorized_execution,
196 graph_model: cfg.graph_model,
197 query_timeout: cfg.query_timeout,
198 commit_counter: cfg.commit_counter,
199 gc_interval: cfg.gc_interval,
200 transaction_start_node_count: AtomicUsize::new(0),
201 transaction_start_edge_count: AtomicUsize::new(0),
202 #[cfg(feature = "wal")]
203 wal: None,
204 #[cfg(feature = "wal")]
205 wal_graph_context: None,
206 #[cfg(feature = "cdc")]
207 cdc_log: Arc::new(crate::cdc::CdcLog::new()),
208 current_graph: parking_lot::Mutex::new(None),
209 current_schema: parking_lot::Mutex::new(None),
210 time_zone: parking_lot::Mutex::new(None),
211 session_params: parking_lot::Mutex::new(std::collections::HashMap::new()),
212 viewing_epoch_override: parking_lot::Mutex::new(None),
213 savepoints: parking_lot::Mutex::new(Vec::new()),
214 transaction_nesting_depth: parking_lot::Mutex::new(0),
215 touched_graphs: parking_lot::Mutex::new(Vec::new()),
216 #[cfg(feature = "metrics")]
217 metrics: None,
218 #[cfg(feature = "metrics")]
219 tx_start_time: parking_lot::Mutex::new(None),
220 }
221 }
222
/// Attaches a write-ahead log to the session.
///
/// The default graph store is replaced by a `WalGraphStore` wrapping the
/// session's LPG store, and the WAL handle and graph-context cell are kept
/// so that stores for named graphs can be wrapped the same way later.
#[cfg(feature = "wal")]
pub(crate) fn set_wal(
    &mut self,
    wal: Arc<grafeo_adapters::storage::wal::LpgWal>,
    wal_graph_context: Arc<parking_lot::Mutex<Option<String>>>,
) {
    use crate::database::wal_store::WalGraphStore;

    let logging_store = WalGraphStore::new(
        Arc::clone(&self.store),
        Arc::clone(&wal),
        Arc::clone(&wal_graph_context),
    );
    self.graph_store = Arc::new(logging_store);
    self.wal = Some(wal);
    self.wal_graph_context = Some(wal_graph_context);
}
242
/// Replaces the session's change-data-capture log with `cdc_log`.
#[cfg(feature = "cdc")]
pub(crate) fn set_cdc_log(&mut self, cdc_log: Arc<crate::cdc::CdcLog>) {
    // The previous log handle is released when this function returns.
    let _previous = std::mem::replace(&mut self.cdc_log, cdc_log);
}
248
/// Installs `metrics` as the session's metrics registry, replacing any
/// registry set earlier.
#[cfg(feature = "metrics")]
pub(crate) fn set_metrics(&mut self, metrics: Arc<crate::metrics::MetricsRegistry>) {
    let _previous = self.metrics.replace(metrics);
}
254
255 pub(crate) fn with_external_store(
264 store: Arc<dyn GraphStoreMut>,
265 cfg: SessionConfig,
266 ) -> Result<Self> {
267 Ok(Self {
268 store: Arc::new(LpgStore::new()?),
269 graph_store: store,
270 catalog: cfg.catalog,
271 #[cfg(feature = "rdf")]
272 rdf_store: Arc::new(RdfStore::new()),
273 transaction_manager: cfg.transaction_manager,
274 query_cache: cfg.query_cache,
275 current_transaction: parking_lot::Mutex::new(None),
276 read_only_tx: parking_lot::Mutex::new(false),
277 auto_commit: true,
278 adaptive_config: cfg.adaptive_config,
279 factorized_execution: cfg.factorized_execution,
280 graph_model: cfg.graph_model,
281 query_timeout: cfg.query_timeout,
282 commit_counter: cfg.commit_counter,
283 gc_interval: cfg.gc_interval,
284 transaction_start_node_count: AtomicUsize::new(0),
285 transaction_start_edge_count: AtomicUsize::new(0),
286 #[cfg(feature = "wal")]
287 wal: None,
288 #[cfg(feature = "wal")]
289 wal_graph_context: None,
290 #[cfg(feature = "cdc")]
291 cdc_log: Arc::new(crate::cdc::CdcLog::new()),
292 current_graph: parking_lot::Mutex::new(None),
293 current_schema: parking_lot::Mutex::new(None),
294 time_zone: parking_lot::Mutex::new(None),
295 session_params: parking_lot::Mutex::new(std::collections::HashMap::new()),
296 viewing_epoch_override: parking_lot::Mutex::new(None),
297 savepoints: parking_lot::Mutex::new(Vec::new()),
298 transaction_nesting_depth: parking_lot::Mutex::new(0),
299 touched_graphs: parking_lot::Mutex::new(Vec::new()),
300 #[cfg(feature = "metrics")]
301 metrics: None,
302 #[cfg(feature = "metrics")]
303 tx_start_time: parking_lot::Mutex::new(None),
304 })
305 }
306
307 #[must_use]
309 pub fn graph_model(&self) -> GraphModel {
310 self.graph_model
311 }
312
313 pub fn use_graph(&self, name: &str) {
317 *self.current_graph.lock() = Some(name.to_string());
318 }
319
320 #[must_use]
322 pub fn current_graph(&self) -> Option<String> {
323 self.current_graph.lock().clone()
324 }
325
326 pub fn set_schema(&self, name: &str) {
330 *self.current_schema.lock() = Some(name.to_string());
331 }
332
333 #[must_use]
337 pub fn current_schema(&self) -> Option<String> {
338 self.current_schema.lock().clone()
339 }
340
341 fn effective_graph_key(&self, graph_name: &str) -> String {
346 let schema = self.current_schema.lock().clone();
347 match schema {
348 Some(s) => format!("{s}/{graph_name}"),
349 None => graph_name.to_string(),
350 }
351 }
352
353 fn active_graph_storage_key(&self) -> Option<String> {
357 let graph = self.current_graph.lock().clone();
358 let schema = self.current_schema.lock().clone();
359 match (schema, graph) {
360 (_, None) => None,
361 (_, Some(ref name)) if name.eq_ignore_ascii_case("default") => None,
362 (None, Some(name)) => Some(name),
363 (Some(s), Some(g)) => Some(format!("{s}/{g}")),
364 }
365 }
366
367 fn active_store(&self) -> Arc<dyn GraphStoreMut> {
375 let key = self.active_graph_storage_key();
376 match key {
377 None => Arc::clone(&self.graph_store),
378 Some(ref name) => match self.store.graph(name) {
379 Some(named_store) => {
380 #[cfg(feature = "wal")]
381 if let (Some(wal), Some(ctx)) = (&self.wal, &self.wal_graph_context) {
382 return Arc::new(crate::database::wal_store::WalGraphStore::new_for_graph(
383 named_store,
384 Arc::clone(wal),
385 name.clone(),
386 Arc::clone(ctx),
387 )) as Arc<dyn GraphStoreMut>;
388 }
389 named_store as Arc<dyn GraphStoreMut>
390 }
391 None => Arc::clone(&self.graph_store),
392 },
393 }
394 }
395
396 fn active_lpg_store(&self) -> Arc<LpgStore> {
401 let key = self.active_graph_storage_key();
402 match key {
403 None => Arc::clone(&self.store),
404 Some(ref name) => self
405 .store
406 .graph(name)
407 .unwrap_or_else(|| Arc::clone(&self.store)),
408 }
409 }
410
411 fn resolve_store(&self, graph_name: &Option<String>) -> Arc<LpgStore> {
414 match graph_name {
415 None => Arc::clone(&self.store),
416 Some(name) if name.eq_ignore_ascii_case("default") => Arc::clone(&self.store),
417 Some(name) => self
418 .store
419 .graph(name)
420 .unwrap_or_else(|| Arc::clone(&self.store)),
421 }
422 }
423
424 fn track_graph_touch(&self) {
429 if self.current_transaction.lock().is_some() {
430 let key = self.active_graph_storage_key();
431 let mut touched = self.touched_graphs.lock();
432 if !touched.contains(&key) {
433 touched.push(key);
434 }
435 }
436 }
437
438 pub fn set_time_zone(&self, tz: &str) {
440 *self.time_zone.lock() = Some(tz.to_string());
441 }
442
443 #[must_use]
445 pub fn time_zone(&self) -> Option<String> {
446 self.time_zone.lock().clone()
447 }
448
449 pub fn set_parameter(&self, key: &str, value: grafeo_common::types::Value) {
451 self.session_params.lock().insert(key.to_string(), value);
452 }
453
454 #[must_use]
456 pub fn get_parameter(&self, key: &str) -> Option<grafeo_common::types::Value> {
457 self.session_params.lock().get(key).cloned()
458 }
459
460 pub fn reset_session(&self) {
462 *self.current_schema.lock() = None;
463 *self.current_graph.lock() = None;
464 *self.time_zone.lock() = None;
465 self.session_params.lock().clear();
466 *self.viewing_epoch_override.lock() = None;
467 }
468
469 pub fn reset_schema(&self) {
471 *self.current_schema.lock() = None;
472 }
473
474 pub fn reset_graph(&self) {
476 *self.current_graph.lock() = None;
477 }
478
479 pub fn reset_time_zone(&self) {
481 *self.time_zone.lock() = None;
482 }
483
484 pub fn reset_parameters(&self) {
486 self.session_params.lock().clear();
487 }
488
489 pub fn set_viewing_epoch(&self, epoch: EpochId) {
497 *self.viewing_epoch_override.lock() = Some(epoch);
498 }
499
500 pub fn clear_viewing_epoch(&self) {
502 *self.viewing_epoch_override.lock() = None;
503 }
504
505 #[must_use]
507 pub fn viewing_epoch(&self) -> Option<EpochId> {
508 *self.viewing_epoch_override.lock()
509 }
510
511 #[must_use]
515 pub fn get_node_history(&self, id: NodeId) -> Vec<(EpochId, Option<EpochId>, Node)> {
516 self.active_lpg_store().get_node_history(id)
517 }
518
519 #[must_use]
523 pub fn get_edge_history(&self, id: EdgeId) -> Vec<(EpochId, Option<EpochId>, Edge)> {
524 self.active_lpg_store().get_edge_history(id)
525 }
526
527 fn require_lpg(&self, language: &str) -> Result<()> {
529 if self.graph_model == GraphModel::Rdf {
530 return Err(grafeo_common::utils::error::Error::Internal(format!(
531 "This is an RDF database. {language} queries require an LPG database."
532 )));
533 }
534 Ok(())
535 }
536
/// Executes a GQL session-level command and returns its result.
///
/// Handled commands:
///
/// * `CreateGraph` / `DropGraph`: create or drop a named graph under the
///   current schema, logging the change to the WAL when one is attached.
///   Both are rejected while the session is in a read-only transaction.
/// * `UseGraph` / `SessionSetGraph`: select the active graph after checking
///   that it exists (`default` is always accepted).
/// * `SessionSetSchema`, `SessionSetTimeZone`, `SessionSetParameter`,
///   `SessionReset`, `SessionClose`: update or clear session settings.
/// * `StartTransaction`, `Commit`, `Rollback` and the savepoint commands:
///   delegate to the corresponding transaction methods.
///
/// # Errors
///
/// Returns a semantic `Error::Query` for missing or duplicate graphs and
/// schemas and for an invalid `viewing_epoch`; `Error::Transaction` with
/// `ReadOnly` for graph DDL inside a read-only transaction;
/// `Error::Internal` when the store fails to create or copy a graph; and
/// any error from the delegated transaction methods.
#[cfg(feature = "gql")]
fn execute_session_command(
    &self,
    cmd: grafeo_adapters::query::gql::ast::SessionCommand,
) -> Result<QueryResult> {
    use grafeo_adapters::query::gql::ast::{SessionCommand, TransactionIsolationLevel};
    #[cfg(feature = "wal")]
    use grafeo_adapters::storage::wal::WalRecord;
    use grafeo_common::utils::error::{Error, QueryError, QueryErrorKind};

    // Every user-facing failure below is reported as a semantic query error.
    let semantic =
        |message: String| Error::Query(QueryError::new(QueryErrorKind::Semantic, message));

    // Graph DDL is the only session command blocked by a read-only transaction.
    let is_graph_ddl = matches!(
        &cmd,
        SessionCommand::CreateGraph { .. } | SessionCommand::DropGraph { .. }
    );
    if is_graph_ddl && *self.read_only_tx.lock() {
        return Err(Error::Transaction(
            grafeo_common::utils::error::TransactionError::ReadOnly,
        ));
    }

    match cmd {
        SessionCommand::CreateGraph {
            name,
            if_not_exists,
            typed,
            like_graph,
            copy_of,
            open: _,
        } => {
            let storage_key = self.effective_graph_key(&name);

            // Both optional source graphs must exist before anything is
            // created; LIKE is checked before COPY OF.
            for source in [like_graph.as_deref(), copy_of.as_deref()]
                .into_iter()
                .flatten()
            {
                let source_key = self.effective_graph_key(source);
                if self.store.graph(&source_key).is_none() {
                    return Err(semantic(format!("Source graph '{source}' does not exist")));
                }
            }

            let created = self
                .store
                .create_graph(&storage_key)
                .map_err(|e| Error::Internal(e.to_string()))?;
            if !(created || if_not_exists) {
                return Err(semantic(format!("Graph '{name}' already exists")));
            }
            #[cfg(feature = "wal")]
            if created {
                self.log_schema_wal(&WalRecord::CreateNamedGraph {
                    name: storage_key.clone(),
                });
            }

            // COPY OF: copy the source graph's contents into the new graph.
            if let Some(source) = copy_of.as_deref() {
                let source_key = self.effective_graph_key(source);
                self.store
                    .copy_graph(Some(&source_key), Some(&storage_key))
                    .map_err(|e| Error::Internal(e.to_string()))?;
            }

            // TYPED: bind the requested graph type; a failure is an error.
            if let Some(type_name) = typed {
                self.catalog
                    .bind_graph_type(&storage_key, type_name)
                    .map_err(|e| semantic(e.to_string()))?;
            }

            // LIKE: inherit the source graph's type binding, best effort.
            if let Some(source) = like_graph.as_deref() {
                let source_key = self.effective_graph_key(source);
                if let Some(source_type) = self.catalog.get_graph_type_binding(&source_key) {
                    let _ = self.catalog.bind_graph_type(&storage_key, source_type);
                }
            }

            Ok(QueryResult::empty())
        }
        SessionCommand::DropGraph { name, if_exists } => {
            let storage_key = self.effective_graph_key(&name);
            if !self.store.drop_graph(&storage_key) {
                return if if_exists {
                    Ok(QueryResult::empty())
                } else {
                    Err(semantic(format!("Graph '{name}' does not exist")))
                };
            }

            #[cfg(feature = "wal")]
            self.log_schema_wal(&WalRecord::DropNamedGraph {
                name: storage_key.clone(),
            });

            // If the dropped graph was the active one, fall back to default.
            let mut active = self.current_graph.lock();
            let dropped_was_active = active
                .as_deref()
                .is_some_and(|g| g.eq_ignore_ascii_case(&name));
            if dropped_was_active {
                *active = None;
            }
            Ok(QueryResult::empty())
        }
        SessionCommand::UseGraph(name) | SessionCommand::SessionSetGraph(name) => {
            let effective_key = self.effective_graph_key(&name);
            let is_default = name.eq_ignore_ascii_case("default");
            if !is_default && self.store.graph(&effective_key).is_none() {
                return Err(semantic(format!("Graph '{name}' does not exist")));
            }
            self.use_graph(&name);
            self.track_graph_touch();
            Ok(QueryResult::empty())
        }
        SessionCommand::SessionSetSchema(name) => {
            if self.catalog.schema_exists(&name) {
                self.set_schema(&name);
                Ok(QueryResult::empty())
            } else {
                Err(semantic(format!("Schema '{name}' does not exist")))
            }
        }
        SessionCommand::SessionSetTimeZone(tz) => {
            self.set_time_zone(&tz);
            Ok(QueryResult::empty())
        }
        SessionCommand::SessionSetParameter(key, expr) => {
            if !key.eq_ignore_ascii_case("viewing_epoch") {
                // The expression is not evaluated here: the parameter is
                // registered with a null value.
                self.set_parameter(&key, Value::Null);
                return Ok(QueryResult::empty());
            }
            match Self::eval_integer_literal(&expr) {
                Some(n) if n >= 0 => {
                    // `n` is non-negative here, so the cast cannot wrap.
                    self.set_viewing_epoch(EpochId::new(n as u64));
                    Ok(QueryResult::status(format!("Set viewing_epoch to {n}")))
                }
                _ => Err(Error::Query(QueryError::new(
                    QueryErrorKind::Semantic,
                    "viewing_epoch must be a non-negative integer literal",
                ))),
            }
        }
        SessionCommand::SessionReset(target) => {
            use grafeo_adapters::query::gql::ast::SessionResetTarget;
            match target {
                SessionResetTarget::All => self.reset_session(),
                SessionResetTarget::Schema => self.reset_schema(),
                SessionResetTarget::Graph => self.reset_graph(),
                SessionResetTarget::TimeZone => self.reset_time_zone(),
                SessionResetTarget::Parameters => self.reset_parameters(),
            }
            Ok(QueryResult::empty())
        }
        SessionCommand::SessionClose => {
            self.reset_session();
            Ok(QueryResult::empty())
        }
        SessionCommand::StartTransaction {
            read_only,
            isolation_level,
        } => {
            use crate::transaction::IsolationLevel;

            // Translate the parser's isolation level into the engine's.
            let engine_level = isolation_level.map(|level| match level {
                TransactionIsolationLevel::ReadCommitted => IsolationLevel::ReadCommitted,
                TransactionIsolationLevel::SnapshotIsolation => {
                    IsolationLevel::SnapshotIsolation
                }
                TransactionIsolationLevel::Serializable => IsolationLevel::Serializable,
            });
            self.begin_transaction_inner(read_only, engine_level)?;
            Ok(QueryResult::status("Transaction started"))
        }
        SessionCommand::Commit => {
            self.commit_inner()?;
            Ok(QueryResult::status("Transaction committed"))
        }
        SessionCommand::Rollback => {
            self.rollback_inner()?;
            Ok(QueryResult::status("Transaction rolled back"))
        }
        SessionCommand::Savepoint(name) => {
            self.savepoint(&name)?;
            Ok(QueryResult::status(format!("Savepoint '{name}' created")))
        }
        SessionCommand::RollbackToSavepoint(name) => {
            self.rollback_to_savepoint(&name)?;
            Ok(QueryResult::status(format!(
                "Rolled back to savepoint '{name}'"
            )))
        }
        SessionCommand::ReleaseSavepoint(name) => {
            self.release_savepoint(&name)?;
            Ok(QueryResult::status(format!("Savepoint '{name}' released")))
        }
    }
}
789
/// Appends a schema-change record to the WAL, if one is attached.
///
/// Logging is best effort: a failure is reported through `tracing::warn!`
/// and otherwise ignored, so the schema change itself is never undone.
#[cfg(feature = "wal")]
fn log_schema_wal(&self, record: &grafeo_adapters::storage::wal::WalRecord) {
    let Some(wal) = self.wal.as_ref() else {
        return;
    };
    if let Err(err) = wal.log(record) {
        tracing::warn!("Failed to log schema change to WAL: {}", err);
    }
}
799
800 #[cfg(feature = "gql")]
802 fn execute_schema_command(
803 &self,
804 cmd: grafeo_adapters::query::gql::ast::SchemaStatement,
805 ) -> Result<QueryResult> {
806 use crate::catalog::{
807 EdgeTypeDefinition, NodeTypeDefinition, PropertyDataType, TypedProperty,
808 };
809 use grafeo_adapters::query::gql::ast::SchemaStatement;
810 #[cfg(feature = "wal")]
811 use grafeo_adapters::storage::wal::WalRecord;
812 use grafeo_common::utils::error::{Error, QueryError, QueryErrorKind};
813
814 macro_rules! wal_log {
816 ($self:expr, $record:expr) => {
817 #[cfg(feature = "wal")]
818 $self.log_schema_wal(&$record);
819 };
820 }
821
822 let result = match cmd {
823 SchemaStatement::CreateNodeType(stmt) => {
824 #[cfg(feature = "wal")]
825 let props_for_wal: Vec<(String, String, bool)> = stmt
826 .properties
827 .iter()
828 .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
829 .collect();
830 let def = NodeTypeDefinition {
831 name: stmt.name.clone(),
832 properties: stmt
833 .properties
834 .iter()
835 .map(|p| TypedProperty {
836 name: p.name.clone(),
837 data_type: PropertyDataType::from_type_name(&p.data_type),
838 nullable: p.nullable,
839 default_value: p
840 .default_value
841 .as_ref()
842 .map(|s| parse_default_literal(s)),
843 })
844 .collect(),
845 constraints: Vec::new(),
846 parent_types: stmt.parent_types.clone(),
847 };
848 let result = if stmt.or_replace {
849 let _ = self.catalog.drop_node_type(&stmt.name);
850 self.catalog.register_node_type(def)
851 } else {
852 self.catalog.register_node_type(def)
853 };
854 match result {
855 Ok(()) => {
856 wal_log!(
857 self,
858 WalRecord::CreateNodeType {
859 name: stmt.name.clone(),
860 properties: props_for_wal,
861 constraints: Vec::new(),
862 }
863 );
864 Ok(QueryResult::status(format!(
865 "Created node type '{}'",
866 stmt.name
867 )))
868 }
869 Err(e) if stmt.if_not_exists => {
870 let _ = e;
871 Ok(QueryResult::status("No change"))
872 }
873 Err(e) => Err(Error::Query(QueryError::new(
874 QueryErrorKind::Semantic,
875 e.to_string(),
876 ))),
877 }
878 }
879 SchemaStatement::CreateEdgeType(stmt) => {
880 #[cfg(feature = "wal")]
881 let props_for_wal: Vec<(String, String, bool)> = stmt
882 .properties
883 .iter()
884 .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
885 .collect();
886 let def = EdgeTypeDefinition {
887 name: stmt.name.clone(),
888 properties: stmt
889 .properties
890 .iter()
891 .map(|p| TypedProperty {
892 name: p.name.clone(),
893 data_type: PropertyDataType::from_type_name(&p.data_type),
894 nullable: p.nullable,
895 default_value: p
896 .default_value
897 .as_ref()
898 .map(|s| parse_default_literal(s)),
899 })
900 .collect(),
901 constraints: Vec::new(),
902 source_node_types: stmt.source_node_types.clone(),
903 target_node_types: stmt.target_node_types.clone(),
904 };
905 let result = if stmt.or_replace {
906 let _ = self.catalog.drop_edge_type_def(&stmt.name);
907 self.catalog.register_edge_type_def(def)
908 } else {
909 self.catalog.register_edge_type_def(def)
910 };
911 match result {
912 Ok(()) => {
913 wal_log!(
914 self,
915 WalRecord::CreateEdgeType {
916 name: stmt.name.clone(),
917 properties: props_for_wal,
918 constraints: Vec::new(),
919 }
920 );
921 Ok(QueryResult::status(format!(
922 "Created edge type '{}'",
923 stmt.name
924 )))
925 }
926 Err(e) if stmt.if_not_exists => {
927 let _ = e;
928 Ok(QueryResult::status("No change"))
929 }
930 Err(e) => Err(Error::Query(QueryError::new(
931 QueryErrorKind::Semantic,
932 e.to_string(),
933 ))),
934 }
935 }
936 SchemaStatement::CreateVectorIndex(stmt) => {
937 Self::create_vector_index_on_store(
938 &self.active_lpg_store(),
939 &stmt.node_label,
940 &stmt.property,
941 stmt.dimensions,
942 stmt.metric.as_deref(),
943 )?;
944 wal_log!(
945 self,
946 WalRecord::CreateIndex {
947 name: stmt.name.clone(),
948 label: stmt.node_label.clone(),
949 property: stmt.property.clone(),
950 index_type: "vector".to_string(),
951 }
952 );
953 Ok(QueryResult::status(format!(
954 "Created vector index '{}'",
955 stmt.name
956 )))
957 }
958 SchemaStatement::DropNodeType { name, if_exists } => {
959 match self.catalog.drop_node_type(&name) {
960 Ok(()) => {
961 wal_log!(self, WalRecord::DropNodeType { name: name.clone() });
962 Ok(QueryResult::status(format!("Dropped node type '{name}'")))
963 }
964 Err(e) if if_exists => {
965 let _ = e;
966 Ok(QueryResult::status("No change"))
967 }
968 Err(e) => Err(Error::Query(QueryError::new(
969 QueryErrorKind::Semantic,
970 e.to_string(),
971 ))),
972 }
973 }
974 SchemaStatement::DropEdgeType { name, if_exists } => {
975 match self.catalog.drop_edge_type_def(&name) {
976 Ok(()) => {
977 wal_log!(self, WalRecord::DropEdgeType { name: name.clone() });
978 Ok(QueryResult::status(format!("Dropped edge type '{name}'")))
979 }
980 Err(e) if if_exists => {
981 let _ = e;
982 Ok(QueryResult::status("No change"))
983 }
984 Err(e) => Err(Error::Query(QueryError::new(
985 QueryErrorKind::Semantic,
986 e.to_string(),
987 ))),
988 }
989 }
990 SchemaStatement::CreateIndex(stmt) => {
991 use grafeo_adapters::query::gql::ast::IndexKind;
992 let active = self.active_lpg_store();
993 let index_type_str = match stmt.index_kind {
994 IndexKind::Property => "property",
995 IndexKind::BTree => "btree",
996 IndexKind::Text => "text",
997 IndexKind::Vector => "vector",
998 };
999 match stmt.index_kind {
1000 IndexKind::Property | IndexKind::BTree => {
1001 for prop in &stmt.properties {
1002 active.create_property_index(prop);
1003 }
1004 }
1005 IndexKind::Text => {
1006 for prop in &stmt.properties {
1007 Self::create_text_index_on_store(&active, &stmt.label, prop)?;
1008 }
1009 }
1010 IndexKind::Vector => {
1011 for prop in &stmt.properties {
1012 Self::create_vector_index_on_store(
1013 &active,
1014 &stmt.label,
1015 prop,
1016 stmt.options.dimensions,
1017 stmt.options.metric.as_deref(),
1018 )?;
1019 }
1020 }
1021 }
1022 #[cfg(feature = "wal")]
1023 for prop in &stmt.properties {
1024 wal_log!(
1025 self,
1026 WalRecord::CreateIndex {
1027 name: stmt.name.clone(),
1028 label: stmt.label.clone(),
1029 property: prop.clone(),
1030 index_type: index_type_str.to_string(),
1031 }
1032 );
1033 }
1034 Ok(QueryResult::status(format!(
1035 "Created {} index '{}'",
1036 index_type_str, stmt.name
1037 )))
1038 }
1039 SchemaStatement::DropIndex { name, if_exists } => {
1040 let dropped = self.active_lpg_store().drop_property_index(&name);
1042 if dropped || if_exists {
1043 if dropped {
1044 wal_log!(self, WalRecord::DropIndex { name: name.clone() });
1045 }
1046 Ok(QueryResult::status(if dropped {
1047 format!("Dropped index '{name}'")
1048 } else {
1049 "No change".to_string()
1050 }))
1051 } else {
1052 Err(Error::Query(QueryError::new(
1053 QueryErrorKind::Semantic,
1054 format!("Index '{name}' does not exist"),
1055 )))
1056 }
1057 }
1058 SchemaStatement::CreateConstraint(stmt) => {
1059 use crate::catalog::TypeConstraint;
1060 use grafeo_adapters::query::gql::ast::ConstraintKind;
1061 let kind_str = match stmt.constraint_kind {
1062 ConstraintKind::Unique => "unique",
1063 ConstraintKind::NodeKey => "node_key",
1064 ConstraintKind::NotNull => "not_null",
1065 ConstraintKind::Exists => "exists",
1066 };
1067 let constraint_name = stmt
1068 .name
1069 .clone()
1070 .unwrap_or_else(|| format!("{}_{kind_str}", stmt.label));
1071
1072 match stmt.constraint_kind {
1074 ConstraintKind::Unique => {
1075 for prop in &stmt.properties {
1076 let label_id = self.catalog.get_or_create_label(&stmt.label);
1077 let prop_id = self.catalog.get_or_create_property_key(prop);
1078 let _ = self.catalog.add_unique_constraint(label_id, prop_id);
1079 }
1080 let _ = self.catalog.add_constraint_to_type(
1081 &stmt.label,
1082 TypeConstraint::Unique(stmt.properties.clone()),
1083 );
1084 }
1085 ConstraintKind::NodeKey => {
1086 for prop in &stmt.properties {
1087 let label_id = self.catalog.get_or_create_label(&stmt.label);
1088 let prop_id = self.catalog.get_or_create_property_key(prop);
1089 let _ = self.catalog.add_unique_constraint(label_id, prop_id);
1090 let _ = self.catalog.add_required_property(label_id, prop_id);
1091 }
1092 let _ = self.catalog.add_constraint_to_type(
1093 &stmt.label,
1094 TypeConstraint::PrimaryKey(stmt.properties.clone()),
1095 );
1096 }
1097 ConstraintKind::NotNull | ConstraintKind::Exists => {
1098 for prop in &stmt.properties {
1099 let label_id = self.catalog.get_or_create_label(&stmt.label);
1100 let prop_id = self.catalog.get_or_create_property_key(prop);
1101 let _ = self.catalog.add_required_property(label_id, prop_id);
1102 let _ = self.catalog.add_constraint_to_type(
1103 &stmt.label,
1104 TypeConstraint::NotNull(prop.clone()),
1105 );
1106 }
1107 }
1108 }
1109
1110 wal_log!(
1111 self,
1112 WalRecord::CreateConstraint {
1113 name: constraint_name.clone(),
1114 label: stmt.label.clone(),
1115 properties: stmt.properties.clone(),
1116 kind: kind_str.to_string(),
1117 }
1118 );
1119 Ok(QueryResult::status(format!(
1120 "Created {kind_str} constraint '{constraint_name}'"
1121 )))
1122 }
1123 SchemaStatement::DropConstraint { name, if_exists } => {
1124 let _ = if_exists;
1125 wal_log!(self, WalRecord::DropConstraint { name: name.clone() });
1126 Ok(QueryResult::status(format!("Dropped constraint '{name}'")))
1127 }
1128 SchemaStatement::CreateGraphType(stmt) => {
1129 use crate::catalog::GraphTypeDefinition;
1130 use grafeo_adapters::query::gql::ast::InlineElementType;
1131
1132 let (mut node_types, mut edge_types, open) =
1134 if let Some(ref like_graph) = stmt.like_graph {
1135 if let Some(type_name) = self.catalog.get_graph_type_binding(like_graph) {
1137 if let Some(existing) = self
1138 .catalog
1139 .schema()
1140 .and_then(|s| s.get_graph_type(&type_name))
1141 {
1142 (
1143 existing.allowed_node_types.clone(),
1144 existing.allowed_edge_types.clone(),
1145 existing.open,
1146 )
1147 } else {
1148 (Vec::new(), Vec::new(), true)
1149 }
1150 } else {
1151 let nt = self.catalog.all_node_type_names();
1153 let et = self.catalog.all_edge_type_names();
1154 if nt.is_empty() && et.is_empty() {
1155 (Vec::new(), Vec::new(), true)
1156 } else {
1157 (nt, et, false)
1158 }
1159 }
1160 } else {
1161 (stmt.node_types.clone(), stmt.edge_types.clone(), stmt.open)
1162 };
1163
1164 for inline in &stmt.inline_types {
1166 match inline {
1167 InlineElementType::Node {
1168 name,
1169 properties,
1170 key_labels,
1171 ..
1172 } => {
1173 let def = NodeTypeDefinition {
1174 name: name.clone(),
1175 properties: properties
1176 .iter()
1177 .map(|p| TypedProperty {
1178 name: p.name.clone(),
1179 data_type: PropertyDataType::from_type_name(&p.data_type),
1180 nullable: p.nullable,
1181 default_value: None,
1182 })
1183 .collect(),
1184 constraints: Vec::new(),
1185 parent_types: key_labels.clone(),
1186 };
1187 self.catalog.register_or_replace_node_type(def);
1189 #[cfg(feature = "wal")]
1190 {
1191 let props_for_wal: Vec<(String, String, bool)> = properties
1192 .iter()
1193 .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
1194 .collect();
1195 self.log_schema_wal(&WalRecord::CreateNodeType {
1196 name: name.clone(),
1197 properties: props_for_wal,
1198 constraints: Vec::new(),
1199 });
1200 }
1201 if !node_types.contains(name) {
1202 node_types.push(name.clone());
1203 }
1204 }
1205 InlineElementType::Edge {
1206 name,
1207 properties,
1208 source_node_types,
1209 target_node_types,
1210 ..
1211 } => {
1212 let def = EdgeTypeDefinition {
1213 name: name.clone(),
1214 properties: properties
1215 .iter()
1216 .map(|p| TypedProperty {
1217 name: p.name.clone(),
1218 data_type: PropertyDataType::from_type_name(&p.data_type),
1219 nullable: p.nullable,
1220 default_value: None,
1221 })
1222 .collect(),
1223 constraints: Vec::new(),
1224 source_node_types: source_node_types.clone(),
1225 target_node_types: target_node_types.clone(),
1226 };
1227 self.catalog.register_or_replace_edge_type_def(def);
1228 #[cfg(feature = "wal")]
1229 {
1230 let props_for_wal: Vec<(String, String, bool)> = properties
1231 .iter()
1232 .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
1233 .collect();
1234 self.log_schema_wal(&WalRecord::CreateEdgeType {
1235 name: name.clone(),
1236 properties: props_for_wal,
1237 constraints: Vec::new(),
1238 });
1239 }
1240 if !edge_types.contains(name) {
1241 edge_types.push(name.clone());
1242 }
1243 }
1244 }
1245 }
1246
1247 let def = GraphTypeDefinition {
1248 name: stmt.name.clone(),
1249 allowed_node_types: node_types.clone(),
1250 allowed_edge_types: edge_types.clone(),
1251 open,
1252 };
1253 let result = if stmt.or_replace {
1254 let _ = self.catalog.drop_graph_type(&stmt.name);
1256 self.catalog.register_graph_type(def)
1257 } else {
1258 self.catalog.register_graph_type(def)
1259 };
1260 match result {
1261 Ok(()) => {
1262 wal_log!(
1263 self,
1264 WalRecord::CreateGraphType {
1265 name: stmt.name.clone(),
1266 node_types,
1267 edge_types,
1268 open,
1269 }
1270 );
1271 Ok(QueryResult::status(format!(
1272 "Created graph type '{}'",
1273 stmt.name
1274 )))
1275 }
1276 Err(e) if stmt.if_not_exists => {
1277 let _ = e;
1278 Ok(QueryResult::status("No change"))
1279 }
1280 Err(e) => Err(Error::Query(QueryError::new(
1281 QueryErrorKind::Semantic,
1282 e.to_string(),
1283 ))),
1284 }
1285 }
1286 SchemaStatement::DropGraphType { name, if_exists } => {
1287 match self.catalog.drop_graph_type(&name) {
1288 Ok(()) => {
1289 wal_log!(self, WalRecord::DropGraphType { name: name.clone() });
1290 Ok(QueryResult::status(format!("Dropped graph type '{name}'")))
1291 }
1292 Err(e) if if_exists => {
1293 let _ = e;
1294 Ok(QueryResult::status("No change"))
1295 }
1296 Err(e) => Err(Error::Query(QueryError::new(
1297 QueryErrorKind::Semantic,
1298 e.to_string(),
1299 ))),
1300 }
1301 }
1302 SchemaStatement::CreateSchema {
1303 name,
1304 if_not_exists,
1305 } => match self.catalog.register_schema_namespace(name.clone()) {
1306 Ok(()) => {
1307 wal_log!(self, WalRecord::CreateSchema { name: name.clone() });
1308 Ok(QueryResult::status(format!("Created schema '{name}'")))
1309 }
1310 Err(e) if if_not_exists => {
1311 let _ = e;
1312 Ok(QueryResult::status("No change"))
1313 }
1314 Err(e) => Err(Error::Query(QueryError::new(
1315 QueryErrorKind::Semantic,
1316 e.to_string(),
1317 ))),
1318 },
1319 SchemaStatement::DropSchema { name, if_exists } => {
1320 let prefix = format!("{name}/");
1322 let has_graphs = self
1323 .store
1324 .graph_names()
1325 .iter()
1326 .any(|g| g.starts_with(&prefix));
1327 if has_graphs {
1328 return Err(Error::Query(QueryError::new(
1329 QueryErrorKind::Semantic,
1330 format!(
1331 "Schema '{name}' is not empty: drop all graphs in the schema first"
1332 ),
1333 )));
1334 }
1335 match self.catalog.drop_schema_namespace(&name) {
1336 Ok(()) => {
1337 wal_log!(self, WalRecord::DropSchema { name: name.clone() });
1338 let mut current = self.current_schema.lock();
1340 if current
1341 .as_deref()
1342 .is_some_and(|s| s.eq_ignore_ascii_case(&name))
1343 {
1344 *current = None;
1345 }
1346 Ok(QueryResult::status(format!("Dropped schema '{name}'")))
1347 }
1348 Err(e) if if_exists => {
1349 let _ = e;
1350 Ok(QueryResult::status("No change"))
1351 }
1352 Err(e) => Err(Error::Query(QueryError::new(
1353 QueryErrorKind::Semantic,
1354 e.to_string(),
1355 ))),
1356 }
1357 }
1358 SchemaStatement::AlterNodeType(stmt) => {
1359 use grafeo_adapters::query::gql::ast::TypeAlteration;
1360 let mut wal_alts = Vec::new();
1361 for alt in &stmt.alterations {
1362 match alt {
1363 TypeAlteration::AddProperty(prop) => {
1364 let typed = TypedProperty {
1365 name: prop.name.clone(),
1366 data_type: PropertyDataType::from_type_name(&prop.data_type),
1367 nullable: prop.nullable,
1368 default_value: prop
1369 .default_value
1370 .as_ref()
1371 .map(|s| parse_default_literal(s)),
1372 };
1373 self.catalog
1374 .alter_node_type_add_property(&stmt.name, typed)
1375 .map_err(|e| {
1376 Error::Query(QueryError::new(
1377 QueryErrorKind::Semantic,
1378 e.to_string(),
1379 ))
1380 })?;
1381 wal_alts.push((
1382 "add".to_string(),
1383 prop.name.clone(),
1384 prop.data_type.clone(),
1385 prop.nullable,
1386 ));
1387 }
1388 TypeAlteration::DropProperty(name) => {
1389 self.catalog
1390 .alter_node_type_drop_property(&stmt.name, name)
1391 .map_err(|e| {
1392 Error::Query(QueryError::new(
1393 QueryErrorKind::Semantic,
1394 e.to_string(),
1395 ))
1396 })?;
1397 wal_alts.push(("drop".to_string(), name.clone(), String::new(), false));
1398 }
1399 }
1400 }
1401 wal_log!(
1402 self,
1403 WalRecord::AlterNodeType {
1404 name: stmt.name.clone(),
1405 alterations: wal_alts,
1406 }
1407 );
1408 Ok(QueryResult::status(format!(
1409 "Altered node type '{}'",
1410 stmt.name
1411 )))
1412 }
1413 SchemaStatement::AlterEdgeType(stmt) => {
1414 use grafeo_adapters::query::gql::ast::TypeAlteration;
1415 let mut wal_alts = Vec::new();
1416 for alt in &stmt.alterations {
1417 match alt {
1418 TypeAlteration::AddProperty(prop) => {
1419 let typed = TypedProperty {
1420 name: prop.name.clone(),
1421 data_type: PropertyDataType::from_type_name(&prop.data_type),
1422 nullable: prop.nullable,
1423 default_value: prop
1424 .default_value
1425 .as_ref()
1426 .map(|s| parse_default_literal(s)),
1427 };
1428 self.catalog
1429 .alter_edge_type_add_property(&stmt.name, typed)
1430 .map_err(|e| {
1431 Error::Query(QueryError::new(
1432 QueryErrorKind::Semantic,
1433 e.to_string(),
1434 ))
1435 })?;
1436 wal_alts.push((
1437 "add".to_string(),
1438 prop.name.clone(),
1439 prop.data_type.clone(),
1440 prop.nullable,
1441 ));
1442 }
1443 TypeAlteration::DropProperty(name) => {
1444 self.catalog
1445 .alter_edge_type_drop_property(&stmt.name, name)
1446 .map_err(|e| {
1447 Error::Query(QueryError::new(
1448 QueryErrorKind::Semantic,
1449 e.to_string(),
1450 ))
1451 })?;
1452 wal_alts.push(("drop".to_string(), name.clone(), String::new(), false));
1453 }
1454 }
1455 }
1456 wal_log!(
1457 self,
1458 WalRecord::AlterEdgeType {
1459 name: stmt.name.clone(),
1460 alterations: wal_alts,
1461 }
1462 );
1463 Ok(QueryResult::status(format!(
1464 "Altered edge type '{}'",
1465 stmt.name
1466 )))
1467 }
1468 SchemaStatement::AlterGraphType(stmt) => {
1469 use grafeo_adapters::query::gql::ast::GraphTypeAlteration;
1470 let mut wal_alts = Vec::new();
1471 for alt in &stmt.alterations {
1472 match alt {
1473 GraphTypeAlteration::AddNodeType(name) => {
1474 self.catalog
1475 .alter_graph_type_add_node_type(&stmt.name, name.clone())
1476 .map_err(|e| {
1477 Error::Query(QueryError::new(
1478 QueryErrorKind::Semantic,
1479 e.to_string(),
1480 ))
1481 })?;
1482 wal_alts.push(("add_node_type".to_string(), name.clone()));
1483 }
1484 GraphTypeAlteration::DropNodeType(name) => {
1485 self.catalog
1486 .alter_graph_type_drop_node_type(&stmt.name, name)
1487 .map_err(|e| {
1488 Error::Query(QueryError::new(
1489 QueryErrorKind::Semantic,
1490 e.to_string(),
1491 ))
1492 })?;
1493 wal_alts.push(("drop_node_type".to_string(), name.clone()));
1494 }
1495 GraphTypeAlteration::AddEdgeType(name) => {
1496 self.catalog
1497 .alter_graph_type_add_edge_type(&stmt.name, name.clone())
1498 .map_err(|e| {
1499 Error::Query(QueryError::new(
1500 QueryErrorKind::Semantic,
1501 e.to_string(),
1502 ))
1503 })?;
1504 wal_alts.push(("add_edge_type".to_string(), name.clone()));
1505 }
1506 GraphTypeAlteration::DropEdgeType(name) => {
1507 self.catalog
1508 .alter_graph_type_drop_edge_type(&stmt.name, name)
1509 .map_err(|e| {
1510 Error::Query(QueryError::new(
1511 QueryErrorKind::Semantic,
1512 e.to_string(),
1513 ))
1514 })?;
1515 wal_alts.push(("drop_edge_type".to_string(), name.clone()));
1516 }
1517 }
1518 }
1519 wal_log!(
1520 self,
1521 WalRecord::AlterGraphType {
1522 name: stmt.name.clone(),
1523 alterations: wal_alts,
1524 }
1525 );
1526 Ok(QueryResult::status(format!(
1527 "Altered graph type '{}'",
1528 stmt.name
1529 )))
1530 }
1531 SchemaStatement::CreateProcedure(stmt) => {
1532 use crate::catalog::ProcedureDefinition;
1533
1534 let def = ProcedureDefinition {
1535 name: stmt.name.clone(),
1536 params: stmt
1537 .params
1538 .iter()
1539 .map(|p| (p.name.clone(), p.param_type.clone()))
1540 .collect(),
1541 returns: stmt
1542 .returns
1543 .iter()
1544 .map(|r| (r.name.clone(), r.return_type.clone()))
1545 .collect(),
1546 body: stmt.body.clone(),
1547 };
1548
1549 if stmt.or_replace {
1550 self.catalog.replace_procedure(def).map_err(|e| {
1551 Error::Query(QueryError::new(QueryErrorKind::Semantic, e.to_string()))
1552 })?;
1553 } else {
1554 match self.catalog.register_procedure(def) {
1555 Ok(()) => {}
1556 Err(_) if stmt.if_not_exists => {
1557 return Ok(QueryResult::empty());
1558 }
1559 Err(e) => {
1560 return Err(Error::Query(QueryError::new(
1561 QueryErrorKind::Semantic,
1562 e.to_string(),
1563 )));
1564 }
1565 }
1566 }
1567
1568 wal_log!(
1569 self,
1570 WalRecord::CreateProcedure {
1571 name: stmt.name.clone(),
1572 params: stmt
1573 .params
1574 .iter()
1575 .map(|p| (p.name.clone(), p.param_type.clone()))
1576 .collect(),
1577 returns: stmt
1578 .returns
1579 .iter()
1580 .map(|r| (r.name.clone(), r.return_type.clone()))
1581 .collect(),
1582 body: stmt.body,
1583 }
1584 );
1585 Ok(QueryResult::status(format!(
1586 "Created procedure '{}'",
1587 stmt.name
1588 )))
1589 }
1590 SchemaStatement::DropProcedure { name, if_exists } => {
1591 match self.catalog.drop_procedure(&name) {
1592 Ok(()) => {}
1593 Err(_) if if_exists => {
1594 return Ok(QueryResult::empty());
1595 }
1596 Err(e) => {
1597 return Err(Error::Query(QueryError::new(
1598 QueryErrorKind::Semantic,
1599 e.to_string(),
1600 )));
1601 }
1602 }
1603 wal_log!(self, WalRecord::DropProcedure { name: name.clone() });
1604 Ok(QueryResult::status(format!("Dropped procedure '{name}'")))
1605 }
1606 SchemaStatement::ShowIndexes => {
1607 return self.execute_show_indexes();
1608 }
1609 SchemaStatement::ShowConstraints => {
1610 return self.execute_show_constraints();
1611 }
1612 SchemaStatement::ShowNodeTypes => {
1613 return self.execute_show_node_types();
1614 }
1615 SchemaStatement::ShowEdgeTypes => {
1616 return self.execute_show_edge_types();
1617 }
1618 SchemaStatement::ShowGraphTypes => {
1619 return self.execute_show_graph_types();
1620 }
1621 SchemaStatement::ShowGraphType(name) => {
1622 return self.execute_show_graph_type(&name);
1623 }
1624 SchemaStatement::ShowCurrentGraphType => {
1625 return self.execute_show_current_graph_type();
1626 }
1627 SchemaStatement::ShowGraphs => {
1628 return self.execute_show_graphs();
1629 }
1630 SchemaStatement::ShowSchemas => {
1631 return self.execute_show_schemas();
1632 }
1633 };
1634
1635 if result.is_ok() {
1638 self.query_cache.clear();
1639 }
1640
1641 result
1642 }
1643
/// Builds an HNSW vector index over `property` for all nodes carrying `label`
/// and registers it on `store`.
///
/// The distance metric defaults to cosine when `metric` is `None`. When
/// `dimensions` is `None`, the dimensionality is taken from the first vector
/// encountered; every other vector must have the same length.
///
/// # Errors
///
/// Returns `Error::Internal` when the metric name is unknown, when a vector's
/// length differs from the expected dimensionality, or when no vector was found
/// and no dimensions were supplied.
#[cfg(all(feature = "gql", feature = "vector-index"))]
fn create_vector_index_on_store(
    store: &LpgStore,
    label: &str,
    property: &str,
    dimensions: Option<usize>,
    metric: Option<&str>,
) -> Result<()> {
    use grafeo_common::types::{PropertyKey, Value};
    use grafeo_common::utils::error::Error;
    use grafeo_core::index::vector::{DistanceMetric, HnswConfig, HnswIndex};

    // Resolve the requested metric, falling back to cosine when none was given.
    let distance = metric
        .map(|requested| {
            DistanceMetric::from_str(requested).ok_or_else(|| {
                Error::Internal(format!(
                    "Unknown distance metric '{requested}'. Use: cosine, euclidean, dot_product, manhattan"
                ))
            })
        })
        .transpose()?
        .unwrap_or(DistanceMetric::Cosine);

    let key = PropertyKey::new(property);
    let mut expected_dims: Option<usize> = dimensions;
    let mut collected: Vec<(grafeo_common::types::NodeId, Vec<f32>)> = Vec::new();

    // Gather every vector-valued property, validating dimensionality on the way.
    for node in store.nodes_with_label(label) {
        if let Some(Value::Vector(vector)) = node.properties.get(&key) {
            match expected_dims {
                Some(expected) if vector.len() != expected => {
                    return Err(Error::Internal(format!(
                        "Vector dimension mismatch: expected {expected}, found {} on node {}",
                        vector.len(),
                        node.id.0
                    )));
                }
                Some(_) => {}
                // First vector seen fixes the dimensionality for the rest.
                None => expected_dims = Some(vector.len()),
            }
            collected.push((node.id, vector.to_vec()));
        }
    }

    let dims = expected_dims.ok_or_else(|| {
        Error::Internal(format!(
            "No vector properties found on :{label}({property}) and no dimensions specified"
        ))
    })?;

    let index = HnswIndex::with_capacity(HnswConfig::new(dims, distance), collected.len());
    let accessor = grafeo_core::index::vector::PropertyVectorAccessor::new(store, property);
    for (id, vector) in &collected {
        index.insert(*id, vector, &accessor);
    }

    store.add_vector_index(label, property, Arc::new(index));
    Ok(())
}
1703
/// Fallback used when the `vector-index` feature is disabled.
///
/// # Errors
///
/// Always returns `Error::Internal` explaining that the feature is required.
#[cfg(all(feature = "gql", not(feature = "vector-index")))]
fn create_vector_index_on_store(
    _store: &LpgStore,
    _label: &str,
    _property: &str,
    _dimensions: Option<usize>,
    _metric: Option<&str>,
) -> Result<()> {
    use grafeo_common::utils::error::Error;

    let reason = "Vector index support requires the 'vector-index' feature";
    Err(Error::Internal(reason.to_string()))
}
1717
/// Builds a BM25 inverted text index over `property` for all nodes carrying
/// `label` and registers it on `store`.
///
/// Only nodes whose property value is a string are indexed; other nodes are
/// skipped.
#[cfg(all(feature = "gql", feature = "text-index"))]
fn create_text_index_on_store(store: &LpgStore, label: &str, property: &str) -> Result<()> {
    use grafeo_common::types::{PropertyKey, Value};
    use grafeo_core::index::text::{BM25Config, InvertedIndex};

    let key = PropertyKey::new(property);
    let mut inverted = InvertedIndex::new(BM25Config::default());

    for node_id in store.nodes_by_label(label) {
        if let Some(Value::String(text)) = store.get_node_property(node_id, &key) {
            inverted.insert(node_id, text.as_str());
        }
    }

    let shared = Arc::new(parking_lot::RwLock::new(inverted));
    store.add_text_index(label, property, shared);
    Ok(())
}
1737
/// Fallback used when the `text-index` feature is disabled.
///
/// # Errors
///
/// Always returns `Error::Internal` explaining that the feature is required.
#[cfg(all(feature = "gql", not(feature = "text-index")))]
fn create_text_index_on_store(_store: &LpgStore, _label: &str, _property: &str) -> Result<()> {
    use grafeo_common::utils::error::Error;

    let reason = "Text index support requires the 'text-index' feature";
    Err(Error::Internal(reason.to_string()))
}
1745
1746 fn execute_show_indexes(&self) -> Result<QueryResult> {
1748 let indexes = self.catalog.all_indexes();
1749 let columns = vec![
1750 "name".to_string(),
1751 "type".to_string(),
1752 "label".to_string(),
1753 "property".to_string(),
1754 ];
1755 let rows: Vec<Vec<Value>> = indexes
1756 .into_iter()
1757 .map(|def| {
1758 let label_name = self
1759 .catalog
1760 .get_label_name(def.label)
1761 .unwrap_or_else(|| "?".into());
1762 let prop_name = self
1763 .catalog
1764 .get_property_key_name(def.property_key)
1765 .unwrap_or_else(|| "?".into());
1766 vec![
1767 Value::from(format!("idx_{}_{}", label_name, prop_name)),
1768 Value::from(format!("{:?}", def.index_type)),
1769 Value::from(&*label_name),
1770 Value::from(&*prop_name),
1771 ]
1772 })
1773 .collect();
1774 Ok(QueryResult {
1775 columns,
1776 column_types: Vec::new(),
1777 rows,
1778 ..QueryResult::empty()
1779 })
1780 }
1781
1782 fn execute_show_constraints(&self) -> Result<QueryResult> {
1784 Ok(QueryResult {
1787 columns: vec![
1788 "name".to_string(),
1789 "type".to_string(),
1790 "label".to_string(),
1791 "properties".to_string(),
1792 ],
1793 column_types: Vec::new(),
1794 rows: Vec::new(),
1795 ..QueryResult::empty()
1796 })
1797 }
1798
1799 fn execute_show_node_types(&self) -> Result<QueryResult> {
1801 let columns = vec![
1802 "name".to_string(),
1803 "properties".to_string(),
1804 "constraints".to_string(),
1805 "parents".to_string(),
1806 ];
1807 let type_names = self.catalog.all_node_type_names();
1808 let rows: Vec<Vec<Value>> = type_names
1809 .into_iter()
1810 .filter_map(|name| {
1811 let def = self.catalog.get_node_type(&name)?;
1812 let props: Vec<String> = def
1813 .properties
1814 .iter()
1815 .map(|p| {
1816 let nullable = if p.nullable { "" } else { " NOT NULL" };
1817 format!("{} {}{}", p.name, p.data_type, nullable)
1818 })
1819 .collect();
1820 let constraints: Vec<String> =
1821 def.constraints.iter().map(|c| format!("{c:?}")).collect();
1822 let parents = def.parent_types.join(", ");
1823 Some(vec![
1824 Value::from(name),
1825 Value::from(props.join(", ")),
1826 Value::from(constraints.join(", ")),
1827 Value::from(parents),
1828 ])
1829 })
1830 .collect();
1831 Ok(QueryResult {
1832 columns,
1833 column_types: Vec::new(),
1834 rows,
1835 ..QueryResult::empty()
1836 })
1837 }
1838
1839 fn execute_show_edge_types(&self) -> Result<QueryResult> {
1841 let columns = vec![
1842 "name".to_string(),
1843 "properties".to_string(),
1844 "source_types".to_string(),
1845 "target_types".to_string(),
1846 ];
1847 let type_names = self.catalog.all_edge_type_names();
1848 let rows: Vec<Vec<Value>> = type_names
1849 .into_iter()
1850 .filter_map(|name| {
1851 let def = self.catalog.get_edge_type_def(&name)?;
1852 let props: Vec<String> = def
1853 .properties
1854 .iter()
1855 .map(|p| {
1856 let nullable = if p.nullable { "" } else { " NOT NULL" };
1857 format!("{} {}{}", p.name, p.data_type, nullable)
1858 })
1859 .collect();
1860 let src = def.source_node_types.join(", ");
1861 let tgt = def.target_node_types.join(", ");
1862 Some(vec![
1863 Value::from(name),
1864 Value::from(props.join(", ")),
1865 Value::from(src),
1866 Value::from(tgt),
1867 ])
1868 })
1869 .collect();
1870 Ok(QueryResult {
1871 columns,
1872 column_types: Vec::new(),
1873 rows,
1874 ..QueryResult::empty()
1875 })
1876 }
1877
1878 fn execute_show_graph_types(&self) -> Result<QueryResult> {
1880 let columns = vec![
1881 "name".to_string(),
1882 "open".to_string(),
1883 "node_types".to_string(),
1884 "edge_types".to_string(),
1885 ];
1886 let type_names = self.catalog.all_graph_type_names();
1887 let rows: Vec<Vec<Value>> = type_names
1888 .into_iter()
1889 .filter_map(|name| {
1890 let def = self.catalog.get_graph_type_def(&name)?;
1891 Some(vec![
1892 Value::from(name),
1893 Value::from(def.open),
1894 Value::from(def.allowed_node_types.join(", ")),
1895 Value::from(def.allowed_edge_types.join(", ")),
1896 ])
1897 })
1898 .collect();
1899 Ok(QueryResult {
1900 columns,
1901 column_types: Vec::new(),
1902 rows,
1903 ..QueryResult::empty()
1904 })
1905 }
1906
1907 fn execute_show_graphs(&self) -> Result<QueryResult> {
1913 let schema = self.current_schema.lock().clone();
1914 let all_names = self.store.graph_names();
1915
1916 let mut names: Vec<String> = match &schema {
1917 Some(s) => {
1918 let prefix = format!("{s}/");
1919 all_names
1920 .into_iter()
1921 .filter_map(|n| n.strip_prefix(&prefix).map(String::from))
1922 .collect()
1923 }
1924 None => all_names.into_iter().filter(|n| !n.contains('/')).collect(),
1925 };
1926 names.sort();
1927
1928 let rows: Vec<Vec<Value>> = names.into_iter().map(|n| vec![Value::from(n)]).collect();
1929 Ok(QueryResult {
1930 columns: vec!["name".to_string()],
1931 column_types: Vec::new(),
1932 rows,
1933 ..QueryResult::empty()
1934 })
1935 }
1936
1937 fn execute_show_schemas(&self) -> Result<QueryResult> {
1939 let mut names = self.catalog.schema_names();
1940 names.sort();
1941 let rows: Vec<Vec<Value>> = names.into_iter().map(|n| vec![Value::from(n)]).collect();
1942 Ok(QueryResult {
1943 columns: vec!["name".to_string()],
1944 column_types: Vec::new(),
1945 rows,
1946 ..QueryResult::empty()
1947 })
1948 }
1949
1950 fn execute_show_graph_type(&self, name: &str) -> Result<QueryResult> {
1952 use grafeo_common::utils::error::{Error, QueryError, QueryErrorKind};
1953
1954 let def = self.catalog.get_graph_type_def(name).ok_or_else(|| {
1955 Error::Query(QueryError::new(
1956 QueryErrorKind::Semantic,
1957 format!("Graph type '{name}' not found"),
1958 ))
1959 })?;
1960
1961 let columns = vec![
1962 "name".to_string(),
1963 "open".to_string(),
1964 "node_types".to_string(),
1965 "edge_types".to_string(),
1966 ];
1967 let rows = vec![vec![
1968 Value::from(def.name),
1969 Value::from(def.open),
1970 Value::from(def.allowed_node_types.join(", ")),
1971 Value::from(def.allowed_edge_types.join(", ")),
1972 ]];
1973 Ok(QueryResult {
1974 columns,
1975 column_types: Vec::new(),
1976 rows,
1977 ..QueryResult::empty()
1978 })
1979 }
1980
1981 fn execute_show_current_graph_type(&self) -> Result<QueryResult> {
1983 let graph_name = self
1984 .current_graph()
1985 .unwrap_or_else(|| "default".to_string());
1986 let columns = vec![
1987 "graph".to_string(),
1988 "graph_type".to_string(),
1989 "open".to_string(),
1990 "node_types".to_string(),
1991 "edge_types".to_string(),
1992 ];
1993
1994 if let Some(type_name) = self.catalog.get_graph_type_binding(&graph_name)
1995 && let Some(def) = self.catalog.get_graph_type_def(&type_name)
1996 {
1997 let rows = vec![vec![
1998 Value::from(graph_name),
1999 Value::from(type_name),
2000 Value::from(def.open),
2001 Value::from(def.allowed_node_types.join(", ")),
2002 Value::from(def.allowed_edge_types.join(", ")),
2003 ]];
2004 return Ok(QueryResult {
2005 columns,
2006 column_types: Vec::new(),
2007 rows,
2008 ..QueryResult::empty()
2009 });
2010 }
2011
2012 Ok(QueryResult {
2014 columns,
2015 column_types: Vec::new(),
2016 rows: vec![vec![
2017 Value::from(graph_name),
2018 Value::Null,
2019 Value::Null,
2020 Value::Null,
2021 Value::Null,
2022 ]],
2023 ..QueryResult::empty()
2024 })
2025 }
2026
/// Executes a GQL query string on this session.
///
/// Session and schema commands are dispatched directly. Regular queries are
/// bound, optimized (with the optimized plan cached per query text and current
/// graph), planned, and executed under `with_auto_commit`. Plans flagged as
/// `explain` return the annotated plan without executing; plans flagged as
/// `profile` execute and return the profile tree instead of the rows.
///
/// # Errors
///
/// Returns `TransactionError::ReadOnly` when a schema command or a mutating
/// plan is submitted while the session's read-only flag is set, and propagates
/// any translation, binding, optimization, planning, or execution error.
#[cfg(feature = "gql")]
pub fn execute(&self, query: &str) -> Result<QueryResult> {
    use crate::query::{
        Executor, binder::Binder, cache::CacheKey, optimizer::Optimizer,
        processor::QueryLanguage, translators::gql,
    };
    use grafeo_common::utils::error::{Error as GrafeoError, TransactionError};

    self.require_lpg("GQL")?;

    #[cfg(not(target_arch = "wasm32"))]
    let started = std::time::Instant::now();

    // Commands short-circuit here; only real query plans continue below.
    let logical_plan = match gql::translate_full(query)? {
        gql::GqlTranslationResult::SessionCommand(command) => {
            return self.execute_session_command(command);
        }
        gql::GqlTranslationResult::SchemaCommand(command) => {
            if *self.read_only_tx.lock() {
                return Err(GrafeoError::Transaction(TransactionError::ReadOnly));
            }
            return self.execute_schema_command(command);
        }
        gql::GqlTranslationResult::Plan(plan) => {
            if *self.read_only_tx.lock() && plan.root.has_mutations() {
                return Err(GrafeoError::Transaction(TransactionError::ReadOnly));
            }
            plan
        }
    };

    let cache_key = CacheKey::with_graph(query, QueryLanguage::Gql, self.current_graph());
    let cached = self.query_cache.get_optimized(&cache_key);
    let optimized_plan = match cached {
        Some(hit) => hit,
        None => {
            let mut binder = Binder::new();
            let _binding_context = binder.bind(&logical_plan)?;

            let store_for_stats = self.active_store();
            let optimizer = Optimizer::from_graph_store(&*store_for_stats);
            let fresh = optimizer.optimize(logical_plan)?;

            self.query_cache.put_optimized(cache_key, fresh.clone());
            fresh
        }
    };

    let active = self.active_store();

    // EXPLAIN: report the plan, never run it.
    if optimized_plan.explain {
        use crate::query::processor::{annotate_pushdown_hints, explain_result};
        let mut explained = optimized_plan;
        annotate_pushdown_hints(&mut explained.root, active.as_ref());
        return Ok(explain_result(&explained));
    }

    let has_mutations = optimized_plan.root.has_mutations();

    // PROFILE: run the plan and return per-operator statistics instead of rows.
    if optimized_plan.profile {
        return self.with_auto_commit(has_mutations, || {
            let (viewing_epoch, transaction_id) = self.get_transaction_context();
            let planner = self.create_planner_for_store(
                Arc::clone(&active),
                viewing_epoch,
                transaction_id,
            );
            let (mut physical_plan, entries) = planner.plan_profiled(&optimized_plan)?;

            let _result = Executor::with_columns(physical_plan.columns.clone())
                .with_deadline(self.query_deadline())
                .execute(physical_plan.operator.as_mut())?;

            #[cfg(not(target_arch = "wasm32"))]
            let total_time_ms = started.elapsed().as_secs_f64() * 1000.0;
            #[cfg(target_arch = "wasm32")]
            let total_time_ms = 0.0;

            let profile_tree = crate::query::profile::build_profile_tree(
                &optimized_plan.root,
                &mut entries.into_iter(),
            );
            Ok(crate::query::profile::profile_result(
                &profile_tree,
                total_time_ms,
            ))
        });
    }

    let outcome = self.with_auto_commit(has_mutations, || {
        let (viewing_epoch, transaction_id) = self.get_transaction_context();

        let planner =
            self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
        let mut physical_plan = planner.plan(&optimized_plan)?;

        let mut result = Executor::with_columns(physical_plan.columns.clone())
            .with_deadline(self.query_deadline())
            .execute(physical_plan.operator.as_mut())?;

        #[cfg(not(target_arch = "wasm32"))]
        {
            result.execution_time_ms = Some(started.elapsed().as_secs_f64() * 1000.0);
        }
        // The number of returned rows is what gets reported as rows scanned.
        result.rows_scanned = Some(result.rows.len() as u64);

        Ok(result)
    });

    #[cfg(feature = "metrics")]
    {
        #[cfg(not(target_arch = "wasm32"))]
        let elapsed_ms = Some(started.elapsed().as_secs_f64() * 1000.0);
        #[cfg(target_arch = "wasm32")]
        let elapsed_ms = None;
        self.record_query_metrics("gql", elapsed_ms, &outcome);
    }

    outcome
}
2202
/// Executes a GQL query with the session's viewing epoch temporarily
/// overridden to `epoch`.
///
/// The previous override is put back when this call finishes, whether
/// `execute` returns normally, returns an error, or unwinds from a panic.
/// Restoring on unwind keeps a session that is reused after a caught panic
/// from silently staying pinned to the historical epoch.
///
/// # Errors
///
/// Propagates any error returned by `execute`.
#[cfg(feature = "gql")]
pub fn execute_at_epoch(&self, query: &str, epoch: EpochId) -> Result<QueryResult> {
    /// Runs the wrapped closure exactly once when dropped.
    struct RestoreOnDrop<F: FnOnce()>(Option<F>);

    impl<F: FnOnce()> Drop for RestoreOnDrop<F> {
        fn drop(&mut self) {
            if let Some(restore) = self.0.take() {
                restore();
            }
        }
    }

    let previous = self.viewing_epoch_override.lock().replace(epoch);
    // The lock is only taken briefly here and in the guard, never across `execute`.
    let _restore = RestoreOnDrop(Some(move || {
        *self.viewing_epoch_override.lock() = previous;
    }));

    self.execute(query)
}
2218
/// Executes a GQL query with bound parameters.
///
/// The query is run through a `QueryProcessor` built on the active store and
/// this session's transaction manager, inside `with_auto_commit`. Whether the
/// query is treated as mutating is decided by `query_looks_like_mutation` on
/// the raw query text.
///
/// # Errors
///
/// Propagates errors from `require_lpg`, from constructing the processor, and
/// from processing the query.
#[cfg(feature = "gql")]
pub fn execute_with_params(
    &self,
    query: &str,
    params: std::collections::HashMap<String, Value>,
) -> Result<QueryResult> {
    use crate::query::processor::{QueryLanguage, QueryProcessor};

    self.require_lpg("GQL")?;

    let has_mutations = Self::query_looks_like_mutation(query);
    let active = self.active_store();

    self.with_auto_commit(has_mutations, || {
        let (viewing_epoch, transaction_id) = self.get_transaction_context();

        let processor = QueryProcessor::for_graph_store_with_transaction(
            Arc::clone(&active),
            Arc::clone(&self.transaction_manager),
        )?;

        // Attach the transaction context only when a transaction is active.
        let processor = match transaction_id {
            Some(transaction_id) => {
                processor.with_transaction_context(viewing_epoch, transaction_id)
            }
            None => processor,
        };

        processor.process(query, QueryLanguage::Gql, Some(&params))
    })
}
2257
2258 #[cfg(not(any(feature = "gql", feature = "cypher")))]
2264 pub fn execute_with_params(
2265 &self,
2266 _query: &str,
2267 _params: std::collections::HashMap<String, Value>,
2268 ) -> Result<QueryResult> {
2269 Err(grafeo_common::utils::error::Error::Internal(
2270 "No query language enabled".to_string(),
2271 ))
2272 }
2273
2274 #[cfg(not(any(feature = "gql", feature = "cypher")))]
2280 pub fn execute(&self, _query: &str) -> Result<QueryResult> {
2281 Err(grafeo_common::utils::error::Error::Internal(
2282 "No query language enabled".to_string(),
2283 ))
2284 }
2285
/// Executes a Cypher query string on this session.
///
/// Schema commands and the `SHOW` variants are dispatched directly. Regular
/// queries are translated, bound, optimized (with the optimized plan cached per
/// query text and current graph), planned, and executed under
/// `with_auto_commit`. Plans flagged as `explain` return the annotated plan
/// without executing; plans flagged as `profile` execute and return the profile
/// tree instead of the rows.
///
/// # Errors
///
/// While the session's read-only flag is set, schema DDL is rejected with a
/// semantic query error and mutating plans are rejected with
/// `TransactionError::ReadOnly`, matching the check the GQL `execute` path
/// applies to mutating plans. Translation, binding, optimization, planning, and
/// execution errors are propagated.
#[cfg(feature = "cypher")]
pub fn execute_cypher(&self, query: &str) -> Result<QueryResult> {
    use crate::query::{
        Executor, binder::Binder, cache::CacheKey, optimizer::Optimizer,
        processor::QueryLanguage, translators::cypher,
    };
    use grafeo_common::utils::error::{
        Error as GrafeoError, QueryError, QueryErrorKind, TransactionError,
    };

    // Commands short-circuit here; only real query plans continue below.
    match cypher::translate_full(query)? {
        cypher::CypherTranslationResult::SchemaCommand(cmd) => {
            if *self.read_only_tx.lock() {
                return Err(GrafeoError::Query(QueryError::new(
                    QueryErrorKind::Semantic,
                    "Cannot execute schema DDL in a read-only transaction",
                )));
            }
            return self.execute_schema_command(cmd);
        }
        cypher::CypherTranslationResult::ShowIndexes => {
            return self.execute_show_indexes();
        }
        cypher::CypherTranslationResult::ShowConstraints => {
            return self.execute_show_constraints();
        }
        cypher::CypherTranslationResult::ShowCurrentGraphType => {
            return self.execute_show_current_graph_type();
        }
        cypher::CypherTranslationResult::Plan(_) => {}
    }

    #[cfg(not(target_arch = "wasm32"))]
    let start_time = std::time::Instant::now();

    let cache_key = CacheKey::with_graph(query, QueryLanguage::Cypher, self.current_graph());
    let cached = self.query_cache.get_optimized(&cache_key);
    let optimized_plan = match cached {
        Some(cached_plan) => cached_plan,
        None => {
            let logical_plan = cypher::translate(query)?;

            let mut binder = Binder::new();
            let _binding_context = binder.bind(&logical_plan)?;

            let store_for_stats = self.active_store();
            let optimizer = Optimizer::from_graph_store(&*store_for_stats);
            let plan = optimizer.optimize(logical_plan)?;

            self.query_cache.put_optimized(cache_key, plan.clone());
            plan
        }
    };

    let has_mutations = optimized_plan.root.has_mutations();

    // A read-only transaction must not run data mutations either, not just DDL.
    if has_mutations && *self.read_only_tx.lock() {
        return Err(GrafeoError::Transaction(TransactionError::ReadOnly));
    }

    let active = self.active_store();

    // EXPLAIN: report the plan, never run it.
    if optimized_plan.explain {
        use crate::query::processor::{annotate_pushdown_hints, explain_result};
        let mut plan = optimized_plan;
        annotate_pushdown_hints(&mut plan.root, active.as_ref());
        return Ok(explain_result(&plan));
    }

    // PROFILE: run the plan and return per-operator statistics instead of rows.
    if optimized_plan.profile {
        return self.with_auto_commit(has_mutations, || {
            let (viewing_epoch, transaction_id) = self.get_transaction_context();
            let planner = self.create_planner_for_store(
                Arc::clone(&active),
                viewing_epoch,
                transaction_id,
            );
            let (mut physical_plan, entries) = planner.plan_profiled(&optimized_plan)?;

            let executor = Executor::with_columns(physical_plan.columns.clone())
                .with_deadline(self.query_deadline());
            let _result = executor.execute(physical_plan.operator.as_mut())?;

            #[cfg(not(target_arch = "wasm32"))]
            let total_time_ms = start_time.elapsed().as_secs_f64() * 1000.0;
            #[cfg(target_arch = "wasm32")]
            let total_time_ms = 0.0;

            let profile_tree = crate::query::profile::build_profile_tree(
                &optimized_plan.root,
                &mut entries.into_iter(),
            );
            Ok(crate::query::profile::profile_result(
                &profile_tree,
                total_time_ms,
            ))
        });
    }

    let result = self.with_auto_commit(has_mutations, || {
        let (viewing_epoch, transaction_id) = self.get_transaction_context();

        let planner =
            self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
        let mut physical_plan = planner.plan(&optimized_plan)?;

        let executor = Executor::with_columns(physical_plan.columns.clone())
            .with_deadline(self.query_deadline());
        executor.execute(physical_plan.operator.as_mut())
    });

    #[cfg(feature = "metrics")]
    {
        #[cfg(not(target_arch = "wasm32"))]
        let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
        #[cfg(target_arch = "wasm32")]
        let elapsed_ms = None;
        self.record_query_metrics("cypher", elapsed_ms, &result);
    }

    result
}
2429
/// Executes a Gremlin query string on this session.
///
/// The query is translated, bound, optimized against the active store, planned,
/// and executed under `with_auto_commit`. No plan caching is done on this path.
/// With the `metrics` feature enabled, the outcome is recorded under the
/// `gremlin` label.
///
/// # Errors
///
/// Propagates any translation, binding, optimization, planning, or execution
/// error.
#[cfg(feature = "gremlin")]
pub fn execute_gremlin(&self, query: &str) -> Result<QueryResult> {
    use crate::query::{Executor, binder::Binder, optimizer::Optimizer, translators::gremlin};

    #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
    let started = Instant::now();

    let logical_plan = gremlin::translate(query)?;

    let mut binder = Binder::new();
    let _binding_context = binder.bind(&logical_plan)?;

    let active = self.active_store();
    let optimizer = Optimizer::from_graph_store(&*active);
    let optimized_plan = optimizer.optimize(logical_plan)?;
    let mutates = optimized_plan.root.has_mutations();

    let outcome = self.with_auto_commit(mutates, || {
        let (viewing_epoch, transaction_id) = self.get_transaction_context();

        let planner =
            self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
        let mut physical_plan = planner.plan(&optimized_plan)?;

        let executor = Executor::with_columns(physical_plan.columns.clone())
            .with_deadline(self.query_deadline());
        executor.execute(physical_plan.operator.as_mut())
    });

    #[cfg(feature = "metrics")]
    {
        #[cfg(not(target_arch = "wasm32"))]
        let elapsed_ms = Some(started.elapsed().as_secs_f64() * 1000.0);
        #[cfg(target_arch = "wasm32")]
        let elapsed_ms = None;
        self.record_query_metrics("gremlin", elapsed_ms, &outcome);
    }

    outcome
}
2500
2501 #[cfg(feature = "gremlin")]
2507 pub fn execute_gremlin_with_params(
2508 &self,
2509 query: &str,
2510 params: std::collections::HashMap<String, Value>,
2511 ) -> Result<QueryResult> {
2512 use crate::query::processor::{QueryLanguage, QueryProcessor};
2513
2514 #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
2515 let start_time = Instant::now();
2516
2517 let has_mutations = Self::query_looks_like_mutation(query);
2518 let active = self.active_store();
2519
2520 let result = self.with_auto_commit(has_mutations, || {
2521 let (viewing_epoch, transaction_id) = self.get_transaction_context();
2522 let processor = QueryProcessor::for_graph_store_with_transaction(
2523 Arc::clone(&active),
2524 Arc::clone(&self.transaction_manager),
2525 )?;
2526 let processor = if let Some(transaction_id) = transaction_id {
2527 processor.with_transaction_context(viewing_epoch, transaction_id)
2528 } else {
2529 processor
2530 };
2531 processor.process(query, QueryLanguage::Gremlin, Some(¶ms))
2532 });
2533
2534 #[cfg(feature = "metrics")]
2535 {
2536 #[cfg(not(target_arch = "wasm32"))]
2537 let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
2538 #[cfg(target_arch = "wasm32")]
2539 let elapsed_ms = None;
2540 self.record_query_metrics("gremlin", elapsed_ms, &result);
2541 }
2542
2543 result
2544 }
2545
2546 #[cfg(feature = "graphql")]
2570 pub fn execute_graphql(&self, query: &str) -> Result<QueryResult> {
2571 use crate::query::{Executor, binder::Binder, optimizer::Optimizer, translators::graphql};
2572
2573 #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
2574 let start_time = Instant::now();
2575
2576 let logical_plan = graphql::translate(query)?;
2577 let mut binder = Binder::new();
2578 let _binding_context = binder.bind(&logical_plan)?;
2579
2580 let active = self.active_store();
2581 let optimizer = Optimizer::from_graph_store(&*active);
2582 let optimized_plan = optimizer.optimize(logical_plan)?;
2583 let has_mutations = optimized_plan.root.has_mutations();
2584
2585 let result = self.with_auto_commit(has_mutations, || {
2586 let (viewing_epoch, transaction_id) = self.get_transaction_context();
2587 let planner =
2588 self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
2589 let mut physical_plan = planner.plan(&optimized_plan)?;
2590 let executor = Executor::with_columns(physical_plan.columns.clone())
2591 .with_deadline(self.query_deadline());
2592 executor.execute(physical_plan.operator.as_mut())
2593 });
2594
2595 #[cfg(feature = "metrics")]
2596 {
2597 #[cfg(not(target_arch = "wasm32"))]
2598 let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
2599 #[cfg(target_arch = "wasm32")]
2600 let elapsed_ms = None;
2601 self.record_query_metrics("graphql", elapsed_ms, &result);
2602 }
2603
2604 result
2605 }
2606
2607 #[cfg(feature = "graphql")]
2613 pub fn execute_graphql_with_params(
2614 &self,
2615 query: &str,
2616 params: std::collections::HashMap<String, Value>,
2617 ) -> Result<QueryResult> {
2618 use crate::query::processor::{QueryLanguage, QueryProcessor};
2619
2620 #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
2621 let start_time = Instant::now();
2622
2623 let has_mutations = Self::query_looks_like_mutation(query);
2624 let active = self.active_store();
2625
2626 let result = self.with_auto_commit(has_mutations, || {
2627 let (viewing_epoch, transaction_id) = self.get_transaction_context();
2628 let processor = QueryProcessor::for_graph_store_with_transaction(
2629 Arc::clone(&active),
2630 Arc::clone(&self.transaction_manager),
2631 )?;
2632 let processor = if let Some(transaction_id) = transaction_id {
2633 processor.with_transaction_context(viewing_epoch, transaction_id)
2634 } else {
2635 processor
2636 };
2637 processor.process(query, QueryLanguage::GraphQL, Some(¶ms))
2638 });
2639
2640 #[cfg(feature = "metrics")]
2641 {
2642 #[cfg(not(target_arch = "wasm32"))]
2643 let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
2644 #[cfg(target_arch = "wasm32")]
2645 let elapsed_ms = None;
2646 self.record_query_metrics("graphql", elapsed_ms, &result);
2647 }
2648
2649 result
2650 }
2651
2652 #[cfg(feature = "sql-pgq")]
2677 pub fn execute_sql(&self, query: &str) -> Result<QueryResult> {
2678 use crate::query::{
2679 Executor, binder::Binder, cache::CacheKey, optimizer::Optimizer, plan::LogicalOperator,
2680 processor::QueryLanguage, translators::sql_pgq,
2681 };
2682
2683 #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
2684 let start_time = Instant::now();
2685
2686 let logical_plan = sql_pgq::translate(query)?;
2688
2689 if let LogicalOperator::CreatePropertyGraph(ref cpg) = logical_plan.root {
2691 return Ok(QueryResult {
2692 columns: vec!["status".into()],
2693 column_types: vec![grafeo_common::types::LogicalType::String],
2694 rows: vec![vec![Value::from(format!(
2695 "Property graph '{}' created ({} node tables, {} edge tables)",
2696 cpg.name,
2697 cpg.node_tables.len(),
2698 cpg.edge_tables.len()
2699 ))]],
2700 execution_time_ms: None,
2701 rows_scanned: None,
2702 status_message: None,
2703 gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
2704 });
2705 }
2706
2707 let cache_key = CacheKey::with_graph(query, QueryLanguage::SqlPgq, self.current_graph());
2708
2709 let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
2710 cached_plan
2711 } else {
2712 let mut binder = Binder::new();
2713 let _binding_context = binder.bind(&logical_plan)?;
2714 let active = self.active_store();
2715 let optimizer = Optimizer::from_graph_store(&*active);
2716 let plan = optimizer.optimize(logical_plan)?;
2717 self.query_cache.put_optimized(cache_key, plan.clone());
2718 plan
2719 };
2720
2721 let active = self.active_store();
2722 let has_mutations = optimized_plan.root.has_mutations();
2723
2724 let result = self.with_auto_commit(has_mutations, || {
2725 let (viewing_epoch, transaction_id) = self.get_transaction_context();
2726 let planner =
2727 self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
2728 let mut physical_plan = planner.plan(&optimized_plan)?;
2729 let executor = Executor::with_columns(physical_plan.columns.clone())
2730 .with_deadline(self.query_deadline());
2731 executor.execute(physical_plan.operator.as_mut())
2732 });
2733
2734 #[cfg(feature = "metrics")]
2735 {
2736 #[cfg(not(target_arch = "wasm32"))]
2737 let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
2738 #[cfg(target_arch = "wasm32")]
2739 let elapsed_ms = None;
2740 self.record_query_metrics("sql", elapsed_ms, &result);
2741 }
2742
2743 result
2744 }
2745
2746 #[cfg(feature = "sql-pgq")]
2752 pub fn execute_sql_with_params(
2753 &self,
2754 query: &str,
2755 params: std::collections::HashMap<String, Value>,
2756 ) -> Result<QueryResult> {
2757 use crate::query::processor::{QueryLanguage, QueryProcessor};
2758
2759 #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
2760 let start_time = Instant::now();
2761
2762 let has_mutations = Self::query_looks_like_mutation(query);
2763 let active = self.active_store();
2764
2765 let result = self.with_auto_commit(has_mutations, || {
2766 let (viewing_epoch, transaction_id) = self.get_transaction_context();
2767 let processor = QueryProcessor::for_graph_store_with_transaction(
2768 Arc::clone(&active),
2769 Arc::clone(&self.transaction_manager),
2770 )?;
2771 let processor = if let Some(transaction_id) = transaction_id {
2772 processor.with_transaction_context(viewing_epoch, transaction_id)
2773 } else {
2774 processor
2775 };
2776 processor.process(query, QueryLanguage::SqlPgq, Some(¶ms))
2777 });
2778
2779 #[cfg(feature = "metrics")]
2780 {
2781 #[cfg(not(target_arch = "wasm32"))]
2782 let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
2783 #[cfg(target_arch = "wasm32")]
2784 let elapsed_ms = None;
2785 self.record_query_metrics("sql", elapsed_ms, &result);
2786 }
2787
2788 result
2789 }
2790
2791 pub fn execute_language(
2800 &self,
2801 query: &str,
2802 language: &str,
2803 params: Option<std::collections::HashMap<String, Value>>,
2804 ) -> Result<QueryResult> {
2805 match language {
2806 "gql" => {
2807 if let Some(p) = params {
2808 self.execute_with_params(query, p)
2809 } else {
2810 self.execute(query)
2811 }
2812 }
2813 #[cfg(feature = "cypher")]
2814 "cypher" => {
2815 if let Some(p) = params {
2816 use crate::query::processor::{QueryLanguage, QueryProcessor};
2817
2818 #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
2819 let start_time = Instant::now();
2820
2821 let has_mutations = Self::query_looks_like_mutation(query);
2822 let active = self.active_store();
2823 let result = self.with_auto_commit(has_mutations, || {
2824 let processor = QueryProcessor::for_graph_store_with_transaction(
2825 Arc::clone(&active),
2826 Arc::clone(&self.transaction_manager),
2827 )?;
2828 let (viewing_epoch, transaction_id) = self.get_transaction_context();
2829 let processor = if let Some(transaction_id) = transaction_id {
2830 processor.with_transaction_context(viewing_epoch, transaction_id)
2831 } else {
2832 processor
2833 };
2834 processor.process(query, QueryLanguage::Cypher, Some(&p))
2835 });
2836
2837 #[cfg(feature = "metrics")]
2838 {
2839 #[cfg(not(target_arch = "wasm32"))]
2840 let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
2841 #[cfg(target_arch = "wasm32")]
2842 let elapsed_ms = None;
2843 self.record_query_metrics("cypher", elapsed_ms, &result);
2844 }
2845
2846 result
2847 } else {
2848 self.execute_cypher(query)
2849 }
2850 }
2851 #[cfg(feature = "gremlin")]
2852 "gremlin" => {
2853 if let Some(p) = params {
2854 self.execute_gremlin_with_params(query, p)
2855 } else {
2856 self.execute_gremlin(query)
2857 }
2858 }
2859 #[cfg(feature = "graphql")]
2860 "graphql" => {
2861 if let Some(p) = params {
2862 self.execute_graphql_with_params(query, p)
2863 } else {
2864 self.execute_graphql(query)
2865 }
2866 }
2867 #[cfg(all(feature = "graphql", feature = "rdf"))]
2868 "graphql-rdf" => {
2869 if let Some(p) = params {
2870 self.execute_graphql_rdf_with_params(query, p)
2871 } else {
2872 self.execute_graphql_rdf(query)
2873 }
2874 }
2875 #[cfg(feature = "sql-pgq")]
2876 "sql" | "sql-pgq" => {
2877 if let Some(p) = params {
2878 self.execute_sql_with_params(query, p)
2879 } else {
2880 self.execute_sql(query)
2881 }
2882 }
2883 #[cfg(all(feature = "sparql", feature = "rdf"))]
2884 "sparql" => {
2885 if let Some(p) = params {
2886 self.execute_sparql_with_params(query, p)
2887 } else {
2888 self.execute_sparql(query)
2889 }
2890 }
2891 other => Err(grafeo_common::utils::error::Error::Query(
2892 grafeo_common::utils::error::QueryError::new(
2893 grafeo_common::utils::error::QueryErrorKind::Semantic,
2894 format!("Unknown query language: '{other}'"),
2895 ),
2896 )),
2897 }
2898 }
2899
2900 pub fn clear_plan_cache(&self) {
2927 self.query_cache.clear();
2928 }
2929
2930 pub fn begin_transaction(&mut self) -> Result<()> {
2938 self.begin_transaction_inner(false, None)
2939 }
2940
2941 pub fn begin_transaction_with_isolation(
2949 &mut self,
2950 isolation_level: crate::transaction::IsolationLevel,
2951 ) -> Result<()> {
2952 self.begin_transaction_inner(false, Some(isolation_level))
2953 }
2954
2955 fn begin_transaction_inner(
2957 &self,
2958 read_only: bool,
2959 isolation_level: Option<crate::transaction::IsolationLevel>,
2960 ) -> Result<()> {
2961 let mut current = self.current_transaction.lock();
2962 if current.is_some() {
2963 drop(current);
2965 let mut depth = self.transaction_nesting_depth.lock();
2966 *depth += 1;
2967 let sp_name = format!("_nested_tx_{}", *depth);
2968 self.savepoint(&sp_name)?;
2969 return Ok(());
2970 }
2971
2972 let active = self.active_lpg_store();
2973 self.transaction_start_node_count
2974 .store(active.node_count(), Ordering::Relaxed);
2975 self.transaction_start_edge_count
2976 .store(active.edge_count(), Ordering::Relaxed);
2977 let transaction_id = if let Some(level) = isolation_level {
2978 self.transaction_manager.begin_with_isolation(level)
2979 } else {
2980 self.transaction_manager.begin()
2981 };
2982 *current = Some(transaction_id);
2983 *self.read_only_tx.lock() = read_only;
2984
2985 let key = self.active_graph_storage_key();
2988 let mut touched = self.touched_graphs.lock();
2989 touched.clear();
2990 touched.push(key);
2991
2992 #[cfg(feature = "metrics")]
2993 {
2994 crate::metrics::record_metric!(self.metrics, tx_active, inc);
2995 #[cfg(not(target_arch = "wasm32"))]
2996 {
2997 *self.tx_start_time.lock() = Some(Instant::now());
2998 }
2999 }
3000
3001 Ok(())
3002 }
3003
3004 pub fn commit(&mut self) -> Result<()> {
3012 self.commit_inner()
3013 }
3014
3015 fn commit_inner(&self) -> Result<()> {
3017 {
3019 let mut depth = self.transaction_nesting_depth.lock();
3020 if *depth > 0 {
3021 let sp_name = format!("_nested_tx_{depth}");
3022 *depth -= 1;
3023 drop(depth);
3024 return self.release_savepoint(&sp_name);
3025 }
3026 }
3027
3028 let transaction_id = self.current_transaction.lock().take().ok_or_else(|| {
3029 grafeo_common::utils::error::Error::Transaction(
3030 grafeo_common::utils::error::TransactionError::InvalidState(
3031 "No active transaction".to_string(),
3032 ),
3033 )
3034 })?;
3035
3036 let touched = self.touched_graphs.lock().clone();
3039 let commit_epoch = match self.transaction_manager.commit(transaction_id) {
3040 Ok(epoch) => epoch,
3041 Err(e) => {
3042 for graph_name in &touched {
3044 let store = self.resolve_store(graph_name);
3045 store.rollback_transaction_properties(transaction_id);
3046 }
3047 #[cfg(feature = "rdf")]
3048 self.rollback_rdf_transaction(transaction_id);
3049 *self.read_only_tx.lock() = false;
3050 self.savepoints.lock().clear();
3051 self.touched_graphs.lock().clear();
3052 #[cfg(feature = "metrics")]
3053 {
3054 crate::metrics::record_metric!(self.metrics, tx_active, dec);
3055 crate::metrics::record_metric!(self.metrics, tx_conflicts, inc);
3056 #[cfg(not(target_arch = "wasm32"))]
3057 if let Some(start) = self.tx_start_time.lock().take() {
3058 let duration_ms = start.elapsed().as_secs_f64() * 1000.0;
3059 crate::metrics::record_metric!(
3060 self.metrics,
3061 tx_duration,
3062 observe duration_ms
3063 );
3064 }
3065 }
3066 return Err(e);
3067 }
3068 };
3069
3070 for graph_name in &touched {
3072 let store = self.resolve_store(graph_name);
3073 store.finalize_version_epochs(transaction_id, commit_epoch);
3074 }
3075
3076 #[cfg(feature = "rdf")]
3078 self.commit_rdf_transaction(transaction_id);
3079
3080 for graph_name in &touched {
3081 let store = self.resolve_store(graph_name);
3082 store.commit_transaction_properties(transaction_id);
3083 }
3084
3085 let current_epoch = self.transaction_manager.current_epoch();
3088 for graph_name in &touched {
3089 let store = self.resolve_store(graph_name);
3090 store.sync_epoch(current_epoch);
3091 }
3092
3093 *self.read_only_tx.lock() = false;
3095 self.savepoints.lock().clear();
3096 self.touched_graphs.lock().clear();
3097
3098 if self.gc_interval > 0 {
3100 let count = self.commit_counter.fetch_add(1, Ordering::Relaxed) + 1;
3101 if count.is_multiple_of(self.gc_interval) {
3102 let min_epoch = self.transaction_manager.min_active_epoch();
3103 for graph_name in &touched {
3104 let store = self.resolve_store(graph_name);
3105 store.gc_versions(min_epoch);
3106 }
3107 self.transaction_manager.gc();
3108 #[cfg(feature = "metrics")]
3109 crate::metrics::record_metric!(self.metrics, gc_runs, inc);
3110 }
3111 }
3112
3113 #[cfg(feature = "metrics")]
3114 {
3115 crate::metrics::record_metric!(self.metrics, tx_active, dec);
3116 crate::metrics::record_metric!(self.metrics, tx_committed, inc);
3117 #[cfg(not(target_arch = "wasm32"))]
3118 if let Some(start) = self.tx_start_time.lock().take() {
3119 let duration_ms = start.elapsed().as_secs_f64() * 1000.0;
3120 crate::metrics::record_metric!(self.metrics, tx_duration, observe duration_ms);
3121 }
3122 }
3123
3124 Ok(())
3125 }
3126
3127 pub fn rollback(&mut self) -> Result<()> {
3151 self.rollback_inner()
3152 }
3153
3154 fn rollback_inner(&self) -> Result<()> {
3156 {
3158 let mut depth = self.transaction_nesting_depth.lock();
3159 if *depth > 0 {
3160 let sp_name = format!("_nested_tx_{depth}");
3161 *depth -= 1;
3162 drop(depth);
3163 return self.rollback_to_savepoint(&sp_name);
3164 }
3165 }
3166
3167 let transaction_id = self.current_transaction.lock().take().ok_or_else(|| {
3168 grafeo_common::utils::error::Error::Transaction(
3169 grafeo_common::utils::error::TransactionError::InvalidState(
3170 "No active transaction".to_string(),
3171 ),
3172 )
3173 })?;
3174
3175 *self.read_only_tx.lock() = false;
3177
3178 let touched = self.touched_graphs.lock().clone();
3180 for graph_name in &touched {
3181 let store = self.resolve_store(graph_name);
3182 store.discard_uncommitted_versions(transaction_id);
3183 }
3184
3185 #[cfg(feature = "rdf")]
3187 self.rollback_rdf_transaction(transaction_id);
3188
3189 self.savepoints.lock().clear();
3191 self.touched_graphs.lock().clear();
3192
3193 let result = self.transaction_manager.abort(transaction_id);
3195
3196 #[cfg(feature = "metrics")]
3197 if result.is_ok() {
3198 crate::metrics::record_metric!(self.metrics, tx_active, dec);
3199 crate::metrics::record_metric!(self.metrics, tx_rolled_back, inc);
3200 #[cfg(not(target_arch = "wasm32"))]
3201 if let Some(start) = self.tx_start_time.lock().take() {
3202 let duration_ms = start.elapsed().as_secs_f64() * 1000.0;
3203 crate::metrics::record_metric!(self.metrics, tx_duration, observe duration_ms);
3204 }
3205 }
3206
3207 result
3208 }
3209
3210 pub fn savepoint(&self, name: &str) -> Result<()> {
3220 let tx_id = self.current_transaction.lock().ok_or_else(|| {
3221 grafeo_common::utils::error::Error::Transaction(
3222 grafeo_common::utils::error::TransactionError::InvalidState(
3223 "No active transaction".to_string(),
3224 ),
3225 )
3226 })?;
3227
3228 let touched = self.touched_graphs.lock().clone();
3230 let graph_snapshots: Vec<GraphSavepoint> = touched
3231 .iter()
3232 .map(|graph_name| {
3233 let store = self.resolve_store(graph_name);
3234 GraphSavepoint {
3235 graph_name: graph_name.clone(),
3236 next_node_id: store.peek_next_node_id(),
3237 next_edge_id: store.peek_next_edge_id(),
3238 undo_log_position: store.property_undo_log_position(tx_id),
3239 }
3240 })
3241 .collect();
3242
3243 self.savepoints.lock().push(SavepointState {
3244 name: name.to_string(),
3245 graph_snapshots,
3246 active_graph: self.current_graph.lock().clone(),
3247 });
3248 Ok(())
3249 }
3250
3251 pub fn rollback_to_savepoint(&self, name: &str) -> Result<()> {
3260 let transaction_id = self.current_transaction.lock().ok_or_else(|| {
3261 grafeo_common::utils::error::Error::Transaction(
3262 grafeo_common::utils::error::TransactionError::InvalidState(
3263 "No active transaction".to_string(),
3264 ),
3265 )
3266 })?;
3267
3268 let mut savepoints = self.savepoints.lock();
3269
3270 let pos = savepoints
3272 .iter()
3273 .rposition(|sp| sp.name == name)
3274 .ok_or_else(|| {
3275 grafeo_common::utils::error::Error::Transaction(
3276 grafeo_common::utils::error::TransactionError::InvalidState(format!(
3277 "Savepoint '{name}' not found"
3278 )),
3279 )
3280 })?;
3281
3282 let sp_state = savepoints[pos].clone();
3283
3284 savepoints.truncate(pos);
3286 drop(savepoints);
3287
3288 for gs in &sp_state.graph_snapshots {
3290 let store = self.resolve_store(&gs.graph_name);
3291
3292 store.rollback_transaction_properties_to(transaction_id, gs.undo_log_position);
3294
3295 let current_next_node = store.peek_next_node_id();
3297 let current_next_edge = store.peek_next_edge_id();
3298
3299 let node_ids: Vec<NodeId> = (gs.next_node_id..current_next_node)
3300 .map(NodeId::new)
3301 .collect();
3302 let edge_ids: Vec<EdgeId> = (gs.next_edge_id..current_next_edge)
3303 .map(EdgeId::new)
3304 .collect();
3305
3306 if !node_ids.is_empty() || !edge_ids.is_empty() {
3307 store.discard_entities_by_id(transaction_id, &node_ids, &edge_ids);
3308 }
3309 }
3310
3311 let touched = self.touched_graphs.lock().clone();
3315 for graph_name in &touched {
3316 let already_captured = sp_state
3317 .graph_snapshots
3318 .iter()
3319 .any(|gs| gs.graph_name == *graph_name);
3320 if !already_captured {
3321 let store = self.resolve_store(graph_name);
3322 store.discard_uncommitted_versions(transaction_id);
3323 }
3324 }
3325
3326 let mut touched = self.touched_graphs.lock();
3328 touched.clear();
3329 for gs in &sp_state.graph_snapshots {
3330 if !touched.contains(&gs.graph_name) {
3331 touched.push(gs.graph_name.clone());
3332 }
3333 }
3334
3335 Ok(())
3336 }
3337
3338 pub fn release_savepoint(&self, name: &str) -> Result<()> {
3344 let _tx_id = self.current_transaction.lock().ok_or_else(|| {
3345 grafeo_common::utils::error::Error::Transaction(
3346 grafeo_common::utils::error::TransactionError::InvalidState(
3347 "No active transaction".to_string(),
3348 ),
3349 )
3350 })?;
3351
3352 let mut savepoints = self.savepoints.lock();
3353 let pos = savepoints
3354 .iter()
3355 .rposition(|sp| sp.name == name)
3356 .ok_or_else(|| {
3357 grafeo_common::utils::error::Error::Transaction(
3358 grafeo_common::utils::error::TransactionError::InvalidState(format!(
3359 "Savepoint '{name}' not found"
3360 )),
3361 )
3362 })?;
3363 savepoints.remove(pos);
3364 Ok(())
3365 }
3366
3367 #[must_use]
3369 pub fn in_transaction(&self) -> bool {
3370 self.current_transaction.lock().is_some()
3371 }
3372
3373 #[must_use]
3375 pub(crate) fn current_transaction_id(&self) -> Option<TransactionId> {
3376 *self.current_transaction.lock()
3377 }
3378
3379 #[must_use]
3381 pub(crate) fn transaction_manager(&self) -> &TransactionManager {
3382 &self.transaction_manager
3383 }
3384
3385 #[must_use]
3387 pub(crate) fn node_count_delta(&self) -> (usize, usize) {
3388 (
3389 self.transaction_start_node_count.load(Ordering::Relaxed),
3390 self.active_lpg_store().node_count(),
3391 )
3392 }
3393
3394 #[must_use]
3396 pub(crate) fn edge_count_delta(&self) -> (usize, usize) {
3397 (
3398 self.transaction_start_edge_count.load(Ordering::Relaxed),
3399 self.active_lpg_store().edge_count(),
3400 )
3401 }
3402
3403 pub fn prepare_commit(&mut self) -> Result<crate::transaction::PreparedCommit<'_>> {
3437 crate::transaction::PreparedCommit::new(self)
3438 }
3439
3440 pub fn set_auto_commit(&mut self, auto_commit: bool) {
3442 self.auto_commit = auto_commit;
3443 }
3444
3445 #[must_use]
3447 pub fn auto_commit(&self) -> bool {
3448 self.auto_commit
3449 }
3450
3451 fn needs_auto_commit(&self, has_mutations: bool) -> bool {
3456 self.auto_commit && has_mutations && self.current_transaction.lock().is_none()
3457 }
3458
3459 fn with_auto_commit<F>(&self, has_mutations: bool, body: F) -> Result<QueryResult>
3462 where
3463 F: FnOnce() -> Result<QueryResult>,
3464 {
3465 if self.needs_auto_commit(has_mutations) {
3466 self.begin_transaction_inner(false, None)?;
3467 match body() {
3468 Ok(result) => {
3469 self.commit_inner()?;
3470 Ok(result)
3471 }
3472 Err(e) => {
3473 let _ = self.rollback_inner();
3474 Err(e)
3475 }
3476 }
3477 } else {
3478 body()
3479 }
3480 }
3481
3482 fn query_looks_like_mutation(query: &str) -> bool {
3488 let upper = query.to_ascii_uppercase();
3489 upper.contains("INSERT")
3490 || upper.contains("CREATE")
3491 || upper.contains("DELETE")
3492 || upper.contains("MERGE")
3493 || upper.contains("SET")
3494 || upper.contains("REMOVE")
3495 || upper.contains("DROP")
3496 || upper.contains("ALTER")
3497 }
3498
3499 #[must_use]
3501 fn query_deadline(&self) -> Option<Instant> {
3502 #[cfg(not(target_arch = "wasm32"))]
3503 {
3504 self.query_timeout.map(|d| Instant::now() + d)
3505 }
3506 #[cfg(target_arch = "wasm32")]
3507 {
3508 let _ = &self.query_timeout;
3509 None
3510 }
3511 }
3512
3513 #[cfg(feature = "metrics")]
3519 fn record_query_metrics(
3520 &self,
3521 language: &str,
3522 elapsed_ms: Option<f64>,
3523 result: &Result<crate::database::QueryResult>,
3524 ) {
3525 use crate::metrics::record_metric;
3526
3527 record_metric!(self.metrics, query_count, inc);
3528 if let Some(ref reg) = self.metrics {
3529 reg.query_count_by_language.increment(language);
3530 }
3531 if let Some(ms) = elapsed_ms {
3532 record_metric!(self.metrics, query_latency, observe ms);
3533 }
3534 match result {
3535 Ok(r) => {
3536 let returned = r.rows.len() as u64;
3537 record_metric!(self.metrics, rows_returned, add returned);
3538 if let Some(scanned) = r.rows_scanned {
3539 record_metric!(self.metrics, rows_scanned, add scanned);
3540 }
3541 }
3542 Err(e) => {
3543 record_metric!(self.metrics, query_errors, inc);
3544 let msg = e.to_string();
3546 if msg.contains("exceeded timeout") {
3547 record_metric!(self.metrics, query_timeouts, inc);
3548 }
3549 }
3550 }
3551 }
3552
3553 fn eval_integer_literal(expr: &grafeo_adapters::query::gql::ast::Expression) -> Option<i64> {
3555 use grafeo_adapters::query::gql::ast::{Expression, Literal};
3556 match expr {
3557 Expression::Literal(Literal::Integer(n)) => Some(*n),
3558 _ => None,
3559 }
3560 }
3561
3562 #[must_use]
3568 fn get_transaction_context(&self) -> (EpochId, Option<TransactionId>) {
3569 if let Some(epoch) = *self.viewing_epoch_override.lock() {
3571 return (epoch, None);
3572 }
3573
3574 if let Some(transaction_id) = *self.current_transaction.lock() {
3575 let epoch = self
3577 .transaction_manager
3578 .start_epoch(transaction_id)
3579 .unwrap_or_else(|| self.transaction_manager.current_epoch());
3580 (epoch, Some(transaction_id))
3581 } else {
3582 (self.transaction_manager.current_epoch(), None)
3584 }
3585 }
3586
3587 fn create_planner_for_store(
3592 &self,
3593 store: Arc<dyn GraphStoreMut>,
3594 viewing_epoch: EpochId,
3595 transaction_id: Option<TransactionId>,
3596 ) -> crate::query::Planner {
3597 use crate::query::Planner;
3598 use grafeo_core::execution::operators::{LazyValue, SessionContext};
3599
3600 let info_store = Arc::clone(&store);
3602 let schema_store = Arc::clone(&store);
3603
3604 let session_context = SessionContext {
3605 current_schema: self.current_schema(),
3606 current_graph: self.current_graph(),
3607 db_info: LazyValue::new(move || Self::build_info_value(&*info_store)),
3608 schema_info: LazyValue::new(move || Self::build_schema_value(&*schema_store)),
3609 };
3610
3611 let mut planner = Planner::with_context(
3612 Arc::clone(&store),
3613 Arc::clone(&self.transaction_manager),
3614 transaction_id,
3615 viewing_epoch,
3616 )
3617 .with_factorized_execution(self.factorized_execution)
3618 .with_catalog(Arc::clone(&self.catalog))
3619 .with_session_context(session_context);
3620
3621 let validator =
3623 CatalogConstraintValidator::new(Arc::clone(&self.catalog)).with_store(store);
3624 planner = planner.with_validator(Arc::new(validator));
3625
3626 planner
3627 }
3628
3629 fn build_info_value(store: &dyn GraphStoreMut) -> Value {
3631 use grafeo_common::types::PropertyKey;
3632 use std::collections::BTreeMap;
3633
3634 let mut map = BTreeMap::new();
3635 map.insert(PropertyKey::from("mode"), Value::String("lpg".into()));
3636 map.insert(
3637 PropertyKey::from("node_count"),
3638 Value::Int64(store.node_count() as i64),
3639 );
3640 map.insert(
3641 PropertyKey::from("edge_count"),
3642 Value::Int64(store.edge_count() as i64),
3643 );
3644 map.insert(
3645 PropertyKey::from("version"),
3646 Value::String(env!("CARGO_PKG_VERSION").into()),
3647 );
3648 Value::Map(map.into())
3649 }
3650
3651 fn build_schema_value(store: &dyn GraphStoreMut) -> Value {
3653 use grafeo_common::types::PropertyKey;
3654 use std::collections::BTreeMap;
3655
3656 let labels: Vec<Value> = store
3657 .all_labels()
3658 .into_iter()
3659 .map(|l| Value::String(l.into()))
3660 .collect();
3661 let edge_types: Vec<Value> = store
3662 .all_edge_types()
3663 .into_iter()
3664 .map(|t| Value::String(t.into()))
3665 .collect();
3666 let property_keys: Vec<Value> = store
3667 .all_property_keys()
3668 .into_iter()
3669 .map(|k| Value::String(k.into()))
3670 .collect();
3671
3672 let mut map = BTreeMap::new();
3673 map.insert(PropertyKey::from("labels"), Value::List(labels.into()));
3674 map.insert(
3675 PropertyKey::from("edge_types"),
3676 Value::List(edge_types.into()),
3677 );
3678 map.insert(
3679 PropertyKey::from("property_keys"),
3680 Value::List(property_keys.into()),
3681 );
3682 Value::Map(map.into())
3683 }
3684
3685 pub fn create_node(&self, labels: &[&str]) -> NodeId {
3690 let (epoch, transaction_id) = self.get_transaction_context();
3691 self.active_lpg_store().create_node_versioned(
3692 labels,
3693 epoch,
3694 transaction_id.unwrap_or(TransactionId::SYSTEM),
3695 )
3696 }
3697
3698 pub fn create_node_with_props<'a>(
3702 &self,
3703 labels: &[&str],
3704 properties: impl IntoIterator<Item = (&'a str, Value)>,
3705 ) -> NodeId {
3706 let (epoch, transaction_id) = self.get_transaction_context();
3707 self.active_lpg_store().create_node_with_props_versioned(
3708 labels,
3709 properties,
3710 epoch,
3711 transaction_id.unwrap_or(TransactionId::SYSTEM),
3712 )
3713 }
3714
3715 pub fn create_edge(
3720 &self,
3721 src: NodeId,
3722 dst: NodeId,
3723 edge_type: &str,
3724 ) -> grafeo_common::types::EdgeId {
3725 let (epoch, transaction_id) = self.get_transaction_context();
3726 self.active_lpg_store().create_edge_versioned(
3727 src,
3728 dst,
3729 edge_type,
3730 epoch,
3731 transaction_id.unwrap_or(TransactionId::SYSTEM),
3732 )
3733 }
3734
3735 #[must_use]
3763 pub fn get_node(&self, id: NodeId) -> Option<Node> {
3764 let (epoch, transaction_id) = self.get_transaction_context();
3765 self.active_lpg_store().get_node_versioned(
3766 id,
3767 epoch,
3768 transaction_id.unwrap_or(TransactionId::SYSTEM),
3769 )
3770 }
3771
3772 #[must_use]
3796 pub fn get_node_property(&self, id: NodeId, key: &str) -> Option<Value> {
3797 self.get_node(id)
3798 .and_then(|node| node.get_property(key).cloned())
3799 }
3800
3801 #[must_use]
3808 pub fn get_edge(&self, id: EdgeId) -> Option<Edge> {
3809 let (epoch, transaction_id) = self.get_transaction_context();
3810 self.active_lpg_store().get_edge_versioned(
3811 id,
3812 epoch,
3813 transaction_id.unwrap_or(TransactionId::SYSTEM),
3814 )
3815 }
3816
3817 #[must_use]
3843 pub fn get_neighbors_outgoing(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
3844 self.active_lpg_store()
3845 .edges_from(node, Direction::Outgoing)
3846 .collect()
3847 }
3848
3849 #[must_use]
3858 pub fn get_neighbors_incoming(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
3859 self.active_lpg_store()
3860 .edges_from(node, Direction::Incoming)
3861 .collect()
3862 }
3863
3864 #[must_use]
3876 pub fn get_neighbors_outgoing_by_type(
3877 &self,
3878 node: NodeId,
3879 edge_type: &str,
3880 ) -> Vec<(NodeId, EdgeId)> {
3881 self.active_lpg_store()
3882 .edges_from(node, Direction::Outgoing)
3883 .filter(|(_, edge_id)| {
3884 self.get_edge(*edge_id)
3885 .is_some_and(|e| e.edge_type.as_str() == edge_type)
3886 })
3887 .collect()
3888 }
3889
3890 #[must_use]
3897 pub fn node_exists(&self, id: NodeId) -> bool {
3898 self.get_node(id).is_some()
3899 }
3900
3901 #[must_use]
3903 pub fn edge_exists(&self, id: EdgeId) -> bool {
3904 self.get_edge(id).is_some()
3905 }
3906
3907 #[must_use]
3911 pub fn get_degree(&self, node: NodeId) -> (usize, usize) {
3912 let active = self.active_lpg_store();
3913 let out = active.out_degree(node);
3914 let in_degree = active.in_degree(node);
3915 (out, in_degree)
3916 }
3917
3918 #[must_use]
3928 pub fn get_nodes_batch(&self, ids: &[NodeId]) -> Vec<Option<Node>> {
3929 let (epoch, transaction_id) = self.get_transaction_context();
3930 let tx = transaction_id.unwrap_or(TransactionId::SYSTEM);
3931 let active = self.active_lpg_store();
3932 ids.iter()
3933 .map(|&id| active.get_node_versioned(id, epoch, tx))
3934 .collect()
3935 }
3936
/// Returns the change events the CDC log holds for one entity.
///
/// `entity_id` is converted into a `crate::cdc::EntityId` before the log is
/// queried. The body as written always returns `Ok`.
#[cfg(feature = "cdc")]
pub fn history(
    &self,
    entity_id: impl Into<crate::cdc::EntityId>,
) -> Result<Vec<crate::cdc::ChangeEvent>> {
    let entity = entity_id.into();
    let events = self.cdc_log.history(entity);
    Ok(events)
}
3947
/// Returns the change events the CDC log holds for one entity, queried with
/// `since_epoch` as the starting bound.
///
/// `entity_id` is converted into a `crate::cdc::EntityId` before the log is
/// queried. The body as written always returns `Ok`.
#[cfg(feature = "cdc")]
pub fn history_since(
    &self,
    entity_id: impl Into<crate::cdc::EntityId>,
    since_epoch: EpochId,
) -> Result<Vec<crate::cdc::ChangeEvent>> {
    let entity = entity_id.into();
    let events = self.cdc_log.history_since(entity, since_epoch);
    Ok(events)
}
3957
/// Returns the change events the CDC log reports for the epoch range given
/// by `start_epoch` and `end_epoch`.
///
/// The body as written always returns `Ok`.
#[cfg(feature = "cdc")]
pub fn changes_between(
    &self,
    start_epoch: EpochId,
    end_epoch: EpochId,
) -> Result<Vec<crate::cdc::ChangeEvent>> {
    let events = self.cdc_log.changes_between(start_epoch, end_epoch);
    Ok(events)
}
3967}
3968
3969impl Drop for Session {
3970 fn drop(&mut self) {
3971 if self.in_transaction() {
3974 let _ = self.rollback_inner();
3975 }
3976
3977 #[cfg(feature = "metrics")]
3978 if let Some(ref reg) = self.metrics {
3979 reg.session_active
3980 .fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
3981 }
3982 }
3983}
3984
3985#[cfg(test)]
3986mod tests {
3987 use super::parse_default_literal;
3988 use crate::database::GrafeoDB;
3989 use grafeo_common::types::Value;
3990
// The `null` keyword is recognised regardless of letter case.
#[test]
fn parse_default_literal_null() {
    for &spelling in &["null", "NULL", "Null"] {
        assert_eq!(parse_default_literal(spelling), Value::Null);
    }
}
4001
// Boolean keywords are recognised in lower and upper case.
#[test]
fn parse_default_literal_bool() {
    let cases = [
        ("true", true),
        ("TRUE", true),
        ("false", false),
        ("FALSE", false),
    ];
    for &(text, expected) in &cases {
        assert_eq!(parse_default_literal(text), Value::Bool(expected));
    }
}
4009
// Surrounding single quotes are stripped from the resulting string.
#[test]
fn parse_default_literal_string_single_quoted() {
    let parsed = parse_default_literal("'hello'");
    let expected = Value::String("hello".into());
    assert_eq!(parsed, expected);
}
4017
// Surrounding double quotes are stripped from the resulting string.
#[test]
fn parse_default_literal_string_double_quoted() {
    let parsed = parse_default_literal("\"world\"");
    let expected = Value::String("world".into());
    assert_eq!(parsed, expected);
}
4025
// Positive, negative and zero integers all become `Int64`.
#[test]
fn parse_default_literal_integer() {
    let cases: [(&str, i64); 3] = [("42", 42), ("-7", -7), ("0", 0)];
    for &(text, expected) in &cases {
        assert_eq!(parse_default_literal(text), Value::Int64(expected));
    }
}
4032
// Text with a fractional part becomes `Float64`.
#[test]
fn parse_default_literal_float() {
    let positive = parse_default_literal("9.81");
    assert_eq!(positive, Value::Float64(9.81_f64));

    let negative = parse_default_literal("-0.5");
    assert_eq!(negative, Value::Float64(-0.5));
}
4038
// Text that matches no other literal form is returned as a string, unchanged.
#[test]
fn parse_default_literal_fallback_string() {
    let parsed = parse_default_literal("some_identifier");
    let expected = Value::String("some_identifier".into());
    assert_eq!(parsed, expected);
}
4047
// A node created through a session gets a valid id and is counted by the database.
#[test]
fn test_session_create_node() {
    let db = GrafeoDB::new_in_memory();
    let session = db.session();

    let person = session.create_node(&["Person"]);

    assert!(person.is_valid());
    assert_eq!(db.node_count(), 1);
}
4057
// `in_transaction` follows the begin/commit lifecycle.
#[test]
fn test_session_transaction() {
    let db = GrafeoDB::new_in_memory();
    let mut session = db.session();

    // Fresh session: nothing open yet.
    assert!(!session.in_transaction());

    // Begin opens a transaction.
    session.begin_transaction().unwrap();
    assert!(session.in_transaction());

    // Commit closes it again.
    session.commit().unwrap();
    assert!(!session.in_transaction());
}
4071
// The transaction context carries a transaction id only while a transaction
// is open, and the epoch seen after commit is not older than the one seen
// inside the transaction.
#[test]
fn test_session_transaction_context() {
    let db = GrafeoDB::new_in_memory();
    let mut session = db.session();

    // Outside a transaction there is no transaction id.
    let (_epoch1, transaction_id1) = session.get_transaction_context();
    assert!(transaction_id1.is_none());

    // Inside a transaction the context carries its id.
    session.begin_transaction().unwrap();
    let (epoch2, transaction_id2) = session.get_transaction_context();
    assert!(transaction_id2.is_some());

    // After commit the id is gone and the epoch has not moved backwards.
    session.commit().unwrap();
    let (epoch3, tx_id3) = session.get_transaction_context();
    assert!(tx_id3.is_none());
    assert!(epoch3.as_u64() >= epoch2.as_u64());
}
4095
// Rolling back an open transaction leaves the session outside any transaction.
#[test]
fn test_session_rollback() {
    let db = GrafeoDB::new_in_memory();
    let mut session = db.session();

    session.begin_transaction().unwrap();
    session.rollback().unwrap();

    let still_open = session.in_transaction();
    assert!(!still_open);
}
4105
// A node written directly to the store under the session's transaction id
// must disappear when that session rolls back, while a node created before
// the transaction must survive.
#[test]
fn test_session_rollback_discards_versions() {
    use grafeo_common::types::TransactionId;

    let db = GrafeoDB::new_in_memory();

    // Baseline: one node that exists before any transaction starts.
    let node_before = db.store().create_node(&["Person"]);
    assert!(node_before.is_valid());
    assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");

    // Open a transaction and read its id out of the session.
    let mut session = db.session();
    session.begin_transaction().unwrap();
    let tx = session.current_transaction.lock().unwrap();

    // Write a second node through the store, tagged with that transaction id.
    let epoch = db.store().current_epoch();
    let node_in_tx = db.store().create_node_versioned(&["Person"], epoch, tx);
    assert!(node_in_tx.is_valid());

    // The uncommitted node is not counted, but its own transaction sees it.
    assert_eq!(
        db.node_count(),
        1,
        "PENDING nodes should be invisible to non-versioned node_count()"
    );
    let seen_by_owner = db.store().get_node_versioned(node_in_tx, epoch, tx);
    assert!(
        seen_by_owner.is_some(),
        "Transaction node should be visible to its own transaction"
    );

    // Roll back.
    session.rollback().unwrap();
    assert!(!session.in_transaction());

    // The count is unchanged after rollback.
    let count_after = db.node_count();
    assert_eq!(
        count_after, 1,
        "Rollback should discard uncommitted node, but got {count_after}"
    );

    let current_epoch = db.store().current_epoch();

    // The pre-existing node is still visible to the system viewer.
    let survivor = db
        .store()
        .get_node_versioned(node_before, current_epoch, TransactionId::SYSTEM);
    assert!(survivor.is_some(), "Original node should still exist");

    // The rolled-back node is not.
    let discarded = db
        .store()
        .get_node_versioned(node_in_tx, current_epoch, TransactionId::SYSTEM);
    assert!(discarded.is_none(), "Transaction node should be gone");
}
4173
// A node created via `session.create_node` inside a transaction is discarded
// by rollback.
#[test]
fn test_session_create_node_in_transaction() {
    let db = GrafeoDB::new_in_memory();

    // Baseline node created outside any transaction.
    let node_before = db.create_node(&["Person"]);
    assert!(node_before.is_valid());
    assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");

    // Open a transaction and read its id out of the session.
    let mut session = db.session();
    session.begin_transaction().unwrap();
    let tx = session.current_transaction.lock().unwrap();

    // Create a node through the session while the transaction is open.
    let node_in_tx = session.create_node(&["Person"]);
    assert!(node_in_tx.is_valid());

    // Not counted yet, but visible to the owning transaction.
    assert_eq!(
        db.node_count(),
        1,
        "PENDING nodes should be invisible to non-versioned node_count()"
    );
    let epoch = db.store().current_epoch();
    let seen_by_owner = db.store().get_node_versioned(node_in_tx, epoch, tx);
    assert!(
        seen_by_owner.is_some(),
        "Transaction node should be visible to its own transaction"
    );

    session.rollback().unwrap();

    // Only the baseline node remains.
    let count_after = db.node_count();
    assert_eq!(
        count_after, 1,
        "Rollback should discard node created via session.create_node(), but got {count_after}"
    );
}
4218
// A node created via `session.create_node_with_props` inside a transaction is
// discarded by rollback.
#[test]
fn test_session_create_node_with_props_in_transaction() {
    use grafeo_common::types::Value;

    let db = GrafeoDB::new_in_memory();

    // Baseline node created outside any transaction.
    db.create_node(&["Person"]);
    assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");

    // Open a transaction and read its id out of the session.
    let mut session = db.session();
    session.begin_transaction().unwrap();
    let tx = session.current_transaction.lock().unwrap();

    // Create a node with a property while the transaction is open.
    let props = [("name", Value::String("Alix".into()))];
    let node_in_tx = session.create_node_with_props(&["Person"], props);
    assert!(node_in_tx.is_valid());

    // Not counted yet, but visible to the owning transaction.
    assert_eq!(
        db.node_count(),
        1,
        "PENDING nodes should be invisible to non-versioned node_count()"
    );
    let epoch = db.store().current_epoch();
    let seen_by_owner = db.store().get_node_versioned(node_in_tx, epoch, tx);
    assert!(
        seen_by_owner.is_some(),
        "Transaction node should be visible to its own transaction"
    );

    session.rollback().unwrap();

    // Only the baseline node remains.
    let count_after = db.node_count();
    assert_eq!(
        count_after, 1,
        "Rollback should discard node created via session.create_node_with_props()"
    );
}
4264
// GQL query tests driven through `Session::execute`.
#[cfg(feature = "gql")]
mod gql_tests {
    use super::*;
    use grafeo_common::types::Value;

    // A label-filtered MATCH returns only nodes carrying that label.
    #[test]
    fn test_gql_query_execution() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        // Two Person nodes plus one node with a different label.
        for &label in &["Person", "Person", "Animal"] {
            session.create_node(&[label]);
        }

        let people = session.execute("MATCH (n:Person) RETURN n").unwrap();

        assert_eq!(people.row_count(), 2);
        assert_eq!(people.column_count(), 1);
        assert_eq!(people.columns[0], "n");
    }

    // Matching against an empty database yields zero rows.
    #[test]
    fn test_gql_empty_result() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        let people = session.execute("MATCH (n:Person) RETURN n").unwrap();

        assert_eq!(people.row_count(), 0);
    }

    // A query with an unclosed node pattern is rejected.
    #[test]
    fn test_gql_parse_error() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        let outcome = session.execute("MATCH (n RETURN n");

        assert!(outcome.is_err());
    }

    // A one-hop pattern returns one row per matching edge.
    #[test]
    fn test_gql_relationship_traversal() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        let alix = session.create_node(&["Person"]);
        let gus = session.create_node(&["Person"]);
        let vincent = session.create_node(&["Person"]);

        // Alix knows both of the others.
        session.create_edge(alix, gus, "KNOWS");
        session.create_edge(alix, vincent, "KNOWS");

        let pairs = session
            .execute("MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b")
            .unwrap();

        assert_eq!(pairs.row_count(), 2);
        assert_eq!(pairs.column_count(), 2);
        assert_eq!(pairs.columns[0], "a");
        assert_eq!(pairs.columns[1], "b");
    }

    // The edge type in the pattern filters out edges of other types.
    #[test]
    fn test_gql_relationship_with_type_filter() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        let alix = session.create_node(&["Person"]);
        let gus = session.create_node(&["Person"]);
        let vincent = session.create_node(&["Person"]);

        // One KNOWS edge and one edge of another type.
        session.create_edge(alix, gus, "KNOWS");
        session.create_edge(alix, vincent, "WORKS_WITH");

        let pairs = session
            .execute("MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b")
            .unwrap();

        assert_eq!(pairs.row_count(), 1);
    }

    // Returning a variable that was never bound is an error that names the problem.
    #[test]
    fn test_gql_semantic_error_undefined_variable() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        let outcome = session.execute("MATCH (n:Person) RETURN x");

        assert!(outcome.is_err());
        let Err(err) = outcome else {
            panic!("Expected error")
        };
        assert!(
            err.to_string().contains("Undefined variable"),
            "Expected undefined variable error, got: {}",
            err
        );
    }

    // A numeric comparison in WHERE keeps only the matching nodes.
    #[test]
    fn test_gql_where_clause_property_filter() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        // Ages 25, 35 and 45: two of them exceed 30.
        for &age in &[25_i64, 35, 45] {
            session.create_node_with_props(&["Person"], [("age", Value::Int64(age))]);
        }

        let over_thirty = session
            .execute("MATCH (n:Person) WHERE n.age > 30 RETURN n")
            .unwrap();

        assert_eq!(over_thirty.row_count(), 2);
    }

    // String equality in WHERE keeps every node with the matching value.
    #[test]
    fn test_gql_where_clause_equality() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        // Two nodes named Alix and one named Gus.
        for &name in &["Alix", "Gus", "Alix"] {
            session.create_node_with_props(&["Person"], [("name", Value::String(name.into()))]);
        }

        let named_alix = session
            .execute("MATCH (n:Person) WHERE n.name = \"Alix\" RETURN n")
            .unwrap();

        assert_eq!(named_alix.row_count(), 2);
    }

    // Property access in RETURN yields one column per expression.
    #[test]
    fn test_gql_return_property_access() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        let alix_props = [
            ("name", Value::String("Alix".into())),
            ("age", Value::Int64(30)),
        ];
        session.create_node_with_props(&["Person"], alix_props);

        let gus_props = [
            ("name", Value::String("Gus".into())),
            ("age", Value::Int64(25)),
        ];
        session.create_node_with_props(&["Person"], gus_props);

        let table = session
            .execute("MATCH (n:Person) RETURN n.name, n.age")
            .unwrap();

        assert_eq!(table.row_count(), 2);
        assert_eq!(table.column_count(), 2);
        assert_eq!(table.columns[0], "n.name");
        assert_eq!(table.columns[1], "n.age");

        // Row order is not asserted; only that both names appear in column 0.
        let names: Vec<&Value> = table.rows.iter().map(|row| &row[0]).collect();
        assert!(names.contains(&&Value::String("Alix".into())));
        assert!(names.contains(&&Value::String("Gus".into())));
    }

    // A node variable and a property access can be returned side by side.
    #[test]
    fn test_gql_return_mixed_expressions() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        session.create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))]);

        let table = session
            .execute("MATCH (n:Person) RETURN n, n.name")
            .unwrap();

        assert_eq!(table.row_count(), 1);
        assert_eq!(table.column_count(), 2);
        assert_eq!(table.columns[0], "n");
        assert_eq!(table.columns[1], "n.name");

        // The second column holds the property value itself.
        assert_eq!(table.rows[0][1], Value::String("Alix".into()));
    }
}
4483
// Cypher query tests driven through `Session::execute_cypher`.
#[cfg(feature = "cypher")]
mod cypher_tests {
    use super::*;

    // A label-filtered MATCH returns only nodes carrying that label.
    #[test]
    fn test_cypher_query_execution() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        // Two Person nodes plus one node with a different label.
        for &label in &["Person", "Person", "Animal"] {
            session.create_node(&[label]);
        }

        let people = session.execute_cypher("MATCH (n:Person) RETURN n").unwrap();

        assert_eq!(people.row_count(), 2);
        assert_eq!(people.column_count(), 1);
        assert_eq!(people.columns[0], "n");
    }

    // Matching against an empty database yields zero rows.
    #[test]
    fn test_cypher_empty_result() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        let people = session.execute_cypher("MATCH (n:Person) RETURN n").unwrap();

        assert_eq!(people.row_count(), 0);
    }

    // A query with an unclosed node pattern is rejected.
    #[test]
    fn test_cypher_parse_error() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        let outcome = session.execute_cypher("MATCH (n RETURN n");

        assert!(outcome.is_err());
    }
}
4529
4530 mod direct_lookup_tests {
4533 use super::*;
4534 use grafeo_common::types::Value;
4535
4536 #[test]
4537 fn test_get_node() {
4538 let db = GrafeoDB::new_in_memory();
4539 let session = db.session();
4540
4541 let id = session.create_node(&["Person"]);
4542 let node = session.get_node(id);
4543
4544 assert!(node.is_some());
4545 let node = node.unwrap();
4546 assert_eq!(node.id, id);
4547 }
4548
4549 #[test]
4550 fn test_get_node_not_found() {
4551 use grafeo_common::types::NodeId;
4552
4553 let db = GrafeoDB::new_in_memory();
4554 let session = db.session();
4555
4556 let node = session.get_node(NodeId::new(9999));
4558 assert!(node.is_none());
4559 }
4560
4561 #[test]
4562 fn test_get_node_property() {
4563 let db = GrafeoDB::new_in_memory();
4564 let session = db.session();
4565
4566 let id = session
4567 .create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))]);
4568
4569 let name = session.get_node_property(id, "name");
4570 assert_eq!(name, Some(Value::String("Alix".into())));
4571
4572 let missing = session.get_node_property(id, "missing");
4574 assert!(missing.is_none());
4575 }
4576
4577 #[test]
4578 fn test_get_edge() {
4579 let db = GrafeoDB::new_in_memory();
4580 let session = db.session();
4581
4582 let alix = session.create_node(&["Person"]);
4583 let gus = session.create_node(&["Person"]);
4584 let edge_id = session.create_edge(alix, gus, "KNOWS");
4585
4586 let edge = session.get_edge(edge_id);
4587 assert!(edge.is_some());
4588 let edge = edge.unwrap();
4589 assert_eq!(edge.id, edge_id);
4590 assert_eq!(edge.src, alix);
4591 assert_eq!(edge.dst, gus);
4592 }
4593
4594 #[test]
4595 fn test_get_edge_not_found() {
4596 use grafeo_common::types::EdgeId;
4597
4598 let db = GrafeoDB::new_in_memory();
4599 let session = db.session();
4600
4601 let edge = session.get_edge(EdgeId::new(9999));
4602 assert!(edge.is_none());
4603 }
4604
4605 #[test]
4606 fn test_get_neighbors_outgoing() {
4607 let db = GrafeoDB::new_in_memory();
4608 let session = db.session();
4609
4610 let alix = session.create_node(&["Person"]);
4611 let gus = session.create_node(&["Person"]);
4612 let harm = session.create_node(&["Person"]);
4613
4614 session.create_edge(alix, gus, "KNOWS");
4615 session.create_edge(alix, harm, "KNOWS");
4616
4617 let neighbors = session.get_neighbors_outgoing(alix);
4618 assert_eq!(neighbors.len(), 2);
4619
4620 let neighbor_ids: Vec<_> = neighbors.iter().map(|(node_id, _)| *node_id).collect();
4621 assert!(neighbor_ids.contains(&gus));
4622 assert!(neighbor_ids.contains(&harm));
4623 }
4624
4625 #[test]
4626 fn test_get_neighbors_incoming() {
4627 let db = GrafeoDB::new_in_memory();
4628 let session = db.session();
4629
4630 let alix = session.create_node(&["Person"]);
4631 let gus = session.create_node(&["Person"]);
4632 let harm = session.create_node(&["Person"]);
4633
4634 session.create_edge(gus, alix, "KNOWS");
4635 session.create_edge(harm, alix, "KNOWS");
4636
4637 let neighbors = session.get_neighbors_incoming(alix);
4638 assert_eq!(neighbors.len(), 2);
4639
4640 let neighbor_ids: Vec<_> = neighbors.iter().map(|(node_id, _)| *node_id).collect();
4641 assert!(neighbor_ids.contains(&gus));
4642 assert!(neighbor_ids.contains(&harm));
4643 }
4644
4645 #[test]
4646 fn test_get_neighbors_outgoing_by_type() {
4647 let db = GrafeoDB::new_in_memory();
4648 let session = db.session();
4649
4650 let alix = session.create_node(&["Person"]);
4651 let gus = session.create_node(&["Person"]);
4652 let company = session.create_node(&["Company"]);
4653
4654 session.create_edge(alix, gus, "KNOWS");
4655 session.create_edge(alix, company, "WORKS_AT");
4656
4657 let knows_neighbors = session.get_neighbors_outgoing_by_type(alix, "KNOWS");
4658 assert_eq!(knows_neighbors.len(), 1);
4659 assert_eq!(knows_neighbors[0].0, gus);
4660
4661 let works_neighbors = session.get_neighbors_outgoing_by_type(alix, "WORKS_AT");
4662 assert_eq!(works_neighbors.len(), 1);
4663 assert_eq!(works_neighbors[0].0, company);
4664
4665 let no_neighbors = session.get_neighbors_outgoing_by_type(alix, "LIKES");
4667 assert!(no_neighbors.is_empty());
4668 }
4669
4670 #[test]
4671 fn test_node_exists() {
4672 use grafeo_common::types::NodeId;
4673
4674 let db = GrafeoDB::new_in_memory();
4675 let session = db.session();
4676
4677 let id = session.create_node(&["Person"]);
4678
4679 assert!(session.node_exists(id));
4680 assert!(!session.node_exists(NodeId::new(9999)));
4681 }
4682
4683 #[test]
4684 fn test_edge_exists() {
4685 use grafeo_common::types::EdgeId;
4686
4687 let db = GrafeoDB::new_in_memory();
4688 let session = db.session();
4689
4690 let alix = session.create_node(&["Person"]);
4691 let gus = session.create_node(&["Person"]);
4692 let edge_id = session.create_edge(alix, gus, "KNOWS");
4693
4694 assert!(session.edge_exists(edge_id));
4695 assert!(!session.edge_exists(EdgeId::new(9999)));
4696 }
4697
4698 #[test]
4699 fn test_get_degree() {
4700 let db = GrafeoDB::new_in_memory();
4701 let session = db.session();
4702
4703 let alix = session.create_node(&["Person"]);
4704 let gus = session.create_node(&["Person"]);
4705 let harm = session.create_node(&["Person"]);
4706
4707 session.create_edge(alix, gus, "KNOWS");
4709 session.create_edge(alix, harm, "KNOWS");
4710 session.create_edge(gus, alix, "KNOWS");
4712
4713 let (out_degree, in_degree) = session.get_degree(alix);
4714 assert_eq!(out_degree, 2);
4715 assert_eq!(in_degree, 1);
4716
4717 let lonely = session.create_node(&["Person"]);
4719 let (out, in_deg) = session.get_degree(lonely);
4720 assert_eq!(out, 0);
4721 assert_eq!(in_deg, 0);
4722 }
4723
4724 #[test]
4725 fn test_get_nodes_batch() {
4726 let db = GrafeoDB::new_in_memory();
4727 let session = db.session();
4728
4729 let alix = session.create_node(&["Person"]);
4730 let gus = session.create_node(&["Person"]);
4731 let harm = session.create_node(&["Person"]);
4732
4733 let nodes = session.get_nodes_batch(&[alix, gus, harm]);
4734 assert_eq!(nodes.len(), 3);
4735 assert!(nodes[0].is_some());
4736 assert!(nodes[1].is_some());
4737 assert!(nodes[2].is_some());
4738
4739 use grafeo_common::types::NodeId;
4741 let nodes_with_missing = session.get_nodes_batch(&[alix, NodeId::new(9999), harm]);
4742 assert_eq!(nodes_with_missing.len(), 3);
4743 assert!(nodes_with_missing[0].is_some());
4744 assert!(nodes_with_missing[1].is_none()); assert!(nodes_with_missing[2].is_some());
4746 }
4747
4748 #[test]
4749 fn test_auto_commit_setting() {
4750 let db = GrafeoDB::new_in_memory();
4751 let mut session = db.session();
4752
4753 assert!(session.auto_commit());
4755
4756 session.set_auto_commit(false);
4757 assert!(!session.auto_commit());
4758
4759 session.set_auto_commit(true);
4760 assert!(session.auto_commit());
4761 }
4762
4763 #[test]
4764 fn test_transaction_double_begin_nests() {
4765 let db = GrafeoDB::new_in_memory();
4766 let mut session = db.session();
4767
4768 session.begin_transaction().unwrap();
4769 let result = session.begin_transaction();
4771 assert!(result.is_ok());
4772 session.commit().unwrap();
4774 session.commit().unwrap();
4776 }
4777
4778 #[test]
4779 fn test_commit_without_transaction_error() {
4780 let db = GrafeoDB::new_in_memory();
4781 let mut session = db.session();
4782
4783 let result = session.commit();
4784 assert!(result.is_err());
4785 }
4786
4787 #[test]
4788 fn test_rollback_without_transaction_error() {
4789 let db = GrafeoDB::new_in_memory();
4790 let mut session = db.session();
4791
4792 let result = session.rollback();
4793 assert!(result.is_err());
4794 }
4795
4796 #[test]
4797 fn test_create_edge_in_transaction() {
4798 let db = GrafeoDB::new_in_memory();
4799 let mut session = db.session();
4800
4801 let alix = session.create_node(&["Person"]);
4803 let gus = session.create_node(&["Person"]);
4804
4805 session.begin_transaction().unwrap();
4807 let edge_id = session.create_edge(alix, gus, "KNOWS");
4808
4809 assert!(session.edge_exists(edge_id));
4811
4812 session.commit().unwrap();
4814
4815 assert!(session.edge_exists(edge_id));
4817 }
4818
4819 #[test]
4820 fn test_neighbors_empty_node() {
4821 let db = GrafeoDB::new_in_memory();
4822 let session = db.session();
4823
4824 let lonely = session.create_node(&["Person"]);
4825
4826 assert!(session.get_neighbors_outgoing(lonely).is_empty());
4827 assert!(session.get_neighbors_incoming(lonely).is_empty());
4828 assert!(
4829 session
4830 .get_neighbors_outgoing_by_type(lonely, "KNOWS")
4831 .is_empty()
4832 );
4833 }
4834 }
4835
// With a GC interval of 2, two committed transactions must both succeed and
// leave both of their nodes counted.
#[test]
fn test_auto_gc_triggers_on_commit_interval() {
    use crate::config::Config;

    let config = Config::in_memory().with_gc_interval(2);
    let db = GrafeoDB::with_config(config).unwrap();
    let mut session = db.session();

    // One committed transaction per label; the second commit reaches the interval.
    for &label in &["A", "B"] {
        session.begin_transaction().unwrap();
        session.create_node(&[label]);
        session.commit().unwrap();
    }

    assert_eq!(db.node_count(), 2);
}
4857
// A query timeout set on the config gives sessions a query deadline.
#[test]
fn test_query_timeout_config_propagates_to_session() {
    use crate::config::Config;
    use std::time::Duration;

    let timeout = Duration::from_secs(5);
    let config = Config::in_memory().with_query_timeout(timeout);
    let db = GrafeoDB::with_config(config).unwrap();
    let session = db.session();

    let deadline = session.query_deadline();
    assert!(deadline.is_some());
}
4870
// A database created with `new_in_memory` gives sessions no query deadline.
#[test]
fn test_no_query_timeout_returns_no_deadline() {
    let db = GrafeoDB::new_in_memory();
    let session = db.session();

    let deadline = session.query_deadline();
    assert!(deadline.is_none());
}
4879
// A session on a `new_in_memory` database reports the LPG graph model.
#[test]
fn test_graph_model_accessor() {
    use crate::config::GraphModel;

    let db = GrafeoDB::new_in_memory();
    let session = db.session();

    let model = session.graph_model();
    assert_eq!(model, GraphModel::Lpg);
}
4889
// A database built on an externally supplied store supports insert, read-back
// and commit within one transaction.
#[cfg(feature = "gql")]
#[test]
fn test_external_store_session() {
    use grafeo_core::graph::GraphStoreMut;
    use std::sync::Arc;

    let config = crate::config::Config::in_memory();
    // Hand the database a store as a trait object.
    let lpg = grafeo_core::graph::lpg::LpgStore::new().unwrap();
    let store: Arc<dyn GraphStoreMut> = Arc::new(lpg);
    let db = GrafeoDB::with_store(store, config).unwrap();

    let mut session = db.session();

    session.begin_transaction().unwrap();
    session.execute("INSERT (:Test {name: 'hello'})").unwrap();

    // The insert is readable inside the same transaction.
    let found = session.execute("MATCH (n:Test) RETURN n.name").unwrap();
    assert_eq!(found.row_count(), 1);

    session.commit().unwrap();
}
4915
4916 #[cfg(feature = "gql")]
4919 mod session_command_tests {
4920 use super::*;
4921 use grafeo_common::types::Value;
4922
// `USE GRAPH` on an existing graph makes it the session's current graph.
#[test]
fn test_use_graph_sets_current_graph() {
    let db = GrafeoDB::new_in_memory();
    let session = db.session();

    // The graph has to exist before it can be selected.
    session.execute("CREATE GRAPH mydb").unwrap();
    session.execute("USE GRAPH mydb").unwrap();

    let selected = session.current_graph();
    assert_eq!(selected, Some("mydb".to_string()));
}
4934
// `USE GRAPH` on an unknown graph fails with a "does not exist" message.
#[test]
fn test_use_graph_nonexistent_errors() {
    let db = GrafeoDB::new_in_memory();
    let session = db.session();

    let outcome = session.execute("USE GRAPH doesnotexist");
    assert!(outcome.is_err());

    let err = outcome.unwrap_err().to_string();
    assert!(
        err.contains("does not exist"),
        "Expected 'does not exist' error, got: {err}"
    );
}
4948
// `USE GRAPH default` succeeds without any prior `CREATE GRAPH`.
#[test]
fn test_use_graph_default_always_valid() {
    let db = GrafeoDB::new_in_memory();
    let session = db.session();

    session.execute("USE GRAPH default").unwrap();

    let selected = session.current_graph();
    assert_eq!(selected, Some("default".to_string()));
}
4958
// `SESSION SET GRAPH` on an existing graph makes it the current graph.
#[test]
fn test_session_set_graph() {
    let db = GrafeoDB::new_in_memory();
    let session = db.session();

    session.execute("CREATE GRAPH analytics").unwrap();
    session.execute("SESSION SET GRAPH analytics").unwrap();

    let selected = session.current_graph();
    assert_eq!(selected, Some("analytics".to_string()));
}
4968
// `SESSION SET GRAPH` on an unknown graph is an error.
#[test]
fn test_session_set_graph_nonexistent_errors() {
    let db = GrafeoDB::new_in_memory();
    let session = db.session();

    let outcome = session.execute("SESSION SET GRAPH nosuchgraph");

    assert!(outcome.is_err());
}
4977
// The session time zone starts unset and follows each `SESSION SET TIME ZONE`.
#[test]
fn test_session_set_time_zone() {
    let db = GrafeoDB::new_in_memory();
    let session = db.session();

    // Nothing set on a fresh session.
    assert_eq!(session.time_zone(), None);

    session.execute("SESSION SET TIME ZONE 'UTC'").unwrap();
    assert_eq!(session.time_zone(), Some("UTC".to_string()));

    // A later statement replaces the earlier value.
    session
        .execute("SESSION SET TIME ZONE 'America/New_York'")
        .unwrap();
    assert_eq!(session.time_zone(), Some("America/New_York".to_string()));
}
4993
// `SESSION SET PARAMETER` makes the parameter retrievable by name.
#[test]
fn test_session_set_parameter() {
    let db = GrafeoDB::new_in_memory();
    let session = db.session();

    session
        .execute("SESSION SET PARAMETER $timeout = 30")
        .unwrap();

    // Only presence is checked, not the stored value.
    let stored = session.get_parameter("timeout");
    assert!(stored.is_some());
}
5007
// `SESSION RESET` clears the current graph, the time zone and parameters.
#[test]
fn test_session_reset_clears_all_state() {
    let db = GrafeoDB::new_in_memory();
    let session = db.session();

    // Populate all three pieces of session state.
    session.execute("CREATE GRAPH analytics").unwrap();
    session.execute("SESSION SET GRAPH analytics").unwrap();
    session.execute("SESSION SET TIME ZONE 'UTC'").unwrap();
    session
        .execute("SESSION SET PARAMETER $limit = 100")
        .unwrap();

    // Confirm they are set before resetting.
    assert!(session.current_graph().is_some());
    assert!(session.time_zone().is_some());
    assert!(session.get_parameter("limit").is_some());

    session.execute("SESSION RESET").unwrap();

    // Everything is back to unset.
    assert_eq!(session.current_graph(), None);
    assert_eq!(session.time_zone(), None);
    assert!(session.get_parameter("limit").is_none());
}
5033
// `SESSION CLOSE` clears the current graph and the time zone.
#[test]
fn test_session_close_clears_state() {
    let db = GrafeoDB::new_in_memory();
    let session = db.session();

    // Populate graph and time zone.
    session.execute("CREATE GRAPH analytics").unwrap();
    session.execute("SESSION SET GRAPH analytics").unwrap();
    session.execute("SESSION SET TIME ZONE 'UTC'").unwrap();

    session.execute("SESSION CLOSE").unwrap();

    // Both are unset afterwards.
    assert_eq!(session.current_graph(), None);
    assert_eq!(session.time_zone(), None);
}
5048
// A graph created with `CREATE GRAPH` can then be selected with `USE GRAPH`.
#[test]
fn test_create_graph() {
    let db = GrafeoDB::new_in_memory();
    let session = db.session();

    session.execute("CREATE GRAPH mydb").unwrap();

    // Selecting it succeeds only if the graph now exists.
    session.execute("USE GRAPH mydb").unwrap();

    let selected = session.current_graph();
    assert_eq!(selected, Some("mydb".to_string()));
}
5060
// Creating the same graph twice fails with an "already exists" message.
#[test]
fn test_create_graph_duplicate_errors() {
    let db = GrafeoDB::new_in_memory();
    let session = db.session();

    session.execute("CREATE GRAPH mydb").unwrap();
    let second_attempt = session.execute("CREATE GRAPH mydb");

    assert!(second_attempt.is_err());
    let err = second_attempt.unwrap_err().to_string();
    assert!(
        err.contains("already exists"),
        "Expected 'already exists' error, got: {err}"
    );
}
5076
// `CREATE GRAPH IF NOT EXISTS` succeeds even when the graph already exists.
#[test]
fn test_create_graph_if_not_exists() {
    let db = GrafeoDB::new_in_memory();
    let session = db.session();

    session.execute("CREATE GRAPH mydb").unwrap();

    // The guarded form must not error on the duplicate name.
    session.execute("CREATE GRAPH IF NOT EXISTS mydb").unwrap();
}
5086
// After `DROP GRAPH`, the graph can no longer be selected.
#[test]
fn test_drop_graph() {
    let db = GrafeoDB::new_in_memory();
    let session = db.session();

    session.execute("CREATE GRAPH mydb").unwrap();
    session.execute("DROP GRAPH mydb").unwrap();

    // Selecting the dropped graph now fails.
    let outcome = session.execute("USE GRAPH mydb");
    assert!(outcome.is_err());
}
5099
// Dropping an unknown graph fails with a "does not exist" message.
#[test]
fn test_drop_graph_nonexistent_errors() {
    let db = GrafeoDB::new_in_memory();
    let session = db.session();

    let outcome = session.execute("DROP GRAPH nosuchgraph");
    assert!(outcome.is_err());

    let err = outcome.unwrap_err().to_string();
    assert!(
        err.contains("does not exist"),
        "Expected 'does not exist' error, got: {err}"
    );
}
5113
// `DROP GRAPH IF EXISTS` succeeds even when the graph does not exist.
#[test]
fn test_drop_graph_if_exists() {
    let db = GrafeoDB::new_in_memory();
    let session = db.session();

    // The guarded form must not error on the unknown name.
    session.execute("DROP GRAPH IF EXISTS nosuchgraph").unwrap();
}
5122
// `START TRANSACTION` / `COMMIT` issued as statements open and close a
// transaction, and the committed insert is readable afterwards.
#[test]
fn test_start_transaction_via_gql() {
    let db = GrafeoDB::new_in_memory();
    let session = db.session();

    session.execute("START TRANSACTION").unwrap();
    assert!(session.in_transaction());

    session.execute("INSERT (:Person {name: 'Alix'})").unwrap();

    session.execute("COMMIT").unwrap();
    assert!(!session.in_transaction());

    // The committed node is visible to a later query.
    let people = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
    assert_eq!(people.rows.len(), 1);
}
5137
#[test]
fn test_start_transaction_read_only_blocks_insert() {
    // Inside a READ ONLY transaction an INSERT must fail with a "read-only"
    // error.
    let database = GrafeoDB::new_in_memory();
    let session = database.session();

    session.execute("START TRANSACTION READ ONLY").unwrap();

    let write_attempt = session.execute("INSERT (:Person {name: 'Alix'})");
    assert!(write_attempt.is_err());

    let err = write_attempt.unwrap_err().to_string();
    let mentions_read_only = err.contains("read-only");
    assert!(mentions_read_only, "Expected read-only error, got: {err}");

    // End the read-only transaction.
    session.execute("ROLLBACK").unwrap();
}
5153
#[test]
fn test_start_transaction_read_only_allows_reads() {
    // A READ ONLY transaction must still be able to run MATCH queries.
    let database = GrafeoDB::new_in_memory();
    let mut session = database.session();

    // Seed one Person node through the programmatic transaction API.
    session.begin_transaction().unwrap();
    session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
    session.commit().unwrap();

    // Read the node back from within a read-only transaction.
    session.execute("START TRANSACTION READ ONLY").unwrap();
    let people = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
    let found = people.rows.len();
    assert_eq!(found, 1);
    session.execute("COMMIT").unwrap();
}
5167
#[test]
fn test_rollback_via_gql() {
    // An insert made inside a transaction that is rolled back via GQL must
    // not be visible afterwards.
    let database = GrafeoDB::new_in_memory();
    let session = database.session();

    for statement in [
        "START TRANSACTION",
        "INSERT (:Person {name: 'Alix'})",
        "ROLLBACK",
    ] {
        session.execute(statement).unwrap();
    }

    let people = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
    assert!(people.rows.is_empty());
}
5180
#[test]
fn test_start_transaction_with_isolation_level() {
    // START TRANSACTION accepts an explicit ISOLATION LEVEL clause and opens
    // a transaction.
    let database = GrafeoDB::new_in_memory();
    let session = database.session();

    let begin = "START TRANSACTION ISOLATION LEVEL SERIALIZABLE";
    session.execute(begin).unwrap();
    assert!(session.in_transaction());

    // End the transaction that was just opened.
    session.execute("ROLLBACK").unwrap();
}
5192
#[test]
fn test_session_commands_return_empty_result() {
    // A SESSION SET command yields a result with no rows and no columns.
    let database = GrafeoDB::new_in_memory();
    let session = database.session();

    session.execute("CREATE GRAPH test").unwrap();

    let outcome = session.execute("SESSION SET GRAPH test").unwrap();
    let (row_total, column_total) = (outcome.row_count(), outcome.column_count());
    assert_eq!(row_total, 0);
    assert_eq!(column_total, 0);
}
5203
#[test]
fn test_current_graph_default_is_none() {
    // A fresh session has no current graph selected.
    let database = GrafeoDB::new_in_memory();
    let session = database.session();

    let selected = session.current_graph();
    assert_eq!(selected, None);
}
5211
#[test]
fn test_time_zone_default_is_none() {
    // A fresh session has no time zone configured.
    let database = GrafeoDB::new_in_memory();
    let session = database.session();

    let zone = session.time_zone();
    assert_eq!(zone, None);
}
5219
#[test]
fn test_session_state_independent_across_sessions() {
    // Two sessions on the same database each keep their own current graph.
    let database = GrafeoDB::new_in_memory();
    let first_session = database.session();
    let second_session = database.session();

    // Both graphs are created through the first session.
    for statement in ["CREATE GRAPH first", "CREATE GRAPH second"] {
        first_session.execute(statement).unwrap();
    }

    // Each session selects a different graph.
    first_session.execute("SESSION SET GRAPH first").unwrap();
    second_session.execute("SESSION SET GRAPH second").unwrap();

    // Each session must report only its own selection.
    assert_eq!(first_session.current_graph().as_deref(), Some("first"));
    assert_eq!(second_session.current_graph().as_deref(), Some("second"));
}
5234
#[test]
fn test_show_node_types() {
    // SHOW NODE TYPES lists a created node type under the expected column
    // headers.
    let database = GrafeoDB::new_in_memory();
    let session = database.session();

    let ddl = "CREATE NODE TYPE Person (name STRING NOT NULL, age INTEGER)";
    session.execute(ddl).unwrap();

    let listing = session.execute("SHOW NODE TYPES").unwrap();

    let expected_columns = vec!["name", "properties", "constraints", "parents"];
    assert_eq!(listing.columns, expected_columns);

    // Exactly one type was created; its name is in the first column.
    assert_eq!(listing.rows.len(), 1);
    let first_row = &listing.rows[0];
    assert_eq!(first_row[0], Value::from("Person"));
}
5253
#[test]
fn test_show_edge_types() {
    // SHOW EDGE TYPES lists a created edge type under the expected column
    // headers.
    let database = GrafeoDB::new_in_memory();
    let session = database.session();

    let ddl = "CREATE EDGE TYPE KNOWS CONNECTING (Person) TO (Person) (since INTEGER)";
    session.execute(ddl).unwrap();

    let listing = session.execute("SHOW EDGE TYPES").unwrap();

    let expected_columns = vec!["name", "properties", "source_types", "target_types"];
    assert_eq!(listing.columns, expected_columns);

    // Exactly one type was created; its name is in the first column.
    assert_eq!(listing.rows.len(), 1);
    let first_row = &listing.rows[0];
    assert_eq!(first_row[0], Value::from("KNOWS"));
}
5271
#[test]
fn test_show_graph_types() {
    // SHOW GRAPH TYPES lists a created graph type under the expected column
    // headers.
    let database = GrafeoDB::new_in_memory();
    let session = database.session();

    // Create the node type first, then the graph type that names it.
    let setup = [
        "CREATE NODE TYPE Person (name STRING)",
        "CREATE GRAPH TYPE social (NODE TYPE Person (name STRING))",
    ];
    for statement in setup {
        session.execute(statement).unwrap();
    }

    let listing = session.execute("SHOW GRAPH TYPES").unwrap();

    let expected_columns = vec!["name", "open", "node_types", "edge_types"];
    assert_eq!(listing.columns, expected_columns);

    // Exactly one graph type was created; its name is in the first column.
    assert_eq!(listing.rows.len(), 1);
    let first_row = &listing.rows[0];
    assert_eq!(first_row[0], Value::from("social"));
}
5296
#[test]
fn test_show_graph_type_named() {
    // SHOW GRAPH TYPE <name> returns a single row describing that graph type.
    let database = GrafeoDB::new_in_memory();
    let session = database.session();

    // Create the node type first, then the graph type that names it.
    let setup = [
        "CREATE NODE TYPE Person (name STRING)",
        "CREATE GRAPH TYPE social (NODE TYPE Person (name STRING))",
    ];
    for statement in setup {
        session.execute(statement).unwrap();
    }

    let described = session.execute("SHOW GRAPH TYPE social").unwrap();
    assert_eq!(described.rows.len(), 1);

    let first_row = &described.rows[0];
    assert_eq!(first_row[0], Value::from("social"));
}
5317
#[test]
fn test_show_graph_type_not_found() {
    // Asking for a graph type that was never created must produce an error.
    let database = GrafeoDB::new_in_memory();
    let session = database.session();

    let lookup = session.execute("SHOW GRAPH TYPE nonexistent");
    assert!(lookup.is_err());
}
5326
#[test]
fn test_show_indexes_via_gql() {
    // SHOW INDEXES succeeds on an empty database and exposes the expected
    // column headers.
    let database = GrafeoDB::new_in_memory();
    let session = database.session();

    let listing = session.execute("SHOW INDEXES").unwrap();

    let expected_columns = vec!["name", "type", "label", "property"];
    assert_eq!(listing.columns, expected_columns);
}
5335
#[test]
fn test_show_constraints_via_gql() {
    // SHOW CONSTRAINTS succeeds on an empty database and exposes the expected
    // column headers.
    let database = GrafeoDB::new_in_memory();
    let session = database.session();

    let listing = session.execute("SHOW CONSTRAINTS").unwrap();

    let expected_columns = vec!["name", "type", "label", "properties"];
    assert_eq!(listing.columns, expected_columns);
}
5344
#[test]
fn test_pattern_form_graph_type_roundtrip() {
    // A graph type declared with pattern syntax (node-edge-node paths) can be
    // created and then looked up by name.
    let database = GrafeoDB::new_in_memory();
    let session = database.session();

    // Declare the node and edge types that the patterns below mention.
    for statement in [
        "CREATE NODE TYPE Person (name STRING NOT NULL)",
        "CREATE NODE TYPE City (name STRING)",
        "CREATE EDGE TYPE KNOWS (since INTEGER)",
        "CREATE EDGE TYPE LIVES_IN",
    ] {
        session.execute(statement).unwrap();
    }

    // Line continuations keep the statement on a single logical line.
    let graph_type_ddl = "CREATE GRAPH TYPE social (\
        (:Person {name STRING NOT NULL})-[:KNOWS {since INTEGER}]->(:Person),\
        (:Person)-[:LIVES_IN]->(:City)\
        )";
    session.execute(graph_type_ddl).unwrap();

    let described = session.execute("SHOW GRAPH TYPE social").unwrap();
    assert_eq!(described.rows.len(), 1);

    let first_row = &described.rows[0];
    assert_eq!(first_row[0], Value::from("social"));
}
5377 }
5378}