1use std::sync::Arc;
8use std::sync::atomic::{AtomicUsize, Ordering};
9use std::time::{Duration, Instant};
10
11use grafeo_common::types::{EdgeId, EpochId, NodeId, TxId, Value};
12use grafeo_common::utils::error::Result;
13use grafeo_core::graph::Direction;
14use grafeo_core::graph::GraphStoreMut;
15use grafeo_core::graph::lpg::{Edge, LpgStore, Node};
16#[cfg(feature = "rdf")]
17use grafeo_core::graph::rdf::RdfStore;
18
19use crate::catalog::{Catalog, CatalogConstraintValidator};
20use crate::config::{AdaptiveConfig, GraphModel};
21use crate::database::QueryResult;
22use crate::query::cache::QueryCache;
23use crate::transaction::TransactionManager;
24
25pub struct Session {
31 store: Arc<LpgStore>,
33 graph_store: Arc<dyn GraphStoreMut>,
35 catalog: Arc<Catalog>,
37 #[cfg(feature = "rdf")]
39 rdf_store: Arc<RdfStore>,
40 tx_manager: Arc<TransactionManager>,
42 query_cache: Arc<QueryCache>,
44 current_tx: parking_lot::Mutex<Option<TxId>>,
48 read_only_tx: parking_lot::Mutex<bool>,
50 auto_commit: bool,
52 #[allow(dead_code)] adaptive_config: AdaptiveConfig,
55 factorized_execution: bool,
57 graph_model: GraphModel,
59 query_timeout: Option<Duration>,
61 commit_counter: Arc<AtomicUsize>,
63 gc_interval: usize,
65 tx_start_node_count: AtomicUsize,
67 tx_start_edge_count: AtomicUsize,
69 #[cfg(feature = "wal")]
71 wal: Option<Arc<grafeo_adapters::storage::wal::LpgWal>>,
72 #[cfg(feature = "cdc")]
74 cdc_log: Arc<crate::cdc::CdcLog>,
75 current_graph: parking_lot::Mutex<Option<String>>,
77 time_zone: parking_lot::Mutex<Option<String>>,
79 session_params:
81 parking_lot::Mutex<std::collections::HashMap<String, grafeo_common::types::Value>>,
82 viewing_epoch_override: parking_lot::Mutex<Option<EpochId>>,
84}
85
86impl Session {
87 #[allow(dead_code, clippy::too_many_arguments)]
89 pub(crate) fn with_adaptive(
90 store: Arc<LpgStore>,
91 tx_manager: Arc<TransactionManager>,
92 query_cache: Arc<QueryCache>,
93 catalog: Arc<Catalog>,
94 adaptive_config: AdaptiveConfig,
95 factorized_execution: bool,
96 graph_model: GraphModel,
97 query_timeout: Option<Duration>,
98 commit_counter: Arc<AtomicUsize>,
99 gc_interval: usize,
100 ) -> Self {
101 let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
102 Self {
103 store,
104 graph_store,
105 catalog,
106 #[cfg(feature = "rdf")]
107 rdf_store: Arc::new(RdfStore::new()),
108 tx_manager,
109 query_cache,
110 current_tx: parking_lot::Mutex::new(None),
111 read_only_tx: parking_lot::Mutex::new(false),
112 auto_commit: true,
113 adaptive_config,
114 factorized_execution,
115 graph_model,
116 query_timeout,
117 commit_counter,
118 gc_interval,
119 tx_start_node_count: AtomicUsize::new(0),
120 tx_start_edge_count: AtomicUsize::new(0),
121 #[cfg(feature = "wal")]
122 wal: None,
123 #[cfg(feature = "cdc")]
124 cdc_log: Arc::new(crate::cdc::CdcLog::new()),
125 current_graph: parking_lot::Mutex::new(None),
126 time_zone: parking_lot::Mutex::new(None),
127 session_params: parking_lot::Mutex::new(std::collections::HashMap::new()),
128 viewing_epoch_override: parking_lot::Mutex::new(None),
129 }
130 }
131
132 #[cfg(feature = "wal")]
137 pub(crate) fn set_wal(&mut self, wal: Arc<grafeo_adapters::storage::wal::LpgWal>) {
138 self.graph_store = Arc::new(crate::database::wal_store::WalGraphStore::new(
140 Arc::clone(&self.store),
141 Arc::clone(&wal),
142 ));
143 self.wal = Some(wal);
144 }
145
146 #[cfg(feature = "cdc")]
148 pub(crate) fn set_cdc_log(&mut self, cdc_log: Arc<crate::cdc::CdcLog>) {
149 self.cdc_log = cdc_log;
150 }
151
152 #[cfg(feature = "rdf")]
154 #[allow(clippy::too_many_arguments)]
155 pub(crate) fn with_rdf_store_and_adaptive(
156 store: Arc<LpgStore>,
157 rdf_store: Arc<RdfStore>,
158 tx_manager: Arc<TransactionManager>,
159 query_cache: Arc<QueryCache>,
160 catalog: Arc<Catalog>,
161 adaptive_config: AdaptiveConfig,
162 factorized_execution: bool,
163 graph_model: GraphModel,
164 query_timeout: Option<Duration>,
165 commit_counter: Arc<AtomicUsize>,
166 gc_interval: usize,
167 ) -> Self {
168 let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
169 Self {
170 store,
171 graph_store,
172 catalog,
173 rdf_store,
174 tx_manager,
175 query_cache,
176 current_tx: parking_lot::Mutex::new(None),
177 read_only_tx: parking_lot::Mutex::new(false),
178 auto_commit: true,
179 adaptive_config,
180 factorized_execution,
181 graph_model,
182 query_timeout,
183 commit_counter,
184 gc_interval,
185 tx_start_node_count: AtomicUsize::new(0),
186 tx_start_edge_count: AtomicUsize::new(0),
187 #[cfg(feature = "wal")]
188 wal: None,
189 #[cfg(feature = "cdc")]
190 cdc_log: Arc::new(crate::cdc::CdcLog::new()),
191 current_graph: parking_lot::Mutex::new(None),
192 time_zone: parking_lot::Mutex::new(None),
193 session_params: parking_lot::Mutex::new(std::collections::HashMap::new()),
194 viewing_epoch_override: parking_lot::Mutex::new(None),
195 }
196 }
197
198 #[allow(clippy::too_many_arguments)]
203 pub(crate) fn with_external_store(
204 store: Arc<dyn GraphStoreMut>,
205 tx_manager: Arc<TransactionManager>,
206 query_cache: Arc<QueryCache>,
207 catalog: Arc<Catalog>,
208 adaptive_config: AdaptiveConfig,
209 factorized_execution: bool,
210 graph_model: GraphModel,
211 query_timeout: Option<Duration>,
212 commit_counter: Arc<AtomicUsize>,
213 gc_interval: usize,
214 ) -> Self {
215 Self {
216 store: Arc::new(LpgStore::new().expect("arena allocation for dummy LpgStore")), graph_store: store,
218 catalog,
219 #[cfg(feature = "rdf")]
220 rdf_store: Arc::new(RdfStore::new()),
221 tx_manager,
222 query_cache,
223 current_tx: parking_lot::Mutex::new(None),
224 read_only_tx: parking_lot::Mutex::new(false),
225 auto_commit: true,
226 adaptive_config,
227 factorized_execution,
228 graph_model,
229 query_timeout,
230 commit_counter,
231 gc_interval,
232 tx_start_node_count: AtomicUsize::new(0),
233 tx_start_edge_count: AtomicUsize::new(0),
234 #[cfg(feature = "wal")]
235 wal: None,
236 #[cfg(feature = "cdc")]
237 cdc_log: Arc::new(crate::cdc::CdcLog::new()),
238 current_graph: parking_lot::Mutex::new(None),
239 time_zone: parking_lot::Mutex::new(None),
240 session_params: parking_lot::Mutex::new(std::collections::HashMap::new()),
241 viewing_epoch_override: parking_lot::Mutex::new(None),
242 }
243 }
244
245 #[must_use]
247 pub fn graph_model(&self) -> GraphModel {
248 self.graph_model
249 }
250
251 pub fn use_graph(&self, name: &str) {
255 *self.current_graph.lock() = Some(name.to_string());
256 }
257
258 #[must_use]
260 pub fn current_graph(&self) -> Option<String> {
261 self.current_graph.lock().clone()
262 }
263
264 pub fn set_time_zone(&self, tz: &str) {
266 *self.time_zone.lock() = Some(tz.to_string());
267 }
268
269 #[must_use]
271 pub fn time_zone(&self) -> Option<String> {
272 self.time_zone.lock().clone()
273 }
274
275 pub fn set_parameter(&self, key: &str, value: grafeo_common::types::Value) {
277 self.session_params.lock().insert(key.to_string(), value);
278 }
279
280 #[must_use]
282 pub fn get_parameter(&self, key: &str) -> Option<grafeo_common::types::Value> {
283 self.session_params.lock().get(key).cloned()
284 }
285
286 pub fn reset_session(&self) {
288 *self.current_graph.lock() = None;
289 *self.time_zone.lock() = None;
290 self.session_params.lock().clear();
291 *self.viewing_epoch_override.lock() = None;
292 }
293
294 pub fn set_viewing_epoch(&self, epoch: EpochId) {
302 *self.viewing_epoch_override.lock() = Some(epoch);
303 }
304
305 pub fn clear_viewing_epoch(&self) {
307 *self.viewing_epoch_override.lock() = None;
308 }
309
310 #[must_use]
312 pub fn viewing_epoch(&self) -> Option<EpochId> {
313 *self.viewing_epoch_override.lock()
314 }
315
316 #[must_use]
320 pub fn get_node_history(&self, id: NodeId) -> Vec<(EpochId, Option<EpochId>, Node)> {
321 self.store.get_node_history(id)
322 }
323
324 #[must_use]
328 pub fn get_edge_history(&self, id: EdgeId) -> Vec<(EpochId, Option<EpochId>, Edge)> {
329 self.store.get_edge_history(id)
330 }
331
332 fn require_lpg(&self, language: &str) -> Result<()> {
334 if self.graph_model == GraphModel::Rdf {
335 return Err(grafeo_common::utils::error::Error::Internal(format!(
336 "This is an RDF database. {language} queries require an LPG database."
337 )));
338 }
339 Ok(())
340 }
341
342 #[cfg(feature = "gql")]
344 fn execute_session_command(
345 &self,
346 cmd: grafeo_adapters::query::gql::ast::SessionCommand,
347 ) -> Result<QueryResult> {
348 use grafeo_adapters::query::gql::ast::{SessionCommand, TransactionIsolationLevel};
349 use grafeo_common::utils::error::{Error, QueryError, QueryErrorKind};
350
351 match cmd {
352 SessionCommand::CreateGraph {
353 name,
354 if_not_exists,
355 typed,
356 } => {
357 let created = self
358 .store
359 .create_graph(&name)
360 .map_err(|e| Error::Internal(e.to_string()))?;
361 if !created && !if_not_exists {
362 return Err(Error::Query(QueryError::new(
363 QueryErrorKind::Semantic,
364 format!("Graph '{name}' already exists"),
365 )));
366 }
367 if let Some(type_name) = typed
369 && let Err(e) = self.catalog.bind_graph_type(&name, type_name.clone())
370 {
371 return Err(Error::Query(QueryError::new(
372 QueryErrorKind::Semantic,
373 e.to_string(),
374 )));
375 }
376 Ok(QueryResult::empty())
377 }
378 SessionCommand::DropGraph { name, if_exists } => {
379 let dropped = self.store.drop_graph(&name);
380 if !dropped && !if_exists {
381 return Err(Error::Query(QueryError::new(
382 QueryErrorKind::Semantic,
383 format!("Graph '{name}' does not exist"),
384 )));
385 }
386 Ok(QueryResult::empty())
387 }
388 SessionCommand::UseGraph(name) => {
389 if !name.eq_ignore_ascii_case("default") && self.store.graph(&name).is_none() {
391 return Err(Error::Query(QueryError::new(
392 QueryErrorKind::Semantic,
393 format!("Graph '{name}' does not exist"),
394 )));
395 }
396 self.use_graph(&name);
397 Ok(QueryResult::empty())
398 }
399 SessionCommand::SessionSetGraph(name) => {
400 self.use_graph(&name);
401 Ok(QueryResult::empty())
402 }
403 SessionCommand::SessionSetTimeZone(tz) => {
404 self.set_time_zone(&tz);
405 Ok(QueryResult::empty())
406 }
407 SessionCommand::SessionSetParameter(key, expr) => {
408 if key.eq_ignore_ascii_case("viewing_epoch") {
409 match Self::eval_integer_literal(&expr) {
410 Some(n) if n >= 0 => {
411 self.set_viewing_epoch(EpochId::new(n as u64));
412 Ok(QueryResult::status(format!("Set viewing_epoch to {n}")))
413 }
414 _ => Err(Error::Query(QueryError::new(
415 QueryErrorKind::Semantic,
416 "viewing_epoch must be a non-negative integer literal",
417 ))),
418 }
419 } else {
420 self.set_parameter(&key, Value::Null);
423 Ok(QueryResult::empty())
424 }
425 }
426 SessionCommand::SessionReset => {
427 self.reset_session();
428 Ok(QueryResult::empty())
429 }
430 SessionCommand::SessionClose => {
431 self.reset_session();
432 Ok(QueryResult::empty())
433 }
434 SessionCommand::StartTransaction {
435 read_only,
436 isolation_level,
437 } => {
438 let engine_level = isolation_level.map(|l| match l {
439 TransactionIsolationLevel::ReadCommitted => {
440 crate::transaction::IsolationLevel::ReadCommitted
441 }
442 TransactionIsolationLevel::SnapshotIsolation => {
443 crate::transaction::IsolationLevel::SnapshotIsolation
444 }
445 TransactionIsolationLevel::Serializable => {
446 crate::transaction::IsolationLevel::Serializable
447 }
448 });
449 self.begin_tx_inner(read_only, engine_level)?;
450 Ok(QueryResult::status("Transaction started"))
451 }
452 SessionCommand::Commit => {
453 self.commit_inner()?;
454 Ok(QueryResult::status("Transaction committed"))
455 }
456 SessionCommand::Rollback => {
457 self.rollback_inner()?;
458 Ok(QueryResult::status("Transaction rolled back"))
459 }
460 }
461 }
462
463 #[cfg(feature = "wal")]
465 fn log_schema_wal(&self, record: &grafeo_adapters::storage::wal::WalRecord) {
466 if let Some(ref wal) = self.wal
467 && let Err(e) = wal.log(record)
468 {
469 tracing::warn!("Failed to log schema change to WAL: {}", e);
470 }
471 }
472
473 #[cfg(feature = "gql")]
475 fn execute_schema_command(
476 &self,
477 cmd: grafeo_adapters::query::gql::ast::SchemaStatement,
478 ) -> Result<QueryResult> {
479 use crate::catalog::{
480 EdgeTypeDefinition, NodeTypeDefinition, PropertyDataType, TypedProperty,
481 };
482 use grafeo_adapters::query::gql::ast::SchemaStatement;
483 #[cfg(feature = "wal")]
484 use grafeo_adapters::storage::wal::WalRecord;
485 use grafeo_common::utils::error::{Error, QueryError, QueryErrorKind};
486
487 macro_rules! wal_log {
489 ($self:expr, $record:expr) => {
490 #[cfg(feature = "wal")]
491 $self.log_schema_wal(&$record);
492 };
493 }
494
495 match cmd {
496 SchemaStatement::CreateNodeType(stmt) => {
497 #[cfg(feature = "wal")]
498 let props_for_wal: Vec<(String, String, bool)> = stmt
499 .properties
500 .iter()
501 .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
502 .collect();
503 let def = NodeTypeDefinition {
504 name: stmt.name.clone(),
505 properties: stmt
506 .properties
507 .iter()
508 .map(|p| TypedProperty {
509 name: p.name.clone(),
510 data_type: PropertyDataType::from_type_name(&p.data_type),
511 nullable: p.nullable,
512 default_value: None,
513 })
514 .collect(),
515 constraints: Vec::new(),
516 };
517 let result = if stmt.or_replace {
518 let _ = self.catalog.drop_node_type(&stmt.name);
519 self.catalog.register_node_type(def)
520 } else {
521 self.catalog.register_node_type(def)
522 };
523 match result {
524 Ok(()) => {
525 wal_log!(
526 self,
527 WalRecord::CreateNodeType {
528 name: stmt.name.clone(),
529 properties: props_for_wal,
530 constraints: Vec::new(),
531 }
532 );
533 Ok(QueryResult::status(format!(
534 "Created node type '{}'",
535 stmt.name
536 )))
537 }
538 Err(e) if stmt.if_not_exists => {
539 let _ = e;
540 Ok(QueryResult::status("No change"))
541 }
542 Err(e) => Err(Error::Query(QueryError::new(
543 QueryErrorKind::Semantic,
544 e.to_string(),
545 ))),
546 }
547 }
548 SchemaStatement::CreateEdgeType(stmt) => {
549 #[cfg(feature = "wal")]
550 let props_for_wal: Vec<(String, String, bool)> = stmt
551 .properties
552 .iter()
553 .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
554 .collect();
555 let def = EdgeTypeDefinition {
556 name: stmt.name.clone(),
557 properties: stmt
558 .properties
559 .iter()
560 .map(|p| TypedProperty {
561 name: p.name.clone(),
562 data_type: PropertyDataType::from_type_name(&p.data_type),
563 nullable: p.nullable,
564 default_value: None,
565 })
566 .collect(),
567 constraints: Vec::new(),
568 };
569 let result = if stmt.or_replace {
570 let _ = self.catalog.drop_edge_type_def(&stmt.name);
571 self.catalog.register_edge_type_def(def)
572 } else {
573 self.catalog.register_edge_type_def(def)
574 };
575 match result {
576 Ok(()) => {
577 wal_log!(
578 self,
579 WalRecord::CreateEdgeType {
580 name: stmt.name.clone(),
581 properties: props_for_wal,
582 constraints: Vec::new(),
583 }
584 );
585 Ok(QueryResult::status(format!(
586 "Created edge type '{}'",
587 stmt.name
588 )))
589 }
590 Err(e) if stmt.if_not_exists => {
591 let _ = e;
592 Ok(QueryResult::status("No change"))
593 }
594 Err(e) => Err(Error::Query(QueryError::new(
595 QueryErrorKind::Semantic,
596 e.to_string(),
597 ))),
598 }
599 }
600 SchemaStatement::CreateVectorIndex(stmt) => {
601 Self::create_vector_index_on_store(
602 &self.store,
603 &stmt.node_label,
604 &stmt.property,
605 stmt.dimensions,
606 stmt.metric.as_deref(),
607 )?;
608 wal_log!(
609 self,
610 WalRecord::CreateIndex {
611 name: stmt.name.clone(),
612 label: stmt.node_label.clone(),
613 property: stmt.property.clone(),
614 index_type: "vector".to_string(),
615 }
616 );
617 Ok(QueryResult::status(format!(
618 "Created vector index '{}'",
619 stmt.name
620 )))
621 }
622 SchemaStatement::DropNodeType { name, if_exists } => {
623 match self.catalog.drop_node_type(&name) {
624 Ok(()) => {
625 wal_log!(self, WalRecord::DropNodeType { name: name.clone() });
626 Ok(QueryResult::status(format!("Dropped node type '{name}'")))
627 }
628 Err(e) if if_exists => {
629 let _ = e;
630 Ok(QueryResult::status("No change"))
631 }
632 Err(e) => Err(Error::Query(QueryError::new(
633 QueryErrorKind::Semantic,
634 e.to_string(),
635 ))),
636 }
637 }
638 SchemaStatement::DropEdgeType { name, if_exists } => {
639 match self.catalog.drop_edge_type_def(&name) {
640 Ok(()) => {
641 wal_log!(self, WalRecord::DropEdgeType { name: name.clone() });
642 Ok(QueryResult::status(format!("Dropped edge type '{name}'")))
643 }
644 Err(e) if if_exists => {
645 let _ = e;
646 Ok(QueryResult::status("No change"))
647 }
648 Err(e) => Err(Error::Query(QueryError::new(
649 QueryErrorKind::Semantic,
650 e.to_string(),
651 ))),
652 }
653 }
654 SchemaStatement::CreateIndex(stmt) => {
655 use grafeo_adapters::query::gql::ast::IndexKind;
656 let index_type_str = match stmt.index_kind {
657 IndexKind::Property => "property",
658 IndexKind::BTree => "btree",
659 IndexKind::Text => "text",
660 IndexKind::Vector => "vector",
661 };
662 match stmt.index_kind {
663 IndexKind::Property | IndexKind::BTree => {
664 for prop in &stmt.properties {
665 self.store.create_property_index(prop);
666 }
667 }
668 IndexKind::Text => {
669 for prop in &stmt.properties {
670 Self::create_text_index_on_store(&self.store, &stmt.label, prop)?;
671 }
672 }
673 IndexKind::Vector => {
674 for prop in &stmt.properties {
675 Self::create_vector_index_on_store(
676 &self.store,
677 &stmt.label,
678 prop,
679 stmt.options.dimensions,
680 stmt.options.metric.as_deref(),
681 )?;
682 }
683 }
684 }
685 #[cfg(feature = "wal")]
686 for prop in &stmt.properties {
687 wal_log!(
688 self,
689 WalRecord::CreateIndex {
690 name: stmt.name.clone(),
691 label: stmt.label.clone(),
692 property: prop.clone(),
693 index_type: index_type_str.to_string(),
694 }
695 );
696 }
697 Ok(QueryResult::status(format!(
698 "Created {} index '{}'",
699 index_type_str, stmt.name
700 )))
701 }
702 SchemaStatement::DropIndex { name, if_exists } => {
703 let dropped = self.store.drop_property_index(&name);
705 if dropped || if_exists {
706 if dropped {
707 wal_log!(self, WalRecord::DropIndex { name: name.clone() });
708 }
709 Ok(QueryResult::status(if dropped {
710 format!("Dropped index '{name}'")
711 } else {
712 "No change".to_string()
713 }))
714 } else {
715 Err(Error::Query(QueryError::new(
716 QueryErrorKind::Semantic,
717 format!("Index '{name}' does not exist"),
718 )))
719 }
720 }
721 SchemaStatement::CreateConstraint(stmt) => {
722 use grafeo_adapters::query::gql::ast::ConstraintKind;
723 let kind_str = match stmt.constraint_kind {
724 ConstraintKind::Unique => "unique",
725 ConstraintKind::NodeKey => "node_key",
726 ConstraintKind::NotNull => "not_null",
727 ConstraintKind::Exists => "exists",
728 };
729 let constraint_name = stmt
730 .name
731 .clone()
732 .unwrap_or_else(|| format!("{}_{kind_str}", stmt.label));
733 wal_log!(
734 self,
735 WalRecord::CreateConstraint {
736 name: constraint_name.clone(),
737 label: stmt.label.clone(),
738 properties: stmt.properties.clone(),
739 kind: kind_str.to_string(),
740 }
741 );
742 Ok(QueryResult::status(format!(
743 "Created {kind_str} constraint '{constraint_name}'"
744 )))
745 }
746 SchemaStatement::DropConstraint { name, if_exists } => {
747 let _ = if_exists;
748 wal_log!(self, WalRecord::DropConstraint { name: name.clone() });
749 Ok(QueryResult::status(format!("Dropped constraint '{name}'")))
750 }
751 SchemaStatement::CreateGraphType(stmt) => {
752 use crate::catalog::GraphTypeDefinition;
753 use grafeo_adapters::query::gql::ast::InlineElementType;
754
755 let (mut node_types, mut edge_types, open) =
757 if let Some(ref like_graph) = stmt.like_graph {
758 if let Some(type_name) = self.catalog.get_graph_type_binding(like_graph) {
760 if let Some(existing) = self
761 .catalog
762 .schema()
763 .and_then(|s| s.get_graph_type(&type_name))
764 {
765 (
766 existing.allowed_node_types.clone(),
767 existing.allowed_edge_types.clone(),
768 existing.open,
769 )
770 } else {
771 (Vec::new(), Vec::new(), true)
772 }
773 } else {
774 let nt = self.catalog.all_node_type_names();
776 let et = self.catalog.all_edge_type_names();
777 if nt.is_empty() && et.is_empty() {
778 (Vec::new(), Vec::new(), true)
779 } else {
780 (nt, et, false)
781 }
782 }
783 } else {
784 (stmt.node_types.clone(), stmt.edge_types.clone(), stmt.open)
785 };
786
787 for inline in &stmt.inline_types {
789 match inline {
790 InlineElementType::Node {
791 name, properties, ..
792 } => {
793 let def = NodeTypeDefinition {
794 name: name.clone(),
795 properties: properties
796 .iter()
797 .map(|p| TypedProperty {
798 name: p.name.clone(),
799 data_type: PropertyDataType::from_type_name(&p.data_type),
800 nullable: p.nullable,
801 default_value: None,
802 })
803 .collect(),
804 constraints: Vec::new(),
805 };
806 self.catalog.register_or_replace_node_type(def);
808 #[cfg(feature = "wal")]
809 {
810 let props_for_wal: Vec<(String, String, bool)> = properties
811 .iter()
812 .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
813 .collect();
814 self.log_schema_wal(&WalRecord::CreateNodeType {
815 name: name.clone(),
816 properties: props_for_wal,
817 constraints: Vec::new(),
818 });
819 }
820 if !node_types.contains(name) {
821 node_types.push(name.clone());
822 }
823 }
824 InlineElementType::Edge {
825 name, properties, ..
826 } => {
827 let def = EdgeTypeDefinition {
828 name: name.clone(),
829 properties: properties
830 .iter()
831 .map(|p| TypedProperty {
832 name: p.name.clone(),
833 data_type: PropertyDataType::from_type_name(&p.data_type),
834 nullable: p.nullable,
835 default_value: None,
836 })
837 .collect(),
838 constraints: Vec::new(),
839 };
840 self.catalog.register_or_replace_edge_type_def(def);
841 #[cfg(feature = "wal")]
842 {
843 let props_for_wal: Vec<(String, String, bool)> = properties
844 .iter()
845 .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
846 .collect();
847 self.log_schema_wal(&WalRecord::CreateEdgeType {
848 name: name.clone(),
849 properties: props_for_wal,
850 constraints: Vec::new(),
851 });
852 }
853 if !edge_types.contains(name) {
854 edge_types.push(name.clone());
855 }
856 }
857 }
858 }
859
860 let def = GraphTypeDefinition {
861 name: stmt.name.clone(),
862 allowed_node_types: node_types.clone(),
863 allowed_edge_types: edge_types.clone(),
864 open,
865 };
866 let result = if stmt.or_replace {
867 let _ = self.catalog.drop_graph_type(&stmt.name);
869 self.catalog.register_graph_type(def)
870 } else {
871 self.catalog.register_graph_type(def)
872 };
873 match result {
874 Ok(()) => {
875 wal_log!(
876 self,
877 WalRecord::CreateGraphType {
878 name: stmt.name.clone(),
879 node_types,
880 edge_types,
881 open,
882 }
883 );
884 Ok(QueryResult::status(format!(
885 "Created graph type '{}'",
886 stmt.name
887 )))
888 }
889 Err(e) if stmt.if_not_exists => {
890 let _ = e;
891 Ok(QueryResult::status("No change"))
892 }
893 Err(e) => Err(Error::Query(QueryError::new(
894 QueryErrorKind::Semantic,
895 e.to_string(),
896 ))),
897 }
898 }
899 SchemaStatement::DropGraphType { name, if_exists } => {
900 match self.catalog.drop_graph_type(&name) {
901 Ok(()) => {
902 wal_log!(self, WalRecord::DropGraphType { name: name.clone() });
903 Ok(QueryResult::status(format!("Dropped graph type '{name}'")))
904 }
905 Err(e) if if_exists => {
906 let _ = e;
907 Ok(QueryResult::status("No change"))
908 }
909 Err(e) => Err(Error::Query(QueryError::new(
910 QueryErrorKind::Semantic,
911 e.to_string(),
912 ))),
913 }
914 }
915 SchemaStatement::CreateSchema {
916 name,
917 if_not_exists,
918 } => match self.catalog.register_schema_namespace(name.clone()) {
919 Ok(()) => {
920 wal_log!(self, WalRecord::CreateSchema { name: name.clone() });
921 Ok(QueryResult::status(format!("Created schema '{name}'")))
922 }
923 Err(e) if if_not_exists => {
924 let _ = e;
925 Ok(QueryResult::status("No change"))
926 }
927 Err(e) => Err(Error::Query(QueryError::new(
928 QueryErrorKind::Semantic,
929 e.to_string(),
930 ))),
931 },
932 SchemaStatement::DropSchema { name, if_exists } => {
933 match self.catalog.drop_schema_namespace(&name) {
934 Ok(()) => {
935 wal_log!(self, WalRecord::DropSchema { name: name.clone() });
936 Ok(QueryResult::status(format!("Dropped schema '{name}'")))
937 }
938 Err(e) if if_exists => {
939 let _ = e;
940 Ok(QueryResult::status("No change"))
941 }
942 Err(e) => Err(Error::Query(QueryError::new(
943 QueryErrorKind::Semantic,
944 e.to_string(),
945 ))),
946 }
947 }
948 SchemaStatement::AlterNodeType(stmt) => {
949 use grafeo_adapters::query::gql::ast::TypeAlteration;
950 let mut wal_alts = Vec::new();
951 for alt in &stmt.alterations {
952 match alt {
953 TypeAlteration::AddProperty(prop) => {
954 let typed = TypedProperty {
955 name: prop.name.clone(),
956 data_type: PropertyDataType::from_type_name(&prop.data_type),
957 nullable: prop.nullable,
958 default_value: None,
959 };
960 self.catalog
961 .alter_node_type_add_property(&stmt.name, typed)
962 .map_err(|e| {
963 Error::Query(QueryError::new(
964 QueryErrorKind::Semantic,
965 e.to_string(),
966 ))
967 })?;
968 wal_alts.push((
969 "add".to_string(),
970 prop.name.clone(),
971 prop.data_type.clone(),
972 prop.nullable,
973 ));
974 }
975 TypeAlteration::DropProperty(name) => {
976 self.catalog
977 .alter_node_type_drop_property(&stmt.name, name)
978 .map_err(|e| {
979 Error::Query(QueryError::new(
980 QueryErrorKind::Semantic,
981 e.to_string(),
982 ))
983 })?;
984 wal_alts.push(("drop".to_string(), name.clone(), String::new(), false));
985 }
986 }
987 }
988 wal_log!(
989 self,
990 WalRecord::AlterNodeType {
991 name: stmt.name.clone(),
992 alterations: wal_alts,
993 }
994 );
995 Ok(QueryResult::status(format!(
996 "Altered node type '{}'",
997 stmt.name
998 )))
999 }
1000 SchemaStatement::AlterEdgeType(stmt) => {
1001 use grafeo_adapters::query::gql::ast::TypeAlteration;
1002 let mut wal_alts = Vec::new();
1003 for alt in &stmt.alterations {
1004 match alt {
1005 TypeAlteration::AddProperty(prop) => {
1006 let typed = TypedProperty {
1007 name: prop.name.clone(),
1008 data_type: PropertyDataType::from_type_name(&prop.data_type),
1009 nullable: prop.nullable,
1010 default_value: None,
1011 };
1012 self.catalog
1013 .alter_edge_type_add_property(&stmt.name, typed)
1014 .map_err(|e| {
1015 Error::Query(QueryError::new(
1016 QueryErrorKind::Semantic,
1017 e.to_string(),
1018 ))
1019 })?;
1020 wal_alts.push((
1021 "add".to_string(),
1022 prop.name.clone(),
1023 prop.data_type.clone(),
1024 prop.nullable,
1025 ));
1026 }
1027 TypeAlteration::DropProperty(name) => {
1028 self.catalog
1029 .alter_edge_type_drop_property(&stmt.name, name)
1030 .map_err(|e| {
1031 Error::Query(QueryError::new(
1032 QueryErrorKind::Semantic,
1033 e.to_string(),
1034 ))
1035 })?;
1036 wal_alts.push(("drop".to_string(), name.clone(), String::new(), false));
1037 }
1038 }
1039 }
1040 wal_log!(
1041 self,
1042 WalRecord::AlterEdgeType {
1043 name: stmt.name.clone(),
1044 alterations: wal_alts,
1045 }
1046 );
1047 Ok(QueryResult::status(format!(
1048 "Altered edge type '{}'",
1049 stmt.name
1050 )))
1051 }
1052 SchemaStatement::AlterGraphType(stmt) => {
1053 use grafeo_adapters::query::gql::ast::GraphTypeAlteration;
1054 let mut wal_alts = Vec::new();
1055 for alt in &stmt.alterations {
1056 match alt {
1057 GraphTypeAlteration::AddNodeType(name) => {
1058 self.catalog
1059 .alter_graph_type_add_node_type(&stmt.name, name.clone())
1060 .map_err(|e| {
1061 Error::Query(QueryError::new(
1062 QueryErrorKind::Semantic,
1063 e.to_string(),
1064 ))
1065 })?;
1066 wal_alts.push(("add_node_type".to_string(), name.clone()));
1067 }
1068 GraphTypeAlteration::DropNodeType(name) => {
1069 self.catalog
1070 .alter_graph_type_drop_node_type(&stmt.name, name)
1071 .map_err(|e| {
1072 Error::Query(QueryError::new(
1073 QueryErrorKind::Semantic,
1074 e.to_string(),
1075 ))
1076 })?;
1077 wal_alts.push(("drop_node_type".to_string(), name.clone()));
1078 }
1079 GraphTypeAlteration::AddEdgeType(name) => {
1080 self.catalog
1081 .alter_graph_type_add_edge_type(&stmt.name, name.clone())
1082 .map_err(|e| {
1083 Error::Query(QueryError::new(
1084 QueryErrorKind::Semantic,
1085 e.to_string(),
1086 ))
1087 })?;
1088 wal_alts.push(("add_edge_type".to_string(), name.clone()));
1089 }
1090 GraphTypeAlteration::DropEdgeType(name) => {
1091 self.catalog
1092 .alter_graph_type_drop_edge_type(&stmt.name, name)
1093 .map_err(|e| {
1094 Error::Query(QueryError::new(
1095 QueryErrorKind::Semantic,
1096 e.to_string(),
1097 ))
1098 })?;
1099 wal_alts.push(("drop_edge_type".to_string(), name.clone()));
1100 }
1101 }
1102 }
1103 wal_log!(
1104 self,
1105 WalRecord::AlterGraphType {
1106 name: stmt.name.clone(),
1107 alterations: wal_alts,
1108 }
1109 );
1110 Ok(QueryResult::status(format!(
1111 "Altered graph type '{}'",
1112 stmt.name
1113 )))
1114 }
1115 SchemaStatement::CreateProcedure(stmt) => {
1116 use crate::catalog::ProcedureDefinition;
1117
1118 let def = ProcedureDefinition {
1119 name: stmt.name.clone(),
1120 params: stmt
1121 .params
1122 .iter()
1123 .map(|p| (p.name.clone(), p.param_type.clone()))
1124 .collect(),
1125 returns: stmt
1126 .returns
1127 .iter()
1128 .map(|r| (r.name.clone(), r.return_type.clone()))
1129 .collect(),
1130 body: stmt.body.clone(),
1131 };
1132
1133 if stmt.or_replace {
1134 self.catalog.replace_procedure(def).map_err(|e| {
1135 Error::Query(QueryError::new(QueryErrorKind::Semantic, e.to_string()))
1136 })?;
1137 } else {
1138 match self.catalog.register_procedure(def) {
1139 Ok(()) => {}
1140 Err(_) if stmt.if_not_exists => {
1141 return Ok(QueryResult::empty());
1142 }
1143 Err(e) => {
1144 return Err(Error::Query(QueryError::new(
1145 QueryErrorKind::Semantic,
1146 e.to_string(),
1147 )));
1148 }
1149 }
1150 }
1151
1152 wal_log!(
1153 self,
1154 WalRecord::CreateProcedure {
1155 name: stmt.name.clone(),
1156 params: stmt
1157 .params
1158 .iter()
1159 .map(|p| (p.name.clone(), p.param_type.clone()))
1160 .collect(),
1161 returns: stmt
1162 .returns
1163 .iter()
1164 .map(|r| (r.name.clone(), r.return_type.clone()))
1165 .collect(),
1166 body: stmt.body,
1167 }
1168 );
1169 Ok(QueryResult::status(format!(
1170 "Created procedure '{}'",
1171 stmt.name
1172 )))
1173 }
1174 SchemaStatement::DropProcedure { name, if_exists } => {
1175 match self.catalog.drop_procedure(&name) {
1176 Ok(()) => {}
1177 Err(_) if if_exists => {
1178 return Ok(QueryResult::empty());
1179 }
1180 Err(e) => {
1181 return Err(Error::Query(QueryError::new(
1182 QueryErrorKind::Semantic,
1183 e.to_string(),
1184 )));
1185 }
1186 }
1187 wal_log!(self, WalRecord::DropProcedure { name: name.clone() });
1188 Ok(QueryResult::status(format!("Dropped procedure '{name}'")))
1189 }
1190 }
1191 }
1192
1193 #[cfg(all(feature = "gql", feature = "vector-index"))]
1195 fn create_vector_index_on_store(
1196 store: &LpgStore,
1197 label: &str,
1198 property: &str,
1199 dimensions: Option<usize>,
1200 metric: Option<&str>,
1201 ) -> Result<()> {
1202 use grafeo_common::types::{PropertyKey, Value};
1203 use grafeo_common::utils::error::Error;
1204 use grafeo_core::index::vector::{DistanceMetric, HnswConfig, HnswIndex};
1205
1206 let metric = match metric {
1207 Some(m) => DistanceMetric::from_str(m).ok_or_else(|| {
1208 Error::Internal(format!(
1209 "Unknown distance metric '{m}'. Use: cosine, euclidean, dot_product, manhattan"
1210 ))
1211 })?,
1212 None => DistanceMetric::Cosine,
1213 };
1214
1215 let prop_key = PropertyKey::new(property);
1216 let mut found_dims: Option<usize> = dimensions;
1217 let mut vectors: Vec<(grafeo_common::types::NodeId, Vec<f32>)> = Vec::new();
1218
1219 for node in store.nodes_with_label(label) {
1220 if let Some(Value::Vector(v)) = node.properties.get(&prop_key) {
1221 if let Some(expected) = found_dims {
1222 if v.len() != expected {
1223 return Err(Error::Internal(format!(
1224 "Vector dimension mismatch: expected {expected}, found {} on node {}",
1225 v.len(),
1226 node.id.0
1227 )));
1228 }
1229 } else {
1230 found_dims = Some(v.len());
1231 }
1232 vectors.push((node.id, v.to_vec()));
1233 }
1234 }
1235
1236 let Some(dims) = found_dims else {
1237 return Err(Error::Internal(format!(
1238 "No vector properties found on :{label}({property}) and no dimensions specified"
1239 )));
1240 };
1241
1242 let config = HnswConfig::new(dims, metric);
1243 let index = HnswIndex::with_capacity(config, vectors.len());
1244 let accessor = grafeo_core::index::vector::PropertyVectorAccessor::new(store, property);
1245 for (node_id, vec) in &vectors {
1246 index.insert(*node_id, vec, &accessor);
1247 }
1248
1249 store.add_vector_index(label, property, Arc::new(index));
1250 Ok(())
1251 }
1252
1253 #[cfg(all(feature = "gql", not(feature = "vector-index")))]
1255 fn create_vector_index_on_store(
1256 _store: &LpgStore,
1257 _label: &str,
1258 _property: &str,
1259 _dimensions: Option<usize>,
1260 _metric: Option<&str>,
1261 ) -> Result<()> {
1262 Err(grafeo_common::utils::error::Error::Internal(
1263 "Vector index support requires the 'vector-index' feature".to_string(),
1264 ))
1265 }
1266
1267 #[cfg(all(feature = "gql", feature = "text-index"))]
1269 fn create_text_index_on_store(store: &LpgStore, label: &str, property: &str) -> Result<()> {
1270 use grafeo_common::types::{PropertyKey, Value};
1271 use grafeo_core::index::text::{BM25Config, InvertedIndex};
1272
1273 let mut index = InvertedIndex::new(BM25Config::default());
1274 let prop_key = PropertyKey::new(property);
1275
1276 let nodes = store.nodes_by_label(label);
1277 for node_id in nodes {
1278 if let Some(Value::String(text)) = store.get_node_property(node_id, &prop_key) {
1279 index.insert(node_id, text.as_str());
1280 }
1281 }
1282
1283 store.add_text_index(label, property, Arc::new(parking_lot::RwLock::new(index)));
1284 Ok(())
1285 }
1286
1287 #[cfg(all(feature = "gql", not(feature = "text-index")))]
1289 fn create_text_index_on_store(_store: &LpgStore, _label: &str, _property: &str) -> Result<()> {
1290 Err(grafeo_common::utils::error::Error::Internal(
1291 "Text index support requires the 'text-index' feature".to_string(),
1292 ))
1293 }
1294
1295 #[cfg(feature = "gql")]
1322 pub fn execute(&self, query: &str) -> Result<QueryResult> {
1323 self.require_lpg("GQL")?;
1324
1325 use crate::query::{
1326 Executor, binder::Binder, cache::CacheKey, optimizer::Optimizer,
1327 processor::QueryLanguage, translators::gql,
1328 };
1329
1330 #[cfg(not(target_arch = "wasm32"))]
1331 let start_time = std::time::Instant::now();
1332
1333 let translation = gql::translate_full(query)?;
1335 let logical_plan = match translation {
1336 gql::GqlTranslationResult::SessionCommand(cmd) => {
1337 return self.execute_session_command(cmd);
1338 }
1339 gql::GqlTranslationResult::SchemaCommand(cmd) => {
1340 if *self.read_only_tx.lock() {
1342 return Err(grafeo_common::utils::error::Error::Transaction(
1343 grafeo_common::utils::error::TransactionError::ReadOnly,
1344 ));
1345 }
1346 return self.execute_schema_command(cmd);
1347 }
1348 gql::GqlTranslationResult::Plan(plan) => {
1349 if *self.read_only_tx.lock() && plan.root.has_mutations() {
1351 return Err(grafeo_common::utils::error::Error::Transaction(
1352 grafeo_common::utils::error::TransactionError::ReadOnly,
1353 ));
1354 }
1355 plan
1356 }
1357 };
1358
1359 let cache_key = CacheKey::new(query, QueryLanguage::Gql);
1361
1362 let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
1364 cached_plan
1365 } else {
1366 let mut binder = Binder::new();
1368 let _binding_context = binder.bind(&logical_plan)?;
1369
1370 let optimizer = Optimizer::from_graph_store(&*self.graph_store);
1372 let plan = optimizer.optimize(logical_plan)?;
1373
1374 self.query_cache.put_optimized(cache_key, plan.clone());
1376
1377 plan
1378 };
1379
1380 if optimized_plan.explain {
1382 use crate::query::processor::{annotate_pushdown_hints, explain_result};
1383 let mut plan = optimized_plan;
1384 annotate_pushdown_hints(&mut plan.root, self.graph_store.as_ref());
1385 return Ok(explain_result(&plan));
1386 }
1387
1388 let has_mutations = optimized_plan.root.has_mutations();
1389
1390 self.with_auto_commit(has_mutations, || {
1391 let (viewing_epoch, tx_id) = self.get_transaction_context();
1393
1394 let planner = self.create_planner(viewing_epoch, tx_id);
1397 let mut physical_plan = planner.plan(&optimized_plan)?;
1398
1399 let executor = Executor::with_columns(physical_plan.columns.clone())
1401 .with_deadline(self.query_deadline());
1402 let mut result = executor.execute(physical_plan.operator.as_mut())?;
1403
1404 let rows_scanned = result.rows.len() as u64;
1406 #[cfg(not(target_arch = "wasm32"))]
1407 {
1408 let elapsed_ms = start_time.elapsed().as_secs_f64() * 1000.0;
1409 result.execution_time_ms = Some(elapsed_ms);
1410 }
1411 result.rows_scanned = Some(rows_scanned);
1412
1413 Ok(result)
1414 })
1415 }
1416
1417 #[cfg(feature = "gql")]
1426 pub fn execute_at_epoch(&self, query: &str, epoch: EpochId) -> Result<QueryResult> {
1427 let previous = self.viewing_epoch_override.lock().replace(epoch);
1428 let result = self.execute(query);
1429 *self.viewing_epoch_override.lock() = previous;
1430 result
1431 }
1432
1433 #[cfg(feature = "gql")]
1439 pub fn execute_with_params(
1440 &self,
1441 query: &str,
1442 params: std::collections::HashMap<String, Value>,
1443 ) -> Result<QueryResult> {
1444 self.require_lpg("GQL")?;
1445
1446 use crate::query::processor::{QueryLanguage, QueryProcessor};
1447
1448 let has_mutations = Self::query_looks_like_mutation(query);
1449
1450 self.with_auto_commit(has_mutations, || {
1451 let (viewing_epoch, tx_id) = self.get_transaction_context();
1453
1454 let processor = QueryProcessor::for_graph_store_with_tx(
1456 Arc::clone(&self.graph_store),
1457 Arc::clone(&self.tx_manager),
1458 );
1459
1460 let processor = if let Some(tx_id) = tx_id {
1462 processor.with_tx_context(viewing_epoch, tx_id)
1463 } else {
1464 processor
1465 };
1466
1467 processor.process(query, QueryLanguage::Gql, Some(¶ms))
1468 })
1469 }
1470
1471 #[cfg(not(any(feature = "gql", feature = "cypher")))]
1477 pub fn execute_with_params(
1478 &self,
1479 _query: &str,
1480 _params: std::collections::HashMap<String, Value>,
1481 ) -> Result<QueryResult> {
1482 Err(grafeo_common::utils::error::Error::Internal(
1483 "No query language enabled".to_string(),
1484 ))
1485 }
1486
1487 #[cfg(not(any(feature = "gql", feature = "cypher")))]
1493 pub fn execute(&self, _query: &str) -> Result<QueryResult> {
1494 Err(grafeo_common::utils::error::Error::Internal(
1495 "No query language enabled".to_string(),
1496 ))
1497 }
1498
1499 #[cfg(feature = "cypher")]
1505 pub fn execute_cypher(&self, query: &str) -> Result<QueryResult> {
1506 use crate::query::{
1507 Executor, binder::Binder, cache::CacheKey, optimizer::Optimizer,
1508 processor::QueryLanguage, translators::cypher,
1509 };
1510
1511 let cache_key = CacheKey::new(query, QueryLanguage::Cypher);
1513
1514 let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
1516 cached_plan
1517 } else {
1518 let logical_plan = cypher::translate(query)?;
1520
1521 let mut binder = Binder::new();
1523 let _binding_context = binder.bind(&logical_plan)?;
1524
1525 let optimizer = Optimizer::from_graph_store(&*self.graph_store);
1527 let plan = optimizer.optimize(logical_plan)?;
1528
1529 self.query_cache.put_optimized(cache_key, plan.clone());
1531
1532 plan
1533 };
1534
1535 let has_mutations = optimized_plan.root.has_mutations();
1536
1537 self.with_auto_commit(has_mutations, || {
1538 let (viewing_epoch, tx_id) = self.get_transaction_context();
1540
1541 let planner = self.create_planner(viewing_epoch, tx_id);
1543 let mut physical_plan = planner.plan(&optimized_plan)?;
1544
1545 let executor = Executor::with_columns(physical_plan.columns.clone())
1547 .with_deadline(self.query_deadline());
1548 executor.execute(physical_plan.operator.as_mut())
1549 })
1550 }
1551
1552 #[cfg(feature = "gremlin")]
1576 pub fn execute_gremlin(&self, query: &str) -> Result<QueryResult> {
1577 use crate::query::{Executor, binder::Binder, optimizer::Optimizer, translators::gremlin};
1578
1579 let logical_plan = gremlin::translate(query)?;
1581
1582 let mut binder = Binder::new();
1584 let _binding_context = binder.bind(&logical_plan)?;
1585
1586 let optimizer = Optimizer::from_graph_store(&*self.graph_store);
1588 let optimized_plan = optimizer.optimize(logical_plan)?;
1589
1590 let has_mutations = optimized_plan.root.has_mutations();
1591
1592 self.with_auto_commit(has_mutations, || {
1593 let (viewing_epoch, tx_id) = self.get_transaction_context();
1595
1596 let planner = self.create_planner(viewing_epoch, tx_id);
1598 let mut physical_plan = planner.plan(&optimized_plan)?;
1599
1600 let executor = Executor::with_columns(physical_plan.columns.clone())
1602 .with_deadline(self.query_deadline());
1603 executor.execute(physical_plan.operator.as_mut())
1604 })
1605 }
1606
1607 #[cfg(feature = "gremlin")]
1613 pub fn execute_gremlin_with_params(
1614 &self,
1615 query: &str,
1616 params: std::collections::HashMap<String, Value>,
1617 ) -> Result<QueryResult> {
1618 use crate::query::processor::{QueryLanguage, QueryProcessor};
1619
1620 let has_mutations = Self::query_looks_like_mutation(query);
1621
1622 self.with_auto_commit(has_mutations, || {
1623 let (viewing_epoch, tx_id) = self.get_transaction_context();
1625
1626 let processor = QueryProcessor::for_graph_store_with_tx(
1628 Arc::clone(&self.graph_store),
1629 Arc::clone(&self.tx_manager),
1630 );
1631
1632 let processor = if let Some(tx_id) = tx_id {
1634 processor.with_tx_context(viewing_epoch, tx_id)
1635 } else {
1636 processor
1637 };
1638
1639 processor.process(query, QueryLanguage::Gremlin, Some(¶ms))
1640 })
1641 }
1642
1643 #[cfg(feature = "graphql")]
1667 pub fn execute_graphql(&self, query: &str) -> Result<QueryResult> {
1668 use crate::query::{Executor, binder::Binder, optimizer::Optimizer, translators::graphql};
1669
1670 let logical_plan = graphql::translate(query)?;
1672
1673 let mut binder = Binder::new();
1675 let _binding_context = binder.bind(&logical_plan)?;
1676
1677 let optimizer = Optimizer::from_graph_store(&*self.graph_store);
1679 let optimized_plan = optimizer.optimize(logical_plan)?;
1680
1681 let has_mutations = optimized_plan.root.has_mutations();
1682
1683 self.with_auto_commit(has_mutations, || {
1684 let (viewing_epoch, tx_id) = self.get_transaction_context();
1686
1687 let planner = self.create_planner(viewing_epoch, tx_id);
1689 let mut physical_plan = planner.plan(&optimized_plan)?;
1690
1691 let executor = Executor::with_columns(physical_plan.columns.clone())
1693 .with_deadline(self.query_deadline());
1694 executor.execute(physical_plan.operator.as_mut())
1695 })
1696 }
1697
1698 #[cfg(feature = "graphql")]
1704 pub fn execute_graphql_with_params(
1705 &self,
1706 query: &str,
1707 params: std::collections::HashMap<String, Value>,
1708 ) -> Result<QueryResult> {
1709 use crate::query::processor::{QueryLanguage, QueryProcessor};
1710
1711 let has_mutations = Self::query_looks_like_mutation(query);
1712
1713 self.with_auto_commit(has_mutations, || {
1714 let (viewing_epoch, tx_id) = self.get_transaction_context();
1716
1717 let processor = QueryProcessor::for_graph_store_with_tx(
1719 Arc::clone(&self.graph_store),
1720 Arc::clone(&self.tx_manager),
1721 );
1722
1723 let processor = if let Some(tx_id) = tx_id {
1725 processor.with_tx_context(viewing_epoch, tx_id)
1726 } else {
1727 processor
1728 };
1729
1730 processor.process(query, QueryLanguage::GraphQL, Some(¶ms))
1731 })
1732 }
1733
1734 #[cfg(feature = "sql-pgq")]
1759 pub fn execute_sql(&self, query: &str) -> Result<QueryResult> {
1760 use crate::query::{
1761 Executor, binder::Binder, cache::CacheKey, optimizer::Optimizer, plan::LogicalOperator,
1762 processor::QueryLanguage, translators::sql_pgq,
1763 };
1764
1765 let logical_plan = sql_pgq::translate(query)?;
1767
1768 if let LogicalOperator::CreatePropertyGraph(ref cpg) = logical_plan.root {
1770 return Ok(QueryResult {
1771 columns: vec!["status".into()],
1772 column_types: vec![grafeo_common::types::LogicalType::String],
1773 rows: vec![vec![Value::from(format!(
1774 "Property graph '{}' created ({} node tables, {} edge tables)",
1775 cpg.name,
1776 cpg.node_tables.len(),
1777 cpg.edge_tables.len()
1778 ))]],
1779 execution_time_ms: None,
1780 rows_scanned: None,
1781 status_message: None,
1782 });
1783 }
1784
1785 let cache_key = CacheKey::new(query, QueryLanguage::SqlPgq);
1787
1788 let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
1790 cached_plan
1791 } else {
1792 let mut binder = Binder::new();
1794 let _binding_context = binder.bind(&logical_plan)?;
1795
1796 let optimizer = Optimizer::from_graph_store(&*self.graph_store);
1798 let plan = optimizer.optimize(logical_plan)?;
1799
1800 self.query_cache.put_optimized(cache_key, plan.clone());
1802
1803 plan
1804 };
1805
1806 let has_mutations = optimized_plan.root.has_mutations();
1807
1808 self.with_auto_commit(has_mutations, || {
1809 let (viewing_epoch, tx_id) = self.get_transaction_context();
1811
1812 let planner = self.create_planner(viewing_epoch, tx_id);
1814 let mut physical_plan = planner.plan(&optimized_plan)?;
1815
1816 let executor = Executor::with_columns(physical_plan.columns.clone())
1818 .with_deadline(self.query_deadline());
1819 executor.execute(physical_plan.operator.as_mut())
1820 })
1821 }
1822
1823 #[cfg(feature = "sql-pgq")]
1829 pub fn execute_sql_with_params(
1830 &self,
1831 query: &str,
1832 params: std::collections::HashMap<String, Value>,
1833 ) -> Result<QueryResult> {
1834 use crate::query::processor::{QueryLanguage, QueryProcessor};
1835
1836 let has_mutations = Self::query_looks_like_mutation(query);
1837
1838 self.with_auto_commit(has_mutations, || {
1839 let (viewing_epoch, tx_id) = self.get_transaction_context();
1841
1842 let processor = QueryProcessor::for_graph_store_with_tx(
1844 Arc::clone(&self.graph_store),
1845 Arc::clone(&self.tx_manager),
1846 );
1847
1848 let processor = if let Some(tx_id) = tx_id {
1850 processor.with_tx_context(viewing_epoch, tx_id)
1851 } else {
1852 processor
1853 };
1854
1855 processor.process(query, QueryLanguage::SqlPgq, Some(¶ms))
1856 })
1857 }
1858
1859 #[cfg(all(feature = "sparql", feature = "rdf"))]
1865 pub fn execute_sparql(&self, query: &str) -> Result<QueryResult> {
1866 use crate::query::{
1867 Executor, optimizer::Optimizer, planner::rdf::RdfPlanner, translators::sparql,
1868 };
1869
1870 let logical_plan = sparql::translate(query)?;
1872
1873 let optimizer = Optimizer::from_graph_store(&*self.graph_store);
1875 let optimized_plan = optimizer.optimize(logical_plan)?;
1876
1877 let planner =
1879 RdfPlanner::new(Arc::clone(&self.rdf_store)).with_tx_id(*self.current_tx.lock());
1880 let mut physical_plan = planner.plan(&optimized_plan)?;
1881
1882 let executor = Executor::with_columns(physical_plan.columns.clone())
1884 .with_deadline(self.query_deadline());
1885 executor.execute(physical_plan.operator.as_mut())
1886 }
1887
1888 #[cfg(all(feature = "sparql", feature = "rdf"))]
1894 pub fn execute_sparql_with_params(
1895 &self,
1896 query: &str,
1897 _params: std::collections::HashMap<String, Value>,
1898 ) -> Result<QueryResult> {
1899 self.execute_sparql(query)
1902 }
1903
1904 pub fn execute_language(
1913 &self,
1914 query: &str,
1915 language: &str,
1916 params: Option<std::collections::HashMap<String, Value>>,
1917 ) -> Result<QueryResult> {
1918 match language {
1919 "gql" => {
1920 if let Some(p) = params {
1921 self.execute_with_params(query, p)
1922 } else {
1923 self.execute(query)
1924 }
1925 }
1926 #[cfg(feature = "cypher")]
1927 "cypher" => {
1928 if let Some(p) = params {
1929 use crate::query::processor::{QueryLanguage, QueryProcessor};
1930 let has_mutations = Self::query_looks_like_mutation(query);
1931 self.with_auto_commit(has_mutations, || {
1932 let processor = QueryProcessor::for_graph_store_with_tx(
1933 Arc::clone(&self.graph_store),
1934 Arc::clone(&self.tx_manager),
1935 );
1936 let (viewing_epoch, tx_id) = self.get_transaction_context();
1937 let processor = if let Some(tx_id) = tx_id {
1938 processor.with_tx_context(viewing_epoch, tx_id)
1939 } else {
1940 processor
1941 };
1942 processor.process(query, QueryLanguage::Cypher, Some(&p))
1943 })
1944 } else {
1945 self.execute_cypher(query)
1946 }
1947 }
1948 #[cfg(feature = "gremlin")]
1949 "gremlin" => {
1950 if let Some(p) = params {
1951 self.execute_gremlin_with_params(query, p)
1952 } else {
1953 self.execute_gremlin(query)
1954 }
1955 }
1956 #[cfg(feature = "graphql")]
1957 "graphql" => {
1958 if let Some(p) = params {
1959 self.execute_graphql_with_params(query, p)
1960 } else {
1961 self.execute_graphql(query)
1962 }
1963 }
1964 #[cfg(feature = "sql-pgq")]
1965 "sql" | "sql-pgq" => {
1966 if let Some(p) = params {
1967 self.execute_sql_with_params(query, p)
1968 } else {
1969 self.execute_sql(query)
1970 }
1971 }
1972 #[cfg(all(feature = "sparql", feature = "rdf"))]
1973 "sparql" => {
1974 if let Some(p) = params {
1975 self.execute_sparql_with_params(query, p)
1976 } else {
1977 self.execute_sparql(query)
1978 }
1979 }
1980 other => Err(grafeo_common::utils::error::Error::Query(
1981 grafeo_common::utils::error::QueryError::new(
1982 grafeo_common::utils::error::QueryErrorKind::Semantic,
1983 format!("Unknown query language: '{other}'"),
1984 ),
1985 )),
1986 }
1987 }
1988
1989 pub fn begin_tx(&mut self) -> Result<()> {
2012 self.begin_tx_inner(false, None)
2013 }
2014
2015 pub fn begin_tx_with_isolation(
2023 &mut self,
2024 isolation_level: crate::transaction::IsolationLevel,
2025 ) -> Result<()> {
2026 self.begin_tx_inner(false, Some(isolation_level))
2027 }
2028
2029 fn begin_tx_inner(
2031 &self,
2032 read_only: bool,
2033 isolation_level: Option<crate::transaction::IsolationLevel>,
2034 ) -> Result<()> {
2035 let mut current = self.current_tx.lock();
2036 if current.is_some() {
2037 return Err(grafeo_common::utils::error::Error::Transaction(
2038 grafeo_common::utils::error::TransactionError::InvalidState(
2039 "Transaction already active".to_string(),
2040 ),
2041 ));
2042 }
2043
2044 self.tx_start_node_count
2045 .store(self.store.node_count(), Ordering::Relaxed);
2046 self.tx_start_edge_count
2047 .store(self.store.edge_count(), Ordering::Relaxed);
2048 let tx_id = if let Some(level) = isolation_level {
2049 self.tx_manager.begin_with_isolation(level)
2050 } else {
2051 self.tx_manager.begin()
2052 };
2053 *current = Some(tx_id);
2054 *self.read_only_tx.lock() = read_only;
2055 Ok(())
2056 }
2057
2058 pub fn commit(&mut self) -> Result<()> {
2066 self.commit_inner()
2067 }
2068
2069 fn commit_inner(&self) -> Result<()> {
2071 let tx_id = self.current_tx.lock().take().ok_or_else(|| {
2072 grafeo_common::utils::error::Error::Transaction(
2073 grafeo_common::utils::error::TransactionError::InvalidState(
2074 "No active transaction".to_string(),
2075 ),
2076 )
2077 })?;
2078
2079 #[cfg(feature = "rdf")]
2081 self.rdf_store.commit_tx(tx_id);
2082
2083 self.tx_manager.commit(tx_id)?;
2084
2085 self.store.sync_epoch(self.tx_manager.current_epoch());
2089
2090 *self.read_only_tx.lock() = false;
2092
2093 if self.gc_interval > 0 {
2095 let count = self.commit_counter.fetch_add(1, Ordering::Relaxed) + 1;
2096 if count.is_multiple_of(self.gc_interval) {
2097 let min_epoch = self.tx_manager.min_active_epoch();
2098 self.store.gc_versions(min_epoch);
2099 self.tx_manager.gc();
2100 }
2101 }
2102
2103 Ok(())
2104 }
2105
2106 pub fn rollback(&mut self) -> Result<()> {
2130 self.rollback_inner()
2131 }
2132
2133 fn rollback_inner(&self) -> Result<()> {
2135 let tx_id = self.current_tx.lock().take().ok_or_else(|| {
2136 grafeo_common::utils::error::Error::Transaction(
2137 grafeo_common::utils::error::TransactionError::InvalidState(
2138 "No active transaction".to_string(),
2139 ),
2140 )
2141 })?;
2142
2143 *self.read_only_tx.lock() = false;
2145
2146 self.store.discard_uncommitted_versions(tx_id);
2148
2149 #[cfg(feature = "rdf")]
2151 self.rdf_store.rollback_tx(tx_id);
2152
2153 self.tx_manager.abort(tx_id)
2155 }
2156
2157 #[must_use]
2159 pub fn in_transaction(&self) -> bool {
2160 self.current_tx.lock().is_some()
2161 }
2162
2163 #[must_use]
2165 pub(crate) fn current_tx_id(&self) -> Option<TxId> {
2166 *self.current_tx.lock()
2167 }
2168
2169 #[must_use]
2171 pub(crate) fn tx_manager(&self) -> &TransactionManager {
2172 &self.tx_manager
2173 }
2174
2175 #[must_use]
2177 pub(crate) fn node_count_delta(&self) -> (usize, usize) {
2178 (
2179 self.tx_start_node_count.load(Ordering::Relaxed),
2180 self.store.node_count(),
2181 )
2182 }
2183
2184 #[must_use]
2186 pub(crate) fn edge_count_delta(&self) -> (usize, usize) {
2187 (
2188 self.tx_start_edge_count.load(Ordering::Relaxed),
2189 self.store.edge_count(),
2190 )
2191 }
2192
2193 pub fn prepare_commit(&mut self) -> Result<crate::transaction::PreparedCommit<'_>> {
2227 crate::transaction::PreparedCommit::new(self)
2228 }
2229
    /// Enables or disables auto-commit for this session.
    ///
    /// The flag is consulted by `needs_auto_commit` when deciding whether a
    /// mutating statement should be wrapped in its own transaction.
    pub fn set_auto_commit(&mut self, auto_commit: bool) {
        self.auto_commit = auto_commit;
    }
