1#[cfg(feature = "triple-store")]
8mod rdf;
9
10use std::sync::Arc;
11use std::sync::atomic::AtomicUsize;
12#[cfg(feature = "lpg")]
13use std::sync::atomic::Ordering;
14use std::time::{Duration, Instant};
15
16#[cfg(feature = "lpg")]
17use grafeo_common::grafeo_debug_span;
18#[cfg(feature = "lpg")]
19use grafeo_common::types::{EdgeId, NodeId};
20use grafeo_common::types::{EpochId, TransactionId, Value};
21use grafeo_common::utils::error::Result;
22use grafeo_common::{grafeo_info_span, grafeo_warn};
23#[cfg(feature = "lpg")]
24use grafeo_core::graph::Direction;
25#[cfg(feature = "lpg")]
26use grafeo_core::graph::lpg::LpgStore;
27#[cfg(feature = "lpg")]
28use grafeo_core::graph::lpg::{Edge, Node};
29#[cfg(feature = "triple-store")]
30use grafeo_core::graph::rdf::RdfStore;
31use grafeo_core::graph::{GraphStore, GraphStoreMut, GraphStoreSearch};
32
33use crate::catalog::{Catalog, CatalogConstraintValidator};
34use crate::config::{AdaptiveConfig, GraphModel};
35use crate::database::QueryResult;
36use crate::query::Executor;
37use crate::query::cache::QueryCache;
38use crate::transaction::TransactionManager;
39
/// Graph segment used in storage keys when a session schema is set but the
/// selected graph is unset or named `default` (see
/// `Session::active_graph_storage_key`).
const SCHEMA_DEFAULT_GRAPH: &str = "__default__";
43
44fn parse_default_literal(text: &str) -> Value {
49 if text.eq_ignore_ascii_case("null") {
50 return Value::Null;
51 }
52 if text.eq_ignore_ascii_case("true") {
53 return Value::Bool(true);
54 }
55 if text.eq_ignore_ascii_case("false") {
56 return Value::Bool(false);
57 }
58 if (text.starts_with('\'') && text.ends_with('\''))
60 || (text.starts_with('"') && text.ends_with('"'))
61 {
62 return Value::String(text[1..text.len() - 1].into());
63 }
64 if let Ok(i) = text.parse::<i64>() {
66 return Value::Int64(i);
67 }
68 if let Ok(f) = text.parse::<f64>() {
69 return Value::Float64(f);
70 }
71 Value::String(text.into())
73}
74
/// Bundle of shared handles and settings consumed by the `Session`
/// constructors (`with_adaptive`, `with_external_store`).
///
/// Every field is moved or copied into the field of the same name on
/// `Session`, except `read_only`, which seeds two session fields.
pub(crate) struct SessionConfig {
    /// Transaction manager handed to the session.
    pub transaction_manager: Arc<TransactionManager>,
    /// Query cache handed to the session.
    pub query_cache: Arc<QueryCache>,
    /// Catalog handed to the session.
    pub catalog: Arc<Catalog>,
    /// Adaptive-execution settings, stored on the session as-is.
    pub adaptive_config: AdaptiveConfig,
    /// Stored on the session as-is.
    /// NOTE(review): presumably toggles factorized query execution — confirm.
    pub factorized_execution: bool,
    /// Graph model of the database (compared against `GraphModel::Rdf` by
    /// `Session::require_lpg`).
    pub graph_model: GraphModel,
    /// Optional query timeout, stored on the session as-is.
    pub query_timeout: Option<Duration>,
    /// Optional property-size limit, stored on the session as-is.
    pub max_property_size: Option<usize>,
    /// Optional buffer manager, stored on the session as-is.
    pub buffer_manager: Option<Arc<grafeo_common::memory::buffer::BufferManager>>,
    /// Shared counter, stored on the session as-is.
    /// NOTE(review): presumably counts commits to schedule GC — confirm.
    pub commit_counter: Arc<AtomicUsize>,
    /// Stored on the session as-is.
    /// NOTE(review): presumably the number of commits between GC runs — confirm.
    pub gc_interval: usize,
    /// Seeds both `Session::db_read_only` and the initial value of
    /// `Session::read_only_tx`.
    pub read_only: bool,
    /// Identity the session runs as.
    pub identity: crate::auth::Identity,
    /// Named graph projections, keyed by projection name.
    #[cfg(feature = "lpg")]
    pub projections: Arc<
        parking_lot::RwLock<
            std::collections::HashMap<String, Arc<grafeo_core::graph::GraphProjection>>,
        >,
    >,
}
104
/// A database session: holds the stores, catalog, transaction bookkeeping and
/// per-session settings (current graph/schema, time zone, parameters).
///
/// Mutable per-session state sits behind `parking_lot::Mutex` or atomics, and
/// the methods visible in this file take `&self`.
pub struct Session {
    /// Root LPG store; named graphs are resolved through `store.graph(name)`.
    #[cfg(feature = "lpg")]
    store: Arc<LpgStore>,
    /// Read store returned by `active_store` when no named graph resolves.
    graph_store: Arc<dyn GraphStoreSearch>,
    /// Optional write store returned by `active_write_store` when no named
    /// graph resolves.
    graph_store_mut: Option<Arc<dyn GraphStoreMut>>,
    /// Catalog used for schema existence checks and graph-type bindings.
    catalog: Arc<Catalog>,
    /// RDF store; both constructors create a fresh `RdfStore`.
    #[cfg(feature = "triple-store")]
    rdf_store: Arc<RdfStore>,
    /// Transaction manager taken from `SessionConfig`.
    transaction_manager: Arc<TransactionManager>,
    /// Query cache taken from `SessionConfig`.
    query_cache: Arc<QueryCache>,
    /// `Some` while a transaction is open (`track_graph_touch` only records
    /// graphs when this is set).
    current_transaction: parking_lot::Mutex<Option<TransactionId>>,
    /// While `true`, graph and projection DDL is rejected with
    /// `TransactionError::ReadOnly`. Initialised from `SessionConfig::read_only`.
    read_only_tx: parking_lot::Mutex<bool>,
    /// Copy of `SessionConfig::read_only`.
    db_read_only: bool,
    /// Identity used for permission and grant checks.
    identity: crate::auth::Identity,
    /// Set to `true` by both constructors.
    /// NOTE(review): consumers are outside this chunk — confirm semantics.
    auto_commit: bool,
    /// Adaptive-execution settings taken from `SessionConfig`.
    #[allow(dead_code)] adaptive_config: AdaptiveConfig,
    /// Taken from `SessionConfig`; consumers are outside this chunk.
    factorized_execution: bool,
    /// Graph model; `require_lpg` rejects LPG query languages when it is `Rdf`.
    graph_model: GraphModel,
    /// Optional query timeout taken from `SessionConfig`.
    query_timeout: Option<Duration>,
    /// Optional property-size limit taken from `SessionConfig`.
    max_property_size: Option<usize>,
    /// Optional buffer manager taken from `SessionConfig`.
    buffer_manager: Option<Arc<grafeo_common::memory::buffer::BufferManager>>,
    /// Shared counter taken from `SessionConfig`.
    /// NOTE(review): presumably commit count for GC scheduling — confirm.
    commit_counter: Arc<AtomicUsize>,
    /// Taken from `SessionConfig`; presumably commits between GC runs — confirm.
    gc_interval: usize,
    /// Starts at 0.
    /// NOTE(review): presumably the node count captured at transaction start — confirm.
    transaction_start_node_count: AtomicUsize,
    /// Starts at 0.
    /// NOTE(review): presumably the edge count captured at transaction start — confirm.
    transaction_start_edge_count: AtomicUsize,
    /// Write-ahead log, installed by `set_wal`; `None` until then.
    #[cfg(feature = "wal")]
    wal: Option<Arc<grafeo_storage::wal::LpgWal>>,
    /// Shared graph-context slot passed to WAL-wrapping stores; installed by
    /// `set_wal`.
    #[cfg(feature = "wal")]
    wal_graph_context: Option<Arc<parking_lot::Mutex<Option<String>>>>,
    /// CDC log; replaced by `set_cdc_log`.
    #[cfg(feature = "cdc")]
    cdc_log: Arc<crate::cdc::CdcLog>,
    /// Pending CDC events shared with the CDC-wrapping write store; set by
    /// `set_cdc_log` when a write store exists.
    #[cfg(feature = "cdc")]
    cdc_pending_events: Option<Arc<parking_lot::Mutex<Vec<crate::cdc::ChangeEvent>>>>,
    /// Graph selected with `use_graph`; `None` means no explicit selection.
    current_graph: parking_lot::Mutex<Option<String>>,
    /// Schema selected with `set_schema`; prefixes graph and type keys.
    current_schema: parking_lot::Mutex<Option<String>>,
    /// Session time zone set with `set_time_zone`.
    time_zone: parking_lot::Mutex<Option<String>>,
    /// Session parameters set with `set_parameter`.
    session_params:
        parking_lot::Mutex<std::collections::HashMap<String, grafeo_common::types::Value>>,
    /// Epoch override set with `set_viewing_epoch`.
    viewing_epoch_override: parking_lot::Mutex<Option<EpochId>>,
    /// Savepoint stack; starts empty.
    savepoints: parking_lot::Mutex<Vec<SavepointState>>,
    /// Starts at 0.
    /// NOTE(review): presumably depth of nested transactions — confirm.
    transaction_nesting_depth: parking_lot::Mutex<u32>,
    /// Storage keys of graphs made active during the open transaction,
    /// without duplicates (maintained by `track_graph_touch`).
    touched_graphs: parking_lot::Mutex<Vec<Option<String>>>,
    /// Starts at 0.
    /// NOTE(review): presumably the number of live result streams — confirm.
    active_streams: AtomicUsize,
    /// Metrics registry installed by `set_metrics`; `None` until then.
    #[cfg(feature = "metrics")]
    pub(crate) metrics: Option<Arc<crate::metrics::MetricsRegistry>>,
    /// Starts as `None`.
    /// NOTE(review): presumably the start instant of the open transaction — confirm.
    #[cfg(feature = "metrics")]
    tx_start_time: parking_lot::Mutex<Option<Instant>>,
    /// Named graph projections, keyed by projection name.
    #[cfg(feature = "lpg")]
    projections: Arc<
        parking_lot::RwLock<
            std::collections::HashMap<String, Arc<grafeo_core::graph::GraphProjection>>,
        >,
    >,
}
214
/// Per-graph state captured inside a `SavepointState`.
///
/// NOTE(review): the code that fills and restores these fields is outside this
/// chunk; the field notes below are inferred from names — confirm.
#[derive(Clone)]
struct GraphSavepoint {
    /// Graph this snapshot belongs to.
    graph_name: Option<String>,
    /// Presumably the next node id at savepoint time.
    next_node_id: u64,
    /// Presumably the next edge id at savepoint time.
    next_edge_id: u64,
    /// Presumably the undo-log length at savepoint time.
    undo_log_position: usize,
}
223
/// One entry of the session's savepoint stack (`Session::savepoints`).
///
/// NOTE(review): creation and rollback code is outside this chunk; field
/// notes are inferred from names — confirm.
#[derive(Clone)]
struct SavepointState {
    /// Savepoint name.
    name: String,
    /// One snapshot per graph.
    graph_snapshots: Vec<GraphSavepoint>,
    /// Presumably the graph that was active when the savepoint was taken.
    #[allow(dead_code)]
    active_graph: Option<String>,
    /// Presumably the length of the pending CDC event buffer at savepoint time.
    #[cfg(feature = "cdc")]
    cdc_event_position: usize,
}
238
239impl Session {
/// Builds a session whose read and write stores are both the given LPG store.
///
/// Shared handles and settings come from `cfg`; all per-session state starts
/// empty (no transaction, graph, schema, time zone, parameters or
/// savepoints). `cfg.read_only` seeds both the database-level flag and the
/// initial read-only flag.
#[cfg(feature = "lpg")]
#[allow(dead_code)] pub(crate) fn with_adaptive(store: Arc<LpgStore>, cfg: SessionConfig) -> Self {
    use parking_lot::Mutex;
    use std::collections::HashMap;

    let read_only = cfg.read_only;
    // The same LPG store backs both the read view and the write view.
    let search_view = Arc::clone(&store) as Arc<dyn GraphStoreSearch>;
    let write_view = Arc::clone(&store) as Arc<dyn GraphStoreMut>;

    Self {
        store,
        graph_store: search_view,
        graph_store_mut: Some(write_view),
        catalog: cfg.catalog,
        #[cfg(feature = "triple-store")]
        rdf_store: Arc::new(RdfStore::new()),
        transaction_manager: cfg.transaction_manager,
        query_cache: cfg.query_cache,
        current_transaction: Mutex::new(None),
        read_only_tx: Mutex::new(read_only),
        db_read_only: read_only,
        identity: cfg.identity,
        auto_commit: true,
        adaptive_config: cfg.adaptive_config,
        factorized_execution: cfg.factorized_execution,
        graph_model: cfg.graph_model,
        query_timeout: cfg.query_timeout,
        max_property_size: cfg.max_property_size,
        buffer_manager: cfg.buffer_manager,
        commit_counter: cfg.commit_counter,
        gc_interval: cfg.gc_interval,
        transaction_start_node_count: AtomicUsize::new(0),
        transaction_start_edge_count: AtomicUsize::new(0),
        #[cfg(feature = "wal")]
        wal: None,
        #[cfg(feature = "wal")]
        wal_graph_context: None,
        #[cfg(feature = "cdc")]
        cdc_log: Arc::new(crate::cdc::CdcLog::new()),
        #[cfg(feature = "cdc")]
        cdc_pending_events: None,
        current_graph: Mutex::new(None),
        current_schema: Mutex::new(None),
        time_zone: Mutex::new(None),
        session_params: Mutex::new(HashMap::new()),
        viewing_epoch_override: Mutex::new(None),
        savepoints: Mutex::new(Vec::new()),
        transaction_nesting_depth: Mutex::new(0),
        touched_graphs: Mutex::new(Vec::new()),
        active_streams: AtomicUsize::new(0),
        #[cfg(feature = "metrics")]
        metrics: None,
        #[cfg(feature = "metrics")]
        tx_start_time: Mutex::new(None),
        projections: cfg.projections,
    }
}
294
/// Replaces the session's read store and its optional write store.
#[cfg(all(feature = "compact-store", feature = "lpg"))]
pub(crate) fn override_stores(
    &mut self,
    read_store: Arc<dyn GraphStoreSearch>,
    write_store: Option<Arc<dyn GraphStoreMut>>,
) {
    (self.graph_store, self.graph_store_mut) = (read_store, write_store);
}
309
/// Installs a write-ahead log on the session.
///
/// The root LPG store is wrapped in a `WalGraphStore`, which becomes both the
/// read store and the write store. The WAL handle and the shared graph
/// context are kept so that named-graph stores can be wrapped later.
#[cfg(all(feature = "wal", feature = "lpg"))]
pub(crate) fn set_wal(
    &mut self,
    wal: Arc<grafeo_storage::wal::LpgWal>,
    wal_graph_context: Arc<parking_lot::Mutex<Option<String>>>,
) {
    let logged = crate::database::wal_store::WalGraphStore::new(
        Arc::clone(&self.store),
        Arc::clone(&wal),
        Arc::clone(&wal_graph_context),
    );
    let logged = Arc::new(logged);

    let reader = Arc::clone(&logged) as Arc<dyn GraphStoreSearch>;
    let writer = logged as Arc<dyn GraphStoreMut>;

    self.graph_store = reader;
    self.graph_store_mut = Some(writer);
    self.wal = Some(wal);
    self.wal_graph_context = Some(wal_graph_context);
}
331
/// Installs a CDC log on the session.
///
/// When the session has a write store, that store is wrapped in a
/// `CdcGraphStore` and the wrapper's pending-event buffer is remembered.
/// The session's CDC log handle is replaced in every case.
#[cfg(feature = "cdc")]
pub(crate) fn set_cdc_log(&mut self, cdc_log: Arc<crate::cdc::CdcLog>) {
    let current_writer = self.graph_store_mut.as_ref().map(Arc::clone);
    if let Some(inner) = current_writer {
        let recording = Arc::new(crate::database::cdc_store::CdcGraphStore::new(
            inner,
            Arc::clone(&cdc_log),
        ));
        self.cdc_pending_events = Some(recording.pending_events());
        self.graph_store_mut = Some(recording as Arc<dyn grafeo_core::graph::GraphStoreMut>);
    }
    self.cdc_log = cdc_log;
}
352
/// Attaches a metrics registry to the session, replacing any previous one.
#[cfg(feature = "metrics")]
pub(crate) fn set_metrics(&mut self, metrics: Arc<crate::metrics::MetricsRegistry>) {
    let _previous = self.metrics.replace(metrics);
}
358
359 pub(crate) fn with_external_store(
368 read_store: Arc<dyn GraphStoreSearch>,
369 write_store: Option<Arc<dyn GraphStoreMut>>,
370 cfg: SessionConfig,
371 ) -> Result<Self> {
372 Ok(Self {
373 #[cfg(feature = "lpg")]
374 store: Arc::new(LpgStore::new()?),
375 graph_store: read_store,
376 graph_store_mut: write_store,
377 catalog: cfg.catalog,
378 #[cfg(feature = "triple-store")]
379 rdf_store: Arc::new(RdfStore::new()),
380 transaction_manager: cfg.transaction_manager,
381 query_cache: cfg.query_cache,
382 current_transaction: parking_lot::Mutex::new(None),
383 read_only_tx: parking_lot::Mutex::new(cfg.read_only),
384 db_read_only: cfg.read_only,
385 identity: cfg.identity,
386 auto_commit: true,
387 adaptive_config: cfg.adaptive_config,
388 factorized_execution: cfg.factorized_execution,
389 graph_model: cfg.graph_model,
390 query_timeout: cfg.query_timeout,
391 max_property_size: cfg.max_property_size,
392 buffer_manager: cfg.buffer_manager,
393 commit_counter: cfg.commit_counter,
394 gc_interval: cfg.gc_interval,
395 transaction_start_node_count: AtomicUsize::new(0),
396 transaction_start_edge_count: AtomicUsize::new(0),
397 #[cfg(feature = "wal")]
398 wal: None,
399 #[cfg(feature = "wal")]
400 wal_graph_context: None,
401 #[cfg(feature = "cdc")]
402 cdc_log: Arc::new(crate::cdc::CdcLog::new()),
403 #[cfg(feature = "cdc")]
404 cdc_pending_events: None,
405 current_graph: parking_lot::Mutex::new(None),
406 current_schema: parking_lot::Mutex::new(None),
407 time_zone: parking_lot::Mutex::new(None),
408 session_params: parking_lot::Mutex::new(std::collections::HashMap::new()),
409 viewing_epoch_override: parking_lot::Mutex::new(None),
410 savepoints: parking_lot::Mutex::new(Vec::new()),
411 transaction_nesting_depth: parking_lot::Mutex::new(0),
412 touched_graphs: parking_lot::Mutex::new(Vec::new()),
413 active_streams: AtomicUsize::new(0),
414 #[cfg(feature = "metrics")]
415 metrics: None,
416 #[cfg(feature = "metrics")]
417 tx_start_time: parking_lot::Mutex::new(None),
418 #[cfg(feature = "lpg")]
419 projections: cfg.projections,
420 })
421 }
422
423 #[must_use]
425 pub fn graph_model(&self) -> GraphModel {
426 self.graph_model
427 }
428
429 #[must_use]
431 pub fn identity(&self) -> &crate::auth::Identity {
432 &self.identity
433 }
434
435 pub fn use_graph(&self, name: &str) {
439 *self.current_graph.lock() = Some(name.to_string());
440 self.track_graph_touch();
441 }
442
443 #[must_use]
445 pub fn current_graph(&self) -> Option<String> {
446 self.current_graph.lock().clone()
447 }
448
449 pub fn set_schema(&self, name: &str) {
453 *self.current_schema.lock() = Some(name.to_string());
454 self.track_graph_touch();
455 }
456
457 #[must_use]
461 pub fn current_schema(&self) -> Option<String> {
462 self.current_schema.lock().clone()
463 }
464
465 fn effective_graph_key(&self, graph_name: &str) -> String {
470 let schema = self.current_schema.lock().clone();
471 match schema {
472 Some(s) => format!("{s}/{graph_name}"),
473 None => graph_name.to_string(),
474 }
475 }
476
477 fn effective_type_key(&self, type_name: &str) -> String {
481 let schema = self.current_schema.lock().clone();
482 match schema {
483 Some(s) => format!("{s}/{type_name}"),
484 None => type_name.to_string(),
485 }
486 }
487
488 fn active_graph_storage_key(&self) -> Option<String> {
492 let graph = self.current_graph.lock().clone();
493 let schema = self.current_schema.lock().clone();
494 match (&schema, &graph) {
495 (None, None) => None,
496 (Some(s), None) => Some(format!("{s}/{SCHEMA_DEFAULT_GRAPH}")),
497 (None, Some(name)) if name.eq_ignore_ascii_case("default") => None,
498 (Some(s), Some(name)) if name.eq_ignore_ascii_case("default") => {
499 Some(format!("{s}/{SCHEMA_DEFAULT_GRAPH}"))
500 }
501 (None, Some(name)) => Some(name.clone()),
502 (Some(s), Some(g)) => Some(format!("{s}/{g}")),
503 }
504 }
505
506 fn active_store(&self) -> Arc<dyn GraphStoreSearch> {
514 let key = self.active_graph_storage_key();
515 match key {
516 None => Arc::clone(&self.graph_store),
517 #[cfg(feature = "lpg")]
518 Some(ref name) => match self.store.graph(name) {
519 Some(named_store) => {
520 #[cfg(feature = "wal")]
521 if let (Some(wal), Some(ctx)) = (&self.wal, &self.wal_graph_context) {
522 return Arc::new(crate::database::wal_store::WalGraphStore::new_for_graph(
523 named_store,
524 Arc::clone(wal),
525 name.clone(),
526 Arc::clone(ctx),
527 )) as Arc<dyn GraphStoreSearch>;
528 }
529 named_store as Arc<dyn GraphStoreSearch>
530 }
531 None => Arc::clone(&self.graph_store),
532 },
533 #[cfg(not(feature = "lpg"))]
534 Some(_) => Arc::clone(&self.graph_store),
535 }
536 }
537
538 fn active_write_store(&self) -> Option<Arc<dyn GraphStoreMut>> {
543 let key = self.active_graph_storage_key();
544 match key {
545 None => self.graph_store_mut.as_ref().map(Arc::clone),
546 #[cfg(feature = "lpg")]
547 Some(ref name) => match self.store.graph(name) {
548 Some(named_store) => {
549 let mut store: Arc<dyn GraphStoreMut> = named_store;
550
551 #[cfg(feature = "wal")]
552 if let (Some(wal), Some(ctx)) = (&self.wal, &self.wal_graph_context) {
553 store = Arc::new(crate::database::wal_store::WalGraphStore::new_for_graph(
554 self.store
556 .graph(name)
557 .unwrap_or_else(|| Arc::clone(&self.store)),
558 Arc::clone(wal),
559 name.clone(),
560 Arc::clone(ctx),
561 ));
562 }
563
564 #[cfg(feature = "cdc")]
565 if let Some(ref pending) = self.cdc_pending_events {
566 store = Arc::new(crate::database::cdc_store::CdcGraphStore::wrap(
567 store,
568 Arc::clone(&self.cdc_log),
569 Arc::clone(pending),
570 ));
571 }
572
573 Some(store)
574 }
575 None => self.graph_store_mut.as_ref().map(Arc::clone),
576 },
577 #[cfg(not(feature = "lpg"))]
578 Some(_) => self.graph_store_mut.as_ref().map(Arc::clone),
579 }
580 }
581
/// Returns the concrete LPG store for the currently targeted graph, falling
/// back to the root store when there is no active key or the key does not
/// resolve to a named graph.
#[cfg(feature = "lpg")]
fn active_lpg_store(&self) -> Arc<LpgStore> {
    self.active_graph_storage_key()
        .and_then(|name| self.store.graph(&name))
        .unwrap_or_else(|| Arc::clone(&self.store))
}
597
/// Resolves an optional graph name to an LPG store.
///
/// `None`, the name `default` (ASCII case-insensitive) and names that do not
/// resolve to a named graph all yield the root store. The name is used as
/// given; the session schema is not applied here.
#[cfg(feature = "lpg")]
fn resolve_store(&self, graph_name: &Option<String>) -> Arc<LpgStore> {
    graph_name
        .as_ref()
        .filter(|name| !name.eq_ignore_ascii_case("default"))
        .and_then(|name| self.store.graph(name))
        .unwrap_or_else(|| Arc::clone(&self.store))
}
611
612 fn track_graph_touch(&self) {
621 if self.current_transaction.lock().is_some() {
622 let key = self.active_graph_storage_key();
623 let mut touched = self.touched_graphs.lock();
624 if !touched.contains(&key) {
625 touched.push(key);
626 }
627 }
628 }
629
630 pub fn set_time_zone(&self, tz: &str) {
632 *self.time_zone.lock() = Some(tz.to_string());
633 }
634
635 #[must_use]
637 pub fn time_zone(&self) -> Option<String> {
638 self.time_zone.lock().clone()
639 }
640
641 pub fn set_parameter(&self, key: &str, value: grafeo_common::types::Value) {
643 self.session_params.lock().insert(key.to_string(), value);
644 }
645
646 #[must_use]
648 pub fn get_parameter(&self, key: &str) -> Option<grafeo_common::types::Value> {
649 self.session_params.lock().get(key).cloned()
650 }
651
652 pub fn reset_session(&self) {
654 *self.current_schema.lock() = None;
655 *self.current_graph.lock() = None;
656 *self.time_zone.lock() = None;
657 self.session_params.lock().clear();
658 *self.viewing_epoch_override.lock() = None;
659 self.track_graph_touch();
660 }
661
662 pub fn reset_schema(&self) {
664 *self.current_schema.lock() = None;
665 self.track_graph_touch();
666 }
667
668 pub fn reset_graph(&self) {
670 *self.current_graph.lock() = None;
671 self.track_graph_touch();
672 }
673
674 pub fn reset_time_zone(&self) {
676 *self.time_zone.lock() = None;
677 }
678
679 pub fn reset_parameters(&self) {
681 self.session_params.lock().clear();
682 }
683
684 pub fn set_viewing_epoch(&self, epoch: EpochId) {
692 *self.viewing_epoch_override.lock() = Some(epoch);
693 }
694
695 pub fn clear_viewing_epoch(&self) {
697 *self.viewing_epoch_override.lock() = None;
698 }
699
700 #[must_use]
702 pub fn viewing_epoch(&self) -> Option<EpochId> {
703 *self.viewing_epoch_override.lock()
704 }
705
/// Returns the history of node `id` as reported by the active LPG store.
///
/// NOTE(review): the two epochs in each tuple are presumably the creation
/// epoch and the optional deletion epoch of that version — confirm against
/// `LpgStore::get_node_history`.
#[cfg(feature = "lpg")]
#[must_use]
pub fn get_node_history(&self, id: NodeId) -> Vec<(EpochId, Option<EpochId>, Node)> {
    let store = self.active_lpg_store();
    store.get_node_history(id)
}
714
/// Returns the history of edge `id` as reported by the active LPG store.
///
/// NOTE(review): the two epochs in each tuple are presumably the creation
/// epoch and the optional deletion epoch of that version — confirm against
/// `LpgStore::get_edge_history`.
#[cfg(feature = "lpg")]
#[must_use]
pub fn get_edge_history(&self, id: EdgeId) -> Vec<(EpochId, Option<EpochId>, Edge)> {
    let store = self.active_lpg_store();
    store.get_edge_history(id)
}
723
724 fn require_lpg(&self, language: &str) -> Result<()> {
726 if self.graph_model == GraphModel::Rdf {
727 return Err(grafeo_common::utils::error::Error::Internal(format!(
728 "This is an RDF database. {language} queries require an LPG database."
729 )));
730 }
731 Ok(())
732 }
733
734 #[inline]
740 fn require_permission(&self, kind: crate::auth::StatementKind) -> Result<()> {
741 if self.identity.can_admin() {
743 return Ok(());
744 }
745 crate::auth::check_permission(&self.identity, kind).map_err(|denied| {
746 grafeo_common::utils::error::Error::Query(grafeo_common::utils::error::QueryError::new(
747 grafeo_common::utils::error::QueryErrorKind::Semantic,
748 denied.to_string(),
749 ))
750 })
751 }
752
753 #[cfg(feature = "gql")]
755 fn execute_session_command(
756 &self,
757 cmd: grafeo_adapters::query::gql::ast::SessionCommand,
758 ) -> Result<QueryResult> {
759 use grafeo_adapters::query::gql::ast::SessionCommand;
760 #[cfg(feature = "lpg")]
761 use grafeo_adapters::query::gql::ast::TransactionIsolationLevel;
762 use grafeo_common::utils::error::{Error, QueryError, QueryErrorKind};
763
764 match &cmd {
766 SessionCommand::CreateGraph { .. }
767 | SessionCommand::DropGraph { .. }
768 | SessionCommand::CreateProjection { .. }
769 | SessionCommand::DropProjection { .. } => {
770 self.require_permission(crate::auth::StatementKind::Write)?;
771 }
772 _ => {} }
774
775 if self.identity.has_grants() {
777 match &cmd {
778 SessionCommand::CreateGraph { name, .. }
779 | SessionCommand::DropGraph { name, .. }
780 if !self
781 .identity
782 .can_access_graph(name, crate::auth::Role::ReadWrite) =>
783 {
784 return Err(Error::Query(QueryError::new(
785 QueryErrorKind::Semantic,
786 format!(
787 "permission denied: no grant for graph '{name}' (user: {})",
788 self.identity.user_id()
789 ),
790 )));
791 }
792 _ => {}
793 }
794 }
795
796 if *self.read_only_tx.lock() {
798 match &cmd {
799 SessionCommand::CreateGraph { .. }
800 | SessionCommand::DropGraph { .. }
801 | SessionCommand::CreateProjection { .. }
802 | SessionCommand::DropProjection { .. } => {
803 return Err(Error::Transaction(
804 grafeo_common::utils::error::TransactionError::ReadOnly,
805 ));
806 }
807 _ => {} }
809 }
810
811 match cmd {
812 #[cfg(feature = "lpg")]
813 SessionCommand::CreateGraph {
814 name,
815 if_not_exists,
816 typed,
817 like_graph,
818 copy_of,
819 open: _,
820 } => {
821 if name.contains('/') {
823 return Err(Error::Query(QueryError::new(
824 QueryErrorKind::Semantic,
825 format!(
826 "Graph name '{name}' must not contain '/' (reserved as schema/graph separator)"
827 ),
828 )));
829 }
830 let storage_key = self.effective_graph_key(&name);
831
832 if let Some(ref src) = like_graph {
834 let src_key = self.effective_graph_key(src);
835 if self.store.graph(&src_key).is_none() {
836 return Err(Error::Query(QueryError::new(
837 QueryErrorKind::Semantic,
838 format!("Source graph '{src}' does not exist"),
839 )));
840 }
841 }
842 if let Some(ref src) = copy_of {
843 let src_key = self.effective_graph_key(src);
844 if self.store.graph(&src_key).is_none() {
845 return Err(Error::Query(QueryError::new(
846 QueryErrorKind::Semantic,
847 format!("Source graph '{src}' does not exist"),
848 )));
849 }
850 }
851
852 let created = self
853 .store
854 .create_graph(&storage_key)
855 .map_err(|e| Error::Internal(e.to_string()))?;
856 if !created && !if_not_exists {
857 return Err(Error::Query(QueryError::new(
858 QueryErrorKind::Semantic,
859 format!("Graph '{name}' already exists"),
860 )));
861 }
862 if created {
863 #[cfg(feature = "wal")]
864 self.log_schema_wal(&grafeo_storage::wal::WalRecord::CreateNamedGraph {
865 name: storage_key.clone(),
866 });
867 }
868
869 if let Some(ref src) = copy_of {
871 let src_key = self.effective_graph_key(src);
872 self.store
873 .copy_graph(Some(&src_key), Some(&storage_key))
874 .map_err(|e| Error::Internal(e.to_string()))?;
875 }
876
877 if let Some(type_name) = typed
881 && let Err(e) = self.catalog.bind_graph_type(
882 &storage_key,
883 if type_name.contains('/') {
884 type_name.clone()
885 } else {
886 self.effective_type_key(&type_name)
887 },
888 )
889 {
890 return Err(Error::Query(QueryError::new(
891 QueryErrorKind::Semantic,
892 e.to_string(),
893 )));
894 }
895
896 if let Some(ref src) = like_graph {
898 let src_key = self.effective_graph_key(src);
899 if let Some(src_type) = self.catalog.get_graph_type_binding(&src_key) {
900 let _ = self.catalog.bind_graph_type(&storage_key, src_type);
901 }
902 }
903
904 Ok(QueryResult::empty())
905 }
906 #[cfg(feature = "lpg")]
907 SessionCommand::DropGraph { name, if_exists } => {
908 let storage_key = self.effective_graph_key(&name);
909 let dropped = self.store.drop_graph(&storage_key);
910 if !dropped && !if_exists {
911 return Err(Error::Query(QueryError::new(
912 QueryErrorKind::Semantic,
913 format!("Graph '{name}' does not exist"),
914 )));
915 }
916 if dropped {
917 #[cfg(feature = "wal")]
918 self.log_schema_wal(&grafeo_storage::wal::WalRecord::DropNamedGraph {
919 name: storage_key.clone(),
920 });
921 let mut current = self.current_graph.lock();
923 if current
924 .as_deref()
925 .is_some_and(|g| g.eq_ignore_ascii_case(&name))
926 {
927 *current = None;
928 }
929 }
930 Ok(QueryResult::empty())
931 }
932 #[cfg(feature = "lpg")]
933 SessionCommand::UseGraph(name) => {
934 if self.identity.has_grants()
936 && !name.eq_ignore_ascii_case("default")
937 && !self
938 .identity
939 .can_access_graph(&name, crate::auth::Role::ReadOnly)
940 {
941 return Err(Error::Query(QueryError::new(
942 QueryErrorKind::Semantic,
943 format!(
944 "permission denied: no grant for graph '{name}' (user: {})",
945 self.identity.user_id()
946 ),
947 )));
948 }
949 let effective_key = self.effective_graph_key(&name);
951 if !name.eq_ignore_ascii_case("default")
952 && self.store.graph(&effective_key).is_none()
953 {
954 return Err(Error::Query(QueryError::new(
955 QueryErrorKind::Semantic,
956 format!("Graph '{name}' does not exist"),
957 )));
958 }
959 self.use_graph(&name);
960 Ok(QueryResult::empty())
961 }
962 #[cfg(feature = "lpg")]
963 SessionCommand::SessionSetGraph(name) => {
964 if self.identity.has_grants()
967 && !name.eq_ignore_ascii_case("default")
968 && !self
969 .identity
970 .can_access_graph(&name, crate::auth::Role::ReadOnly)
971 {
972 return Err(Error::Query(QueryError::new(
973 QueryErrorKind::Semantic,
974 format!(
975 "permission denied: no grant for graph '{name}' (user: {})",
976 self.identity.user_id()
977 ),
978 )));
979 }
980 let effective_key = self.effective_graph_key(&name);
981 if !name.eq_ignore_ascii_case("default")
982 && self.store.graph(&effective_key).is_none()
983 {
984 return Err(Error::Query(QueryError::new(
985 QueryErrorKind::Semantic,
986 format!("Graph '{name}' does not exist"),
987 )));
988 }
989 self.use_graph(&name);
990 Ok(QueryResult::empty())
991 }
992 SessionCommand::SessionSetSchema(name) => {
993 if !self.catalog.schema_exists(&name) {
995 return Err(Error::Query(QueryError::new(
996 QueryErrorKind::Semantic,
997 format!("Schema '{name}' does not exist"),
998 )));
999 }
1000 self.set_schema(&name);
1001 Ok(QueryResult::empty())
1002 }
1003 SessionCommand::SessionSetTimeZone(tz) => {
1004 self.set_time_zone(&tz);
1005 Ok(QueryResult::empty())
1006 }
1007 #[cfg(feature = "gql")]
1008 SessionCommand::SessionSetParameter(key, expr) => {
1009 if key.eq_ignore_ascii_case("viewing_epoch") {
1010 match Self::eval_integer_literal(&expr) {
1011 Some(n) if n >= 0 => {
1012 #[allow(clippy::cast_sign_loss)]
1014 let epoch = n as u64;
1015 self.set_viewing_epoch(EpochId::new(epoch));
1016 Ok(QueryResult::status(format!("Set viewing_epoch to {n}")))
1017 }
1018 _ => Err(Error::Query(QueryError::new(
1019 QueryErrorKind::Semantic,
1020 "viewing_epoch must be a non-negative integer literal",
1021 ))),
1022 }
1023 } else {
1024 self.set_parameter(&key, Value::Null);
1027 Ok(QueryResult::empty())
1028 }
1029 }
1030 SessionCommand::SessionReset(target) => {
1031 use grafeo_adapters::query::gql::ast::SessionResetTarget;
1032 match target {
1033 SessionResetTarget::All => self.reset_session(),
1034 SessionResetTarget::Schema => self.reset_schema(),
1035 SessionResetTarget::Graph => self.reset_graph(),
1036 SessionResetTarget::TimeZone => self.reset_time_zone(),
1037 SessionResetTarget::Parameters => self.reset_parameters(),
1038 }
1039 Ok(QueryResult::empty())
1040 }
1041 SessionCommand::SessionClose => {
1042 self.reset_session();
1043 Ok(QueryResult::empty())
1044 }
1045 #[cfg(feature = "lpg")]
1046 SessionCommand::StartTransaction {
1047 read_only,
1048 isolation_level,
1049 } => {
1050 let engine_level = isolation_level.map(|l| match l {
1051 TransactionIsolationLevel::ReadCommitted => {
1052 crate::transaction::IsolationLevel::ReadCommitted
1053 }
1054 TransactionIsolationLevel::SnapshotIsolation => {
1055 crate::transaction::IsolationLevel::SnapshotIsolation
1056 }
1057 TransactionIsolationLevel::Serializable => {
1058 crate::transaction::IsolationLevel::Serializable
1059 }
1060 });
1061 self.begin_transaction_inner(read_only, engine_level)?;
1062 Ok(QueryResult::status("Transaction started"))
1063 }
1064 #[cfg(feature = "lpg")]
1065 SessionCommand::Commit => {
1066 self.commit_inner()?;
1067 Ok(QueryResult::status("Transaction committed"))
1068 }
1069 #[cfg(feature = "lpg")]
1070 SessionCommand::Rollback => {
1071 self.rollback_inner()?;
1072 Ok(QueryResult::status("Transaction rolled back"))
1073 }
1074 #[cfg(feature = "lpg")]
1075 SessionCommand::Savepoint(name) => {
1076 self.savepoint(&name)?;
1077 Ok(QueryResult::status(format!("Savepoint '{name}' created")))
1078 }
1079 #[cfg(feature = "lpg")]
1080 SessionCommand::RollbackToSavepoint(name) => {
1081 self.rollback_to_savepoint(&name)?;
1082 Ok(QueryResult::status(format!(
1083 "Rolled back to savepoint '{name}'"
1084 )))
1085 }
1086 #[cfg(feature = "lpg")]
1087 SessionCommand::ReleaseSavepoint(name) => {
1088 self.release_savepoint(&name)?;
1089 Ok(QueryResult::status(format!("Savepoint '{name}' released")))
1090 }
1091 #[cfg(feature = "lpg")]
1092 SessionCommand::CreateProjection {
1093 name,
1094 node_labels,
1095 edge_types,
1096 } => {
1097 use grafeo_core::graph::{GraphProjection, ProjectionSpec};
1098 use std::collections::hash_map::Entry;
1099
1100 let spec = ProjectionSpec::new()
1101 .with_node_labels(node_labels)
1102 .with_edge_types(edge_types);
1103
1104 let store = self.active_store();
1105 let projection = Arc::new(GraphProjection::new(store, spec));
1106 let mut projections = self.projections.write();
1107 match projections.entry(name.clone()) {
1108 Entry::Occupied(_) => Err(Error::Query(QueryError::new(
1109 QueryErrorKind::Semantic,
1110 format!("Projection '{name}' already exists"),
1111 ))),
1112 Entry::Vacant(e) => {
1113 e.insert(projection);
1114 Ok(QueryResult::status(format!("Projection '{name}' created")))
1115 }
1116 }
1117 }
1118 #[cfg(feature = "lpg")]
1119 SessionCommand::DropProjection { name } => {
1120 let removed = self.projections.write().remove(&name).is_some();
1121 if !removed {
1122 return Err(Error::Query(QueryError::new(
1123 QueryErrorKind::Semantic,
1124 format!("Projection '{name}' does not exist"),
1125 )));
1126 }
1127 Ok(QueryResult::status(format!("Projection '{name}' dropped")))
1128 }
1129 #[cfg(feature = "lpg")]
1130 SessionCommand::ShowProjections => {
1131 let mut names: Vec<String> = self.projections.read().keys().cloned().collect();
1132 names.sort();
1133 let rows: Vec<Vec<Value>> =
1134 names.into_iter().map(|n| vec![Value::from(n)]).collect();
1135 Ok(QueryResult {
1136 columns: vec!["name".to_string()],
1137 column_types: Vec::new(),
1138 rows,
1139 ..QueryResult::empty()
1140 })
1141 }
1142 #[cfg(not(feature = "lpg"))]
1143 _ => Err(grafeo_common::utils::error::Error::Internal(
1144 "This command requires the `lpg` feature".to_string(),
1145 )),
1146 }
1147 }
1148
/// Appends a schema-change record to the WAL when one is installed.
///
/// Logging is best effort: a failure is reported with a warning and not
/// returned to the caller.
#[cfg(feature = "wal")]
fn log_schema_wal(&self, record: &grafeo_storage::wal::WalRecord) {
    let Some(wal) = self.wal.as_ref() else {
        return;
    };
    if let Err(e) = wal.log(record) {
        grafeo_warn!("Failed to log schema change to WAL: {}", e);
    }
}
1158
1159 #[cfg(all(feature = "lpg", feature = "gql"))]
1161 fn execute_schema_command(
1162 &self,
1163 cmd: grafeo_adapters::query::gql::ast::SchemaStatement,
1164 ) -> Result<QueryResult> {
1165 use crate::catalog::{
1166 EdgeTypeDefinition, NodeTypeDefinition, PropertyDataType, TypedProperty,
1167 };
1168 use grafeo_adapters::query::gql::ast::SchemaStatement;
1169 use grafeo_common::utils::error::{Error, QueryError, QueryErrorKind};
1170 #[cfg(feature = "wal")]
1171 use grafeo_storage::wal::WalRecord;
1172
1173 macro_rules! wal_log {
1175 ($self:expr, $record:expr) => {
1176 #[cfg(feature = "wal")]
1177 $self.log_schema_wal(&$record);
1178 };
1179 }
1180
1181 let result = match cmd {
1182 SchemaStatement::CreateNodeType(stmt) => {
1183 let effective_name = self.effective_type_key(&stmt.name);
1184 #[cfg(feature = "wal")]
1185 let props_for_wal: Vec<(String, String, bool)> = stmt
1186 .properties
1187 .iter()
1188 .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
1189 .collect();
1190 let def = NodeTypeDefinition {
1191 name: effective_name.clone(),
1192 properties: stmt
1193 .properties
1194 .iter()
1195 .map(|p| TypedProperty {
1196 name: p.name.clone(),
1197 data_type: PropertyDataType::from_type_name(&p.data_type),
1198 nullable: p.nullable,
1199 default_value: p
1200 .default_value
1201 .as_ref()
1202 .map(|s| parse_default_literal(s)),
1203 })
1204 .collect(),
1205 constraints: Vec::new(),
1206 parent_types: stmt.parent_types.clone(),
1207 };
1208 let result = if stmt.or_replace {
1209 let _ = self.catalog.drop_node_type(&effective_name);
1210 self.catalog.register_node_type(def)
1211 } else {
1212 self.catalog.register_node_type(def)
1213 };
1214 match result {
1215 Ok(()) => {
1216 wal_log!(
1217 self,
1218 WalRecord::CreateNodeType {
1219 name: effective_name.clone(),
1220 properties: props_for_wal,
1221 constraints: Vec::new(),
1222 }
1223 );
1224 Ok(QueryResult::status(format!(
1225 "Created node type '{}'",
1226 stmt.name
1227 )))
1228 }
1229 Err(e) if stmt.if_not_exists => {
1230 let _ = e;
1231 Ok(QueryResult::status("No change"))
1232 }
1233 Err(e) => Err(Error::Query(QueryError::new(
1234 QueryErrorKind::Semantic,
1235 e.to_string(),
1236 ))),
1237 }
1238 }
1239 SchemaStatement::CreateEdgeType(stmt) => {
1240 let effective_name = self.effective_type_key(&stmt.name);
1241 #[cfg(feature = "wal")]
1242 let props_for_wal: Vec<(String, String, bool)> = stmt
1243 .properties
1244 .iter()
1245 .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
1246 .collect();
1247 let def = EdgeTypeDefinition {
1248 name: effective_name.clone(),
1249 properties: stmt
1250 .properties
1251 .iter()
1252 .map(|p| TypedProperty {
1253 name: p.name.clone(),
1254 data_type: PropertyDataType::from_type_name(&p.data_type),
1255 nullable: p.nullable,
1256 default_value: p
1257 .default_value
1258 .as_ref()
1259 .map(|s| parse_default_literal(s)),
1260 })
1261 .collect(),
1262 constraints: Vec::new(),
1263 source_node_types: stmt.source_node_types.clone(),
1264 target_node_types: stmt.target_node_types.clone(),
1265 };
1266 let result = if stmt.or_replace {
1267 let _ = self.catalog.drop_edge_type_def(&effective_name);
1268 self.catalog.register_edge_type_def(def)
1269 } else {
1270 self.catalog.register_edge_type_def(def)
1271 };
1272 match result {
1273 Ok(()) => {
1274 wal_log!(
1275 self,
1276 WalRecord::CreateEdgeType {
1277 name: effective_name.clone(),
1278 properties: props_for_wal,
1279 constraints: Vec::new(),
1280 }
1281 );
1282 Ok(QueryResult::status(format!(
1283 "Created edge type '{}'",
1284 stmt.name
1285 )))
1286 }
1287 Err(e) if stmt.if_not_exists => {
1288 let _ = e;
1289 Ok(QueryResult::status("No change"))
1290 }
1291 Err(e) => Err(Error::Query(QueryError::new(
1292 QueryErrorKind::Semantic,
1293 e.to_string(),
1294 ))),
1295 }
1296 }
1297 SchemaStatement::CreateVectorIndex(stmt) => {
1298 Self::create_vector_index_on_store(
1299 &self.active_lpg_store(),
1300 &stmt.node_label,
1301 &stmt.property,
1302 stmt.dimensions,
1303 stmt.metric.as_deref(),
1304 )?;
1305 wal_log!(
1306 self,
1307 WalRecord::CreateIndex {
1308 name: stmt.name.clone(),
1309 label: stmt.node_label.clone(),
1310 property: stmt.property.clone(),
1311 index_type: "vector".to_string(),
1312 }
1313 );
1314 Ok(QueryResult::status(format!(
1315 "Created vector index '{}'",
1316 stmt.name
1317 )))
1318 }
1319 SchemaStatement::DropNodeType { name, if_exists } => {
1320 let effective_name = self.effective_type_key(&name);
1321 match self.catalog.drop_node_type(&effective_name) {
1322 Ok(()) => {
1323 wal_log!(
1324 self,
1325 WalRecord::DropNodeType {
1326 name: effective_name
1327 }
1328 );
1329 Ok(QueryResult::status(format!("Dropped node type '{name}'")))
1330 }
1331 Err(e) if if_exists => {
1332 let _ = e;
1333 Ok(QueryResult::status("No change"))
1334 }
1335 Err(e) => Err(Error::Query(QueryError::new(
1336 QueryErrorKind::Semantic,
1337 e.to_string(),
1338 ))),
1339 }
1340 }
1341 SchemaStatement::DropEdgeType { name, if_exists } => {
1342 let effective_name = self.effective_type_key(&name);
1343 match self.catalog.drop_edge_type_def(&effective_name) {
1344 Ok(()) => {
1345 wal_log!(
1346 self,
1347 WalRecord::DropEdgeType {
1348 name: effective_name
1349 }
1350 );
1351 Ok(QueryResult::status(format!("Dropped edge type '{name}'")))
1352 }
1353 Err(e) if if_exists => {
1354 let _ = e;
1355 Ok(QueryResult::status("No change"))
1356 }
1357 Err(e) => Err(Error::Query(QueryError::new(
1358 QueryErrorKind::Semantic,
1359 e.to_string(),
1360 ))),
1361 }
1362 }
1363 SchemaStatement::CreateIndex(stmt) => {
1364 use crate::catalog::IndexType as CatalogIndexType;
1365 use grafeo_adapters::query::gql::ast::IndexKind;
1366 let active = self.active_lpg_store();
1367 let index_type_str = match stmt.index_kind {
1368 IndexKind::Property => "property",
1369 IndexKind::BTree => "btree",
1370 IndexKind::Text => "text",
1371 IndexKind::Vector => "vector",
1372 };
1373 match stmt.index_kind {
1374 IndexKind::Property | IndexKind::BTree => {
1375 for prop in &stmt.properties {
1376 active.create_property_index(prop);
1377 }
1378 }
1379 IndexKind::Text => {
1380 for prop in &stmt.properties {
1381 Self::create_text_index_on_store(&active, &stmt.label, prop)?;
1382 }
1383 }
1384 IndexKind::Vector => {
1385 for prop in &stmt.properties {
1386 Self::create_vector_index_on_store(
1387 &active,
1388 &stmt.label,
1389 prop,
1390 stmt.options.dimensions,
1391 stmt.options.metric.as_deref(),
1392 )?;
1393 }
1394 }
1395 }
1396 let catalog_index_type = match stmt.index_kind {
1399 IndexKind::Property => CatalogIndexType::Hash,
1400 IndexKind::BTree => CatalogIndexType::BTree,
1401 IndexKind::Text => CatalogIndexType::FullText,
1402 IndexKind::Vector => CatalogIndexType::Hash,
1403 };
1404 let label_id = self.catalog.get_or_create_label(&stmt.label);
1405 for prop in &stmt.properties {
1406 let prop_id = self.catalog.get_or_create_property_key(prop);
1407 self.catalog
1408 .create_index(&stmt.name, label_id, prop_id, catalog_index_type);
1409 }
1410 #[cfg(feature = "wal")]
1411 for prop in &stmt.properties {
1412 wal_log!(
1413 self,
1414 WalRecord::CreateIndex {
1415 name: stmt.name.clone(),
1416 label: stmt.label.clone(),
1417 property: prop.clone(),
1418 index_type: index_type_str.to_string(),
1419 }
1420 );
1421 }
1422 Ok(QueryResult::status(format!(
1423 "Created {} index '{}'",
1424 index_type_str, stmt.name
1425 )))
1426 }
1427 SchemaStatement::DropIndex { name, if_exists } => {
1428 if let Some(index_id) = self.catalog.find_index_by_name(&name) {
1431 let def = self.catalog.get_index(index_id);
1432 self.catalog.drop_index(index_id);
1433 if let Some(def) = def
1434 && let Some(prop_name) =
1435 self.catalog.get_property_key_name(def.property_key)
1436 {
1437 self.active_lpg_store().drop_property_index(&prop_name);
1438 }
1439 wal_log!(self, WalRecord::DropIndex { name: name.clone() });
1440 Ok(QueryResult::status(format!("Dropped index '{name}'")))
1441 } else if if_exists {
1442 Ok(QueryResult::status("No change".to_string()))
1443 } else {
1444 Err(Error::Query(QueryError::new(
1445 QueryErrorKind::Semantic,
1446 format!("Index '{name}' does not exist"),
1447 )))
1448 }
1449 }
1450 SchemaStatement::CreateConstraint(stmt) => {
1451 use crate::catalog::TypeConstraint;
1452 use grafeo_adapters::query::gql::ast::ConstraintKind;
1453 let kind_str = match stmt.constraint_kind {
1454 ConstraintKind::Unique => "unique",
1455 ConstraintKind::NodeKey => "node_key",
1456 ConstraintKind::NotNull => "not_null",
1457 ConstraintKind::Exists => "exists",
1458 };
1459 let constraint_name = stmt
1460 .name
1461 .clone()
1462 .unwrap_or_else(|| format!("{}_{kind_str}", stmt.label));
1463
1464 match stmt.constraint_kind {
1466 ConstraintKind::Unique => {
1467 for prop in &stmt.properties {
1468 let label_id = self.catalog.get_or_create_label(&stmt.label);
1469 let prop_id = self.catalog.get_or_create_property_key(prop);
1470 let _ = self.catalog.add_unique_constraint(label_id, prop_id);
1471 }
1472 let _ = self.catalog.add_constraint_to_type(
1473 &stmt.label,
1474 TypeConstraint::Unique(stmt.properties.clone()),
1475 );
1476 }
1477 ConstraintKind::NodeKey => {
1478 for prop in &stmt.properties {
1479 let label_id = self.catalog.get_or_create_label(&stmt.label);
1480 let prop_id = self.catalog.get_or_create_property_key(prop);
1481 let _ = self.catalog.add_unique_constraint(label_id, prop_id);
1482 let _ = self.catalog.add_required_property(label_id, prop_id);
1483 }
1484 let _ = self.catalog.add_constraint_to_type(
1485 &stmt.label,
1486 TypeConstraint::PrimaryKey(stmt.properties.clone()),
1487 );
1488 }
1489 ConstraintKind::NotNull | ConstraintKind::Exists => {
1490 for prop in &stmt.properties {
1491 let label_id = self.catalog.get_or_create_label(&stmt.label);
1492 let prop_id = self.catalog.get_or_create_property_key(prop);
1493 let _ = self.catalog.add_required_property(label_id, prop_id);
1494 let _ = self.catalog.add_constraint_to_type(
1495 &stmt.label,
1496 TypeConstraint::NotNull(prop.clone()),
1497 );
1498 }
1499 }
1500 }
1501
1502 wal_log!(
1503 self,
1504 WalRecord::CreateConstraint {
1505 name: constraint_name.clone(),
1506 label: stmt.label.clone(),
1507 properties: stmt.properties.clone(),
1508 kind: kind_str.to_string(),
1509 }
1510 );
1511 Ok(QueryResult::status(format!(
1512 "Created {kind_str} constraint '{constraint_name}'"
1513 )))
1514 }
1515 SchemaStatement::DropConstraint { name, if_exists } => {
1516 let _ = if_exists;
1517 wal_log!(self, WalRecord::DropConstraint { name: name.clone() });
1518 Ok(QueryResult::status(format!("Dropped constraint '{name}'")))
1519 }
1520 SchemaStatement::CreateGraphType(stmt) => {
1521 use crate::catalog::GraphTypeDefinition;
1522 use grafeo_adapters::query::gql::ast::InlineElementType;
1523
1524 let effective_name = self.effective_type_key(&stmt.name);
1525
1526 let (mut node_types, mut edge_types, open) =
1528 if let Some(ref like_graph) = stmt.like_graph {
1529 if let Some(type_name) = self.catalog.get_graph_type_binding(like_graph) {
1531 if let Some(existing) = self
1532 .catalog
1533 .schema()
1534 .and_then(|s| s.get_graph_type(&type_name))
1535 {
1536 (
1537 existing.allowed_node_types.clone(),
1538 existing.allowed_edge_types.clone(),
1539 existing.open,
1540 )
1541 } else {
1542 (Vec::new(), Vec::new(), true)
1543 }
1544 } else {
1545 let nt = self.catalog.all_node_type_names();
1547 let et = self.catalog.all_edge_type_names();
1548 if nt.is_empty() && et.is_empty() {
1549 (Vec::new(), Vec::new(), true)
1550 } else {
1551 (nt, et, false)
1552 }
1553 }
1554 } else {
1555 let nt = stmt
1557 .node_types
1558 .iter()
1559 .map(|n| self.effective_type_key(n))
1560 .collect();
1561 let et = stmt
1562 .edge_types
1563 .iter()
1564 .map(|n| self.effective_type_key(n))
1565 .collect();
1566 (nt, et, stmt.open)
1567 };
1568
1569 for inline in &stmt.inline_types {
1571 match inline {
1572 InlineElementType::Node {
1573 name,
1574 properties,
1575 key_labels,
1576 ..
1577 } => {
1578 let inline_effective = self.effective_type_key(name);
1579 let def = NodeTypeDefinition {
1580 name: inline_effective.clone(),
1581 properties: properties
1582 .iter()
1583 .map(|p| TypedProperty {
1584 name: p.name.clone(),
1585 data_type: PropertyDataType::from_type_name(&p.data_type),
1586 nullable: p.nullable,
1587 default_value: None,
1588 })
1589 .collect(),
1590 constraints: Vec::new(),
1591 parent_types: key_labels.clone(),
1592 };
1593 self.catalog.register_or_replace_node_type(def);
1595 #[cfg(feature = "wal")]
1596 {
1597 let props_for_wal: Vec<(String, String, bool)> = properties
1598 .iter()
1599 .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
1600 .collect();
1601 self.log_schema_wal(&WalRecord::CreateNodeType {
1602 name: inline_effective.clone(),
1603 properties: props_for_wal,
1604 constraints: Vec::new(),
1605 });
1606 }
1607 if !node_types.contains(&inline_effective) {
1608 node_types.push(inline_effective);
1609 }
1610 }
1611 InlineElementType::Edge {
1612 name,
1613 properties,
1614 source_node_types,
1615 target_node_types,
1616 ..
1617 } => {
1618 let inline_effective = self.effective_type_key(name);
1619 let def = EdgeTypeDefinition {
1620 name: inline_effective.clone(),
1621 properties: properties
1622 .iter()
1623 .map(|p| TypedProperty {
1624 name: p.name.clone(),
1625 data_type: PropertyDataType::from_type_name(&p.data_type),
1626 nullable: p.nullable,
1627 default_value: None,
1628 })
1629 .collect(),
1630 constraints: Vec::new(),
1631 source_node_types: source_node_types.clone(),
1632 target_node_types: target_node_types.clone(),
1633 };
1634 self.catalog.register_or_replace_edge_type_def(def);
1635 #[cfg(feature = "wal")]
1636 {
1637 let props_for_wal: Vec<(String, String, bool)> = properties
1638 .iter()
1639 .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
1640 .collect();
1641 self.log_schema_wal(&WalRecord::CreateEdgeType {
1642 name: inline_effective.clone(),
1643 properties: props_for_wal,
1644 constraints: Vec::new(),
1645 });
1646 }
1647 if !edge_types.contains(&inline_effective) {
1648 edge_types.push(inline_effective);
1649 }
1650 }
1651 }
1652 }
1653
1654 let def = GraphTypeDefinition {
1655 name: effective_name.clone(),
1656 allowed_node_types: node_types.clone(),
1657 allowed_edge_types: edge_types.clone(),
1658 open,
1659 };
1660 let result = if stmt.or_replace {
1661 let _ = self.catalog.drop_graph_type(&effective_name);
1663 self.catalog.register_graph_type(def)
1664 } else {
1665 self.catalog.register_graph_type(def)
1666 };
1667 match result {
1668 Ok(()) => {
1669 wal_log!(
1670 self,
1671 WalRecord::CreateGraphType {
1672 name: effective_name.clone(),
1673 node_types,
1674 edge_types,
1675 open,
1676 }
1677 );
1678 Ok(QueryResult::status(format!(
1679 "Created graph type '{}'",
1680 stmt.name
1681 )))
1682 }
1683 Err(e) if stmt.if_not_exists => {
1684 let _ = e;
1685 Ok(QueryResult::status("No change"))
1686 }
1687 Err(e) => Err(Error::Query(QueryError::new(
1688 QueryErrorKind::Semantic,
1689 e.to_string(),
1690 ))),
1691 }
1692 }
1693 SchemaStatement::DropGraphType { name, if_exists } => {
1694 let effective_name = self.effective_type_key(&name);
1695 match self.catalog.drop_graph_type(&effective_name) {
1696 Ok(()) => {
1697 wal_log!(
1698 self,
1699 WalRecord::DropGraphType {
1700 name: effective_name
1701 }
1702 );
1703 Ok(QueryResult::status(format!("Dropped graph type '{name}'")))
1704 }
1705 Err(e) if if_exists => {
1706 let _ = e;
1707 Ok(QueryResult::status("No change"))
1708 }
1709 Err(e) => Err(Error::Query(QueryError::new(
1710 QueryErrorKind::Semantic,
1711 e.to_string(),
1712 ))),
1713 }
1714 }
1715 SchemaStatement::CreateSchema {
1716 name,
1717 if_not_exists,
1718 } => {
1719 if name.contains('/') {
1720 return Err(Error::Query(QueryError::new(
1721 QueryErrorKind::Semantic,
1722 format!(
1723 "Schema name '{name}' must not contain '/' (reserved as schema/graph separator)"
1724 ),
1725 )));
1726 }
1727 match self.catalog.register_schema_namespace(name.clone()) {
1728 Ok(()) => {
1729 wal_log!(self, WalRecord::CreateSchema { name: name.clone() });
1730 let default_key = format!("{name}/{SCHEMA_DEFAULT_GRAPH}");
1733 if self.store.create_graph(&default_key).unwrap_or(false) {
1734 wal_log!(self, WalRecord::CreateNamedGraph { name: default_key });
1735 }
1736 Ok(QueryResult::status(format!("Created schema '{name}'")))
1737 }
1738 Err(e) if if_not_exists => {
1739 let _ = e;
1740 Ok(QueryResult::status("No change"))
1741 }
1742 Err(e) => Err(Error::Query(QueryError::new(
1743 QueryErrorKind::Semantic,
1744 e.to_string(),
1745 ))),
1746 }
1747 }
1748 SchemaStatement::DropSchema { name, if_exists } => {
1749 let prefix = format!("{name}/");
1752 let default_graph_key = format!("{name}/{SCHEMA_DEFAULT_GRAPH}");
1753 let has_graphs = self
1754 .store
1755 .graph_names()
1756 .iter()
1757 .any(|g| g.starts_with(&prefix) && *g != default_graph_key);
1758 let has_types = self
1759 .catalog
1760 .all_node_type_names()
1761 .iter()
1762 .any(|n| n.starts_with(&prefix))
1763 || self
1764 .catalog
1765 .all_edge_type_names()
1766 .iter()
1767 .any(|n| n.starts_with(&prefix))
1768 || self
1769 .catalog
1770 .all_graph_type_names()
1771 .iter()
1772 .any(|n| n.starts_with(&prefix));
1773 if has_graphs || has_types {
1774 return Err(Error::Query(QueryError::new(
1775 QueryErrorKind::Semantic,
1776 format!("Schema '{name}' is not empty: drop all graphs and types first"),
1777 )));
1778 }
1779 match self.catalog.drop_schema_namespace(&name) {
1780 Ok(()) => {
1781 wal_log!(self, WalRecord::DropSchema { name: name.clone() });
1782 if self.store.drop_graph(&default_graph_key) {
1784 wal_log!(
1785 self,
1786 WalRecord::DropNamedGraph {
1787 name: default_graph_key,
1788 }
1789 );
1790 }
1791 let mut current = self.current_schema.lock();
1793 if current
1794 .as_deref()
1795 .is_some_and(|s| s.eq_ignore_ascii_case(&name))
1796 {
1797 *current = None;
1798 }
1799 Ok(QueryResult::status(format!("Dropped schema '{name}'")))
1800 }
1801 Err(e) if if_exists => {
1802 let _ = e;
1803 Ok(QueryResult::status("No change"))
1804 }
1805 Err(e) => Err(Error::Query(QueryError::new(
1806 QueryErrorKind::Semantic,
1807 e.to_string(),
1808 ))),
1809 }
1810 }
1811 SchemaStatement::AlterNodeType(stmt) => {
1812 use grafeo_adapters::query::gql::ast::TypeAlteration;
1813 let effective_name = self.effective_type_key(&stmt.name);
1814 let mut wal_alts = Vec::new();
1815 for alt in &stmt.alterations {
1816 match alt {
1817 TypeAlteration::AddProperty(prop) => {
1818 let typed = TypedProperty {
1819 name: prop.name.clone(),
1820 data_type: PropertyDataType::from_type_name(&prop.data_type),
1821 nullable: prop.nullable,
1822 default_value: prop
1823 .default_value
1824 .as_ref()
1825 .map(|s| parse_default_literal(s)),
1826 };
1827 self.catalog
1828 .alter_node_type_add_property(&effective_name, typed)
1829 .map_err(|e| {
1830 Error::Query(QueryError::new(
1831 QueryErrorKind::Semantic,
1832 e.to_string(),
1833 ))
1834 })?;
1835 wal_alts.push((
1836 "add".to_string(),
1837 prop.name.clone(),
1838 prop.data_type.clone(),
1839 prop.nullable,
1840 ));
1841 }
1842 TypeAlteration::DropProperty(name) => {
1843 self.catalog
1844 .alter_node_type_drop_property(&effective_name, name)
1845 .map_err(|e| {
1846 Error::Query(QueryError::new(
1847 QueryErrorKind::Semantic,
1848 e.to_string(),
1849 ))
1850 })?;
1851 wal_alts.push(("drop".to_string(), name.clone(), String::new(), false));
1852 }
1853 }
1854 }
1855 wal_log!(
1856 self,
1857 WalRecord::AlterNodeType {
1858 name: effective_name,
1859 alterations: wal_alts,
1860 }
1861 );
1862 Ok(QueryResult::status(format!(
1863 "Altered node type '{}'",
1864 stmt.name
1865 )))
1866 }
1867 SchemaStatement::AlterEdgeType(stmt) => {
1868 use grafeo_adapters::query::gql::ast::TypeAlteration;
1869 let effective_name = self.effective_type_key(&stmt.name);
1870 let mut wal_alts = Vec::new();
1871 for alt in &stmt.alterations {
1872 match alt {
1873 TypeAlteration::AddProperty(prop) => {
1874 let typed = TypedProperty {
1875 name: prop.name.clone(),
1876 data_type: PropertyDataType::from_type_name(&prop.data_type),
1877 nullable: prop.nullable,
1878 default_value: prop
1879 .default_value
1880 .as_ref()
1881 .map(|s| parse_default_literal(s)),
1882 };
1883 self.catalog
1884 .alter_edge_type_add_property(&effective_name, typed)
1885 .map_err(|e| {
1886 Error::Query(QueryError::new(
1887 QueryErrorKind::Semantic,
1888 e.to_string(),
1889 ))
1890 })?;
1891 wal_alts.push((
1892 "add".to_string(),
1893 prop.name.clone(),
1894 prop.data_type.clone(),
1895 prop.nullable,
1896 ));
1897 }
1898 TypeAlteration::DropProperty(name) => {
1899 self.catalog
1900 .alter_edge_type_drop_property(&effective_name, name)
1901 .map_err(|e| {
1902 Error::Query(QueryError::new(
1903 QueryErrorKind::Semantic,
1904 e.to_string(),
1905 ))
1906 })?;
1907 wal_alts.push(("drop".to_string(), name.clone(), String::new(), false));
1908 }
1909 }
1910 }
1911 wal_log!(
1912 self,
1913 WalRecord::AlterEdgeType {
1914 name: effective_name,
1915 alterations: wal_alts,
1916 }
1917 );
1918 Ok(QueryResult::status(format!(
1919 "Altered edge type '{}'",
1920 stmt.name
1921 )))
1922 }
1923 SchemaStatement::AlterGraphType(stmt) => {
1924 use grafeo_adapters::query::gql::ast::GraphTypeAlteration;
1925 let effective_name = self.effective_type_key(&stmt.name);
1926 let mut wal_alts = Vec::new();
1927 for alt in &stmt.alterations {
1928 match alt {
1929 GraphTypeAlteration::AddNodeType(name) => {
1930 self.catalog
1931 .alter_graph_type_add_node_type(&effective_name, name.clone())
1932 .map_err(|e| {
1933 Error::Query(QueryError::new(
1934 QueryErrorKind::Semantic,
1935 e.to_string(),
1936 ))
1937 })?;
1938 wal_alts.push(("add_node_type".to_string(), name.clone()));
1939 }
1940 GraphTypeAlteration::DropNodeType(name) => {
1941 self.catalog
1942 .alter_graph_type_drop_node_type(&effective_name, name)
1943 .map_err(|e| {
1944 Error::Query(QueryError::new(
1945 QueryErrorKind::Semantic,
1946 e.to_string(),
1947 ))
1948 })?;
1949 wal_alts.push(("drop_node_type".to_string(), name.clone()));
1950 }
1951 GraphTypeAlteration::AddEdgeType(name) => {
1952 self.catalog
1953 .alter_graph_type_add_edge_type(&effective_name, name.clone())
1954 .map_err(|e| {
1955 Error::Query(QueryError::new(
1956 QueryErrorKind::Semantic,
1957 e.to_string(),
1958 ))
1959 })?;
1960 wal_alts.push(("add_edge_type".to_string(), name.clone()));
1961 }
1962 GraphTypeAlteration::DropEdgeType(name) => {
1963 self.catalog
1964 .alter_graph_type_drop_edge_type(&effective_name, name)
1965 .map_err(|e| {
1966 Error::Query(QueryError::new(
1967 QueryErrorKind::Semantic,
1968 e.to_string(),
1969 ))
1970 })?;
1971 wal_alts.push(("drop_edge_type".to_string(), name.clone()));
1972 }
1973 }
1974 }
1975 wal_log!(
1976 self,
1977 WalRecord::AlterGraphType {
1978 name: effective_name,
1979 alterations: wal_alts,
1980 }
1981 );
1982 Ok(QueryResult::status(format!(
1983 "Altered graph type '{}'",
1984 stmt.name
1985 )))
1986 }
1987 SchemaStatement::CreateProcedure(stmt) => {
1988 use crate::catalog::ProcedureDefinition;
1989
1990 let def = ProcedureDefinition {
1991 name: stmt.name.clone(),
1992 params: stmt
1993 .params
1994 .iter()
1995 .map(|p| (p.name.clone(), p.param_type.clone()))
1996 .collect(),
1997 returns: stmt
1998 .returns
1999 .iter()
2000 .map(|r| (r.name.clone(), r.return_type.clone()))
2001 .collect(),
2002 body: stmt.body.clone(),
2003 };
2004
2005 if stmt.or_replace {
2006 self.catalog.replace_procedure(def).map_err(|e| {
2007 Error::Query(QueryError::new(QueryErrorKind::Semantic, e.to_string()))
2008 })?;
2009 } else {
2010 match self.catalog.register_procedure(def) {
2011 Ok(()) => {}
2012 Err(_) if stmt.if_not_exists => {
2013 return Ok(QueryResult::empty());
2014 }
2015 Err(e) => {
2016 return Err(Error::Query(QueryError::new(
2017 QueryErrorKind::Semantic,
2018 e.to_string(),
2019 )));
2020 }
2021 }
2022 }
2023
2024 wal_log!(
2025 self,
2026 WalRecord::CreateProcedure {
2027 name: stmt.name.clone(),
2028 params: stmt
2029 .params
2030 .iter()
2031 .map(|p| (p.name.clone(), p.param_type.clone()))
2032 .collect(),
2033 returns: stmt
2034 .returns
2035 .iter()
2036 .map(|r| (r.name.clone(), r.return_type.clone()))
2037 .collect(),
2038 body: stmt.body,
2039 }
2040 );
2041 Ok(QueryResult::status(format!(
2042 "Created procedure '{}'",
2043 stmt.name
2044 )))
2045 }
2046 SchemaStatement::DropProcedure { name, if_exists } => {
2047 match self.catalog.drop_procedure(&name) {
2048 Ok(()) => {}
2049 Err(_) if if_exists => {
2050 return Ok(QueryResult::empty());
2051 }
2052 Err(e) => {
2053 return Err(Error::Query(QueryError::new(
2054 QueryErrorKind::Semantic,
2055 e.to_string(),
2056 )));
2057 }
2058 }
2059 wal_log!(self, WalRecord::DropProcedure { name: name.clone() });
2060 Ok(QueryResult::status(format!("Dropped procedure '{name}'")))
2061 }
2062 SchemaStatement::ShowIndexes => {
2063 return self.execute_show_indexes();
2064 }
2065 SchemaStatement::ShowConstraints => {
2066 return self.execute_show_constraints();
2067 }
2068 SchemaStatement::ShowNodeTypes => {
2069 return self.execute_show_node_types();
2070 }
2071 SchemaStatement::ShowEdgeTypes => {
2072 return self.execute_show_edge_types();
2073 }
2074 SchemaStatement::ShowGraphTypes => {
2075 return self.execute_show_graph_types();
2076 }
2077 SchemaStatement::ShowGraphType(name) => {
2078 return self.execute_show_graph_type(&name);
2079 }
2080 SchemaStatement::ShowCurrentGraphType => {
2081 return self.execute_show_current_graph_type();
2082 }
2083 SchemaStatement::ShowGraphs => {
2084 return self.execute_show_graphs();
2085 }
2086 SchemaStatement::ShowSchemas => {
2087 return self.execute_show_schemas();
2088 }
2089 };
2090
2091 if result.is_ok() {
2094 self.query_cache.clear();
2095 }
2096
2097 result
2098 }
2099
#[cfg(all(feature = "lpg", feature = "gql", feature = "vector-index"))]
/// Builds an HNSW index over the vector values stored under `property` on
/// every node carrying `label`, then registers that index on `store`.
///
/// `metric` falls back to cosine distance when absent. When `dimensions` is
/// `None`, the dimensionality is taken from the first vector encountered.
/// Nodes whose `property` is missing or is not a vector are ignored.
///
/// # Errors
///
/// Returns `Error::Internal` when the metric name is not recognised, when a
/// vector's length differs from the expected dimensionality, or when the
/// dimensionality cannot be determined (no vectors found and none given).
fn create_vector_index_on_store(
    store: &LpgStore,
    label: &str,
    property: &str,
    dimensions: Option<usize>,
    metric: Option<&str>,
) -> Result<()> {
    use grafeo_common::types::{PropertyKey, Value};
    use grafeo_common::utils::error::Error;
    use grafeo_core::index::vector::{DistanceMetric, HnswConfig, HnswIndex, VectorIndexKind};

    // Resolve the requested metric up front so a bad name fails before any scan.
    let distance = metric
        .map(|requested| {
            DistanceMetric::from_str(requested).ok_or_else(|| {
                Error::Internal(format!(
                    "Unknown distance metric '{requested}'. Use: cosine, euclidean, dot_product, manhattan"
                ))
            })
        })
        .transpose()?
        .unwrap_or(DistanceMetric::Cosine);

    let vector_key = PropertyKey::new(property);
    let mut expected_dims = dimensions;
    let mut collected: Vec<(grafeo_common::types::NodeId, Vec<f32>)> = Vec::new();

    for node in store.nodes_with_label(label) {
        let Some(Value::Vector(values)) = node.properties.get(&vector_key) else {
            continue;
        };
        match expected_dims {
            Some(expected) if values.len() != expected => {
                return Err(Error::Internal(format!(
                    "Vector dimension mismatch: expected {expected}, found {} on node {}",
                    values.len(),
                    node.id.0
                )));
            }
            Some(_) => {}
            // First vector seen fixes the dimensionality for the rest of the scan.
            None => expected_dims = Some(values.len()),
        }
        collected.push((node.id, values.to_vec()));
    }

    let dims = expected_dims.ok_or_else(|| {
        Error::Internal(format!(
            "No vector properties found on :{label}({property}) and no dimensions specified"
        ))
    })?;

    let index = HnswIndex::with_capacity(HnswConfig::new(dims, distance), collected.len());
    let accessor = grafeo_core::index::vector::PropertyVectorAccessor::new(store, property);
    for (node_id, vector) in &collected {
        index.insert(*node_id, vector, &accessor);
    }

    store.add_vector_index(label, property, Arc::new(VectorIndexKind::Hnsw(index)));
    Ok(())
}
2159
#[cfg(all(feature = "lpg", feature = "gql", not(feature = "vector-index")))]
/// Fallback compiled when the `vector-index` feature is disabled: ignores
/// every argument and always fails.
///
/// # Errors
///
/// Always returns `Error::Internal` naming the missing feature.
fn create_vector_index_on_store(
    _store: &LpgStore,
    _label: &str,
    _property: &str,
    _dimensions: Option<usize>,
    _metric: Option<&str>,
) -> Result<()> {
    use grafeo_common::utils::error::Error;

    let message = String::from("Vector index support requires the 'vector-index' feature");
    Err(Error::Internal(message))
}
2173
#[cfg(all(feature = "lpg", feature = "gql", feature = "text-index"))]
/// Builds an inverted text index (default BM25 configuration) over the
/// string values stored under `property` on nodes carrying `label`, then
/// registers it on `store`.
///
/// Nodes whose `property` is missing or is not a string are ignored.
fn create_text_index_on_store(store: &LpgStore, label: &str, property: &str) -> Result<()> {
    use grafeo_common::types::{PropertyKey, Value};
    use grafeo_core::index::text::{BM25Config, InvertedIndex};

    let text_key = PropertyKey::new(property);
    let mut inverted = InvertedIndex::new(BM25Config::default());

    for node_id in store.nodes_by_label(label) {
        let Some(Value::String(text)) = store.get_node_property(node_id, &text_key) else {
            continue;
        };
        inverted.insert(node_id, text.as_str());
    }

    let shared_index = Arc::new(parking_lot::RwLock::new(inverted));
    store.add_text_index(label, property, shared_index);
    Ok(())
}
2193
#[cfg(all(feature = "lpg", feature = "gql", not(feature = "text-index")))]
/// Fallback compiled when the `text-index` feature is disabled: ignores
/// every argument and always fails.
///
/// # Errors
///
/// Always returns `Error::Internal` naming the missing feature.
fn create_text_index_on_store(_store: &LpgStore, _label: &str, _property: &str) -> Result<()> {
    use grafeo_common::utils::error::Error;

    let message = String::from("Text index support requires the 'text-index' feature");
    Err(Error::Internal(message))
}
2201
2202 fn execute_show_indexes(&self) -> Result<QueryResult> {
2204 let indexes = self.catalog.all_indexes();
2205 let columns = vec![
2206 "name".to_string(),
2207 "type".to_string(),
2208 "label".to_string(),
2209 "property".to_string(),
2210 ];
2211 let rows: Vec<Vec<Value>> = indexes
2212 .into_iter()
2213 .map(|def| {
2214 let label_name = self
2215 .catalog
2216 .get_label_name(def.label)
2217 .unwrap_or_else(|| "?".into());
2218 let prop_name = self
2219 .catalog
2220 .get_property_key_name(def.property_key)
2221 .unwrap_or_else(|| "?".into());
2222 vec![
2223 Value::from(def.name),
2224 Value::from(format!("{:?}", def.index_type)),
2225 Value::from(&*label_name),
2226 Value::from(&*prop_name),
2227 ]
2228 })
2229 .collect();
2230 Ok(QueryResult {
2231 columns,
2232 column_types: Vec::new(),
2233 rows,
2234 ..QueryResult::empty()
2235 })
2236 }
2237
2238 fn execute_show_constraints(&self) -> Result<QueryResult> {
2240 Ok(QueryResult {
2243 columns: vec![
2244 "name".to_string(),
2245 "type".to_string(),
2246 "label".to_string(),
2247 "properties".to_string(),
2248 ],
2249 column_types: Vec::new(),
2250 rows: Vec::new(),
2251 ..QueryResult::empty()
2252 })
2253 }
2254
2255 fn execute_show_node_types(&self) -> Result<QueryResult> {
2257 let columns = vec![
2258 "name".to_string(),
2259 "properties".to_string(),
2260 "constraints".to_string(),
2261 "parents".to_string(),
2262 ];
2263 let schema = self.current_schema.lock().clone();
2264 let all_names = self.catalog.all_node_type_names();
2265 let type_names: Vec<String> = match &schema {
2266 Some(s) => {
2267 let prefix = format!("{s}/");
2268 all_names
2269 .into_iter()
2270 .filter_map(|n| n.strip_prefix(&prefix).map(String::from))
2271 .collect()
2272 }
2273 None => all_names.into_iter().filter(|n| !n.contains('/')).collect(),
2274 };
2275 let rows: Vec<Vec<Value>> = type_names
2276 .into_iter()
2277 .filter_map(|name| {
2278 let lookup = match &schema {
2279 Some(s) => format!("{s}/{name}"),
2280 None => name.clone(),
2281 };
2282 let def = self.catalog.get_node_type(&lookup)?;
2283 let props: Vec<String> = def
2284 .properties
2285 .iter()
2286 .map(|p| {
2287 let nullable = if p.nullable { "" } else { " NOT NULL" };
2288 format!("{} {}{}", p.name, p.data_type, nullable)
2289 })
2290 .collect();
2291 let constraints: Vec<String> =
2292 def.constraints.iter().map(|c| format!("{c:?}")).collect();
2293 let parents = def.parent_types.join(", ");
2294 Some(vec![
2295 Value::from(name),
2296 Value::from(props.join(", ")),
2297 Value::from(constraints.join(", ")),
2298 Value::from(parents),
2299 ])
2300 })
2301 .collect();
2302 Ok(QueryResult {
2303 columns,
2304 column_types: Vec::new(),
2305 rows,
2306 ..QueryResult::empty()
2307 })
2308 }
2309
2310 fn execute_show_edge_types(&self) -> Result<QueryResult> {
2312 let columns = vec![
2313 "name".to_string(),
2314 "properties".to_string(),
2315 "source_types".to_string(),
2316 "target_types".to_string(),
2317 ];
2318 let schema = self.current_schema.lock().clone();
2319 let all_names = self.catalog.all_edge_type_names();
2320 let type_names: Vec<String> = match &schema {
2321 Some(s) => {
2322 let prefix = format!("{s}/");
2323 all_names
2324 .into_iter()
2325 .filter_map(|n| n.strip_prefix(&prefix).map(String::from))
2326 .collect()
2327 }
2328 None => all_names.into_iter().filter(|n| !n.contains('/')).collect(),
2329 };
2330 let rows: Vec<Vec<Value>> = type_names
2331 .into_iter()
2332 .filter_map(|name| {
2333 let lookup = match &schema {
2334 Some(s) => format!("{s}/{name}"),
2335 None => name.clone(),
2336 };
2337 let def = self.catalog.get_edge_type_def(&lookup)?;
2338 let props: Vec<String> = def
2339 .properties
2340 .iter()
2341 .map(|p| {
2342 let nullable = if p.nullable { "" } else { " NOT NULL" };
2343 format!("{} {}{}", p.name, p.data_type, nullable)
2344 })
2345 .collect();
2346 let src = def.source_node_types.join(", ");
2347 let tgt = def.target_node_types.join(", ");
2348 Some(vec![
2349 Value::from(name),
2350 Value::from(props.join(", ")),
2351 Value::from(src),
2352 Value::from(tgt),
2353 ])
2354 })
2355 .collect();
2356 Ok(QueryResult {
2357 columns,
2358 column_types: Vec::new(),
2359 rows,
2360 ..QueryResult::empty()
2361 })
2362 }
2363
2364 fn execute_show_graph_types(&self) -> Result<QueryResult> {
2366 let columns = vec![
2367 "name".to_string(),
2368 "open".to_string(),
2369 "node_types".to_string(),
2370 "edge_types".to_string(),
2371 ];
2372 let schema = self.current_schema.lock().clone();
2373 let all_names = self.catalog.all_graph_type_names();
2374 let type_names: Vec<String> = match &schema {
2375 Some(s) => {
2376 let prefix = format!("{s}/");
2377 all_names
2378 .into_iter()
2379 .filter_map(|n| n.strip_prefix(&prefix).map(String::from))
2380 .collect()
2381 }
2382 None => all_names.into_iter().filter(|n| !n.contains('/')).collect(),
2383 };
2384 let rows: Vec<Vec<Value>> = type_names
2385 .into_iter()
2386 .filter_map(|name| {
2387 let lookup = match &schema {
2388 Some(s) => format!("{s}/{name}"),
2389 None => name.clone(),
2390 };
2391 let def = self.catalog.get_graph_type_def(&lookup)?;
2392 let strip = |n: &String| -> String {
2394 match &schema {
2395 Some(s) => n.strip_prefix(&format!("{s}/")).unwrap_or(n).to_string(),
2396 None => n.clone(),
2397 }
2398 };
2399 let node_types: Vec<String> = def.allowed_node_types.iter().map(strip).collect();
2400 let edge_types: Vec<String> = def.allowed_edge_types.iter().map(strip).collect();
2401 Some(vec![
2402 Value::from(name),
2403 Value::from(def.open),
2404 Value::from(node_types.join(", ")),
2405 Value::from(edge_types.join(", ")),
2406 ])
2407 })
2408 .collect();
2409 Ok(QueryResult {
2410 columns,
2411 column_types: Vec::new(),
2412 rows,
2413 ..QueryResult::empty()
2414 })
2415 }
2416
2417 #[cfg(feature = "lpg")]
2423 fn execute_show_graphs(&self) -> Result<QueryResult> {
2424 let schema = self.current_schema.lock().clone();
2425 let all_names = self.store.graph_names();
2426
2427 let mut names: Vec<String> = match &schema {
2428 Some(s) => {
2429 let prefix = format!("{s}/");
2430 all_names
2431 .into_iter()
2432 .filter_map(|n| n.strip_prefix(&prefix).map(String::from))
2433 .filter(|n| n != SCHEMA_DEFAULT_GRAPH)
2434 .collect()
2435 }
2436 None => all_names.into_iter().filter(|n| !n.contains('/')).collect(),
2437 };
2438 names.sort();
2439
2440 let rows: Vec<Vec<Value>> = names.into_iter().map(|n| vec![Value::from(n)]).collect();
2441 Ok(QueryResult {
2442 columns: vec!["name".to_string()],
2443 column_types: Vec::new(),
2444 rows,
2445 ..QueryResult::empty()
2446 })
2447 }
2448
2449 fn execute_show_schemas(&self) -> Result<QueryResult> {
2451 let mut names = self.catalog.schema_names();
2452 names.sort();
2453 let rows: Vec<Vec<Value>> = names.into_iter().map(|n| vec![Value::from(n)]).collect();
2454 Ok(QueryResult {
2455 columns: vec!["name".to_string()],
2456 column_types: Vec::new(),
2457 rows,
2458 ..QueryResult::empty()
2459 })
2460 }
2461
2462 fn execute_show_graph_type(&self, name: &str) -> Result<QueryResult> {
2464 use grafeo_common::utils::error::{Error, QueryError, QueryErrorKind};
2465
2466 let def = self.catalog.get_graph_type_def(name).ok_or_else(|| {
2467 Error::Query(QueryError::new(
2468 QueryErrorKind::Semantic,
2469 format!("Graph type '{name}' not found"),
2470 ))
2471 })?;
2472
2473 let columns = vec![
2474 "name".to_string(),
2475 "open".to_string(),
2476 "node_types".to_string(),
2477 "edge_types".to_string(),
2478 ];
2479 let rows = vec![vec![
2480 Value::from(def.name),
2481 Value::from(def.open),
2482 Value::from(def.allowed_node_types.join(", ")),
2483 Value::from(def.allowed_edge_types.join(", ")),
2484 ]];
2485 Ok(QueryResult {
2486 columns,
2487 column_types: Vec::new(),
2488 rows,
2489 ..QueryResult::empty()
2490 })
2491 }
2492
2493 fn execute_show_current_graph_type(&self) -> Result<QueryResult> {
2495 let graph_name = self
2496 .current_graph()
2497 .unwrap_or_else(|| "default".to_string());
2498 let columns = vec![
2499 "graph".to_string(),
2500 "graph_type".to_string(),
2501 "open".to_string(),
2502 "node_types".to_string(),
2503 "edge_types".to_string(),
2504 ];
2505
2506 if let Some(type_name) = self.catalog.get_graph_type_binding(&graph_name)
2507 && let Some(def) = self.catalog.get_graph_type_def(&type_name)
2508 {
2509 let rows = vec![vec![
2510 Value::from(graph_name),
2511 Value::from(type_name),
2512 Value::from(def.open),
2513 Value::from(def.allowed_node_types.join(", ")),
2514 Value::from(def.allowed_edge_types.join(", ")),
2515 ]];
2516 return Ok(QueryResult {
2517 columns,
2518 column_types: Vec::new(),
2519 rows,
2520 ..QueryResult::empty()
2521 });
2522 }
2523
2524 Ok(QueryResult {
2526 columns,
2527 column_types: Vec::new(),
2528 rows: vec![vec![
2529 Value::from(graph_name),
2530 Value::Null,
2531 Value::Null,
2532 Value::Null,
2533 Value::Null,
2534 ]],
2535 ..QueryResult::empty()
2536 })
2537 }
2538
#[cfg(feature = "gql")]
/// Executes a GQL statement and returns its materialized result.
///
/// The statement is translated first. Session commands and schema commands
/// are dispatched to their dedicated handlers and return immediately;
/// everything else is a query plan that is bound, optimized (or fetched
/// from the query cache), planned and executed.
///
/// `EXPLAIN` plans are rendered without being executed; `PROFILE` plans
/// are executed and answered with the profile tree instead of the rows.
///
/// # Errors
///
/// Propagates errors from `require_lpg`, translation, permission checks,
/// binding, optimization, planning and execution. Schema commands and
/// mutating plans issued while `read_only_tx` is set fail with
/// `TransactionError::ReadOnly`.
pub fn execute(&self, query: &str) -> Result<QueryResult> {
    self.require_lpg("GQL")?;

    // Test-only hook: lets the test suite force this statement to fail.
    #[cfg(feature = "testing-statement-injection")]
    grafeo_common::testing::statement_failure::maybe_fail_statement().map_err(|e| {
        grafeo_common::utils::error::Error::Internal(format!("injected failure: {e}"))
    })?;

    use crate::query::{
        binder::Binder, cache::CacheKey, optimizer::Optimizer, processor::QueryLanguage,
        translators::gql,
    };

    let _span = grafeo_info_span!(
        "grafeo::session::execute",
        language = "gql",
        query_len = query.len(),
    );

    // Timing is compiled out on wasm32; every later use of `start_time`
    // is guarded by the same cfg.
    #[cfg(not(target_arch = "wasm32"))]
    let start_time = std::time::Instant::now();

    let translation = gql::translate_full(query)?;
    // Session and schema commands return from inside the match; only a
    // regular query plan falls through to the pipeline below.
    let logical_plan = match translation {
        gql::GqlTranslationResult::SessionCommand(cmd) => {
            return self.execute_session_command(cmd);
        }
        #[cfg(feature = "lpg")]
        gql::GqlTranslationResult::SchemaCommand(cmd) => {
            // Schema commands need Admin permission and are rejected
            // inside a read-only transaction.
            self.require_permission(crate::auth::StatementKind::Admin)?;
            if *self.read_only_tx.lock() {
                return Err(grafeo_common::utils::error::Error::Transaction(
                    grafeo_common::utils::error::TransactionError::ReadOnly,
                ));
            }
            return self.execute_schema_command(cmd);
        }
        gql::GqlTranslationResult::Plan(plan) => {
            let read_only = *self.read_only_tx.lock();
            // The plan is only inspected for mutations when the answer
            // matters: in a read-only transaction, or for an identity
            // without admin rights.
            let need_check = read_only || !self.identity.can_admin();
            let is_mutation = need_check && plan.root.has_mutations();
            if is_mutation {
                self.require_permission(crate::auth::StatementKind::Write)?;
            }
            if read_only && is_mutation {
                return Err(grafeo_common::utils::error::Error::Transaction(
                    grafeo_common::utils::error::TransactionError::ReadOnly,
                ));
            }
            plan
        }
        #[cfg(not(feature = "lpg"))]
        gql::GqlTranslationResult::SchemaCommand(_) => {
            return Err(grafeo_common::utils::error::Error::Internal(
                "Schema commands require the `lpg` feature".to_string(),
            ));
        }
    };

    // The cache key combines the query text, the language and the current graph.
    let cache_key = CacheKey::with_graph(query, QueryLanguage::Gql, self.current_graph());

    let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
        cached_plan
    } else {
        // Cache miss: bind, optimize, then store the optimized plan.
        let mut binder = Binder::new();
        let _binding_context = binder.bind(&logical_plan)?;

        let active = self.active_store();
        let optimizer = Optimizer::from_graph_store(&*active);
        let plan = optimizer.optimize(logical_plan)?;

        self.query_cache.put_optimized(cache_key, plan.clone());

        plan
    };

    let active = self.active_store();

    // EXPLAIN: render the annotated plan; nothing is executed.
    if optimized_plan.explain {
        use crate::query::processor::{annotate_pushdown_hints, explain_result};
        let mut plan = optimized_plan;
        annotate_pushdown_hints(&mut plan.root, active.as_ref());
        return Ok(explain_result(&plan));
    }

    // PROFILE: execute the plan, discard the rows, and return the profile
    // tree with the total elapsed time.
    if optimized_plan.profile {
        let has_mutations = optimized_plan.root.has_mutations();
        return self.with_auto_commit(has_mutations, || {
            let (viewing_epoch, transaction_id) = self.get_transaction_context();
            let planner = self.create_planner_for_store(
                Arc::clone(&active),
                viewing_epoch,
                transaction_id,
            );
            let (mut physical_plan, entries) = planner.plan_profiled(&optimized_plan)?;

            let executor = self.make_executor(physical_plan.columns.clone());
            let _result = executor.execute(physical_plan.operator.as_mut())?;

            // No timer exists on wasm32, so the total is reported as 0.0 there.
            let total_time_ms;
            #[cfg(not(target_arch = "wasm32"))]
            {
                total_time_ms = start_time.elapsed().as_secs_f64() * 1000.0;
            }
            #[cfg(target_arch = "wasm32")]
            {
                total_time_ms = 0.0;
            }

            let profile_tree = crate::query::profile::build_profile_tree(
                &optimized_plan.root,
                &mut entries.into_iter(),
            );
            Ok(crate::query::profile::profile_result(
                &profile_tree,
                total_time_ms,
            ))
        });
    }

    let has_mutations = optimized_plan.root.has_mutations();

    // Regular execution path.
    let result = self.with_auto_commit(has_mutations, || {
        let (viewing_epoch, transaction_id) = self.get_transaction_context();

        let has_active_tx = self.current_transaction.lock().is_some();
        // The planner's read-only flag is set only for a non-mutating
        // query that runs outside of an active transaction.
        let read_only = !has_mutations && !has_active_tx;
        let planner = self.create_planner_for_store_with_read_only(
            Arc::clone(&active),
            viewing_epoch,
            transaction_id,
            read_only,
        );
        let physical_plan = planner.plan(&optimized_plan)?;

        let executor = self.make_executor(physical_plan.columns.clone());
        // Split the operator tree into a source plus push operators; the
        // `spill` build additionally passes an operator memory context.
        let (mut source, push_ops) = {
            #[cfg(feature = "spill")]
            {
                let memory_ctx = self.make_operator_memory_context();
                grafeo_core::execution::pipeline_convert::convert_to_pipeline_with_memory(
                    physical_plan.into_operator(),
                    memory_ctx,
                )
            }
            #[cfg(not(feature = "spill"))]
            {
                grafeo_core::execution::pipeline_convert::convert_to_pipeline(
                    physical_plan.into_operator(),
                )
            }
        };
        // Without push operators the source is executed directly;
        // otherwise the pipeline executor is used.
        let mut result = if push_ops.is_empty() {
            executor.execute(source.as_mut())?
        } else {
            executor.execute_pipeline(source, push_ops)?
        };

        // `rows_scanned` is reported as the number of rows in the result.
        let rows_scanned = result.rows.len() as u64;
        #[cfg(not(target_arch = "wasm32"))]
        {
            let elapsed_ms = start_time.elapsed().as_secs_f64() * 1000.0;
            result.execution_time_ms = Some(elapsed_ms);
        }
        result.rows_scanned = Some(rows_scanned);

        Ok(result)
    });

    // Metrics are recorded for both outcomes: the whole `Result` is passed on.
    #[cfg(feature = "metrics")]
    {
        #[cfg(not(target_arch = "wasm32"))]
        let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
        #[cfg(target_arch = "wasm32")]
        let elapsed_ms = None;
        self.record_query_metrics("gql", elapsed_ms, &result);
    }

    result
}
2770
#[cfg(all(feature = "gql", feature = "lpg"))]
/// Executes a GQL query and returns its rows as a stream.
///
/// The plan is built by `build_streaming_plan`; only when that succeeds
/// is a `StreamGuard` created over `active_streams` and handed to the
/// returned stream together with the source operator, the column names
/// and the optional deadline.
///
/// # Errors
///
/// Propagates every error reported by `build_streaming_plan`.
pub fn execute_streaming(
    &self,
    query: &str,
) -> Result<crate::query::executor::stream::ResultStream<'_>> {
    use crate::query::executor::stream::{ResultStream, StreamGuard};

    self.build_streaming_plan(query)
        .map(|(source, columns, deadline)| {
            let guard = StreamGuard::new(&self.active_streams);
            ResultStream::new(source, columns, deadline, guard)
        })
}
2802
#[cfg(all(feature = "gql", feature = "lpg"))]
/// Builds the pull-based source operator for a streamed GQL query.
///
/// Returns the source operator, the result column names and the value of
/// `query_deadline()`.
///
/// # Errors
///
/// Besides translation, permission, binding, optimization and planning
/// errors, a semantic query error is returned when the statement cannot
/// be streamed: session commands, schema DDL, mutating queries,
/// `EXPLAIN` / `PROFILE`, and plans that convert into a pipeline with
/// push operators.
pub(crate) fn build_streaming_plan(
    &self,
    query: &str,
) -> Result<(
    Box<dyn grafeo_core::execution::operators::Operator>,
    Vec<String>,
    Option<Instant>,
)> {
    use crate::query::{
        binder::Binder, cache::CacheKey, optimizer::Optimizer, processor::QueryLanguage,
        translators::gql,
    };

    self.require_lpg("GQL")?;

    let _span = grafeo_info_span!(
        "grafeo::session::execute_streaming",
        language = "gql",
        query_len = query.len(),
    );

    let translation = gql::translate_full(query)?;
    // Only a read-only query plan may be streamed; every other statement
    // kind is rejected and the caller is pointed at `execute()`.
    let logical_plan = match translation {
        gql::GqlTranslationResult::SessionCommand(_) => {
            return Err(grafeo_common::utils::error::Error::Query(
                grafeo_common::utils::error::QueryError::new(
                    grafeo_common::utils::error::QueryErrorKind::Semantic,
                    "session commands cannot be streamed; use execute() instead",
                ),
            ));
        }
        gql::GqlTranslationResult::SchemaCommand(_) => {
            return Err(grafeo_common::utils::error::Error::Query(
                grafeo_common::utils::error::QueryError::new(
                    grafeo_common::utils::error::QueryErrorKind::Semantic,
                    "schema DDL cannot be streamed; use execute() instead",
                ),
            ));
        }
        gql::GqlTranslationResult::Plan(plan) => {
            if plan.root.has_mutations() {
                return Err(grafeo_common::utils::error::Error::Query(
                    grafeo_common::utils::error::QueryError::new(
                        grafeo_common::utils::error::QueryErrorKind::Semantic,
                        "mutating queries cannot be streamed; use execute() instead",
                    ),
                ));
            }
            // The Read permission check is skipped for admin identities.
            if !self.identity.can_admin() {
                self.require_permission(crate::auth::StatementKind::Read)?;
            }
            plan
        }
    };

    // Same cache key construction as the non-streaming GQL path.
    let cache_key = CacheKey::with_graph(query, QueryLanguage::Gql, self.current_graph());
    let optimized_plan = if let Some(cached) = self.query_cache.get_optimized(&cache_key) {
        cached
    } else {
        let mut binder = Binder::new();
        let _binding_context = binder.bind(&logical_plan)?;
        let active = self.active_store();
        let optimizer = Optimizer::from_graph_store(&*active);
        let plan = optimizer.optimize(logical_plan)?;
        self.query_cache.put_optimized(cache_key, plan.clone());
        plan
    };

    if optimized_plan.explain || optimized_plan.profile {
        return Err(grafeo_common::utils::error::Error::Query(
            grafeo_common::utils::error::QueryError::new(
                grafeo_common::utils::error::QueryErrorKind::Semantic,
                "EXPLAIN and PROFILE cannot be streamed; use execute() instead",
            ),
        ));
    }

    let active = self.active_store();
    let has_active_tx = self.current_transaction.lock().is_some();
    let (viewing_epoch, transaction_id) = self.get_transaction_context();
    // Mutations were rejected above, so the planner's read-only flag
    // depends only on whether a transaction is currently active.
    let planner = self.create_planner_for_store_with_read_only(
        Arc::clone(&active),
        viewing_epoch,
        transaction_id,
        !has_active_tx,
    );
    let physical_plan = planner.plan(&optimized_plan)?;
    let columns = physical_plan.columns.clone();

    // Convert to a source plus push operators; the `spill` build also
    // passes an operator memory context.
    let (source, push_ops) = {
        #[cfg(feature = "spill")]
        {
            let memory_ctx = self.make_operator_memory_context();
            grafeo_core::execution::pipeline_convert::convert_to_pipeline_with_memory(
                physical_plan.into_operator(),
                memory_ctx,
            )
        }
        #[cfg(not(feature = "spill"))]
        {
            grafeo_core::execution::pipeline_convert::convert_to_pipeline(
                physical_plan.into_operator(),
            )
        }
    };
    // Only the bare source is returned to the caller, so a plan that
    // needs push operators cannot be served as a stream.
    if !push_ops.is_empty() {
        return Err(grafeo_common::utils::error::Error::Query(
            grafeo_common::utils::error::QueryError::new(
                grafeo_common::utils::error::QueryErrorKind::Semantic,
                "query requires a push-based pipeline (ORDER BY / aggregate / DISTINCT) \
                 which cannot be streamed; use execute() instead",
            ),
        ));
    }

    Ok((source, columns, self.query_deadline()))
}
2934
#[cfg(feature = "gql")]
/// Executes a GQL query while `viewing_epoch_override` is set to `epoch`.
///
/// The previous override value is put back when this call ends, whether
/// the query returned normally, returned an error, or panicked: the
/// restore runs from a drop guard, so an unwinding query cannot leave the
/// session pinned to `epoch`.
///
/// # Errors
///
/// Propagates every error reported by [`Self::execute`].
pub fn execute_at_epoch(&self, query: &str, epoch: EpochId) -> Result<QueryResult> {
    /// Runs the wrapped closure exactly once when dropped.
    struct RestoreOnDrop<F: FnOnce()>(Option<F>);

    impl<F: FnOnce()> Drop for RestoreOnDrop<F> {
        fn drop(&mut self) {
            if let Some(restore) = self.0.take() {
                restore();
            }
        }
    }

    let previous = self.viewing_epoch_override.lock().replace(epoch);
    // Declared before the query runs so it is dropped after the result
    // has been produced, or during unwinding.
    let _restore = RestoreOnDrop(Some(move || {
        *self.viewing_epoch_override.lock() = previous;
    }));

    self.execute(query)
}
2950
#[cfg(feature = "gql")]
/// Executes a GQL query, optionally parameterized, while
/// `viewing_epoch_override` is set to `epoch`.
///
/// With `Some(params)` the query goes through
/// [`Self::execute_with_params`], otherwise through [`Self::execute`].
/// The previous override value is put back when this call ends, whether
/// the query returned normally, returned an error, or panicked: the
/// restore runs from a drop guard.
///
/// # Errors
///
/// Propagates every error reported by the underlying execute call.
pub fn execute_at_epoch_with_params(
    &self,
    query: &str,
    epoch: EpochId,
    params: Option<std::collections::HashMap<String, Value>>,
) -> Result<QueryResult> {
    /// Runs the wrapped closure exactly once when dropped.
    struct RestoreOnDrop<F: FnOnce()>(Option<F>);

    impl<F: FnOnce()> Drop for RestoreOnDrop<F> {
        fn drop(&mut self) {
            if let Some(restore) = self.0.take() {
                restore();
            }
        }
    }

    let previous = self.viewing_epoch_override.lock().replace(epoch);
    // Declared before the query runs so it is dropped after the result
    // has been produced, or during unwinding.
    let _restore = RestoreOnDrop(Some(move || {
        *self.viewing_epoch_override.lock() = previous;
    }));

    match params {
        Some(p) => self.execute_with_params(query, p),
        None => self.execute(query),
    }
}
2974
#[cfg(feature = "gql")]
/// Executes a parameterized GQL query through the query processor.
///
/// Whether the statement mutates is decided before execution:
///
/// - identities that can write use the `query_looks_like_mutation` text
///   check;
/// - other identities have the query translated; a mutating plan requires
///   Write permission, and a query that fails to translate falls back to
///   the same text check.
///
/// # Errors
///
/// Propagates errors from `require_lpg`, the Write permission check,
/// processor construction and query processing.
pub fn execute_with_params(
    &self,
    query: &str,
    params: std::collections::HashMap<String, Value>,
) -> Result<QueryResult> {
    self.require_lpg("GQL")?;

    use crate::query::processor::{QueryLanguage, QueryProcessor};

    let has_mutations = if self.identity.can_write() {
        Self::query_looks_like_mutation(query)
    } else {
        use crate::query::translators::gql;
        match gql::translate(query) {
            Ok(plan) if plan.root.has_mutations() => {
                self.require_permission(crate::auth::StatementKind::Write)?;
                true
            }
            Ok(_) => false,
            // Translation failed: fall back to the text check and let the
            // processor report the actual error.
            Err(_) => Self::query_looks_like_mutation(query),
        }
    };
    let active = self.active_store();

    self.with_auto_commit(has_mutations, || {
        let (viewing_epoch, transaction_id) = self.get_transaction_context();

        let processor = QueryProcessor::for_stores_with_transaction(
            Arc::clone(&active),
            self.active_write_store(),
            Arc::clone(&self.transaction_manager),
        )?;

        // Attach the transaction context only when a transaction id exists.
        let processor = match transaction_id {
            Some(transaction_id) => {
                processor.with_transaction_context(viewing_epoch, transaction_id)
            }
            None => processor,
        };

        processor.process(query, QueryLanguage::Gql, Some(&params))
    })
}
3032
3033 #[cfg(not(any(feature = "gql", feature = "cypher")))]
3039 pub fn execute_with_params(
3040 &self,
3041 _query: &str,
3042 _params: std::collections::HashMap<String, Value>,
3043 ) -> Result<QueryResult> {
3044 Err(grafeo_common::utils::error::Error::Internal(
3045 "No query language enabled".to_string(),
3046 ))
3047 }
3048
3049 #[cfg(not(any(feature = "gql", feature = "cypher")))]
3055 pub fn execute(&self, _query: &str) -> Result<QueryResult> {
3056 Err(grafeo_common::utils::error::Error::Internal(
3057 "No query language enabled".to_string(),
3058 ))
3059 }
3060
#[cfg(feature = "cypher")]
/// Executes a Cypher statement and returns its materialized result.
///
/// Schema commands and the `SHOW INDEXES` / `SHOW CONSTRAINTS` /
/// `SHOW CURRENT GRAPH TYPE` statements are dispatched to their handlers
/// and return immediately. Regular queries are bound, optimized (or
/// fetched from the query cache), planned and executed. `EXPLAIN` plans
/// are rendered without being executed; `PROFILE` plans are executed and
/// answered with the profile tree.
///
/// # Errors
///
/// Propagates translation, permission, binding, optimization, planning
/// and execution errors. Schema DDL issued while `read_only_tx` is set
/// fails with a semantic query error.
pub fn execute_cypher(&self, query: &str) -> Result<QueryResult> {
    use crate::query::{
        binder::Binder, cache::CacheKey, optimizer::Optimizer, processor::QueryLanguage,
        translators::cypher,
    };

    let translation = cypher::translate_full(query)?;
    // Every arm except `Plan` returns from this function.
    match translation {
        #[cfg(feature = "lpg")]
        cypher::CypherTranslationResult::SchemaCommand(cmd) => {
            use grafeo_common::utils::error::{
                Error as GrafeoError, QueryError, QueryErrorKind,
            };
            // Schema DDL needs Admin permission and is rejected inside a
            // read-only transaction.
            self.require_permission(crate::auth::StatementKind::Admin)?;
            if *self.read_only_tx.lock() {
                return Err(GrafeoError::Query(QueryError::new(
                    QueryErrorKind::Semantic,
                    "Cannot execute schema DDL in a read-only transaction",
                )));
            }
            return self.execute_schema_command(cmd);
        }
        #[cfg(not(feature = "lpg"))]
        cypher::CypherTranslationResult::SchemaCommand(_) => {
            return Err(grafeo_common::utils::error::Error::Internal(
                "Schema DDL requires the `lpg` feature".to_string(),
            ));
        }
        cypher::CypherTranslationResult::ShowIndexes => {
            return self.execute_show_indexes();
        }
        cypher::CypherTranslationResult::ShowConstraints => {
            return self.execute_show_constraints();
        }
        cypher::CypherTranslationResult::ShowCurrentGraphType => {
            return self.execute_show_current_graph_type();
        }
        cypher::CypherTranslationResult::Plan(_) => {
            // Regular query: the translated plan is discarded here and
            // the query continues on the cached-plan path below.
        }
    }

    // Timing is compiled out on wasm32; every later use of `start_time`
    // is guarded by the same cfg.
    #[cfg(not(target_arch = "wasm32"))]
    let start_time = std::time::Instant::now();

    // The cache key combines the query text, the language and the current graph.
    let cache_key = CacheKey::with_graph(query, QueryLanguage::Cypher, self.current_graph());

    let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
        cached_plan
    } else {
        // NOTE(review): on a cache miss the query text is translated a
        // second time here, after `translate_full` above.
        let logical_plan = cypher::translate(query)?;

        let mut binder = Binder::new();
        let _binding_context = binder.bind(&logical_plan)?;

        let active = self.active_store();
        let optimizer = Optimizer::from_graph_store(&*active);
        let plan = optimizer.optimize(logical_plan)?;

        self.query_cache.put_optimized(cache_key, plan.clone());

        plan
    };

    // The Write permission check runs on the optimized plan, so it also
    // applies to plans served from the cache and precedes EXPLAIN/PROFILE.
    if optimized_plan.root.has_mutations() {
        self.require_permission(crate::auth::StatementKind::Write)?;
    }

    let active = self.active_store();

    // EXPLAIN: render the annotated plan; nothing is executed.
    if optimized_plan.explain {
        use crate::query::processor::{annotate_pushdown_hints, explain_result};
        let mut plan = optimized_plan;
        annotate_pushdown_hints(&mut plan.root, active.as_ref());
        return Ok(explain_result(&plan));
    }

    // PROFILE: execute the plan, discard the rows, and return the profile
    // tree with the total elapsed time.
    if optimized_plan.profile {
        let has_mutations = optimized_plan.root.has_mutations();
        return self.with_auto_commit(has_mutations, || {
            let (viewing_epoch, transaction_id) = self.get_transaction_context();
            let planner = self.create_planner_for_store(
                Arc::clone(&active),
                viewing_epoch,
                transaction_id,
            );
            let (mut physical_plan, entries) = planner.plan_profiled(&optimized_plan)?;

            let executor = self.make_executor(physical_plan.columns.clone());
            let _result = executor.execute(physical_plan.operator.as_mut())?;

            // No timer exists on wasm32, so the total is reported as 0.0 there.
            let total_time_ms;
            #[cfg(not(target_arch = "wasm32"))]
            {
                total_time_ms = start_time.elapsed().as_secs_f64() * 1000.0;
            }
            #[cfg(target_arch = "wasm32")]
            {
                total_time_ms = 0.0;
            }

            let profile_tree = crate::query::profile::build_profile_tree(
                &optimized_plan.root,
                &mut entries.into_iter(),
            );
            Ok(crate::query::profile::profile_result(
                &profile_tree,
                total_time_ms,
            ))
        });
    }

    let has_mutations = optimized_plan.root.has_mutations();

    // Regular execution path.
    let result = self.with_auto_commit(has_mutations, || {
        let (viewing_epoch, transaction_id) = self.get_transaction_context();

        let planner =
            self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
        let mut physical_plan = planner.plan(&optimized_plan)?;

        let executor = self.make_executor(physical_plan.columns.clone());
        executor.execute(physical_plan.operator.as_mut())
    });

    // Metrics are recorded for both outcomes: the whole `Result` is passed on.
    #[cfg(feature = "metrics")]
    {
        #[cfg(not(target_arch = "wasm32"))]
        let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
        #[cfg(target_arch = "wasm32")]
        let elapsed_ms = None;
        self.record_query_metrics("cypher", elapsed_ms, &result);
    }

    result
}
3217
#[cfg(feature = "gremlin")]
/// Executes a Gremlin traversal and returns its materialized result.
///
/// The query is translated, bound and optimized on every call (no plan
/// cache is consulted), then planned and executed inside
/// `with_auto_commit`. Mutating plans require Write permission.
///
/// # Errors
///
/// Propagates translation, binding, optimization, permission, planning
/// and execution errors.
pub fn execute_gremlin(&self, query: &str) -> Result<QueryResult> {
    use crate::query::{binder::Binder, optimizer::Optimizer, translators::gremlin};

    // The timer only exists when metrics are enabled on a non-wasm32 target.
    #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
    let started_at = Instant::now();

    let translated = gremlin::translate(query)?;

    // Binding validates the plan; its context is not used afterwards.
    let mut plan_binder = Binder::new();
    let _bound = plan_binder.bind(&translated)?;

    let store = self.active_store();
    let plan_optimizer = Optimizer::from_graph_store(&*store);
    let optimized = plan_optimizer.optimize(translated)?;

    let mutating = optimized.root.has_mutations();
    if mutating {
        self.require_permission(crate::auth::StatementKind::Write)?;
    }

    let outcome = self.with_auto_commit(mutating, || {
        let (epoch, tx_id) = self.get_transaction_context();

        let physical_planner = self.create_planner_for_store(Arc::clone(&store), epoch, tx_id);
        let mut physical = physical_planner.plan(&optimized)?;

        let runner = self.make_executor(physical.columns.clone());
        runner.execute(physical.operator.as_mut())
    });

    #[cfg(feature = "metrics")]
    {
        #[cfg(not(target_arch = "wasm32"))]
        let duration_ms = Some(started_at.elapsed().as_secs_f64() * 1000.0);
        #[cfg(target_arch = "wasm32")]
        let duration_ms = None;
        self.record_query_metrics("gremlin", duration_ms, &outcome);
    }

    outcome
}
3290
#[cfg(feature = "gremlin")]
/// Executes a parameterized Gremlin traversal.
///
/// `params` are substituted into the translated plan before it is bound
/// and optimized; the rest of the pipeline mirrors the unparameterized
/// Gremlin path. Mutating plans require Write permission.
///
/// # Errors
///
/// Propagates translation, parameter substitution, binding, optimization,
/// permission, planning and execution errors.
pub fn execute_gremlin_with_params(
    &self,
    query: &str,
    params: std::collections::HashMap<String, Value>,
) -> Result<QueryResult> {
    use crate::query::{
        binder::Binder, optimizer::Optimizer, processor::substitute_params,
        translators::gremlin,
    };

    // The timer only exists when metrics are enabled on a non-wasm32 target.
    #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
    let start_time = Instant::now();

    let mut logical_plan = gremlin::translate(query)?;

    // Parameters are resolved before binding.
    substitute_params(&mut logical_plan, &params)?;

    let mut binder = Binder::new();
    let _binding_context = binder.bind(&logical_plan)?;

    let active = self.active_store();
    let optimizer = Optimizer::from_graph_store(&*active);
    let optimized_plan = optimizer.optimize(logical_plan)?;

    let has_mutations = optimized_plan.root.has_mutations();
    if has_mutations {
        self.require_permission(crate::auth::StatementKind::Write)?;
    }

    let result = self.with_auto_commit(has_mutations, || {
        let (viewing_epoch, transaction_id) = self.get_transaction_context();
        let planner =
            self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
        let mut physical_plan = planner.plan(&optimized_plan)?;
        let executor = self.make_executor(physical_plan.columns.clone());
        executor.execute(physical_plan.operator.as_mut())
    });

    #[cfg(feature = "metrics")]
    {
        #[cfg(not(target_arch = "wasm32"))]
        let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
        #[cfg(target_arch = "wasm32")]
        let elapsed_ms = None;
        self.record_query_metrics("gremlin", elapsed_ms, &result);
    }

    result
}
3350
#[cfg(feature = "graphql")]
/// Executes a GraphQL query and returns its materialized result.
///
/// When the translated plan carries default parameters, they are
/// substituted into the plan before binding. The plan is then bound,
/// optimized, planned and executed inside `with_auto_commit`; mutating
/// plans require Write permission.
///
/// # Errors
///
/// Propagates translation, parameter substitution, binding, optimization,
/// permission, planning and execution errors.
pub fn execute_graphql(&self, query: &str) -> Result<QueryResult> {
    use crate::query::{
        binder::Binder, optimizer::Optimizer, processor::substitute_params,
        translators::graphql,
    };

    // The timer only exists when metrics are enabled on a non-wasm32 target.
    #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
    let started_at = Instant::now();

    let mut translated = graphql::translate(query)?;

    // Cloned first because the plan is mutably borrowed during substitution.
    if !translated.default_params.is_empty() {
        let default_values = translated.default_params.clone();
        substitute_params(&mut translated, &default_values)?;
    }

    let mut plan_binder = Binder::new();
    let _bound = plan_binder.bind(&translated)?;

    let store = self.active_store();
    let plan_optimizer = Optimizer::from_graph_store(&*store);
    let optimized = plan_optimizer.optimize(translated)?;

    let mutating = optimized.root.has_mutations();
    if mutating {
        self.require_permission(crate::auth::StatementKind::Write)?;
    }

    let outcome = self.with_auto_commit(mutating, || {
        let (epoch, tx_id) = self.get_transaction_context();
        let physical_planner = self.create_planner_for_store(Arc::clone(&store), epoch, tx_id);
        let mut physical = physical_planner.plan(&optimized)?;
        let runner = self.make_executor(physical.columns.clone());
        runner.execute(physical.operator.as_mut())
    });

    #[cfg(feature = "metrics")]
    {
        #[cfg(not(target_arch = "wasm32"))]
        let duration_ms = Some(started_at.elapsed().as_secs_f64() * 1000.0);
        #[cfg(target_arch = "wasm32")]
        let duration_ms = None;
        self.record_query_metrics("graphql", duration_ms, &outcome);
    }

    outcome
}
3423
#[cfg(feature = "graphql")]
/// Executes a parameterized GraphQL query.
///
/// When the translated plan carries default parameters, they are merged
/// with `params` and caller-supplied values override defaults of the same
/// name; otherwise `params` is substituted as-is. The plan is then bound,
/// optimized, planned and executed inside `with_auto_commit`; mutating
/// plans require Write permission.
///
/// # Errors
///
/// Propagates translation, parameter substitution, binding, optimization,
/// permission, planning and execution errors.
pub fn execute_graphql_with_params(
    &self,
    query: &str,
    params: std::collections::HashMap<String, Value>,
) -> Result<QueryResult> {
    use crate::query::{
        binder::Binder, optimizer::Optimizer, processor::substitute_params,
        translators::graphql,
    };

    // The timer only exists when metrics are enabled on a non-wasm32 target.
    #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
    let start_time = Instant::now();

    let mut logical_plan = graphql::translate(query)?;

    if !logical_plan.default_params.is_empty() {
        // Start from the defaults, then let the caller's values overwrite them.
        let mut merged = logical_plan.default_params.clone();
        merged.extend(params.iter().map(|(k, v)| (k.clone(), v.clone())));
        substitute_params(&mut logical_plan, &merged)?;
    } else {
        substitute_params(&mut logical_plan, &params)?;
    }

    let mut binder = Binder::new();
    let _binding_context = binder.bind(&logical_plan)?;

    let active = self.active_store();
    let optimizer = Optimizer::from_graph_store(&*active);
    let optimized_plan = optimizer.optimize(logical_plan)?;

    let has_mutations = optimized_plan.root.has_mutations();
    if has_mutations {
        self.require_permission(crate::auth::StatementKind::Write)?;
    }

    let result = self.with_auto_commit(has_mutations, || {
        let (viewing_epoch, transaction_id) = self.get_transaction_context();
        let planner =
            self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
        let mut physical_plan = planner.plan(&optimized_plan)?;
        let executor = self.make_executor(physical_plan.columns.clone());
        executor.execute(physical_plan.operator.as_mut())
    });

    #[cfg(feature = "metrics")]
    {
        #[cfg(not(target_arch = "wasm32"))]
        let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
        #[cfg(target_arch = "wasm32")]
        let elapsed_ms = None;
        self.record_query_metrics("graphql", elapsed_ms, &result);
    }

    result
}
3489
#[cfg(feature = "sql-pgq")]
/// Executes a SQL/PGQ statement and returns its materialized result.
///
/// A `CREATE PROPERTY GRAPH` statement requires Admin permission and is
/// answered directly with a one-row `status` result describing the graph
/// name and its node/edge table counts. Any other statement is bound,
/// optimized (or fetched from the query cache), planned and executed
/// inside `with_auto_commit`; mutating plans require Write permission.
///
/// # Errors
///
/// Propagates translation, permission, binding, optimization, planning
/// and execution errors.
pub fn execute_sql(&self, query: &str) -> Result<QueryResult> {
    use crate::query::{
        binder::Binder, cache::CacheKey, optimizer::Optimizer, plan::LogicalOperator,
        processor::QueryLanguage, translators::sql_pgq,
    };

    // The timer only exists when metrics are enabled on a non-wasm32 target.
    #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
    let started_at = Instant::now();

    let translated = sql_pgq::translate(query)?;

    // DDL shortcut: report the outcome as a status row and return early.
    if let LogicalOperator::CreatePropertyGraph(cpg) = &translated.root {
        self.require_permission(crate::auth::StatementKind::Admin)?;
        let status = format!(
            "Property graph '{}' created ({} node tables, {} edge tables)",
            cpg.name,
            cpg.node_tables.len(),
            cpg.edge_tables.len()
        );
        return Ok(QueryResult {
            columns: vec!["status".into()],
            column_types: vec![grafeo_common::types::LogicalType::String],
            rows: vec![vec![Value::from(status)]],
            execution_time_ms: None,
            rows_scanned: None,
            status_message: None,
            gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
        });
    }

    // The cache key combines the query text, the language and the current graph.
    let key = CacheKey::with_graph(query, QueryLanguage::SqlPgq, self.current_graph());

    let optimized = match self.query_cache.get_optimized(&key) {
        Some(hit) => hit,
        None => {
            let mut plan_binder = Binder::new();
            let _bound = plan_binder.bind(&translated)?;
            let store = self.active_store();
            let plan_optimizer = Optimizer::from_graph_store(&*store);
            let fresh = plan_optimizer.optimize(translated)?;
            self.query_cache.put_optimized(key, fresh.clone());
            fresh
        }
    };

    let store = self.active_store();
    let mutating = optimized.root.has_mutations();
    if mutating {
        self.require_permission(crate::auth::StatementKind::Write)?;
    }

    let outcome = self.with_auto_commit(mutating, || {
        let (epoch, tx_id) = self.get_transaction_context();
        let physical_planner = self.create_planner_for_store(Arc::clone(&store), epoch, tx_id);
        let mut physical = physical_planner.plan(&optimized)?;
        let runner = self.make_executor(physical.columns.clone());
        runner.execute(physical.operator.as_mut())
    });

    #[cfg(feature = "metrics")]
    {
        #[cfg(not(target_arch = "wasm32"))]
        let duration_ms = Some(started_at.elapsed().as_secs_f64() * 1000.0);
        #[cfg(target_arch = "wasm32")]
        let duration_ms = None;
        self.record_query_metrics("sql", duration_ms, &outcome);
    }

    outcome
}
3586
#[cfg(feature = "sql-pgq")]
/// Executes a parameterized SQL/PGQ statement through the query processor.
///
/// Whether the statement mutates is decided before execution: identities
/// that can write use the `query_looks_like_mutation` text check; other
/// identities have the query translated, falling back to the same text
/// check when translation fails. Any statement classified as mutating
/// requires Write permission, which is checked once.
///
/// # Errors
///
/// Propagates errors from the Write permission check, processor
/// construction and query processing.
pub fn execute_sql_with_params(
    &self,
    query: &str,
    params: std::collections::HashMap<String, Value>,
) -> Result<QueryResult> {
    use crate::query::processor::{QueryLanguage, QueryProcessor};

    // The timer only exists when metrics are enabled on a non-wasm32 target.
    #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
    let start_time = Instant::now();

    let has_mutations = if self.identity.can_write() {
        Self::query_looks_like_mutation(query)
    } else {
        use crate::query::translators::sql_pgq;
        match sql_pgq::translate(query) {
            Ok(plan) => plan.root.has_mutations(),
            // Translation failed: fall back to the text check and let the
            // processor report the actual error.
            Err(_) => Self::query_looks_like_mutation(query),
        }
    };
    // Single permission gate for every path that classified the statement
    // as mutating.
    if has_mutations {
        self.require_permission(crate::auth::StatementKind::Write)?;
    }
    let active = self.active_store();

    let result = self.with_auto_commit(has_mutations, || {
        let (viewing_epoch, transaction_id) = self.get_transaction_context();
        let processor = QueryProcessor::for_stores_with_transaction(
            Arc::clone(&active),
            self.active_write_store(),
            Arc::clone(&self.transaction_manager),
        )?;
        // Attach the transaction context only when a transaction id exists.
        let processor = match transaction_id {
            Some(transaction_id) => {
                processor.with_transaction_context(viewing_epoch, transaction_id)
            }
            None => processor,
        };
        processor.process(query, QueryLanguage::SqlPgq, Some(&params))
    });

    #[cfg(feature = "metrics")]
    {
        #[cfg(not(target_arch = "wasm32"))]
        let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
        #[cfg(target_arch = "wasm32")]
        let elapsed_ms = None;
        self.record_query_metrics("sql", elapsed_ms, &result);
    }

    result
}
3647
3648 pub fn execute_language(
3657 &self,
3658 query: &str,
3659 language: &str,
3660 params: Option<std::collections::HashMap<String, Value>>,
3661 ) -> Result<QueryResult> {
3662 let _span = grafeo_info_span!(
3663 "grafeo::session::execute",
3664 language,
3665 query_len = query.len(),
3666 );
3667 match language {
3668 "gql" => {
3669 if let Some(p) = params {
3670 self.execute_with_params(query, p)
3671 } else {
3672 self.execute(query)
3673 }
3674 }
3675 #[cfg(feature = "cypher")]
3676 "cypher" => {
3677 if let Some(p) = params {
3678 use crate::query::processor::{QueryLanguage, QueryProcessor};
3679
3680 #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
3681 let start_time = Instant::now();
3682
3683 let has_mutations = if self.identity.can_write() {
3684 Self::query_looks_like_mutation(query)
3685 } else {
3686 use crate::query::translators::cypher;
3687 match cypher::translate(query) {
3688 Ok(plan) if plan.root.has_mutations() => {
3689 self.require_permission(crate::auth::StatementKind::Write)?;
3690 true
3691 }
3692 Ok(_) => false,
3693 Err(_) => Self::query_looks_like_mutation(query),
3694 }
3695 };
3696 let active = self.active_store();
3697 let result = self.with_auto_commit(has_mutations, || {
3698 let processor = QueryProcessor::for_stores_with_transaction(
3699 Arc::clone(&active),
3700 self.active_write_store(),
3701 Arc::clone(&self.transaction_manager),
3702 )?;
3703 let (viewing_epoch, transaction_id) = self.get_transaction_context();
3704 let processor = if let Some(transaction_id) = transaction_id {
3705 processor.with_transaction_context(viewing_epoch, transaction_id)
3706 } else {
3707 processor
3708 };
3709 processor.process(query, QueryLanguage::Cypher, Some(&p))
3710 });
3711
3712 #[cfg(feature = "metrics")]
3713 {
3714 #[cfg(not(target_arch = "wasm32"))]
3715 let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
3716 #[cfg(target_arch = "wasm32")]
3717 let elapsed_ms = None;
3718 self.record_query_metrics("cypher", elapsed_ms, &result);
3719 }
3720
3721 result
3722 } else {
3723 self.execute_cypher(query)
3724 }
3725 }
3726 #[cfg(feature = "gremlin")]
3727 "gremlin" => {
3728 if let Some(p) = params {
3729 self.execute_gremlin_with_params(query, p)
3730 } else {
3731 self.execute_gremlin(query)
3732 }
3733 }
3734 #[cfg(feature = "graphql")]
3735 "graphql" => {
3736 if let Some(p) = params {
3737 self.execute_graphql_with_params(query, p)
3738 } else {
3739 self.execute_graphql(query)
3740 }
3741 }
3742 #[cfg(all(feature = "graphql", feature = "triple-store"))]
3743 "graphql-rdf" => {
3744 if let Some(p) = params {
3745 self.execute_graphql_rdf_with_params(query, p)
3746 } else {
3747 self.execute_graphql_rdf(query)
3748 }
3749 }
3750 #[cfg(feature = "sql-pgq")]
3751 "sql" | "sql-pgq" => {
3752 if let Some(p) = params {
3753 self.execute_sql_with_params(query, p)
3754 } else {
3755 self.execute_sql(query)
3756 }
3757 }
3758 #[cfg(all(feature = "sparql", feature = "triple-store"))]
3759 "sparql" => {
3760 if let Some(p) = params {
3761 self.execute_sparql_with_params(query, p)
3762 } else {
3763 self.execute_sparql(query)
3764 }
3765 }
3766 other => Err(grafeo_common::utils::error::Error::Query(
3767 grafeo_common::utils::error::QueryError::new(
3768 grafeo_common::utils::error::QueryErrorKind::Semantic,
3769 format!("Unknown query language: '{other}'"),
3770 ),
3771 )),
3772 }
3773 }
3774
/// Empties the session's query cache by calling `clear` on it.
pub fn clear_plan_cache(&self) {
    self.query_cache.clear();
}
3804
/// Begins a read-write transaction with no explicit isolation level.
///
/// Delegates to `begin_transaction_inner(false, None)`.
///
/// # Errors
///
/// Propagates any error from `begin_transaction_inner`.
#[cfg(feature = "lpg")]
pub fn begin_transaction(&mut self) -> Result<()> {
    self.begin_transaction_inner(false, None)
}
3816
/// Begins a read-write transaction using the given `isolation_level`.
///
/// Delegates to `begin_transaction_inner(false, Some(isolation_level))`.
///
/// # Errors
///
/// Propagates any error from `begin_transaction_inner`.
#[cfg(feature = "lpg")]
pub fn begin_transaction_with_isolation(
    &mut self,
    isolation_level: crate::transaction::IsolationLevel,
) -> Result<()> {
    self.begin_transaction_inner(false, Some(isolation_level))
}
3831
/// Starts a transaction, or opens a nested level when one is already active.
///
/// * With no active transaction, a new one is requested from the
///   transaction manager (with `isolation_level` when given), the node and
///   edge counts of the active store are recorded, the read-only flag is set
///   to `read_only || self.db_read_only`, and the touched-graph list is reset
///   to just the active graph's storage key.
/// * With an active transaction, a savepoint named `_nested_tx_<depth>` is
///   created instead; `read_only` and `isolation_level` are not consulted on
///   that path.
///
/// # Errors
///
/// Returns the error from `savepoint` if the nested savepoint cannot be
/// created. The nesting depth is left unchanged in that case.
#[cfg(feature = "lpg")]
fn begin_transaction_inner(
    &self,
    read_only: bool,
    isolation_level: Option<crate::transaction::IsolationLevel>,
) -> Result<()> {
    let _span = grafeo_debug_span!("grafeo::tx::begin", read_only);
    let mut current = self.current_transaction.lock();
    if current.is_some() {
        // `savepoint` takes the `current_transaction` lock itself, so release
        // our guard before calling it.
        drop(current);
        let mut depth = self.transaction_nesting_depth.lock();
        let next_depth = *depth + 1;
        let sp_name = format!("_nested_tx_{next_depth}");
        // Record the deeper level only once the savepoint really exists, so a
        // failed savepoint does not leave the depth counter one too high.
        self.savepoint(&sp_name)?;
        *depth = next_depth;
        return Ok(());
    }

    // Remember the entity counts at transaction start for the *_count_delta helpers.
    let active = self.active_lpg_store();
    self.transaction_start_node_count
        .store(active.node_count(), Ordering::Relaxed);
    self.transaction_start_edge_count
        .store(active.edge_count(), Ordering::Relaxed);
    let transaction_id = if let Some(level) = isolation_level {
        self.transaction_manager.begin_with_isolation(level)
    } else {
        self.transaction_manager.begin()
    };
    *current = Some(transaction_id);
    *self.read_only_tx.lock() = read_only || self.db_read_only;

    // The active graph is the first graph touched by the new transaction.
    let key = self.active_graph_storage_key();
    let mut touched = self.touched_graphs.lock();
    touched.clear();
    touched.push(key);

    #[cfg(feature = "metrics")]
    {
        crate::metrics::record_metric!(self.metrics, tx_active, inc);
        #[cfg(not(target_arch = "wasm32"))]
        {
            *self.tx_start_time.lock() = Some(Instant::now());
        }
    }

    Ok(())
}
3882
/// Commits the current transaction (or the innermost nested level).
///
/// Delegates to `commit_inner`.
///
/// # Errors
///
/// Propagates any error from `commit_inner`.
#[cfg(feature = "lpg")]
pub fn commit(&mut self) -> Result<()> {
    self.commit_inner()
}
3894
/// Commits the active transaction, or releases the innermost nested level.
///
/// When the nesting depth is non-zero the matching `_nested_tx_<depth>`
/// savepoint is released and nothing else happens. Otherwise the transaction
/// is committed through the transaction manager and, on success, every
/// touched graph store is finalised, pending CDC events are recorded, WAL
/// records are written, and version GC runs every `gc_interval` commits.
///
/// # Errors
///
/// Returns an error when streaming results are still active, when there is
/// no active transaction, when releasing a nested savepoint fails, or when
/// the transaction manager rejects the commit (session state is cleaned up
/// before that error is returned).
#[cfg(feature = "lpg")]
fn commit_inner(&self) -> Result<()> {
    let _span = grafeo_debug_span!("grafeo::tx::commit");

    // Test-only hook: simulate a failing commit after rolling the transaction back.
    #[cfg(feature = "testing-statement-injection")]
    if let Err(e) = grafeo_common::testing::statement_failure::maybe_fail_commit() {
        let _ = self.rollback_inner();
        return Err(grafeo_common::utils::error::Error::Internal(format!(
            "injected commit failure: {e}"
        )));
    }

    self.check_no_active_streams("commit")?;
    {
        // Nested level: release its savepoint instead of committing.
        let mut depth = self.transaction_nesting_depth.lock();
        if *depth > 0 {
            let sp_name = format!("_nested_tx_{depth}");
            *depth -= 1;
            drop(depth);
            return self.release_savepoint(&sp_name);
        }
    }

    // `take()` clears the session's active transaction regardless of the
    // outcome below.
    let transaction_id = self.current_transaction.lock().take().ok_or_else(|| {
        grafeo_common::utils::error::Error::Transaction(
            grafeo_common::utils::error::TransactionError::InvalidState(
                "No active transaction".to_string(),
            ),
        )
    })?;

    // Move the touched-graph list out, leaving the shared list empty.
    let touched = std::mem::take(&mut *self.touched_graphs.lock());
    let commit_epoch = match self.transaction_manager.commit(transaction_id) {
        Ok(epoch) => epoch,
        Err(e) => {
            // Commit rejected: undo this transaction's property changes in
            // every touched store and reset per-transaction session state.
            // NOTE(review): unlike `rollback_inner`, this path does not call
            // `discard_uncommitted_versions` — confirm that is intended.
            for graph_name in &touched {
                let store = self.resolve_store(graph_name);
                store.rollback_transaction_properties(transaction_id);
            }
            #[cfg(feature = "triple-store")]
            self.rollback_rdf_transaction(transaction_id);
            #[cfg(feature = "cdc")]
            if let Some(ref pending) = self.cdc_pending_events {
                pending.lock().clear();
            }
            *self.read_only_tx.lock() = self.db_read_only;
            self.savepoints.lock().clear();
            self.touched_graphs.lock().clear();
            #[cfg(feature = "metrics")]
            {
                crate::metrics::record_metric!(self.metrics, tx_active, dec);
                crate::metrics::record_metric!(self.metrics, tx_conflicts, inc);
                #[cfg(not(target_arch = "wasm32"))]
                if let Some(start) = self.tx_start_time.lock().take() {
                    let duration_ms = start.elapsed().as_secs_f64() * 1000.0;
                    crate::metrics::record_metric!(
                        self.metrics,
                        tx_duration,
                        observe duration_ms
                    );
                }
            }
            return Err(e);
        }
    };

    // Finalise this transaction's versions with the assigned commit epoch.
    for graph_name in &touched {
        let store = self.resolve_store(graph_name);
        store.finalize_version_epochs(transaction_id, commit_epoch);
    }

    #[cfg(feature = "triple-store")]
    self.commit_rdf_transaction(transaction_id);

    for graph_name in &touched {
        let store = self.resolve_store(graph_name);
        store.commit_transaction_properties(transaction_id);
    }

    // Record buffered CDC events, each stamped with the commit epoch.
    #[cfg(feature = "cdc")]
    if let Some(ref pending) = self.cdc_pending_events {
        let events: Vec<crate::cdc::ChangeEvent> = pending.lock().drain(..).collect();
        self.cdc_log.record_batch(events.into_iter().map(|mut e| {
            e.epoch = commit_epoch;
            e
        }));
    }

    // WAL failures are logged as warnings and do not fail the commit.
    #[cfg(feature = "wal")]
    if let Some(ref wal) = self.wal {
        use grafeo_storage::wal::WalRecord;
        if let Err(e) = wal.log(&WalRecord::TransactionCommit { transaction_id }) {
            grafeo_warn!("Failed to log transaction commit to WAL: {}", e);
        }
        if let Err(e) = wal.log(&WalRecord::EpochAdvance {
            epoch: commit_epoch,
        }) {
            grafeo_warn!("Failed to log epoch advance to WAL: {}", e);
        }
    }

    // Bring every touched store up to the manager's current epoch.
    let current_epoch = self.transaction_manager.current_epoch();
    for graph_name in &touched {
        let store = self.resolve_store(graph_name);
        store.sync_epoch(current_epoch);
    }

    *self.read_only_tx.lock() = self.db_read_only;
    self.savepoints.lock().clear();

    // Periodic GC: runs when the shared commit counter hits a multiple of
    // `gc_interval`; an interval of 0 disables it.
    if self.gc_interval > 0 {
        let count = self.commit_counter.fetch_add(1, Ordering::Relaxed) + 1;
        if count.is_multiple_of(self.gc_interval) {
            #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
            let gc_start = std::time::Instant::now();

            let min_epoch = self.transaction_manager.min_active_epoch();
            for graph_name in &touched {
                let store = self.resolve_store(graph_name);
                store.gc_versions(min_epoch);
            }
            self.transaction_manager.gc();

            #[cfg(feature = "metrics")]
            {
                crate::metrics::record_metric!(self.metrics, gc_runs, inc);
                #[cfg(not(target_arch = "wasm32"))]
                {
                    let gc_duration_ms = gc_start.elapsed().as_secs_f64() * 1000.0;
                    crate::metrics::record_metric!(
                        self.metrics,
                        gc_duration,
                        observe gc_duration_ms
                    );
                }
            }
        }
    }

    #[cfg(feature = "metrics")]
    {
        crate::metrics::record_metric!(self.metrics, tx_active, dec);
        crate::metrics::record_metric!(self.metrics, tx_committed, inc);
        #[cfg(not(target_arch = "wasm32"))]
        if let Some(start) = self.tx_start_time.lock().take() {
            let duration_ms = start.elapsed().as_secs_f64() * 1000.0;
            crate::metrics::record_metric!(self.metrics, tx_duration, observe duration_ms);
        }
    }

    Ok(())
}
4077
/// Rolls back the current transaction (or the innermost nested level).
///
/// Delegates to `rollback_inner`.
///
/// # Errors
///
/// Propagates any error from `rollback_inner`.
#[cfg(feature = "lpg")]
pub fn rollback(&mut self) -> Result<()> {
    self.rollback_inner()
}
4105
/// Aborts the active transaction, or rolls back the innermost nested level.
///
/// When the nesting depth is non-zero, the session rolls back to the
/// matching `_nested_tx_<depth>` savepoint and returns. Otherwise the
/// transaction's uncommitted versions are discarded in every touched graph
/// store, per-transaction session state is cleared, and the transaction is
/// aborted in the transaction manager.
///
/// # Errors
///
/// Returns an error when streaming results are still active, when there is
/// no active transaction, when rolling back to the nested savepoint fails,
/// or when the transaction manager's `abort` fails.
#[cfg(feature = "lpg")]
fn rollback_inner(&self) -> Result<()> {
    let _span = grafeo_debug_span!("grafeo::tx::rollback");
    self.check_no_active_streams("rollback")?;
    {
        // Nested level: undo back to its savepoint instead of aborting.
        let mut depth = self.transaction_nesting_depth.lock();
        if *depth > 0 {
            let sp_name = format!("_nested_tx_{depth}");
            *depth -= 1;
            drop(depth);
            return self.rollback_to_savepoint(&sp_name);
        }
    }

    // `take()` clears the session's active transaction before any cleanup runs.
    let transaction_id = self.current_transaction.lock().take().ok_or_else(|| {
        grafeo_common::utils::error::Error::Transaction(
            grafeo_common::utils::error::TransactionError::InvalidState(
                "No active transaction".to_string(),
            ),
        )
    })?;

    *self.read_only_tx.lock() = self.db_read_only;

    // Work on a copy so the `touched_graphs` lock is not held while the
    // stores are being cleaned up.
    let touched = self.touched_graphs.lock().clone();
    for graph_name in &touched {
        let store = self.resolve_store(graph_name);
        store.discard_uncommitted_versions(transaction_id);
    }

    #[cfg(feature = "triple-store")]
    self.rollback_rdf_transaction(transaction_id);

    // Buffered CDC events belong to the aborted transaction; drop them.
    #[cfg(feature = "cdc")]
    if let Some(ref pending) = self.cdc_pending_events {
        pending.lock().clear();
    }

    self.savepoints.lock().clear();
    self.touched_graphs.lock().clear();

    let result = self.transaction_manager.abort(transaction_id);

    // NOTE(review): metrics (including the `tx_active` decrement and the
    // start-time reset) are only updated when `abort` succeeds, although the
    // session has already dropped the transaction — confirm this is intended.
    #[cfg(feature = "metrics")]
    if result.is_ok() {
        crate::metrics::record_metric!(self.metrics, tx_active, dec);
        crate::metrics::record_metric!(self.metrics, tx_rolled_back, inc);
        #[cfg(not(target_arch = "wasm32"))]
        if let Some(start) = self.tx_start_time.lock().take() {
            let duration_ms = start.elapsed().as_secs_f64() * 1000.0;
            crate::metrics::record_metric!(self.metrics, tx_duration, observe duration_ms);
        }
    }

    result
}
4170
/// Records a named savepoint inside the active transaction.
///
/// For every graph touched so far, the savepoint captures the next node id,
/// the next edge id and the transaction's property undo-log position. It
/// also remembers the session's current graph and, with the `cdc` feature,
/// the number of pending CDC events.
///
/// # Errors
///
/// Returns `TransactionError::InvalidState` when no transaction is active.
#[cfg(feature = "lpg")]
pub fn savepoint(&self, name: &str) -> Result<()> {
    let Some(tx_id) = *self.current_transaction.lock() else {
        return Err(grafeo_common::utils::error::Error::Transaction(
            grafeo_common::utils::error::TransactionError::InvalidState(
                "No active transaction".to_string(),
            ),
        ));
    };

    // Snapshot each touched graph from a copy of the touched list.
    let touched_names = self.touched_graphs.lock().clone();
    let mut graph_snapshots: Vec<GraphSavepoint> = Vec::with_capacity(touched_names.len());
    for graph_name in &touched_names {
        let store = self.resolve_store(graph_name);
        graph_snapshots.push(GraphSavepoint {
            graph_name: graph_name.clone(),
            next_node_id: store.peek_next_node_id(),
            next_edge_id: store.peek_next_edge_id(),
            undo_log_position: store.property_undo_log_position(tx_id),
        });
    }

    self.savepoints.lock().push(SavepointState {
        name: name.to_string(),
        graph_snapshots,
        active_graph: self.current_graph.lock().clone(),
        #[cfg(feature = "cdc")]
        cdc_event_position: self
            .cdc_pending_events
            .as_ref()
            .map_or(0, |p| p.lock().len()),
    });
    Ok(())
}
4217
/// Undoes everything the active transaction did after the named savepoint.
///
/// The most recently created savepoint called `name` is used. That savepoint
/// and every savepoint created after it are removed from the session.
///
/// # Errors
///
/// Returns `TransactionError::InvalidState` when no transaction is active or
/// when no savepoint named `name` exists.
#[cfg(feature = "lpg")]
pub fn rollback_to_savepoint(&self, name: &str) -> Result<()> {
    let transaction_id = self.current_transaction.lock().ok_or_else(|| {
        grafeo_common::utils::error::Error::Transaction(
            grafeo_common::utils::error::TransactionError::InvalidState(
                "No active transaction".to_string(),
            ),
        )
    })?;

    let mut savepoints = self.savepoints.lock();

    // Search from the back so the newest savepoint with this name wins.
    let pos = savepoints
        .iter()
        .rposition(|sp| sp.name == name)
        .ok_or_else(|| {
            grafeo_common::utils::error::Error::Transaction(
                grafeo_common::utils::error::TransactionError::InvalidState(format!(
                    "Savepoint '{name}' not found"
                )),
            )
        })?;

    let sp_state = savepoints[pos].clone();

    // Drop the target savepoint together with everything created after it,
    // then release the lock before touching the stores.
    savepoints.truncate(pos);
    drop(savepoints);

    for gs in &sp_state.graph_snapshots {
        let store = self.resolve_store(&gs.graph_name);

        // Rewind property changes to the undo-log position saved in the snapshot.
        store.rollback_transaction_properties_to(transaction_id, gs.undo_log_position);

        let current_next_node = store.peek_next_node_id();
        let current_next_edge = store.peek_next_edge_id();

        // Ids in [saved next id, current next id) are the entities handed out
        // since the savepoint.
        let node_ids: Vec<NodeId> = (gs.next_node_id..current_next_node)
            .map(NodeId::new)
            .collect();
        let edge_ids: Vec<EdgeId> = (gs.next_edge_id..current_next_edge)
            .map(EdgeId::new)
            .collect();

        if !node_ids.is_empty() || !edge_ids.is_empty() {
            store.discard_entities_by_id(transaction_id, &node_ids, &edge_ids);
        }
    }

    // Graphs touched now but absent from the savepoint have no snapshot to
    // rewind to, so all of this transaction's uncommitted versions in them
    // are discarded.
    let touched = self.touched_graphs.lock().clone();
    for graph_name in &touched {
        let already_captured = sp_state
            .graph_snapshots
            .iter()
            .any(|gs| gs.graph_name == *graph_name);
        if !already_captured {
            let store = self.resolve_store(graph_name);
            store.discard_uncommitted_versions(transaction_id);
        }
    }

    // Keep only the CDC events that were pending when the savepoint was taken.
    #[cfg(feature = "cdc")]
    if let Some(ref pending) = self.cdc_pending_events {
        pending.lock().truncate(sp_state.cdc_event_position);
    }

    // Reset the touched list to exactly the graphs captured by the savepoint.
    let mut touched = self.touched_graphs.lock();
    touched.clear();
    for gs in &sp_state.graph_snapshots {
        if !touched.contains(&gs.graph_name) {
            touched.push(gs.graph_name.clone());
        }
    }

    Ok(())
}
4311
4312 pub fn release_savepoint(&self, name: &str) -> Result<()> {
4318 let _tx_id = self.current_transaction.lock().ok_or_else(|| {
4319 grafeo_common::utils::error::Error::Transaction(
4320 grafeo_common::utils::error::TransactionError::InvalidState(
4321 "No active transaction".to_string(),
4322 ),
4323 )
4324 })?;
4325
4326 let mut savepoints = self.savepoints.lock();
4327 let pos = savepoints
4328 .iter()
4329 .rposition(|sp| sp.name == name)
4330 .ok_or_else(|| {
4331 grafeo_common::utils::error::Error::Transaction(
4332 grafeo_common::utils::error::TransactionError::InvalidState(format!(
4333 "Savepoint '{name}' not found"
4334 )),
4335 )
4336 })?;
4337 savepoints.remove(pos);
4338 Ok(())
4339 }
4340
/// Returns `true` while this session has an active transaction.
#[must_use]
pub fn in_transaction(&self) -> bool {
    self.current_transaction.lock().is_some()
}
4346
/// Returns a copy of the active transaction's id, or `None` when no
/// transaction is active.
#[must_use]
pub(crate) fn current_transaction_id(&self) -> Option<TransactionId> {
    *self.current_transaction.lock()
}
4352
/// Borrows the transaction manager this session uses.
#[must_use]
pub(crate) fn transaction_manager(&self) -> &TransactionManager {
    &self.transaction_manager
}
4358
/// Returns `(node count recorded when the transaction began, node count of
/// the active store right now)`.
#[cfg(feature = "lpg")]
#[must_use]
pub(crate) fn node_count_delta(&self) -> (usize, usize) {
    let at_start = self.transaction_start_node_count.load(Ordering::Relaxed);
    let now = self.active_lpg_store().node_count();
    (at_start, now)
}
4368
/// Returns `(edge count recorded when the transaction began, edge count of
/// the active store right now)`.
#[cfg(feature = "lpg")]
#[must_use]
pub(crate) fn edge_count_delta(&self) -> (usize, usize) {
    let at_start = self.transaction_start_edge_count.load(Ordering::Relaxed);
    let now = self.active_lpg_store().edge_count();
    (at_start, now)
}
4378
/// Builds a `PreparedCommit` that mutably borrows this session.
///
/// # Errors
///
/// Propagates any error from `PreparedCommit::new`.
#[cfg(feature = "lpg")]
pub fn prepare_commit(&mut self) -> Result<crate::transaction::PreparedCommit<'_>> {
    crate::transaction::PreparedCommit::new(self)
}
4416
/// Enables or disables auto-commit for this session.
///
/// The flag is read by `needs_auto_commit`.
pub fn set_auto_commit(&mut self, auto_commit: bool) {
    self.auto_commit = auto_commit;
}
4421
/// Returns whether auto-commit is enabled for this session.
#[must_use]
pub fn auto_commit(&self) -> bool {
    self.auto_commit
}
4427
/// Returns `true` when a statement should be wrapped in an implicit
/// transaction: auto-commit is enabled, the statement mutates, and no
/// transaction is currently active.
fn needs_auto_commit(&self, has_mutations: bool) -> bool {
    self.auto_commit && has_mutations && self.current_transaction.lock().is_none()
}
4435
/// Runs `body`, wrapping it in an implicit transaction when
/// `needs_auto_commit(has_mutations)` says one is required.
///
/// In the wrapped case the transaction is committed when `body` succeeds
/// and rolled back (ignoring any rollback error) when it fails.
///
/// # Errors
///
/// Returns the error from beginning or committing the implicit transaction,
/// or the error produced by `body`.
#[cfg(feature = "lpg")]
fn with_auto_commit<F>(&self, has_mutations: bool, body: F) -> Result<QueryResult>
where
    F: FnOnce() -> Result<QueryResult>,
{
    if !self.needs_auto_commit(has_mutations) {
        return body();
    }

    self.begin_transaction_inner(false, None)?;
    let outcome = body();
    if outcome.is_ok() {
        // A failed commit replaces the successful query result.
        self.commit_inner()?;
    } else {
        // Best effort: the body's error is what the caller needs to see.
        let _ = self.rollback_inner();
    }
    outcome
}
4459
/// Variant used when the `lpg` feature is disabled: runs `body` directly and
/// ignores `_has_mutations`; no implicit transaction is opened.
#[cfg(not(feature = "lpg"))]
fn with_auto_commit<F>(&self, _has_mutations: bool, body: F) -> Result<QueryResult>
where
    F: FnOnce() -> Result<QueryResult>,
{
    body()
}
4468
/// Heuristically decides whether `query` might modify data.
///
/// The query is upper-cased (ASCII only) and searched for the substrings
/// `INSERT`, `CREATE`, `DELETE`, `MERGE`, `SET`, `REMOVE`, `DROP` and
/// `ALTER`. Matching is by plain substring, so identifiers or literals that
/// merely contain a keyword (for example `created_at` or `offset`) also
/// count as a match.
fn query_looks_like_mutation(query: &str) -> bool {
    const MUTATION_KEYWORDS: [&str; 8] = [
        "INSERT", "CREATE", "DELETE", "MERGE", "SET", "REMOVE", "DROP", "ALTER",
    ];

    let normalized = query.to_ascii_uppercase();
    MUTATION_KEYWORDS
        .iter()
        .any(|&keyword| normalized.contains(keyword))
}
4485
/// Refuses the transaction operation `op` while streaming results are open.
///
/// `op` is only used to build the error message.
///
/// # Errors
///
/// Returns `TransactionError::InvalidState` when the `active_streams`
/// counter is greater than zero.
#[cfg(feature = "lpg")]
fn check_no_active_streams(&self, op: &str) -> Result<()> {
    let streams_open = self.active_streams.load(Ordering::Acquire) > 0;
    if !streams_open {
        return Ok(());
    }

    Err(grafeo_common::utils::error::Error::Transaction(
        grafeo_common::utils::error::TransactionError::InvalidState(format!(
            "Cannot {op} while streaming results are active; drop the stream first"
        )),
    ))
}
4499
4500 #[must_use]
4502 fn query_deadline(&self) -> Option<Instant> {
4503 #[cfg(not(target_arch = "wasm32"))]
4504 {
4505 self.query_timeout.map(|d| Instant::now() + d)
4506 }
4507 #[cfg(target_arch = "wasm32")]
4508 {
4509 let _ = &self.query_timeout;
4510 None
4511 }
4512 }
4513
/// Builds an `Executor` for the given output `columns`, configured with the
/// deadline from `query_deadline` and the session's `query_timeout`.
fn make_executor(&self, columns: Vec<String>) -> Executor {
    Executor::with_columns(columns)
        .with_deadline(self.query_deadline())
        .with_timeout_duration(self.query_timeout)
}
4520
/// Builds the memory context operators use for spilling, if spilling is
/// available.
///
/// Returns `None` when the session has no buffer manager, when the buffer
/// manager's config has no spill path, or when the `SpillManager` for the
/// per-query directory cannot be created.
///
/// The directory is `<spill_path>/query_<id>`, where `<id>` is taken by
/// incrementing `commit_counter`.
#[cfg(feature = "spill")]
fn make_operator_memory_context(
    &self,
) -> Option<grafeo_core::execution::OperatorMemoryContext> {
    let buffer_manager = self.buffer_manager.as_ref()?;
    let spill_root = buffer_manager.config().spill_path.as_ref()?;

    // NOTE(review): this shares `commit_counter` with the commit path's GC
    // cadence — confirm the shared counter is intended.
    let query_id = self
        .commit_counter
        .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
    let query_dir = spill_root.join(format!("query_{query_id}"));

    let spill_manager = grafeo_core::execution::SpillManager::new(&query_dir).ok()?;
    let spill_manager = std::sync::Arc::new(spill_manager);
    Some(grafeo_core::execution::OperatorMemoryContext::new(
        std::sync::Arc::clone(buffer_manager),
        spill_manager,
    ))
}
4542
4543 fn check_property_size(&self, key: &str, value: &Value) -> Result<()> {
4545 if let Some(limit) = self.max_property_size {
4546 let size = value.estimated_size_bytes();
4547 if size > limit {
4548 let limit_display = if limit >= 1024 * 1024 && limit % (1024 * 1024) == 0 {
4549 format!("{} MiB", limit / (1024 * 1024))
4550 } else if limit >= 1024 && limit % 1024 == 0 {
4551 format!("{} KiB", limit / 1024)
4552 } else {
4553 format!("{limit} bytes")
4554 };
4555 return Err(grafeo_common::utils::error::Error::Query(
4556 grafeo_common::utils::error::QueryError::new(
4557 grafeo_common::utils::error::QueryErrorKind::Execution,
4558 format!(
4559 "Property '{key}' value exceeds maximum size of {limit_display} ({size} bytes)"
4560 ),
4561 )
4562 .with_hint(
4563 "Increase with Config::with_max_property_size() or disable with Config::without_max_property_size()".to_string(),
4564 ),
4565 ));
4566 }
4567 }
4568 Ok(())
4569 }
4570
/// Records metrics for one executed query.
///
/// Always counts the query (overall and per `language`). Latency is observed
/// only when `elapsed_ms` is `Some`. On success, the number of returned rows
/// and, when reported, scanned rows are added. On failure, the error counter
/// is incremented, plus the timeout counter when the error text contains
/// `"exceeded timeout"`.
#[cfg(feature = "metrics")]
fn record_query_metrics(
    &self,
    language: &str,
    elapsed_ms: Option<f64>,
    result: &Result<crate::database::QueryResult>,
) {
    use crate::metrics::record_metric;

    record_metric!(self.metrics, query_count, inc);
    if let Some(reg) = self.metrics.as_ref() {
        reg.query_count_by_language.increment(language);
    }
    if let Some(ms) = elapsed_ms {
        record_metric!(self.metrics, query_latency, observe ms);
    }

    match result {
        Err(e) => {
            record_metric!(self.metrics, query_errors, inc);
            // Timeouts are recognised by their error message text.
            if e.to_string().contains("exceeded timeout") {
                record_metric!(self.metrics, query_timeouts, inc);
            }
        }
        Ok(r) => {
            let returned = r.rows.len() as u64;
            record_metric!(self.metrics, rows_returned, add returned);
            if let Some(scanned) = r.rows_scanned {
                record_metric!(self.metrics, rows_scanned, add scanned);
            }
        }
    }
}
4610
/// Extracts the value of `expr` when it is an integer literal.
///
/// Returns `None` for every other kind of expression; nothing is evaluated.
#[cfg(feature = "gql")]
fn eval_integer_literal(expr: &grafeo_adapters::query::gql::ast::Expression) -> Option<i64> {
    use grafeo_adapters::query::gql::ast::{Expression, Literal};

    if let Expression::Literal(Literal::Integer(value)) = expr {
        Some(*value)
    } else {
        None
    }
}
4620
4621 #[must_use]
4627 fn get_transaction_context(&self) -> (EpochId, Option<TransactionId>) {
4628 if let Some(epoch) = *self.viewing_epoch_override.lock() {
4630 return (epoch, None);
4631 }
4632
4633 if let Some(transaction_id) = *self.current_transaction.lock() {
4634 let epoch = self
4636 .transaction_manager
4637 .start_epoch(transaction_id)
4638 .unwrap_or_else(|| self.transaction_manager.current_epoch());
4639 (epoch, Some(transaction_id))
4640 } else {
4641 (self.transaction_manager.current_epoch(), None)
4643 }
4644 }
4645
/// Builds a planner over `store` for the given epoch and optional
/// transaction, with the read-only flag set to `false`.
///
/// Delegates to `create_planner_for_store_with_read_only`.
fn create_planner_for_store(
    &self,
    store: Arc<dyn GraphStoreSearch>,
    viewing_epoch: EpochId,
    transaction_id: Option<TransactionId>,
) -> crate::query::Planner {
    self.create_planner_for_store_with_read_only(store, viewing_epoch, transaction_id, false)
}
4658
4659 fn create_planner_for_store_with_read_only(
4660 &self,
4661 store: Arc<dyn GraphStoreSearch>,
4662 viewing_epoch: EpochId,
4663 transaction_id: Option<TransactionId>,
4664 read_only: bool,
4665 ) -> crate::query::Planner {
4666 use crate::query::Planner;
4667 use grafeo_core::execution::operators::{LazyValue, SessionContext};
4668
4669 let info_store = Arc::clone(&store);
4671 let schema_store = Arc::clone(&store);
4672
4673 let session_context = SessionContext {
4674 current_schema: self.current_schema(),
4675 current_graph: self.current_graph(),
4676 db_info: LazyValue::new(move || Self::build_info_value(&*info_store)),
4677 schema_info: LazyValue::new(move || Self::build_schema_value(&*schema_store)),
4678 };
4679
4680 let write_store = self.active_write_store();
4681
4682 let mut planner = Planner::with_context(
4683 Arc::clone(&store),
4684 write_store,
4685 Arc::clone(&self.transaction_manager),
4686 transaction_id,
4687 viewing_epoch,
4688 )
4689 .with_factorized_execution(self.factorized_execution)
4690 .with_catalog(Arc::clone(&self.catalog))
4691 .with_session_context(session_context)
4692 .with_read_only(read_only);
4693
4694 let validator = CatalogConstraintValidator::new(Arc::clone(&self.catalog))
4696 .with_store(store)
4697 .with_max_property_size(self.max_property_size);
4698 planner = planner.with_validator(Arc::new(validator));
4699
4700 planner
4701 }
4702
4703 fn build_info_value(store: &dyn GraphStore) -> Value {
4705 use grafeo_common::types::PropertyKey;
4706 use std::collections::BTreeMap;
4707
4708 let mut map = BTreeMap::new();
4709 map.insert(PropertyKey::from("mode"), Value::String("lpg".into()));
4710 #[allow(clippy::cast_possible_wrap)]
4712 let node_count = store.node_count() as i64;
4713 #[allow(clippy::cast_possible_wrap)]
4715 let edge_count = store.edge_count() as i64;
4716 map.insert(PropertyKey::from("node_count"), Value::Int64(node_count));
4717 map.insert(PropertyKey::from("edge_count"), Value::Int64(edge_count));
4718 map.insert(
4719 PropertyKey::from("version"),
4720 Value::String(env!("CARGO_PKG_VERSION").into()),
4721 );
4722 Value::Map(map.into())
4723 }
4724
4725 fn build_schema_value(store: &dyn GraphStore) -> Value {
4727 use grafeo_common::types::PropertyKey;
4728 use std::collections::BTreeMap;
4729
4730 let labels: Vec<Value> = store
4731 .all_labels()
4732 .into_iter()
4733 .map(|l| Value::String(l.into()))
4734 .collect();
4735 let edge_types: Vec<Value> = store
4736 .all_edge_types()
4737 .into_iter()
4738 .map(|t| Value::String(t.into()))
4739 .collect();
4740 let property_keys: Vec<Value> = store
4741 .all_property_keys()
4742 .into_iter()
4743 .map(|k| Value::String(k.into()))
4744 .collect();
4745
4746 let mut map = BTreeMap::new();
4747 map.insert(PropertyKey::from("labels"), Value::List(labels.into()));
4748 map.insert(
4749 PropertyKey::from("edge_types"),
4750 Value::List(edge_types.into()),
4751 );
4752 map.insert(
4753 PropertyKey::from("property_keys"),
4754 Value::List(property_keys.into()),
4755 );
4756 Value::Map(map.into())
4757 }
4758
/// Creates a node with the given `labels` in the active graph store and
/// returns its id.
///
/// The node is created at the epoch from `get_transaction_context` and is
/// attributed to the active transaction, or to `TransactionId::SYSTEM` when
/// none is active.
#[cfg(feature = "lpg")]
pub fn create_node(&self, labels: &[&str]) -> NodeId {
    let (epoch, transaction_id) = self.get_transaction_context();
    let owner = transaction_id.unwrap_or(TransactionId::SYSTEM);
    let store = self.active_lpg_store();
    store.create_node_versioned(labels, epoch, owner)
}
4772
/// Creates a node with `labels` and initial `properties`, returning its id.
///
/// Every property is size-checked before anything is written. The node is
/// attributed to the active transaction, or to `TransactionId::SYSTEM` when
/// none is active.
///
/// # Errors
///
/// Returns the error from `check_property_size` for the first property that
/// exceeds the configured maximum size.
#[cfg(feature = "lpg")]
pub fn create_node_with_props<'a>(
    &self,
    labels: &[&str],
    properties: impl IntoIterator<Item = (&'a str, Value)>,
) -> Result<NodeId> {
    let props: Vec<(&str, Value)> = properties.into_iter().collect();
    props
        .iter()
        .try_for_each(|(key, value)| self.check_property_size(key, value))?;

    let (epoch, transaction_id) = self.get_transaction_context();
    let owner = transaction_id.unwrap_or(TransactionId::SYSTEM);
    let node_id = self
        .active_lpg_store()
        .create_node_with_props_versioned(labels, props, epoch, owner);
    Ok(node_id)
}
4798
/// Creates an edge of type `edge_type` from `src` to `dst` in the active
/// graph store and returns its id.
///
/// The edge is created at the epoch from `get_transaction_context` and is
/// attributed to the active transaction, or to `TransactionId::SYSTEM` when
/// none is active.
#[cfg(feature = "lpg")]
pub fn create_edge(
    &self,
    src: NodeId,
    dst: NodeId,
    edge_type: &str,
) -> grafeo_common::types::EdgeId {
    let (epoch, transaction_id) = self.get_transaction_context();
    let owner = transaction_id.unwrap_or(TransactionId::SYSTEM);
    let store = self.active_lpg_store();
    store.create_edge_versioned(src, dst, edge_type, epoch, owner)
}
4819
/// Creates an edge of type `edge_type` from `src` to `dst` with initial
/// `properties`, returning its id.
///
/// Every property is size-checked before anything is written. The edge is
/// created first and the properties are then set on it one by one, all
/// attributed to the active transaction, or to `TransactionId::SYSTEM` when
/// none is active.
///
/// # Errors
///
/// Returns the error from `check_property_size` for the first property that
/// exceeds the configured maximum size.
#[cfg(feature = "lpg")]
pub fn create_edge_with_props<'a>(
    &self,
    src: NodeId,
    dst: NodeId,
    edge_type: &str,
    properties: impl IntoIterator<Item = (&'a str, Value)>,
) -> Result<grafeo_common::types::EdgeId> {
    let props: Vec<(&str, Value)> = properties.into_iter().collect();
    props
        .iter()
        .try_for_each(|(key, value)| self.check_property_size(key, value))?;

    let (epoch, transaction_id) = self.get_transaction_context();
    let tid = transaction_id.unwrap_or(TransactionId::SYSTEM);
    let store = self.active_lpg_store();
    let eid = store.create_edge_versioned(src, dst, edge_type, epoch, tid);
    props.into_iter().for_each(|(key, value)| {
        store.set_edge_property_versioned(eid, key, value, tid);
    });
    Ok(eid)
}
4846
/// Sets property `key` on node `id` to `value`.
///
/// Inside a transaction the versioned store API is used with the
/// transaction id; otherwise the plain `set_node_property` is called.
///
/// # Errors
///
/// Returns the error from `check_property_size` when `value` exceeds the
/// configured maximum size.
#[cfg(feature = "lpg")]
pub fn set_node_property(&self, id: NodeId, key: &str, value: Value) -> Result<()> {
    self.check_property_size(key, &value)?;
    let (_, transaction_id) = self.get_transaction_context();
    match transaction_id {
        Some(tid) => {
            self.active_lpg_store()
                .set_node_property_versioned(id, key, value, tid);
        }
        None => {
            self.active_lpg_store().set_node_property(id, key, value);
        }
    }
    Ok(())
}
4864
/// Sets property `key` on edge `id` to `value`.
///
/// Inside a transaction the versioned store API is used with the
/// transaction id; otherwise the plain `set_edge_property` is called.
///
/// # Errors
///
/// Returns the error from `check_property_size` when `value` exceeds the
/// configured maximum size.
#[cfg(feature = "lpg")]
pub fn set_edge_property(
    &self,
    id: grafeo_common::types::EdgeId,
    key: &str,
    value: Value,
) -> Result<()> {
    self.check_property_size(key, &value)?;
    let (_, transaction_id) = self.get_transaction_context();
    match transaction_id {
        Some(tid) => {
            self.active_lpg_store()
                .set_edge_property_versioned(id, key, value, tid);
        }
        None => {
            self.active_lpg_store().set_edge_property(id, key, value);
        }
    }
    Ok(())
}
4887
/// Deletes node `id` from the active graph store and returns the store's
/// boolean result.
///
/// Inside a transaction the versioned delete is used with the transaction's
/// epoch and id; otherwise the plain `delete_node` is called.
#[cfg(feature = "lpg")]
pub fn delete_node(&self, id: NodeId) -> bool {
    let (epoch, transaction_id) = self.get_transaction_context();
    match transaction_id {
        Some(tid) => self
            .active_lpg_store()
            .delete_node_versioned(id, epoch, tid),
        None => self.active_lpg_store().delete_node(id),
    }
}
4899
/// Deletes edge `id` from the active graph store and returns the store's
/// boolean result.
///
/// Inside a transaction the versioned delete is used with the transaction's
/// epoch and id; otherwise the plain `delete_edge` is called.
#[cfg(feature = "lpg")]
pub fn delete_edge(&self, id: grafeo_common::types::EdgeId) -> bool {
    let (epoch, transaction_id) = self.get_transaction_context();
    match transaction_id {
        Some(tid) => self
            .active_lpg_store()
            .delete_edge_versioned(id, epoch, tid),
        None => self.active_lpg_store().delete_edge(id),
    }
}
4911
/// Looks up node `id` as visible to this session.
///
/// The lookup uses the epoch from `get_transaction_context` and the active
/// transaction id, or `TransactionId::SYSTEM` when none is active. Returns
/// `None` when the store reports no node for that id, epoch and transaction.
#[cfg(feature = "lpg")]
#[must_use]
pub fn get_node(&self, id: NodeId) -> Option<Node> {
    let (epoch, transaction_id) = self.get_transaction_context();
    let reader = transaction_id.unwrap_or(TransactionId::SYSTEM);
    self.active_lpg_store().get_node_versioned(id, epoch, reader)
}
4949
/// Returns a clone of property `key` on node `id`.
///
/// Returns `None` when `get_node` finds no node or the node has no such
/// property.
#[cfg(feature = "lpg")]
#[must_use]
pub fn get_node_property(&self, id: NodeId, key: &str) -> Option<Value> {
    self.get_node(id)
        .and_then(|node| node.get_property(key).cloned())
}
4979
/// Looks up edge `id` as visible to this session.
///
/// The lookup uses the epoch from `get_transaction_context` and the active
/// transaction id, or `TransactionId::SYSTEM` when none is active. Returns
/// `None` when the store reports no edge for that id, epoch and transaction.
#[cfg(feature = "lpg")]
#[must_use]
pub fn get_edge(&self, id: EdgeId) -> Option<Edge> {
    let (epoch, transaction_id) = self.get_transaction_context();
    let reader = transaction_id.unwrap_or(TransactionId::SYSTEM);
    self.active_lpg_store().get_edge_versioned(id, epoch, reader)
}
4996
/// Collects the `(neighbor, edge)` pairs the active store yields for `node`
/// in the outgoing direction.
///
/// The store is queried directly via `edges_from`; no epoch or transaction
/// id is passed to it here.
#[cfg(feature = "lpg")]
#[must_use]
pub fn get_neighbors_outgoing(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
    self.active_lpg_store()
        .edges_from(node, Direction::Outgoing)
        .collect()
}
5029
#[cfg(feature = "lpg")]
/// Collects the `(neighbor, edge)` pairs the active LPG store yields for
/// `node` in the incoming direction.
///
/// NOTE(review): unlike `get_node`, this does not consult the session's
/// transaction context; it uses the store's unversioned `edges_from`.
#[must_use]
pub fn get_neighbors_incoming(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
    let store = self.active_lpg_store();
    let incoming = store.edges_from(node, Direction::Incoming);
    incoming.collect()
}
5045
#[cfg(feature = "lpg")]
/// Collects the outgoing `(neighbor, edge)` pairs of `node` whose edge type
/// equals `edge_type`.
///
/// Each candidate edge is fetched with the versioned edge lookup, using the
/// epoch from the session's transaction context and the current transaction
/// id (or `TransactionId::SYSTEM` when there is none). Edges that the lookup
/// does not return are dropped from the result.
///
/// The transaction context and the active store are resolved once up front,
/// as `get_nodes_batch` does, rather than once per edge; every edge in a
/// single call is therefore checked against the same snapshot.
#[must_use]
pub fn get_neighbors_outgoing_by_type(
    &self,
    node: NodeId,
    edge_type: &str,
) -> Vec<(NodeId, EdgeId)> {
    let (epoch, transaction_id) = self.get_transaction_context();
    let tx = transaction_id.unwrap_or(TransactionId::SYSTEM);
    let active = self.active_lpg_store();
    active
        .edges_from(node, Direction::Outgoing)
        .filter(|&(_, edge_id)| {
            active
                .get_edge_versioned(edge_id, epoch, tx)
                .is_some_and(|edge| edge.edge_type.as_str() == edge_type)
        })
        .collect()
}
5072
#[cfg(feature = "lpg")]
/// Reports whether `get_node` returns a node for `id`.
#[must_use]
pub fn node_exists(&self, id: NodeId) -> bool {
    let lookup = self.get_node(id);
    lookup.is_some()
}
5084
#[cfg(feature = "lpg")]
/// Reports whether `get_edge` returns an edge for `id`.
#[must_use]
pub fn edge_exists(&self, id: EdgeId) -> bool {
    let lookup = self.get_edge(id);
    lookup.is_some()
}
5091
#[cfg(feature = "lpg")]
/// Returns `(out_degree, in_degree)` for `node` as reported by the active
/// LPG store.
#[must_use]
pub fn get_degree(&self, node: NodeId) -> (usize, usize) {
    let store = self.active_lpg_store();
    // Tuple fields are evaluated left to right: out-degree first, then in-degree.
    (store.out_degree(node), store.in_degree(node))
}
5103
#[cfg(feature = "lpg")]
/// Looks up every id in `ids` with the versioned node lookup.
///
/// The transaction context and active store are resolved once and reused for
/// all ids. The result has one entry per input id, in input order; an entry
/// is `None` when the lookup returns nothing for that id.
#[must_use]
pub fn get_nodes_batch(&self, ids: &[NodeId]) -> Vec<Option<Node>> {
    let (epoch, transaction_id) = self.get_transaction_context();
    let reader = transaction_id.unwrap_or(TransactionId::SYSTEM);
    let store = self.active_lpg_store();

    let mut found = Vec::with_capacity(ids.len());
    for &id in ids {
        found.push(store.get_node_versioned(id, epoch, reader));
    }
    found
}
5123
#[cfg(feature = "cdc")]
/// Returns the change events the CDC log holds for `entity_id`.
///
/// # Errors
///
/// Propagates the error from `require_permission` when the session is not
/// permitted to run a `Read` statement.
pub fn history(
    &self,
    entity_id: impl Into<crate::cdc::EntityId>,
) -> Result<Vec<crate::cdc::ChangeEvent>> {
    self.require_permission(crate::auth::StatementKind::Read)?;
    let entity: crate::cdc::EntityId = entity_id.into();
    let events = self.cdc_log.history(entity);
    Ok(events)
}
5139
#[cfg(feature = "cdc")]
/// Returns the change events the CDC log holds for `entity_id`, restricted by
/// `since_epoch` as defined by the log's `history_since`.
///
/// # Errors
///
/// Propagates the error from `require_permission` when the session is not
/// permitted to run a `Read` statement.
pub fn history_since(
    &self,
    entity_id: impl Into<crate::cdc::EntityId>,
    since_epoch: EpochId,
) -> Result<Vec<crate::cdc::ChangeEvent>> {
    self.require_permission(crate::auth::StatementKind::Read)?;
    let entity: crate::cdc::EntityId = entity_id.into();
    let events = self.cdc_log.history_since(entity, since_epoch);
    Ok(events)
}
5154
#[cfg(feature = "cdc")]
/// Returns the change events the CDC log reports for the epoch range given by
/// `start_epoch` and `end_epoch` (bounds semantics are those of the log's
/// `changes_between`).
///
/// # Errors
///
/// Propagates the error from `require_permission` when the session is not
/// permitted to run a `Read` statement.
pub fn changes_between(
    &self,
    start_epoch: EpochId,
    end_epoch: EpochId,
) -> Result<Vec<crate::cdc::ChangeEvent>> {
    self.require_permission(crate::auth::StatementKind::Read)?;
    let events = self.cdc_log.changes_between(start_epoch, end_epoch);
    Ok(events)
}
5169}
5170
5171impl Drop for Session {
5172 fn drop(&mut self) {
5173 #[cfg(feature = "lpg")]
5176 if self.in_transaction() {
5177 let _ = self.rollback_inner();
5178 }
5179
5180 #[cfg(feature = "metrics")]
5181 if let Some(ref reg) = self.metrics {
5182 reg.session_active
5183 .fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
5184 }
5185 }
5186}
5187
5188#[cfg(test)]
5189mod tests {
5190 use super::parse_default_literal;
5191 use crate::database::GrafeoDB;
5192 use grafeo_common::types::Value;
5193
#[test]
/// `null` parses to `Value::Null` in lower, upper and mixed ASCII case.
fn parse_default_literal_null() {
    for spelling in ["null", "NULL", "Null"] {
        assert_eq!(parse_default_literal(spelling), Value::Null);
    }
}
5204
#[test]
/// `true` / `false` parse to `Value::Bool` in lower and upper case.
fn parse_default_literal_bool() {
    let cases = [
        ("true", true),
        ("TRUE", true),
        ("false", false),
        ("FALSE", false),
    ];
    for (text, expected) in cases {
        assert_eq!(parse_default_literal(text), Value::Bool(expected));
    }
}
5212
#[test]
/// A single-quoted literal parses to a string with the quotes removed.
fn parse_default_literal_string_single_quoted() {
    let parsed = parse_default_literal("'hello'");
    let expected = Value::String("hello".into());
    assert_eq!(parsed, expected);
}
5220
#[test]
/// A double-quoted literal parses to a string with the quotes removed.
fn parse_default_literal_string_double_quoted() {
    let parsed = parse_default_literal("\"world\"");
    let expected = Value::String("world".into());
    assert_eq!(parsed, expected);
}
5228
#[test]
/// Positive, negative and zero integer literals parse to `Value::Int64`.
fn parse_default_literal_integer() {
    for (text, expected) in [("42", 42), ("-7", -7), ("0", 0)] {
        assert_eq!(parse_default_literal(text), Value::Int64(expected));
    }
}
5235
#[test]
/// Decimal literals that are not integers parse to `Value::Float64`.
fn parse_default_literal_float() {
    for (text, expected) in [("9.81", 9.81_f64), ("-0.5", -0.5)] {
        assert_eq!(parse_default_literal(text), Value::Float64(expected));
    }
}
5241
#[test]
/// Text that matches no other literal form is kept verbatim as a string.
fn parse_default_literal_fallback_string() {
    let parsed = parse_default_literal("some_identifier");
    let expected = Value::String("some_identifier".into());
    assert_eq!(parsed, expected);
}
5250
#[test]
/// Creating a node through a session yields a valid id and is counted by the database.
fn test_session_create_node() {
    let database = GrafeoDB::new_in_memory();
    let session = database.session();

    let created = session.create_node(&["Person"]);

    assert!(created.is_valid());
    assert_eq!(database.node_count(), 1);
}
5260
#[test]
/// `in_transaction` is false initially, true after begin, and false again after commit.
fn test_session_transaction() {
    let database = GrafeoDB::new_in_memory();
    let mut session = database.session();

    // Fresh session: nothing open yet.
    assert!(!session.in_transaction());

    session.begin_transaction().unwrap();
    let open_after_begin = session.in_transaction();
    assert!(open_after_begin);

    session.commit().unwrap();
    let open_after_commit = session.in_transaction();
    assert!(!open_after_commit);
}
5274
#[test]
/// The transaction context carries a transaction id only while a transaction
/// is open, and the epoch seen after commit is not lower than the one seen
/// inside the transaction.
fn test_session_transaction_context() {
    let database = GrafeoDB::new_in_memory();
    let mut session = database.session();

    // Before any transaction: no id.
    let (_, id_before) = session.get_transaction_context();
    assert!(id_before.is_none());

    // Inside a transaction: an id is present.
    session.begin_transaction().unwrap();
    let (epoch_during, id_during) = session.get_transaction_context();
    assert!(id_during.is_some());

    // After commit: the id is gone and the epoch did not go backwards.
    session.commit().unwrap();
    let (epoch_after, id_after) = session.get_transaction_context();
    assert!(id_after.is_none());
    assert!(epoch_after.as_u64() >= epoch_during.as_u64());
}
5298
#[test]
/// Rolling back an open transaction leaves the session outside any transaction.
fn test_session_rollback() {
    let database = GrafeoDB::new_in_memory();
    let mut session = database.session();

    session.begin_transaction().unwrap();
    session.rollback().unwrap();

    let still_open = session.in_transaction();
    assert!(!still_open);
}
5308
#[test]
/// A node written straight to the store under the session's transaction id is
/// visible to that transaction, hidden from `node_count()`, and gone after the
/// session rolls back, while a previously created node survives.
fn test_session_rollback_discards_versions() {
    use grafeo_common::types::TransactionId;

    let db = GrafeoDB::new_in_memory();

    // Baseline node created outside any transaction.
    let committed = db.store().create_node(&["Person"]);
    assert!(committed.is_valid());
    assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");

    let mut session = db.session();
    session.begin_transaction().unwrap();
    let tid = session.current_transaction.lock().unwrap();

    // Write a versioned node under the session's transaction id.
    let epoch = db.store().current_epoch();
    let pending = db.store().create_node_versioned(&["Person"], epoch, tid);
    assert!(pending.is_valid());

    assert_eq!(
        db.node_count(),
        1,
        "PENDING nodes should be invisible to non-versioned node_count()"
    );
    let visible_to_owner = db
        .store()
        .get_node_versioned(pending, epoch, tid)
        .is_some();
    assert!(
        visible_to_owner,
        "Transaction node should be visible to its own transaction"
    );

    session.rollback().unwrap();
    assert!(!session.in_transaction());

    let count_after = db.node_count();
    assert_eq!(
        count_after, 1,
        "Rollback should discard uncommitted node, but got {count_after}"
    );

    // Re-read both nodes from the system view at the current epoch.
    let current_epoch = db.store().current_epoch();
    let committed_survives = db
        .store()
        .get_node_versioned(committed, current_epoch, TransactionId::SYSTEM)
        .is_some();
    assert!(committed_survives, "Original node should still exist");

    let pending_gone = db
        .store()
        .get_node_versioned(pending, current_epoch, TransactionId::SYSTEM)
        .is_none();
    assert!(pending_gone, "Transaction node should be gone");
}
5376
#[test]
/// A node created via `session.create_node` inside a transaction is visible to
/// that transaction, hidden from `node_count()`, and discarded on rollback.
fn test_session_create_node_in_transaction() {
    let db = GrafeoDB::new_in_memory();

    // Baseline node created outside any transaction.
    let committed = db.create_node(&["Person"]);
    assert!(committed.is_valid());
    assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");

    let mut session = db.session();
    session.begin_transaction().unwrap();
    let tid = session.current_transaction.lock().unwrap();

    let pending = session.create_node(&["Person"]);
    assert!(pending.is_valid());

    assert_eq!(
        db.node_count(),
        1,
        "PENDING nodes should be invisible to non-versioned node_count()"
    );
    let epoch = db.store().current_epoch();
    let visible_to_owner = db
        .store()
        .get_node_versioned(pending, epoch, tid)
        .is_some();
    assert!(
        visible_to_owner,
        "Transaction node should be visible to its own transaction"
    );

    session.rollback().unwrap();

    let count_after = db.node_count();
    assert_eq!(
        count_after, 1,
        "Rollback should discard node created via session.create_node(), but got {count_after}"
    );
}
5421
#[test]
/// A node created via `session.create_node_with_props` inside a transaction is
/// visible to that transaction, hidden from `node_count()`, and discarded on
/// rollback.
fn test_session_create_node_with_props_in_transaction() {
    use grafeo_common::types::Value;

    let db = GrafeoDB::new_in_memory();

    // Baseline node created outside any transaction.
    db.create_node(&["Person"]);
    assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");

    let mut session = db.session();
    session.begin_transaction().unwrap();
    let tid = session.current_transaction.lock().unwrap();

    let properties = [("name", Value::String("Alix".into()))];
    let pending = session
        .create_node_with_props(&["Person"], properties)
        .unwrap();
    assert!(pending.is_valid());

    assert_eq!(
        db.node_count(),
        1,
        "PENDING nodes should be invisible to non-versioned node_count()"
    );
    let epoch = db.store().current_epoch();
    let visible_to_owner = db
        .store()
        .get_node_versioned(pending, epoch, tid)
        .is_some();
    assert!(
        visible_to_owner,
        "Transaction node should be visible to its own transaction"
    );

    session.rollback().unwrap();

    let count_after = db.node_count();
    assert_eq!(
        count_after, 1,
        "Rollback should discard node created via session.create_node_with_props()"
    );
}
5468
#[cfg(feature = "gql")]
mod gql_tests {
    //! Tests that send GQL text through `session.execute` on an in-memory database.

    use super::*;

    /// A label-filtered `MATCH` returns one row per matching node, in a single
    /// column named after the returned variable.
    #[test]
    fn test_gql_query_execution() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        // Two `Person` nodes and one `Animal`.
        for label in ["Person", "Person", "Animal"] {
            session.create_node(&[label]);
        }

        let people = session.execute("MATCH (n:Person) RETURN n").unwrap();

        assert_eq!(people.row_count(), 2);
        assert_eq!(people.column_count(), 1);
        assert_eq!(people.columns[0], "n");
    }

    /// Matching on an empty database succeeds with zero rows.
    #[test]
    fn test_gql_empty_result() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        let nothing = session.execute("MATCH (n:Person) RETURN n").unwrap();

        assert_eq!(nothing.row_count(), 0);
    }

    /// A query with an unclosed node pattern is rejected.
    #[test]
    fn test_gql_parse_error() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        let outcome = session.execute("MATCH (n RETURN n");

        assert!(outcome.is_err());
    }

    /// A one-hop pattern returns one row per matching edge with both endpoints.
    #[test]
    fn test_gql_relationship_traversal() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        let alix = session.create_node(&["Person"]);
        let gus = session.create_node(&["Person"]);
        let vincent = session.create_node(&["Person"]);

        // Alix knows both Gus and Vincent.
        for acquaintance in [gus, vincent] {
            session.create_edge(alix, acquaintance, "KNOWS");
        }

        let pairs = session
            .execute("MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b")
            .unwrap();

        assert_eq!(pairs.row_count(), 2);
        assert_eq!(pairs.column_count(), 2);
        assert_eq!(pairs.columns[0], "a");
        assert_eq!(pairs.columns[1], "b");
    }

    /// An edge-type filter in the pattern excludes edges of other types.
    #[test]
    fn test_gql_relationship_with_type_filter() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        let alix = session.create_node(&["Person"]);
        let gus = session.create_node(&["Person"]);
        let vincent = session.create_node(&["Person"]);

        // One edge of each type; only `KNOWS` should match below.
        session.create_edge(alix, gus, "KNOWS");
        session.create_edge(alix, vincent, "WORKS_WITH");

        let pairs = session
            .execute("MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b")
            .unwrap();

        assert_eq!(pairs.row_count(), 1);
    }

    /// Returning a variable that was never bound fails with an
    /// "Undefined variable" error.
    #[test]
    fn test_gql_semantic_error_undefined_variable() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        let outcome = session.execute("MATCH (n:Person) RETURN x");

        assert!(outcome.is_err());
        let message = match outcome {
            Err(err) => err.to_string(),
            Ok(_) => panic!("Expected error"),
        };
        assert!(
            message.contains("Undefined variable"),
            "Expected undefined variable error, got: {}",
            message
        );
    }

    /// A `WHERE` comparison on a numeric property keeps only the matching nodes.
    #[test]
    fn test_gql_where_clause_property_filter() {
        use grafeo_common::types::Value;

        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        // Ages 25, 35 and 45: two of them are above 30.
        for age in [25, 35, 45] {
            session
                .create_node_with_props(&["Person"], [("age", Value::Int64(age))])
                .unwrap();
        }

        let older = session
            .execute("MATCH (n:Person) WHERE n.age > 30 RETURN n")
            .unwrap();

        assert_eq!(older.row_count(), 2);
    }

    /// A `WHERE` equality on a string property keeps only the matching nodes.
    #[test]
    fn test_gql_where_clause_equality() {
        use grafeo_common::types::Value;

        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        // Two nodes named "Alix" and one named "Gus".
        for name in ["Alix", "Gus", "Alix"] {
            session
                .create_node_with_props(&["Person"], [("name", Value::String(name.into()))])
                .unwrap();
        }

        let matches = session
            .execute("MATCH (n:Person) WHERE n.name = \"Alix\" RETURN n")
            .unwrap();

        assert_eq!(matches.row_count(), 2);
    }

    /// Property-access expressions in `RETURN` produce one column each, named
    /// after the expression text, holding the property values.
    #[test]
    fn test_gql_return_property_access() {
        use grafeo_common::types::Value;

        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        for (name, age) in [("Alix", 30), ("Gus", 25)] {
            session
                .create_node_with_props(
                    &["Person"],
                    [
                        ("name", Value::String(name.into())),
                        ("age", Value::Int64(age)),
                    ],
                )
                .unwrap();
        }

        let projected = session
            .execute("MATCH (n:Person) RETURN n.name, n.age")
            .unwrap();

        assert_eq!(projected.row_count(), 2);
        assert_eq!(projected.column_count(), 2);
        assert_eq!(projected.columns[0], "n.name");
        assert_eq!(projected.columns[1], "n.age");

        // Row order is not asserted; only that both names appear in column 0.
        let names: Vec<&Value> = projected.rows.iter().map(|row| &row[0]).collect();
        for expected in ["Alix", "Gus"] {
            assert!(names.contains(&&Value::String(expected.into())));
        }
    }

    /// A whole-node expression and a property access can be returned together.
    #[test]
    fn test_gql_return_mixed_expressions() {
        use grafeo_common::types::Value;

        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        session
            .create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))])
            .unwrap();

        let mixed = session
            .execute("MATCH (n:Person) RETURN n, n.name")
            .unwrap();

        assert_eq!(mixed.row_count(), 1);
        assert_eq!(mixed.column_count(), 2);
        assert_eq!(mixed.columns[0], "n");
        assert_eq!(mixed.columns[1], "n.name");

        // The second column carries the property value itself.
        assert_eq!(mixed.rows[0][1], Value::String("Alix".into()));
    }
}
5705
#[cfg(feature = "cypher")]
mod cypher_tests {
    //! Tests that send Cypher text through `session.execute_cypher` on an
    //! in-memory database.

