Skip to main content

grafeo_engine/
session.rs

1//! Lightweight handles for database interaction.
2//!
3//! A session is your conversation with the database. Each session can have
4//! its own transaction state, so concurrent sessions don't interfere with
5//! each other. Sessions are cheap to create - spin up as many as you need.
6
7use std::sync::Arc;
8use std::sync::atomic::{AtomicUsize, Ordering};
9use std::time::{Duration, Instant};
10
11use grafeo_common::types::{EdgeId, EpochId, NodeId, TransactionId, Value};
12use grafeo_common::utils::error::Result;
13use grafeo_core::graph::Direction;
14use grafeo_core::graph::GraphStoreMut;
15use grafeo_core::graph::lpg::{Edge, LpgStore, Node};
16#[cfg(feature = "rdf")]
17use grafeo_core::graph::rdf::RdfStore;
18
19use crate::catalog::{Catalog, CatalogConstraintValidator};
20use crate::config::{AdaptiveConfig, GraphModel};
21use crate::database::QueryResult;
22use crate::query::cache::QueryCache;
23use crate::transaction::TransactionManager;
24
25/// Your handle to the database - execute queries and manage transactions.
26///
27/// Get one from [`GrafeoDB::session()`](crate::GrafeoDB::session). Each session
28/// tracks its own transaction state, so you can have multiple concurrent
29/// sessions without them interfering.
30pub struct Session {
31    /// The underlying store.
32    store: Arc<LpgStore>,
33    /// Graph store trait object for pluggable storage backends.
34    graph_store: Arc<dyn GraphStoreMut>,
35    /// Schema and metadata catalog shared across sessions.
36    catalog: Arc<Catalog>,
37    /// RDF triple store (if RDF feature is enabled).
38    #[cfg(feature = "rdf")]
39    rdf_store: Arc<RdfStore>,
40    /// Transaction manager.
41    transaction_manager: Arc<TransactionManager>,
42    /// Query cache shared across sessions.
43    query_cache: Arc<QueryCache>,
44    /// Current transaction ID (if any). Behind a Mutex so that GQL commands
45    /// (`START TRANSACTION`, `COMMIT`, `ROLLBACK`) can manage transactions
46    /// from within `execute(&self)`.
47    current_transaction: parking_lot::Mutex<Option<TransactionId>>,
48    /// Whether the current transaction is read-only (blocks mutations).
49    read_only_tx: parking_lot::Mutex<bool>,
50    /// Whether the session is in auto-commit mode.
51    auto_commit: bool,
52    /// Adaptive execution configuration.
53    #[allow(dead_code)] // Stored for future adaptive re-optimization during execution
54    adaptive_config: AdaptiveConfig,
55    /// Whether to use factorized execution for multi-hop queries.
56    factorized_execution: bool,
57    /// The graph data model this session operates on.
58    graph_model: GraphModel,
59    /// Maximum time a query may run before being cancelled.
60    query_timeout: Option<Duration>,
61    /// Shared commit counter for triggering auto-GC.
62    commit_counter: Arc<AtomicUsize>,
63    /// GC every N commits (0 = disabled).
64    gc_interval: usize,
65    /// Node count at the start of the current transaction (for PreparedCommit stats).
66    transaction_start_node_count: AtomicUsize,
67    /// Edge count at the start of the current transaction (for PreparedCommit stats).
68    transaction_start_edge_count: AtomicUsize,
69    /// WAL for logging schema changes.
70    #[cfg(feature = "wal")]
71    wal: Option<Arc<grafeo_adapters::storage::wal::LpgWal>>,
72    /// CDC log for change tracking.
73    #[cfg(feature = "cdc")]
74    cdc_log: Arc<crate::cdc::CdcLog>,
75    /// Current graph name (for multi-graph USE GRAPH support). None = default graph.
76    current_graph: parking_lot::Mutex<Option<String>>,
77    /// Session time zone override.
78    time_zone: parking_lot::Mutex<Option<String>>,
79    /// Session-level parameters (SET PARAMETER).
80    session_params:
81        parking_lot::Mutex<std::collections::HashMap<String, grafeo_common::types::Value>>,
82    /// Override epoch for time-travel queries (None = use transaction/current epoch).
83    viewing_epoch_override: parking_lot::Mutex<Option<EpochId>>,
84    /// Savepoints within the current transaction: name -> (next_node_id, next_edge_id, undo_log_position) snapshot.
85    savepoints: parking_lot::Mutex<Vec<(String, u64, u64, usize)>>,
86    /// Nesting depth for nested transactions (0 = outermost).
87    /// Nested `START TRANSACTION` creates an auto-savepoint; nested `COMMIT`
88    /// releases it, nested `ROLLBACK` rolls back to it.
89    transaction_nesting_depth: parking_lot::Mutex<u32>,
90}
91
92impl Session {
93    /// Creates a new session with adaptive execution configuration.
94    #[allow(dead_code, clippy::too_many_arguments)]
95    pub(crate) fn with_adaptive(
96        store: Arc<LpgStore>,
97        transaction_manager: Arc<TransactionManager>,
98        query_cache: Arc<QueryCache>,
99        catalog: Arc<Catalog>,
100        adaptive_config: AdaptiveConfig,
101        factorized_execution: bool,
102        graph_model: GraphModel,
103        query_timeout: Option<Duration>,
104        commit_counter: Arc<AtomicUsize>,
105        gc_interval: usize,
106    ) -> Self {
107        let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
108        Self {
109            store,
110            graph_store,
111            catalog,
112            #[cfg(feature = "rdf")]
113            rdf_store: Arc::new(RdfStore::new()),
114            transaction_manager,
115            query_cache,
116            current_transaction: parking_lot::Mutex::new(None),
117            read_only_tx: parking_lot::Mutex::new(false),
118            auto_commit: true,
119            adaptive_config,
120            factorized_execution,
121            graph_model,
122            query_timeout,
123            commit_counter,
124            gc_interval,
125            transaction_start_node_count: AtomicUsize::new(0),
126            transaction_start_edge_count: AtomicUsize::new(0),
127            #[cfg(feature = "wal")]
128            wal: None,
129            #[cfg(feature = "cdc")]
130            cdc_log: Arc::new(crate::cdc::CdcLog::new()),
131            current_graph: parking_lot::Mutex::new(None),
132            time_zone: parking_lot::Mutex::new(None),
133            session_params: parking_lot::Mutex::new(std::collections::HashMap::new()),
134            viewing_epoch_override: parking_lot::Mutex::new(None),
135            savepoints: parking_lot::Mutex::new(Vec::new()),
136            transaction_nesting_depth: parking_lot::Mutex::new(0),
137        }
138    }
139
140    /// Sets the WAL for this session (shared with the database).
141    ///
142    /// This also wraps `graph_store` in a [`WalGraphStore`] so that mutation
143    /// operators (INSERT, DELETE, SET via queries) log to the WAL.
144    #[cfg(feature = "wal")]
145    pub(crate) fn set_wal(&mut self, wal: Arc<grafeo_adapters::storage::wal::LpgWal>) {
146        // Wrap the graph store so query-engine mutations are WAL-logged
147        self.graph_store = Arc::new(crate::database::wal_store::WalGraphStore::new(
148            Arc::clone(&self.store),
149            Arc::clone(&wal),
150        ));
151        self.wal = Some(wal);
152    }
153
154    /// Sets the CDC log for this session (shared with the database).
155    #[cfg(feature = "cdc")]
156    pub(crate) fn set_cdc_log(&mut self, cdc_log: Arc<crate::cdc::CdcLog>) {
157        self.cdc_log = cdc_log;
158    }
159
160    /// Creates a new session with RDF store and adaptive configuration.
161    #[cfg(feature = "rdf")]
162    #[allow(clippy::too_many_arguments)]
163    pub(crate) fn with_rdf_store_and_adaptive(
164        store: Arc<LpgStore>,
165        rdf_store: Arc<RdfStore>,
166        transaction_manager: Arc<TransactionManager>,
167        query_cache: Arc<QueryCache>,
168        catalog: Arc<Catalog>,
169        adaptive_config: AdaptiveConfig,
170        factorized_execution: bool,
171        graph_model: GraphModel,
172        query_timeout: Option<Duration>,
173        commit_counter: Arc<AtomicUsize>,
174        gc_interval: usize,
175    ) -> Self {
176        let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
177        Self {
178            store,
179            graph_store,
180            catalog,
181            rdf_store,
182            transaction_manager,
183            query_cache,
184            current_transaction: parking_lot::Mutex::new(None),
185            read_only_tx: parking_lot::Mutex::new(false),
186            auto_commit: true,
187            adaptive_config,
188            factorized_execution,
189            graph_model,
190            query_timeout,
191            commit_counter,
192            gc_interval,
193            transaction_start_node_count: AtomicUsize::new(0),
194            transaction_start_edge_count: AtomicUsize::new(0),
195            #[cfg(feature = "wal")]
196            wal: None,
197            #[cfg(feature = "cdc")]
198            cdc_log: Arc::new(crate::cdc::CdcLog::new()),
199            current_graph: parking_lot::Mutex::new(None),
200            time_zone: parking_lot::Mutex::new(None),
201            session_params: parking_lot::Mutex::new(std::collections::HashMap::new()),
202            viewing_epoch_override: parking_lot::Mutex::new(None),
203            savepoints: parking_lot::Mutex::new(Vec::new()),
204            transaction_nesting_depth: parking_lot::Mutex::new(0),
205        }
206    }
207
208    /// Creates a session backed by an external graph store.
209    ///
210    /// The external store handles all data operations. Transaction management
211    /// (begin/commit/rollback) is not supported for external stores.
212    #[allow(clippy::too_many_arguments)]
213    pub(crate) fn with_external_store(
214        store: Arc<dyn GraphStoreMut>,
215        transaction_manager: Arc<TransactionManager>,
216        query_cache: Arc<QueryCache>,
217        catalog: Arc<Catalog>,
218        adaptive_config: AdaptiveConfig,
219        factorized_execution: bool,
220        graph_model: GraphModel,
221        query_timeout: Option<Duration>,
222        commit_counter: Arc<AtomicUsize>,
223        gc_interval: usize,
224    ) -> Self {
225        Self {
226            store: Arc::new(LpgStore::new().expect("arena allocation for dummy LpgStore")), // dummy for LpgStore-specific ops
227            graph_store: store,
228            catalog,
229            #[cfg(feature = "rdf")]
230            rdf_store: Arc::new(RdfStore::new()),
231            transaction_manager,
232            query_cache,
233            current_transaction: parking_lot::Mutex::new(None),
234            read_only_tx: parking_lot::Mutex::new(false),
235            auto_commit: true,
236            adaptive_config,
237            factorized_execution,
238            graph_model,
239            query_timeout,
240            commit_counter,
241            gc_interval,
242            transaction_start_node_count: AtomicUsize::new(0),
243            transaction_start_edge_count: AtomicUsize::new(0),
244            #[cfg(feature = "wal")]
245            wal: None,
246            #[cfg(feature = "cdc")]
247            cdc_log: Arc::new(crate::cdc::CdcLog::new()),
248            current_graph: parking_lot::Mutex::new(None),
249            time_zone: parking_lot::Mutex::new(None),
250            session_params: parking_lot::Mutex::new(std::collections::HashMap::new()),
251            viewing_epoch_override: parking_lot::Mutex::new(None),
252            savepoints: parking_lot::Mutex::new(Vec::new()),
253            transaction_nesting_depth: parking_lot::Mutex::new(0),
254        }
255    }
256
257    /// Returns the graph model this session operates on.
258    #[must_use]
259    pub fn graph_model(&self) -> GraphModel {
260        self.graph_model
261    }
262
263    // === Session State Management ===
264
265    /// Sets the current graph for this session (USE GRAPH).
266    pub fn use_graph(&self, name: &str) {
267        *self.current_graph.lock() = Some(name.to_string());
268    }
269
270    /// Returns the current graph name, if any.
271    #[must_use]
272    pub fn current_graph(&self) -> Option<String> {
273        self.current_graph.lock().clone()
274    }
275
276    /// Sets the session time zone.
277    pub fn set_time_zone(&self, tz: &str) {
278        *self.time_zone.lock() = Some(tz.to_string());
279    }
280
281    /// Returns the session time zone, if set.
282    #[must_use]
283    pub fn time_zone(&self) -> Option<String> {
284        self.time_zone.lock().clone()
285    }
286
287    /// Sets a session parameter.
288    pub fn set_parameter(&self, key: &str, value: grafeo_common::types::Value) {
289        self.session_params.lock().insert(key.to_string(), value);
290    }
291
292    /// Gets a session parameter by cloning it.
293    #[must_use]
294    pub fn get_parameter(&self, key: &str) -> Option<grafeo_common::types::Value> {
295        self.session_params.lock().get(key).cloned()
296    }
297
298    /// Resets all session state to defaults.
299    pub fn reset_session(&self) {
300        *self.current_graph.lock() = None;
301        *self.time_zone.lock() = None;
302        self.session_params.lock().clear();
303        *self.viewing_epoch_override.lock() = None;
304    }
305
306    // --- Time-travel API ---
307
308    /// Sets a viewing epoch override for time-travel queries.
309    ///
310    /// While set, all queries on this session see the database as it existed
311    /// at the given epoch. Use [`clear_viewing_epoch`](Self::clear_viewing_epoch)
312    /// to return to normal behavior.
313    pub fn set_viewing_epoch(&self, epoch: EpochId) {
314        *self.viewing_epoch_override.lock() = Some(epoch);
315    }
316
317    /// Clears the viewing epoch override, returning to normal behavior.
318    pub fn clear_viewing_epoch(&self) {
319        *self.viewing_epoch_override.lock() = None;
320    }
321
322    /// Returns the current viewing epoch override, if any.
323    #[must_use]
324    pub fn viewing_epoch(&self) -> Option<EpochId> {
325        *self.viewing_epoch_override.lock()
326    }
327
328    /// Returns all versions of a node with their creation/deletion epochs.
329    ///
330    /// Properties and labels reflect the current state (not versioned per-epoch).
331    #[must_use]
332    pub fn get_node_history(&self, id: NodeId) -> Vec<(EpochId, Option<EpochId>, Node)> {
333        self.store.get_node_history(id)
334    }
335
336    /// Returns all versions of an edge with their creation/deletion epochs.
337    ///
338    /// Properties reflect the current state (not versioned per-epoch).
339    #[must_use]
340    pub fn get_edge_history(&self, id: EdgeId) -> Vec<(EpochId, Option<EpochId>, Edge)> {
341        self.store.get_edge_history(id)
342    }
343
344    /// Checks that the session's graph model supports LPG operations.
345    fn require_lpg(&self, language: &str) -> Result<()> {
346        if self.graph_model == GraphModel::Rdf {
347            return Err(grafeo_common::utils::error::Error::Internal(format!(
348                "This is an RDF database. {language} queries require an LPG database."
349            )));
350        }
351        Ok(())
352    }
353
354    /// Executes a session or transaction command, returning an empty result.
355    #[cfg(feature = "gql")]
356    fn execute_session_command(
357        &self,
358        cmd: grafeo_adapters::query::gql::ast::SessionCommand,
359    ) -> Result<QueryResult> {
360        use grafeo_adapters::query::gql::ast::{SessionCommand, TransactionIsolationLevel};
361        use grafeo_common::utils::error::{Error, QueryError, QueryErrorKind};
362
363        match cmd {
364            SessionCommand::CreateGraph {
365                name,
366                if_not_exists,
367                typed,
368                like_graph,
369                copy_of,
370                open: _,
371            } => {
372                // Validate source graph exists for LIKE / AS COPY OF
373                if let Some(ref src) = like_graph
374                    && self.store.graph(src).is_none()
375                {
376                    return Err(Error::Query(QueryError::new(
377                        QueryErrorKind::Semantic,
378                        format!("Source graph '{src}' does not exist"),
379                    )));
380                }
381                if let Some(ref src) = copy_of
382                    && self.store.graph(src).is_none()
383                {
384                    return Err(Error::Query(QueryError::new(
385                        QueryErrorKind::Semantic,
386                        format!("Source graph '{src}' does not exist"),
387                    )));
388                }
389
390                let created = self
391                    .store
392                    .create_graph(&name)
393                    .map_err(|e| Error::Internal(e.to_string()))?;
394                if !created && !if_not_exists {
395                    return Err(Error::Query(QueryError::new(
396                        QueryErrorKind::Semantic,
397                        format!("Graph '{name}' already exists"),
398                    )));
399                }
400
401                // AS COPY OF: copy data from source graph
402                if let Some(ref src) = copy_of {
403                    self.store
404                        .copy_graph(Some(src), Some(&name))
405                        .map_err(|e| Error::Internal(e.to_string()))?;
406                }
407
408                // Bind to graph type if specified
409                if let Some(type_name) = typed
410                    && let Err(e) = self.catalog.bind_graph_type(&name, type_name.clone())
411                {
412                    return Err(Error::Query(QueryError::new(
413                        QueryErrorKind::Semantic,
414                        e.to_string(),
415                    )));
416                }
417
418                // LIKE: copy graph type binding from source
419                if let Some(ref src) = like_graph
420                    && let Some(src_type) = self.catalog.get_graph_type_binding(src)
421                {
422                    let _ = self.catalog.bind_graph_type(&name, src_type);
423                }
424
425                Ok(QueryResult::empty())
426            }
427            SessionCommand::DropGraph { name, if_exists } => {
428                let dropped = self.store.drop_graph(&name);
429                if !dropped && !if_exists {
430                    return Err(Error::Query(QueryError::new(
431                        QueryErrorKind::Semantic,
432                        format!("Graph '{name}' does not exist"),
433                    )));
434                }
435                Ok(QueryResult::empty())
436            }
437            SessionCommand::UseGraph(name) => {
438                // Verify graph exists (default graph is always valid)
439                if !name.eq_ignore_ascii_case("default") && self.store.graph(&name).is_none() {
440                    return Err(Error::Query(QueryError::new(
441                        QueryErrorKind::Semantic,
442                        format!("Graph '{name}' does not exist"),
443                    )));
444                }
445                self.use_graph(&name);
446                Ok(QueryResult::empty())
447            }
448            SessionCommand::SessionSetGraph(name) => {
449                self.use_graph(&name);
450                Ok(QueryResult::empty())
451            }
452            SessionCommand::SessionSetTimeZone(tz) => {
453                self.set_time_zone(&tz);
454                Ok(QueryResult::empty())
455            }
456            SessionCommand::SessionSetParameter(key, expr) => {
457                if key.eq_ignore_ascii_case("viewing_epoch") {
458                    match Self::eval_integer_literal(&expr) {
459                        Some(n) if n >= 0 => {
460                            self.set_viewing_epoch(EpochId::new(n as u64));
461                            Ok(QueryResult::status(format!("Set viewing_epoch to {n}")))
462                        }
463                        _ => Err(Error::Query(QueryError::new(
464                            QueryErrorKind::Semantic,
465                            "viewing_epoch must be a non-negative integer literal",
466                        ))),
467                    }
468                } else {
469                    // For now, store parameter name with Null value.
470                    // Full expression evaluation would require building and executing a plan.
471                    self.set_parameter(&key, Value::Null);
472                    Ok(QueryResult::empty())
473                }
474            }
475            SessionCommand::SessionReset => {
476                self.reset_session();
477                Ok(QueryResult::empty())
478            }
479            SessionCommand::SessionClose => {
480                self.reset_session();
481                Ok(QueryResult::empty())
482            }
483            SessionCommand::StartTransaction {
484                read_only,
485                isolation_level,
486            } => {
487                let engine_level = isolation_level.map(|l| match l {
488                    TransactionIsolationLevel::ReadCommitted => {
489                        crate::transaction::IsolationLevel::ReadCommitted
490                    }
491                    TransactionIsolationLevel::SnapshotIsolation => {
492                        crate::transaction::IsolationLevel::SnapshotIsolation
493                    }
494                    TransactionIsolationLevel::Serializable => {
495                        crate::transaction::IsolationLevel::Serializable
496                    }
497                });
498                self.begin_transaction_inner(read_only, engine_level)?;
499                Ok(QueryResult::status("Transaction started"))
500            }
501            SessionCommand::Commit => {
502                self.commit_inner()?;
503                Ok(QueryResult::status("Transaction committed"))
504            }
505            SessionCommand::Rollback => {
506                self.rollback_inner()?;
507                Ok(QueryResult::status("Transaction rolled back"))
508            }
509            SessionCommand::Savepoint(name) => {
510                self.savepoint(&name)?;
511                Ok(QueryResult::status(format!("Savepoint '{name}' created")))
512            }
513            SessionCommand::RollbackToSavepoint(name) => {
514                self.rollback_to_savepoint(&name)?;
515                Ok(QueryResult::status(format!(
516                    "Rolled back to savepoint '{name}'"
517                )))
518            }
519            SessionCommand::ReleaseSavepoint(name) => {
520                self.release_savepoint(&name)?;
521                Ok(QueryResult::status(format!("Savepoint '{name}' released")))
522            }
523        }
524    }
525
526    /// Logs a WAL record for a schema change (no-op if WAL is not enabled).
527    #[cfg(feature = "wal")]
528    fn log_schema_wal(&self, record: &grafeo_adapters::storage::wal::WalRecord) {
529        if let Some(ref wal) = self.wal
530            && let Err(e) = wal.log(record)
531        {
532            tracing::warn!("Failed to log schema change to WAL: {}", e);
533        }
534    }
535
536    /// Executes a schema DDL command, returning a status result.
537    #[cfg(feature = "gql")]
538    fn execute_schema_command(
539        &self,
540        cmd: grafeo_adapters::query::gql::ast::SchemaStatement,
541    ) -> Result<QueryResult> {
542        use crate::catalog::{
543            EdgeTypeDefinition, NodeTypeDefinition, PropertyDataType, TypedProperty,
544        };
545        use grafeo_adapters::query::gql::ast::SchemaStatement;
546        #[cfg(feature = "wal")]
547        use grafeo_adapters::storage::wal::WalRecord;
548        use grafeo_common::utils::error::{Error, QueryError, QueryErrorKind};
549
550        /// Logs a WAL record for schema changes. Compiles to nothing without `wal`.
551        macro_rules! wal_log {
552            ($self:expr, $record:expr) => {
553                #[cfg(feature = "wal")]
554                $self.log_schema_wal(&$record);
555            };
556        }
557
558        let result = match cmd {
559            SchemaStatement::CreateNodeType(stmt) => {
560                #[cfg(feature = "wal")]
561                let props_for_wal: Vec<(String, String, bool)> = stmt
562                    .properties
563                    .iter()
564                    .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
565                    .collect();
566                let def = NodeTypeDefinition {
567                    name: stmt.name.clone(),
568                    properties: stmt
569                        .properties
570                        .iter()
571                        .map(|p| TypedProperty {
572                            name: p.name.clone(),
573                            data_type: PropertyDataType::from_type_name(&p.data_type),
574                            nullable: p.nullable,
575                            default_value: None,
576                        })
577                        .collect(),
578                    constraints: Vec::new(),
579                };
580                let result = if stmt.or_replace {
581                    let _ = self.catalog.drop_node_type(&stmt.name);
582                    self.catalog.register_node_type(def)
583                } else {
584                    self.catalog.register_node_type(def)
585                };
586                match result {
587                    Ok(()) => {
588                        wal_log!(
589                            self,
590                            WalRecord::CreateNodeType {
591                                name: stmt.name.clone(),
592                                properties: props_for_wal,
593                                constraints: Vec::new(),
594                            }
595                        );
596                        Ok(QueryResult::status(format!(
597                            "Created node type '{}'",
598                            stmt.name
599                        )))
600                    }
601                    Err(e) if stmt.if_not_exists => {
602                        let _ = e;
603                        Ok(QueryResult::status("No change"))
604                    }
605                    Err(e) => Err(Error::Query(QueryError::new(
606                        QueryErrorKind::Semantic,
607                        e.to_string(),
608                    ))),
609                }
610            }
611            SchemaStatement::CreateEdgeType(stmt) => {
612                #[cfg(feature = "wal")]
613                let props_for_wal: Vec<(String, String, bool)> = stmt
614                    .properties
615                    .iter()
616                    .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
617                    .collect();
618                let def = EdgeTypeDefinition {
619                    name: stmt.name.clone(),
620                    properties: stmt
621                        .properties
622                        .iter()
623                        .map(|p| TypedProperty {
624                            name: p.name.clone(),
625                            data_type: PropertyDataType::from_type_name(&p.data_type),
626                            nullable: p.nullable,
627                            default_value: None,
628                        })
629                        .collect(),
630                    constraints: Vec::new(),
631                };
632                let result = if stmt.or_replace {
633                    let _ = self.catalog.drop_edge_type_def(&stmt.name);
634                    self.catalog.register_edge_type_def(def)
635                } else {
636                    self.catalog.register_edge_type_def(def)
637                };
638                match result {
639                    Ok(()) => {
640                        wal_log!(
641                            self,
642                            WalRecord::CreateEdgeType {
643                                name: stmt.name.clone(),
644                                properties: props_for_wal,
645                                constraints: Vec::new(),
646                            }
647                        );
648                        Ok(QueryResult::status(format!(
649                            "Created edge type '{}'",
650                            stmt.name
651                        )))
652                    }
653                    Err(e) if stmt.if_not_exists => {
654                        let _ = e;
655                        Ok(QueryResult::status("No change"))
656                    }
657                    Err(e) => Err(Error::Query(QueryError::new(
658                        QueryErrorKind::Semantic,
659                        e.to_string(),
660                    ))),
661                }
662            }
663            SchemaStatement::CreateVectorIndex(stmt) => {
664                Self::create_vector_index_on_store(
665                    &self.store,
666                    &stmt.node_label,
667                    &stmt.property,
668                    stmt.dimensions,
669                    stmt.metric.as_deref(),
670                )?;
671                wal_log!(
672                    self,
673                    WalRecord::CreateIndex {
674                        name: stmt.name.clone(),
675                        label: stmt.node_label.clone(),
676                        property: stmt.property.clone(),
677                        index_type: "vector".to_string(),
678                    }
679                );
680                Ok(QueryResult::status(format!(
681                    "Created vector index '{}'",
682                    stmt.name
683                )))
684            }
685            SchemaStatement::DropNodeType { name, if_exists } => {
686                match self.catalog.drop_node_type(&name) {
687                    Ok(()) => {
688                        wal_log!(self, WalRecord::DropNodeType { name: name.clone() });
689                        Ok(QueryResult::status(format!("Dropped node type '{name}'")))
690                    }
691                    Err(e) if if_exists => {
692                        let _ = e;
693                        Ok(QueryResult::status("No change"))
694                    }
695                    Err(e) => Err(Error::Query(QueryError::new(
696                        QueryErrorKind::Semantic,
697                        e.to_string(),
698                    ))),
699                }
700            }
701            SchemaStatement::DropEdgeType { name, if_exists } => {
702                match self.catalog.drop_edge_type_def(&name) {
703                    Ok(()) => {
704                        wal_log!(self, WalRecord::DropEdgeType { name: name.clone() });
705                        Ok(QueryResult::status(format!("Dropped edge type '{name}'")))
706                    }
707                    Err(e) if if_exists => {
708                        let _ = e;
709                        Ok(QueryResult::status("No change"))
710                    }
711                    Err(e) => Err(Error::Query(QueryError::new(
712                        QueryErrorKind::Semantic,
713                        e.to_string(),
714                    ))),
715                }
716            }
717            SchemaStatement::CreateIndex(stmt) => {
718                use grafeo_adapters::query::gql::ast::IndexKind;
719                let index_type_str = match stmt.index_kind {
720                    IndexKind::Property => "property",
721                    IndexKind::BTree => "btree",
722                    IndexKind::Text => "text",
723                    IndexKind::Vector => "vector",
724                };
725                match stmt.index_kind {
726                    IndexKind::Property | IndexKind::BTree => {
727                        for prop in &stmt.properties {
728                            self.store.create_property_index(prop);
729                        }
730                    }
731                    IndexKind::Text => {
732                        for prop in &stmt.properties {
733                            Self::create_text_index_on_store(&self.store, &stmt.label, prop)?;
734                        }
735                    }
736                    IndexKind::Vector => {
737                        for prop in &stmt.properties {
738                            Self::create_vector_index_on_store(
739                                &self.store,
740                                &stmt.label,
741                                prop,
742                                stmt.options.dimensions,
743                                stmt.options.metric.as_deref(),
744                            )?;
745                        }
746                    }
747                }
748                #[cfg(feature = "wal")]
749                for prop in &stmt.properties {
750                    wal_log!(
751                        self,
752                        WalRecord::CreateIndex {
753                            name: stmt.name.clone(),
754                            label: stmt.label.clone(),
755                            property: prop.clone(),
756                            index_type: index_type_str.to_string(),
757                        }
758                    );
759                }
760                Ok(QueryResult::status(format!(
761                    "Created {} index '{}'",
762                    index_type_str, stmt.name
763                )))
764            }
765            SchemaStatement::DropIndex { name, if_exists } => {
766                // Try to drop property index by name
767                let dropped = self.store.drop_property_index(&name);
768                if dropped || if_exists {
769                    if dropped {
770                        wal_log!(self, WalRecord::DropIndex { name: name.clone() });
771                    }
772                    Ok(QueryResult::status(if dropped {
773                        format!("Dropped index '{name}'")
774                    } else {
775                        "No change".to_string()
776                    }))
777                } else {
778                    Err(Error::Query(QueryError::new(
779                        QueryErrorKind::Semantic,
780                        format!("Index '{name}' does not exist"),
781                    )))
782                }
783            }
784            SchemaStatement::CreateConstraint(stmt) => {
785                use grafeo_adapters::query::gql::ast::ConstraintKind;
786                let kind_str = match stmt.constraint_kind {
787                    ConstraintKind::Unique => "unique",
788                    ConstraintKind::NodeKey => "node_key",
789                    ConstraintKind::NotNull => "not_null",
790                    ConstraintKind::Exists => "exists",
791                };
792                let constraint_name = stmt
793                    .name
794                    .clone()
795                    .unwrap_or_else(|| format!("{}_{kind_str}", stmt.label));
796                wal_log!(
797                    self,
798                    WalRecord::CreateConstraint {
799                        name: constraint_name.clone(),
800                        label: stmt.label.clone(),
801                        properties: stmt.properties.clone(),
802                        kind: kind_str.to_string(),
803                    }
804                );
805                Ok(QueryResult::status(format!(
806                    "Created {kind_str} constraint '{constraint_name}'"
807                )))
808            }
809            SchemaStatement::DropConstraint { name, if_exists } => {
810                let _ = if_exists;
811                wal_log!(self, WalRecord::DropConstraint { name: name.clone() });
812                Ok(QueryResult::status(format!("Dropped constraint '{name}'")))
813            }
814            SchemaStatement::CreateGraphType(stmt) => {
815                use crate::catalog::GraphTypeDefinition;
816                use grafeo_adapters::query::gql::ast::InlineElementType;
817
818                // GG04: LIKE clause copies type from existing graph
819                let (mut node_types, mut edge_types, open) =
820                    if let Some(ref like_graph) = stmt.like_graph {
821                        // Infer types from the graph's bound type, or use its existing types
822                        if let Some(type_name) = self.catalog.get_graph_type_binding(like_graph) {
823                            if let Some(existing) = self
824                                .catalog
825                                .schema()
826                                .and_then(|s| s.get_graph_type(&type_name))
827                            {
828                                (
829                                    existing.allowed_node_types.clone(),
830                                    existing.allowed_edge_types.clone(),
831                                    existing.open,
832                                )
833                            } else {
834                                (Vec::new(), Vec::new(), true)
835                            }
836                        } else {
837                            // GG22: Infer from graph data (labels used in graph)
838                            let nt = self.catalog.all_node_type_names();
839                            let et = self.catalog.all_edge_type_names();
840                            if nt.is_empty() && et.is_empty() {
841                                (Vec::new(), Vec::new(), true)
842                            } else {
843                                (nt, et, false)
844                            }
845                        }
846                    } else {
847                        (stmt.node_types.clone(), stmt.edge_types.clone(), stmt.open)
848                    };
849
850                // GG03: Register inline element types and add their names
851                for inline in &stmt.inline_types {
852                    match inline {
853                        InlineElementType::Node {
854                            name, properties, ..
855                        } => {
856                            let def = NodeTypeDefinition {
857                                name: name.clone(),
858                                properties: properties
859                                    .iter()
860                                    .map(|p| TypedProperty {
861                                        name: p.name.clone(),
862                                        data_type: PropertyDataType::from_type_name(&p.data_type),
863                                        nullable: p.nullable,
864                                        default_value: None,
865                                    })
866                                    .collect(),
867                                constraints: Vec::new(),
868                            };
869                            // Register or replace so inline defs override existing
870                            self.catalog.register_or_replace_node_type(def);
871                            #[cfg(feature = "wal")]
872                            {
873                                let props_for_wal: Vec<(String, String, bool)> = properties
874                                    .iter()
875                                    .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
876                                    .collect();
877                                self.log_schema_wal(&WalRecord::CreateNodeType {
878                                    name: name.clone(),
879                                    properties: props_for_wal,
880                                    constraints: Vec::new(),
881                                });
882                            }
883                            if !node_types.contains(name) {
884                                node_types.push(name.clone());
885                            }
886                        }
887                        InlineElementType::Edge {
888                            name, properties, ..
889                        } => {
890                            let def = EdgeTypeDefinition {
891                                name: name.clone(),
892                                properties: properties
893                                    .iter()
894                                    .map(|p| TypedProperty {
895                                        name: p.name.clone(),
896                                        data_type: PropertyDataType::from_type_name(&p.data_type),
897                                        nullable: p.nullable,
898                                        default_value: None,
899                                    })
900                                    .collect(),
901                                constraints: Vec::new(),
902                            };
903                            self.catalog.register_or_replace_edge_type_def(def);
904                            #[cfg(feature = "wal")]
905                            {
906                                let props_for_wal: Vec<(String, String, bool)> = properties
907                                    .iter()
908                                    .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
909                                    .collect();
910                                self.log_schema_wal(&WalRecord::CreateEdgeType {
911                                    name: name.clone(),
912                                    properties: props_for_wal,
913                                    constraints: Vec::new(),
914                                });
915                            }
916                            if !edge_types.contains(name) {
917                                edge_types.push(name.clone());
918                            }
919                        }
920                    }
921                }
922
923                let def = GraphTypeDefinition {
924                    name: stmt.name.clone(),
925                    allowed_node_types: node_types.clone(),
926                    allowed_edge_types: edge_types.clone(),
927                    open,
928                };
929                let result = if stmt.or_replace {
930                    // Drop existing first, ignore error if not found
931                    let _ = self.catalog.drop_graph_type(&stmt.name);
932                    self.catalog.register_graph_type(def)
933                } else {
934                    self.catalog.register_graph_type(def)
935                };
936                match result {
937                    Ok(()) => {
938                        wal_log!(
939                            self,
940                            WalRecord::CreateGraphType {
941                                name: stmt.name.clone(),
942                                node_types,
943                                edge_types,
944                                open,
945                            }
946                        );
947                        Ok(QueryResult::status(format!(
948                            "Created graph type '{}'",
949                            stmt.name
950                        )))
951                    }
952                    Err(e) if stmt.if_not_exists => {
953                        let _ = e;
954                        Ok(QueryResult::status("No change"))
955                    }
956                    Err(e) => Err(Error::Query(QueryError::new(
957                        QueryErrorKind::Semantic,
958                        e.to_string(),
959                    ))),
960                }
961            }
962            SchemaStatement::DropGraphType { name, if_exists } => {
963                match self.catalog.drop_graph_type(&name) {
964                    Ok(()) => {
965                        wal_log!(self, WalRecord::DropGraphType { name: name.clone() });
966                        Ok(QueryResult::status(format!("Dropped graph type '{name}'")))
967                    }
968                    Err(e) if if_exists => {
969                        let _ = e;
970                        Ok(QueryResult::status("No change"))
971                    }
972                    Err(e) => Err(Error::Query(QueryError::new(
973                        QueryErrorKind::Semantic,
974                        e.to_string(),
975                    ))),
976                }
977            }
978            SchemaStatement::CreateSchema {
979                name,
980                if_not_exists,
981            } => match self.catalog.register_schema_namespace(name.clone()) {
982                Ok(()) => {
983                    wal_log!(self, WalRecord::CreateSchema { name: name.clone() });
984                    Ok(QueryResult::status(format!("Created schema '{name}'")))
985                }
986                Err(e) if if_not_exists => {
987                    let _ = e;
988                    Ok(QueryResult::status("No change"))
989                }
990                Err(e) => Err(Error::Query(QueryError::new(
991                    QueryErrorKind::Semantic,
992                    e.to_string(),
993                ))),
994            },
995            SchemaStatement::DropSchema { name, if_exists } => {
996                match self.catalog.drop_schema_namespace(&name) {
997                    Ok(()) => {
998                        wal_log!(self, WalRecord::DropSchema { name: name.clone() });
999                        Ok(QueryResult::status(format!("Dropped schema '{name}'")))
1000                    }
1001                    Err(e) if if_exists => {
1002                        let _ = e;
1003                        Ok(QueryResult::status("No change"))
1004                    }
1005                    Err(e) => Err(Error::Query(QueryError::new(
1006                        QueryErrorKind::Semantic,
1007                        e.to_string(),
1008                    ))),
1009                }
1010            }
1011            SchemaStatement::AlterNodeType(stmt) => {
1012                use grafeo_adapters::query::gql::ast::TypeAlteration;
1013                let mut wal_alts = Vec::new();
1014                for alt in &stmt.alterations {
1015                    match alt {
1016                        TypeAlteration::AddProperty(prop) => {
1017                            let typed = TypedProperty {
1018                                name: prop.name.clone(),
1019                                data_type: PropertyDataType::from_type_name(&prop.data_type),
1020                                nullable: prop.nullable,
1021                                default_value: None,
1022                            };
1023                            self.catalog
1024                                .alter_node_type_add_property(&stmt.name, typed)
1025                                .map_err(|e| {
1026                                    Error::Query(QueryError::new(
1027                                        QueryErrorKind::Semantic,
1028                                        e.to_string(),
1029                                    ))
1030                                })?;
1031                            wal_alts.push((
1032                                "add".to_string(),
1033                                prop.name.clone(),
1034                                prop.data_type.clone(),
1035                                prop.nullable,
1036                            ));
1037                        }
1038                        TypeAlteration::DropProperty(name) => {
1039                            self.catalog
1040                                .alter_node_type_drop_property(&stmt.name, name)
1041                                .map_err(|e| {
1042                                    Error::Query(QueryError::new(
1043                                        QueryErrorKind::Semantic,
1044                                        e.to_string(),
1045                                    ))
1046                                })?;
1047                            wal_alts.push(("drop".to_string(), name.clone(), String::new(), false));
1048                        }
1049                    }
1050                }
1051                wal_log!(
1052                    self,
1053                    WalRecord::AlterNodeType {
1054                        name: stmt.name.clone(),
1055                        alterations: wal_alts,
1056                    }
1057                );
1058                Ok(QueryResult::status(format!(
1059                    "Altered node type '{}'",
1060                    stmt.name
1061                )))
1062            }
1063            SchemaStatement::AlterEdgeType(stmt) => {
1064                use grafeo_adapters::query::gql::ast::TypeAlteration;
1065                let mut wal_alts = Vec::new();
1066                for alt in &stmt.alterations {
1067                    match alt {
1068                        TypeAlteration::AddProperty(prop) => {
1069                            let typed = TypedProperty {
1070                                name: prop.name.clone(),
1071                                data_type: PropertyDataType::from_type_name(&prop.data_type),
1072                                nullable: prop.nullable,
1073                                default_value: None,
1074                            };
1075                            self.catalog
1076                                .alter_edge_type_add_property(&stmt.name, typed)
1077                                .map_err(|e| {
1078                                    Error::Query(QueryError::new(
1079                                        QueryErrorKind::Semantic,
1080                                        e.to_string(),
1081                                    ))
1082                                })?;
1083                            wal_alts.push((
1084                                "add".to_string(),
1085                                prop.name.clone(),
1086                                prop.data_type.clone(),
1087                                prop.nullable,
1088                            ));
1089                        }
1090                        TypeAlteration::DropProperty(name) => {
1091                            self.catalog
1092                                .alter_edge_type_drop_property(&stmt.name, name)
1093                                .map_err(|e| {
1094                                    Error::Query(QueryError::new(
1095                                        QueryErrorKind::Semantic,
1096                                        e.to_string(),
1097                                    ))
1098                                })?;
1099                            wal_alts.push(("drop".to_string(), name.clone(), String::new(), false));
1100                        }
1101                    }
1102                }
1103                wal_log!(
1104                    self,
1105                    WalRecord::AlterEdgeType {
1106                        name: stmt.name.clone(),
1107                        alterations: wal_alts,
1108                    }
1109                );
1110                Ok(QueryResult::status(format!(
1111                    "Altered edge type '{}'",
1112                    stmt.name
1113                )))
1114            }
1115            SchemaStatement::AlterGraphType(stmt) => {
1116                use grafeo_adapters::query::gql::ast::GraphTypeAlteration;
1117                let mut wal_alts = Vec::new();
1118                for alt in &stmt.alterations {
1119                    match alt {
1120                        GraphTypeAlteration::AddNodeType(name) => {
1121                            self.catalog
1122                                .alter_graph_type_add_node_type(&stmt.name, name.clone())
1123                                .map_err(|e| {
1124                                    Error::Query(QueryError::new(
1125                                        QueryErrorKind::Semantic,
1126                                        e.to_string(),
1127                                    ))
1128                                })?;
1129                            wal_alts.push(("add_node_type".to_string(), name.clone()));
1130                        }
1131                        GraphTypeAlteration::DropNodeType(name) => {
1132                            self.catalog
1133                                .alter_graph_type_drop_node_type(&stmt.name, name)
1134                                .map_err(|e| {
1135                                    Error::Query(QueryError::new(
1136                                        QueryErrorKind::Semantic,
1137                                        e.to_string(),
1138                                    ))
1139                                })?;
1140                            wal_alts.push(("drop_node_type".to_string(), name.clone()));
1141                        }
1142                        GraphTypeAlteration::AddEdgeType(name) => {
1143                            self.catalog
1144                                .alter_graph_type_add_edge_type(&stmt.name, name.clone())
1145                                .map_err(|e| {
1146                                    Error::Query(QueryError::new(
1147                                        QueryErrorKind::Semantic,
1148                                        e.to_string(),
1149                                    ))
1150                                })?;
1151                            wal_alts.push(("add_edge_type".to_string(), name.clone()));
1152                        }
1153                        GraphTypeAlteration::DropEdgeType(name) => {
1154                            self.catalog
1155                                .alter_graph_type_drop_edge_type(&stmt.name, name)
1156                                .map_err(|e| {
1157                                    Error::Query(QueryError::new(
1158                                        QueryErrorKind::Semantic,
1159                                        e.to_string(),
1160                                    ))
1161                                })?;
1162                            wal_alts.push(("drop_edge_type".to_string(), name.clone()));
1163                        }
1164                    }
1165                }
1166                wal_log!(
1167                    self,
1168                    WalRecord::AlterGraphType {
1169                        name: stmt.name.clone(),
1170                        alterations: wal_alts,
1171                    }
1172                );
1173                Ok(QueryResult::status(format!(
1174                    "Altered graph type '{}'",
1175                    stmt.name
1176                )))
1177            }
1178            SchemaStatement::CreateProcedure(stmt) => {
1179                use crate::catalog::ProcedureDefinition;
1180
1181                let def = ProcedureDefinition {
1182                    name: stmt.name.clone(),
1183                    params: stmt
1184                        .params
1185                        .iter()
1186                        .map(|p| (p.name.clone(), p.param_type.clone()))
1187                        .collect(),
1188                    returns: stmt
1189                        .returns
1190                        .iter()
1191                        .map(|r| (r.name.clone(), r.return_type.clone()))
1192                        .collect(),
1193                    body: stmt.body.clone(),
1194                };
1195
1196                if stmt.or_replace {
1197                    self.catalog.replace_procedure(def).map_err(|e| {
1198                        Error::Query(QueryError::new(QueryErrorKind::Semantic, e.to_string()))
1199                    })?;
1200                } else {
1201                    match self.catalog.register_procedure(def) {
1202                        Ok(()) => {}
1203                        Err(_) if stmt.if_not_exists => {
1204                            return Ok(QueryResult::empty());
1205                        }
1206                        Err(e) => {
1207                            return Err(Error::Query(QueryError::new(
1208                                QueryErrorKind::Semantic,
1209                                e.to_string(),
1210                            )));
1211                        }
1212                    }
1213                }
1214
1215                wal_log!(
1216                    self,
1217                    WalRecord::CreateProcedure {
1218                        name: stmt.name.clone(),
1219                        params: stmt
1220                            .params
1221                            .iter()
1222                            .map(|p| (p.name.clone(), p.param_type.clone()))
1223                            .collect(),
1224                        returns: stmt
1225                            .returns
1226                            .iter()
1227                            .map(|r| (r.name.clone(), r.return_type.clone()))
1228                            .collect(),
1229                        body: stmt.body,
1230                    }
1231                );
1232                Ok(QueryResult::status(format!(
1233                    "Created procedure '{}'",
1234                    stmt.name
1235                )))
1236            }
1237            SchemaStatement::DropProcedure { name, if_exists } => {
1238                match self.catalog.drop_procedure(&name) {
1239                    Ok(()) => {}
1240                    Err(_) if if_exists => {
1241                        return Ok(QueryResult::empty());
1242                    }
1243                    Err(e) => {
1244                        return Err(Error::Query(QueryError::new(
1245                            QueryErrorKind::Semantic,
1246                            e.to_string(),
1247                        )));
1248                    }
1249                }
1250                wal_log!(self, WalRecord::DropProcedure { name: name.clone() });
1251                Ok(QueryResult::status(format!("Dropped procedure '{name}'")))
1252            }
1253        };
1254
1255        // Invalidate all cached query plans after any successful DDL change.
1256        // DDL is rare, so clearing the entire cache is cheap and correct.
1257        if result.is_ok() {
1258            self.query_cache.clear();
1259        }
1260
1261        result
1262    }
1263
1264    /// Creates a vector index on the store by scanning existing nodes.
1265    #[cfg(all(feature = "gql", feature = "vector-index"))]
1266    fn create_vector_index_on_store(
1267        store: &LpgStore,
1268        label: &str,
1269        property: &str,
1270        dimensions: Option<usize>,
1271        metric: Option<&str>,
1272    ) -> Result<()> {
1273        use grafeo_common::types::{PropertyKey, Value};
1274        use grafeo_common::utils::error::Error;
1275        use grafeo_core::index::vector::{DistanceMetric, HnswConfig, HnswIndex};
1276
1277        let metric = match metric {
1278            Some(m) => DistanceMetric::from_str(m).ok_or_else(|| {
1279                Error::Internal(format!(
1280                    "Unknown distance metric '{m}'. Use: cosine, euclidean, dot_product, manhattan"
1281                ))
1282            })?,
1283            None => DistanceMetric::Cosine,
1284        };
1285
1286        let prop_key = PropertyKey::new(property);
1287        let mut found_dims: Option<usize> = dimensions;
1288        let mut vectors: Vec<(grafeo_common::types::NodeId, Vec<f32>)> = Vec::new();
1289
1290        for node in store.nodes_with_label(label) {
1291            if let Some(Value::Vector(v)) = node.properties.get(&prop_key) {
1292                if let Some(expected) = found_dims {
1293                    if v.len() != expected {
1294                        return Err(Error::Internal(format!(
1295                            "Vector dimension mismatch: expected {expected}, found {} on node {}",
1296                            v.len(),
1297                            node.id.0
1298                        )));
1299                    }
1300                } else {
1301                    found_dims = Some(v.len());
1302                }
1303                vectors.push((node.id, v.to_vec()));
1304            }
1305        }
1306
1307        let Some(dims) = found_dims else {
1308            return Err(Error::Internal(format!(
1309                "No vector properties found on :{label}({property}) and no dimensions specified"
1310            )));
1311        };
1312
1313        let config = HnswConfig::new(dims, metric);
1314        let index = HnswIndex::with_capacity(config, vectors.len());
1315        let accessor = grafeo_core::index::vector::PropertyVectorAccessor::new(store, property);
1316        for (node_id, vec) in &vectors {
1317            index.insert(*node_id, vec, &accessor);
1318        }
1319
1320        store.add_vector_index(label, property, Arc::new(index));
1321        Ok(())
1322    }
1323
1324    /// Stub for when vector-index feature is not enabled.
1325    #[cfg(all(feature = "gql", not(feature = "vector-index")))]
1326    fn create_vector_index_on_store(
1327        _store: &LpgStore,
1328        _label: &str,
1329        _property: &str,
1330        _dimensions: Option<usize>,
1331        _metric: Option<&str>,
1332    ) -> Result<()> {
1333        Err(grafeo_common::utils::error::Error::Internal(
1334            "Vector index support requires the 'vector-index' feature".to_string(),
1335        ))
1336    }
1337
1338    /// Creates a text index on the store by scanning existing nodes.
1339    #[cfg(all(feature = "gql", feature = "text-index"))]
1340    fn create_text_index_on_store(store: &LpgStore, label: &str, property: &str) -> Result<()> {
1341        use grafeo_common::types::{PropertyKey, Value};
1342        use grafeo_core::index::text::{BM25Config, InvertedIndex};
1343
1344        let mut index = InvertedIndex::new(BM25Config::default());
1345        let prop_key = PropertyKey::new(property);
1346
1347        let nodes = store.nodes_by_label(label);
1348        for node_id in nodes {
1349            if let Some(Value::String(text)) = store.get_node_property(node_id, &prop_key) {
1350                index.insert(node_id, text.as_str());
1351            }
1352        }
1353
1354        store.add_text_index(label, property, Arc::new(parking_lot::RwLock::new(index)));
1355        Ok(())
1356    }
1357
1358    /// Stub for when text-index feature is not enabled.
1359    #[cfg(all(feature = "gql", not(feature = "text-index")))]
1360    fn create_text_index_on_store(_store: &LpgStore, _label: &str, _property: &str) -> Result<()> {
1361        Err(grafeo_common::utils::error::Error::Internal(
1362            "Text index support requires the 'text-index' feature".to_string(),
1363        ))
1364    }
1365
1366    /// Returns a table of all indexes from the catalog.
1367    fn execute_show_indexes(&self) -> Result<QueryResult> {
1368        let indexes = self.catalog.all_indexes();
1369        let columns = vec![
1370            "name".to_string(),
1371            "type".to_string(),
1372            "label".to_string(),
1373            "property".to_string(),
1374        ];
1375        let rows: Vec<Vec<Value>> = indexes
1376            .into_iter()
1377            .map(|def| {
1378                let label_name = self
1379                    .catalog
1380                    .get_label_name(def.label)
1381                    .unwrap_or_else(|| "?".into());
1382                let prop_name = self
1383                    .catalog
1384                    .get_property_key_name(def.property_key)
1385                    .unwrap_or_else(|| "?".into());
1386                vec![
1387                    Value::from(format!("idx_{}_{}", label_name, prop_name)),
1388                    Value::from(format!("{:?}", def.index_type)),
1389                    Value::from(&*label_name),
1390                    Value::from(&*prop_name),
1391                ]
1392            })
1393            .collect();
1394        Ok(QueryResult {
1395            columns,
1396            column_types: Vec::new(),
1397            rows,
1398            ..QueryResult::empty()
1399        })
1400    }
1401
1402    /// Returns a table of all constraints (currently metadata-only).
1403    fn execute_show_constraints(&self) -> Result<QueryResult> {
1404        // Constraints are tracked in WAL but not yet in a queryable catalog.
1405        // Return an empty table with the expected schema.
1406        Ok(QueryResult {
1407            columns: vec![
1408                "name".to_string(),
1409                "type".to_string(),
1410                "label".to_string(),
1411                "properties".to_string(),
1412            ],
1413            column_types: Vec::new(),
1414            rows: Vec::new(),
1415            ..QueryResult::empty()
1416        })
1417    }
1418
1419    /// Executes a GQL query.
1420    ///
1421    /// # Errors
1422    ///
1423    /// Returns an error if the query fails to parse or execute.
1424    ///
1425    /// # Examples
1426    ///
1427    /// ```no_run
1428    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
1429    /// use grafeo_engine::GrafeoDB;
1430    ///
1431    /// let db = GrafeoDB::new_in_memory();
1432    /// let session = db.session();
1433    ///
1434    /// // Create a node
1435    /// session.execute("INSERT (:Person {name: 'Alix', age: 30})")?;
1436    ///
1437    /// // Query nodes
1438    /// let result = session.execute("MATCH (n:Person) RETURN n.name, n.age")?;
1439    /// for row in &result.rows {
1440    ///     println!("{:?}", row);
1441    /// }
1442    /// # Ok(())
1443    /// # }
1444    /// ```
1445    #[cfg(feature = "gql")]
1446    pub fn execute(&self, query: &str) -> Result<QueryResult> {
1447        self.require_lpg("GQL")?;
1448
1449        use crate::query::{
1450            Executor, binder::Binder, cache::CacheKey, optimizer::Optimizer,
1451            processor::QueryLanguage, translators::gql,
1452        };
1453
1454        #[cfg(not(target_arch = "wasm32"))]
1455        let start_time = std::time::Instant::now();
1456
1457        // Parse and translate, checking for session/schema commands first
1458        let translation = gql::translate_full(query)?;
1459        let logical_plan = match translation {
1460            gql::GqlTranslationResult::SessionCommand(cmd) => {
1461                return self.execute_session_command(cmd);
1462            }
1463            gql::GqlTranslationResult::SchemaCommand(cmd) => {
1464                // All DDL is a write operation
1465                if *self.read_only_tx.lock() {
1466                    return Err(grafeo_common::utils::error::Error::Transaction(
1467                        grafeo_common::utils::error::TransactionError::ReadOnly,
1468                    ));
1469                }
1470                return self.execute_schema_command(cmd);
1471            }
1472            gql::GqlTranslationResult::Plan(plan) => {
1473                // Block mutations in read-only transactions
1474                if *self.read_only_tx.lock() && plan.root.has_mutations() {
1475                    return Err(grafeo_common::utils::error::Error::Transaction(
1476                        grafeo_common::utils::error::TransactionError::ReadOnly,
1477                    ));
1478                }
1479                plan
1480            }
1481        };
1482
1483        // Create cache key for this query
1484        let cache_key = CacheKey::new(query, QueryLanguage::Gql);
1485
1486        // Try to get cached optimized plan, or use the plan we just translated
1487        let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
1488            cached_plan
1489        } else {
1490            // Semantic validation
1491            let mut binder = Binder::new();
1492            let _binding_context = binder.bind(&logical_plan)?;
1493
1494            // Optimize the plan
1495            let optimizer = Optimizer::from_graph_store(&*self.graph_store);
1496            let plan = optimizer.optimize(logical_plan)?;
1497
1498            // Cache the optimized plan for future use
1499            self.query_cache.put_optimized(cache_key, plan.clone());
1500
1501            plan
1502        };
1503
1504        // EXPLAIN: annotate pushdown hints and return the plan tree
1505        if optimized_plan.explain {
1506            use crate::query::processor::{annotate_pushdown_hints, explain_result};
1507            let mut plan = optimized_plan;
1508            annotate_pushdown_hints(&mut plan.root, self.graph_store.as_ref());
1509            return Ok(explain_result(&plan));
1510        }
1511
1512        // PROFILE: execute with per-operator instrumentation
1513        if optimized_plan.profile {
1514            let has_mutations = optimized_plan.root.has_mutations();
1515            return self.with_auto_commit(has_mutations, || {
1516                let (viewing_epoch, transaction_id) = self.get_transaction_context();
1517                let planner = self.create_planner(viewing_epoch, transaction_id);
1518                let (mut physical_plan, entries) = planner.plan_profiled(&optimized_plan)?;
1519
1520                let executor = Executor::with_columns(physical_plan.columns.clone())
1521                    .with_deadline(self.query_deadline());
1522                let _result = executor.execute(physical_plan.operator.as_mut())?;
1523
1524                let total_time_ms;
1525                #[cfg(not(target_arch = "wasm32"))]
1526                {
1527                    total_time_ms = start_time.elapsed().as_secs_f64() * 1000.0;
1528                }
1529                #[cfg(target_arch = "wasm32")]
1530                {
1531                    total_time_ms = 0.0;
1532                }
1533
1534                let profile_tree = crate::query::profile::build_profile_tree(
1535                    &optimized_plan.root,
1536                    &mut entries.into_iter(),
1537                );
1538                Ok(crate::query::profile::profile_result(
1539                    &profile_tree,
1540                    total_time_ms,
1541                ))
1542            });
1543        }
1544
1545        let has_mutations = optimized_plan.root.has_mutations();
1546
1547        self.with_auto_commit(has_mutations, || {
1548            // Get transaction context for MVCC visibility
1549            let (viewing_epoch, transaction_id) = self.get_transaction_context();
1550
1551            // Convert to physical plan with transaction context
1552            // (Physical planning cannot be cached as it depends on transaction state)
1553            let planner = self.create_planner(viewing_epoch, transaction_id);
1554            let mut physical_plan = planner.plan(&optimized_plan)?;
1555
1556            // Execute the plan
1557            let executor = Executor::with_columns(physical_plan.columns.clone())
1558                .with_deadline(self.query_deadline());
1559            let mut result = executor.execute(physical_plan.operator.as_mut())?;
1560
1561            // Add execution metrics
1562            let rows_scanned = result.rows.len() as u64;
1563            #[cfg(not(target_arch = "wasm32"))]
1564            {
1565                let elapsed_ms = start_time.elapsed().as_secs_f64() * 1000.0;
1566                result.execution_time_ms = Some(elapsed_ms);
1567            }
1568            result.rows_scanned = Some(rows_scanned);
1569
1570            Ok(result)
1571        })
1572    }
1573
1574    /// Executes a GQL query with visibility at the specified epoch.
1575    ///
1576    /// This enables time-travel queries: the query sees the database
1577    /// as it existed at the given epoch.
1578    ///
1579    /// # Errors
1580    ///
1581    /// Returns an error if parsing or execution fails.
1582    #[cfg(feature = "gql")]
1583    pub fn execute_at_epoch(&self, query: &str, epoch: EpochId) -> Result<QueryResult> {
1584        let previous = self.viewing_epoch_override.lock().replace(epoch);
1585        let result = self.execute(query);
1586        *self.viewing_epoch_override.lock() = previous;
1587        result
1588    }
1589
1590    /// Executes a GQL query with parameters.
1591    ///
1592    /// # Errors
1593    ///
1594    /// Returns an error if the query fails to parse or execute.
1595    #[cfg(feature = "gql")]
1596    pub fn execute_with_params(
1597        &self,
1598        query: &str,
1599        params: std::collections::HashMap<String, Value>,
1600    ) -> Result<QueryResult> {
1601        self.require_lpg("GQL")?;
1602
1603        use crate::query::processor::{QueryLanguage, QueryProcessor};
1604
1605        let has_mutations = Self::query_looks_like_mutation(query);
1606
1607        self.with_auto_commit(has_mutations, || {
1608            // Get transaction context for MVCC visibility
1609            let (viewing_epoch, transaction_id) = self.get_transaction_context();
1610
1611            // Create processor with transaction context
1612            let processor = QueryProcessor::for_graph_store_with_transaction(
1613                Arc::clone(&self.graph_store),
1614                Arc::clone(&self.transaction_manager),
1615            );
1616
1617            // Apply transaction context if in a transaction
1618            let processor = if let Some(transaction_id) = transaction_id {
1619                processor.with_transaction_context(viewing_epoch, transaction_id)
1620            } else {
1621                processor
1622            };
1623
1624            processor.process(query, QueryLanguage::Gql, Some(&params))
1625        })
1626    }
1627
1628    /// Executes a GQL query with parameters.
1629    ///
1630    /// # Errors
1631    ///
1632    /// Returns an error if no query language is enabled.
1633    #[cfg(not(any(feature = "gql", feature = "cypher")))]
1634    pub fn execute_with_params(
1635        &self,
1636        _query: &str,
1637        _params: std::collections::HashMap<String, Value>,
1638    ) -> Result<QueryResult> {
1639        Err(grafeo_common::utils::error::Error::Internal(
1640            "No query language enabled".to_string(),
1641        ))
1642    }
1643
1644    /// Executes a GQL query.
1645    ///
1646    /// # Errors
1647    ///
1648    /// Returns an error if no query language is enabled.
1649    #[cfg(not(any(feature = "gql", feature = "cypher")))]
1650    pub fn execute(&self, _query: &str) -> Result<QueryResult> {
1651        Err(grafeo_common::utils::error::Error::Internal(
1652            "No query language enabled".to_string(),
1653        ))
1654    }
1655
1656    /// Executes a Cypher query.
1657    ///
1658    /// # Errors
1659    ///
1660    /// Returns an error if the query fails to parse or execute.
1661    #[cfg(feature = "cypher")]
1662    pub fn execute_cypher(&self, query: &str) -> Result<QueryResult> {
1663        use crate::query::{
1664            Executor, binder::Binder, cache::CacheKey, optimizer::Optimizer,
1665            processor::QueryLanguage, translators::cypher,
1666        };
1667        use grafeo_common::utils::error::{Error as GrafeoError, QueryError, QueryErrorKind};
1668
1669        // Handle schema DDL and SHOW commands before the normal query path
1670        let translation = cypher::translate_full(query)?;
1671        match translation {
1672            cypher::CypherTranslationResult::SchemaCommand(cmd) => {
1673                if *self.read_only_tx.lock() {
1674                    return Err(GrafeoError::Query(QueryError::new(
1675                        QueryErrorKind::Semantic,
1676                        "Cannot execute schema DDL in a read-only transaction",
1677                    )));
1678                }
1679                return self.execute_schema_command(cmd);
1680            }
1681            cypher::CypherTranslationResult::ShowIndexes => {
1682                return self.execute_show_indexes();
1683            }
1684            cypher::CypherTranslationResult::ShowConstraints => {
1685                return self.execute_show_constraints();
1686            }
1687            cypher::CypherTranslationResult::Plan(_) => {
1688                // Fall through to normal execution below
1689            }
1690        }
1691
1692        #[cfg(not(target_arch = "wasm32"))]
1693        let start_time = std::time::Instant::now();
1694
1695        // Create cache key for this query
1696        let cache_key = CacheKey::new(query, QueryLanguage::Cypher);
1697
1698        // Try to get cached optimized plan
1699        let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
1700            cached_plan
1701        } else {
1702            // Parse and translate the query to a logical plan
1703            let logical_plan = cypher::translate(query)?;
1704
1705            // Semantic validation
1706            let mut binder = Binder::new();
1707            let _binding_context = binder.bind(&logical_plan)?;
1708
1709            // Optimize the plan
1710            let optimizer = Optimizer::from_graph_store(&*self.graph_store);
1711            let plan = optimizer.optimize(logical_plan)?;
1712
1713            // Cache the optimized plan
1714            self.query_cache.put_optimized(cache_key, plan.clone());
1715
1716            plan
1717        };
1718
1719        // EXPLAIN
1720        if optimized_plan.explain {
1721            use crate::query::processor::{annotate_pushdown_hints, explain_result};
1722            let mut plan = optimized_plan;
1723            annotate_pushdown_hints(&mut plan.root, self.graph_store.as_ref());
1724            return Ok(explain_result(&plan));
1725        }
1726
1727        // PROFILE
1728        if optimized_plan.profile {
1729            let has_mutations = optimized_plan.root.has_mutations();
1730            return self.with_auto_commit(has_mutations, || {
1731                let (viewing_epoch, transaction_id) = self.get_transaction_context();
1732                let planner = self.create_planner(viewing_epoch, transaction_id);
1733                let (mut physical_plan, entries) = planner.plan_profiled(&optimized_plan)?;
1734
1735                let executor = Executor::with_columns(physical_plan.columns.clone())
1736                    .with_deadline(self.query_deadline());
1737                let _result = executor.execute(physical_plan.operator.as_mut())?;
1738
1739                let total_time_ms;
1740                #[cfg(not(target_arch = "wasm32"))]
1741                {
1742                    total_time_ms = start_time.elapsed().as_secs_f64() * 1000.0;
1743                }
1744                #[cfg(target_arch = "wasm32")]
1745                {
1746                    total_time_ms = 0.0;
1747                }
1748
1749                let profile_tree = crate::query::profile::build_profile_tree(
1750                    &optimized_plan.root,
1751                    &mut entries.into_iter(),
1752                );
1753                Ok(crate::query::profile::profile_result(
1754                    &profile_tree,
1755                    total_time_ms,
1756                ))
1757            });
1758        }
1759
1760        let has_mutations = optimized_plan.root.has_mutations();
1761
1762        self.with_auto_commit(has_mutations, || {
1763            // Get transaction context for MVCC visibility
1764            let (viewing_epoch, transaction_id) = self.get_transaction_context();
1765
1766            // Convert to physical plan with transaction context
1767            let planner = self.create_planner(viewing_epoch, transaction_id);
1768            let mut physical_plan = planner.plan(&optimized_plan)?;
1769
1770            // Execute the plan
1771            let executor = Executor::with_columns(physical_plan.columns.clone())
1772                .with_deadline(self.query_deadline());
1773            executor.execute(physical_plan.operator.as_mut())
1774        })
1775    }
1776
1777    /// Executes a Gremlin query.
1778    ///
1779    /// # Errors
1780    ///
1781    /// Returns an error if the query fails to parse or execute.
1782    ///
1783    /// # Examples
1784    ///
1785    /// ```no_run
1786    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
1787    /// use grafeo_engine::GrafeoDB;
1788    ///
1789    /// let db = GrafeoDB::new_in_memory();
1790    /// let session = db.session();
1791    ///
1792    /// // Create some nodes first
1793    /// session.create_node(&["Person"]);
1794    ///
1795    /// // Query using Gremlin
1796    /// let result = session.execute_gremlin("g.V().hasLabel('Person')")?;
1797    /// # Ok(())
1798    /// # }
1799    /// ```
1800    #[cfg(feature = "gremlin")]
1801    pub fn execute_gremlin(&self, query: &str) -> Result<QueryResult> {
1802        use crate::query::{Executor, binder::Binder, optimizer::Optimizer, translators::gremlin};
1803
1804        // Parse and translate the query to a logical plan
1805        let logical_plan = gremlin::translate(query)?;
1806
1807        // Semantic validation
1808        let mut binder = Binder::new();
1809        let _binding_context = binder.bind(&logical_plan)?;
1810
1811        // Optimize the plan
1812        let optimizer = Optimizer::from_graph_store(&*self.graph_store);
1813        let optimized_plan = optimizer.optimize(logical_plan)?;
1814
1815        let has_mutations = optimized_plan.root.has_mutations();
1816
1817        self.with_auto_commit(has_mutations, || {
1818            // Get transaction context for MVCC visibility
1819            let (viewing_epoch, transaction_id) = self.get_transaction_context();
1820
1821            // Convert to physical plan with transaction context
1822            let planner = self.create_planner(viewing_epoch, transaction_id);
1823            let mut physical_plan = planner.plan(&optimized_plan)?;
1824
1825            // Execute the plan
1826            let executor = Executor::with_columns(physical_plan.columns.clone())
1827                .with_deadline(self.query_deadline());
1828            executor.execute(physical_plan.operator.as_mut())
1829        })
1830    }
1831
1832    /// Executes a Gremlin query with parameters.
1833    ///
1834    /// # Errors
1835    ///
1836    /// Returns an error if the query fails to parse or execute.
1837    #[cfg(feature = "gremlin")]
1838    pub fn execute_gremlin_with_params(
1839        &self,
1840        query: &str,
1841        params: std::collections::HashMap<String, Value>,
1842    ) -> Result<QueryResult> {
1843        use crate::query::processor::{QueryLanguage, QueryProcessor};
1844
1845        let has_mutations = Self::query_looks_like_mutation(query);
1846
1847        self.with_auto_commit(has_mutations, || {
1848            // Get transaction context for MVCC visibility
1849            let (viewing_epoch, transaction_id) = self.get_transaction_context();
1850
1851            // Create processor with transaction context
1852            let processor = QueryProcessor::for_graph_store_with_transaction(
1853                Arc::clone(&self.graph_store),
1854                Arc::clone(&self.transaction_manager),
1855            );
1856
1857            // Apply transaction context if in a transaction
1858            let processor = if let Some(transaction_id) = transaction_id {
1859                processor.with_transaction_context(viewing_epoch, transaction_id)
1860            } else {
1861                processor
1862            };
1863
1864            processor.process(query, QueryLanguage::Gremlin, Some(&params))
1865        })
1866    }
1867
1868    /// Executes a GraphQL query against the LPG store.
1869    ///
1870    /// # Errors
1871    ///
1872    /// Returns an error if the query fails to parse or execute.
1873    ///
1874    /// # Examples
1875    ///
1876    /// ```no_run
1877    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
1878    /// use grafeo_engine::GrafeoDB;
1879    ///
1880    /// let db = GrafeoDB::new_in_memory();
1881    /// let session = db.session();
1882    ///
1883    /// // Create some nodes first
1884    /// session.create_node(&["User"]);
1885    ///
1886    /// // Query using GraphQL
1887    /// let result = session.execute_graphql("query { user { id name } }")?;
1888    /// # Ok(())
1889    /// # }
1890    /// ```
1891    #[cfg(feature = "graphql")]
1892    pub fn execute_graphql(&self, query: &str) -> Result<QueryResult> {
1893        use crate::query::{Executor, binder::Binder, optimizer::Optimizer, translators::graphql};
1894
1895        // Parse and translate the query to a logical plan
1896        let logical_plan = graphql::translate(query)?;
1897
1898        // Semantic validation
1899        let mut binder = Binder::new();
1900        let _binding_context = binder.bind(&logical_plan)?;
1901
1902        // Optimize the plan
1903        let optimizer = Optimizer::from_graph_store(&*self.graph_store);
1904        let optimized_plan = optimizer.optimize(logical_plan)?;
1905
1906        let has_mutations = optimized_plan.root.has_mutations();
1907
1908        self.with_auto_commit(has_mutations, || {
1909            // Get transaction context for MVCC visibility
1910            let (viewing_epoch, transaction_id) = self.get_transaction_context();
1911
1912            // Convert to physical plan with transaction context
1913            let planner = self.create_planner(viewing_epoch, transaction_id);
1914            let mut physical_plan = planner.plan(&optimized_plan)?;
1915
1916            // Execute the plan
1917            let executor = Executor::with_columns(physical_plan.columns.clone())
1918                .with_deadline(self.query_deadline());
1919            executor.execute(physical_plan.operator.as_mut())
1920        })
1921    }
1922
1923    /// Executes a GraphQL query with parameters.
1924    ///
1925    /// # Errors
1926    ///
1927    /// Returns an error if the query fails to parse or execute.
1928    #[cfg(feature = "graphql")]
1929    pub fn execute_graphql_with_params(
1930        &self,
1931        query: &str,
1932        params: std::collections::HashMap<String, Value>,
1933    ) -> Result<QueryResult> {
1934        use crate::query::processor::{QueryLanguage, QueryProcessor};
1935
1936        let has_mutations = Self::query_looks_like_mutation(query);
1937
1938        self.with_auto_commit(has_mutations, || {
1939            // Get transaction context for MVCC visibility
1940            let (viewing_epoch, transaction_id) = self.get_transaction_context();
1941
1942            // Create processor with transaction context
1943            let processor = QueryProcessor::for_graph_store_with_transaction(
1944                Arc::clone(&self.graph_store),
1945                Arc::clone(&self.transaction_manager),
1946            );
1947
1948            // Apply transaction context if in a transaction
1949            let processor = if let Some(transaction_id) = transaction_id {
1950                processor.with_transaction_context(viewing_epoch, transaction_id)
1951            } else {
1952                processor
1953            };
1954
1955            processor.process(query, QueryLanguage::GraphQL, Some(&params))
1956        })
1957    }
1958
1959    /// Executes a GraphQL query against the RDF store.
1960    ///
1961    /// # Errors
1962    ///
1963    /// Returns an error if the query fails to parse or execute.
1964    #[cfg(all(feature = "graphql", feature = "rdf"))]
1965    pub fn execute_graphql_rdf(&self, query: &str) -> Result<QueryResult> {
1966        use crate::query::{
1967            Executor, optimizer::Optimizer, planner::rdf::RdfPlanner, translators::graphql_rdf,
1968        };
1969
1970        let logical_plan = graphql_rdf::translate(query, "http://example.org/")?;
1971
1972        let optimizer = Optimizer::from_graph_store(&*self.graph_store);
1973        let optimized_plan = optimizer.optimize(logical_plan)?;
1974
1975        let planner = RdfPlanner::new(Arc::clone(&self.rdf_store))
1976            .with_transaction_id(*self.current_transaction.lock());
1977        let mut physical_plan = planner.plan(&optimized_plan)?;
1978
1979        let executor = Executor::with_columns(physical_plan.columns.clone())
1980            .with_deadline(self.query_deadline());
1981        executor.execute(physical_plan.operator.as_mut())
1982    }
1983
1984    /// Executes a GraphQL query against the RDF store with parameters.
1985    ///
1986    /// # Errors
1987    ///
1988    /// Returns an error if the query fails to parse or execute.
1989    #[cfg(all(feature = "graphql", feature = "rdf"))]
1990    pub fn execute_graphql_rdf_with_params(
1991        &self,
1992        query: &str,
1993        params: std::collections::HashMap<String, Value>,
1994    ) -> Result<QueryResult> {
1995        use crate::query::processor::{QueryLanguage, QueryProcessor};
1996
1997        let has_mutations = Self::query_looks_like_mutation(query);
1998
1999        self.with_auto_commit(has_mutations, || {
2000            let (viewing_epoch, transaction_id) = self.get_transaction_context();
2001
2002            let processor = QueryProcessor::for_graph_store_with_transaction(
2003                Arc::clone(&self.graph_store),
2004                Arc::clone(&self.transaction_manager),
2005            );
2006
2007            let processor = if let Some(transaction_id) = transaction_id {
2008                processor.with_transaction_context(viewing_epoch, transaction_id)
2009            } else {
2010                processor
2011            };
2012
2013            processor.process(query, QueryLanguage::GraphQLRdf, Some(&params))
2014        })
2015    }
2016
2017    /// Executes a SQL/PGQ query (SQL:2023 GRAPH_TABLE).
2018    ///
2019    /// # Errors
2020    ///
2021    /// Returns an error if the query fails to parse or execute.
2022    ///
2023    /// # Examples
2024    ///
2025    /// ```no_run
2026    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
2027    /// use grafeo_engine::GrafeoDB;
2028    ///
2029    /// let db = GrafeoDB::new_in_memory();
2030    /// let session = db.session();
2031    ///
2032    /// let result = session.execute_sql(
2033    ///     "SELECT * FROM GRAPH_TABLE (
2034    ///         MATCH (n:Person)
2035    ///         COLUMNS (n.name AS name)
2036    ///     )"
2037    /// )?;
2038    /// # Ok(())
2039    /// # }
2040    /// ```
2041    #[cfg(feature = "sql-pgq")]
2042    pub fn execute_sql(&self, query: &str) -> Result<QueryResult> {
2043        use crate::query::{
2044            Executor, binder::Binder, cache::CacheKey, optimizer::Optimizer, plan::LogicalOperator,
2045            processor::QueryLanguage, translators::sql_pgq,
2046        };
2047
2048        // Parse and translate (always needed to check for DDL)
2049        let logical_plan = sql_pgq::translate(query)?;
2050
2051        // Handle DDL statements directly (they don't go through the query pipeline)
2052        if let LogicalOperator::CreatePropertyGraph(ref cpg) = logical_plan.root {
2053            return Ok(QueryResult {
2054                columns: vec!["status".into()],
2055                column_types: vec![grafeo_common::types::LogicalType::String],
2056                rows: vec![vec![Value::from(format!(
2057                    "Property graph '{}' created ({} node tables, {} edge tables)",
2058                    cpg.name,
2059                    cpg.node_tables.len(),
2060                    cpg.edge_tables.len()
2061                ))]],
2062                execution_time_ms: None,
2063                rows_scanned: None,
2064                status_message: None,
2065                gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
2066            });
2067        }
2068
2069        // Create cache key for query plans
2070        let cache_key = CacheKey::new(query, QueryLanguage::SqlPgq);
2071
2072        // Try to get cached optimized plan
2073        let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
2074            cached_plan
2075        } else {
2076            // Semantic validation
2077            let mut binder = Binder::new();
2078            let _binding_context = binder.bind(&logical_plan)?;
2079
2080            // Optimize the plan
2081            let optimizer = Optimizer::from_graph_store(&*self.graph_store);
2082            let plan = optimizer.optimize(logical_plan)?;
2083
2084            // Cache the optimized plan
2085            self.query_cache.put_optimized(cache_key, plan.clone());
2086
2087            plan
2088        };
2089
2090        let has_mutations = optimized_plan.root.has_mutations();
2091
2092        self.with_auto_commit(has_mutations, || {
2093            // Get transaction context for MVCC visibility
2094            let (viewing_epoch, transaction_id) = self.get_transaction_context();
2095
2096            // Convert to physical plan with transaction context
2097            let planner = self.create_planner(viewing_epoch, transaction_id);
2098            let mut physical_plan = planner.plan(&optimized_plan)?;
2099
2100            // Execute the plan
2101            let executor = Executor::with_columns(physical_plan.columns.clone())
2102                .with_deadline(self.query_deadline());
2103            executor.execute(physical_plan.operator.as_mut())
2104        })
2105    }
2106
2107    /// Executes a SQL/PGQ query with parameters.
2108    ///
2109    /// # Errors
2110    ///
2111    /// Returns an error if the query fails to parse or execute.
2112    #[cfg(feature = "sql-pgq")]
2113    pub fn execute_sql_with_params(
2114        &self,
2115        query: &str,
2116        params: std::collections::HashMap<String, Value>,
2117    ) -> Result<QueryResult> {
2118        use crate::query::processor::{QueryLanguage, QueryProcessor};
2119
2120        let has_mutations = Self::query_looks_like_mutation(query);
2121
2122        self.with_auto_commit(has_mutations, || {
2123            // Get transaction context for MVCC visibility
2124            let (viewing_epoch, transaction_id) = self.get_transaction_context();
2125
2126            // Create processor with transaction context
2127            let processor = QueryProcessor::for_graph_store_with_transaction(
2128                Arc::clone(&self.graph_store),
2129                Arc::clone(&self.transaction_manager),
2130            );
2131
2132            // Apply transaction context if in a transaction
2133            let processor = if let Some(transaction_id) = transaction_id {
2134                processor.with_transaction_context(viewing_epoch, transaction_id)
2135            } else {
2136                processor
2137            };
2138
2139            processor.process(query, QueryLanguage::SqlPgq, Some(&params))
2140        })
2141    }
2142
2143    /// Executes a SPARQL query.
2144    ///
2145    /// # Errors
2146    ///
2147    /// Returns an error if the query fails to parse or execute.
2148    #[cfg(all(feature = "sparql", feature = "rdf"))]
2149    pub fn execute_sparql(&self, query: &str) -> Result<QueryResult> {
2150        use crate::query::{
2151            Executor, optimizer::Optimizer, planner::rdf::RdfPlanner, translators::sparql,
2152        };
2153
2154        // Parse and translate the SPARQL query to a logical plan
2155        let logical_plan = sparql::translate(query)?;
2156
2157        // Optimize the plan
2158        let optimizer = Optimizer::from_graph_store(&*self.graph_store);
2159        let optimized_plan = optimizer.optimize(logical_plan)?;
2160
2161        // Convert to physical plan using RDF planner
2162        let planner = RdfPlanner::new(Arc::clone(&self.rdf_store))
2163            .with_transaction_id(*self.current_transaction.lock());
2164        let mut physical_plan = planner.plan(&optimized_plan)?;
2165
2166        // Execute the plan
2167        let executor = Executor::with_columns(physical_plan.columns.clone())
2168            .with_deadline(self.query_deadline());
2169        executor.execute(physical_plan.operator.as_mut())
2170    }
2171
2172    /// Executes a SPARQL query with parameters.
2173    ///
2174    /// # Errors
2175    ///
2176    /// Returns an error if the query fails to parse or execute.
2177    #[cfg(all(feature = "sparql", feature = "rdf"))]
2178    pub fn execute_sparql_with_params(
2179        &self,
2180        query: &str,
2181        params: std::collections::HashMap<String, Value>,
2182    ) -> Result<QueryResult> {
2183        use crate::query::{
2184            Executor, optimizer::Optimizer, planner::rdf::RdfPlanner, processor::substitute_params,
2185            translators::sparql,
2186        };
2187
2188        let mut logical_plan = sparql::translate(query)?;
2189
2190        substitute_params(&mut logical_plan, &params)?;
2191
2192        let optimizer = Optimizer::from_graph_store(&*self.graph_store);
2193        let optimized_plan = optimizer.optimize(logical_plan)?;
2194
2195        let planner = RdfPlanner::new(Arc::clone(&self.rdf_store))
2196            .with_transaction_id(*self.current_transaction.lock());
2197        let mut physical_plan = planner.plan(&optimized_plan)?;
2198
2199        let executor = Executor::with_columns(physical_plan.columns.clone())
2200            .with_deadline(self.query_deadline());
2201        executor.execute(physical_plan.operator.as_mut())
2202    }
2203
2204    /// Executes a query in the specified language by name.
2205    ///
2206    /// Supported language names: `"gql"`, `"cypher"`, `"gremlin"`, `"graphql"`,
2207    /// `"graphql-rdf"`, `"sparql"`, `"sql"`. Each requires the corresponding feature flag.
2208    ///
2209    /// # Errors
2210    ///
2211    /// Returns an error if the language is unknown/disabled or the query fails.
2212    pub fn execute_language(
2213        &self,
2214        query: &str,
2215        language: &str,
2216        params: Option<std::collections::HashMap<String, Value>>,
2217    ) -> Result<QueryResult> {
2218        match language {
2219            "gql" => {
2220                if let Some(p) = params {
2221                    self.execute_with_params(query, p)
2222                } else {
2223                    self.execute(query)
2224                }
2225            }
2226            #[cfg(feature = "cypher")]
2227            "cypher" => {
2228                if let Some(p) = params {
2229                    use crate::query::processor::{QueryLanguage, QueryProcessor};
2230                    let has_mutations = Self::query_looks_like_mutation(query);
2231                    self.with_auto_commit(has_mutations, || {
2232                        let processor = QueryProcessor::for_graph_store_with_transaction(
2233                            Arc::clone(&self.graph_store),
2234                            Arc::clone(&self.transaction_manager),
2235                        );
2236                        let (viewing_epoch, transaction_id) = self.get_transaction_context();
2237                        let processor = if let Some(transaction_id) = transaction_id {
2238                            processor.with_transaction_context(viewing_epoch, transaction_id)
2239                        } else {
2240                            processor
2241                        };
2242                        processor.process(query, QueryLanguage::Cypher, Some(&p))
2243                    })
2244                } else {
2245                    self.execute_cypher(query)
2246                }
2247            }
2248            #[cfg(feature = "gremlin")]
2249            "gremlin" => {
2250                if let Some(p) = params {
2251                    self.execute_gremlin_with_params(query, p)
2252                } else {
2253                    self.execute_gremlin(query)
2254                }
2255            }
2256            #[cfg(feature = "graphql")]
2257            "graphql" => {
2258                if let Some(p) = params {
2259                    self.execute_graphql_with_params(query, p)
2260                } else {
2261                    self.execute_graphql(query)
2262                }
2263            }
2264            #[cfg(all(feature = "graphql", feature = "rdf"))]
2265            "graphql-rdf" => {
2266                if let Some(p) = params {
2267                    self.execute_graphql_rdf_with_params(query, p)
2268                } else {
2269                    self.execute_graphql_rdf(query)
2270                }
2271            }
2272            #[cfg(feature = "sql-pgq")]
2273            "sql" | "sql-pgq" => {
2274                if let Some(p) = params {
2275                    self.execute_sql_with_params(query, p)
2276                } else {
2277                    self.execute_sql(query)
2278                }
2279            }
2280            #[cfg(all(feature = "sparql", feature = "rdf"))]
2281            "sparql" => {
2282                if let Some(p) = params {
2283                    self.execute_sparql_with_params(query, p)
2284                } else {
2285                    self.execute_sparql(query)
2286                }
2287            }
2288            other => Err(grafeo_common::utils::error::Error::Query(
2289                grafeo_common::utils::error::QueryError::new(
2290                    grafeo_common::utils::error::QueryErrorKind::Semantic,
2291                    format!("Unknown query language: '{other}'"),
2292                ),
2293            )),
2294        }
2295    }
2296
2297    /// Begins a new transaction.
2298    ///
2299    /// # Errors
2300    ///
2301    /// Returns an error if a transaction is already active.
2302    ///
2303    /// # Examples
2304    ///
2305    /// ```no_run
2306    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
2307    /// use grafeo_engine::GrafeoDB;
2308    ///
2309    /// let db = GrafeoDB::new_in_memory();
2310    /// let mut session = db.session();
2311    ///
2312    /// session.begin_transaction()?;
2313    /// session.execute("INSERT (:Person {name: 'Alix'})")?;
2314    /// session.execute("INSERT (:Person {name: 'Gus'})")?;
2315    /// session.commit()?; // Both inserts committed atomically
2316    /// # Ok(())
2317    /// # }
2318    /// ```
2319    /// Clears all cached query plans.
2320    ///
2321    /// The plan cache is shared across all sessions on the same database,
2322    /// so clearing from one session affects all sessions.
2323    pub fn clear_plan_cache(&self) {
2324        self.query_cache.clear();
2325    }
2326
2327    /// Begins a new transaction on this session.
2328    ///
2329    /// Uses the default isolation level (`SnapshotIsolation`).
2330    ///
2331    /// # Errors
2332    ///
2333    /// Returns an error if a transaction is already active.
2334    pub fn begin_transaction(&mut self) -> Result<()> {
2335        self.begin_transaction_inner(false, None)
2336    }
2337
2338    /// Begins a transaction with a specific isolation level.
2339    ///
2340    /// See [`begin_transaction`](Self::begin_transaction) for the default (`SnapshotIsolation`).
2341    ///
2342    /// # Errors
2343    ///
2344    /// Returns an error if a transaction is already active.
2345    pub fn begin_transaction_with_isolation(
2346        &mut self,
2347        isolation_level: crate::transaction::IsolationLevel,
2348    ) -> Result<()> {
2349        self.begin_transaction_inner(false, Some(isolation_level))
2350    }
2351
2352    /// Core transaction begin logic, usable from both `&mut self` and `&self` paths.
2353    fn begin_transaction_inner(
2354        &self,
2355        read_only: bool,
2356        isolation_level: Option<crate::transaction::IsolationLevel>,
2357    ) -> Result<()> {
2358        let mut current = self.current_transaction.lock();
2359        if current.is_some() {
2360            // Nested transaction: create an auto-savepoint instead of a new tx.
2361            drop(current);
2362            let mut depth = self.transaction_nesting_depth.lock();
2363            *depth += 1;
2364            let sp_name = format!("_nested_tx_{}", *depth);
2365            self.savepoint(&sp_name)?;
2366            return Ok(());
2367        }
2368
2369        self.transaction_start_node_count
2370            .store(self.store.node_count(), Ordering::Relaxed);
2371        self.transaction_start_edge_count
2372            .store(self.store.edge_count(), Ordering::Relaxed);
2373        let transaction_id = if let Some(level) = isolation_level {
2374            self.transaction_manager.begin_with_isolation(level)
2375        } else {
2376            self.transaction_manager.begin()
2377        };
2378        *current = Some(transaction_id);
2379        *self.read_only_tx.lock() = read_only;
2380        Ok(())
2381    }
2382
2383    /// Commits the current transaction.
2384    ///
2385    /// Makes all changes since [`begin_transaction`](Self::begin_transaction) permanent.
2386    ///
2387    /// # Errors
2388    ///
2389    /// Returns an error if no transaction is active.
2390    pub fn commit(&mut self) -> Result<()> {
2391        self.commit_inner()
2392    }
2393
2394    /// Core commit logic, usable from both `&mut self` and `&self` paths.
2395    fn commit_inner(&self) -> Result<()> {
2396        // Nested transaction: release the auto-savepoint (changes are preserved).
2397        {
2398            let mut depth = self.transaction_nesting_depth.lock();
2399            if *depth > 0 {
2400                let sp_name = format!("_nested_tx_{depth}");
2401                *depth -= 1;
2402                drop(depth);
2403                return self.release_savepoint(&sp_name);
2404            }
2405        }
2406
2407        let transaction_id = self.current_transaction.lock().take().ok_or_else(|| {
2408            grafeo_common::utils::error::Error::Transaction(
2409                grafeo_common::utils::error::TransactionError::InvalidState(
2410                    "No active transaction".to_string(),
2411                ),
2412            )
2413        })?;
2414
2415        // Commit RDF store pending operations
2416        #[cfg(feature = "rdf")]
2417        self.rdf_store.commit_transaction(transaction_id);
2418
2419        // Discard property undo log: changes are committed, no rollback possible
2420        self.store.commit_transaction_properties(transaction_id);
2421
2422        self.transaction_manager.commit(transaction_id)?;
2423
2424        // Sync the LpgStore epoch with the TxManager so that
2425        // convenience lookups (edge_type, get_edge, get_node) that use
2426        // store.current_epoch() can see versions created at the latest epoch.
2427        self.store
2428            .sync_epoch(self.transaction_manager.current_epoch());
2429
2430        // Reset read-only flag and clear savepoints
2431        *self.read_only_tx.lock() = false;
2432        self.savepoints.lock().clear();
2433
2434        // Auto-GC: periodically prune old MVCC versions
2435        if self.gc_interval > 0 {
2436            let count = self.commit_counter.fetch_add(1, Ordering::Relaxed) + 1;
2437            if count.is_multiple_of(self.gc_interval) {
2438                let min_epoch = self.transaction_manager.min_active_epoch();
2439                self.store.gc_versions(min_epoch);
2440                self.transaction_manager.gc();
2441            }
2442        }
2443
2444        Ok(())
2445    }
2446
2447    /// Aborts the current transaction.
2448    ///
2449    /// Discards all changes since [`begin_transaction`](Self::begin_transaction).
2450    ///
2451    /// # Errors
2452    ///
2453    /// Returns an error if no transaction is active.
2454    ///
2455    /// # Examples
2456    ///
2457    /// ```no_run
2458    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
2459    /// use grafeo_engine::GrafeoDB;
2460    ///
2461    /// let db = GrafeoDB::new_in_memory();
2462    /// let mut session = db.session();
2463    ///
2464    /// session.begin_transaction()?;
2465    /// session.execute("INSERT (:Person {name: 'Alix'})")?;
2466    /// session.rollback()?; // Insert is discarded
2467    /// # Ok(())
2468    /// # }
2469    /// ```
2470    pub fn rollback(&mut self) -> Result<()> {
2471        self.rollback_inner()
2472    }
2473
2474    /// Core rollback logic, usable from both `&mut self` and `&self` paths.
2475    fn rollback_inner(&self) -> Result<()> {
2476        // Nested transaction: rollback to the auto-savepoint.
2477        {
2478            let mut depth = self.transaction_nesting_depth.lock();
2479            if *depth > 0 {
2480                let sp_name = format!("_nested_tx_{depth}");
2481                *depth -= 1;
2482                drop(depth);
2483                return self.rollback_to_savepoint(&sp_name);
2484            }
2485        }
2486
2487        let transaction_id = self.current_transaction.lock().take().ok_or_else(|| {
2488            grafeo_common::utils::error::Error::Transaction(
2489                grafeo_common::utils::error::TransactionError::InvalidState(
2490                    "No active transaction".to_string(),
2491                ),
2492            )
2493        })?;
2494
2495        // Reset read-only flag
2496        *self.read_only_tx.lock() = false;
2497
2498        // Discard uncommitted versions in the LPG store
2499        self.store.discard_uncommitted_versions(transaction_id);
2500
2501        // Discard pending operations in the RDF store
2502        #[cfg(feature = "rdf")]
2503        self.rdf_store.rollback_transaction(transaction_id);
2504
2505        // Clear savepoints
2506        self.savepoints.lock().clear();
2507
2508        // Mark transaction as aborted in the manager
2509        self.transaction_manager.abort(transaction_id)
2510    }
2511
2512    /// Creates a named savepoint within the current transaction.
2513    ///
2514    /// The savepoint captures the current node/edge ID counters so that
2515    /// [`rollback_to_savepoint`](Self::rollback_to_savepoint) can discard
2516    /// entities created after this point.
2517    ///
2518    /// # Errors
2519    ///
2520    /// Returns an error if no transaction is active.
2521    pub fn savepoint(&self, name: &str) -> Result<()> {
2522        let tx_id = self.current_transaction.lock().ok_or_else(|| {
2523            grafeo_common::utils::error::Error::Transaction(
2524                grafeo_common::utils::error::TransactionError::InvalidState(
2525                    "No active transaction".to_string(),
2526                ),
2527            )
2528        })?;
2529
2530        let next_node = self.store.peek_next_node_id();
2531        let next_edge = self.store.peek_next_edge_id();
2532        let undo_position = self.store.property_undo_log_position(tx_id);
2533        self.savepoints
2534            .lock()
2535            .push((name.to_string(), next_node, next_edge, undo_position));
2536        Ok(())
2537    }
2538
2539    /// Rolls back to a named savepoint, undoing all writes made after it.
2540    ///
2541    /// The savepoint and any savepoints created after it are removed.
2542    /// Entities with IDs >= the savepoint snapshot are discarded.
2543    ///
2544    /// # Errors
2545    ///
2546    /// Returns an error if no transaction is active or the savepoint does not exist.
2547    pub fn rollback_to_savepoint(&self, name: &str) -> Result<()> {
2548        let transaction_id = self.current_transaction.lock().ok_or_else(|| {
2549            grafeo_common::utils::error::Error::Transaction(
2550                grafeo_common::utils::error::TransactionError::InvalidState(
2551                    "No active transaction".to_string(),
2552                ),
2553            )
2554        })?;
2555
2556        let mut savepoints = self.savepoints.lock();
2557
2558        // Find the savepoint by name (search from the end for nested savepoints)
2559        let pos = savepoints
2560            .iter()
2561            .rposition(|(n, _, _, _)| n == name)
2562            .ok_or_else(|| {
2563                grafeo_common::utils::error::Error::Transaction(
2564                    grafeo_common::utils::error::TransactionError::InvalidState(format!(
2565                        "Savepoint '{name}' not found"
2566                    )),
2567                )
2568            })?;
2569
2570        let (_, sp_next_node, sp_next_edge, sp_undo_position) = savepoints[pos].clone();
2571
2572        // Remove this savepoint and all later ones
2573        savepoints.truncate(pos);
2574        drop(savepoints);
2575
2576        // Replay property/label undo entries recorded after the savepoint
2577        self.store
2578            .rollback_transaction_properties_to(transaction_id, sp_undo_position);
2579
2580        // Discard all nodes with ID >= sp_next_node and edges with ID >= sp_next_edge
2581        let current_next_node = self.store.peek_next_node_id();
2582        let current_next_edge = self.store.peek_next_edge_id();
2583
2584        let node_ids: Vec<NodeId> = (sp_next_node..current_next_node).map(NodeId::new).collect();
2585        let edge_ids: Vec<EdgeId> = (sp_next_edge..current_next_edge).map(EdgeId::new).collect();
2586
2587        if !node_ids.is_empty() || !edge_ids.is_empty() {
2588            self.store
2589                .discard_entities_by_id(transaction_id, &node_ids, &edge_ids);
2590        }
2591
2592        Ok(())
2593    }
2594
2595    /// Releases (removes) a named savepoint without rolling back.
2596    ///
2597    /// # Errors
2598    ///
2599    /// Returns an error if no transaction is active or the savepoint does not exist.
2600    pub fn release_savepoint(&self, name: &str) -> Result<()> {
2601        let _tx_id = self.current_transaction.lock().ok_or_else(|| {
2602            grafeo_common::utils::error::Error::Transaction(
2603                grafeo_common::utils::error::TransactionError::InvalidState(
2604                    "No active transaction".to_string(),
2605                ),
2606            )
2607        })?;
2608
2609        let mut savepoints = self.savepoints.lock();
2610        let pos = savepoints
2611            .iter()
2612            .rposition(|(n, _, _, _)| n == name)
2613            .ok_or_else(|| {
2614                grafeo_common::utils::error::Error::Transaction(
2615                    grafeo_common::utils::error::TransactionError::InvalidState(format!(
2616                        "Savepoint '{name}' not found"
2617                    )),
2618                )
2619            })?;
2620        savepoints.remove(pos);
2621        Ok(())
2622    }
2623
2624    /// Returns whether a transaction is active.
2625    #[must_use]
2626    pub fn in_transaction(&self) -> bool {
2627        self.current_transaction.lock().is_some()
2628    }
2629
2630    /// Returns the current transaction ID, if any.
2631    #[must_use]
2632    pub(crate) fn current_transaction_id(&self) -> Option<TransactionId> {
2633        *self.current_transaction.lock()
2634    }
2635
2636    /// Returns a reference to the transaction manager.
2637    #[must_use]
2638    pub(crate) fn transaction_manager(&self) -> &TransactionManager {
2639        &self.transaction_manager
2640    }
2641
2642    /// Returns the store's current node count and the count at transaction start.
2643    #[must_use]
2644    pub(crate) fn node_count_delta(&self) -> (usize, usize) {
2645        (
2646            self.transaction_start_node_count.load(Ordering::Relaxed),
2647            self.store.node_count(),
2648        )
2649    }
2650
2651    /// Returns the store's current edge count and the count at transaction start.
2652    #[must_use]
2653    pub(crate) fn edge_count_delta(&self) -> (usize, usize) {
2654        (
2655            self.transaction_start_edge_count.load(Ordering::Relaxed),
2656            self.store.edge_count(),
2657        )
2658    }
2659
2660    /// Prepares the current transaction for a two-phase commit.
2661    ///
2662    /// Returns a [`PreparedCommit`](crate::transaction::PreparedCommit) that
2663    /// lets you inspect pending changes and attach metadata before finalizing.
2664    /// The mutable borrow prevents concurrent operations while the commit is
2665    /// pending.
2666    ///
2667    /// If the `PreparedCommit` is dropped without calling `commit()` or
2668    /// `abort()`, the transaction is automatically rolled back.
2669    ///
2670    /// # Errors
2671    ///
2672    /// Returns an error if no transaction is active.
2673    ///
2674    /// # Examples
2675    ///
2676    /// ```no_run
2677    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
2678    /// use grafeo_engine::GrafeoDB;
2679    ///
2680    /// let db = GrafeoDB::new_in_memory();
2681    /// let mut session = db.session();
2682    ///
2683    /// session.begin_transaction()?;
2684    /// session.execute("INSERT (:Person {name: 'Alix'})")?;
2685    ///
2686    /// let mut prepared = session.prepare_commit()?;
2687    /// println!("Nodes written: {}", prepared.info().nodes_written);
2688    /// prepared.set_metadata("audit_user", "admin");
2689    /// prepared.commit()?;
2690    /// # Ok(())
2691    /// # }
2692    /// ```
2693    pub fn prepare_commit(&mut self) -> Result<crate::transaction::PreparedCommit<'_>> {
2694        crate::transaction::PreparedCommit::new(self)
2695    }
2696
2697    /// Sets auto-commit mode.
2698    pub fn set_auto_commit(&mut self, auto_commit: bool) {
2699        self.auto_commit = auto_commit;
2700    }
2701
2702    /// Returns whether auto-commit is enabled.
2703    #[must_use]
2704    pub fn auto_commit(&self) -> bool {
2705        self.auto_commit
2706    }
2707
2708    /// Returns `true` if auto-commit should wrap this execution.
2709    ///
2710    /// Auto-commit kicks in when: the session is in auto-commit mode,
2711    /// no explicit transaction is active, and the query mutates data.
2712    fn needs_auto_commit(&self, has_mutations: bool) -> bool {
2713        self.auto_commit && has_mutations && self.current_transaction.lock().is_none()
2714    }
2715
2716    /// Wraps `body` in an automatic begin/commit when [`needs_auto_commit`]
2717    /// returns `true`. On error the transaction is rolled back.
2718    fn with_auto_commit<F>(&self, has_mutations: bool, body: F) -> Result<QueryResult>
2719    where
2720        F: FnOnce() -> Result<QueryResult>,
2721    {
2722        if self.needs_auto_commit(has_mutations) {
2723            self.begin_transaction_inner(false, None)?;
2724            match body() {
2725                Ok(result) => {
2726                    self.commit_inner()?;
2727                    Ok(result)
2728                }
2729                Err(e) => {
2730                    let _ = self.rollback_inner();
2731                    Err(e)
2732                }
2733            }
2734        } else {
2735            body()
2736        }
2737    }
2738
2739    /// Quick heuristic: returns `true` when the query text looks like it
2740    /// performs a mutation. Used by `_with_params` paths that go through the
2741    /// `QueryProcessor` (where the logical plan isn't available before
2742    /// execution). False negatives are harmless: the data just won't be
2743    /// auto-committed, which matches the prior behaviour.
2744    fn query_looks_like_mutation(query: &str) -> bool {
2745        let upper = query.to_ascii_uppercase();
2746        upper.contains("INSERT")
2747            || upper.contains("CREATE")
2748            || upper.contains("DELETE")
2749            || upper.contains("MERGE")
2750            || upper.contains("SET")
2751            || upper.contains("REMOVE")
2752            || upper.contains("DROP")
2753            || upper.contains("ALTER")
2754    }
2755
2756    /// Computes the wall-clock deadline for query execution.
2757    #[must_use]
2758    fn query_deadline(&self) -> Option<Instant> {
2759        #[cfg(not(target_arch = "wasm32"))]
2760        {
2761            self.query_timeout.map(|d| Instant::now() + d)
2762        }
2763        #[cfg(target_arch = "wasm32")]
2764        {
2765            let _ = &self.query_timeout;
2766            None
2767        }
2768    }
2769
2770    /// Evaluates a simple integer literal from a session parameter expression.
2771    fn eval_integer_literal(expr: &grafeo_adapters::query::gql::ast::Expression) -> Option<i64> {
2772        use grafeo_adapters::query::gql::ast::{Expression, Literal};
2773        match expr {
2774            Expression::Literal(Literal::Integer(n)) => Some(*n),
2775            _ => None,
2776        }
2777    }
2778
2779    /// Returns the current transaction context for MVCC visibility.
2780    ///
2781    /// Returns `(viewing_epoch, transaction_id)` where:
2782    /// - `viewing_epoch` is the epoch at which to check version visibility
2783    /// - `transaction_id` is the current transaction ID (if in a transaction)
2784    #[must_use]
2785    fn get_transaction_context(&self) -> (EpochId, Option<TransactionId>) {
2786        // Time-travel override takes precedence (read-only, no tx context)
2787        if let Some(epoch) = *self.viewing_epoch_override.lock() {
2788            return (epoch, None);
2789        }
2790
2791        if let Some(transaction_id) = *self.current_transaction.lock() {
2792            // In a transaction: use the transaction's start epoch
2793            let epoch = self
2794                .transaction_manager
2795                .start_epoch(transaction_id)
2796                .unwrap_or_else(|| self.transaction_manager.current_epoch());
2797            (epoch, Some(transaction_id))
2798        } else {
2799            // No transaction: use current epoch
2800            (self.transaction_manager.current_epoch(), None)
2801        }
2802    }
2803
2804    /// Creates a planner with transaction context and constraint validator.
2805    fn create_planner(
2806        &self,
2807        viewing_epoch: EpochId,
2808        transaction_id: Option<TransactionId>,
2809    ) -> crate::query::Planner {
2810        use crate::query::Planner;
2811
2812        let mut planner = Planner::with_context(
2813            Arc::clone(&self.graph_store),
2814            Arc::clone(&self.transaction_manager),
2815            transaction_id,
2816            viewing_epoch,
2817        )
2818        .with_factorized_execution(self.factorized_execution)
2819        .with_catalog(Arc::clone(&self.catalog));
2820
2821        // Attach the constraint validator for schema enforcement
2822        let validator = CatalogConstraintValidator::new(Arc::clone(&self.catalog));
2823        planner = planner.with_validator(Arc::new(validator));
2824
2825        planner
2826    }
2827
2828    /// Creates a node directly (bypassing query execution).
2829    ///
2830    /// This is a low-level API for testing and direct manipulation.
2831    /// If a transaction is active, the node will be versioned with the transaction ID.
2832    pub fn create_node(&self, labels: &[&str]) -> NodeId {
2833        let (epoch, transaction_id) = self.get_transaction_context();
2834        self.store.create_node_versioned(
2835            labels,
2836            epoch,
2837            transaction_id.unwrap_or(TransactionId::SYSTEM),
2838        )
2839    }
2840
2841    /// Creates a node with properties.
2842    ///
2843    /// If a transaction is active, the node will be versioned with the transaction ID.
2844    pub fn create_node_with_props<'a>(
2845        &self,
2846        labels: &[&str],
2847        properties: impl IntoIterator<Item = (&'a str, Value)>,
2848    ) -> NodeId {
2849        let (epoch, transaction_id) = self.get_transaction_context();
2850        self.store.create_node_with_props_versioned(
2851            labels,
2852            properties,
2853            epoch,
2854            transaction_id.unwrap_or(TransactionId::SYSTEM),
2855        )
2856    }
2857
2858    /// Creates an edge between two nodes.
2859    ///
2860    /// This is a low-level API for testing and direct manipulation.
2861    /// If a transaction is active, the edge will be versioned with the transaction ID.
2862    pub fn create_edge(
2863        &self,
2864        src: NodeId,
2865        dst: NodeId,
2866        edge_type: &str,
2867    ) -> grafeo_common::types::EdgeId {
2868        let (epoch, transaction_id) = self.get_transaction_context();
2869        self.store.create_edge_versioned(
2870            src,
2871            dst,
2872            edge_type,
2873            epoch,
2874            transaction_id.unwrap_or(TransactionId::SYSTEM),
2875        )
2876    }
2877
2878    // =========================================================================
2879    // Direct Lookup APIs (bypass query planning for O(1) point reads)
2880    // =========================================================================
2881
2882    /// Gets a node by ID directly, bypassing query planning.
2883    ///
2884    /// This is the fastest way to retrieve a single node when you know its ID.
2885    /// Skips parsing, binding, optimization, and physical planning entirely.
2886    ///
2887    /// # Performance
2888    ///
2889    /// - Time complexity: O(1) average case
2890    /// - No lock contention (uses DashMap internally)
2891    /// - ~20-30x faster than equivalent MATCH query
2892    ///
2893    /// # Example
2894    ///
2895    /// ```no_run
2896    /// # use grafeo_engine::GrafeoDB;
2897    /// # let db = GrafeoDB::new_in_memory();
2898    /// let session = db.session();
2899    /// let node_id = session.create_node(&["Person"]);
2900    ///
2901    /// // Direct lookup - O(1), no query planning
2902    /// let node = session.get_node(node_id);
2903    /// assert!(node.is_some());
2904    /// ```
2905    #[must_use]
2906    pub fn get_node(&self, id: NodeId) -> Option<Node> {
2907        let (epoch, transaction_id) = self.get_transaction_context();
2908        self.store
2909            .get_node_versioned(id, epoch, transaction_id.unwrap_or(TransactionId::SYSTEM))
2910    }
2911
2912    /// Gets a single property from a node by ID, bypassing query planning.
2913    ///
2914    /// More efficient than `get_node()` when you only need one property,
2915    /// as it avoids loading the full node with all properties.
2916    ///
2917    /// # Performance
2918    ///
2919    /// - Time complexity: O(1) average case
2920    /// - No query planning overhead
2921    ///
2922    /// # Example
2923    ///
2924    /// ```no_run
2925    /// # use grafeo_engine::GrafeoDB;
2926    /// # use grafeo_common::types::Value;
2927    /// # let db = GrafeoDB::new_in_memory();
2928    /// let session = db.session();
2929    /// let id = session.create_node_with_props(&["Person"], [("name", "Alix".into())]);
2930    ///
2931    /// // Direct property access - O(1)
2932    /// let name = session.get_node_property(id, "name");
2933    /// assert_eq!(name, Some(Value::String("Alix".into())));
2934    /// ```
2935    #[must_use]
2936    pub fn get_node_property(&self, id: NodeId, key: &str) -> Option<Value> {
2937        self.get_node(id)
2938            .and_then(|node| node.get_property(key).cloned())
2939    }
2940
2941    /// Gets an edge by ID directly, bypassing query planning.
2942    ///
2943    /// # Performance
2944    ///
2945    /// - Time complexity: O(1) average case
2946    /// - No lock contention
2947    #[must_use]
2948    pub fn get_edge(&self, id: EdgeId) -> Option<Edge> {
2949        let (epoch, transaction_id) = self.get_transaction_context();
2950        self.store
2951            .get_edge_versioned(id, epoch, transaction_id.unwrap_or(TransactionId::SYSTEM))
2952    }
2953
2954    /// Gets outgoing neighbors of a node directly, bypassing query planning.
2955    ///
2956    /// Returns (neighbor_id, edge_id) pairs for all outgoing edges.
2957    ///
2958    /// # Performance
2959    ///
2960    /// - Time complexity: O(degree) where degree is the number of outgoing edges
2961    /// - Uses adjacency index for direct access
2962    /// - ~10-20x faster than equivalent MATCH query
2963    ///
2964    /// # Example
2965    ///
2966    /// ```no_run
2967    /// # use grafeo_engine::GrafeoDB;
2968    /// # let db = GrafeoDB::new_in_memory();
2969    /// let session = db.session();
2970    /// let alix = session.create_node(&["Person"]);
2971    /// let gus = session.create_node(&["Person"]);
2972    /// session.create_edge(alix, gus, "KNOWS");
2973    ///
2974    /// // Direct neighbor lookup - O(degree)
2975    /// let neighbors = session.get_neighbors_outgoing(alix);
2976    /// assert_eq!(neighbors.len(), 1);
2977    /// assert_eq!(neighbors[0].0, gus);
2978    /// ```
2979    #[must_use]
2980    pub fn get_neighbors_outgoing(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
2981        self.store.edges_from(node, Direction::Outgoing).collect()
2982    }
2983
2984    /// Gets incoming neighbors of a node directly, bypassing query planning.
2985    ///
2986    /// Returns (neighbor_id, edge_id) pairs for all incoming edges.
2987    ///
2988    /// # Performance
2989    ///
2990    /// - Time complexity: O(degree) where degree is the number of incoming edges
2991    /// - Uses backward adjacency index for direct access
2992    #[must_use]
2993    pub fn get_neighbors_incoming(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
2994        self.store.edges_from(node, Direction::Incoming).collect()
2995    }
2996
2997    /// Gets outgoing neighbors filtered by edge type, bypassing query planning.
2998    ///
2999    /// # Example
3000    ///
3001    /// ```no_run
3002    /// # use grafeo_engine::GrafeoDB;
3003    /// # let db = GrafeoDB::new_in_memory();
3004    /// # let session = db.session();
3005    /// # let alix = session.create_node(&["Person"]);
3006    /// let neighbors = session.get_neighbors_outgoing_by_type(alix, "KNOWS");
3007    /// ```
3008    #[must_use]
3009    pub fn get_neighbors_outgoing_by_type(
3010        &self,
3011        node: NodeId,
3012        edge_type: &str,
3013    ) -> Vec<(NodeId, EdgeId)> {
3014        self.store
3015            .edges_from(node, Direction::Outgoing)
3016            .filter(|(_, edge_id)| {
3017                self.get_edge(*edge_id)
3018                    .is_some_and(|e| e.edge_type.as_str() == edge_type)
3019            })
3020            .collect()
3021    }
3022
3023    /// Checks if a node exists, bypassing query planning.
3024    ///
3025    /// # Performance
3026    ///
3027    /// - Time complexity: O(1)
3028    /// - Fastest existence check available
3029    #[must_use]
3030    pub fn node_exists(&self, id: NodeId) -> bool {
3031        self.get_node(id).is_some()
3032    }
3033
3034    /// Checks if an edge exists, bypassing query planning.
3035    #[must_use]
3036    pub fn edge_exists(&self, id: EdgeId) -> bool {
3037        self.get_edge(id).is_some()
3038    }
3039
3040    /// Gets the degree (number of edges) of a node.
3041    ///
3042    /// Returns (outgoing_degree, incoming_degree).
3043    #[must_use]
3044    pub fn get_degree(&self, node: NodeId) -> (usize, usize) {
3045        let out = self.store.out_degree(node);
3046        let in_degree = self.store.in_degree(node);
3047        (out, in_degree)
3048    }
3049
3050    /// Batch lookup of multiple nodes by ID.
3051    ///
3052    /// More efficient than calling `get_node()` in a loop because it
3053    /// amortizes overhead.
3054    ///
3055    /// # Performance
3056    ///
3057    /// - Time complexity: O(n) where n is the number of IDs
3058    /// - Better cache utilization than individual lookups
3059    #[must_use]
3060    pub fn get_nodes_batch(&self, ids: &[NodeId]) -> Vec<Option<Node>> {
3061        let (epoch, transaction_id) = self.get_transaction_context();
3062        let tx = transaction_id.unwrap_or(TransactionId::SYSTEM);
3063        ids.iter()
3064            .map(|&id| self.store.get_node_versioned(id, epoch, tx))
3065            .collect()
3066    }
3067
3068    // ── Change Data Capture ─────────────────────────────────────────────
3069
3070    /// Returns the full change history for an entity (node or edge).
3071    #[cfg(feature = "cdc")]
3072    pub fn history(
3073        &self,
3074        entity_id: impl Into<crate::cdc::EntityId>,
3075    ) -> Result<Vec<crate::cdc::ChangeEvent>> {
3076        Ok(self.cdc_log.history(entity_id.into()))
3077    }
3078
3079    /// Returns change events for an entity since the given epoch.
3080    #[cfg(feature = "cdc")]
3081    pub fn history_since(
3082        &self,
3083        entity_id: impl Into<crate::cdc::EntityId>,
3084        since_epoch: EpochId,
3085    ) -> Result<Vec<crate::cdc::ChangeEvent>> {
3086        Ok(self.cdc_log.history_since(entity_id.into(), since_epoch))
3087    }
3088
3089    /// Returns all change events across all entities in an epoch range.
3090    #[cfg(feature = "cdc")]
3091    pub fn changes_between(
3092        &self,
3093        start_epoch: EpochId,
3094        end_epoch: EpochId,
3095    ) -> Result<Vec<crate::cdc::ChangeEvent>> {
3096        Ok(self.cdc_log.changes_between(start_epoch, end_epoch))
3097    }
3098}
3099
3100#[cfg(test)]
3101mod tests {
3102    use crate::database::GrafeoDB;
3103
3104    #[test]
3105    fn test_session_create_node() {
3106        let db = GrafeoDB::new_in_memory();
3107        let session = db.session();
3108
3109        let id = session.create_node(&["Person"]);
3110        assert!(id.is_valid());
3111        assert_eq!(db.node_count(), 1);
3112    }
3113
3114    #[test]
3115    fn test_session_transaction() {
3116        let db = GrafeoDB::new_in_memory();
3117        let mut session = db.session();
3118
3119        assert!(!session.in_transaction());
3120
3121        session.begin_transaction().unwrap();
3122        assert!(session.in_transaction());
3123
3124        session.commit().unwrap();
3125        assert!(!session.in_transaction());
3126    }
3127
3128    #[test]
3129    fn test_session_transaction_context() {
3130        let db = GrafeoDB::new_in_memory();
3131        let mut session = db.session();
3132
3133        // Without transaction - context should have current epoch and no transaction_id
3134        let (_epoch1, transaction_id1) = session.get_transaction_context();
3135        assert!(transaction_id1.is_none());
3136
3137        // Start a transaction
3138        session.begin_transaction().unwrap();
3139        let (epoch2, transaction_id2) = session.get_transaction_context();
3140        assert!(transaction_id2.is_some());
3141        // Transaction should have a valid epoch
3142        let _ = epoch2; // Use the variable
3143
3144        // Commit and verify
3145        session.commit().unwrap();
3146        let (epoch3, tx_id3) = session.get_transaction_context();
3147        assert!(tx_id3.is_none());
3148        // Epoch should have advanced after commit
3149        assert!(epoch3.as_u64() >= epoch2.as_u64());
3150    }
3151
3152    #[test]
3153    fn test_session_rollback() {
3154        let db = GrafeoDB::new_in_memory();
3155        let mut session = db.session();
3156
3157        session.begin_transaction().unwrap();
3158        session.rollback().unwrap();
3159        assert!(!session.in_transaction());
3160    }
3161
3162    #[test]
3163    fn test_session_rollback_discards_versions() {
3164        use grafeo_common::types::TransactionId;
3165
3166        let db = GrafeoDB::new_in_memory();
3167
3168        // Create a node outside of any transaction (at system level)
3169        let node_before = db.store().create_node(&["Person"]);
3170        assert!(node_before.is_valid());
3171        assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");
3172
3173        // Start a transaction
3174        let mut session = db.session();
3175        session.begin_transaction().unwrap();
3176        let transaction_id = session.current_transaction.lock().unwrap();
3177
3178        // Create a node versioned with the transaction's ID
3179        let epoch = db.store().current_epoch();
3180        let node_in_tx = db
3181            .store()
3182            .create_node_versioned(&["Person"], epoch, transaction_id);
3183        assert!(node_in_tx.is_valid());
3184
3185        // Should see 2 nodes at this point
3186        assert_eq!(db.node_count(), 2, "Should have 2 nodes during transaction");
3187
3188        // Rollback the transaction
3189        session.rollback().unwrap();
3190        assert!(!session.in_transaction());
3191
3192        // The node created in the transaction should be discarded
3193        // Only the first node should remain visible
3194        let count_after = db.node_count();
3195        assert_eq!(
3196            count_after, 1,
3197            "Rollback should discard uncommitted node, but got {count_after}"
3198        );
3199
3200        // The original node should still be accessible
3201        let current_epoch = db.store().current_epoch();
3202        assert!(
3203            db.store()
3204                .get_node_versioned(node_before, current_epoch, TransactionId::SYSTEM)
3205                .is_some(),
3206            "Original node should still exist"
3207        );
3208
3209        // The node created in the transaction should not be accessible
3210        assert!(
3211            db.store()
3212                .get_node_versioned(node_in_tx, current_epoch, TransactionId::SYSTEM)
3213                .is_none(),
3214            "Transaction node should be gone"
3215        );
3216    }
3217
3218    #[test]
3219    fn test_session_create_node_in_transaction() {
3220        // Test that session.create_node() is transaction-aware
3221        let db = GrafeoDB::new_in_memory();
3222
3223        // Create a node outside of any transaction
3224        let node_before = db.create_node(&["Person"]);
3225        assert!(node_before.is_valid());
3226        assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");
3227
3228        // Start a transaction and create a node through the session
3229        let mut session = db.session();
3230        session.begin_transaction().unwrap();
3231
3232        // Create a node through session.create_node() - should be versioned with tx
3233        let node_in_tx = session.create_node(&["Person"]);
3234        assert!(node_in_tx.is_valid());
3235
3236        // Should see 2 nodes at this point
3237        assert_eq!(db.node_count(), 2, "Should have 2 nodes during transaction");
3238
3239        // Rollback the transaction
3240        session.rollback().unwrap();
3241
3242        // The node created via session.create_node() should be discarded
3243        let count_after = db.node_count();
3244        assert_eq!(
3245            count_after, 1,
3246            "Rollback should discard node created via session.create_node(), but got {count_after}"
3247        );
3248    }
3249
3250    #[test]
3251    fn test_session_create_node_with_props_in_transaction() {
3252        use grafeo_common::types::Value;
3253
3254        // Test that session.create_node_with_props() is transaction-aware
3255        let db = GrafeoDB::new_in_memory();
3256
3257        // Create a node outside of any transaction
3258        db.create_node(&["Person"]);
3259        assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");
3260
3261        // Start a transaction and create a node with properties
3262        let mut session = db.session();
3263        session.begin_transaction().unwrap();
3264
3265        let node_in_tx =
3266            session.create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))]);
3267        assert!(node_in_tx.is_valid());
3268
3269        // Should see 2 nodes
3270        assert_eq!(db.node_count(), 2, "Should have 2 nodes during transaction");
3271
3272        // Rollback the transaction
3273        session.rollback().unwrap();
3274
3275        // The node should be discarded
3276        let count_after = db.node_count();
3277        assert_eq!(
3278            count_after, 1,
3279            "Rollback should discard node created via session.create_node_with_props()"
3280        );
3281    }
3282
3283    #[cfg(feature = "gql")]
3284    mod gql_tests {
3285        use super::*;
3286
3287        #[test]
3288        fn test_gql_query_execution() {
3289            let db = GrafeoDB::new_in_memory();
3290            let session = db.session();
3291
3292            // Create some test data
3293            session.create_node(&["Person"]);
3294            session.create_node(&["Person"]);
3295            session.create_node(&["Animal"]);
3296
3297            // Execute a GQL query
3298            let result = session.execute("MATCH (n:Person) RETURN n").unwrap();
3299
3300            // Should return 2 Person nodes
3301            assert_eq!(result.row_count(), 2);
3302            assert_eq!(result.column_count(), 1);
3303            assert_eq!(result.columns[0], "n");
3304        }
3305
3306        #[test]
3307        fn test_gql_empty_result() {
3308            let db = GrafeoDB::new_in_memory();
3309            let session = db.session();
3310
3311            // No data in database
3312            let result = session.execute("MATCH (n:Person) RETURN n").unwrap();
3313
3314            assert_eq!(result.row_count(), 0);
3315        }
3316
3317        #[test]
3318        fn test_gql_parse_error() {
3319            let db = GrafeoDB::new_in_memory();
3320            let session = db.session();
3321
3322            // Invalid GQL syntax
3323            let result = session.execute("MATCH (n RETURN n");
3324
3325            assert!(result.is_err());
3326        }
3327
3328        #[test]
3329        fn test_gql_relationship_traversal() {
3330            let db = GrafeoDB::new_in_memory();
3331            let session = db.session();
3332
3333            // Create a graph: Alix -> Gus, Alix -> Vincent
3334            let alix = session.create_node(&["Person"]);
3335            let gus = session.create_node(&["Person"]);
3336            let vincent = session.create_node(&["Person"]);
3337
3338            session.create_edge(alix, gus, "KNOWS");
3339            session.create_edge(alix, vincent, "KNOWS");
3340
3341            // Execute a path query: MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b
3342            let result = session
3343                .execute("MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b")
3344                .unwrap();
3345
3346            // Should return 2 rows (Alix->Gus, Alix->Vincent)
3347            assert_eq!(result.row_count(), 2);
3348            assert_eq!(result.column_count(), 2);
3349            assert_eq!(result.columns[0], "a");
3350            assert_eq!(result.columns[1], "b");
3351        }
3352
3353        #[test]
3354        fn test_gql_relationship_with_type_filter() {
3355            let db = GrafeoDB::new_in_memory();
3356            let session = db.session();
3357
3358            // Create a graph: Alix -KNOWS-> Gus, Alix -WORKS_WITH-> Vincent
3359            let alix = session.create_node(&["Person"]);
3360            let gus = session.create_node(&["Person"]);
3361            let vincent = session.create_node(&["Person"]);
3362
3363            session.create_edge(alix, gus, "KNOWS");
3364            session.create_edge(alix, vincent, "WORKS_WITH");
3365
3366            // Query only KNOWS relationships
3367            let result = session
3368                .execute("MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b")
3369                .unwrap();
3370
3371            // Should return only 1 row (Alix->Gus)
3372            assert_eq!(result.row_count(), 1);
3373        }
3374
3375        #[test]
3376        fn test_gql_semantic_error_undefined_variable() {
3377            let db = GrafeoDB::new_in_memory();
3378            let session = db.session();
3379
3380            // Reference undefined variable 'x' in RETURN
3381            let result = session.execute("MATCH (n:Person) RETURN x");
3382
3383            // Should fail with semantic error
3384            assert!(result.is_err());
3385            let Err(err) = result else {
3386                panic!("Expected error")
3387            };
3388            assert!(
3389                err.to_string().contains("Undefined variable"),
3390                "Expected undefined variable error, got: {}",
3391                err
3392            );
3393        }
3394
3395        #[test]
3396        fn test_gql_where_clause_property_filter() {
3397            use grafeo_common::types::Value;
3398
3399            let db = GrafeoDB::new_in_memory();
3400            let session = db.session();
3401
3402            // Create people with ages
3403            session.create_node_with_props(&["Person"], [("age", Value::Int64(25))]);
3404            session.create_node_with_props(&["Person"], [("age", Value::Int64(35))]);
3405            session.create_node_with_props(&["Person"], [("age", Value::Int64(45))]);
3406
3407            // Query with WHERE clause: age > 30
3408            let result = session
3409                .execute("MATCH (n:Person) WHERE n.age > 30 RETURN n")
3410                .unwrap();
3411
3412            // Should return 2 people (ages 35 and 45)
3413            assert_eq!(result.row_count(), 2);
3414        }
3415
3416        #[test]
3417        fn test_gql_where_clause_equality() {
3418            use grafeo_common::types::Value;
3419
3420            let db = GrafeoDB::new_in_memory();
3421            let session = db.session();
3422
3423            // Create people with names
3424            session.create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))]);
3425            session.create_node_with_props(&["Person"], [("name", Value::String("Gus".into()))]);
3426            session.create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))]);
3427
3428            // Query with WHERE clause: name = "Alix"
3429            let result = session
3430                .execute("MATCH (n:Person) WHERE n.name = \"Alix\" RETURN n")
3431                .unwrap();
3432
3433            // Should return 2 people named Alix
3434            assert_eq!(result.row_count(), 2);
3435        }
3436
3437        #[test]
3438        fn test_gql_return_property_access() {
3439            use grafeo_common::types::Value;
3440
3441            let db = GrafeoDB::new_in_memory();
3442            let session = db.session();
3443
3444            // Create people with names and ages
3445            session.create_node_with_props(
3446                &["Person"],
3447                [
3448                    ("name", Value::String("Alix".into())),
3449                    ("age", Value::Int64(30)),
3450                ],
3451            );
3452            session.create_node_with_props(
3453                &["Person"],
3454                [
3455                    ("name", Value::String("Gus".into())),
3456                    ("age", Value::Int64(25)),
3457                ],
3458            );
3459
3460            // Query returning properties
3461            let result = session
3462                .execute("MATCH (n:Person) RETURN n.name, n.age")
3463                .unwrap();
3464
3465            // Should return 2 rows with name and age columns
3466            assert_eq!(result.row_count(), 2);
3467            assert_eq!(result.column_count(), 2);
3468            assert_eq!(result.columns[0], "n.name");
3469            assert_eq!(result.columns[1], "n.age");
3470
3471            // Check that we get actual values
3472            let names: Vec<&Value> = result.rows.iter().map(|r| &r[0]).collect();
3473            assert!(names.contains(&&Value::String("Alix".into())));
3474            assert!(names.contains(&&Value::String("Gus".into())));
3475        }
3476
3477        #[test]
3478        fn test_gql_return_mixed_expressions() {
3479            use grafeo_common::types::Value;
3480
3481            let db = GrafeoDB::new_in_memory();
3482            let session = db.session();
3483
3484            // Create a person
3485            session.create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))]);
3486
3487            // Query returning both node and property
3488            let result = session
3489                .execute("MATCH (n:Person) RETURN n, n.name")
3490                .unwrap();
3491
3492            assert_eq!(result.row_count(), 1);
3493            assert_eq!(result.column_count(), 2);
3494            assert_eq!(result.columns[0], "n");
3495            assert_eq!(result.columns[1], "n.name");
3496
3497            // Second column should be the name
3498            assert_eq!(result.rows[0][1], Value::String("Alix".into()));
3499        }
3500    }
3501
3502    #[cfg(feature = "cypher")]
3503    mod cypher_tests {
3504        use super::*;
3505
3506        #[test]
3507        fn test_cypher_query_execution() {
3508            let db = GrafeoDB::new_in_memory();
3509            let session = db.session();
3510
3511            // Create some test data
3512            session.create_node(&["Person"]);
3513            session.create_node(&["Person"]);
3514            session.create_node(&["Animal"]);
3515
3516            // Execute a Cypher query
3517            let result = session.execute_cypher("MATCH (n:Person) RETURN n").unwrap();
3518
3519            // Should return 2 Person nodes
3520            assert_eq!(result.row_count(), 2);
3521            assert_eq!(result.column_count(), 1);
3522            assert_eq!(result.columns[0], "n");
3523        }
3524
3525        #[test]
3526        fn test_cypher_empty_result() {
3527            let db = GrafeoDB::new_in_memory();
3528            let session = db.session();
3529
3530            // No data in database
3531            let result = session.execute_cypher("MATCH (n:Person) RETURN n").unwrap();
3532
3533            assert_eq!(result.row_count(), 0);
3534        }
3535
3536        #[test]
3537        fn test_cypher_parse_error() {
3538            let db = GrafeoDB::new_in_memory();
3539            let session = db.session();
3540
3541            // Invalid Cypher syntax
3542            let result = session.execute_cypher("MATCH (n RETURN n");
3543
3544            assert!(result.is_err());
3545        }
3546    }
3547
3548    // ==================== Direct Lookup API Tests ====================
3549
3550    mod direct_lookup_tests {
3551        use super::*;
3552        use grafeo_common::types::Value;
3553
3554        #[test]
3555        fn test_get_node() {
3556            let db = GrafeoDB::new_in_memory();
3557            let session = db.session();
3558
3559            let id = session.create_node(&["Person"]);
3560            let node = session.get_node(id);
3561
3562            assert!(node.is_some());
3563            let node = node.unwrap();
3564            assert_eq!(node.id, id);
3565        }
3566
3567        #[test]
3568        fn test_get_node_not_found() {
3569            use grafeo_common::types::NodeId;
3570
3571            let db = GrafeoDB::new_in_memory();
3572            let session = db.session();
3573
3574            // Try to get a non-existent node
3575            let node = session.get_node(NodeId::new(9999));
3576            assert!(node.is_none());
3577        }
3578
3579        #[test]
3580        fn test_get_node_property() {
3581            let db = GrafeoDB::new_in_memory();
3582            let session = db.session();
3583
3584            let id = session
3585                .create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))]);
3586
3587            let name = session.get_node_property(id, "name");
3588            assert_eq!(name, Some(Value::String("Alix".into())));
3589
3590            // Non-existent property
3591            let missing = session.get_node_property(id, "missing");
3592            assert!(missing.is_none());
3593        }
3594
3595        #[test]
3596        fn test_get_edge() {
3597            let db = GrafeoDB::new_in_memory();
3598            let session = db.session();
3599
3600            let alix = session.create_node(&["Person"]);
3601            let gus = session.create_node(&["Person"]);
3602            let edge_id = session.create_edge(alix, gus, "KNOWS");
3603
3604            let edge = session.get_edge(edge_id);
3605            assert!(edge.is_some());
3606            let edge = edge.unwrap();
3607            assert_eq!(edge.id, edge_id);
3608            assert_eq!(edge.src, alix);
3609            assert_eq!(edge.dst, gus);
3610        }
3611
3612        #[test]
3613        fn test_get_edge_not_found() {
3614            use grafeo_common::types::EdgeId;
3615
3616            let db = GrafeoDB::new_in_memory();
3617            let session = db.session();
3618
3619            let edge = session.get_edge(EdgeId::new(9999));
3620            assert!(edge.is_none());
3621        }
3622
3623        #[test]
3624        fn test_get_neighbors_outgoing() {
3625            let db = GrafeoDB::new_in_memory();
3626            let session = db.session();
3627
3628            let alix = session.create_node(&["Person"]);
3629            let gus = session.create_node(&["Person"]);
3630            let harm = session.create_node(&["Person"]);
3631
3632            session.create_edge(alix, gus, "KNOWS");
3633            session.create_edge(alix, harm, "KNOWS");
3634
3635            let neighbors = session.get_neighbors_outgoing(alix);
3636            assert_eq!(neighbors.len(), 2);
3637
3638            let neighbor_ids: Vec<_> = neighbors.iter().map(|(node_id, _)| *node_id).collect();
3639            assert!(neighbor_ids.contains(&gus));
3640            assert!(neighbor_ids.contains(&harm));
3641        }
3642
3643        #[test]
3644        fn test_get_neighbors_incoming() {
3645            let db = GrafeoDB::new_in_memory();
3646            let session = db.session();
3647
3648            let alix = session.create_node(&["Person"]);
3649            let gus = session.create_node(&["Person"]);
3650            let harm = session.create_node(&["Person"]);
3651
3652            session.create_edge(gus, alix, "KNOWS");
3653            session.create_edge(harm, alix, "KNOWS");
3654
3655            let neighbors = session.get_neighbors_incoming(alix);
3656            assert_eq!(neighbors.len(), 2);
3657
3658            let neighbor_ids: Vec<_> = neighbors.iter().map(|(node_id, _)| *node_id).collect();
3659            assert!(neighbor_ids.contains(&gus));
3660            assert!(neighbor_ids.contains(&harm));
3661        }
3662
3663        #[test]
3664        fn test_get_neighbors_outgoing_by_type() {
3665            let db = GrafeoDB::new_in_memory();
3666            let session = db.session();
3667
3668            let alix = session.create_node(&["Person"]);
3669            let gus = session.create_node(&["Person"]);
3670            let company = session.create_node(&["Company"]);
3671
3672            session.create_edge(alix, gus, "KNOWS");
3673            session.create_edge(alix, company, "WORKS_AT");
3674
3675            let knows_neighbors = session.get_neighbors_outgoing_by_type(alix, "KNOWS");
3676            assert_eq!(knows_neighbors.len(), 1);
3677            assert_eq!(knows_neighbors[0].0, gus);
3678
3679            let works_neighbors = session.get_neighbors_outgoing_by_type(alix, "WORKS_AT");
3680            assert_eq!(works_neighbors.len(), 1);
3681            assert_eq!(works_neighbors[0].0, company);
3682
3683            // No edges of this type
3684            let no_neighbors = session.get_neighbors_outgoing_by_type(alix, "LIKES");
3685            assert!(no_neighbors.is_empty());
3686        }
3687
3688        #[test]
3689        fn test_node_exists() {
3690            use grafeo_common::types::NodeId;
3691
3692            let db = GrafeoDB::new_in_memory();
3693            let session = db.session();
3694
3695            let id = session.create_node(&["Person"]);
3696
3697            assert!(session.node_exists(id));
3698            assert!(!session.node_exists(NodeId::new(9999)));
3699        }
3700
3701        #[test]
3702        fn test_edge_exists() {
3703            use grafeo_common::types::EdgeId;
3704
3705            let db = GrafeoDB::new_in_memory();
3706            let session = db.session();
3707
3708            let alix = session.create_node(&["Person"]);
3709            let gus = session.create_node(&["Person"]);
3710            let edge_id = session.create_edge(alix, gus, "KNOWS");
3711
3712            assert!(session.edge_exists(edge_id));
3713            assert!(!session.edge_exists(EdgeId::new(9999)));
3714        }
3715
3716        #[test]
3717        fn test_get_degree() {
3718            let db = GrafeoDB::new_in_memory();
3719            let session = db.session();
3720
3721            let alix = session.create_node(&["Person"]);
3722            let gus = session.create_node(&["Person"]);
3723            let harm = session.create_node(&["Person"]);
3724
3725            // Alix knows Gus and Harm (2 outgoing)
3726            session.create_edge(alix, gus, "KNOWS");
3727            session.create_edge(alix, harm, "KNOWS");
3728            // Gus knows Alix (1 incoming for Alix)
3729            session.create_edge(gus, alix, "KNOWS");
3730
3731            let (out_degree, in_degree) = session.get_degree(alix);
3732            assert_eq!(out_degree, 2);
3733            assert_eq!(in_degree, 1);
3734
3735            // Node with no edges
3736            let lonely = session.create_node(&["Person"]);
3737            let (out, in_deg) = session.get_degree(lonely);
3738            assert_eq!(out, 0);
3739            assert_eq!(in_deg, 0);
3740        }
3741
3742        #[test]
3743        fn test_get_nodes_batch() {
3744            let db = GrafeoDB::new_in_memory();
3745            let session = db.session();
3746
3747            let alix = session.create_node(&["Person"]);
3748            let gus = session.create_node(&["Person"]);
3749            let harm = session.create_node(&["Person"]);
3750
3751            let nodes = session.get_nodes_batch(&[alix, gus, harm]);
3752            assert_eq!(nodes.len(), 3);
3753            assert!(nodes[0].is_some());
3754            assert!(nodes[1].is_some());
3755            assert!(nodes[2].is_some());
3756
3757            // With non-existent node
3758            use grafeo_common::types::NodeId;
3759            let nodes_with_missing = session.get_nodes_batch(&[alix, NodeId::new(9999), harm]);
3760            assert_eq!(nodes_with_missing.len(), 3);
3761            assert!(nodes_with_missing[0].is_some());
3762            assert!(nodes_with_missing[1].is_none()); // Missing node
3763            assert!(nodes_with_missing[2].is_some());
3764        }
3765
3766        #[test]
3767        fn test_auto_commit_setting() {
3768            let db = GrafeoDB::new_in_memory();
3769            let mut session = db.session();
3770
3771            // Default is auto-commit enabled
3772            assert!(session.auto_commit());
3773
3774            session.set_auto_commit(false);
3775            assert!(!session.auto_commit());
3776
3777            session.set_auto_commit(true);
3778            assert!(session.auto_commit());
3779        }
3780
3781        #[test]
3782        fn test_transaction_double_begin_nests() {
3783            let db = GrafeoDB::new_in_memory();
3784            let mut session = db.session();
3785
3786            session.begin_transaction().unwrap();
3787            // Second begin_transaction creates a nested transaction (auto-savepoint)
3788            let result = session.begin_transaction();
3789            assert!(result.is_ok());
3790            // Commit the inner (releases savepoint)
3791            session.commit().unwrap();
3792            // Commit the outer
3793            session.commit().unwrap();
3794        }
3795
3796        #[test]
3797        fn test_commit_without_transaction_error() {
3798            let db = GrafeoDB::new_in_memory();
3799            let mut session = db.session();
3800
3801            let result = session.commit();
3802            assert!(result.is_err());
3803        }
3804
3805        #[test]
3806        fn test_rollback_without_transaction_error() {
3807            let db = GrafeoDB::new_in_memory();
3808            let mut session = db.session();
3809
3810            let result = session.rollback();
3811            assert!(result.is_err());
3812        }
3813
3814        #[test]
3815        fn test_create_edge_in_transaction() {
3816            let db = GrafeoDB::new_in_memory();
3817            let mut session = db.session();
3818
3819            // Create nodes outside transaction
3820            let alix = session.create_node(&["Person"]);
3821            let gus = session.create_node(&["Person"]);
3822
3823            // Create edge in transaction
3824            session.begin_transaction().unwrap();
3825            let edge_id = session.create_edge(alix, gus, "KNOWS");
3826
3827            // Edge should be visible in the transaction
3828            assert!(session.edge_exists(edge_id));
3829
3830            // Commit
3831            session.commit().unwrap();
3832
3833            // Edge should still be visible
3834            assert!(session.edge_exists(edge_id));
3835        }
3836
3837        #[test]
3838        fn test_neighbors_empty_node() {
3839            let db = GrafeoDB::new_in_memory();
3840            let session = db.session();
3841
3842            let lonely = session.create_node(&["Person"]);
3843
3844            assert!(session.get_neighbors_outgoing(lonely).is_empty());
3845            assert!(session.get_neighbors_incoming(lonely).is_empty());
3846            assert!(
3847                session
3848                    .get_neighbors_outgoing_by_type(lonely, "KNOWS")
3849                    .is_empty()
3850            );
3851        }
3852    }
3853
3854    #[test]
3855    fn test_auto_gc_triggers_on_commit_interval() {
3856        use crate::config::Config;
3857
3858        let config = Config::in_memory().with_gc_interval(2);
3859        let db = GrafeoDB::with_config(config).unwrap();
3860        let mut session = db.session();
3861
3862        // First commit: counter = 1, no GC (not a multiple of 2)
3863        session.begin_transaction().unwrap();
3864        session.create_node(&["A"]);
3865        session.commit().unwrap();
3866
3867        // Second commit: counter = 2, GC should trigger (multiple of 2)
3868        session.begin_transaction().unwrap();
3869        session.create_node(&["B"]);
3870        session.commit().unwrap();
3871
3872        // Verify the database is still functional after GC
3873        assert_eq!(db.node_count(), 2);
3874    }
3875
3876    #[test]
3877    fn test_query_timeout_config_propagates_to_session() {
3878        use crate::config::Config;
3879        use std::time::Duration;
3880
3881        let config = Config::in_memory().with_query_timeout(Duration::from_secs(5));
3882        let db = GrafeoDB::with_config(config).unwrap();
3883        let session = db.session();
3884
3885        // Verify the session has a query deadline (timeout was set)
3886        assert!(session.query_deadline().is_some());
3887    }
3888
3889    #[test]
3890    fn test_no_query_timeout_returns_no_deadline() {
3891        let db = GrafeoDB::new_in_memory();
3892        let session = db.session();
3893
3894        // Default config has no timeout
3895        assert!(session.query_deadline().is_none());
3896    }
3897
3898    #[test]
3899    fn test_graph_model_accessor() {
3900        use crate::config::GraphModel;
3901
3902        let db = GrafeoDB::new_in_memory();
3903        let session = db.session();
3904
3905        assert_eq!(session.graph_model(), GraphModel::Lpg);
3906    }
3907
3908    #[cfg(feature = "gql")]
3909    #[test]
3910    fn test_external_store_session() {
3911        use grafeo_core::graph::GraphStoreMut;
3912        use std::sync::Arc;
3913
3914        let config = crate::config::Config::in_memory();
3915        let store =
3916            Arc::new(grafeo_core::graph::lpg::LpgStore::new().unwrap()) as Arc<dyn GraphStoreMut>;
3917        let db = GrafeoDB::with_store(store, config).unwrap();
3918
3919        let session = db.session();
3920
3921        // Create data through a query (goes through the external graph_store)
3922        session.execute("INSERT (:Test {name: 'hello'})").unwrap();
3923
3924        // Verify we can query through it
3925        let result = session.execute("MATCH (n:Test) RETURN n.name").unwrap();
3926        assert_eq!(result.row_count(), 1);
3927    }
3928
3929    // ==================== Session Command Tests ====================
3930
3931    #[cfg(feature = "gql")]
3932    mod session_command_tests {
3933        use super::*;
3934
3935        #[test]
3936        fn test_use_graph_sets_current_graph() {
3937            let db = GrafeoDB::new_in_memory();
3938            let session = db.session();
3939
3940            // Create the graph first, then USE it
3941            session.execute("CREATE GRAPH mydb").unwrap();
3942            session.execute("USE GRAPH mydb").unwrap();
3943
3944            assert_eq!(session.current_graph(), Some("mydb".to_string()));
3945        }
3946
3947        #[test]
3948        fn test_use_graph_nonexistent_errors() {
3949            let db = GrafeoDB::new_in_memory();
3950            let session = db.session();
3951
3952            let result = session.execute("USE GRAPH doesnotexist");
3953            assert!(result.is_err());
3954            let err = result.unwrap_err().to_string();
3955            assert!(
3956                err.contains("does not exist"),
3957                "Expected 'does not exist' error, got: {err}"
3958            );
3959        }
3960
3961        #[test]
3962        fn test_use_graph_default_always_valid() {
3963            let db = GrafeoDB::new_in_memory();
3964            let session = db.session();
3965
3966            // "default" is always valid, even without CREATE GRAPH
3967            session.execute("USE GRAPH default").unwrap();
3968            assert_eq!(session.current_graph(), Some("default".to_string()));
3969        }
3970
3971        #[test]
3972        fn test_session_set_graph() {
3973            let db = GrafeoDB::new_in_memory();
3974            let session = db.session();
3975
3976            // SESSION SET GRAPH does not verify existence
3977            session.execute("SESSION SET GRAPH analytics").unwrap();
3978            assert_eq!(session.current_graph(), Some("analytics".to_string()));
3979        }
3980
3981        #[test]
3982        fn test_session_set_time_zone() {
3983            let db = GrafeoDB::new_in_memory();
3984            let session = db.session();
3985
3986            assert_eq!(session.time_zone(), None);
3987
3988            session.execute("SESSION SET TIME ZONE 'UTC'").unwrap();
3989            assert_eq!(session.time_zone(), Some("UTC".to_string()));
3990
3991            session
3992                .execute("SESSION SET TIME ZONE 'America/New_York'")
3993                .unwrap();
3994            assert_eq!(session.time_zone(), Some("America/New_York".to_string()));
3995        }
3996
3997        #[test]
3998        fn test_session_set_parameter() {
3999            let db = GrafeoDB::new_in_memory();
4000            let session = db.session();
4001
4002            session
4003                .execute("SESSION SET PARAMETER $timeout = 30")
4004                .unwrap();
4005
4006            // Parameter is stored (value is Null for now, since expression
4007            // evaluation is not yet wired up)
4008            assert!(session.get_parameter("timeout").is_some());
4009        }
4010
4011        #[test]
4012        fn test_session_reset_clears_all_state() {
4013            let db = GrafeoDB::new_in_memory();
4014            let session = db.session();
4015
4016            // Set various session state
4017            session.execute("SESSION SET GRAPH analytics").unwrap();
4018            session.execute("SESSION SET TIME ZONE 'UTC'").unwrap();
4019            session
4020                .execute("SESSION SET PARAMETER $limit = 100")
4021                .unwrap();
4022
4023            // Verify state was set
4024            assert!(session.current_graph().is_some());
4025            assert!(session.time_zone().is_some());
4026            assert!(session.get_parameter("limit").is_some());
4027
4028            // Reset everything
4029            session.execute("SESSION RESET").unwrap();
4030
4031            assert_eq!(session.current_graph(), None);
4032            assert_eq!(session.time_zone(), None);
4033            assert!(session.get_parameter("limit").is_none());
4034        }
4035
4036        #[test]
4037        fn test_session_close_clears_state() {
4038            let db = GrafeoDB::new_in_memory();
4039            let session = db.session();
4040
4041            session.execute("SESSION SET GRAPH analytics").unwrap();
4042            session.execute("SESSION SET TIME ZONE 'UTC'").unwrap();
4043
4044            session.execute("SESSION CLOSE").unwrap();
4045
4046            assert_eq!(session.current_graph(), None);
4047            assert_eq!(session.time_zone(), None);
4048        }
4049
4050        #[test]
4051        fn test_create_graph() {
4052            let db = GrafeoDB::new_in_memory();
4053            let session = db.session();
4054
4055            session.execute("CREATE GRAPH mydb").unwrap();
4056
4057            // Should be able to USE it now
4058            session.execute("USE GRAPH mydb").unwrap();
4059            assert_eq!(session.current_graph(), Some("mydb".to_string()));
4060        }
4061
4062        #[test]
4063        fn test_create_graph_duplicate_errors() {
4064            let db = GrafeoDB::new_in_memory();
4065            let session = db.session();
4066
4067            session.execute("CREATE GRAPH mydb").unwrap();
4068            let result = session.execute("CREATE GRAPH mydb");
4069
4070            assert!(result.is_err());
4071            let err = result.unwrap_err().to_string();
4072            assert!(
4073                err.contains("already exists"),
4074                "Expected 'already exists' error, got: {err}"
4075            );
4076        }
4077
4078        #[test]
4079        fn test_create_graph_if_not_exists() {
4080            let db = GrafeoDB::new_in_memory();
4081            let session = db.session();
4082
4083            session.execute("CREATE GRAPH mydb").unwrap();
4084            // Should succeed silently with IF NOT EXISTS
4085            session.execute("CREATE GRAPH IF NOT EXISTS mydb").unwrap();
4086        }
4087
4088        #[test]
4089        fn test_drop_graph() {
4090            let db = GrafeoDB::new_in_memory();
4091            let session = db.session();
4092
4093            session.execute("CREATE GRAPH mydb").unwrap();
4094            session.execute("DROP GRAPH mydb").unwrap();
4095
4096            // Should no longer be usable
4097            let result = session.execute("USE GRAPH mydb");
4098            assert!(result.is_err());
4099        }
4100
4101        #[test]
4102        fn test_drop_graph_nonexistent_errors() {
4103            let db = GrafeoDB::new_in_memory();
4104            let session = db.session();
4105
4106            let result = session.execute("DROP GRAPH nosuchgraph");
4107            assert!(result.is_err());
4108            let err = result.unwrap_err().to_string();
4109            assert!(
4110                err.contains("does not exist"),
4111                "Expected 'does not exist' error, got: {err}"
4112            );
4113        }
4114
4115        #[test]
4116        fn test_drop_graph_if_exists() {
4117            let db = GrafeoDB::new_in_memory();
4118            let session = db.session();
4119
4120            // Should succeed silently with IF EXISTS
4121            session.execute("DROP GRAPH IF EXISTS nosuchgraph").unwrap();
4122        }
4123
4124        #[test]
4125        fn test_start_transaction_via_gql() {
4126            let db = GrafeoDB::new_in_memory();
4127            let session = db.session();
4128
4129            session.execute("START TRANSACTION").unwrap();
4130            assert!(session.in_transaction());
4131            session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
4132            session.execute("COMMIT").unwrap();
4133            assert!(!session.in_transaction());
4134
4135            let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
4136            assert_eq!(result.rows.len(), 1);
4137        }
4138
4139        #[test]
4140        fn test_start_transaction_read_only_blocks_insert() {
4141            let db = GrafeoDB::new_in_memory();
4142            let session = db.session();
4143
4144            session.execute("START TRANSACTION READ ONLY").unwrap();
4145            let result = session.execute("INSERT (:Person {name: 'Alix'})");
4146            assert!(result.is_err());
4147            let err = result.unwrap_err().to_string();
4148            assert!(
4149                err.contains("read-only"),
4150                "Expected read-only error, got: {err}"
4151            );
4152            session.execute("ROLLBACK").unwrap();
4153        }
4154
4155        #[test]
4156        fn test_start_transaction_read_only_allows_reads() {
4157            let db = GrafeoDB::new_in_memory();
4158            let mut session = db.session();
4159            session.begin_transaction().unwrap();
4160            session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
4161            session.commit().unwrap();
4162
4163            session.execute("START TRANSACTION READ ONLY").unwrap();
4164            let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
4165            assert_eq!(result.rows.len(), 1);
4166            session.execute("COMMIT").unwrap();
4167        }
4168
4169        #[test]
4170        fn test_rollback_via_gql() {
4171            let db = GrafeoDB::new_in_memory();
4172            let session = db.session();
4173
4174            session.execute("START TRANSACTION").unwrap();
4175            session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
4176            session.execute("ROLLBACK").unwrap();
4177
4178            let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
4179            assert!(result.rows.is_empty());
4180        }
4181
4182        #[test]
4183        fn test_start_transaction_with_isolation_level() {
4184            let db = GrafeoDB::new_in_memory();
4185            let session = db.session();
4186
4187            session
4188                .execute("START TRANSACTION ISOLATION LEVEL SERIALIZABLE")
4189                .unwrap();
4190            assert!(session.in_transaction());
4191            session.execute("ROLLBACK").unwrap();
4192        }
4193
4194        #[test]
4195        fn test_session_commands_return_empty_result() {
4196            let db = GrafeoDB::new_in_memory();
4197            let session = db.session();
4198
4199            let result = session.execute("SESSION SET GRAPH test").unwrap();
4200            assert_eq!(result.row_count(), 0);
4201            assert_eq!(result.column_count(), 0);
4202        }
4203
4204        #[test]
4205        fn test_current_graph_default_is_none() {
4206            let db = GrafeoDB::new_in_memory();
4207            let session = db.session();
4208
4209            assert_eq!(session.current_graph(), None);
4210        }
4211
4212        #[test]
4213        fn test_time_zone_default_is_none() {
4214            let db = GrafeoDB::new_in_memory();
4215            let session = db.session();
4216
4217            assert_eq!(session.time_zone(), None);
4218        }
4219
4220        #[test]
4221        fn test_session_state_independent_across_sessions() {
4222            let db = GrafeoDB::new_in_memory();
4223            let session1 = db.session();
4224            let session2 = db.session();
4225
4226            session1.execute("SESSION SET GRAPH first").unwrap();
4227            session2.execute("SESSION SET GRAPH second").unwrap();
4228
4229            assert_eq!(session1.current_graph(), Some("first".to_string()));
4230            assert_eq!(session2.current_graph(), Some("second".to_string()));
4231        }
4232    }
4233}