2234
    /// Returns whether auto-commit is enabled for this session.
    #[must_use]
    pub fn auto_commit(&self) -> bool {
        self.auto_commit
    }
2240
2241 fn needs_auto_commit(&self, has_mutations: bool) -> bool {
2246 self.auto_commit && has_mutations && self.current_tx.lock().is_none()
2247 }
2248
2249 fn with_auto_commit<F>(&self, has_mutations: bool, body: F) -> Result<QueryResult>
2252 where
2253 F: FnOnce() -> Result<QueryResult>,
2254 {
2255 if self.needs_auto_commit(has_mutations) {
2256 self.begin_tx_inner(false, None)?;
2257 match body() {
2258 Ok(result) => {
2259 self.commit_inner()?;
2260 Ok(result)
2261 }
2262 Err(e) => {
2263 let _ = self.rollback_inner();
2264 Err(e)
2265 }
2266 }
2267 } else {
2268 body()
2269 }
2270 }
2271
2272 fn query_looks_like_mutation(query: &str) -> bool {
2278 let upper = query.to_ascii_uppercase();
2279 upper.contains("INSERT")
2280 || upper.contains("CREATE")
2281 || upper.contains("DELETE")
2282 || upper.contains("MERGE")
2283 || upper.contains("SET")
2284 || upper.contains("REMOVE")
2285 || upper.contains("DROP")
2286 || upper.contains("ALTER")
2287 }
2288
2289 #[must_use]
2291 fn query_deadline(&self) -> Option<Instant> {
2292 #[cfg(not(target_arch = "wasm32"))]
2293 {
2294 self.query_timeout.map(|d| Instant::now() + d)
2295 }
2296 #[cfg(target_arch = "wasm32")]
2297 {
2298 let _ = &self.query_timeout;
2299 None
2300 }
2301 }
2302
2303 fn eval_integer_literal(expr: &grafeo_adapters::query::gql::ast::Expression) -> Option<i64> {
2305 use grafeo_adapters::query::gql::ast::{Expression, Literal};
2306 match expr {
2307 Expression::Literal(Literal::Integer(n)) => Some(*n),
2308 _ => None,
2309 }
2310 }
2311
2312 #[must_use]
2318 fn get_transaction_context(&self) -> (EpochId, Option<TxId>) {
2319 if let Some(epoch) = *self.viewing_epoch_override.lock() {
2321 return (epoch, None);
2322 }
2323
2324 if let Some(tx_id) = *self.current_tx.lock() {
2325 let epoch = self
2327 .tx_manager
2328 .start_epoch(tx_id)
2329 .unwrap_or_else(|| self.tx_manager.current_epoch());
2330 (epoch, Some(tx_id))
2331 } else {
2332 (self.tx_manager.current_epoch(), None)
2334 }
2335 }
2336
2337 fn create_planner(&self, viewing_epoch: EpochId, tx_id: Option<TxId>) -> crate::query::Planner {
2339 use crate::query::Planner;
2340
2341 let mut planner = Planner::with_context(
2342 Arc::clone(&self.graph_store),
2343 Arc::clone(&self.tx_manager),
2344 tx_id,
2345 viewing_epoch,
2346 )
2347 .with_factorized_execution(self.factorized_execution)
2348 .with_catalog(Arc::clone(&self.catalog));
2349
2350 let validator = CatalogConstraintValidator::new(Arc::clone(&self.catalog));
2352 planner = planner.with_validator(Arc::new(validator));
2353
2354 planner
2355 }
2356
2357 pub fn create_node(&self, labels: &[&str]) -> NodeId {
2362 let (epoch, tx_id) = self.get_transaction_context();
2363 self.store
2364 .create_node_versioned(labels, epoch, tx_id.unwrap_or(TxId::SYSTEM))
2365 }
2366
2367 pub fn create_node_with_props<'a>(
2371 &self,
2372 labels: &[&str],
2373 properties: impl IntoIterator<Item = (&'a str, Value)>,
2374 ) -> NodeId {
2375 let (epoch, tx_id) = self.get_transaction_context();
2376 self.store.create_node_with_props_versioned(
2377 labels,
2378 properties,
2379 epoch,
2380 tx_id.unwrap_or(TxId::SYSTEM),
2381 )
2382 }
2383
2384 pub fn create_edge(
2389 &self,
2390 src: NodeId,
2391 dst: NodeId,
2392 edge_type: &str,
2393 ) -> grafeo_common::types::EdgeId {
2394 let (epoch, tx_id) = self.get_transaction_context();
2395 self.store
2396 .create_edge_versioned(src, dst, edge_type, epoch, tx_id.unwrap_or(TxId::SYSTEM))
2397 }
2398
2399 #[must_use]
2427 pub fn get_node(&self, id: NodeId) -> Option<Node> {
2428 let (epoch, tx_id) = self.get_transaction_context();
2429 self.store
2430 .get_node_versioned(id, epoch, tx_id.unwrap_or(TxId::SYSTEM))
2431 }
2432
2433 #[must_use]
2457 pub fn get_node_property(&self, id: NodeId, key: &str) -> Option<Value> {
2458 self.get_node(id)
2459 .and_then(|node| node.get_property(key).cloned())
2460 }
2461
2462 #[must_use]
2469 pub fn get_edge(&self, id: EdgeId) -> Option<Edge> {
2470 let (epoch, tx_id) = self.get_transaction_context();
2471 self.store
2472 .get_edge_versioned(id, epoch, tx_id.unwrap_or(TxId::SYSTEM))
2473 }
2474
2475 #[must_use]
2501 pub fn get_neighbors_outgoing(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
2502 self.store.edges_from(node, Direction::Outgoing).collect()
2503 }
2504
2505 #[must_use]
2514 pub fn get_neighbors_incoming(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
2515 self.store.edges_from(node, Direction::Incoming).collect()
2516 }
2517
2518 #[must_use]
2530 pub fn get_neighbors_outgoing_by_type(
2531 &self,
2532 node: NodeId,
2533 edge_type: &str,
2534 ) -> Vec<(NodeId, EdgeId)> {
2535 self.store
2536 .edges_from(node, Direction::Outgoing)
2537 .filter(|(_, edge_id)| {
2538 self.get_edge(*edge_id)
2539 .is_some_and(|e| e.edge_type.as_str() == edge_type)
2540 })
2541 .collect()
2542 }
2543
2544 #[must_use]
2551 pub fn node_exists(&self, id: NodeId) -> bool {
2552 self.get_node(id).is_some()
2553 }
2554
2555 #[must_use]
2557 pub fn edge_exists(&self, id: EdgeId) -> bool {
2558 self.get_edge(id).is_some()
2559 }
2560
2561 #[must_use]
2565 pub fn get_degree(&self, node: NodeId) -> (usize, usize) {
2566 let out = self.store.out_degree(node);
2567 let in_degree = self.store.in_degree(node);
2568 (out, in_degree)
2569 }
2570
2571 #[must_use]
2581 pub fn get_nodes_batch(&self, ids: &[NodeId]) -> Vec<Option<Node>> {
2582 let (epoch, tx_id) = self.get_transaction_context();
2583 let tx = tx_id.unwrap_or(TxId::SYSTEM);
2584 ids.iter()
2585 .map(|&id| self.store.get_node_versioned(id, epoch, tx))
2586 .collect()
2587 }
2588
2589 #[cfg(feature = "cdc")]
2593 pub fn history(
2594 &self,
2595 entity_id: impl Into<crate::cdc::EntityId>,
2596 ) -> Result<Vec<crate::cdc::ChangeEvent>> {
2597 Ok(self.cdc_log.history(entity_id.into()))
2598 }
2599
2600 #[cfg(feature = "cdc")]
2602 pub fn history_since(
2603 &self,
2604 entity_id: impl Into<crate::cdc::EntityId>,
2605 since_epoch: EpochId,
2606 ) -> Result<Vec<crate::cdc::ChangeEvent>> {
2607 Ok(self.cdc_log.history_since(entity_id.into(), since_epoch))
2608 }
2609
2610 #[cfg(feature = "cdc")]
2612 pub fn changes_between(
2613 &self,
2614 start_epoch: EpochId,
2615 end_epoch: EpochId,
2616 ) -> Result<Vec<crate::cdc::ChangeEvent>> {
2617 Ok(self.cdc_log.changes_between(start_epoch, end_epoch))
2618 }
2619}
2620
2621#[cfg(test)]
2622mod tests {
2623 use crate::database::GrafeoDB;
2624
2625 #[test]
2626 fn test_session_create_node() {
2627 let db = GrafeoDB::new_in_memory();
2628 let session = db.session();
2629
2630 let id = session.create_node(&["Person"]);
2631 assert!(id.is_valid());
2632 assert_eq!(db.node_count(), 1);
2633 }
2634
2635 #[test]
2636 fn test_session_transaction() {
2637 let db = GrafeoDB::new_in_memory();
2638 let mut session = db.session();
2639
2640 assert!(!session.in_transaction());
2641
2642 session.begin_tx().unwrap();
2643 assert!(session.in_transaction());
2644
2645 session.commit().unwrap();
2646 assert!(!session.in_transaction());
2647 }
2648
2649 #[test]
2650 fn test_session_transaction_context() {
2651 let db = GrafeoDB::new_in_memory();
2652 let mut session = db.session();
2653
2654 let (_epoch1, tx_id1) = session.get_transaction_context();
2656 assert!(tx_id1.is_none());
2657
2658 session.begin_tx().unwrap();
2660 let (epoch2, tx_id2) = session.get_transaction_context();
2661 assert!(tx_id2.is_some());
2662 let _ = epoch2; session.commit().unwrap();
2667 let (epoch3, tx_id3) = session.get_transaction_context();
2668 assert!(tx_id3.is_none());
2669 assert!(epoch3.as_u64() >= epoch2.as_u64());
2671 }
2672
2673 #[test]
2674 fn test_session_rollback() {
2675 let db = GrafeoDB::new_in_memory();
2676 let mut session = db.session();
2677
2678 session.begin_tx().unwrap();
2679 session.rollback().unwrap();
2680 assert!(!session.in_transaction());
2681 }
2682
2683 #[test]
2684 fn test_session_rollback_discards_versions() {
2685 use grafeo_common::types::TxId;
2686
2687 let db = GrafeoDB::new_in_memory();
2688
2689 let node_before = db.store().create_node(&["Person"]);
2691 assert!(node_before.is_valid());
2692 assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");
2693
2694 let mut session = db.session();
2696 session.begin_tx().unwrap();
2697 let tx_id = session.current_tx.lock().unwrap();
2698
2699 let epoch = db.store().current_epoch();
2701 let node_in_tx = db.store().create_node_versioned(&["Person"], epoch, tx_id);
2702 assert!(node_in_tx.is_valid());
2703
2704 assert_eq!(db.node_count(), 2, "Should have 2 nodes during transaction");
2706
2707 session.rollback().unwrap();
2709 assert!(!session.in_transaction());
2710
2711 let count_after = db.node_count();
2714 assert_eq!(
2715 count_after, 1,
2716 "Rollback should discard uncommitted node, but got {count_after}"
2717 );
2718
2719 let current_epoch = db.store().current_epoch();
2721 assert!(
2722 db.store()
2723 .get_node_versioned(node_before, current_epoch, TxId::SYSTEM)
2724 .is_some(),
2725 "Original node should still exist"
2726 );
2727
2728 assert!(
2730 db.store()
2731 .get_node_versioned(node_in_tx, current_epoch, TxId::SYSTEM)
2732 .is_none(),
2733 "Transaction node should be gone"
2734 );
2735 }
2736
2737 #[test]
2738 fn test_session_create_node_in_transaction() {
2739 let db = GrafeoDB::new_in_memory();
2741
2742 let node_before = db.create_node(&["Person"]);
2744 assert!(node_before.is_valid());
2745 assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");
2746
2747 let mut session = db.session();
2749 session.begin_tx().unwrap();
2750
2751 let node_in_tx = session.create_node(&["Person"]);
2753 assert!(node_in_tx.is_valid());
2754
2755 assert_eq!(db.node_count(), 2, "Should have 2 nodes during transaction");
2757
2758 session.rollback().unwrap();
2760
2761 let count_after = db.node_count();
2763 assert_eq!(
2764 count_after, 1,
2765 "Rollback should discard node created via session.create_node(), but got {count_after}"
2766 );
2767 }
2768
2769 #[test]
2770 fn test_session_create_node_with_props_in_transaction() {
2771 use grafeo_common::types::Value;
2772
2773 let db = GrafeoDB::new_in_memory();
2775
2776 db.create_node(&["Person"]);
2778 assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");
2779
2780 let mut session = db.session();
2782 session.begin_tx().unwrap();
2783
2784 let node_in_tx =
2785 session.create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))]);
2786 assert!(node_in_tx.is_valid());
2787
2788 assert_eq!(db.node_count(), 2, "Should have 2 nodes during transaction");
2790
2791 session.rollback().unwrap();
2793
2794 let count_after = db.node_count();
2796 assert_eq!(
2797 count_after, 1,
2798 "Rollback should discard node created via session.create_node_with_props()"
2799 );
2800 }
2801
/// Tests that run GQL text through `session.execute` against an in-memory database.
#[cfg(feature = "gql")]
mod gql_tests {
    use super::*;
    use grafeo_common::types::Value;