    use super::*;

    /// A label-filtered `MATCH` returns one row per matching node, in a single
    /// column named after the returned variable.
    #[test]
    fn test_cypher_query_execution() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        // Two `Person` nodes and one `Animal`.
        for label in ["Person", "Person", "Animal"] {
            session.create_node(&[label]);
        }

        let people = session.execute_cypher("MATCH (n:Person) RETURN n").unwrap();

        assert_eq!(people.row_count(), 2);
        assert_eq!(people.column_count(), 1);
        assert_eq!(people.columns[0], "n");
    }

    /// Matching on an empty database succeeds with zero rows.
    #[test]
    fn test_cypher_empty_result() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        let nothing = session.execute_cypher("MATCH (n:Person) RETURN n").unwrap();

        assert_eq!(nothing.row_count(), 0);
    }

    /// A query with an unclosed node pattern is rejected.
    #[test]
    fn test_cypher_parse_error() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        let outcome = session.execute_cypher("MATCH (n RETURN n");

        assert!(outcome.is_err());
    }
}
5751
5752 mod direct_lookup_tests {
5755 use super::*;
5756 use grafeo_common::types::Value;
5757
5758 #[test]
5759 fn test_get_node() {
5760 let db = GrafeoDB::new_in_memory();
5761 let session = db.session();
5762
5763 let id = session.create_node(&["Person"]);
5764 let node = session.get_node(id);
5765
5766 assert!(node.is_some());
5767 let node = node.unwrap();
5768 assert_eq!(node.id, id);
5769 }
5770
5771 #[test]
5772 fn test_get_node_not_found() {
5773 use grafeo_common::types::NodeId;
5774
5775 let db = GrafeoDB::new_in_memory();
5776 let session = db.session();
5777
5778 let node = session.get_node(NodeId::new(9999));
5780 assert!(node.is_none());
5781 }
5782
5783 #[test]
5784 fn test_get_node_property() {
5785 let db = GrafeoDB::new_in_memory();
5786 let session = db.session();
5787
5788 let id = session
5789 .create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))])
5790 .unwrap();
5791
5792 let name = session.get_node_property(id, "name");
5793 assert_eq!(name, Some(Value::String("Alix".into())));
5794
5795 let missing = session.get_node_property(id, "missing");
5797 assert!(missing.is_none());
5798 }
5799
5800 #[test]
5801 fn test_get_edge() {
5802 let db = GrafeoDB::new_in_memory();
5803 let session = db.session();
5804
5805 let alix = session.create_node(&["Person"]);
5806 let gus = session.create_node(&["Person"]);
5807 let edge_id = session.create_edge(alix, gus, "KNOWS");
5808
5809 let edge = session.get_edge(edge_id);
5810 assert!(edge.is_some());
5811 let edge = edge.unwrap();
5812 assert_eq!(edge.id, edge_id);
5813 assert_eq!(edge.src, alix);
5814 assert_eq!(edge.dst, gus);
5815 }
5816
5817 #[test]
5818 fn test_get_edge_not_found() {
5819 use grafeo_common::types::EdgeId;
5820
5821 let db = GrafeoDB::new_in_memory();
5822 let session = db.session();
5823
5824 let edge = session.get_edge(EdgeId::new(9999));
5825 assert!(edge.is_none());
5826 }
5827
5828 #[test]
5829 fn test_get_neighbors_outgoing() {
5830 let db = GrafeoDB::new_in_memory();
5831 let session = db.session();
5832
5833 let alix = session.create_node(&["Person"]);
5834 let gus = session.create_node(&["Person"]);
5835 let harm = session.create_node(&["Person"]);
5836
5837 session.create_edge(alix, gus, "KNOWS");
5838 session.create_edge(alix, harm, "KNOWS");
5839
5840 let neighbors = session.get_neighbors_outgoing(alix);
5841 assert_eq!(neighbors.len(), 2);
5842
5843 let neighbor_ids: Vec<_> = neighbors.iter().map(|(node_id, _)| *node_id).collect();
5844 assert!(neighbor_ids.contains(&gus));
5845 assert!(neighbor_ids.contains(&harm));
5846 }
5847
5848 #[test]
5849 fn test_get_neighbors_incoming() {
5850 let db = GrafeoDB::new_in_memory();
5851 let session = db.session();
5852
5853 let alix = session.create_node(&["Person"]);
5854 let gus = session.create_node(&["Person"]);
5855 let harm = session.create_node(&["Person"]);
5856
5857 session.create_edge(gus, alix, "KNOWS");
5858 session.create_edge(harm, alix, "KNOWS");
5859
5860 let neighbors = session.get_neighbors_incoming(alix);
5861 assert_eq!(neighbors.len(), 2);
5862
5863 let neighbor_ids: Vec<_> = neighbors.iter().map(|(node_id, _)| *node_id).collect();
5864 assert!(neighbor_ids.contains(&gus));
5865 assert!(neighbor_ids.contains(&harm));
5866 }
5867
5868 #[test]
5869 fn test_get_neighbors_outgoing_by_type() {
5870 let db = GrafeoDB::new_in_memory();
5871 let session = db.session();
5872
5873 let alix = session.create_node(&["Person"]);
5874 let gus = session.create_node(&["Person"]);
5875 let company = session.create_node(&["Company"]);
5876
5877 session.create_edge(alix, gus, "KNOWS");
5878 session.create_edge(alix, company, "WORKS_AT");
5879
5880 let knows_neighbors = session.get_neighbors_outgoing_by_type(alix, "KNOWS");
5881 assert_eq!(knows_neighbors.len(), 1);
5882 assert_eq!(knows_neighbors[0].0, gus);
5883
5884 let works_neighbors = session.get_neighbors_outgoing_by_type(alix, "WORKS_AT");
5885 assert_eq!(works_neighbors.len(), 1);
5886 assert_eq!(works_neighbors[0].0, company);
5887
5888 let no_neighbors = session.get_neighbors_outgoing_by_type(alix, "LIKES");
5890 assert!(no_neighbors.is_empty());
5891 }
5892
5893 #[test]
5894 fn test_node_exists() {
5895 use grafeo_common::types::NodeId;
5896
5897 let db = GrafeoDB::new_in_memory();
5898 let session = db.session();
5899
5900 let id = session.create_node(&["Person"]);
5901
5902 assert!(session.node_exists(id));
5903 assert!(!session.node_exists(NodeId::new(9999)));
5904 }
5905
5906 #[test]
5907 fn test_edge_exists() {
5908 use grafeo_common::types::EdgeId;
5909
5910 let db = GrafeoDB::new_in_memory();
5911 let session = db.session();
5912
5913 let alix = session.create_node(&["Person"]);
5914 let gus = session.create_node(&["Person"]);
5915 let edge_id = session.create_edge(alix, gus, "KNOWS");
5916
5917 assert!(session.edge_exists(edge_id));
5918 assert!(!session.edge_exists(EdgeId::new(9999)));
5919 }
5920
5921 #[test]
5922 fn test_get_degree() {
5923 let db = GrafeoDB::new_in_memory();
5924 let session = db.session();
5925
5926 let alix = session.create_node(&["Person"]);
5927 let gus = session.create_node(&["Person"]);
5928 let harm = session.create_node(&["Person"]);
5929
5930 session.create_edge(alix, gus, "KNOWS");
5932 session.create_edge(alix, harm, "KNOWS");
5933 session.create_edge(gus, alix, "KNOWS");
5935
5936 let (out_degree, in_degree) = session.get_degree(alix);
5937 assert_eq!(out_degree, 2);
5938 assert_eq!(in_degree, 1);
5939
5940 let lonely = session.create_node(&["Person"]);
5942 let (out, in_deg) = session.get_degree(lonely);
5943 assert_eq!(out, 0);
5944 assert_eq!(in_deg, 0);
5945 }
5946
5947 #[test]
5948 fn test_get_nodes_batch() {
5949 let db = GrafeoDB::new_in_memory();
5950 let session = db.session();
5951
5952 let alix = session.create_node(&["Person"]);
5953 let gus = session.create_node(&["Person"]);
5954 let harm = session.create_node(&["Person"]);
5955
5956 let nodes = session.get_nodes_batch(&[alix, gus, harm]);
5957 assert_eq!(nodes.len(), 3);
5958 assert!(nodes[0].is_some());
5959 assert!(nodes[1].is_some());
5960 assert!(nodes[2].is_some());
5961
5962 use grafeo_common::types::NodeId;
5964 let nodes_with_missing = session.get_nodes_batch(&[alix, NodeId::new(9999), harm]);
5965 assert_eq!(nodes_with_missing.len(), 3);
5966 assert!(nodes_with_missing[0].is_some());
5967 assert!(nodes_with_missing[1].is_none()); assert!(nodes_with_missing[2].is_some());
5969 }
5970
5971 #[test]
5972 fn test_auto_commit_setting() {
5973 let db = GrafeoDB::new_in_memory();
5974 let mut session = db.session();
5975
5976 assert!(session.auto_commit());
5978
5979 session.set_auto_commit(false);
5980 assert!(!session.auto_commit());
5981
5982 session.set_auto_commit(true);
5983 assert!(session.auto_commit());
5984 }
5985
5986 #[test]
5987 fn test_transaction_double_begin_nests() {
5988 let db = GrafeoDB::new_in_memory();
5989 let mut session = db.session();
5990
5991 session.begin_transaction().unwrap();
5992 let result = session.begin_transaction();
5994 assert!(result.is_ok());
5995 session.commit().unwrap();
5997 session.commit().unwrap();
5999 }
6000
6001 #[test]
6002 fn test_commit_without_transaction_error() {
6003 let db = GrafeoDB::new_in_memory();
6004 let mut session = db.session();
6005
6006 let result = session.commit();
6007 assert!(result.is_err());
6008 }
6009
6010 #[test]
6011 fn test_rollback_without_transaction_error() {
6012 let db = GrafeoDB::new_in_memory();
6013 let mut session = db.session();
6014
6015 let result = session.rollback();
6016 assert!(result.is_err());
6017 }
6018
6019 #[test]
6020 fn test_create_edge_in_transaction() {
6021 let db = GrafeoDB::new_in_memory();
6022 let mut session = db.session();
6023
6024 let alix = session.create_node(&["Person"]);
6026 let gus = session.create_node(&["Person"]);
6027
6028 session.begin_transaction().unwrap();
6030 let edge_id = session.create_edge(alix, gus, "KNOWS");
6031
6032 assert!(session.edge_exists(edge_id));
6034
6035 session.commit().unwrap();
6037
6038 assert!(session.edge_exists(edge_id));
6040 }
6041
6042 #[test]
6043 fn test_neighbors_empty_node() {
6044 let db = GrafeoDB::new_in_memory();
6045 let session = db.session();
6046
6047 let lonely = session.create_node(&["Person"]);
6048
6049 assert!(session.get_neighbors_outgoing(lonely).is_empty());
6050 assert!(session.get_neighbors_incoming(lonely).is_empty());
6051 assert!(
6052 session
6053 .get_neighbors_outgoing_by_type(lonely, "KNOWS")
6054 .is_empty()
6055 );
6056 }
6057 }
6058
#[test]
/// With a GC interval of 2, two committed transactions leave both created
/// nodes counted by the database.
fn test_auto_gc_triggers_on_commit_interval() {
    use crate::config::Config;

    let config = Config::in_memory().with_gc_interval(2);
    let db = GrafeoDB::with_config(config).unwrap();
    let mut session = db.session();

    // One committed transaction per label, two commits in total.
    for label in ["A", "B"] {
        session.begin_transaction().unwrap();
        session.create_node(&[label]);
        session.commit().unwrap();
    }

    assert_eq!(db.node_count(), 2);
}
6080
#[test]
/// A query timeout set on the config gives the session a query deadline.
fn test_query_timeout_config_propagates_to_session() {
    use crate::config::Config;
    use std::time::Duration;

    let timeout = Duration::from_secs(5);
    let config = Config::in_memory().with_query_timeout(timeout);
    let db = GrafeoDB::with_config(config).unwrap();
    let session = db.session();

    let deadline = session.query_deadline();
    assert!(deadline.is_some());
}
6093
#[test]
/// A default in-memory database gives its sessions a query deadline.
fn test_default_query_timeout_returns_deadline() {
    let database = GrafeoDB::new_in_memory();
    let session = database.session();

    let deadline = session.query_deadline();
    assert!(deadline.is_some());
}
6102
#[test]
/// Disabling the query timeout on the config leaves the session without a deadline.
fn test_no_query_timeout_returns_no_deadline() {
    use crate::config::Config;

    let config = Config::in_memory().without_query_timeout();
    let database = GrafeoDB::with_config(config).unwrap();
    let session = database.session();

    let deadline = session.query_deadline();
    assert!(deadline.is_none());
}
6113
#[test]
/// A session on a default in-memory database reports the LPG graph model.
fn test_graph_model_accessor() {
    use crate::config::GraphModel;

    let database = GrafeoDB::new_in_memory();
    let session = database.session();

    let model = session.graph_model();
    assert_eq!(model, GraphModel::Lpg);
}
6123
#[test]
/// With a maximum property size of 100, a short value is accepted and a
/// 200-character value is rejected with a size error.
fn test_reject_oversized_property() {
    use crate::config::Config;

    let config = Config::in_memory().with_max_property_size(100);
    let db = GrafeoDB::with_config(config).unwrap();
    let session = db.session();

    let target = session.create_node(&["Test"]);

    // Well under the limit: must succeed.
    session
        .set_node_property(target, "small", Value::from("hello"))
        .unwrap();

    // Over the limit: must fail with the size message.
    let oversized = "x".repeat(200);
    let result = session.set_node_property(target, "big", Value::from(oversized.as_str()));
    assert!(result.is_err());

    let err = result.unwrap_err().to_string();
    assert!(
        err.contains("exceeds maximum size"),
        "Expected size error, got: {err}"
    );
}
6149
#[test]
/// With the property size limit disabled, a 10,000-character value is accepted.
fn test_no_property_size_limit() {
    use crate::config::Config;

    let config = Config::in_memory().without_max_property_size();
    let db = GrafeoDB::with_config(config).unwrap();
    let session = db.session();

    let target = session.create_node(&["Test"]);

    let large = "x".repeat(10_000);
    let outcome = session.set_node_property(target, "big", Value::from(large.as_str()));
    outcome.unwrap();
}
6166
#[cfg(feature = "gql")]
#[test]
/// A database built on an externally supplied store supports insert and
/// match inside an explicit transaction, followed by commit.
fn test_external_store_session() {
    use grafeo_core::graph::GraphStoreMut;
    use std::sync::Arc;

    let config = crate::config::Config::in_memory();
    // Hand the database a store as a trait object rather than letting it build one.
    let store: Arc<dyn GraphStoreMut> =
        Arc::new(grafeo_core::graph::lpg::LpgStore::new().unwrap());
    let db = GrafeoDB::with_store(store, config).unwrap();

    let mut session = db.session();

    session.begin_transaction().unwrap();
    session.execute("INSERT (:Test {name: 'hello'})").unwrap();

    // The insert is visible to a query in the same transaction.
    let inserted = session.execute("MATCH (n:Test) RETURN n.name").unwrap();
    assert_eq!(inserted.row_count(), 1);

    session.commit().unwrap();
}
6192
6193 #[cfg(feature = "gql")]
6196 mod session_command_tests {
6197 use super::*;
6198 use grafeo_common::types::Value;
6199
#[test]
/// `USE GRAPH` on an existing graph makes it the session's current graph.
fn test_use_graph_sets_current_graph() {
    let db = GrafeoDB::new_in_memory();
    let session = db.session();

    // The graph has to exist before it can be selected.
    session.execute("CREATE GRAPH mydb").unwrap();
    session.execute("USE GRAPH mydb").unwrap();

    let selected = session.current_graph();
    assert_eq!(selected, Some(String::from("mydb")));
}
6211
#[test]
/// `USE GRAPH` on an unknown graph fails with a "does not exist" error.
fn test_use_graph_nonexistent_errors() {
    let db = GrafeoDB::new_in_memory();
    let session = db.session();

    let result = session.execute("USE GRAPH doesnotexist");
    assert!(result.is_err());

    let err = result.unwrap_err().to_string();
    let mentions_missing = err.contains("does not exist");
    assert!(
        mentions_missing,
        "Expected 'does not exist' error, got: {err}"
    );
}
6225
#[test]
/// `USE GRAPH default` succeeds without creating anything first and sets the
/// current graph to "default".
fn test_use_graph_default_always_valid() {
    let db = GrafeoDB::new_in_memory();
    let session = db.session();

    session.execute("USE GRAPH default").unwrap();

    let selected = session.current_graph();
    assert_eq!(selected, Some(String::from("default")));
}
6235
#[test]
/// `SESSION SET GRAPH` on an existing graph makes it the session's current graph.
fn test_session_set_graph() {
    let db = GrafeoDB::new_in_memory();
    let session = db.session();

    session.execute("CREATE GRAPH analytics").unwrap();
    session.execute("SESSION SET GRAPH analytics").unwrap();

    let selected = session.current_graph();
    assert_eq!(selected, Some(String::from("analytics")));
}
6245
#[test]
/// `SESSION SET GRAPH` on an unknown graph is an error.
fn test_session_set_graph_nonexistent_errors() {
    let db = GrafeoDB::new_in_memory();
    let session = db.session();

    let outcome = session.execute("SESSION SET GRAPH nosuchgraph");

    assert!(outcome.is_err());
}
6254
#[test]
/// The session time zone is unset initially and follows each
/// `SESSION SET TIME ZONE` statement.
fn test_session_set_time_zone() {
    let db = GrafeoDB::new_in_memory();
    let session = db.session();

    let initial = session.time_zone();
    assert_eq!(initial, None);

    session.execute("SESSION SET TIME ZONE 'UTC'").unwrap();
    let after_utc = session.time_zone();
    assert_eq!(after_utc, Some(String::from("UTC")));

    // A later statement replaces the earlier setting.
    session
        .execute("SESSION SET TIME ZONE 'America/New_York'")
        .unwrap();
    let after_new_york = session.time_zone();
    assert_eq!(after_new_york, Some(String::from("America/New_York")));
}
6270
#[test]
/// `SESSION SET PARAMETER` stores a value retrievable by the parameter's
/// name without the `$` prefix.
fn test_session_set_parameter() {
    let db = GrafeoDB::new_in_memory();
    let session = db.session();

    session
        .execute("SESSION SET PARAMETER $timeout = 30")
        .unwrap();

    let stored = session.get_parameter("timeout");
    assert!(stored.is_some());
}
6284
#[test]
/// `SESSION RESET` clears the current graph, the time zone and session parameters.
fn test_session_reset_clears_all_state() {
    let db = GrafeoDB::new_in_memory();
    let session = db.session();

    // Populate all three pieces of session state.
    session.execute("CREATE GRAPH analytics").unwrap();
    session.execute("SESSION SET GRAPH analytics").unwrap();
    session.execute("SESSION SET TIME ZONE 'UTC'").unwrap();
    session
        .execute("SESSION SET PARAMETER $limit = 100")
        .unwrap();

    // Sanity check: everything is set before the reset.
    assert!(session.current_graph().is_some());
    assert!(session.time_zone().is_some());
    assert!(session.get_parameter("limit").is_some());

    session.execute("SESSION RESET").unwrap();

    let graph_after = session.current_graph();
    let zone_after = session.time_zone();
    assert_eq!(graph_after, None);
    assert_eq!(zone_after, None);
    assert!(session.get_parameter("limit").is_none());
}
6310
/// `SESSION CLOSE` leaves the session without a current graph or time zone.
#[test]
fn test_session_close_clears_state() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    // Give the session a graph and a time zone to lose.
    for statement in [
        "CREATE GRAPH analytics",
        "SESSION SET GRAPH analytics",
        "SESSION SET TIME ZONE 'UTC'",
    ] {
        sess.execute(statement).unwrap();
    }

    sess.execute("SESSION CLOSE").unwrap();

    assert!(sess.current_graph().is_none());
    assert!(sess.time_zone().is_none());
}
6325
/// A graph created with `CREATE GRAPH` can afterwards be selected with `USE GRAPH`.
#[test]
fn test_create_graph() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    sess.execute("CREATE GRAPH mydb").unwrap();

    // Selecting the graph only succeeds if the creation took effect.
    sess.execute("USE GRAPH mydb").unwrap();
    let active = sess.current_graph();
    assert_eq!(active.as_deref(), Some("mydb"));
}
6337
/// Creating a graph twice without `IF NOT EXISTS` fails with an
/// "already exists" error message.
#[test]
fn test_create_graph_duplicate_errors() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    sess.execute("CREATE GRAPH mydb").unwrap();

    // The second, identical statement must be rejected.
    let second_attempt = sess.execute("CREATE GRAPH mydb");
    assert!(second_attempt.is_err());

    let err = second_attempt.unwrap_err().to_string();
    let mentions_duplicate = err.contains("already exists");
    assert!(
        mentions_duplicate,
        "Expected 'already exists' error, got: {err}"
    );
}
6353
/// `CREATE GRAPH IF NOT EXISTS` must succeed when the graph is already present.
#[test]
fn test_create_graph_if_not_exists() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    // The second statement targets the graph created by the first one;
    // both are required to return `Ok`.
    for statement in ["CREATE GRAPH mydb", "CREATE GRAPH IF NOT EXISTS mydb"] {
        sess.execute(statement).unwrap();
    }
}
6363
/// After `DROP GRAPH`, the dropped graph can no longer be selected.
#[test]
fn test_drop_graph() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    for statement in ["CREATE GRAPH mydb", "DROP GRAPH mydb"] {
        sess.execute(statement).unwrap();
    }

    // Selecting the graph must now fail because it was dropped.
    assert!(sess.execute("USE GRAPH mydb").is_err());
}
6376
/// Dropping an unknown graph without `IF EXISTS` fails with a
/// "does not exist" error message.
#[test]
fn test_drop_graph_nonexistent_errors() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    let drop_attempt = sess.execute("DROP GRAPH nosuchgraph");
    assert!(drop_attempt.is_err());

    let err = drop_attempt.unwrap_err().to_string();
    let mentions_missing = err.contains("does not exist");
    assert!(
        mentions_missing,
        "Expected 'does not exist' error, got: {err}"
    );
}
6390
/// `DROP GRAPH IF EXISTS` must succeed even when the graph is absent.
#[test]
fn test_drop_graph_if_exists() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    // `nosuchgraph` was never created; the statement must still return `Ok`.
    let statement = "DROP GRAPH IF EXISTS nosuchgraph";
    sess.execute(statement).unwrap();
}
6399
/// `START TRANSACTION` ... `COMMIT` issued as GQL text opens and closes a
/// transaction, and the inserted node is visible afterwards.
#[test]
fn test_start_transaction_via_gql() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    sess.execute("START TRANSACTION").unwrap();
    let open_after_start = sess.in_transaction();
    assert!(open_after_start);

    sess.execute("INSERT (:Person {name: 'Alix'})").unwrap();
    sess.execute("COMMIT").unwrap();
    let open_after_commit = sess.in_transaction();
    assert!(!open_after_commit);

    // The committed insert yields exactly one matching row.
    let matched = sess.execute("MATCH (n:Person) RETURN n.name").unwrap();
    assert_eq!(matched.rows.len(), 1);
}
6414
/// An `INSERT` inside a `READ ONLY` transaction is rejected with a
/// "read-only" error message.
#[test]
fn test_start_transaction_read_only_blocks_insert() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    sess.execute("START TRANSACTION READ ONLY").unwrap();

    let write_attempt = sess.execute("INSERT (:Person {name: 'Alix'})");
    assert!(write_attempt.is_err());

    let err = write_attempt.unwrap_err().to_string();
    let mentions_read_only = err.contains("read-only");
    assert!(mentions_read_only, "Expected read-only error, got: {err}");

    // Close the transaction that was opened above.
    sess.execute("ROLLBACK").unwrap();
}
6430
/// A `MATCH` inside a `READ ONLY` transaction sees previously committed data.
#[test]
fn test_start_transaction_read_only_allows_reads() {
    let database = GrafeoDB::new_in_memory();
    let mut sess = database.session();

    // Seed one node through the programmatic transaction API.
    sess.begin_transaction().unwrap();
    sess.execute("INSERT (:Person {name: 'Alix'})").unwrap();
    sess.commit().unwrap();

    // Read it back from within a read-only transaction opened via GQL.
    sess.execute("START TRANSACTION READ ONLY").unwrap();
    let matched = sess.execute("MATCH (n:Person) RETURN n.name").unwrap();
    let row_total = matched.rows.len();
    assert_eq!(row_total, 1);
    sess.execute("COMMIT").unwrap();
}
6444
/// `ROLLBACK` issued as GQL text discards the insert made in the transaction.
#[test]
fn test_rollback_via_gql() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    // Open a transaction, insert one node, then abandon the transaction.
    for statement in [
        "START TRANSACTION",
        "INSERT (:Person {name: 'Alix'})",
        "ROLLBACK",
    ] {
        sess.execute(statement).unwrap();
    }

    // No Person node may be visible after the rollback.
    let matched = sess.execute("MATCH (n:Person) RETURN n.name").unwrap();
    assert!(matched.rows.is_empty());
}
6457
/// `START TRANSACTION ISOLATION LEVEL SERIALIZABLE` is accepted and opens
/// a transaction.
#[test]
fn test_start_transaction_with_isolation_level() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    let statement = "START TRANSACTION ISOLATION LEVEL SERIALIZABLE";
    sess.execute(statement).unwrap();

    let is_open = sess.in_transaction();
    assert!(is_open);

    // Close the transaction that was opened above.
    sess.execute("ROLLBACK").unwrap();
}
6469
/// A session command returns a result with zero rows and zero columns.
#[test]
fn test_session_commands_return_empty_result() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    sess.execute("CREATE GRAPH test").unwrap();

    let outcome = sess.execute("SESSION SET GRAPH test").unwrap();
    let row_total = outcome.row_count();
    let column_total = outcome.column_count();
    assert_eq!(row_total, 0);
    assert_eq!(column_total, 0);
}
6480
/// A freshly created session has no current graph.
#[test]
fn test_current_graph_default_is_none() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    let active = sess.current_graph();
    assert!(active.is_none());
}
6488
/// A freshly created session has no time zone.
#[test]
fn test_time_zone_default_is_none() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    let zone = sess.time_zone();
    assert!(zone.is_none());
}
6496
/// Two sessions on the same database keep separate current-graph settings.
#[test]
fn test_session_state_independent_across_sessions() {
    let database = GrafeoDB::new_in_memory();
    let left = database.session();
    let right = database.session();

    // Both graphs are created through the first session only.
    for statement in ["CREATE GRAPH first", "CREATE GRAPH second"] {
        left.execute(statement).unwrap();
    }

    // Each session then selects a different graph.
    left.execute("SESSION SET GRAPH first").unwrap();
    right.execute("SESSION SET GRAPH second").unwrap();

    assert_eq!(left.current_graph().as_deref(), Some("first"));
    assert_eq!(right.current_graph().as_deref(), Some("second"));
}
6511
/// `SHOW NODE TYPES` lists the declared node type under the expected
/// column headers.
#[test]
fn test_show_node_types() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    let declaration = "CREATE NODE TYPE Person (name STRING NOT NULL, age INTEGER)";
    sess.execute(declaration).unwrap();

    let listing = sess.execute("SHOW NODE TYPES").unwrap();

    let expected_columns = vec!["name", "properties", "constraints", "parents"];
    assert_eq!(listing.columns, expected_columns);

    // Exactly one type was declared; its name is in the first column.
    assert_eq!(listing.rows.len(), 1);
    let first_row = &listing.rows[0];
    assert_eq!(first_row[0], Value::from("Person"));
}
6530
/// `SHOW EDGE TYPES` lists the declared edge type under the expected
/// column headers.
#[test]
fn test_show_edge_types() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    let declaration = "CREATE EDGE TYPE KNOWS CONNECTING (Person) TO (Person) (since INTEGER)";
    sess.execute(declaration).unwrap();

    let listing = sess.execute("SHOW EDGE TYPES").unwrap();

    let expected_columns = vec!["name", "properties", "source_types", "target_types"];
    assert_eq!(listing.columns, expected_columns);

    // Exactly one type was declared; its name is in the first column.
    assert_eq!(listing.rows.len(), 1);
    let first_row = &listing.rows[0];
    assert_eq!(first_row[0], Value::from("KNOWS"));
}
6548
/// `SHOW GRAPH TYPES` lists the declared graph type under the expected
/// column headers.
#[test]
fn test_show_graph_types() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    // Declare the node type on its own, then a graph type that contains it.
    for declaration in [
        "CREATE NODE TYPE Person (name STRING)",
        "CREATE GRAPH TYPE social (NODE TYPE Person (name STRING))",
    ] {
        sess.execute(declaration).unwrap();
    }

    let listing = sess.execute("SHOW GRAPH TYPES").unwrap();

    let expected_columns = vec!["name", "open", "node_types", "edge_types"];
    assert_eq!(listing.columns, expected_columns);

    // Exactly one graph type was declared; its name is in the first column.
    assert_eq!(listing.rows.len(), 1);
    let first_row = &listing.rows[0];
    assert_eq!(first_row[0], Value::from("social"));
}
6573
/// `SHOW GRAPH TYPE <name>` returns a single row for the named graph type.
#[test]
fn test_show_graph_type_named() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    // Declare the node type on its own, then a graph type that contains it.
    for declaration in [
        "CREATE NODE TYPE Person (name STRING)",
        "CREATE GRAPH TYPE social (NODE TYPE Person (name STRING))",
    ] {
        sess.execute(declaration).unwrap();
    }

    let listing = sess.execute("SHOW GRAPH TYPE social").unwrap();

    assert_eq!(listing.rows.len(), 1);
    let first_row = &listing.rows[0];
    assert_eq!(first_row[0], Value::from("social"));
}
6594
/// `SHOW GRAPH TYPE` for an undeclared name is reported as an error.
#[test]
fn test_show_graph_type_not_found() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    // No graph type was declared in this database.
    assert!(sess.execute("SHOW GRAPH TYPE nonexistent").is_err());
}
6603
/// `SHOW INDEXES` succeeds on an empty database and exposes the expected
/// column headers.
#[test]
fn test_show_indexes_via_gql() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    let listing = sess.execute("SHOW INDEXES").unwrap();

    let expected_columns = vec!["name", "type", "label", "property"];
    assert_eq!(listing.columns, expected_columns);
}
6612
/// `SHOW CONSTRAINTS` succeeds on an empty database and exposes the
/// expected column headers.
#[test]
fn test_show_constraints_via_gql() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    let listing = sess.execute("SHOW CONSTRAINTS").unwrap();

    let expected_columns = vec!["name", "type", "label", "properties"];
    assert_eq!(listing.columns, expected_columns);
}
6621
/// A graph type declared with pattern syntax (`(:A)-[:R]->(:B)`) can be
/// created and then shown by name.
#[test]
fn test_pattern_form_graph_type_roundtrip() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    // Declare the node and edge types that the patterns below refer to.
    for declaration in [
        "CREATE NODE TYPE Person (name STRING NOT NULL)",
        "CREATE NODE TYPE City (name STRING)",
        "CREATE EDGE TYPE KNOWS (since INTEGER)",
        "CREATE EDGE TYPE LIVES_IN",
    ] {
        sess.execute(declaration).unwrap();
    }

    // Graph type written as two comma-separated path patterns.
    let graph_type_ddl = concat!(
        "CREATE GRAPH TYPE social (",
        "(:Person {name STRING NOT NULL})-[:KNOWS {since INTEGER}]->(:Person),",
        "(:Person)-[:LIVES_IN]->(:City)",
        ")",
    );
    sess.execute(graph_type_ddl).unwrap();

    // The graph type must be retrievable under the name it was created with.
    let listing = sess.execute("SHOW GRAPH TYPE social").unwrap();
    assert_eq!(listing.rows.len(), 1);
    let first_row = &listing.rows[0];
    assert_eq!(first_row[0], Value::from("social"));
}
6654 }
6655}