Skip to main content

grafeo_engine/
session.rs

1//! Lightweight handles for database interaction.
2//!
3//! A session is your conversation with the database. Each session can have
4//! its own transaction state, so concurrent sessions don't interfere with
5//! each other. Sessions are cheap to create - spin up as many as you need.
6
7use std::sync::Arc;
8use std::sync::atomic::{AtomicUsize, Ordering};
9use std::time::{Duration, Instant};
10
11use grafeo_common::types::{EdgeId, EpochId, NodeId, TransactionId, Value};
12use grafeo_common::utils::error::Result;
13use grafeo_core::graph::Direction;
14use grafeo_core::graph::GraphStoreMut;
15use grafeo_core::graph::lpg::{Edge, LpgStore, Node};
16#[cfg(feature = "rdf")]
17use grafeo_core::graph::rdf::RdfStore;
18
19use crate::catalog::{Catalog, CatalogConstraintValidator};
20use crate::config::{AdaptiveConfig, GraphModel};
21use crate::database::QueryResult;
22use crate::query::cache::QueryCache;
23use crate::transaction::TransactionManager;
24
25/// Parses a DDL default-value literal string into a [`Value`].
26///
27/// Handles string literals (single- or double-quoted), integers, floats,
28/// booleans (`true`/`false`), and `NULL`.
29fn parse_default_literal(text: &str) -> Value {
30    if text.eq_ignore_ascii_case("null") {
31        return Value::Null;
32    }
33    if text.eq_ignore_ascii_case("true") {
34        return Value::Bool(true);
35    }
36    if text.eq_ignore_ascii_case("false") {
37        return Value::Bool(false);
38    }
39    // String literal: strip surrounding quotes
40    if (text.starts_with('\'') && text.ends_with('\''))
41        || (text.starts_with('"') && text.ends_with('"'))
42    {
43        return Value::String(text[1..text.len() - 1].into());
44    }
45    // Try integer, then float
46    if let Ok(i) = text.parse::<i64>() {
47        return Value::Int64(i);
48    }
49    if let Ok(f) = text.parse::<f64>() {
50        return Value::Float64(f);
51    }
52    // Fallback: treat as string
53    Value::String(text.into())
54}
55
/// Runtime configuration for creating a new session.
///
/// Groups the shared parameters passed to all session constructors, keeping
/// call sites readable and avoiding long argument lists. Each constructor
/// moves every field into the [`Session`] field of the same name.
pub(crate) struct SessionConfig {
    /// Transaction manager handle stored on the session.
    pub transaction_manager: Arc<TransactionManager>,
    /// Query cache handle stored on the session.
    pub query_cache: Arc<QueryCache>,
    /// Schema and metadata catalog handle stored on the session.
    pub catalog: Arc<Catalog>,
    /// Adaptive execution configuration.
    pub adaptive_config: AdaptiveConfig,
    /// Whether to use factorized execution for multi-hop queries.
    pub factorized_execution: bool,
    /// The graph data model the session operates on.
    pub graph_model: GraphModel,
    /// Maximum time a query may run before being cancelled (`None` = no limit configured).
    pub query_timeout: Option<Duration>,
    /// Commit counter used for triggering auto-GC.
    pub commit_counter: Arc<AtomicUsize>,
    /// GC every N commits (0 = disabled).
    pub gc_interval: usize,
}
71
/// Your handle to the database - execute queries and manage transactions.
///
/// Get one from [`GrafeoDB::session()`](crate::GrafeoDB::session). Each session
/// tracks its own transaction state, so you can have multiple concurrent
/// sessions without them interfering.
///
/// Per-session mutable state lives behind `parking_lot::Mutex` (or atomics) so
/// that it can be changed through `&self`.
pub struct Session {
    /// The root LPG store. Named graphs are looked up through it
    /// (`store.graph(name)`).
    store: Arc<LpgStore>,
    /// Graph store trait object for pluggable storage backends.
    /// With the `wal` feature, `set_wal` replaces it with a WAL-logging wrapper.
    graph_store: Arc<dyn GraphStoreMut>,
    /// Schema and metadata catalog shared across sessions.
    catalog: Arc<Catalog>,
    /// RDF triple store (if RDF feature is enabled).
    #[cfg(feature = "rdf")]
    rdf_store: Arc<RdfStore>,
    /// Transaction manager.
    transaction_manager: Arc<TransactionManager>,
    /// Query cache shared across sessions.
    query_cache: Arc<QueryCache>,
    /// Current transaction ID (if any). Behind a Mutex so that GQL commands
    /// (`START TRANSACTION`, `COMMIT`, `ROLLBACK`) can manage transactions
    /// from within `execute(&self)`.
    current_transaction: parking_lot::Mutex<Option<TransactionId>>,
    /// Whether the current transaction is read-only (blocks mutations).
    read_only_tx: parking_lot::Mutex<bool>,
    /// Whether the session is in auto-commit mode. The constructors in this
    /// file initialise it to `true`.
    auto_commit: bool,
    /// Adaptive execution configuration.
    #[allow(dead_code)] // Stored for future adaptive re-optimization during execution
    adaptive_config: AdaptiveConfig,
    /// Whether to use factorized execution for multi-hop queries.
    factorized_execution: bool,
    /// The graph data model this session operates on.
    graph_model: GraphModel,
    /// Maximum time a query may run before being cancelled.
    query_timeout: Option<Duration>,
    /// Shared commit counter for triggering auto-GC.
    commit_counter: Arc<AtomicUsize>,
    /// GC every N commits (0 = disabled).
    gc_interval: usize,
    /// Node count at the start of the current transaction (for PreparedCommit stats).
    transaction_start_node_count: AtomicUsize,
    /// Edge count at the start of the current transaction (for PreparedCommit stats).
    transaction_start_edge_count: AtomicUsize,
    /// WAL for logging schema changes. `None` until `set_wal` is called.
    #[cfg(feature = "wal")]
    wal: Option<Arc<grafeo_adapters::storage::wal::LpgWal>>,
    /// Shared WAL graph context tracker for named graph awareness.
    /// `None` until `set_wal` is called.
    #[cfg(feature = "wal")]
    wal_graph_context: Option<Arc<parking_lot::Mutex<Option<String>>>>,
    /// CDC log for change tracking.
    #[cfg(feature = "cdc")]
    cdc_log: Arc<crate::cdc::CdcLog>,
    /// Current graph name (for multi-graph USE GRAPH support). None = default graph.
    /// The name `"default"` (ASCII case-insensitive) is also treated as the default graph.
    current_graph: parking_lot::Mutex<Option<String>>,
    /// Session time zone override.
    time_zone: parking_lot::Mutex<Option<String>>,
    /// Session-level parameters (SET PARAMETER).
    session_params:
        parking_lot::Mutex<std::collections::HashMap<String, grafeo_common::types::Value>>,
    /// Override epoch for time-travel queries (None = use transaction/current epoch).
    viewing_epoch_override: parking_lot::Mutex<Option<EpochId>>,
    /// Savepoints within the current transaction.
    savepoints: parking_lot::Mutex<Vec<SavepointState>>,
    /// Nesting depth for nested transactions (0 = outermost).
    /// Nested `START TRANSACTION` creates an auto-savepoint; nested `COMMIT`
    /// releases it, nested `ROLLBACK` rolls back to it.
    transaction_nesting_depth: parking_lot::Mutex<u32>,
    /// Named graphs touched during the current transaction (for cross-graph atomicity).
    /// `None` represents the default graph. Populated at `BEGIN` time and on each
    /// `USE GRAPH` / `SESSION SET GRAPH` switch within a transaction.
    touched_graphs: parking_lot::Mutex<Vec<Option<String>>>,
}
145
/// Per-graph savepoint snapshot, capturing the store state at the time of the savepoint.
///
/// A [`SavepointState`] holds a list of these in `graph_snapshots`.
#[derive(Clone)]
struct GraphSavepoint {
    /// Graph this snapshot was taken for.
    /// NOTE(review): presumably `None` denotes the default graph, matching
    /// `Session::touched_graphs` - confirm against the savepoint code.
    graph_name: Option<String>,
    /// NOTE(review): presumably the store's next node ID when the savepoint
    /// was taken - confirm against the savepoint code.
    next_node_id: u64,
    /// NOTE(review): presumably the store's next edge ID when the savepoint
    /// was taken - confirm against the savepoint code.
    next_edge_id: u64,
    /// NOTE(review): presumably the length of the undo log when the savepoint
    /// was taken - confirm against the rollback code.
    undo_log_position: usize,
}
154
/// Savepoint state: name + per-graph snapshots + the graph that was active.
///
/// `Session::savepoints` keeps a list of these for the current transaction.
#[derive(Clone)]
struct SavepointState {
    /// Name given to the savepoint.
    name: String,
    /// One [`GraphSavepoint`] per captured graph.
    graph_snapshots: Vec<GraphSavepoint>,
    /// The graph that was active when the savepoint was created.
    /// Reserved for future use (e.g., restoring graph context on rollback).
    #[allow(dead_code)]
    active_graph: Option<String>,
}
165
166impl Session {
167    /// Creates a new session with adaptive execution configuration.
168    #[allow(dead_code)]
169    pub(crate) fn with_adaptive(store: Arc<LpgStore>, cfg: SessionConfig) -> Self {
170        let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
171        Self {
172            store,
173            graph_store,
174            catalog: cfg.catalog,
175            #[cfg(feature = "rdf")]
176            rdf_store: Arc::new(RdfStore::new()),
177            transaction_manager: cfg.transaction_manager,
178            query_cache: cfg.query_cache,
179            current_transaction: parking_lot::Mutex::new(None),
180            read_only_tx: parking_lot::Mutex::new(false),
181            auto_commit: true,
182            adaptive_config: cfg.adaptive_config,
183            factorized_execution: cfg.factorized_execution,
184            graph_model: cfg.graph_model,
185            query_timeout: cfg.query_timeout,
186            commit_counter: cfg.commit_counter,
187            gc_interval: cfg.gc_interval,
188            transaction_start_node_count: AtomicUsize::new(0),
189            transaction_start_edge_count: AtomicUsize::new(0),
190            #[cfg(feature = "wal")]
191            wal: None,
192            #[cfg(feature = "wal")]
193            wal_graph_context: None,
194            #[cfg(feature = "cdc")]
195            cdc_log: Arc::new(crate::cdc::CdcLog::new()),
196            current_graph: parking_lot::Mutex::new(None),
197            time_zone: parking_lot::Mutex::new(None),
198            session_params: parking_lot::Mutex::new(std::collections::HashMap::new()),
199            viewing_epoch_override: parking_lot::Mutex::new(None),
200            savepoints: parking_lot::Mutex::new(Vec::new()),
201            transaction_nesting_depth: parking_lot::Mutex::new(0),
202            touched_graphs: parking_lot::Mutex::new(Vec::new()),
203        }
204    }
205
206    /// Sets the WAL for this session (shared with the database).
207    ///
208    /// This also wraps `graph_store` in a [`WalGraphStore`] so that mutation
209    /// operators (INSERT, DELETE, SET via queries) log to the WAL.
210    #[cfg(feature = "wal")]
211    pub(crate) fn set_wal(
212        &mut self,
213        wal: Arc<grafeo_adapters::storage::wal::LpgWal>,
214        wal_graph_context: Arc<parking_lot::Mutex<Option<String>>>,
215    ) {
216        // Wrap the graph store so query-engine mutations are WAL-logged
217        self.graph_store = Arc::new(crate::database::wal_store::WalGraphStore::new(
218            Arc::clone(&self.store),
219            Arc::clone(&wal),
220            Arc::clone(&wal_graph_context),
221        ));
222        self.wal = Some(wal);
223        self.wal_graph_context = Some(wal_graph_context);
224    }
225
    /// Sets the CDC log for this session (shared with the database).
    ///
    /// Replaces the log the constructor created for this session.
    #[cfg(feature = "cdc")]
    pub(crate) fn set_cdc_log(&mut self, cdc_log: Arc<crate::cdc::CdcLog>) {
        self.cdc_log = cdc_log;
    }
231
232    /// Creates a new session with RDF store and adaptive configuration.
233    #[cfg(feature = "rdf")]
234    pub(crate) fn with_rdf_store_and_adaptive(
235        store: Arc<LpgStore>,
236        rdf_store: Arc<RdfStore>,
237        cfg: SessionConfig,
238    ) -> Self {
239        let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
240        Self {
241            store,
242            graph_store,
243            catalog: cfg.catalog,
244            rdf_store,
245            transaction_manager: cfg.transaction_manager,
246            query_cache: cfg.query_cache,
247            current_transaction: parking_lot::Mutex::new(None),
248            read_only_tx: parking_lot::Mutex::new(false),
249            auto_commit: true,
250            adaptive_config: cfg.adaptive_config,
251            factorized_execution: cfg.factorized_execution,
252            graph_model: cfg.graph_model,
253            query_timeout: cfg.query_timeout,
254            commit_counter: cfg.commit_counter,
255            gc_interval: cfg.gc_interval,
256            transaction_start_node_count: AtomicUsize::new(0),
257            transaction_start_edge_count: AtomicUsize::new(0),
258            #[cfg(feature = "wal")]
259            wal: None,
260            #[cfg(feature = "wal")]
261            wal_graph_context: None,
262            #[cfg(feature = "cdc")]
263            cdc_log: Arc::new(crate::cdc::CdcLog::new()),
264            current_graph: parking_lot::Mutex::new(None),
265            time_zone: parking_lot::Mutex::new(None),
266            session_params: parking_lot::Mutex::new(std::collections::HashMap::new()),
267            viewing_epoch_override: parking_lot::Mutex::new(None),
268            savepoints: parking_lot::Mutex::new(Vec::new()),
269            transaction_nesting_depth: parking_lot::Mutex::new(0),
270            touched_graphs: parking_lot::Mutex::new(Vec::new()),
271        }
272    }
273
274    /// Creates a session backed by an external graph store.
275    ///
276    /// The external store handles all data operations. Transaction management
277    /// (begin/commit/rollback) is not supported for external stores.
278    ///
279    /// # Errors
280    ///
281    /// Returns an error if the internal arena allocation fails (out of memory).
282    pub(crate) fn with_external_store(
283        store: Arc<dyn GraphStoreMut>,
284        cfg: SessionConfig,
285    ) -> Result<Self> {
286        Ok(Self {
287            store: Arc::new(LpgStore::new()?),
288            graph_store: store,
289            catalog: cfg.catalog,
290            #[cfg(feature = "rdf")]
291            rdf_store: Arc::new(RdfStore::new()),
292            transaction_manager: cfg.transaction_manager,
293            query_cache: cfg.query_cache,
294            current_transaction: parking_lot::Mutex::new(None),
295            read_only_tx: parking_lot::Mutex::new(false),
296            auto_commit: true,
297            adaptive_config: cfg.adaptive_config,
298            factorized_execution: cfg.factorized_execution,
299            graph_model: cfg.graph_model,
300            query_timeout: cfg.query_timeout,
301            commit_counter: cfg.commit_counter,
302            gc_interval: cfg.gc_interval,
303            transaction_start_node_count: AtomicUsize::new(0),
304            transaction_start_edge_count: AtomicUsize::new(0),
305            #[cfg(feature = "wal")]
306            wal: None,
307            #[cfg(feature = "wal")]
308            wal_graph_context: None,
309            #[cfg(feature = "cdc")]
310            cdc_log: Arc::new(crate::cdc::CdcLog::new()),
311            current_graph: parking_lot::Mutex::new(None),
312            time_zone: parking_lot::Mutex::new(None),
313            session_params: parking_lot::Mutex::new(std::collections::HashMap::new()),
314            viewing_epoch_override: parking_lot::Mutex::new(None),
315            savepoints: parking_lot::Mutex::new(Vec::new()),
316            transaction_nesting_depth: parking_lot::Mutex::new(0),
317            touched_graphs: parking_lot::Mutex::new(Vec::new()),
318        })
319    }
320
    /// Returns the graph model this session operates on.
    ///
    /// The value is fixed at construction time from `SessionConfig::graph_model`.
    #[must_use]
    pub fn graph_model(&self) -> GraphModel {
        self.graph_model
    }
326
327    // === Session State Management ===
328
329    /// Sets the current graph for this session (USE GRAPH).
330    pub fn use_graph(&self, name: &str) {
331        *self.current_graph.lock() = Some(name.to_string());
332    }
333
334    /// Returns the current graph name, if any.
335    #[must_use]
336    pub fn current_graph(&self) -> Option<String> {
337        self.current_graph.lock().clone()
338    }
339
340    /// Returns the graph store for the currently active graph.
341    ///
342    /// If `current_graph` is `None` or `"default"`, returns the session's
343    /// default `graph_store` (already WAL-wrapped for the default graph).
344    /// Otherwise looks up the named graph in the root store and wraps it
345    /// in a [`WalGraphStore`] so mutations are WAL-logged with the correct
346    /// graph context.
347    fn active_store(&self) -> Arc<dyn GraphStoreMut> {
348        let graph_name = self.current_graph.lock().clone();
349        match graph_name {
350            None => Arc::clone(&self.graph_store),
351            Some(ref name) if name.eq_ignore_ascii_case("default") => Arc::clone(&self.graph_store),
352            Some(ref name) => match self.store.graph(name) {
353                Some(named_store) => {
354                    #[cfg(feature = "wal")]
355                    if let (Some(wal), Some(ctx)) = (&self.wal, &self.wal_graph_context) {
356                        return Arc::new(crate::database::wal_store::WalGraphStore::new_for_graph(
357                            named_store,
358                            Arc::clone(wal),
359                            name.clone(),
360                            Arc::clone(ctx),
361                        )) as Arc<dyn GraphStoreMut>;
362                    }
363                    named_store as Arc<dyn GraphStoreMut>
364                }
365                None => Arc::clone(&self.graph_store),
366            },
367        }
368    }
369
370    /// Returns the concrete `LpgStore` for the currently active graph.
371    ///
372    /// Used by direct CRUD methods that need the concrete store type
373    /// for versioned operations.
374    fn active_lpg_store(&self) -> Arc<LpgStore> {
375        let graph_name = self.current_graph.lock().clone();
376        match graph_name {
377            None => Arc::clone(&self.store),
378            Some(ref name) if name.eq_ignore_ascii_case("default") => Arc::clone(&self.store),
379            Some(ref name) => self
380                .store
381                .graph(name)
382                .unwrap_or_else(|| Arc::clone(&self.store)),
383        }
384    }
385
386    /// Resolves a graph name to a concrete `LpgStore`.
387    /// `None` and `"default"` resolve to the session's root store.
388    fn resolve_store(&self, graph_name: &Option<String>) -> Arc<LpgStore> {
389        match graph_name {
390            None => Arc::clone(&self.store),
391            Some(name) if name.eq_ignore_ascii_case("default") => Arc::clone(&self.store),
392            Some(name) => self
393                .store
394                .graph(name)
395                .unwrap_or_else(|| Arc::clone(&self.store)),
396        }
397    }
398
399    /// Records the current graph as "touched" if a transaction is active.
400    fn track_graph_touch(&self) {
401        if self.current_transaction.lock().is_some() {
402            let graph = self.current_graph.lock().clone();
403            // Normalize: treat Some("default") as None
404            let normalized = match graph {
405                Some(ref name) if name.eq_ignore_ascii_case("default") => None,
406                other => other,
407            };
408            let mut touched = self.touched_graphs.lock();
409            if !touched.contains(&normalized) {
410                touched.push(normalized);
411            }
412        }
413    }
414
415    /// Sets the session time zone.
416    pub fn set_time_zone(&self, tz: &str) {
417        *self.time_zone.lock() = Some(tz.to_string());
418    }
419
420    /// Returns the session time zone, if set.
421    #[must_use]
422    pub fn time_zone(&self) -> Option<String> {
423        self.time_zone.lock().clone()
424    }
425
426    /// Sets a session parameter.
427    pub fn set_parameter(&self, key: &str, value: grafeo_common::types::Value) {
428        self.session_params.lock().insert(key.to_string(), value);
429    }
430
431    /// Gets a session parameter by cloning it.
432    #[must_use]
433    pub fn get_parameter(&self, key: &str) -> Option<grafeo_common::types::Value> {
434        self.session_params.lock().get(key).cloned()
435    }
436
    /// Resets all session state to defaults.
    ///
    /// Clears the selected graph, the time zone override, all session
    /// parameters, and the viewing-epoch override. Transaction state
    /// (current transaction, savepoints, touched graphs) is not modified here.
    pub fn reset_session(&self) {
        // Each statement takes and releases its own lock; no two are held at once.
        *self.current_graph.lock() = None;
        *self.time_zone.lock() = None;
        self.session_params.lock().clear();
        *self.viewing_epoch_override.lock() = None;
    }
444
445    // --- Time-travel API ---
446
447    /// Sets a viewing epoch override for time-travel queries.
448    ///
449    /// While set, all queries on this session see the database as it existed
450    /// at the given epoch. Use [`clear_viewing_epoch`](Self::clear_viewing_epoch)
451    /// to return to normal behavior.
452    pub fn set_viewing_epoch(&self, epoch: EpochId) {
453        *self.viewing_epoch_override.lock() = Some(epoch);
454    }
455
456    /// Clears the viewing epoch override, returning to normal behavior.
457    pub fn clear_viewing_epoch(&self) {
458        *self.viewing_epoch_override.lock() = None;
459    }
460
461    /// Returns the current viewing epoch override, if any.
462    #[must_use]
463    pub fn viewing_epoch(&self) -> Option<EpochId> {
464        *self.viewing_epoch_override.lock()
465    }
466
467    /// Returns all versions of a node with their creation/deletion epochs.
468    ///
469    /// Properties and labels reflect the current state (not versioned per-epoch).
470    #[must_use]
471    pub fn get_node_history(&self, id: NodeId) -> Vec<(EpochId, Option<EpochId>, Node)> {
472        self.active_lpg_store().get_node_history(id)
473    }
474
475    /// Returns all versions of an edge with their creation/deletion epochs.
476    ///
477    /// Properties reflect the current state (not versioned per-epoch).
478    #[must_use]
479    pub fn get_edge_history(&self, id: EdgeId) -> Vec<(EpochId, Option<EpochId>, Edge)> {
480        self.active_lpg_store().get_edge_history(id)
481    }
482
483    /// Checks that the session's graph model supports LPG operations.
484    fn require_lpg(&self, language: &str) -> Result<()> {
485        if self.graph_model == GraphModel::Rdf {
486            return Err(grafeo_common::utils::error::Error::Internal(format!(
487                "This is an RDF database. {language} queries require an LPG database."
488            )));
489        }
490        Ok(())
491    }
492
493    /// Executes a session or transaction command, returning an empty result.
494    #[cfg(feature = "gql")]
495    fn execute_session_command(
496        &self,
497        cmd: grafeo_adapters::query::gql::ast::SessionCommand,
498    ) -> Result<QueryResult> {
499        use grafeo_adapters::query::gql::ast::{SessionCommand, TransactionIsolationLevel};
500        use grafeo_common::utils::error::{Error, QueryError, QueryErrorKind};
501
502        match cmd {
503            SessionCommand::CreateGraph {
504                name,
505                if_not_exists,
506                typed,
507                like_graph,
508                copy_of,
509                open: _,
510            } => {
511                // Validate source graph exists for LIKE / AS COPY OF
512                if let Some(ref src) = like_graph
513                    && self.store.graph(src).is_none()
514                {
515                    return Err(Error::Query(QueryError::new(
516                        QueryErrorKind::Semantic,
517                        format!("Source graph '{src}' does not exist"),
518                    )));
519                }
520                if let Some(ref src) = copy_of
521                    && self.store.graph(src).is_none()
522                {
523                    return Err(Error::Query(QueryError::new(
524                        QueryErrorKind::Semantic,
525                        format!("Source graph '{src}' does not exist"),
526                    )));
527                }
528
529                let created = self
530                    .store
531                    .create_graph(&name)
532                    .map_err(|e| Error::Internal(e.to_string()))?;
533                if !created && !if_not_exists {
534                    return Err(Error::Query(QueryError::new(
535                        QueryErrorKind::Semantic,
536                        format!("Graph '{name}' already exists"),
537                    )));
538                }
539                if created {
540                    #[cfg(feature = "wal")]
541                    self.log_schema_wal(
542                        &grafeo_adapters::storage::wal::WalRecord::CreateNamedGraph {
543                            name: name.clone(),
544                        },
545                    );
546                }
547
548                // AS COPY OF: copy data from source graph
549                if let Some(ref src) = copy_of {
550                    self.store
551                        .copy_graph(Some(src), Some(&name))
552                        .map_err(|e| Error::Internal(e.to_string()))?;
553                }
554
555                // Bind to graph type if specified
556                if let Some(type_name) = typed
557                    && let Err(e) = self.catalog.bind_graph_type(&name, type_name.clone())
558                {
559                    return Err(Error::Query(QueryError::new(
560                        QueryErrorKind::Semantic,
561                        e.to_string(),
562                    )));
563                }
564
565                // LIKE: copy graph type binding from source
566                if let Some(ref src) = like_graph
567                    && let Some(src_type) = self.catalog.get_graph_type_binding(src)
568                {
569                    let _ = self.catalog.bind_graph_type(&name, src_type);
570                }
571
572                Ok(QueryResult::empty())
573            }
574            SessionCommand::DropGraph { name, if_exists } => {
575                let dropped = self.store.drop_graph(&name);
576                if !dropped && !if_exists {
577                    return Err(Error::Query(QueryError::new(
578                        QueryErrorKind::Semantic,
579                        format!("Graph '{name}' does not exist"),
580                    )));
581                }
582                if dropped {
583                    #[cfg(feature = "wal")]
584                    self.log_schema_wal(
585                        &grafeo_adapters::storage::wal::WalRecord::DropNamedGraph {
586                            name: name.clone(),
587                        },
588                    );
589                    // If this session was using the dropped graph, reset to default
590                    let mut current = self.current_graph.lock();
591                    if current
592                        .as_deref()
593                        .is_some_and(|g| g.eq_ignore_ascii_case(&name))
594                    {
595                        *current = None;
596                    }
597                }
598                Ok(QueryResult::empty())
599            }
600            SessionCommand::UseGraph(name) => {
601                // Verify graph exists (default graph is always valid)
602                if !name.eq_ignore_ascii_case("default") && self.store.graph(&name).is_none() {
603                    return Err(Error::Query(QueryError::new(
604                        QueryErrorKind::Semantic,
605                        format!("Graph '{name}' does not exist"),
606                    )));
607                }
608                self.use_graph(&name);
609                // Track the new graph if in a transaction
610                self.track_graph_touch();
611                Ok(QueryResult::empty())
612            }
613            SessionCommand::SessionSetGraph(name) => {
614                self.use_graph(&name);
615                // Track the new graph if in a transaction
616                self.track_graph_touch();
617                Ok(QueryResult::empty())
618            }
619            SessionCommand::SessionSetTimeZone(tz) => {
620                self.set_time_zone(&tz);
621                Ok(QueryResult::empty())
622            }
623            SessionCommand::SessionSetParameter(key, expr) => {
624                if key.eq_ignore_ascii_case("viewing_epoch") {
625                    match Self::eval_integer_literal(&expr) {
626                        Some(n) if n >= 0 => {
627                            self.set_viewing_epoch(EpochId::new(n as u64));
628                            Ok(QueryResult::status(format!("Set viewing_epoch to {n}")))
629                        }
630                        _ => Err(Error::Query(QueryError::new(
631                            QueryErrorKind::Semantic,
632                            "viewing_epoch must be a non-negative integer literal",
633                        ))),
634                    }
635                } else {
636                    // For now, store parameter name with Null value.
637                    // Full expression evaluation would require building and executing a plan.
638                    self.set_parameter(&key, Value::Null);
639                    Ok(QueryResult::empty())
640                }
641            }
642            SessionCommand::SessionReset => {
643                self.reset_session();
644                Ok(QueryResult::empty())
645            }
646            SessionCommand::SessionClose => {
647                self.reset_session();
648                Ok(QueryResult::empty())
649            }
650            SessionCommand::StartTransaction {
651                read_only,
652                isolation_level,
653            } => {
654                let engine_level = isolation_level.map(|l| match l {
655                    TransactionIsolationLevel::ReadCommitted => {
656                        crate::transaction::IsolationLevel::ReadCommitted
657                    }
658                    TransactionIsolationLevel::SnapshotIsolation => {
659                        crate::transaction::IsolationLevel::SnapshotIsolation
660                    }
661                    TransactionIsolationLevel::Serializable => {
662                        crate::transaction::IsolationLevel::Serializable
663                    }
664                });
665                self.begin_transaction_inner(read_only, engine_level)?;
666                Ok(QueryResult::status("Transaction started"))
667            }
668            SessionCommand::Commit => {
669                self.commit_inner()?;
670                Ok(QueryResult::status("Transaction committed"))
671            }
672            SessionCommand::Rollback => {
673                self.rollback_inner()?;
674                Ok(QueryResult::status("Transaction rolled back"))
675            }
676            SessionCommand::Savepoint(name) => {
677                self.savepoint(&name)?;
678                Ok(QueryResult::status(format!("Savepoint '{name}' created")))
679            }
680            SessionCommand::RollbackToSavepoint(name) => {
681                self.rollback_to_savepoint(&name)?;
682                Ok(QueryResult::status(format!(
683                    "Rolled back to savepoint '{name}'"
684                )))
685            }
686            SessionCommand::ReleaseSavepoint(name) => {
687                self.release_savepoint(&name)?;
688                Ok(QueryResult::status(format!("Savepoint '{name}' released")))
689            }
690        }
691    }
692
693    /// Logs a WAL record for a schema change (no-op if WAL is not enabled).
694    #[cfg(feature = "wal")]
695    fn log_schema_wal(&self, record: &grafeo_adapters::storage::wal::WalRecord) {
696        if let Some(ref wal) = self.wal
697            && let Err(e) = wal.log(record)
698        {
699            tracing::warn!("Failed to log schema change to WAL: {}", e);
700        }
701    }
702
703    /// Executes a schema DDL command, returning a status result.
704    #[cfg(feature = "gql")]
705    fn execute_schema_command(
706        &self,
707        cmd: grafeo_adapters::query::gql::ast::SchemaStatement,
708    ) -> Result<QueryResult> {
709        use crate::catalog::{
710            EdgeTypeDefinition, NodeTypeDefinition, PropertyDataType, TypedProperty,
711        };
712        use grafeo_adapters::query::gql::ast::SchemaStatement;
713        #[cfg(feature = "wal")]
714        use grafeo_adapters::storage::wal::WalRecord;
715        use grafeo_common::utils::error::{Error, QueryError, QueryErrorKind};
716
717        /// Logs a WAL record for schema changes. Compiles to nothing without `wal`.
718        macro_rules! wal_log {
719            ($self:expr, $record:expr) => {
720                #[cfg(feature = "wal")]
721                $self.log_schema_wal(&$record);
722            };
723        }
724
725        let result = match cmd {
726            SchemaStatement::CreateNodeType(stmt) => {
727                #[cfg(feature = "wal")]
728                let props_for_wal: Vec<(String, String, bool)> = stmt
729                    .properties
730                    .iter()
731                    .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
732                    .collect();
733                let def = NodeTypeDefinition {
734                    name: stmt.name.clone(),
735                    properties: stmt
736                        .properties
737                        .iter()
738                        .map(|p| TypedProperty {
739                            name: p.name.clone(),
740                            data_type: PropertyDataType::from_type_name(&p.data_type),
741                            nullable: p.nullable,
742                            default_value: p
743                                .default_value
744                                .as_ref()
745                                .map(|s| parse_default_literal(s)),
746                        })
747                        .collect(),
748                    constraints: Vec::new(),
749                    parent_types: stmt.parent_types.clone(),
750                };
751                let result = if stmt.or_replace {
752                    let _ = self.catalog.drop_node_type(&stmt.name);
753                    self.catalog.register_node_type(def)
754                } else {
755                    self.catalog.register_node_type(def)
756                };
757                match result {
758                    Ok(()) => {
759                        wal_log!(
760                            self,
761                            WalRecord::CreateNodeType {
762                                name: stmt.name.clone(),
763                                properties: props_for_wal,
764                                constraints: Vec::new(),
765                            }
766                        );
767                        Ok(QueryResult::status(format!(
768                            "Created node type '{}'",
769                            stmt.name
770                        )))
771                    }
772                    Err(e) if stmt.if_not_exists => {
773                        let _ = e;
774                        Ok(QueryResult::status("No change"))
775                    }
776                    Err(e) => Err(Error::Query(QueryError::new(
777                        QueryErrorKind::Semantic,
778                        e.to_string(),
779                    ))),
780                }
781            }
782            SchemaStatement::CreateEdgeType(stmt) => {
783                #[cfg(feature = "wal")]
784                let props_for_wal: Vec<(String, String, bool)> = stmt
785                    .properties
786                    .iter()
787                    .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
788                    .collect();
789                let def = EdgeTypeDefinition {
790                    name: stmt.name.clone(),
791                    properties: stmt
792                        .properties
793                        .iter()
794                        .map(|p| TypedProperty {
795                            name: p.name.clone(),
796                            data_type: PropertyDataType::from_type_name(&p.data_type),
797                            nullable: p.nullable,
798                            default_value: p
799                                .default_value
800                                .as_ref()
801                                .map(|s| parse_default_literal(s)),
802                        })
803                        .collect(),
804                    constraints: Vec::new(),
805                    source_node_types: stmt.source_node_types.clone(),
806                    target_node_types: stmt.target_node_types.clone(),
807                };
808                let result = if stmt.or_replace {
809                    let _ = self.catalog.drop_edge_type_def(&stmt.name);
810                    self.catalog.register_edge_type_def(def)
811                } else {
812                    self.catalog.register_edge_type_def(def)
813                };
814                match result {
815                    Ok(()) => {
816                        wal_log!(
817                            self,
818                            WalRecord::CreateEdgeType {
819                                name: stmt.name.clone(),
820                                properties: props_for_wal,
821                                constraints: Vec::new(),
822                            }
823                        );
824                        Ok(QueryResult::status(format!(
825                            "Created edge type '{}'",
826                            stmt.name
827                        )))
828                    }
829                    Err(e) if stmt.if_not_exists => {
830                        let _ = e;
831                        Ok(QueryResult::status("No change"))
832                    }
833                    Err(e) => Err(Error::Query(QueryError::new(
834                        QueryErrorKind::Semantic,
835                        e.to_string(),
836                    ))),
837                }
838            }
839            SchemaStatement::CreateVectorIndex(stmt) => {
840                Self::create_vector_index_on_store(
841                    &self.active_lpg_store(),
842                    &stmt.node_label,
843                    &stmt.property,
844                    stmt.dimensions,
845                    stmt.metric.as_deref(),
846                )?;
847                wal_log!(
848                    self,
849                    WalRecord::CreateIndex {
850                        name: stmt.name.clone(),
851                        label: stmt.node_label.clone(),
852                        property: stmt.property.clone(),
853                        index_type: "vector".to_string(),
854                    }
855                );
856                Ok(QueryResult::status(format!(
857                    "Created vector index '{}'",
858                    stmt.name
859                )))
860            }
861            SchemaStatement::DropNodeType { name, if_exists } => {
862                match self.catalog.drop_node_type(&name) {
863                    Ok(()) => {
864                        wal_log!(self, WalRecord::DropNodeType { name: name.clone() });
865                        Ok(QueryResult::status(format!("Dropped node type '{name}'")))
866                    }
867                    Err(e) if if_exists => {
868                        let _ = e;
869                        Ok(QueryResult::status("No change"))
870                    }
871                    Err(e) => Err(Error::Query(QueryError::new(
872                        QueryErrorKind::Semantic,
873                        e.to_string(),
874                    ))),
875                }
876            }
877            SchemaStatement::DropEdgeType { name, if_exists } => {
878                match self.catalog.drop_edge_type_def(&name) {
879                    Ok(()) => {
880                        wal_log!(self, WalRecord::DropEdgeType { name: name.clone() });
881                        Ok(QueryResult::status(format!("Dropped edge type '{name}'")))
882                    }
883                    Err(e) if if_exists => {
884                        let _ = e;
885                        Ok(QueryResult::status("No change"))
886                    }
887                    Err(e) => Err(Error::Query(QueryError::new(
888                        QueryErrorKind::Semantic,
889                        e.to_string(),
890                    ))),
891                }
892            }
893            SchemaStatement::CreateIndex(stmt) => {
894                use grafeo_adapters::query::gql::ast::IndexKind;
895                let active = self.active_lpg_store();
896                let index_type_str = match stmt.index_kind {
897                    IndexKind::Property => "property",
898                    IndexKind::BTree => "btree",
899                    IndexKind::Text => "text",
900                    IndexKind::Vector => "vector",
901                };
902                match stmt.index_kind {
903                    IndexKind::Property | IndexKind::BTree => {
904                        for prop in &stmt.properties {
905                            active.create_property_index(prop);
906                        }
907                    }
908                    IndexKind::Text => {
909                        for prop in &stmt.properties {
910                            Self::create_text_index_on_store(&active, &stmt.label, prop)?;
911                        }
912                    }
913                    IndexKind::Vector => {
914                        for prop in &stmt.properties {
915                            Self::create_vector_index_on_store(
916                                &active,
917                                &stmt.label,
918                                prop,
919                                stmt.options.dimensions,
920                                stmt.options.metric.as_deref(),
921                            )?;
922                        }
923                    }
924                }
925                #[cfg(feature = "wal")]
926                for prop in &stmt.properties {
927                    wal_log!(
928                        self,
929                        WalRecord::CreateIndex {
930                            name: stmt.name.clone(),
931                            label: stmt.label.clone(),
932                            property: prop.clone(),
933                            index_type: index_type_str.to_string(),
934                        }
935                    );
936                }
937                Ok(QueryResult::status(format!(
938                    "Created {} index '{}'",
939                    index_type_str, stmt.name
940                )))
941            }
942            SchemaStatement::DropIndex { name, if_exists } => {
943                // Try to drop property index by name
944                let dropped = self.active_lpg_store().drop_property_index(&name);
945                if dropped || if_exists {
946                    if dropped {
947                        wal_log!(self, WalRecord::DropIndex { name: name.clone() });
948                    }
949                    Ok(QueryResult::status(if dropped {
950                        format!("Dropped index '{name}'")
951                    } else {
952                        "No change".to_string()
953                    }))
954                } else {
955                    Err(Error::Query(QueryError::new(
956                        QueryErrorKind::Semantic,
957                        format!("Index '{name}' does not exist"),
958                    )))
959                }
960            }
961            SchemaStatement::CreateConstraint(stmt) => {
962                use crate::catalog::TypeConstraint;
963                use grafeo_adapters::query::gql::ast::ConstraintKind;
964                let kind_str = match stmt.constraint_kind {
965                    ConstraintKind::Unique => "unique",
966                    ConstraintKind::NodeKey => "node_key",
967                    ConstraintKind::NotNull => "not_null",
968                    ConstraintKind::Exists => "exists",
969                };
970                let constraint_name = stmt
971                    .name
972                    .clone()
973                    .unwrap_or_else(|| format!("{}_{kind_str}", stmt.label));
974
975                // Register constraint in catalog type definitions
976                match stmt.constraint_kind {
977                    ConstraintKind::Unique => {
978                        for prop in &stmt.properties {
979                            let label_id = self.catalog.get_or_create_label(&stmt.label);
980                            let prop_id = self.catalog.get_or_create_property_key(prop);
981                            let _ = self.catalog.add_unique_constraint(label_id, prop_id);
982                        }
983                        let _ = self.catalog.add_constraint_to_type(
984                            &stmt.label,
985                            TypeConstraint::Unique(stmt.properties.clone()),
986                        );
987                    }
988                    ConstraintKind::NodeKey => {
989                        for prop in &stmt.properties {
990                            let label_id = self.catalog.get_or_create_label(&stmt.label);
991                            let prop_id = self.catalog.get_or_create_property_key(prop);
992                            let _ = self.catalog.add_unique_constraint(label_id, prop_id);
993                            let _ = self.catalog.add_required_property(label_id, prop_id);
994                        }
995                        let _ = self.catalog.add_constraint_to_type(
996                            &stmt.label,
997                            TypeConstraint::PrimaryKey(stmt.properties.clone()),
998                        );
999                    }
1000                    ConstraintKind::NotNull | ConstraintKind::Exists => {
1001                        for prop in &stmt.properties {
1002                            let label_id = self.catalog.get_or_create_label(&stmt.label);
1003                            let prop_id = self.catalog.get_or_create_property_key(prop);
1004                            let _ = self.catalog.add_required_property(label_id, prop_id);
1005                            let _ = self.catalog.add_constraint_to_type(
1006                                &stmt.label,
1007                                TypeConstraint::NotNull(prop.clone()),
1008                            );
1009                        }
1010                    }
1011                }
1012
1013                wal_log!(
1014                    self,
1015                    WalRecord::CreateConstraint {
1016                        name: constraint_name.clone(),
1017                        label: stmt.label.clone(),
1018                        properties: stmt.properties.clone(),
1019                        kind: kind_str.to_string(),
1020                    }
1021                );
1022                Ok(QueryResult::status(format!(
1023                    "Created {kind_str} constraint '{constraint_name}'"
1024                )))
1025            }
1026            SchemaStatement::DropConstraint { name, if_exists } => {
1027                let _ = if_exists;
1028                wal_log!(self, WalRecord::DropConstraint { name: name.clone() });
1029                Ok(QueryResult::status(format!("Dropped constraint '{name}'")))
1030            }
1031            SchemaStatement::CreateGraphType(stmt) => {
1032                use crate::catalog::GraphTypeDefinition;
1033                use grafeo_adapters::query::gql::ast::InlineElementType;
1034
1035                // GG04: LIKE clause copies type from existing graph
1036                let (mut node_types, mut edge_types, open) =
1037                    if let Some(ref like_graph) = stmt.like_graph {
1038                        // Infer types from the graph's bound type, or use its existing types
1039                        if let Some(type_name) = self.catalog.get_graph_type_binding(like_graph) {
1040                            if let Some(existing) = self
1041                                .catalog
1042                                .schema()
1043                                .and_then(|s| s.get_graph_type(&type_name))
1044                            {
1045                                (
1046                                    existing.allowed_node_types.clone(),
1047                                    existing.allowed_edge_types.clone(),
1048                                    existing.open,
1049                                )
1050                            } else {
1051                                (Vec::new(), Vec::new(), true)
1052                            }
1053                        } else {
1054                            // GG22: Infer from graph data (labels used in graph)
1055                            let nt = self.catalog.all_node_type_names();
1056                            let et = self.catalog.all_edge_type_names();
1057                            if nt.is_empty() && et.is_empty() {
1058                                (Vec::new(), Vec::new(), true)
1059                            } else {
1060                                (nt, et, false)
1061                            }
1062                        }
1063                    } else {
1064                        (stmt.node_types.clone(), stmt.edge_types.clone(), stmt.open)
1065                    };
1066
1067                // GG03: Register inline element types and add their names
1068                for inline in &stmt.inline_types {
1069                    match inline {
1070                        InlineElementType::Node {
1071                            name,
1072                            properties,
1073                            key_labels,
1074                            ..
1075                        } => {
1076                            let def = NodeTypeDefinition {
1077                                name: name.clone(),
1078                                properties: properties
1079                                    .iter()
1080                                    .map(|p| TypedProperty {
1081                                        name: p.name.clone(),
1082                                        data_type: PropertyDataType::from_type_name(&p.data_type),
1083                                        nullable: p.nullable,
1084                                        default_value: None,
1085                                    })
1086                                    .collect(),
1087                                constraints: Vec::new(),
1088                                parent_types: key_labels.clone(),
1089                            };
1090                            // Register or replace so inline defs override existing
1091                            self.catalog.register_or_replace_node_type(def);
1092                            #[cfg(feature = "wal")]
1093                            {
1094                                let props_for_wal: Vec<(String, String, bool)> = properties
1095                                    .iter()
1096                                    .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
1097                                    .collect();
1098                                self.log_schema_wal(&WalRecord::CreateNodeType {
1099                                    name: name.clone(),
1100                                    properties: props_for_wal,
1101                                    constraints: Vec::new(),
1102                                });
1103                            }
1104                            if !node_types.contains(name) {
1105                                node_types.push(name.clone());
1106                            }
1107                        }
1108                        InlineElementType::Edge {
1109                            name,
1110                            properties,
1111                            source_node_types,
1112                            target_node_types,
1113                            ..
1114                        } => {
1115                            let def = EdgeTypeDefinition {
1116                                name: name.clone(),
1117                                properties: properties
1118                                    .iter()
1119                                    .map(|p| TypedProperty {
1120                                        name: p.name.clone(),
1121                                        data_type: PropertyDataType::from_type_name(&p.data_type),
1122                                        nullable: p.nullable,
1123                                        default_value: None,
1124                                    })
1125                                    .collect(),
1126                                constraints: Vec::new(),
1127                                source_node_types: source_node_types.clone(),
1128                                target_node_types: target_node_types.clone(),
1129                            };
1130                            self.catalog.register_or_replace_edge_type_def(def);
1131                            #[cfg(feature = "wal")]
1132                            {
1133                                let props_for_wal: Vec<(String, String, bool)> = properties
1134                                    .iter()
1135                                    .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
1136                                    .collect();
1137                                self.log_schema_wal(&WalRecord::CreateEdgeType {
1138                                    name: name.clone(),
1139                                    properties: props_for_wal,
1140                                    constraints: Vec::new(),
1141                                });
1142                            }
1143                            if !edge_types.contains(name) {
1144                                edge_types.push(name.clone());
1145                            }
1146                        }
1147                    }
1148                }
1149
1150                let def = GraphTypeDefinition {
1151                    name: stmt.name.clone(),
1152                    allowed_node_types: node_types.clone(),
1153                    allowed_edge_types: edge_types.clone(),
1154                    open,
1155                };
1156                let result = if stmt.or_replace {
1157                    // Drop existing first, ignore error if not found
1158                    let _ = self.catalog.drop_graph_type(&stmt.name);
1159                    self.catalog.register_graph_type(def)
1160                } else {
1161                    self.catalog.register_graph_type(def)
1162                };
1163                match result {
1164                    Ok(()) => {
1165                        wal_log!(
1166                            self,
1167                            WalRecord::CreateGraphType {
1168                                name: stmt.name.clone(),
1169                                node_types,
1170                                edge_types,
1171                                open,
1172                            }
1173                        );
1174                        Ok(QueryResult::status(format!(
1175                            "Created graph type '{}'",
1176                            stmt.name
1177                        )))
1178                    }
1179                    Err(e) if stmt.if_not_exists => {
1180                        let _ = e;
1181                        Ok(QueryResult::status("No change"))
1182                    }
1183                    Err(e) => Err(Error::Query(QueryError::new(
1184                        QueryErrorKind::Semantic,
1185                        e.to_string(),
1186                    ))),
1187                }
1188            }
1189            SchemaStatement::DropGraphType { name, if_exists } => {
1190                match self.catalog.drop_graph_type(&name) {
1191                    Ok(()) => {
1192                        wal_log!(self, WalRecord::DropGraphType { name: name.clone() });
1193                        Ok(QueryResult::status(format!("Dropped graph type '{name}'")))
1194                    }
1195                    Err(e) if if_exists => {
1196                        let _ = e;
1197                        Ok(QueryResult::status("No change"))
1198                    }
1199                    Err(e) => Err(Error::Query(QueryError::new(
1200                        QueryErrorKind::Semantic,
1201                        e.to_string(),
1202                    ))),
1203                }
1204            }
1205            SchemaStatement::CreateSchema {
1206                name,
1207                if_not_exists,
1208            } => match self.catalog.register_schema_namespace(name.clone()) {
1209                Ok(()) => {
1210                    wal_log!(self, WalRecord::CreateSchema { name: name.clone() });
1211                    Ok(QueryResult::status(format!("Created schema '{name}'")))
1212                }
1213                Err(e) if if_not_exists => {
1214                    let _ = e;
1215                    Ok(QueryResult::status("No change"))
1216                }
1217                Err(e) => Err(Error::Query(QueryError::new(
1218                    QueryErrorKind::Semantic,
1219                    e.to_string(),
1220                ))),
1221            },
1222            SchemaStatement::DropSchema { name, if_exists } => {
1223                match self.catalog.drop_schema_namespace(&name) {
1224                    Ok(()) => {
1225                        wal_log!(self, WalRecord::DropSchema { name: name.clone() });
1226                        Ok(QueryResult::status(format!("Dropped schema '{name}'")))
1227                    }
1228                    Err(e) if if_exists => {
1229                        let _ = e;
1230                        Ok(QueryResult::status("No change"))
1231                    }
1232                    Err(e) => Err(Error::Query(QueryError::new(
1233                        QueryErrorKind::Semantic,
1234                        e.to_string(),
1235                    ))),
1236                }
1237            }
1238            SchemaStatement::AlterNodeType(stmt) => {
1239                use grafeo_adapters::query::gql::ast::TypeAlteration;
1240                let mut wal_alts = Vec::new();
1241                for alt in &stmt.alterations {
1242                    match alt {
1243                        TypeAlteration::AddProperty(prop) => {
1244                            let typed = TypedProperty {
1245                                name: prop.name.clone(),
1246                                data_type: PropertyDataType::from_type_name(&prop.data_type),
1247                                nullable: prop.nullable,
1248                                default_value: prop
1249                                    .default_value
1250                                    .as_ref()
1251                                    .map(|s| parse_default_literal(s)),
1252                            };
1253                            self.catalog
1254                                .alter_node_type_add_property(&stmt.name, typed)
1255                                .map_err(|e| {
1256                                    Error::Query(QueryError::new(
1257                                        QueryErrorKind::Semantic,
1258                                        e.to_string(),
1259                                    ))
1260                                })?;
1261                            wal_alts.push((
1262                                "add".to_string(),
1263                                prop.name.clone(),
1264                                prop.data_type.clone(),
1265                                prop.nullable,
1266                            ));
1267                        }
1268                        TypeAlteration::DropProperty(name) => {
1269                            self.catalog
1270                                .alter_node_type_drop_property(&stmt.name, name)
1271                                .map_err(|e| {
1272                                    Error::Query(QueryError::new(
1273                                        QueryErrorKind::Semantic,
1274                                        e.to_string(),
1275                                    ))
1276                                })?;
1277                            wal_alts.push(("drop".to_string(), name.clone(), String::new(), false));
1278                        }
1279                    }
1280                }
1281                wal_log!(
1282                    self,
1283                    WalRecord::AlterNodeType {
1284                        name: stmt.name.clone(),
1285                        alterations: wal_alts,
1286                    }
1287                );
1288                Ok(QueryResult::status(format!(
1289                    "Altered node type '{}'",
1290                    stmt.name
1291                )))
1292            }
1293            SchemaStatement::AlterEdgeType(stmt) => {
1294                use grafeo_adapters::query::gql::ast::TypeAlteration;
1295                let mut wal_alts = Vec::new();
1296                for alt in &stmt.alterations {
1297                    match alt {
1298                        TypeAlteration::AddProperty(prop) => {
1299                            let typed = TypedProperty {
1300                                name: prop.name.clone(),
1301                                data_type: PropertyDataType::from_type_name(&prop.data_type),
1302                                nullable: prop.nullable,
1303                                default_value: prop
1304                                    .default_value
1305                                    .as_ref()
1306                                    .map(|s| parse_default_literal(s)),
1307                            };
1308                            self.catalog
1309                                .alter_edge_type_add_property(&stmt.name, typed)
1310                                .map_err(|e| {
1311                                    Error::Query(QueryError::new(
1312                                        QueryErrorKind::Semantic,
1313                                        e.to_string(),
1314                                    ))
1315                                })?;
1316                            wal_alts.push((
1317                                "add".to_string(),
1318                                prop.name.clone(),
1319                                prop.data_type.clone(),
1320                                prop.nullable,
1321                            ));
1322                        }
1323                        TypeAlteration::DropProperty(name) => {
1324                            self.catalog
1325                                .alter_edge_type_drop_property(&stmt.name, name)
1326                                .map_err(|e| {
1327                                    Error::Query(QueryError::new(
1328                                        QueryErrorKind::Semantic,
1329                                        e.to_string(),
1330                                    ))
1331                                })?;
1332                            wal_alts.push(("drop".to_string(), name.clone(), String::new(), false));
1333                        }
1334                    }
1335                }
1336                wal_log!(
1337                    self,
1338                    WalRecord::AlterEdgeType {
1339                        name: stmt.name.clone(),
1340                        alterations: wal_alts,
1341                    }
1342                );
1343                Ok(QueryResult::status(format!(
1344                    "Altered edge type '{}'",
1345                    stmt.name
1346                )))
1347            }
1348            SchemaStatement::AlterGraphType(stmt) => {
1349                use grafeo_adapters::query::gql::ast::GraphTypeAlteration;
1350                let mut wal_alts = Vec::new();
1351                for alt in &stmt.alterations {
1352                    match alt {
1353                        GraphTypeAlteration::AddNodeType(name) => {
1354                            self.catalog
1355                                .alter_graph_type_add_node_type(&stmt.name, name.clone())
1356                                .map_err(|e| {
1357                                    Error::Query(QueryError::new(
1358                                        QueryErrorKind::Semantic,
1359                                        e.to_string(),
1360                                    ))
1361                                })?;
1362                            wal_alts.push(("add_node_type".to_string(), name.clone()));
1363                        }
1364                        GraphTypeAlteration::DropNodeType(name) => {
1365                            self.catalog
1366                                .alter_graph_type_drop_node_type(&stmt.name, name)
1367                                .map_err(|e| {
1368                                    Error::Query(QueryError::new(
1369                                        QueryErrorKind::Semantic,
1370                                        e.to_string(),
1371                                    ))
1372                                })?;
1373                            wal_alts.push(("drop_node_type".to_string(), name.clone()));
1374                        }
1375                        GraphTypeAlteration::AddEdgeType(name) => {
1376                            self.catalog
1377                                .alter_graph_type_add_edge_type(&stmt.name, name.clone())
1378                                .map_err(|e| {
1379                                    Error::Query(QueryError::new(
1380                                        QueryErrorKind::Semantic,
1381                                        e.to_string(),
1382                                    ))
1383                                })?;
1384                            wal_alts.push(("add_edge_type".to_string(), name.clone()));
1385                        }
1386                        GraphTypeAlteration::DropEdgeType(name) => {
1387                            self.catalog
1388                                .alter_graph_type_drop_edge_type(&stmt.name, name)
1389                                .map_err(|e| {
1390                                    Error::Query(QueryError::new(
1391                                        QueryErrorKind::Semantic,
1392                                        e.to_string(),
1393                                    ))
1394                                })?;
1395                            wal_alts.push(("drop_edge_type".to_string(), name.clone()));
1396                        }
1397                    }
1398                }
1399                wal_log!(
1400                    self,
1401                    WalRecord::AlterGraphType {
1402                        name: stmt.name.clone(),
1403                        alterations: wal_alts,
1404                    }
1405                );
1406                Ok(QueryResult::status(format!(
1407                    "Altered graph type '{}'",
1408                    stmt.name
1409                )))
1410            }
1411            SchemaStatement::CreateProcedure(stmt) => {
1412                use crate::catalog::ProcedureDefinition;
1413
1414                let def = ProcedureDefinition {
1415                    name: stmt.name.clone(),
1416                    params: stmt
1417                        .params
1418                        .iter()
1419                        .map(|p| (p.name.clone(), p.param_type.clone()))
1420                        .collect(),
1421                    returns: stmt
1422                        .returns
1423                        .iter()
1424                        .map(|r| (r.name.clone(), r.return_type.clone()))
1425                        .collect(),
1426                    body: stmt.body.clone(),
1427                };
1428
1429                if stmt.or_replace {
1430                    self.catalog.replace_procedure(def).map_err(|e| {
1431                        Error::Query(QueryError::new(QueryErrorKind::Semantic, e.to_string()))
1432                    })?;
1433                } else {
1434                    match self.catalog.register_procedure(def) {
1435                        Ok(()) => {}
1436                        Err(_) if stmt.if_not_exists => {
1437                            return Ok(QueryResult::empty());
1438                        }
1439                        Err(e) => {
1440                            return Err(Error::Query(QueryError::new(
1441                                QueryErrorKind::Semantic,
1442                                e.to_string(),
1443                            )));
1444                        }
1445                    }
1446                }
1447
1448                wal_log!(
1449                    self,
1450                    WalRecord::CreateProcedure {
1451                        name: stmt.name.clone(),
1452                        params: stmt
1453                            .params
1454                            .iter()
1455                            .map(|p| (p.name.clone(), p.param_type.clone()))
1456                            .collect(),
1457                        returns: stmt
1458                            .returns
1459                            .iter()
1460                            .map(|r| (r.name.clone(), r.return_type.clone()))
1461                            .collect(),
1462                        body: stmt.body,
1463                    }
1464                );
1465                Ok(QueryResult::status(format!(
1466                    "Created procedure '{}'",
1467                    stmt.name
1468                )))
1469            }
1470            SchemaStatement::DropProcedure { name, if_exists } => {
1471                match self.catalog.drop_procedure(&name) {
1472                    Ok(()) => {}
1473                    Err(_) if if_exists => {
1474                        return Ok(QueryResult::empty());
1475                    }
1476                    Err(e) => {
1477                        return Err(Error::Query(QueryError::new(
1478                            QueryErrorKind::Semantic,
1479                            e.to_string(),
1480                        )));
1481                    }
1482                }
1483                wal_log!(self, WalRecord::DropProcedure { name: name.clone() });
1484                Ok(QueryResult::status(format!("Dropped procedure '{name}'")))
1485            }
1486            SchemaStatement::ShowIndexes => {
1487                return self.execute_show_indexes();
1488            }
1489            SchemaStatement::ShowConstraints => {
1490                return self.execute_show_constraints();
1491            }
1492            SchemaStatement::ShowNodeTypes => {
1493                return self.execute_show_node_types();
1494            }
1495            SchemaStatement::ShowEdgeTypes => {
1496                return self.execute_show_edge_types();
1497            }
1498            SchemaStatement::ShowGraphTypes => {
1499                return self.execute_show_graph_types();
1500            }
1501            SchemaStatement::ShowGraphType(name) => {
1502                return self.execute_show_graph_type(&name);
1503            }
1504            SchemaStatement::ShowCurrentGraphType => {
1505                return self.execute_show_current_graph_type();
1506            }
1507            SchemaStatement::ShowGraphs => {
1508                return self.execute_show_graphs();
1509            }
1510        };
1511
1512        // Invalidate all cached query plans after any successful DDL change.
1513        // DDL is rare, so clearing the entire cache is cheap and correct.
1514        if result.is_ok() {
1515            self.query_cache.clear();
1516        }
1517
1518        result
1519    }
1520
1521    /// Creates a vector index on the store by scanning existing nodes.
1522    #[cfg(all(feature = "gql", feature = "vector-index"))]
1523    fn create_vector_index_on_store(
1524        store: &LpgStore,
1525        label: &str,
1526        property: &str,
1527        dimensions: Option<usize>,
1528        metric: Option<&str>,
1529    ) -> Result<()> {
1530        use grafeo_common::types::{PropertyKey, Value};
1531        use grafeo_common::utils::error::Error;
1532        use grafeo_core::index::vector::{DistanceMetric, HnswConfig, HnswIndex};
1533
1534        let metric = match metric {
1535            Some(m) => DistanceMetric::from_str(m).ok_or_else(|| {
1536                Error::Internal(format!(
1537                    "Unknown distance metric '{m}'. Use: cosine, euclidean, dot_product, manhattan"
1538                ))
1539            })?,
1540            None => DistanceMetric::Cosine,
1541        };
1542
1543        let prop_key = PropertyKey::new(property);
1544        let mut found_dims: Option<usize> = dimensions;
1545        let mut vectors: Vec<(grafeo_common::types::NodeId, Vec<f32>)> = Vec::new();
1546
1547        for node in store.nodes_with_label(label) {
1548            if let Some(Value::Vector(v)) = node.properties.get(&prop_key) {
1549                if let Some(expected) = found_dims {
1550                    if v.len() != expected {
1551                        return Err(Error::Internal(format!(
1552                            "Vector dimension mismatch: expected {expected}, found {} on node {}",
1553                            v.len(),
1554                            node.id.0
1555                        )));
1556                    }
1557                } else {
1558                    found_dims = Some(v.len());
1559                }
1560                vectors.push((node.id, v.to_vec()));
1561            }
1562        }
1563
1564        let Some(dims) = found_dims else {
1565            return Err(Error::Internal(format!(
1566                "No vector properties found on :{label}({property}) and no dimensions specified"
1567            )));
1568        };
1569
1570        let config = HnswConfig::new(dims, metric);
1571        let index = HnswIndex::with_capacity(config, vectors.len());
1572        let accessor = grafeo_core::index::vector::PropertyVectorAccessor::new(store, property);
1573        for (node_id, vec) in &vectors {
1574            index.insert(*node_id, vec, &accessor);
1575        }
1576
1577        store.add_vector_index(label, property, Arc::new(index));
1578        Ok(())
1579    }
1580
1581    /// Stub for when vector-index feature is not enabled.
1582    #[cfg(all(feature = "gql", not(feature = "vector-index")))]
1583    fn create_vector_index_on_store(
1584        _store: &LpgStore,
1585        _label: &str,
1586        _property: &str,
1587        _dimensions: Option<usize>,
1588        _metric: Option<&str>,
1589    ) -> Result<()> {
1590        Err(grafeo_common::utils::error::Error::Internal(
1591            "Vector index support requires the 'vector-index' feature".to_string(),
1592        ))
1593    }
1594
1595    /// Creates a text index on the store by scanning existing nodes.
1596    #[cfg(all(feature = "gql", feature = "text-index"))]
1597    fn create_text_index_on_store(store: &LpgStore, label: &str, property: &str) -> Result<()> {
1598        use grafeo_common::types::{PropertyKey, Value};
1599        use grafeo_core::index::text::{BM25Config, InvertedIndex};
1600
1601        let mut index = InvertedIndex::new(BM25Config::default());
1602        let prop_key = PropertyKey::new(property);
1603
1604        let nodes = store.nodes_by_label(label);
1605        for node_id in nodes {
1606            if let Some(Value::String(text)) = store.get_node_property(node_id, &prop_key) {
1607                index.insert(node_id, text.as_str());
1608            }
1609        }
1610
1611        store.add_text_index(label, property, Arc::new(parking_lot::RwLock::new(index)));
1612        Ok(())
1613    }
1614
1615    /// Stub for when text-index feature is not enabled.
1616    #[cfg(all(feature = "gql", not(feature = "text-index")))]
1617    fn create_text_index_on_store(_store: &LpgStore, _label: &str, _property: &str) -> Result<()> {
1618        Err(grafeo_common::utils::error::Error::Internal(
1619            "Text index support requires the 'text-index' feature".to_string(),
1620        ))
1621    }
1622
1623    /// Returns a table of all indexes from the catalog.
1624    fn execute_show_indexes(&self) -> Result<QueryResult> {
1625        let indexes = self.catalog.all_indexes();
1626        let columns = vec![
1627            "name".to_string(),
1628            "type".to_string(),
1629            "label".to_string(),
1630            "property".to_string(),
1631        ];
1632        let rows: Vec<Vec<Value>> = indexes
1633            .into_iter()
1634            .map(|def| {
1635                let label_name = self
1636                    .catalog
1637                    .get_label_name(def.label)
1638                    .unwrap_or_else(|| "?".into());
1639                let prop_name = self
1640                    .catalog
1641                    .get_property_key_name(def.property_key)
1642                    .unwrap_or_else(|| "?".into());
1643                vec![
1644                    Value::from(format!("idx_{}_{}", label_name, prop_name)),
1645                    Value::from(format!("{:?}", def.index_type)),
1646                    Value::from(&*label_name),
1647                    Value::from(&*prop_name),
1648                ]
1649            })
1650            .collect();
1651        Ok(QueryResult {
1652            columns,
1653            column_types: Vec::new(),
1654            rows,
1655            ..QueryResult::empty()
1656        })
1657    }
1658
1659    /// Returns a table of all constraints (currently metadata-only).
1660    fn execute_show_constraints(&self) -> Result<QueryResult> {
1661        // Constraints are tracked in WAL but not yet in a queryable catalog.
1662        // Return an empty table with the expected schema.
1663        Ok(QueryResult {
1664            columns: vec![
1665                "name".to_string(),
1666                "type".to_string(),
1667                "label".to_string(),
1668                "properties".to_string(),
1669            ],
1670            column_types: Vec::new(),
1671            rows: Vec::new(),
1672            ..QueryResult::empty()
1673        })
1674    }
1675
1676    /// Returns a table of all registered node types.
1677    fn execute_show_node_types(&self) -> Result<QueryResult> {
1678        let columns = vec![
1679            "name".to_string(),
1680            "properties".to_string(),
1681            "constraints".to_string(),
1682            "parents".to_string(),
1683        ];
1684        let type_names = self.catalog.all_node_type_names();
1685        let rows: Vec<Vec<Value>> = type_names
1686            .into_iter()
1687            .filter_map(|name| {
1688                let def = self.catalog.get_node_type(&name)?;
1689                let props: Vec<String> = def
1690                    .properties
1691                    .iter()
1692                    .map(|p| {
1693                        let nullable = if p.nullable { "" } else { " NOT NULL" };
1694                        format!("{} {}{}", p.name, p.data_type, nullable)
1695                    })
1696                    .collect();
1697                let constraints: Vec<String> =
1698                    def.constraints.iter().map(|c| format!("{c:?}")).collect();
1699                let parents = def.parent_types.join(", ");
1700                Some(vec![
1701                    Value::from(name),
1702                    Value::from(props.join(", ")),
1703                    Value::from(constraints.join(", ")),
1704                    Value::from(parents),
1705                ])
1706            })
1707            .collect();
1708        Ok(QueryResult {
1709            columns,
1710            column_types: Vec::new(),
1711            rows,
1712            ..QueryResult::empty()
1713        })
1714    }
1715
1716    /// Returns a table of all registered edge types.
1717    fn execute_show_edge_types(&self) -> Result<QueryResult> {
1718        let columns = vec![
1719            "name".to_string(),
1720            "properties".to_string(),
1721            "source_types".to_string(),
1722            "target_types".to_string(),
1723        ];
1724        let type_names = self.catalog.all_edge_type_names();
1725        let rows: Vec<Vec<Value>> = type_names
1726            .into_iter()
1727            .filter_map(|name| {
1728                let def = self.catalog.get_edge_type_def(&name)?;
1729                let props: Vec<String> = def
1730                    .properties
1731                    .iter()
1732                    .map(|p| {
1733                        let nullable = if p.nullable { "" } else { " NOT NULL" };
1734                        format!("{} {}{}", p.name, p.data_type, nullable)
1735                    })
1736                    .collect();
1737                let src = def.source_node_types.join(", ");
1738                let tgt = def.target_node_types.join(", ");
1739                Some(vec![
1740                    Value::from(name),
1741                    Value::from(props.join(", ")),
1742                    Value::from(src),
1743                    Value::from(tgt),
1744                ])
1745            })
1746            .collect();
1747        Ok(QueryResult {
1748            columns,
1749            column_types: Vec::new(),
1750            rows,
1751            ..QueryResult::empty()
1752        })
1753    }
1754
1755    /// Returns a table of all registered graph types.
1756    fn execute_show_graph_types(&self) -> Result<QueryResult> {
1757        let columns = vec![
1758            "name".to_string(),
1759            "open".to_string(),
1760            "node_types".to_string(),
1761            "edge_types".to_string(),
1762        ];
1763        let type_names = self.catalog.all_graph_type_names();
1764        let rows: Vec<Vec<Value>> = type_names
1765            .into_iter()
1766            .filter_map(|name| {
1767                let def = self.catalog.get_graph_type_def(&name)?;
1768                Some(vec![
1769                    Value::from(name),
1770                    Value::from(def.open),
1771                    Value::from(def.allowed_node_types.join(", ")),
1772                    Value::from(def.allowed_edge_types.join(", ")),
1773                ])
1774            })
1775            .collect();
1776        Ok(QueryResult {
1777            columns,
1778            column_types: Vec::new(),
1779            rows,
1780            ..QueryResult::empty()
1781        })
1782    }
1783
1784    /// Returns the list of named graphs in the database.
1785    fn execute_show_graphs(&self) -> Result<QueryResult> {
1786        let mut names = self.store.graph_names();
1787        names.sort();
1788        let rows: Vec<Vec<Value>> = names.into_iter().map(|n| vec![Value::from(n)]).collect();
1789        Ok(QueryResult {
1790            columns: vec!["name".to_string()],
1791            column_types: Vec::new(),
1792            rows,
1793            ..QueryResult::empty()
1794        })
1795    }
1796
1797    /// Returns detailed info for a specific graph type.
1798    fn execute_show_graph_type(&self, name: &str) -> Result<QueryResult> {
1799        use grafeo_common::utils::error::{Error, QueryError, QueryErrorKind};
1800
1801        let def = self.catalog.get_graph_type_def(name).ok_or_else(|| {
1802            Error::Query(QueryError::new(
1803                QueryErrorKind::Semantic,
1804                format!("Graph type '{name}' not found"),
1805            ))
1806        })?;
1807
1808        let columns = vec![
1809            "name".to_string(),
1810            "open".to_string(),
1811            "node_types".to_string(),
1812            "edge_types".to_string(),
1813        ];
1814        let rows = vec![vec![
1815            Value::from(def.name),
1816            Value::from(def.open),
1817            Value::from(def.allowed_node_types.join(", ")),
1818            Value::from(def.allowed_edge_types.join(", ")),
1819        ]];
1820        Ok(QueryResult {
1821            columns,
1822            column_types: Vec::new(),
1823            rows,
1824            ..QueryResult::empty()
1825        })
1826    }
1827
1828    /// Returns the graph type bound to the current graph.
1829    fn execute_show_current_graph_type(&self) -> Result<QueryResult> {
1830        let graph_name = self
1831            .current_graph()
1832            .unwrap_or_else(|| "default".to_string());
1833        let columns = vec![
1834            "graph".to_string(),
1835            "graph_type".to_string(),
1836            "open".to_string(),
1837            "node_types".to_string(),
1838            "edge_types".to_string(),
1839        ];
1840
1841        if let Some(type_name) = self.catalog.get_graph_type_binding(&graph_name)
1842            && let Some(def) = self.catalog.get_graph_type_def(&type_name)
1843        {
1844            let rows = vec![vec![
1845                Value::from(graph_name),
1846                Value::from(type_name),
1847                Value::from(def.open),
1848                Value::from(def.allowed_node_types.join(", ")),
1849                Value::from(def.allowed_edge_types.join(", ")),
1850            ]];
1851            return Ok(QueryResult {
1852                columns,
1853                column_types: Vec::new(),
1854                rows,
1855                ..QueryResult::empty()
1856            });
1857        }
1858
1859        // No graph type binding found
1860        Ok(QueryResult {
1861            columns,
1862            column_types: Vec::new(),
1863            rows: vec![vec![
1864                Value::from(graph_name),
1865                Value::Null,
1866                Value::Null,
1867                Value::Null,
1868                Value::Null,
1869            ]],
1870            ..QueryResult::empty()
1871        })
1872    }
1873
1874    /// Executes a GQL query.
1875    ///
1876    /// # Errors
1877    ///
1878    /// Returns an error if the query fails to parse or execute.
1879    ///
1880    /// # Examples
1881    ///
1882    /// ```no_run
1883    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
1884    /// use grafeo_engine::GrafeoDB;
1885    ///
1886    /// let db = GrafeoDB::new_in_memory();
1887    /// let session = db.session();
1888    ///
1889    /// // Create a node
1890    /// session.execute("INSERT (:Person {name: 'Alix', age: 30})")?;
1891    ///
1892    /// // Query nodes
1893    /// let result = session.execute("MATCH (n:Person) RETURN n.name, n.age")?;
1894    /// for row in &result.rows {
1895    ///     println!("{:?}", row);
1896    /// }
1897    /// # Ok(())
1898    /// # }
1899    /// ```
1900    #[cfg(feature = "gql")]
1901    pub fn execute(&self, query: &str) -> Result<QueryResult> {
1902        self.require_lpg("GQL")?;
1903
1904        use crate::query::{
1905            Executor, binder::Binder, cache::CacheKey, optimizer::Optimizer,
1906            processor::QueryLanguage, translators::gql,
1907        };
1908
1909        #[cfg(not(target_arch = "wasm32"))]
1910        let start_time = std::time::Instant::now();
1911
1912        // Parse and translate, checking for session/schema commands first
1913        let translation = gql::translate_full(query)?;
1914        let logical_plan = match translation {
1915            gql::GqlTranslationResult::SessionCommand(cmd) => {
1916                return self.execute_session_command(cmd);
1917            }
1918            gql::GqlTranslationResult::SchemaCommand(cmd) => {
1919                // All DDL is a write operation
1920                if *self.read_only_tx.lock() {
1921                    return Err(grafeo_common::utils::error::Error::Transaction(
1922                        grafeo_common::utils::error::TransactionError::ReadOnly,
1923                    ));
1924                }
1925                return self.execute_schema_command(cmd);
1926            }
1927            gql::GqlTranslationResult::Plan(plan) => {
1928                // Block mutations in read-only transactions
1929                if *self.read_only_tx.lock() && plan.root.has_mutations() {
1930                    return Err(grafeo_common::utils::error::Error::Transaction(
1931                        grafeo_common::utils::error::TransactionError::ReadOnly,
1932                    ));
1933                }
1934                plan
1935            }
1936        };
1937
1938        // Create cache key for this query
1939        let cache_key = CacheKey::with_graph(query, QueryLanguage::Gql, self.current_graph());
1940
1941        // Try to get cached optimized plan, or use the plan we just translated
1942        let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
1943            cached_plan
1944        } else {
1945            // Semantic validation
1946            let mut binder = Binder::new();
1947            let _binding_context = binder.bind(&logical_plan)?;
1948
1949            // Optimize the plan
1950            let active = self.active_store();
1951            let optimizer = Optimizer::from_graph_store(&*active);
1952            let plan = optimizer.optimize(logical_plan)?;
1953
1954            // Cache the optimized plan for future use
1955            self.query_cache.put_optimized(cache_key, plan.clone());
1956
1957            plan
1958        };
1959
1960        // Resolve the active store for query execution
1961        let active = self.active_store();
1962
1963        // EXPLAIN: annotate pushdown hints and return the plan tree
1964        if optimized_plan.explain {
1965            use crate::query::processor::{annotate_pushdown_hints, explain_result};
1966            let mut plan = optimized_plan;
1967            annotate_pushdown_hints(&mut plan.root, active.as_ref());
1968            return Ok(explain_result(&plan));
1969        }
1970
1971        // PROFILE: execute with per-operator instrumentation
1972        if optimized_plan.profile {
1973            let has_mutations = optimized_plan.root.has_mutations();
1974            return self.with_auto_commit(has_mutations, || {
1975                let (viewing_epoch, transaction_id) = self.get_transaction_context();
1976                let planner = self.create_planner_for_store(
1977                    Arc::clone(&active),
1978                    viewing_epoch,
1979                    transaction_id,
1980                );
1981                let (mut physical_plan, entries) = planner.plan_profiled(&optimized_plan)?;
1982
1983                let executor = Executor::with_columns(physical_plan.columns.clone())
1984                    .with_deadline(self.query_deadline());
1985                let _result = executor.execute(physical_plan.operator.as_mut())?;
1986
1987                let total_time_ms;
1988                #[cfg(not(target_arch = "wasm32"))]
1989                {
1990                    total_time_ms = start_time.elapsed().as_secs_f64() * 1000.0;
1991                }
1992                #[cfg(target_arch = "wasm32")]
1993                {
1994                    total_time_ms = 0.0;
1995                }
1996
1997                let profile_tree = crate::query::profile::build_profile_tree(
1998                    &optimized_plan.root,
1999                    &mut entries.into_iter(),
2000                );
2001                Ok(crate::query::profile::profile_result(
2002                    &profile_tree,
2003                    total_time_ms,
2004                ))
2005            });
2006        }
2007
2008        let has_mutations = optimized_plan.root.has_mutations();
2009
2010        self.with_auto_commit(has_mutations, || {
2011            // Get transaction context for MVCC visibility
2012            let (viewing_epoch, transaction_id) = self.get_transaction_context();
2013
2014            // Convert to physical plan with transaction context
2015            // (Physical planning cannot be cached as it depends on transaction state)
2016            let planner =
2017                self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
2018            let mut physical_plan = planner.plan(&optimized_plan)?;
2019
2020            // Execute the plan
2021            let executor = Executor::with_columns(physical_plan.columns.clone())
2022                .with_deadline(self.query_deadline());
2023            let mut result = executor.execute(physical_plan.operator.as_mut())?;
2024
2025            // Add execution metrics
2026            let rows_scanned = result.rows.len() as u64;
2027            #[cfg(not(target_arch = "wasm32"))]
2028            {
2029                let elapsed_ms = start_time.elapsed().as_secs_f64() * 1000.0;
2030                result.execution_time_ms = Some(elapsed_ms);
2031            }
2032            result.rows_scanned = Some(rows_scanned);
2033
2034            Ok(result)
2035        })
2036    }
2037
2038    /// Executes a GQL query with visibility at the specified epoch.
2039    ///
2040    /// This enables time-travel queries: the query sees the database
2041    /// as it existed at the given epoch.
2042    ///
2043    /// # Errors
2044    ///
2045    /// Returns an error if parsing or execution fails.
2046    #[cfg(feature = "gql")]
2047    pub fn execute_at_epoch(&self, query: &str, epoch: EpochId) -> Result<QueryResult> {
2048        let previous = self.viewing_epoch_override.lock().replace(epoch);
2049        let result = self.execute(query);
2050        *self.viewing_epoch_override.lock() = previous;
2051        result
2052    }
2053
2054    /// Executes a GQL query with parameters.
2055    ///
2056    /// # Errors
2057    ///
2058    /// Returns an error if the query fails to parse or execute.
2059    #[cfg(feature = "gql")]
2060    pub fn execute_with_params(
2061        &self,
2062        query: &str,
2063        params: std::collections::HashMap<String, Value>,
2064    ) -> Result<QueryResult> {
2065        self.require_lpg("GQL")?;
2066
2067        use crate::query::processor::{QueryLanguage, QueryProcessor};
2068
2069        let has_mutations = Self::query_looks_like_mutation(query);
2070        let active = self.active_store();
2071
2072        self.with_auto_commit(has_mutations, || {
2073            // Get transaction context for MVCC visibility
2074            let (viewing_epoch, transaction_id) = self.get_transaction_context();
2075
2076            // Create processor with transaction context
2077            let processor = QueryProcessor::for_graph_store_with_transaction(
2078                Arc::clone(&active),
2079                Arc::clone(&self.transaction_manager),
2080            )?;
2081
2082            // Apply transaction context if in a transaction
2083            let processor = if let Some(transaction_id) = transaction_id {
2084                processor.with_transaction_context(viewing_epoch, transaction_id)
2085            } else {
2086                processor
2087            };
2088
2089            processor.process(query, QueryLanguage::Gql, Some(&params))
2090        })
2091    }
2092
2093    /// Executes a GQL query with parameters.
2094    ///
2095    /// # Errors
2096    ///
2097    /// Returns an error if no query language is enabled.
2098    #[cfg(not(any(feature = "gql", feature = "cypher")))]
2099    pub fn execute_with_params(
2100        &self,
2101        _query: &str,
2102        _params: std::collections::HashMap<String, Value>,
2103    ) -> Result<QueryResult> {
2104        Err(grafeo_common::utils::error::Error::Internal(
2105            "No query language enabled".to_string(),
2106        ))
2107    }
2108
2109    /// Executes a GQL query.
2110    ///
2111    /// # Errors
2112    ///
2113    /// Returns an error if no query language is enabled.
2114    #[cfg(not(any(feature = "gql", feature = "cypher")))]
2115    pub fn execute(&self, _query: &str) -> Result<QueryResult> {
2116        Err(grafeo_common::utils::error::Error::Internal(
2117            "No query language enabled".to_string(),
2118        ))
2119    }
2120
2121    /// Executes a Cypher query.
2122    ///
2123    /// # Errors
2124    ///
2125    /// Returns an error if the query fails to parse or execute.
2126    #[cfg(feature = "cypher")]
2127    pub fn execute_cypher(&self, query: &str) -> Result<QueryResult> {
2128        use crate::query::{
2129            Executor, binder::Binder, cache::CacheKey, optimizer::Optimizer,
2130            processor::QueryLanguage, translators::cypher,
2131        };
2132        use grafeo_common::utils::error::{Error as GrafeoError, QueryError, QueryErrorKind};
2133
2134        // Handle schema DDL and SHOW commands before the normal query path
2135        let translation = cypher::translate_full(query)?;
2136        match translation {
2137            cypher::CypherTranslationResult::SchemaCommand(cmd) => {
2138                if *self.read_only_tx.lock() {
2139                    return Err(GrafeoError::Query(QueryError::new(
2140                        QueryErrorKind::Semantic,
2141                        "Cannot execute schema DDL in a read-only transaction",
2142                    )));
2143                }
2144                return self.execute_schema_command(cmd);
2145            }
2146            cypher::CypherTranslationResult::ShowIndexes => {
2147                return self.execute_show_indexes();
2148            }
2149            cypher::CypherTranslationResult::ShowConstraints => {
2150                return self.execute_show_constraints();
2151            }
2152            cypher::CypherTranslationResult::ShowCurrentGraphType => {
2153                return self.execute_show_current_graph_type();
2154            }
2155            cypher::CypherTranslationResult::Plan(_) => {
2156                // Fall through to normal execution below
2157            }
2158        }
2159
2160        #[cfg(not(target_arch = "wasm32"))]
2161        let start_time = std::time::Instant::now();
2162
2163        // Create cache key for this query
2164        let cache_key = CacheKey::with_graph(query, QueryLanguage::Cypher, self.current_graph());
2165
2166        // Try to get cached optimized plan
2167        let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
2168            cached_plan
2169        } else {
2170            // Parse and translate the query to a logical plan
2171            let logical_plan = cypher::translate(query)?;
2172
2173            // Semantic validation
2174            let mut binder = Binder::new();
2175            let _binding_context = binder.bind(&logical_plan)?;
2176
2177            // Optimize the plan
2178            let active = self.active_store();
2179            let optimizer = Optimizer::from_graph_store(&*active);
2180            let plan = optimizer.optimize(logical_plan)?;
2181
2182            // Cache the optimized plan
2183            self.query_cache.put_optimized(cache_key, plan.clone());
2184
2185            plan
2186        };
2187
2188        // Resolve the active store for query execution
2189        let active = self.active_store();
2190
2191        // EXPLAIN
2192        if optimized_plan.explain {
2193            use crate::query::processor::{annotate_pushdown_hints, explain_result};
2194            let mut plan = optimized_plan;
2195            annotate_pushdown_hints(&mut plan.root, active.as_ref());
2196            return Ok(explain_result(&plan));
2197        }
2198
2199        // PROFILE
2200        if optimized_plan.profile {
2201            let has_mutations = optimized_plan.root.has_mutations();
2202            return self.with_auto_commit(has_mutations, || {
2203                let (viewing_epoch, transaction_id) = self.get_transaction_context();
2204                let planner = self.create_planner_for_store(
2205                    Arc::clone(&active),
2206                    viewing_epoch,
2207                    transaction_id,
2208                );
2209                let (mut physical_plan, entries) = planner.plan_profiled(&optimized_plan)?;
2210
2211                let executor = Executor::with_columns(physical_plan.columns.clone())
2212                    .with_deadline(self.query_deadline());
2213                let _result = executor.execute(physical_plan.operator.as_mut())?;
2214
2215                let total_time_ms;
2216                #[cfg(not(target_arch = "wasm32"))]
2217                {
2218                    total_time_ms = start_time.elapsed().as_secs_f64() * 1000.0;
2219                }
2220                #[cfg(target_arch = "wasm32")]
2221                {
2222                    total_time_ms = 0.0;
2223                }
2224
2225                let profile_tree = crate::query::profile::build_profile_tree(
2226                    &optimized_plan.root,
2227                    &mut entries.into_iter(),
2228                );
2229                Ok(crate::query::profile::profile_result(
2230                    &profile_tree,
2231                    total_time_ms,
2232                ))
2233            });
2234        }
2235
2236        let has_mutations = optimized_plan.root.has_mutations();
2237
2238        self.with_auto_commit(has_mutations, || {
2239            // Get transaction context for MVCC visibility
2240            let (viewing_epoch, transaction_id) = self.get_transaction_context();
2241
2242            // Convert to physical plan with transaction context
2243            let planner =
2244                self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
2245            let mut physical_plan = planner.plan(&optimized_plan)?;
2246
2247            // Execute the plan
2248            let executor = Executor::with_columns(physical_plan.columns.clone())
2249                .with_deadline(self.query_deadline());
2250            executor.execute(physical_plan.operator.as_mut())
2251        })
2252    }
2253
2254    /// Executes a Gremlin query.
2255    ///
2256    /// # Errors
2257    ///
2258    /// Returns an error if the query fails to parse or execute.
2259    ///
2260    /// # Examples
2261    ///
2262    /// ```no_run
2263    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
2264    /// use grafeo_engine::GrafeoDB;
2265    ///
2266    /// let db = GrafeoDB::new_in_memory();
2267    /// let session = db.session();
2268    ///
2269    /// // Create some nodes first
2270    /// session.create_node(&["Person"]);
2271    ///
2272    /// // Query using Gremlin
2273    /// let result = session.execute_gremlin("g.V().hasLabel('Person')")?;
2274    /// # Ok(())
2275    /// # }
2276    /// ```
2277    #[cfg(feature = "gremlin")]
2278    pub fn execute_gremlin(&self, query: &str) -> Result<QueryResult> {
2279        use crate::query::{Executor, binder::Binder, optimizer::Optimizer, translators::gremlin};
2280
2281        // Parse and translate the query to a logical plan
2282        let logical_plan = gremlin::translate(query)?;
2283
2284        // Semantic validation
2285        let mut binder = Binder::new();
2286        let _binding_context = binder.bind(&logical_plan)?;
2287
2288        // Optimize the plan
2289        let active = self.active_store();
2290        let optimizer = Optimizer::from_graph_store(&*active);
2291        let optimized_plan = optimizer.optimize(logical_plan)?;
2292
2293        let has_mutations = optimized_plan.root.has_mutations();
2294
2295        self.with_auto_commit(has_mutations, || {
2296            // Get transaction context for MVCC visibility
2297            let (viewing_epoch, transaction_id) = self.get_transaction_context();
2298
2299            // Convert to physical plan with transaction context
2300            let planner =
2301                self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
2302            let mut physical_plan = planner.plan(&optimized_plan)?;
2303
2304            // Execute the plan
2305            let executor = Executor::with_columns(physical_plan.columns.clone())
2306                .with_deadline(self.query_deadline());
2307            executor.execute(physical_plan.operator.as_mut())
2308        })
2309    }
2310
2311    /// Executes a Gremlin query with parameters.
2312    ///
2313    /// # Errors
2314    ///
2315    /// Returns an error if the query fails to parse or execute.
2316    #[cfg(feature = "gremlin")]
2317    pub fn execute_gremlin_with_params(
2318        &self,
2319        query: &str,
2320        params: std::collections::HashMap<String, Value>,
2321    ) -> Result<QueryResult> {
2322        use crate::query::processor::{QueryLanguage, QueryProcessor};
2323
2324        let has_mutations = Self::query_looks_like_mutation(query);
2325        let active = self.active_store();
2326
2327        self.with_auto_commit(has_mutations, || {
2328            // Get transaction context for MVCC visibility
2329            let (viewing_epoch, transaction_id) = self.get_transaction_context();
2330
2331            // Create processor with transaction context
2332            let processor = QueryProcessor::for_graph_store_with_transaction(
2333                Arc::clone(&active),
2334                Arc::clone(&self.transaction_manager),
2335            )?;
2336
2337            // Apply transaction context if in a transaction
2338            let processor = if let Some(transaction_id) = transaction_id {
2339                processor.with_transaction_context(viewing_epoch, transaction_id)
2340            } else {
2341                processor
2342            };
2343
2344            processor.process(query, QueryLanguage::Gremlin, Some(&params))
2345        })
2346    }
2347
2348    /// Executes a GraphQL query against the LPG store.
2349    ///
2350    /// # Errors
2351    ///
2352    /// Returns an error if the query fails to parse or execute.
2353    ///
2354    /// # Examples
2355    ///
2356    /// ```no_run
2357    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
2358    /// use grafeo_engine::GrafeoDB;
2359    ///
2360    /// let db = GrafeoDB::new_in_memory();
2361    /// let session = db.session();
2362    ///
2363    /// // Create some nodes first
2364    /// session.create_node(&["User"]);
2365    ///
2366    /// // Query using GraphQL
2367    /// let result = session.execute_graphql("query { user { id name } }")?;
2368    /// # Ok(())
2369    /// # }
2370    /// ```
2371    #[cfg(feature = "graphql")]
2372    pub fn execute_graphql(&self, query: &str) -> Result<QueryResult> {
2373        use crate::query::{Executor, binder::Binder, optimizer::Optimizer, translators::graphql};
2374
2375        // Parse and translate the query to a logical plan
2376        let logical_plan = graphql::translate(query)?;
2377
2378        // Semantic validation
2379        let mut binder = Binder::new();
2380        let _binding_context = binder.bind(&logical_plan)?;
2381
2382        // Optimize the plan
2383        let active = self.active_store();
2384        let optimizer = Optimizer::from_graph_store(&*active);
2385        let optimized_plan = optimizer.optimize(logical_plan)?;
2386
2387        let has_mutations = optimized_plan.root.has_mutations();
2388
2389        self.with_auto_commit(has_mutations, || {
2390            // Get transaction context for MVCC visibility
2391            let (viewing_epoch, transaction_id) = self.get_transaction_context();
2392
2393            // Convert to physical plan with transaction context
2394            let planner =
2395                self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
2396            let mut physical_plan = planner.plan(&optimized_plan)?;
2397
2398            // Execute the plan
2399            let executor = Executor::with_columns(physical_plan.columns.clone())
2400                .with_deadline(self.query_deadline());
2401            executor.execute(physical_plan.operator.as_mut())
2402        })
2403    }
2404
2405    /// Executes a GraphQL query with parameters.
2406    ///
2407    /// # Errors
2408    ///
2409    /// Returns an error if the query fails to parse or execute.
2410    #[cfg(feature = "graphql")]
2411    pub fn execute_graphql_with_params(
2412        &self,
2413        query: &str,
2414        params: std::collections::HashMap<String, Value>,
2415    ) -> Result<QueryResult> {
2416        use crate::query::processor::{QueryLanguage, QueryProcessor};
2417
2418        let has_mutations = Self::query_looks_like_mutation(query);
2419        let active = self.active_store();
2420
2421        self.with_auto_commit(has_mutations, || {
2422            // Get transaction context for MVCC visibility
2423            let (viewing_epoch, transaction_id) = self.get_transaction_context();
2424
2425            // Create processor with transaction context
2426            let processor = QueryProcessor::for_graph_store_with_transaction(
2427                Arc::clone(&active),
2428                Arc::clone(&self.transaction_manager),
2429            )?;
2430
2431            // Apply transaction context if in a transaction
2432            let processor = if let Some(transaction_id) = transaction_id {
2433                processor.with_transaction_context(viewing_epoch, transaction_id)
2434            } else {
2435                processor
2436            };
2437
2438            processor.process(query, QueryLanguage::GraphQL, Some(&params))
2439        })
2440    }
2441
2442    /// Executes a GraphQL query against the RDF store.
2443    ///
2444    /// # Errors
2445    ///
2446    /// Returns an error if the query fails to parse or execute.
2447    #[cfg(all(feature = "graphql", feature = "rdf"))]
2448    pub fn execute_graphql_rdf(&self, query: &str) -> Result<QueryResult> {
2449        use crate::query::{
2450            Executor, optimizer::Optimizer, planner::rdf::RdfPlanner, translators::graphql_rdf,
2451        };
2452
2453        let logical_plan = graphql_rdf::translate(query, "http://example.org/")?;
2454
2455        let active = self.active_store();
2456        let optimizer = Optimizer::from_graph_store(&*active);
2457        let optimized_plan = optimizer.optimize(logical_plan)?;
2458
2459        let planner = RdfPlanner::new(Arc::clone(&self.rdf_store))
2460            .with_transaction_id(*self.current_transaction.lock());
2461        #[cfg(feature = "wal")]
2462        let planner = planner.with_wal(self.wal.clone());
2463        let mut physical_plan = planner.plan(&optimized_plan)?;
2464
2465        let executor = Executor::with_columns(physical_plan.columns.clone())
2466            .with_deadline(self.query_deadline());
2467        executor.execute(physical_plan.operator.as_mut())
2468    }
2469
2470    /// Executes a GraphQL query against the RDF store with parameters.
2471    ///
2472    /// # Errors
2473    ///
2474    /// Returns an error if the query fails to parse or execute.
2475    #[cfg(all(feature = "graphql", feature = "rdf"))]
2476    pub fn execute_graphql_rdf_with_params(
2477        &self,
2478        query: &str,
2479        params: std::collections::HashMap<String, Value>,
2480    ) -> Result<QueryResult> {
2481        use crate::query::processor::{QueryLanguage, QueryProcessor};
2482
2483        let has_mutations = Self::query_looks_like_mutation(query);
2484        let active = self.active_store();
2485
2486        self.with_auto_commit(has_mutations, || {
2487            let (viewing_epoch, transaction_id) = self.get_transaction_context();
2488
2489            let processor = QueryProcessor::for_graph_store_with_transaction(
2490                Arc::clone(&active),
2491                Arc::clone(&self.transaction_manager),
2492            )?;
2493
2494            let processor = if let Some(transaction_id) = transaction_id {
2495                processor.with_transaction_context(viewing_epoch, transaction_id)
2496            } else {
2497                processor
2498            };
2499
2500            processor.process(query, QueryLanguage::GraphQLRdf, Some(&params))
2501        })
2502    }
2503
2504    /// Executes a SQL/PGQ query (SQL:2023 GRAPH_TABLE).
2505    ///
2506    /// # Errors
2507    ///
2508    /// Returns an error if the query fails to parse or execute.
2509    ///
2510    /// # Examples
2511    ///
2512    /// ```no_run
2513    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
2514    /// use grafeo_engine::GrafeoDB;
2515    ///
2516    /// let db = GrafeoDB::new_in_memory();
2517    /// let session = db.session();
2518    ///
2519    /// let result = session.execute_sql(
2520    ///     "SELECT * FROM GRAPH_TABLE (
2521    ///         MATCH (n:Person)
2522    ///         COLUMNS (n.name AS name)
2523    ///     )"
2524    /// )?;
2525    /// # Ok(())
2526    /// # }
2527    /// ```
2528    #[cfg(feature = "sql-pgq")]
2529    pub fn execute_sql(&self, query: &str) -> Result<QueryResult> {
2530        use crate::query::{
2531            Executor, binder::Binder, cache::CacheKey, optimizer::Optimizer, plan::LogicalOperator,
2532            processor::QueryLanguage, translators::sql_pgq,
2533        };
2534
2535        // Parse and translate (always needed to check for DDL)
2536        let logical_plan = sql_pgq::translate(query)?;
2537
2538        // Handle DDL statements directly (they don't go through the query pipeline)
2539        if let LogicalOperator::CreatePropertyGraph(ref cpg) = logical_plan.root {
2540            return Ok(QueryResult {
2541                columns: vec!["status".into()],
2542                column_types: vec![grafeo_common::types::LogicalType::String],
2543                rows: vec![vec![Value::from(format!(
2544                    "Property graph '{}' created ({} node tables, {} edge tables)",
2545                    cpg.name,
2546                    cpg.node_tables.len(),
2547                    cpg.edge_tables.len()
2548                ))]],
2549                execution_time_ms: None,
2550                rows_scanned: None,
2551                status_message: None,
2552                gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
2553            });
2554        }
2555
2556        // Create cache key for query plans
2557        let cache_key = CacheKey::with_graph(query, QueryLanguage::SqlPgq, self.current_graph());
2558
2559        // Try to get cached optimized plan
2560        let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
2561            cached_plan
2562        } else {
2563            // Semantic validation
2564            let mut binder = Binder::new();
2565            let _binding_context = binder.bind(&logical_plan)?;
2566
2567            // Optimize the plan
2568            let active = self.active_store();
2569            let optimizer = Optimizer::from_graph_store(&*active);
2570            let plan = optimizer.optimize(logical_plan)?;
2571
2572            // Cache the optimized plan
2573            self.query_cache.put_optimized(cache_key, plan.clone());
2574
2575            plan
2576        };
2577
2578        let active = self.active_store();
2579        let has_mutations = optimized_plan.root.has_mutations();
2580
2581        self.with_auto_commit(has_mutations, || {
2582            // Get transaction context for MVCC visibility
2583            let (viewing_epoch, transaction_id) = self.get_transaction_context();
2584
2585            // Convert to physical plan with transaction context
2586            let planner =
2587                self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
2588            let mut physical_plan = planner.plan(&optimized_plan)?;
2589
2590            // Execute the plan
2591            let executor = Executor::with_columns(physical_plan.columns.clone())
2592                .with_deadline(self.query_deadline());
2593            executor.execute(physical_plan.operator.as_mut())
2594        })
2595    }
2596
2597    /// Executes a SQL/PGQ query with parameters.
2598    ///
2599    /// # Errors
2600    ///
2601    /// Returns an error if the query fails to parse or execute.
2602    #[cfg(feature = "sql-pgq")]
2603    pub fn execute_sql_with_params(
2604        &self,
2605        query: &str,
2606        params: std::collections::HashMap<String, Value>,
2607    ) -> Result<QueryResult> {
2608        use crate::query::processor::{QueryLanguage, QueryProcessor};
2609
2610        let has_mutations = Self::query_looks_like_mutation(query);
2611        let active = self.active_store();
2612
2613        self.with_auto_commit(has_mutations, || {
2614            // Get transaction context for MVCC visibility
2615            let (viewing_epoch, transaction_id) = self.get_transaction_context();
2616
2617            // Create processor with transaction context
2618            let processor = QueryProcessor::for_graph_store_with_transaction(
2619                Arc::clone(&active),
2620                Arc::clone(&self.transaction_manager),
2621            )?;
2622
2623            // Apply transaction context if in a transaction
2624            let processor = if let Some(transaction_id) = transaction_id {
2625                processor.with_transaction_context(viewing_epoch, transaction_id)
2626            } else {
2627                processor
2628            };
2629
2630            processor.process(query, QueryLanguage::SqlPgq, Some(&params))
2631        })
2632    }
2633
2634    /// Executes a SPARQL query.
2635    ///
2636    /// # Errors
2637    ///
2638    /// Returns an error if the query fails to parse or execute.
2639    #[cfg(all(feature = "sparql", feature = "rdf"))]
2640    pub fn execute_sparql(&self, query: &str) -> Result<QueryResult> {
2641        use crate::query::{
2642            Executor, optimizer::Optimizer, planner::rdf::RdfPlanner, translators::sparql,
2643        };
2644
2645        // Parse and translate the SPARQL query to a logical plan
2646        let logical_plan = sparql::translate(query)?;
2647
2648        // Optimize the plan
2649        let active = self.active_store();
2650        let optimizer = Optimizer::from_graph_store(&*active);
2651        let optimized_plan = optimizer.optimize(logical_plan)?;
2652
2653        // Convert to physical plan using RDF planner
2654        let planner = RdfPlanner::new(Arc::clone(&self.rdf_store))
2655            .with_transaction_id(*self.current_transaction.lock());
2656        #[cfg(feature = "wal")]
2657        let planner = planner.with_wal(self.wal.clone());
2658        let mut physical_plan = planner.plan(&optimized_plan)?;
2659
2660        // Execute the plan
2661        let executor = Executor::with_columns(physical_plan.columns.clone())
2662            .with_deadline(self.query_deadline());
2663        executor.execute(physical_plan.operator.as_mut())
2664    }
2665
2666    /// Executes a SPARQL query with parameters.
2667    ///
2668    /// # Errors
2669    ///
2670    /// Returns an error if the query fails to parse or execute.
2671    #[cfg(all(feature = "sparql", feature = "rdf"))]
2672    pub fn execute_sparql_with_params(
2673        &self,
2674        query: &str,
2675        params: std::collections::HashMap<String, Value>,
2676    ) -> Result<QueryResult> {
2677        use crate::query::{
2678            Executor, optimizer::Optimizer, planner::rdf::RdfPlanner, processor::substitute_params,
2679            translators::sparql,
2680        };
2681
2682        let mut logical_plan = sparql::translate(query)?;
2683
2684        substitute_params(&mut logical_plan, &params)?;
2685
2686        let active = self.active_store();
2687        let optimizer = Optimizer::from_graph_store(&*active);
2688        let optimized_plan = optimizer.optimize(logical_plan)?;
2689
2690        let planner = RdfPlanner::new(Arc::clone(&self.rdf_store))
2691            .with_transaction_id(*self.current_transaction.lock());
2692        #[cfg(feature = "wal")]
2693        let planner = planner.with_wal(self.wal.clone());
2694        let mut physical_plan = planner.plan(&optimized_plan)?;
2695
2696        let executor = Executor::with_columns(physical_plan.columns.clone())
2697            .with_deadline(self.query_deadline());
2698        executor.execute(physical_plan.operator.as_mut())
2699    }
2700
2701    /// Executes a query in the specified language by name.
2702    ///
2703    /// Supported language names: `"gql"`, `"cypher"`, `"gremlin"`, `"graphql"`,
2704    /// `"graphql-rdf"`, `"sparql"`, `"sql"`. Each requires the corresponding feature flag.
2705    ///
2706    /// # Errors
2707    ///
2708    /// Returns an error if the language is unknown/disabled or the query fails.
2709    pub fn execute_language(
2710        &self,
2711        query: &str,
2712        language: &str,
2713        params: Option<std::collections::HashMap<String, Value>>,
2714    ) -> Result<QueryResult> {
2715        match language {
2716            "gql" => {
2717                if let Some(p) = params {
2718                    self.execute_with_params(query, p)
2719                } else {
2720                    self.execute(query)
2721                }
2722            }
2723            #[cfg(feature = "cypher")]
2724            "cypher" => {
2725                if let Some(p) = params {
2726                    use crate::query::processor::{QueryLanguage, QueryProcessor};
2727                    let has_mutations = Self::query_looks_like_mutation(query);
2728                    let active = self.active_store();
2729                    self.with_auto_commit(has_mutations, || {
2730                        let processor = QueryProcessor::for_graph_store_with_transaction(
2731                            Arc::clone(&active),
2732                            Arc::clone(&self.transaction_manager),
2733                        )?;
2734                        let (viewing_epoch, transaction_id) = self.get_transaction_context();
2735                        let processor = if let Some(transaction_id) = transaction_id {
2736                            processor.with_transaction_context(viewing_epoch, transaction_id)
2737                        } else {
2738                            processor
2739                        };
2740                        processor.process(query, QueryLanguage::Cypher, Some(&p))
2741                    })
2742                } else {
2743                    self.execute_cypher(query)
2744                }
2745            }
2746            #[cfg(feature = "gremlin")]
2747            "gremlin" => {
2748                if let Some(p) = params {
2749                    self.execute_gremlin_with_params(query, p)
2750                } else {
2751                    self.execute_gremlin(query)
2752                }
2753            }
2754            #[cfg(feature = "graphql")]
2755            "graphql" => {
2756                if let Some(p) = params {
2757                    self.execute_graphql_with_params(query, p)
2758                } else {
2759                    self.execute_graphql(query)
2760                }
2761            }
2762            #[cfg(all(feature = "graphql", feature = "rdf"))]
2763            "graphql-rdf" => {
2764                if let Some(p) = params {
2765                    self.execute_graphql_rdf_with_params(query, p)
2766                } else {
2767                    self.execute_graphql_rdf(query)
2768                }
2769            }
2770            #[cfg(feature = "sql-pgq")]
2771            "sql" | "sql-pgq" => {
2772                if let Some(p) = params {
2773                    self.execute_sql_with_params(query, p)
2774                } else {
2775                    self.execute_sql(query)
2776                }
2777            }
2778            #[cfg(all(feature = "sparql", feature = "rdf"))]
2779            "sparql" => {
2780                if let Some(p) = params {
2781                    self.execute_sparql_with_params(query, p)
2782                } else {
2783                    self.execute_sparql(query)
2784                }
2785            }
2786            other => Err(grafeo_common::utils::error::Error::Query(
2787                grafeo_common::utils::error::QueryError::new(
2788                    grafeo_common::utils::error::QueryErrorKind::Semantic,
2789                    format!("Unknown query language: '{other}'"),
2790                ),
2791            )),
2792        }
2793    }
2794
2795    /// Begins a new transaction.
2796    ///
2797    /// # Errors
2798    ///
2799    /// Returns an error if a transaction is already active.
2800    ///
2801    /// # Examples
2802    ///
2803    /// ```no_run
2804    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
2805    /// use grafeo_engine::GrafeoDB;
2806    ///
2807    /// let db = GrafeoDB::new_in_memory();
2808    /// let mut session = db.session();
2809    ///
2810    /// session.begin_transaction()?;
2811    /// session.execute("INSERT (:Person {name: 'Alix'})")?;
2812    /// session.execute("INSERT (:Person {name: 'Gus'})")?;
2813    /// session.commit()?; // Both inserts committed atomically
2814    /// # Ok(())
2815    /// # }
2816    /// ```
2817    /// Clears all cached query plans.
2818    ///
2819    /// The plan cache is shared across all sessions on the same database,
2820    /// so clearing from one session affects all sessions.
2821    pub fn clear_plan_cache(&self) {
2822        self.query_cache.clear();
2823    }
2824
2825    /// Begins a new transaction on this session.
2826    ///
2827    /// Uses the default isolation level (`SnapshotIsolation`).
2828    ///
2829    /// # Errors
2830    ///
2831    /// Returns an error if a transaction is already active.
2832    pub fn begin_transaction(&mut self) -> Result<()> {
2833        self.begin_transaction_inner(false, None)
2834    }
2835
2836    /// Begins a transaction with a specific isolation level.
2837    ///
2838    /// See [`begin_transaction`](Self::begin_transaction) for the default (`SnapshotIsolation`).
2839    ///
2840    /// # Errors
2841    ///
2842    /// Returns an error if a transaction is already active.
2843    pub fn begin_transaction_with_isolation(
2844        &mut self,
2845        isolation_level: crate::transaction::IsolationLevel,
2846    ) -> Result<()> {
2847        self.begin_transaction_inner(false, Some(isolation_level))
2848    }
2849
2850    /// Core transaction begin logic, usable from both `&mut self` and `&self` paths.
2851    fn begin_transaction_inner(
2852        &self,
2853        read_only: bool,
2854        isolation_level: Option<crate::transaction::IsolationLevel>,
2855    ) -> Result<()> {
2856        let mut current = self.current_transaction.lock();
2857        if current.is_some() {
2858            // Nested transaction: create an auto-savepoint instead of a new tx.
2859            drop(current);
2860            let mut depth = self.transaction_nesting_depth.lock();
2861            *depth += 1;
2862            let sp_name = format!("_nested_tx_{}", *depth);
2863            self.savepoint(&sp_name)?;
2864            return Ok(());
2865        }
2866
2867        let active = self.active_lpg_store();
2868        self.transaction_start_node_count
2869            .store(active.node_count(), Ordering::Relaxed);
2870        self.transaction_start_edge_count
2871            .store(active.edge_count(), Ordering::Relaxed);
2872        let transaction_id = if let Some(level) = isolation_level {
2873            self.transaction_manager.begin_with_isolation(level)
2874        } else {
2875            self.transaction_manager.begin()
2876        };
2877        *current = Some(transaction_id);
2878        *self.read_only_tx.lock() = read_only;
2879
2880        // Record the initial graph as "touched" for cross-graph atomicity.
2881        let graph = self.current_graph.lock().clone();
2882        let normalized = match graph {
2883            Some(ref name) if name.eq_ignore_ascii_case("default") => None,
2884            other => other,
2885        };
2886        let mut touched = self.touched_graphs.lock();
2887        touched.clear();
2888        touched.push(normalized);
2889
2890        Ok(())
2891    }
2892
2893    /// Commits the current transaction.
2894    ///
2895    /// Makes all changes since [`begin_transaction`](Self::begin_transaction) permanent.
2896    ///
2897    /// # Errors
2898    ///
2899    /// Returns an error if no transaction is active.
2900    pub fn commit(&mut self) -> Result<()> {
2901        self.commit_inner()
2902    }
2903
2904    /// Core commit logic, usable from both `&mut self` and `&self` paths.
2905    fn commit_inner(&self) -> Result<()> {
2906        // Nested transaction: release the auto-savepoint (changes are preserved).
2907        {
2908            let mut depth = self.transaction_nesting_depth.lock();
2909            if *depth > 0 {
2910                let sp_name = format!("_nested_tx_{depth}");
2911                *depth -= 1;
2912                drop(depth);
2913                return self.release_savepoint(&sp_name);
2914            }
2915        }
2916
2917        let transaction_id = self.current_transaction.lock().take().ok_or_else(|| {
2918            grafeo_common::utils::error::Error::Transaction(
2919                grafeo_common::utils::error::TransactionError::InvalidState(
2920                    "No active transaction".to_string(),
2921                ),
2922            )
2923        })?;
2924
2925        // Validate the transaction first (conflict detection) before committing data.
2926        // If this fails, we rollback the data changes instead of making them permanent.
2927        let touched = self.touched_graphs.lock().clone();
2928        let commit_epoch = match self.transaction_manager.commit(transaction_id) {
2929            Ok(epoch) => epoch,
2930            Err(e) => {
2931                // Conflict detected: rollback the data changes
2932                for graph_name in &touched {
2933                    let store = self.resolve_store(graph_name);
2934                    store.rollback_transaction_properties(transaction_id);
2935                }
2936                #[cfg(feature = "rdf")]
2937                self.rdf_store.rollback_transaction(transaction_id);
2938                *self.read_only_tx.lock() = false;
2939                self.savepoints.lock().clear();
2940                self.touched_graphs.lock().clear();
2941                return Err(e);
2942            }
2943        };
2944
2945        // Finalize PENDING epochs: make uncommitted versions visible at the commit epoch.
2946        for graph_name in &touched {
2947            let store = self.resolve_store(graph_name);
2948            store.finalize_version_epochs(transaction_id, commit_epoch);
2949        }
2950
2951        // Commit succeeded: discard undo logs (make changes permanent)
2952        #[cfg(feature = "rdf")]
2953        self.rdf_store.commit_transaction(transaction_id);
2954
2955        for graph_name in &touched {
2956            let store = self.resolve_store(graph_name);
2957            store.commit_transaction_properties(transaction_id);
2958        }
2959
2960        // Sync epoch for all touched graphs so that convenience lookups
2961        // (edge_type, get_edge, get_node) can see versions at the latest epoch.
2962        let current_epoch = self.transaction_manager.current_epoch();
2963        for graph_name in &touched {
2964            let store = self.resolve_store(graph_name);
2965            store.sync_epoch(current_epoch);
2966        }
2967
2968        // Reset read-only flag, clear savepoints and touched graphs
2969        *self.read_only_tx.lock() = false;
2970        self.savepoints.lock().clear();
2971        self.touched_graphs.lock().clear();
2972
2973        // Auto-GC: periodically prune old MVCC versions
2974        if self.gc_interval > 0 {
2975            let count = self.commit_counter.fetch_add(1, Ordering::Relaxed) + 1;
2976            if count.is_multiple_of(self.gc_interval) {
2977                let min_epoch = self.transaction_manager.min_active_epoch();
2978                for graph_name in &touched {
2979                    let store = self.resolve_store(graph_name);
2980                    store.gc_versions(min_epoch);
2981                }
2982                self.transaction_manager.gc();
2983            }
2984        }
2985
2986        Ok(())
2987    }
2988
2989    /// Aborts the current transaction.
2990    ///
2991    /// Discards all changes since [`begin_transaction`](Self::begin_transaction).
2992    ///
2993    /// # Errors
2994    ///
2995    /// Returns an error if no transaction is active.
2996    ///
2997    /// # Examples
2998    ///
2999    /// ```no_run
3000    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
3001    /// use grafeo_engine::GrafeoDB;
3002    ///
3003    /// let db = GrafeoDB::new_in_memory();
3004    /// let mut session = db.session();
3005    ///
3006    /// session.begin_transaction()?;
3007    /// session.execute("INSERT (:Person {name: 'Alix'})")?;
3008    /// session.rollback()?; // Insert is discarded
3009    /// # Ok(())
3010    /// # }
3011    /// ```
3012    pub fn rollback(&mut self) -> Result<()> {
3013        self.rollback_inner()
3014    }
3015
3016    /// Core rollback logic, usable from both `&mut self` and `&self` paths.
3017    fn rollback_inner(&self) -> Result<()> {
3018        // Nested transaction: rollback to the auto-savepoint.
3019        {
3020            let mut depth = self.transaction_nesting_depth.lock();
3021            if *depth > 0 {
3022                let sp_name = format!("_nested_tx_{depth}");
3023                *depth -= 1;
3024                drop(depth);
3025                return self.rollback_to_savepoint(&sp_name);
3026            }
3027        }
3028
3029        let transaction_id = self.current_transaction.lock().take().ok_or_else(|| {
3030            grafeo_common::utils::error::Error::Transaction(
3031                grafeo_common::utils::error::TransactionError::InvalidState(
3032                    "No active transaction".to_string(),
3033                ),
3034            )
3035        })?;
3036
3037        // Reset read-only flag
3038        *self.read_only_tx.lock() = false;
3039
3040        // Discard uncommitted versions in ALL touched LPG stores (cross-graph atomicity).
3041        let touched = self.touched_graphs.lock().clone();
3042        for graph_name in &touched {
3043            let store = self.resolve_store(graph_name);
3044            store.discard_uncommitted_versions(transaction_id);
3045        }
3046
3047        // Discard pending operations in the RDF store
3048        #[cfg(feature = "rdf")]
3049        self.rdf_store.rollback_transaction(transaction_id);
3050
3051        // Clear savepoints and touched graphs
3052        self.savepoints.lock().clear();
3053        self.touched_graphs.lock().clear();
3054
3055        // Mark transaction as aborted in the manager
3056        self.transaction_manager.abort(transaction_id)
3057    }
3058
3059    /// Creates a named savepoint within the current transaction.
3060    ///
3061    /// The savepoint captures the current node/edge ID counters so that
3062    /// [`rollback_to_savepoint`](Self::rollback_to_savepoint) can discard
3063    /// entities created after this point.
3064    ///
3065    /// # Errors
3066    ///
3067    /// Returns an error if no transaction is active.
3068    pub fn savepoint(&self, name: &str) -> Result<()> {
3069        let tx_id = self.current_transaction.lock().ok_or_else(|| {
3070            grafeo_common::utils::error::Error::Transaction(
3071                grafeo_common::utils::error::TransactionError::InvalidState(
3072                    "No active transaction".to_string(),
3073                ),
3074            )
3075        })?;
3076
3077        // Capture state for every graph touched so far.
3078        let touched = self.touched_graphs.lock().clone();
3079        let graph_snapshots: Vec<GraphSavepoint> = touched
3080            .iter()
3081            .map(|graph_name| {
3082                let store = self.resolve_store(graph_name);
3083                GraphSavepoint {
3084                    graph_name: graph_name.clone(),
3085                    next_node_id: store.peek_next_node_id(),
3086                    next_edge_id: store.peek_next_edge_id(),
3087                    undo_log_position: store.property_undo_log_position(tx_id),
3088                }
3089            })
3090            .collect();
3091
3092        self.savepoints.lock().push(SavepointState {
3093            name: name.to_string(),
3094            graph_snapshots,
3095            active_graph: self.current_graph.lock().clone(),
3096        });
3097        Ok(())
3098    }
3099
3100    /// Rolls back to a named savepoint, undoing all writes made after it.
3101    ///
3102    /// The savepoint and any savepoints created after it are removed.
3103    /// Entities with IDs >= the savepoint snapshot are discarded.
3104    ///
3105    /// # Errors
3106    ///
3107    /// Returns an error if no transaction is active or the savepoint does not exist.
3108    pub fn rollback_to_savepoint(&self, name: &str) -> Result<()> {
3109        let transaction_id = self.current_transaction.lock().ok_or_else(|| {
3110            grafeo_common::utils::error::Error::Transaction(
3111                grafeo_common::utils::error::TransactionError::InvalidState(
3112                    "No active transaction".to_string(),
3113                ),
3114            )
3115        })?;
3116
3117        let mut savepoints = self.savepoints.lock();
3118
3119        // Find the savepoint by name (search from the end for nested savepoints)
3120        let pos = savepoints
3121            .iter()
3122            .rposition(|sp| sp.name == name)
3123            .ok_or_else(|| {
3124                grafeo_common::utils::error::Error::Transaction(
3125                    grafeo_common::utils::error::TransactionError::InvalidState(format!(
3126                        "Savepoint '{name}' not found"
3127                    )),
3128                )
3129            })?;
3130
3131        let sp_state = savepoints[pos].clone();
3132
3133        // Remove this savepoint and all later ones
3134        savepoints.truncate(pos);
3135        drop(savepoints);
3136
3137        // Roll back each graph that was captured in the savepoint.
3138        for gs in &sp_state.graph_snapshots {
3139            let store = self.resolve_store(&gs.graph_name);
3140
3141            // Replay property/label undo entries recorded after the savepoint
3142            store.rollback_transaction_properties_to(transaction_id, gs.undo_log_position);
3143
3144            // Discard entities created after the savepoint
3145            let current_next_node = store.peek_next_node_id();
3146            let current_next_edge = store.peek_next_edge_id();
3147
3148            let node_ids: Vec<NodeId> = (gs.next_node_id..current_next_node)
3149                .map(NodeId::new)
3150                .collect();
3151            let edge_ids: Vec<EdgeId> = (gs.next_edge_id..current_next_edge)
3152                .map(EdgeId::new)
3153                .collect();
3154
3155            if !node_ids.is_empty() || !edge_ids.is_empty() {
3156                store.discard_entities_by_id(transaction_id, &node_ids, &edge_ids);
3157            }
3158        }
3159
3160        // Also roll back any graphs that were touched AFTER the savepoint
3161        // but not captured in it. These need full discard since the savepoint
3162        // didn't include them.
3163        let touched = self.touched_graphs.lock().clone();
3164        for graph_name in &touched {
3165            let already_captured = sp_state
3166                .graph_snapshots
3167                .iter()
3168                .any(|gs| gs.graph_name == *graph_name);
3169            if !already_captured {
3170                let store = self.resolve_store(graph_name);
3171                store.discard_uncommitted_versions(transaction_id);
3172            }
3173        }
3174
3175        // Restore touched_graphs to only the graphs that were known at savepoint time.
3176        let mut touched = self.touched_graphs.lock();
3177        touched.clear();
3178        for gs in &sp_state.graph_snapshots {
3179            if !touched.contains(&gs.graph_name) {
3180                touched.push(gs.graph_name.clone());
3181            }
3182        }
3183
3184        Ok(())
3185    }
3186
3187    /// Releases (removes) a named savepoint without rolling back.
3188    ///
3189    /// # Errors
3190    ///
3191    /// Returns an error if no transaction is active or the savepoint does not exist.
3192    pub fn release_savepoint(&self, name: &str) -> Result<()> {
3193        let _tx_id = self.current_transaction.lock().ok_or_else(|| {
3194            grafeo_common::utils::error::Error::Transaction(
3195                grafeo_common::utils::error::TransactionError::InvalidState(
3196                    "No active transaction".to_string(),
3197                ),
3198            )
3199        })?;
3200
3201        let mut savepoints = self.savepoints.lock();
3202        let pos = savepoints
3203            .iter()
3204            .rposition(|sp| sp.name == name)
3205            .ok_or_else(|| {
3206                grafeo_common::utils::error::Error::Transaction(
3207                    grafeo_common::utils::error::TransactionError::InvalidState(format!(
3208                        "Savepoint '{name}' not found"
3209                    )),
3210                )
3211            })?;
3212        savepoints.remove(pos);
3213        Ok(())
3214    }
3215
3216    /// Returns whether a transaction is active.
3217    #[must_use]
3218    pub fn in_transaction(&self) -> bool {
3219        self.current_transaction.lock().is_some()
3220    }
3221
3222    /// Returns the current transaction ID, if any.
3223    #[must_use]
3224    pub(crate) fn current_transaction_id(&self) -> Option<TransactionId> {
3225        *self.current_transaction.lock()
3226    }
3227
3228    /// Returns a reference to the transaction manager.
3229    #[must_use]
3230    pub(crate) fn transaction_manager(&self) -> &TransactionManager {
3231        &self.transaction_manager
3232    }
3233
3234    /// Returns the store's current node count and the count at transaction start.
3235    #[must_use]
3236    pub(crate) fn node_count_delta(&self) -> (usize, usize) {
3237        (
3238            self.transaction_start_node_count.load(Ordering::Relaxed),
3239            self.active_lpg_store().node_count(),
3240        )
3241    }
3242
3243    /// Returns the store's current edge count and the count at transaction start.
3244    #[must_use]
3245    pub(crate) fn edge_count_delta(&self) -> (usize, usize) {
3246        (
3247            self.transaction_start_edge_count.load(Ordering::Relaxed),
3248            self.active_lpg_store().edge_count(),
3249        )
3250    }
3251
3252    /// Prepares the current transaction for a two-phase commit.
3253    ///
3254    /// Returns a [`PreparedCommit`](crate::transaction::PreparedCommit) that
3255    /// lets you inspect pending changes and attach metadata before finalizing.
3256    /// The mutable borrow prevents concurrent operations while the commit is
3257    /// pending.
3258    ///
3259    /// If the `PreparedCommit` is dropped without calling `commit()` or
3260    /// `abort()`, the transaction is automatically rolled back.
3261    ///
3262    /// # Errors
3263    ///
3264    /// Returns an error if no transaction is active.
3265    ///
3266    /// # Examples
3267    ///
3268    /// ```no_run
3269    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
3270    /// use grafeo_engine::GrafeoDB;
3271    ///
3272    /// let db = GrafeoDB::new_in_memory();
3273    /// let mut session = db.session();
3274    ///
3275    /// session.begin_transaction()?;
3276    /// session.execute("INSERT (:Person {name: 'Alix'})")?;
3277    ///
3278    /// let mut prepared = session.prepare_commit()?;
3279    /// println!("Nodes written: {}", prepared.info().nodes_written);
3280    /// prepared.set_metadata("audit_user", "admin");
3281    /// prepared.commit()?;
3282    /// # Ok(())
3283    /// # }
3284    /// ```
3285    pub fn prepare_commit(&mut self) -> Result<crate::transaction::PreparedCommit<'_>> {
3286        crate::transaction::PreparedCommit::new(self)
3287    }
3288
3289    /// Sets auto-commit mode.
3290    pub fn set_auto_commit(&mut self, auto_commit: bool) {
3291        self.auto_commit = auto_commit;
3292    }
3293
3294    /// Returns whether auto-commit is enabled.
3295    #[must_use]
3296    pub fn auto_commit(&self) -> bool {
3297        self.auto_commit
3298    }
3299
3300    /// Returns `true` if auto-commit should wrap this execution.
3301    ///
3302    /// Auto-commit kicks in when: the session is in auto-commit mode,
3303    /// no explicit transaction is active, and the query mutates data.
3304    fn needs_auto_commit(&self, has_mutations: bool) -> bool {
3305        self.auto_commit && has_mutations && self.current_transaction.lock().is_none()
3306    }
3307
3308    /// Wraps `body` in an automatic begin/commit when [`needs_auto_commit`]
3309    /// returns `true`. On error the transaction is rolled back.
3310    fn with_auto_commit<F>(&self, has_mutations: bool, body: F) -> Result<QueryResult>
3311    where
3312        F: FnOnce() -> Result<QueryResult>,
3313    {
3314        if self.needs_auto_commit(has_mutations) {
3315            self.begin_transaction_inner(false, None)?;
3316            match body() {
3317                Ok(result) => {
3318                    self.commit_inner()?;
3319                    Ok(result)
3320                }
3321                Err(e) => {
3322                    let _ = self.rollback_inner();
3323                    Err(e)
3324                }
3325            }
3326        } else {
3327            body()
3328        }
3329    }
3330
3331    /// Quick heuristic: returns `true` when the query text looks like it
3332    /// performs a mutation. Used by `_with_params` paths that go through the
3333    /// `QueryProcessor` (where the logical plan isn't available before
3334    /// execution). False negatives are harmless: the data just won't be
3335    /// auto-committed, which matches the prior behaviour.
3336    fn query_looks_like_mutation(query: &str) -> bool {
3337        let upper = query.to_ascii_uppercase();
3338        upper.contains("INSERT")
3339            || upper.contains("CREATE")
3340            || upper.contains("DELETE")
3341            || upper.contains("MERGE")
3342            || upper.contains("SET")
3343            || upper.contains("REMOVE")
3344            || upper.contains("DROP")
3345            || upper.contains("ALTER")
3346    }
3347
3348    /// Computes the wall-clock deadline for query execution.
3349    #[must_use]
3350    fn query_deadline(&self) -> Option<Instant> {
3351        #[cfg(not(target_arch = "wasm32"))]
3352        {
3353            self.query_timeout.map(|d| Instant::now() + d)
3354        }
3355        #[cfg(target_arch = "wasm32")]
3356        {
3357            let _ = &self.query_timeout;
3358            None
3359        }
3360    }
3361
3362    /// Evaluates a simple integer literal from a session parameter expression.
3363    fn eval_integer_literal(expr: &grafeo_adapters::query::gql::ast::Expression) -> Option<i64> {
3364        use grafeo_adapters::query::gql::ast::{Expression, Literal};
3365        match expr {
3366            Expression::Literal(Literal::Integer(n)) => Some(*n),
3367            _ => None,
3368        }
3369    }
3370
3371    /// Returns the current transaction context for MVCC visibility.
3372    ///
3373    /// Returns `(viewing_epoch, transaction_id)` where:
3374    /// - `viewing_epoch` is the epoch at which to check version visibility
3375    /// - `transaction_id` is the current transaction ID (if in a transaction)
3376    #[must_use]
3377    fn get_transaction_context(&self) -> (EpochId, Option<TransactionId>) {
3378        // Time-travel override takes precedence (read-only, no tx context)
3379        if let Some(epoch) = *self.viewing_epoch_override.lock() {
3380            return (epoch, None);
3381        }
3382
3383        if let Some(transaction_id) = *self.current_transaction.lock() {
3384            // In a transaction: use the transaction's start epoch
3385            let epoch = self
3386                .transaction_manager
3387                .start_epoch(transaction_id)
3388                .unwrap_or_else(|| self.transaction_manager.current_epoch());
3389            (epoch, Some(transaction_id))
3390        } else {
3391            // No transaction: use current epoch
3392            (self.transaction_manager.current_epoch(), None)
3393        }
3394    }
3395
3396    /// Creates a planner with transaction context and constraint validator.
3397    ///
3398    /// The `store` parameter is the graph store to plan against (use
3399    /// `self.active_store()` for graph-aware execution).
3400    fn create_planner_for_store(
3401        &self,
3402        store: Arc<dyn GraphStoreMut>,
3403        viewing_epoch: EpochId,
3404        transaction_id: Option<TransactionId>,
3405    ) -> crate::query::Planner {
3406        use crate::query::Planner;
3407
3408        let mut planner = Planner::with_context(
3409            Arc::clone(&store),
3410            Arc::clone(&self.transaction_manager),
3411            transaction_id,
3412            viewing_epoch,
3413        )
3414        .with_factorized_execution(self.factorized_execution)
3415        .with_catalog(Arc::clone(&self.catalog));
3416
3417        // Attach the constraint validator for schema enforcement
3418        let validator =
3419            CatalogConstraintValidator::new(Arc::clone(&self.catalog)).with_store(store);
3420        planner = planner.with_validator(Arc::new(validator));
3421
3422        planner
3423    }
3424
3425    /// Creates a node directly (bypassing query execution).
3426    ///
3427    /// This is a low-level API for testing and direct manipulation.
3428    /// If a transaction is active, the node will be versioned with the transaction ID.
3429    pub fn create_node(&self, labels: &[&str]) -> NodeId {
3430        let (epoch, transaction_id) = self.get_transaction_context();
3431        self.active_lpg_store().create_node_versioned(
3432            labels,
3433            epoch,
3434            transaction_id.unwrap_or(TransactionId::SYSTEM),
3435        )
3436    }
3437
3438    /// Creates a node with properties.
3439    ///
3440    /// If a transaction is active, the node will be versioned with the transaction ID.
3441    pub fn create_node_with_props<'a>(
3442        &self,
3443        labels: &[&str],
3444        properties: impl IntoIterator<Item = (&'a str, Value)>,
3445    ) -> NodeId {
3446        let (epoch, transaction_id) = self.get_transaction_context();
3447        self.active_lpg_store().create_node_with_props_versioned(
3448            labels,
3449            properties,
3450            epoch,
3451            transaction_id.unwrap_or(TransactionId::SYSTEM),
3452        )
3453    }
3454
3455    /// Creates an edge between two nodes.
3456    ///
3457    /// This is a low-level API for testing and direct manipulation.
3458    /// If a transaction is active, the edge will be versioned with the transaction ID.
3459    pub fn create_edge(
3460        &self,
3461        src: NodeId,
3462        dst: NodeId,
3463        edge_type: &str,
3464    ) -> grafeo_common::types::EdgeId {
3465        let (epoch, transaction_id) = self.get_transaction_context();
3466        self.active_lpg_store().create_edge_versioned(
3467            src,
3468            dst,
3469            edge_type,
3470            epoch,
3471            transaction_id.unwrap_or(TransactionId::SYSTEM),
3472        )
3473    }
3474
3475    // =========================================================================
3476    // Direct Lookup APIs (bypass query planning for O(1) point reads)
3477    // =========================================================================
3478
3479    /// Gets a node by ID directly, bypassing query planning.
3480    ///
3481    /// This is the fastest way to retrieve a single node when you know its ID.
3482    /// Skips parsing, binding, optimization, and physical planning entirely.
3483    ///
3484    /// # Performance
3485    ///
3486    /// - Time complexity: O(1) average case
3487    /// - No lock contention (uses DashMap internally)
3488    /// - ~20-30x faster than equivalent MATCH query
3489    ///
3490    /// # Example
3491    ///
3492    /// ```no_run
3493    /// # use grafeo_engine::GrafeoDB;
3494    /// # let db = GrafeoDB::new_in_memory();
3495    /// let session = db.session();
3496    /// let node_id = session.create_node(&["Person"]);
3497    ///
3498    /// // Direct lookup - O(1), no query planning
3499    /// let node = session.get_node(node_id);
3500    /// assert!(node.is_some());
3501    /// ```
3502    #[must_use]
3503    pub fn get_node(&self, id: NodeId) -> Option<Node> {
3504        let (epoch, transaction_id) = self.get_transaction_context();
3505        self.active_lpg_store().get_node_versioned(
3506            id,
3507            epoch,
3508            transaction_id.unwrap_or(TransactionId::SYSTEM),
3509        )
3510    }
3511
3512    /// Gets a single property from a node by ID, bypassing query planning.
3513    ///
3514    /// More efficient than `get_node()` when you only need one property,
3515    /// as it avoids loading the full node with all properties.
3516    ///
3517    /// # Performance
3518    ///
3519    /// - Time complexity: O(1) average case
3520    /// - No query planning overhead
3521    ///
3522    /// # Example
3523    ///
3524    /// ```no_run
3525    /// # use grafeo_engine::GrafeoDB;
3526    /// # use grafeo_common::types::Value;
3527    /// # let db = GrafeoDB::new_in_memory();
3528    /// let session = db.session();
3529    /// let id = session.create_node_with_props(&["Person"], [("name", "Alix".into())]);
3530    ///
3531    /// // Direct property access - O(1)
3532    /// let name = session.get_node_property(id, "name");
3533    /// assert_eq!(name, Some(Value::String("Alix".into())));
3534    /// ```
3535    #[must_use]
3536    pub fn get_node_property(&self, id: NodeId, key: &str) -> Option<Value> {
3537        self.get_node(id)
3538            .and_then(|node| node.get_property(key).cloned())
3539    }
3540
3541    /// Gets an edge by ID directly, bypassing query planning.
3542    ///
3543    /// # Performance
3544    ///
3545    /// - Time complexity: O(1) average case
3546    /// - No lock contention
3547    #[must_use]
3548    pub fn get_edge(&self, id: EdgeId) -> Option<Edge> {
3549        let (epoch, transaction_id) = self.get_transaction_context();
3550        self.active_lpg_store().get_edge_versioned(
3551            id,
3552            epoch,
3553            transaction_id.unwrap_or(TransactionId::SYSTEM),
3554        )
3555    }
3556
3557    /// Gets outgoing neighbors of a node directly, bypassing query planning.
3558    ///
3559    /// Returns (neighbor_id, edge_id) pairs for all outgoing edges.
3560    ///
3561    /// # Performance
3562    ///
3563    /// - Time complexity: O(degree) where degree is the number of outgoing edges
3564    /// - Uses adjacency index for direct access
3565    /// - ~10-20x faster than equivalent MATCH query
3566    ///
3567    /// # Example
3568    ///
3569    /// ```no_run
3570    /// # use grafeo_engine::GrafeoDB;
3571    /// # let db = GrafeoDB::new_in_memory();
3572    /// let session = db.session();
3573    /// let alix = session.create_node(&["Person"]);
3574    /// let gus = session.create_node(&["Person"]);
3575    /// session.create_edge(alix, gus, "KNOWS");
3576    ///
3577    /// // Direct neighbor lookup - O(degree)
3578    /// let neighbors = session.get_neighbors_outgoing(alix);
3579    /// assert_eq!(neighbors.len(), 1);
3580    /// assert_eq!(neighbors[0].0, gus);
3581    /// ```
3582    #[must_use]
3583    pub fn get_neighbors_outgoing(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
3584        self.active_lpg_store()
3585            .edges_from(node, Direction::Outgoing)
3586            .collect()
3587    }
3588
3589    /// Gets incoming neighbors of a node directly, bypassing query planning.
3590    ///
3591    /// Returns (neighbor_id, edge_id) pairs for all incoming edges.
3592    ///
3593    /// # Performance
3594    ///
3595    /// - Time complexity: O(degree) where degree is the number of incoming edges
3596    /// - Uses backward adjacency index for direct access
3597    #[must_use]
3598    pub fn get_neighbors_incoming(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
3599        self.active_lpg_store()
3600            .edges_from(node, Direction::Incoming)
3601            .collect()
3602    }
3603
3604    /// Gets outgoing neighbors filtered by edge type, bypassing query planning.
3605    ///
3606    /// # Example
3607    ///
3608    /// ```no_run
3609    /// # use grafeo_engine::GrafeoDB;
3610    /// # let db = GrafeoDB::new_in_memory();
3611    /// # let session = db.session();
3612    /// # let alix = session.create_node(&["Person"]);
3613    /// let neighbors = session.get_neighbors_outgoing_by_type(alix, "KNOWS");
3614    /// ```
3615    #[must_use]
3616    pub fn get_neighbors_outgoing_by_type(
3617        &self,
3618        node: NodeId,
3619        edge_type: &str,
3620    ) -> Vec<(NodeId, EdgeId)> {
3621        self.active_lpg_store()
3622            .edges_from(node, Direction::Outgoing)
3623            .filter(|(_, edge_id)| {
3624                self.get_edge(*edge_id)
3625                    .is_some_and(|e| e.edge_type.as_str() == edge_type)
3626            })
3627            .collect()
3628    }
3629
3630    /// Checks if a node exists, bypassing query planning.
3631    ///
3632    /// # Performance
3633    ///
3634    /// - Time complexity: O(1)
3635    /// - Fastest existence check available
3636    #[must_use]
3637    pub fn node_exists(&self, id: NodeId) -> bool {
3638        self.get_node(id).is_some()
3639    }
3640
3641    /// Checks if an edge exists, bypassing query planning.
3642    #[must_use]
3643    pub fn edge_exists(&self, id: EdgeId) -> bool {
3644        self.get_edge(id).is_some()
3645    }
3646
3647    /// Gets the degree (number of edges) of a node.
3648    ///
3649    /// Returns (outgoing_degree, incoming_degree).
3650    #[must_use]
3651    pub fn get_degree(&self, node: NodeId) -> (usize, usize) {
3652        let active = self.active_lpg_store();
3653        let out = active.out_degree(node);
3654        let in_degree = active.in_degree(node);
3655        (out, in_degree)
3656    }
3657
3658    /// Batch lookup of multiple nodes by ID.
3659    ///
3660    /// More efficient than calling `get_node()` in a loop because it
3661    /// amortizes overhead.
3662    ///
3663    /// # Performance
3664    ///
3665    /// - Time complexity: O(n) where n is the number of IDs
3666    /// - Better cache utilization than individual lookups
3667    #[must_use]
3668    pub fn get_nodes_batch(&self, ids: &[NodeId]) -> Vec<Option<Node>> {
3669        let (epoch, transaction_id) = self.get_transaction_context();
3670        let tx = transaction_id.unwrap_or(TransactionId::SYSTEM);
3671        let active = self.active_lpg_store();
3672        ids.iter()
3673            .map(|&id| active.get_node_versioned(id, epoch, tx))
3674            .collect()
3675    }
3676
3677    // ── Change Data Capture ─────────────────────────────────────────────
3678
3679    /// Returns the full change history for an entity (node or edge).
3680    #[cfg(feature = "cdc")]
3681    pub fn history(
3682        &self,
3683        entity_id: impl Into<crate::cdc::EntityId>,
3684    ) -> Result<Vec<crate::cdc::ChangeEvent>> {
3685        Ok(self.cdc_log.history(entity_id.into()))
3686    }
3687
3688    /// Returns change events for an entity since the given epoch.
3689    #[cfg(feature = "cdc")]
3690    pub fn history_since(
3691        &self,
3692        entity_id: impl Into<crate::cdc::EntityId>,
3693        since_epoch: EpochId,
3694    ) -> Result<Vec<crate::cdc::ChangeEvent>> {
3695        Ok(self.cdc_log.history_since(entity_id.into(), since_epoch))
3696    }
3697
3698    /// Returns all change events across all entities in an epoch range.
3699    #[cfg(feature = "cdc")]
3700    pub fn changes_between(
3701        &self,
3702        start_epoch: EpochId,
3703        end_epoch: EpochId,
3704    ) -> Result<Vec<crate::cdc::ChangeEvent>> {
3705        Ok(self.cdc_log.changes_between(start_epoch, end_epoch))
3706    }
3707}
3708
3709impl Drop for Session {
3710    fn drop(&mut self) {
3711        // Auto-rollback any active transaction to prevent leaked MVCC state,
3712        // dangling write locks, and uncommitted versions lingering in the store.
3713        if self.in_transaction() {
3714            let _ = self.rollback_inner();
3715        }
3716    }
3717}
3718
3719#[cfg(test)]
3720mod tests {
3721    use crate::database::GrafeoDB;
3722
3723    #[test]
3724    fn test_session_create_node() {
3725        let db = GrafeoDB::new_in_memory();
3726        let session = db.session();
3727
3728        let id = session.create_node(&["Person"]);
3729        assert!(id.is_valid());
3730        assert_eq!(db.node_count(), 1);
3731    }
3732
3733    #[test]
3734    fn test_session_transaction() {
3735        let db = GrafeoDB::new_in_memory();
3736        let mut session = db.session();
3737
3738        assert!(!session.in_transaction());
3739
3740        session.begin_transaction().unwrap();
3741        assert!(session.in_transaction());
3742
3743        session.commit().unwrap();
3744        assert!(!session.in_transaction());
3745    }
3746
3747    #[test]
3748    fn test_session_transaction_context() {
3749        let db = GrafeoDB::new_in_memory();
3750        let mut session = db.session();
3751
3752        // Without transaction - context should have current epoch and no transaction_id
3753        let (_epoch1, transaction_id1) = session.get_transaction_context();
3754        assert!(transaction_id1.is_none());
3755
3756        // Start a transaction
3757        session.begin_transaction().unwrap();
3758        let (epoch2, transaction_id2) = session.get_transaction_context();
3759        assert!(transaction_id2.is_some());
3760        // Transaction should have a valid epoch
3761        let _ = epoch2; // Use the variable
3762
3763        // Commit and verify
3764        session.commit().unwrap();
3765        let (epoch3, tx_id3) = session.get_transaction_context();
3766        assert!(tx_id3.is_none());
3767        // Epoch should have advanced after commit
3768        assert!(epoch3.as_u64() >= epoch2.as_u64());
3769    }
3770
3771    #[test]
3772    fn test_session_rollback() {
3773        let db = GrafeoDB::new_in_memory();
3774        let mut session = db.session();
3775
3776        session.begin_transaction().unwrap();
3777        session.rollback().unwrap();
3778        assert!(!session.in_transaction());
3779    }
3780
3781    #[test]
3782    fn test_session_rollback_discards_versions() {
3783        use grafeo_common::types::TransactionId;
3784
3785        let db = GrafeoDB::new_in_memory();
3786
3787        // Create a node outside of any transaction (at system level)
3788        let node_before = db.store().create_node(&["Person"]);
3789        assert!(node_before.is_valid());
3790        assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");
3791
3792        // Start a transaction
3793        let mut session = db.session();
3794        session.begin_transaction().unwrap();
3795        let transaction_id = session.current_transaction.lock().unwrap();
3796
3797        // Create a node versioned with the transaction's ID
3798        let epoch = db.store().current_epoch();
3799        let node_in_tx = db
3800            .store()
3801            .create_node_versioned(&["Person"], epoch, transaction_id);
3802        assert!(node_in_tx.is_valid());
3803
3804        // Uncommitted nodes use EpochId::PENDING, so they are invisible to
3805        // non-versioned lookups like node_count(). Verify the node is visible
3806        // only through the owning transaction.
3807        assert_eq!(
3808            db.node_count(),
3809            1,
3810            "PENDING nodes should be invisible to non-versioned node_count()"
3811        );
3812        assert!(
3813            db.store()
3814                .get_node_versioned(node_in_tx, epoch, transaction_id)
3815                .is_some(),
3816            "Transaction node should be visible to its own transaction"
3817        );
3818
3819        // Rollback the transaction
3820        session.rollback().unwrap();
3821        assert!(!session.in_transaction());
3822
3823        // The node created in the transaction should be discarded
3824        // Only the first node should remain visible
3825        let count_after = db.node_count();
3826        assert_eq!(
3827            count_after, 1,
3828            "Rollback should discard uncommitted node, but got {count_after}"
3829        );
3830
3831        // The original node should still be accessible
3832        let current_epoch = db.store().current_epoch();
3833        assert!(
3834            db.store()
3835                .get_node_versioned(node_before, current_epoch, TransactionId::SYSTEM)
3836                .is_some(),
3837            "Original node should still exist"
3838        );
3839
3840        // The node created in the transaction should not be accessible
3841        assert!(
3842            db.store()
3843                .get_node_versioned(node_in_tx, current_epoch, TransactionId::SYSTEM)
3844                .is_none(),
3845            "Transaction node should be gone"
3846        );
3847    }
3848
3849    #[test]
3850    fn test_session_create_node_in_transaction() {
3851        // Test that session.create_node() is transaction-aware
3852        let db = GrafeoDB::new_in_memory();
3853
3854        // Create a node outside of any transaction
3855        let node_before = db.create_node(&["Person"]);
3856        assert!(node_before.is_valid());
3857        assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");
3858
3859        // Start a transaction and create a node through the session
3860        let mut session = db.session();
3861        session.begin_transaction().unwrap();
3862        let transaction_id = session.current_transaction.lock().unwrap();
3863
3864        // Create a node through session.create_node() - should be versioned with tx
3865        let node_in_tx = session.create_node(&["Person"]);
3866        assert!(node_in_tx.is_valid());
3867
3868        // Uncommitted nodes use EpochId::PENDING, so they are invisible to
3869        // non-versioned lookups. Verify the node is visible only to its own tx.
3870        assert_eq!(
3871            db.node_count(),
3872            1,
3873            "PENDING nodes should be invisible to non-versioned node_count()"
3874        );
3875        let epoch = db.store().current_epoch();
3876        assert!(
3877            db.store()
3878                .get_node_versioned(node_in_tx, epoch, transaction_id)
3879                .is_some(),
3880            "Transaction node should be visible to its own transaction"
3881        );
3882
3883        // Rollback the transaction
3884        session.rollback().unwrap();
3885
3886        // The node created via session.create_node() should be discarded
3887        let count_after = db.node_count();
3888        assert_eq!(
3889            count_after, 1,
3890            "Rollback should discard node created via session.create_node(), but got {count_after}"
3891        );
3892    }
3893
3894    #[test]
3895    fn test_session_create_node_with_props_in_transaction() {
3896        use grafeo_common::types::Value;
3897
3898        // Test that session.create_node_with_props() is transaction-aware
3899        let db = GrafeoDB::new_in_memory();
3900
3901        // Create a node outside of any transaction
3902        db.create_node(&["Person"]);
3903        assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");
3904
3905        // Start a transaction and create a node with properties
3906        let mut session = db.session();
3907        session.begin_transaction().unwrap();
3908        let transaction_id = session.current_transaction.lock().unwrap();
3909
3910        let node_in_tx =
3911            session.create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))]);
3912        assert!(node_in_tx.is_valid());
3913
3914        // Uncommitted nodes use EpochId::PENDING, so they are invisible to
3915        // non-versioned lookups. Verify the node is visible only to its own tx.
3916        assert_eq!(
3917            db.node_count(),
3918            1,
3919            "PENDING nodes should be invisible to non-versioned node_count()"
3920        );
3921        let epoch = db.store().current_epoch();
3922        assert!(
3923            db.store()
3924                .get_node_versioned(node_in_tx, epoch, transaction_id)
3925                .is_some(),
3926            "Transaction node should be visible to its own transaction"
3927        );
3928
3929        // Rollback the transaction
3930        session.rollback().unwrap();
3931
3932        // The node should be discarded
3933        let count_after = db.node_count();
3934        assert_eq!(
3935            count_after, 1,
3936            "Rollback should discard node created via session.create_node_with_props()"
3937        );
3938    }
3939
3940    #[cfg(feature = "gql")]
3941    mod gql_tests {
3942        use super::*;
3943
3944        #[test]
3945        fn test_gql_query_execution() {
3946            let db = GrafeoDB::new_in_memory();
3947            let session = db.session();
3948
3949            // Create some test data
3950            session.create_node(&["Person"]);
3951            session.create_node(&["Person"]);
3952            session.create_node(&["Animal"]);
3953
3954            // Execute a GQL query
3955            let result = session.execute("MATCH (n:Person) RETURN n").unwrap();
3956
3957            // Should return 2 Person nodes
3958            assert_eq!(result.row_count(), 2);
3959            assert_eq!(result.column_count(), 1);
3960            assert_eq!(result.columns[0], "n");
3961        }
3962
3963        #[test]
3964        fn test_gql_empty_result() {
3965            let db = GrafeoDB::new_in_memory();
3966            let session = db.session();
3967
3968            // No data in database
3969            let result = session.execute("MATCH (n:Person) RETURN n").unwrap();
3970
3971            assert_eq!(result.row_count(), 0);
3972        }
3973
3974        #[test]
3975        fn test_gql_parse_error() {
3976            let db = GrafeoDB::new_in_memory();
3977            let session = db.session();
3978
3979            // Invalid GQL syntax
3980            let result = session.execute("MATCH (n RETURN n");
3981
3982            assert!(result.is_err());
3983        }
3984
3985        #[test]
3986        fn test_gql_relationship_traversal() {
3987            let db = GrafeoDB::new_in_memory();
3988            let session = db.session();
3989
3990            // Create a graph: Alix -> Gus, Alix -> Vincent
3991            let alix = session.create_node(&["Person"]);
3992            let gus = session.create_node(&["Person"]);
3993            let vincent = session.create_node(&["Person"]);
3994
3995            session.create_edge(alix, gus, "KNOWS");
3996            session.create_edge(alix, vincent, "KNOWS");
3997
3998            // Execute a path query: MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b
3999            let result = session
4000                .execute("MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b")
4001                .unwrap();
4002
4003            // Should return 2 rows (Alix->Gus, Alix->Vincent)
4004            assert_eq!(result.row_count(), 2);
4005            assert_eq!(result.column_count(), 2);
4006            assert_eq!(result.columns[0], "a");
4007            assert_eq!(result.columns[1], "b");
4008        }
4009
4010        #[test]
4011        fn test_gql_relationship_with_type_filter() {
4012            let db = GrafeoDB::new_in_memory();
4013            let session = db.session();
4014
4015            // Create a graph: Alix -KNOWS-> Gus, Alix -WORKS_WITH-> Vincent
4016            let alix = session.create_node(&["Person"]);
4017            let gus = session.create_node(&["Person"]);
4018            let vincent = session.create_node(&["Person"]);
4019
4020            session.create_edge(alix, gus, "KNOWS");
4021            session.create_edge(alix, vincent, "WORKS_WITH");
4022
4023            // Query only KNOWS relationships
4024            let result = session
4025                .execute("MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b")
4026                .unwrap();
4027
4028            // Should return only 1 row (Alix->Gus)
4029            assert_eq!(result.row_count(), 1);
4030        }
4031
4032        #[test]
4033        fn test_gql_semantic_error_undefined_variable() {
4034            let db = GrafeoDB::new_in_memory();
4035            let session = db.session();
4036
4037            // Reference undefined variable 'x' in RETURN
4038            let result = session.execute("MATCH (n:Person) RETURN x");
4039
4040            // Should fail with semantic error
4041            assert!(result.is_err());
4042            let Err(err) = result else {
4043                panic!("Expected error")
4044            };
4045            assert!(
4046                err.to_string().contains("Undefined variable"),
4047                "Expected undefined variable error, got: {}",
4048                err
4049            );
4050        }
4051
4052        #[test]
4053        fn test_gql_where_clause_property_filter() {
4054            use grafeo_common::types::Value;
4055
4056            let db = GrafeoDB::new_in_memory();
4057            let session = db.session();
4058
4059            // Create people with ages
4060            session.create_node_with_props(&["Person"], [("age", Value::Int64(25))]);
4061            session.create_node_with_props(&["Person"], [("age", Value::Int64(35))]);
4062            session.create_node_with_props(&["Person"], [("age", Value::Int64(45))]);
4063
4064            // Query with WHERE clause: age > 30
4065            let result = session
4066                .execute("MATCH (n:Person) WHERE n.age > 30 RETURN n")
4067                .unwrap();
4068
4069            // Should return 2 people (ages 35 and 45)
4070            assert_eq!(result.row_count(), 2);
4071        }
4072
4073        #[test]
4074        fn test_gql_where_clause_equality() {
4075            use grafeo_common::types::Value;
4076
4077            let db = GrafeoDB::new_in_memory();
4078            let session = db.session();
4079
4080            // Create people with names
4081            session.create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))]);
4082            session.create_node_with_props(&["Person"], [("name", Value::String("Gus".into()))]);
4083            session.create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))]);
4084
4085            // Query with WHERE clause: name = "Alix"
4086            let result = session
4087                .execute("MATCH (n:Person) WHERE n.name = \"Alix\" RETURN n")
4088                .unwrap();
4089
4090            // Should return 2 people named Alix
4091            assert_eq!(result.row_count(), 2);
4092        }
4093
4094        #[test]
4095        fn test_gql_return_property_access() {
4096            use grafeo_common::types::Value;
4097
4098            let db = GrafeoDB::new_in_memory();
4099            let session = db.session();
4100
4101            // Create people with names and ages
4102            session.create_node_with_props(
4103                &["Person"],
4104                [
4105                    ("name", Value::String("Alix".into())),
4106                    ("age", Value::Int64(30)),
4107                ],
4108            );
4109            session.create_node_with_props(
4110                &["Person"],
4111                [
4112                    ("name", Value::String("Gus".into())),
4113                    ("age", Value::Int64(25)),
4114                ],
4115            );
4116
4117            // Query returning properties
4118            let result = session
4119                .execute("MATCH (n:Person) RETURN n.name, n.age")
4120                .unwrap();
4121
4122            // Should return 2 rows with name and age columns
4123            assert_eq!(result.row_count(), 2);
4124            assert_eq!(result.column_count(), 2);
4125            assert_eq!(result.columns[0], "n.name");
4126            assert_eq!(result.columns[1], "n.age");
4127
4128            // Check that we get actual values
4129            let names: Vec<&Value> = result.rows.iter().map(|r| &r[0]).collect();
4130            assert!(names.contains(&&Value::String("Alix".into())));
4131            assert!(names.contains(&&Value::String("Gus".into())));
4132        }
4133
4134        #[test]
4135        fn test_gql_return_mixed_expressions() {
4136            use grafeo_common::types::Value;
4137
4138            let db = GrafeoDB::new_in_memory();
4139            let session = db.session();
4140
4141            // Create a person
4142            session.create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))]);
4143
4144            // Query returning both node and property
4145            let result = session
4146                .execute("MATCH (n:Person) RETURN n, n.name")
4147                .unwrap();
4148
4149            assert_eq!(result.row_count(), 1);
4150            assert_eq!(result.column_count(), 2);
4151            assert_eq!(result.columns[0], "n");
4152            assert_eq!(result.columns[1], "n.name");
4153
4154            // Second column should be the name
4155            assert_eq!(result.rows[0][1], Value::String("Alix".into()));
4156        }
4157    }
4158
4159    #[cfg(feature = "cypher")]
4160    mod cypher_tests {
4161        use super::*;
4162
4163        #[test]
4164        fn test_cypher_query_execution() {
4165            let db = GrafeoDB::new_in_memory();
4166            let session = db.session();
4167
4168            // Create some test data
4169            session.create_node(&["Person"]);
4170            session.create_node(&["Person"]);
4171            session.create_node(&["Animal"]);
4172
4173            // Execute a Cypher query
4174            let result = session.execute_cypher("MATCH (n:Person) RETURN n").unwrap();
4175
4176            // Should return 2 Person nodes
4177            assert_eq!(result.row_count(), 2);
4178            assert_eq!(result.column_count(), 1);
4179            assert_eq!(result.columns[0], "n");
4180        }
4181
4182        #[test]
4183        fn test_cypher_empty_result() {
4184            let db = GrafeoDB::new_in_memory();
4185            let session = db.session();
4186
4187            // No data in database
4188            let result = session.execute_cypher("MATCH (n:Person) RETURN n").unwrap();
4189
4190            assert_eq!(result.row_count(), 0);
4191        }
4192
4193        #[test]
4194        fn test_cypher_parse_error() {
4195            let db = GrafeoDB::new_in_memory();
4196            let session = db.session();
4197
4198            // Invalid Cypher syntax
4199            let result = session.execute_cypher("MATCH (n RETURN n");
4200
4201            assert!(result.is_err());
4202        }
4203    }
4204
4205    // ==================== Direct Lookup API Tests ====================
4206
4207    mod direct_lookup_tests {
4208        use super::*;
4209        use grafeo_common::types::Value;
4210
4211        #[test]
4212        fn test_get_node() {
4213            let db = GrafeoDB::new_in_memory();
4214            let session = db.session();
4215
4216            let id = session.create_node(&["Person"]);
4217            let node = session.get_node(id);
4218
4219            assert!(node.is_some());
4220            let node = node.unwrap();
4221            assert_eq!(node.id, id);
4222        }
4223
4224        #[test]
4225        fn test_get_node_not_found() {
4226            use grafeo_common::types::NodeId;
4227
4228            let db = GrafeoDB::new_in_memory();
4229            let session = db.session();
4230
4231            // Try to get a non-existent node
4232            let node = session.get_node(NodeId::new(9999));
4233            assert!(node.is_none());
4234        }
4235
4236        #[test]
4237        fn test_get_node_property() {
4238            let db = GrafeoDB::new_in_memory();
4239            let session = db.session();
4240
4241            let id = session
4242                .create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))]);
4243
4244            let name = session.get_node_property(id, "name");
4245            assert_eq!(name, Some(Value::String("Alix".into())));
4246
4247            // Non-existent property
4248            let missing = session.get_node_property(id, "missing");
4249            assert!(missing.is_none());
4250        }
4251
4252        #[test]
4253        fn test_get_edge() {
4254            let db = GrafeoDB::new_in_memory();
4255            let session = db.session();
4256
4257            let alix = session.create_node(&["Person"]);
4258            let gus = session.create_node(&["Person"]);
4259            let edge_id = session.create_edge(alix, gus, "KNOWS");
4260
4261            let edge = session.get_edge(edge_id);
4262            assert!(edge.is_some());
4263            let edge = edge.unwrap();
4264            assert_eq!(edge.id, edge_id);
4265            assert_eq!(edge.src, alix);
4266            assert_eq!(edge.dst, gus);
4267        }
4268
4269        #[test]
4270        fn test_get_edge_not_found() {
4271            use grafeo_common::types::EdgeId;
4272
4273            let db = GrafeoDB::new_in_memory();
4274            let session = db.session();
4275
4276            let edge = session.get_edge(EdgeId::new(9999));
4277            assert!(edge.is_none());
4278        }
4279
4280        #[test]
4281        fn test_get_neighbors_outgoing() {
4282            let db = GrafeoDB::new_in_memory();
4283            let session = db.session();
4284
4285            let alix = session.create_node(&["Person"]);
4286            let gus = session.create_node(&["Person"]);
4287            let harm = session.create_node(&["Person"]);
4288
4289            session.create_edge(alix, gus, "KNOWS");
4290            session.create_edge(alix, harm, "KNOWS");
4291
4292            let neighbors = session.get_neighbors_outgoing(alix);
4293            assert_eq!(neighbors.len(), 2);
4294
4295            let neighbor_ids: Vec<_> = neighbors.iter().map(|(node_id, _)| *node_id).collect();
4296            assert!(neighbor_ids.contains(&gus));
4297            assert!(neighbor_ids.contains(&harm));
4298        }
4299
4300        #[test]
4301        fn test_get_neighbors_incoming() {
4302            let db = GrafeoDB::new_in_memory();
4303            let session = db.session();
4304
4305            let alix = session.create_node(&["Person"]);
4306            let gus = session.create_node(&["Person"]);
4307            let harm = session.create_node(&["Person"]);
4308
4309            session.create_edge(gus, alix, "KNOWS");
4310            session.create_edge(harm, alix, "KNOWS");
4311
4312            let neighbors = session.get_neighbors_incoming(alix);
4313            assert_eq!(neighbors.len(), 2);
4314
4315            let neighbor_ids: Vec<_> = neighbors.iter().map(|(node_id, _)| *node_id).collect();
4316            assert!(neighbor_ids.contains(&gus));
4317            assert!(neighbor_ids.contains(&harm));
4318        }
4319
4320        #[test]
4321        fn test_get_neighbors_outgoing_by_type() {
4322            let db = GrafeoDB::new_in_memory();
4323            let session = db.session();
4324
4325            let alix = session.create_node(&["Person"]);
4326            let gus = session.create_node(&["Person"]);
4327            let company = session.create_node(&["Company"]);
4328
4329            session.create_edge(alix, gus, "KNOWS");
4330            session.create_edge(alix, company, "WORKS_AT");
4331
4332            let knows_neighbors = session.get_neighbors_outgoing_by_type(alix, "KNOWS");
4333            assert_eq!(knows_neighbors.len(), 1);
4334            assert_eq!(knows_neighbors[0].0, gus);
4335
4336            let works_neighbors = session.get_neighbors_outgoing_by_type(alix, "WORKS_AT");
4337            assert_eq!(works_neighbors.len(), 1);
4338            assert_eq!(works_neighbors[0].0, company);
4339
4340            // No edges of this type
4341            let no_neighbors = session.get_neighbors_outgoing_by_type(alix, "LIKES");
4342            assert!(no_neighbors.is_empty());
4343        }
4344
4345        #[test]
4346        fn test_node_exists() {
4347            use grafeo_common::types::NodeId;
4348
4349            let db = GrafeoDB::new_in_memory();
4350            let session = db.session();
4351
4352            let id = session.create_node(&["Person"]);
4353
4354            assert!(session.node_exists(id));
4355            assert!(!session.node_exists(NodeId::new(9999)));
4356        }
4357
4358        #[test]
4359        fn test_edge_exists() {
4360            use grafeo_common::types::EdgeId;
4361
4362            let db = GrafeoDB::new_in_memory();
4363            let session = db.session();
4364
4365            let alix = session.create_node(&["Person"]);
4366            let gus = session.create_node(&["Person"]);
4367            let edge_id = session.create_edge(alix, gus, "KNOWS");
4368
4369            assert!(session.edge_exists(edge_id));
4370            assert!(!session.edge_exists(EdgeId::new(9999)));
4371        }
4372
4373        #[test]
4374        fn test_get_degree() {
4375            let db = GrafeoDB::new_in_memory();
4376            let session = db.session();
4377
4378            let alix = session.create_node(&["Person"]);
4379            let gus = session.create_node(&["Person"]);
4380            let harm = session.create_node(&["Person"]);
4381
4382            // Alix knows Gus and Harm (2 outgoing)
4383            session.create_edge(alix, gus, "KNOWS");
4384            session.create_edge(alix, harm, "KNOWS");
4385            // Gus knows Alix (1 incoming for Alix)
4386            session.create_edge(gus, alix, "KNOWS");
4387
4388            let (out_degree, in_degree) = session.get_degree(alix);
4389            assert_eq!(out_degree, 2);
4390            assert_eq!(in_degree, 1);
4391
4392            // Node with no edges
4393            let lonely = session.create_node(&["Person"]);
4394            let (out, in_deg) = session.get_degree(lonely);
4395            assert_eq!(out, 0);
4396            assert_eq!(in_deg, 0);
4397        }
4398
4399        #[test]
4400        fn test_get_nodes_batch() {
4401            let db = GrafeoDB::new_in_memory();
4402            let session = db.session();
4403
4404            let alix = session.create_node(&["Person"]);
4405            let gus = session.create_node(&["Person"]);
4406            let harm = session.create_node(&["Person"]);
4407
4408            let nodes = session.get_nodes_batch(&[alix, gus, harm]);
4409            assert_eq!(nodes.len(), 3);
4410            assert!(nodes[0].is_some());
4411            assert!(nodes[1].is_some());
4412            assert!(nodes[2].is_some());
4413
4414            // With non-existent node
4415            use grafeo_common::types::NodeId;
4416            let nodes_with_missing = session.get_nodes_batch(&[alix, NodeId::new(9999), harm]);
4417            assert_eq!(nodes_with_missing.len(), 3);
4418            assert!(nodes_with_missing[0].is_some());
4419            assert!(nodes_with_missing[1].is_none()); // Missing node
4420            assert!(nodes_with_missing[2].is_some());
4421        }
4422
4423        #[test]
4424        fn test_auto_commit_setting() {
4425            let db = GrafeoDB::new_in_memory();
4426            let mut session = db.session();
4427
4428            // Default is auto-commit enabled
4429            assert!(session.auto_commit());
4430
4431            session.set_auto_commit(false);
4432            assert!(!session.auto_commit());
4433
4434            session.set_auto_commit(true);
4435            assert!(session.auto_commit());
4436        }
4437
4438        #[test]
4439        fn test_transaction_double_begin_nests() {
4440            let db = GrafeoDB::new_in_memory();
4441            let mut session = db.session();
4442
4443            session.begin_transaction().unwrap();
4444            // Second begin_transaction creates a nested transaction (auto-savepoint)
4445            let result = session.begin_transaction();
4446            assert!(result.is_ok());
4447            // Commit the inner (releases savepoint)
4448            session.commit().unwrap();
4449            // Commit the outer
4450            session.commit().unwrap();
4451        }
4452
4453        #[test]
4454        fn test_commit_without_transaction_error() {
4455            let db = GrafeoDB::new_in_memory();
4456            let mut session = db.session();
4457
4458            let result = session.commit();
4459            assert!(result.is_err());
4460        }
4461
4462        #[test]
4463        fn test_rollback_without_transaction_error() {
4464            let db = GrafeoDB::new_in_memory();
4465            let mut session = db.session();
4466
4467            let result = session.rollback();
4468            assert!(result.is_err());
4469        }
4470
4471        #[test]
4472        fn test_create_edge_in_transaction() {
4473            let db = GrafeoDB::new_in_memory();
4474            let mut session = db.session();
4475
4476            // Create nodes outside transaction
4477            let alix = session.create_node(&["Person"]);
4478            let gus = session.create_node(&["Person"]);
4479
4480            // Create edge in transaction
4481            session.begin_transaction().unwrap();
4482            let edge_id = session.create_edge(alix, gus, "KNOWS");
4483
4484            // Edge should be visible in the transaction
4485            assert!(session.edge_exists(edge_id));
4486
4487            // Commit
4488            session.commit().unwrap();
4489
4490            // Edge should still be visible
4491            assert!(session.edge_exists(edge_id));
4492        }
4493
4494        #[test]
4495        fn test_neighbors_empty_node() {
4496            let db = GrafeoDB::new_in_memory();
4497            let session = db.session();
4498
4499            let lonely = session.create_node(&["Person"]);
4500
4501            assert!(session.get_neighbors_outgoing(lonely).is_empty());
4502            assert!(session.get_neighbors_incoming(lonely).is_empty());
4503            assert!(
4504                session
4505                    .get_neighbors_outgoing_by_type(lonely, "KNOWS")
4506                    .is_empty()
4507            );
4508        }
4509    }
4510
4511    #[test]
4512    fn test_auto_gc_triggers_on_commit_interval() {
4513        use crate::config::Config;
4514
4515        let config = Config::in_memory().with_gc_interval(2);
4516        let db = GrafeoDB::with_config(config).unwrap();
4517        let mut session = db.session();
4518
4519        // First commit: counter = 1, no GC (not a multiple of 2)
4520        session.begin_transaction().unwrap();
4521        session.create_node(&["A"]);
4522        session.commit().unwrap();
4523
4524        // Second commit: counter = 2, GC should trigger (multiple of 2)
4525        session.begin_transaction().unwrap();
4526        session.create_node(&["B"]);
4527        session.commit().unwrap();
4528
4529        // Verify the database is still functional after GC
4530        assert_eq!(db.node_count(), 2);
4531    }
4532
4533    #[test]
4534    fn test_query_timeout_config_propagates_to_session() {
4535        use crate::config::Config;
4536        use std::time::Duration;
4537
4538        let config = Config::in_memory().with_query_timeout(Duration::from_secs(5));
4539        let db = GrafeoDB::with_config(config).unwrap();
4540        let session = db.session();
4541
4542        // Verify the session has a query deadline (timeout was set)
4543        assert!(session.query_deadline().is_some());
4544    }
4545
4546    #[test]
4547    fn test_no_query_timeout_returns_no_deadline() {
4548        let db = GrafeoDB::new_in_memory();
4549        let session = db.session();
4550
4551        // Default config has no timeout
4552        assert!(session.query_deadline().is_none());
4553    }
4554
4555    #[test]
4556    fn test_graph_model_accessor() {
4557        use crate::config::GraphModel;
4558
4559        let db = GrafeoDB::new_in_memory();
4560        let session = db.session();
4561
4562        assert_eq!(session.graph_model(), GraphModel::Lpg);
4563    }
4564
4565    #[cfg(feature = "gql")]
4566    #[test]
4567    fn test_external_store_session() {
4568        use grafeo_core::graph::GraphStoreMut;
4569        use std::sync::Arc;
4570
4571        let config = crate::config::Config::in_memory();
4572        let store =
4573            Arc::new(grafeo_core::graph::lpg::LpgStore::new().unwrap()) as Arc<dyn GraphStoreMut>;
4574        let db = GrafeoDB::with_store(store, config).unwrap();
4575
4576        let mut session = db.session();
4577
4578        // Use an explicit transaction so that INSERT and MATCH share the same
4579        // transaction context. With PENDING epochs, uncommitted versions are
4580        // only visible to the owning transaction.
4581        session.begin_transaction().unwrap();
4582        session.execute("INSERT (:Test {name: 'hello'})").unwrap();
4583
4584        // Verify we can query through it within the same transaction
4585        let result = session.execute("MATCH (n:Test) RETURN n.name").unwrap();
4586        assert_eq!(result.row_count(), 1);
4587
4588        session.commit().unwrap();
4589    }
4590
4591    // ==================== Session Command Tests ====================
4592
4593    #[cfg(feature = "gql")]
4594    mod session_command_tests {
4595        use super::*;
4596        use grafeo_common::types::Value;
4597
4598        #[test]
4599        fn test_use_graph_sets_current_graph() {
4600            let db = GrafeoDB::new_in_memory();
4601            let session = db.session();
4602
4603            // Create the graph first, then USE it
4604            session.execute("CREATE GRAPH mydb").unwrap();
4605            session.execute("USE GRAPH mydb").unwrap();
4606
4607            assert_eq!(session.current_graph(), Some("mydb".to_string()));
4608        }
4609
4610        #[test]
4611        fn test_use_graph_nonexistent_errors() {
4612            let db = GrafeoDB::new_in_memory();
4613            let session = db.session();
4614
4615            let result = session.execute("USE GRAPH doesnotexist");
4616            assert!(result.is_err());
4617            let err = result.unwrap_err().to_string();
4618            assert!(
4619                err.contains("does not exist"),
4620                "Expected 'does not exist' error, got: {err}"
4621            );
4622        }
4623
4624        #[test]
4625        fn test_use_graph_default_always_valid() {
4626            let db = GrafeoDB::new_in_memory();
4627            let session = db.session();
4628
4629            // "default" is always valid, even without CREATE GRAPH
4630            session.execute("USE GRAPH default").unwrap();
4631            assert_eq!(session.current_graph(), Some("default".to_string()));
4632        }
4633
4634        #[test]
4635        fn test_session_set_graph() {
4636            let db = GrafeoDB::new_in_memory();
4637            let session = db.session();
4638
4639            // SESSION SET GRAPH does not verify existence
4640            session.execute("SESSION SET GRAPH analytics").unwrap();
4641            assert_eq!(session.current_graph(), Some("analytics".to_string()));
4642        }
4643
4644        #[test]
4645        fn test_session_set_time_zone() {
4646            let db = GrafeoDB::new_in_memory();
4647            let session = db.session();
4648
4649            assert_eq!(session.time_zone(), None);
4650
4651            session.execute("SESSION SET TIME ZONE 'UTC'").unwrap();
4652            assert_eq!(session.time_zone(), Some("UTC".to_string()));
4653
4654            session
4655                .execute("SESSION SET TIME ZONE 'America/New_York'")
4656                .unwrap();
4657            assert_eq!(session.time_zone(), Some("America/New_York".to_string()));
4658        }
4659
4660        #[test]
4661        fn test_session_set_parameter() {
4662            let db = GrafeoDB::new_in_memory();
4663            let session = db.session();
4664
4665            session
4666                .execute("SESSION SET PARAMETER $timeout = 30")
4667                .unwrap();
4668
4669            // Parameter is stored (value is Null for now, since expression
4670            // evaluation is not yet wired up)
4671            assert!(session.get_parameter("timeout").is_some());
4672        }
4673
4674        #[test]
4675        fn test_session_reset_clears_all_state() {
4676            let db = GrafeoDB::new_in_memory();
4677            let session = db.session();
4678
4679            // Set various session state
4680            session.execute("SESSION SET GRAPH analytics").unwrap();
4681            session.execute("SESSION SET TIME ZONE 'UTC'").unwrap();
4682            session
4683                .execute("SESSION SET PARAMETER $limit = 100")
4684                .unwrap();
4685
4686            // Verify state was set
4687            assert!(session.current_graph().is_some());
4688            assert!(session.time_zone().is_some());
4689            assert!(session.get_parameter("limit").is_some());
4690
4691            // Reset everything
4692            session.execute("SESSION RESET").unwrap();
4693
4694            assert_eq!(session.current_graph(), None);
4695            assert_eq!(session.time_zone(), None);
4696            assert!(session.get_parameter("limit").is_none());
4697        }
4698
4699        #[test]
4700        fn test_session_close_clears_state() {
4701            let db = GrafeoDB::new_in_memory();
4702            let session = db.session();
4703
4704            session.execute("SESSION SET GRAPH analytics").unwrap();
4705            session.execute("SESSION SET TIME ZONE 'UTC'").unwrap();
4706
4707            session.execute("SESSION CLOSE").unwrap();
4708
4709            assert_eq!(session.current_graph(), None);
4710            assert_eq!(session.time_zone(), None);
4711        }
4712
4713        #[test]
4714        fn test_create_graph() {
4715            let db = GrafeoDB::new_in_memory();
4716            let session = db.session();
4717
4718            session.execute("CREATE GRAPH mydb").unwrap();
4719
4720            // Should be able to USE it now
4721            session.execute("USE GRAPH mydb").unwrap();
4722            assert_eq!(session.current_graph(), Some("mydb".to_string()));
4723        }
4724
4725        #[test]
4726        fn test_create_graph_duplicate_errors() {
4727            let db = GrafeoDB::new_in_memory();
4728            let session = db.session();
4729
4730            session.execute("CREATE GRAPH mydb").unwrap();
4731            let result = session.execute("CREATE GRAPH mydb");
4732
4733            assert!(result.is_err());
4734            let err = result.unwrap_err().to_string();
4735            assert!(
4736                err.contains("already exists"),
4737                "Expected 'already exists' error, got: {err}"
4738            );
4739        }
4740
4741        #[test]
4742        fn test_create_graph_if_not_exists() {
4743            let db = GrafeoDB::new_in_memory();
4744            let session = db.session();
4745
4746            session.execute("CREATE GRAPH mydb").unwrap();
4747            // Should succeed silently with IF NOT EXISTS
4748            session.execute("CREATE GRAPH IF NOT EXISTS mydb").unwrap();
4749        }
4750
4751        #[test]
4752        fn test_drop_graph() {
4753            let db = GrafeoDB::new_in_memory();
4754            let session = db.session();
4755
4756            session.execute("CREATE GRAPH mydb").unwrap();
4757            session.execute("DROP GRAPH mydb").unwrap();
4758
4759            // Should no longer be usable
4760            let result = session.execute("USE GRAPH mydb");
4761            assert!(result.is_err());
4762        }
4763
4764        #[test]
4765        fn test_drop_graph_nonexistent_errors() {
4766            let db = GrafeoDB::new_in_memory();
4767            let session = db.session();
4768
4769            let result = session.execute("DROP GRAPH nosuchgraph");
4770            assert!(result.is_err());
4771            let err = result.unwrap_err().to_string();
4772            assert!(
4773                err.contains("does not exist"),
4774                "Expected 'does not exist' error, got: {err}"
4775            );
4776        }
4777
4778        #[test]
4779        fn test_drop_graph_if_exists() {
4780            let db = GrafeoDB::new_in_memory();
4781            let session = db.session();
4782
4783            // Should succeed silently with IF EXISTS
4784            session.execute("DROP GRAPH IF EXISTS nosuchgraph").unwrap();
4785        }
4786
4787        #[test]
4788        fn test_start_transaction_via_gql() {
4789            let db = GrafeoDB::new_in_memory();
4790            let session = db.session();
4791
4792            session.execute("START TRANSACTION").unwrap();
4793            assert!(session.in_transaction());
4794            session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
4795            session.execute("COMMIT").unwrap();
4796            assert!(!session.in_transaction());
4797
4798            let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
4799            assert_eq!(result.rows.len(), 1);
4800        }
4801
4802        #[test]
4803        fn test_start_transaction_read_only_blocks_insert() {
4804            let db = GrafeoDB::new_in_memory();
4805            let session = db.session();
4806
4807            session.execute("START TRANSACTION READ ONLY").unwrap();
4808            let result = session.execute("INSERT (:Person {name: 'Alix'})");
4809            assert!(result.is_err());
4810            let err = result.unwrap_err().to_string();
4811            assert!(
4812                err.contains("read-only"),
4813                "Expected read-only error, got: {err}"
4814            );
4815            session.execute("ROLLBACK").unwrap();
4816        }
4817
4818        #[test]
4819        fn test_start_transaction_read_only_allows_reads() {
4820            let db = GrafeoDB::new_in_memory();
4821            let mut session = db.session();
4822            session.begin_transaction().unwrap();
4823            session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
4824            session.commit().unwrap();
4825
4826            session.execute("START TRANSACTION READ ONLY").unwrap();
4827            let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
4828            assert_eq!(result.rows.len(), 1);
4829            session.execute("COMMIT").unwrap();
4830        }
4831
4832        #[test]
4833        fn test_rollback_via_gql() {
4834            let db = GrafeoDB::new_in_memory();
4835            let session = db.session();
4836
4837            session.execute("START TRANSACTION").unwrap();
4838            session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
4839            session.execute("ROLLBACK").unwrap();
4840
4841            let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
4842            assert!(result.rows.is_empty());
4843        }
4844
4845        #[test]
4846        fn test_start_transaction_with_isolation_level() {
4847            let db = GrafeoDB::new_in_memory();
4848            let session = db.session();
4849
4850            session
4851                .execute("START TRANSACTION ISOLATION LEVEL SERIALIZABLE")
4852                .unwrap();
4853            assert!(session.in_transaction());
4854            session.execute("ROLLBACK").unwrap();
4855        }
4856
4857        #[test]
4858        fn test_session_commands_return_empty_result() {
4859            let db = GrafeoDB::new_in_memory();
4860            let session = db.session();
4861
4862            let result = session.execute("SESSION SET GRAPH test").unwrap();
4863            assert_eq!(result.row_count(), 0);
4864            assert_eq!(result.column_count(), 0);
4865        }
4866
4867        #[test]
4868        fn test_current_graph_default_is_none() {
4869            let db = GrafeoDB::new_in_memory();
4870            let session = db.session();
4871
4872            assert_eq!(session.current_graph(), None);
4873        }
4874
4875        #[test]
4876        fn test_time_zone_default_is_none() {
4877            let db = GrafeoDB::new_in_memory();
4878            let session = db.session();
4879
4880            assert_eq!(session.time_zone(), None);
4881        }
4882
4883        #[test]
4884        fn test_session_state_independent_across_sessions() {
4885            let db = GrafeoDB::new_in_memory();
4886            let session1 = db.session();
4887            let session2 = db.session();
4888
4889            session1.execute("SESSION SET GRAPH first").unwrap();
4890            session2.execute("SESSION SET GRAPH second").unwrap();
4891
4892            assert_eq!(session1.current_graph(), Some("first".to_string()));
4893            assert_eq!(session2.current_graph(), Some("second".to_string()));
4894        }
4895
4896        #[test]
4897        fn test_show_node_types() {
4898            let db = GrafeoDB::new_in_memory();
4899            let session = db.session();
4900
4901            session
4902                .execute("CREATE NODE TYPE Person (name STRING NOT NULL, age INTEGER)")
4903                .unwrap();
4904
4905            let result = session.execute("SHOW NODE TYPES").unwrap();
4906            assert_eq!(
4907                result.columns,
4908                vec!["name", "properties", "constraints", "parents"]
4909            );
4910            assert_eq!(result.rows.len(), 1);
4911            // First column is the type name
4912            assert_eq!(result.rows[0][0], Value::from("Person"));
4913        }
4914
4915        #[test]
4916        fn test_show_edge_types() {
4917            let db = GrafeoDB::new_in_memory();
4918            let session = db.session();
4919
4920            session
4921                .execute("CREATE EDGE TYPE KNOWS CONNECTING (Person) TO (Person) (since INTEGER)")
4922                .unwrap();
4923
4924            let result = session.execute("SHOW EDGE TYPES").unwrap();
4925            assert_eq!(
4926                result.columns,
4927                vec!["name", "properties", "source_types", "target_types"]
4928            );
4929            assert_eq!(result.rows.len(), 1);
4930            assert_eq!(result.rows[0][0], Value::from("KNOWS"));
4931        }
4932
4933        #[test]
4934        fn test_show_graph_types() {
4935            let db = GrafeoDB::new_in_memory();
4936            let session = db.session();
4937
4938            session
4939                .execute("CREATE NODE TYPE Person (name STRING)")
4940                .unwrap();
4941            session
4942                .execute(
4943                    "CREATE GRAPH TYPE social (\
4944                        NODE TYPE Person (name STRING)\
4945                    )",
4946                )
4947                .unwrap();
4948
4949            let result = session.execute("SHOW GRAPH TYPES").unwrap();
4950            assert_eq!(
4951                result.columns,
4952                vec!["name", "open", "node_types", "edge_types"]
4953            );
4954            assert_eq!(result.rows.len(), 1);
4955            assert_eq!(result.rows[0][0], Value::from("social"));
4956        }
4957
4958        #[test]
4959        fn test_show_graph_type_named() {
4960            let db = GrafeoDB::new_in_memory();
4961            let session = db.session();
4962
4963            session
4964                .execute("CREATE NODE TYPE Person (name STRING)")
4965                .unwrap();
4966            session
4967                .execute(
4968                    "CREATE GRAPH TYPE social (\
4969                        NODE TYPE Person (name STRING)\
4970                    )",
4971                )
4972                .unwrap();
4973
4974            let result = session.execute("SHOW GRAPH TYPE social").unwrap();
4975            assert_eq!(result.rows.len(), 1);
4976            assert_eq!(result.rows[0][0], Value::from("social"));
4977        }
4978
4979        #[test]
4980        fn test_show_graph_type_not_found() {
4981            let db = GrafeoDB::new_in_memory();
4982            let session = db.session();
4983
4984            let result = session.execute("SHOW GRAPH TYPE nonexistent");
4985            assert!(result.is_err());
4986        }
4987
4988        #[test]
4989        fn test_show_indexes_via_gql() {
4990            let db = GrafeoDB::new_in_memory();
4991            let session = db.session();
4992
4993            let result = session.execute("SHOW INDEXES").unwrap();
4994            assert_eq!(result.columns, vec!["name", "type", "label", "property"]);
4995        }
4996
4997        #[test]
4998        fn test_show_constraints_via_gql() {
4999            let db = GrafeoDB::new_in_memory();
5000            let session = db.session();
5001
5002            let result = session.execute("SHOW CONSTRAINTS").unwrap();
5003            assert_eq!(result.columns, vec!["name", "type", "label", "properties"]);
5004        }
5005
5006        #[test]
5007        fn test_pattern_form_graph_type_roundtrip() {
5008            let db = GrafeoDB::new_in_memory();
5009            let session = db.session();
5010
5011            // Register the types first
5012            session
5013                .execute("CREATE NODE TYPE Person (name STRING NOT NULL)")
5014                .unwrap();
5015            session
5016                .execute("CREATE NODE TYPE City (name STRING)")
5017                .unwrap();
5018            session
5019                .execute("CREATE EDGE TYPE KNOWS (since INTEGER)")
5020                .unwrap();
5021            session.execute("CREATE EDGE TYPE LIVES_IN").unwrap();
5022
5023            // Create graph type using pattern form
5024            session
5025                .execute(
5026                    "CREATE GRAPH TYPE social (\
5027                        (:Person {name STRING NOT NULL})-[:KNOWS {since INTEGER}]->(:Person),\
5028                        (:Person)-[:LIVES_IN]->(:City)\
5029                    )",
5030                )
5031                .unwrap();
5032
5033            // Verify it was created
5034            let result = session.execute("SHOW GRAPH TYPE social").unwrap();
5035            assert_eq!(result.rows.len(), 1);
5036            assert_eq!(result.rows[0][0], Value::from("social"));
5037        }
5038    }
5039}