    /// A label-filtered `MATCH` returns only the nodes carrying that label.
    #[test]
    fn test_gql_query_execution() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        // Two matching nodes plus one node with a different label.
        session.create_node(&["Person"]);
        session.create_node(&["Person"]);
        session.create_node(&["Animal"]);

        let result = session.execute("MATCH (n:Person) RETURN n").unwrap();

        assert_eq!(result.row_count(), 2);
        assert_eq!(result.column_count(), 1);
        assert_eq!(result.columns[0], "n");
    }

    /// Matching against an empty database succeeds and yields zero rows.
    #[test]
    fn test_gql_empty_result() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        let result = session.execute("MATCH (n:Person) RETURN n").unwrap();

        assert_eq!(result.row_count(), 0);
    }

    /// A query with an unclosed node pattern is reported as an error.
    #[test]
    fn test_gql_parse_error() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        // Missing closing parenthesis after `n`.
        let result = session.execute("MATCH (n RETURN n");

        assert!(result.is_err());
    }

    /// A one-hop pattern returns one row per matching edge.
    #[test]
    fn test_gql_relationship_traversal() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        let alix = session.create_node(&["Person"]);
        let gus = session.create_node(&["Person"]);
        let vincent = session.create_node(&["Person"]);

        session.create_edge(alix, gus, "KNOWS");
        session.create_edge(alix, vincent, "KNOWS");

        let result = session
            .execute("MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b")
            .unwrap();

        assert_eq!(result.row_count(), 2);
        assert_eq!(result.column_count(), 2);
        assert_eq!(result.columns[0], "a");
        assert_eq!(result.columns[1], "b");
    }

    /// An edge-type filter in the pattern excludes edges of other types.
    #[test]
    fn test_gql_relationship_with_type_filter() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        let alix = session.create_node(&["Person"]);
        let gus = session.create_node(&["Person"]);
        let vincent = session.create_node(&["Person"]);

        // Only the first edge has the type the query asks for.
        session.create_edge(alix, gus, "KNOWS");
        session.create_edge(alix, vincent, "WORKS_WITH");

        let result = session
            .execute("MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b")
            .unwrap();

        assert_eq!(result.row_count(), 1);
    }

    /// Returning a variable that was never bound fails with an
    /// "Undefined variable" error.
    #[test]
    fn test_gql_semantic_error_undefined_variable() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        // `x` is not bound anywhere in the query; `expect_err` both asserts the
        // failure and hands back the error for inspection.
        let err = session
            .execute("MATCH (n:Person) RETURN x")
            .expect_err("Expected error");

        assert!(
            err.to_string().contains("Undefined variable"),
            "Expected undefined variable error, got: {err}"
        );
    }

    /// A `WHERE` comparison on a numeric property keeps only matching nodes.
    #[test]
    fn test_gql_where_clause_property_filter() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        // Ages 25, 35 and 45: two of them are greater than 30.
        session.create_node_with_props(&["Person"], [("age", Value::Int64(25))]);
        session.create_node_with_props(&["Person"], [("age", Value::Int64(35))]);
        session.create_node_with_props(&["Person"], [("age", Value::Int64(45))]);

        let result = session
            .execute("MATCH (n:Person) WHERE n.age > 30 RETURN n")
            .unwrap();

        assert_eq!(result.row_count(), 2);
    }

    /// A `WHERE` equality on a string property keeps only matching nodes.
    #[test]
    fn test_gql_where_clause_equality() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        // Two nodes named "Alix" and one named "Gus".
        session.create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))]);
        session.create_node_with_props(&["Person"], [("name", Value::String("Gus".into()))]);
        session.create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))]);

        let result = session
            .execute("MATCH (n:Person) WHERE n.name = \"Alix\" RETURN n")
            .unwrap();

        assert_eq!(result.row_count(), 2);
    }

    /// Property-access expressions in `RETURN` produce one column each, named
    /// after the expression text, and carry the stored values.
    #[test]
    fn test_gql_return_property_access() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        session.create_node_with_props(
            &["Person"],
            [
                ("name", Value::String("Alix".into())),
                ("age", Value::Int64(30)),
            ],
        );
        session.create_node_with_props(
            &["Person"],
            [
                ("name", Value::String("Gus".into())),
                ("age", Value::Int64(25)),
            ],
        );

        let result = session
            .execute("MATCH (n:Person) RETURN n.name, n.age")
            .unwrap();

        assert_eq!(result.row_count(), 2);
        assert_eq!(result.column_count(), 2);
        assert_eq!(result.columns[0], "n.name");
        assert_eq!(result.columns[1], "n.age");

        // Row order is not asserted; only check that both names are present.
        let names: Vec<&Value> = result.rows.iter().map(|row| &row[0]).collect();
        assert!(names.contains(&&Value::String("Alix".into())));
        assert!(names.contains(&&Value::String("Gus".into())));
    }

    /// A `RETURN` list may mix a whole-node column with a property column.
    #[test]
    fn test_gql_return_mixed_expressions() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        session.create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))]);

        let result = session
            .execute("MATCH (n:Person) RETURN n, n.name")
            .unwrap();

        assert_eq!(result.row_count(), 1);
        assert_eq!(result.column_count(), 2);
        assert_eq!(result.columns[0], "n");
        assert_eq!(result.columns[1], "n.name");

        // The second column holds the property value itself.
        assert_eq!(result.rows[0][1], Value::String("Alix".into()));
    }
}
3020
/// Tests that run Cypher text through `session.execute_cypher` against an
/// in-memory database.
#[cfg(feature = "cypher")]
mod cypher_tests {
    use super::*;

