Skip to main content

grafeo_engine/
session.rs

1//! Lightweight handles for database interaction.
2//!
3//! A session is your conversation with the database. Each session can have
4//! its own transaction state, so concurrent sessions don't interfere with
5//! each other. Sessions are cheap to create - spin up as many as you need.
6
7use std::sync::Arc;
8use std::sync::atomic::{AtomicUsize, Ordering};
9use std::time::{Duration, Instant};
10
11use grafeo_common::types::{EdgeId, EpochId, NodeId, TransactionId, Value};
12use grafeo_common::utils::error::Result;
13use grafeo_core::graph::Direction;
14use grafeo_core::graph::GraphStoreMut;
15use grafeo_core::graph::lpg::{Edge, LpgStore, Node};
16#[cfg(feature = "rdf")]
17use grafeo_core::graph::rdf::RdfStore;
18
19use crate::catalog::{Catalog, CatalogConstraintValidator};
20use crate::config::{AdaptiveConfig, GraphModel};
21use crate::database::QueryResult;
22use crate::query::cache::QueryCache;
23use crate::transaction::TransactionManager;
24
25/// Parses a DDL default-value literal string into a [`Value`].
26///
27/// Handles string literals (single- or double-quoted), integers, floats,
28/// booleans (`true`/`false`), and `NULL`.
29fn parse_default_literal(text: &str) -> Value {
30    if text.eq_ignore_ascii_case("null") {
31        return Value::Null;
32    }
33    if text.eq_ignore_ascii_case("true") {
34        return Value::Bool(true);
35    }
36    if text.eq_ignore_ascii_case("false") {
37        return Value::Bool(false);
38    }
39    // String literal: strip surrounding quotes
40    if (text.starts_with('\'') && text.ends_with('\''))
41        || (text.starts_with('"') && text.ends_with('"'))
42    {
43        return Value::String(text[1..text.len() - 1].into());
44    }
45    // Try integer, then float
46    if let Ok(i) = text.parse::<i64>() {
47        return Value::Int64(i);
48    }
49    if let Ok(f) = text.parse::<f64>() {
50        return Value::Float64(f);
51    }
52    // Fallback: treat as string
53    Value::String(text.into())
54}
55
56/// Runtime configuration for creating a new session.
57///
58/// Groups the shared parameters passed to all session constructors, keeping
59/// call sites readable and avoiding long argument lists.
60pub(crate) struct SessionConfig {
61    pub transaction_manager: Arc<TransactionManager>,
62    pub query_cache: Arc<QueryCache>,
63    pub catalog: Arc<Catalog>,
64    pub adaptive_config: AdaptiveConfig,
65    pub factorized_execution: bool,
66    pub graph_model: GraphModel,
67    pub query_timeout: Option<Duration>,
68    pub commit_counter: Arc<AtomicUsize>,
69    pub gc_interval: usize,
70}
71
72/// Your handle to the database - execute queries and manage transactions.
73///
74/// Get one from [`GrafeoDB::session()`](crate::GrafeoDB::session). Each session
75/// tracks its own transaction state, so you can have multiple concurrent
76/// sessions without them interfering.
77pub struct Session {
78    /// The underlying store.
79    store: Arc<LpgStore>,
80    /// Graph store trait object for pluggable storage backends.
81    graph_store: Arc<dyn GraphStoreMut>,
82    /// Schema and metadata catalog shared across sessions.
83    catalog: Arc<Catalog>,
84    /// RDF triple store (if RDF feature is enabled).
85    #[cfg(feature = "rdf")]
86    rdf_store: Arc<RdfStore>,
87    /// Transaction manager.
88    transaction_manager: Arc<TransactionManager>,
89    /// Query cache shared across sessions.
90    query_cache: Arc<QueryCache>,
91    /// Current transaction ID (if any). Behind a Mutex so that GQL commands
92    /// (`START TRANSACTION`, `COMMIT`, `ROLLBACK`) can manage transactions
93    /// from within `execute(&self)`.
94    current_transaction: parking_lot::Mutex<Option<TransactionId>>,
95    /// Whether the current transaction is read-only (blocks mutations).
96    read_only_tx: parking_lot::Mutex<bool>,
97    /// Whether the session is in auto-commit mode.
98    auto_commit: bool,
99    /// Adaptive execution configuration.
100    #[allow(dead_code)] // Stored for future adaptive re-optimization during execution
101    adaptive_config: AdaptiveConfig,
102    /// Whether to use factorized execution for multi-hop queries.
103    factorized_execution: bool,
104    /// The graph data model this session operates on.
105    graph_model: GraphModel,
106    /// Maximum time a query may run before being cancelled.
107    query_timeout: Option<Duration>,
108    /// Shared commit counter for triggering auto-GC.
109    commit_counter: Arc<AtomicUsize>,
110    /// GC every N commits (0 = disabled).
111    gc_interval: usize,
112    /// Node count at the start of the current transaction (for PreparedCommit stats).
113    transaction_start_node_count: AtomicUsize,
114    /// Edge count at the start of the current transaction (for PreparedCommit stats).
115    transaction_start_edge_count: AtomicUsize,
116    /// WAL for logging schema changes.
117    #[cfg(feature = "wal")]
118    wal: Option<Arc<grafeo_adapters::storage::wal::LpgWal>>,
119    /// Shared WAL graph context tracker for named graph awareness.
120    #[cfg(feature = "wal")]
121    wal_graph_context: Option<Arc<parking_lot::Mutex<Option<String>>>>,
122    /// CDC log for change tracking.
123    #[cfg(feature = "cdc")]
124    cdc_log: Arc<crate::cdc::CdcLog>,
125    /// Current graph name (for multi-graph USE GRAPH support). None = default graph.
126    current_graph: parking_lot::Mutex<Option<String>>,
127    /// Session time zone override.
128    time_zone: parking_lot::Mutex<Option<String>>,
129    /// Session-level parameters (SET PARAMETER).
130    session_params:
131        parking_lot::Mutex<std::collections::HashMap<String, grafeo_common::types::Value>>,
132    /// Override epoch for time-travel queries (None = use transaction/current epoch).
133    viewing_epoch_override: parking_lot::Mutex<Option<EpochId>>,
134    /// Savepoints within the current transaction.
135    savepoints: parking_lot::Mutex<Vec<SavepointState>>,
136    /// Nesting depth for nested transactions (0 = outermost).
137    /// Nested `START TRANSACTION` creates an auto-savepoint; nested `COMMIT`
138    /// releases it, nested `ROLLBACK` rolls back to it.
139    transaction_nesting_depth: parking_lot::Mutex<u32>,
140    /// Named graphs touched during the current transaction (for cross-graph atomicity).
141    /// `None` represents the default graph. Populated at `BEGIN` time and on each
142    /// `USE GRAPH` / `SESSION SET GRAPH` switch within a transaction.
143    touched_graphs: parking_lot::Mutex<Vec<Option<String>>>,
144}
145
146/// Per-graph savepoint snapshot, capturing the store state at the time of the savepoint.
147#[derive(Clone)]
148struct GraphSavepoint {
149    graph_name: Option<String>,
150    next_node_id: u64,
151    next_edge_id: u64,
152    undo_log_position: usize,
153}
154
155/// Savepoint state: name + per-graph snapshots + the graph that was active.
156#[derive(Clone)]
157struct SavepointState {
158    name: String,
159    graph_snapshots: Vec<GraphSavepoint>,
160    /// The graph that was active when the savepoint was created.
161    /// Reserved for future use (e.g., restoring graph context on rollback).
162    #[allow(dead_code)]
163    active_graph: Option<String>,
164}
165
166impl Session {
167    /// Creates a new session with adaptive execution configuration.
168    #[allow(dead_code)]
169    pub(crate) fn with_adaptive(store: Arc<LpgStore>, cfg: SessionConfig) -> Self {
170        let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
171        Self {
172            store,
173            graph_store,
174            catalog: cfg.catalog,
175            #[cfg(feature = "rdf")]
176            rdf_store: Arc::new(RdfStore::new()),
177            transaction_manager: cfg.transaction_manager,
178            query_cache: cfg.query_cache,
179            current_transaction: parking_lot::Mutex::new(None),
180            read_only_tx: parking_lot::Mutex::new(false),
181            auto_commit: true,
182            adaptive_config: cfg.adaptive_config,
183            factorized_execution: cfg.factorized_execution,
184            graph_model: cfg.graph_model,
185            query_timeout: cfg.query_timeout,
186            commit_counter: cfg.commit_counter,
187            gc_interval: cfg.gc_interval,
188            transaction_start_node_count: AtomicUsize::new(0),
189            transaction_start_edge_count: AtomicUsize::new(0),
190            #[cfg(feature = "wal")]
191            wal: None,
192            #[cfg(feature = "wal")]
193            wal_graph_context: None,
194            #[cfg(feature = "cdc")]
195            cdc_log: Arc::new(crate::cdc::CdcLog::new()),
196            current_graph: parking_lot::Mutex::new(None),
197            time_zone: parking_lot::Mutex::new(None),
198            session_params: parking_lot::Mutex::new(std::collections::HashMap::new()),
199            viewing_epoch_override: parking_lot::Mutex::new(None),
200            savepoints: parking_lot::Mutex::new(Vec::new()),
201            transaction_nesting_depth: parking_lot::Mutex::new(0),
202            touched_graphs: parking_lot::Mutex::new(Vec::new()),
203        }
204    }
205
206    /// Sets the WAL for this session (shared with the database).
207    ///
208    /// This also wraps `graph_store` in a [`WalGraphStore`] so that mutation
209    /// operators (INSERT, DELETE, SET via queries) log to the WAL.
210    #[cfg(feature = "wal")]
211    pub(crate) fn set_wal(
212        &mut self,
213        wal: Arc<grafeo_adapters::storage::wal::LpgWal>,
214        wal_graph_context: Arc<parking_lot::Mutex<Option<String>>>,
215    ) {
216        // Wrap the graph store so query-engine mutations are WAL-logged
217        self.graph_store = Arc::new(crate::database::wal_store::WalGraphStore::new(
218            Arc::clone(&self.store),
219            Arc::clone(&wal),
220            Arc::clone(&wal_graph_context),
221        ));
222        self.wal = Some(wal);
223        self.wal_graph_context = Some(wal_graph_context);
224    }
225
226    /// Sets the CDC log for this session (shared with the database).
227    #[cfg(feature = "cdc")]
228    pub(crate) fn set_cdc_log(&mut self, cdc_log: Arc<crate::cdc::CdcLog>) {
229        self.cdc_log = cdc_log;
230    }
231
232    /// Creates a new session with RDF store and adaptive configuration.
233    #[cfg(feature = "rdf")]
234    pub(crate) fn with_rdf_store_and_adaptive(
235        store: Arc<LpgStore>,
236        rdf_store: Arc<RdfStore>,
237        cfg: SessionConfig,
238    ) -> Self {
239        let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
240        Self {
241            store,
242            graph_store,
243            catalog: cfg.catalog,
244            rdf_store,
245            transaction_manager: cfg.transaction_manager,
246            query_cache: cfg.query_cache,
247            current_transaction: parking_lot::Mutex::new(None),
248            read_only_tx: parking_lot::Mutex::new(false),
249            auto_commit: true,
250            adaptive_config: cfg.adaptive_config,
251            factorized_execution: cfg.factorized_execution,
252            graph_model: cfg.graph_model,
253            query_timeout: cfg.query_timeout,
254            commit_counter: cfg.commit_counter,
255            gc_interval: cfg.gc_interval,
256            transaction_start_node_count: AtomicUsize::new(0),
257            transaction_start_edge_count: AtomicUsize::new(0),
258            #[cfg(feature = "wal")]
259            wal: None,
260            #[cfg(feature = "wal")]
261            wal_graph_context: None,
262            #[cfg(feature = "cdc")]
263            cdc_log: Arc::new(crate::cdc::CdcLog::new()),
264            current_graph: parking_lot::Mutex::new(None),
265            time_zone: parking_lot::Mutex::new(None),
266            session_params: parking_lot::Mutex::new(std::collections::HashMap::new()),
267            viewing_epoch_override: parking_lot::Mutex::new(None),
268            savepoints: parking_lot::Mutex::new(Vec::new()),
269            transaction_nesting_depth: parking_lot::Mutex::new(0),
270            touched_graphs: parking_lot::Mutex::new(Vec::new()),
271        }
272    }
273
274    /// Creates a session backed by an external graph store.
275    ///
276    /// The external store handles all data operations. Transaction management
277    /// (begin/commit/rollback) is not supported for external stores.
278    ///
279    /// # Errors
280    ///
281    /// Returns an error if the internal arena allocation fails (out of memory).
282    pub(crate) fn with_external_store(
283        store: Arc<dyn GraphStoreMut>,
284        cfg: SessionConfig,
285    ) -> Result<Self> {
286        Ok(Self {
287            store: Arc::new(LpgStore::new()?),
288            graph_store: store,
289            catalog: cfg.catalog,
290            #[cfg(feature = "rdf")]
291            rdf_store: Arc::new(RdfStore::new()),
292            transaction_manager: cfg.transaction_manager,
293            query_cache: cfg.query_cache,
294            current_transaction: parking_lot::Mutex::new(None),
295            read_only_tx: parking_lot::Mutex::new(false),
296            auto_commit: true,
297            adaptive_config: cfg.adaptive_config,
298            factorized_execution: cfg.factorized_execution,
299            graph_model: cfg.graph_model,
300            query_timeout: cfg.query_timeout,
301            commit_counter: cfg.commit_counter,
302            gc_interval: cfg.gc_interval,
303            transaction_start_node_count: AtomicUsize::new(0),
304            transaction_start_edge_count: AtomicUsize::new(0),
305            #[cfg(feature = "wal")]
306            wal: None,
307            #[cfg(feature = "wal")]
308            wal_graph_context: None,
309            #[cfg(feature = "cdc")]
310            cdc_log: Arc::new(crate::cdc::CdcLog::new()),
311            current_graph: parking_lot::Mutex::new(None),
312            time_zone: parking_lot::Mutex::new(None),
313            session_params: parking_lot::Mutex::new(std::collections::HashMap::new()),
314            viewing_epoch_override: parking_lot::Mutex::new(None),
315            savepoints: parking_lot::Mutex::new(Vec::new()),
316            transaction_nesting_depth: parking_lot::Mutex::new(0),
317            touched_graphs: parking_lot::Mutex::new(Vec::new()),
318        })
319    }
320
321    /// Returns the graph model this session operates on.
322    #[must_use]
323    pub fn graph_model(&self) -> GraphModel {
324        self.graph_model
325    }
326
327    // === Session State Management ===
328
329    /// Sets the current graph for this session (USE GRAPH).
330    pub fn use_graph(&self, name: &str) {
331        *self.current_graph.lock() = Some(name.to_string());
332    }
333
334    /// Returns the current graph name, if any.
335    #[must_use]
336    pub fn current_graph(&self) -> Option<String> {
337        self.current_graph.lock().clone()
338    }
339
340    /// Returns the graph store for the currently active graph.
341    ///
342    /// If `current_graph` is `None` or `"default"`, returns the session's
343    /// default `graph_store` (already WAL-wrapped for the default graph).
344    /// Otherwise looks up the named graph in the root store and wraps it
345    /// in a [`WalGraphStore`] so mutations are WAL-logged with the correct
346    /// graph context.
347    fn active_store(&self) -> Arc<dyn GraphStoreMut> {
348        let graph_name = self.current_graph.lock().clone();
349        match graph_name {
350            None => Arc::clone(&self.graph_store),
351            Some(ref name) if name.eq_ignore_ascii_case("default") => Arc::clone(&self.graph_store),
352            Some(ref name) => match self.store.graph(name) {
353                Some(named_store) => {
354                    #[cfg(feature = "wal")]
355                    if let (Some(wal), Some(ctx)) = (&self.wal, &self.wal_graph_context) {
356                        return Arc::new(crate::database::wal_store::WalGraphStore::new_for_graph(
357                            named_store,
358                            Arc::clone(wal),
359                            name.clone(),
360                            Arc::clone(ctx),
361                        )) as Arc<dyn GraphStoreMut>;
362                    }
363                    named_store as Arc<dyn GraphStoreMut>
364                }
365                None => Arc::clone(&self.graph_store),
366            },
367        }
368    }
369
370    /// Returns the concrete `LpgStore` for the currently active graph.
371    ///
372    /// Used by direct CRUD methods that need the concrete store type
373    /// for versioned operations.
374    fn active_lpg_store(&self) -> Arc<LpgStore> {
375        let graph_name = self.current_graph.lock().clone();
376        match graph_name {
377            None => Arc::clone(&self.store),
378            Some(ref name) if name.eq_ignore_ascii_case("default") => Arc::clone(&self.store),
379            Some(ref name) => self
380                .store
381                .graph(name)
382                .unwrap_or_else(|| Arc::clone(&self.store)),
383        }
384    }
385
386    /// Resolves a graph name to a concrete `LpgStore`.
387    /// `None` and `"default"` resolve to the session's root store.
388    fn resolve_store(&self, graph_name: &Option<String>) -> Arc<LpgStore> {
389        match graph_name {
390            None => Arc::clone(&self.store),
391            Some(name) if name.eq_ignore_ascii_case("default") => Arc::clone(&self.store),
392            Some(name) => self
393                .store
394                .graph(name)
395                .unwrap_or_else(|| Arc::clone(&self.store)),
396        }
397    }
398
399    /// Records the current graph as "touched" if a transaction is active.
400    fn track_graph_touch(&self) {
401        if self.current_transaction.lock().is_some() {
402            let graph = self.current_graph.lock().clone();
403            // Normalize: treat Some("default") as None
404            let normalized = match graph {
405                Some(ref name) if name.eq_ignore_ascii_case("default") => None,
406                other => other,
407            };
408            let mut touched = self.touched_graphs.lock();
409            if !touched.contains(&normalized) {
410                touched.push(normalized);
411            }
412        }
413    }
414
415    /// Sets the session time zone.
416    pub fn set_time_zone(&self, tz: &str) {
417        *self.time_zone.lock() = Some(tz.to_string());
418    }
419
420    /// Returns the session time zone, if set.
421    #[must_use]
422    pub fn time_zone(&self) -> Option<String> {
423        self.time_zone.lock().clone()
424    }
425
426    /// Sets a session parameter.
427    pub fn set_parameter(&self, key: &str, value: grafeo_common::types::Value) {
428        self.session_params.lock().insert(key.to_string(), value);
429    }
430
431    /// Gets a session parameter by cloning it.
432    #[must_use]
433    pub fn get_parameter(&self, key: &str) -> Option<grafeo_common::types::Value> {
434        self.session_params.lock().get(key).cloned()
435    }
436
437    /// Resets all session state to defaults.
438    pub fn reset_session(&self) {
439        *self.current_graph.lock() = None;
440        *self.time_zone.lock() = None;
441        self.session_params.lock().clear();
442        *self.viewing_epoch_override.lock() = None;
443    }
444
445    // --- Time-travel API ---
446
447    /// Sets a viewing epoch override for time-travel queries.
448    ///
449    /// While set, all queries on this session see the database as it existed
450    /// at the given epoch. Use [`clear_viewing_epoch`](Self::clear_viewing_epoch)
451    /// to return to normal behavior.
452    pub fn set_viewing_epoch(&self, epoch: EpochId) {
453        *self.viewing_epoch_override.lock() = Some(epoch);
454    }
455
456    /// Clears the viewing epoch override, returning to normal behavior.
457    pub fn clear_viewing_epoch(&self) {
458        *self.viewing_epoch_override.lock() = None;
459    }
460
461    /// Returns the current viewing epoch override, if any.
462    #[must_use]
463    pub fn viewing_epoch(&self) -> Option<EpochId> {
464        *self.viewing_epoch_override.lock()
465    }
466
467    /// Returns all versions of a node with their creation/deletion epochs.
468    ///
469    /// Properties and labels reflect the current state (not versioned per-epoch).
470    #[must_use]
471    pub fn get_node_history(&self, id: NodeId) -> Vec<(EpochId, Option<EpochId>, Node)> {
472        self.active_lpg_store().get_node_history(id)
473    }
474
475    /// Returns all versions of an edge with their creation/deletion epochs.
476    ///
477    /// Properties reflect the current state (not versioned per-epoch).
478    #[must_use]
479    pub fn get_edge_history(&self, id: EdgeId) -> Vec<(EpochId, Option<EpochId>, Edge)> {
480        self.active_lpg_store().get_edge_history(id)
481    }
482
483    /// Checks that the session's graph model supports LPG operations.
484    fn require_lpg(&self, language: &str) -> Result<()> {
485        if self.graph_model == GraphModel::Rdf {
486            return Err(grafeo_common::utils::error::Error::Internal(format!(
487                "This is an RDF database. {language} queries require an LPG database."
488            )));
489        }
490        Ok(())
491    }
492
493    /// Executes a session or transaction command, returning an empty result.
494    #[cfg(feature = "gql")]
495    fn execute_session_command(
496        &self,
497        cmd: grafeo_adapters::query::gql::ast::SessionCommand,
498    ) -> Result<QueryResult> {
499        use grafeo_adapters::query::gql::ast::{SessionCommand, TransactionIsolationLevel};
500        use grafeo_common::utils::error::{Error, QueryError, QueryErrorKind};
501
502        match cmd {
503            SessionCommand::CreateGraph {
504                name,
505                if_not_exists,
506                typed,
507                like_graph,
508                copy_of,
509                open: _,
510            } => {
511                // Validate source graph exists for LIKE / AS COPY OF
512                if let Some(ref src) = like_graph
513                    && self.store.graph(src).is_none()
514                {
515                    return Err(Error::Query(QueryError::new(
516                        QueryErrorKind::Semantic,
517                        format!("Source graph '{src}' does not exist"),
518                    )));
519                }
520                if let Some(ref src) = copy_of
521                    && self.store.graph(src).is_none()
522                {
523                    return Err(Error::Query(QueryError::new(
524                        QueryErrorKind::Semantic,
525                        format!("Source graph '{src}' does not exist"),
526                    )));
527                }
528
529                let created = self
530                    .store
531                    .create_graph(&name)
532                    .map_err(|e| Error::Internal(e.to_string()))?;
533                if !created && !if_not_exists {
534                    return Err(Error::Query(QueryError::new(
535                        QueryErrorKind::Semantic,
536                        format!("Graph '{name}' already exists"),
537                    )));
538                }
539                if created {
540                    #[cfg(feature = "wal")]
541                    self.log_schema_wal(
542                        &grafeo_adapters::storage::wal::WalRecord::CreateNamedGraph {
543                            name: name.clone(),
544                        },
545                    );
546                }
547
548                // AS COPY OF: copy data from source graph
549                if let Some(ref src) = copy_of {
550                    self.store
551                        .copy_graph(Some(src), Some(&name))
552                        .map_err(|e| Error::Internal(e.to_string()))?;
553                }
554
555                // Bind to graph type if specified
556                if let Some(type_name) = typed
557                    && let Err(e) = self.catalog.bind_graph_type(&name, type_name.clone())
558                {
559                    return Err(Error::Query(QueryError::new(
560                        QueryErrorKind::Semantic,
561                        e.to_string(),
562                    )));
563                }
564
565                // LIKE: copy graph type binding from source
566                if let Some(ref src) = like_graph
567                    && let Some(src_type) = self.catalog.get_graph_type_binding(src)
568                {
569                    let _ = self.catalog.bind_graph_type(&name, src_type);
570                }
571
572                Ok(QueryResult::empty())
573            }
574            SessionCommand::DropGraph { name, if_exists } => {
575                let dropped = self.store.drop_graph(&name);
576                if !dropped && !if_exists {
577                    return Err(Error::Query(QueryError::new(
578                        QueryErrorKind::Semantic,
579                        format!("Graph '{name}' does not exist"),
580                    )));
581                }
582                if dropped {
583                    #[cfg(feature = "wal")]
584                    self.log_schema_wal(
585                        &grafeo_adapters::storage::wal::WalRecord::DropNamedGraph {
586                            name: name.clone(),
587                        },
588                    );
589                    // If this session was using the dropped graph, reset to default
590                    let mut current = self.current_graph.lock();
591                    if current
592                        .as_deref()
593                        .is_some_and(|g| g.eq_ignore_ascii_case(&name))
594                    {
595                        *current = None;
596                    }
597                }
598                Ok(QueryResult::empty())
599            }
600            SessionCommand::UseGraph(name) => {
601                // Verify graph exists (default graph is always valid)
602                if !name.eq_ignore_ascii_case("default") && self.store.graph(&name).is_none() {
603                    return Err(Error::Query(QueryError::new(
604                        QueryErrorKind::Semantic,
605                        format!("Graph '{name}' does not exist"),
606                    )));
607                }
608                self.use_graph(&name);
609                // Track the new graph if in a transaction
610                self.track_graph_touch();
611                Ok(QueryResult::empty())
612            }
613            SessionCommand::SessionSetGraph(name) => {
614                // Validate graph exists (same check as USE GRAPH)
615                if !name.eq_ignore_ascii_case("default") && self.store.graph(&name).is_none() {
616                    return Err(Error::Query(QueryError::new(
617                        QueryErrorKind::Semantic,
618                        format!("Graph '{name}' does not exist"),
619                    )));
620                }
621                self.use_graph(&name);
622                // Track the new graph if in a transaction
623                self.track_graph_touch();
624                Ok(QueryResult::empty())
625            }
626            SessionCommand::SessionSetTimeZone(tz) => {
627                self.set_time_zone(&tz);
628                Ok(QueryResult::empty())
629            }
630            SessionCommand::SessionSetParameter(key, expr) => {
631                if key.eq_ignore_ascii_case("viewing_epoch") {
632                    match Self::eval_integer_literal(&expr) {
633                        Some(n) if n >= 0 => {
634                            self.set_viewing_epoch(EpochId::new(n as u64));
635                            Ok(QueryResult::status(format!("Set viewing_epoch to {n}")))
636                        }
637                        _ => Err(Error::Query(QueryError::new(
638                            QueryErrorKind::Semantic,
639                            "viewing_epoch must be a non-negative integer literal",
640                        ))),
641                    }
642                } else {
643                    // For now, store parameter name with Null value.
644                    // Full expression evaluation would require building and executing a plan.
645                    self.set_parameter(&key, Value::Null);
646                    Ok(QueryResult::empty())
647                }
648            }
649            SessionCommand::SessionReset => {
650                self.reset_session();
651                Ok(QueryResult::empty())
652            }
653            SessionCommand::SessionClose => {
654                self.reset_session();
655                Ok(QueryResult::empty())
656            }
657            SessionCommand::StartTransaction {
658                read_only,
659                isolation_level,
660            } => {
661                let engine_level = isolation_level.map(|l| match l {
662                    TransactionIsolationLevel::ReadCommitted => {
663                        crate::transaction::IsolationLevel::ReadCommitted
664                    }
665                    TransactionIsolationLevel::SnapshotIsolation => {
666                        crate::transaction::IsolationLevel::SnapshotIsolation
667                    }
668                    TransactionIsolationLevel::Serializable => {
669                        crate::transaction::IsolationLevel::Serializable
670                    }
671                });
672                self.begin_transaction_inner(read_only, engine_level)?;
673                Ok(QueryResult::status("Transaction started"))
674            }
675            SessionCommand::Commit => {
676                self.commit_inner()?;
677                Ok(QueryResult::status("Transaction committed"))
678            }
679            SessionCommand::Rollback => {
680                self.rollback_inner()?;
681                Ok(QueryResult::status("Transaction rolled back"))
682            }
683            SessionCommand::Savepoint(name) => {
684                self.savepoint(&name)?;
685                Ok(QueryResult::status(format!("Savepoint '{name}' created")))
686            }
687            SessionCommand::RollbackToSavepoint(name) => {
688                self.rollback_to_savepoint(&name)?;
689                Ok(QueryResult::status(format!(
690                    "Rolled back to savepoint '{name}'"
691                )))
692            }
693            SessionCommand::ReleaseSavepoint(name) => {
694                self.release_savepoint(&name)?;
695                Ok(QueryResult::status(format!("Savepoint '{name}' released")))
696            }
697        }
698    }
699
700    /// Logs a WAL record for a schema change (no-op if WAL is not enabled).
701    #[cfg(feature = "wal")]
702    fn log_schema_wal(&self, record: &grafeo_adapters::storage::wal::WalRecord) {
703        if let Some(ref wal) = self.wal
704            && let Err(e) = wal.log(record)
705        {
706            tracing::warn!("Failed to log schema change to WAL: {}", e);
707        }
708    }
709
710    /// Executes a schema DDL command, returning a status result.
711    #[cfg(feature = "gql")]
712    fn execute_schema_command(
713        &self,
714        cmd: grafeo_adapters::query::gql::ast::SchemaStatement,
715    ) -> Result<QueryResult> {
716        use crate::catalog::{
717            EdgeTypeDefinition, NodeTypeDefinition, PropertyDataType, TypedProperty,
718        };
719        use grafeo_adapters::query::gql::ast::SchemaStatement;
720        #[cfg(feature = "wal")]
721        use grafeo_adapters::storage::wal::WalRecord;
722        use grafeo_common::utils::error::{Error, QueryError, QueryErrorKind};
723
724        /// Logs a WAL record for schema changes. Compiles to nothing without `wal`.
725        macro_rules! wal_log {
726            ($self:expr, $record:expr) => {
727                #[cfg(feature = "wal")]
728                $self.log_schema_wal(&$record);
729            };
730        }
731
732        let result = match cmd {
733            SchemaStatement::CreateNodeType(stmt) => {
734                #[cfg(feature = "wal")]
735                let props_for_wal: Vec<(String, String, bool)> = stmt
736                    .properties
737                    .iter()
738                    .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
739                    .collect();
740                let def = NodeTypeDefinition {
741                    name: stmt.name.clone(),
742                    properties: stmt
743                        .properties
744                        .iter()
745                        .map(|p| TypedProperty {
746                            name: p.name.clone(),
747                            data_type: PropertyDataType::from_type_name(&p.data_type),
748                            nullable: p.nullable,
749                            default_value: p
750                                .default_value
751                                .as_ref()
752                                .map(|s| parse_default_literal(s)),
753                        })
754                        .collect(),
755                    constraints: Vec::new(),
756                    parent_types: stmt.parent_types.clone(),
757                };
758                let result = if stmt.or_replace {
759                    let _ = self.catalog.drop_node_type(&stmt.name);
760                    self.catalog.register_node_type(def)
761                } else {
762                    self.catalog.register_node_type(def)
763                };
764                match result {
765                    Ok(()) => {
766                        wal_log!(
767                            self,
768                            WalRecord::CreateNodeType {
769                                name: stmt.name.clone(),
770                                properties: props_for_wal,
771                                constraints: Vec::new(),
772                            }
773                        );
774                        Ok(QueryResult::status(format!(
775                            "Created node type '{}'",
776                            stmt.name
777                        )))
778                    }
779                    Err(e) if stmt.if_not_exists => {
780                        let _ = e;
781                        Ok(QueryResult::status("No change"))
782                    }
783                    Err(e) => Err(Error::Query(QueryError::new(
784                        QueryErrorKind::Semantic,
785                        e.to_string(),
786                    ))),
787                }
788            }
789            SchemaStatement::CreateEdgeType(stmt) => {
790                #[cfg(feature = "wal")]
791                let props_for_wal: Vec<(String, String, bool)> = stmt
792                    .properties
793                    .iter()
794                    .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
795                    .collect();
796                let def = EdgeTypeDefinition {
797                    name: stmt.name.clone(),
798                    properties: stmt
799                        .properties
800                        .iter()
801                        .map(|p| TypedProperty {
802                            name: p.name.clone(),
803                            data_type: PropertyDataType::from_type_name(&p.data_type),
804                            nullable: p.nullable,
805                            default_value: p
806                                .default_value
807                                .as_ref()
808                                .map(|s| parse_default_literal(s)),
809                        })
810                        .collect(),
811                    constraints: Vec::new(),
812                    source_node_types: stmt.source_node_types.clone(),
813                    target_node_types: stmt.target_node_types.clone(),
814                };
815                let result = if stmt.or_replace {
816                    let _ = self.catalog.drop_edge_type_def(&stmt.name);
817                    self.catalog.register_edge_type_def(def)
818                } else {
819                    self.catalog.register_edge_type_def(def)
820                };
821                match result {
822                    Ok(()) => {
823                        wal_log!(
824                            self,
825                            WalRecord::CreateEdgeType {
826                                name: stmt.name.clone(),
827                                properties: props_for_wal,
828                                constraints: Vec::new(),
829                            }
830                        );
831                        Ok(QueryResult::status(format!(
832                            "Created edge type '{}'",
833                            stmt.name
834                        )))
835                    }
836                    Err(e) if stmt.if_not_exists => {
837                        let _ = e;
838                        Ok(QueryResult::status("No change"))
839                    }
840                    Err(e) => Err(Error::Query(QueryError::new(
841                        QueryErrorKind::Semantic,
842                        e.to_string(),
843                    ))),
844                }
845            }
846            SchemaStatement::CreateVectorIndex(stmt) => {
847                Self::create_vector_index_on_store(
848                    &self.active_lpg_store(),
849                    &stmt.node_label,
850                    &stmt.property,
851                    stmt.dimensions,
852                    stmt.metric.as_deref(),
853                )?;
854                wal_log!(
855                    self,
856                    WalRecord::CreateIndex {
857                        name: stmt.name.clone(),
858                        label: stmt.node_label.clone(),
859                        property: stmt.property.clone(),
860                        index_type: "vector".to_string(),
861                    }
862                );
863                Ok(QueryResult::status(format!(
864                    "Created vector index '{}'",
865                    stmt.name
866                )))
867            }
868            SchemaStatement::DropNodeType { name, if_exists } => {
869                match self.catalog.drop_node_type(&name) {
870                    Ok(()) => {
871                        wal_log!(self, WalRecord::DropNodeType { name: name.clone() });
872                        Ok(QueryResult::status(format!("Dropped node type '{name}'")))
873                    }
874                    Err(e) if if_exists => {
875                        let _ = e;
876                        Ok(QueryResult::status("No change"))
877                    }
878                    Err(e) => Err(Error::Query(QueryError::new(
879                        QueryErrorKind::Semantic,
880                        e.to_string(),
881                    ))),
882                }
883            }
884            SchemaStatement::DropEdgeType { name, if_exists } => {
885                match self.catalog.drop_edge_type_def(&name) {
886                    Ok(()) => {
887                        wal_log!(self, WalRecord::DropEdgeType { name: name.clone() });
888                        Ok(QueryResult::status(format!("Dropped edge type '{name}'")))
889                    }
890                    Err(e) if if_exists => {
891                        let _ = e;
892                        Ok(QueryResult::status("No change"))
893                    }
894                    Err(e) => Err(Error::Query(QueryError::new(
895                        QueryErrorKind::Semantic,
896                        e.to_string(),
897                    ))),
898                }
899            }
900            SchemaStatement::CreateIndex(stmt) => {
901                use grafeo_adapters::query::gql::ast::IndexKind;
902                let active = self.active_lpg_store();
903                let index_type_str = match stmt.index_kind {
904                    IndexKind::Property => "property",
905                    IndexKind::BTree => "btree",
906                    IndexKind::Text => "text",
907                    IndexKind::Vector => "vector",
908                };
909                match stmt.index_kind {
910                    IndexKind::Property | IndexKind::BTree => {
911                        for prop in &stmt.properties {
912                            active.create_property_index(prop);
913                        }
914                    }
915                    IndexKind::Text => {
916                        for prop in &stmt.properties {
917                            Self::create_text_index_on_store(&active, &stmt.label, prop)?;
918                        }
919                    }
920                    IndexKind::Vector => {
921                        for prop in &stmt.properties {
922                            Self::create_vector_index_on_store(
923                                &active,
924                                &stmt.label,
925                                prop,
926                                stmt.options.dimensions,
927                                stmt.options.metric.as_deref(),
928                            )?;
929                        }
930                    }
931                }
932                #[cfg(feature = "wal")]
933                for prop in &stmt.properties {
934                    wal_log!(
935                        self,
936                        WalRecord::CreateIndex {
937                            name: stmt.name.clone(),
938                            label: stmt.label.clone(),
939                            property: prop.clone(),
940                            index_type: index_type_str.to_string(),
941                        }
942                    );
943                }
944                Ok(QueryResult::status(format!(
945                    "Created {} index '{}'",
946                    index_type_str, stmt.name
947                )))
948            }
949            SchemaStatement::DropIndex { name, if_exists } => {
950                // Try to drop property index by name
951                let dropped = self.active_lpg_store().drop_property_index(&name);
952                if dropped || if_exists {
953                    if dropped {
954                        wal_log!(self, WalRecord::DropIndex { name: name.clone() });
955                    }
956                    Ok(QueryResult::status(if dropped {
957                        format!("Dropped index '{name}'")
958                    } else {
959                        "No change".to_string()
960                    }))
961                } else {
962                    Err(Error::Query(QueryError::new(
963                        QueryErrorKind::Semantic,
964                        format!("Index '{name}' does not exist"),
965                    )))
966                }
967            }
968            SchemaStatement::CreateConstraint(stmt) => {
969                use crate::catalog::TypeConstraint;
970                use grafeo_adapters::query::gql::ast::ConstraintKind;
971                let kind_str = match stmt.constraint_kind {
972                    ConstraintKind::Unique => "unique",
973                    ConstraintKind::NodeKey => "node_key",
974                    ConstraintKind::NotNull => "not_null",
975                    ConstraintKind::Exists => "exists",
976                };
977                let constraint_name = stmt
978                    .name
979                    .clone()
980                    .unwrap_or_else(|| format!("{}_{kind_str}", stmt.label));
981
982                // Register constraint in catalog type definitions
983                match stmt.constraint_kind {
984                    ConstraintKind::Unique => {
985                        for prop in &stmt.properties {
986                            let label_id = self.catalog.get_or_create_label(&stmt.label);
987                            let prop_id = self.catalog.get_or_create_property_key(prop);
988                            let _ = self.catalog.add_unique_constraint(label_id, prop_id);
989                        }
990                        let _ = self.catalog.add_constraint_to_type(
991                            &stmt.label,
992                            TypeConstraint::Unique(stmt.properties.clone()),
993                        );
994                    }
995                    ConstraintKind::NodeKey => {
996                        for prop in &stmt.properties {
997                            let label_id = self.catalog.get_or_create_label(&stmt.label);
998                            let prop_id = self.catalog.get_or_create_property_key(prop);
999                            let _ = self.catalog.add_unique_constraint(label_id, prop_id);
1000                            let _ = self.catalog.add_required_property(label_id, prop_id);
1001                        }
1002                        let _ = self.catalog.add_constraint_to_type(
1003                            &stmt.label,
1004                            TypeConstraint::PrimaryKey(stmt.properties.clone()),
1005                        );
1006                    }
1007                    ConstraintKind::NotNull | ConstraintKind::Exists => {
1008                        for prop in &stmt.properties {
1009                            let label_id = self.catalog.get_or_create_label(&stmt.label);
1010                            let prop_id = self.catalog.get_or_create_property_key(prop);
1011                            let _ = self.catalog.add_required_property(label_id, prop_id);
1012                            let _ = self.catalog.add_constraint_to_type(
1013                                &stmt.label,
1014                                TypeConstraint::NotNull(prop.clone()),
1015                            );
1016                        }
1017                    }
1018                }
1019
1020                wal_log!(
1021                    self,
1022                    WalRecord::CreateConstraint {
1023                        name: constraint_name.clone(),
1024                        label: stmt.label.clone(),
1025                        properties: stmt.properties.clone(),
1026                        kind: kind_str.to_string(),
1027                    }
1028                );
1029                Ok(QueryResult::status(format!(
1030                    "Created {kind_str} constraint '{constraint_name}'"
1031                )))
1032            }
1033            SchemaStatement::DropConstraint { name, if_exists } => {
1034                let _ = if_exists;
1035                wal_log!(self, WalRecord::DropConstraint { name: name.clone() });
1036                Ok(QueryResult::status(format!("Dropped constraint '{name}'")))
1037            }
1038            SchemaStatement::CreateGraphType(stmt) => {
1039                use crate::catalog::GraphTypeDefinition;
1040                use grafeo_adapters::query::gql::ast::InlineElementType;
1041
1042                // GG04: LIKE clause copies type from existing graph
1043                let (mut node_types, mut edge_types, open) =
1044                    if let Some(ref like_graph) = stmt.like_graph {
1045                        // Infer types from the graph's bound type, or use its existing types
1046                        if let Some(type_name) = self.catalog.get_graph_type_binding(like_graph) {
1047                            if let Some(existing) = self
1048                                .catalog
1049                                .schema()
1050                                .and_then(|s| s.get_graph_type(&type_name))
1051                            {
1052                                (
1053                                    existing.allowed_node_types.clone(),
1054                                    existing.allowed_edge_types.clone(),
1055                                    existing.open,
1056                                )
1057                            } else {
1058                                (Vec::new(), Vec::new(), true)
1059                            }
1060                        } else {
1061                            // GG22: Infer from graph data (labels used in graph)
1062                            let nt = self.catalog.all_node_type_names();
1063                            let et = self.catalog.all_edge_type_names();
1064                            if nt.is_empty() && et.is_empty() {
1065                                (Vec::new(), Vec::new(), true)
1066                            } else {
1067                                (nt, et, false)
1068                            }
1069                        }
1070                    } else {
1071                        (stmt.node_types.clone(), stmt.edge_types.clone(), stmt.open)
1072                    };
1073
1074                // GG03: Register inline element types and add their names
1075                for inline in &stmt.inline_types {
1076                    match inline {
1077                        InlineElementType::Node {
1078                            name,
1079                            properties,
1080                            key_labels,
1081                            ..
1082                        } => {
1083                            let def = NodeTypeDefinition {
1084                                name: name.clone(),
1085                                properties: properties
1086                                    .iter()
1087                                    .map(|p| TypedProperty {
1088                                        name: p.name.clone(),
1089                                        data_type: PropertyDataType::from_type_name(&p.data_type),
1090                                        nullable: p.nullable,
1091                                        default_value: None,
1092                                    })
1093                                    .collect(),
1094                                constraints: Vec::new(),
1095                                parent_types: key_labels.clone(),
1096                            };
1097                            // Register or replace so inline defs override existing
1098                            self.catalog.register_or_replace_node_type(def);
1099                            #[cfg(feature = "wal")]
1100                            {
1101                                let props_for_wal: Vec<(String, String, bool)> = properties
1102                                    .iter()
1103                                    .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
1104                                    .collect();
1105                                self.log_schema_wal(&WalRecord::CreateNodeType {
1106                                    name: name.clone(),
1107                                    properties: props_for_wal,
1108                                    constraints: Vec::new(),
1109                                });
1110                            }
1111                            if !node_types.contains(name) {
1112                                node_types.push(name.clone());
1113                            }
1114                        }
1115                        InlineElementType::Edge {
1116                            name,
1117                            properties,
1118                            source_node_types,
1119                            target_node_types,
1120                            ..
1121                        } => {
1122                            let def = EdgeTypeDefinition {
1123                                name: name.clone(),
1124                                properties: properties
1125                                    .iter()
1126                                    .map(|p| TypedProperty {
1127                                        name: p.name.clone(),
1128                                        data_type: PropertyDataType::from_type_name(&p.data_type),
1129                                        nullable: p.nullable,
1130                                        default_value: None,
1131                                    })
1132                                    .collect(),
1133                                constraints: Vec::new(),
1134                                source_node_types: source_node_types.clone(),
1135                                target_node_types: target_node_types.clone(),
1136                            };
1137                            self.catalog.register_or_replace_edge_type_def(def);
1138                            #[cfg(feature = "wal")]
1139                            {
1140                                let props_for_wal: Vec<(String, String, bool)> = properties
1141                                    .iter()
1142                                    .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
1143                                    .collect();
1144                                self.log_schema_wal(&WalRecord::CreateEdgeType {
1145                                    name: name.clone(),
1146                                    properties: props_for_wal,
1147                                    constraints: Vec::new(),
1148                                });
1149                            }
1150                            if !edge_types.contains(name) {
1151                                edge_types.push(name.clone());
1152                            }
1153                        }
1154                    }
1155                }
1156
1157                let def = GraphTypeDefinition {
1158                    name: stmt.name.clone(),
1159                    allowed_node_types: node_types.clone(),
1160                    allowed_edge_types: edge_types.clone(),
1161                    open,
1162                };
1163                let result = if stmt.or_replace {
1164                    // Drop existing first, ignore error if not found
1165                    let _ = self.catalog.drop_graph_type(&stmt.name);
1166                    self.catalog.register_graph_type(def)
1167                } else {
1168                    self.catalog.register_graph_type(def)
1169                };
1170                match result {
1171                    Ok(()) => {
1172                        wal_log!(
1173                            self,
1174                            WalRecord::CreateGraphType {
1175                                name: stmt.name.clone(),
1176                                node_types,
1177                                edge_types,
1178                                open,
1179                            }
1180                        );
1181                        Ok(QueryResult::status(format!(
1182                            "Created graph type '{}'",
1183                            stmt.name
1184                        )))
1185                    }
1186                    Err(e) if stmt.if_not_exists => {
1187                        let _ = e;
1188                        Ok(QueryResult::status("No change"))
1189                    }
1190                    Err(e) => Err(Error::Query(QueryError::new(
1191                        QueryErrorKind::Semantic,
1192                        e.to_string(),
1193                    ))),
1194                }
1195            }
1196            SchemaStatement::DropGraphType { name, if_exists } => {
1197                match self.catalog.drop_graph_type(&name) {
1198                    Ok(()) => {
1199                        wal_log!(self, WalRecord::DropGraphType { name: name.clone() });
1200                        Ok(QueryResult::status(format!("Dropped graph type '{name}'")))
1201                    }
1202                    Err(e) if if_exists => {
1203                        let _ = e;
1204                        Ok(QueryResult::status("No change"))
1205                    }
1206                    Err(e) => Err(Error::Query(QueryError::new(
1207                        QueryErrorKind::Semantic,
1208                        e.to_string(),
1209                    ))),
1210                }
1211            }
1212            SchemaStatement::CreateSchema {
1213                name,
1214                if_not_exists,
1215            } => match self.catalog.register_schema_namespace(name.clone()) {
1216                Ok(()) => {
1217                    wal_log!(self, WalRecord::CreateSchema { name: name.clone() });
1218                    Ok(QueryResult::status(format!("Created schema '{name}'")))
1219                }
1220                Err(e) if if_not_exists => {
1221                    let _ = e;
1222                    Ok(QueryResult::status("No change"))
1223                }
1224                Err(e) => Err(Error::Query(QueryError::new(
1225                    QueryErrorKind::Semantic,
1226                    e.to_string(),
1227                ))),
1228            },
1229            SchemaStatement::DropSchema { name, if_exists } => {
1230                match self.catalog.drop_schema_namespace(&name) {
1231                    Ok(()) => {
1232                        wal_log!(self, WalRecord::DropSchema { name: name.clone() });
1233                        Ok(QueryResult::status(format!("Dropped schema '{name}'")))
1234                    }
1235                    Err(e) if if_exists => {
1236                        let _ = e;
1237                        Ok(QueryResult::status("No change"))
1238                    }
1239                    Err(e) => Err(Error::Query(QueryError::new(
1240                        QueryErrorKind::Semantic,
1241                        e.to_string(),
1242                    ))),
1243                }
1244            }
1245            SchemaStatement::AlterNodeType(stmt) => {
1246                use grafeo_adapters::query::gql::ast::TypeAlteration;
1247                let mut wal_alts = Vec::new();
1248                for alt in &stmt.alterations {
1249                    match alt {
1250                        TypeAlteration::AddProperty(prop) => {
1251                            let typed = TypedProperty {
1252                                name: prop.name.clone(),
1253                                data_type: PropertyDataType::from_type_name(&prop.data_type),
1254                                nullable: prop.nullable,
1255                                default_value: prop
1256                                    .default_value
1257                                    .as_ref()
1258                                    .map(|s| parse_default_literal(s)),
1259                            };
1260                            self.catalog
1261                                .alter_node_type_add_property(&stmt.name, typed)
1262                                .map_err(|e| {
1263                                    Error::Query(QueryError::new(
1264                                        QueryErrorKind::Semantic,
1265                                        e.to_string(),
1266                                    ))
1267                                })?;
1268                            wal_alts.push((
1269                                "add".to_string(),
1270                                prop.name.clone(),
1271                                prop.data_type.clone(),
1272                                prop.nullable,
1273                            ));
1274                        }
1275                        TypeAlteration::DropProperty(name) => {
1276                            self.catalog
1277                                .alter_node_type_drop_property(&stmt.name, name)
1278                                .map_err(|e| {
1279                                    Error::Query(QueryError::new(
1280                                        QueryErrorKind::Semantic,
1281                                        e.to_string(),
1282                                    ))
1283                                })?;
1284                            wal_alts.push(("drop".to_string(), name.clone(), String::new(), false));
1285                        }
1286                    }
1287                }
1288                wal_log!(
1289                    self,
1290                    WalRecord::AlterNodeType {
1291                        name: stmt.name.clone(),
1292                        alterations: wal_alts,
1293                    }
1294                );
1295                Ok(QueryResult::status(format!(
1296                    "Altered node type '{}'",
1297                    stmt.name
1298                )))
1299            }
1300            SchemaStatement::AlterEdgeType(stmt) => {
1301                use grafeo_adapters::query::gql::ast::TypeAlteration;
1302                let mut wal_alts = Vec::new();
1303                for alt in &stmt.alterations {
1304                    match alt {
1305                        TypeAlteration::AddProperty(prop) => {
1306                            let typed = TypedProperty {
1307                                name: prop.name.clone(),
1308                                data_type: PropertyDataType::from_type_name(&prop.data_type),
1309                                nullable: prop.nullable,
1310                                default_value: prop
1311                                    .default_value
1312                                    .as_ref()
1313                                    .map(|s| parse_default_literal(s)),
1314                            };
1315                            self.catalog
1316                                .alter_edge_type_add_property(&stmt.name, typed)
1317                                .map_err(|e| {
1318                                    Error::Query(QueryError::new(
1319                                        QueryErrorKind::Semantic,
1320                                        e.to_string(),
1321                                    ))
1322                                })?;
1323                            wal_alts.push((
1324                                "add".to_string(),
1325                                prop.name.clone(),
1326                                prop.data_type.clone(),
1327                                prop.nullable,
1328                            ));
1329                        }
1330                        TypeAlteration::DropProperty(name) => {
1331                            self.catalog
1332                                .alter_edge_type_drop_property(&stmt.name, name)
1333                                .map_err(|e| {
1334                                    Error::Query(QueryError::new(
1335                                        QueryErrorKind::Semantic,
1336                                        e.to_string(),
1337                                    ))
1338                                })?;
1339                            wal_alts.push(("drop".to_string(), name.clone(), String::new(), false));
1340                        }
1341                    }
1342                }
1343                wal_log!(
1344                    self,
1345                    WalRecord::AlterEdgeType {
1346                        name: stmt.name.clone(),
1347                        alterations: wal_alts,
1348                    }
1349                );
1350                Ok(QueryResult::status(format!(
1351                    "Altered edge type '{}'",
1352                    stmt.name
1353                )))
1354            }
1355            SchemaStatement::AlterGraphType(stmt) => {
1356                use grafeo_adapters::query::gql::ast::GraphTypeAlteration;
1357                let mut wal_alts = Vec::new();
1358                for alt in &stmt.alterations {
1359                    match alt {
1360                        GraphTypeAlteration::AddNodeType(name) => {
1361                            self.catalog
1362                                .alter_graph_type_add_node_type(&stmt.name, name.clone())
1363                                .map_err(|e| {
1364                                    Error::Query(QueryError::new(
1365                                        QueryErrorKind::Semantic,
1366                                        e.to_string(),
1367                                    ))
1368                                })?;
1369                            wal_alts.push(("add_node_type".to_string(), name.clone()));
1370                        }
1371                        GraphTypeAlteration::DropNodeType(name) => {
1372                            self.catalog
1373                                .alter_graph_type_drop_node_type(&stmt.name, name)
1374                                .map_err(|e| {
1375                                    Error::Query(QueryError::new(
1376                                        QueryErrorKind::Semantic,
1377                                        e.to_string(),
1378                                    ))
1379                                })?;
1380                            wal_alts.push(("drop_node_type".to_string(), name.clone()));
1381                        }
1382                        GraphTypeAlteration::AddEdgeType(name) => {
1383                            self.catalog
1384                                .alter_graph_type_add_edge_type(&stmt.name, name.clone())
1385                                .map_err(|e| {
1386                                    Error::Query(QueryError::new(
1387                                        QueryErrorKind::Semantic,
1388                                        e.to_string(),
1389                                    ))
1390                                })?;
1391                            wal_alts.push(("add_edge_type".to_string(), name.clone()));
1392                        }
1393                        GraphTypeAlteration::DropEdgeType(name) => {
1394                            self.catalog
1395                                .alter_graph_type_drop_edge_type(&stmt.name, name)
1396                                .map_err(|e| {
1397                                    Error::Query(QueryError::new(
1398                                        QueryErrorKind::Semantic,
1399                                        e.to_string(),
1400                                    ))
1401                                })?;
1402                            wal_alts.push(("drop_edge_type".to_string(), name.clone()));
1403                        }
1404                    }
1405                }
1406                wal_log!(
1407                    self,
1408                    WalRecord::AlterGraphType {
1409                        name: stmt.name.clone(),
1410                        alterations: wal_alts,
1411                    }
1412                );
1413                Ok(QueryResult::status(format!(
1414                    "Altered graph type '{}'",
1415                    stmt.name
1416                )))
1417            }
1418            SchemaStatement::CreateProcedure(stmt) => {
1419                use crate::catalog::ProcedureDefinition;
1420
1421                let def = ProcedureDefinition {
1422                    name: stmt.name.clone(),
1423                    params: stmt
1424                        .params
1425                        .iter()
1426                        .map(|p| (p.name.clone(), p.param_type.clone()))
1427                        .collect(),
1428                    returns: stmt
1429                        .returns
1430                        .iter()
1431                        .map(|r| (r.name.clone(), r.return_type.clone()))
1432                        .collect(),
1433                    body: stmt.body.clone(),
1434                };
1435
1436                if stmt.or_replace {
1437                    self.catalog.replace_procedure(def).map_err(|e| {
1438                        Error::Query(QueryError::new(QueryErrorKind::Semantic, e.to_string()))
1439                    })?;
1440                } else {
1441                    match self.catalog.register_procedure(def) {
1442                        Ok(()) => {}
1443                        Err(_) if stmt.if_not_exists => {
1444                            return Ok(QueryResult::empty());
1445                        }
1446                        Err(e) => {
1447                            return Err(Error::Query(QueryError::new(
1448                                QueryErrorKind::Semantic,
1449                                e.to_string(),
1450                            )));
1451                        }
1452                    }
1453                }
1454
1455                wal_log!(
1456                    self,
1457                    WalRecord::CreateProcedure {
1458                        name: stmt.name.clone(),
1459                        params: stmt
1460                            .params
1461                            .iter()
1462                            .map(|p| (p.name.clone(), p.param_type.clone()))
1463                            .collect(),
1464                        returns: stmt
1465                            .returns
1466                            .iter()
1467                            .map(|r| (r.name.clone(), r.return_type.clone()))
1468                            .collect(),
1469                        body: stmt.body,
1470                    }
1471                );
1472                Ok(QueryResult::status(format!(
1473                    "Created procedure '{}'",
1474                    stmt.name
1475                )))
1476            }
1477            SchemaStatement::DropProcedure { name, if_exists } => {
1478                match self.catalog.drop_procedure(&name) {
1479                    Ok(()) => {}
1480                    Err(_) if if_exists => {
1481                        return Ok(QueryResult::empty());
1482                    }
1483                    Err(e) => {
1484                        return Err(Error::Query(QueryError::new(
1485                            QueryErrorKind::Semantic,
1486                            e.to_string(),
1487                        )));
1488                    }
1489                }
1490                wal_log!(self, WalRecord::DropProcedure { name: name.clone() });
1491                Ok(QueryResult::status(format!("Dropped procedure '{name}'")))
1492            }
1493            SchemaStatement::ShowIndexes => {
1494                return self.execute_show_indexes();
1495            }
1496            SchemaStatement::ShowConstraints => {
1497                return self.execute_show_constraints();
1498            }
1499            SchemaStatement::ShowNodeTypes => {
1500                return self.execute_show_node_types();
1501            }
1502            SchemaStatement::ShowEdgeTypes => {
1503                return self.execute_show_edge_types();
1504            }
1505            SchemaStatement::ShowGraphTypes => {
1506                return self.execute_show_graph_types();
1507            }
1508            SchemaStatement::ShowGraphType(name) => {
1509                return self.execute_show_graph_type(&name);
1510            }
1511            SchemaStatement::ShowCurrentGraphType => {
1512                return self.execute_show_current_graph_type();
1513            }
1514            SchemaStatement::ShowGraphs => {
1515                return self.execute_show_graphs();
1516            }
1517        };
1518
1519        // Invalidate all cached query plans after any successful DDL change.
1520        // DDL is rare, so clearing the entire cache is cheap and correct.
1521        if result.is_ok() {
1522            self.query_cache.clear();
1523        }
1524
1525        result
1526    }
1527
1528    /// Creates a vector index on the store by scanning existing nodes.
1529    #[cfg(all(feature = "gql", feature = "vector-index"))]
1530    fn create_vector_index_on_store(
1531        store: &LpgStore,
1532        label: &str,
1533        property: &str,
1534        dimensions: Option<usize>,
1535        metric: Option<&str>,
1536    ) -> Result<()> {
1537        use grafeo_common::types::{PropertyKey, Value};
1538        use grafeo_common::utils::error::Error;
1539        use grafeo_core::index::vector::{DistanceMetric, HnswConfig, HnswIndex};
1540
1541        let metric = match metric {
1542            Some(m) => DistanceMetric::from_str(m).ok_or_else(|| {
1543                Error::Internal(format!(
1544                    "Unknown distance metric '{m}'. Use: cosine, euclidean, dot_product, manhattan"
1545                ))
1546            })?,
1547            None => DistanceMetric::Cosine,
1548        };
1549
1550        let prop_key = PropertyKey::new(property);
1551        let mut found_dims: Option<usize> = dimensions;
1552        let mut vectors: Vec<(grafeo_common::types::NodeId, Vec<f32>)> = Vec::new();
1553
1554        for node in store.nodes_with_label(label) {
1555            if let Some(Value::Vector(v)) = node.properties.get(&prop_key) {
1556                if let Some(expected) = found_dims {
1557                    if v.len() != expected {
1558                        return Err(Error::Internal(format!(
1559                            "Vector dimension mismatch: expected {expected}, found {} on node {}",
1560                            v.len(),
1561                            node.id.0
1562                        )));
1563                    }
1564                } else {
1565                    found_dims = Some(v.len());
1566                }
1567                vectors.push((node.id, v.to_vec()));
1568            }
1569        }
1570
1571        let Some(dims) = found_dims else {
1572            return Err(Error::Internal(format!(
1573                "No vector properties found on :{label}({property}) and no dimensions specified"
1574            )));
1575        };
1576
1577        let config = HnswConfig::new(dims, metric);
1578        let index = HnswIndex::with_capacity(config, vectors.len());
1579        let accessor = grafeo_core::index::vector::PropertyVectorAccessor::new(store, property);
1580        for (node_id, vec) in &vectors {
1581            index.insert(*node_id, vec, &accessor);
1582        }
1583
1584        store.add_vector_index(label, property, Arc::new(index));
1585        Ok(())
1586    }
1587
1588    /// Stub for when vector-index feature is not enabled.
1589    #[cfg(all(feature = "gql", not(feature = "vector-index")))]
1590    fn create_vector_index_on_store(
1591        _store: &LpgStore,
1592        _label: &str,
1593        _property: &str,
1594        _dimensions: Option<usize>,
1595        _metric: Option<&str>,
1596    ) -> Result<()> {
1597        Err(grafeo_common::utils::error::Error::Internal(
1598            "Vector index support requires the 'vector-index' feature".to_string(),
1599        ))
1600    }
1601
1602    /// Creates a text index on the store by scanning existing nodes.
1603    #[cfg(all(feature = "gql", feature = "text-index"))]
1604    fn create_text_index_on_store(store: &LpgStore, label: &str, property: &str) -> Result<()> {
1605        use grafeo_common::types::{PropertyKey, Value};
1606        use grafeo_core::index::text::{BM25Config, InvertedIndex};
1607
1608        let mut index = InvertedIndex::new(BM25Config::default());
1609        let prop_key = PropertyKey::new(property);
1610
1611        let nodes = store.nodes_by_label(label);
1612        for node_id in nodes {
1613            if let Some(Value::String(text)) = store.get_node_property(node_id, &prop_key) {
1614                index.insert(node_id, text.as_str());
1615            }
1616        }
1617
1618        store.add_text_index(label, property, Arc::new(parking_lot::RwLock::new(index)));
1619        Ok(())
1620    }
1621
1622    /// Stub for when text-index feature is not enabled.
1623    #[cfg(all(feature = "gql", not(feature = "text-index")))]
1624    fn create_text_index_on_store(_store: &LpgStore, _label: &str, _property: &str) -> Result<()> {
1625        Err(grafeo_common::utils::error::Error::Internal(
1626            "Text index support requires the 'text-index' feature".to_string(),
1627        ))
1628    }
1629
1630    /// Returns a table of all indexes from the catalog.
1631    fn execute_show_indexes(&self) -> Result<QueryResult> {
1632        let indexes = self.catalog.all_indexes();
1633        let columns = vec![
1634            "name".to_string(),
1635            "type".to_string(),
1636            "label".to_string(),
1637            "property".to_string(),
1638        ];
1639        let rows: Vec<Vec<Value>> = indexes
1640            .into_iter()
1641            .map(|def| {
1642                let label_name = self
1643                    .catalog
1644                    .get_label_name(def.label)
1645                    .unwrap_or_else(|| "?".into());
1646                let prop_name = self
1647                    .catalog
1648                    .get_property_key_name(def.property_key)
1649                    .unwrap_or_else(|| "?".into());
1650                vec![
1651                    Value::from(format!("idx_{}_{}", label_name, prop_name)),
1652                    Value::from(format!("{:?}", def.index_type)),
1653                    Value::from(&*label_name),
1654                    Value::from(&*prop_name),
1655                ]
1656            })
1657            .collect();
1658        Ok(QueryResult {
1659            columns,
1660            column_types: Vec::new(),
1661            rows,
1662            ..QueryResult::empty()
1663        })
1664    }
1665
1666    /// Returns a table of all constraints (currently metadata-only).
1667    fn execute_show_constraints(&self) -> Result<QueryResult> {
1668        // Constraints are tracked in WAL but not yet in a queryable catalog.
1669        // Return an empty table with the expected schema.
1670        Ok(QueryResult {
1671            columns: vec![
1672                "name".to_string(),
1673                "type".to_string(),
1674                "label".to_string(),
1675                "properties".to_string(),
1676            ],
1677            column_types: Vec::new(),
1678            rows: Vec::new(),
1679            ..QueryResult::empty()
1680        })
1681    }
1682
1683    /// Returns a table of all registered node types.
1684    fn execute_show_node_types(&self) -> Result<QueryResult> {
1685        let columns = vec![
1686            "name".to_string(),
1687            "properties".to_string(),
1688            "constraints".to_string(),
1689            "parents".to_string(),
1690        ];
1691        let type_names = self.catalog.all_node_type_names();
1692        let rows: Vec<Vec<Value>> = type_names
1693            .into_iter()
1694            .filter_map(|name| {
1695                let def = self.catalog.get_node_type(&name)?;
1696                let props: Vec<String> = def
1697                    .properties
1698                    .iter()
1699                    .map(|p| {
1700                        let nullable = if p.nullable { "" } else { " NOT NULL" };
1701                        format!("{} {}{}", p.name, p.data_type, nullable)
1702                    })
1703                    .collect();
1704                let constraints: Vec<String> =
1705                    def.constraints.iter().map(|c| format!("{c:?}")).collect();
1706                let parents = def.parent_types.join(", ");
1707                Some(vec![
1708                    Value::from(name),
1709                    Value::from(props.join(", ")),
1710                    Value::from(constraints.join(", ")),
1711                    Value::from(parents),
1712                ])
1713            })
1714            .collect();
1715        Ok(QueryResult {
1716            columns,
1717            column_types: Vec::new(),
1718            rows,
1719            ..QueryResult::empty()
1720        })
1721    }
1722
1723    /// Returns a table of all registered edge types.
1724    fn execute_show_edge_types(&self) -> Result<QueryResult> {
1725        let columns = vec![
1726            "name".to_string(),
1727            "properties".to_string(),
1728            "source_types".to_string(),
1729            "target_types".to_string(),
1730        ];
1731        let type_names = self.catalog.all_edge_type_names();
1732        let rows: Vec<Vec<Value>> = type_names
1733            .into_iter()
1734            .filter_map(|name| {
1735                let def = self.catalog.get_edge_type_def(&name)?;
1736                let props: Vec<String> = def
1737                    .properties
1738                    .iter()
1739                    .map(|p| {
1740                        let nullable = if p.nullable { "" } else { " NOT NULL" };
1741                        format!("{} {}{}", p.name, p.data_type, nullable)
1742                    })
1743                    .collect();
1744                let src = def.source_node_types.join(", ");
1745                let tgt = def.target_node_types.join(", ");
1746                Some(vec![
1747                    Value::from(name),
1748                    Value::from(props.join(", ")),
1749                    Value::from(src),
1750                    Value::from(tgt),
1751                ])
1752            })
1753            .collect();
1754        Ok(QueryResult {
1755            columns,
1756            column_types: Vec::new(),
1757            rows,
1758            ..QueryResult::empty()
1759        })
1760    }
1761
1762    /// Returns a table of all registered graph types.
1763    fn execute_show_graph_types(&self) -> Result<QueryResult> {
1764        let columns = vec![
1765            "name".to_string(),
1766            "open".to_string(),
1767            "node_types".to_string(),
1768            "edge_types".to_string(),
1769        ];
1770        let type_names = self.catalog.all_graph_type_names();
1771        let rows: Vec<Vec<Value>> = type_names
1772            .into_iter()
1773            .filter_map(|name| {
1774                let def = self.catalog.get_graph_type_def(&name)?;
1775                Some(vec![
1776                    Value::from(name),
1777                    Value::from(def.open),
1778                    Value::from(def.allowed_node_types.join(", ")),
1779                    Value::from(def.allowed_edge_types.join(", ")),
1780                ])
1781            })
1782            .collect();
1783        Ok(QueryResult {
1784            columns,
1785            column_types: Vec::new(),
1786            rows,
1787            ..QueryResult::empty()
1788        })
1789    }
1790
1791    /// Returns the list of named graphs in the database.
1792    fn execute_show_graphs(&self) -> Result<QueryResult> {
1793        let mut names = self.store.graph_names();
1794        names.sort();
1795        let rows: Vec<Vec<Value>> = names.into_iter().map(|n| vec![Value::from(n)]).collect();
1796        Ok(QueryResult {
1797            columns: vec!["name".to_string()],
1798            column_types: Vec::new(),
1799            rows,
1800            ..QueryResult::empty()
1801        })
1802    }
1803
1804    /// Returns detailed info for a specific graph type.
1805    fn execute_show_graph_type(&self, name: &str) -> Result<QueryResult> {
1806        use grafeo_common::utils::error::{Error, QueryError, QueryErrorKind};
1807
1808        let def = self.catalog.get_graph_type_def(name).ok_or_else(|| {
1809            Error::Query(QueryError::new(
1810                QueryErrorKind::Semantic,
1811                format!("Graph type '{name}' not found"),
1812            ))
1813        })?;
1814
1815        let columns = vec![
1816            "name".to_string(),
1817            "open".to_string(),
1818            "node_types".to_string(),
1819            "edge_types".to_string(),
1820        ];
1821        let rows = vec![vec![
1822            Value::from(def.name),
1823            Value::from(def.open),
1824            Value::from(def.allowed_node_types.join(", ")),
1825            Value::from(def.allowed_edge_types.join(", ")),
1826        ]];
1827        Ok(QueryResult {
1828            columns,
1829            column_types: Vec::new(),
1830            rows,
1831            ..QueryResult::empty()
1832        })
1833    }
1834
1835    /// Returns the graph type bound to the current graph.
1836    fn execute_show_current_graph_type(&self) -> Result<QueryResult> {
1837        let graph_name = self
1838            .current_graph()
1839            .unwrap_or_else(|| "default".to_string());
1840        let columns = vec![
1841            "graph".to_string(),
1842            "graph_type".to_string(),
1843            "open".to_string(),
1844            "node_types".to_string(),
1845            "edge_types".to_string(),
1846        ];
1847
1848        if let Some(type_name) = self.catalog.get_graph_type_binding(&graph_name)
1849            && let Some(def) = self.catalog.get_graph_type_def(&type_name)
1850        {
1851            let rows = vec![vec![
1852                Value::from(graph_name),
1853                Value::from(type_name),
1854                Value::from(def.open),
1855                Value::from(def.allowed_node_types.join(", ")),
1856                Value::from(def.allowed_edge_types.join(", ")),
1857            ]];
1858            return Ok(QueryResult {
1859                columns,
1860                column_types: Vec::new(),
1861                rows,
1862                ..QueryResult::empty()
1863            });
1864        }
1865
1866        // No graph type binding found
1867        Ok(QueryResult {
1868            columns,
1869            column_types: Vec::new(),
1870            rows: vec![vec![
1871                Value::from(graph_name),
1872                Value::Null,
1873                Value::Null,
1874                Value::Null,
1875                Value::Null,
1876            ]],
1877            ..QueryResult::empty()
1878        })
1879    }
1880
1881    /// Executes a GQL query.
1882    ///
1883    /// # Errors
1884    ///
1885    /// Returns an error if the query fails to parse or execute.
1886    ///
1887    /// # Examples
1888    ///
1889    /// ```no_run
1890    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
1891    /// use grafeo_engine::GrafeoDB;
1892    ///
1893    /// let db = GrafeoDB::new_in_memory();
1894    /// let session = db.session();
1895    ///
1896    /// // Create a node
1897    /// session.execute("INSERT (:Person {name: 'Alix', age: 30})")?;
1898    ///
1899    /// // Query nodes
1900    /// let result = session.execute("MATCH (n:Person) RETURN n.name, n.age")?;
1901    /// for row in &result.rows {
1902    ///     println!("{:?}", row);
1903    /// }
1904    /// # Ok(())
1905    /// # }
1906    /// ```
1907    #[cfg(feature = "gql")]
1908    pub fn execute(&self, query: &str) -> Result<QueryResult> {
1909        self.require_lpg("GQL")?;
1910
1911        use crate::query::{
1912            Executor, binder::Binder, cache::CacheKey, optimizer::Optimizer,
1913            processor::QueryLanguage, translators::gql,
1914        };
1915
1916        #[cfg(not(target_arch = "wasm32"))]
1917        let start_time = std::time::Instant::now();
1918
1919        // Parse and translate, checking for session/schema commands first
1920        let translation = gql::translate_full(query)?;
1921        let logical_plan = match translation {
1922            gql::GqlTranslationResult::SessionCommand(cmd) => {
1923                return self.execute_session_command(cmd);
1924            }
1925            gql::GqlTranslationResult::SchemaCommand(cmd) => {
1926                // All DDL is a write operation
1927                if *self.read_only_tx.lock() {
1928                    return Err(grafeo_common::utils::error::Error::Transaction(
1929                        grafeo_common::utils::error::TransactionError::ReadOnly,
1930                    ));
1931                }
1932                return self.execute_schema_command(cmd);
1933            }
1934            gql::GqlTranslationResult::Plan(plan) => {
1935                // Block mutations in read-only transactions
1936                if *self.read_only_tx.lock() && plan.root.has_mutations() {
1937                    return Err(grafeo_common::utils::error::Error::Transaction(
1938                        grafeo_common::utils::error::TransactionError::ReadOnly,
1939                    ));
1940                }
1941                plan
1942            }
1943        };
1944
1945        // Create cache key for this query
1946        let cache_key = CacheKey::with_graph(query, QueryLanguage::Gql, self.current_graph());
1947
1948        // Try to get cached optimized plan, or use the plan we just translated
1949        let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
1950            cached_plan
1951        } else {
1952            // Semantic validation
1953            let mut binder = Binder::new();
1954            let _binding_context = binder.bind(&logical_plan)?;
1955
1956            // Optimize the plan
1957            let active = self.active_store();
1958            let optimizer = Optimizer::from_graph_store(&*active);
1959            let plan = optimizer.optimize(logical_plan)?;
1960
1961            // Cache the optimized plan for future use
1962            self.query_cache.put_optimized(cache_key, plan.clone());
1963
1964            plan
1965        };
1966
1967        // Resolve the active store for query execution
1968        let active = self.active_store();
1969
1970        // EXPLAIN: annotate pushdown hints and return the plan tree
1971        if optimized_plan.explain {
1972            use crate::query::processor::{annotate_pushdown_hints, explain_result};
1973            let mut plan = optimized_plan;
1974            annotate_pushdown_hints(&mut plan.root, active.as_ref());
1975            return Ok(explain_result(&plan));
1976        }
1977
1978        // PROFILE: execute with per-operator instrumentation
1979        if optimized_plan.profile {
1980            let has_mutations = optimized_plan.root.has_mutations();
1981            return self.with_auto_commit(has_mutations, || {
1982                let (viewing_epoch, transaction_id) = self.get_transaction_context();
1983                let planner = self.create_planner_for_store(
1984                    Arc::clone(&active),
1985                    viewing_epoch,
1986                    transaction_id,
1987                );
1988                let (mut physical_plan, entries) = planner.plan_profiled(&optimized_plan)?;
1989
1990                let executor = Executor::with_columns(physical_plan.columns.clone())
1991                    .with_deadline(self.query_deadline());
1992                let _result = executor.execute(physical_plan.operator.as_mut())?;
1993
1994                let total_time_ms;
1995                #[cfg(not(target_arch = "wasm32"))]
1996                {
1997                    total_time_ms = start_time.elapsed().as_secs_f64() * 1000.0;
1998                }
1999                #[cfg(target_arch = "wasm32")]
2000                {
2001                    total_time_ms = 0.0;
2002                }
2003
2004                let profile_tree = crate::query::profile::build_profile_tree(
2005                    &optimized_plan.root,
2006                    &mut entries.into_iter(),
2007                );
2008                Ok(crate::query::profile::profile_result(
2009                    &profile_tree,
2010                    total_time_ms,
2011                ))
2012            });
2013        }
2014
2015        let has_mutations = optimized_plan.root.has_mutations();
2016
2017        self.with_auto_commit(has_mutations, || {
2018            // Get transaction context for MVCC visibility
2019            let (viewing_epoch, transaction_id) = self.get_transaction_context();
2020
2021            // Convert to physical plan with transaction context
2022            // (Physical planning cannot be cached as it depends on transaction state)
2023            let planner =
2024                self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
2025            let mut physical_plan = planner.plan(&optimized_plan)?;
2026
2027            // Execute the plan
2028            let executor = Executor::with_columns(physical_plan.columns.clone())
2029                .with_deadline(self.query_deadline());
2030            let mut result = executor.execute(physical_plan.operator.as_mut())?;
2031
2032            // Add execution metrics
2033            let rows_scanned = result.rows.len() as u64;
2034            #[cfg(not(target_arch = "wasm32"))]
2035            {
2036                let elapsed_ms = start_time.elapsed().as_secs_f64() * 1000.0;
2037                result.execution_time_ms = Some(elapsed_ms);
2038            }
2039            result.rows_scanned = Some(rows_scanned);
2040
2041            Ok(result)
2042        })
2043    }
2044
2045    /// Executes a GQL query with visibility at the specified epoch.
2046    ///
2047    /// This enables time-travel queries: the query sees the database
2048    /// as it existed at the given epoch.
2049    ///
2050    /// # Errors
2051    ///
2052    /// Returns an error if parsing or execution fails.
2053    #[cfg(feature = "gql")]
2054    pub fn execute_at_epoch(&self, query: &str, epoch: EpochId) -> Result<QueryResult> {
2055        let previous = self.viewing_epoch_override.lock().replace(epoch);
2056        let result = self.execute(query);
2057        *self.viewing_epoch_override.lock() = previous;
2058        result
2059    }
2060
2061    /// Executes a GQL query with parameters.
2062    ///
2063    /// # Errors
2064    ///
2065    /// Returns an error if the query fails to parse or execute.
2066    #[cfg(feature = "gql")]
2067    pub fn execute_with_params(
2068        &self,
2069        query: &str,
2070        params: std::collections::HashMap<String, Value>,
2071    ) -> Result<QueryResult> {
2072        self.require_lpg("GQL")?;
2073
2074        use crate::query::processor::{QueryLanguage, QueryProcessor};
2075
2076        let has_mutations = Self::query_looks_like_mutation(query);
2077        let active = self.active_store();
2078
2079        self.with_auto_commit(has_mutations, || {
2080            // Get transaction context for MVCC visibility
2081            let (viewing_epoch, transaction_id) = self.get_transaction_context();
2082
2083            // Create processor with transaction context
2084            let processor = QueryProcessor::for_graph_store_with_transaction(
2085                Arc::clone(&active),
2086                Arc::clone(&self.transaction_manager),
2087            )?;
2088
2089            // Apply transaction context if in a transaction
2090            let processor = if let Some(transaction_id) = transaction_id {
2091                processor.with_transaction_context(viewing_epoch, transaction_id)
2092            } else {
2093                processor
2094            };
2095
2096            processor.process(query, QueryLanguage::Gql, Some(&params))
2097        })
2098    }
2099
2100    /// Executes a GQL query with parameters.
2101    ///
2102    /// # Errors
2103    ///
2104    /// Returns an error if no query language is enabled.
2105    #[cfg(not(any(feature = "gql", feature = "cypher")))]
2106    pub fn execute_with_params(
2107        &self,
2108        _query: &str,
2109        _params: std::collections::HashMap<String, Value>,
2110    ) -> Result<QueryResult> {
2111        Err(grafeo_common::utils::error::Error::Internal(
2112            "No query language enabled".to_string(),
2113        ))
2114    }
2115
2116    /// Executes a GQL query.
2117    ///
2118    /// # Errors
2119    ///
2120    /// Returns an error if no query language is enabled.
2121    #[cfg(not(any(feature = "gql", feature = "cypher")))]
2122    pub fn execute(&self, _query: &str) -> Result<QueryResult> {
2123        Err(grafeo_common::utils::error::Error::Internal(
2124            "No query language enabled".to_string(),
2125        ))
2126    }
2127
2128    /// Executes a Cypher query.
2129    ///
2130    /// # Errors
2131    ///
2132    /// Returns an error if the query fails to parse or execute.
2133    #[cfg(feature = "cypher")]
2134    pub fn execute_cypher(&self, query: &str) -> Result<QueryResult> {
2135        use crate::query::{
2136            Executor, binder::Binder, cache::CacheKey, optimizer::Optimizer,
2137            processor::QueryLanguage, translators::cypher,
2138        };
2139        use grafeo_common::utils::error::{Error as GrafeoError, QueryError, QueryErrorKind};
2140
2141        // Handle schema DDL and SHOW commands before the normal query path
2142        let translation = cypher::translate_full(query)?;
2143        match translation {
2144            cypher::CypherTranslationResult::SchemaCommand(cmd) => {
2145                if *self.read_only_tx.lock() {
2146                    return Err(GrafeoError::Query(QueryError::new(
2147                        QueryErrorKind::Semantic,
2148                        "Cannot execute schema DDL in a read-only transaction",
2149                    )));
2150                }
2151                return self.execute_schema_command(cmd);
2152            }
2153            cypher::CypherTranslationResult::ShowIndexes => {
2154                return self.execute_show_indexes();
2155            }
2156            cypher::CypherTranslationResult::ShowConstraints => {
2157                return self.execute_show_constraints();
2158            }
2159            cypher::CypherTranslationResult::ShowCurrentGraphType => {
2160                return self.execute_show_current_graph_type();
2161            }
2162            cypher::CypherTranslationResult::Plan(_) => {
2163                // Fall through to normal execution below
2164            }
2165        }
2166
2167        #[cfg(not(target_arch = "wasm32"))]
2168        let start_time = std::time::Instant::now();
2169
2170        // Create cache key for this query
2171        let cache_key = CacheKey::with_graph(query, QueryLanguage::Cypher, self.current_graph());
2172
2173        // Try to get cached optimized plan
2174        let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
2175            cached_plan
2176        } else {
2177            // Parse and translate the query to a logical plan
2178            let logical_plan = cypher::translate(query)?;
2179
2180            // Semantic validation
2181            let mut binder = Binder::new();
2182            let _binding_context = binder.bind(&logical_plan)?;
2183
2184            // Optimize the plan
2185            let active = self.active_store();
2186            let optimizer = Optimizer::from_graph_store(&*active);
2187            let plan = optimizer.optimize(logical_plan)?;
2188
2189            // Cache the optimized plan
2190            self.query_cache.put_optimized(cache_key, plan.clone());
2191
2192            plan
2193        };
2194
2195        // Resolve the active store for query execution
2196        let active = self.active_store();
2197
2198        // EXPLAIN
2199        if optimized_plan.explain {
2200            use crate::query::processor::{annotate_pushdown_hints, explain_result};
2201            let mut plan = optimized_plan;
2202            annotate_pushdown_hints(&mut plan.root, active.as_ref());
2203            return Ok(explain_result(&plan));
2204        }
2205
2206        // PROFILE
2207        if optimized_plan.profile {
2208            let has_mutations = optimized_plan.root.has_mutations();
2209            return self.with_auto_commit(has_mutations, || {
2210                let (viewing_epoch, transaction_id) = self.get_transaction_context();
2211                let planner = self.create_planner_for_store(
2212                    Arc::clone(&active),
2213                    viewing_epoch,
2214                    transaction_id,
2215                );
2216                let (mut physical_plan, entries) = planner.plan_profiled(&optimized_plan)?;
2217
2218                let executor = Executor::with_columns(physical_plan.columns.clone())
2219                    .with_deadline(self.query_deadline());
2220                let _result = executor.execute(physical_plan.operator.as_mut())?;
2221
2222                let total_time_ms;
2223                #[cfg(not(target_arch = "wasm32"))]
2224                {
2225                    total_time_ms = start_time.elapsed().as_secs_f64() * 1000.0;
2226                }
2227                #[cfg(target_arch = "wasm32")]
2228                {
2229                    total_time_ms = 0.0;
2230                }
2231
2232                let profile_tree = crate::query::profile::build_profile_tree(
2233                    &optimized_plan.root,
2234                    &mut entries.into_iter(),
2235                );
2236                Ok(crate::query::profile::profile_result(
2237                    &profile_tree,
2238                    total_time_ms,
2239                ))
2240            });
2241        }
2242
2243        let has_mutations = optimized_plan.root.has_mutations();
2244
2245        self.with_auto_commit(has_mutations, || {
2246            // Get transaction context for MVCC visibility
2247            let (viewing_epoch, transaction_id) = self.get_transaction_context();
2248
2249            // Convert to physical plan with transaction context
2250            let planner =
2251                self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
2252            let mut physical_plan = planner.plan(&optimized_plan)?;
2253
2254            // Execute the plan
2255            let executor = Executor::with_columns(physical_plan.columns.clone())
2256                .with_deadline(self.query_deadline());
2257            executor.execute(physical_plan.operator.as_mut())
2258        })
2259    }
2260
2261    /// Executes a Gremlin query.
2262    ///
2263    /// # Errors
2264    ///
2265    /// Returns an error if the query fails to parse or execute.
2266    ///
2267    /// # Examples
2268    ///
2269    /// ```no_run
2270    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
2271    /// use grafeo_engine::GrafeoDB;
2272    ///
2273    /// let db = GrafeoDB::new_in_memory();
2274    /// let session = db.session();
2275    ///
2276    /// // Create some nodes first
2277    /// session.create_node(&["Person"]);
2278    ///
2279    /// // Query using Gremlin
2280    /// let result = session.execute_gremlin("g.V().hasLabel('Person')")?;
2281    /// # Ok(())
2282    /// # }
2283    /// ```
2284    #[cfg(feature = "gremlin")]
2285    pub fn execute_gremlin(&self, query: &str) -> Result<QueryResult> {
2286        use crate::query::{Executor, binder::Binder, optimizer::Optimizer, translators::gremlin};
2287
2288        // Parse and translate the query to a logical plan
2289        let logical_plan = gremlin::translate(query)?;
2290
2291        // Semantic validation
2292        let mut binder = Binder::new();
2293        let _binding_context = binder.bind(&logical_plan)?;
2294
2295        // Optimize the plan
2296        let active = self.active_store();
2297        let optimizer = Optimizer::from_graph_store(&*active);
2298        let optimized_plan = optimizer.optimize(logical_plan)?;
2299
2300        let has_mutations = optimized_plan.root.has_mutations();
2301
2302        self.with_auto_commit(has_mutations, || {
2303            // Get transaction context for MVCC visibility
2304            let (viewing_epoch, transaction_id) = self.get_transaction_context();
2305
2306            // Convert to physical plan with transaction context
2307            let planner =
2308                self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
2309            let mut physical_plan = planner.plan(&optimized_plan)?;
2310
2311            // Execute the plan
2312            let executor = Executor::with_columns(physical_plan.columns.clone())
2313                .with_deadline(self.query_deadline());
2314            executor.execute(physical_plan.operator.as_mut())
2315        })
2316    }
2317
2318    /// Executes a Gremlin query with parameters.
2319    ///
2320    /// # Errors
2321    ///
2322    /// Returns an error if the query fails to parse or execute.
2323    #[cfg(feature = "gremlin")]
2324    pub fn execute_gremlin_with_params(
2325        &self,
2326        query: &str,
2327        params: std::collections::HashMap<String, Value>,
2328    ) -> Result<QueryResult> {
2329        use crate::query::processor::{QueryLanguage, QueryProcessor};
2330
2331        let has_mutations = Self::query_looks_like_mutation(query);
2332        let active = self.active_store();
2333
2334        self.with_auto_commit(has_mutations, || {
2335            // Get transaction context for MVCC visibility
2336            let (viewing_epoch, transaction_id) = self.get_transaction_context();
2337
2338            // Create processor with transaction context
2339            let processor = QueryProcessor::for_graph_store_with_transaction(
2340                Arc::clone(&active),
2341                Arc::clone(&self.transaction_manager),
2342            )?;
2343
2344            // Apply transaction context if in a transaction
2345            let processor = if let Some(transaction_id) = transaction_id {
2346                processor.with_transaction_context(viewing_epoch, transaction_id)
2347            } else {
2348                processor
2349            };
2350
2351            processor.process(query, QueryLanguage::Gremlin, Some(&params))
2352        })
2353    }
2354
2355    /// Executes a GraphQL query against the LPG store.
2356    ///
2357    /// # Errors
2358    ///
2359    /// Returns an error if the query fails to parse or execute.
2360    ///
2361    /// # Examples
2362    ///
2363    /// ```no_run
2364    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
2365    /// use grafeo_engine::GrafeoDB;
2366    ///
2367    /// let db = GrafeoDB::new_in_memory();
2368    /// let session = db.session();
2369    ///
2370    /// // Create some nodes first
2371    /// session.create_node(&["User"]);
2372    ///
2373    /// // Query using GraphQL
2374    /// let result = session.execute_graphql("query { user { id name } }")?;
2375    /// # Ok(())
2376    /// # }
2377    /// ```
2378    #[cfg(feature = "graphql")]
2379    pub fn execute_graphql(&self, query: &str) -> Result<QueryResult> {
2380        use crate::query::{Executor, binder::Binder, optimizer::Optimizer, translators::graphql};
2381
2382        // Parse and translate the query to a logical plan
2383        let logical_plan = graphql::translate(query)?;
2384
2385        // Semantic validation
2386        let mut binder = Binder::new();
2387        let _binding_context = binder.bind(&logical_plan)?;
2388
2389        // Optimize the plan
2390        let active = self.active_store();
2391        let optimizer = Optimizer::from_graph_store(&*active);
2392        let optimized_plan = optimizer.optimize(logical_plan)?;
2393
2394        let has_mutations = optimized_plan.root.has_mutations();
2395
2396        self.with_auto_commit(has_mutations, || {
2397            // Get transaction context for MVCC visibility
2398            let (viewing_epoch, transaction_id) = self.get_transaction_context();
2399
2400            // Convert to physical plan with transaction context
2401            let planner =
2402                self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
2403            let mut physical_plan = planner.plan(&optimized_plan)?;
2404
2405            // Execute the plan
2406            let executor = Executor::with_columns(physical_plan.columns.clone())
2407                .with_deadline(self.query_deadline());
2408            executor.execute(physical_plan.operator.as_mut())
2409        })
2410    }
2411
2412    /// Executes a GraphQL query with parameters.
2413    ///
2414    /// # Errors
2415    ///
2416    /// Returns an error if the query fails to parse or execute.
2417    #[cfg(feature = "graphql")]
2418    pub fn execute_graphql_with_params(
2419        &self,
2420        query: &str,
2421        params: std::collections::HashMap<String, Value>,
2422    ) -> Result<QueryResult> {
2423        use crate::query::processor::{QueryLanguage, QueryProcessor};
2424
2425        let has_mutations = Self::query_looks_like_mutation(query);
2426        let active = self.active_store();
2427
2428        self.with_auto_commit(has_mutations, || {
2429            // Get transaction context for MVCC visibility
2430            let (viewing_epoch, transaction_id) = self.get_transaction_context();
2431
2432            // Create processor with transaction context
2433            let processor = QueryProcessor::for_graph_store_with_transaction(
2434                Arc::clone(&active),
2435                Arc::clone(&self.transaction_manager),
2436            )?;
2437
2438            // Apply transaction context if in a transaction
2439            let processor = if let Some(transaction_id) = transaction_id {
2440                processor.with_transaction_context(viewing_epoch, transaction_id)
2441            } else {
2442                processor
2443            };
2444
2445            processor.process(query, QueryLanguage::GraphQL, Some(&params))
2446        })
2447    }
2448
2449    /// Executes a GraphQL query against the RDF store.
2450    ///
2451    /// # Errors
2452    ///
2453    /// Returns an error if the query fails to parse or execute.
2454    #[cfg(all(feature = "graphql", feature = "rdf"))]
2455    pub fn execute_graphql_rdf(&self, query: &str) -> Result<QueryResult> {
2456        use crate::query::{
2457            Executor, optimizer::Optimizer, planner::rdf::RdfPlanner, translators::graphql_rdf,
2458        };
2459
2460        let logical_plan = graphql_rdf::translate(query, "http://example.org/")?;
2461
2462        let active = self.active_store();
2463        let optimizer = Optimizer::from_graph_store(&*active);
2464        let optimized_plan = optimizer.optimize(logical_plan)?;
2465
2466        let planner = RdfPlanner::new(Arc::clone(&self.rdf_store))
2467            .with_transaction_id(*self.current_transaction.lock());
2468        #[cfg(feature = "wal")]
2469        let planner = planner.with_wal(self.wal.clone());
2470        let mut physical_plan = planner.plan(&optimized_plan)?;
2471
2472        let executor = Executor::with_columns(physical_plan.columns.clone())
2473            .with_deadline(self.query_deadline());
2474        executor.execute(physical_plan.operator.as_mut())
2475    }
2476
2477    /// Executes a GraphQL query against the RDF store with parameters.
2478    ///
2479    /// # Errors
2480    ///
2481    /// Returns an error if the query fails to parse or execute.
2482    #[cfg(all(feature = "graphql", feature = "rdf"))]
2483    pub fn execute_graphql_rdf_with_params(
2484        &self,
2485        query: &str,
2486        params: std::collections::HashMap<String, Value>,
2487    ) -> Result<QueryResult> {
2488        use crate::query::processor::{QueryLanguage, QueryProcessor};
2489
2490        let has_mutations = Self::query_looks_like_mutation(query);
2491        let active = self.active_store();
2492
2493        self.with_auto_commit(has_mutations, || {
2494            let (viewing_epoch, transaction_id) = self.get_transaction_context();
2495
2496            let processor = QueryProcessor::for_graph_store_with_transaction(
2497                Arc::clone(&active),
2498                Arc::clone(&self.transaction_manager),
2499            )?;
2500
2501            let processor = if let Some(transaction_id) = transaction_id {
2502                processor.with_transaction_context(viewing_epoch, transaction_id)
2503            } else {
2504                processor
2505            };
2506
2507            processor.process(query, QueryLanguage::GraphQLRdf, Some(&params))
2508        })
2509    }
2510
2511    /// Executes a SQL/PGQ query (SQL:2023 GRAPH_TABLE).
2512    ///
2513    /// # Errors
2514    ///
2515    /// Returns an error if the query fails to parse or execute.
2516    ///
2517    /// # Examples
2518    ///
2519    /// ```no_run
2520    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
2521    /// use grafeo_engine::GrafeoDB;
2522    ///
2523    /// let db = GrafeoDB::new_in_memory();
2524    /// let session = db.session();
2525    ///
2526    /// let result = session.execute_sql(
2527    ///     "SELECT * FROM GRAPH_TABLE (
2528    ///         MATCH (n:Person)
2529    ///         COLUMNS (n.name AS name)
2530    ///     )"
2531    /// )?;
2532    /// # Ok(())
2533    /// # }
2534    /// ```
2535    #[cfg(feature = "sql-pgq")]
2536    pub fn execute_sql(&self, query: &str) -> Result<QueryResult> {
2537        use crate::query::{
2538            Executor, binder::Binder, cache::CacheKey, optimizer::Optimizer, plan::LogicalOperator,
2539            processor::QueryLanguage, translators::sql_pgq,
2540        };
2541
2542        // Parse and translate (always needed to check for DDL)
2543        let logical_plan = sql_pgq::translate(query)?;
2544
2545        // Handle DDL statements directly (they don't go through the query pipeline)
2546        if let LogicalOperator::CreatePropertyGraph(ref cpg) = logical_plan.root {
2547            return Ok(QueryResult {
2548                columns: vec!["status".into()],
2549                column_types: vec![grafeo_common::types::LogicalType::String],
2550                rows: vec![vec![Value::from(format!(
2551                    "Property graph '{}' created ({} node tables, {} edge tables)",
2552                    cpg.name,
2553                    cpg.node_tables.len(),
2554                    cpg.edge_tables.len()
2555                ))]],
2556                execution_time_ms: None,
2557                rows_scanned: None,
2558                status_message: None,
2559                gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
2560            });
2561        }
2562
2563        // Create cache key for query plans
2564        let cache_key = CacheKey::with_graph(query, QueryLanguage::SqlPgq, self.current_graph());
2565
2566        // Try to get cached optimized plan
2567        let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
2568            cached_plan
2569        } else {
2570            // Semantic validation
2571            let mut binder = Binder::new();
2572            let _binding_context = binder.bind(&logical_plan)?;
2573
2574            // Optimize the plan
2575            let active = self.active_store();
2576            let optimizer = Optimizer::from_graph_store(&*active);
2577            let plan = optimizer.optimize(logical_plan)?;
2578
2579            // Cache the optimized plan
2580            self.query_cache.put_optimized(cache_key, plan.clone());
2581
2582            plan
2583        };
2584
2585        let active = self.active_store();
2586        let has_mutations = optimized_plan.root.has_mutations();
2587
2588        self.with_auto_commit(has_mutations, || {
2589            // Get transaction context for MVCC visibility
2590            let (viewing_epoch, transaction_id) = self.get_transaction_context();
2591
2592            // Convert to physical plan with transaction context
2593            let planner =
2594                self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
2595            let mut physical_plan = planner.plan(&optimized_plan)?;
2596
2597            // Execute the plan
2598            let executor = Executor::with_columns(physical_plan.columns.clone())
2599                .with_deadline(self.query_deadline());
2600            executor.execute(physical_plan.operator.as_mut())
2601        })
2602    }
2603
2604    /// Executes a SQL/PGQ query with parameters.
2605    ///
2606    /// # Errors
2607    ///
2608    /// Returns an error if the query fails to parse or execute.
2609    #[cfg(feature = "sql-pgq")]
2610    pub fn execute_sql_with_params(
2611        &self,
2612        query: &str,
2613        params: std::collections::HashMap<String, Value>,
2614    ) -> Result<QueryResult> {
2615        use crate::query::processor::{QueryLanguage, QueryProcessor};
2616
2617        let has_mutations = Self::query_looks_like_mutation(query);
2618        let active = self.active_store();
2619
2620        self.with_auto_commit(has_mutations, || {
2621            // Get transaction context for MVCC visibility
2622            let (viewing_epoch, transaction_id) = self.get_transaction_context();
2623
2624            // Create processor with transaction context
2625            let processor = QueryProcessor::for_graph_store_with_transaction(
2626                Arc::clone(&active),
2627                Arc::clone(&self.transaction_manager),
2628            )?;
2629
2630            // Apply transaction context if in a transaction
2631            let processor = if let Some(transaction_id) = transaction_id {
2632                processor.with_transaction_context(viewing_epoch, transaction_id)
2633            } else {
2634                processor
2635            };
2636
2637            processor.process(query, QueryLanguage::SqlPgq, Some(&params))
2638        })
2639    }
2640
2641    /// Executes a SPARQL query.
2642    ///
2643    /// # Errors
2644    ///
2645    /// Returns an error if the query fails to parse or execute.
2646    #[cfg(all(feature = "sparql", feature = "rdf"))]
2647    pub fn execute_sparql(&self, query: &str) -> Result<QueryResult> {
2648        use crate::query::{
2649            Executor, optimizer::Optimizer, planner::rdf::RdfPlanner, translators::sparql,
2650        };
2651
2652        // Parse and translate the SPARQL query to a logical plan
2653        let logical_plan = sparql::translate(query)?;
2654
2655        // Optimize the plan
2656        let active = self.active_store();
2657        let optimizer = Optimizer::from_graph_store(&*active);
2658        let optimized_plan = optimizer.optimize(logical_plan)?;
2659
2660        // Convert to physical plan using RDF planner
2661        let planner = RdfPlanner::new(Arc::clone(&self.rdf_store))
2662            .with_transaction_id(*self.current_transaction.lock());
2663        #[cfg(feature = "wal")]
2664        let planner = planner.with_wal(self.wal.clone());
2665        let mut physical_plan = planner.plan(&optimized_plan)?;
2666
2667        // Execute the plan
2668        let executor = Executor::with_columns(physical_plan.columns.clone())
2669            .with_deadline(self.query_deadline());
2670        executor.execute(physical_plan.operator.as_mut())
2671    }
2672
2673    /// Executes a SPARQL query with parameters.
2674    ///
2675    /// # Errors
2676    ///
2677    /// Returns an error if the query fails to parse or execute.
2678    #[cfg(all(feature = "sparql", feature = "rdf"))]
2679    pub fn execute_sparql_with_params(
2680        &self,
2681        query: &str,
2682        params: std::collections::HashMap<String, Value>,
2683    ) -> Result<QueryResult> {
2684        use crate::query::{
2685            Executor, optimizer::Optimizer, planner::rdf::RdfPlanner, processor::substitute_params,
2686            translators::sparql,
2687        };
2688
2689        let mut logical_plan = sparql::translate(query)?;
2690
2691        substitute_params(&mut logical_plan, &params)?;
2692
2693        let active = self.active_store();
2694        let optimizer = Optimizer::from_graph_store(&*active);
2695        let optimized_plan = optimizer.optimize(logical_plan)?;
2696
2697        let planner = RdfPlanner::new(Arc::clone(&self.rdf_store))
2698            .with_transaction_id(*self.current_transaction.lock());
2699        #[cfg(feature = "wal")]
2700        let planner = planner.with_wal(self.wal.clone());
2701        let mut physical_plan = planner.plan(&optimized_plan)?;
2702
2703        let executor = Executor::with_columns(physical_plan.columns.clone())
2704            .with_deadline(self.query_deadline());
2705        executor.execute(physical_plan.operator.as_mut())
2706    }
2707
2708    /// Executes a query in the specified language by name.
2709    ///
2710    /// Supported language names: `"gql"`, `"cypher"`, `"gremlin"`, `"graphql"`,
2711    /// `"graphql-rdf"`, `"sparql"`, `"sql"`. Each requires the corresponding feature flag.
2712    ///
2713    /// # Errors
2714    ///
2715    /// Returns an error if the language is unknown/disabled or the query fails.
2716    pub fn execute_language(
2717        &self,
2718        query: &str,
2719        language: &str,
2720        params: Option<std::collections::HashMap<String, Value>>,
2721    ) -> Result<QueryResult> {
2722        match language {
2723            "gql" => {
2724                if let Some(p) = params {
2725                    self.execute_with_params(query, p)
2726                } else {
2727                    self.execute(query)
2728                }
2729            }
2730            #[cfg(feature = "cypher")]
2731            "cypher" => {
2732                if let Some(p) = params {
2733                    use crate::query::processor::{QueryLanguage, QueryProcessor};
2734                    let has_mutations = Self::query_looks_like_mutation(query);
2735                    let active = self.active_store();
2736                    self.with_auto_commit(has_mutations, || {
2737                        let processor = QueryProcessor::for_graph_store_with_transaction(
2738                            Arc::clone(&active),
2739                            Arc::clone(&self.transaction_manager),
2740                        )?;
2741                        let (viewing_epoch, transaction_id) = self.get_transaction_context();
2742                        let processor = if let Some(transaction_id) = transaction_id {
2743                            processor.with_transaction_context(viewing_epoch, transaction_id)
2744                        } else {
2745                            processor
2746                        };
2747                        processor.process(query, QueryLanguage::Cypher, Some(&p))
2748                    })
2749                } else {
2750                    self.execute_cypher(query)
2751                }
2752            }
2753            #[cfg(feature = "gremlin")]
2754            "gremlin" => {
2755                if let Some(p) = params {
2756                    self.execute_gremlin_with_params(query, p)
2757                } else {
2758                    self.execute_gremlin(query)
2759                }
2760            }
2761            #[cfg(feature = "graphql")]
2762            "graphql" => {
2763                if let Some(p) = params {
2764                    self.execute_graphql_with_params(query, p)
2765                } else {
2766                    self.execute_graphql(query)
2767                }
2768            }
2769            #[cfg(all(feature = "graphql", feature = "rdf"))]
2770            "graphql-rdf" => {
2771                if let Some(p) = params {
2772                    self.execute_graphql_rdf_with_params(query, p)
2773                } else {
2774                    self.execute_graphql_rdf(query)
2775                }
2776            }
2777            #[cfg(feature = "sql-pgq")]
2778            "sql" | "sql-pgq" => {
2779                if let Some(p) = params {
2780                    self.execute_sql_with_params(query, p)
2781                } else {
2782                    self.execute_sql(query)
2783                }
2784            }
2785            #[cfg(all(feature = "sparql", feature = "rdf"))]
2786            "sparql" => {
2787                if let Some(p) = params {
2788                    self.execute_sparql_with_params(query, p)
2789                } else {
2790                    self.execute_sparql(query)
2791                }
2792            }
2793            other => Err(grafeo_common::utils::error::Error::Query(
2794                grafeo_common::utils::error::QueryError::new(
2795                    grafeo_common::utils::error::QueryErrorKind::Semantic,
2796                    format!("Unknown query language: '{other}'"),
2797                ),
2798            )),
2799        }
2800    }
2801
2802    /// Begins a new transaction.
2803    ///
2804    /// # Errors
2805    ///
2806    /// Returns an error if a transaction is already active.
2807    ///
2808    /// # Examples
2809    ///
2810    /// ```no_run
2811    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
2812    /// use grafeo_engine::GrafeoDB;
2813    ///
2814    /// let db = GrafeoDB::new_in_memory();
2815    /// let mut session = db.session();
2816    ///
2817    /// session.begin_transaction()?;
2818    /// session.execute("INSERT (:Person {name: 'Alix'})")?;
2819    /// session.execute("INSERT (:Person {name: 'Gus'})")?;
2820    /// session.commit()?; // Both inserts committed atomically
2821    /// # Ok(())
2822    /// # }
2823    /// ```
2824    /// Clears all cached query plans.
2825    ///
2826    /// The plan cache is shared across all sessions on the same database,
2827    /// so clearing from one session affects all sessions.
2828    pub fn clear_plan_cache(&self) {
2829        self.query_cache.clear();
2830    }
2831
2832    /// Begins a new transaction on this session.
2833    ///
2834    /// Uses the default isolation level (`SnapshotIsolation`).
2835    ///
2836    /// # Errors
2837    ///
2838    /// Returns an error if a transaction is already active.
2839    pub fn begin_transaction(&mut self) -> Result<()> {
2840        self.begin_transaction_inner(false, None)
2841    }
2842
2843    /// Begins a transaction with a specific isolation level.
2844    ///
2845    /// See [`begin_transaction`](Self::begin_transaction) for the default (`SnapshotIsolation`).
2846    ///
2847    /// # Errors
2848    ///
2849    /// Returns an error if a transaction is already active.
2850    pub fn begin_transaction_with_isolation(
2851        &mut self,
2852        isolation_level: crate::transaction::IsolationLevel,
2853    ) -> Result<()> {
2854        self.begin_transaction_inner(false, Some(isolation_level))
2855    }
2856
2857    /// Core transaction begin logic, usable from both `&mut self` and `&self` paths.
2858    fn begin_transaction_inner(
2859        &self,
2860        read_only: bool,
2861        isolation_level: Option<crate::transaction::IsolationLevel>,
2862    ) -> Result<()> {
2863        let mut current = self.current_transaction.lock();
2864        if current.is_some() {
2865            // Nested transaction: create an auto-savepoint instead of a new tx.
2866            drop(current);
2867            let mut depth = self.transaction_nesting_depth.lock();
2868            *depth += 1;
2869            let sp_name = format!("_nested_tx_{}", *depth);
2870            self.savepoint(&sp_name)?;
2871            return Ok(());
2872        }
2873
2874        let active = self.active_lpg_store();
2875        self.transaction_start_node_count
2876            .store(active.node_count(), Ordering::Relaxed);
2877        self.transaction_start_edge_count
2878            .store(active.edge_count(), Ordering::Relaxed);
2879        let transaction_id = if let Some(level) = isolation_level {
2880            self.transaction_manager.begin_with_isolation(level)
2881        } else {
2882            self.transaction_manager.begin()
2883        };
2884        *current = Some(transaction_id);
2885        *self.read_only_tx.lock() = read_only;
2886
2887        // Record the initial graph as "touched" for cross-graph atomicity.
2888        let graph = self.current_graph.lock().clone();
2889        let normalized = match graph {
2890            Some(ref name) if name.eq_ignore_ascii_case("default") => None,
2891            other => other,
2892        };
2893        let mut touched = self.touched_graphs.lock();
2894        touched.clear();
2895        touched.push(normalized);
2896
2897        Ok(())
2898    }
2899
2900    /// Commits the current transaction.
2901    ///
2902    /// Makes all changes since [`begin_transaction`](Self::begin_transaction) permanent.
2903    ///
2904    /// # Errors
2905    ///
2906    /// Returns an error if no transaction is active.
2907    pub fn commit(&mut self) -> Result<()> {
2908        self.commit_inner()
2909    }
2910
2911    /// Core commit logic, usable from both `&mut self` and `&self` paths.
2912    fn commit_inner(&self) -> Result<()> {
2913        // Nested transaction: release the auto-savepoint (changes are preserved).
2914        {
2915            let mut depth = self.transaction_nesting_depth.lock();
2916            if *depth > 0 {
2917                let sp_name = format!("_nested_tx_{depth}");
2918                *depth -= 1;
2919                drop(depth);
2920                return self.release_savepoint(&sp_name);
2921            }
2922        }
2923
2924        let transaction_id = self.current_transaction.lock().take().ok_or_else(|| {
2925            grafeo_common::utils::error::Error::Transaction(
2926                grafeo_common::utils::error::TransactionError::InvalidState(
2927                    "No active transaction".to_string(),
2928                ),
2929            )
2930        })?;
2931
2932        // Validate the transaction first (conflict detection) before committing data.
2933        // If this fails, we rollback the data changes instead of making them permanent.
2934        let touched = self.touched_graphs.lock().clone();
2935        let commit_epoch = match self.transaction_manager.commit(transaction_id) {
2936            Ok(epoch) => epoch,
2937            Err(e) => {
2938                // Conflict detected: rollback the data changes
2939                for graph_name in &touched {
2940                    let store = self.resolve_store(graph_name);
2941                    store.rollback_transaction_properties(transaction_id);
2942                }
2943                #[cfg(feature = "rdf")]
2944                self.rdf_store.rollback_transaction(transaction_id);
2945                *self.read_only_tx.lock() = false;
2946                self.savepoints.lock().clear();
2947                self.touched_graphs.lock().clear();
2948                return Err(e);
2949            }
2950        };
2951
2952        // Finalize PENDING epochs: make uncommitted versions visible at the commit epoch.
2953        for graph_name in &touched {
2954            let store = self.resolve_store(graph_name);
2955            store.finalize_version_epochs(transaction_id, commit_epoch);
2956        }
2957
2958        // Commit succeeded: discard undo logs (make changes permanent)
2959        #[cfg(feature = "rdf")]
2960        self.rdf_store.commit_transaction(transaction_id);
2961
2962        for graph_name in &touched {
2963            let store = self.resolve_store(graph_name);
2964            store.commit_transaction_properties(transaction_id);
2965        }
2966
2967        // Sync epoch for all touched graphs so that convenience lookups
2968        // (edge_type, get_edge, get_node) can see versions at the latest epoch.
2969        let current_epoch = self.transaction_manager.current_epoch();
2970        for graph_name in &touched {
2971            let store = self.resolve_store(graph_name);
2972            store.sync_epoch(current_epoch);
2973        }
2974
2975        // Reset read-only flag, clear savepoints and touched graphs
2976        *self.read_only_tx.lock() = false;
2977        self.savepoints.lock().clear();
2978        self.touched_graphs.lock().clear();
2979
2980        // Auto-GC: periodically prune old MVCC versions
2981        if self.gc_interval > 0 {
2982            let count = self.commit_counter.fetch_add(1, Ordering::Relaxed) + 1;
2983            if count.is_multiple_of(self.gc_interval) {
2984                let min_epoch = self.transaction_manager.min_active_epoch();
2985                for graph_name in &touched {
2986                    let store = self.resolve_store(graph_name);
2987                    store.gc_versions(min_epoch);
2988                }
2989                self.transaction_manager.gc();
2990            }
2991        }
2992
2993        Ok(())
2994    }
2995
2996    /// Aborts the current transaction.
2997    ///
2998    /// Discards all changes since [`begin_transaction`](Self::begin_transaction).
2999    ///
3000    /// # Errors
3001    ///
3002    /// Returns an error if no transaction is active.
3003    ///
3004    /// # Examples
3005    ///
3006    /// ```no_run
3007    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
3008    /// use grafeo_engine::GrafeoDB;
3009    ///
3010    /// let db = GrafeoDB::new_in_memory();
3011    /// let mut session = db.session();
3012    ///
3013    /// session.begin_transaction()?;
3014    /// session.execute("INSERT (:Person {name: 'Alix'})")?;
3015    /// session.rollback()?; // Insert is discarded
3016    /// # Ok(())
3017    /// # }
3018    /// ```
3019    pub fn rollback(&mut self) -> Result<()> {
3020        self.rollback_inner()
3021    }
3022
3023    /// Core rollback logic, usable from both `&mut self` and `&self` paths.
3024    fn rollback_inner(&self) -> Result<()> {
3025        // Nested transaction: rollback to the auto-savepoint.
3026        {
3027            let mut depth = self.transaction_nesting_depth.lock();
3028            if *depth > 0 {
3029                let sp_name = format!("_nested_tx_{depth}");
3030                *depth -= 1;
3031                drop(depth);
3032                return self.rollback_to_savepoint(&sp_name);
3033            }
3034        }
3035
3036        let transaction_id = self.current_transaction.lock().take().ok_or_else(|| {
3037            grafeo_common::utils::error::Error::Transaction(
3038                grafeo_common::utils::error::TransactionError::InvalidState(
3039                    "No active transaction".to_string(),
3040                ),
3041            )
3042        })?;
3043
3044        // Reset read-only flag
3045        *self.read_only_tx.lock() = false;
3046
3047        // Discard uncommitted versions in ALL touched LPG stores (cross-graph atomicity).
3048        let touched = self.touched_graphs.lock().clone();
3049        for graph_name in &touched {
3050            let store = self.resolve_store(graph_name);
3051            store.discard_uncommitted_versions(transaction_id);
3052        }
3053
3054        // Discard pending operations in the RDF store
3055        #[cfg(feature = "rdf")]
3056        self.rdf_store.rollback_transaction(transaction_id);
3057
3058        // Clear savepoints and touched graphs
3059        self.savepoints.lock().clear();
3060        self.touched_graphs.lock().clear();
3061
3062        // Mark transaction as aborted in the manager
3063        self.transaction_manager.abort(transaction_id)
3064    }
3065
3066    /// Creates a named savepoint within the current transaction.
3067    ///
3068    /// The savepoint captures the current node/edge ID counters so that
3069    /// [`rollback_to_savepoint`](Self::rollback_to_savepoint) can discard
3070    /// entities created after this point.
3071    ///
3072    /// # Errors
3073    ///
3074    /// Returns an error if no transaction is active.
3075    pub fn savepoint(&self, name: &str) -> Result<()> {
3076        let tx_id = self.current_transaction.lock().ok_or_else(|| {
3077            grafeo_common::utils::error::Error::Transaction(
3078                grafeo_common::utils::error::TransactionError::InvalidState(
3079                    "No active transaction".to_string(),
3080                ),
3081            )
3082        })?;
3083
3084        // Capture state for every graph touched so far.
3085        let touched = self.touched_graphs.lock().clone();
3086        let graph_snapshots: Vec<GraphSavepoint> = touched
3087            .iter()
3088            .map(|graph_name| {
3089                let store = self.resolve_store(graph_name);
3090                GraphSavepoint {
3091                    graph_name: graph_name.clone(),
3092                    next_node_id: store.peek_next_node_id(),
3093                    next_edge_id: store.peek_next_edge_id(),
3094                    undo_log_position: store.property_undo_log_position(tx_id),
3095                }
3096            })
3097            .collect();
3098
3099        self.savepoints.lock().push(SavepointState {
3100            name: name.to_string(),
3101            graph_snapshots,
3102            active_graph: self.current_graph.lock().clone(),
3103        });
3104        Ok(())
3105    }
3106
3107    /// Rolls back to a named savepoint, undoing all writes made after it.
3108    ///
3109    /// The savepoint and any savepoints created after it are removed.
3110    /// Entities with IDs >= the savepoint snapshot are discarded.
3111    ///
3112    /// # Errors
3113    ///
3114    /// Returns an error if no transaction is active or the savepoint does not exist.
3115    pub fn rollback_to_savepoint(&self, name: &str) -> Result<()> {
3116        let transaction_id = self.current_transaction.lock().ok_or_else(|| {
3117            grafeo_common::utils::error::Error::Transaction(
3118                grafeo_common::utils::error::TransactionError::InvalidState(
3119                    "No active transaction".to_string(),
3120                ),
3121            )
3122        })?;
3123
3124        let mut savepoints = self.savepoints.lock();
3125
3126        // Find the savepoint by name (search from the end for nested savepoints)
3127        let pos = savepoints
3128            .iter()
3129            .rposition(|sp| sp.name == name)
3130            .ok_or_else(|| {
3131                grafeo_common::utils::error::Error::Transaction(
3132                    grafeo_common::utils::error::TransactionError::InvalidState(format!(
3133                        "Savepoint '{name}' not found"
3134                    )),
3135                )
3136            })?;
3137
3138        let sp_state = savepoints[pos].clone();
3139
3140        // Remove this savepoint and all later ones
3141        savepoints.truncate(pos);
3142        drop(savepoints);
3143
3144        // Roll back each graph that was captured in the savepoint.
3145        for gs in &sp_state.graph_snapshots {
3146            let store = self.resolve_store(&gs.graph_name);
3147
3148            // Replay property/label undo entries recorded after the savepoint
3149            store.rollback_transaction_properties_to(transaction_id, gs.undo_log_position);
3150
3151            // Discard entities created after the savepoint
3152            let current_next_node = store.peek_next_node_id();
3153            let current_next_edge = store.peek_next_edge_id();
3154
3155            let node_ids: Vec<NodeId> = (gs.next_node_id..current_next_node)
3156                .map(NodeId::new)
3157                .collect();
3158            let edge_ids: Vec<EdgeId> = (gs.next_edge_id..current_next_edge)
3159                .map(EdgeId::new)
3160                .collect();
3161
3162            if !node_ids.is_empty() || !edge_ids.is_empty() {
3163                store.discard_entities_by_id(transaction_id, &node_ids, &edge_ids);
3164            }
3165        }
3166
3167        // Also roll back any graphs that were touched AFTER the savepoint
3168        // but not captured in it. These need full discard since the savepoint
3169        // didn't include them.
3170        let touched = self.touched_graphs.lock().clone();
3171        for graph_name in &touched {
3172            let already_captured = sp_state
3173                .graph_snapshots
3174                .iter()
3175                .any(|gs| gs.graph_name == *graph_name);
3176            if !already_captured {
3177                let store = self.resolve_store(graph_name);
3178                store.discard_uncommitted_versions(transaction_id);
3179            }
3180        }
3181
3182        // Restore touched_graphs to only the graphs that were known at savepoint time.
3183        let mut touched = self.touched_graphs.lock();
3184        touched.clear();
3185        for gs in &sp_state.graph_snapshots {
3186            if !touched.contains(&gs.graph_name) {
3187                touched.push(gs.graph_name.clone());
3188            }
3189        }
3190
3191        Ok(())
3192    }
3193
3194    /// Releases (removes) a named savepoint without rolling back.
3195    ///
3196    /// # Errors
3197    ///
3198    /// Returns an error if no transaction is active or the savepoint does not exist.
3199    pub fn release_savepoint(&self, name: &str) -> Result<()> {
3200        let _tx_id = self.current_transaction.lock().ok_or_else(|| {
3201            grafeo_common::utils::error::Error::Transaction(
3202                grafeo_common::utils::error::TransactionError::InvalidState(
3203                    "No active transaction".to_string(),
3204                ),
3205            )
3206        })?;
3207
3208        let mut savepoints = self.savepoints.lock();
3209        let pos = savepoints
3210            .iter()
3211            .rposition(|sp| sp.name == name)
3212            .ok_or_else(|| {
3213                grafeo_common::utils::error::Error::Transaction(
3214                    grafeo_common::utils::error::TransactionError::InvalidState(format!(
3215                        "Savepoint '{name}' not found"
3216                    )),
3217                )
3218            })?;
3219        savepoints.remove(pos);
3220        Ok(())
3221    }
3222
3223    /// Returns whether a transaction is active.
3224    #[must_use]
3225    pub fn in_transaction(&self) -> bool {
3226        self.current_transaction.lock().is_some()
3227    }
3228
3229    /// Returns the current transaction ID, if any.
3230    #[must_use]
3231    pub(crate) fn current_transaction_id(&self) -> Option<TransactionId> {
3232        *self.current_transaction.lock()
3233    }
3234
3235    /// Returns a reference to the transaction manager.
3236    #[must_use]
3237    pub(crate) fn transaction_manager(&self) -> &TransactionManager {
3238        &self.transaction_manager
3239    }
3240
3241    /// Returns the store's current node count and the count at transaction start.
3242    #[must_use]
3243    pub(crate) fn node_count_delta(&self) -> (usize, usize) {
3244        (
3245            self.transaction_start_node_count.load(Ordering::Relaxed),
3246            self.active_lpg_store().node_count(),
3247        )
3248    }
3249
3250    /// Returns the store's current edge count and the count at transaction start.
3251    #[must_use]
3252    pub(crate) fn edge_count_delta(&self) -> (usize, usize) {
3253        (
3254            self.transaction_start_edge_count.load(Ordering::Relaxed),
3255            self.active_lpg_store().edge_count(),
3256        )
3257    }
3258
3259    /// Prepares the current transaction for a two-phase commit.
3260    ///
3261    /// Returns a [`PreparedCommit`](crate::transaction::PreparedCommit) that
3262    /// lets you inspect pending changes and attach metadata before finalizing.
3263    /// The mutable borrow prevents concurrent operations while the commit is
3264    /// pending.
3265    ///
3266    /// If the `PreparedCommit` is dropped without calling `commit()` or
3267    /// `abort()`, the transaction is automatically rolled back.
3268    ///
3269    /// # Errors
3270    ///
3271    /// Returns an error if no transaction is active.
3272    ///
3273    /// # Examples
3274    ///
3275    /// ```no_run
3276    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
3277    /// use grafeo_engine::GrafeoDB;
3278    ///
3279    /// let db = GrafeoDB::new_in_memory();
3280    /// let mut session = db.session();
3281    ///
3282    /// session.begin_transaction()?;
3283    /// session.execute("INSERT (:Person {name: 'Alix'})")?;
3284    ///
3285    /// let mut prepared = session.prepare_commit()?;
3286    /// println!("Nodes written: {}", prepared.info().nodes_written);
3287    /// prepared.set_metadata("audit_user", "admin");
3288    /// prepared.commit()?;
3289    /// # Ok(())
3290    /// # }
3291    /// ```
3292    pub fn prepare_commit(&mut self) -> Result<crate::transaction::PreparedCommit<'_>> {
3293        crate::transaction::PreparedCommit::new(self)
3294    }
3295
3296    /// Sets auto-commit mode.
3297    pub fn set_auto_commit(&mut self, auto_commit: bool) {
3298        self.auto_commit = auto_commit;
3299    }
3300
3301    /// Returns whether auto-commit is enabled.
3302    #[must_use]
3303    pub fn auto_commit(&self) -> bool {
3304        self.auto_commit
3305    }
3306
3307    /// Returns `true` if auto-commit should wrap this execution.
3308    ///
3309    /// Auto-commit kicks in when: the session is in auto-commit mode,
3310    /// no explicit transaction is active, and the query mutates data.
3311    fn needs_auto_commit(&self, has_mutations: bool) -> bool {
3312        self.auto_commit && has_mutations && self.current_transaction.lock().is_none()
3313    }
3314
3315    /// Wraps `body` in an automatic begin/commit when [`needs_auto_commit`]
3316    /// returns `true`. On error the transaction is rolled back.
3317    fn with_auto_commit<F>(&self, has_mutations: bool, body: F) -> Result<QueryResult>
3318    where
3319        F: FnOnce() -> Result<QueryResult>,
3320    {
3321        if self.needs_auto_commit(has_mutations) {
3322            self.begin_transaction_inner(false, None)?;
3323            match body() {
3324                Ok(result) => {
3325                    self.commit_inner()?;
3326                    Ok(result)
3327                }
3328                Err(e) => {
3329                    let _ = self.rollback_inner();
3330                    Err(e)
3331                }
3332            }
3333        } else {
3334            body()
3335        }
3336    }
3337
3338    /// Quick heuristic: returns `true` when the query text looks like it
3339    /// performs a mutation. Used by `_with_params` paths that go through the
3340    /// `QueryProcessor` (where the logical plan isn't available before
3341    /// execution). False negatives are harmless: the data just won't be
3342    /// auto-committed, which matches the prior behaviour.
3343    fn query_looks_like_mutation(query: &str) -> bool {
3344        let upper = query.to_ascii_uppercase();
3345        upper.contains("INSERT")
3346            || upper.contains("CREATE")
3347            || upper.contains("DELETE")
3348            || upper.contains("MERGE")
3349            || upper.contains("SET")
3350            || upper.contains("REMOVE")
3351            || upper.contains("DROP")
3352            || upper.contains("ALTER")
3353    }
3354
3355    /// Computes the wall-clock deadline for query execution.
3356    #[must_use]
3357    fn query_deadline(&self) -> Option<Instant> {
3358        #[cfg(not(target_arch = "wasm32"))]
3359        {
3360            self.query_timeout.map(|d| Instant::now() + d)
3361        }
3362        #[cfg(target_arch = "wasm32")]
3363        {
3364            let _ = &self.query_timeout;
3365            None
3366        }
3367    }
3368
3369    /// Evaluates a simple integer literal from a session parameter expression.
3370    fn eval_integer_literal(expr: &grafeo_adapters::query::gql::ast::Expression) -> Option<i64> {
3371        use grafeo_adapters::query::gql::ast::{Expression, Literal};
3372        match expr {
3373            Expression::Literal(Literal::Integer(n)) => Some(*n),
3374            _ => None,
3375        }
3376    }
3377
3378    /// Returns the current transaction context for MVCC visibility.
3379    ///
3380    /// Returns `(viewing_epoch, transaction_id)` where:
3381    /// - `viewing_epoch` is the epoch at which to check version visibility
3382    /// - `transaction_id` is the current transaction ID (if in a transaction)
3383    #[must_use]
3384    fn get_transaction_context(&self) -> (EpochId, Option<TransactionId>) {
3385        // Time-travel override takes precedence (read-only, no tx context)
3386        if let Some(epoch) = *self.viewing_epoch_override.lock() {
3387            return (epoch, None);
3388        }
3389
3390        if let Some(transaction_id) = *self.current_transaction.lock() {
3391            // In a transaction: use the transaction's start epoch
3392            let epoch = self
3393                .transaction_manager
3394                .start_epoch(transaction_id)
3395                .unwrap_or_else(|| self.transaction_manager.current_epoch());
3396            (epoch, Some(transaction_id))
3397        } else {
3398            // No transaction: use current epoch
3399            (self.transaction_manager.current_epoch(), None)
3400        }
3401    }
3402
3403    /// Creates a planner with transaction context and constraint validator.
3404    ///
3405    /// The `store` parameter is the graph store to plan against (use
3406    /// `self.active_store()` for graph-aware execution).
3407    fn create_planner_for_store(
3408        &self,
3409        store: Arc<dyn GraphStoreMut>,
3410        viewing_epoch: EpochId,
3411        transaction_id: Option<TransactionId>,
3412    ) -> crate::query::Planner {
3413        use crate::query::Planner;
3414
3415        let mut planner = Planner::with_context(
3416            Arc::clone(&store),
3417            Arc::clone(&self.transaction_manager),
3418            transaction_id,
3419            viewing_epoch,
3420        )
3421        .with_factorized_execution(self.factorized_execution)
3422        .with_catalog(Arc::clone(&self.catalog));
3423
3424        // Attach the constraint validator for schema enforcement
3425        let validator =
3426            CatalogConstraintValidator::new(Arc::clone(&self.catalog)).with_store(store);
3427        planner = planner.with_validator(Arc::new(validator));
3428
3429        planner
3430    }
3431
3432    /// Creates a node directly (bypassing query execution).
3433    ///
3434    /// This is a low-level API for testing and direct manipulation.
3435    /// If a transaction is active, the node will be versioned with the transaction ID.
3436    pub fn create_node(&self, labels: &[&str]) -> NodeId {
3437        let (epoch, transaction_id) = self.get_transaction_context();
3438        self.active_lpg_store().create_node_versioned(
3439            labels,
3440            epoch,
3441            transaction_id.unwrap_or(TransactionId::SYSTEM),
3442        )
3443    }
3444
3445    /// Creates a node with properties.
3446    ///
3447    /// If a transaction is active, the node will be versioned with the transaction ID.
3448    pub fn create_node_with_props<'a>(
3449        &self,
3450        labels: &[&str],
3451        properties: impl IntoIterator<Item = (&'a str, Value)>,
3452    ) -> NodeId {
3453        let (epoch, transaction_id) = self.get_transaction_context();
3454        self.active_lpg_store().create_node_with_props_versioned(
3455            labels,
3456            properties,
3457            epoch,
3458            transaction_id.unwrap_or(TransactionId::SYSTEM),
3459        )
3460    }
3461
3462    /// Creates an edge between two nodes.
3463    ///
3464    /// This is a low-level API for testing and direct manipulation.
3465    /// If a transaction is active, the edge will be versioned with the transaction ID.
3466    pub fn create_edge(
3467        &self,
3468        src: NodeId,
3469        dst: NodeId,
3470        edge_type: &str,
3471    ) -> grafeo_common::types::EdgeId {
3472        let (epoch, transaction_id) = self.get_transaction_context();
3473        self.active_lpg_store().create_edge_versioned(
3474            src,
3475            dst,
3476            edge_type,
3477            epoch,
3478            transaction_id.unwrap_or(TransactionId::SYSTEM),
3479        )
3480    }
3481
3482    // =========================================================================
3483    // Direct Lookup APIs (bypass query planning for O(1) point reads)
3484    // =========================================================================
3485
3486    /// Gets a node by ID directly, bypassing query planning.
3487    ///
3488    /// This is the fastest way to retrieve a single node when you know its ID.
3489    /// Skips parsing, binding, optimization, and physical planning entirely.
3490    ///
3491    /// # Performance
3492    ///
3493    /// - Time complexity: O(1) average case
3494    /// - No lock contention (uses DashMap internally)
3495    /// - ~20-30x faster than equivalent MATCH query
3496    ///
3497    /// # Example
3498    ///
3499    /// ```no_run
3500    /// # use grafeo_engine::GrafeoDB;
3501    /// # let db = GrafeoDB::new_in_memory();
3502    /// let session = db.session();
3503    /// let node_id = session.create_node(&["Person"]);
3504    ///
3505    /// // Direct lookup - O(1), no query planning
3506    /// let node = session.get_node(node_id);
3507    /// assert!(node.is_some());
3508    /// ```
3509    #[must_use]
3510    pub fn get_node(&self, id: NodeId) -> Option<Node> {
3511        let (epoch, transaction_id) = self.get_transaction_context();
3512        self.active_lpg_store().get_node_versioned(
3513            id,
3514            epoch,
3515            transaction_id.unwrap_or(TransactionId::SYSTEM),
3516        )
3517    }
3518
3519    /// Gets a single property from a node by ID, bypassing query planning.
3520    ///
3521    /// More efficient than `get_node()` when you only need one property,
3522    /// as it avoids loading the full node with all properties.
3523    ///
3524    /// # Performance
3525    ///
3526    /// - Time complexity: O(1) average case
3527    /// - No query planning overhead
3528    ///
3529    /// # Example
3530    ///
3531    /// ```no_run
3532    /// # use grafeo_engine::GrafeoDB;
3533    /// # use grafeo_common::types::Value;
3534    /// # let db = GrafeoDB::new_in_memory();
3535    /// let session = db.session();
3536    /// let id = session.create_node_with_props(&["Person"], [("name", "Alix".into())]);
3537    ///
3538    /// // Direct property access - O(1)
3539    /// let name = session.get_node_property(id, "name");
3540    /// assert_eq!(name, Some(Value::String("Alix".into())));
3541    /// ```
3542    #[must_use]
3543    pub fn get_node_property(&self, id: NodeId, key: &str) -> Option<Value> {
3544        self.get_node(id)
3545            .and_then(|node| node.get_property(key).cloned())
3546    }
3547
3548    /// Gets an edge by ID directly, bypassing query planning.
3549    ///
3550    /// # Performance
3551    ///
3552    /// - Time complexity: O(1) average case
3553    /// - No lock contention
3554    #[must_use]
3555    pub fn get_edge(&self, id: EdgeId) -> Option<Edge> {
3556        let (epoch, transaction_id) = self.get_transaction_context();
3557        self.active_lpg_store().get_edge_versioned(
3558            id,
3559            epoch,
3560            transaction_id.unwrap_or(TransactionId::SYSTEM),
3561        )
3562    }
3563
3564    /// Gets outgoing neighbors of a node directly, bypassing query planning.
3565    ///
3566    /// Returns (neighbor_id, edge_id) pairs for all outgoing edges.
3567    ///
3568    /// # Performance
3569    ///
3570    /// - Time complexity: O(degree) where degree is the number of outgoing edges
3571    /// - Uses adjacency index for direct access
3572    /// - ~10-20x faster than equivalent MATCH query
3573    ///
3574    /// # Example
3575    ///
3576    /// ```no_run
3577    /// # use grafeo_engine::GrafeoDB;
3578    /// # let db = GrafeoDB::new_in_memory();
3579    /// let session = db.session();
3580    /// let alix = session.create_node(&["Person"]);
3581    /// let gus = session.create_node(&["Person"]);
3582    /// session.create_edge(alix, gus, "KNOWS");
3583    ///
3584    /// // Direct neighbor lookup - O(degree)
3585    /// let neighbors = session.get_neighbors_outgoing(alix);
3586    /// assert_eq!(neighbors.len(), 1);
3587    /// assert_eq!(neighbors[0].0, gus);
3588    /// ```
3589    #[must_use]
3590    pub fn get_neighbors_outgoing(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
3591        self.active_lpg_store()
3592            .edges_from(node, Direction::Outgoing)
3593            .collect()
3594    }
3595
3596    /// Gets incoming neighbors of a node directly, bypassing query planning.
3597    ///
3598    /// Returns (neighbor_id, edge_id) pairs for all incoming edges.
3599    ///
3600    /// # Performance
3601    ///
3602    /// - Time complexity: O(degree) where degree is the number of incoming edges
3603    /// - Uses backward adjacency index for direct access
3604    #[must_use]
3605    pub fn get_neighbors_incoming(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
3606        self.active_lpg_store()
3607            .edges_from(node, Direction::Incoming)
3608            .collect()
3609    }
3610
3611    /// Gets outgoing neighbors filtered by edge type, bypassing query planning.
3612    ///
3613    /// # Example
3614    ///
3615    /// ```no_run
3616    /// # use grafeo_engine::GrafeoDB;
3617    /// # let db = GrafeoDB::new_in_memory();
3618    /// # let session = db.session();
3619    /// # let alix = session.create_node(&["Person"]);
3620    /// let neighbors = session.get_neighbors_outgoing_by_type(alix, "KNOWS");
3621    /// ```
3622    #[must_use]
3623    pub fn get_neighbors_outgoing_by_type(
3624        &self,
3625        node: NodeId,
3626        edge_type: &str,
3627    ) -> Vec<(NodeId, EdgeId)> {
3628        self.active_lpg_store()
3629            .edges_from(node, Direction::Outgoing)
3630            .filter(|(_, edge_id)| {
3631                self.get_edge(*edge_id)
3632                    .is_some_and(|e| e.edge_type.as_str() == edge_type)
3633            })
3634            .collect()
3635    }
3636
3637    /// Checks if a node exists, bypassing query planning.
3638    ///
3639    /// # Performance
3640    ///
3641    /// - Time complexity: O(1)
3642    /// - Fastest existence check available
3643    #[must_use]
3644    pub fn node_exists(&self, id: NodeId) -> bool {
3645        self.get_node(id).is_some()
3646    }
3647
3648    /// Checks if an edge exists, bypassing query planning.
3649    #[must_use]
3650    pub fn edge_exists(&self, id: EdgeId) -> bool {
3651        self.get_edge(id).is_some()
3652    }
3653
3654    /// Gets the degree (number of edges) of a node.
3655    ///
3656    /// Returns (outgoing_degree, incoming_degree).
3657    #[must_use]
3658    pub fn get_degree(&self, node: NodeId) -> (usize, usize) {
3659        let active = self.active_lpg_store();
3660        let out = active.out_degree(node);
3661        let in_degree = active.in_degree(node);
3662        (out, in_degree)
3663    }
3664
3665    /// Batch lookup of multiple nodes by ID.
3666    ///
3667    /// More efficient than calling `get_node()` in a loop because it
3668    /// amortizes overhead.
3669    ///
3670    /// # Performance
3671    ///
3672    /// - Time complexity: O(n) where n is the number of IDs
3673    /// - Better cache utilization than individual lookups
3674    #[must_use]
3675    pub fn get_nodes_batch(&self, ids: &[NodeId]) -> Vec<Option<Node>> {
3676        let (epoch, transaction_id) = self.get_transaction_context();
3677        let tx = transaction_id.unwrap_or(TransactionId::SYSTEM);
3678        let active = self.active_lpg_store();
3679        ids.iter()
3680            .map(|&id| active.get_node_versioned(id, epoch, tx))
3681            .collect()
3682    }
3683
3684    // ── Change Data Capture ─────────────────────────────────────────────
3685
3686    /// Returns the full change history for an entity (node or edge).
3687    #[cfg(feature = "cdc")]
3688    pub fn history(
3689        &self,
3690        entity_id: impl Into<crate::cdc::EntityId>,
3691    ) -> Result<Vec<crate::cdc::ChangeEvent>> {
3692        Ok(self.cdc_log.history(entity_id.into()))
3693    }
3694
3695    /// Returns change events for an entity since the given epoch.
3696    #[cfg(feature = "cdc")]
3697    pub fn history_since(
3698        &self,
3699        entity_id: impl Into<crate::cdc::EntityId>,
3700        since_epoch: EpochId,
3701    ) -> Result<Vec<crate::cdc::ChangeEvent>> {
3702        Ok(self.cdc_log.history_since(entity_id.into(), since_epoch))
3703    }
3704
3705    /// Returns all change events across all entities in an epoch range.
3706    #[cfg(feature = "cdc")]
3707    pub fn changes_between(
3708        &self,
3709        start_epoch: EpochId,
3710        end_epoch: EpochId,
3711    ) -> Result<Vec<crate::cdc::ChangeEvent>> {
3712        Ok(self.cdc_log.changes_between(start_epoch, end_epoch))
3713    }
3714}
3715
3716impl Drop for Session {
3717    fn drop(&mut self) {
3718        // Auto-rollback any active transaction to prevent leaked MVCC state,
3719        // dangling write locks, and uncommitted versions lingering in the store.
3720        if self.in_transaction() {
3721            let _ = self.rollback_inner();
3722        }
3723    }
3724}
3725
3726#[cfg(test)]
3727mod tests {
3728    use crate::database::GrafeoDB;
3729
3730    #[test]
3731    fn test_session_create_node() {
3732        let db = GrafeoDB::new_in_memory();
3733        let session = db.session();
3734
3735        let id = session.create_node(&["Person"]);
3736        assert!(id.is_valid());
3737        assert_eq!(db.node_count(), 1);
3738    }
3739
3740    #[test]
3741    fn test_session_transaction() {
3742        let db = GrafeoDB::new_in_memory();
3743        let mut session = db.session();
3744
3745        assert!(!session.in_transaction());
3746
3747        session.begin_transaction().unwrap();
3748        assert!(session.in_transaction());
3749
3750        session.commit().unwrap();
3751        assert!(!session.in_transaction());
3752    }
3753
3754    #[test]
3755    fn test_session_transaction_context() {
3756        let db = GrafeoDB::new_in_memory();
3757        let mut session = db.session();
3758
3759        // Without transaction - context should have current epoch and no transaction_id
3760        let (_epoch1, transaction_id1) = session.get_transaction_context();
3761        assert!(transaction_id1.is_none());
3762
3763        // Start a transaction
3764        session.begin_transaction().unwrap();
3765        let (epoch2, transaction_id2) = session.get_transaction_context();
3766        assert!(transaction_id2.is_some());
3767        // Transaction should have a valid epoch
3768        let _ = epoch2; // Use the variable
3769
3770        // Commit and verify
3771        session.commit().unwrap();
3772        let (epoch3, tx_id3) = session.get_transaction_context();
3773        assert!(tx_id3.is_none());
3774        // Epoch should have advanced after commit
3775        assert!(epoch3.as_u64() >= epoch2.as_u64());
3776    }
3777
3778    #[test]
3779    fn test_session_rollback() {
3780        let db = GrafeoDB::new_in_memory();
3781        let mut session = db.session();
3782
3783        session.begin_transaction().unwrap();
3784        session.rollback().unwrap();
3785        assert!(!session.in_transaction());
3786    }
3787
3788    #[test]
3789    fn test_session_rollback_discards_versions() {
3790        use grafeo_common::types::TransactionId;
3791
3792        let db = GrafeoDB::new_in_memory();
3793
3794        // Create a node outside of any transaction (at system level)
3795        let node_before = db.store().create_node(&["Person"]);
3796        assert!(node_before.is_valid());
3797        assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");
3798
3799        // Start a transaction
3800        let mut session = db.session();
3801        session.begin_transaction().unwrap();
3802        let transaction_id = session.current_transaction.lock().unwrap();
3803
3804        // Create a node versioned with the transaction's ID
3805        let epoch = db.store().current_epoch();
3806        let node_in_tx = db
3807            .store()
3808            .create_node_versioned(&["Person"], epoch, transaction_id);
3809        assert!(node_in_tx.is_valid());
3810
3811        // Uncommitted nodes use EpochId::PENDING, so they are invisible to
3812        // non-versioned lookups like node_count(). Verify the node is visible
3813        // only through the owning transaction.
3814        assert_eq!(
3815            db.node_count(),
3816            1,
3817            "PENDING nodes should be invisible to non-versioned node_count()"
3818        );
3819        assert!(
3820            db.store()
3821                .get_node_versioned(node_in_tx, epoch, transaction_id)
3822                .is_some(),
3823            "Transaction node should be visible to its own transaction"
3824        );
3825
3826        // Rollback the transaction
3827        session.rollback().unwrap();
3828        assert!(!session.in_transaction());
3829
3830        // The node created in the transaction should be discarded
3831        // Only the first node should remain visible
3832        let count_after = db.node_count();
3833        assert_eq!(
3834            count_after, 1,
3835            "Rollback should discard uncommitted node, but got {count_after}"
3836        );
3837
3838        // The original node should still be accessible
3839        let current_epoch = db.store().current_epoch();
3840        assert!(
3841            db.store()
3842                .get_node_versioned(node_before, current_epoch, TransactionId::SYSTEM)
3843                .is_some(),
3844            "Original node should still exist"
3845        );
3846
3847        // The node created in the transaction should not be accessible
3848        assert!(
3849            db.store()
3850                .get_node_versioned(node_in_tx, current_epoch, TransactionId::SYSTEM)
3851                .is_none(),
3852            "Transaction node should be gone"
3853        );
3854    }
3855
3856    #[test]
3857    fn test_session_create_node_in_transaction() {
3858        // Test that session.create_node() is transaction-aware
3859        let db = GrafeoDB::new_in_memory();
3860
3861        // Create a node outside of any transaction
3862        let node_before = db.create_node(&["Person"]);
3863        assert!(node_before.is_valid());
3864        assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");
3865
3866        // Start a transaction and create a node through the session
3867        let mut session = db.session();
3868        session.begin_transaction().unwrap();
3869        let transaction_id = session.current_transaction.lock().unwrap();
3870
3871        // Create a node through session.create_node() - should be versioned with tx
3872        let node_in_tx = session.create_node(&["Person"]);
3873        assert!(node_in_tx.is_valid());
3874
3875        // Uncommitted nodes use EpochId::PENDING, so they are invisible to
3876        // non-versioned lookups. Verify the node is visible only to its own tx.
3877        assert_eq!(
3878            db.node_count(),
3879            1,
3880            "PENDING nodes should be invisible to non-versioned node_count()"
3881        );
3882        let epoch = db.store().current_epoch();
3883        assert!(
3884            db.store()
3885                .get_node_versioned(node_in_tx, epoch, transaction_id)
3886                .is_some(),
3887            "Transaction node should be visible to its own transaction"
3888        );
3889
3890        // Rollback the transaction
3891        session.rollback().unwrap();
3892
3893        // The node created via session.create_node() should be discarded
3894        let count_after = db.node_count();
3895        assert_eq!(
3896            count_after, 1,
3897            "Rollback should discard node created via session.create_node(), but got {count_after}"
3898        );
3899    }
3900
3901    #[test]
3902    fn test_session_create_node_with_props_in_transaction() {
3903        use grafeo_common::types::Value;
3904
3905        // Test that session.create_node_with_props() is transaction-aware
3906        let db = GrafeoDB::new_in_memory();
3907
3908        // Create a node outside of any transaction
3909        db.create_node(&["Person"]);
3910        assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");
3911
3912        // Start a transaction and create a node with properties
3913        let mut session = db.session();
3914        session.begin_transaction().unwrap();
3915        let transaction_id = session.current_transaction.lock().unwrap();
3916
3917        let node_in_tx =
3918            session.create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))]);
3919        assert!(node_in_tx.is_valid());
3920
3921        // Uncommitted nodes use EpochId::PENDING, so they are invisible to
3922        // non-versioned lookups. Verify the node is visible only to its own tx.
3923        assert_eq!(
3924            db.node_count(),
3925            1,
3926            "PENDING nodes should be invisible to non-versioned node_count()"
3927        );
3928        let epoch = db.store().current_epoch();
3929        assert!(
3930            db.store()
3931                .get_node_versioned(node_in_tx, epoch, transaction_id)
3932                .is_some(),
3933            "Transaction node should be visible to its own transaction"
3934        );
3935
3936        // Rollback the transaction
3937        session.rollback().unwrap();
3938
3939        // The node should be discarded
3940        let count_after = db.node_count();
3941        assert_eq!(
3942            count_after, 1,
3943            "Rollback should discard node created via session.create_node_with_props()"
3944        );
3945    }
3946
3947    #[cfg(feature = "gql")]
3948    mod gql_tests {
3949        use super::*;
3950
3951        #[test]
3952        fn test_gql_query_execution() {
3953            let db = GrafeoDB::new_in_memory();
3954            let session = db.session();
3955
3956            // Create some test data
3957            session.create_node(&["Person"]);
3958            session.create_node(&["Person"]);
3959            session.create_node(&["Animal"]);
3960
3961            // Execute a GQL query
3962            let result = session.execute("MATCH (n:Person) RETURN n").unwrap();
3963
3964            // Should return 2 Person nodes
3965            assert_eq!(result.row_count(), 2);
3966            assert_eq!(result.column_count(), 1);
3967            assert_eq!(result.columns[0], "n");
3968        }
3969
3970        #[test]
3971        fn test_gql_empty_result() {
3972            let db = GrafeoDB::new_in_memory();
3973            let session = db.session();
3974
3975            // No data in database
3976            let result = session.execute("MATCH (n:Person) RETURN n").unwrap();
3977
3978            assert_eq!(result.row_count(), 0);
3979        }
3980
3981        #[test]
3982        fn test_gql_parse_error() {
3983            let db = GrafeoDB::new_in_memory();
3984            let session = db.session();
3985
3986            // Invalid GQL syntax
3987            let result = session.execute("MATCH (n RETURN n");
3988
3989            assert!(result.is_err());
3990        }
3991
3992        #[test]
3993        fn test_gql_relationship_traversal() {
3994            let db = GrafeoDB::new_in_memory();
3995            let session = db.session();
3996
3997            // Create a graph: Alix -> Gus, Alix -> Vincent
3998            let alix = session.create_node(&["Person"]);
3999            let gus = session.create_node(&["Person"]);
4000            let vincent = session.create_node(&["Person"]);
4001
4002            session.create_edge(alix, gus, "KNOWS");
4003            session.create_edge(alix, vincent, "KNOWS");
4004
4005            // Execute a path query: MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b
4006            let result = session
4007                .execute("MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b")
4008                .unwrap();
4009
4010            // Should return 2 rows (Alix->Gus, Alix->Vincent)
4011            assert_eq!(result.row_count(), 2);
4012            assert_eq!(result.column_count(), 2);
4013            assert_eq!(result.columns[0], "a");
4014            assert_eq!(result.columns[1], "b");
4015        }
4016
4017        #[test]
4018        fn test_gql_relationship_with_type_filter() {
4019            let db = GrafeoDB::new_in_memory();
4020            let session = db.session();
4021
4022            // Create a graph: Alix -KNOWS-> Gus, Alix -WORKS_WITH-> Vincent
4023            let alix = session.create_node(&["Person"]);
4024            let gus = session.create_node(&["Person"]);
4025            let vincent = session.create_node(&["Person"]);
4026
4027            session.create_edge(alix, gus, "KNOWS");
4028            session.create_edge(alix, vincent, "WORKS_WITH");
4029
4030            // Query only KNOWS relationships
4031            let result = session
4032                .execute("MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b")
4033                .unwrap();
4034
4035            // Should return only 1 row (Alix->Gus)
4036            assert_eq!(result.row_count(), 1);
4037        }
4038
4039        #[test]
4040        fn test_gql_semantic_error_undefined_variable() {
4041            let db = GrafeoDB::new_in_memory();
4042            let session = db.session();
4043
4044            // Reference undefined variable 'x' in RETURN
4045            let result = session.execute("MATCH (n:Person) RETURN x");
4046
4047            // Should fail with semantic error
4048            assert!(result.is_err());
4049            let Err(err) = result else {
4050                panic!("Expected error")
4051            };
4052            assert!(
4053                err.to_string().contains("Undefined variable"),
4054                "Expected undefined variable error, got: {}",
4055                err
4056            );
4057        }
4058
4059        #[test]
4060        fn test_gql_where_clause_property_filter() {
4061            use grafeo_common::types::Value;
4062
4063            let db = GrafeoDB::new_in_memory();
4064            let session = db.session();
4065
4066            // Create people with ages
4067            session.create_node_with_props(&["Person"], [("age", Value::Int64(25))]);
4068            session.create_node_with_props(&["Person"], [("age", Value::Int64(35))]);
4069            session.create_node_with_props(&["Person"], [("age", Value::Int64(45))]);
4070
4071            // Query with WHERE clause: age > 30
4072            let result = session
4073                .execute("MATCH (n:Person) WHERE n.age > 30 RETURN n")
4074                .unwrap();
4075
4076            // Should return 2 people (ages 35 and 45)
4077            assert_eq!(result.row_count(), 2);
4078        }
4079
4080        #[test]
4081        fn test_gql_where_clause_equality() {
4082            use grafeo_common::types::Value;
4083
4084            let db = GrafeoDB::new_in_memory();
4085            let session = db.session();
4086
4087            // Create people with names
4088            session.create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))]);
4089            session.create_node_with_props(&["Person"], [("name", Value::String("Gus".into()))]);
4090            session.create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))]);
4091
4092            // Query with WHERE clause: name = "Alix"
4093            let result = session
4094                .execute("MATCH (n:Person) WHERE n.name = \"Alix\" RETURN n")
4095                .unwrap();
4096
4097            // Should return 2 people named Alix
4098            assert_eq!(result.row_count(), 2);
4099        }
4100
4101        #[test]
4102        fn test_gql_return_property_access() {
4103            use grafeo_common::types::Value;
4104
4105            let db = GrafeoDB::new_in_memory();
4106            let session = db.session();
4107
4108            // Create people with names and ages
4109            session.create_node_with_props(
4110                &["Person"],
4111                [
4112                    ("name", Value::String("Alix".into())),
4113                    ("age", Value::Int64(30)),
4114                ],
4115            );
4116            session.create_node_with_props(
4117                &["Person"],
4118                [
4119                    ("name", Value::String("Gus".into())),
4120                    ("age", Value::Int64(25)),
4121                ],
4122            );
4123
4124            // Query returning properties
4125            let result = session
4126                .execute("MATCH (n:Person) RETURN n.name, n.age")
4127                .unwrap();
4128
4129            // Should return 2 rows with name and age columns
4130            assert_eq!(result.row_count(), 2);
4131            assert_eq!(result.column_count(), 2);
4132            assert_eq!(result.columns[0], "n.name");
4133            assert_eq!(result.columns[1], "n.age");
4134
4135            // Check that we get actual values
4136            let names: Vec<&Value> = result.rows.iter().map(|r| &r[0]).collect();
4137            assert!(names.contains(&&Value::String("Alix".into())));
4138            assert!(names.contains(&&Value::String("Gus".into())));
4139        }
4140
4141        #[test]
4142        fn test_gql_return_mixed_expressions() {
4143            use grafeo_common::types::Value;
4144
4145            let db = GrafeoDB::new_in_memory();
4146            let session = db.session();
4147
4148            // Create a person
4149            session.create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))]);
4150
4151            // Query returning both node and property
4152            let result = session
4153                .execute("MATCH (n:Person) RETURN n, n.name")
4154                .unwrap();
4155
4156            assert_eq!(result.row_count(), 1);
4157            assert_eq!(result.column_count(), 2);
4158            assert_eq!(result.columns[0], "n");
4159            assert_eq!(result.columns[1], "n.name");
4160
4161            // Second column should be the name
4162            assert_eq!(result.rows[0][1], Value::String("Alix".into()));
4163        }
4164    }
4165
4166    #[cfg(feature = "cypher")]
4167    mod cypher_tests {
4168        use super::*;
4169
4170        #[test]
4171        fn test_cypher_query_execution() {
4172            let db = GrafeoDB::new_in_memory();
4173            let session = db.session();
4174
4175            // Create some test data
4176            session.create_node(&["Person"]);
4177            session.create_node(&["Person"]);
4178            session.create_node(&["Animal"]);
4179
4180            // Execute a Cypher query
4181            let result = session.execute_cypher("MATCH (n:Person) RETURN n").unwrap();
4182
4183            // Should return 2 Person nodes
4184            assert_eq!(result.row_count(), 2);
4185            assert_eq!(result.column_count(), 1);
4186            assert_eq!(result.columns[0], "n");
4187        }
4188
4189        #[test]
4190        fn test_cypher_empty_result() {
4191            let db = GrafeoDB::new_in_memory();
4192            let session = db.session();
4193
4194            // No data in database
4195            let result = session.execute_cypher("MATCH (n:Person) RETURN n").unwrap();
4196
4197            assert_eq!(result.row_count(), 0);
4198        }
4199
4200        #[test]
4201        fn test_cypher_parse_error() {
4202            let db = GrafeoDB::new_in_memory();
4203            let session = db.session();
4204
4205            // Invalid Cypher syntax
4206            let result = session.execute_cypher("MATCH (n RETURN n");
4207
4208            assert!(result.is_err());
4209        }
4210    }
4211
4212    // ==================== Direct Lookup API Tests ====================
4213
4214    mod direct_lookup_tests {
4215        use super::*;
4216        use grafeo_common::types::Value;
4217
4218        #[test]
4219        fn test_get_node() {
4220            let db = GrafeoDB::new_in_memory();
4221            let session = db.session();
4222
4223            let id = session.create_node(&["Person"]);
4224            let node = session.get_node(id);
4225
4226            assert!(node.is_some());
4227            let node = node.unwrap();
4228            assert_eq!(node.id, id);
4229        }
4230
4231        #[test]
4232        fn test_get_node_not_found() {
4233            use grafeo_common::types::NodeId;
4234
4235            let db = GrafeoDB::new_in_memory();
4236            let session = db.session();
4237
4238            // Try to get a non-existent node
4239            let node = session.get_node(NodeId::new(9999));
4240            assert!(node.is_none());
4241        }
4242
4243        #[test]
4244        fn test_get_node_property() {
4245            let db = GrafeoDB::new_in_memory();
4246            let session = db.session();
4247
4248            let id = session
4249                .create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))]);
4250
4251            let name = session.get_node_property(id, "name");
4252            assert_eq!(name, Some(Value::String("Alix".into())));
4253
4254            // Non-existent property
4255            let missing = session.get_node_property(id, "missing");
4256            assert!(missing.is_none());
4257        }
4258
4259        #[test]
4260        fn test_get_edge() {
4261            let db = GrafeoDB::new_in_memory();
4262            let session = db.session();
4263
4264            let alix = session.create_node(&["Person"]);
4265            let gus = session.create_node(&["Person"]);
4266            let edge_id = session.create_edge(alix, gus, "KNOWS");
4267
4268            let edge = session.get_edge(edge_id);
4269            assert!(edge.is_some());
4270            let edge = edge.unwrap();
4271            assert_eq!(edge.id, edge_id);
4272            assert_eq!(edge.src, alix);
4273            assert_eq!(edge.dst, gus);
4274        }
4275
4276        #[test]
4277        fn test_get_edge_not_found() {
4278            use grafeo_common::types::EdgeId;
4279
4280            let db = GrafeoDB::new_in_memory();
4281            let session = db.session();
4282
4283            let edge = session.get_edge(EdgeId::new(9999));
4284            assert!(edge.is_none());
4285        }
4286
4287        #[test]
4288        fn test_get_neighbors_outgoing() {
4289            let db = GrafeoDB::new_in_memory();
4290            let session = db.session();
4291
4292            let alix = session.create_node(&["Person"]);
4293            let gus = session.create_node(&["Person"]);
4294            let harm = session.create_node(&["Person"]);
4295
4296            session.create_edge(alix, gus, "KNOWS");
4297            session.create_edge(alix, harm, "KNOWS");
4298
4299            let neighbors = session.get_neighbors_outgoing(alix);
4300            assert_eq!(neighbors.len(), 2);
4301
4302            let neighbor_ids: Vec<_> = neighbors.iter().map(|(node_id, _)| *node_id).collect();
4303            assert!(neighbor_ids.contains(&gus));
4304            assert!(neighbor_ids.contains(&harm));
4305        }
4306
4307        #[test]
4308        fn test_get_neighbors_incoming() {
4309            let db = GrafeoDB::new_in_memory();
4310            let session = db.session();
4311
4312            let alix = session.create_node(&["Person"]);
4313            let gus = session.create_node(&["Person"]);
4314            let harm = session.create_node(&["Person"]);
4315
4316            session.create_edge(gus, alix, "KNOWS");
4317            session.create_edge(harm, alix, "KNOWS");
4318
4319            let neighbors = session.get_neighbors_incoming(alix);
4320            assert_eq!(neighbors.len(), 2);
4321
4322            let neighbor_ids: Vec<_> = neighbors.iter().map(|(node_id, _)| *node_id).collect();
4323            assert!(neighbor_ids.contains(&gus));
4324            assert!(neighbor_ids.contains(&harm));
4325        }
4326
4327        #[test]
4328        fn test_get_neighbors_outgoing_by_type() {
4329            let db = GrafeoDB::new_in_memory();
4330            let session = db.session();
4331
4332            let alix = session.create_node(&["Person"]);
4333            let gus = session.create_node(&["Person"]);
4334            let company = session.create_node(&["Company"]);
4335
4336            session.create_edge(alix, gus, "KNOWS");
4337            session.create_edge(alix, company, "WORKS_AT");
4338
4339            let knows_neighbors = session.get_neighbors_outgoing_by_type(alix, "KNOWS");
4340            assert_eq!(knows_neighbors.len(), 1);
4341            assert_eq!(knows_neighbors[0].0, gus);
4342
4343            let works_neighbors = session.get_neighbors_outgoing_by_type(alix, "WORKS_AT");
4344            assert_eq!(works_neighbors.len(), 1);
4345            assert_eq!(works_neighbors[0].0, company);
4346
4347            // No edges of this type
4348            let no_neighbors = session.get_neighbors_outgoing_by_type(alix, "LIKES");
4349            assert!(no_neighbors.is_empty());
4350        }
4351
4352        #[test]
4353        fn test_node_exists() {
4354            use grafeo_common::types::NodeId;
4355
4356            let db = GrafeoDB::new_in_memory();
4357            let session = db.session();
4358
4359            let id = session.create_node(&["Person"]);
4360
4361            assert!(session.node_exists(id));
4362            assert!(!session.node_exists(NodeId::new(9999)));
4363        }
4364
4365        #[test]
4366        fn test_edge_exists() {
4367            use grafeo_common::types::EdgeId;
4368
4369            let db = GrafeoDB::new_in_memory();
4370            let session = db.session();
4371
4372            let alix = session.create_node(&["Person"]);
4373            let gus = session.create_node(&["Person"]);
4374            let edge_id = session.create_edge(alix, gus, "KNOWS");
4375
4376            assert!(session.edge_exists(edge_id));
4377            assert!(!session.edge_exists(EdgeId::new(9999)));
4378        }
4379
4380        #[test]
4381        fn test_get_degree() {
4382            let db = GrafeoDB::new_in_memory();
4383            let session = db.session();
4384
4385            let alix = session.create_node(&["Person"]);
4386            let gus = session.create_node(&["Person"]);
4387            let harm = session.create_node(&["Person"]);
4388
4389            // Alix knows Gus and Harm (2 outgoing)
4390            session.create_edge(alix, gus, "KNOWS");
4391            session.create_edge(alix, harm, "KNOWS");
4392            // Gus knows Alix (1 incoming for Alix)
4393            session.create_edge(gus, alix, "KNOWS");
4394
4395            let (out_degree, in_degree) = session.get_degree(alix);
4396            assert_eq!(out_degree, 2);
4397            assert_eq!(in_degree, 1);
4398
4399            // Node with no edges
4400            let lonely = session.create_node(&["Person"]);
4401            let (out, in_deg) = session.get_degree(lonely);
4402            assert_eq!(out, 0);
4403            assert_eq!(in_deg, 0);
4404        }
4405
4406        #[test]
4407        fn test_get_nodes_batch() {
4408            let db = GrafeoDB::new_in_memory();
4409            let session = db.session();
4410
4411            let alix = session.create_node(&["Person"]);
4412            let gus = session.create_node(&["Person"]);
4413            let harm = session.create_node(&["Person"]);
4414
4415            let nodes = session.get_nodes_batch(&[alix, gus, harm]);
4416            assert_eq!(nodes.len(), 3);
4417            assert!(nodes[0].is_some());
4418            assert!(nodes[1].is_some());
4419            assert!(nodes[2].is_some());
4420
4421            // With non-existent node
4422            use grafeo_common::types::NodeId;
4423            let nodes_with_missing = session.get_nodes_batch(&[alix, NodeId::new(9999), harm]);
4424            assert_eq!(nodes_with_missing.len(), 3);
4425            assert!(nodes_with_missing[0].is_some());
4426            assert!(nodes_with_missing[1].is_none()); // Missing node
4427            assert!(nodes_with_missing[2].is_some());
4428        }
4429
4430        #[test]
4431        fn test_auto_commit_setting() {
4432            let db = GrafeoDB::new_in_memory();
4433            let mut session = db.session();
4434
4435            // Default is auto-commit enabled
4436            assert!(session.auto_commit());
4437
4438            session.set_auto_commit(false);
4439            assert!(!session.auto_commit());
4440
4441            session.set_auto_commit(true);
4442            assert!(session.auto_commit());
4443        }
4444
4445        #[test]
4446        fn test_transaction_double_begin_nests() {
4447            let db = GrafeoDB::new_in_memory();
4448            let mut session = db.session();
4449
4450            session.begin_transaction().unwrap();
4451            // Second begin_transaction creates a nested transaction (auto-savepoint)
4452            let result = session.begin_transaction();
4453            assert!(result.is_ok());
4454            // Commit the inner (releases savepoint)
4455            session.commit().unwrap();
4456            // Commit the outer
4457            session.commit().unwrap();
4458        }
4459
4460        #[test]
4461        fn test_commit_without_transaction_error() {
4462            let db = GrafeoDB::new_in_memory();
4463            let mut session = db.session();
4464
4465            let result = session.commit();
4466            assert!(result.is_err());
4467        }
4468
4469        #[test]
4470        fn test_rollback_without_transaction_error() {
4471            let db = GrafeoDB::new_in_memory();
4472            let mut session = db.session();
4473
4474            let result = session.rollback();
4475            assert!(result.is_err());
4476        }
4477
4478        #[test]
4479        fn test_create_edge_in_transaction() {
4480            let db = GrafeoDB::new_in_memory();
4481            let mut session = db.session();
4482
4483            // Create nodes outside transaction
4484            let alix = session.create_node(&["Person"]);
4485            let gus = session.create_node(&["Person"]);
4486
4487            // Create edge in transaction
4488            session.begin_transaction().unwrap();
4489            let edge_id = session.create_edge(alix, gus, "KNOWS");
4490
4491            // Edge should be visible in the transaction
4492            assert!(session.edge_exists(edge_id));
4493
4494            // Commit
4495            session.commit().unwrap();
4496
4497            // Edge should still be visible
4498            assert!(session.edge_exists(edge_id));
4499        }
4500
4501        #[test]
4502        fn test_neighbors_empty_node() {
4503            let db = GrafeoDB::new_in_memory();
4504            let session = db.session();
4505
4506            let lonely = session.create_node(&["Person"]);
4507
4508            assert!(session.get_neighbors_outgoing(lonely).is_empty());
4509            assert!(session.get_neighbors_incoming(lonely).is_empty());
4510            assert!(
4511                session
4512                    .get_neighbors_outgoing_by_type(lonely, "KNOWS")
4513                    .is_empty()
4514            );
4515        }
4516    }
4517
4518    #[test]
4519    fn test_auto_gc_triggers_on_commit_interval() {
4520        use crate::config::Config;
4521
4522        let config = Config::in_memory().with_gc_interval(2);
4523        let db = GrafeoDB::with_config(config).unwrap();
4524        let mut session = db.session();
4525
4526        // First commit: counter = 1, no GC (not a multiple of 2)
4527        session.begin_transaction().unwrap();
4528        session.create_node(&["A"]);
4529        session.commit().unwrap();
4530
4531        // Second commit: counter = 2, GC should trigger (multiple of 2)
4532        session.begin_transaction().unwrap();
4533        session.create_node(&["B"]);
4534        session.commit().unwrap();
4535
4536        // Verify the database is still functional after GC
4537        assert_eq!(db.node_count(), 2);
4538    }
4539
4540    #[test]
4541    fn test_query_timeout_config_propagates_to_session() {
4542        use crate::config::Config;
4543        use std::time::Duration;
4544
4545        let config = Config::in_memory().with_query_timeout(Duration::from_secs(5));
4546        let db = GrafeoDB::with_config(config).unwrap();
4547        let session = db.session();
4548
4549        // Verify the session has a query deadline (timeout was set)
4550        assert!(session.query_deadline().is_some());
4551    }
4552
4553    #[test]
4554    fn test_no_query_timeout_returns_no_deadline() {
4555        let db = GrafeoDB::new_in_memory();
4556        let session = db.session();
4557
4558        // Default config has no timeout
4559        assert!(session.query_deadline().is_none());
4560    }
4561
4562    #[test]
4563    fn test_graph_model_accessor() {
4564        use crate::config::GraphModel;
4565
4566        let db = GrafeoDB::new_in_memory();
4567        let session = db.session();
4568
4569        assert_eq!(session.graph_model(), GraphModel::Lpg);
4570    }
4571
4572    #[cfg(feature = "gql")]
4573    #[test]
4574    fn test_external_store_session() {
4575        use grafeo_core::graph::GraphStoreMut;
4576        use std::sync::Arc;
4577
4578        let config = crate::config::Config::in_memory();
4579        let store =
4580            Arc::new(grafeo_core::graph::lpg::LpgStore::new().unwrap()) as Arc<dyn GraphStoreMut>;
4581        let db = GrafeoDB::with_store(store, config).unwrap();
4582
4583        let mut session = db.session();
4584
4585        // Use an explicit transaction so that INSERT and MATCH share the same
4586        // transaction context. With PENDING epochs, uncommitted versions are
4587        // only visible to the owning transaction.
4588        session.begin_transaction().unwrap();
4589        session.execute("INSERT (:Test {name: 'hello'})").unwrap();
4590
4591        // Verify we can query through it within the same transaction
4592        let result = session.execute("MATCH (n:Test) RETURN n.name").unwrap();
4593        assert_eq!(result.row_count(), 1);
4594
4595        session.commit().unwrap();
4596    }
4597
4598    // ==================== Session Command Tests ====================
4599
4600    #[cfg(feature = "gql")]
4601    mod session_command_tests {
4602        use super::*;
4603        use grafeo_common::types::Value;
4604
4605        #[test]
4606        fn test_use_graph_sets_current_graph() {
4607            let db = GrafeoDB::new_in_memory();
4608            let session = db.session();
4609
4610            // Create the graph first, then USE it
4611            session.execute("CREATE GRAPH mydb").unwrap();
4612            session.execute("USE GRAPH mydb").unwrap();
4613
4614            assert_eq!(session.current_graph(), Some("mydb".to_string()));
4615        }
4616
4617        #[test]
4618        fn test_use_graph_nonexistent_errors() {
4619            let db = GrafeoDB::new_in_memory();
4620            let session = db.session();
4621
4622            let result = session.execute("USE GRAPH doesnotexist");
4623            assert!(result.is_err());
4624            let err = result.unwrap_err().to_string();
4625            assert!(
4626                err.contains("does not exist"),
4627                "Expected 'does not exist' error, got: {err}"
4628            );
4629        }
4630
4631        #[test]
4632        fn test_use_graph_default_always_valid() {
4633            let db = GrafeoDB::new_in_memory();
4634            let session = db.session();
4635
4636            // "default" is always valid, even without CREATE GRAPH
4637            session.execute("USE GRAPH default").unwrap();
4638            assert_eq!(session.current_graph(), Some("default".to_string()));
4639        }
4640
4641        #[test]
4642        fn test_session_set_graph() {
4643            let db = GrafeoDB::new_in_memory();
4644            let session = db.session();
4645
4646            session.execute("CREATE GRAPH analytics").unwrap();
4647            session.execute("SESSION SET GRAPH analytics").unwrap();
4648            assert_eq!(session.current_graph(), Some("analytics".to_string()));
4649        }
4650
4651        #[test]
4652        fn test_session_set_graph_nonexistent_errors() {
4653            let db = GrafeoDB::new_in_memory();
4654            let session = db.session();
4655
4656            let result = session.execute("SESSION SET GRAPH nosuchgraph");
4657            assert!(result.is_err());
4658        }
4659
4660        #[test]
4661        fn test_session_set_time_zone() {
4662            let db = GrafeoDB::new_in_memory();
4663            let session = db.session();
4664
4665            assert_eq!(session.time_zone(), None);
4666
4667            session.execute("SESSION SET TIME ZONE 'UTC'").unwrap();
4668            assert_eq!(session.time_zone(), Some("UTC".to_string()));
4669
4670            session
4671                .execute("SESSION SET TIME ZONE 'America/New_York'")
4672                .unwrap();
4673            assert_eq!(session.time_zone(), Some("America/New_York".to_string()));
4674        }
4675
4676        #[test]
4677        fn test_session_set_parameter() {
4678            let db = GrafeoDB::new_in_memory();
4679            let session = db.session();
4680
4681            session
4682                .execute("SESSION SET PARAMETER $timeout = 30")
4683                .unwrap();
4684
4685            // Parameter is stored (value is Null for now, since expression
4686            // evaluation is not yet wired up)
4687            assert!(session.get_parameter("timeout").is_some());
4688        }
4689
4690        #[test]
4691        fn test_session_reset_clears_all_state() {
4692            let db = GrafeoDB::new_in_memory();
4693            let session = db.session();
4694
4695            // Set various session state
4696            session.execute("CREATE GRAPH analytics").unwrap();
4697            session.execute("SESSION SET GRAPH analytics").unwrap();
4698            session.execute("SESSION SET TIME ZONE 'UTC'").unwrap();
4699            session
4700                .execute("SESSION SET PARAMETER $limit = 100")
4701                .unwrap();
4702
4703            // Verify state was set
4704            assert!(session.current_graph().is_some());
4705            assert!(session.time_zone().is_some());
4706            assert!(session.get_parameter("limit").is_some());
4707
4708            // Reset everything
4709            session.execute("SESSION RESET").unwrap();
4710
4711            assert_eq!(session.current_graph(), None);
4712            assert_eq!(session.time_zone(), None);
4713            assert!(session.get_parameter("limit").is_none());
4714        }
4715
4716        #[test]
4717        fn test_session_close_clears_state() {
4718            let db = GrafeoDB::new_in_memory();
4719            let session = db.session();
4720
4721            session.execute("CREATE GRAPH analytics").unwrap();
4722            session.execute("SESSION SET GRAPH analytics").unwrap();
4723            session.execute("SESSION SET TIME ZONE 'UTC'").unwrap();
4724
4725            session.execute("SESSION CLOSE").unwrap();
4726
4727            assert_eq!(session.current_graph(), None);
4728            assert_eq!(session.time_zone(), None);
4729        }
4730
4731        #[test]
4732        fn test_create_graph() {
4733            let db = GrafeoDB::new_in_memory();
4734            let session = db.session();
4735
4736            session.execute("CREATE GRAPH mydb").unwrap();
4737
4738            // Should be able to USE it now
4739            session.execute("USE GRAPH mydb").unwrap();
4740            assert_eq!(session.current_graph(), Some("mydb".to_string()));
4741        }
4742
4743        #[test]
4744        fn test_create_graph_duplicate_errors() {
4745            let db = GrafeoDB::new_in_memory();
4746            let session = db.session();
4747
4748            session.execute("CREATE GRAPH mydb").unwrap();
4749            let result = session.execute("CREATE GRAPH mydb");
4750
4751            assert!(result.is_err());
4752            let err = result.unwrap_err().to_string();
4753            assert!(
4754                err.contains("already exists"),
4755                "Expected 'already exists' error, got: {err}"
4756            );
4757        }
4758
4759        #[test]
4760        fn test_create_graph_if_not_exists() {
4761            let db = GrafeoDB::new_in_memory();
4762            let session = db.session();
4763
4764            session.execute("CREATE GRAPH mydb").unwrap();
4765            // Should succeed silently with IF NOT EXISTS
4766            session.execute("CREATE GRAPH IF NOT EXISTS mydb").unwrap();
4767        }
4768
4769        #[test]
4770        fn test_drop_graph() {
4771            let db = GrafeoDB::new_in_memory();
4772            let session = db.session();
4773
4774            session.execute("CREATE GRAPH mydb").unwrap();
4775            session.execute("DROP GRAPH mydb").unwrap();
4776
4777            // Should no longer be usable
4778            let result = session.execute("USE GRAPH mydb");
4779            assert!(result.is_err());
4780        }
4781
4782        #[test]
4783        fn test_drop_graph_nonexistent_errors() {
4784            let db = GrafeoDB::new_in_memory();
4785            let session = db.session();
4786
4787            let result = session.execute("DROP GRAPH nosuchgraph");
4788            assert!(result.is_err());
4789            let err = result.unwrap_err().to_string();
4790            assert!(
4791                err.contains("does not exist"),
4792                "Expected 'does not exist' error, got: {err}"
4793            );
4794        }
4795
4796        #[test]
4797        fn test_drop_graph_if_exists() {
4798            let db = GrafeoDB::new_in_memory();
4799            let session = db.session();
4800
4801            // Should succeed silently with IF EXISTS
4802            session.execute("DROP GRAPH IF EXISTS nosuchgraph").unwrap();
4803        }
4804
4805        #[test]
4806        fn test_start_transaction_via_gql() {
4807            let db = GrafeoDB::new_in_memory();
4808            let session = db.session();
4809
4810            session.execute("START TRANSACTION").unwrap();
4811            assert!(session.in_transaction());
4812            session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
4813            session.execute("COMMIT").unwrap();
4814            assert!(!session.in_transaction());
4815
4816            let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
4817            assert_eq!(result.rows.len(), 1);
4818        }
4819
4820        #[test]
4821        fn test_start_transaction_read_only_blocks_insert() {
4822            let db = GrafeoDB::new_in_memory();
4823            let session = db.session();
4824
4825            session.execute("START TRANSACTION READ ONLY").unwrap();
4826            let result = session.execute("INSERT (:Person {name: 'Alix'})");
4827            assert!(result.is_err());
4828            let err = result.unwrap_err().to_string();
4829            assert!(
4830                err.contains("read-only"),
4831                "Expected read-only error, got: {err}"
4832            );
4833            session.execute("ROLLBACK").unwrap();
4834        }
4835
4836        #[test]
4837        fn test_start_transaction_read_only_allows_reads() {
4838            let db = GrafeoDB::new_in_memory();
4839            let mut session = db.session();
4840            session.begin_transaction().unwrap();
4841            session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
4842            session.commit().unwrap();
4843
4844            session.execute("START TRANSACTION READ ONLY").unwrap();
4845            let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
4846            assert_eq!(result.rows.len(), 1);
4847            session.execute("COMMIT").unwrap();
4848        }
4849
4850        #[test]
4851        fn test_rollback_via_gql() {
4852            let db = GrafeoDB::new_in_memory();
4853            let session = db.session();
4854
4855            session.execute("START TRANSACTION").unwrap();
4856            session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
4857            session.execute("ROLLBACK").unwrap();
4858
4859            let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
4860            assert!(result.rows.is_empty());
4861        }
4862
4863        #[test]
4864        fn test_start_transaction_with_isolation_level() {
4865            let db = GrafeoDB::new_in_memory();
4866            let session = db.session();
4867
4868            session
4869                .execute("START TRANSACTION ISOLATION LEVEL SERIALIZABLE")
4870                .unwrap();
4871            assert!(session.in_transaction());
4872            session.execute("ROLLBACK").unwrap();
4873        }
4874
4875        #[test]
4876        fn test_session_commands_return_empty_result() {
4877            let db = GrafeoDB::new_in_memory();
4878            let session = db.session();
4879
4880            session.execute("CREATE GRAPH test").unwrap();
4881            let result = session.execute("SESSION SET GRAPH test").unwrap();
4882            assert_eq!(result.row_count(), 0);
4883            assert_eq!(result.column_count(), 0);
4884        }
4885
4886        #[test]
4887        fn test_current_graph_default_is_none() {
4888            let db = GrafeoDB::new_in_memory();
4889            let session = db.session();
4890
4891            assert_eq!(session.current_graph(), None);
4892        }
4893
4894        #[test]
4895        fn test_time_zone_default_is_none() {
4896            let db = GrafeoDB::new_in_memory();
4897            let session = db.session();
4898
4899            assert_eq!(session.time_zone(), None);
4900        }
4901
4902        #[test]
4903        fn test_session_state_independent_across_sessions() {
4904            let db = GrafeoDB::new_in_memory();
4905            let session1 = db.session();
4906            let session2 = db.session();
4907
4908            session1.execute("CREATE GRAPH first").unwrap();
4909            session1.execute("CREATE GRAPH second").unwrap();
4910            session1.execute("SESSION SET GRAPH first").unwrap();
4911            session2.execute("SESSION SET GRAPH second").unwrap();
4912
4913            assert_eq!(session1.current_graph(), Some("first".to_string()));
4914            assert_eq!(session2.current_graph(), Some("second".to_string()));
4915        }
4916
4917        #[test]
4918        fn test_show_node_types() {
4919            let db = GrafeoDB::new_in_memory();
4920            let session = db.session();
4921
4922            session
4923                .execute("CREATE NODE TYPE Person (name STRING NOT NULL, age INTEGER)")
4924                .unwrap();
4925
4926            let result = session.execute("SHOW NODE TYPES").unwrap();
4927            assert_eq!(
4928                result.columns,
4929                vec!["name", "properties", "constraints", "parents"]
4930            );
4931            assert_eq!(result.rows.len(), 1);
4932            // First column is the type name
4933            assert_eq!(result.rows[0][0], Value::from("Person"));
4934        }
4935
4936        #[test]
4937        fn test_show_edge_types() {
4938            let db = GrafeoDB::new_in_memory();
4939            let session = db.session();
4940
4941            session
4942                .execute("CREATE EDGE TYPE KNOWS CONNECTING (Person) TO (Person) (since INTEGER)")
4943                .unwrap();
4944
4945            let result = session.execute("SHOW EDGE TYPES").unwrap();
4946            assert_eq!(
4947                result.columns,
4948                vec!["name", "properties", "source_types", "target_types"]
4949            );
4950            assert_eq!(result.rows.len(), 1);
4951            assert_eq!(result.rows[0][0], Value::from("KNOWS"));
4952        }
4953
4954        #[test]
4955        fn test_show_graph_types() {
4956            let db = GrafeoDB::new_in_memory();
4957            let session = db.session();
4958
4959            session
4960                .execute("CREATE NODE TYPE Person (name STRING)")
4961                .unwrap();
4962            session
4963                .execute(
4964                    "CREATE GRAPH TYPE social (\
4965                        NODE TYPE Person (name STRING)\
4966                    )",
4967                )
4968                .unwrap();
4969
4970            let result = session.execute("SHOW GRAPH TYPES").unwrap();
4971            assert_eq!(
4972                result.columns,
4973                vec!["name", "open", "node_types", "edge_types"]
4974            );
4975            assert_eq!(result.rows.len(), 1);
4976            assert_eq!(result.rows[0][0], Value::from("social"));
4977        }
4978
4979        #[test]
4980        fn test_show_graph_type_named() {
4981            let db = GrafeoDB::new_in_memory();
4982            let session = db.session();
4983
4984            session
4985                .execute("CREATE NODE TYPE Person (name STRING)")
4986                .unwrap();
4987            session
4988                .execute(
4989                    "CREATE GRAPH TYPE social (\
4990                        NODE TYPE Person (name STRING)\
4991                    )",
4992                )
4993                .unwrap();
4994
4995            let result = session.execute("SHOW GRAPH TYPE social").unwrap();
4996            assert_eq!(result.rows.len(), 1);
4997            assert_eq!(result.rows[0][0], Value::from("social"));
4998        }
4999
5000        #[test]
5001        fn test_show_graph_type_not_found() {
5002            let db = GrafeoDB::new_in_memory();
5003            let session = db.session();
5004
5005            let result = session.execute("SHOW GRAPH TYPE nonexistent");
5006            assert!(result.is_err());
5007        }
5008
5009        #[test]
5010        fn test_show_indexes_via_gql() {
5011            let db = GrafeoDB::new_in_memory();
5012            let session = db.session();
5013
5014            let result = session.execute("SHOW INDEXES").unwrap();
5015            assert_eq!(result.columns, vec!["name", "type", "label", "property"]);
5016        }
5017
5018        #[test]
5019        fn test_show_constraints_via_gql() {
5020            let db = GrafeoDB::new_in_memory();
5021            let session = db.session();
5022
5023            let result = session.execute("SHOW CONSTRAINTS").unwrap();
5024            assert_eq!(result.columns, vec!["name", "type", "label", "properties"]);
5025        }
5026
5027        #[test]
5028        fn test_pattern_form_graph_type_roundtrip() {
5029            let db = GrafeoDB::new_in_memory();
5030            let session = db.session();
5031
5032            // Register the types first
5033            session
5034                .execute("CREATE NODE TYPE Person (name STRING NOT NULL)")
5035                .unwrap();
5036            session
5037                .execute("CREATE NODE TYPE City (name STRING)")
5038                .unwrap();
5039            session
5040                .execute("CREATE EDGE TYPE KNOWS (since INTEGER)")
5041                .unwrap();
5042            session.execute("CREATE EDGE TYPE LIVES_IN").unwrap();
5043
5044            // Create graph type using pattern form
5045            session
5046                .execute(
5047                    "CREATE GRAPH TYPE social (\
5048                        (:Person {name STRING NOT NULL})-[:KNOWS {since INTEGER}]->(:Person),\
5049                        (:Person)-[:LIVES_IN]->(:City)\
5050                    )",
5051                )
5052                .unwrap();
5053
5054            // Verify it was created
5055            let result = session.execute("SHOW GRAPH TYPE social").unwrap();
5056            assert_eq!(result.rows.len(), 1);
5057            assert_eq!(result.rows[0][0], Value::from("social"));
5058        }
5059    }
5060}