1use std::sync::Arc;
8use std::sync::atomic::{AtomicUsize, Ordering};
9use std::time::{Duration, Instant};
10
11use grafeo_common::types::{EdgeId, EpochId, NodeId, TxId, Value};
12use grafeo_common::utils::error::Result;
13use grafeo_core::graph::Direction;
14use grafeo_core::graph::GraphStoreMut;
15use grafeo_core::graph::lpg::{Edge, LpgStore, Node};
16#[cfg(feature = "rdf")]
17use grafeo_core::graph::rdf::RdfStore;
18
19use crate::catalog::{Catalog, CatalogConstraintValidator};
20use crate::config::{AdaptiveConfig, GraphModel};
21use crate::database::QueryResult;
22use crate::query::cache::QueryCache;
23use crate::transaction::TransactionManager;
24
25pub struct Session {
31 store: Arc<LpgStore>,
33 graph_store: Arc<dyn GraphStoreMut>,
35 catalog: Arc<Catalog>,
37 #[cfg(feature = "rdf")]
39 rdf_store: Arc<RdfStore>,
40 tx_manager: Arc<TransactionManager>,
42 query_cache: Arc<QueryCache>,
44 current_tx: parking_lot::Mutex<Option<TxId>>,
48 read_only_tx: parking_lot::Mutex<bool>,
50 auto_commit: bool,
52 #[allow(dead_code)]
54 adaptive_config: AdaptiveConfig,
55 factorized_execution: bool,
57 graph_model: GraphModel,
59 query_timeout: Option<Duration>,
61 commit_counter: Arc<AtomicUsize>,
63 gc_interval: usize,
65 tx_start_node_count: AtomicUsize,
67 tx_start_edge_count: AtomicUsize,
69 #[cfg(feature = "wal")]
71 wal: Option<Arc<grafeo_adapters::storage::wal::LpgWal>>,
72 #[cfg(feature = "cdc")]
74 cdc_log: Arc<crate::cdc::CdcLog>,
75 current_graph: parking_lot::Mutex<Option<String>>,
77 time_zone: parking_lot::Mutex<Option<String>>,
79 session_params:
81 parking_lot::Mutex<std::collections::HashMap<String, grafeo_common::types::Value>>,
82 viewing_epoch_override: parking_lot::Mutex<Option<EpochId>>,
84}
85
86impl Session {
87 #[allow(dead_code, clippy::too_many_arguments)]
89 pub(crate) fn with_adaptive(
90 store: Arc<LpgStore>,
91 tx_manager: Arc<TransactionManager>,
92 query_cache: Arc<QueryCache>,
93 catalog: Arc<Catalog>,
94 adaptive_config: AdaptiveConfig,
95 factorized_execution: bool,
96 graph_model: GraphModel,
97 query_timeout: Option<Duration>,
98 commit_counter: Arc<AtomicUsize>,
99 gc_interval: usize,
100 ) -> Self {
101 let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
102 Self {
103 store,
104 graph_store,
105 catalog,
106 #[cfg(feature = "rdf")]
107 rdf_store: Arc::new(RdfStore::new()),
108 tx_manager,
109 query_cache,
110 current_tx: parking_lot::Mutex::new(None),
111 read_only_tx: parking_lot::Mutex::new(false),
112 auto_commit: true,
113 adaptive_config,
114 factorized_execution,
115 graph_model,
116 query_timeout,
117 commit_counter,
118 gc_interval,
119 tx_start_node_count: AtomicUsize::new(0),
120 tx_start_edge_count: AtomicUsize::new(0),
121 #[cfg(feature = "wal")]
122 wal: None,
123 #[cfg(feature = "cdc")]
124 cdc_log: Arc::new(crate::cdc::CdcLog::new()),
125 current_graph: parking_lot::Mutex::new(None),
126 time_zone: parking_lot::Mutex::new(None),
127 session_params: parking_lot::Mutex::new(std::collections::HashMap::new()),
128 viewing_epoch_override: parking_lot::Mutex::new(None),
129 }
130 }
131
132 #[cfg(feature = "wal")]
134 pub(crate) fn set_wal(&mut self, wal: Arc<grafeo_adapters::storage::wal::LpgWal>) {
135 self.wal = Some(wal);
136 }
137
138 #[cfg(feature = "cdc")]
140 pub(crate) fn set_cdc_log(&mut self, cdc_log: Arc<crate::cdc::CdcLog>) {
141 self.cdc_log = cdc_log;
142 }
143
144 #[cfg(feature = "rdf")]
146 #[allow(clippy::too_many_arguments)]
147 pub(crate) fn with_rdf_store_and_adaptive(
148 store: Arc<LpgStore>,
149 rdf_store: Arc<RdfStore>,
150 tx_manager: Arc<TransactionManager>,
151 query_cache: Arc<QueryCache>,
152 catalog: Arc<Catalog>,
153 adaptive_config: AdaptiveConfig,
154 factorized_execution: bool,
155 graph_model: GraphModel,
156 query_timeout: Option<Duration>,
157 commit_counter: Arc<AtomicUsize>,
158 gc_interval: usize,
159 ) -> Self {
160 let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
161 Self {
162 store,
163 graph_store,
164 catalog,
165 rdf_store,
166 tx_manager,
167 query_cache,
168 current_tx: parking_lot::Mutex::new(None),
169 read_only_tx: parking_lot::Mutex::new(false),
170 auto_commit: true,
171 adaptive_config,
172 factorized_execution,
173 graph_model,
174 query_timeout,
175 commit_counter,
176 gc_interval,
177 tx_start_node_count: AtomicUsize::new(0),
178 tx_start_edge_count: AtomicUsize::new(0),
179 #[cfg(feature = "wal")]
180 wal: None,
181 #[cfg(feature = "cdc")]
182 cdc_log: Arc::new(crate::cdc::CdcLog::new()),
183 current_graph: parking_lot::Mutex::new(None),
184 time_zone: parking_lot::Mutex::new(None),
185 session_params: parking_lot::Mutex::new(std::collections::HashMap::new()),
186 viewing_epoch_override: parking_lot::Mutex::new(None),
187 }
188 }
189
190 #[allow(clippy::too_many_arguments)]
195 pub(crate) fn with_external_store(
196 store: Arc<dyn GraphStoreMut>,
197 tx_manager: Arc<TransactionManager>,
198 query_cache: Arc<QueryCache>,
199 catalog: Arc<Catalog>,
200 adaptive_config: AdaptiveConfig,
201 factorized_execution: bool,
202 graph_model: GraphModel,
203 query_timeout: Option<Duration>,
204 commit_counter: Arc<AtomicUsize>,
205 gc_interval: usize,
206 ) -> Self {
207 Self {
208 store: Arc::new(LpgStore::new()), graph_store: store,
210 catalog,
211 #[cfg(feature = "rdf")]
212 rdf_store: Arc::new(RdfStore::new()),
213 tx_manager,
214 query_cache,
215 current_tx: parking_lot::Mutex::new(None),
216 read_only_tx: parking_lot::Mutex::new(false),
217 auto_commit: true,
218 adaptive_config,
219 factorized_execution,
220 graph_model,
221 query_timeout,
222 commit_counter,
223 gc_interval,
224 tx_start_node_count: AtomicUsize::new(0),
225 tx_start_edge_count: AtomicUsize::new(0),
226 #[cfg(feature = "wal")]
227 wal: None,
228 #[cfg(feature = "cdc")]
229 cdc_log: Arc::new(crate::cdc::CdcLog::new()),
230 current_graph: parking_lot::Mutex::new(None),
231 time_zone: parking_lot::Mutex::new(None),
232 session_params: parking_lot::Mutex::new(std::collections::HashMap::new()),
233 viewing_epoch_override: parking_lot::Mutex::new(None),
234 }
235 }
236
237 #[must_use]
239 pub fn graph_model(&self) -> GraphModel {
240 self.graph_model
241 }
242
243 pub fn use_graph(&self, name: &str) {
247 *self.current_graph.lock() = Some(name.to_string());
248 }
249
250 #[must_use]
252 pub fn current_graph(&self) -> Option<String> {
253 self.current_graph.lock().clone()
254 }
255
256 pub fn set_time_zone(&self, tz: &str) {
258 *self.time_zone.lock() = Some(tz.to_string());
259 }
260
261 #[must_use]
263 pub fn time_zone(&self) -> Option<String> {
264 self.time_zone.lock().clone()
265 }
266
267 pub fn set_parameter(&self, key: &str, value: grafeo_common::types::Value) {
269 self.session_params.lock().insert(key.to_string(), value);
270 }
271
272 #[must_use]
274 pub fn get_parameter(&self, key: &str) -> Option<grafeo_common::types::Value> {
275 self.session_params.lock().get(key).cloned()
276 }
277
278 pub fn reset_session(&self) {
280 *self.current_graph.lock() = None;
281 *self.time_zone.lock() = None;
282 self.session_params.lock().clear();
283 *self.viewing_epoch_override.lock() = None;
284 }
285
286 pub fn set_viewing_epoch(&self, epoch: EpochId) {
294 *self.viewing_epoch_override.lock() = Some(epoch);
295 }
296
297 pub fn clear_viewing_epoch(&self) {
299 *self.viewing_epoch_override.lock() = None;
300 }
301
302 #[must_use]
304 pub fn viewing_epoch(&self) -> Option<EpochId> {
305 *self.viewing_epoch_override.lock()
306 }
307
308 #[must_use]
312 pub fn get_node_history(&self, id: NodeId) -> Vec<(EpochId, Option<EpochId>, Node)> {
313 self.store.get_node_history(id)
314 }
315
316 #[must_use]
320 pub fn get_edge_history(&self, id: EdgeId) -> Vec<(EpochId, Option<EpochId>, Edge)> {
321 self.store.get_edge_history(id)
322 }
323
324 fn require_lpg(&self, language: &str) -> Result<()> {
326 if self.graph_model == GraphModel::Rdf {
327 return Err(grafeo_common::utils::error::Error::Internal(format!(
328 "This is an RDF database. {language} queries require an LPG database."
329 )));
330 }
331 Ok(())
332 }
333
334 #[cfg(feature = "gql")]
336 fn execute_session_command(
337 &self,
338 cmd: grafeo_adapters::query::gql::ast::SessionCommand,
339 ) -> Result<QueryResult> {
340 use grafeo_adapters::query::gql::ast::{SessionCommand, TransactionIsolationLevel};
341 use grafeo_common::utils::error::{Error, QueryError, QueryErrorKind};
342
343 match cmd {
344 SessionCommand::CreateGraph {
345 name,
346 if_not_exists,
347 typed,
348 } => {
349 let created = self.store.create_graph(&name);
350 if !created && !if_not_exists {
351 return Err(Error::Query(QueryError::new(
352 QueryErrorKind::Semantic,
353 format!("Graph '{name}' already exists"),
354 )));
355 }
356 if let Some(type_name) = typed
358 && let Err(e) = self.catalog.bind_graph_type(&name, type_name.clone())
359 {
360 return Err(Error::Query(QueryError::new(
361 QueryErrorKind::Semantic,
362 e.to_string(),
363 )));
364 }
365 Ok(QueryResult::empty())
366 }
367 SessionCommand::DropGraph { name, if_exists } => {
368 let dropped = self.store.drop_graph(&name);
369 if !dropped && !if_exists {
370 return Err(Error::Query(QueryError::new(
371 QueryErrorKind::Semantic,
372 format!("Graph '{name}' does not exist"),
373 )));
374 }
375 Ok(QueryResult::empty())
376 }
377 SessionCommand::UseGraph(name) => {
378 if !name.eq_ignore_ascii_case("default") && self.store.graph(&name).is_none() {
380 return Err(Error::Query(QueryError::new(
381 QueryErrorKind::Semantic,
382 format!("Graph '{name}' does not exist"),
383 )));
384 }
385 self.use_graph(&name);
386 Ok(QueryResult::empty())
387 }
388 SessionCommand::SessionSetGraph(name) => {
389 self.use_graph(&name);
390 Ok(QueryResult::empty())
391 }
392 SessionCommand::SessionSetTimeZone(tz) => {
393 self.set_time_zone(&tz);
394 Ok(QueryResult::empty())
395 }
396 SessionCommand::SessionSetParameter(key, expr) => {
397 if key.eq_ignore_ascii_case("viewing_epoch") {
398 match Self::eval_integer_literal(&expr) {
399 Some(n) if n >= 0 => {
400 self.set_viewing_epoch(EpochId::new(n as u64));
401 Ok(QueryResult::status(format!("Set viewing_epoch to {n}")))
402 }
403 _ => Err(Error::Query(QueryError::new(
404 QueryErrorKind::Semantic,
405 "viewing_epoch must be a non-negative integer literal",
406 ))),
407 }
408 } else {
409 self.set_parameter(&key, Value::Null);
412 Ok(QueryResult::empty())
413 }
414 }
415 SessionCommand::SessionReset => {
416 self.reset_session();
417 Ok(QueryResult::empty())
418 }
419 SessionCommand::SessionClose => {
420 self.reset_session();
421 Ok(QueryResult::empty())
422 }
423 SessionCommand::StartTransaction {
424 read_only,
425 isolation_level,
426 } => {
427 let engine_level = isolation_level.map(|l| match l {
428 TransactionIsolationLevel::ReadCommitted => {
429 crate::transaction::IsolationLevel::ReadCommitted
430 }
431 TransactionIsolationLevel::SnapshotIsolation => {
432 crate::transaction::IsolationLevel::SnapshotIsolation
433 }
434 TransactionIsolationLevel::Serializable => {
435 crate::transaction::IsolationLevel::Serializable
436 }
437 });
438 self.begin_tx_inner(read_only, engine_level)?;
439 Ok(QueryResult::status("Transaction started"))
440 }
441 SessionCommand::Commit => {
442 self.commit_inner()?;
443 Ok(QueryResult::status("Transaction committed"))
444 }
445 SessionCommand::Rollback => {
446 self.rollback_inner()?;
447 Ok(QueryResult::status("Transaction rolled back"))
448 }
449 }
450 }
451
452 #[cfg(feature = "wal")]
454 fn log_schema_wal(&self, record: &grafeo_adapters::storage::wal::WalRecord) {
455 if let Some(ref wal) = self.wal
456 && let Err(e) = wal.log(record)
457 {
458 tracing::warn!("Failed to log schema change to WAL: {}", e);
459 }
460 }
461
462 #[cfg(feature = "gql")]
464 fn execute_schema_command(
465 &self,
466 cmd: grafeo_adapters::query::gql::ast::SchemaStatement,
467 ) -> Result<QueryResult> {
468 use crate::catalog::{
469 EdgeTypeDefinition, NodeTypeDefinition, PropertyDataType, TypedProperty,
470 };
471 use grafeo_adapters::query::gql::ast::SchemaStatement;
472 #[cfg(feature = "wal")]
473 use grafeo_adapters::storage::wal::WalRecord;
474 use grafeo_common::utils::error::{Error, QueryError, QueryErrorKind};
475
476 macro_rules! wal_log {
478 ($self:expr, $record:expr) => {
479 #[cfg(feature = "wal")]
480 $self.log_schema_wal(&$record);
481 };
482 }
483
484 match cmd {
485 SchemaStatement::CreateNodeType(stmt) => {
486 #[cfg(feature = "wal")]
487 let props_for_wal: Vec<(String, String, bool)> = stmt
488 .properties
489 .iter()
490 .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
491 .collect();
492 let def = NodeTypeDefinition {
493 name: stmt.name.clone(),
494 properties: stmt
495 .properties
496 .iter()
497 .map(|p| TypedProperty {
498 name: p.name.clone(),
499 data_type: PropertyDataType::from_type_name(&p.data_type),
500 nullable: p.nullable,
501 default_value: None,
502 })
503 .collect(),
504 constraints: Vec::new(),
505 };
506 let result = if stmt.or_replace {
507 let _ = self.catalog.drop_node_type(&stmt.name);
508 self.catalog.register_node_type(def)
509 } else {
510 self.catalog.register_node_type(def)
511 };
512 match result {
513 Ok(()) => {
514 wal_log!(
515 self,
516 WalRecord::CreateNodeType {
517 name: stmt.name.clone(),
518 properties: props_for_wal,
519 constraints: Vec::new(),
520 }
521 );
522 Ok(QueryResult::status(format!(
523 "Created node type '{}'",
524 stmt.name
525 )))
526 }
527 Err(e) if stmt.if_not_exists => {
528 let _ = e;
529 Ok(QueryResult::status("No change"))
530 }
531 Err(e) => Err(Error::Query(QueryError::new(
532 QueryErrorKind::Semantic,
533 e.to_string(),
534 ))),
535 }
536 }
537 SchemaStatement::CreateEdgeType(stmt) => {
538 #[cfg(feature = "wal")]
539 let props_for_wal: Vec<(String, String, bool)> = stmt
540 .properties
541 .iter()
542 .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
543 .collect();
544 let def = EdgeTypeDefinition {
545 name: stmt.name.clone(),
546 properties: stmt
547 .properties
548 .iter()
549 .map(|p| TypedProperty {
550 name: p.name.clone(),
551 data_type: PropertyDataType::from_type_name(&p.data_type),
552 nullable: p.nullable,
553 default_value: None,
554 })
555 .collect(),
556 constraints: Vec::new(),
557 };
558 let result = if stmt.or_replace {
559 let _ = self.catalog.drop_edge_type_def(&stmt.name);
560 self.catalog.register_edge_type_def(def)
561 } else {
562 self.catalog.register_edge_type_def(def)
563 };
564 match result {
565 Ok(()) => {
566 wal_log!(
567 self,
568 WalRecord::CreateEdgeType {
569 name: stmt.name.clone(),
570 properties: props_for_wal,
571 constraints: Vec::new(),
572 }
573 );
574 Ok(QueryResult::status(format!(
575 "Created edge type '{}'",
576 stmt.name
577 )))
578 }
579 Err(e) if stmt.if_not_exists => {
580 let _ = e;
581 Ok(QueryResult::status("No change"))
582 }
583 Err(e) => Err(Error::Query(QueryError::new(
584 QueryErrorKind::Semantic,
585 e.to_string(),
586 ))),
587 }
588 }
589 SchemaStatement::CreateVectorIndex(stmt) => {
590 Self::create_vector_index_on_store(
591 &self.store,
592 &stmt.node_label,
593 &stmt.property,
594 stmt.dimensions,
595 stmt.metric.as_deref(),
596 )?;
597 wal_log!(
598 self,
599 WalRecord::CreateIndex {
600 name: stmt.name.clone(),
601 label: stmt.node_label.clone(),
602 property: stmt.property.clone(),
603 index_type: "vector".to_string(),
604 }
605 );
606 Ok(QueryResult::status(format!(
607 "Created vector index '{}'",
608 stmt.name
609 )))
610 }
611 SchemaStatement::DropNodeType { name, if_exists } => {
612 match self.catalog.drop_node_type(&name) {
613 Ok(()) => {
614 wal_log!(self, WalRecord::DropNodeType { name: name.clone() });
615 Ok(QueryResult::status(format!("Dropped node type '{name}'")))
616 }
617 Err(e) if if_exists => {
618 let _ = e;
619 Ok(QueryResult::status("No change"))
620 }
621 Err(e) => Err(Error::Query(QueryError::new(
622 QueryErrorKind::Semantic,
623 e.to_string(),
624 ))),
625 }
626 }
627 SchemaStatement::DropEdgeType { name, if_exists } => {
628 match self.catalog.drop_edge_type_def(&name) {
629 Ok(()) => {
630 wal_log!(self, WalRecord::DropEdgeType { name: name.clone() });
631 Ok(QueryResult::status(format!("Dropped edge type '{name}'")))
632 }
633 Err(e) if if_exists => {
634 let _ = e;
635 Ok(QueryResult::status("No change"))
636 }
637 Err(e) => Err(Error::Query(QueryError::new(
638 QueryErrorKind::Semantic,
639 e.to_string(),
640 ))),
641 }
642 }
643 SchemaStatement::CreateIndex(stmt) => {
644 use grafeo_adapters::query::gql::ast::IndexKind;
645 let index_type_str = match stmt.index_kind {
646 IndexKind::Property => "property",
647 IndexKind::BTree => "btree",
648 IndexKind::Text => "text",
649 IndexKind::Vector => "vector",
650 };
651 match stmt.index_kind {
652 IndexKind::Property | IndexKind::BTree => {
653 for prop in &stmt.properties {
654 self.store.create_property_index(prop);
655 }
656 }
657 IndexKind::Text => {
658 for prop in &stmt.properties {
659 Self::create_text_index_on_store(&self.store, &stmt.label, prop)?;
660 }
661 }
662 IndexKind::Vector => {
663 for prop in &stmt.properties {
664 Self::create_vector_index_on_store(
665 &self.store,
666 &stmt.label,
667 prop,
668 stmt.options.dimensions,
669 stmt.options.metric.as_deref(),
670 )?;
671 }
672 }
673 }
674 #[cfg(feature = "wal")]
675 for prop in &stmt.properties {
676 wal_log!(
677 self,
678 WalRecord::CreateIndex {
679 name: stmt.name.clone(),
680 label: stmt.label.clone(),
681 property: prop.clone(),
682 index_type: index_type_str.to_string(),
683 }
684 );
685 }
686 Ok(QueryResult::status(format!(
687 "Created {} index '{}'",
688 index_type_str, stmt.name
689 )))
690 }
691 SchemaStatement::DropIndex { name, if_exists } => {
692 let dropped = self.store.drop_property_index(&name);
694 if dropped || if_exists {
695 if dropped {
696 wal_log!(self, WalRecord::DropIndex { name: name.clone() });
697 }
698 Ok(QueryResult::status(if dropped {
699 format!("Dropped index '{name}'")
700 } else {
701 "No change".to_string()
702 }))
703 } else {
704 Err(Error::Query(QueryError::new(
705 QueryErrorKind::Semantic,
706 format!("Index '{name}' does not exist"),
707 )))
708 }
709 }
710 SchemaStatement::CreateConstraint(stmt) => {
711 use grafeo_adapters::query::gql::ast::ConstraintKind;
712 let kind_str = match stmt.constraint_kind {
713 ConstraintKind::Unique => "unique",
714 ConstraintKind::NodeKey => "node_key",
715 ConstraintKind::NotNull => "not_null",
716 ConstraintKind::Exists => "exists",
717 };
718 let constraint_name = stmt
719 .name
720 .clone()
721 .unwrap_or_else(|| format!("{}_{kind_str}", stmt.label));
722 wal_log!(
723 self,
724 WalRecord::CreateConstraint {
725 name: constraint_name.clone(),
726 label: stmt.label.clone(),
727 properties: stmt.properties.clone(),
728 kind: kind_str.to_string(),
729 }
730 );
731 Ok(QueryResult::status(format!(
732 "Created {kind_str} constraint '{constraint_name}'"
733 )))
734 }
735 SchemaStatement::DropConstraint { name, if_exists } => {
736 let _ = if_exists;
737 wal_log!(self, WalRecord::DropConstraint { name: name.clone() });
738 Ok(QueryResult::status(format!("Dropped constraint '{name}'")))
739 }
740 SchemaStatement::CreateGraphType(stmt) => {
741 use crate::catalog::GraphTypeDefinition;
742 let def = GraphTypeDefinition {
743 name: stmt.name.clone(),
744 allowed_node_types: stmt.node_types.clone(),
745 allowed_edge_types: stmt.edge_types.clone(),
746 open: stmt.open,
747 };
748 let result = if stmt.or_replace {
749 let _ = self.catalog.drop_graph_type(&stmt.name);
751 self.catalog.register_graph_type(def)
752 } else {
753 self.catalog.register_graph_type(def)
754 };
755 match result {
756 Ok(()) => {
757 wal_log!(
758 self,
759 WalRecord::CreateGraphType {
760 name: stmt.name.clone(),
761 node_types: stmt.node_types,
762 edge_types: stmt.edge_types,
763 open: stmt.open,
764 }
765 );
766 Ok(QueryResult::status(format!(
767 "Created graph type '{}'",
768 stmt.name
769 )))
770 }
771 Err(e) if stmt.if_not_exists => {
772 let _ = e;
773 Ok(QueryResult::status("No change"))
774 }
775 Err(e) => Err(Error::Query(QueryError::new(
776 QueryErrorKind::Semantic,
777 e.to_string(),
778 ))),
779 }
780 }
781 SchemaStatement::DropGraphType { name, if_exists } => {
782 match self.catalog.drop_graph_type(&name) {
783 Ok(()) => {
784 wal_log!(self, WalRecord::DropGraphType { name: name.clone() });
785 Ok(QueryResult::status(format!("Dropped graph type '{name}'")))
786 }
787 Err(e) if if_exists => {
788 let _ = e;
789 Ok(QueryResult::status("No change"))
790 }
791 Err(e) => Err(Error::Query(QueryError::new(
792 QueryErrorKind::Semantic,
793 e.to_string(),
794 ))),
795 }
796 }
797 SchemaStatement::CreateSchema {
798 name,
799 if_not_exists,
800 } => match self.catalog.register_schema_namespace(name.clone()) {
801 Ok(()) => {
802 wal_log!(self, WalRecord::CreateSchema { name: name.clone() });
803 Ok(QueryResult::status(format!("Created schema '{name}'")))
804 }
805 Err(e) if if_not_exists => {
806 let _ = e;
807 Ok(QueryResult::status("No change"))
808 }
809 Err(e) => Err(Error::Query(QueryError::new(
810 QueryErrorKind::Semantic,
811 e.to_string(),
812 ))),
813 },
814 SchemaStatement::DropSchema { name, if_exists } => {
815 match self.catalog.drop_schema_namespace(&name) {
816 Ok(()) => {
817 wal_log!(self, WalRecord::DropSchema { name: name.clone() });
818 Ok(QueryResult::status(format!("Dropped schema '{name}'")))
819 }
820 Err(e) if if_exists => {
821 let _ = e;
822 Ok(QueryResult::status("No change"))
823 }
824 Err(e) => Err(Error::Query(QueryError::new(
825 QueryErrorKind::Semantic,
826 e.to_string(),
827 ))),
828 }
829 }
830 SchemaStatement::AlterNodeType(stmt) => {
831 use grafeo_adapters::query::gql::ast::TypeAlteration;
832 let mut wal_alts = Vec::new();
833 for alt in &stmt.alterations {
834 match alt {
835 TypeAlteration::AddProperty(prop) => {
836 let typed = TypedProperty {
837 name: prop.name.clone(),
838 data_type: PropertyDataType::from_type_name(&prop.data_type),
839 nullable: prop.nullable,
840 default_value: None,
841 };
842 self.catalog
843 .alter_node_type_add_property(&stmt.name, typed)
844 .map_err(|e| {
845 Error::Query(QueryError::new(
846 QueryErrorKind::Semantic,
847 e.to_string(),
848 ))
849 })?;
850 wal_alts.push((
851 "add".to_string(),
852 prop.name.clone(),
853 prop.data_type.clone(),
854 prop.nullable,
855 ));
856 }
857 TypeAlteration::DropProperty(name) => {
858 self.catalog
859 .alter_node_type_drop_property(&stmt.name, name)
860 .map_err(|e| {
861 Error::Query(QueryError::new(
862 QueryErrorKind::Semantic,
863 e.to_string(),
864 ))
865 })?;
866 wal_alts.push(("drop".to_string(), name.clone(), String::new(), false));
867 }
868 }
869 }
870 wal_log!(
871 self,
872 WalRecord::AlterNodeType {
873 name: stmt.name.clone(),
874 alterations: wal_alts,
875 }
876 );
877 Ok(QueryResult::status(format!(
878 "Altered node type '{}'",
879 stmt.name
880 )))
881 }
882 SchemaStatement::AlterEdgeType(stmt) => {
883 use grafeo_adapters::query::gql::ast::TypeAlteration;
884 let mut wal_alts = Vec::new();
885 for alt in &stmt.alterations {
886 match alt {
887 TypeAlteration::AddProperty(prop) => {
888 let typed = TypedProperty {
889 name: prop.name.clone(),
890 data_type: PropertyDataType::from_type_name(&prop.data_type),
891 nullable: prop.nullable,
892 default_value: None,
893 };
894 self.catalog
895 .alter_edge_type_add_property(&stmt.name, typed)
896 .map_err(|e| {
897 Error::Query(QueryError::new(
898 QueryErrorKind::Semantic,
899 e.to_string(),
900 ))
901 })?;
902 wal_alts.push((
903 "add".to_string(),
904 prop.name.clone(),
905 prop.data_type.clone(),
906 prop.nullable,
907 ));
908 }
909 TypeAlteration::DropProperty(name) => {
910 self.catalog
911 .alter_edge_type_drop_property(&stmt.name, name)
912 .map_err(|e| {
913 Error::Query(QueryError::new(
914 QueryErrorKind::Semantic,
915 e.to_string(),
916 ))
917 })?;
918 wal_alts.push(("drop".to_string(), name.clone(), String::new(), false));
919 }
920 }
921 }
922 wal_log!(
923 self,
924 WalRecord::AlterEdgeType {
925 name: stmt.name.clone(),
926 alterations: wal_alts,
927 }
928 );
929 Ok(QueryResult::status(format!(
930 "Altered edge type '{}'",
931 stmt.name
932 )))
933 }
934 SchemaStatement::AlterGraphType(stmt) => {
935 use grafeo_adapters::query::gql::ast::GraphTypeAlteration;
936 let mut wal_alts = Vec::new();
937 for alt in &stmt.alterations {
938 match alt {
939 GraphTypeAlteration::AddNodeType(name) => {
940 self.catalog
941 .alter_graph_type_add_node_type(&stmt.name, name.clone())
942 .map_err(|e| {
943 Error::Query(QueryError::new(
944 QueryErrorKind::Semantic,
945 e.to_string(),
946 ))
947 })?;
948 wal_alts.push(("add_node_type".to_string(), name.clone()));
949 }
950 GraphTypeAlteration::DropNodeType(name) => {
951 self.catalog
952 .alter_graph_type_drop_node_type(&stmt.name, name)
953 .map_err(|e| {
954 Error::Query(QueryError::new(
955 QueryErrorKind::Semantic,
956 e.to_string(),
957 ))
958 })?;
959 wal_alts.push(("drop_node_type".to_string(), name.clone()));
960 }
961 GraphTypeAlteration::AddEdgeType(name) => {
962 self.catalog
963 .alter_graph_type_add_edge_type(&stmt.name, name.clone())
964 .map_err(|e| {
965 Error::Query(QueryError::new(
966 QueryErrorKind::Semantic,
967 e.to_string(),
968 ))
969 })?;
970 wal_alts.push(("add_edge_type".to_string(), name.clone()));
971 }
972 GraphTypeAlteration::DropEdgeType(name) => {
973 self.catalog
974 .alter_graph_type_drop_edge_type(&stmt.name, name)
975 .map_err(|e| {
976 Error::Query(QueryError::new(
977 QueryErrorKind::Semantic,
978 e.to_string(),
979 ))
980 })?;
981 wal_alts.push(("drop_edge_type".to_string(), name.clone()));
982 }
983 }
984 }
985 wal_log!(
986 self,
987 WalRecord::AlterGraphType {
988 name: stmt.name.clone(),
989 alterations: wal_alts,
990 }
991 );
992 Ok(QueryResult::status(format!(
993 "Altered graph type '{}'",
994 stmt.name
995 )))
996 }
997 SchemaStatement::CreateProcedure(stmt) => {
998 use crate::catalog::ProcedureDefinition;
999
1000 let def = ProcedureDefinition {
1001 name: stmt.name.clone(),
1002 params: stmt
1003 .params
1004 .iter()
1005 .map(|p| (p.name.clone(), p.param_type.clone()))
1006 .collect(),
1007 returns: stmt
1008 .returns
1009 .iter()
1010 .map(|r| (r.name.clone(), r.return_type.clone()))
1011 .collect(),
1012 body: stmt.body.clone(),
1013 };
1014
1015 if stmt.or_replace {
1016 self.catalog.replace_procedure(def).map_err(|e| {
1017 Error::Query(QueryError::new(QueryErrorKind::Semantic, e.to_string()))
1018 })?;
1019 } else {
1020 match self.catalog.register_procedure(def) {
1021 Ok(()) => {}
1022 Err(_) if stmt.if_not_exists => {
1023 return Ok(QueryResult::empty());
1024 }
1025 Err(e) => {
1026 return Err(Error::Query(QueryError::new(
1027 QueryErrorKind::Semantic,
1028 e.to_string(),
1029 )));
1030 }
1031 }
1032 }
1033
1034 wal_log!(
1035 self,
1036 WalRecord::CreateProcedure {
1037 name: stmt.name.clone(),
1038 params: stmt
1039 .params
1040 .iter()
1041 .map(|p| (p.name.clone(), p.param_type.clone()))
1042 .collect(),
1043 returns: stmt
1044 .returns
1045 .iter()
1046 .map(|r| (r.name.clone(), r.return_type.clone()))
1047 .collect(),
1048 body: stmt.body,
1049 }
1050 );
1051 Ok(QueryResult::status(format!(
1052 "Created procedure '{}'",
1053 stmt.name
1054 )))
1055 }
1056 SchemaStatement::DropProcedure { name, if_exists } => {
1057 match self.catalog.drop_procedure(&name) {
1058 Ok(()) => {}
1059 Err(_) if if_exists => {
1060 return Ok(QueryResult::empty());
1061 }
1062 Err(e) => {
1063 return Err(Error::Query(QueryError::new(
1064 QueryErrorKind::Semantic,
1065 e.to_string(),
1066 )));
1067 }
1068 }
1069 wal_log!(self, WalRecord::DropProcedure { name: name.clone() });
1070 Ok(QueryResult::status(format!("Dropped procedure '{name}'")))
1071 }
1072 }
1073 }
1074
1075 #[cfg(all(feature = "gql", feature = "vector-index"))]
1077 fn create_vector_index_on_store(
1078 store: &LpgStore,
1079 label: &str,
1080 property: &str,
1081 dimensions: Option<usize>,
1082 metric: Option<&str>,
1083 ) -> Result<()> {
1084 use grafeo_common::types::{PropertyKey, Value};
1085 use grafeo_common::utils::error::Error;
1086 use grafeo_core::index::vector::{DistanceMetric, HnswConfig, HnswIndex};
1087
1088 let metric = match metric {
1089 Some(m) => DistanceMetric::from_str(m).ok_or_else(|| {
1090 Error::Internal(format!(
1091 "Unknown distance metric '{m}'. Use: cosine, euclidean, dot_product, manhattan"
1092 ))
1093 })?,
1094 None => DistanceMetric::Cosine,
1095 };
1096
1097 let prop_key = PropertyKey::new(property);
1098 let mut found_dims: Option<usize> = dimensions;
1099 let mut vectors: Vec<(grafeo_common::types::NodeId, Vec<f32>)> = Vec::new();
1100
1101 for node in store.nodes_with_label(label) {
1102 if let Some(Value::Vector(v)) = node.properties.get(&prop_key) {
1103 if let Some(expected) = found_dims {
1104 if v.len() != expected {
1105 return Err(Error::Internal(format!(
1106 "Vector dimension mismatch: expected {expected}, found {} on node {}",
1107 v.len(),
1108 node.id.0
1109 )));
1110 }
1111 } else {
1112 found_dims = Some(v.len());
1113 }
1114 vectors.push((node.id, v.to_vec()));
1115 }
1116 }
1117
1118 let Some(dims) = found_dims else {
1119 return Err(Error::Internal(format!(
1120 "No vector properties found on :{label}({property}) and no dimensions specified"
1121 )));
1122 };
1123
1124 let config = HnswConfig::new(dims, metric);
1125 let index = HnswIndex::with_capacity(config, vectors.len());
1126 let accessor = grafeo_core::index::vector::PropertyVectorAccessor::new(store, property);
1127 for (node_id, vec) in &vectors {
1128 index.insert(*node_id, vec, &accessor);
1129 }
1130
1131 store.add_vector_index(label, property, Arc::new(index));
1132 Ok(())
1133 }
1134
1135 #[cfg(all(feature = "gql", not(feature = "vector-index")))]
1137 fn create_vector_index_on_store(
1138 _store: &LpgStore,
1139 _label: &str,
1140 _property: &str,
1141 _dimensions: Option<usize>,
1142 _metric: Option<&str>,
1143 ) -> Result<()> {
1144 Err(grafeo_common::utils::error::Error::Internal(
1145 "Vector index support requires the 'vector-index' feature".to_string(),
1146 ))
1147 }
1148
1149 #[cfg(all(feature = "gql", feature = "text-index"))]
1151 fn create_text_index_on_store(store: &LpgStore, label: &str, property: &str) -> Result<()> {
1152 use grafeo_common::types::{PropertyKey, Value};
1153 use grafeo_core::index::text::{BM25Config, InvertedIndex};
1154
1155 let mut index = InvertedIndex::new(BM25Config::default());
1156 let prop_key = PropertyKey::new(property);
1157
1158 let nodes = store.nodes_by_label(label);
1159 for node_id in nodes {
1160 if let Some(Value::String(text)) = store.get_node_property(node_id, &prop_key) {
1161 index.insert(node_id, text.as_str());
1162 }
1163 }
1164
1165 store.add_text_index(label, property, Arc::new(parking_lot::RwLock::new(index)));
1166 Ok(())
1167 }
1168
1169 #[cfg(all(feature = "gql", not(feature = "text-index")))]
1171 fn create_text_index_on_store(_store: &LpgStore, _label: &str, _property: &str) -> Result<()> {
1172 Err(grafeo_common::utils::error::Error::Internal(
1173 "Text index support requires the 'text-index' feature".to_string(),
1174 ))
1175 }
1176
1177 #[cfg(feature = "gql")]
1204 pub fn execute(&self, query: &str) -> Result<QueryResult> {
1205 self.require_lpg("GQL")?;
1206
1207 use crate::query::{
1208 Executor, binder::Binder, cache::CacheKey, gql_translator, optimizer::Optimizer,
1209 processor::QueryLanguage,
1210 };
1211
1212 let start_time = std::time::Instant::now();
1213
1214 let translation = gql_translator::translate_full(query)?;
1216 let logical_plan = match translation {
1217 gql_translator::GqlTranslationResult::SessionCommand(cmd) => {
1218 return self.execute_session_command(cmd);
1219 }
1220 gql_translator::GqlTranslationResult::SchemaCommand(cmd) => {
1221 if *self.read_only_tx.lock() {
1223 return Err(grafeo_common::utils::error::Error::Transaction(
1224 grafeo_common::utils::error::TransactionError::ReadOnly,
1225 ));
1226 }
1227 return self.execute_schema_command(cmd);
1228 }
1229 gql_translator::GqlTranslationResult::Plan(plan) => {
1230 if *self.read_only_tx.lock() && plan.root.has_mutations() {
1232 return Err(grafeo_common::utils::error::Error::Transaction(
1233 grafeo_common::utils::error::TransactionError::ReadOnly,
1234 ));
1235 }
1236 plan
1237 }
1238 };
1239
1240 let cache_key = CacheKey::new(query, QueryLanguage::Gql);
1242
1243 let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
1245 cached_plan
1246 } else {
1247 let mut binder = Binder::new();
1249 let _binding_context = binder.bind(&logical_plan)?;
1250
1251 let optimizer = Optimizer::from_graph_store(&*self.graph_store);
1253 let plan = optimizer.optimize(logical_plan)?;
1254
1255 self.query_cache.put_optimized(cache_key, plan.clone());
1257
1258 plan
1259 };
1260
1261 let (viewing_epoch, tx_id) = self.get_transaction_context();
1263
1264 let planner = self.create_planner(viewing_epoch, tx_id);
1267 let mut physical_plan = planner.plan(&optimized_plan)?;
1268
1269 let executor = Executor::with_columns(physical_plan.columns.clone())
1271 .with_deadline(self.query_deadline());
1272 let mut result = executor.execute(physical_plan.operator.as_mut())?;
1273
1274 let elapsed_ms = start_time.elapsed().as_secs_f64() * 1000.0;
1276 let rows_scanned = result.rows.len() as u64;
1277 result.execution_time_ms = Some(elapsed_ms);
1278 result.rows_scanned = Some(rows_scanned);
1279
1280 Ok(result)
1281 }
1282
1283 #[cfg(feature = "gql")]
1292 pub fn execute_at_epoch(&self, query: &str, epoch: EpochId) -> Result<QueryResult> {
1293 let previous = self.viewing_epoch_override.lock().replace(epoch);
1294 let result = self.execute(query);
1295 *self.viewing_epoch_override.lock() = previous;
1296 result
1297 }
1298
1299 #[cfg(feature = "gql")]
1305 pub fn execute_with_params(
1306 &self,
1307 query: &str,
1308 params: std::collections::HashMap<String, Value>,
1309 ) -> Result<QueryResult> {
1310 self.require_lpg("GQL")?;
1311
1312 use crate::query::processor::{QueryLanguage, QueryProcessor};
1313
1314 let (viewing_epoch, tx_id) = self.get_transaction_context();
1316
1317 let processor = QueryProcessor::for_graph_store_with_tx(
1319 Arc::clone(&self.graph_store),
1320 Arc::clone(&self.tx_manager),
1321 );
1322
1323 let processor = if let Some(tx_id) = tx_id {
1325 processor.with_tx_context(viewing_epoch, tx_id)
1326 } else {
1327 processor
1328 };
1329
1330 processor.process(query, QueryLanguage::Gql, Some(¶ms))
1331 }
1332
1333 #[cfg(not(any(feature = "gql", feature = "cypher")))]
1339 pub fn execute_with_params(
1340 &self,
1341 _query: &str,
1342 _params: std::collections::HashMap<String, Value>,
1343 ) -> Result<QueryResult> {
1344 Err(grafeo_common::utils::error::Error::Internal(
1345 "No query language enabled".to_string(),
1346 ))
1347 }
1348
1349 #[cfg(not(any(feature = "gql", feature = "cypher")))]
1355 pub fn execute(&self, _query: &str) -> Result<QueryResult> {
1356 Err(grafeo_common::utils::error::Error::Internal(
1357 "No query language enabled".to_string(),
1358 ))
1359 }
1360
1361 #[cfg(feature = "cypher")]
1367 pub fn execute_cypher(&self, query: &str) -> Result<QueryResult> {
1368 use crate::query::{
1369 Executor, binder::Binder, cache::CacheKey, cypher_translator, optimizer::Optimizer,
1370 processor::QueryLanguage,
1371 };
1372
1373 let cache_key = CacheKey::new(query, QueryLanguage::Cypher);
1375
1376 let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
1378 cached_plan
1379 } else {
1380 let logical_plan = cypher_translator::translate(query)?;
1382
1383 let mut binder = Binder::new();
1385 let _binding_context = binder.bind(&logical_plan)?;
1386
1387 let optimizer = Optimizer::from_graph_store(&*self.graph_store);
1389 let plan = optimizer.optimize(logical_plan)?;
1390
1391 self.query_cache.put_optimized(cache_key, plan.clone());
1393
1394 plan
1395 };
1396
1397 let (viewing_epoch, tx_id) = self.get_transaction_context();
1399
1400 let planner = self.create_planner(viewing_epoch, tx_id);
1402 let mut physical_plan = planner.plan(&optimized_plan)?;
1403
1404 let executor = Executor::with_columns(physical_plan.columns.clone())
1406 .with_deadline(self.query_deadline());
1407 let result = executor.execute(physical_plan.operator.as_mut())?;
1408 Ok(result)
1409 }
1410
1411 #[cfg(feature = "gremlin")]
1435 pub fn execute_gremlin(&self, query: &str) -> Result<QueryResult> {
1436 use crate::query::{Executor, binder::Binder, gremlin_translator, optimizer::Optimizer};
1437
1438 let logical_plan = gremlin_translator::translate(query)?;
1440
1441 let mut binder = Binder::new();
1443 let _binding_context = binder.bind(&logical_plan)?;
1444
1445 let optimizer = Optimizer::from_graph_store(&*self.graph_store);
1447 let optimized_plan = optimizer.optimize(logical_plan)?;
1448
1449 let (viewing_epoch, tx_id) = self.get_transaction_context();
1451
1452 let planner = self.create_planner(viewing_epoch, tx_id);
1454 let mut physical_plan = planner.plan(&optimized_plan)?;
1455
1456 let executor = Executor::with_columns(physical_plan.columns.clone())
1458 .with_deadline(self.query_deadline());
1459 let result = executor.execute(physical_plan.operator.as_mut())?;
1460 Ok(result)
1461 }
1462
1463 #[cfg(feature = "gremlin")]
1469 pub fn execute_gremlin_with_params(
1470 &self,
1471 query: &str,
1472 params: std::collections::HashMap<String, Value>,
1473 ) -> Result<QueryResult> {
1474 use crate::query::processor::{QueryLanguage, QueryProcessor};
1475
1476 let (viewing_epoch, tx_id) = self.get_transaction_context();
1478
1479 let processor = QueryProcessor::for_graph_store_with_tx(
1481 Arc::clone(&self.graph_store),
1482 Arc::clone(&self.tx_manager),
1483 );
1484
1485 let processor = if let Some(tx_id) = tx_id {
1487 processor.with_tx_context(viewing_epoch, tx_id)
1488 } else {
1489 processor
1490 };
1491
1492 processor.process(query, QueryLanguage::Gremlin, Some(¶ms))
1493 }
1494
1495 #[cfg(feature = "graphql")]
1519 pub fn execute_graphql(&self, query: &str) -> Result<QueryResult> {
1520 use crate::query::{Executor, binder::Binder, graphql_translator, optimizer::Optimizer};
1521
1522 let logical_plan = graphql_translator::translate(query)?;
1524
1525 let mut binder = Binder::new();
1527 let _binding_context = binder.bind(&logical_plan)?;
1528
1529 let optimizer = Optimizer::from_graph_store(&*self.graph_store);
1531 let optimized_plan = optimizer.optimize(logical_plan)?;
1532
1533 let (viewing_epoch, tx_id) = self.get_transaction_context();
1535
1536 let planner = self.create_planner(viewing_epoch, tx_id);
1538 let mut physical_plan = planner.plan(&optimized_plan)?;
1539
1540 let executor = Executor::with_columns(physical_plan.columns.clone())
1542 .with_deadline(self.query_deadline());
1543 let result = executor.execute(physical_plan.operator.as_mut())?;
1544 Ok(result)
1545 }
1546
1547 #[cfg(feature = "graphql")]
1553 pub fn execute_graphql_with_params(
1554 &self,
1555 query: &str,
1556 params: std::collections::HashMap<String, Value>,
1557 ) -> Result<QueryResult> {
1558 use crate::query::processor::{QueryLanguage, QueryProcessor};
1559
1560 let (viewing_epoch, tx_id) = self.get_transaction_context();
1562
1563 let processor = QueryProcessor::for_graph_store_with_tx(
1565 Arc::clone(&self.graph_store),
1566 Arc::clone(&self.tx_manager),
1567 );
1568
1569 let processor = if let Some(tx_id) = tx_id {
1571 processor.with_tx_context(viewing_epoch, tx_id)
1572 } else {
1573 processor
1574 };
1575
1576 processor.process(query, QueryLanguage::GraphQL, Some(¶ms))
1577 }
1578
1579 #[cfg(feature = "sql-pgq")]
1604 pub fn execute_sql(&self, query: &str) -> Result<QueryResult> {
1605 use crate::query::{
1606 Executor, binder::Binder, cache::CacheKey, optimizer::Optimizer, plan::LogicalOperator,
1607 processor::QueryLanguage, sql_pgq_translator,
1608 };
1609
1610 let logical_plan = sql_pgq_translator::translate(query)?;
1612
1613 if let LogicalOperator::CreatePropertyGraph(ref cpg) = logical_plan.root {
1615 return Ok(QueryResult {
1616 columns: vec!["status".into()],
1617 column_types: vec![grafeo_common::types::LogicalType::String],
1618 rows: vec![vec![Value::from(format!(
1619 "Property graph '{}' created ({} node tables, {} edge tables)",
1620 cpg.name,
1621 cpg.node_tables.len(),
1622 cpg.edge_tables.len()
1623 ))]],
1624 execution_time_ms: None,
1625 rows_scanned: None,
1626 status_message: None,
1627 });
1628 }
1629
1630 let cache_key = CacheKey::new(query, QueryLanguage::SqlPgq);
1632
1633 let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
1635 cached_plan
1636 } else {
1637 let mut binder = Binder::new();
1639 let _binding_context = binder.bind(&logical_plan)?;
1640
1641 let optimizer = Optimizer::from_graph_store(&*self.graph_store);
1643 let plan = optimizer.optimize(logical_plan)?;
1644
1645 self.query_cache.put_optimized(cache_key, plan.clone());
1647
1648 plan
1649 };
1650
1651 let (viewing_epoch, tx_id) = self.get_transaction_context();
1653
1654 let planner = self.create_planner(viewing_epoch, tx_id);
1656 let mut physical_plan = planner.plan(&optimized_plan)?;
1657
1658 let executor = Executor::with_columns(physical_plan.columns.clone())
1660 .with_deadline(self.query_deadline());
1661 let result = executor.execute(physical_plan.operator.as_mut())?;
1662 Ok(result)
1663 }
1664
1665 #[cfg(feature = "sql-pgq")]
1671 pub fn execute_sql_with_params(
1672 &self,
1673 query: &str,
1674 params: std::collections::HashMap<String, Value>,
1675 ) -> Result<QueryResult> {
1676 use crate::query::processor::{QueryLanguage, QueryProcessor};
1677
1678 let (viewing_epoch, tx_id) = self.get_transaction_context();
1680
1681 let processor = QueryProcessor::for_graph_store_with_tx(
1683 Arc::clone(&self.graph_store),
1684 Arc::clone(&self.tx_manager),
1685 );
1686
1687 let processor = if let Some(tx_id) = tx_id {
1689 processor.with_tx_context(viewing_epoch, tx_id)
1690 } else {
1691 processor
1692 };
1693
1694 processor.process(query, QueryLanguage::SqlPgq, Some(¶ms))
1695 }
1696
1697 #[cfg(all(feature = "sparql", feature = "rdf"))]
1703 pub fn execute_sparql(&self, query: &str) -> Result<QueryResult> {
1704 use crate::query::{
1705 Executor, optimizer::Optimizer, planner_rdf::RdfPlanner, sparql_translator,
1706 };
1707
1708 let logical_plan = sparql_translator::translate(query)?;
1710
1711 let optimizer = Optimizer::from_graph_store(&*self.graph_store);
1713 let optimized_plan = optimizer.optimize(logical_plan)?;
1714
1715 let planner =
1717 RdfPlanner::new(Arc::clone(&self.rdf_store)).with_tx_id(*self.current_tx.lock());
1718 let mut physical_plan = planner.plan(&optimized_plan)?;
1719
1720 let executor = Executor::with_columns(physical_plan.columns.clone())
1722 .with_deadline(self.query_deadline());
1723 executor.execute(physical_plan.operator.as_mut())
1724 }
1725
1726 #[cfg(all(feature = "sparql", feature = "rdf"))]
1732 pub fn execute_sparql_with_params(
1733 &self,
1734 query: &str,
1735 _params: std::collections::HashMap<String, Value>,
1736 ) -> Result<QueryResult> {
1737 self.execute_sparql(query)
1740 }
1741
1742 pub fn execute_language(
1751 &self,
1752 query: &str,
1753 language: &str,
1754 params: Option<std::collections::HashMap<String, Value>>,
1755 ) -> Result<QueryResult> {
1756 match language {
1757 "gql" => {
1758 if let Some(p) = params {
1759 self.execute_with_params(query, p)
1760 } else {
1761 self.execute(query)
1762 }
1763 }
1764 #[cfg(feature = "cypher")]
1765 "cypher" => {
1766 if let Some(p) = params {
1767 use crate::query::processor::{QueryLanguage, QueryProcessor};
1768 let processor = QueryProcessor::for_graph_store_with_tx(
1769 Arc::clone(&self.graph_store),
1770 Arc::clone(&self.tx_manager),
1771 );
1772 let (viewing_epoch, tx_id) = self.get_transaction_context();
1773 let processor = if let Some(tx_id) = tx_id {
1774 processor.with_tx_context(viewing_epoch, tx_id)
1775 } else {
1776 processor
1777 };
1778 processor.process(query, QueryLanguage::Cypher, Some(&p))
1779 } else {
1780 self.execute_cypher(query)
1781 }
1782 }
1783 #[cfg(feature = "gremlin")]
1784 "gremlin" => {
1785 if let Some(p) = params {
1786 self.execute_gremlin_with_params(query, p)
1787 } else {
1788 self.execute_gremlin(query)
1789 }
1790 }
1791 #[cfg(feature = "graphql")]
1792 "graphql" => {
1793 if let Some(p) = params {
1794 self.execute_graphql_with_params(query, p)
1795 } else {
1796 self.execute_graphql(query)
1797 }
1798 }
1799 #[cfg(feature = "sql-pgq")]
1800 "sql" | "sql-pgq" => {
1801 if let Some(p) = params {
1802 self.execute_sql_with_params(query, p)
1803 } else {
1804 self.execute_sql(query)
1805 }
1806 }
1807 #[cfg(all(feature = "sparql", feature = "rdf"))]
1808 "sparql" => {
1809 if let Some(p) = params {
1810 self.execute_sparql_with_params(query, p)
1811 } else {
1812 self.execute_sparql(query)
1813 }
1814 }
1815 other => Err(grafeo_common::utils::error::Error::Query(
1816 grafeo_common::utils::error::QueryError::new(
1817 grafeo_common::utils::error::QueryErrorKind::Semantic,
1818 format!("Unknown query language: '{other}'"),
1819 ),
1820 )),
1821 }
1822 }
1823
1824 pub fn begin_tx(&mut self) -> Result<()> {
1847 self.begin_tx_inner(false, None)
1848 }
1849
1850 pub fn begin_tx_with_isolation(
1858 &mut self,
1859 isolation_level: crate::transaction::IsolationLevel,
1860 ) -> Result<()> {
1861 self.begin_tx_inner(false, Some(isolation_level))
1862 }
1863
1864 fn begin_tx_inner(
1866 &self,
1867 read_only: bool,
1868 isolation_level: Option<crate::transaction::IsolationLevel>,
1869 ) -> Result<()> {
1870 let mut current = self.current_tx.lock();
1871 if current.is_some() {
1872 return Err(grafeo_common::utils::error::Error::Transaction(
1873 grafeo_common::utils::error::TransactionError::InvalidState(
1874 "Transaction already active".to_string(),
1875 ),
1876 ));
1877 }
1878
1879 self.tx_start_node_count
1880 .store(self.store.node_count(), Ordering::Relaxed);
1881 self.tx_start_edge_count
1882 .store(self.store.edge_count(), Ordering::Relaxed);
1883 let tx_id = if let Some(level) = isolation_level {
1884 self.tx_manager.begin_with_isolation(level)
1885 } else {
1886 self.tx_manager.begin()
1887 };
1888 *current = Some(tx_id);
1889 *self.read_only_tx.lock() = read_only;
1890 Ok(())
1891 }
1892
1893 pub fn commit(&mut self) -> Result<()> {
1901 self.commit_inner()
1902 }
1903
1904 fn commit_inner(&self) -> Result<()> {
1906 let tx_id = self.current_tx.lock().take().ok_or_else(|| {
1907 grafeo_common::utils::error::Error::Transaction(
1908 grafeo_common::utils::error::TransactionError::InvalidState(
1909 "No active transaction".to_string(),
1910 ),
1911 )
1912 })?;
1913
1914 #[cfg(feature = "rdf")]
1916 self.rdf_store.commit_tx(tx_id);
1917
1918 self.tx_manager.commit(tx_id)?;
1919
1920 self.store.sync_epoch(self.tx_manager.current_epoch());
1924
1925 *self.read_only_tx.lock() = false;
1927
1928 if self.gc_interval > 0 {
1930 let count = self.commit_counter.fetch_add(1, Ordering::Relaxed) + 1;
1931 if count.is_multiple_of(self.gc_interval) {
1932 let min_epoch = self.tx_manager.min_active_epoch();
1933 self.store.gc_versions(min_epoch);
1934 self.tx_manager.gc();
1935 }
1936 }
1937
1938 Ok(())
1939 }
1940
1941 pub fn rollback(&mut self) -> Result<()> {
1965 self.rollback_inner()
1966 }
1967
1968 fn rollback_inner(&self) -> Result<()> {
1970 let tx_id = self.current_tx.lock().take().ok_or_else(|| {
1971 grafeo_common::utils::error::Error::Transaction(
1972 grafeo_common::utils::error::TransactionError::InvalidState(
1973 "No active transaction".to_string(),
1974 ),
1975 )
1976 })?;
1977
1978 *self.read_only_tx.lock() = false;
1980
1981 self.store.discard_uncommitted_versions(tx_id);
1983
1984 #[cfg(feature = "rdf")]
1986 self.rdf_store.rollback_tx(tx_id);
1987
1988 self.tx_manager.abort(tx_id)
1990 }
1991
1992 #[must_use]
1994 pub fn in_transaction(&self) -> bool {
1995 self.current_tx.lock().is_some()
1996 }
1997
1998 #[must_use]
2000 pub(crate) fn current_tx_id(&self) -> Option<TxId> {
2001 *self.current_tx.lock()
2002 }
2003
2004 #[must_use]
2006 pub(crate) fn tx_manager(&self) -> &TransactionManager {
2007 &self.tx_manager
2008 }
2009
2010 #[must_use]
2012 pub(crate) fn node_count_delta(&self) -> (usize, usize) {
2013 (
2014 self.tx_start_node_count.load(Ordering::Relaxed),
2015 self.store.node_count(),
2016 )
2017 }
2018
2019 #[must_use]
2021 pub(crate) fn edge_count_delta(&self) -> (usize, usize) {
2022 (
2023 self.tx_start_edge_count.load(Ordering::Relaxed),
2024 self.store.edge_count(),
2025 )
2026 }
2027
2028 pub fn prepare_commit(&mut self) -> Result<crate::transaction::PreparedCommit<'_>> {
2062 crate::transaction::PreparedCommit::new(self)
2063 }
2064
2065 pub fn set_auto_commit(&mut self, auto_commit: bool) {
2067 self.auto_commit = auto_commit;
2068 }
2069
2070 #[must_use]
2072 pub fn auto_commit(&self) -> bool {
2073 self.auto_commit
2074 }
2075
2076 #[must_use]
2078 fn query_deadline(&self) -> Option<Instant> {
2079 self.query_timeout.map(|d| Instant::now() + d)
2080 }
2081
2082 fn eval_integer_literal(expr: &grafeo_adapters::query::gql::ast::Expression) -> Option<i64> {
2084 use grafeo_adapters::query::gql::ast::{Expression, Literal};
2085 match expr {
2086 Expression::Literal(Literal::Integer(n)) => Some(*n),
2087 _ => None,
2088 }
2089 }
2090
2091 #[must_use]
2097 fn get_transaction_context(&self) -> (EpochId, Option<TxId>) {
2098 if let Some(epoch) = *self.viewing_epoch_override.lock() {
2100 return (epoch, None);
2101 }
2102
2103 if let Some(tx_id) = *self.current_tx.lock() {
2104 let epoch = self
2106 .tx_manager
2107 .start_epoch(tx_id)
2108 .unwrap_or_else(|| self.tx_manager.current_epoch());
2109 (epoch, Some(tx_id))
2110 } else {
2111 (self.tx_manager.current_epoch(), None)
2113 }
2114 }
2115
2116 fn create_planner(&self, viewing_epoch: EpochId, tx_id: Option<TxId>) -> crate::query::Planner {
2118 use crate::query::Planner;
2119
2120 let mut planner = Planner::with_context(
2121 Arc::clone(&self.graph_store),
2122 Arc::clone(&self.tx_manager),
2123 tx_id,
2124 viewing_epoch,
2125 )
2126 .with_factorized_execution(self.factorized_execution)
2127 .with_catalog(Arc::clone(&self.catalog));
2128
2129 let validator = CatalogConstraintValidator::new(Arc::clone(&self.catalog));
2131 planner = planner.with_validator(Arc::new(validator));
2132
2133 planner
2134 }
2135
2136 pub fn create_node(&self, labels: &[&str]) -> NodeId {
2141 let (epoch, tx_id) = self.get_transaction_context();
2142 self.store
2143 .create_node_versioned(labels, epoch, tx_id.unwrap_or(TxId::SYSTEM))
2144 }
2145
2146 pub fn create_node_with_props<'a>(
2150 &self,
2151 labels: &[&str],
2152 properties: impl IntoIterator<Item = (&'a str, Value)>,
2153 ) -> NodeId {
2154 let (epoch, tx_id) = self.get_transaction_context();
2155 self.store.create_node_with_props_versioned(
2156 labels,
2157 properties.into_iter().map(|(k, v)| (k, v)),
2158 epoch,
2159 tx_id.unwrap_or(TxId::SYSTEM),
2160 )
2161 }
2162
2163 pub fn create_edge(
2168 &self,
2169 src: NodeId,
2170 dst: NodeId,
2171 edge_type: &str,
2172 ) -> grafeo_common::types::EdgeId {
2173 let (epoch, tx_id) = self.get_transaction_context();
2174 self.store
2175 .create_edge_versioned(src, dst, edge_type, epoch, tx_id.unwrap_or(TxId::SYSTEM))
2176 }
2177
2178 #[must_use]
2206 pub fn get_node(&self, id: NodeId) -> Option<Node> {
2207 let (epoch, tx_id) = self.get_transaction_context();
2208 self.store
2209 .get_node_versioned(id, epoch, tx_id.unwrap_or(TxId::SYSTEM))
2210 }
2211
2212 #[must_use]
2236 pub fn get_node_property(&self, id: NodeId, key: &str) -> Option<Value> {
2237 self.get_node(id)
2238 .and_then(|node| node.get_property(key).cloned())
2239 }
2240
2241 #[must_use]
2248 pub fn get_edge(&self, id: EdgeId) -> Option<Edge> {
2249 let (epoch, tx_id) = self.get_transaction_context();
2250 self.store
2251 .get_edge_versioned(id, epoch, tx_id.unwrap_or(TxId::SYSTEM))
2252 }
2253
2254 #[must_use]
2280 pub fn get_neighbors_outgoing(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
2281 self.store.edges_from(node, Direction::Outgoing).collect()
2282 }
2283
2284 #[must_use]
2293 pub fn get_neighbors_incoming(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
2294 self.store.edges_from(node, Direction::Incoming).collect()
2295 }
2296
2297 #[must_use]
2309 pub fn get_neighbors_outgoing_by_type(
2310 &self,
2311 node: NodeId,
2312 edge_type: &str,
2313 ) -> Vec<(NodeId, EdgeId)> {
2314 self.store
2315 .edges_from(node, Direction::Outgoing)
2316 .filter(|(_, edge_id)| {
2317 self.get_edge(*edge_id)
2318 .is_some_and(|e| e.edge_type.as_str() == edge_type)
2319 })
2320 .collect()
2321 }
2322
2323 #[must_use]
2330 pub fn node_exists(&self, id: NodeId) -> bool {
2331 self.get_node(id).is_some()
2332 }
2333
2334 #[must_use]
2336 pub fn edge_exists(&self, id: EdgeId) -> bool {
2337 self.get_edge(id).is_some()
2338 }
2339
2340 #[must_use]
2344 pub fn get_degree(&self, node: NodeId) -> (usize, usize) {
2345 let out = self.store.out_degree(node);
2346 let in_degree = self.store.in_degree(node);
2347 (out, in_degree)
2348 }
2349
2350 #[must_use]
2360 pub fn get_nodes_batch(&self, ids: &[NodeId]) -> Vec<Option<Node>> {
2361 let (epoch, tx_id) = self.get_transaction_context();
2362 let tx = tx_id.unwrap_or(TxId::SYSTEM);
2363 ids.iter()
2364 .map(|&id| self.store.get_node_versioned(id, epoch, tx))
2365 .collect()
2366 }
2367
2368 #[cfg(feature = "cdc")]
2372 pub fn history(
2373 &self,
2374 entity_id: impl Into<crate::cdc::EntityId>,
2375 ) -> Result<Vec<crate::cdc::ChangeEvent>> {
2376 Ok(self.cdc_log.history(entity_id.into()))
2377 }
2378
2379 #[cfg(feature = "cdc")]
2381 pub fn history_since(
2382 &self,
2383 entity_id: impl Into<crate::cdc::EntityId>,
2384 since_epoch: EpochId,
2385 ) -> Result<Vec<crate::cdc::ChangeEvent>> {
2386 Ok(self.cdc_log.history_since(entity_id.into(), since_epoch))
2387 }
2388
2389 #[cfg(feature = "cdc")]
2391 pub fn changes_between(
2392 &self,
2393 start_epoch: EpochId,
2394 end_epoch: EpochId,
2395 ) -> Result<Vec<crate::cdc::ChangeEvent>> {
2396 Ok(self.cdc_log.changes_between(start_epoch, end_epoch))
2397 }
2398}
2399
2400#[cfg(test)]
2401mod tests {
2402 use crate::database::GrafeoDB;
2403
2404 #[test]
2405 fn test_session_create_node() {
2406 let db = GrafeoDB::new_in_memory();
2407 let session = db.session();
2408
2409 let id = session.create_node(&["Person"]);
2410 assert!(id.is_valid());
2411 assert_eq!(db.node_count(), 1);
2412 }
2413
2414 #[test]
2415 fn test_session_transaction() {
2416 let db = GrafeoDB::new_in_memory();
2417 let mut session = db.session();
2418
2419 assert!(!session.in_transaction());
2420
2421 session.begin_tx().unwrap();
2422 assert!(session.in_transaction());
2423
2424 session.commit().unwrap();
2425 assert!(!session.in_transaction());
2426 }
2427
2428 #[test]
2429 fn test_session_transaction_context() {
2430 let db = GrafeoDB::new_in_memory();
2431 let mut session = db.session();
2432
2433 let (_epoch1, tx_id1) = session.get_transaction_context();
2435 assert!(tx_id1.is_none());
2436
2437 session.begin_tx().unwrap();
2439 let (epoch2, tx_id2) = session.get_transaction_context();
2440 assert!(tx_id2.is_some());
2441 let _ = epoch2; session.commit().unwrap();
2446 let (epoch3, tx_id3) = session.get_transaction_context();
2447 assert!(tx_id3.is_none());
2448 assert!(epoch3.as_u64() >= epoch2.as_u64());
2450 }
2451
2452 #[test]
2453 fn test_session_rollback() {
2454 let db = GrafeoDB::new_in_memory();
2455 let mut session = db.session();
2456
2457 session.begin_tx().unwrap();
2458 session.rollback().unwrap();
2459 assert!(!session.in_transaction());
2460 }
2461
2462 #[test]
2463 fn test_session_rollback_discards_versions() {
2464 use grafeo_common::types::TxId;
2465
2466 let db = GrafeoDB::new_in_memory();
2467
2468 let node_before = db.store().create_node(&["Person"]);
2470 assert!(node_before.is_valid());
2471 assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");
2472
2473 let mut session = db.session();
2475 session.begin_tx().unwrap();
2476 let tx_id = session.current_tx.lock().unwrap();
2477
2478 let epoch = db.store().current_epoch();
2480 let node_in_tx = db.store().create_node_versioned(&["Person"], epoch, tx_id);
2481 assert!(node_in_tx.is_valid());
2482
2483 assert_eq!(db.node_count(), 2, "Should have 2 nodes during transaction");
2485
2486 session.rollback().unwrap();
2488 assert!(!session.in_transaction());
2489
2490 let count_after = db.node_count();
2493 assert_eq!(
2494 count_after, 1,
2495 "Rollback should discard uncommitted node, but got {count_after}"
2496 );
2497
2498 let current_epoch = db.store().current_epoch();
2500 assert!(
2501 db.store()
2502 .get_node_versioned(node_before, current_epoch, TxId::SYSTEM)
2503 .is_some(),
2504 "Original node should still exist"
2505 );
2506
2507 assert!(
2509 db.store()
2510 .get_node_versioned(node_in_tx, current_epoch, TxId::SYSTEM)
2511 .is_none(),
2512 "Transaction node should be gone"
2513 );
2514 }
2515
2516 #[test]
2517 fn test_session_create_node_in_transaction() {
2518 let db = GrafeoDB::new_in_memory();
2520
2521 let node_before = db.create_node(&["Person"]);
2523 assert!(node_before.is_valid());
2524 assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");
2525
2526 let mut session = db.session();
2528 session.begin_tx().unwrap();
2529
2530 let node_in_tx = session.create_node(&["Person"]);
2532 assert!(node_in_tx.is_valid());
2533
2534 assert_eq!(db.node_count(), 2, "Should have 2 nodes during transaction");
2536
2537 session.rollback().unwrap();
2539
2540 let count_after = db.node_count();
2542 assert_eq!(
2543 count_after, 1,
2544 "Rollback should discard node created via session.create_node(), but got {count_after}"
2545 );
2546 }
2547
2548 #[test]
2549 fn test_session_create_node_with_props_in_transaction() {
2550 use grafeo_common::types::Value;
2551
2552 let db = GrafeoDB::new_in_memory();
2554
2555 db.create_node(&["Person"]);
2557 assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");
2558
2559 let mut session = db.session();
2561 session.begin_tx().unwrap();
2562
2563 let node_in_tx =
2564 session.create_node_with_props(&["Person"], [("name", Value::String("Alice".into()))]);
2565 assert!(node_in_tx.is_valid());
2566
2567 assert_eq!(db.node_count(), 2, "Should have 2 nodes during transaction");
2569
2570 session.rollback().unwrap();
2572
2573 let count_after = db.node_count();
2575 assert_eq!(
2576 count_after, 1,
2577 "Rollback should discard node created via session.create_node_with_props()"
2578 );
2579 }
2580
2581 #[cfg(feature = "gql")]
2582 mod gql_tests {
2583 use super::*;
2584
2585 #[test]
2586 fn test_gql_query_execution() {
2587 let db = GrafeoDB::new_in_memory();
2588 let session = db.session();
2589
2590 session.create_node(&["Person"]);
2592 session.create_node(&["Person"]);
2593 session.create_node(&["Animal"]);
2594
2595 let result = session.execute("MATCH (n:Person) RETURN n").unwrap();
2597
2598 assert_eq!(result.row_count(), 2);
2600 assert_eq!(result.column_count(), 1);
2601 assert_eq!(result.columns[0], "n");
2602 }
2603
2604 #[test]
2605 fn test_gql_empty_result() {
2606 let db = GrafeoDB::new_in_memory();
2607 let session = db.session();
2608
2609 let result = session.execute("MATCH (n:Person) RETURN n").unwrap();
2611
2612 assert_eq!(result.row_count(), 0);
2613 }
2614
2615 #[test]
2616 fn test_gql_parse_error() {
2617 let db = GrafeoDB::new_in_memory();
2618 let session = db.session();
2619
2620 let result = session.execute("MATCH (n RETURN n");
2622
2623 assert!(result.is_err());
2624 }
2625
2626 #[test]
2627 fn test_gql_relationship_traversal() {
2628 let db = GrafeoDB::new_in_memory();
2629 let session = db.session();
2630
2631 let alice = session.create_node(&["Person"]);
2633 let bob = session.create_node(&["Person"]);
2634 let charlie = session.create_node(&["Person"]);
2635
2636 session.create_edge(alice, bob, "KNOWS");
2637 session.create_edge(alice, charlie, "KNOWS");
2638
2639 let result = session
2641 .execute("MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b")
2642 .unwrap();
2643
2644 assert_eq!(result.row_count(), 2);
2646 assert_eq!(result.column_count(), 2);
2647 assert_eq!(result.columns[0], "a");
2648 assert_eq!(result.columns[1], "b");
2649 }
2650
2651 #[test]
2652 fn test_gql_relationship_with_type_filter() {
2653 let db = GrafeoDB::new_in_memory();
2654 let session = db.session();
2655
2656 let alice = session.create_node(&["Person"]);
2658 let bob = session.create_node(&["Person"]);
2659 let charlie = session.create_node(&["Person"]);
2660
2661 session.create_edge(alice, bob, "KNOWS");
2662 session.create_edge(alice, charlie, "WORKS_WITH");
2663
2664 let result = session
2666 .execute("MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b")
2667 .unwrap();
2668
2669 assert_eq!(result.row_count(), 1);
2671 }
2672
2673 #[test]
2674 fn test_gql_semantic_error_undefined_variable() {
2675 let db = GrafeoDB::new_in_memory();
2676 let session = db.session();
2677
2678 let result = session.execute("MATCH (n:Person) RETURN x");
2680
2681 assert!(result.is_err());
2683 let Err(err) = result else {
2684 panic!("Expected error")
2685 };
2686 assert!(
2687 err.to_string().contains("Undefined variable"),
2688 "Expected undefined variable error, got: {}",
2689 err
2690 );
2691 }
2692
2693 #[test]
2694 fn test_gql_where_clause_property_filter() {
2695 use grafeo_common::types::Value;
2696
2697 let db = GrafeoDB::new_in_memory();
2698 let session = db.session();
2699
2700 session.create_node_with_props(&["Person"], [("age", Value::Int64(25))]);
2702 session.create_node_with_props(&["Person"], [("age", Value::Int64(35))]);
2703 session.create_node_with_props(&["Person"], [("age", Value::Int64(45))]);
2704
2705 let result = session
2707 .execute("MATCH (n:Person) WHERE n.age > 30 RETURN n")
2708 .unwrap();
2709
2710 assert_eq!(result.row_count(), 2);
2712 }
2713
2714 #[test]
2715 fn test_gql_where_clause_equality() {
2716 use grafeo_common::types::Value;
2717
2718 let db = GrafeoDB::new_in_memory();
2719 let session = db.session();
2720
2721 session.create_node_with_props(&["Person"], [("name", Value::String("Alice".into()))]);
2723 session.create_node_with_props(&["Person"], [("name", Value::String("Bob".into()))]);
2724 session.create_node_with_props(&["Person"], [("name", Value::String("Alice".into()))]);
2725
2726 let result = session
2728 .execute("MATCH (n:Person) WHERE n.name = \"Alice\" RETURN n")
2729 .unwrap();
2730
2731 assert_eq!(result.row_count(), 2);
2733 }
2734
2735 #[test]
2736 fn test_gql_return_property_access() {
2737 use grafeo_common::types::Value;
2738
2739 let db = GrafeoDB::new_in_memory();
2740 let session = db.session();
2741
2742 session.create_node_with_props(
2744 &["Person"],
2745 [
2746 ("name", Value::String("Alice".into())),
2747 ("age", Value::Int64(30)),
2748 ],
2749 );
2750 session.create_node_with_props(
2751 &["Person"],
2752 [
2753 ("name", Value::String("Bob".into())),
2754 ("age", Value::Int64(25)),
2755 ],
2756 );
2757
2758 let result = session
2760 .execute("MATCH (n:Person) RETURN n.name, n.age")
2761 .unwrap();
2762
2763 assert_eq!(result.row_count(), 2);
2765 assert_eq!(result.column_count(), 2);
2766 assert_eq!(result.columns[0], "n.name");
2767 assert_eq!(result.columns[1], "n.age");
2768
2769 let names: Vec<&Value> = result.rows.iter().map(|r| &r[0]).collect();
2771 assert!(names.contains(&&Value::String("Alice".into())));
2772 assert!(names.contains(&&Value::String("Bob".into())));
2773 }
2774
2775 #[test]
2776 fn test_gql_return_mixed_expressions() {
2777 use grafeo_common::types::Value;
2778
2779 let db = GrafeoDB::new_in_memory();
2780 let session = db.session();
2781
2782 session.create_node_with_props(&["Person"], [("name", Value::String("Alice".into()))]);
2784
2785 let result = session
2787 .execute("MATCH (n:Person) RETURN n, n.name")
2788 .unwrap();
2789
2790 assert_eq!(result.row_count(), 1);
2791 assert_eq!(result.column_count(), 2);
2792 assert_eq!(result.columns[0], "n");
2793 assert_eq!(result.columns[1], "n.name");
2794
2795 assert_eq!(result.rows[0][1], Value::String("Alice".into()));
2797 }
2798 }
2799
2800 #[cfg(feature = "cypher")]
2801 mod cypher_tests {
2802 use super::*;
2803
2804 #[test]
2805 fn test_cypher_query_execution() {
2806 let db = GrafeoDB::new_in_memory();
2807 let session = db.session();
2808
2809 session.create_node(&["Person"]);
2811 session.create_node(&["Person"]);
2812 session.create_node(&["Animal"]);
2813
2814 let result = session.execute_cypher("MATCH (n:Person) RETURN n").unwrap();
2816
2817 assert_eq!(result.row_count(), 2);
2819 assert_eq!(result.column_count(), 1);
2820 assert_eq!(result.columns[0], "n");
2821 }
2822
2823 #[test]
2824 fn test_cypher_empty_result() {
2825 let db = GrafeoDB::new_in_memory();
2826 let session = db.session();
2827
2828 let result = session.execute_cypher("MATCH (n:Person) RETURN n").unwrap();
2830
2831 assert_eq!(result.row_count(), 0);
2832 }
2833
2834 #[test]
2835 fn test_cypher_parse_error() {
2836 let db = GrafeoDB::new_in_memory();
2837 let session = db.session();
2838
2839 let result = session.execute_cypher("MATCH (n RETURN n");
2841
2842 assert!(result.is_err());
2843 }
2844 }
2845
2846 mod direct_lookup_tests {
2849 use super::*;
2850 use grafeo_common::types::Value;
2851
2852 #[test]
2853 fn test_get_node() {
2854 let db = GrafeoDB::new_in_memory();
2855 let session = db.session();
2856
2857 let id = session.create_node(&["Person"]);
2858 let node = session.get_node(id);
2859
2860 assert!(node.is_some());
2861 let node = node.unwrap();
2862 assert_eq!(node.id, id);
2863 }
2864
2865 #[test]
2866 fn test_get_node_not_found() {
2867 use grafeo_common::types::NodeId;
2868
2869 let db = GrafeoDB::new_in_memory();
2870 let session = db.session();
2871
2872 let node = session.get_node(NodeId::new(9999));
2874 assert!(node.is_none());
2875 }
2876
2877 #[test]
2878 fn test_get_node_property() {
2879 let db = GrafeoDB::new_in_memory();
2880 let session = db.session();
2881
2882 let id = session
2883 .create_node_with_props(&["Person"], [("name", Value::String("Alice".into()))]);
2884
2885 let name = session.get_node_property(id, "name");
2886 assert_eq!(name, Some(Value::String("Alice".into())));
2887
2888 let missing = session.get_node_property(id, "missing");
2890 assert!(missing.is_none());
2891 }
2892
2893 #[test]
2894 fn test_get_edge() {
2895 let db = GrafeoDB::new_in_memory();
2896 let session = db.session();
2897
2898 let alice = session.create_node(&["Person"]);
2899 let bob = session.create_node(&["Person"]);
2900 let edge_id = session.create_edge(alice, bob, "KNOWS");
2901
2902 let edge = session.get_edge(edge_id);
2903 assert!(edge.is_some());
2904 let edge = edge.unwrap();
2905 assert_eq!(edge.id, edge_id);
2906 assert_eq!(edge.src, alice);
2907 assert_eq!(edge.dst, bob);
2908 }
2909
2910 #[test]
2911 fn test_get_edge_not_found() {
2912 use grafeo_common::types::EdgeId;
2913
2914 let db = GrafeoDB::new_in_memory();
2915 let session = db.session();
2916
2917 let edge = session.get_edge(EdgeId::new(9999));
2918 assert!(edge.is_none());
2919 }
2920
2921 #[test]
2922 fn test_get_neighbors_outgoing() {
2923 let db = GrafeoDB::new_in_memory();
2924 let session = db.session();
2925
2926 let alice = session.create_node(&["Person"]);
2927 let bob = session.create_node(&["Person"]);
2928 let carol = session.create_node(&["Person"]);
2929
2930 session.create_edge(alice, bob, "KNOWS");
2931 session.create_edge(alice, carol, "KNOWS");
2932
2933 let neighbors = session.get_neighbors_outgoing(alice);
2934 assert_eq!(neighbors.len(), 2);
2935
2936 let neighbor_ids: Vec<_> = neighbors.iter().map(|(node_id, _)| *node_id).collect();
2937 assert!(neighbor_ids.contains(&bob));
2938 assert!(neighbor_ids.contains(&carol));
2939 }
2940
2941 #[test]
2942 fn test_get_neighbors_incoming() {
2943 let db = GrafeoDB::new_in_memory();
2944 let session = db.session();
2945
2946 let alice = session.create_node(&["Person"]);
2947 let bob = session.create_node(&["Person"]);
2948 let carol = session.create_node(&["Person"]);
2949
2950 session.create_edge(bob, alice, "KNOWS");
2951 session.create_edge(carol, alice, "KNOWS");
2952
2953 let neighbors = session.get_neighbors_incoming(alice);
2954 assert_eq!(neighbors.len(), 2);
2955
2956 let neighbor_ids: Vec<_> = neighbors.iter().map(|(node_id, _)| *node_id).collect();
2957 assert!(neighbor_ids.contains(&bob));
2958 assert!(neighbor_ids.contains(&carol));
2959 }
2960
2961 #[test]
2962 fn test_get_neighbors_outgoing_by_type() {
2963 let db = GrafeoDB::new_in_memory();
2964 let session = db.session();
2965
2966 let alice = session.create_node(&["Person"]);
2967 let bob = session.create_node(&["Person"]);
2968 let company = session.create_node(&["Company"]);
2969
2970 session.create_edge(alice, bob, "KNOWS");
2971 session.create_edge(alice, company, "WORKS_AT");
2972
2973 let knows_neighbors = session.get_neighbors_outgoing_by_type(alice, "KNOWS");
2974 assert_eq!(knows_neighbors.len(), 1);
2975 assert_eq!(knows_neighbors[0].0, bob);
2976
2977 let works_neighbors = session.get_neighbors_outgoing_by_type(alice, "WORKS_AT");
2978 assert_eq!(works_neighbors.len(), 1);
2979 assert_eq!(works_neighbors[0].0, company);
2980
2981 let no_neighbors = session.get_neighbors_outgoing_by_type(alice, "LIKES");
2983 assert!(no_neighbors.is_empty());
2984 }
2985
2986 #[test]
2987 fn test_node_exists() {
2988 use grafeo_common::types::NodeId;
2989
2990 let db = GrafeoDB::new_in_memory();
2991 let session = db.session();
2992
2993 let id = session.create_node(&["Person"]);
2994
2995 assert!(session.node_exists(id));
2996 assert!(!session.node_exists(NodeId::new(9999)));
2997 }
2998
2999 #[test]
3000 fn test_edge_exists() {
3001 use grafeo_common::types::EdgeId;
3002
3003 let db = GrafeoDB::new_in_memory();
3004 let session = db.session();
3005
3006 let alice = session.create_node(&["Person"]);
3007 let bob = session.create_node(&["Person"]);
3008 let edge_id = session.create_edge(alice, bob, "KNOWS");
3009
3010 assert!(session.edge_exists(edge_id));
3011 assert!(!session.edge_exists(EdgeId::new(9999)));
3012 }
3013
3014 #[test]
3015 fn test_get_degree() {
3016 let db = GrafeoDB::new_in_memory();
3017 let session = db.session();
3018
3019 let alice = session.create_node(&["Person"]);
3020 let bob = session.create_node(&["Person"]);
3021 let carol = session.create_node(&["Person"]);
3022
3023 session.create_edge(alice, bob, "KNOWS");
3025 session.create_edge(alice, carol, "KNOWS");
3026 session.create_edge(bob, alice, "KNOWS");
3028
3029 let (out_degree, in_degree) = session.get_degree(alice);
3030 assert_eq!(out_degree, 2);
3031 assert_eq!(in_degree, 1);
3032
3033 let lonely = session.create_node(&["Person"]);
3035 let (out, in_deg) = session.get_degree(lonely);
3036 assert_eq!(out, 0);
3037 assert_eq!(in_deg, 0);
3038 }
3039
3040 #[test]
3041 fn test_get_nodes_batch() {
3042 let db = GrafeoDB::new_in_memory();
3043 let session = db.session();
3044
3045 let alice = session.create_node(&["Person"]);
3046 let bob = session.create_node(&["Person"]);
3047 let carol = session.create_node(&["Person"]);
3048
3049 let nodes = session.get_nodes_batch(&[alice, bob, carol]);
3050 assert_eq!(nodes.len(), 3);
3051 assert!(nodes[0].is_some());
3052 assert!(nodes[1].is_some());
3053 assert!(nodes[2].is_some());
3054
3055 use grafeo_common::types::NodeId;
3057 let nodes_with_missing = session.get_nodes_batch(&[alice, NodeId::new(9999), carol]);
3058 assert_eq!(nodes_with_missing.len(), 3);
3059 assert!(nodes_with_missing[0].is_some());
3060 assert!(nodes_with_missing[1].is_none()); assert!(nodes_with_missing[2].is_some());
3062 }
3063
3064 #[test]
3065 fn test_auto_commit_setting() {
3066 let db = GrafeoDB::new_in_memory();
3067 let mut session = db.session();
3068
3069 assert!(session.auto_commit());
3071
3072 session.set_auto_commit(false);
3073 assert!(!session.auto_commit());
3074
3075 session.set_auto_commit(true);
3076 assert!(session.auto_commit());
3077 }
3078
3079 #[test]
3080 fn test_transaction_double_begin_error() {
3081 let db = GrafeoDB::new_in_memory();
3082 let mut session = db.session();
3083
3084 session.begin_tx().unwrap();
3085 let result = session.begin_tx();
3086
3087 assert!(result.is_err());
3088 session.rollback().unwrap();
3090 }
3091
3092 #[test]
3093 fn test_commit_without_transaction_error() {
3094 let db = GrafeoDB::new_in_memory();
3095 let mut session = db.session();
3096
3097 let result = session.commit();
3098 assert!(result.is_err());
3099 }
3100
3101 #[test]
3102 fn test_rollback_without_transaction_error() {
3103 let db = GrafeoDB::new_in_memory();
3104 let mut session = db.session();
3105
3106 let result = session.rollback();
3107 assert!(result.is_err());
3108 }
3109
3110 #[test]
3111 fn test_create_edge_in_transaction() {
3112 let db = GrafeoDB::new_in_memory();
3113 let mut session = db.session();
3114
3115 let alice = session.create_node(&["Person"]);
3117 let bob = session.create_node(&["Person"]);
3118
3119 session.begin_tx().unwrap();
3121 let edge_id = session.create_edge(alice, bob, "KNOWS");
3122
3123 assert!(session.edge_exists(edge_id));
3125
3126 session.commit().unwrap();
3128
3129 assert!(session.edge_exists(edge_id));
3131 }
3132
3133 #[test]
3134 fn test_neighbors_empty_node() {
3135 let db = GrafeoDB::new_in_memory();
3136 let session = db.session();
3137
3138 let lonely = session.create_node(&["Person"]);
3139
3140 assert!(session.get_neighbors_outgoing(lonely).is_empty());
3141 assert!(session.get_neighbors_incoming(lonely).is_empty());
3142 assert!(
3143 session
3144 .get_neighbors_outgoing_by_type(lonely, "KNOWS")
3145 .is_empty()
3146 );
3147 }
3148 }
3149
3150 #[test]
3151 fn test_auto_gc_triggers_on_commit_interval() {
3152 use crate::config::Config;
3153
3154 let config = Config::in_memory().with_gc_interval(2);
3155 let db = GrafeoDB::with_config(config).unwrap();
3156 let mut session = db.session();
3157
3158 session.begin_tx().unwrap();
3160 session.create_node(&["A"]);
3161 session.commit().unwrap();
3162
3163 session.begin_tx().unwrap();
3165 session.create_node(&["B"]);
3166 session.commit().unwrap();
3167
3168 assert_eq!(db.node_count(), 2);
3170 }
3171
3172 #[test]
3173 fn test_query_timeout_config_propagates_to_session() {
3174 use crate::config::Config;
3175 use std::time::Duration;
3176
3177 let config = Config::in_memory().with_query_timeout(Duration::from_secs(5));
3178 let db = GrafeoDB::with_config(config).unwrap();
3179 let session = db.session();
3180
3181 assert!(session.query_deadline().is_some());
3183 }
3184
3185 #[test]
3186 fn test_no_query_timeout_returns_no_deadline() {
3187 let db = GrafeoDB::new_in_memory();
3188 let session = db.session();
3189
3190 assert!(session.query_deadline().is_none());
3192 }
3193
3194 #[test]
3195 fn test_graph_model_accessor() {
3196 use crate::config::GraphModel;
3197
3198 let db = GrafeoDB::new_in_memory();
3199 let session = db.session();
3200
3201 assert_eq!(session.graph_model(), GraphModel::Lpg);
3202 }
3203
3204 #[cfg(feature = "gql")]
3205 #[test]
3206 fn test_external_store_session() {
3207 use grafeo_core::graph::GraphStoreMut;
3208 use std::sync::Arc;
3209
3210 let config = crate::config::Config::in_memory();
3211 let store = Arc::new(grafeo_core::graph::lpg::LpgStore::new()) as Arc<dyn GraphStoreMut>;
3212 let db = GrafeoDB::with_store(store, config).unwrap();
3213
3214 let session = db.session();
3215
3216 session.execute("INSERT (:Test {name: 'hello'})").unwrap();
3218
3219 let result = session.execute("MATCH (n:Test) RETURN n.name").unwrap();
3221 assert_eq!(result.row_count(), 1);
3222 }
3223
3224 #[cfg(feature = "gql")]
3227 mod session_command_tests {
3228 use super::*;
3229
3230 #[test]
3231 fn test_use_graph_sets_current_graph() {
3232 let db = GrafeoDB::new_in_memory();
3233 let session = db.session();
3234
3235 session.execute("CREATE GRAPH mydb").unwrap();
3237 session.execute("USE GRAPH mydb").unwrap();
3238
3239 assert_eq!(session.current_graph(), Some("mydb".to_string()));
3240 }
3241
3242 #[test]
3243 fn test_use_graph_nonexistent_errors() {
3244 let db = GrafeoDB::new_in_memory();
3245 let session = db.session();
3246
3247 let result = session.execute("USE GRAPH doesnotexist");
3248 assert!(result.is_err());
3249 let err = result.unwrap_err().to_string();
3250 assert!(
3251 err.contains("does not exist"),
3252 "Expected 'does not exist' error, got: {err}"
3253 );
3254 }
3255
3256 #[test]
3257 fn test_use_graph_default_always_valid() {
3258 let db = GrafeoDB::new_in_memory();
3259 let session = db.session();
3260
3261 session.execute("USE GRAPH default").unwrap();
3263 assert_eq!(session.current_graph(), Some("default".to_string()));
3264 }
3265
3266 #[test]
3267 fn test_session_set_graph() {
3268 let db = GrafeoDB::new_in_memory();
3269 let session = db.session();
3270
3271 session.execute("SESSION SET GRAPH analytics").unwrap();
3273 assert_eq!(session.current_graph(), Some("analytics".to_string()));
3274 }
3275
3276 #[test]
3277 fn test_session_set_time_zone() {
3278 let db = GrafeoDB::new_in_memory();
3279 let session = db.session();
3280
3281 assert_eq!(session.time_zone(), None);
3282
3283 session.execute("SESSION SET TIME ZONE 'UTC'").unwrap();
3284 assert_eq!(session.time_zone(), Some("UTC".to_string()));
3285
3286 session
3287 .execute("SESSION SET TIME ZONE 'America/New_York'")
3288 .unwrap();
3289 assert_eq!(session.time_zone(), Some("America/New_York".to_string()));
3290 }
3291
3292 #[test]
3293 fn test_session_set_parameter() {
3294 let db = GrafeoDB::new_in_memory();
3295 let session = db.session();
3296
3297 session
3298 .execute("SESSION SET PARAMETER $timeout = 30")
3299 .unwrap();
3300
3301 assert!(session.get_parameter("timeout").is_some());
3304 }
3305
3306 #[test]
3307 fn test_session_reset_clears_all_state() {
3308 let db = GrafeoDB::new_in_memory();
3309 let session = db.session();
3310
3311 session.execute("SESSION SET GRAPH analytics").unwrap();
3313 session.execute("SESSION SET TIME ZONE 'UTC'").unwrap();
3314 session
3315 .execute("SESSION SET PARAMETER $limit = 100")
3316 .unwrap();
3317
3318 assert!(session.current_graph().is_some());
3320 assert!(session.time_zone().is_some());
3321 assert!(session.get_parameter("limit").is_some());
3322
3323 session.execute("SESSION RESET").unwrap();
3325
3326 assert_eq!(session.current_graph(), None);
3327 assert_eq!(session.time_zone(), None);
3328 assert!(session.get_parameter("limit").is_none());
3329 }
3330
3331 #[test]
3332 fn test_session_close_clears_state() {
3333 let db = GrafeoDB::new_in_memory();
3334 let session = db.session();
3335
3336 session.execute("SESSION SET GRAPH analytics").unwrap();
3337 session.execute("SESSION SET TIME ZONE 'UTC'").unwrap();
3338
3339 session.execute("SESSION CLOSE").unwrap();
3340
3341 assert_eq!(session.current_graph(), None);
3342 assert_eq!(session.time_zone(), None);
3343 }
3344
3345 #[test]
3346 fn test_create_graph() {
3347 let db = GrafeoDB::new_in_memory();
3348 let session = db.session();
3349
3350 session.execute("CREATE GRAPH mydb").unwrap();
3351
3352 session.execute("USE GRAPH mydb").unwrap();
3354 assert_eq!(session.current_graph(), Some("mydb".to_string()));
3355 }
3356
3357 #[test]
3358 fn test_create_graph_duplicate_errors() {
3359 let db = GrafeoDB::new_in_memory();
3360 let session = db.session();
3361
3362 session.execute("CREATE GRAPH mydb").unwrap();
3363 let result = session.execute("CREATE GRAPH mydb");
3364
3365 assert!(result.is_err());
3366 let err = result.unwrap_err().to_string();
3367 assert!(
3368 err.contains("already exists"),
3369 "Expected 'already exists' error, got: {err}"
3370 );
3371 }
3372
3373 #[test]
3374 fn test_create_graph_if_not_exists() {
3375 let db = GrafeoDB::new_in_memory();
3376 let session = db.session();
3377
3378 session.execute("CREATE GRAPH mydb").unwrap();
3379 session.execute("CREATE GRAPH IF NOT EXISTS mydb").unwrap();
3381 }
3382
3383 #[test]
3384 fn test_drop_graph() {
3385 let db = GrafeoDB::new_in_memory();
3386 let session = db.session();
3387
3388 session.execute("CREATE GRAPH mydb").unwrap();
3389 session.execute("DROP GRAPH mydb").unwrap();
3390
3391 let result = session.execute("USE GRAPH mydb");
3393 assert!(result.is_err());
3394 }
3395
3396 #[test]
3397 fn test_drop_graph_nonexistent_errors() {
3398 let db = GrafeoDB::new_in_memory();
3399 let session = db.session();
3400
3401 let result = session.execute("DROP GRAPH nosuchgraph");
3402 assert!(result.is_err());
3403 let err = result.unwrap_err().to_string();
3404 assert!(
3405 err.contains("does not exist"),
3406 "Expected 'does not exist' error, got: {err}"
3407 );
3408 }
3409
3410 #[test]
3411 fn test_drop_graph_if_exists() {
3412 let db = GrafeoDB::new_in_memory();
3413 let session = db.session();
3414
3415 session.execute("DROP GRAPH IF EXISTS nosuchgraph").unwrap();
3417 }
3418
3419 #[test]
3420 fn test_start_transaction_via_gql() {
3421 let db = GrafeoDB::new_in_memory();
3422 let session = db.session();
3423
3424 session.execute("START TRANSACTION").unwrap();
3425 assert!(session.in_transaction());
3426 session.execute("INSERT (:Person {name: 'Alice'})").unwrap();
3427 session.execute("COMMIT").unwrap();
3428 assert!(!session.in_transaction());
3429
3430 let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
3431 assert_eq!(result.rows.len(), 1);
3432 }
3433
3434 #[test]
3435 fn test_start_transaction_read_only_blocks_insert() {
3436 let db = GrafeoDB::new_in_memory();
3437 let session = db.session();
3438
3439 session.execute("START TRANSACTION READ ONLY").unwrap();
3440 let result = session.execute("INSERT (:Person {name: 'Alice'})");
3441 assert!(result.is_err());
3442 let err = result.unwrap_err().to_string();
3443 assert!(
3444 err.contains("read-only"),
3445 "Expected read-only error, got: {err}"
3446 );
3447 session.execute("ROLLBACK").unwrap();
3448 }
3449
3450 #[test]
3451 fn test_start_transaction_read_only_allows_reads() {
3452 let db = GrafeoDB::new_in_memory();
3453 let mut session = db.session();
3454 session.begin_tx().unwrap();
3455 session.execute("INSERT (:Person {name: 'Alice'})").unwrap();
3456 session.commit().unwrap();
3457
3458 session.execute("START TRANSACTION READ ONLY").unwrap();
3459 let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
3460 assert_eq!(result.rows.len(), 1);
3461 session.execute("COMMIT").unwrap();
3462 }
3463
3464 #[test]
3465 fn test_rollback_via_gql() {
3466 let db = GrafeoDB::new_in_memory();
3467 let session = db.session();
3468
3469 session.execute("START TRANSACTION").unwrap();
3470 session.execute("INSERT (:Person {name: 'Alice'})").unwrap();
3471 session.execute("ROLLBACK").unwrap();
3472
3473 let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
3474 assert!(result.rows.is_empty());
3475 }
3476
3477 #[test]
3478 fn test_start_transaction_with_isolation_level() {
3479 let db = GrafeoDB::new_in_memory();
3480 let session = db.session();
3481
3482 session
3483 .execute("START TRANSACTION ISOLATION LEVEL SERIALIZABLE")
3484 .unwrap();
3485 assert!(session.in_transaction());
3486 session.execute("ROLLBACK").unwrap();
3487 }
3488
3489 #[test]
3490 fn test_session_commands_return_empty_result() {
3491 let db = GrafeoDB::new_in_memory();
3492 let session = db.session();
3493
3494 let result = session.execute("SESSION SET GRAPH test").unwrap();
3495 assert_eq!(result.row_count(), 0);
3496 assert_eq!(result.column_count(), 0);
3497 }
3498
3499 #[test]
3500 fn test_current_graph_default_is_none() {
3501 let db = GrafeoDB::new_in_memory();
3502 let session = db.session();
3503
3504 assert_eq!(session.current_graph(), None);
3505 }
3506
3507 #[test]
3508 fn test_time_zone_default_is_none() {
3509 let db = GrafeoDB::new_in_memory();
3510 let session = db.session();
3511
3512 assert_eq!(session.time_zone(), None);
3513 }
3514
3515 #[test]
3516 fn test_session_state_independent_across_sessions() {
3517 let db = GrafeoDB::new_in_memory();
3518 let session1 = db.session();
3519 let session2 = db.session();
3520
3521 session1.execute("SESSION SET GRAPH first").unwrap();
3522 session2.execute("SESSION SET GRAPH second").unwrap();
3523
3524 assert_eq!(session1.current_graph(), Some("first".to_string()));
3525 assert_eq!(session2.current_graph(), Some("second".to_string()));
3526 }
3527 }
3528}