    /// A label-filtered `MATCH` returns only the nodes carrying that label.
    #[test]
    fn test_cypher_query_execution() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        // Two matching nodes plus one node with a different label.
        session.create_node(&["Person"]);
        session.create_node(&["Person"]);
        session.create_node(&["Animal"]);

        let people = session.execute_cypher("MATCH (n:Person) RETURN n").unwrap();

        assert_eq!(people.row_count(), 2);
        assert_eq!(people.column_count(), 1);
        assert_eq!(people.columns[0], "n");
    }

    /// Matching against an empty database succeeds and yields zero rows.
    #[test]
    fn test_cypher_empty_result() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        let people = session.execute_cypher("MATCH (n:Person) RETURN n").unwrap();

        assert_eq!(people.row_count(), 0);
    }

    /// A query with an unclosed node pattern is reported as an error.
    #[test]
    fn test_cypher_parse_error() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        // Missing closing parenthesis after `n`.
        let outcome = session.execute_cypher("MATCH (n RETURN n");

        assert!(outcome.is_err());
    }
}
3066
3067 mod direct_lookup_tests {
3070 use super::*;
3071 use grafeo_common::types::Value;
3072
3073 #[test]
3074 fn test_get_node() {
3075 let db = GrafeoDB::new_in_memory();
3076 let session = db.session();
3077
3078 let id = session.create_node(&["Person"]);
3079 let node = session.get_node(id);
3080
3081 assert!(node.is_some());
3082 let node = node.unwrap();
3083 assert_eq!(node.id, id);
3084 }
3085
3086 #[test]
3087 fn test_get_node_not_found() {
3088 use grafeo_common::types::NodeId;
3089
3090 let db = GrafeoDB::new_in_memory();
3091 let session = db.session();
3092
3093 let node = session.get_node(NodeId::new(9999));
3095 assert!(node.is_none());
3096 }
3097
3098 #[test]
3099 fn test_get_node_property() {
3100 let db = GrafeoDB::new_in_memory();
3101 let session = db.session();
3102
3103 let id = session
3104 .create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))]);
3105
3106 let name = session.get_node_property(id, "name");
3107 assert_eq!(name, Some(Value::String("Alix".into())));
3108
3109 let missing = session.get_node_property(id, "missing");
3111 assert!(missing.is_none());
3112 }
3113
3114 #[test]
3115 fn test_get_edge() {
3116 let db = GrafeoDB::new_in_memory();
3117 let session = db.session();
3118
3119 let alix = session.create_node(&["Person"]);
3120 let gus = session.create_node(&["Person"]);
3121 let edge_id = session.create_edge(alix, gus, "KNOWS");
3122
3123 let edge = session.get_edge(edge_id);
3124 assert!(edge.is_some());
3125 let edge = edge.unwrap();
3126 assert_eq!(edge.id, edge_id);
3127 assert_eq!(edge.src, alix);
3128 assert_eq!(edge.dst, gus);
3129 }
3130
3131 #[test]
3132 fn test_get_edge_not_found() {
3133 use grafeo_common::types::EdgeId;
3134
3135 let db = GrafeoDB::new_in_memory();
3136 let session = db.session();
3137
3138 let edge = session.get_edge(EdgeId::new(9999));
3139 assert!(edge.is_none());
3140 }
3141
3142 #[test]
3143 fn test_get_neighbors_outgoing() {
3144 let db = GrafeoDB::new_in_memory();
3145 let session = db.session();
3146
3147 let alix = session.create_node(&["Person"]);
3148 let gus = session.create_node(&["Person"]);
3149 let harm = session.create_node(&["Person"]);
3150
3151 session.create_edge(alix, gus, "KNOWS");
3152 session.create_edge(alix, harm, "KNOWS");
3153
3154 let neighbors = session.get_neighbors_outgoing(alix);
3155 assert_eq!(neighbors.len(), 2);
3156
3157 let neighbor_ids: Vec<_> = neighbors.iter().map(|(node_id, _)| *node_id).collect();
3158 assert!(neighbor_ids.contains(&gus));
3159 assert!(neighbor_ids.contains(&harm));
3160 }
3161
3162 #[test]
3163 fn test_get_neighbors_incoming() {
3164 let db = GrafeoDB::new_in_memory();
3165 let session = db.session();
3166
3167 let alix = session.create_node(&["Person"]);
3168 let gus = session.create_node(&["Person"]);
3169 let harm = session.create_node(&["Person"]);
3170
3171 session.create_edge(gus, alix, "KNOWS");
3172 session.create_edge(harm, alix, "KNOWS");
3173
3174 let neighbors = session.get_neighbors_incoming(alix);
3175 assert_eq!(neighbors.len(), 2);
3176
3177 let neighbor_ids: Vec<_> = neighbors.iter().map(|(node_id, _)| *node_id).collect();
3178 assert!(neighbor_ids.contains(&gus));
3179 assert!(neighbor_ids.contains(&harm));
3180 }
3181
3182 #[test]
3183 fn test_get_neighbors_outgoing_by_type() {
3184 let db = GrafeoDB::new_in_memory();
3185 let session = db.session();
3186
3187 let alix = session.create_node(&["Person"]);
3188 let gus = session.create_node(&["Person"]);
3189 let company = session.create_node(&["Company"]);
3190
3191 session.create_edge(alix, gus, "KNOWS");
3192 session.create_edge(alix, company, "WORKS_AT");
3193
3194 let knows_neighbors = session.get_neighbors_outgoing_by_type(alix, "KNOWS");
3195 assert_eq!(knows_neighbors.len(), 1);
3196 assert_eq!(knows_neighbors[0].0, gus);
3197
3198 let works_neighbors = session.get_neighbors_outgoing_by_type(alix, "WORKS_AT");
3199 assert_eq!(works_neighbors.len(), 1);
3200 assert_eq!(works_neighbors[0].0, company);
3201
3202 let no_neighbors = session.get_neighbors_outgoing_by_type(alix, "LIKES");
3204 assert!(no_neighbors.is_empty());
3205 }
3206
3207 #[test]
3208 fn test_node_exists() {
3209 use grafeo_common::types::NodeId;
3210
3211 let db = GrafeoDB::new_in_memory();
3212 let session = db.session();
3213
3214 let id = session.create_node(&["Person"]);
3215
3216 assert!(session.node_exists(id));
3217 assert!(!session.node_exists(NodeId::new(9999)));
3218 }
3219
3220 #[test]
3221 fn test_edge_exists() {
3222 use grafeo_common::types::EdgeId;
3223
3224 let db = GrafeoDB::new_in_memory();
3225 let session = db.session();
3226
3227 let alix = session.create_node(&["Person"]);
3228 let gus = session.create_node(&["Person"]);
3229 let edge_id = session.create_edge(alix, gus, "KNOWS");
3230
3231 assert!(session.edge_exists(edge_id));
3232 assert!(!session.edge_exists(EdgeId::new(9999)));
3233 }
3234
3235 #[test]
3236 fn test_get_degree() {
3237 let db = GrafeoDB::new_in_memory();
3238 let session = db.session();
3239
3240 let alix = session.create_node(&["Person"]);
3241 let gus = session.create_node(&["Person"]);
3242 let harm = session.create_node(&["Person"]);
3243
3244 session.create_edge(alix, gus, "KNOWS");
3246 session.create_edge(alix, harm, "KNOWS");
3247 session.create_edge(gus, alix, "KNOWS");
3249
3250 let (out_degree, in_degree) = session.get_degree(alix);
3251 assert_eq!(out_degree, 2);
3252 assert_eq!(in_degree, 1);
3253
3254 let lonely = session.create_node(&["Person"]);
3256 let (out, in_deg) = session.get_degree(lonely);
3257 assert_eq!(out, 0);
3258 assert_eq!(in_deg, 0);
3259 }
3260
3261 #[test]
3262 fn test_get_nodes_batch() {
3263 let db = GrafeoDB::new_in_memory();
3264 let session = db.session();
3265
3266 let alix = session.create_node(&["Person"]);
3267 let gus = session.create_node(&["Person"]);
3268 let harm = session.create_node(&["Person"]);
3269
3270 let nodes = session.get_nodes_batch(&[alix, gus, harm]);
3271 assert_eq!(nodes.len(), 3);
3272 assert!(nodes[0].is_some());
3273 assert!(nodes[1].is_some());
3274 assert!(nodes[2].is_some());
3275
3276 use grafeo_common::types::NodeId;
3278 let nodes_with_missing = session.get_nodes_batch(&[alix, NodeId::new(9999), harm]);
3279 assert_eq!(nodes_with_missing.len(), 3);
3280 assert!(nodes_with_missing[0].is_some());
3281 assert!(nodes_with_missing[1].is_none()); assert!(nodes_with_missing[2].is_some());
3283 }
3284
3285 #[test]
3286 fn test_auto_commit_setting() {
3287 let db = GrafeoDB::new_in_memory();
3288 let mut session = db.session();
3289
3290 assert!(session.auto_commit());
3292
3293 session.set_auto_commit(false);
3294 assert!(!session.auto_commit());
3295
3296 session.set_auto_commit(true);
3297 assert!(session.auto_commit());
3298 }
3299
3300 #[test]
3301 fn test_transaction_double_begin_error() {
3302 let db = GrafeoDB::new_in_memory();
3303 let mut session = db.session();
3304
3305 session.begin_tx().unwrap();
3306 let result = session.begin_tx();
3307
3308 assert!(result.is_err());
3309 session.rollback().unwrap();
3311 }
3312
3313 #[test]
3314 fn test_commit_without_transaction_error() {
3315 let db = GrafeoDB::new_in_memory();
3316 let mut session = db.session();
3317
3318 let result = session.commit();
3319 assert!(result.is_err());
3320 }
3321
3322 #[test]
3323 fn test_rollback_without_transaction_error() {
3324 let db = GrafeoDB::new_in_memory();
3325 let mut session = db.session();
3326
3327 let result = session.rollback();
3328 assert!(result.is_err());
3329 }
3330
3331 #[test]
3332 fn test_create_edge_in_transaction() {
3333 let db = GrafeoDB::new_in_memory();
3334 let mut session = db.session();
3335
3336 let alix = session.create_node(&["Person"]);
3338 let gus = session.create_node(&["Person"]);
3339
3340 session.begin_tx().unwrap();
3342 let edge_id = session.create_edge(alix, gus, "KNOWS");
3343
3344 assert!(session.edge_exists(edge_id));
3346
3347 session.commit().unwrap();
3349
3350 assert!(session.edge_exists(edge_id));
3352 }
3353
3354 #[test]
3355 fn test_neighbors_empty_node() {
3356 let db = GrafeoDB::new_in_memory();
3357 let session = db.session();
3358
3359 let lonely = session.create_node(&["Person"]);
3360
3361 assert!(session.get_neighbors_outgoing(lonely).is_empty());
3362 assert!(session.get_neighbors_incoming(lonely).is_empty());
3363 assert!(
3364 session
3365 .get_neighbors_outgoing_by_type(lonely, "KNOWS")
3366 .is_empty()
3367 );
3368 }
3369 }
3370
/// With `gc_interval` configured to 2, two consecutive commits complete
/// successfully and both committed nodes are still counted afterwards.
#[test]
fn test_auto_gc_triggers_on_commit_interval() {
    use crate::config::Config;

    let db = GrafeoDB::with_config(Config::in_memory().with_gc_interval(2)).unwrap();
    let mut session = db.session();

    // First commit.
    session.begin_tx().unwrap();
    session.create_node(&["A"]);
    session.commit().unwrap();

    // Second commit, which reaches the configured interval.
    session.begin_tx().unwrap();
    session.create_node(&["B"]);
    session.commit().unwrap();

    let surviving_nodes = db.node_count();
    assert_eq!(surviving_nodes, 2);
}
3392
/// A query timeout set on the database config results in sessions that
/// report a query deadline.
#[test]
fn test_query_timeout_config_propagates_to_session() {
    use crate::config::Config;
    use std::time::Duration;

    let timeout = Duration::from_secs(5);
    let db = GrafeoDB::with_config(Config::in_memory().with_query_timeout(timeout)).unwrap();
    let session = db.session();

    let deadline = session.query_deadline();
    assert!(deadline.is_some());
}
3405
/// A session from a default in-memory database reports no query deadline.
#[test]
fn test_no_query_timeout_returns_no_deadline() {
    let db = GrafeoDB::new_in_memory();
    let session = db.session();

    let deadline = session.query_deadline();
    assert!(deadline.is_none());
}
3414
/// A session from a default in-memory database reports the LPG graph model.
#[test]
fn test_graph_model_accessor() {
    use crate::config::GraphModel;

    let db = GrafeoDB::new_in_memory();
    let session = db.session();

    let model = session.graph_model();
    assert_eq!(model, GraphModel::Lpg);
}
3424
/// A database built with `GrafeoDB::with_store` over a caller-supplied store
/// can insert a node through GQL and read it back.
#[cfg(feature = "gql")]
#[test]
fn test_external_store_session() {
    use grafeo_core::graph::GraphStoreMut;
    use std::sync::Arc;

    // Hand the database a store as a trait object rather than letting it build one.
    let store: Arc<dyn GraphStoreMut> =
        Arc::new(grafeo_core::graph::lpg::LpgStore::new().unwrap());
    let config = crate::config::Config::in_memory();
    let db = GrafeoDB::with_store(store, config).unwrap();

    let session = db.session();

    // Write through the session ...
    session.execute("INSERT (:Test {name: 'hello'})").unwrap();

    // ... and read the node back through the same session.
    let inserted = session.execute("MATCH (n:Test) RETURN n.name").unwrap();
    assert_eq!(inserted.row_count(), 1);
}
3445
/// Tests for session-level GQL commands issued through `session.execute`:
/// graph selection, session parameters, graph DDL and transaction control.
#[cfg(feature = "gql")]
mod session_command_tests {
    use super::*;

