1use std::sync::Arc;
8use std::sync::atomic::{AtomicUsize, Ordering};
9use std::time::{Duration, Instant};
10
11use grafeo_common::types::{EdgeId, EpochId, NodeId, TransactionId, Value};
12use grafeo_common::utils::error::Result;
13use grafeo_core::graph::Direction;
14use grafeo_core::graph::GraphStoreMut;
15use grafeo_core::graph::lpg::{Edge, LpgStore, Node};
16#[cfg(feature = "rdf")]
17use grafeo_core::graph::rdf::RdfStore;
18
19use crate::catalog::{Catalog, CatalogConstraintValidator};
20use crate::config::{AdaptiveConfig, GraphModel};
21use crate::database::QueryResult;
22use crate::query::cache::QueryCache;
23use crate::transaction::TransactionManager;
24
25fn parse_default_literal(text: &str) -> Value {
30 if text.eq_ignore_ascii_case("null") {
31 return Value::Null;
32 }
33 if text.eq_ignore_ascii_case("true") {
34 return Value::Bool(true);
35 }
36 if text.eq_ignore_ascii_case("false") {
37 return Value::Bool(false);
38 }
39 if (text.starts_with('\'') && text.ends_with('\''))
41 || (text.starts_with('"') && text.ends_with('"'))
42 {
43 return Value::String(text[1..text.len() - 1].into());
44 }
45 if let Ok(i) = text.parse::<i64>() {
47 return Value::Int64(i);
48 }
49 if let Ok(f) = text.parse::<f64>() {
50 return Value::Float64(f);
51 }
52 Value::String(text.into())
54}
55
56pub(crate) struct SessionConfig {
61 pub transaction_manager: Arc<TransactionManager>,
62 pub query_cache: Arc<QueryCache>,
63 pub catalog: Arc<Catalog>,
64 pub adaptive_config: AdaptiveConfig,
65 pub factorized_execution: bool,
66 pub graph_model: GraphModel,
67 pub query_timeout: Option<Duration>,
68 pub commit_counter: Arc<AtomicUsize>,
69 pub gc_interval: usize,
70}
71
72pub struct Session {
78 store: Arc<LpgStore>,
80 graph_store: Arc<dyn GraphStoreMut>,
82 catalog: Arc<Catalog>,
84 #[cfg(feature = "rdf")]
86 rdf_store: Arc<RdfStore>,
87 transaction_manager: Arc<TransactionManager>,
89 query_cache: Arc<QueryCache>,
91 current_transaction: parking_lot::Mutex<Option<TransactionId>>,
95 read_only_tx: parking_lot::Mutex<bool>,
97 auto_commit: bool,
99 #[allow(dead_code)] adaptive_config: AdaptiveConfig,
102 factorized_execution: bool,
104 graph_model: GraphModel,
106 query_timeout: Option<Duration>,
108 commit_counter: Arc<AtomicUsize>,
110 gc_interval: usize,
112 transaction_start_node_count: AtomicUsize,
114 transaction_start_edge_count: AtomicUsize,
116 #[cfg(feature = "wal")]
118 wal: Option<Arc<grafeo_adapters::storage::wal::LpgWal>>,
119 #[cfg(feature = "wal")]
121 wal_graph_context: Option<Arc<parking_lot::Mutex<Option<String>>>>,
122 #[cfg(feature = "cdc")]
124 cdc_log: Arc<crate::cdc::CdcLog>,
125 current_graph: parking_lot::Mutex<Option<String>>,
127 time_zone: parking_lot::Mutex<Option<String>>,
129 session_params:
131 parking_lot::Mutex<std::collections::HashMap<String, grafeo_common::types::Value>>,
132 viewing_epoch_override: parking_lot::Mutex<Option<EpochId>>,
134 savepoints: parking_lot::Mutex<Vec<SavepointState>>,
136 transaction_nesting_depth: parking_lot::Mutex<u32>,
140 touched_graphs: parking_lot::Mutex<Vec<Option<String>>>,
144}
145
146#[derive(Clone)]
148struct GraphSavepoint {
149 graph_name: Option<String>,
150 next_node_id: u64,
151 next_edge_id: u64,
152 undo_log_position: usize,
153}
154
155#[derive(Clone)]
157struct SavepointState {
158 name: String,
159 graph_snapshots: Vec<GraphSavepoint>,
160 #[allow(dead_code)]
163 active_graph: Option<String>,
164}
165
166impl Session {
167 #[allow(dead_code)]
169 pub(crate) fn with_adaptive(store: Arc<LpgStore>, cfg: SessionConfig) -> Self {
170 let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
171 Self {
172 store,
173 graph_store,
174 catalog: cfg.catalog,
175 #[cfg(feature = "rdf")]
176 rdf_store: Arc::new(RdfStore::new()),
177 transaction_manager: cfg.transaction_manager,
178 query_cache: cfg.query_cache,
179 current_transaction: parking_lot::Mutex::new(None),
180 read_only_tx: parking_lot::Mutex::new(false),
181 auto_commit: true,
182 adaptive_config: cfg.adaptive_config,
183 factorized_execution: cfg.factorized_execution,
184 graph_model: cfg.graph_model,
185 query_timeout: cfg.query_timeout,
186 commit_counter: cfg.commit_counter,
187 gc_interval: cfg.gc_interval,
188 transaction_start_node_count: AtomicUsize::new(0),
189 transaction_start_edge_count: AtomicUsize::new(0),
190 #[cfg(feature = "wal")]
191 wal: None,
192 #[cfg(feature = "wal")]
193 wal_graph_context: None,
194 #[cfg(feature = "cdc")]
195 cdc_log: Arc::new(crate::cdc::CdcLog::new()),
196 current_graph: parking_lot::Mutex::new(None),
197 time_zone: parking_lot::Mutex::new(None),
198 session_params: parking_lot::Mutex::new(std::collections::HashMap::new()),
199 viewing_epoch_override: parking_lot::Mutex::new(None),
200 savepoints: parking_lot::Mutex::new(Vec::new()),
201 transaction_nesting_depth: parking_lot::Mutex::new(0),
202 touched_graphs: parking_lot::Mutex::new(Vec::new()),
203 }
204 }
205
206 #[cfg(feature = "wal")]
211 pub(crate) fn set_wal(
212 &mut self,
213 wal: Arc<grafeo_adapters::storage::wal::LpgWal>,
214 wal_graph_context: Arc<parking_lot::Mutex<Option<String>>>,
215 ) {
216 self.graph_store = Arc::new(crate::database::wal_store::WalGraphStore::new(
218 Arc::clone(&self.store),
219 Arc::clone(&wal),
220 Arc::clone(&wal_graph_context),
221 ));
222 self.wal = Some(wal);
223 self.wal_graph_context = Some(wal_graph_context);
224 }
225
226 #[cfg(feature = "cdc")]
228 pub(crate) fn set_cdc_log(&mut self, cdc_log: Arc<crate::cdc::CdcLog>) {
229 self.cdc_log = cdc_log;
230 }
231
232 #[cfg(feature = "rdf")]
234 pub(crate) fn with_rdf_store_and_adaptive(
235 store: Arc<LpgStore>,
236 rdf_store: Arc<RdfStore>,
237 cfg: SessionConfig,
238 ) -> Self {
239 let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
240 Self {
241 store,
242 graph_store,
243 catalog: cfg.catalog,
244 rdf_store,
245 transaction_manager: cfg.transaction_manager,
246 query_cache: cfg.query_cache,
247 current_transaction: parking_lot::Mutex::new(None),
248 read_only_tx: parking_lot::Mutex::new(false),
249 auto_commit: true,
250 adaptive_config: cfg.adaptive_config,
251 factorized_execution: cfg.factorized_execution,
252 graph_model: cfg.graph_model,
253 query_timeout: cfg.query_timeout,
254 commit_counter: cfg.commit_counter,
255 gc_interval: cfg.gc_interval,
256 transaction_start_node_count: AtomicUsize::new(0),
257 transaction_start_edge_count: AtomicUsize::new(0),
258 #[cfg(feature = "wal")]
259 wal: None,
260 #[cfg(feature = "wal")]
261 wal_graph_context: None,
262 #[cfg(feature = "cdc")]
263 cdc_log: Arc::new(crate::cdc::CdcLog::new()),
264 current_graph: parking_lot::Mutex::new(None),
265 time_zone: parking_lot::Mutex::new(None),
266 session_params: parking_lot::Mutex::new(std::collections::HashMap::new()),
267 viewing_epoch_override: parking_lot::Mutex::new(None),
268 savepoints: parking_lot::Mutex::new(Vec::new()),
269 transaction_nesting_depth: parking_lot::Mutex::new(0),
270 touched_graphs: parking_lot::Mutex::new(Vec::new()),
271 }
272 }
273
274 pub(crate) fn with_external_store(
283 store: Arc<dyn GraphStoreMut>,
284 cfg: SessionConfig,
285 ) -> Result<Self> {
286 Ok(Self {
287 store: Arc::new(LpgStore::new()?),
288 graph_store: store,
289 catalog: cfg.catalog,
290 #[cfg(feature = "rdf")]
291 rdf_store: Arc::new(RdfStore::new()),
292 transaction_manager: cfg.transaction_manager,
293 query_cache: cfg.query_cache,
294 current_transaction: parking_lot::Mutex::new(None),
295 read_only_tx: parking_lot::Mutex::new(false),
296 auto_commit: true,
297 adaptive_config: cfg.adaptive_config,
298 factorized_execution: cfg.factorized_execution,
299 graph_model: cfg.graph_model,
300 query_timeout: cfg.query_timeout,
301 commit_counter: cfg.commit_counter,
302 gc_interval: cfg.gc_interval,
303 transaction_start_node_count: AtomicUsize::new(0),
304 transaction_start_edge_count: AtomicUsize::new(0),
305 #[cfg(feature = "wal")]
306 wal: None,
307 #[cfg(feature = "wal")]
308 wal_graph_context: None,
309 #[cfg(feature = "cdc")]
310 cdc_log: Arc::new(crate::cdc::CdcLog::new()),
311 current_graph: parking_lot::Mutex::new(None),
312 time_zone: parking_lot::Mutex::new(None),
313 session_params: parking_lot::Mutex::new(std::collections::HashMap::new()),
314 viewing_epoch_override: parking_lot::Mutex::new(None),
315 savepoints: parking_lot::Mutex::new(Vec::new()),
316 transaction_nesting_depth: parking_lot::Mutex::new(0),
317 touched_graphs: parking_lot::Mutex::new(Vec::new()),
318 })
319 }
320
321 #[must_use]
323 pub fn graph_model(&self) -> GraphModel {
324 self.graph_model
325 }
326
327 pub fn use_graph(&self, name: &str) {
331 *self.current_graph.lock() = Some(name.to_string());
332 }
333
334 #[must_use]
336 pub fn current_graph(&self) -> Option<String> {
337 self.current_graph.lock().clone()
338 }
339
340 fn active_store(&self) -> Arc<dyn GraphStoreMut> {
348 let graph_name = self.current_graph.lock().clone();
349 match graph_name {
350 None => Arc::clone(&self.graph_store),
351 Some(ref name) if name.eq_ignore_ascii_case("default") => Arc::clone(&self.graph_store),
352 Some(ref name) => match self.store.graph(name) {
353 Some(named_store) => {
354 #[cfg(feature = "wal")]
355 if let (Some(wal), Some(ctx)) = (&self.wal, &self.wal_graph_context) {
356 return Arc::new(crate::database::wal_store::WalGraphStore::new_for_graph(
357 named_store,
358 Arc::clone(wal),
359 name.clone(),
360 Arc::clone(ctx),
361 )) as Arc<dyn GraphStoreMut>;
362 }
363 named_store as Arc<dyn GraphStoreMut>
364 }
365 None => Arc::clone(&self.graph_store),
366 },
367 }
368 }
369
370 fn active_lpg_store(&self) -> Arc<LpgStore> {
375 let graph_name = self.current_graph.lock().clone();
376 match graph_name {
377 None => Arc::clone(&self.store),
378 Some(ref name) if name.eq_ignore_ascii_case("default") => Arc::clone(&self.store),
379 Some(ref name) => self
380 .store
381 .graph(name)
382 .unwrap_or_else(|| Arc::clone(&self.store)),
383 }
384 }
385
386 fn resolve_store(&self, graph_name: &Option<String>) -> Arc<LpgStore> {
389 match graph_name {
390 None => Arc::clone(&self.store),
391 Some(name) if name.eq_ignore_ascii_case("default") => Arc::clone(&self.store),
392 Some(name) => self
393 .store
394 .graph(name)
395 .unwrap_or_else(|| Arc::clone(&self.store)),
396 }
397 }
398
399 fn track_graph_touch(&self) {
401 if self.current_transaction.lock().is_some() {
402 let graph = self.current_graph.lock().clone();
403 let normalized = match graph {
405 Some(ref name) if name.eq_ignore_ascii_case("default") => None,
406 other => other,
407 };
408 let mut touched = self.touched_graphs.lock();
409 if !touched.contains(&normalized) {
410 touched.push(normalized);
411 }
412 }
413 }
414
415 pub fn set_time_zone(&self, tz: &str) {
417 *self.time_zone.lock() = Some(tz.to_string());
418 }
419
420 #[must_use]
422 pub fn time_zone(&self) -> Option<String> {
423 self.time_zone.lock().clone()
424 }
425
426 pub fn set_parameter(&self, key: &str, value: grafeo_common::types::Value) {
428 self.session_params.lock().insert(key.to_string(), value);
429 }
430
431 #[must_use]
433 pub fn get_parameter(&self, key: &str) -> Option<grafeo_common::types::Value> {
434 self.session_params.lock().get(key).cloned()
435 }
436
437 pub fn reset_session(&self) {
439 *self.current_graph.lock() = None;
440 *self.time_zone.lock() = None;
441 self.session_params.lock().clear();
442 *self.viewing_epoch_override.lock() = None;
443 }
444
445 pub fn set_viewing_epoch(&self, epoch: EpochId) {
453 *self.viewing_epoch_override.lock() = Some(epoch);
454 }
455
456 pub fn clear_viewing_epoch(&self) {
458 *self.viewing_epoch_override.lock() = None;
459 }
460
461 #[must_use]
463 pub fn viewing_epoch(&self) -> Option<EpochId> {
464 *self.viewing_epoch_override.lock()
465 }
466
467 #[must_use]
471 pub fn get_node_history(&self, id: NodeId) -> Vec<(EpochId, Option<EpochId>, Node)> {
472 self.active_lpg_store().get_node_history(id)
473 }
474
475 #[must_use]
479 pub fn get_edge_history(&self, id: EdgeId) -> Vec<(EpochId, Option<EpochId>, Edge)> {
480 self.active_lpg_store().get_edge_history(id)
481 }
482
483 fn require_lpg(&self, language: &str) -> Result<()> {
485 if self.graph_model == GraphModel::Rdf {
486 return Err(grafeo_common::utils::error::Error::Internal(format!(
487 "This is an RDF database. {language} queries require an LPG database."
488 )));
489 }
490 Ok(())
491 }
492
493 #[cfg(feature = "gql")]
495 fn execute_session_command(
496 &self,
497 cmd: grafeo_adapters::query::gql::ast::SessionCommand,
498 ) -> Result<QueryResult> {
499 use grafeo_adapters::query::gql::ast::{SessionCommand, TransactionIsolationLevel};
500 use grafeo_common::utils::error::{Error, QueryError, QueryErrorKind};
501
502 match cmd {
503 SessionCommand::CreateGraph {
504 name,
505 if_not_exists,
506 typed,
507 like_graph,
508 copy_of,
509 open: _,
510 } => {
511 if let Some(ref src) = like_graph
513 && self.store.graph(src).is_none()
514 {
515 return Err(Error::Query(QueryError::new(
516 QueryErrorKind::Semantic,
517 format!("Source graph '{src}' does not exist"),
518 )));
519 }
520 if let Some(ref src) = copy_of
521 && self.store.graph(src).is_none()
522 {
523 return Err(Error::Query(QueryError::new(
524 QueryErrorKind::Semantic,
525 format!("Source graph '{src}' does not exist"),
526 )));
527 }
528
529 let created = self
530 .store
531 .create_graph(&name)
532 .map_err(|e| Error::Internal(e.to_string()))?;
533 if !created && !if_not_exists {
534 return Err(Error::Query(QueryError::new(
535 QueryErrorKind::Semantic,
536 format!("Graph '{name}' already exists"),
537 )));
538 }
539 if created {
540 #[cfg(feature = "wal")]
541 self.log_schema_wal(
542 &grafeo_adapters::storage::wal::WalRecord::CreateNamedGraph {
543 name: name.clone(),
544 },
545 );
546 }
547
548 if let Some(ref src) = copy_of {
550 self.store
551 .copy_graph(Some(src), Some(&name))
552 .map_err(|e| Error::Internal(e.to_string()))?;
553 }
554
555 if let Some(type_name) = typed
557 && let Err(e) = self.catalog.bind_graph_type(&name, type_name.clone())
558 {
559 return Err(Error::Query(QueryError::new(
560 QueryErrorKind::Semantic,
561 e.to_string(),
562 )));
563 }
564
565 if let Some(ref src) = like_graph
567 && let Some(src_type) = self.catalog.get_graph_type_binding(src)
568 {
569 let _ = self.catalog.bind_graph_type(&name, src_type);
570 }
571
572 Ok(QueryResult::empty())
573 }
574 SessionCommand::DropGraph { name, if_exists } => {
575 let dropped = self.store.drop_graph(&name);
576 if !dropped && !if_exists {
577 return Err(Error::Query(QueryError::new(
578 QueryErrorKind::Semantic,
579 format!("Graph '{name}' does not exist"),
580 )));
581 }
582 if dropped {
583 #[cfg(feature = "wal")]
584 self.log_schema_wal(
585 &grafeo_adapters::storage::wal::WalRecord::DropNamedGraph {
586 name: name.clone(),
587 },
588 );
589 let mut current = self.current_graph.lock();
591 if current
592 .as_deref()
593 .is_some_and(|g| g.eq_ignore_ascii_case(&name))
594 {
595 *current = None;
596 }
597 }
598 Ok(QueryResult::empty())
599 }
600 SessionCommand::UseGraph(name) => {
601 if !name.eq_ignore_ascii_case("default") && self.store.graph(&name).is_none() {
603 return Err(Error::Query(QueryError::new(
604 QueryErrorKind::Semantic,
605 format!("Graph '{name}' does not exist"),
606 )));
607 }
608 self.use_graph(&name);
609 self.track_graph_touch();
611 Ok(QueryResult::empty())
612 }
613 SessionCommand::SessionSetGraph(name) => {
614 self.use_graph(&name);
615 self.track_graph_touch();
617 Ok(QueryResult::empty())
618 }
619 SessionCommand::SessionSetTimeZone(tz) => {
620 self.set_time_zone(&tz);
621 Ok(QueryResult::empty())
622 }
623 SessionCommand::SessionSetParameter(key, expr) => {
624 if key.eq_ignore_ascii_case("viewing_epoch") {
625 match Self::eval_integer_literal(&expr) {
626 Some(n) if n >= 0 => {
627 self.set_viewing_epoch(EpochId::new(n as u64));
628 Ok(QueryResult::status(format!("Set viewing_epoch to {n}")))
629 }
630 _ => Err(Error::Query(QueryError::new(
631 QueryErrorKind::Semantic,
632 "viewing_epoch must be a non-negative integer literal",
633 ))),
634 }
635 } else {
636 self.set_parameter(&key, Value::Null);
639 Ok(QueryResult::empty())
640 }
641 }
642 SessionCommand::SessionReset => {
643 self.reset_session();
644 Ok(QueryResult::empty())
645 }
646 SessionCommand::SessionClose => {
647 self.reset_session();
648 Ok(QueryResult::empty())
649 }
650 SessionCommand::StartTransaction {
651 read_only,
652 isolation_level,
653 } => {
654 let engine_level = isolation_level.map(|l| match l {
655 TransactionIsolationLevel::ReadCommitted => {
656 crate::transaction::IsolationLevel::ReadCommitted
657 }
658 TransactionIsolationLevel::SnapshotIsolation => {
659 crate::transaction::IsolationLevel::SnapshotIsolation
660 }
661 TransactionIsolationLevel::Serializable => {
662 crate::transaction::IsolationLevel::Serializable
663 }
664 });
665 self.begin_transaction_inner(read_only, engine_level)?;
666 Ok(QueryResult::status("Transaction started"))
667 }
668 SessionCommand::Commit => {
669 self.commit_inner()?;
670 Ok(QueryResult::status("Transaction committed"))
671 }
672 SessionCommand::Rollback => {
673 self.rollback_inner()?;
674 Ok(QueryResult::status("Transaction rolled back"))
675 }
676 SessionCommand::Savepoint(name) => {
677 self.savepoint(&name)?;
678 Ok(QueryResult::status(format!("Savepoint '{name}' created")))
679 }
680 SessionCommand::RollbackToSavepoint(name) => {
681 self.rollback_to_savepoint(&name)?;
682 Ok(QueryResult::status(format!(
683 "Rolled back to savepoint '{name}'"
684 )))
685 }
686 SessionCommand::ReleaseSavepoint(name) => {
687 self.release_savepoint(&name)?;
688 Ok(QueryResult::status(format!("Savepoint '{name}' released")))
689 }
690 }
691 }
692
693 #[cfg(feature = "wal")]
695 fn log_schema_wal(&self, record: &grafeo_adapters::storage::wal::WalRecord) {
696 if let Some(ref wal) = self.wal
697 && let Err(e) = wal.log(record)
698 {
699 tracing::warn!("Failed to log schema change to WAL: {}", e);
700 }
701 }
702
703 #[cfg(feature = "gql")]
705 fn execute_schema_command(
706 &self,
707 cmd: grafeo_adapters::query::gql::ast::SchemaStatement,
708 ) -> Result<QueryResult> {
709 use crate::catalog::{
710 EdgeTypeDefinition, NodeTypeDefinition, PropertyDataType, TypedProperty,
711 };
712 use grafeo_adapters::query::gql::ast::SchemaStatement;
713 #[cfg(feature = "wal")]
714 use grafeo_adapters::storage::wal::WalRecord;
715 use grafeo_common::utils::error::{Error, QueryError, QueryErrorKind};
716
717 macro_rules! wal_log {
719 ($self:expr, $record:expr) => {
720 #[cfg(feature = "wal")]
721 $self.log_schema_wal(&$record);
722 };
723 }
724
725 let result = match cmd {
726 SchemaStatement::CreateNodeType(stmt) => {
727 #[cfg(feature = "wal")]
728 let props_for_wal: Vec<(String, String, bool)> = stmt
729 .properties
730 .iter()
731 .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
732 .collect();
733 let def = NodeTypeDefinition {
734 name: stmt.name.clone(),
735 properties: stmt
736 .properties
737 .iter()
738 .map(|p| TypedProperty {
739 name: p.name.clone(),
740 data_type: PropertyDataType::from_type_name(&p.data_type),
741 nullable: p.nullable,
742 default_value: p
743 .default_value
744 .as_ref()
745 .map(|s| parse_default_literal(s)),
746 })
747 .collect(),
748 constraints: Vec::new(),
749 parent_types: stmt.parent_types.clone(),
750 };
751 let result = if stmt.or_replace {
752 let _ = self.catalog.drop_node_type(&stmt.name);
753 self.catalog.register_node_type(def)
754 } else {
755 self.catalog.register_node_type(def)
756 };
757 match result {
758 Ok(()) => {
759 wal_log!(
760 self,
761 WalRecord::CreateNodeType {
762 name: stmt.name.clone(),
763 properties: props_for_wal,
764 constraints: Vec::new(),
765 }
766 );
767 Ok(QueryResult::status(format!(
768 "Created node type '{}'",
769 stmt.name
770 )))
771 }
772 Err(e) if stmt.if_not_exists => {
773 let _ = e;
774 Ok(QueryResult::status("No change"))
775 }
776 Err(e) => Err(Error::Query(QueryError::new(
777 QueryErrorKind::Semantic,
778 e.to_string(),
779 ))),
780 }
781 }
782 SchemaStatement::CreateEdgeType(stmt) => {
783 #[cfg(feature = "wal")]
784 let props_for_wal: Vec<(String, String, bool)> = stmt
785 .properties
786 .iter()
787 .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
788 .collect();
789 let def = EdgeTypeDefinition {
790 name: stmt.name.clone(),
791 properties: stmt
792 .properties
793 .iter()
794 .map(|p| TypedProperty {
795 name: p.name.clone(),
796 data_type: PropertyDataType::from_type_name(&p.data_type),
797 nullable: p.nullable,
798 default_value: p
799 .default_value
800 .as_ref()
801 .map(|s| parse_default_literal(s)),
802 })
803 .collect(),
804 constraints: Vec::new(),
805 source_node_types: stmt.source_node_types.clone(),
806 target_node_types: stmt.target_node_types.clone(),
807 };
808 let result = if stmt.or_replace {
809 let _ = self.catalog.drop_edge_type_def(&stmt.name);
810 self.catalog.register_edge_type_def(def)
811 } else {
812 self.catalog.register_edge_type_def(def)
813 };
814 match result {
815 Ok(()) => {
816 wal_log!(
817 self,
818 WalRecord::CreateEdgeType {
819 name: stmt.name.clone(),
820 properties: props_for_wal,
821 constraints: Vec::new(),
822 }
823 );
824 Ok(QueryResult::status(format!(
825 "Created edge type '{}'",
826 stmt.name
827 )))
828 }
829 Err(e) if stmt.if_not_exists => {
830 let _ = e;
831 Ok(QueryResult::status("No change"))
832 }
833 Err(e) => Err(Error::Query(QueryError::new(
834 QueryErrorKind::Semantic,
835 e.to_string(),
836 ))),
837 }
838 }
839 SchemaStatement::CreateVectorIndex(stmt) => {
840 Self::create_vector_index_on_store(
841 &self.active_lpg_store(),
842 &stmt.node_label,
843 &stmt.property,
844 stmt.dimensions,
845 stmt.metric.as_deref(),
846 )?;
847 wal_log!(
848 self,
849 WalRecord::CreateIndex {
850 name: stmt.name.clone(),
851 label: stmt.node_label.clone(),
852 property: stmt.property.clone(),
853 index_type: "vector".to_string(),
854 }
855 );
856 Ok(QueryResult::status(format!(
857 "Created vector index '{}'",
858 stmt.name
859 )))
860 }
861 SchemaStatement::DropNodeType { name, if_exists } => {
862 match self.catalog.drop_node_type(&name) {
863 Ok(()) => {
864 wal_log!(self, WalRecord::DropNodeType { name: name.clone() });
865 Ok(QueryResult::status(format!("Dropped node type '{name}'")))
866 }
867 Err(e) if if_exists => {
868 let _ = e;
869 Ok(QueryResult::status("No change"))
870 }
871 Err(e) => Err(Error::Query(QueryError::new(
872 QueryErrorKind::Semantic,
873 e.to_string(),
874 ))),
875 }
876 }
877 SchemaStatement::DropEdgeType { name, if_exists } => {
878 match self.catalog.drop_edge_type_def(&name) {
879 Ok(()) => {
880 wal_log!(self, WalRecord::DropEdgeType { name: name.clone() });
881 Ok(QueryResult::status(format!("Dropped edge type '{name}'")))
882 }
883 Err(e) if if_exists => {
884 let _ = e;
885 Ok(QueryResult::status("No change"))
886 }
887 Err(e) => Err(Error::Query(QueryError::new(
888 QueryErrorKind::Semantic,
889 e.to_string(),
890 ))),
891 }
892 }
893 SchemaStatement::CreateIndex(stmt) => {
894 use grafeo_adapters::query::gql::ast::IndexKind;
895 let active = self.active_lpg_store();
896 let index_type_str = match stmt.index_kind {
897 IndexKind::Property => "property",
898 IndexKind::BTree => "btree",
899 IndexKind::Text => "text",
900 IndexKind::Vector => "vector",
901 };
902 match stmt.index_kind {
903 IndexKind::Property | IndexKind::BTree => {
904 for prop in &stmt.properties {
905 active.create_property_index(prop);
906 }
907 }
908 IndexKind::Text => {
909 for prop in &stmt.properties {
910 Self::create_text_index_on_store(&active, &stmt.label, prop)?;
911 }
912 }
913 IndexKind::Vector => {
914 for prop in &stmt.properties {
915 Self::create_vector_index_on_store(
916 &active,
917 &stmt.label,
918 prop,
919 stmt.options.dimensions,
920 stmt.options.metric.as_deref(),
921 )?;
922 }
923 }
924 }
925 #[cfg(feature = "wal")]
926 for prop in &stmt.properties {
927 wal_log!(
928 self,
929 WalRecord::CreateIndex {
930 name: stmt.name.clone(),
931 label: stmt.label.clone(),
932 property: prop.clone(),
933 index_type: index_type_str.to_string(),
934 }
935 );
936 }
937 Ok(QueryResult::status(format!(
938 "Created {} index '{}'",
939 index_type_str, stmt.name
940 )))
941 }
942 SchemaStatement::DropIndex { name, if_exists } => {
943 let dropped = self.active_lpg_store().drop_property_index(&name);
945 if dropped || if_exists {
946 if dropped {
947 wal_log!(self, WalRecord::DropIndex { name: name.clone() });
948 }
949 Ok(QueryResult::status(if dropped {
950 format!("Dropped index '{name}'")
951 } else {
952 "No change".to_string()
953 }))
954 } else {
955 Err(Error::Query(QueryError::new(
956 QueryErrorKind::Semantic,
957 format!("Index '{name}' does not exist"),
958 )))
959 }
960 }
961 SchemaStatement::CreateConstraint(stmt) => {
962 use crate::catalog::TypeConstraint;
963 use grafeo_adapters::query::gql::ast::ConstraintKind;
964 let kind_str = match stmt.constraint_kind {
965 ConstraintKind::Unique => "unique",
966 ConstraintKind::NodeKey => "node_key",
967 ConstraintKind::NotNull => "not_null",
968 ConstraintKind::Exists => "exists",
969 };
970 let constraint_name = stmt
971 .name
972 .clone()
973 .unwrap_or_else(|| format!("{}_{kind_str}", stmt.label));
974
975 match stmt.constraint_kind {
977 ConstraintKind::Unique => {
978 for prop in &stmt.properties {
979 let label_id = self.catalog.get_or_create_label(&stmt.label);
980 let prop_id = self.catalog.get_or_create_property_key(prop);
981 let _ = self.catalog.add_unique_constraint(label_id, prop_id);
982 }
983 let _ = self.catalog.add_constraint_to_type(
984 &stmt.label,
985 TypeConstraint::Unique(stmt.properties.clone()),
986 );
987 }
988 ConstraintKind::NodeKey => {
989 for prop in &stmt.properties {
990 let label_id = self.catalog.get_or_create_label(&stmt.label);
991 let prop_id = self.catalog.get_or_create_property_key(prop);
992 let _ = self.catalog.add_unique_constraint(label_id, prop_id);
993 let _ = self.catalog.add_required_property(label_id, prop_id);
994 }
995 let _ = self.catalog.add_constraint_to_type(
996 &stmt.label,
997 TypeConstraint::PrimaryKey(stmt.properties.clone()),
998 );
999 }
1000 ConstraintKind::NotNull | ConstraintKind::Exists => {
1001 for prop in &stmt.properties {
1002 let label_id = self.catalog.get_or_create_label(&stmt.label);
1003 let prop_id = self.catalog.get_or_create_property_key(prop);
1004 let _ = self.catalog.add_required_property(label_id, prop_id);
1005 let _ = self.catalog.add_constraint_to_type(
1006 &stmt.label,
1007 TypeConstraint::NotNull(prop.clone()),
1008 );
1009 }
1010 }
1011 }
1012
1013 wal_log!(
1014 self,
1015 WalRecord::CreateConstraint {
1016 name: constraint_name.clone(),
1017 label: stmt.label.clone(),
1018 properties: stmt.properties.clone(),
1019 kind: kind_str.to_string(),
1020 }
1021 );
1022 Ok(QueryResult::status(format!(
1023 "Created {kind_str} constraint '{constraint_name}'"
1024 )))
1025 }
1026 SchemaStatement::DropConstraint { name, if_exists } => {
1027 let _ = if_exists;
1028 wal_log!(self, WalRecord::DropConstraint { name: name.clone() });
1029 Ok(QueryResult::status(format!("Dropped constraint '{name}'")))
1030 }
1031 SchemaStatement::CreateGraphType(stmt) => {
1032 use crate::catalog::GraphTypeDefinition;
1033 use grafeo_adapters::query::gql::ast::InlineElementType;
1034
1035 let (mut node_types, mut edge_types, open) =
1037 if let Some(ref like_graph) = stmt.like_graph {
1038 if let Some(type_name) = self.catalog.get_graph_type_binding(like_graph) {
1040 if let Some(existing) = self
1041 .catalog
1042 .schema()
1043 .and_then(|s| s.get_graph_type(&type_name))
1044 {
1045 (
1046 existing.allowed_node_types.clone(),
1047 existing.allowed_edge_types.clone(),
1048 existing.open,
1049 )
1050 } else {
1051 (Vec::new(), Vec::new(), true)
1052 }
1053 } else {
1054 let nt = self.catalog.all_node_type_names();
1056 let et = self.catalog.all_edge_type_names();
1057 if nt.is_empty() && et.is_empty() {
1058 (Vec::new(), Vec::new(), true)
1059 } else {
1060 (nt, et, false)
1061 }
1062 }
1063 } else {
1064 (stmt.node_types.clone(), stmt.edge_types.clone(), stmt.open)
1065 };
1066
1067 for inline in &stmt.inline_types {
1069 match inline {
1070 InlineElementType::Node {
1071 name,
1072 properties,
1073 key_labels,
1074 ..
1075 } => {
1076 let def = NodeTypeDefinition {
1077 name: name.clone(),
1078 properties: properties
1079 .iter()
1080 .map(|p| TypedProperty {
1081 name: p.name.clone(),
1082 data_type: PropertyDataType::from_type_name(&p.data_type),
1083 nullable: p.nullable,
1084 default_value: None,
1085 })
1086 .collect(),
1087 constraints: Vec::new(),
1088 parent_types: key_labels.clone(),
1089 };
1090 self.catalog.register_or_replace_node_type(def);
1092 #[cfg(feature = "wal")]
1093 {
1094 let props_for_wal: Vec<(String, String, bool)> = properties
1095 .iter()
1096 .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
1097 .collect();
1098 self.log_schema_wal(&WalRecord::CreateNodeType {
1099 name: name.clone(),
1100 properties: props_for_wal,
1101 constraints: Vec::new(),
1102 });
1103 }
1104 if !node_types.contains(name) {
1105 node_types.push(name.clone());
1106 }
1107 }
1108 InlineElementType::Edge {
1109 name,
1110 properties,
1111 source_node_types,
1112 target_node_types,
1113 ..
1114 } => {
1115 let def = EdgeTypeDefinition {
1116 name: name.clone(),
1117 properties: properties
1118 .iter()
1119 .map(|p| TypedProperty {
1120 name: p.name.clone(),
1121 data_type: PropertyDataType::from_type_name(&p.data_type),
1122 nullable: p.nullable,
1123 default_value: None,
1124 })
1125 .collect(),
1126 constraints: Vec::new(),
1127 source_node_types: source_node_types.clone(),
1128 target_node_types: target_node_types.clone(),
1129 };
1130 self.catalog.register_or_replace_edge_type_def(def);
1131 #[cfg(feature = "wal")]
1132 {
1133 let props_for_wal: Vec<(String, String, bool)> = properties
1134 .iter()
1135 .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
1136 .collect();
1137 self.log_schema_wal(&WalRecord::CreateEdgeType {
1138 name: name.clone(),
1139 properties: props_for_wal,
1140 constraints: Vec::new(),
1141 });
1142 }
1143 if !edge_types.contains(name) {
1144 edge_types.push(name.clone());
1145 }
1146 }
1147 }
1148 }
1149
1150 let def = GraphTypeDefinition {
1151 name: stmt.name.clone(),
1152 allowed_node_types: node_types.clone(),
1153 allowed_edge_types: edge_types.clone(),
1154 open,
1155 };
1156 let result = if stmt.or_replace {
1157 let _ = self.catalog.drop_graph_type(&stmt.name);
1159 self.catalog.register_graph_type(def)
1160 } else {
1161 self.catalog.register_graph_type(def)
1162 };
1163 match result {
1164 Ok(()) => {
1165 wal_log!(
1166 self,
1167 WalRecord::CreateGraphType {
1168 name: stmt.name.clone(),
1169 node_types,
1170 edge_types,
1171 open,
1172 }
1173 );
1174 Ok(QueryResult::status(format!(
1175 "Created graph type '{}'",
1176 stmt.name
1177 )))
1178 }
1179 Err(e) if stmt.if_not_exists => {
1180 let _ = e;
1181 Ok(QueryResult::status("No change"))
1182 }
1183 Err(e) => Err(Error::Query(QueryError::new(
1184 QueryErrorKind::Semantic,
1185 e.to_string(),
1186 ))),
1187 }
1188 }
1189 SchemaStatement::DropGraphType { name, if_exists } => {
1190 match self.catalog.drop_graph_type(&name) {
1191 Ok(()) => {
1192 wal_log!(self, WalRecord::DropGraphType { name: name.clone() });
1193 Ok(QueryResult::status(format!("Dropped graph type '{name}'")))
1194 }
1195 Err(e) if if_exists => {
1196 let _ = e;
1197 Ok(QueryResult::status("No change"))
1198 }
1199 Err(e) => Err(Error::Query(QueryError::new(
1200 QueryErrorKind::Semantic,
1201 e.to_string(),
1202 ))),
1203 }
1204 }
1205 SchemaStatement::CreateSchema {
1206 name,
1207 if_not_exists,
1208 } => match self.catalog.register_schema_namespace(name.clone()) {
1209 Ok(()) => {
1210 wal_log!(self, WalRecord::CreateSchema { name: name.clone() });
1211 Ok(QueryResult::status(format!("Created schema '{name}'")))
1212 }
1213 Err(e) if if_not_exists => {
1214 let _ = e;
1215 Ok(QueryResult::status("No change"))
1216 }
1217 Err(e) => Err(Error::Query(QueryError::new(
1218 QueryErrorKind::Semantic,
1219 e.to_string(),
1220 ))),
1221 },
1222 SchemaStatement::DropSchema { name, if_exists } => {
1223 match self.catalog.drop_schema_namespace(&name) {
1224 Ok(()) => {
1225 wal_log!(self, WalRecord::DropSchema { name: name.clone() });
1226 Ok(QueryResult::status(format!("Dropped schema '{name}'")))
1227 }
1228 Err(e) if if_exists => {
1229 let _ = e;
1230 Ok(QueryResult::status("No change"))
1231 }
1232 Err(e) => Err(Error::Query(QueryError::new(
1233 QueryErrorKind::Semantic,
1234 e.to_string(),
1235 ))),
1236 }
1237 }
1238 SchemaStatement::AlterNodeType(stmt) => {
1239 use grafeo_adapters::query::gql::ast::TypeAlteration;
1240 let mut wal_alts = Vec::new();
1241 for alt in &stmt.alterations {
1242 match alt {
1243 TypeAlteration::AddProperty(prop) => {
1244 let typed = TypedProperty {
1245 name: prop.name.clone(),
1246 data_type: PropertyDataType::from_type_name(&prop.data_type),
1247 nullable: prop.nullable,
1248 default_value: prop
1249 .default_value
1250 .as_ref()
1251 .map(|s| parse_default_literal(s)),
1252 };
1253 self.catalog
1254 .alter_node_type_add_property(&stmt.name, typed)
1255 .map_err(|e| {
1256 Error::Query(QueryError::new(
1257 QueryErrorKind::Semantic,
1258 e.to_string(),
1259 ))
1260 })?;
1261 wal_alts.push((
1262 "add".to_string(),
1263 prop.name.clone(),
1264 prop.data_type.clone(),
1265 prop.nullable,
1266 ));
1267 }
1268 TypeAlteration::DropProperty(name) => {
1269 self.catalog
1270 .alter_node_type_drop_property(&stmt.name, name)
1271 .map_err(|e| {
1272 Error::Query(QueryError::new(
1273 QueryErrorKind::Semantic,
1274 e.to_string(),
1275 ))
1276 })?;
1277 wal_alts.push(("drop".to_string(), name.clone(), String::new(), false));
1278 }
1279 }
1280 }
1281 wal_log!(
1282 self,
1283 WalRecord::AlterNodeType {
1284 name: stmt.name.clone(),
1285 alterations: wal_alts,
1286 }
1287 );
1288 Ok(QueryResult::status(format!(
1289 "Altered node type '{}'",
1290 stmt.name
1291 )))
1292 }
1293 SchemaStatement::AlterEdgeType(stmt) => {
1294 use grafeo_adapters::query::gql::ast::TypeAlteration;
1295 let mut wal_alts = Vec::new();
1296 for alt in &stmt.alterations {
1297 match alt {
1298 TypeAlteration::AddProperty(prop) => {
1299 let typed = TypedProperty {
1300 name: prop.name.clone(),
1301 data_type: PropertyDataType::from_type_name(&prop.data_type),
1302 nullable: prop.nullable,
1303 default_value: prop
1304 .default_value
1305 .as_ref()
1306 .map(|s| parse_default_literal(s)),
1307 };
1308 self.catalog
1309 .alter_edge_type_add_property(&stmt.name, typed)
1310 .map_err(|e| {
1311 Error::Query(QueryError::new(
1312 QueryErrorKind::Semantic,
1313 e.to_string(),
1314 ))
1315 })?;
1316 wal_alts.push((
1317 "add".to_string(),
1318 prop.name.clone(),
1319 prop.data_type.clone(),
1320 prop.nullable,
1321 ));
1322 }
1323 TypeAlteration::DropProperty(name) => {
1324 self.catalog
1325 .alter_edge_type_drop_property(&stmt.name, name)
1326 .map_err(|e| {
1327 Error::Query(QueryError::new(
1328 QueryErrorKind::Semantic,
1329 e.to_string(),
1330 ))
1331 })?;
1332 wal_alts.push(("drop".to_string(), name.clone(), String::new(), false));
1333 }
1334 }
1335 }
1336 wal_log!(
1337 self,
1338 WalRecord::AlterEdgeType {
1339 name: stmt.name.clone(),
1340 alterations: wal_alts,
1341 }
1342 );
1343 Ok(QueryResult::status(format!(
1344 "Altered edge type '{}'",
1345 stmt.name
1346 )))
1347 }
1348 SchemaStatement::AlterGraphType(stmt) => {
1349 use grafeo_adapters::query::gql::ast::GraphTypeAlteration;
1350 let mut wal_alts = Vec::new();
1351 for alt in &stmt.alterations {
1352 match alt {
1353 GraphTypeAlteration::AddNodeType(name) => {
1354 self.catalog
1355 .alter_graph_type_add_node_type(&stmt.name, name.clone())
1356 .map_err(|e| {
1357 Error::Query(QueryError::new(
1358 QueryErrorKind::Semantic,
1359 e.to_string(),
1360 ))
1361 })?;
1362 wal_alts.push(("add_node_type".to_string(), name.clone()));
1363 }
1364 GraphTypeAlteration::DropNodeType(name) => {
1365 self.catalog
1366 .alter_graph_type_drop_node_type(&stmt.name, name)
1367 .map_err(|e| {
1368 Error::Query(QueryError::new(
1369 QueryErrorKind::Semantic,
1370 e.to_string(),
1371 ))
1372 })?;
1373 wal_alts.push(("drop_node_type".to_string(), name.clone()));
1374 }
1375 GraphTypeAlteration::AddEdgeType(name) => {
1376 self.catalog
1377 .alter_graph_type_add_edge_type(&stmt.name, name.clone())
1378 .map_err(|e| {
1379 Error::Query(QueryError::new(
1380 QueryErrorKind::Semantic,
1381 e.to_string(),
1382 ))
1383 })?;
1384 wal_alts.push(("add_edge_type".to_string(), name.clone()));
1385 }
1386 GraphTypeAlteration::DropEdgeType(name) => {
1387 self.catalog
1388 .alter_graph_type_drop_edge_type(&stmt.name, name)
1389 .map_err(|e| {
1390 Error::Query(QueryError::new(
1391 QueryErrorKind::Semantic,
1392 e.to_string(),
1393 ))
1394 })?;
1395 wal_alts.push(("drop_edge_type".to_string(), name.clone()));
1396 }
1397 }
1398 }
1399 wal_log!(
1400 self,
1401 WalRecord::AlterGraphType {
1402 name: stmt.name.clone(),
1403 alterations: wal_alts,
1404 }
1405 );
1406 Ok(QueryResult::status(format!(
1407 "Altered graph type '{}'",
1408 stmt.name
1409 )))
1410 }
1411 SchemaStatement::CreateProcedure(stmt) => {
1412 use crate::catalog::ProcedureDefinition;
1413
1414 let def = ProcedureDefinition {
1415 name: stmt.name.clone(),
1416 params: stmt
1417 .params
1418 .iter()
1419 .map(|p| (p.name.clone(), p.param_type.clone()))
1420 .collect(),
1421 returns: stmt
1422 .returns
1423 .iter()
1424 .map(|r| (r.name.clone(), r.return_type.clone()))
1425 .collect(),
1426 body: stmt.body.clone(),
1427 };
1428
1429 if stmt.or_replace {
1430 self.catalog.replace_procedure(def).map_err(|e| {
1431 Error::Query(QueryError::new(QueryErrorKind::Semantic, e.to_string()))
1432 })?;
1433 } else {
1434 match self.catalog.register_procedure(def) {
1435 Ok(()) => {}
1436 Err(_) if stmt.if_not_exists => {
1437 return Ok(QueryResult::empty());
1438 }
1439 Err(e) => {
1440 return Err(Error::Query(QueryError::new(
1441 QueryErrorKind::Semantic,
1442 e.to_string(),
1443 )));
1444 }
1445 }
1446 }
1447
1448 wal_log!(
1449 self,
1450 WalRecord::CreateProcedure {
1451 name: stmt.name.clone(),
1452 params: stmt
1453 .params
1454 .iter()
1455 .map(|p| (p.name.clone(), p.param_type.clone()))
1456 .collect(),
1457 returns: stmt
1458 .returns
1459 .iter()
1460 .map(|r| (r.name.clone(), r.return_type.clone()))
1461 .collect(),
1462 body: stmt.body,
1463 }
1464 );
1465 Ok(QueryResult::status(format!(
1466 "Created procedure '{}'",
1467 stmt.name
1468 )))
1469 }
1470 SchemaStatement::DropProcedure { name, if_exists } => {
1471 match self.catalog.drop_procedure(&name) {
1472 Ok(()) => {}
1473 Err(_) if if_exists => {
1474 return Ok(QueryResult::empty());
1475 }
1476 Err(e) => {
1477 return Err(Error::Query(QueryError::new(
1478 QueryErrorKind::Semantic,
1479 e.to_string(),
1480 )));
1481 }
1482 }
1483 wal_log!(self, WalRecord::DropProcedure { name: name.clone() });
1484 Ok(QueryResult::status(format!("Dropped procedure '{name}'")))
1485 }
1486 SchemaStatement::ShowIndexes => {
1487 return self.execute_show_indexes();
1488 }
1489 SchemaStatement::ShowConstraints => {
1490 return self.execute_show_constraints();
1491 }
1492 SchemaStatement::ShowNodeTypes => {
1493 return self.execute_show_node_types();
1494 }
1495 SchemaStatement::ShowEdgeTypes => {
1496 return self.execute_show_edge_types();
1497 }
1498 SchemaStatement::ShowGraphTypes => {
1499 return self.execute_show_graph_types();
1500 }
1501 SchemaStatement::ShowGraphType(name) => {
1502 return self.execute_show_graph_type(&name);
1503 }
1504 SchemaStatement::ShowCurrentGraphType => {
1505 return self.execute_show_current_graph_type();
1506 }
1507 SchemaStatement::ShowGraphs => {
1508 return self.execute_show_graphs();
1509 }
1510 };
1511
1512 if result.is_ok() {
1515 self.query_cache.clear();
1516 }
1517
1518 result
1519 }
1520
1521 #[cfg(all(feature = "gql", feature = "vector-index"))]
1523 fn create_vector_index_on_store(
1524 store: &LpgStore,
1525 label: &str,
1526 property: &str,
1527 dimensions: Option<usize>,
1528 metric: Option<&str>,
1529 ) -> Result<()> {
1530 use grafeo_common::types::{PropertyKey, Value};
1531 use grafeo_common::utils::error::Error;
1532 use grafeo_core::index::vector::{DistanceMetric, HnswConfig, HnswIndex};
1533
1534 let metric = match metric {
1535 Some(m) => DistanceMetric::from_str(m).ok_or_else(|| {
1536 Error::Internal(format!(
1537 "Unknown distance metric '{m}'. Use: cosine, euclidean, dot_product, manhattan"
1538 ))
1539 })?,
1540 None => DistanceMetric::Cosine,
1541 };
1542
1543 let prop_key = PropertyKey::new(property);
1544 let mut found_dims: Option<usize> = dimensions;
1545 let mut vectors: Vec<(grafeo_common::types::NodeId, Vec<f32>)> = Vec::new();
1546
1547 for node in store.nodes_with_label(label) {
1548 if let Some(Value::Vector(v)) = node.properties.get(&prop_key) {
1549 if let Some(expected) = found_dims {
1550 if v.len() != expected {
1551 return Err(Error::Internal(format!(
1552 "Vector dimension mismatch: expected {expected}, found {} on node {}",
1553 v.len(),
1554 node.id.0
1555 )));
1556 }
1557 } else {
1558 found_dims = Some(v.len());
1559 }
1560 vectors.push((node.id, v.to_vec()));
1561 }
1562 }
1563
1564 let Some(dims) = found_dims else {
1565 return Err(Error::Internal(format!(
1566 "No vector properties found on :{label}({property}) and no dimensions specified"
1567 )));
1568 };
1569
1570 let config = HnswConfig::new(dims, metric);
1571 let index = HnswIndex::with_capacity(config, vectors.len());
1572 let accessor = grafeo_core::index::vector::PropertyVectorAccessor::new(store, property);
1573 for (node_id, vec) in &vectors {
1574 index.insert(*node_id, vec, &accessor);
1575 }
1576
1577 store.add_vector_index(label, property, Arc::new(index));
1578 Ok(())
1579 }
1580
1581 #[cfg(all(feature = "gql", not(feature = "vector-index")))]
1583 fn create_vector_index_on_store(
1584 _store: &LpgStore,
1585 _label: &str,
1586 _property: &str,
1587 _dimensions: Option<usize>,
1588 _metric: Option<&str>,
1589 ) -> Result<()> {
1590 Err(grafeo_common::utils::error::Error::Internal(
1591 "Vector index support requires the 'vector-index' feature".to_string(),
1592 ))
1593 }
1594
1595 #[cfg(all(feature = "gql", feature = "text-index"))]
1597 fn create_text_index_on_store(store: &LpgStore, label: &str, property: &str) -> Result<()> {
1598 use grafeo_common::types::{PropertyKey, Value};
1599 use grafeo_core::index::text::{BM25Config, InvertedIndex};
1600
1601 let mut index = InvertedIndex::new(BM25Config::default());
1602 let prop_key = PropertyKey::new(property);
1603
1604 let nodes = store.nodes_by_label(label);
1605 for node_id in nodes {
1606 if let Some(Value::String(text)) = store.get_node_property(node_id, &prop_key) {
1607 index.insert(node_id, text.as_str());
1608 }
1609 }
1610
1611 store.add_text_index(label, property, Arc::new(parking_lot::RwLock::new(index)));
1612 Ok(())
1613 }
1614
1615 #[cfg(all(feature = "gql", not(feature = "text-index")))]
1617 fn create_text_index_on_store(_store: &LpgStore, _label: &str, _property: &str) -> Result<()> {
1618 Err(grafeo_common::utils::error::Error::Internal(
1619 "Text index support requires the 'text-index' feature".to_string(),
1620 ))
1621 }
1622
1623 fn execute_show_indexes(&self) -> Result<QueryResult> {
1625 let indexes = self.catalog.all_indexes();
1626 let columns = vec![
1627 "name".to_string(),
1628 "type".to_string(),
1629 "label".to_string(),
1630 "property".to_string(),
1631 ];
1632 let rows: Vec<Vec<Value>> = indexes
1633 .into_iter()
1634 .map(|def| {
1635 let label_name = self
1636 .catalog
1637 .get_label_name(def.label)
1638 .unwrap_or_else(|| "?".into());
1639 let prop_name = self
1640 .catalog
1641 .get_property_key_name(def.property_key)
1642 .unwrap_or_else(|| "?".into());
1643 vec![
1644 Value::from(format!("idx_{}_{}", label_name, prop_name)),
1645 Value::from(format!("{:?}", def.index_type)),
1646 Value::from(&*label_name),
1647 Value::from(&*prop_name),
1648 ]
1649 })
1650 .collect();
1651 Ok(QueryResult {
1652 columns,
1653 column_types: Vec::new(),
1654 rows,
1655 ..QueryResult::empty()
1656 })
1657 }
1658
1659 fn execute_show_constraints(&self) -> Result<QueryResult> {
1661 Ok(QueryResult {
1664 columns: vec![
1665 "name".to_string(),
1666 "type".to_string(),
1667 "label".to_string(),
1668 "properties".to_string(),
1669 ],
1670 column_types: Vec::new(),
1671 rows: Vec::new(),
1672 ..QueryResult::empty()
1673 })
1674 }
1675
1676 fn execute_show_node_types(&self) -> Result<QueryResult> {
1678 let columns = vec![
1679 "name".to_string(),
1680 "properties".to_string(),
1681 "constraints".to_string(),
1682 "parents".to_string(),
1683 ];
1684 let type_names = self.catalog.all_node_type_names();
1685 let rows: Vec<Vec<Value>> = type_names
1686 .into_iter()
1687 .filter_map(|name| {
1688 let def = self.catalog.get_node_type(&name)?;
1689 let props: Vec<String> = def
1690 .properties
1691 .iter()
1692 .map(|p| {
1693 let nullable = if p.nullable { "" } else { " NOT NULL" };
1694 format!("{} {}{}", p.name, p.data_type, nullable)
1695 })
1696 .collect();
1697 let constraints: Vec<String> =
1698 def.constraints.iter().map(|c| format!("{c:?}")).collect();
1699 let parents = def.parent_types.join(", ");
1700 Some(vec![
1701 Value::from(name),
1702 Value::from(props.join(", ")),
1703 Value::from(constraints.join(", ")),
1704 Value::from(parents),
1705 ])
1706 })
1707 .collect();
1708 Ok(QueryResult {
1709 columns,
1710 column_types: Vec::new(),
1711 rows,
1712 ..QueryResult::empty()
1713 })
1714 }
1715
1716 fn execute_show_edge_types(&self) -> Result<QueryResult> {
1718 let columns = vec![
1719 "name".to_string(),
1720 "properties".to_string(),
1721 "source_types".to_string(),
1722 "target_types".to_string(),
1723 ];
1724 let type_names = self.catalog.all_edge_type_names();
1725 let rows: Vec<Vec<Value>> = type_names
1726 .into_iter()
1727 .filter_map(|name| {
1728 let def = self.catalog.get_edge_type_def(&name)?;
1729 let props: Vec<String> = def
1730 .properties
1731 .iter()
1732 .map(|p| {
1733 let nullable = if p.nullable { "" } else { " NOT NULL" };
1734 format!("{} {}{}", p.name, p.data_type, nullable)
1735 })
1736 .collect();
1737 let src = def.source_node_types.join(", ");
1738 let tgt = def.target_node_types.join(", ");
1739 Some(vec![
1740 Value::from(name),
1741 Value::from(props.join(", ")),
1742 Value::from(src),
1743 Value::from(tgt),
1744 ])
1745 })
1746 .collect();
1747 Ok(QueryResult {
1748 columns,
1749 column_types: Vec::new(),
1750 rows,
1751 ..QueryResult::empty()
1752 })
1753 }
1754
1755 fn execute_show_graph_types(&self) -> Result<QueryResult> {
1757 let columns = vec![
1758 "name".to_string(),
1759 "open".to_string(),
1760 "node_types".to_string(),
1761 "edge_types".to_string(),
1762 ];
1763 let type_names = self.catalog.all_graph_type_names();
1764 let rows: Vec<Vec<Value>> = type_names
1765 .into_iter()
1766 .filter_map(|name| {
1767 let def = self.catalog.get_graph_type_def(&name)?;
1768 Some(vec![
1769 Value::from(name),
1770 Value::from(def.open),
1771 Value::from(def.allowed_node_types.join(", ")),
1772 Value::from(def.allowed_edge_types.join(", ")),
1773 ])
1774 })
1775 .collect();
1776 Ok(QueryResult {
1777 columns,
1778 column_types: Vec::new(),
1779 rows,
1780 ..QueryResult::empty()
1781 })
1782 }
1783
1784 fn execute_show_graphs(&self) -> Result<QueryResult> {
1786 let mut names = self.store.graph_names();
1787 names.sort();
1788 let rows: Vec<Vec<Value>> = names.into_iter().map(|n| vec![Value::from(n)]).collect();
1789 Ok(QueryResult {
1790 columns: vec!["name".to_string()],
1791 column_types: Vec::new(),
1792 rows,
1793 ..QueryResult::empty()
1794 })
1795 }
1796
1797 fn execute_show_graph_type(&self, name: &str) -> Result<QueryResult> {
1799 use grafeo_common::utils::error::{Error, QueryError, QueryErrorKind};
1800
1801 let def = self.catalog.get_graph_type_def(name).ok_or_else(|| {
1802 Error::Query(QueryError::new(
1803 QueryErrorKind::Semantic,
1804 format!("Graph type '{name}' not found"),
1805 ))
1806 })?;
1807
1808 let columns = vec![
1809 "name".to_string(),
1810 "open".to_string(),
1811 "node_types".to_string(),
1812 "edge_types".to_string(),
1813 ];
1814 let rows = vec![vec![
1815 Value::from(def.name),
1816 Value::from(def.open),
1817 Value::from(def.allowed_node_types.join(", ")),
1818 Value::from(def.allowed_edge_types.join(", ")),
1819 ]];
1820 Ok(QueryResult {
1821 columns,
1822 column_types: Vec::new(),
1823 rows,
1824 ..QueryResult::empty()
1825 })
1826 }
1827
1828 fn execute_show_current_graph_type(&self) -> Result<QueryResult> {
1830 let graph_name = self
1831 .current_graph()
1832 .unwrap_or_else(|| "default".to_string());
1833 let columns = vec![
1834 "graph".to_string(),
1835 "graph_type".to_string(),
1836 "open".to_string(),
1837 "node_types".to_string(),
1838 "edge_types".to_string(),
1839 ];
1840
1841 if let Some(type_name) = self.catalog.get_graph_type_binding(&graph_name)
1842 && let Some(def) = self.catalog.get_graph_type_def(&type_name)
1843 {
1844 let rows = vec![vec![
1845 Value::from(graph_name),
1846 Value::from(type_name),
1847 Value::from(def.open),
1848 Value::from(def.allowed_node_types.join(", ")),
1849 Value::from(def.allowed_edge_types.join(", ")),
1850 ]];
1851 return Ok(QueryResult {
1852 columns,
1853 column_types: Vec::new(),
1854 rows,
1855 ..QueryResult::empty()
1856 });
1857 }
1858
1859 Ok(QueryResult {
1861 columns,
1862 column_types: Vec::new(),
1863 rows: vec![vec![
1864 Value::from(graph_name),
1865 Value::Null,
1866 Value::Null,
1867 Value::Null,
1868 Value::Null,
1869 ]],
1870 ..QueryResult::empty()
1871 })
1872 }
1873
1874 #[cfg(feature = "gql")]
1901 pub fn execute(&self, query: &str) -> Result<QueryResult> {
1902 self.require_lpg("GQL")?;
1903
1904 use crate::query::{
1905 Executor, binder::Binder, cache::CacheKey, optimizer::Optimizer,
1906 processor::QueryLanguage, translators::gql,
1907 };
1908
1909 #[cfg(not(target_arch = "wasm32"))]
1910 let start_time = std::time::Instant::now();
1911
1912 let translation = gql::translate_full(query)?;
1914 let logical_plan = match translation {
1915 gql::GqlTranslationResult::SessionCommand(cmd) => {
1916 return self.execute_session_command(cmd);
1917 }
1918 gql::GqlTranslationResult::SchemaCommand(cmd) => {
1919 if *self.read_only_tx.lock() {
1921 return Err(grafeo_common::utils::error::Error::Transaction(
1922 grafeo_common::utils::error::TransactionError::ReadOnly,
1923 ));
1924 }
1925 return self.execute_schema_command(cmd);
1926 }
1927 gql::GqlTranslationResult::Plan(plan) => {
1928 if *self.read_only_tx.lock() && plan.root.has_mutations() {
1930 return Err(grafeo_common::utils::error::Error::Transaction(
1931 grafeo_common::utils::error::TransactionError::ReadOnly,
1932 ));
1933 }
1934 plan
1935 }
1936 };
1937
1938 let cache_key = CacheKey::with_graph(query, QueryLanguage::Gql, self.current_graph());
1940
1941 let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
1943 cached_plan
1944 } else {
1945 let mut binder = Binder::new();
1947 let _binding_context = binder.bind(&logical_plan)?;
1948
1949 let active = self.active_store();
1951 let optimizer = Optimizer::from_graph_store(&*active);
1952 let plan = optimizer.optimize(logical_plan)?;
1953
1954 self.query_cache.put_optimized(cache_key, plan.clone());
1956
1957 plan
1958 };
1959
1960 let active = self.active_store();
1962
1963 if optimized_plan.explain {
1965 use crate::query::processor::{annotate_pushdown_hints, explain_result};
1966 let mut plan = optimized_plan;
1967 annotate_pushdown_hints(&mut plan.root, active.as_ref());
1968 return Ok(explain_result(&plan));
1969 }
1970
1971 if optimized_plan.profile {
1973 let has_mutations = optimized_plan.root.has_mutations();
1974 return self.with_auto_commit(has_mutations, || {
1975 let (viewing_epoch, transaction_id) = self.get_transaction_context();
1976 let planner = self.create_planner_for_store(
1977 Arc::clone(&active),
1978 viewing_epoch,
1979 transaction_id,
1980 );
1981 let (mut physical_plan, entries) = planner.plan_profiled(&optimized_plan)?;
1982
1983 let executor = Executor::with_columns(physical_plan.columns.clone())
1984 .with_deadline(self.query_deadline());
1985 let _result = executor.execute(physical_plan.operator.as_mut())?;
1986
1987 let total_time_ms;
1988 #[cfg(not(target_arch = "wasm32"))]
1989 {
1990 total_time_ms = start_time.elapsed().as_secs_f64() * 1000.0;
1991 }
1992 #[cfg(target_arch = "wasm32")]
1993 {
1994 total_time_ms = 0.0;
1995 }
1996
1997 let profile_tree = crate::query::profile::build_profile_tree(
1998 &optimized_plan.root,
1999 &mut entries.into_iter(),
2000 );
2001 Ok(crate::query::profile::profile_result(
2002 &profile_tree,
2003 total_time_ms,
2004 ))
2005 });
2006 }
2007
2008 let has_mutations = optimized_plan.root.has_mutations();
2009
2010 self.with_auto_commit(has_mutations, || {
2011 let (viewing_epoch, transaction_id) = self.get_transaction_context();
2013
2014 let planner =
2017 self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
2018 let mut physical_plan = planner.plan(&optimized_plan)?;
2019
2020 let executor = Executor::with_columns(physical_plan.columns.clone())
2022 .with_deadline(self.query_deadline());
2023 let mut result = executor.execute(physical_plan.operator.as_mut())?;
2024
2025 let rows_scanned = result.rows.len() as u64;
2027 #[cfg(not(target_arch = "wasm32"))]
2028 {
2029 let elapsed_ms = start_time.elapsed().as_secs_f64() * 1000.0;
2030 result.execution_time_ms = Some(elapsed_ms);
2031 }
2032 result.rows_scanned = Some(rows_scanned);
2033
2034 Ok(result)
2035 })
2036 }
2037
2038 #[cfg(feature = "gql")]
2047 pub fn execute_at_epoch(&self, query: &str, epoch: EpochId) -> Result<QueryResult> {
2048 let previous = self.viewing_epoch_override.lock().replace(epoch);
2049 let result = self.execute(query);
2050 *self.viewing_epoch_override.lock() = previous;
2051 result
2052 }
2053
2054 #[cfg(feature = "gql")]
2060 pub fn execute_with_params(
2061 &self,
2062 query: &str,
2063 params: std::collections::HashMap<String, Value>,
2064 ) -> Result<QueryResult> {
2065 self.require_lpg("GQL")?;
2066
2067 use crate::query::processor::{QueryLanguage, QueryProcessor};
2068
2069 let has_mutations = Self::query_looks_like_mutation(query);
2070 let active = self.active_store();
2071
2072 self.with_auto_commit(has_mutations, || {
2073 let (viewing_epoch, transaction_id) = self.get_transaction_context();
2075
2076 let processor = QueryProcessor::for_graph_store_with_transaction(
2078 Arc::clone(&active),
2079 Arc::clone(&self.transaction_manager),
2080 )?;
2081
2082 let processor = if let Some(transaction_id) = transaction_id {
2084 processor.with_transaction_context(viewing_epoch, transaction_id)
2085 } else {
2086 processor
2087 };
2088
2089 processor.process(query, QueryLanguage::Gql, Some(¶ms))
2090 })
2091 }
2092
2093 #[cfg(not(any(feature = "gql", feature = "cypher")))]
2099 pub fn execute_with_params(
2100 &self,
2101 _query: &str,
2102 _params: std::collections::HashMap<String, Value>,
2103 ) -> Result<QueryResult> {
2104 Err(grafeo_common::utils::error::Error::Internal(
2105 "No query language enabled".to_string(),
2106 ))
2107 }
2108
2109 #[cfg(not(any(feature = "gql", feature = "cypher")))]
2115 pub fn execute(&self, _query: &str) -> Result<QueryResult> {
2116 Err(grafeo_common::utils::error::Error::Internal(
2117 "No query language enabled".to_string(),
2118 ))
2119 }
2120
2121 #[cfg(feature = "cypher")]
2127 pub fn execute_cypher(&self, query: &str) -> Result<QueryResult> {
2128 use crate::query::{
2129 Executor, binder::Binder, cache::CacheKey, optimizer::Optimizer,
2130 processor::QueryLanguage, translators::cypher,
2131 };
2132 use grafeo_common::utils::error::{Error as GrafeoError, QueryError, QueryErrorKind};
2133
2134 let translation = cypher::translate_full(query)?;
2136 match translation {
2137 cypher::CypherTranslationResult::SchemaCommand(cmd) => {
2138 if *self.read_only_tx.lock() {
2139 return Err(GrafeoError::Query(QueryError::new(
2140 QueryErrorKind::Semantic,
2141 "Cannot execute schema DDL in a read-only transaction",
2142 )));
2143 }
2144 return self.execute_schema_command(cmd);
2145 }
2146 cypher::CypherTranslationResult::ShowIndexes => {
2147 return self.execute_show_indexes();
2148 }
2149 cypher::CypherTranslationResult::ShowConstraints => {
2150 return self.execute_show_constraints();
2151 }
2152 cypher::CypherTranslationResult::ShowCurrentGraphType => {
2153 return self.execute_show_current_graph_type();
2154 }
2155 cypher::CypherTranslationResult::Plan(_) => {
2156 }
2158 }
2159
2160 #[cfg(not(target_arch = "wasm32"))]
2161 let start_time = std::time::Instant::now();
2162
2163 let cache_key = CacheKey::with_graph(query, QueryLanguage::Cypher, self.current_graph());
2165
2166 let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
2168 cached_plan
2169 } else {
2170 let logical_plan = cypher::translate(query)?;
2172
2173 let mut binder = Binder::new();
2175 let _binding_context = binder.bind(&logical_plan)?;
2176
2177 let active = self.active_store();
2179 let optimizer = Optimizer::from_graph_store(&*active);
2180 let plan = optimizer.optimize(logical_plan)?;
2181
2182 self.query_cache.put_optimized(cache_key, plan.clone());
2184
2185 plan
2186 };
2187
2188 let active = self.active_store();
2190
2191 if optimized_plan.explain {
2193 use crate::query::processor::{annotate_pushdown_hints, explain_result};
2194 let mut plan = optimized_plan;
2195 annotate_pushdown_hints(&mut plan.root, active.as_ref());
2196 return Ok(explain_result(&plan));
2197 }
2198
2199 if optimized_plan.profile {
2201 let has_mutations = optimized_plan.root.has_mutations();
2202 return self.with_auto_commit(has_mutations, || {
2203 let (viewing_epoch, transaction_id) = self.get_transaction_context();
2204 let planner = self.create_planner_for_store(
2205 Arc::clone(&active),
2206 viewing_epoch,
2207 transaction_id,
2208 );
2209 let (mut physical_plan, entries) = planner.plan_profiled(&optimized_plan)?;
2210
2211 let executor = Executor::with_columns(physical_plan.columns.clone())
2212 .with_deadline(self.query_deadline());
2213 let _result = executor.execute(physical_plan.operator.as_mut())?;
2214
2215 let total_time_ms;
2216 #[cfg(not(target_arch = "wasm32"))]
2217 {
2218 total_time_ms = start_time.elapsed().as_secs_f64() * 1000.0;
2219 }
2220 #[cfg(target_arch = "wasm32")]
2221 {
2222 total_time_ms = 0.0;
2223 }
2224
2225 let profile_tree = crate::query::profile::build_profile_tree(
2226 &optimized_plan.root,
2227 &mut entries.into_iter(),
2228 );
2229 Ok(crate::query::profile::profile_result(
2230 &profile_tree,
2231 total_time_ms,
2232 ))
2233 });
2234 }
2235
2236 let has_mutations = optimized_plan.root.has_mutations();
2237
2238 self.with_auto_commit(has_mutations, || {
2239 let (viewing_epoch, transaction_id) = self.get_transaction_context();
2241
2242 let planner =
2244 self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
2245 let mut physical_plan = planner.plan(&optimized_plan)?;
2246
2247 let executor = Executor::with_columns(physical_plan.columns.clone())
2249 .with_deadline(self.query_deadline());
2250 executor.execute(physical_plan.operator.as_mut())
2251 })
2252 }
2253
2254 #[cfg(feature = "gremlin")]
2278 pub fn execute_gremlin(&self, query: &str) -> Result<QueryResult> {
2279 use crate::query::{Executor, binder::Binder, optimizer::Optimizer, translators::gremlin};
2280
2281 let logical_plan = gremlin::translate(query)?;
2283
2284 let mut binder = Binder::new();
2286 let _binding_context = binder.bind(&logical_plan)?;
2287
2288 let active = self.active_store();
2290 let optimizer = Optimizer::from_graph_store(&*active);
2291 let optimized_plan = optimizer.optimize(logical_plan)?;
2292
2293 let has_mutations = optimized_plan.root.has_mutations();
2294
2295 self.with_auto_commit(has_mutations, || {
2296 let (viewing_epoch, transaction_id) = self.get_transaction_context();
2298
2299 let planner =
2301 self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
2302 let mut physical_plan = planner.plan(&optimized_plan)?;
2303
2304 let executor = Executor::with_columns(physical_plan.columns.clone())
2306 .with_deadline(self.query_deadline());
2307 executor.execute(physical_plan.operator.as_mut())
2308 })
2309 }
2310
2311 #[cfg(feature = "gremlin")]
2317 pub fn execute_gremlin_with_params(
2318 &self,
2319 query: &str,
2320 params: std::collections::HashMap<String, Value>,
2321 ) -> Result<QueryResult> {
2322 use crate::query::processor::{QueryLanguage, QueryProcessor};
2323
2324 let has_mutations = Self::query_looks_like_mutation(query);
2325 let active = self.active_store();
2326
2327 self.with_auto_commit(has_mutations, || {
2328 let (viewing_epoch, transaction_id) = self.get_transaction_context();
2330
2331 let processor = QueryProcessor::for_graph_store_with_transaction(
2333 Arc::clone(&active),
2334 Arc::clone(&self.transaction_manager),
2335 )?;
2336
2337 let processor = if let Some(transaction_id) = transaction_id {
2339 processor.with_transaction_context(viewing_epoch, transaction_id)
2340 } else {
2341 processor
2342 };
2343
2344 processor.process(query, QueryLanguage::Gremlin, Some(¶ms))
2345 })
2346 }
2347
2348 #[cfg(feature = "graphql")]
2372 pub fn execute_graphql(&self, query: &str) -> Result<QueryResult> {
2373 use crate::query::{Executor, binder::Binder, optimizer::Optimizer, translators::graphql};
2374
2375 let logical_plan = graphql::translate(query)?;
2377
2378 let mut binder = Binder::new();
2380 let _binding_context = binder.bind(&logical_plan)?;
2381
2382 let active = self.active_store();
2384 let optimizer = Optimizer::from_graph_store(&*active);
2385 let optimized_plan = optimizer.optimize(logical_plan)?;
2386
2387 let has_mutations = optimized_plan.root.has_mutations();
2388
2389 self.with_auto_commit(has_mutations, || {
2390 let (viewing_epoch, transaction_id) = self.get_transaction_context();
2392
2393 let planner =
2395 self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
2396 let mut physical_plan = planner.plan(&optimized_plan)?;
2397
2398 let executor = Executor::with_columns(physical_plan.columns.clone())
2400 .with_deadline(self.query_deadline());
2401 executor.execute(physical_plan.operator.as_mut())
2402 })
2403 }
2404
2405 #[cfg(feature = "graphql")]
2411 pub fn execute_graphql_with_params(
2412 &self,
2413 query: &str,
2414 params: std::collections::HashMap<String, Value>,
2415 ) -> Result<QueryResult> {
2416 use crate::query::processor::{QueryLanguage, QueryProcessor};
2417
2418 let has_mutations = Self::query_looks_like_mutation(query);
2419 let active = self.active_store();
2420
2421 self.with_auto_commit(has_mutations, || {
2422 let (viewing_epoch, transaction_id) = self.get_transaction_context();
2424
2425 let processor = QueryProcessor::for_graph_store_with_transaction(
2427 Arc::clone(&active),
2428 Arc::clone(&self.transaction_manager),
2429 )?;
2430
2431 let processor = if let Some(transaction_id) = transaction_id {
2433 processor.with_transaction_context(viewing_epoch, transaction_id)
2434 } else {
2435 processor
2436 };
2437
2438 processor.process(query, QueryLanguage::GraphQL, Some(¶ms))
2439 })
2440 }
2441
2442 #[cfg(all(feature = "graphql", feature = "rdf"))]
2448 pub fn execute_graphql_rdf(&self, query: &str) -> Result<QueryResult> {
2449 use crate::query::{
2450 Executor, optimizer::Optimizer, planner::rdf::RdfPlanner, translators::graphql_rdf,
2451 };
2452
2453 let logical_plan = graphql_rdf::translate(query, "http://example.org/")?;
2454
2455 let active = self.active_store();
2456 let optimizer = Optimizer::from_graph_store(&*active);
2457 let optimized_plan = optimizer.optimize(logical_plan)?;
2458
2459 let planner = RdfPlanner::new(Arc::clone(&self.rdf_store))
2460 .with_transaction_id(*self.current_transaction.lock());
2461 #[cfg(feature = "wal")]
2462 let planner = planner.with_wal(self.wal.clone());
2463 let mut physical_plan = planner.plan(&optimized_plan)?;
2464
2465 let executor = Executor::with_columns(physical_plan.columns.clone())
2466 .with_deadline(self.query_deadline());
2467 executor.execute(physical_plan.operator.as_mut())
2468 }
2469
2470 #[cfg(all(feature = "graphql", feature = "rdf"))]
2476 pub fn execute_graphql_rdf_with_params(
2477 &self,
2478 query: &str,
2479 params: std::collections::HashMap<String, Value>,
2480 ) -> Result<QueryResult> {
2481 use crate::query::processor::{QueryLanguage, QueryProcessor};
2482
2483 let has_mutations = Self::query_looks_like_mutation(query);
2484 let active = self.active_store();
2485
2486 self.with_auto_commit(has_mutations, || {
2487 let (viewing_epoch, transaction_id) = self.get_transaction_context();
2488
2489 let processor = QueryProcessor::for_graph_store_with_transaction(
2490 Arc::clone(&active),
2491 Arc::clone(&self.transaction_manager),
2492 )?;
2493
2494 let processor = if let Some(transaction_id) = transaction_id {
2495 processor.with_transaction_context(viewing_epoch, transaction_id)
2496 } else {
2497 processor
2498 };
2499
2500 processor.process(query, QueryLanguage::GraphQLRdf, Some(¶ms))
2501 })
2502 }
2503
2504 #[cfg(feature = "sql-pgq")]
2529 pub fn execute_sql(&self, query: &str) -> Result<QueryResult> {
2530 use crate::query::{
2531 Executor, binder::Binder, cache::CacheKey, optimizer::Optimizer, plan::LogicalOperator,
2532 processor::QueryLanguage, translators::sql_pgq,
2533 };
2534
2535 let logical_plan = sql_pgq::translate(query)?;
2537
2538 if let LogicalOperator::CreatePropertyGraph(ref cpg) = logical_plan.root {
2540 return Ok(QueryResult {
2541 columns: vec!["status".into()],
2542 column_types: vec![grafeo_common::types::LogicalType::String],
2543 rows: vec![vec![Value::from(format!(
2544 "Property graph '{}' created ({} node tables, {} edge tables)",
2545 cpg.name,
2546 cpg.node_tables.len(),
2547 cpg.edge_tables.len()
2548 ))]],
2549 execution_time_ms: None,
2550 rows_scanned: None,
2551 status_message: None,
2552 gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
2553 });
2554 }
2555
2556 let cache_key = CacheKey::with_graph(query, QueryLanguage::SqlPgq, self.current_graph());
2558
2559 let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
2561 cached_plan
2562 } else {
2563 let mut binder = Binder::new();
2565 let _binding_context = binder.bind(&logical_plan)?;
2566
2567 let active = self.active_store();
2569 let optimizer = Optimizer::from_graph_store(&*active);
2570 let plan = optimizer.optimize(logical_plan)?;
2571
2572 self.query_cache.put_optimized(cache_key, plan.clone());
2574
2575 plan
2576 };
2577
2578 let active = self.active_store();
2579 let has_mutations = optimized_plan.root.has_mutations();
2580
2581 self.with_auto_commit(has_mutations, || {
2582 let (viewing_epoch, transaction_id) = self.get_transaction_context();
2584
2585 let planner =
2587 self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
2588 let mut physical_plan = planner.plan(&optimized_plan)?;
2589
2590 let executor = Executor::with_columns(physical_plan.columns.clone())
2592 .with_deadline(self.query_deadline());
2593 executor.execute(physical_plan.operator.as_mut())
2594 })
2595 }
2596
2597 #[cfg(feature = "sql-pgq")]
2603 pub fn execute_sql_with_params(
2604 &self,
2605 query: &str,
2606 params: std::collections::HashMap<String, Value>,
2607 ) -> Result<QueryResult> {
2608 use crate::query::processor::{QueryLanguage, QueryProcessor};
2609
2610 let has_mutations = Self::query_looks_like_mutation(query);
2611 let active = self.active_store();
2612
2613 self.with_auto_commit(has_mutations, || {
2614 let (viewing_epoch, transaction_id) = self.get_transaction_context();
2616
2617 let processor = QueryProcessor::for_graph_store_with_transaction(
2619 Arc::clone(&active),
2620 Arc::clone(&self.transaction_manager),
2621 )?;
2622
2623 let processor = if let Some(transaction_id) = transaction_id {
2625 processor.with_transaction_context(viewing_epoch, transaction_id)
2626 } else {
2627 processor
2628 };
2629
2630 processor.process(query, QueryLanguage::SqlPgq, Some(¶ms))
2631 })
2632 }
2633
2634 #[cfg(all(feature = "sparql", feature = "rdf"))]
2640 pub fn execute_sparql(&self, query: &str) -> Result<QueryResult> {
2641 use crate::query::{
2642 Executor, optimizer::Optimizer, planner::rdf::RdfPlanner, translators::sparql,
2643 };
2644
2645 let logical_plan = sparql::translate(query)?;
2647
2648 let active = self.active_store();
2650 let optimizer = Optimizer::from_graph_store(&*active);
2651 let optimized_plan = optimizer.optimize(logical_plan)?;
2652
2653 let planner = RdfPlanner::new(Arc::clone(&self.rdf_store))
2655 .with_transaction_id(*self.current_transaction.lock());
2656 #[cfg(feature = "wal")]
2657 let planner = planner.with_wal(self.wal.clone());
2658 let mut physical_plan = planner.plan(&optimized_plan)?;
2659
2660 let executor = Executor::with_columns(physical_plan.columns.clone())
2662 .with_deadline(self.query_deadline());
2663 executor.execute(physical_plan.operator.as_mut())
2664 }
2665
2666 #[cfg(all(feature = "sparql", feature = "rdf"))]
2672 pub fn execute_sparql_with_params(
2673 &self,
2674 query: &str,
2675 params: std::collections::HashMap<String, Value>,
2676 ) -> Result<QueryResult> {
2677 use crate::query::{
2678 Executor, optimizer::Optimizer, planner::rdf::RdfPlanner, processor::substitute_params,
2679 translators::sparql,
2680 };
2681
2682 let mut logical_plan = sparql::translate(query)?;
2683
2684 substitute_params(&mut logical_plan, ¶ms)?;
2685
2686 let active = self.active_store();
2687 let optimizer = Optimizer::from_graph_store(&*active);
2688 let optimized_plan = optimizer.optimize(logical_plan)?;
2689
2690 let planner = RdfPlanner::new(Arc::clone(&self.rdf_store))
2691 .with_transaction_id(*self.current_transaction.lock());
2692 #[cfg(feature = "wal")]
2693 let planner = planner.with_wal(self.wal.clone());
2694 let mut physical_plan = planner.plan(&optimized_plan)?;
2695
2696 let executor = Executor::with_columns(physical_plan.columns.clone())
2697 .with_deadline(self.query_deadline());
2698 executor.execute(physical_plan.operator.as_mut())
2699 }
2700
2701 pub fn execute_language(
2710 &self,
2711 query: &str,
2712 language: &str,
2713 params: Option<std::collections::HashMap<String, Value>>,
2714 ) -> Result<QueryResult> {
2715 match language {
2716 "gql" => {
2717 if let Some(p) = params {
2718 self.execute_with_params(query, p)
2719 } else {
2720 self.execute(query)
2721 }
2722 }
2723 #[cfg(feature = "cypher")]
2724 "cypher" => {
2725 if let Some(p) = params {
2726 use crate::query::processor::{QueryLanguage, QueryProcessor};
2727 let has_mutations = Self::query_looks_like_mutation(query);
2728 let active = self.active_store();
2729 self.with_auto_commit(has_mutations, || {
2730 let processor = QueryProcessor::for_graph_store_with_transaction(
2731 Arc::clone(&active),
2732 Arc::clone(&self.transaction_manager),
2733 )?;
2734 let (viewing_epoch, transaction_id) = self.get_transaction_context();
2735 let processor = if let Some(transaction_id) = transaction_id {
2736 processor.with_transaction_context(viewing_epoch, transaction_id)
2737 } else {
2738 processor
2739 };
2740 processor.process(query, QueryLanguage::Cypher, Some(&p))
2741 })
2742 } else {
2743 self.execute_cypher(query)
2744 }
2745 }
2746 #[cfg(feature = "gremlin")]
2747 "gremlin" => {
2748 if let Some(p) = params {
2749 self.execute_gremlin_with_params(query, p)
2750 } else {
2751 self.execute_gremlin(query)
2752 }
2753 }
2754 #[cfg(feature = "graphql")]
2755 "graphql" => {
2756 if let Some(p) = params {
2757 self.execute_graphql_with_params(query, p)
2758 } else {
2759 self.execute_graphql(query)
2760 }
2761 }
2762 #[cfg(all(feature = "graphql", feature = "rdf"))]
2763 "graphql-rdf" => {
2764 if let Some(p) = params {
2765 self.execute_graphql_rdf_with_params(query, p)
2766 } else {
2767 self.execute_graphql_rdf(query)
2768 }
2769 }
2770 #[cfg(feature = "sql-pgq")]
2771 "sql" | "sql-pgq" => {
2772 if let Some(p) = params {
2773 self.execute_sql_with_params(query, p)
2774 } else {
2775 self.execute_sql(query)
2776 }
2777 }
2778 #[cfg(all(feature = "sparql", feature = "rdf"))]
2779 "sparql" => {
2780 if let Some(p) = params {
2781 self.execute_sparql_with_params(query, p)
2782 } else {
2783 self.execute_sparql(query)
2784 }
2785 }
2786 other => Err(grafeo_common::utils::error::Error::Query(
2787 grafeo_common::utils::error::QueryError::new(
2788 grafeo_common::utils::error::QueryErrorKind::Semantic,
2789 format!("Unknown query language: '{other}'"),
2790 ),
2791 )),
2792 }
2793 }
2794
2795 pub fn clear_plan_cache(&self) {
2822 self.query_cache.clear();
2823 }
2824
2825 pub fn begin_transaction(&mut self) -> Result<()> {
2833 self.begin_transaction_inner(false, None)
2834 }
2835
2836 pub fn begin_transaction_with_isolation(
2844 &mut self,
2845 isolation_level: crate::transaction::IsolationLevel,
2846 ) -> Result<()> {
2847 self.begin_transaction_inner(false, Some(isolation_level))
2848 }
2849
2850 fn begin_transaction_inner(
2852 &self,
2853 read_only: bool,
2854 isolation_level: Option<crate::transaction::IsolationLevel>,
2855 ) -> Result<()> {
2856 let mut current = self.current_transaction.lock();
2857 if current.is_some() {
2858 drop(current);
2860 let mut depth = self.transaction_nesting_depth.lock();
2861 *depth += 1;
2862 let sp_name = format!("_nested_tx_{}", *depth);
2863 self.savepoint(&sp_name)?;
2864 return Ok(());
2865 }
2866
2867 let active = self.active_lpg_store();
2868 self.transaction_start_node_count
2869 .store(active.node_count(), Ordering::Relaxed);
2870 self.transaction_start_edge_count
2871 .store(active.edge_count(), Ordering::Relaxed);
2872 let transaction_id = if let Some(level) = isolation_level {
2873 self.transaction_manager.begin_with_isolation(level)
2874 } else {
2875 self.transaction_manager.begin()
2876 };
2877 *current = Some(transaction_id);
2878 *self.read_only_tx.lock() = read_only;
2879
2880 let graph = self.current_graph.lock().clone();
2882 let normalized = match graph {
2883 Some(ref name) if name.eq_ignore_ascii_case("default") => None,
2884 other => other,
2885 };
2886 let mut touched = self.touched_graphs.lock();
2887 touched.clear();
2888 touched.push(normalized);
2889
2890 Ok(())
2891 }
2892
2893 pub fn commit(&mut self) -> Result<()> {
2901 self.commit_inner()
2902 }
2903
2904 fn commit_inner(&self) -> Result<()> {
2906 {
2908 let mut depth = self.transaction_nesting_depth.lock();
2909 if *depth > 0 {
2910 let sp_name = format!("_nested_tx_{depth}");
2911 *depth -= 1;
2912 drop(depth);
2913 return self.release_savepoint(&sp_name);
2914 }
2915 }
2916
2917 let transaction_id = self.current_transaction.lock().take().ok_or_else(|| {
2918 grafeo_common::utils::error::Error::Transaction(
2919 grafeo_common::utils::error::TransactionError::InvalidState(
2920 "No active transaction".to_string(),
2921 ),
2922 )
2923 })?;
2924
2925 let touched = self.touched_graphs.lock().clone();
2928 let commit_epoch = match self.transaction_manager.commit(transaction_id) {
2929 Ok(epoch) => epoch,
2930 Err(e) => {
2931 for graph_name in &touched {
2933 let store = self.resolve_store(graph_name);
2934 store.rollback_transaction_properties(transaction_id);
2935 }
2936 #[cfg(feature = "rdf")]
2937 self.rdf_store.rollback_transaction(transaction_id);
2938 *self.read_only_tx.lock() = false;
2939 self.savepoints.lock().clear();
2940 self.touched_graphs.lock().clear();
2941 return Err(e);
2942 }
2943 };
2944
2945 for graph_name in &touched {
2947 let store = self.resolve_store(graph_name);
2948 store.finalize_version_epochs(transaction_id, commit_epoch);
2949 }
2950
2951 #[cfg(feature = "rdf")]
2953 self.rdf_store.commit_transaction(transaction_id);
2954
2955 for graph_name in &touched {
2956 let store = self.resolve_store(graph_name);
2957 store.commit_transaction_properties(transaction_id);
2958 }
2959
2960 let current_epoch = self.transaction_manager.current_epoch();
2963 for graph_name in &touched {
2964 let store = self.resolve_store(graph_name);
2965 store.sync_epoch(current_epoch);
2966 }
2967
2968 *self.read_only_tx.lock() = false;
2970 self.savepoints.lock().clear();
2971 self.touched_graphs.lock().clear();
2972
2973 if self.gc_interval > 0 {
2975 let count = self.commit_counter.fetch_add(1, Ordering::Relaxed) + 1;
2976 if count.is_multiple_of(self.gc_interval) {
2977 let min_epoch = self.transaction_manager.min_active_epoch();
2978 for graph_name in &touched {
2979 let store = self.resolve_store(graph_name);
2980 store.gc_versions(min_epoch);
2981 }
2982 self.transaction_manager.gc();
2983 }
2984 }
2985
2986 Ok(())
2987 }
2988
2989 pub fn rollback(&mut self) -> Result<()> {
3013 self.rollback_inner()
3014 }
3015
3016 fn rollback_inner(&self) -> Result<()> {
3018 {
3020 let mut depth = self.transaction_nesting_depth.lock();
3021 if *depth > 0 {
3022 let sp_name = format!("_nested_tx_{depth}");
3023 *depth -= 1;
3024 drop(depth);
3025 return self.rollback_to_savepoint(&sp_name);
3026 }
3027 }
3028
3029 let transaction_id = self.current_transaction.lock().take().ok_or_else(|| {
3030 grafeo_common::utils::error::Error::Transaction(
3031 grafeo_common::utils::error::TransactionError::InvalidState(
3032 "No active transaction".to_string(),
3033 ),
3034 )
3035 })?;
3036
3037 *self.read_only_tx.lock() = false;
3039
3040 let touched = self.touched_graphs.lock().clone();
3042 for graph_name in &touched {
3043 let store = self.resolve_store(graph_name);
3044 store.discard_uncommitted_versions(transaction_id);
3045 }
3046
3047 #[cfg(feature = "rdf")]
3049 self.rdf_store.rollback_transaction(transaction_id);
3050
3051 self.savepoints.lock().clear();
3053 self.touched_graphs.lock().clear();
3054
3055 self.transaction_manager.abort(transaction_id)
3057 }
3058
3059 pub fn savepoint(&self, name: &str) -> Result<()> {
3069 let tx_id = self.current_transaction.lock().ok_or_else(|| {
3070 grafeo_common::utils::error::Error::Transaction(
3071 grafeo_common::utils::error::TransactionError::InvalidState(
3072 "No active transaction".to_string(),
3073 ),
3074 )
3075 })?;
3076
3077 let touched = self.touched_graphs.lock().clone();
3079 let graph_snapshots: Vec<GraphSavepoint> = touched
3080 .iter()
3081 .map(|graph_name| {
3082 let store = self.resolve_store(graph_name);
3083 GraphSavepoint {
3084 graph_name: graph_name.clone(),
3085 next_node_id: store.peek_next_node_id(),
3086 next_edge_id: store.peek_next_edge_id(),
3087 undo_log_position: store.property_undo_log_position(tx_id),
3088 }
3089 })
3090 .collect();
3091
3092 self.savepoints.lock().push(SavepointState {
3093 name: name.to_string(),
3094 graph_snapshots,
3095 active_graph: self.current_graph.lock().clone(),
3096 });
3097 Ok(())
3098 }
3099
3100 pub fn rollback_to_savepoint(&self, name: &str) -> Result<()> {
3109 let transaction_id = self.current_transaction.lock().ok_or_else(|| {
3110 grafeo_common::utils::error::Error::Transaction(
3111 grafeo_common::utils::error::TransactionError::InvalidState(
3112 "No active transaction".to_string(),
3113 ),
3114 )
3115 })?;
3116
3117 let mut savepoints = self.savepoints.lock();
3118
3119 let pos = savepoints
3121 .iter()
3122 .rposition(|sp| sp.name == name)
3123 .ok_or_else(|| {
3124 grafeo_common::utils::error::Error::Transaction(
3125 grafeo_common::utils::error::TransactionError::InvalidState(format!(
3126 "Savepoint '{name}' not found"
3127 )),
3128 )
3129 })?;
3130
3131 let sp_state = savepoints[pos].clone();
3132
3133 savepoints.truncate(pos);
3135 drop(savepoints);
3136
3137 for gs in &sp_state.graph_snapshots {
3139 let store = self.resolve_store(&gs.graph_name);
3140
3141 store.rollback_transaction_properties_to(transaction_id, gs.undo_log_position);
3143
3144 let current_next_node = store.peek_next_node_id();
3146 let current_next_edge = store.peek_next_edge_id();
3147
3148 let node_ids: Vec<NodeId> = (gs.next_node_id..current_next_node)
3149 .map(NodeId::new)
3150 .collect();
3151 let edge_ids: Vec<EdgeId> = (gs.next_edge_id..current_next_edge)
3152 .map(EdgeId::new)
3153 .collect();
3154
3155 if !node_ids.is_empty() || !edge_ids.is_empty() {
3156 store.discard_entities_by_id(transaction_id, &node_ids, &edge_ids);
3157 }
3158 }
3159
3160 let touched = self.touched_graphs.lock().clone();
3164 for graph_name in &touched {
3165 let already_captured = sp_state
3166 .graph_snapshots
3167 .iter()
3168 .any(|gs| gs.graph_name == *graph_name);
3169 if !already_captured {
3170 let store = self.resolve_store(graph_name);
3171 store.discard_uncommitted_versions(transaction_id);
3172 }
3173 }
3174
3175 let mut touched = self.touched_graphs.lock();
3177 touched.clear();
3178 for gs in &sp_state.graph_snapshots {
3179 if !touched.contains(&gs.graph_name) {
3180 touched.push(gs.graph_name.clone());
3181 }
3182 }
3183
3184 Ok(())
3185 }
3186
3187 pub fn release_savepoint(&self, name: &str) -> Result<()> {
3193 let _tx_id = self.current_transaction.lock().ok_or_else(|| {
3194 grafeo_common::utils::error::Error::Transaction(
3195 grafeo_common::utils::error::TransactionError::InvalidState(
3196 "No active transaction".to_string(),
3197 ),
3198 )
3199 })?;
3200
3201 let mut savepoints = self.savepoints.lock();
3202 let pos = savepoints
3203 .iter()
3204 .rposition(|sp| sp.name == name)
3205 .ok_or_else(|| {
3206 grafeo_common::utils::error::Error::Transaction(
3207 grafeo_common::utils::error::TransactionError::InvalidState(format!(
3208 "Savepoint '{name}' not found"
3209 )),
3210 )
3211 })?;
3212 savepoints.remove(pos);
3213 Ok(())
3214 }
3215
3216 #[must_use]
3218 pub fn in_transaction(&self) -> bool {
3219 self.current_transaction.lock().is_some()
3220 }
3221
3222 #[must_use]
3224 pub(crate) fn current_transaction_id(&self) -> Option<TransactionId> {
3225 *self.current_transaction.lock()
3226 }
3227
3228 #[must_use]
3230 pub(crate) fn transaction_manager(&self) -> &TransactionManager {
3231 &self.transaction_manager
3232 }
3233
3234 #[must_use]
3236 pub(crate) fn node_count_delta(&self) -> (usize, usize) {
3237 (
3238 self.transaction_start_node_count.load(Ordering::Relaxed),
3239 self.active_lpg_store().node_count(),
3240 )
3241 }
3242
3243 #[must_use]
3245 pub(crate) fn edge_count_delta(&self) -> (usize, usize) {
3246 (
3247 self.transaction_start_edge_count.load(Ordering::Relaxed),
3248 self.active_lpg_store().edge_count(),
3249 )
3250 }
3251
3252 pub fn prepare_commit(&mut self) -> Result<crate::transaction::PreparedCommit<'_>> {
3286 crate::transaction::PreparedCommit::new(self)
3287 }
3288
3289 pub fn set_auto_commit(&mut self, auto_commit: bool) {
3291 self.auto_commit = auto_commit;
3292 }
3293
3294 #[must_use]
3296 pub fn auto_commit(&self) -> bool {
3297 self.auto_commit
3298 }
3299
3300 fn needs_auto_commit(&self, has_mutations: bool) -> bool {
3305 self.auto_commit && has_mutations && self.current_transaction.lock().is_none()
3306 }
3307
3308 fn with_auto_commit<F>(&self, has_mutations: bool, body: F) -> Result<QueryResult>
3311 where
3312 F: FnOnce() -> Result<QueryResult>,
3313 {
3314 if self.needs_auto_commit(has_mutations) {
3315 self.begin_transaction_inner(false, None)?;
3316 match body() {
3317 Ok(result) => {
3318 self.commit_inner()?;
3319 Ok(result)
3320 }
3321 Err(e) => {
3322 let _ = self.rollback_inner();
3323 Err(e)
3324 }
3325 }
3326 } else {
3327 body()
3328 }
3329 }
3330
3331 fn query_looks_like_mutation(query: &str) -> bool {
3337 let upper = query.to_ascii_uppercase();
3338 upper.contains("INSERT")
3339 || upper.contains("CREATE")
3340 || upper.contains("DELETE")
3341 || upper.contains("MERGE")
3342 || upper.contains("SET")
3343 || upper.contains("REMOVE")
3344 || upper.contains("DROP")
3345 || upper.contains("ALTER")
3346 }
3347
3348 #[must_use]
3350 fn query_deadline(&self) -> Option<Instant> {
3351 #[cfg(not(target_arch = "wasm32"))]
3352 {
3353 self.query_timeout.map(|d| Instant::now() + d)
3354 }
3355 #[cfg(target_arch = "wasm32")]
3356 {
3357 let _ = &self.query_timeout;
3358 None
3359 }
3360 }
3361
3362 fn eval_integer_literal(expr: &grafeo_adapters::query::gql::ast::Expression) -> Option<i64> {
3364 use grafeo_adapters::query::gql::ast::{Expression, Literal};
3365 match expr {
3366 Expression::Literal(Literal::Integer(n)) => Some(*n),
3367 _ => None,
3368 }
3369 }
3370
3371 #[must_use]
3377 fn get_transaction_context(&self) -> (EpochId, Option<TransactionId>) {
3378 if let Some(epoch) = *self.viewing_epoch_override.lock() {
3380 return (epoch, None);
3381 }
3382
3383 if let Some(transaction_id) = *self.current_transaction.lock() {
3384 let epoch = self
3386 .transaction_manager
3387 .start_epoch(transaction_id)
3388 .unwrap_or_else(|| self.transaction_manager.current_epoch());
3389 (epoch, Some(transaction_id))
3390 } else {
3391 (self.transaction_manager.current_epoch(), None)
3393 }
3394 }
3395
3396 fn create_planner_for_store(
3401 &self,
3402 store: Arc<dyn GraphStoreMut>,
3403 viewing_epoch: EpochId,
3404 transaction_id: Option<TransactionId>,
3405 ) -> crate::query::Planner {
3406 use crate::query::Planner;
3407
3408 let mut planner = Planner::with_context(
3409 Arc::clone(&store),
3410 Arc::clone(&self.transaction_manager),
3411 transaction_id,
3412 viewing_epoch,
3413 )
3414 .with_factorized_execution(self.factorized_execution)
3415 .with_catalog(Arc::clone(&self.catalog));
3416
3417 let validator =
3419 CatalogConstraintValidator::new(Arc::clone(&self.catalog)).with_store(store);
3420 planner = planner.with_validator(Arc::new(validator));
3421
3422 planner
3423 }
3424
3425 pub fn create_node(&self, labels: &[&str]) -> NodeId {
3430 let (epoch, transaction_id) = self.get_transaction_context();
3431 self.active_lpg_store().create_node_versioned(
3432 labels,
3433 epoch,
3434 transaction_id.unwrap_or(TransactionId::SYSTEM),
3435 )
3436 }
3437
3438 pub fn create_node_with_props<'a>(
3442 &self,
3443 labels: &[&str],
3444 properties: impl IntoIterator<Item = (&'a str, Value)>,
3445 ) -> NodeId {
3446 let (epoch, transaction_id) = self.get_transaction_context();
3447 self.active_lpg_store().create_node_with_props_versioned(
3448 labels,
3449 properties,
3450 epoch,
3451 transaction_id.unwrap_or(TransactionId::SYSTEM),
3452 )
3453 }
3454
3455 pub fn create_edge(
3460 &self,
3461 src: NodeId,
3462 dst: NodeId,
3463 edge_type: &str,
3464 ) -> grafeo_common::types::EdgeId {
3465 let (epoch, transaction_id) = self.get_transaction_context();
3466 self.active_lpg_store().create_edge_versioned(
3467 src,
3468 dst,
3469 edge_type,
3470 epoch,
3471 transaction_id.unwrap_or(TransactionId::SYSTEM),
3472 )
3473 }
3474
3475 #[must_use]
3503 pub fn get_node(&self, id: NodeId) -> Option<Node> {
3504 let (epoch, transaction_id) = self.get_transaction_context();
3505 self.active_lpg_store().get_node_versioned(
3506 id,
3507 epoch,
3508 transaction_id.unwrap_or(TransactionId::SYSTEM),
3509 )
3510 }
3511
3512 #[must_use]
3536 pub fn get_node_property(&self, id: NodeId, key: &str) -> Option<Value> {
3537 self.get_node(id)
3538 .and_then(|node| node.get_property(key).cloned())
3539 }
3540
3541 #[must_use]
3548 pub fn get_edge(&self, id: EdgeId) -> Option<Edge> {
3549 let (epoch, transaction_id) = self.get_transaction_context();
3550 self.active_lpg_store().get_edge_versioned(
3551 id,
3552 epoch,
3553 transaction_id.unwrap_or(TransactionId::SYSTEM),
3554 )
3555 }
3556
3557 #[must_use]
3583 pub fn get_neighbors_outgoing(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
3584 self.active_lpg_store()
3585 .edges_from(node, Direction::Outgoing)
3586 .collect()
3587 }
3588
3589 #[must_use]
3598 pub fn get_neighbors_incoming(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
3599 self.active_lpg_store()
3600 .edges_from(node, Direction::Incoming)
3601 .collect()
3602 }
3603
3604 #[must_use]
3616 pub fn get_neighbors_outgoing_by_type(
3617 &self,
3618 node: NodeId,
3619 edge_type: &str,
3620 ) -> Vec<(NodeId, EdgeId)> {
3621 self.active_lpg_store()
3622 .edges_from(node, Direction::Outgoing)
3623 .filter(|(_, edge_id)| {
3624 self.get_edge(*edge_id)
3625 .is_some_and(|e| e.edge_type.as_str() == edge_type)
3626 })
3627 .collect()
3628 }
3629
3630 #[must_use]
3637 pub fn node_exists(&self, id: NodeId) -> bool {
3638 self.get_node(id).is_some()
3639 }
3640
3641 #[must_use]
3643 pub fn edge_exists(&self, id: EdgeId) -> bool {
3644 self.get_edge(id).is_some()
3645 }
3646
3647 #[must_use]
3651 pub fn get_degree(&self, node: NodeId) -> (usize, usize) {
3652 let active = self.active_lpg_store();
3653 let out = active.out_degree(node);
3654 let in_degree = active.in_degree(node);
3655 (out, in_degree)
3656 }
3657
3658 #[must_use]
3668 pub fn get_nodes_batch(&self, ids: &[NodeId]) -> Vec<Option<Node>> {
3669 let (epoch, transaction_id) = self.get_transaction_context();
3670 let tx = transaction_id.unwrap_or(TransactionId::SYSTEM);
3671 let active = self.active_lpg_store();
3672 ids.iter()
3673 .map(|&id| active.get_node_versioned(id, epoch, tx))
3674 .collect()
3675 }
3676
3677 #[cfg(feature = "cdc")]
3681 pub fn history(
3682 &self,
3683 entity_id: impl Into<crate::cdc::EntityId>,
3684 ) -> Result<Vec<crate::cdc::ChangeEvent>> {
3685 Ok(self.cdc_log.history(entity_id.into()))
3686 }
3687
3688 #[cfg(feature = "cdc")]
3690 pub fn history_since(
3691 &self,
3692 entity_id: impl Into<crate::cdc::EntityId>,
3693 since_epoch: EpochId,
3694 ) -> Result<Vec<crate::cdc::ChangeEvent>> {
3695 Ok(self.cdc_log.history_since(entity_id.into(), since_epoch))
3696 }
3697
3698 #[cfg(feature = "cdc")]
3700 pub fn changes_between(
3701 &self,
3702 start_epoch: EpochId,
3703 end_epoch: EpochId,
3704 ) -> Result<Vec<crate::cdc::ChangeEvent>> {
3705 Ok(self.cdc_log.changes_between(start_epoch, end_epoch))
3706 }
3707}
3708
3709impl Drop for Session {
3710 fn drop(&mut self) {
3711 if self.in_transaction() {
3714 let _ = self.rollback_inner();
3715 }
3716 }
3717}
3718
3719#[cfg(test)]
3720mod tests {
3721 use crate::database::GrafeoDB;
3722
3723 #[test]
3724 fn test_session_create_node() {
3725 let db = GrafeoDB::new_in_memory();
3726 let session = db.session();
3727
3728 let id = session.create_node(&["Person"]);
3729 assert!(id.is_valid());
3730 assert_eq!(db.node_count(), 1);
3731 }
3732
3733 #[test]
3734 fn test_session_transaction() {
3735 let db = GrafeoDB::new_in_memory();
3736 let mut session = db.session();
3737
3738 assert!(!session.in_transaction());
3739
3740 session.begin_transaction().unwrap();
3741 assert!(session.in_transaction());
3742
3743 session.commit().unwrap();
3744 assert!(!session.in_transaction());
3745 }
3746
3747 #[test]
3748 fn test_session_transaction_context() {
3749 let db = GrafeoDB::new_in_memory();
3750 let mut session = db.session();
3751
3752 let (_epoch1, transaction_id1) = session.get_transaction_context();
3754 assert!(transaction_id1.is_none());
3755
3756 session.begin_transaction().unwrap();
3758 let (epoch2, transaction_id2) = session.get_transaction_context();
3759 assert!(transaction_id2.is_some());
3760 let _ = epoch2; session.commit().unwrap();
3765 let (epoch3, tx_id3) = session.get_transaction_context();
3766 assert!(tx_id3.is_none());
3767 assert!(epoch3.as_u64() >= epoch2.as_u64());
3769 }
3770
3771 #[test]
3772 fn test_session_rollback() {
3773 let db = GrafeoDB::new_in_memory();
3774 let mut session = db.session();
3775
3776 session.begin_transaction().unwrap();
3777 session.rollback().unwrap();
3778 assert!(!session.in_transaction());
3779 }
3780
3781 #[test]
3782 fn test_session_rollback_discards_versions() {
3783 use grafeo_common::types::TransactionId;
3784
3785 let db = GrafeoDB::new_in_memory();
3786
3787 let node_before = db.store().create_node(&["Person"]);
3789 assert!(node_before.is_valid());
3790 assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");
3791
3792 let mut session = db.session();
3794 session.begin_transaction().unwrap();
3795 let transaction_id = session.current_transaction.lock().unwrap();
3796
3797 let epoch = db.store().current_epoch();
3799 let node_in_tx = db
3800 .store()
3801 .create_node_versioned(&["Person"], epoch, transaction_id);
3802 assert!(node_in_tx.is_valid());
3803
3804 assert_eq!(
3808 db.node_count(),
3809 1,
3810 "PENDING nodes should be invisible to non-versioned node_count()"
3811 );
3812 assert!(
3813 db.store()
3814 .get_node_versioned(node_in_tx, epoch, transaction_id)
3815 .is_some(),
3816 "Transaction node should be visible to its own transaction"
3817 );
3818
3819 session.rollback().unwrap();
3821 assert!(!session.in_transaction());
3822
3823 let count_after = db.node_count();
3826 assert_eq!(
3827 count_after, 1,
3828 "Rollback should discard uncommitted node, but got {count_after}"
3829 );
3830
3831 let current_epoch = db.store().current_epoch();
3833 assert!(
3834 db.store()
3835 .get_node_versioned(node_before, current_epoch, TransactionId::SYSTEM)
3836 .is_some(),
3837 "Original node should still exist"
3838 );
3839
3840 assert!(
3842 db.store()
3843 .get_node_versioned(node_in_tx, current_epoch, TransactionId::SYSTEM)
3844 .is_none(),
3845 "Transaction node should be gone"
3846 );
3847 }
3848
3849 #[test]
3850 fn test_session_create_node_in_transaction() {
3851 let db = GrafeoDB::new_in_memory();
3853
3854 let node_before = db.create_node(&["Person"]);
3856 assert!(node_before.is_valid());
3857 assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");
3858
3859 let mut session = db.session();
3861 session.begin_transaction().unwrap();
3862 let transaction_id = session.current_transaction.lock().unwrap();
3863
3864 let node_in_tx = session.create_node(&["Person"]);
3866 assert!(node_in_tx.is_valid());
3867
3868 assert_eq!(
3871 db.node_count(),
3872 1,
3873 "PENDING nodes should be invisible to non-versioned node_count()"
3874 );
3875 let epoch = db.store().current_epoch();
3876 assert!(
3877 db.store()
3878 .get_node_versioned(node_in_tx, epoch, transaction_id)
3879 .is_some(),
3880 "Transaction node should be visible to its own transaction"
3881 );
3882
3883 session.rollback().unwrap();
3885
3886 let count_after = db.node_count();
3888 assert_eq!(
3889 count_after, 1,
3890 "Rollback should discard node created via session.create_node(), but got {count_after}"
3891 );
3892 }
3893
3894 #[test]
3895 fn test_session_create_node_with_props_in_transaction() {
3896 use grafeo_common::types::Value;
3897
3898 let db = GrafeoDB::new_in_memory();
3900
3901 db.create_node(&["Person"]);
3903 assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");
3904
3905 let mut session = db.session();
3907 session.begin_transaction().unwrap();
3908 let transaction_id = session.current_transaction.lock().unwrap();
3909
3910 let node_in_tx =
3911 session.create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))]);
3912 assert!(node_in_tx.is_valid());
3913
3914 assert_eq!(
3917 db.node_count(),
3918 1,
3919 "PENDING nodes should be invisible to non-versioned node_count()"
3920 );
3921 let epoch = db.store().current_epoch();
3922 assert!(
3923 db.store()
3924 .get_node_versioned(node_in_tx, epoch, transaction_id)
3925 .is_some(),
3926 "Transaction node should be visible to its own transaction"
3927 );
3928
3929 session.rollback().unwrap();
3931
3932 let count_after = db.node_count();
3934 assert_eq!(
3935 count_after, 1,
3936 "Rollback should discard node created via session.create_node_with_props()"
3937 );
3938 }
3939
3940 #[cfg(feature = "gql")]
3941 mod gql_tests {
3942 use super::*;
3943
3944 #[test]
3945 fn test_gql_query_execution() {
3946 let db = GrafeoDB::new_in_memory();
3947 let session = db.session();
3948
3949 session.create_node(&["Person"]);
3951 session.create_node(&["Person"]);
3952 session.create_node(&["Animal"]);
3953
3954 let result = session.execute("MATCH (n:Person) RETURN n").unwrap();
3956
3957 assert_eq!(result.row_count(), 2);
3959 assert_eq!(result.column_count(), 1);
3960 assert_eq!(result.columns[0], "n");
3961 }
3962
3963 #[test]
3964 fn test_gql_empty_result() {
3965 let db = GrafeoDB::new_in_memory();
3966 let session = db.session();
3967
3968 let result = session.execute("MATCH (n:Person) RETURN n").unwrap();
3970
3971 assert_eq!(result.row_count(), 0);
3972 }
3973
3974 #[test]
3975 fn test_gql_parse_error() {
3976 let db = GrafeoDB::new_in_memory();
3977 let session = db.session();
3978
3979 let result = session.execute("MATCH (n RETURN n");
3981
3982 assert!(result.is_err());
3983 }
3984
3985 #[test]
3986 fn test_gql_relationship_traversal() {
3987 let db = GrafeoDB::new_in_memory();
3988 let session = db.session();
3989
3990 let alix = session.create_node(&["Person"]);
3992 let gus = session.create_node(&["Person"]);
3993 let vincent = session.create_node(&["Person"]);
3994
3995 session.create_edge(alix, gus, "KNOWS");
3996 session.create_edge(alix, vincent, "KNOWS");
3997
3998 let result = session
4000 .execute("MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b")
4001 .unwrap();
4002
4003 assert_eq!(result.row_count(), 2);
4005 assert_eq!(result.column_count(), 2);
4006 assert_eq!(result.columns[0], "a");
4007 assert_eq!(result.columns[1], "b");
4008 }
4009
4010 #[test]
4011 fn test_gql_relationship_with_type_filter() {
4012 let db = GrafeoDB::new_in_memory();
4013 let session = db.session();
4014
4015 let alix = session.create_node(&["Person"]);
4017 let gus = session.create_node(&["Person"]);
4018 let vincent = session.create_node(&["Person"]);
4019
4020 session.create_edge(alix, gus, "KNOWS");
4021 session.create_edge(alix, vincent, "WORKS_WITH");
4022
4023 let result = session
4025 .execute("MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b")
4026 .unwrap();
4027
4028 assert_eq!(result.row_count(), 1);
4030 }
4031
4032 #[test]
4033 fn test_gql_semantic_error_undefined_variable() {
4034 let db = GrafeoDB::new_in_memory();
4035 let session = db.session();
4036
4037 let result = session.execute("MATCH (n:Person) RETURN x");
4039
4040 assert!(result.is_err());
4042 let Err(err) = result else {
4043 panic!("Expected error")
4044 };
4045 assert!(
4046 err.to_string().contains("Undefined variable"),
4047 "Expected undefined variable error, got: {}",
4048 err
4049 );
4050 }
4051
4052 #[test]
4053 fn test_gql_where_clause_property_filter() {
4054 use grafeo_common::types::Value;
4055
4056 let db = GrafeoDB::new_in_memory();
4057 let session = db.session();
4058
4059 session.create_node_with_props(&["Person"], [("age", Value::Int64(25))]);
4061 session.create_node_with_props(&["Person"], [("age", Value::Int64(35))]);
4062 session.create_node_with_props(&["Person"], [("age", Value::Int64(45))]);
4063
4064 let result = session
4066 .execute("MATCH (n:Person) WHERE n.age > 30 RETURN n")
4067 .unwrap();
4068
4069 assert_eq!(result.row_count(), 2);
4071 }
4072
4073 #[test]
4074 fn test_gql_where_clause_equality() {
4075 use grafeo_common::types::Value;
4076
4077 let db = GrafeoDB::new_in_memory();
4078 let session = db.session();
4079
4080 session.create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))]);
4082 session.create_node_with_props(&["Person"], [("name", Value::String("Gus".into()))]);
4083 session.create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))]);
4084
4085 let result = session
4087 .execute("MATCH (n:Person) WHERE n.name = \"Alix\" RETURN n")
4088 .unwrap();
4089
4090 assert_eq!(result.row_count(), 2);
4092 }
4093
4094 #[test]
4095 fn test_gql_return_property_access() {
4096 use grafeo_common::types::Value;
4097
4098 let db = GrafeoDB::new_in_memory();
4099 let session = db.session();
4100
4101 session.create_node_with_props(
4103 &["Person"],
4104 [
4105 ("name", Value::String("Alix".into())),
4106 ("age", Value::Int64(30)),
4107 ],
4108 );
4109 session.create_node_with_props(
4110 &["Person"],
4111 [
4112 ("name", Value::String("Gus".into())),
4113 ("age", Value::Int64(25)),
4114 ],
4115 );
4116
4117 let result = session
4119 .execute("MATCH (n:Person) RETURN n.name, n.age")
4120 .unwrap();
4121
4122 assert_eq!(result.row_count(), 2);
4124 assert_eq!(result.column_count(), 2);
4125 assert_eq!(result.columns[0], "n.name");
4126 assert_eq!(result.columns[1], "n.age");
4127
4128 let names: Vec<&Value> = result.rows.iter().map(|r| &r[0]).collect();
4130 assert!(names.contains(&&Value::String("Alix".into())));
4131 assert!(names.contains(&&Value::String("Gus".into())));
4132 }
4133
4134 #[test]
4135 fn test_gql_return_mixed_expressions() {
4136 use grafeo_common::types::Value;
4137
4138 let db = GrafeoDB::new_in_memory();
4139 let session = db.session();
4140
4141 session.create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))]);
4143
4144 let result = session
4146 .execute("MATCH (n:Person) RETURN n, n.name")
4147 .unwrap();
4148
4149 assert_eq!(result.row_count(), 1);
4150 assert_eq!(result.column_count(), 2);
4151 assert_eq!(result.columns[0], "n");
4152 assert_eq!(result.columns[1], "n.name");
4153
4154 assert_eq!(result.rows[0][1], Value::String("Alix".into()));
4156 }
4157 }
4158
4159 #[cfg(feature = "cypher")]
4160 mod cypher_tests {
4161 use super::*;
4162
4163 #[test]
4164 fn test_cypher_query_execution() {
4165 let db = GrafeoDB::new_in_memory();
4166 let session = db.session();
4167
4168 session.create_node(&["Person"]);
4170 session.create_node(&["Person"]);
4171 session.create_node(&["Animal"]);
4172
4173 let result = session.execute_cypher("MATCH (n:Person) RETURN n").unwrap();
4175
4176 assert_eq!(result.row_count(), 2);
4178 assert_eq!(result.column_count(), 1);
4179 assert_eq!(result.columns[0], "n");
4180 }
4181
4182 #[test]
4183 fn test_cypher_empty_result() {
4184 let db = GrafeoDB::new_in_memory();
4185 let session = db.session();
4186
4187 let result = session.execute_cypher("MATCH (n:Person) RETURN n").unwrap();
4189
4190 assert_eq!(result.row_count(), 0);
4191 }
4192
4193 #[test]
4194 fn test_cypher_parse_error() {
4195 let db = GrafeoDB::new_in_memory();
4196 let session = db.session();
4197
4198 let result = session.execute_cypher("MATCH (n RETURN n");
4200
4201 assert!(result.is_err());
4202 }
4203 }
4204
4205 mod direct_lookup_tests {
4208 use super::*;
4209 use grafeo_common::types::Value;
4210
4211 #[test]
4212 fn test_get_node() {
4213 let db = GrafeoDB::new_in_memory();
4214 let session = db.session();
4215
4216 let id = session.create_node(&["Person"]);
4217 let node = session.get_node(id);
4218
4219 assert!(node.is_some());
4220 let node = node.unwrap();
4221 assert_eq!(node.id, id);
4222 }
4223
4224 #[test]
4225 fn test_get_node_not_found() {
4226 use grafeo_common::types::NodeId;
4227
4228 let db = GrafeoDB::new_in_memory();
4229 let session = db.session();
4230
4231 let node = session.get_node(NodeId::new(9999));
4233 assert!(node.is_none());
4234 }
4235
4236 #[test]
4237 fn test_get_node_property() {
4238 let db = GrafeoDB::new_in_memory();
4239 let session = db.session();
4240
4241 let id = session
4242 .create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))]);
4243
4244 let name = session.get_node_property(id, "name");
4245 assert_eq!(name, Some(Value::String("Alix".into())));
4246
4247 let missing = session.get_node_property(id, "missing");
4249 assert!(missing.is_none());
4250 }
4251
4252 #[test]
4253 fn test_get_edge() {
4254 let db = GrafeoDB::new_in_memory();
4255 let session = db.session();
4256
4257 let alix = session.create_node(&["Person"]);
4258 let gus = session.create_node(&["Person"]);
4259 let edge_id = session.create_edge(alix, gus, "KNOWS");
4260
4261 let edge = session.get_edge(edge_id);
4262 assert!(edge.is_some());
4263 let edge = edge.unwrap();
4264 assert_eq!(edge.id, edge_id);
4265 assert_eq!(edge.src, alix);
4266 assert_eq!(edge.dst, gus);
4267 }
4268
4269 #[test]
4270 fn test_get_edge_not_found() {
4271 use grafeo_common::types::EdgeId;
4272
4273 let db = GrafeoDB::new_in_memory();
4274 let session = db.session();
4275
4276 let edge = session.get_edge(EdgeId::new(9999));
4277 assert!(edge.is_none());
4278 }
4279
4280 #[test]
4281 fn test_get_neighbors_outgoing() {
4282 let db = GrafeoDB::new_in_memory();
4283 let session = db.session();
4284
4285 let alix = session.create_node(&["Person"]);
4286 let gus = session.create_node(&["Person"]);
4287 let harm = session.create_node(&["Person"]);
4288
4289 session.create_edge(alix, gus, "KNOWS");
4290 session.create_edge(alix, harm, "KNOWS");
4291
4292 let neighbors = session.get_neighbors_outgoing(alix);
4293 assert_eq!(neighbors.len(), 2);
4294
4295 let neighbor_ids: Vec<_> = neighbors.iter().map(|(node_id, _)| *node_id).collect();
4296 assert!(neighbor_ids.contains(&gus));
4297 assert!(neighbor_ids.contains(&harm));
4298 }
4299
4300 #[test]
4301 fn test_get_neighbors_incoming() {
4302 let db = GrafeoDB::new_in_memory();
4303 let session = db.session();
4304
4305 let alix = session.create_node(&["Person"]);
4306 let gus = session.create_node(&["Person"]);
4307 let harm = session.create_node(&["Person"]);
4308
4309 session.create_edge(gus, alix, "KNOWS");
4310 session.create_edge(harm, alix, "KNOWS");
4311
4312 let neighbors = session.get_neighbors_incoming(alix);
4313 assert_eq!(neighbors.len(), 2);
4314
4315 let neighbor_ids: Vec<_> = neighbors.iter().map(|(node_id, _)| *node_id).collect();
4316 assert!(neighbor_ids.contains(&gus));
4317 assert!(neighbor_ids.contains(&harm));
4318 }
4319
4320 #[test]
4321 fn test_get_neighbors_outgoing_by_type() {
4322 let db = GrafeoDB::new_in_memory();
4323 let session = db.session();
4324
4325 let alix = session.create_node(&["Person"]);
4326 let gus = session.create_node(&["Person"]);
4327 let company = session.create_node(&["Company"]);
4328
4329 session.create_edge(alix, gus, "KNOWS");
4330 session.create_edge(alix, company, "WORKS_AT");
4331
4332 let knows_neighbors = session.get_neighbors_outgoing_by_type(alix, "KNOWS");
4333 assert_eq!(knows_neighbors.len(), 1);
4334 assert_eq!(knows_neighbors[0].0, gus);
4335
4336 let works_neighbors = session.get_neighbors_outgoing_by_type(alix, "WORKS_AT");
4337 assert_eq!(works_neighbors.len(), 1);
4338 assert_eq!(works_neighbors[0].0, company);
4339
4340 let no_neighbors = session.get_neighbors_outgoing_by_type(alix, "LIKES");
4342 assert!(no_neighbors.is_empty());
4343 }
4344
4345 #[test]
4346 fn test_node_exists() {
4347 use grafeo_common::types::NodeId;
4348
4349 let db = GrafeoDB::new_in_memory();
4350 let session = db.session();
4351
4352 let id = session.create_node(&["Person"]);
4353
4354 assert!(session.node_exists(id));
4355 assert!(!session.node_exists(NodeId::new(9999)));
4356 }
4357
4358 #[test]
4359 fn test_edge_exists() {
4360 use grafeo_common::types::EdgeId;
4361
4362 let db = GrafeoDB::new_in_memory();
4363 let session = db.session();
4364
4365 let alix = session.create_node(&["Person"]);
4366 let gus = session.create_node(&["Person"]);
4367 let edge_id = session.create_edge(alix, gus, "KNOWS");
4368
4369 assert!(session.edge_exists(edge_id));
4370 assert!(!session.edge_exists(EdgeId::new(9999)));
4371 }
4372
4373 #[test]
4374 fn test_get_degree() {
4375 let db = GrafeoDB::new_in_memory();
4376 let session = db.session();
4377
4378 let alix = session.create_node(&["Person"]);
4379 let gus = session.create_node(&["Person"]);
4380 let harm = session.create_node(&["Person"]);
4381
4382 session.create_edge(alix, gus, "KNOWS");
4384 session.create_edge(alix, harm, "KNOWS");
4385 session.create_edge(gus, alix, "KNOWS");
4387
4388 let (out_degree, in_degree) = session.get_degree(alix);
4389 assert_eq!(out_degree, 2);
4390 assert_eq!(in_degree, 1);
4391
4392 let lonely = session.create_node(&["Person"]);
4394 let (out, in_deg) = session.get_degree(lonely);
4395 assert_eq!(out, 0);
4396 assert_eq!(in_deg, 0);
4397 }
4398
4399 #[test]
4400 fn test_get_nodes_batch() {
4401 let db = GrafeoDB::new_in_memory();
4402 let session = db.session();
4403
4404 let alix = session.create_node(&["Person"]);
4405 let gus = session.create_node(&["Person"]);
4406 let harm = session.create_node(&["Person"]);
4407
4408 let nodes = session.get_nodes_batch(&[alix, gus, harm]);
4409 assert_eq!(nodes.len(), 3);
4410 assert!(nodes[0].is_some());
4411 assert!(nodes[1].is_some());
4412 assert!(nodes[2].is_some());
4413
4414 use grafeo_common::types::NodeId;
4416 let nodes_with_missing = session.get_nodes_batch(&[alix, NodeId::new(9999), harm]);
4417 assert_eq!(nodes_with_missing.len(), 3);
4418 assert!(nodes_with_missing[0].is_some());
4419 assert!(nodes_with_missing[1].is_none()); assert!(nodes_with_missing[2].is_some());
4421 }
4422
4423 #[test]
4424 fn test_auto_commit_setting() {
4425 let db = GrafeoDB::new_in_memory();
4426 let mut session = db.session();
4427
4428 assert!(session.auto_commit());
4430
4431 session.set_auto_commit(false);
4432 assert!(!session.auto_commit());
4433
4434 session.set_auto_commit(true);
4435 assert!(session.auto_commit());
4436 }
4437
4438 #[test]
4439 fn test_transaction_double_begin_nests() {
4440 let db = GrafeoDB::new_in_memory();
4441 let mut session = db.session();
4442
4443 session.begin_transaction().unwrap();
4444 let result = session.begin_transaction();
4446 assert!(result.is_ok());
4447 session.commit().unwrap();
4449 session.commit().unwrap();
4451 }
4452
4453 #[test]
4454 fn test_commit_without_transaction_error() {
4455 let db = GrafeoDB::new_in_memory();
4456 let mut session = db.session();
4457
4458 let result = session.commit();
4459 assert!(result.is_err());
4460 }
4461
4462 #[test]
4463 fn test_rollback_without_transaction_error() {
4464 let db = GrafeoDB::new_in_memory();
4465 let mut session = db.session();
4466
4467 let result = session.rollback();
4468 assert!(result.is_err());
4469 }
4470
4471 #[test]
4472 fn test_create_edge_in_transaction() {
4473 let db = GrafeoDB::new_in_memory();
4474 let mut session = db.session();
4475
4476 let alix = session.create_node(&["Person"]);
4478 let gus = session.create_node(&["Person"]);
4479
4480 session.begin_transaction().unwrap();
4482 let edge_id = session.create_edge(alix, gus, "KNOWS");
4483
4484 assert!(session.edge_exists(edge_id));
4486
4487 session.commit().unwrap();
4489
4490 assert!(session.edge_exists(edge_id));
4492 }
4493
4494 #[test]
4495 fn test_neighbors_empty_node() {
4496 let db = GrafeoDB::new_in_memory();
4497 let session = db.session();
4498
4499 let lonely = session.create_node(&["Person"]);
4500
4501 assert!(session.get_neighbors_outgoing(lonely).is_empty());
4502 assert!(session.get_neighbors_incoming(lonely).is_empty());
4503 assert!(
4504 session
4505 .get_neighbors_outgoing_by_type(lonely, "KNOWS")
4506 .is_empty()
4507 );
4508 }
4509 }
4510
4511 #[test]
4512 fn test_auto_gc_triggers_on_commit_interval() {
4513 use crate::config::Config;
4514
4515 let config = Config::in_memory().with_gc_interval(2);
4516 let db = GrafeoDB::with_config(config).unwrap();
4517 let mut session = db.session();
4518
4519 session.begin_transaction().unwrap();
4521 session.create_node(&["A"]);
4522 session.commit().unwrap();
4523
4524 session.begin_transaction().unwrap();
4526 session.create_node(&["B"]);
4527 session.commit().unwrap();
4528
4529 assert_eq!(db.node_count(), 2);
4531 }
4532
4533 #[test]
4534 fn test_query_timeout_config_propagates_to_session() {
4535 use crate::config::Config;
4536 use std::time::Duration;
4537
4538 let config = Config::in_memory().with_query_timeout(Duration::from_secs(5));
4539 let db = GrafeoDB::with_config(config).unwrap();
4540 let session = db.session();
4541
4542 assert!(session.query_deadline().is_some());
4544 }
4545
4546 #[test]
4547 fn test_no_query_timeout_returns_no_deadline() {
4548 let db = GrafeoDB::new_in_memory();
4549 let session = db.session();
4550
4551 assert!(session.query_deadline().is_none());
4553 }
4554
4555 #[test]
4556 fn test_graph_model_accessor() {
4557 use crate::config::GraphModel;
4558
4559 let db = GrafeoDB::new_in_memory();
4560 let session = db.session();
4561
4562 assert_eq!(session.graph_model(), GraphModel::Lpg);
4563 }
4564
4565 #[cfg(feature = "gql")]
4566 #[test]
4567 fn test_external_store_session() {
4568 use grafeo_core::graph::GraphStoreMut;
4569 use std::sync::Arc;
4570
4571 let config = crate::config::Config::in_memory();
4572 let store =
4573 Arc::new(grafeo_core::graph::lpg::LpgStore::new().unwrap()) as Arc<dyn GraphStoreMut>;
4574 let db = GrafeoDB::with_store(store, config).unwrap();
4575
4576 let mut session = db.session();
4577
4578 session.begin_transaction().unwrap();
4582 session.execute("INSERT (:Test {name: 'hello'})").unwrap();
4583
4584 let result = session.execute("MATCH (n:Test) RETURN n.name").unwrap();
4586 assert_eq!(result.row_count(), 1);
4587
4588 session.commit().unwrap();
4589 }
4590
4591 #[cfg(feature = "gql")]
4594 mod session_command_tests {
4595 use super::*;
4596 use grafeo_common::types::Value;
4597
4598 #[test]
4599 fn test_use_graph_sets_current_graph() {
4600 let db = GrafeoDB::new_in_memory();
4601 let session = db.session();
4602
4603 session.execute("CREATE GRAPH mydb").unwrap();
4605 session.execute("USE GRAPH mydb").unwrap();
4606
4607 assert_eq!(session.current_graph(), Some("mydb".to_string()));
4608 }
4609
4610 #[test]
4611 fn test_use_graph_nonexistent_errors() {
4612 let db = GrafeoDB::new_in_memory();
4613 let session = db.session();
4614
4615 let result = session.execute("USE GRAPH doesnotexist");
4616 assert!(result.is_err());
4617 let err = result.unwrap_err().to_string();
4618 assert!(
4619 err.contains("does not exist"),
4620 "Expected 'does not exist' error, got: {err}"
4621 );
4622 }
4623
4624 #[test]
4625 fn test_use_graph_default_always_valid() {
4626 let db = GrafeoDB::new_in_memory();
4627 let session = db.session();
4628
4629 session.execute("USE GRAPH default").unwrap();
4631 assert_eq!(session.current_graph(), Some("default".to_string()));
4632 }
4633
4634 #[test]
4635 fn test_session_set_graph() {
4636 let db = GrafeoDB::new_in_memory();
4637 let session = db.session();
4638
4639 session.execute("SESSION SET GRAPH analytics").unwrap();
4641 assert_eq!(session.current_graph(), Some("analytics".to_string()));
4642 }
4643
4644 #[test]
4645 fn test_session_set_time_zone() {
4646 let db = GrafeoDB::new_in_memory();
4647 let session = db.session();
4648
4649 assert_eq!(session.time_zone(), None);
4650
4651 session.execute("SESSION SET TIME ZONE 'UTC'").unwrap();
4652 assert_eq!(session.time_zone(), Some("UTC".to_string()));
4653
4654 session
4655 .execute("SESSION SET TIME ZONE 'America/New_York'")
4656 .unwrap();
4657 assert_eq!(session.time_zone(), Some("America/New_York".to_string()));
4658 }
4659
4660 #[test]
4661 fn test_session_set_parameter() {
4662 let db = GrafeoDB::new_in_memory();
4663 let session = db.session();
4664
4665 session
4666 .execute("SESSION SET PARAMETER $timeout = 30")
4667 .unwrap();
4668
4669 assert!(session.get_parameter("timeout").is_some());
4672 }
4673
4674 #[test]
4675 fn test_session_reset_clears_all_state() {
4676 let db = GrafeoDB::new_in_memory();
4677 let session = db.session();
4678
4679 session.execute("SESSION SET GRAPH analytics").unwrap();
4681 session.execute("SESSION SET TIME ZONE 'UTC'").unwrap();
4682 session
4683 .execute("SESSION SET PARAMETER $limit = 100")
4684 .unwrap();
4685
4686 assert!(session.current_graph().is_some());
4688 assert!(session.time_zone().is_some());
4689 assert!(session.get_parameter("limit").is_some());
4690
4691 session.execute("SESSION RESET").unwrap();
4693
4694 assert_eq!(session.current_graph(), None);
4695 assert_eq!(session.time_zone(), None);
4696 assert!(session.get_parameter("limit").is_none());
4697 }
4698
4699 #[test]
4700 fn test_session_close_clears_state() {
4701 let db = GrafeoDB::new_in_memory();
4702 let session = db.session();
4703
4704 session.execute("SESSION SET GRAPH analytics").unwrap();
4705 session.execute("SESSION SET TIME ZONE 'UTC'").unwrap();
4706
4707 session.execute("SESSION CLOSE").unwrap();
4708
4709 assert_eq!(session.current_graph(), None);
4710 assert_eq!(session.time_zone(), None);
4711 }
4712
4713 #[test]
4714 fn test_create_graph() {
4715 let db = GrafeoDB::new_in_memory();
4716 let session = db.session();
4717
4718 session.execute("CREATE GRAPH mydb").unwrap();
4719
4720 session.execute("USE GRAPH mydb").unwrap();
4722 assert_eq!(session.current_graph(), Some("mydb".to_string()));
4723 }
4724
4725 #[test]
4726 fn test_create_graph_duplicate_errors() {
4727 let db = GrafeoDB::new_in_memory();
4728 let session = db.session();
4729
4730 session.execute("CREATE GRAPH mydb").unwrap();
4731 let result = session.execute("CREATE GRAPH mydb");
4732
4733 assert!(result.is_err());
4734 let err = result.unwrap_err().to_string();
4735 assert!(
4736 err.contains("already exists"),
4737 "Expected 'already exists' error, got: {err}"
4738 );
4739 }
4740
4741 #[test]
4742 fn test_create_graph_if_not_exists() {
4743 let db = GrafeoDB::new_in_memory();
4744 let session = db.session();
4745
4746 session.execute("CREATE GRAPH mydb").unwrap();
4747 session.execute("CREATE GRAPH IF NOT EXISTS mydb").unwrap();
4749 }
4750
4751 #[test]
4752 fn test_drop_graph() {
4753 let db = GrafeoDB::new_in_memory();
4754 let session = db.session();
4755
4756 session.execute("CREATE GRAPH mydb").unwrap();
4757 session.execute("DROP GRAPH mydb").unwrap();
4758
4759 let result = session.execute("USE GRAPH mydb");
4761 assert!(result.is_err());
4762 }
4763
4764 #[test]
4765 fn test_drop_graph_nonexistent_errors() {
4766 let db = GrafeoDB::new_in_memory();
4767 let session = db.session();
4768
4769 let result = session.execute("DROP GRAPH nosuchgraph");
4770 assert!(result.is_err());
4771 let err = result.unwrap_err().to_string();
4772 assert!(
4773 err.contains("does not exist"),
4774 "Expected 'does not exist' error, got: {err}"
4775 );
4776 }
4777
4778 #[test]
4779 fn test_drop_graph_if_exists() {
4780 let db = GrafeoDB::new_in_memory();
4781 let session = db.session();
4782
4783 session.execute("DROP GRAPH IF EXISTS nosuchgraph").unwrap();
4785 }
4786
4787 #[test]
4788 fn test_start_transaction_via_gql() {
4789 let db = GrafeoDB::new_in_memory();
4790 let session = db.session();
4791
4792 session.execute("START TRANSACTION").unwrap();
4793 assert!(session.in_transaction());
4794 session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
4795 session.execute("COMMIT").unwrap();
4796 assert!(!session.in_transaction());
4797
4798 let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
4799 assert_eq!(result.rows.len(), 1);
4800 }
4801
4802 #[test]
4803 fn test_start_transaction_read_only_blocks_insert() {
4804 let db = GrafeoDB::new_in_memory();
4805 let session = db.session();
4806
4807 session.execute("START TRANSACTION READ ONLY").unwrap();
4808 let result = session.execute("INSERT (:Person {name: 'Alix'})");
4809 assert!(result.is_err());
4810 let err = result.unwrap_err().to_string();
4811 assert!(
4812 err.contains("read-only"),
4813 "Expected read-only error, got: {err}"
4814 );
4815 session.execute("ROLLBACK").unwrap();
4816 }
4817
4818 #[test]
4819 fn test_start_transaction_read_only_allows_reads() {
4820 let db = GrafeoDB::new_in_memory();
4821 let mut session = db.session();
4822 session.begin_transaction().unwrap();
4823 session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
4824 session.commit().unwrap();
4825
4826 session.execute("START TRANSACTION READ ONLY").unwrap();
4827 let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
4828 assert_eq!(result.rows.len(), 1);
4829 session.execute("COMMIT").unwrap();
4830 }
4831
4832 #[test]
4833 fn test_rollback_via_gql() {
4834 let db = GrafeoDB::new_in_memory();
4835 let session = db.session();
4836
4837 session.execute("START TRANSACTION").unwrap();
4838 session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
4839 session.execute("ROLLBACK").unwrap();
4840
4841 let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
4842 assert!(result.rows.is_empty());
4843 }
4844
4845 #[test]
4846 fn test_start_transaction_with_isolation_level() {
4847 let db = GrafeoDB::new_in_memory();
4848 let session = db.session();
4849
4850 session
4851 .execute("START TRANSACTION ISOLATION LEVEL SERIALIZABLE")
4852 .unwrap();
4853 assert!(session.in_transaction());
4854 session.execute("ROLLBACK").unwrap();
4855 }
4856
4857 #[test]
4858 fn test_session_commands_return_empty_result() {
4859 let db = GrafeoDB::new_in_memory();
4860 let session = db.session();
4861
4862 let result = session.execute("SESSION SET GRAPH test").unwrap();
4863 assert_eq!(result.row_count(), 0);
4864 assert_eq!(result.column_count(), 0);
4865 }
4866
4867 #[test]
4868 fn test_current_graph_default_is_none() {
4869 let db = GrafeoDB::new_in_memory();
4870 let session = db.session();
4871
4872 assert_eq!(session.current_graph(), None);
4873 }
4874
4875 #[test]
4876 fn test_time_zone_default_is_none() {
4877 let db = GrafeoDB::new_in_memory();
4878 let session = db.session();
4879
4880 assert_eq!(session.time_zone(), None);
4881 }
4882
4883 #[test]
4884 fn test_session_state_independent_across_sessions() {
4885 let db = GrafeoDB::new_in_memory();
4886 let session1 = db.session();
4887 let session2 = db.session();
4888
4889 session1.execute("SESSION SET GRAPH first").unwrap();
4890 session2.execute("SESSION SET GRAPH second").unwrap();
4891
4892 assert_eq!(session1.current_graph(), Some("first".to_string()));
4893 assert_eq!(session2.current_graph(), Some("second".to_string()));
4894 }
4895
4896 #[test]
4897 fn test_show_node_types() {
4898 let db = GrafeoDB::new_in_memory();
4899 let session = db.session();
4900
4901 session
4902 .execute("CREATE NODE TYPE Person (name STRING NOT NULL, age INTEGER)")
4903 .unwrap();
4904
4905 let result = session.execute("SHOW NODE TYPES").unwrap();
4906 assert_eq!(
4907 result.columns,
4908 vec!["name", "properties", "constraints", "parents"]
4909 );
4910 assert_eq!(result.rows.len(), 1);
4911 assert_eq!(result.rows[0][0], Value::from("Person"));
4913 }
4914
4915 #[test]
4916 fn test_show_edge_types() {
4917 let db = GrafeoDB::new_in_memory();
4918 let session = db.session();
4919
4920 session
4921 .execute("CREATE EDGE TYPE KNOWS CONNECTING (Person) TO (Person) (since INTEGER)")
4922 .unwrap();
4923
4924 let result = session.execute("SHOW EDGE TYPES").unwrap();
4925 assert_eq!(
4926 result.columns,
4927 vec!["name", "properties", "source_types", "target_types"]
4928 );
4929 assert_eq!(result.rows.len(), 1);
4930 assert_eq!(result.rows[0][0], Value::from("KNOWS"));
4931 }
4932
4933 #[test]
4934 fn test_show_graph_types() {
4935 let db = GrafeoDB::new_in_memory();
4936 let session = db.session();
4937
4938 session
4939 .execute("CREATE NODE TYPE Person (name STRING)")
4940 .unwrap();
4941 session
4942 .execute(
4943 "CREATE GRAPH TYPE social (\
4944 NODE TYPE Person (name STRING)\
4945 )",
4946 )
4947 .unwrap();
4948
4949 let result = session.execute("SHOW GRAPH TYPES").unwrap();
4950 assert_eq!(
4951 result.columns,
4952 vec!["name", "open", "node_types", "edge_types"]
4953 );
4954 assert_eq!(result.rows.len(), 1);
4955 assert_eq!(result.rows[0][0], Value::from("social"));
4956 }
4957
4958 #[test]
4959 fn test_show_graph_type_named() {
4960 let db = GrafeoDB::new_in_memory();
4961 let session = db.session();
4962
4963 session
4964 .execute("CREATE NODE TYPE Person (name STRING)")
4965 .unwrap();
4966 session
4967 .execute(
4968 "CREATE GRAPH TYPE social (\
4969 NODE TYPE Person (name STRING)\
4970 )",
4971 )
4972 .unwrap();
4973
4974 let result = session.execute("SHOW GRAPH TYPE social").unwrap();
4975 assert_eq!(result.rows.len(), 1);
4976 assert_eq!(result.rows[0][0], Value::from("social"));
4977 }
4978
4979 #[test]
4980 fn test_show_graph_type_not_found() {
4981 let db = GrafeoDB::new_in_memory();
4982 let session = db.session();
4983
4984 let result = session.execute("SHOW GRAPH TYPE nonexistent");
4985 assert!(result.is_err());
4986 }
4987
4988 #[test]
4989 fn test_show_indexes_via_gql() {
4990 let db = GrafeoDB::new_in_memory();
4991 let session = db.session();
4992
4993 let result = session.execute("SHOW INDEXES").unwrap();
4994 assert_eq!(result.columns, vec!["name", "type", "label", "property"]);
4995 }
4996
4997 #[test]
4998 fn test_show_constraints_via_gql() {
4999 let db = GrafeoDB::new_in_memory();
5000 let session = db.session();
5001
5002 let result = session.execute("SHOW CONSTRAINTS").unwrap();
5003 assert_eq!(result.columns, vec!["name", "type", "label", "properties"]);
5004 }
5005
5006 #[test]
5007 fn test_pattern_form_graph_type_roundtrip() {
5008 let db = GrafeoDB::new_in_memory();
5009 let session = db.session();
5010
5011 session
5013 .execute("CREATE NODE TYPE Person (name STRING NOT NULL)")
5014 .unwrap();
5015 session
5016 .execute("CREATE NODE TYPE City (name STRING)")
5017 .unwrap();
5018 session
5019 .execute("CREATE EDGE TYPE KNOWS (since INTEGER)")
5020 .unwrap();
5021 session.execute("CREATE EDGE TYPE LIVES_IN").unwrap();
5022
5023 session
5025 .execute(
5026 "CREATE GRAPH TYPE social (\
5027 (:Person {name STRING NOT NULL})-[:KNOWS {since INTEGER}]->(:Person),\
5028 (:Person)-[:LIVES_IN]->(:City)\
5029 )",
5030 )
5031 .unwrap();
5032
5033 let result = session.execute("SHOW GRAPH TYPE social").unwrap();
5035 assert_eq!(result.rows.len(), 1);
5036 assert_eq!(result.rows[0][0], Value::from("social"));
5037 }
5038 }
5039}