1use std::sync::Arc;
8use std::sync::atomic::{AtomicUsize, Ordering};
9use std::time::{Duration, Instant};
10
11use grafeo_common::types::{EdgeId, EpochId, NodeId, TransactionId, Value};
12use grafeo_common::utils::error::Result;
13use grafeo_core::graph::Direction;
14use grafeo_core::graph::GraphStoreMut;
15use grafeo_core::graph::lpg::{Edge, LpgStore, Node};
16#[cfg(feature = "rdf")]
17use grafeo_core::graph::rdf::RdfStore;
18
19use crate::catalog::{Catalog, CatalogConstraintValidator};
20use crate::config::{AdaptiveConfig, GraphModel};
21use crate::database::QueryResult;
22use crate::query::cache::QueryCache;
23use crate::transaction::TransactionManager;
24
25fn parse_default_literal(text: &str) -> Value {
30 if text.eq_ignore_ascii_case("null") {
31 return Value::Null;
32 }
33 if text.eq_ignore_ascii_case("true") {
34 return Value::Bool(true);
35 }
36 if text.eq_ignore_ascii_case("false") {
37 return Value::Bool(false);
38 }
39 if (text.starts_with('\'') && text.ends_with('\''))
41 || (text.starts_with('"') && text.ends_with('"'))
42 {
43 return Value::String(text[1..text.len() - 1].into());
44 }
45 if let Ok(i) = text.parse::<i64>() {
47 return Value::Int64(i);
48 }
49 if let Ok(f) = text.parse::<f64>() {
50 return Value::Float64(f);
51 }
52 Value::String(text.into())
54}
55
56pub(crate) struct SessionConfig {
61 pub transaction_manager: Arc<TransactionManager>,
62 pub query_cache: Arc<QueryCache>,
63 pub catalog: Arc<Catalog>,
64 pub adaptive_config: AdaptiveConfig,
65 pub factorized_execution: bool,
66 pub graph_model: GraphModel,
67 pub query_timeout: Option<Duration>,
68 pub commit_counter: Arc<AtomicUsize>,
69 pub gc_interval: usize,
70}
71
72pub struct Session {
78 store: Arc<LpgStore>,
80 graph_store: Arc<dyn GraphStoreMut>,
82 catalog: Arc<Catalog>,
84 #[cfg(feature = "rdf")]
86 rdf_store: Arc<RdfStore>,
87 transaction_manager: Arc<TransactionManager>,
89 query_cache: Arc<QueryCache>,
91 current_transaction: parking_lot::Mutex<Option<TransactionId>>,
95 read_only_tx: parking_lot::Mutex<bool>,
97 auto_commit: bool,
99 #[allow(dead_code)] adaptive_config: AdaptiveConfig,
102 factorized_execution: bool,
104 graph_model: GraphModel,
106 query_timeout: Option<Duration>,
108 commit_counter: Arc<AtomicUsize>,
110 gc_interval: usize,
112 transaction_start_node_count: AtomicUsize,
114 transaction_start_edge_count: AtomicUsize,
116 #[cfg(feature = "wal")]
118 wal: Option<Arc<grafeo_adapters::storage::wal::LpgWal>>,
119 #[cfg(feature = "wal")]
121 wal_graph_context: Option<Arc<parking_lot::Mutex<Option<String>>>>,
122 #[cfg(feature = "cdc")]
124 cdc_log: Arc<crate::cdc::CdcLog>,
125 current_graph: parking_lot::Mutex<Option<String>>,
127 time_zone: parking_lot::Mutex<Option<String>>,
129 session_params:
131 parking_lot::Mutex<std::collections::HashMap<String, grafeo_common::types::Value>>,
132 viewing_epoch_override: parking_lot::Mutex<Option<EpochId>>,
134 savepoints: parking_lot::Mutex<Vec<SavepointState>>,
136 transaction_nesting_depth: parking_lot::Mutex<u32>,
140 touched_graphs: parking_lot::Mutex<Vec<Option<String>>>,
144}
145
146#[derive(Clone)]
148struct GraphSavepoint {
149 graph_name: Option<String>,
150 next_node_id: u64,
151 next_edge_id: u64,
152 undo_log_position: usize,
153}
154
155#[derive(Clone)]
157struct SavepointState {
158 name: String,
159 graph_snapshots: Vec<GraphSavepoint>,
160 #[allow(dead_code)]
163 active_graph: Option<String>,
164}
165
166impl Session {
167 #[allow(dead_code)]
169 pub(crate) fn with_adaptive(store: Arc<LpgStore>, cfg: SessionConfig) -> Self {
170 let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
171 Self {
172 store,
173 graph_store,
174 catalog: cfg.catalog,
175 #[cfg(feature = "rdf")]
176 rdf_store: Arc::new(RdfStore::new()),
177 transaction_manager: cfg.transaction_manager,
178 query_cache: cfg.query_cache,
179 current_transaction: parking_lot::Mutex::new(None),
180 read_only_tx: parking_lot::Mutex::new(false),
181 auto_commit: true,
182 adaptive_config: cfg.adaptive_config,
183 factorized_execution: cfg.factorized_execution,
184 graph_model: cfg.graph_model,
185 query_timeout: cfg.query_timeout,
186 commit_counter: cfg.commit_counter,
187 gc_interval: cfg.gc_interval,
188 transaction_start_node_count: AtomicUsize::new(0),
189 transaction_start_edge_count: AtomicUsize::new(0),
190 #[cfg(feature = "wal")]
191 wal: None,
192 #[cfg(feature = "wal")]
193 wal_graph_context: None,
194 #[cfg(feature = "cdc")]
195 cdc_log: Arc::new(crate::cdc::CdcLog::new()),
196 current_graph: parking_lot::Mutex::new(None),
197 time_zone: parking_lot::Mutex::new(None),
198 session_params: parking_lot::Mutex::new(std::collections::HashMap::new()),
199 viewing_epoch_override: parking_lot::Mutex::new(None),
200 savepoints: parking_lot::Mutex::new(Vec::new()),
201 transaction_nesting_depth: parking_lot::Mutex::new(0),
202 touched_graphs: parking_lot::Mutex::new(Vec::new()),
203 }
204 }
205
206 #[cfg(feature = "wal")]
211 pub(crate) fn set_wal(
212 &mut self,
213 wal: Arc<grafeo_adapters::storage::wal::LpgWal>,
214 wal_graph_context: Arc<parking_lot::Mutex<Option<String>>>,
215 ) {
216 self.graph_store = Arc::new(crate::database::wal_store::WalGraphStore::new(
218 Arc::clone(&self.store),
219 Arc::clone(&wal),
220 Arc::clone(&wal_graph_context),
221 ));
222 self.wal = Some(wal);
223 self.wal_graph_context = Some(wal_graph_context);
224 }
225
226 #[cfg(feature = "cdc")]
228 pub(crate) fn set_cdc_log(&mut self, cdc_log: Arc<crate::cdc::CdcLog>) {
229 self.cdc_log = cdc_log;
230 }
231
232 #[cfg(feature = "rdf")]
234 pub(crate) fn with_rdf_store_and_adaptive(
235 store: Arc<LpgStore>,
236 rdf_store: Arc<RdfStore>,
237 cfg: SessionConfig,
238 ) -> Self {
239 let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
240 Self {
241 store,
242 graph_store,
243 catalog: cfg.catalog,
244 rdf_store,
245 transaction_manager: cfg.transaction_manager,
246 query_cache: cfg.query_cache,
247 current_transaction: parking_lot::Mutex::new(None),
248 read_only_tx: parking_lot::Mutex::new(false),
249 auto_commit: true,
250 adaptive_config: cfg.adaptive_config,
251 factorized_execution: cfg.factorized_execution,
252 graph_model: cfg.graph_model,
253 query_timeout: cfg.query_timeout,
254 commit_counter: cfg.commit_counter,
255 gc_interval: cfg.gc_interval,
256 transaction_start_node_count: AtomicUsize::new(0),
257 transaction_start_edge_count: AtomicUsize::new(0),
258 #[cfg(feature = "wal")]
259 wal: None,
260 #[cfg(feature = "wal")]
261 wal_graph_context: None,
262 #[cfg(feature = "cdc")]
263 cdc_log: Arc::new(crate::cdc::CdcLog::new()),
264 current_graph: parking_lot::Mutex::new(None),
265 time_zone: parking_lot::Mutex::new(None),
266 session_params: parking_lot::Mutex::new(std::collections::HashMap::new()),
267 viewing_epoch_override: parking_lot::Mutex::new(None),
268 savepoints: parking_lot::Mutex::new(Vec::new()),
269 transaction_nesting_depth: parking_lot::Mutex::new(0),
270 touched_graphs: parking_lot::Mutex::new(Vec::new()),
271 }
272 }
273
274 pub(crate) fn with_external_store(
283 store: Arc<dyn GraphStoreMut>,
284 cfg: SessionConfig,
285 ) -> Result<Self> {
286 Ok(Self {
287 store: Arc::new(LpgStore::new()?),
288 graph_store: store,
289 catalog: cfg.catalog,
290 #[cfg(feature = "rdf")]
291 rdf_store: Arc::new(RdfStore::new()),
292 transaction_manager: cfg.transaction_manager,
293 query_cache: cfg.query_cache,
294 current_transaction: parking_lot::Mutex::new(None),
295 read_only_tx: parking_lot::Mutex::new(false),
296 auto_commit: true,
297 adaptive_config: cfg.adaptive_config,
298 factorized_execution: cfg.factorized_execution,
299 graph_model: cfg.graph_model,
300 query_timeout: cfg.query_timeout,
301 commit_counter: cfg.commit_counter,
302 gc_interval: cfg.gc_interval,
303 transaction_start_node_count: AtomicUsize::new(0),
304 transaction_start_edge_count: AtomicUsize::new(0),
305 #[cfg(feature = "wal")]
306 wal: None,
307 #[cfg(feature = "wal")]
308 wal_graph_context: None,
309 #[cfg(feature = "cdc")]
310 cdc_log: Arc::new(crate::cdc::CdcLog::new()),
311 current_graph: parking_lot::Mutex::new(None),
312 time_zone: parking_lot::Mutex::new(None),
313 session_params: parking_lot::Mutex::new(std::collections::HashMap::new()),
314 viewing_epoch_override: parking_lot::Mutex::new(None),
315 savepoints: parking_lot::Mutex::new(Vec::new()),
316 transaction_nesting_depth: parking_lot::Mutex::new(0),
317 touched_graphs: parking_lot::Mutex::new(Vec::new()),
318 })
319 }
320
321 #[must_use]
323 pub fn graph_model(&self) -> GraphModel {
324 self.graph_model
325 }
326
327 pub fn use_graph(&self, name: &str) {
331 *self.current_graph.lock() = Some(name.to_string());
332 }
333
334 #[must_use]
336 pub fn current_graph(&self) -> Option<String> {
337 self.current_graph.lock().clone()
338 }
339
340 fn active_store(&self) -> Arc<dyn GraphStoreMut> {
348 let graph_name = self.current_graph.lock().clone();
349 match graph_name {
350 None => Arc::clone(&self.graph_store),
351 Some(ref name) if name.eq_ignore_ascii_case("default") => Arc::clone(&self.graph_store),
352 Some(ref name) => match self.store.graph(name) {
353 Some(named_store) => {
354 #[cfg(feature = "wal")]
355 if let (Some(wal), Some(ctx)) = (&self.wal, &self.wal_graph_context) {
356 return Arc::new(crate::database::wal_store::WalGraphStore::new_for_graph(
357 named_store,
358 Arc::clone(wal),
359 name.clone(),
360 Arc::clone(ctx),
361 )) as Arc<dyn GraphStoreMut>;
362 }
363 named_store as Arc<dyn GraphStoreMut>
364 }
365 None => Arc::clone(&self.graph_store),
366 },
367 }
368 }
369
370 fn active_lpg_store(&self) -> Arc<LpgStore> {
375 let graph_name = self.current_graph.lock().clone();
376 match graph_name {
377 None => Arc::clone(&self.store),
378 Some(ref name) if name.eq_ignore_ascii_case("default") => Arc::clone(&self.store),
379 Some(ref name) => self
380 .store
381 .graph(name)
382 .unwrap_or_else(|| Arc::clone(&self.store)),
383 }
384 }
385
386 fn resolve_store(&self, graph_name: &Option<String>) -> Arc<LpgStore> {
389 match graph_name {
390 None => Arc::clone(&self.store),
391 Some(name) if name.eq_ignore_ascii_case("default") => Arc::clone(&self.store),
392 Some(name) => self
393 .store
394 .graph(name)
395 .unwrap_or_else(|| Arc::clone(&self.store)),
396 }
397 }
398
399 fn track_graph_touch(&self) {
401 if self.current_transaction.lock().is_some() {
402 let graph = self.current_graph.lock().clone();
403 let normalized = match graph {
405 Some(ref name) if name.eq_ignore_ascii_case("default") => None,
406 other => other,
407 };
408 let mut touched = self.touched_graphs.lock();
409 if !touched.contains(&normalized) {
410 touched.push(normalized);
411 }
412 }
413 }
414
415 pub fn set_time_zone(&self, tz: &str) {
417 *self.time_zone.lock() = Some(tz.to_string());
418 }
419
420 #[must_use]
422 pub fn time_zone(&self) -> Option<String> {
423 self.time_zone.lock().clone()
424 }
425
426 pub fn set_parameter(&self, key: &str, value: grafeo_common::types::Value) {
428 self.session_params.lock().insert(key.to_string(), value);
429 }
430
431 #[must_use]
433 pub fn get_parameter(&self, key: &str) -> Option<grafeo_common::types::Value> {
434 self.session_params.lock().get(key).cloned()
435 }
436
437 pub fn reset_session(&self) {
439 *self.current_graph.lock() = None;
440 *self.time_zone.lock() = None;
441 self.session_params.lock().clear();
442 *self.viewing_epoch_override.lock() = None;
443 }
444
445 pub fn set_viewing_epoch(&self, epoch: EpochId) {
453 *self.viewing_epoch_override.lock() = Some(epoch);
454 }
455
456 pub fn clear_viewing_epoch(&self) {
458 *self.viewing_epoch_override.lock() = None;
459 }
460
461 #[must_use]
463 pub fn viewing_epoch(&self) -> Option<EpochId> {
464 *self.viewing_epoch_override.lock()
465 }
466
467 #[must_use]
471 pub fn get_node_history(&self, id: NodeId) -> Vec<(EpochId, Option<EpochId>, Node)> {
472 self.active_lpg_store().get_node_history(id)
473 }
474
475 #[must_use]
479 pub fn get_edge_history(&self, id: EdgeId) -> Vec<(EpochId, Option<EpochId>, Edge)> {
480 self.active_lpg_store().get_edge_history(id)
481 }
482
483 fn require_lpg(&self, language: &str) -> Result<()> {
485 if self.graph_model == GraphModel::Rdf {
486 return Err(grafeo_common::utils::error::Error::Internal(format!(
487 "This is an RDF database. {language} queries require an LPG database."
488 )));
489 }
490 Ok(())
491 }
492
493 #[cfg(feature = "gql")]
495 fn execute_session_command(
496 &self,
497 cmd: grafeo_adapters::query::gql::ast::SessionCommand,
498 ) -> Result<QueryResult> {
499 use grafeo_adapters::query::gql::ast::{SessionCommand, TransactionIsolationLevel};
500 use grafeo_common::utils::error::{Error, QueryError, QueryErrorKind};
501
502 match cmd {
503 SessionCommand::CreateGraph {
504 name,
505 if_not_exists,
506 typed,
507 like_graph,
508 copy_of,
509 open: _,
510 } => {
511 if let Some(ref src) = like_graph
513 && self.store.graph(src).is_none()
514 {
515 return Err(Error::Query(QueryError::new(
516 QueryErrorKind::Semantic,
517 format!("Source graph '{src}' does not exist"),
518 )));
519 }
520 if let Some(ref src) = copy_of
521 && self.store.graph(src).is_none()
522 {
523 return Err(Error::Query(QueryError::new(
524 QueryErrorKind::Semantic,
525 format!("Source graph '{src}' does not exist"),
526 )));
527 }
528
529 let created = self
530 .store
531 .create_graph(&name)
532 .map_err(|e| Error::Internal(e.to_string()))?;
533 if !created && !if_not_exists {
534 return Err(Error::Query(QueryError::new(
535 QueryErrorKind::Semantic,
536 format!("Graph '{name}' already exists"),
537 )));
538 }
539 if created {
540 #[cfg(feature = "wal")]
541 self.log_schema_wal(
542 &grafeo_adapters::storage::wal::WalRecord::CreateNamedGraph {
543 name: name.clone(),
544 },
545 );
546 }
547
548 if let Some(ref src) = copy_of {
550 self.store
551 .copy_graph(Some(src), Some(&name))
552 .map_err(|e| Error::Internal(e.to_string()))?;
553 }
554
555 if let Some(type_name) = typed
557 && let Err(e) = self.catalog.bind_graph_type(&name, type_name.clone())
558 {
559 return Err(Error::Query(QueryError::new(
560 QueryErrorKind::Semantic,
561 e.to_string(),
562 )));
563 }
564
565 if let Some(ref src) = like_graph
567 && let Some(src_type) = self.catalog.get_graph_type_binding(src)
568 {
569 let _ = self.catalog.bind_graph_type(&name, src_type);
570 }
571
572 Ok(QueryResult::empty())
573 }
574 SessionCommand::DropGraph { name, if_exists } => {
575 let dropped = self.store.drop_graph(&name);
576 if !dropped && !if_exists {
577 return Err(Error::Query(QueryError::new(
578 QueryErrorKind::Semantic,
579 format!("Graph '{name}' does not exist"),
580 )));
581 }
582 if dropped {
583 #[cfg(feature = "wal")]
584 self.log_schema_wal(
585 &grafeo_adapters::storage::wal::WalRecord::DropNamedGraph {
586 name: name.clone(),
587 },
588 );
589 let mut current = self.current_graph.lock();
591 if current
592 .as_deref()
593 .is_some_and(|g| g.eq_ignore_ascii_case(&name))
594 {
595 *current = None;
596 }
597 }
598 Ok(QueryResult::empty())
599 }
600 SessionCommand::UseGraph(name) => {
601 if !name.eq_ignore_ascii_case("default") && self.store.graph(&name).is_none() {
603 return Err(Error::Query(QueryError::new(
604 QueryErrorKind::Semantic,
605 format!("Graph '{name}' does not exist"),
606 )));
607 }
608 self.use_graph(&name);
609 self.track_graph_touch();
611 Ok(QueryResult::empty())
612 }
613 SessionCommand::SessionSetGraph(name) => {
614 if !name.eq_ignore_ascii_case("default") && self.store.graph(&name).is_none() {
616 return Err(Error::Query(QueryError::new(
617 QueryErrorKind::Semantic,
618 format!("Graph '{name}' does not exist"),
619 )));
620 }
621 self.use_graph(&name);
622 self.track_graph_touch();
624 Ok(QueryResult::empty())
625 }
626 SessionCommand::SessionSetTimeZone(tz) => {
627 self.set_time_zone(&tz);
628 Ok(QueryResult::empty())
629 }
630 SessionCommand::SessionSetParameter(key, expr) => {
631 if key.eq_ignore_ascii_case("viewing_epoch") {
632 match Self::eval_integer_literal(&expr) {
633 Some(n) if n >= 0 => {
634 self.set_viewing_epoch(EpochId::new(n as u64));
635 Ok(QueryResult::status(format!("Set viewing_epoch to {n}")))
636 }
637 _ => Err(Error::Query(QueryError::new(
638 QueryErrorKind::Semantic,
639 "viewing_epoch must be a non-negative integer literal",
640 ))),
641 }
642 } else {
643 self.set_parameter(&key, Value::Null);
646 Ok(QueryResult::empty())
647 }
648 }
649 SessionCommand::SessionReset => {
650 self.reset_session();
651 Ok(QueryResult::empty())
652 }
653 SessionCommand::SessionClose => {
654 self.reset_session();
655 Ok(QueryResult::empty())
656 }
657 SessionCommand::StartTransaction {
658 read_only,
659 isolation_level,
660 } => {
661 let engine_level = isolation_level.map(|l| match l {
662 TransactionIsolationLevel::ReadCommitted => {
663 crate::transaction::IsolationLevel::ReadCommitted
664 }
665 TransactionIsolationLevel::SnapshotIsolation => {
666 crate::transaction::IsolationLevel::SnapshotIsolation
667 }
668 TransactionIsolationLevel::Serializable => {
669 crate::transaction::IsolationLevel::Serializable
670 }
671 });
672 self.begin_transaction_inner(read_only, engine_level)?;
673 Ok(QueryResult::status("Transaction started"))
674 }
675 SessionCommand::Commit => {
676 self.commit_inner()?;
677 Ok(QueryResult::status("Transaction committed"))
678 }
679 SessionCommand::Rollback => {
680 self.rollback_inner()?;
681 Ok(QueryResult::status("Transaction rolled back"))
682 }
683 SessionCommand::Savepoint(name) => {
684 self.savepoint(&name)?;
685 Ok(QueryResult::status(format!("Savepoint '{name}' created")))
686 }
687 SessionCommand::RollbackToSavepoint(name) => {
688 self.rollback_to_savepoint(&name)?;
689 Ok(QueryResult::status(format!(
690 "Rolled back to savepoint '{name}'"
691 )))
692 }
693 SessionCommand::ReleaseSavepoint(name) => {
694 self.release_savepoint(&name)?;
695 Ok(QueryResult::status(format!("Savepoint '{name}' released")))
696 }
697 }
698 }
699
700 #[cfg(feature = "wal")]
702 fn log_schema_wal(&self, record: &grafeo_adapters::storage::wal::WalRecord) {
703 if let Some(ref wal) = self.wal
704 && let Err(e) = wal.log(record)
705 {
706 tracing::warn!("Failed to log schema change to WAL: {}", e);
707 }
708 }
709
710 #[cfg(feature = "gql")]
712 fn execute_schema_command(
713 &self,
714 cmd: grafeo_adapters::query::gql::ast::SchemaStatement,
715 ) -> Result<QueryResult> {
716 use crate::catalog::{
717 EdgeTypeDefinition, NodeTypeDefinition, PropertyDataType, TypedProperty,
718 };
719 use grafeo_adapters::query::gql::ast::SchemaStatement;
720 #[cfg(feature = "wal")]
721 use grafeo_adapters::storage::wal::WalRecord;
722 use grafeo_common::utils::error::{Error, QueryError, QueryErrorKind};
723
724 macro_rules! wal_log {
726 ($self:expr, $record:expr) => {
727 #[cfg(feature = "wal")]
728 $self.log_schema_wal(&$record);
729 };
730 }
731
732 let result = match cmd {
733 SchemaStatement::CreateNodeType(stmt) => {
734 #[cfg(feature = "wal")]
735 let props_for_wal: Vec<(String, String, bool)> = stmt
736 .properties
737 .iter()
738 .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
739 .collect();
740 let def = NodeTypeDefinition {
741 name: stmt.name.clone(),
742 properties: stmt
743 .properties
744 .iter()
745 .map(|p| TypedProperty {
746 name: p.name.clone(),
747 data_type: PropertyDataType::from_type_name(&p.data_type),
748 nullable: p.nullable,
749 default_value: p
750 .default_value
751 .as_ref()
752 .map(|s| parse_default_literal(s)),
753 })
754 .collect(),
755 constraints: Vec::new(),
756 parent_types: stmt.parent_types.clone(),
757 };
758 let result = if stmt.or_replace {
759 let _ = self.catalog.drop_node_type(&stmt.name);
760 self.catalog.register_node_type(def)
761 } else {
762 self.catalog.register_node_type(def)
763 };
764 match result {
765 Ok(()) => {
766 wal_log!(
767 self,
768 WalRecord::CreateNodeType {
769 name: stmt.name.clone(),
770 properties: props_for_wal,
771 constraints: Vec::new(),
772 }
773 );
774 Ok(QueryResult::status(format!(
775 "Created node type '{}'",
776 stmt.name
777 )))
778 }
779 Err(e) if stmt.if_not_exists => {
780 let _ = e;
781 Ok(QueryResult::status("No change"))
782 }
783 Err(e) => Err(Error::Query(QueryError::new(
784 QueryErrorKind::Semantic,
785 e.to_string(),
786 ))),
787 }
788 }
789 SchemaStatement::CreateEdgeType(stmt) => {
790 #[cfg(feature = "wal")]
791 let props_for_wal: Vec<(String, String, bool)> = stmt
792 .properties
793 .iter()
794 .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
795 .collect();
796 let def = EdgeTypeDefinition {
797 name: stmt.name.clone(),
798 properties: stmt
799 .properties
800 .iter()
801 .map(|p| TypedProperty {
802 name: p.name.clone(),
803 data_type: PropertyDataType::from_type_name(&p.data_type),
804 nullable: p.nullable,
805 default_value: p
806 .default_value
807 .as_ref()
808 .map(|s| parse_default_literal(s)),
809 })
810 .collect(),
811 constraints: Vec::new(),
812 source_node_types: stmt.source_node_types.clone(),
813 target_node_types: stmt.target_node_types.clone(),
814 };
815 let result = if stmt.or_replace {
816 let _ = self.catalog.drop_edge_type_def(&stmt.name);
817 self.catalog.register_edge_type_def(def)
818 } else {
819 self.catalog.register_edge_type_def(def)
820 };
821 match result {
822 Ok(()) => {
823 wal_log!(
824 self,
825 WalRecord::CreateEdgeType {
826 name: stmt.name.clone(),
827 properties: props_for_wal,
828 constraints: Vec::new(),
829 }
830 );
831 Ok(QueryResult::status(format!(
832 "Created edge type '{}'",
833 stmt.name
834 )))
835 }
836 Err(e) if stmt.if_not_exists => {
837 let _ = e;
838 Ok(QueryResult::status("No change"))
839 }
840 Err(e) => Err(Error::Query(QueryError::new(
841 QueryErrorKind::Semantic,
842 e.to_string(),
843 ))),
844 }
845 }
846 SchemaStatement::CreateVectorIndex(stmt) => {
847 Self::create_vector_index_on_store(
848 &self.active_lpg_store(),
849 &stmt.node_label,
850 &stmt.property,
851 stmt.dimensions,
852 stmt.metric.as_deref(),
853 )?;
854 wal_log!(
855 self,
856 WalRecord::CreateIndex {
857 name: stmt.name.clone(),
858 label: stmt.node_label.clone(),
859 property: stmt.property.clone(),
860 index_type: "vector".to_string(),
861 }
862 );
863 Ok(QueryResult::status(format!(
864 "Created vector index '{}'",
865 stmt.name
866 )))
867 }
868 SchemaStatement::DropNodeType { name, if_exists } => {
869 match self.catalog.drop_node_type(&name) {
870 Ok(()) => {
871 wal_log!(self, WalRecord::DropNodeType { name: name.clone() });
872 Ok(QueryResult::status(format!("Dropped node type '{name}'")))
873 }
874 Err(e) if if_exists => {
875 let _ = e;
876 Ok(QueryResult::status("No change"))
877 }
878 Err(e) => Err(Error::Query(QueryError::new(
879 QueryErrorKind::Semantic,
880 e.to_string(),
881 ))),
882 }
883 }
884 SchemaStatement::DropEdgeType { name, if_exists } => {
885 match self.catalog.drop_edge_type_def(&name) {
886 Ok(()) => {
887 wal_log!(self, WalRecord::DropEdgeType { name: name.clone() });
888 Ok(QueryResult::status(format!("Dropped edge type '{name}'")))
889 }
890 Err(e) if if_exists => {
891 let _ = e;
892 Ok(QueryResult::status("No change"))
893 }
894 Err(e) => Err(Error::Query(QueryError::new(
895 QueryErrorKind::Semantic,
896 e.to_string(),
897 ))),
898 }
899 }
900 SchemaStatement::CreateIndex(stmt) => {
901 use grafeo_adapters::query::gql::ast::IndexKind;
902 let active = self.active_lpg_store();
903 let index_type_str = match stmt.index_kind {
904 IndexKind::Property => "property",
905 IndexKind::BTree => "btree",
906 IndexKind::Text => "text",
907 IndexKind::Vector => "vector",
908 };
909 match stmt.index_kind {
910 IndexKind::Property | IndexKind::BTree => {
911 for prop in &stmt.properties {
912 active.create_property_index(prop);
913 }
914 }
915 IndexKind::Text => {
916 for prop in &stmt.properties {
917 Self::create_text_index_on_store(&active, &stmt.label, prop)?;
918 }
919 }
920 IndexKind::Vector => {
921 for prop in &stmt.properties {
922 Self::create_vector_index_on_store(
923 &active,
924 &stmt.label,
925 prop,
926 stmt.options.dimensions,
927 stmt.options.metric.as_deref(),
928 )?;
929 }
930 }
931 }
932 #[cfg(feature = "wal")]
933 for prop in &stmt.properties {
934 wal_log!(
935 self,
936 WalRecord::CreateIndex {
937 name: stmt.name.clone(),
938 label: stmt.label.clone(),
939 property: prop.clone(),
940 index_type: index_type_str.to_string(),
941 }
942 );
943 }
944 Ok(QueryResult::status(format!(
945 "Created {} index '{}'",
946 index_type_str, stmt.name
947 )))
948 }
949 SchemaStatement::DropIndex { name, if_exists } => {
950 let dropped = self.active_lpg_store().drop_property_index(&name);
952 if dropped || if_exists {
953 if dropped {
954 wal_log!(self, WalRecord::DropIndex { name: name.clone() });
955 }
956 Ok(QueryResult::status(if dropped {
957 format!("Dropped index '{name}'")
958 } else {
959 "No change".to_string()
960 }))
961 } else {
962 Err(Error::Query(QueryError::new(
963 QueryErrorKind::Semantic,
964 format!("Index '{name}' does not exist"),
965 )))
966 }
967 }
968 SchemaStatement::CreateConstraint(stmt) => {
969 use crate::catalog::TypeConstraint;
970 use grafeo_adapters::query::gql::ast::ConstraintKind;
971 let kind_str = match stmt.constraint_kind {
972 ConstraintKind::Unique => "unique",
973 ConstraintKind::NodeKey => "node_key",
974 ConstraintKind::NotNull => "not_null",
975 ConstraintKind::Exists => "exists",
976 };
977 let constraint_name = stmt
978 .name
979 .clone()
980 .unwrap_or_else(|| format!("{}_{kind_str}", stmt.label));
981
982 match stmt.constraint_kind {
984 ConstraintKind::Unique => {
985 for prop in &stmt.properties {
986 let label_id = self.catalog.get_or_create_label(&stmt.label);
987 let prop_id = self.catalog.get_or_create_property_key(prop);
988 let _ = self.catalog.add_unique_constraint(label_id, prop_id);
989 }
990 let _ = self.catalog.add_constraint_to_type(
991 &stmt.label,
992 TypeConstraint::Unique(stmt.properties.clone()),
993 );
994 }
995 ConstraintKind::NodeKey => {
996 for prop in &stmt.properties {
997 let label_id = self.catalog.get_or_create_label(&stmt.label);
998 let prop_id = self.catalog.get_or_create_property_key(prop);
999 let _ = self.catalog.add_unique_constraint(label_id, prop_id);
1000 let _ = self.catalog.add_required_property(label_id, prop_id);
1001 }
1002 let _ = self.catalog.add_constraint_to_type(
1003 &stmt.label,
1004 TypeConstraint::PrimaryKey(stmt.properties.clone()),
1005 );
1006 }
1007 ConstraintKind::NotNull | ConstraintKind::Exists => {
1008 for prop in &stmt.properties {
1009 let label_id = self.catalog.get_or_create_label(&stmt.label);
1010 let prop_id = self.catalog.get_or_create_property_key(prop);
1011 let _ = self.catalog.add_required_property(label_id, prop_id);
1012 let _ = self.catalog.add_constraint_to_type(
1013 &stmt.label,
1014 TypeConstraint::NotNull(prop.clone()),
1015 );
1016 }
1017 }
1018 }
1019
1020 wal_log!(
1021 self,
1022 WalRecord::CreateConstraint {
1023 name: constraint_name.clone(),
1024 label: stmt.label.clone(),
1025 properties: stmt.properties.clone(),
1026 kind: kind_str.to_string(),
1027 }
1028 );
1029 Ok(QueryResult::status(format!(
1030 "Created {kind_str} constraint '{constraint_name}'"
1031 )))
1032 }
1033 SchemaStatement::DropConstraint { name, if_exists } => {
1034 let _ = if_exists;
1035 wal_log!(self, WalRecord::DropConstraint { name: name.clone() });
1036 Ok(QueryResult::status(format!("Dropped constraint '{name}'")))
1037 }
1038 SchemaStatement::CreateGraphType(stmt) => {
1039 use crate::catalog::GraphTypeDefinition;
1040 use grafeo_adapters::query::gql::ast::InlineElementType;
1041
1042 let (mut node_types, mut edge_types, open) =
1044 if let Some(ref like_graph) = stmt.like_graph {
1045 if let Some(type_name) = self.catalog.get_graph_type_binding(like_graph) {
1047 if let Some(existing) = self
1048 .catalog
1049 .schema()
1050 .and_then(|s| s.get_graph_type(&type_name))
1051 {
1052 (
1053 existing.allowed_node_types.clone(),
1054 existing.allowed_edge_types.clone(),
1055 existing.open,
1056 )
1057 } else {
1058 (Vec::new(), Vec::new(), true)
1059 }
1060 } else {
1061 let nt = self.catalog.all_node_type_names();
1063 let et = self.catalog.all_edge_type_names();
1064 if nt.is_empty() && et.is_empty() {
1065 (Vec::new(), Vec::new(), true)
1066 } else {
1067 (nt, et, false)
1068 }
1069 }
1070 } else {
1071 (stmt.node_types.clone(), stmt.edge_types.clone(), stmt.open)
1072 };
1073
1074 for inline in &stmt.inline_types {
1076 match inline {
1077 InlineElementType::Node {
1078 name,
1079 properties,
1080 key_labels,
1081 ..
1082 } => {
1083 let def = NodeTypeDefinition {
1084 name: name.clone(),
1085 properties: properties
1086 .iter()
1087 .map(|p| TypedProperty {
1088 name: p.name.clone(),
1089 data_type: PropertyDataType::from_type_name(&p.data_type),
1090 nullable: p.nullable,
1091 default_value: None,
1092 })
1093 .collect(),
1094 constraints: Vec::new(),
1095 parent_types: key_labels.clone(),
1096 };
1097 self.catalog.register_or_replace_node_type(def);
1099 #[cfg(feature = "wal")]
1100 {
1101 let props_for_wal: Vec<(String, String, bool)> = properties
1102 .iter()
1103 .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
1104 .collect();
1105 self.log_schema_wal(&WalRecord::CreateNodeType {
1106 name: name.clone(),
1107 properties: props_for_wal,
1108 constraints: Vec::new(),
1109 });
1110 }
1111 if !node_types.contains(name) {
1112 node_types.push(name.clone());
1113 }
1114 }
1115 InlineElementType::Edge {
1116 name,
1117 properties,
1118 source_node_types,
1119 target_node_types,
1120 ..
1121 } => {
1122 let def = EdgeTypeDefinition {
1123 name: name.clone(),
1124 properties: properties
1125 .iter()
1126 .map(|p| TypedProperty {
1127 name: p.name.clone(),
1128 data_type: PropertyDataType::from_type_name(&p.data_type),
1129 nullable: p.nullable,
1130 default_value: None,
1131 })
1132 .collect(),
1133 constraints: Vec::new(),
1134 source_node_types: source_node_types.clone(),
1135 target_node_types: target_node_types.clone(),
1136 };
1137 self.catalog.register_or_replace_edge_type_def(def);
1138 #[cfg(feature = "wal")]
1139 {
1140 let props_for_wal: Vec<(String, String, bool)> = properties
1141 .iter()
1142 .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
1143 .collect();
1144 self.log_schema_wal(&WalRecord::CreateEdgeType {
1145 name: name.clone(),
1146 properties: props_for_wal,
1147 constraints: Vec::new(),
1148 });
1149 }
1150 if !edge_types.contains(name) {
1151 edge_types.push(name.clone());
1152 }
1153 }
1154 }
1155 }
1156
1157 let def = GraphTypeDefinition {
1158 name: stmt.name.clone(),
1159 allowed_node_types: node_types.clone(),
1160 allowed_edge_types: edge_types.clone(),
1161 open,
1162 };
1163 let result = if stmt.or_replace {
1164 let _ = self.catalog.drop_graph_type(&stmt.name);
1166 self.catalog.register_graph_type(def)
1167 } else {
1168 self.catalog.register_graph_type(def)
1169 };
1170 match result {
1171 Ok(()) => {
1172 wal_log!(
1173 self,
1174 WalRecord::CreateGraphType {
1175 name: stmt.name.clone(),
1176 node_types,
1177 edge_types,
1178 open,
1179 }
1180 );
1181 Ok(QueryResult::status(format!(
1182 "Created graph type '{}'",
1183 stmt.name
1184 )))
1185 }
1186 Err(e) if stmt.if_not_exists => {
1187 let _ = e;
1188 Ok(QueryResult::status("No change"))
1189 }
1190 Err(e) => Err(Error::Query(QueryError::new(
1191 QueryErrorKind::Semantic,
1192 e.to_string(),
1193 ))),
1194 }
1195 }
1196 SchemaStatement::DropGraphType { name, if_exists } => {
1197 match self.catalog.drop_graph_type(&name) {
1198 Ok(()) => {
1199 wal_log!(self, WalRecord::DropGraphType { name: name.clone() });
1200 Ok(QueryResult::status(format!("Dropped graph type '{name}'")))
1201 }
1202 Err(e) if if_exists => {
1203 let _ = e;
1204 Ok(QueryResult::status("No change"))
1205 }
1206 Err(e) => Err(Error::Query(QueryError::new(
1207 QueryErrorKind::Semantic,
1208 e.to_string(),
1209 ))),
1210 }
1211 }
1212 SchemaStatement::CreateSchema {
1213 name,
1214 if_not_exists,
1215 } => match self.catalog.register_schema_namespace(name.clone()) {
1216 Ok(()) => {
1217 wal_log!(self, WalRecord::CreateSchema { name: name.clone() });
1218 Ok(QueryResult::status(format!("Created schema '{name}'")))
1219 }
1220 Err(e) if if_not_exists => {
1221 let _ = e;
1222 Ok(QueryResult::status("No change"))
1223 }
1224 Err(e) => Err(Error::Query(QueryError::new(
1225 QueryErrorKind::Semantic,
1226 e.to_string(),
1227 ))),
1228 },
1229 SchemaStatement::DropSchema { name, if_exists } => {
1230 match self.catalog.drop_schema_namespace(&name) {
1231 Ok(()) => {
1232 wal_log!(self, WalRecord::DropSchema { name: name.clone() });
1233 Ok(QueryResult::status(format!("Dropped schema '{name}'")))
1234 }
1235 Err(e) if if_exists => {
1236 let _ = e;
1237 Ok(QueryResult::status("No change"))
1238 }
1239 Err(e) => Err(Error::Query(QueryError::new(
1240 QueryErrorKind::Semantic,
1241 e.to_string(),
1242 ))),
1243 }
1244 }
1245 SchemaStatement::AlterNodeType(stmt) => {
1246 use grafeo_adapters::query::gql::ast::TypeAlteration;
1247 let mut wal_alts = Vec::new();
1248 for alt in &stmt.alterations {
1249 match alt {
1250 TypeAlteration::AddProperty(prop) => {
1251 let typed = TypedProperty {
1252 name: prop.name.clone(),
1253 data_type: PropertyDataType::from_type_name(&prop.data_type),
1254 nullable: prop.nullable,
1255 default_value: prop
1256 .default_value
1257 .as_ref()
1258 .map(|s| parse_default_literal(s)),
1259 };
1260 self.catalog
1261 .alter_node_type_add_property(&stmt.name, typed)
1262 .map_err(|e| {
1263 Error::Query(QueryError::new(
1264 QueryErrorKind::Semantic,
1265 e.to_string(),
1266 ))
1267 })?;
1268 wal_alts.push((
1269 "add".to_string(),
1270 prop.name.clone(),
1271 prop.data_type.clone(),
1272 prop.nullable,
1273 ));
1274 }
1275 TypeAlteration::DropProperty(name) => {
1276 self.catalog
1277 .alter_node_type_drop_property(&stmt.name, name)
1278 .map_err(|e| {
1279 Error::Query(QueryError::new(
1280 QueryErrorKind::Semantic,
1281 e.to_string(),
1282 ))
1283 })?;
1284 wal_alts.push(("drop".to_string(), name.clone(), String::new(), false));
1285 }
1286 }
1287 }
1288 wal_log!(
1289 self,
1290 WalRecord::AlterNodeType {
1291 name: stmt.name.clone(),
1292 alterations: wal_alts,
1293 }
1294 );
1295 Ok(QueryResult::status(format!(
1296 "Altered node type '{}'",
1297 stmt.name
1298 )))
1299 }
1300 SchemaStatement::AlterEdgeType(stmt) => {
1301 use grafeo_adapters::query::gql::ast::TypeAlteration;
1302 let mut wal_alts = Vec::new();
1303 for alt in &stmt.alterations {
1304 match alt {
1305 TypeAlteration::AddProperty(prop) => {
1306 let typed = TypedProperty {
1307 name: prop.name.clone(),
1308 data_type: PropertyDataType::from_type_name(&prop.data_type),
1309 nullable: prop.nullable,
1310 default_value: prop
1311 .default_value
1312 .as_ref()
1313 .map(|s| parse_default_literal(s)),
1314 };
1315 self.catalog
1316 .alter_edge_type_add_property(&stmt.name, typed)
1317 .map_err(|e| {
1318 Error::Query(QueryError::new(
1319 QueryErrorKind::Semantic,
1320 e.to_string(),
1321 ))
1322 })?;
1323 wal_alts.push((
1324 "add".to_string(),
1325 prop.name.clone(),
1326 prop.data_type.clone(),
1327 prop.nullable,
1328 ));
1329 }
1330 TypeAlteration::DropProperty(name) => {
1331 self.catalog
1332 .alter_edge_type_drop_property(&stmt.name, name)
1333 .map_err(|e| {
1334 Error::Query(QueryError::new(
1335 QueryErrorKind::Semantic,
1336 e.to_string(),
1337 ))
1338 })?;
1339 wal_alts.push(("drop".to_string(), name.clone(), String::new(), false));
1340 }
1341 }
1342 }
1343 wal_log!(
1344 self,
1345 WalRecord::AlterEdgeType {
1346 name: stmt.name.clone(),
1347 alterations: wal_alts,
1348 }
1349 );
1350 Ok(QueryResult::status(format!(
1351 "Altered edge type '{}'",
1352 stmt.name
1353 )))
1354 }
1355 SchemaStatement::AlterGraphType(stmt) => {
1356 use grafeo_adapters::query::gql::ast::GraphTypeAlteration;
1357 let mut wal_alts = Vec::new();
1358 for alt in &stmt.alterations {
1359 match alt {
1360 GraphTypeAlteration::AddNodeType(name) => {
1361 self.catalog
1362 .alter_graph_type_add_node_type(&stmt.name, name.clone())
1363 .map_err(|e| {
1364 Error::Query(QueryError::new(
1365 QueryErrorKind::Semantic,
1366 e.to_string(),
1367 ))
1368 })?;
1369 wal_alts.push(("add_node_type".to_string(), name.clone()));
1370 }
1371 GraphTypeAlteration::DropNodeType(name) => {
1372 self.catalog
1373 .alter_graph_type_drop_node_type(&stmt.name, name)
1374 .map_err(|e| {
1375 Error::Query(QueryError::new(
1376 QueryErrorKind::Semantic,
1377 e.to_string(),
1378 ))
1379 })?;
1380 wal_alts.push(("drop_node_type".to_string(), name.clone()));
1381 }
1382 GraphTypeAlteration::AddEdgeType(name) => {
1383 self.catalog
1384 .alter_graph_type_add_edge_type(&stmt.name, name.clone())
1385 .map_err(|e| {
1386 Error::Query(QueryError::new(
1387 QueryErrorKind::Semantic,
1388 e.to_string(),
1389 ))
1390 })?;
1391 wal_alts.push(("add_edge_type".to_string(), name.clone()));
1392 }
1393 GraphTypeAlteration::DropEdgeType(name) => {
1394 self.catalog
1395 .alter_graph_type_drop_edge_type(&stmt.name, name)
1396 .map_err(|e| {
1397 Error::Query(QueryError::new(
1398 QueryErrorKind::Semantic,
1399 e.to_string(),
1400 ))
1401 })?;
1402 wal_alts.push(("drop_edge_type".to_string(), name.clone()));
1403 }
1404 }
1405 }
1406 wal_log!(
1407 self,
1408 WalRecord::AlterGraphType {
1409 name: stmt.name.clone(),
1410 alterations: wal_alts,
1411 }
1412 );
1413 Ok(QueryResult::status(format!(
1414 "Altered graph type '{}'",
1415 stmt.name
1416 )))
1417 }
1418 SchemaStatement::CreateProcedure(stmt) => {
1419 use crate::catalog::ProcedureDefinition;
1420
1421 let def = ProcedureDefinition {
1422 name: stmt.name.clone(),
1423 params: stmt
1424 .params
1425 .iter()
1426 .map(|p| (p.name.clone(), p.param_type.clone()))
1427 .collect(),
1428 returns: stmt
1429 .returns
1430 .iter()
1431 .map(|r| (r.name.clone(), r.return_type.clone()))
1432 .collect(),
1433 body: stmt.body.clone(),
1434 };
1435
1436 if stmt.or_replace {
1437 self.catalog.replace_procedure(def).map_err(|e| {
1438 Error::Query(QueryError::new(QueryErrorKind::Semantic, e.to_string()))
1439 })?;
1440 } else {
1441 match self.catalog.register_procedure(def) {
1442 Ok(()) => {}
1443 Err(_) if stmt.if_not_exists => {
1444 return Ok(QueryResult::empty());
1445 }
1446 Err(e) => {
1447 return Err(Error::Query(QueryError::new(
1448 QueryErrorKind::Semantic,
1449 e.to_string(),
1450 )));
1451 }
1452 }
1453 }
1454
1455 wal_log!(
1456 self,
1457 WalRecord::CreateProcedure {
1458 name: stmt.name.clone(),
1459 params: stmt
1460 .params
1461 .iter()
1462 .map(|p| (p.name.clone(), p.param_type.clone()))
1463 .collect(),
1464 returns: stmt
1465 .returns
1466 .iter()
1467 .map(|r| (r.name.clone(), r.return_type.clone()))
1468 .collect(),
1469 body: stmt.body,
1470 }
1471 );
1472 Ok(QueryResult::status(format!(
1473 "Created procedure '{}'",
1474 stmt.name
1475 )))
1476 }
1477 SchemaStatement::DropProcedure { name, if_exists } => {
1478 match self.catalog.drop_procedure(&name) {
1479 Ok(()) => {}
1480 Err(_) if if_exists => {
1481 return Ok(QueryResult::empty());
1482 }
1483 Err(e) => {
1484 return Err(Error::Query(QueryError::new(
1485 QueryErrorKind::Semantic,
1486 e.to_string(),
1487 )));
1488 }
1489 }
1490 wal_log!(self, WalRecord::DropProcedure { name: name.clone() });
1491 Ok(QueryResult::status(format!("Dropped procedure '{name}'")))
1492 }
1493 SchemaStatement::ShowIndexes => {
1494 return self.execute_show_indexes();
1495 }
1496 SchemaStatement::ShowConstraints => {
1497 return self.execute_show_constraints();
1498 }
1499 SchemaStatement::ShowNodeTypes => {
1500 return self.execute_show_node_types();
1501 }
1502 SchemaStatement::ShowEdgeTypes => {
1503 return self.execute_show_edge_types();
1504 }
1505 SchemaStatement::ShowGraphTypes => {
1506 return self.execute_show_graph_types();
1507 }
1508 SchemaStatement::ShowGraphType(name) => {
1509 return self.execute_show_graph_type(&name);
1510 }
1511 SchemaStatement::ShowCurrentGraphType => {
1512 return self.execute_show_current_graph_type();
1513 }
1514 SchemaStatement::ShowGraphs => {
1515 return self.execute_show_graphs();
1516 }
1517 };
1518
1519 if result.is_ok() {
1522 self.query_cache.clear();
1523 }
1524
1525 result
1526 }
1527
1528 #[cfg(all(feature = "gql", feature = "vector-index"))]
1530 fn create_vector_index_on_store(
1531 store: &LpgStore,
1532 label: &str,
1533 property: &str,
1534 dimensions: Option<usize>,
1535 metric: Option<&str>,
1536 ) -> Result<()> {
1537 use grafeo_common::types::{PropertyKey, Value};
1538 use grafeo_common::utils::error::Error;
1539 use grafeo_core::index::vector::{DistanceMetric, HnswConfig, HnswIndex};
1540
1541 let metric = match metric {
1542 Some(m) => DistanceMetric::from_str(m).ok_or_else(|| {
1543 Error::Internal(format!(
1544 "Unknown distance metric '{m}'. Use: cosine, euclidean, dot_product, manhattan"
1545 ))
1546 })?,
1547 None => DistanceMetric::Cosine,
1548 };
1549
1550 let prop_key = PropertyKey::new(property);
1551 let mut found_dims: Option<usize> = dimensions;
1552 let mut vectors: Vec<(grafeo_common::types::NodeId, Vec<f32>)> = Vec::new();
1553
1554 for node in store.nodes_with_label(label) {
1555 if let Some(Value::Vector(v)) = node.properties.get(&prop_key) {
1556 if let Some(expected) = found_dims {
1557 if v.len() != expected {
1558 return Err(Error::Internal(format!(
1559 "Vector dimension mismatch: expected {expected}, found {} on node {}",
1560 v.len(),
1561 node.id.0
1562 )));
1563 }
1564 } else {
1565 found_dims = Some(v.len());
1566 }
1567 vectors.push((node.id, v.to_vec()));
1568 }
1569 }
1570
1571 let Some(dims) = found_dims else {
1572 return Err(Error::Internal(format!(
1573 "No vector properties found on :{label}({property}) and no dimensions specified"
1574 )));
1575 };
1576
1577 let config = HnswConfig::new(dims, metric);
1578 let index = HnswIndex::with_capacity(config, vectors.len());
1579 let accessor = grafeo_core::index::vector::PropertyVectorAccessor::new(store, property);
1580 for (node_id, vec) in &vectors {
1581 index.insert(*node_id, vec, &accessor);
1582 }
1583
1584 store.add_vector_index(label, property, Arc::new(index));
1585 Ok(())
1586 }
1587
1588 #[cfg(all(feature = "gql", not(feature = "vector-index")))]
1590 fn create_vector_index_on_store(
1591 _store: &LpgStore,
1592 _label: &str,
1593 _property: &str,
1594 _dimensions: Option<usize>,
1595 _metric: Option<&str>,
1596 ) -> Result<()> {
1597 Err(grafeo_common::utils::error::Error::Internal(
1598 "Vector index support requires the 'vector-index' feature".to_string(),
1599 ))
1600 }
1601
1602 #[cfg(all(feature = "gql", feature = "text-index"))]
1604 fn create_text_index_on_store(store: &LpgStore, label: &str, property: &str) -> Result<()> {
1605 use grafeo_common::types::{PropertyKey, Value};
1606 use grafeo_core::index::text::{BM25Config, InvertedIndex};
1607
1608 let mut index = InvertedIndex::new(BM25Config::default());
1609 let prop_key = PropertyKey::new(property);
1610
1611 let nodes = store.nodes_by_label(label);
1612 for node_id in nodes {
1613 if let Some(Value::String(text)) = store.get_node_property(node_id, &prop_key) {
1614 index.insert(node_id, text.as_str());
1615 }
1616 }
1617
1618 store.add_text_index(label, property, Arc::new(parking_lot::RwLock::new(index)));
1619 Ok(())
1620 }
1621
1622 #[cfg(all(feature = "gql", not(feature = "text-index")))]
1624 fn create_text_index_on_store(_store: &LpgStore, _label: &str, _property: &str) -> Result<()> {
1625 Err(grafeo_common::utils::error::Error::Internal(
1626 "Text index support requires the 'text-index' feature".to_string(),
1627 ))
1628 }
1629
1630 fn execute_show_indexes(&self) -> Result<QueryResult> {
1632 let indexes = self.catalog.all_indexes();
1633 let columns = vec![
1634 "name".to_string(),
1635 "type".to_string(),
1636 "label".to_string(),
1637 "property".to_string(),
1638 ];
1639 let rows: Vec<Vec<Value>> = indexes
1640 .into_iter()
1641 .map(|def| {
1642 let label_name = self
1643 .catalog
1644 .get_label_name(def.label)
1645 .unwrap_or_else(|| "?".into());
1646 let prop_name = self
1647 .catalog
1648 .get_property_key_name(def.property_key)
1649 .unwrap_or_else(|| "?".into());
1650 vec![
1651 Value::from(format!("idx_{}_{}", label_name, prop_name)),
1652 Value::from(format!("{:?}", def.index_type)),
1653 Value::from(&*label_name),
1654 Value::from(&*prop_name),
1655 ]
1656 })
1657 .collect();
1658 Ok(QueryResult {
1659 columns,
1660 column_types: Vec::new(),
1661 rows,
1662 ..QueryResult::empty()
1663 })
1664 }
1665
1666 fn execute_show_constraints(&self) -> Result<QueryResult> {
1668 Ok(QueryResult {
1671 columns: vec![
1672 "name".to_string(),
1673 "type".to_string(),
1674 "label".to_string(),
1675 "properties".to_string(),
1676 ],
1677 column_types: Vec::new(),
1678 rows: Vec::new(),
1679 ..QueryResult::empty()
1680 })
1681 }
1682
1683 fn execute_show_node_types(&self) -> Result<QueryResult> {
1685 let columns = vec![
1686 "name".to_string(),
1687 "properties".to_string(),
1688 "constraints".to_string(),
1689 "parents".to_string(),
1690 ];
1691 let type_names = self.catalog.all_node_type_names();
1692 let rows: Vec<Vec<Value>> = type_names
1693 .into_iter()
1694 .filter_map(|name| {
1695 let def = self.catalog.get_node_type(&name)?;
1696 let props: Vec<String> = def
1697 .properties
1698 .iter()
1699 .map(|p| {
1700 let nullable = if p.nullable { "" } else { " NOT NULL" };
1701 format!("{} {}{}", p.name, p.data_type, nullable)
1702 })
1703 .collect();
1704 let constraints: Vec<String> =
1705 def.constraints.iter().map(|c| format!("{c:?}")).collect();
1706 let parents = def.parent_types.join(", ");
1707 Some(vec![
1708 Value::from(name),
1709 Value::from(props.join(", ")),
1710 Value::from(constraints.join(", ")),
1711 Value::from(parents),
1712 ])
1713 })
1714 .collect();
1715 Ok(QueryResult {
1716 columns,
1717 column_types: Vec::new(),
1718 rows,
1719 ..QueryResult::empty()
1720 })
1721 }
1722
1723 fn execute_show_edge_types(&self) -> Result<QueryResult> {
1725 let columns = vec![
1726 "name".to_string(),
1727 "properties".to_string(),
1728 "source_types".to_string(),
1729 "target_types".to_string(),
1730 ];
1731 let type_names = self.catalog.all_edge_type_names();
1732 let rows: Vec<Vec<Value>> = type_names
1733 .into_iter()
1734 .filter_map(|name| {
1735 let def = self.catalog.get_edge_type_def(&name)?;
1736 let props: Vec<String> = def
1737 .properties
1738 .iter()
1739 .map(|p| {
1740 let nullable = if p.nullable { "" } else { " NOT NULL" };
1741 format!("{} {}{}", p.name, p.data_type, nullable)
1742 })
1743 .collect();
1744 let src = def.source_node_types.join(", ");
1745 let tgt = def.target_node_types.join(", ");
1746 Some(vec![
1747 Value::from(name),
1748 Value::from(props.join(", ")),
1749 Value::from(src),
1750 Value::from(tgt),
1751 ])
1752 })
1753 .collect();
1754 Ok(QueryResult {
1755 columns,
1756 column_types: Vec::new(),
1757 rows,
1758 ..QueryResult::empty()
1759 })
1760 }
1761
1762 fn execute_show_graph_types(&self) -> Result<QueryResult> {
1764 let columns = vec![
1765 "name".to_string(),
1766 "open".to_string(),
1767 "node_types".to_string(),
1768 "edge_types".to_string(),
1769 ];
1770 let type_names = self.catalog.all_graph_type_names();
1771 let rows: Vec<Vec<Value>> = type_names
1772 .into_iter()
1773 .filter_map(|name| {
1774 let def = self.catalog.get_graph_type_def(&name)?;
1775 Some(vec![
1776 Value::from(name),
1777 Value::from(def.open),
1778 Value::from(def.allowed_node_types.join(", ")),
1779 Value::from(def.allowed_edge_types.join(", ")),
1780 ])
1781 })
1782 .collect();
1783 Ok(QueryResult {
1784 columns,
1785 column_types: Vec::new(),
1786 rows,
1787 ..QueryResult::empty()
1788 })
1789 }
1790
1791 fn execute_show_graphs(&self) -> Result<QueryResult> {
1793 let mut names = self.store.graph_names();
1794 names.sort();
1795 let rows: Vec<Vec<Value>> = names.into_iter().map(|n| vec![Value::from(n)]).collect();
1796 Ok(QueryResult {
1797 columns: vec!["name".to_string()],
1798 column_types: Vec::new(),
1799 rows,
1800 ..QueryResult::empty()
1801 })
1802 }
1803
1804 fn execute_show_graph_type(&self, name: &str) -> Result<QueryResult> {
1806 use grafeo_common::utils::error::{Error, QueryError, QueryErrorKind};
1807
1808 let def = self.catalog.get_graph_type_def(name).ok_or_else(|| {
1809 Error::Query(QueryError::new(
1810 QueryErrorKind::Semantic,
1811 format!("Graph type '{name}' not found"),
1812 ))
1813 })?;
1814
1815 let columns = vec![
1816 "name".to_string(),
1817 "open".to_string(),
1818 "node_types".to_string(),
1819 "edge_types".to_string(),
1820 ];
1821 let rows = vec![vec![
1822 Value::from(def.name),
1823 Value::from(def.open),
1824 Value::from(def.allowed_node_types.join(", ")),
1825 Value::from(def.allowed_edge_types.join(", ")),
1826 ]];
1827 Ok(QueryResult {
1828 columns,
1829 column_types: Vec::new(),
1830 rows,
1831 ..QueryResult::empty()
1832 })
1833 }
1834
1835 fn execute_show_current_graph_type(&self) -> Result<QueryResult> {
1837 let graph_name = self
1838 .current_graph()
1839 .unwrap_or_else(|| "default".to_string());
1840 let columns = vec![
1841 "graph".to_string(),
1842 "graph_type".to_string(),
1843 "open".to_string(),
1844 "node_types".to_string(),
1845 "edge_types".to_string(),
1846 ];
1847
1848 if let Some(type_name) = self.catalog.get_graph_type_binding(&graph_name)
1849 && let Some(def) = self.catalog.get_graph_type_def(&type_name)
1850 {
1851 let rows = vec![vec![
1852 Value::from(graph_name),
1853 Value::from(type_name),
1854 Value::from(def.open),
1855 Value::from(def.allowed_node_types.join(", ")),
1856 Value::from(def.allowed_edge_types.join(", ")),
1857 ]];
1858 return Ok(QueryResult {
1859 columns,
1860 column_types: Vec::new(),
1861 rows,
1862 ..QueryResult::empty()
1863 });
1864 }
1865
1866 Ok(QueryResult {
1868 columns,
1869 column_types: Vec::new(),
1870 rows: vec![vec![
1871 Value::from(graph_name),
1872 Value::Null,
1873 Value::Null,
1874 Value::Null,
1875 Value::Null,
1876 ]],
1877 ..QueryResult::empty()
1878 })
1879 }
1880
1881 #[cfg(feature = "gql")]
1908 pub fn execute(&self, query: &str) -> Result<QueryResult> {
1909 self.require_lpg("GQL")?;
1910
1911 use crate::query::{
1912 Executor, binder::Binder, cache::CacheKey, optimizer::Optimizer,
1913 processor::QueryLanguage, translators::gql,
1914 };
1915
1916 #[cfg(not(target_arch = "wasm32"))]
1917 let start_time = std::time::Instant::now();
1918
1919 let translation = gql::translate_full(query)?;
1921 let logical_plan = match translation {
1922 gql::GqlTranslationResult::SessionCommand(cmd) => {
1923 return self.execute_session_command(cmd);
1924 }
1925 gql::GqlTranslationResult::SchemaCommand(cmd) => {
1926 if *self.read_only_tx.lock() {
1928 return Err(grafeo_common::utils::error::Error::Transaction(
1929 grafeo_common::utils::error::TransactionError::ReadOnly,
1930 ));
1931 }
1932 return self.execute_schema_command(cmd);
1933 }
1934 gql::GqlTranslationResult::Plan(plan) => {
1935 if *self.read_only_tx.lock() && plan.root.has_mutations() {
1937 return Err(grafeo_common::utils::error::Error::Transaction(
1938 grafeo_common::utils::error::TransactionError::ReadOnly,
1939 ));
1940 }
1941 plan
1942 }
1943 };
1944
1945 let cache_key = CacheKey::with_graph(query, QueryLanguage::Gql, self.current_graph());
1947
1948 let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
1950 cached_plan
1951 } else {
1952 let mut binder = Binder::new();
1954 let _binding_context = binder.bind(&logical_plan)?;
1955
1956 let active = self.active_store();
1958 let optimizer = Optimizer::from_graph_store(&*active);
1959 let plan = optimizer.optimize(logical_plan)?;
1960
1961 self.query_cache.put_optimized(cache_key, plan.clone());
1963
1964 plan
1965 };
1966
1967 let active = self.active_store();
1969
1970 if optimized_plan.explain {
1972 use crate::query::processor::{annotate_pushdown_hints, explain_result};
1973 let mut plan = optimized_plan;
1974 annotate_pushdown_hints(&mut plan.root, active.as_ref());
1975 return Ok(explain_result(&plan));
1976 }
1977
1978 if optimized_plan.profile {
1980 let has_mutations = optimized_plan.root.has_mutations();
1981 return self.with_auto_commit(has_mutations, || {
1982 let (viewing_epoch, transaction_id) = self.get_transaction_context();
1983 let planner = self.create_planner_for_store(
1984 Arc::clone(&active),
1985 viewing_epoch,
1986 transaction_id,
1987 );
1988 let (mut physical_plan, entries) = planner.plan_profiled(&optimized_plan)?;
1989
1990 let executor = Executor::with_columns(physical_plan.columns.clone())
1991 .with_deadline(self.query_deadline());
1992 let _result = executor.execute(physical_plan.operator.as_mut())?;
1993
1994 let total_time_ms;
1995 #[cfg(not(target_arch = "wasm32"))]
1996 {
1997 total_time_ms = start_time.elapsed().as_secs_f64() * 1000.0;
1998 }
1999 #[cfg(target_arch = "wasm32")]
2000 {
2001 total_time_ms = 0.0;
2002 }
2003
2004 let profile_tree = crate::query::profile::build_profile_tree(
2005 &optimized_plan.root,
2006 &mut entries.into_iter(),
2007 );
2008 Ok(crate::query::profile::profile_result(
2009 &profile_tree,
2010 total_time_ms,
2011 ))
2012 });
2013 }
2014
2015 let has_mutations = optimized_plan.root.has_mutations();
2016
2017 self.with_auto_commit(has_mutations, || {
2018 let (viewing_epoch, transaction_id) = self.get_transaction_context();
2020
2021 let planner =
2024 self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
2025 let mut physical_plan = planner.plan(&optimized_plan)?;
2026
2027 let executor = Executor::with_columns(physical_plan.columns.clone())
2029 .with_deadline(self.query_deadline());
2030 let mut result = executor.execute(physical_plan.operator.as_mut())?;
2031
2032 let rows_scanned = result.rows.len() as u64;
2034 #[cfg(not(target_arch = "wasm32"))]
2035 {
2036 let elapsed_ms = start_time.elapsed().as_secs_f64() * 1000.0;
2037 result.execution_time_ms = Some(elapsed_ms);
2038 }
2039 result.rows_scanned = Some(rows_scanned);
2040
2041 Ok(result)
2042 })
2043 }
2044
2045 #[cfg(feature = "gql")]
2054 pub fn execute_at_epoch(&self, query: &str, epoch: EpochId) -> Result<QueryResult> {
2055 let previous = self.viewing_epoch_override.lock().replace(epoch);
2056 let result = self.execute(query);
2057 *self.viewing_epoch_override.lock() = previous;
2058 result
2059 }
2060
2061 #[cfg(feature = "gql")]
2067 pub fn execute_with_params(
2068 &self,
2069 query: &str,
2070 params: std::collections::HashMap<String, Value>,
2071 ) -> Result<QueryResult> {
2072 self.require_lpg("GQL")?;
2073
2074 use crate::query::processor::{QueryLanguage, QueryProcessor};
2075
2076 let has_mutations = Self::query_looks_like_mutation(query);
2077 let active = self.active_store();
2078
2079 self.with_auto_commit(has_mutations, || {
2080 let (viewing_epoch, transaction_id) = self.get_transaction_context();
2082
2083 let processor = QueryProcessor::for_graph_store_with_transaction(
2085 Arc::clone(&active),
2086 Arc::clone(&self.transaction_manager),
2087 )?;
2088
2089 let processor = if let Some(transaction_id) = transaction_id {
2091 processor.with_transaction_context(viewing_epoch, transaction_id)
2092 } else {
2093 processor
2094 };
2095
2096 processor.process(query, QueryLanguage::Gql, Some(¶ms))
2097 })
2098 }
2099
2100 #[cfg(not(any(feature = "gql", feature = "cypher")))]
2106 pub fn execute_with_params(
2107 &self,
2108 _query: &str,
2109 _params: std::collections::HashMap<String, Value>,
2110 ) -> Result<QueryResult> {
2111 Err(grafeo_common::utils::error::Error::Internal(
2112 "No query language enabled".to_string(),
2113 ))
2114 }
2115
2116 #[cfg(not(any(feature = "gql", feature = "cypher")))]
2122 pub fn execute(&self, _query: &str) -> Result<QueryResult> {
2123 Err(grafeo_common::utils::error::Error::Internal(
2124 "No query language enabled".to_string(),
2125 ))
2126 }
2127
2128 #[cfg(feature = "cypher")]
2134 pub fn execute_cypher(&self, query: &str) -> Result<QueryResult> {
2135 use crate::query::{
2136 Executor, binder::Binder, cache::CacheKey, optimizer::Optimizer,
2137 processor::QueryLanguage, translators::cypher,
2138 };
2139 use grafeo_common::utils::error::{Error as GrafeoError, QueryError, QueryErrorKind};
2140
2141 let translation = cypher::translate_full(query)?;
2143 match translation {
2144 cypher::CypherTranslationResult::SchemaCommand(cmd) => {
2145 if *self.read_only_tx.lock() {
2146 return Err(GrafeoError::Query(QueryError::new(
2147 QueryErrorKind::Semantic,
2148 "Cannot execute schema DDL in a read-only transaction",
2149 )));
2150 }
2151 return self.execute_schema_command(cmd);
2152 }
2153 cypher::CypherTranslationResult::ShowIndexes => {
2154 return self.execute_show_indexes();
2155 }
2156 cypher::CypherTranslationResult::ShowConstraints => {
2157 return self.execute_show_constraints();
2158 }
2159 cypher::CypherTranslationResult::ShowCurrentGraphType => {
2160 return self.execute_show_current_graph_type();
2161 }
2162 cypher::CypherTranslationResult::Plan(_) => {
2163 }
2165 }
2166
2167 #[cfg(not(target_arch = "wasm32"))]
2168 let start_time = std::time::Instant::now();
2169
2170 let cache_key = CacheKey::with_graph(query, QueryLanguage::Cypher, self.current_graph());
2172
2173 let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
2175 cached_plan
2176 } else {
2177 let logical_plan = cypher::translate(query)?;
2179
2180 let mut binder = Binder::new();
2182 let _binding_context = binder.bind(&logical_plan)?;
2183
2184 let active = self.active_store();
2186 let optimizer = Optimizer::from_graph_store(&*active);
2187 let plan = optimizer.optimize(logical_plan)?;
2188
2189 self.query_cache.put_optimized(cache_key, plan.clone());
2191
2192 plan
2193 };
2194
2195 let active = self.active_store();
2197
2198 if optimized_plan.explain {
2200 use crate::query::processor::{annotate_pushdown_hints, explain_result};
2201 let mut plan = optimized_plan;
2202 annotate_pushdown_hints(&mut plan.root, active.as_ref());
2203 return Ok(explain_result(&plan));
2204 }
2205
2206 if optimized_plan.profile {
2208 let has_mutations = optimized_plan.root.has_mutations();
2209 return self.with_auto_commit(has_mutations, || {
2210 let (viewing_epoch, transaction_id) = self.get_transaction_context();
2211 let planner = self.create_planner_for_store(
2212 Arc::clone(&active),
2213 viewing_epoch,
2214 transaction_id,
2215 );
2216 let (mut physical_plan, entries) = planner.plan_profiled(&optimized_plan)?;
2217
2218 let executor = Executor::with_columns(physical_plan.columns.clone())
2219 .with_deadline(self.query_deadline());
2220 let _result = executor.execute(physical_plan.operator.as_mut())?;
2221
2222 let total_time_ms;
2223 #[cfg(not(target_arch = "wasm32"))]
2224 {
2225 total_time_ms = start_time.elapsed().as_secs_f64() * 1000.0;
2226 }
2227 #[cfg(target_arch = "wasm32")]
2228 {
2229 total_time_ms = 0.0;
2230 }
2231
2232 let profile_tree = crate::query::profile::build_profile_tree(
2233 &optimized_plan.root,
2234 &mut entries.into_iter(),
2235 );
2236 Ok(crate::query::profile::profile_result(
2237 &profile_tree,
2238 total_time_ms,
2239 ))
2240 });
2241 }
2242
2243 let has_mutations = optimized_plan.root.has_mutations();
2244
2245 self.with_auto_commit(has_mutations, || {
2246 let (viewing_epoch, transaction_id) = self.get_transaction_context();
2248
2249 let planner =
2251 self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
2252 let mut physical_plan = planner.plan(&optimized_plan)?;
2253
2254 let executor = Executor::with_columns(physical_plan.columns.clone())
2256 .with_deadline(self.query_deadline());
2257 executor.execute(physical_plan.operator.as_mut())
2258 })
2259 }
2260
2261 #[cfg(feature = "gremlin")]
2285 pub fn execute_gremlin(&self, query: &str) -> Result<QueryResult> {
2286 use crate::query::{Executor, binder::Binder, optimizer::Optimizer, translators::gremlin};
2287
2288 let logical_plan = gremlin::translate(query)?;
2290
2291 let mut binder = Binder::new();
2293 let _binding_context = binder.bind(&logical_plan)?;
2294
2295 let active = self.active_store();
2297 let optimizer = Optimizer::from_graph_store(&*active);
2298 let optimized_plan = optimizer.optimize(logical_plan)?;
2299
2300 let has_mutations = optimized_plan.root.has_mutations();
2301
2302 self.with_auto_commit(has_mutations, || {
2303 let (viewing_epoch, transaction_id) = self.get_transaction_context();
2305
2306 let planner =
2308 self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
2309 let mut physical_plan = planner.plan(&optimized_plan)?;
2310
2311 let executor = Executor::with_columns(physical_plan.columns.clone())
2313 .with_deadline(self.query_deadline());
2314 executor.execute(physical_plan.operator.as_mut())
2315 })
2316 }
2317
2318 #[cfg(feature = "gremlin")]
2324 pub fn execute_gremlin_with_params(
2325 &self,
2326 query: &str,
2327 params: std::collections::HashMap<String, Value>,
2328 ) -> Result<QueryResult> {
2329 use crate::query::processor::{QueryLanguage, QueryProcessor};
2330
2331 let has_mutations = Self::query_looks_like_mutation(query);
2332 let active = self.active_store();
2333
2334 self.with_auto_commit(has_mutations, || {
2335 let (viewing_epoch, transaction_id) = self.get_transaction_context();
2337
2338 let processor = QueryProcessor::for_graph_store_with_transaction(
2340 Arc::clone(&active),
2341 Arc::clone(&self.transaction_manager),
2342 )?;
2343
2344 let processor = if let Some(transaction_id) = transaction_id {
2346 processor.with_transaction_context(viewing_epoch, transaction_id)
2347 } else {
2348 processor
2349 };
2350
2351 processor.process(query, QueryLanguage::Gremlin, Some(¶ms))
2352 })
2353 }
2354
2355 #[cfg(feature = "graphql")]
2379 pub fn execute_graphql(&self, query: &str) -> Result<QueryResult> {
2380 use crate::query::{Executor, binder::Binder, optimizer::Optimizer, translators::graphql};
2381
2382 let logical_plan = graphql::translate(query)?;
2384
2385 let mut binder = Binder::new();
2387 let _binding_context = binder.bind(&logical_plan)?;
2388
2389 let active = self.active_store();
2391 let optimizer = Optimizer::from_graph_store(&*active);
2392 let optimized_plan = optimizer.optimize(logical_plan)?;
2393
2394 let has_mutations = optimized_plan.root.has_mutations();
2395
2396 self.with_auto_commit(has_mutations, || {
2397 let (viewing_epoch, transaction_id) = self.get_transaction_context();
2399
2400 let planner =
2402 self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
2403 let mut physical_plan = planner.plan(&optimized_plan)?;
2404
2405 let executor = Executor::with_columns(physical_plan.columns.clone())
2407 .with_deadline(self.query_deadline());
2408 executor.execute(physical_plan.operator.as_mut())
2409 })
2410 }
2411
2412 #[cfg(feature = "graphql")]
2418 pub fn execute_graphql_with_params(
2419 &self,
2420 query: &str,
2421 params: std::collections::HashMap<String, Value>,
2422 ) -> Result<QueryResult> {
2423 use crate::query::processor::{QueryLanguage, QueryProcessor};
2424
2425 let has_mutations = Self::query_looks_like_mutation(query);
2426 let active = self.active_store();
2427
2428 self.with_auto_commit(has_mutations, || {
2429 let (viewing_epoch, transaction_id) = self.get_transaction_context();
2431
2432 let processor = QueryProcessor::for_graph_store_with_transaction(
2434 Arc::clone(&active),
2435 Arc::clone(&self.transaction_manager),
2436 )?;
2437
2438 let processor = if let Some(transaction_id) = transaction_id {
2440 processor.with_transaction_context(viewing_epoch, transaction_id)
2441 } else {
2442 processor
2443 };
2444
2445 processor.process(query, QueryLanguage::GraphQL, Some(¶ms))
2446 })
2447 }
2448
2449 #[cfg(all(feature = "graphql", feature = "rdf"))]
2455 pub fn execute_graphql_rdf(&self, query: &str) -> Result<QueryResult> {
2456 use crate::query::{
2457 Executor, optimizer::Optimizer, planner::rdf::RdfPlanner, translators::graphql_rdf,
2458 };
2459
2460 let logical_plan = graphql_rdf::translate(query, "http://example.org/")?;
2461
2462 let active = self.active_store();
2463 let optimizer = Optimizer::from_graph_store(&*active);
2464 let optimized_plan = optimizer.optimize(logical_plan)?;
2465
2466 let planner = RdfPlanner::new(Arc::clone(&self.rdf_store))
2467 .with_transaction_id(*self.current_transaction.lock());
2468 #[cfg(feature = "wal")]
2469 let planner = planner.with_wal(self.wal.clone());
2470 let mut physical_plan = planner.plan(&optimized_plan)?;
2471
2472 let executor = Executor::with_columns(physical_plan.columns.clone())
2473 .with_deadline(self.query_deadline());
2474 executor.execute(physical_plan.operator.as_mut())
2475 }
2476
2477 #[cfg(all(feature = "graphql", feature = "rdf"))]
2483 pub fn execute_graphql_rdf_with_params(
2484 &self,
2485 query: &str,
2486 params: std::collections::HashMap<String, Value>,
2487 ) -> Result<QueryResult> {
2488 use crate::query::processor::{QueryLanguage, QueryProcessor};
2489
2490 let has_mutations = Self::query_looks_like_mutation(query);
2491 let active = self.active_store();
2492
2493 self.with_auto_commit(has_mutations, || {
2494 let (viewing_epoch, transaction_id) = self.get_transaction_context();
2495
2496 let processor = QueryProcessor::for_graph_store_with_transaction(
2497 Arc::clone(&active),
2498 Arc::clone(&self.transaction_manager),
2499 )?;
2500
2501 let processor = if let Some(transaction_id) = transaction_id {
2502 processor.with_transaction_context(viewing_epoch, transaction_id)
2503 } else {
2504 processor
2505 };
2506
2507 processor.process(query, QueryLanguage::GraphQLRdf, Some(¶ms))
2508 })
2509 }
2510
2511 #[cfg(feature = "sql-pgq")]
2536 pub fn execute_sql(&self, query: &str) -> Result<QueryResult> {
2537 use crate::query::{
2538 Executor, binder::Binder, cache::CacheKey, optimizer::Optimizer, plan::LogicalOperator,
2539 processor::QueryLanguage, translators::sql_pgq,
2540 };
2541
2542 let logical_plan = sql_pgq::translate(query)?;
2544
2545 if let LogicalOperator::CreatePropertyGraph(ref cpg) = logical_plan.root {
2547 return Ok(QueryResult {
2548 columns: vec!["status".into()],
2549 column_types: vec![grafeo_common::types::LogicalType::String],
2550 rows: vec![vec![Value::from(format!(
2551 "Property graph '{}' created ({} node tables, {} edge tables)",
2552 cpg.name,
2553 cpg.node_tables.len(),
2554 cpg.edge_tables.len()
2555 ))]],
2556 execution_time_ms: None,
2557 rows_scanned: None,
2558 status_message: None,
2559 gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
2560 });
2561 }
2562
2563 let cache_key = CacheKey::with_graph(query, QueryLanguage::SqlPgq, self.current_graph());
2565
2566 let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
2568 cached_plan
2569 } else {
2570 let mut binder = Binder::new();
2572 let _binding_context = binder.bind(&logical_plan)?;
2573
2574 let active = self.active_store();
2576 let optimizer = Optimizer::from_graph_store(&*active);
2577 let plan = optimizer.optimize(logical_plan)?;
2578
2579 self.query_cache.put_optimized(cache_key, plan.clone());
2581
2582 plan
2583 };
2584
2585 let active = self.active_store();
2586 let has_mutations = optimized_plan.root.has_mutations();
2587
2588 self.with_auto_commit(has_mutations, || {
2589 let (viewing_epoch, transaction_id) = self.get_transaction_context();
2591
2592 let planner =
2594 self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
2595 let mut physical_plan = planner.plan(&optimized_plan)?;
2596
2597 let executor = Executor::with_columns(physical_plan.columns.clone())
2599 .with_deadline(self.query_deadline());
2600 executor.execute(physical_plan.operator.as_mut())
2601 })
2602 }
2603
2604 #[cfg(feature = "sql-pgq")]
2610 pub fn execute_sql_with_params(
2611 &self,
2612 query: &str,
2613 params: std::collections::HashMap<String, Value>,
2614 ) -> Result<QueryResult> {
2615 use crate::query::processor::{QueryLanguage, QueryProcessor};
2616
2617 let has_mutations = Self::query_looks_like_mutation(query);
2618 let active = self.active_store();
2619
2620 self.with_auto_commit(has_mutations, || {
2621 let (viewing_epoch, transaction_id) = self.get_transaction_context();
2623
2624 let processor = QueryProcessor::for_graph_store_with_transaction(
2626 Arc::clone(&active),
2627 Arc::clone(&self.transaction_manager),
2628 )?;
2629
2630 let processor = if let Some(transaction_id) = transaction_id {
2632 processor.with_transaction_context(viewing_epoch, transaction_id)
2633 } else {
2634 processor
2635 };
2636
2637 processor.process(query, QueryLanguage::SqlPgq, Some(¶ms))
2638 })
2639 }
2640
2641 #[cfg(all(feature = "sparql", feature = "rdf"))]
2647 pub fn execute_sparql(&self, query: &str) -> Result<QueryResult> {
2648 use crate::query::{
2649 Executor, optimizer::Optimizer, planner::rdf::RdfPlanner, translators::sparql,
2650 };
2651
2652 let logical_plan = sparql::translate(query)?;
2654
2655 let active = self.active_store();
2657 let optimizer = Optimizer::from_graph_store(&*active);
2658 let optimized_plan = optimizer.optimize(logical_plan)?;
2659
2660 let planner = RdfPlanner::new(Arc::clone(&self.rdf_store))
2662 .with_transaction_id(*self.current_transaction.lock());
2663 #[cfg(feature = "wal")]
2664 let planner = planner.with_wal(self.wal.clone());
2665 let mut physical_plan = planner.plan(&optimized_plan)?;
2666
2667 let executor = Executor::with_columns(physical_plan.columns.clone())
2669 .with_deadline(self.query_deadline());
2670 executor.execute(physical_plan.operator.as_mut())
2671 }
2672
2673 #[cfg(all(feature = "sparql", feature = "rdf"))]
2679 pub fn execute_sparql_with_params(
2680 &self,
2681 query: &str,
2682 params: std::collections::HashMap<String, Value>,
2683 ) -> Result<QueryResult> {
2684 use crate::query::{
2685 Executor, optimizer::Optimizer, planner::rdf::RdfPlanner, processor::substitute_params,
2686 translators::sparql,
2687 };
2688
2689 let mut logical_plan = sparql::translate(query)?;
2690
2691 substitute_params(&mut logical_plan, ¶ms)?;
2692
2693 let active = self.active_store();
2694 let optimizer = Optimizer::from_graph_store(&*active);
2695 let optimized_plan = optimizer.optimize(logical_plan)?;
2696
2697 let planner = RdfPlanner::new(Arc::clone(&self.rdf_store))
2698 .with_transaction_id(*self.current_transaction.lock());
2699 #[cfg(feature = "wal")]
2700 let planner = planner.with_wal(self.wal.clone());
2701 let mut physical_plan = planner.plan(&optimized_plan)?;
2702
2703 let executor = Executor::with_columns(physical_plan.columns.clone())
2704 .with_deadline(self.query_deadline());
2705 executor.execute(physical_plan.operator.as_mut())
2706 }
2707
2708 pub fn execute_language(
2717 &self,
2718 query: &str,
2719 language: &str,
2720 params: Option<std::collections::HashMap<String, Value>>,
2721 ) -> Result<QueryResult> {
2722 match language {
2723 "gql" => {
2724 if let Some(p) = params {
2725 self.execute_with_params(query, p)
2726 } else {
2727 self.execute(query)
2728 }
2729 }
2730 #[cfg(feature = "cypher")]
2731 "cypher" => {
2732 if let Some(p) = params {
2733 use crate::query::processor::{QueryLanguage, QueryProcessor};
2734 let has_mutations = Self::query_looks_like_mutation(query);
2735 let active = self.active_store();
2736 self.with_auto_commit(has_mutations, || {
2737 let processor = QueryProcessor::for_graph_store_with_transaction(
2738 Arc::clone(&active),
2739 Arc::clone(&self.transaction_manager),
2740 )?;
2741 let (viewing_epoch, transaction_id) = self.get_transaction_context();
2742 let processor = if let Some(transaction_id) = transaction_id {
2743 processor.with_transaction_context(viewing_epoch, transaction_id)
2744 } else {
2745 processor
2746 };
2747 processor.process(query, QueryLanguage::Cypher, Some(&p))
2748 })
2749 } else {
2750 self.execute_cypher(query)
2751 }
2752 }
2753 #[cfg(feature = "gremlin")]
2754 "gremlin" => {
2755 if let Some(p) = params {
2756 self.execute_gremlin_with_params(query, p)
2757 } else {
2758 self.execute_gremlin(query)
2759 }
2760 }
2761 #[cfg(feature = "graphql")]
2762 "graphql" => {
2763 if let Some(p) = params {
2764 self.execute_graphql_with_params(query, p)
2765 } else {
2766 self.execute_graphql(query)
2767 }
2768 }
2769 #[cfg(all(feature = "graphql", feature = "rdf"))]
2770 "graphql-rdf" => {
2771 if let Some(p) = params {
2772 self.execute_graphql_rdf_with_params(query, p)
2773 } else {
2774 self.execute_graphql_rdf(query)
2775 }
2776 }
2777 #[cfg(feature = "sql-pgq")]
2778 "sql" | "sql-pgq" => {
2779 if let Some(p) = params {
2780 self.execute_sql_with_params(query, p)
2781 } else {
2782 self.execute_sql(query)
2783 }
2784 }
2785 #[cfg(all(feature = "sparql", feature = "rdf"))]
2786 "sparql" => {
2787 if let Some(p) = params {
2788 self.execute_sparql_with_params(query, p)
2789 } else {
2790 self.execute_sparql(query)
2791 }
2792 }
2793 other => Err(grafeo_common::utils::error::Error::Query(
2794 grafeo_common::utils::error::QueryError::new(
2795 grafeo_common::utils::error::QueryErrorKind::Semantic,
2796 format!("Unknown query language: '{other}'"),
2797 ),
2798 )),
2799 }
2800 }
2801
2802 pub fn clear_plan_cache(&self) {
2829 self.query_cache.clear();
2830 }
2831
2832 pub fn begin_transaction(&mut self) -> Result<()> {
2840 self.begin_transaction_inner(false, None)
2841 }
2842
2843 pub fn begin_transaction_with_isolation(
2851 &mut self,
2852 isolation_level: crate::transaction::IsolationLevel,
2853 ) -> Result<()> {
2854 self.begin_transaction_inner(false, Some(isolation_level))
2855 }
2856
2857 fn begin_transaction_inner(
2859 &self,
2860 read_only: bool,
2861 isolation_level: Option<crate::transaction::IsolationLevel>,
2862 ) -> Result<()> {
2863 let mut current = self.current_transaction.lock();
2864 if current.is_some() {
2865 drop(current);
2867 let mut depth = self.transaction_nesting_depth.lock();
2868 *depth += 1;
2869 let sp_name = format!("_nested_tx_{}", *depth);
2870 self.savepoint(&sp_name)?;
2871 return Ok(());
2872 }
2873
2874 let active = self.active_lpg_store();
2875 self.transaction_start_node_count
2876 .store(active.node_count(), Ordering::Relaxed);
2877 self.transaction_start_edge_count
2878 .store(active.edge_count(), Ordering::Relaxed);
2879 let transaction_id = if let Some(level) = isolation_level {
2880 self.transaction_manager.begin_with_isolation(level)
2881 } else {
2882 self.transaction_manager.begin()
2883 };
2884 *current = Some(transaction_id);
2885 *self.read_only_tx.lock() = read_only;
2886
2887 let graph = self.current_graph.lock().clone();
2889 let normalized = match graph {
2890 Some(ref name) if name.eq_ignore_ascii_case("default") => None,
2891 other => other,
2892 };
2893 let mut touched = self.touched_graphs.lock();
2894 touched.clear();
2895 touched.push(normalized);
2896
2897 Ok(())
2898 }
2899
2900 pub fn commit(&mut self) -> Result<()> {
2908 self.commit_inner()
2909 }
2910
2911 fn commit_inner(&self) -> Result<()> {
2913 {
2915 let mut depth = self.transaction_nesting_depth.lock();
2916 if *depth > 0 {
2917 let sp_name = format!("_nested_tx_{depth}");
2918 *depth -= 1;
2919 drop(depth);
2920 return self.release_savepoint(&sp_name);
2921 }
2922 }
2923
2924 let transaction_id = self.current_transaction.lock().take().ok_or_else(|| {
2925 grafeo_common::utils::error::Error::Transaction(
2926 grafeo_common::utils::error::TransactionError::InvalidState(
2927 "No active transaction".to_string(),
2928 ),
2929 )
2930 })?;
2931
2932 let touched = self.touched_graphs.lock().clone();
2935 let commit_epoch = match self.transaction_manager.commit(transaction_id) {
2936 Ok(epoch) => epoch,
2937 Err(e) => {
2938 for graph_name in &touched {
2940 let store = self.resolve_store(graph_name);
2941 store.rollback_transaction_properties(transaction_id);
2942 }
2943 #[cfg(feature = "rdf")]
2944 self.rdf_store.rollback_transaction(transaction_id);
2945 *self.read_only_tx.lock() = false;
2946 self.savepoints.lock().clear();
2947 self.touched_graphs.lock().clear();
2948 return Err(e);
2949 }
2950 };
2951
2952 for graph_name in &touched {
2954 let store = self.resolve_store(graph_name);
2955 store.finalize_version_epochs(transaction_id, commit_epoch);
2956 }
2957
2958 #[cfg(feature = "rdf")]
2960 self.rdf_store.commit_transaction(transaction_id);
2961
2962 for graph_name in &touched {
2963 let store = self.resolve_store(graph_name);
2964 store.commit_transaction_properties(transaction_id);
2965 }
2966
2967 let current_epoch = self.transaction_manager.current_epoch();
2970 for graph_name in &touched {
2971 let store = self.resolve_store(graph_name);
2972 store.sync_epoch(current_epoch);
2973 }
2974
2975 *self.read_only_tx.lock() = false;
2977 self.savepoints.lock().clear();
2978 self.touched_graphs.lock().clear();
2979
2980 if self.gc_interval > 0 {
2982 let count = self.commit_counter.fetch_add(1, Ordering::Relaxed) + 1;
2983 if count.is_multiple_of(self.gc_interval) {
2984 let min_epoch = self.transaction_manager.min_active_epoch();
2985 for graph_name in &touched {
2986 let store = self.resolve_store(graph_name);
2987 store.gc_versions(min_epoch);
2988 }
2989 self.transaction_manager.gc();
2990 }
2991 }
2992
2993 Ok(())
2994 }
2995
2996 pub fn rollback(&mut self) -> Result<()> {
3020 self.rollback_inner()
3021 }
3022
3023 fn rollback_inner(&self) -> Result<()> {
3025 {
3027 let mut depth = self.transaction_nesting_depth.lock();
3028 if *depth > 0 {
3029 let sp_name = format!("_nested_tx_{depth}");
3030 *depth -= 1;
3031 drop(depth);
3032 return self.rollback_to_savepoint(&sp_name);
3033 }
3034 }
3035
3036 let transaction_id = self.current_transaction.lock().take().ok_or_else(|| {
3037 grafeo_common::utils::error::Error::Transaction(
3038 grafeo_common::utils::error::TransactionError::InvalidState(
3039 "No active transaction".to_string(),
3040 ),
3041 )
3042 })?;
3043
3044 *self.read_only_tx.lock() = false;
3046
3047 let touched = self.touched_graphs.lock().clone();
3049 for graph_name in &touched {
3050 let store = self.resolve_store(graph_name);
3051 store.discard_uncommitted_versions(transaction_id);
3052 }
3053
3054 #[cfg(feature = "rdf")]
3056 self.rdf_store.rollback_transaction(transaction_id);
3057
3058 self.savepoints.lock().clear();
3060 self.touched_graphs.lock().clear();
3061
3062 self.transaction_manager.abort(transaction_id)
3064 }
3065
3066 pub fn savepoint(&self, name: &str) -> Result<()> {
3076 let tx_id = self.current_transaction.lock().ok_or_else(|| {
3077 grafeo_common::utils::error::Error::Transaction(
3078 grafeo_common::utils::error::TransactionError::InvalidState(
3079 "No active transaction".to_string(),
3080 ),
3081 )
3082 })?;
3083
3084 let touched = self.touched_graphs.lock().clone();
3086 let graph_snapshots: Vec<GraphSavepoint> = touched
3087 .iter()
3088 .map(|graph_name| {
3089 let store = self.resolve_store(graph_name);
3090 GraphSavepoint {
3091 graph_name: graph_name.clone(),
3092 next_node_id: store.peek_next_node_id(),
3093 next_edge_id: store.peek_next_edge_id(),
3094 undo_log_position: store.property_undo_log_position(tx_id),
3095 }
3096 })
3097 .collect();
3098
3099 self.savepoints.lock().push(SavepointState {
3100 name: name.to_string(),
3101 graph_snapshots,
3102 active_graph: self.current_graph.lock().clone(),
3103 });
3104 Ok(())
3105 }
3106
3107 pub fn rollback_to_savepoint(&self, name: &str) -> Result<()> {
3116 let transaction_id = self.current_transaction.lock().ok_or_else(|| {
3117 grafeo_common::utils::error::Error::Transaction(
3118 grafeo_common::utils::error::TransactionError::InvalidState(
3119 "No active transaction".to_string(),
3120 ),
3121 )
3122 })?;
3123
3124 let mut savepoints = self.savepoints.lock();
3125
3126 let pos = savepoints
3128 .iter()
3129 .rposition(|sp| sp.name == name)
3130 .ok_or_else(|| {
3131 grafeo_common::utils::error::Error::Transaction(
3132 grafeo_common::utils::error::TransactionError::InvalidState(format!(
3133 "Savepoint '{name}' not found"
3134 )),
3135 )
3136 })?;
3137
3138 let sp_state = savepoints[pos].clone();
3139
3140 savepoints.truncate(pos);
3142 drop(savepoints);
3143
3144 for gs in &sp_state.graph_snapshots {
3146 let store = self.resolve_store(&gs.graph_name);
3147
3148 store.rollback_transaction_properties_to(transaction_id, gs.undo_log_position);
3150
3151 let current_next_node = store.peek_next_node_id();
3153 let current_next_edge = store.peek_next_edge_id();
3154
3155 let node_ids: Vec<NodeId> = (gs.next_node_id..current_next_node)
3156 .map(NodeId::new)
3157 .collect();
3158 let edge_ids: Vec<EdgeId> = (gs.next_edge_id..current_next_edge)
3159 .map(EdgeId::new)
3160 .collect();
3161
3162 if !node_ids.is_empty() || !edge_ids.is_empty() {
3163 store.discard_entities_by_id(transaction_id, &node_ids, &edge_ids);
3164 }
3165 }
3166
3167 let touched = self.touched_graphs.lock().clone();
3171 for graph_name in &touched {
3172 let already_captured = sp_state
3173 .graph_snapshots
3174 .iter()
3175 .any(|gs| gs.graph_name == *graph_name);
3176 if !already_captured {
3177 let store = self.resolve_store(graph_name);
3178 store.discard_uncommitted_versions(transaction_id);
3179 }
3180 }
3181
3182 let mut touched = self.touched_graphs.lock();
3184 touched.clear();
3185 for gs in &sp_state.graph_snapshots {
3186 if !touched.contains(&gs.graph_name) {
3187 touched.push(gs.graph_name.clone());
3188 }
3189 }
3190
3191 Ok(())
3192 }
3193
3194 pub fn release_savepoint(&self, name: &str) -> Result<()> {
3200 let _tx_id = self.current_transaction.lock().ok_or_else(|| {
3201 grafeo_common::utils::error::Error::Transaction(
3202 grafeo_common::utils::error::TransactionError::InvalidState(
3203 "No active transaction".to_string(),
3204 ),
3205 )
3206 })?;
3207
3208 let mut savepoints = self.savepoints.lock();
3209 let pos = savepoints
3210 .iter()
3211 .rposition(|sp| sp.name == name)
3212 .ok_or_else(|| {
3213 grafeo_common::utils::error::Error::Transaction(
3214 grafeo_common::utils::error::TransactionError::InvalidState(format!(
3215 "Savepoint '{name}' not found"
3216 )),
3217 )
3218 })?;
3219 savepoints.remove(pos);
3220 Ok(())
3221 }
3222
3223 #[must_use]
3225 pub fn in_transaction(&self) -> bool {
3226 self.current_transaction.lock().is_some()
3227 }
3228
3229 #[must_use]
3231 pub(crate) fn current_transaction_id(&self) -> Option<TransactionId> {
3232 *self.current_transaction.lock()
3233 }
3234
3235 #[must_use]
3237 pub(crate) fn transaction_manager(&self) -> &TransactionManager {
3238 &self.transaction_manager
3239 }
3240
3241 #[must_use]
3243 pub(crate) fn node_count_delta(&self) -> (usize, usize) {
3244 (
3245 self.transaction_start_node_count.load(Ordering::Relaxed),
3246 self.active_lpg_store().node_count(),
3247 )
3248 }
3249
3250 #[must_use]
3252 pub(crate) fn edge_count_delta(&self) -> (usize, usize) {
3253 (
3254 self.transaction_start_edge_count.load(Ordering::Relaxed),
3255 self.active_lpg_store().edge_count(),
3256 )
3257 }
3258
3259 pub fn prepare_commit(&mut self) -> Result<crate::transaction::PreparedCommit<'_>> {
3293 crate::transaction::PreparedCommit::new(self)
3294 }
3295
3296 pub fn set_auto_commit(&mut self, auto_commit: bool) {
3298 self.auto_commit = auto_commit;
3299 }
3300
3301 #[must_use]
3303 pub fn auto_commit(&self) -> bool {
3304 self.auto_commit
3305 }
3306
3307 fn needs_auto_commit(&self, has_mutations: bool) -> bool {
3312 self.auto_commit && has_mutations && self.current_transaction.lock().is_none()
3313 }
3314
3315 fn with_auto_commit<F>(&self, has_mutations: bool, body: F) -> Result<QueryResult>
3318 where
3319 F: FnOnce() -> Result<QueryResult>,
3320 {
3321 if self.needs_auto_commit(has_mutations) {
3322 self.begin_transaction_inner(false, None)?;
3323 match body() {
3324 Ok(result) => {
3325 self.commit_inner()?;
3326 Ok(result)
3327 }
3328 Err(e) => {
3329 let _ = self.rollback_inner();
3330 Err(e)
3331 }
3332 }
3333 } else {
3334 body()
3335 }
3336 }
3337
3338 fn query_looks_like_mutation(query: &str) -> bool {
3344 let upper = query.to_ascii_uppercase();
3345 upper.contains("INSERT")
3346 || upper.contains("CREATE")
3347 || upper.contains("DELETE")
3348 || upper.contains("MERGE")
3349 || upper.contains("SET")
3350 || upper.contains("REMOVE")
3351 || upper.contains("DROP")
3352 || upper.contains("ALTER")
3353 }
3354
3355 #[must_use]
3357 fn query_deadline(&self) -> Option<Instant> {
3358 #[cfg(not(target_arch = "wasm32"))]
3359 {
3360 self.query_timeout.map(|d| Instant::now() + d)
3361 }
3362 #[cfg(target_arch = "wasm32")]
3363 {
3364 let _ = &self.query_timeout;
3365 None
3366 }
3367 }
3368
3369 fn eval_integer_literal(expr: &grafeo_adapters::query::gql::ast::Expression) -> Option<i64> {
3371 use grafeo_adapters::query::gql::ast::{Expression, Literal};
3372 match expr {
3373 Expression::Literal(Literal::Integer(n)) => Some(*n),
3374 _ => None,
3375 }
3376 }
3377
3378 #[must_use]
3384 fn get_transaction_context(&self) -> (EpochId, Option<TransactionId>) {
3385 if let Some(epoch) = *self.viewing_epoch_override.lock() {
3387 return (epoch, None);
3388 }
3389
3390 if let Some(transaction_id) = *self.current_transaction.lock() {
3391 let epoch = self
3393 .transaction_manager
3394 .start_epoch(transaction_id)
3395 .unwrap_or_else(|| self.transaction_manager.current_epoch());
3396 (epoch, Some(transaction_id))
3397 } else {
3398 (self.transaction_manager.current_epoch(), None)
3400 }
3401 }
3402
3403 fn create_planner_for_store(
3408 &self,
3409 store: Arc<dyn GraphStoreMut>,
3410 viewing_epoch: EpochId,
3411 transaction_id: Option<TransactionId>,
3412 ) -> crate::query::Planner {
3413 use crate::query::Planner;
3414
3415 let mut planner = Planner::with_context(
3416 Arc::clone(&store),
3417 Arc::clone(&self.transaction_manager),
3418 transaction_id,
3419 viewing_epoch,
3420 )
3421 .with_factorized_execution(self.factorized_execution)
3422 .with_catalog(Arc::clone(&self.catalog));
3423
3424 let validator =
3426 CatalogConstraintValidator::new(Arc::clone(&self.catalog)).with_store(store);
3427 planner = planner.with_validator(Arc::new(validator));
3428
3429 planner
3430 }
3431
3432 pub fn create_node(&self, labels: &[&str]) -> NodeId {
3437 let (epoch, transaction_id) = self.get_transaction_context();
3438 self.active_lpg_store().create_node_versioned(
3439 labels,
3440 epoch,
3441 transaction_id.unwrap_or(TransactionId::SYSTEM),
3442 )
3443 }
3444
3445 pub fn create_node_with_props<'a>(
3449 &self,
3450 labels: &[&str],
3451 properties: impl IntoIterator<Item = (&'a str, Value)>,
3452 ) -> NodeId {
3453 let (epoch, transaction_id) = self.get_transaction_context();
3454 self.active_lpg_store().create_node_with_props_versioned(
3455 labels,
3456 properties,
3457 epoch,
3458 transaction_id.unwrap_or(TransactionId::SYSTEM),
3459 )
3460 }
3461
3462 pub fn create_edge(
3467 &self,
3468 src: NodeId,
3469 dst: NodeId,
3470 edge_type: &str,
3471 ) -> grafeo_common::types::EdgeId {
3472 let (epoch, transaction_id) = self.get_transaction_context();
3473 self.active_lpg_store().create_edge_versioned(
3474 src,
3475 dst,
3476 edge_type,
3477 epoch,
3478 transaction_id.unwrap_or(TransactionId::SYSTEM),
3479 )
3480 }
3481
3482 #[must_use]
3510 pub fn get_node(&self, id: NodeId) -> Option<Node> {
3511 let (epoch, transaction_id) = self.get_transaction_context();
3512 self.active_lpg_store().get_node_versioned(
3513 id,
3514 epoch,
3515 transaction_id.unwrap_or(TransactionId::SYSTEM),
3516 )
3517 }
3518
3519 #[must_use]
3543 pub fn get_node_property(&self, id: NodeId, key: &str) -> Option<Value> {
3544 self.get_node(id)
3545 .and_then(|node| node.get_property(key).cloned())
3546 }
3547
3548 #[must_use]
3555 pub fn get_edge(&self, id: EdgeId) -> Option<Edge> {
3556 let (epoch, transaction_id) = self.get_transaction_context();
3557 self.active_lpg_store().get_edge_versioned(
3558 id,
3559 epoch,
3560 transaction_id.unwrap_or(TransactionId::SYSTEM),
3561 )
3562 }
3563
3564 #[must_use]
3590 pub fn get_neighbors_outgoing(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
3591 self.active_lpg_store()
3592 .edges_from(node, Direction::Outgoing)
3593 .collect()
3594 }
3595
3596 #[must_use]
3605 pub fn get_neighbors_incoming(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
3606 self.active_lpg_store()
3607 .edges_from(node, Direction::Incoming)
3608 .collect()
3609 }
3610
3611 #[must_use]
3623 pub fn get_neighbors_outgoing_by_type(
3624 &self,
3625 node: NodeId,
3626 edge_type: &str,
3627 ) -> Vec<(NodeId, EdgeId)> {
3628 self.active_lpg_store()
3629 .edges_from(node, Direction::Outgoing)
3630 .filter(|(_, edge_id)| {
3631 self.get_edge(*edge_id)
3632 .is_some_and(|e| e.edge_type.as_str() == edge_type)
3633 })
3634 .collect()
3635 }
3636
3637 #[must_use]
3644 pub fn node_exists(&self, id: NodeId) -> bool {
3645 self.get_node(id).is_some()
3646 }
3647
3648 #[must_use]
3650 pub fn edge_exists(&self, id: EdgeId) -> bool {
3651 self.get_edge(id).is_some()
3652 }
3653
3654 #[must_use]
3658 pub fn get_degree(&self, node: NodeId) -> (usize, usize) {
3659 let active = self.active_lpg_store();
3660 let out = active.out_degree(node);
3661 let in_degree = active.in_degree(node);
3662 (out, in_degree)
3663 }
3664
3665 #[must_use]
3675 pub fn get_nodes_batch(&self, ids: &[NodeId]) -> Vec<Option<Node>> {
3676 let (epoch, transaction_id) = self.get_transaction_context();
3677 let tx = transaction_id.unwrap_or(TransactionId::SYSTEM);
3678 let active = self.active_lpg_store();
3679 ids.iter()
3680 .map(|&id| active.get_node_versioned(id, epoch, tx))
3681 .collect()
3682 }
3683
3684 #[cfg(feature = "cdc")]
3688 pub fn history(
3689 &self,
3690 entity_id: impl Into<crate::cdc::EntityId>,
3691 ) -> Result<Vec<crate::cdc::ChangeEvent>> {
3692 Ok(self.cdc_log.history(entity_id.into()))
3693 }
3694
3695 #[cfg(feature = "cdc")]
3697 pub fn history_since(
3698 &self,
3699 entity_id: impl Into<crate::cdc::EntityId>,
3700 since_epoch: EpochId,
3701 ) -> Result<Vec<crate::cdc::ChangeEvent>> {
3702 Ok(self.cdc_log.history_since(entity_id.into(), since_epoch))
3703 }
3704
3705 #[cfg(feature = "cdc")]
3707 pub fn changes_between(
3708 &self,
3709 start_epoch: EpochId,
3710 end_epoch: EpochId,
3711 ) -> Result<Vec<crate::cdc::ChangeEvent>> {
3712 Ok(self.cdc_log.changes_between(start_epoch, end_epoch))
3713 }
3714}
3715
3716impl Drop for Session {
3717 fn drop(&mut self) {
3718 if self.in_transaction() {
3721 let _ = self.rollback_inner();
3722 }
3723 }
3724}
3725
3726#[cfg(test)]
3727mod tests {
3728 use crate::database::GrafeoDB;
3729
3730 #[test]
3731 fn test_session_create_node() {
3732 let db = GrafeoDB::new_in_memory();
3733 let session = db.session();
3734
3735 let id = session.create_node(&["Person"]);
3736 assert!(id.is_valid());
3737 assert_eq!(db.node_count(), 1);
3738 }
3739
3740 #[test]
3741 fn test_session_transaction() {
3742 let db = GrafeoDB::new_in_memory();
3743 let mut session = db.session();
3744
3745 assert!(!session.in_transaction());
3746
3747 session.begin_transaction().unwrap();
3748 assert!(session.in_transaction());
3749
3750 session.commit().unwrap();
3751 assert!(!session.in_transaction());
3752 }
3753
3754 #[test]
3755 fn test_session_transaction_context() {
3756 let db = GrafeoDB::new_in_memory();
3757 let mut session = db.session();
3758
3759 let (_epoch1, transaction_id1) = session.get_transaction_context();
3761 assert!(transaction_id1.is_none());
3762
3763 session.begin_transaction().unwrap();
3765 let (epoch2, transaction_id2) = session.get_transaction_context();
3766 assert!(transaction_id2.is_some());
3767 let _ = epoch2; session.commit().unwrap();
3772 let (epoch3, tx_id3) = session.get_transaction_context();
3773 assert!(tx_id3.is_none());
3774 assert!(epoch3.as_u64() >= epoch2.as_u64());
3776 }
3777
3778 #[test]
3779 fn test_session_rollback() {
3780 let db = GrafeoDB::new_in_memory();
3781 let mut session = db.session();
3782
3783 session.begin_transaction().unwrap();
3784 session.rollback().unwrap();
3785 assert!(!session.in_transaction());
3786 }
3787
3788 #[test]
3789 fn test_session_rollback_discards_versions() {
3790 use grafeo_common::types::TransactionId;
3791
3792 let db = GrafeoDB::new_in_memory();
3793
3794 let node_before = db.store().create_node(&["Person"]);
3796 assert!(node_before.is_valid());
3797 assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");
3798
3799 let mut session = db.session();
3801 session.begin_transaction().unwrap();
3802 let transaction_id = session.current_transaction.lock().unwrap();
3803
3804 let epoch = db.store().current_epoch();
3806 let node_in_tx = db
3807 .store()
3808 .create_node_versioned(&["Person"], epoch, transaction_id);
3809 assert!(node_in_tx.is_valid());
3810
3811 assert_eq!(
3815 db.node_count(),
3816 1,
3817 "PENDING nodes should be invisible to non-versioned node_count()"
3818 );
3819 assert!(
3820 db.store()
3821 .get_node_versioned(node_in_tx, epoch, transaction_id)
3822 .is_some(),
3823 "Transaction node should be visible to its own transaction"
3824 );
3825
3826 session.rollback().unwrap();
3828 assert!(!session.in_transaction());
3829
3830 let count_after = db.node_count();
3833 assert_eq!(
3834 count_after, 1,
3835 "Rollback should discard uncommitted node, but got {count_after}"
3836 );
3837
3838 let current_epoch = db.store().current_epoch();
3840 assert!(
3841 db.store()
3842 .get_node_versioned(node_before, current_epoch, TransactionId::SYSTEM)
3843 .is_some(),
3844 "Original node should still exist"
3845 );
3846
3847 assert!(
3849 db.store()
3850 .get_node_versioned(node_in_tx, current_epoch, TransactionId::SYSTEM)
3851 .is_none(),
3852 "Transaction node should be gone"
3853 );
3854 }
3855
3856 #[test]
3857 fn test_session_create_node_in_transaction() {
3858 let db = GrafeoDB::new_in_memory();
3860
3861 let node_before = db.create_node(&["Person"]);
3863 assert!(node_before.is_valid());
3864 assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");
3865
3866 let mut session = db.session();
3868 session.begin_transaction().unwrap();
3869 let transaction_id = session.current_transaction.lock().unwrap();
3870
3871 let node_in_tx = session.create_node(&["Person"]);
3873 assert!(node_in_tx.is_valid());
3874
3875 assert_eq!(
3878 db.node_count(),
3879 1,
3880 "PENDING nodes should be invisible to non-versioned node_count()"
3881 );
3882 let epoch = db.store().current_epoch();
3883 assert!(
3884 db.store()
3885 .get_node_versioned(node_in_tx, epoch, transaction_id)
3886 .is_some(),
3887 "Transaction node should be visible to its own transaction"
3888 );
3889
3890 session.rollback().unwrap();
3892
3893 let count_after = db.node_count();
3895 assert_eq!(
3896 count_after, 1,
3897 "Rollback should discard node created via session.create_node(), but got {count_after}"
3898 );
3899 }
3900
3901 #[test]
3902 fn test_session_create_node_with_props_in_transaction() {
3903 use grafeo_common::types::Value;
3904
3905 let db = GrafeoDB::new_in_memory();
3907
3908 db.create_node(&["Person"]);
3910 assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");
3911
3912 let mut session = db.session();
3914 session.begin_transaction().unwrap();
3915 let transaction_id = session.current_transaction.lock().unwrap();
3916
3917 let node_in_tx =
3918 session.create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))]);
3919 assert!(node_in_tx.is_valid());
3920
3921 assert_eq!(
3924 db.node_count(),
3925 1,
3926 "PENDING nodes should be invisible to non-versioned node_count()"
3927 );
3928 let epoch = db.store().current_epoch();
3929 assert!(
3930 db.store()
3931 .get_node_versioned(node_in_tx, epoch, transaction_id)
3932 .is_some(),
3933 "Transaction node should be visible to its own transaction"
3934 );
3935
3936 session.rollback().unwrap();
3938
3939 let count_after = db.node_count();
3941 assert_eq!(
3942 count_after, 1,
3943 "Rollback should discard node created via session.create_node_with_props()"
3944 );
3945 }
3946
3947 #[cfg(feature = "gql")]
3948 mod gql_tests {
3949 use super::*;
3950
3951 #[test]
3952 fn test_gql_query_execution() {
3953 let db = GrafeoDB::new_in_memory();
3954 let session = db.session();
3955
3956 session.create_node(&["Person"]);
3958 session.create_node(&["Person"]);
3959 session.create_node(&["Animal"]);
3960
3961 let result = session.execute("MATCH (n:Person) RETURN n").unwrap();
3963
3964 assert_eq!(result.row_count(), 2);
3966 assert_eq!(result.column_count(), 1);
3967 assert_eq!(result.columns[0], "n");
3968 }
3969
3970 #[test]
3971 fn test_gql_empty_result() {
3972 let db = GrafeoDB::new_in_memory();
3973 let session = db.session();
3974
3975 let result = session.execute("MATCH (n:Person) RETURN n").unwrap();
3977
3978 assert_eq!(result.row_count(), 0);
3979 }
3980
3981 #[test]
3982 fn test_gql_parse_error() {
3983 let db = GrafeoDB::new_in_memory();
3984 let session = db.session();
3985
3986 let result = session.execute("MATCH (n RETURN n");
3988
3989 assert!(result.is_err());
3990 }
3991
3992 #[test]
3993 fn test_gql_relationship_traversal() {
3994 let db = GrafeoDB::new_in_memory();
3995 let session = db.session();
3996
3997 let alix = session.create_node(&["Person"]);
3999 let gus = session.create_node(&["Person"]);
4000 let vincent = session.create_node(&["Person"]);
4001
4002 session.create_edge(alix, gus, "KNOWS");
4003 session.create_edge(alix, vincent, "KNOWS");
4004
4005 let result = session
4007 .execute("MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b")
4008 .unwrap();
4009
4010 assert_eq!(result.row_count(), 2);
4012 assert_eq!(result.column_count(), 2);
4013 assert_eq!(result.columns[0], "a");
4014 assert_eq!(result.columns[1], "b");
4015 }
4016
4017 #[test]
4018 fn test_gql_relationship_with_type_filter() {
4019 let db = GrafeoDB::new_in_memory();
4020 let session = db.session();
4021
4022 let alix = session.create_node(&["Person"]);
4024 let gus = session.create_node(&["Person"]);
4025 let vincent = session.create_node(&["Person"]);
4026
4027 session.create_edge(alix, gus, "KNOWS");
4028 session.create_edge(alix, vincent, "WORKS_WITH");
4029
4030 let result = session
4032 .execute("MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b")
4033 .unwrap();
4034
4035 assert_eq!(result.row_count(), 1);
4037 }
4038
4039 #[test]
4040 fn test_gql_semantic_error_undefined_variable() {
4041 let db = GrafeoDB::new_in_memory();
4042 let session = db.session();
4043
4044 let result = session.execute("MATCH (n:Person) RETURN x");
4046
4047 assert!(result.is_err());
4049 let Err(err) = result else {
4050 panic!("Expected error")
4051 };
4052 assert!(
4053 err.to_string().contains("Undefined variable"),
4054 "Expected undefined variable error, got: {}",
4055 err
4056 );
4057 }
4058
4059 #[test]
4060 fn test_gql_where_clause_property_filter() {
4061 use grafeo_common::types::Value;
4062
4063 let db = GrafeoDB::new_in_memory();
4064 let session = db.session();
4065
4066 session.create_node_with_props(&["Person"], [("age", Value::Int64(25))]);
4068 session.create_node_with_props(&["Person"], [("age", Value::Int64(35))]);
4069 session.create_node_with_props(&["Person"], [("age", Value::Int64(45))]);
4070
4071 let result = session
4073 .execute("MATCH (n:Person) WHERE n.age > 30 RETURN n")
4074 .unwrap();
4075
4076 assert_eq!(result.row_count(), 2);
4078 }
4079
4080 #[test]
4081 fn test_gql_where_clause_equality() {
4082 use grafeo_common::types::Value;
4083
4084 let db = GrafeoDB::new_in_memory();
4085 let session = db.session();
4086
4087 session.create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))]);
4089 session.create_node_with_props(&["Person"], [("name", Value::String("Gus".into()))]);
4090 session.create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))]);
4091
4092 let result = session
4094 .execute("MATCH (n:Person) WHERE n.name = \"Alix\" RETURN n")
4095 .unwrap();
4096
4097 assert_eq!(result.row_count(), 2);
4099 }
4100
4101 #[test]
4102 fn test_gql_return_property_access() {
4103 use grafeo_common::types::Value;
4104
4105 let db = GrafeoDB::new_in_memory();
4106 let session = db.session();
4107
4108 session.create_node_with_props(
4110 &["Person"],
4111 [
4112 ("name", Value::String("Alix".into())),
4113 ("age", Value::Int64(30)),
4114 ],
4115 );
4116 session.create_node_with_props(
4117 &["Person"],
4118 [
4119 ("name", Value::String("Gus".into())),
4120 ("age", Value::Int64(25)),
4121 ],
4122 );
4123
4124 let result = session
4126 .execute("MATCH (n:Person) RETURN n.name, n.age")
4127 .unwrap();
4128
4129 assert_eq!(result.row_count(), 2);
4131 assert_eq!(result.column_count(), 2);
4132 assert_eq!(result.columns[0], "n.name");
4133 assert_eq!(result.columns[1], "n.age");
4134
4135 let names: Vec<&Value> = result.rows.iter().map(|r| &r[0]).collect();
4137 assert!(names.contains(&&Value::String("Alix".into())));
4138 assert!(names.contains(&&Value::String("Gus".into())));
4139 }
4140
4141 #[test]
4142 fn test_gql_return_mixed_expressions() {
4143 use grafeo_common::types::Value;
4144
4145 let db = GrafeoDB::new_in_memory();
4146 let session = db.session();
4147
4148 session.create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))]);
4150
4151 let result = session
4153 .execute("MATCH (n:Person) RETURN n, n.name")
4154 .unwrap();
4155
4156 assert_eq!(result.row_count(), 1);
4157 assert_eq!(result.column_count(), 2);
4158 assert_eq!(result.columns[0], "n");
4159 assert_eq!(result.columns[1], "n.name");
4160
4161 assert_eq!(result.rows[0][1], Value::String("Alix".into()));
4163 }
4164 }
4165
4166 #[cfg(feature = "cypher")]
4167 mod cypher_tests {
4168 use super::*;
4169
4170 #[test]
4171 fn test_cypher_query_execution() {
4172 let db = GrafeoDB::new_in_memory();
4173 let session = db.session();
4174
4175 session.create_node(&["Person"]);
4177 session.create_node(&["Person"]);
4178 session.create_node(&["Animal"]);
4179
4180 let result = session.execute_cypher("MATCH (n:Person) RETURN n").unwrap();
4182
4183 assert_eq!(result.row_count(), 2);
4185 assert_eq!(result.column_count(), 1);
4186 assert_eq!(result.columns[0], "n");
4187 }
4188
4189 #[test]
4190 fn test_cypher_empty_result() {
4191 let db = GrafeoDB::new_in_memory();
4192 let session = db.session();
4193
4194 let result = session.execute_cypher("MATCH (n:Person) RETURN n").unwrap();
4196
4197 assert_eq!(result.row_count(), 0);
4198 }
4199
4200 #[test]
4201 fn test_cypher_parse_error() {
4202 let db = GrafeoDB::new_in_memory();
4203 let session = db.session();
4204
4205 let result = session.execute_cypher("MATCH (n RETURN n");
4207
4208 assert!(result.is_err());
4209 }
4210 }
4211
4212 mod direct_lookup_tests {
4215 use super::*;
4216 use grafeo_common::types::Value;
4217
4218 #[test]
4219 fn test_get_node() {
4220 let db = GrafeoDB::new_in_memory();
4221 let session = db.session();
4222
4223 let id = session.create_node(&["Person"]);
4224 let node = session.get_node(id);
4225
4226 assert!(node.is_some());
4227 let node = node.unwrap();
4228 assert_eq!(node.id, id);
4229 }
4230
4231 #[test]
4232 fn test_get_node_not_found() {
4233 use grafeo_common::types::NodeId;
4234
4235 let db = GrafeoDB::new_in_memory();
4236 let session = db.session();
4237
4238 let node = session.get_node(NodeId::new(9999));
4240 assert!(node.is_none());
4241 }
4242
4243 #[test]
4244 fn test_get_node_property() {
4245 let db = GrafeoDB::new_in_memory();
4246 let session = db.session();
4247
4248 let id = session
4249 .create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))]);
4250
4251 let name = session.get_node_property(id, "name");
4252 assert_eq!(name, Some(Value::String("Alix".into())));
4253
4254 let missing = session.get_node_property(id, "missing");
4256 assert!(missing.is_none());
4257 }
4258
4259 #[test]
4260 fn test_get_edge() {
4261 let db = GrafeoDB::new_in_memory();
4262 let session = db.session();
4263
4264 let alix = session.create_node(&["Person"]);
4265 let gus = session.create_node(&["Person"]);
4266 let edge_id = session.create_edge(alix, gus, "KNOWS");
4267
4268 let edge = session.get_edge(edge_id);
4269 assert!(edge.is_some());
4270 let edge = edge.unwrap();
4271 assert_eq!(edge.id, edge_id);
4272 assert_eq!(edge.src, alix);
4273 assert_eq!(edge.dst, gus);
4274 }
4275
4276 #[test]
4277 fn test_get_edge_not_found() {
4278 use grafeo_common::types::EdgeId;
4279
4280 let db = GrafeoDB::new_in_memory();
4281 let session = db.session();
4282
4283 let edge = session.get_edge(EdgeId::new(9999));
4284 assert!(edge.is_none());
4285 }
4286
4287 #[test]
4288 fn test_get_neighbors_outgoing() {
4289 let db = GrafeoDB::new_in_memory();
4290 let session = db.session();
4291
4292 let alix = session.create_node(&["Person"]);
4293 let gus = session.create_node(&["Person"]);
4294 let harm = session.create_node(&["Person"]);
4295
4296 session.create_edge(alix, gus, "KNOWS");
4297 session.create_edge(alix, harm, "KNOWS");
4298
4299 let neighbors = session.get_neighbors_outgoing(alix);
4300 assert_eq!(neighbors.len(), 2);
4301
4302 let neighbor_ids: Vec<_> = neighbors.iter().map(|(node_id, _)| *node_id).collect();
4303 assert!(neighbor_ids.contains(&gus));
4304 assert!(neighbor_ids.contains(&harm));
4305 }
4306
4307 #[test]
4308 fn test_get_neighbors_incoming() {
4309 let db = GrafeoDB::new_in_memory();
4310 let session = db.session();
4311
4312 let alix = session.create_node(&["Person"]);
4313 let gus = session.create_node(&["Person"]);
4314 let harm = session.create_node(&["Person"]);
4315
4316 session.create_edge(gus, alix, "KNOWS");
4317 session.create_edge(harm, alix, "KNOWS");
4318
4319 let neighbors = session.get_neighbors_incoming(alix);
4320 assert_eq!(neighbors.len(), 2);
4321
4322 let neighbor_ids: Vec<_> = neighbors.iter().map(|(node_id, _)| *node_id).collect();
4323 assert!(neighbor_ids.contains(&gus));
4324 assert!(neighbor_ids.contains(&harm));
4325 }
4326
4327 #[test]
4328 fn test_get_neighbors_outgoing_by_type() {
4329 let db = GrafeoDB::new_in_memory();
4330 let session = db.session();
4331
4332 let alix = session.create_node(&["Person"]);
4333 let gus = session.create_node(&["Person"]);
4334 let company = session.create_node(&["Company"]);
4335
4336 session.create_edge(alix, gus, "KNOWS");
4337 session.create_edge(alix, company, "WORKS_AT");
4338
4339 let knows_neighbors = session.get_neighbors_outgoing_by_type(alix, "KNOWS");
4340 assert_eq!(knows_neighbors.len(), 1);
4341 assert_eq!(knows_neighbors[0].0, gus);
4342
4343 let works_neighbors = session.get_neighbors_outgoing_by_type(alix, "WORKS_AT");
4344 assert_eq!(works_neighbors.len(), 1);
4345 assert_eq!(works_neighbors[0].0, company);
4346
4347 let no_neighbors = session.get_neighbors_outgoing_by_type(alix, "LIKES");
4349 assert!(no_neighbors.is_empty());
4350 }
4351
4352 #[test]
4353 fn test_node_exists() {
4354 use grafeo_common::types::NodeId;
4355
4356 let db = GrafeoDB::new_in_memory();
4357 let session = db.session();
4358
4359 let id = session.create_node(&["Person"]);
4360
4361 assert!(session.node_exists(id));
4362 assert!(!session.node_exists(NodeId::new(9999)));
4363 }
4364
4365 #[test]
4366 fn test_edge_exists() {
4367 use grafeo_common::types::EdgeId;
4368
4369 let db = GrafeoDB::new_in_memory();
4370 let session = db.session();
4371
4372 let alix = session.create_node(&["Person"]);
4373 let gus = session.create_node(&["Person"]);
4374 let edge_id = session.create_edge(alix, gus, "KNOWS");
4375
4376 assert!(session.edge_exists(edge_id));
4377 assert!(!session.edge_exists(EdgeId::new(9999)));
4378 }
4379
4380 #[test]
4381 fn test_get_degree() {
4382 let db = GrafeoDB::new_in_memory();
4383 let session = db.session();
4384
4385 let alix = session.create_node(&["Person"]);
4386 let gus = session.create_node(&["Person"]);
4387 let harm = session.create_node(&["Person"]);
4388
4389 session.create_edge(alix, gus, "KNOWS");
4391 session.create_edge(alix, harm, "KNOWS");
4392 session.create_edge(gus, alix, "KNOWS");
4394
4395 let (out_degree, in_degree) = session.get_degree(alix);
4396 assert_eq!(out_degree, 2);
4397 assert_eq!(in_degree, 1);
4398
4399 let lonely = session.create_node(&["Person"]);
4401 let (out, in_deg) = session.get_degree(lonely);
4402 assert_eq!(out, 0);
4403 assert_eq!(in_deg, 0);
4404 }
4405
4406 #[test]
4407 fn test_get_nodes_batch() {
4408 let db = GrafeoDB::new_in_memory();
4409 let session = db.session();
4410
4411 let alix = session.create_node(&["Person"]);
4412 let gus = session.create_node(&["Person"]);
4413 let harm = session.create_node(&["Person"]);
4414
4415 let nodes = session.get_nodes_batch(&[alix, gus, harm]);
4416 assert_eq!(nodes.len(), 3);
4417 assert!(nodes[0].is_some());
4418 assert!(nodes[1].is_some());
4419 assert!(nodes[2].is_some());
4420
4421 use grafeo_common::types::NodeId;
4423 let nodes_with_missing = session.get_nodes_batch(&[alix, NodeId::new(9999), harm]);
4424 assert_eq!(nodes_with_missing.len(), 3);
4425 assert!(nodes_with_missing[0].is_some());
4426 assert!(nodes_with_missing[1].is_none()); assert!(nodes_with_missing[2].is_some());
4428 }
4429
4430 #[test]
4431 fn test_auto_commit_setting() {
4432 let db = GrafeoDB::new_in_memory();
4433 let mut session = db.session();
4434
4435 assert!(session.auto_commit());
4437
4438 session.set_auto_commit(false);
4439 assert!(!session.auto_commit());
4440
4441 session.set_auto_commit(true);
4442 assert!(session.auto_commit());
4443 }
4444
4445 #[test]
4446 fn test_transaction_double_begin_nests() {
4447 let db = GrafeoDB::new_in_memory();
4448 let mut session = db.session();
4449
4450 session.begin_transaction().unwrap();
4451 let result = session.begin_transaction();
4453 assert!(result.is_ok());
4454 session.commit().unwrap();
4456 session.commit().unwrap();
4458 }
4459
4460 #[test]
4461 fn test_commit_without_transaction_error() {
4462 let db = GrafeoDB::new_in_memory();
4463 let mut session = db.session();
4464
4465 let result = session.commit();
4466 assert!(result.is_err());
4467 }
4468
4469 #[test]
4470 fn test_rollback_without_transaction_error() {
4471 let db = GrafeoDB::new_in_memory();
4472 let mut session = db.session();
4473
4474 let result = session.rollback();
4475 assert!(result.is_err());
4476 }
4477
4478 #[test]
4479 fn test_create_edge_in_transaction() {
4480 let db = GrafeoDB::new_in_memory();
4481 let mut session = db.session();
4482
4483 let alix = session.create_node(&["Person"]);
4485 let gus = session.create_node(&["Person"]);
4486
4487 session.begin_transaction().unwrap();
4489 let edge_id = session.create_edge(alix, gus, "KNOWS");
4490
4491 assert!(session.edge_exists(edge_id));
4493
4494 session.commit().unwrap();
4496
4497 assert!(session.edge_exists(edge_id));
4499 }
4500
4501 #[test]
4502 fn test_neighbors_empty_node() {
4503 let db = GrafeoDB::new_in_memory();
4504 let session = db.session();
4505
4506 let lonely = session.create_node(&["Person"]);
4507
4508 assert!(session.get_neighbors_outgoing(lonely).is_empty());
4509 assert!(session.get_neighbors_incoming(lonely).is_empty());
4510 assert!(
4511 session
4512 .get_neighbors_outgoing_by_type(lonely, "KNOWS")
4513 .is_empty()
4514 );
4515 }
4516 }
4517
4518 #[test]
4519 fn test_auto_gc_triggers_on_commit_interval() {
4520 use crate::config::Config;
4521
4522 let config = Config::in_memory().with_gc_interval(2);
4523 let db = GrafeoDB::with_config(config).unwrap();
4524 let mut session = db.session();
4525
4526 session.begin_transaction().unwrap();
4528 session.create_node(&["A"]);
4529 session.commit().unwrap();
4530
4531 session.begin_transaction().unwrap();
4533 session.create_node(&["B"]);
4534 session.commit().unwrap();
4535
4536 assert_eq!(db.node_count(), 2);
4538 }
4539
4540 #[test]
4541 fn test_query_timeout_config_propagates_to_session() {
4542 use crate::config::Config;
4543 use std::time::Duration;
4544
4545 let config = Config::in_memory().with_query_timeout(Duration::from_secs(5));
4546 let db = GrafeoDB::with_config(config).unwrap();
4547 let session = db.session();
4548
4549 assert!(session.query_deadline().is_some());
4551 }
4552
4553 #[test]
4554 fn test_no_query_timeout_returns_no_deadline() {
4555 let db = GrafeoDB::new_in_memory();
4556 let session = db.session();
4557
4558 assert!(session.query_deadline().is_none());
4560 }
4561
4562 #[test]
4563 fn test_graph_model_accessor() {
4564 use crate::config::GraphModel;
4565
4566 let db = GrafeoDB::new_in_memory();
4567 let session = db.session();
4568
4569 assert_eq!(session.graph_model(), GraphModel::Lpg);
4570 }
4571
4572 #[cfg(feature = "gql")]
4573 #[test]
4574 fn test_external_store_session() {
4575 use grafeo_core::graph::GraphStoreMut;
4576 use std::sync::Arc;
4577
4578 let config = crate::config::Config::in_memory();
4579 let store =
4580 Arc::new(grafeo_core::graph::lpg::LpgStore::new().unwrap()) as Arc<dyn GraphStoreMut>;
4581 let db = GrafeoDB::with_store(store, config).unwrap();
4582
4583 let mut session = db.session();
4584
4585 session.begin_transaction().unwrap();
4589 session.execute("INSERT (:Test {name: 'hello'})").unwrap();
4590
4591 let result = session.execute("MATCH (n:Test) RETURN n.name").unwrap();
4593 assert_eq!(result.row_count(), 1);
4594
4595 session.commit().unwrap();
4596 }
4597
4598 #[cfg(feature = "gql")]
4601 mod session_command_tests {
4602 use super::*;
4603 use grafeo_common::types::Value;
4604
4605 #[test]
4606 fn test_use_graph_sets_current_graph() {
4607 let db = GrafeoDB::new_in_memory();
4608 let session = db.session();
4609
4610 session.execute("CREATE GRAPH mydb").unwrap();
4612 session.execute("USE GRAPH mydb").unwrap();
4613
4614 assert_eq!(session.current_graph(), Some("mydb".to_string()));
4615 }
4616
4617 #[test]
4618 fn test_use_graph_nonexistent_errors() {
4619 let db = GrafeoDB::new_in_memory();
4620 let session = db.session();
4621
4622 let result = session.execute("USE GRAPH doesnotexist");
4623 assert!(result.is_err());
4624 let err = result.unwrap_err().to_string();
4625 assert!(
4626 err.contains("does not exist"),
4627 "Expected 'does not exist' error, got: {err}"
4628 );
4629 }
4630
4631 #[test]
4632 fn test_use_graph_default_always_valid() {
4633 let db = GrafeoDB::new_in_memory();
4634 let session = db.session();
4635
4636 session.execute("USE GRAPH default").unwrap();
4638 assert_eq!(session.current_graph(), Some("default".to_string()));
4639 }
4640
4641 #[test]
4642 fn test_session_set_graph() {
4643 let db = GrafeoDB::new_in_memory();
4644 let session = db.session();
4645
4646 session.execute("CREATE GRAPH analytics").unwrap();
4647 session.execute("SESSION SET GRAPH analytics").unwrap();
4648 assert_eq!(session.current_graph(), Some("analytics".to_string()));
4649 }
4650
4651 #[test]
4652 fn test_session_set_graph_nonexistent_errors() {
4653 let db = GrafeoDB::new_in_memory();
4654 let session = db.session();
4655
4656 let result = session.execute("SESSION SET GRAPH nosuchgraph");
4657 assert!(result.is_err());
4658 }
4659
4660 #[test]
4661 fn test_session_set_time_zone() {
4662 let db = GrafeoDB::new_in_memory();
4663 let session = db.session();
4664
4665 assert_eq!(session.time_zone(), None);
4666
4667 session.execute("SESSION SET TIME ZONE 'UTC'").unwrap();
4668 assert_eq!(session.time_zone(), Some("UTC".to_string()));
4669
4670 session
4671 .execute("SESSION SET TIME ZONE 'America/New_York'")
4672 .unwrap();
4673 assert_eq!(session.time_zone(), Some("America/New_York".to_string()));
4674 }
4675
4676 #[test]
4677 fn test_session_set_parameter() {
4678 let db = GrafeoDB::new_in_memory();
4679 let session = db.session();
4680
4681 session
4682 .execute("SESSION SET PARAMETER $timeout = 30")
4683 .unwrap();
4684
4685 assert!(session.get_parameter("timeout").is_some());
4688 }
4689
4690 #[test]
4691 fn test_session_reset_clears_all_state() {
4692 let db = GrafeoDB::new_in_memory();
4693 let session = db.session();
4694
4695 session.execute("CREATE GRAPH analytics").unwrap();
4697 session.execute("SESSION SET GRAPH analytics").unwrap();
4698 session.execute("SESSION SET TIME ZONE 'UTC'").unwrap();
4699 session
4700 .execute("SESSION SET PARAMETER $limit = 100")
4701 .unwrap();
4702
4703 assert!(session.current_graph().is_some());
4705 assert!(session.time_zone().is_some());
4706 assert!(session.get_parameter("limit").is_some());
4707
4708 session.execute("SESSION RESET").unwrap();
4710
4711 assert_eq!(session.current_graph(), None);
4712 assert_eq!(session.time_zone(), None);
4713 assert!(session.get_parameter("limit").is_none());
4714 }
4715
4716 #[test]
4717 fn test_session_close_clears_state() {
4718 let db = GrafeoDB::new_in_memory();
4719 let session = db.session();
4720
4721 session.execute("CREATE GRAPH analytics").unwrap();
4722 session.execute("SESSION SET GRAPH analytics").unwrap();
4723 session.execute("SESSION SET TIME ZONE 'UTC'").unwrap();
4724
4725 session.execute("SESSION CLOSE").unwrap();
4726
4727 assert_eq!(session.current_graph(), None);
4728 assert_eq!(session.time_zone(), None);
4729 }
4730
4731 #[test]
4732 fn test_create_graph() {
4733 let db = GrafeoDB::new_in_memory();
4734 let session = db.session();
4735
4736 session.execute("CREATE GRAPH mydb").unwrap();
4737
4738 session.execute("USE GRAPH mydb").unwrap();
4740 assert_eq!(session.current_graph(), Some("mydb".to_string()));
4741 }
4742
4743 #[test]
4744 fn test_create_graph_duplicate_errors() {
4745 let db = GrafeoDB::new_in_memory();
4746 let session = db.session();
4747
4748 session.execute("CREATE GRAPH mydb").unwrap();
4749 let result = session.execute("CREATE GRAPH mydb");
4750
4751 assert!(result.is_err());
4752 let err = result.unwrap_err().to_string();
4753 assert!(
4754 err.contains("already exists"),
4755 "Expected 'already exists' error, got: {err}"
4756 );
4757 }
4758
4759 #[test]
4760 fn test_create_graph_if_not_exists() {
4761 let db = GrafeoDB::new_in_memory();
4762 let session = db.session();
4763
4764 session.execute("CREATE GRAPH mydb").unwrap();
4765 session.execute("CREATE GRAPH IF NOT EXISTS mydb").unwrap();
4767 }
4768
4769 #[test]
4770 fn test_drop_graph() {
4771 let db = GrafeoDB::new_in_memory();
4772 let session = db.session();
4773
4774 session.execute("CREATE GRAPH mydb").unwrap();
4775 session.execute("DROP GRAPH mydb").unwrap();
4776
4777 let result = session.execute("USE GRAPH mydb");
4779 assert!(result.is_err());
4780 }
4781
4782 #[test]
4783 fn test_drop_graph_nonexistent_errors() {
4784 let db = GrafeoDB::new_in_memory();
4785 let session = db.session();
4786
4787 let result = session.execute("DROP GRAPH nosuchgraph");
4788 assert!(result.is_err());
4789 let err = result.unwrap_err().to_string();
4790 assert!(
4791 err.contains("does not exist"),
4792 "Expected 'does not exist' error, got: {err}"
4793 );
4794 }
4795
4796 #[test]
4797 fn test_drop_graph_if_exists() {
4798 let db = GrafeoDB::new_in_memory();
4799 let session = db.session();
4800
4801 session.execute("DROP GRAPH IF EXISTS nosuchgraph").unwrap();
4803 }
4804
4805 #[test]
4806 fn test_start_transaction_via_gql() {
4807 let db = GrafeoDB::new_in_memory();
4808 let session = db.session();
4809
4810 session.execute("START TRANSACTION").unwrap();
4811 assert!(session.in_transaction());
4812 session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
4813 session.execute("COMMIT").unwrap();
4814 assert!(!session.in_transaction());
4815
4816 let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
4817 assert_eq!(result.rows.len(), 1);
4818 }
4819
4820 #[test]
4821 fn test_start_transaction_read_only_blocks_insert() {
4822 let db = GrafeoDB::new_in_memory();
4823 let session = db.session();
4824
4825 session.execute("START TRANSACTION READ ONLY").unwrap();
4826 let result = session.execute("INSERT (:Person {name: 'Alix'})");
4827 assert!(result.is_err());
4828 let err = result.unwrap_err().to_string();
4829 assert!(
4830 err.contains("read-only"),
4831 "Expected read-only error, got: {err}"
4832 );
4833 session.execute("ROLLBACK").unwrap();
4834 }
4835
4836 #[test]
4837 fn test_start_transaction_read_only_allows_reads() {
4838 let db = GrafeoDB::new_in_memory();
4839 let mut session = db.session();
4840 session.begin_transaction().unwrap();
4841 session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
4842 session.commit().unwrap();
4843
4844 session.execute("START TRANSACTION READ ONLY").unwrap();
4845 let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
4846 assert_eq!(result.rows.len(), 1);
4847 session.execute("COMMIT").unwrap();
4848 }
4849
4850 #[test]
4851 fn test_rollback_via_gql() {
4852 let db = GrafeoDB::new_in_memory();
4853 let session = db.session();
4854
4855 session.execute("START TRANSACTION").unwrap();
4856 session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
4857 session.execute("ROLLBACK").unwrap();
4858
4859 let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
4860 assert!(result.rows.is_empty());
4861 }
4862
4863 #[test]
4864 fn test_start_transaction_with_isolation_level() {
4865 let db = GrafeoDB::new_in_memory();
4866 let session = db.session();
4867
4868 session
4869 .execute("START TRANSACTION ISOLATION LEVEL SERIALIZABLE")
4870 .unwrap();
4871 assert!(session.in_transaction());
4872 session.execute("ROLLBACK").unwrap();
4873 }
4874
4875 #[test]
4876 fn test_session_commands_return_empty_result() {
4877 let db = GrafeoDB::new_in_memory();
4878 let session = db.session();
4879
4880 session.execute("CREATE GRAPH test").unwrap();
4881 let result = session.execute("SESSION SET GRAPH test").unwrap();
4882 assert_eq!(result.row_count(), 0);
4883 assert_eq!(result.column_count(), 0);
4884 }
4885
4886 #[test]
4887 fn test_current_graph_default_is_none() {
4888 let db = GrafeoDB::new_in_memory();
4889 let session = db.session();
4890
4891 assert_eq!(session.current_graph(), None);
4892 }
4893
4894 #[test]
4895 fn test_time_zone_default_is_none() {
4896 let db = GrafeoDB::new_in_memory();
4897 let session = db.session();
4898
4899 assert_eq!(session.time_zone(), None);
4900 }
4901
4902 #[test]
4903 fn test_session_state_independent_across_sessions() {
4904 let db = GrafeoDB::new_in_memory();
4905 let session1 = db.session();
4906 let session2 = db.session();
4907
4908 session1.execute("CREATE GRAPH first").unwrap();
4909 session1.execute("CREATE GRAPH second").unwrap();
4910 session1.execute("SESSION SET GRAPH first").unwrap();
4911 session2.execute("SESSION SET GRAPH second").unwrap();
4912
4913 assert_eq!(session1.current_graph(), Some("first".to_string()));
4914 assert_eq!(session2.current_graph(), Some("second".to_string()));
4915 }
4916
4917 #[test]
4918 fn test_show_node_types() {
4919 let db = GrafeoDB::new_in_memory();
4920 let session = db.session();
4921
4922 session
4923 .execute("CREATE NODE TYPE Person (name STRING NOT NULL, age INTEGER)")
4924 .unwrap();
4925
4926 let result = session.execute("SHOW NODE TYPES").unwrap();
4927 assert_eq!(
4928 result.columns,
4929 vec!["name", "properties", "constraints", "parents"]
4930 );
4931 assert_eq!(result.rows.len(), 1);
4932 assert_eq!(result.rows[0][0], Value::from("Person"));
4934 }
4935
4936 #[test]
4937 fn test_show_edge_types() {
4938 let db = GrafeoDB::new_in_memory();
4939 let session = db.session();
4940
4941 session
4942 .execute("CREATE EDGE TYPE KNOWS CONNECTING (Person) TO (Person) (since INTEGER)")
4943 .unwrap();
4944
4945 let result = session.execute("SHOW EDGE TYPES").unwrap();
4946 assert_eq!(
4947 result.columns,
4948 vec!["name", "properties", "source_types", "target_types"]
4949 );
4950 assert_eq!(result.rows.len(), 1);
4951 assert_eq!(result.rows[0][0], Value::from("KNOWS"));
4952 }
4953
4954 #[test]
4955 fn test_show_graph_types() {
4956 let db = GrafeoDB::new_in_memory();
4957 let session = db.session();
4958
4959 session
4960 .execute("CREATE NODE TYPE Person (name STRING)")
4961 .unwrap();
4962 session
4963 .execute(
4964 "CREATE GRAPH TYPE social (\
4965 NODE TYPE Person (name STRING)\
4966 )",
4967 )
4968 .unwrap();
4969
4970 let result = session.execute("SHOW GRAPH TYPES").unwrap();
4971 assert_eq!(
4972 result.columns,
4973 vec!["name", "open", "node_types", "edge_types"]
4974 );
4975 assert_eq!(result.rows.len(), 1);
4976 assert_eq!(result.rows[0][0], Value::from("social"));
4977 }
4978
4979 #[test]
4980 fn test_show_graph_type_named() {
4981 let db = GrafeoDB::new_in_memory();
4982 let session = db.session();
4983
4984 session
4985 .execute("CREATE NODE TYPE Person (name STRING)")
4986 .unwrap();
4987 session
4988 .execute(
4989 "CREATE GRAPH TYPE social (\
4990 NODE TYPE Person (name STRING)\
4991 )",
4992 )
4993 .unwrap();
4994
4995 let result = session.execute("SHOW GRAPH TYPE social").unwrap();
4996 assert_eq!(result.rows.len(), 1);
4997 assert_eq!(result.rows[0][0], Value::from("social"));
4998 }
4999
5000 #[test]
5001 fn test_show_graph_type_not_found() {
5002 let db = GrafeoDB::new_in_memory();
5003 let session = db.session();
5004
5005 let result = session.execute("SHOW GRAPH TYPE nonexistent");
5006 assert!(result.is_err());
5007 }
5008
5009 #[test]
5010 fn test_show_indexes_via_gql() {
5011 let db = GrafeoDB::new_in_memory();
5012 let session = db.session();
5013
5014 let result = session.execute("SHOW INDEXES").unwrap();
5015 assert_eq!(result.columns, vec!["name", "type", "label", "property"]);
5016 }
5017
5018 #[test]
5019 fn test_show_constraints_via_gql() {
5020 let db = GrafeoDB::new_in_memory();
5021 let session = db.session();
5022
5023 let result = session.execute("SHOW CONSTRAINTS").unwrap();
5024 assert_eq!(result.columns, vec!["name", "type", "label", "properties"]);
5025 }
5026
5027 #[test]
5028 fn test_pattern_form_graph_type_roundtrip() {
5029 let db = GrafeoDB::new_in_memory();
5030 let session = db.session();
5031
5032 session
5034 .execute("CREATE NODE TYPE Person (name STRING NOT NULL)")
5035 .unwrap();
5036 session
5037 .execute("CREATE NODE TYPE City (name STRING)")
5038 .unwrap();
5039 session
5040 .execute("CREATE EDGE TYPE KNOWS (since INTEGER)")
5041 .unwrap();
5042 session.execute("CREATE EDGE TYPE LIVES_IN").unwrap();
5043
5044 session
5046 .execute(
5047 "CREATE GRAPH TYPE social (\
5048 (:Person {name STRING NOT NULL})-[:KNOWS {since INTEGER}]->(:Person),\
5049 (:Person)-[:LIVES_IN]->(:City)\
5050 )",
5051 )
5052 .unwrap();
5053
5054 let result = session.execute("SHOW GRAPH TYPE social").unwrap();
5056 assert_eq!(result.rows.len(), 1);
5057 assert_eq!(result.rows[0][0], Value::from("social"));
5058 }
5059 }
5060}