    /// `USE GRAPH` on an existing graph makes it the session's current graph.
    #[test]
    fn test_use_graph_sets_current_graph() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        // The graph has to be created before it can be selected.
        for statement in ["CREATE GRAPH mydb", "USE GRAPH mydb"] {
            session.execute(statement).unwrap();
        }

        assert_eq!(session.current_graph().as_deref(), Some("mydb"));
    }

    /// `USE GRAPH` on a graph that was never created fails with a
    /// "does not exist" error.
    #[test]
    fn test_use_graph_nonexistent_errors() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        let err = session
            .execute("USE GRAPH doesnotexist")
            .unwrap_err()
            .to_string();
        assert!(
            err.contains("does not exist"),
            "Expected 'does not exist' error, got: {err}"
        );
    }

    /// `USE GRAPH default` succeeds without any prior `CREATE GRAPH`.
    #[test]
    fn test_use_graph_default_always_valid() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        session.execute("USE GRAPH default").unwrap();

        assert_eq!(session.current_graph().as_deref(), Some("default"));
    }

    /// `SESSION SET GRAPH` updates the session's current graph.
    #[test]
    fn test_session_set_graph() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        session.execute("SESSION SET GRAPH analytics").unwrap();

        assert_eq!(session.current_graph().as_deref(), Some("analytics"));
    }

    /// `SESSION SET TIME ZONE` stores the zone and a later call overwrites it.
    #[test]
    fn test_session_set_time_zone() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        // Nothing is set on a fresh session.
        assert!(session.time_zone().is_none());

        session.execute("SESSION SET TIME ZONE 'UTC'").unwrap();
        assert_eq!(session.time_zone().as_deref(), Some("UTC"));

        session
            .execute("SESSION SET TIME ZONE 'America/New_York'")
            .unwrap();
        assert_eq!(session.time_zone().as_deref(), Some("America/New_York"));
    }

    /// `SESSION SET PARAMETER` makes the parameter retrievable by name.
    #[test]
    fn test_session_set_parameter() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        session
            .execute("SESSION SET PARAMETER $timeout = 30")
            .unwrap();

        // Only presence is checked here, not the stored value.
        let stored = session.get_parameter("timeout");
        assert!(stored.is_some());
    }

    /// `SESSION RESET` clears the current graph, the time zone and parameters.
    #[test]
    fn test_session_reset_clears_all_state() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        // Populate every piece of session state first.
        for statement in [
            "SESSION SET GRAPH analytics",
            "SESSION SET TIME ZONE 'UTC'",
            "SESSION SET PARAMETER $limit = 100",
        ] {
            session.execute(statement).unwrap();
        }

        assert!(session.current_graph().is_some());
        assert!(session.time_zone().is_some());
        assert!(session.get_parameter("limit").is_some());

        session.execute("SESSION RESET").unwrap();

        assert!(session.current_graph().is_none());
        assert!(session.time_zone().is_none());
        assert!(session.get_parameter("limit").is_none());
    }

    /// `SESSION CLOSE` clears the current graph and the time zone.
    #[test]
    fn test_session_close_clears_state() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        for statement in [
            "SESSION SET GRAPH analytics",
            "SESSION SET TIME ZONE 'UTC'",
            "SESSION CLOSE",
        ] {
            session.execute(statement).unwrap();
        }

        assert!(session.current_graph().is_none());
        assert!(session.time_zone().is_none());
    }

    /// A graph created with `CREATE GRAPH` can afterwards be selected.
    #[test]
    fn test_create_graph() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        session.execute("CREATE GRAPH mydb").unwrap();

        // Selecting the graph succeeds only if the creation took effect.
        session.execute("USE GRAPH mydb").unwrap();
        assert_eq!(session.current_graph().as_deref(), Some("mydb"));
    }

    /// Creating the same graph twice fails with an "already exists" error.
    #[test]
    fn test_create_graph_duplicate_errors() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        session.execute("CREATE GRAPH mydb").unwrap();

        let err = session
            .execute("CREATE GRAPH mydb")
            .unwrap_err()
            .to_string();
        assert!(
            err.contains("already exists"),
            "Expected 'already exists' error, got: {err}"
        );
    }

    /// `CREATE GRAPH IF NOT EXISTS` succeeds when the graph already exists.
    #[test]
    fn test_create_graph_if_not_exists() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        for statement in ["CREATE GRAPH mydb", "CREATE GRAPH IF NOT EXISTS mydb"] {
            session.execute(statement).unwrap();
        }
    }

    /// After `DROP GRAPH`, the graph can no longer be selected.
    #[test]
    fn test_drop_graph() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        for statement in ["CREATE GRAPH mydb", "DROP GRAPH mydb"] {
            session.execute(statement).unwrap();
        }

        let outcome = session.execute("USE GRAPH mydb");
        assert!(outcome.is_err());
    }

    /// Dropping a graph that was never created fails with a
    /// "does not exist" error.
    #[test]
    fn test_drop_graph_nonexistent_errors() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        let err = session
            .execute("DROP GRAPH nosuchgraph")
            .unwrap_err()
            .to_string();
        assert!(
            err.contains("does not exist"),
            "Expected 'does not exist' error, got: {err}"
        );
    }

    /// `DROP GRAPH IF EXISTS` succeeds when the graph does not exist.
    #[test]
    fn test_drop_graph_if_exists() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        session.execute("DROP GRAPH IF EXISTS nosuchgraph").unwrap();
    }

    /// `START TRANSACTION` / `COMMIT` issued as GQL text open and close a
    /// transaction, and the inserted node is readable afterwards.
    #[test]
    fn test_start_transaction_via_gql() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        session.execute("START TRANSACTION").unwrap();
        assert!(session.in_transaction());

        session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
        session.execute("COMMIT").unwrap();
        assert!(!session.in_transaction());

        let people = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
        assert_eq!(people.rows.len(), 1);
    }

    /// An `INSERT` inside a `READ ONLY` transaction fails with a "read-only" error.
    #[test]
    fn test_start_transaction_read_only_blocks_insert() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        session.execute("START TRANSACTION READ ONLY").unwrap();

        let err = session
            .execute("INSERT (:Person {name: 'Alix'})")
            .unwrap_err()
            .to_string();
        assert!(
            err.contains("read-only"),
            "Expected read-only error, got: {err}"
        );

        // Close the transaction that is still open.
        session.execute("ROLLBACK").unwrap();
    }

    /// A `MATCH` inside a `READ ONLY` transaction sees previously committed data.
    #[test]
    fn test_start_transaction_read_only_allows_reads() {
        let db = GrafeoDB::new_in_memory();
        let mut session = db.session();

        // Seed one node in a regular transaction via the Rust API.
        session.begin_tx().unwrap();
        session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
        session.commit().unwrap();

        session.execute("START TRANSACTION READ ONLY").unwrap();
        let people = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
        assert_eq!(people.rows.len(), 1);
        session.execute("COMMIT").unwrap();
    }

    /// `ROLLBACK` issued as GQL text discards the node inserted in the transaction.
    #[test]
    fn test_rollback_via_gql() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        for statement in [
            "START TRANSACTION",
            "INSERT (:Person {name: 'Alix'})",
            "ROLLBACK",
        ] {
            session.execute(statement).unwrap();
        }

        let people = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
        assert!(people.rows.is_empty());
    }

    /// `START TRANSACTION` accepts an `ISOLATION LEVEL SERIALIZABLE` clause
    /// and opens a transaction.
    #[test]
    fn test_start_transaction_with_isolation_level() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        session
            .execute("START TRANSACTION ISOLATION LEVEL SERIALIZABLE")
            .unwrap();
        assert!(session.in_transaction());

        session.execute("ROLLBACK").unwrap();
    }

    /// A session command returns a result with no rows and no columns.
    #[test]
    fn test_session_commands_return_empty_result() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        let outcome = session.execute("SESSION SET GRAPH test").unwrap();

        assert_eq!(outcome.row_count(), 0);
        assert_eq!(outcome.column_count(), 0);
    }

    /// A fresh session has no current graph.
    #[test]
    fn test_current_graph_default_is_none() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        assert!(session.current_graph().is_none());
    }

    /// A fresh session has no time zone.
    #[test]
    fn test_time_zone_default_is_none() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        assert!(session.time_zone().is_none());
    }

    /// Two sessions on the same database keep separate current-graph settings.
    #[test]
    fn test_session_state_independent_across_sessions() {
        let db = GrafeoDB::new_in_memory();
        let first_session = db.session();
        let second_session = db.session();

        first_session.execute("SESSION SET GRAPH first").unwrap();
        second_session.execute("SESSION SET GRAPH second").unwrap();

        assert_eq!(first_session.current_graph().as_deref(), Some("first"));
        assert_eq!(second_session.current_graph().as_deref(), Some("second"));
    }
}
3750}