Skip to main content

grafeo_engine/
session.rs

1//! Lightweight handles for database interaction.
2//!
3//! A session is your conversation with the database. Each session can have
4//! its own transaction state, so concurrent sessions don't interfere with
5//! each other. Sessions are cheap to create - spin up as many as you need.
6
7use std::sync::Arc;
8use std::sync::atomic::{AtomicUsize, Ordering};
9use std::time::{Duration, Instant};
10
11use grafeo_common::types::{EdgeId, EpochId, NodeId, TxId, Value};
12use grafeo_common::utils::error::Result;
13use grafeo_core::graph::Direction;
14use grafeo_core::graph::GraphStoreMut;
15use grafeo_core::graph::lpg::{Edge, LpgStore, Node};
16#[cfg(feature = "rdf")]
17use grafeo_core::graph::rdf::RdfStore;
18
19use crate::catalog::{Catalog, CatalogConstraintValidator};
20use crate::config::{AdaptiveConfig, GraphModel};
21use crate::database::QueryResult;
22use crate::query::cache::QueryCache;
23use crate::transaction::TransactionManager;
24
25/// Your handle to the database - execute queries and manage transactions.
26///
27/// Get one from [`GrafeoDB::session()`](crate::GrafeoDB::session). Each session
28/// tracks its own transaction state, so you can have multiple concurrent
29/// sessions without them interfering.
30pub struct Session {
31    /// The underlying store.
32    store: Arc<LpgStore>,
33    /// Graph store trait object for pluggable storage backends.
34    graph_store: Arc<dyn GraphStoreMut>,
35    /// Schema and metadata catalog shared across sessions.
36    catalog: Arc<Catalog>,
37    /// RDF triple store (if RDF feature is enabled).
38    #[cfg(feature = "rdf")]
39    rdf_store: Arc<RdfStore>,
40    /// Transaction manager.
41    tx_manager: Arc<TransactionManager>,
42    /// Query cache shared across sessions.
43    query_cache: Arc<QueryCache>,
44    /// Current transaction ID (if any). Behind a Mutex so that GQL commands
45    /// (`START TRANSACTION`, `COMMIT`, `ROLLBACK`) can manage transactions
46    /// from within `execute(&self)`.
47    current_tx: parking_lot::Mutex<Option<TxId>>,
48    /// Whether the current transaction is read-only (blocks mutations).
49    read_only_tx: parking_lot::Mutex<bool>,
50    /// Whether the session is in auto-commit mode.
51    auto_commit: bool,
52    /// Adaptive execution configuration.
53    #[allow(dead_code)]
54    adaptive_config: AdaptiveConfig,
55    /// Whether to use factorized execution for multi-hop queries.
56    factorized_execution: bool,
57    /// The graph data model this session operates on.
58    graph_model: GraphModel,
59    /// Maximum time a query may run before being cancelled.
60    query_timeout: Option<Duration>,
61    /// Shared commit counter for triggering auto-GC.
62    commit_counter: Arc<AtomicUsize>,
63    /// GC every N commits (0 = disabled).
64    gc_interval: usize,
65    /// Node count at the start of the current transaction (for PreparedCommit stats).
66    tx_start_node_count: AtomicUsize,
67    /// Edge count at the start of the current transaction (for PreparedCommit stats).
68    tx_start_edge_count: AtomicUsize,
69    /// WAL for logging schema changes.
70    #[cfg(feature = "wal")]
71    wal: Option<Arc<grafeo_adapters::storage::wal::LpgWal>>,
72    /// CDC log for change tracking.
73    #[cfg(feature = "cdc")]
74    cdc_log: Arc<crate::cdc::CdcLog>,
75    /// Current graph name (for multi-graph USE GRAPH support). None = default graph.
76    current_graph: parking_lot::Mutex<Option<String>>,
77    /// Session time zone override.
78    time_zone: parking_lot::Mutex<Option<String>>,
79    /// Session-level parameters (SET PARAMETER).
80    session_params:
81        parking_lot::Mutex<std::collections::HashMap<String, grafeo_common::types::Value>>,
82    /// Override epoch for time-travel queries (None = use transaction/current epoch).
83    viewing_epoch_override: parking_lot::Mutex<Option<EpochId>>,
84}
85
86impl Session {
87    /// Creates a new session with adaptive execution configuration.
88    #[allow(dead_code, clippy::too_many_arguments)]
89    pub(crate) fn with_adaptive(
90        store: Arc<LpgStore>,
91        tx_manager: Arc<TransactionManager>,
92        query_cache: Arc<QueryCache>,
93        catalog: Arc<Catalog>,
94        adaptive_config: AdaptiveConfig,
95        factorized_execution: bool,
96        graph_model: GraphModel,
97        query_timeout: Option<Duration>,
98        commit_counter: Arc<AtomicUsize>,
99        gc_interval: usize,
100    ) -> Self {
101        let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
102        Self {
103            store,
104            graph_store,
105            catalog,
106            #[cfg(feature = "rdf")]
107            rdf_store: Arc::new(RdfStore::new()),
108            tx_manager,
109            query_cache,
110            current_tx: parking_lot::Mutex::new(None),
111            read_only_tx: parking_lot::Mutex::new(false),
112            auto_commit: true,
113            adaptive_config,
114            factorized_execution,
115            graph_model,
116            query_timeout,
117            commit_counter,
118            gc_interval,
119            tx_start_node_count: AtomicUsize::new(0),
120            tx_start_edge_count: AtomicUsize::new(0),
121            #[cfg(feature = "wal")]
122            wal: None,
123            #[cfg(feature = "cdc")]
124            cdc_log: Arc::new(crate::cdc::CdcLog::new()),
125            current_graph: parking_lot::Mutex::new(None),
126            time_zone: parking_lot::Mutex::new(None),
127            session_params: parking_lot::Mutex::new(std::collections::HashMap::new()),
128            viewing_epoch_override: parking_lot::Mutex::new(None),
129        }
130    }
131
132    /// Sets the WAL for this session (shared with the database).
133    #[cfg(feature = "wal")]
134    pub(crate) fn set_wal(&mut self, wal: Arc<grafeo_adapters::storage::wal::LpgWal>) {
135        self.wal = Some(wal);
136    }
137
138    /// Sets the CDC log for this session (shared with the database).
139    #[cfg(feature = "cdc")]
140    pub(crate) fn set_cdc_log(&mut self, cdc_log: Arc<crate::cdc::CdcLog>) {
141        self.cdc_log = cdc_log;
142    }
143
144    /// Creates a new session with RDF store and adaptive configuration.
145    #[cfg(feature = "rdf")]
146    #[allow(clippy::too_many_arguments)]
147    pub(crate) fn with_rdf_store_and_adaptive(
148        store: Arc<LpgStore>,
149        rdf_store: Arc<RdfStore>,
150        tx_manager: Arc<TransactionManager>,
151        query_cache: Arc<QueryCache>,
152        catalog: Arc<Catalog>,
153        adaptive_config: AdaptiveConfig,
154        factorized_execution: bool,
155        graph_model: GraphModel,
156        query_timeout: Option<Duration>,
157        commit_counter: Arc<AtomicUsize>,
158        gc_interval: usize,
159    ) -> Self {
160        let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
161        Self {
162            store,
163            graph_store,
164            catalog,
165            rdf_store,
166            tx_manager,
167            query_cache,
168            current_tx: parking_lot::Mutex::new(None),
169            read_only_tx: parking_lot::Mutex::new(false),
170            auto_commit: true,
171            adaptive_config,
172            factorized_execution,
173            graph_model,
174            query_timeout,
175            commit_counter,
176            gc_interval,
177            tx_start_node_count: AtomicUsize::new(0),
178            tx_start_edge_count: AtomicUsize::new(0),
179            #[cfg(feature = "wal")]
180            wal: None,
181            #[cfg(feature = "cdc")]
182            cdc_log: Arc::new(crate::cdc::CdcLog::new()),
183            current_graph: parking_lot::Mutex::new(None),
184            time_zone: parking_lot::Mutex::new(None),
185            session_params: parking_lot::Mutex::new(std::collections::HashMap::new()),
186            viewing_epoch_override: parking_lot::Mutex::new(None),
187        }
188    }
189
190    /// Creates a session backed by an external graph store.
191    ///
192    /// The external store handles all data operations. Transaction management
193    /// (begin/commit/rollback) is not supported for external stores.
194    #[allow(clippy::too_many_arguments)]
195    pub(crate) fn with_external_store(
196        store: Arc<dyn GraphStoreMut>,
197        tx_manager: Arc<TransactionManager>,
198        query_cache: Arc<QueryCache>,
199        catalog: Arc<Catalog>,
200        adaptive_config: AdaptiveConfig,
201        factorized_execution: bool,
202        graph_model: GraphModel,
203        query_timeout: Option<Duration>,
204        commit_counter: Arc<AtomicUsize>,
205        gc_interval: usize,
206    ) -> Self {
207        Self {
208            store: Arc::new(LpgStore::new()), // dummy for LpgStore-specific ops
209            graph_store: store,
210            catalog,
211            #[cfg(feature = "rdf")]
212            rdf_store: Arc::new(RdfStore::new()),
213            tx_manager,
214            query_cache,
215            current_tx: parking_lot::Mutex::new(None),
216            read_only_tx: parking_lot::Mutex::new(false),
217            auto_commit: true,
218            adaptive_config,
219            factorized_execution,
220            graph_model,
221            query_timeout,
222            commit_counter,
223            gc_interval,
224            tx_start_node_count: AtomicUsize::new(0),
225            tx_start_edge_count: AtomicUsize::new(0),
226            #[cfg(feature = "wal")]
227            wal: None,
228            #[cfg(feature = "cdc")]
229            cdc_log: Arc::new(crate::cdc::CdcLog::new()),
230            current_graph: parking_lot::Mutex::new(None),
231            time_zone: parking_lot::Mutex::new(None),
232            session_params: parking_lot::Mutex::new(std::collections::HashMap::new()),
233            viewing_epoch_override: parking_lot::Mutex::new(None),
234        }
235    }
236
237    /// Returns the graph model this session operates on.
238    #[must_use]
239    pub fn graph_model(&self) -> GraphModel {
240        self.graph_model
241    }
242
243    // === Session State Management ===
244
245    /// Sets the current graph for this session (USE GRAPH).
246    pub fn use_graph(&self, name: &str) {
247        *self.current_graph.lock() = Some(name.to_string());
248    }
249
250    /// Returns the current graph name, if any.
251    #[must_use]
252    pub fn current_graph(&self) -> Option<String> {
253        self.current_graph.lock().clone()
254    }
255
256    /// Sets the session time zone.
257    pub fn set_time_zone(&self, tz: &str) {
258        *self.time_zone.lock() = Some(tz.to_string());
259    }
260
261    /// Returns the session time zone, if set.
262    #[must_use]
263    pub fn time_zone(&self) -> Option<String> {
264        self.time_zone.lock().clone()
265    }
266
267    /// Sets a session parameter.
268    pub fn set_parameter(&self, key: &str, value: grafeo_common::types::Value) {
269        self.session_params.lock().insert(key.to_string(), value);
270    }
271
272    /// Gets a session parameter by cloning it.
273    #[must_use]
274    pub fn get_parameter(&self, key: &str) -> Option<grafeo_common::types::Value> {
275        self.session_params.lock().get(key).cloned()
276    }
277
278    /// Resets all session state to defaults.
279    pub fn reset_session(&self) {
280        *self.current_graph.lock() = None;
281        *self.time_zone.lock() = None;
282        self.session_params.lock().clear();
283        *self.viewing_epoch_override.lock() = None;
284    }
285
286    // --- Time-travel API ---
287
288    /// Sets a viewing epoch override for time-travel queries.
289    ///
290    /// While set, all queries on this session see the database as it existed
291    /// at the given epoch. Use [`clear_viewing_epoch`](Self::clear_viewing_epoch)
292    /// to return to normal behavior.
293    pub fn set_viewing_epoch(&self, epoch: EpochId) {
294        *self.viewing_epoch_override.lock() = Some(epoch);
295    }
296
297    /// Clears the viewing epoch override, returning to normal behavior.
298    pub fn clear_viewing_epoch(&self) {
299        *self.viewing_epoch_override.lock() = None;
300    }
301
302    /// Returns the current viewing epoch override, if any.
303    #[must_use]
304    pub fn viewing_epoch(&self) -> Option<EpochId> {
305        *self.viewing_epoch_override.lock()
306    }
307
308    /// Returns all versions of a node with their creation/deletion epochs.
309    ///
310    /// Properties and labels reflect the current state (not versioned per-epoch).
311    #[must_use]
312    pub fn get_node_history(&self, id: NodeId) -> Vec<(EpochId, Option<EpochId>, Node)> {
313        self.store.get_node_history(id)
314    }
315
316    /// Returns all versions of an edge with their creation/deletion epochs.
317    ///
318    /// Properties reflect the current state (not versioned per-epoch).
319    #[must_use]
320    pub fn get_edge_history(&self, id: EdgeId) -> Vec<(EpochId, Option<EpochId>, Edge)> {
321        self.store.get_edge_history(id)
322    }
323
324    /// Checks that the session's graph model supports LPG operations.
325    fn require_lpg(&self, language: &str) -> Result<()> {
326        if self.graph_model == GraphModel::Rdf {
327            return Err(grafeo_common::utils::error::Error::Internal(format!(
328                "This is an RDF database. {language} queries require an LPG database."
329            )));
330        }
331        Ok(())
332    }
333
334    /// Executes a session or transaction command, returning an empty result.
335    #[cfg(feature = "gql")]
336    fn execute_session_command(
337        &self,
338        cmd: grafeo_adapters::query::gql::ast::SessionCommand,
339    ) -> Result<QueryResult> {
340        use grafeo_adapters::query::gql::ast::{SessionCommand, TransactionIsolationLevel};
341        use grafeo_common::utils::error::{Error, QueryError, QueryErrorKind};
342
343        match cmd {
344            SessionCommand::CreateGraph {
345                name,
346                if_not_exists,
347                typed,
348            } => {
349                let created = self.store.create_graph(&name);
350                if !created && !if_not_exists {
351                    return Err(Error::Query(QueryError::new(
352                        QueryErrorKind::Semantic,
353                        format!("Graph '{name}' already exists"),
354                    )));
355                }
356                // Bind to graph type if specified
357                if let Some(type_name) = typed
358                    && let Err(e) = self.catalog.bind_graph_type(&name, type_name.clone())
359                {
360                    return Err(Error::Query(QueryError::new(
361                        QueryErrorKind::Semantic,
362                        e.to_string(),
363                    )));
364                }
365                Ok(QueryResult::empty())
366            }
367            SessionCommand::DropGraph { name, if_exists } => {
368                let dropped = self.store.drop_graph(&name);
369                if !dropped && !if_exists {
370                    return Err(Error::Query(QueryError::new(
371                        QueryErrorKind::Semantic,
372                        format!("Graph '{name}' does not exist"),
373                    )));
374                }
375                Ok(QueryResult::empty())
376            }
377            SessionCommand::UseGraph(name) => {
378                // Verify graph exists (default graph is always valid)
379                if !name.eq_ignore_ascii_case("default") && self.store.graph(&name).is_none() {
380                    return Err(Error::Query(QueryError::new(
381                        QueryErrorKind::Semantic,
382                        format!("Graph '{name}' does not exist"),
383                    )));
384                }
385                self.use_graph(&name);
386                Ok(QueryResult::empty())
387            }
388            SessionCommand::SessionSetGraph(name) => {
389                self.use_graph(&name);
390                Ok(QueryResult::empty())
391            }
392            SessionCommand::SessionSetTimeZone(tz) => {
393                self.set_time_zone(&tz);
394                Ok(QueryResult::empty())
395            }
396            SessionCommand::SessionSetParameter(key, expr) => {
397                if key.eq_ignore_ascii_case("viewing_epoch") {
398                    match Self::eval_integer_literal(&expr) {
399                        Some(n) if n >= 0 => {
400                            self.set_viewing_epoch(EpochId::new(n as u64));
401                            Ok(QueryResult::status(format!("Set viewing_epoch to {n}")))
402                        }
403                        _ => Err(Error::Query(QueryError::new(
404                            QueryErrorKind::Semantic,
405                            "viewing_epoch must be a non-negative integer literal",
406                        ))),
407                    }
408                } else {
409                    // For now, store parameter name with Null value.
410                    // Full expression evaluation would require building and executing a plan.
411                    self.set_parameter(&key, Value::Null);
412                    Ok(QueryResult::empty())
413                }
414            }
415            SessionCommand::SessionReset => {
416                self.reset_session();
417                Ok(QueryResult::empty())
418            }
419            SessionCommand::SessionClose => {
420                self.reset_session();
421                Ok(QueryResult::empty())
422            }
423            SessionCommand::StartTransaction {
424                read_only,
425                isolation_level,
426            } => {
427                let engine_level = isolation_level.map(|l| match l {
428                    TransactionIsolationLevel::ReadCommitted => {
429                        crate::transaction::IsolationLevel::ReadCommitted
430                    }
431                    TransactionIsolationLevel::SnapshotIsolation => {
432                        crate::transaction::IsolationLevel::SnapshotIsolation
433                    }
434                    TransactionIsolationLevel::Serializable => {
435                        crate::transaction::IsolationLevel::Serializable
436                    }
437                });
438                self.begin_tx_inner(read_only, engine_level)?;
439                Ok(QueryResult::status("Transaction started"))
440            }
441            SessionCommand::Commit => {
442                self.commit_inner()?;
443                Ok(QueryResult::status("Transaction committed"))
444            }
445            SessionCommand::Rollback => {
446                self.rollback_inner()?;
447                Ok(QueryResult::status("Transaction rolled back"))
448            }
449        }
450    }
451
452    /// Logs a WAL record for a schema change (no-op if WAL is not enabled).
453    #[cfg(feature = "wal")]
454    fn log_schema_wal(&self, record: &grafeo_adapters::storage::wal::WalRecord) {
455        if let Some(ref wal) = self.wal
456            && let Err(e) = wal.log(record)
457        {
458            tracing::warn!("Failed to log schema change to WAL: {}", e);
459        }
460    }
461
462    /// Executes a schema DDL command, returning a status result.
463    #[cfg(feature = "gql")]
464    fn execute_schema_command(
465        &self,
466        cmd: grafeo_adapters::query::gql::ast::SchemaStatement,
467    ) -> Result<QueryResult> {
468        use crate::catalog::{
469            EdgeTypeDefinition, NodeTypeDefinition, PropertyDataType, TypedProperty,
470        };
471        use grafeo_adapters::query::gql::ast::SchemaStatement;
472        #[cfg(feature = "wal")]
473        use grafeo_adapters::storage::wal::WalRecord;
474        use grafeo_common::utils::error::{Error, QueryError, QueryErrorKind};
475
476        /// Logs a WAL record for schema changes. Compiles to nothing without `wal`.
477        macro_rules! wal_log {
478            ($self:expr, $record:expr) => {
479                #[cfg(feature = "wal")]
480                $self.log_schema_wal(&$record);
481            };
482        }
483
484        match cmd {
485            SchemaStatement::CreateNodeType(stmt) => {
486                #[cfg(feature = "wal")]
487                let props_for_wal: Vec<(String, String, bool)> = stmt
488                    .properties
489                    .iter()
490                    .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
491                    .collect();
492                let def = NodeTypeDefinition {
493                    name: stmt.name.clone(),
494                    properties: stmt
495                        .properties
496                        .iter()
497                        .map(|p| TypedProperty {
498                            name: p.name.clone(),
499                            data_type: PropertyDataType::from_type_name(&p.data_type),
500                            nullable: p.nullable,
501                            default_value: None,
502                        })
503                        .collect(),
504                    constraints: Vec::new(),
505                };
506                let result = if stmt.or_replace {
507                    let _ = self.catalog.drop_node_type(&stmt.name);
508                    self.catalog.register_node_type(def)
509                } else {
510                    self.catalog.register_node_type(def)
511                };
512                match result {
513                    Ok(()) => {
514                        wal_log!(
515                            self,
516                            WalRecord::CreateNodeType {
517                                name: stmt.name.clone(),
518                                properties: props_for_wal,
519                                constraints: Vec::new(),
520                            }
521                        );
522                        Ok(QueryResult::status(format!(
523                            "Created node type '{}'",
524                            stmt.name
525                        )))
526                    }
527                    Err(e) if stmt.if_not_exists => {
528                        let _ = e;
529                        Ok(QueryResult::status("No change"))
530                    }
531                    Err(e) => Err(Error::Query(QueryError::new(
532                        QueryErrorKind::Semantic,
533                        e.to_string(),
534                    ))),
535                }
536            }
537            SchemaStatement::CreateEdgeType(stmt) => {
538                #[cfg(feature = "wal")]
539                let props_for_wal: Vec<(String, String, bool)> = stmt
540                    .properties
541                    .iter()
542                    .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
543                    .collect();
544                let def = EdgeTypeDefinition {
545                    name: stmt.name.clone(),
546                    properties: stmt
547                        .properties
548                        .iter()
549                        .map(|p| TypedProperty {
550                            name: p.name.clone(),
551                            data_type: PropertyDataType::from_type_name(&p.data_type),
552                            nullable: p.nullable,
553                            default_value: None,
554                        })
555                        .collect(),
556                    constraints: Vec::new(),
557                };
558                let result = if stmt.or_replace {
559                    let _ = self.catalog.drop_edge_type_def(&stmt.name);
560                    self.catalog.register_edge_type_def(def)
561                } else {
562                    self.catalog.register_edge_type_def(def)
563                };
564                match result {
565                    Ok(()) => {
566                        wal_log!(
567                            self,
568                            WalRecord::CreateEdgeType {
569                                name: stmt.name.clone(),
570                                properties: props_for_wal,
571                                constraints: Vec::new(),
572                            }
573                        );
574                        Ok(QueryResult::status(format!(
575                            "Created edge type '{}'",
576                            stmt.name
577                        )))
578                    }
579                    Err(e) if stmt.if_not_exists => {
580                        let _ = e;
581                        Ok(QueryResult::status("No change"))
582                    }
583                    Err(e) => Err(Error::Query(QueryError::new(
584                        QueryErrorKind::Semantic,
585                        e.to_string(),
586                    ))),
587                }
588            }
589            SchemaStatement::CreateVectorIndex(stmt) => {
590                Self::create_vector_index_on_store(
591                    &self.store,
592                    &stmt.node_label,
593                    &stmt.property,
594                    stmt.dimensions,
595                    stmt.metric.as_deref(),
596                )?;
597                wal_log!(
598                    self,
599                    WalRecord::CreateIndex {
600                        name: stmt.name.clone(),
601                        label: stmt.node_label.clone(),
602                        property: stmt.property.clone(),
603                        index_type: "vector".to_string(),
604                    }
605                );
606                Ok(QueryResult::status(format!(
607                    "Created vector index '{}'",
608                    stmt.name
609                )))
610            }
611            SchemaStatement::DropNodeType { name, if_exists } => {
612                match self.catalog.drop_node_type(&name) {
613                    Ok(()) => {
614                        wal_log!(self, WalRecord::DropNodeType { name: name.clone() });
615                        Ok(QueryResult::status(format!("Dropped node type '{name}'")))
616                    }
617                    Err(e) if if_exists => {
618                        let _ = e;
619                        Ok(QueryResult::status("No change"))
620                    }
621                    Err(e) => Err(Error::Query(QueryError::new(
622                        QueryErrorKind::Semantic,
623                        e.to_string(),
624                    ))),
625                }
626            }
627            SchemaStatement::DropEdgeType { name, if_exists } => {
628                match self.catalog.drop_edge_type_def(&name) {
629                    Ok(()) => {
630                        wal_log!(self, WalRecord::DropEdgeType { name: name.clone() });
631                        Ok(QueryResult::status(format!("Dropped edge type '{name}'")))
632                    }
633                    Err(e) if if_exists => {
634                        let _ = e;
635                        Ok(QueryResult::status("No change"))
636                    }
637                    Err(e) => Err(Error::Query(QueryError::new(
638                        QueryErrorKind::Semantic,
639                        e.to_string(),
640                    ))),
641                }
642            }
643            SchemaStatement::CreateIndex(stmt) => {
644                use grafeo_adapters::query::gql::ast::IndexKind;
645                let index_type_str = match stmt.index_kind {
646                    IndexKind::Property => "property",
647                    IndexKind::BTree => "btree",
648                    IndexKind::Text => "text",
649                    IndexKind::Vector => "vector",
650                };
651                match stmt.index_kind {
652                    IndexKind::Property | IndexKind::BTree => {
653                        for prop in &stmt.properties {
654                            self.store.create_property_index(prop);
655                        }
656                    }
657                    IndexKind::Text => {
658                        for prop in &stmt.properties {
659                            Self::create_text_index_on_store(&self.store, &stmt.label, prop)?;
660                        }
661                    }
662                    IndexKind::Vector => {
663                        for prop in &stmt.properties {
664                            Self::create_vector_index_on_store(
665                                &self.store,
666                                &stmt.label,
667                                prop,
668                                stmt.options.dimensions,
669                                stmt.options.metric.as_deref(),
670                            )?;
671                        }
672                    }
673                }
674                #[cfg(feature = "wal")]
675                for prop in &stmt.properties {
676                    wal_log!(
677                        self,
678                        WalRecord::CreateIndex {
679                            name: stmt.name.clone(),
680                            label: stmt.label.clone(),
681                            property: prop.clone(),
682                            index_type: index_type_str.to_string(),
683                        }
684                    );
685                }
686                Ok(QueryResult::status(format!(
687                    "Created {} index '{}'",
688                    index_type_str, stmt.name
689                )))
690            }
691            SchemaStatement::DropIndex { name, if_exists } => {
692                // Try to drop property index by name
693                let dropped = self.store.drop_property_index(&name);
694                if dropped || if_exists {
695                    if dropped {
696                        wal_log!(self, WalRecord::DropIndex { name: name.clone() });
697                    }
698                    Ok(QueryResult::status(if dropped {
699                        format!("Dropped index '{name}'")
700                    } else {
701                        "No change".to_string()
702                    }))
703                } else {
704                    Err(Error::Query(QueryError::new(
705                        QueryErrorKind::Semantic,
706                        format!("Index '{name}' does not exist"),
707                    )))
708                }
709            }
710            SchemaStatement::CreateConstraint(stmt) => {
711                use grafeo_adapters::query::gql::ast::ConstraintKind;
712                let kind_str = match stmt.constraint_kind {
713                    ConstraintKind::Unique => "unique",
714                    ConstraintKind::NodeKey => "node_key",
715                    ConstraintKind::NotNull => "not_null",
716                    ConstraintKind::Exists => "exists",
717                };
718                let constraint_name = stmt
719                    .name
720                    .clone()
721                    .unwrap_or_else(|| format!("{}_{kind_str}", stmt.label));
722                wal_log!(
723                    self,
724                    WalRecord::CreateConstraint {
725                        name: constraint_name.clone(),
726                        label: stmt.label.clone(),
727                        properties: stmt.properties.clone(),
728                        kind: kind_str.to_string(),
729                    }
730                );
731                Ok(QueryResult::status(format!(
732                    "Created {kind_str} constraint '{constraint_name}'"
733                )))
734            }
735            SchemaStatement::DropConstraint { name, if_exists } => {
736                let _ = if_exists;
737                wal_log!(self, WalRecord::DropConstraint { name: name.clone() });
738                Ok(QueryResult::status(format!("Dropped constraint '{name}'")))
739            }
740            SchemaStatement::CreateGraphType(stmt) => {
741                use crate::catalog::GraphTypeDefinition;
742                let def = GraphTypeDefinition {
743                    name: stmt.name.clone(),
744                    allowed_node_types: stmt.node_types.clone(),
745                    allowed_edge_types: stmt.edge_types.clone(),
746                    open: stmt.open,
747                };
748                let result = if stmt.or_replace {
749                    // Drop existing first, ignore error if not found
750                    let _ = self.catalog.drop_graph_type(&stmt.name);
751                    self.catalog.register_graph_type(def)
752                } else {
753                    self.catalog.register_graph_type(def)
754                };
755                match result {
756                    Ok(()) => {
757                        wal_log!(
758                            self,
759                            WalRecord::CreateGraphType {
760                                name: stmt.name.clone(),
761                                node_types: stmt.node_types,
762                                edge_types: stmt.edge_types,
763                                open: stmt.open,
764                            }
765                        );
766                        Ok(QueryResult::status(format!(
767                            "Created graph type '{}'",
768                            stmt.name
769                        )))
770                    }
771                    Err(e) if stmt.if_not_exists => {
772                        let _ = e;
773                        Ok(QueryResult::status("No change"))
774                    }
775                    Err(e) => Err(Error::Query(QueryError::new(
776                        QueryErrorKind::Semantic,
777                        e.to_string(),
778                    ))),
779                }
780            }
781            SchemaStatement::DropGraphType { name, if_exists } => {
782                match self.catalog.drop_graph_type(&name) {
783                    Ok(()) => {
784                        wal_log!(self, WalRecord::DropGraphType { name: name.clone() });
785                        Ok(QueryResult::status(format!("Dropped graph type '{name}'")))
786                    }
787                    Err(e) if if_exists => {
788                        let _ = e;
789                        Ok(QueryResult::status("No change"))
790                    }
791                    Err(e) => Err(Error::Query(QueryError::new(
792                        QueryErrorKind::Semantic,
793                        e.to_string(),
794                    ))),
795                }
796            }
797            SchemaStatement::CreateSchema {
798                name,
799                if_not_exists,
800            } => match self.catalog.register_schema_namespace(name.clone()) {
801                Ok(()) => {
802                    wal_log!(self, WalRecord::CreateSchema { name: name.clone() });
803                    Ok(QueryResult::status(format!("Created schema '{name}'")))
804                }
805                Err(e) if if_not_exists => {
806                    let _ = e;
807                    Ok(QueryResult::status("No change"))
808                }
809                Err(e) => Err(Error::Query(QueryError::new(
810                    QueryErrorKind::Semantic,
811                    e.to_string(),
812                ))),
813            },
814            SchemaStatement::DropSchema { name, if_exists } => {
815                match self.catalog.drop_schema_namespace(&name) {
816                    Ok(()) => {
817                        wal_log!(self, WalRecord::DropSchema { name: name.clone() });
818                        Ok(QueryResult::status(format!("Dropped schema '{name}'")))
819                    }
820                    Err(e) if if_exists => {
821                        let _ = e;
822                        Ok(QueryResult::status("No change"))
823                    }
824                    Err(e) => Err(Error::Query(QueryError::new(
825                        QueryErrorKind::Semantic,
826                        e.to_string(),
827                    ))),
828                }
829            }
830            SchemaStatement::AlterNodeType(stmt) => {
831                use grafeo_adapters::query::gql::ast::TypeAlteration;
832                let mut wal_alts = Vec::new();
833                for alt in &stmt.alterations {
834                    match alt {
835                        TypeAlteration::AddProperty(prop) => {
836                            let typed = TypedProperty {
837                                name: prop.name.clone(),
838                                data_type: PropertyDataType::from_type_name(&prop.data_type),
839                                nullable: prop.nullable,
840                                default_value: None,
841                            };
842                            self.catalog
843                                .alter_node_type_add_property(&stmt.name, typed)
844                                .map_err(|e| {
845                                    Error::Query(QueryError::new(
846                                        QueryErrorKind::Semantic,
847                                        e.to_string(),
848                                    ))
849                                })?;
850                            wal_alts.push((
851                                "add".to_string(),
852                                prop.name.clone(),
853                                prop.data_type.clone(),
854                                prop.nullable,
855                            ));
856                        }
857                        TypeAlteration::DropProperty(name) => {
858                            self.catalog
859                                .alter_node_type_drop_property(&stmt.name, name)
860                                .map_err(|e| {
861                                    Error::Query(QueryError::new(
862                                        QueryErrorKind::Semantic,
863                                        e.to_string(),
864                                    ))
865                                })?;
866                            wal_alts.push(("drop".to_string(), name.clone(), String::new(), false));
867                        }
868                    }
869                }
870                wal_log!(
871                    self,
872                    WalRecord::AlterNodeType {
873                        name: stmt.name.clone(),
874                        alterations: wal_alts,
875                    }
876                );
877                Ok(QueryResult::status(format!(
878                    "Altered node type '{}'",
879                    stmt.name
880                )))
881            }
882            SchemaStatement::AlterEdgeType(stmt) => {
883                use grafeo_adapters::query::gql::ast::TypeAlteration;
884                let mut wal_alts = Vec::new();
885                for alt in &stmt.alterations {
886                    match alt {
887                        TypeAlteration::AddProperty(prop) => {
888                            let typed = TypedProperty {
889                                name: prop.name.clone(),
890                                data_type: PropertyDataType::from_type_name(&prop.data_type),
891                                nullable: prop.nullable,
892                                default_value: None,
893                            };
894                            self.catalog
895                                .alter_edge_type_add_property(&stmt.name, typed)
896                                .map_err(|e| {
897                                    Error::Query(QueryError::new(
898                                        QueryErrorKind::Semantic,
899                                        e.to_string(),
900                                    ))
901                                })?;
902                            wal_alts.push((
903                                "add".to_string(),
904                                prop.name.clone(),
905                                prop.data_type.clone(),
906                                prop.nullable,
907                            ));
908                        }
909                        TypeAlteration::DropProperty(name) => {
910                            self.catalog
911                                .alter_edge_type_drop_property(&stmt.name, name)
912                                .map_err(|e| {
913                                    Error::Query(QueryError::new(
914                                        QueryErrorKind::Semantic,
915                                        e.to_string(),
916                                    ))
917                                })?;
918                            wal_alts.push(("drop".to_string(), name.clone(), String::new(), false));
919                        }
920                    }
921                }
922                wal_log!(
923                    self,
924                    WalRecord::AlterEdgeType {
925                        name: stmt.name.clone(),
926                        alterations: wal_alts,
927                    }
928                );
929                Ok(QueryResult::status(format!(
930                    "Altered edge type '{}'",
931                    stmt.name
932                )))
933            }
934            SchemaStatement::AlterGraphType(stmt) => {
935                use grafeo_adapters::query::gql::ast::GraphTypeAlteration;
936                let mut wal_alts = Vec::new();
937                for alt in &stmt.alterations {
938                    match alt {
939                        GraphTypeAlteration::AddNodeType(name) => {
940                            self.catalog
941                                .alter_graph_type_add_node_type(&stmt.name, name.clone())
942                                .map_err(|e| {
943                                    Error::Query(QueryError::new(
944                                        QueryErrorKind::Semantic,
945                                        e.to_string(),
946                                    ))
947                                })?;
948                            wal_alts.push(("add_node_type".to_string(), name.clone()));
949                        }
950                        GraphTypeAlteration::DropNodeType(name) => {
951                            self.catalog
952                                .alter_graph_type_drop_node_type(&stmt.name, name)
953                                .map_err(|e| {
954                                    Error::Query(QueryError::new(
955                                        QueryErrorKind::Semantic,
956                                        e.to_string(),
957                                    ))
958                                })?;
959                            wal_alts.push(("drop_node_type".to_string(), name.clone()));
960                        }
961                        GraphTypeAlteration::AddEdgeType(name) => {
962                            self.catalog
963                                .alter_graph_type_add_edge_type(&stmt.name, name.clone())
964                                .map_err(|e| {
965                                    Error::Query(QueryError::new(
966                                        QueryErrorKind::Semantic,
967                                        e.to_string(),
968                                    ))
969                                })?;
970                            wal_alts.push(("add_edge_type".to_string(), name.clone()));
971                        }
972                        GraphTypeAlteration::DropEdgeType(name) => {
973                            self.catalog
974                                .alter_graph_type_drop_edge_type(&stmt.name, name)
975                                .map_err(|e| {
976                                    Error::Query(QueryError::new(
977                                        QueryErrorKind::Semantic,
978                                        e.to_string(),
979                                    ))
980                                })?;
981                            wal_alts.push(("drop_edge_type".to_string(), name.clone()));
982                        }
983                    }
984                }
985                wal_log!(
986                    self,
987                    WalRecord::AlterGraphType {
988                        name: stmt.name.clone(),
989                        alterations: wal_alts,
990                    }
991                );
992                Ok(QueryResult::status(format!(
993                    "Altered graph type '{}'",
994                    stmt.name
995                )))
996            }
997            SchemaStatement::CreateProcedure(stmt) => {
998                use crate::catalog::ProcedureDefinition;
999
1000                let def = ProcedureDefinition {
1001                    name: stmt.name.clone(),
1002                    params: stmt
1003                        .params
1004                        .iter()
1005                        .map(|p| (p.name.clone(), p.param_type.clone()))
1006                        .collect(),
1007                    returns: stmt
1008                        .returns
1009                        .iter()
1010                        .map(|r| (r.name.clone(), r.return_type.clone()))
1011                        .collect(),
1012                    body: stmt.body.clone(),
1013                };
1014
1015                if stmt.or_replace {
1016                    self.catalog.replace_procedure(def).map_err(|e| {
1017                        Error::Query(QueryError::new(QueryErrorKind::Semantic, e.to_string()))
1018                    })?;
1019                } else {
1020                    match self.catalog.register_procedure(def) {
1021                        Ok(()) => {}
1022                        Err(_) if stmt.if_not_exists => {
1023                            return Ok(QueryResult::empty());
1024                        }
1025                        Err(e) => {
1026                            return Err(Error::Query(QueryError::new(
1027                                QueryErrorKind::Semantic,
1028                                e.to_string(),
1029                            )));
1030                        }
1031                    }
1032                }
1033
1034                wal_log!(
1035                    self,
1036                    WalRecord::CreateProcedure {
1037                        name: stmt.name.clone(),
1038                        params: stmt
1039                            .params
1040                            .iter()
1041                            .map(|p| (p.name.clone(), p.param_type.clone()))
1042                            .collect(),
1043                        returns: stmt
1044                            .returns
1045                            .iter()
1046                            .map(|r| (r.name.clone(), r.return_type.clone()))
1047                            .collect(),
1048                        body: stmt.body,
1049                    }
1050                );
1051                Ok(QueryResult::status(format!(
1052                    "Created procedure '{}'",
1053                    stmt.name
1054                )))
1055            }
1056            SchemaStatement::DropProcedure { name, if_exists } => {
1057                match self.catalog.drop_procedure(&name) {
1058                    Ok(()) => {}
1059                    Err(_) if if_exists => {
1060                        return Ok(QueryResult::empty());
1061                    }
1062                    Err(e) => {
1063                        return Err(Error::Query(QueryError::new(
1064                            QueryErrorKind::Semantic,
1065                            e.to_string(),
1066                        )));
1067                    }
1068                }
1069                wal_log!(self, WalRecord::DropProcedure { name: name.clone() });
1070                Ok(QueryResult::status(format!("Dropped procedure '{name}'")))
1071            }
1072        }
1073    }
1074
1075    /// Creates a vector index on the store by scanning existing nodes.
1076    #[cfg(all(feature = "gql", feature = "vector-index"))]
1077    fn create_vector_index_on_store(
1078        store: &LpgStore,
1079        label: &str,
1080        property: &str,
1081        dimensions: Option<usize>,
1082        metric: Option<&str>,
1083    ) -> Result<()> {
1084        use grafeo_common::types::{PropertyKey, Value};
1085        use grafeo_common::utils::error::Error;
1086        use grafeo_core::index::vector::{DistanceMetric, HnswConfig, HnswIndex};
1087
1088        let metric = match metric {
1089            Some(m) => DistanceMetric::from_str(m).ok_or_else(|| {
1090                Error::Internal(format!(
1091                    "Unknown distance metric '{m}'. Use: cosine, euclidean, dot_product, manhattan"
1092                ))
1093            })?,
1094            None => DistanceMetric::Cosine,
1095        };
1096
1097        let prop_key = PropertyKey::new(property);
1098        let mut found_dims: Option<usize> = dimensions;
1099        let mut vectors: Vec<(grafeo_common::types::NodeId, Vec<f32>)> = Vec::new();
1100
1101        for node in store.nodes_with_label(label) {
1102            if let Some(Value::Vector(v)) = node.properties.get(&prop_key) {
1103                if let Some(expected) = found_dims {
1104                    if v.len() != expected {
1105                        return Err(Error::Internal(format!(
1106                            "Vector dimension mismatch: expected {expected}, found {} on node {}",
1107                            v.len(),
1108                            node.id.0
1109                        )));
1110                    }
1111                } else {
1112                    found_dims = Some(v.len());
1113                }
1114                vectors.push((node.id, v.to_vec()));
1115            }
1116        }
1117
1118        let Some(dims) = found_dims else {
1119            return Err(Error::Internal(format!(
1120                "No vector properties found on :{label}({property}) and no dimensions specified"
1121            )));
1122        };
1123
1124        let config = HnswConfig::new(dims, metric);
1125        let index = HnswIndex::with_capacity(config, vectors.len());
1126        let accessor = grafeo_core::index::vector::PropertyVectorAccessor::new(store, property);
1127        for (node_id, vec) in &vectors {
1128            index.insert(*node_id, vec, &accessor);
1129        }
1130
1131        store.add_vector_index(label, property, Arc::new(index));
1132        Ok(())
1133    }
1134
1135    /// Stub for when vector-index feature is not enabled.
1136    #[cfg(all(feature = "gql", not(feature = "vector-index")))]
1137    fn create_vector_index_on_store(
1138        _store: &LpgStore,
1139        _label: &str,
1140        _property: &str,
1141        _dimensions: Option<usize>,
1142        _metric: Option<&str>,
1143    ) -> Result<()> {
1144        Err(grafeo_common::utils::error::Error::Internal(
1145            "Vector index support requires the 'vector-index' feature".to_string(),
1146        ))
1147    }
1148
1149    /// Creates a text index on the store by scanning existing nodes.
1150    #[cfg(all(feature = "gql", feature = "text-index"))]
1151    fn create_text_index_on_store(store: &LpgStore, label: &str, property: &str) -> Result<()> {
1152        use grafeo_common::types::{PropertyKey, Value};
1153        use grafeo_core::index::text::{BM25Config, InvertedIndex};
1154
1155        let mut index = InvertedIndex::new(BM25Config::default());
1156        let prop_key = PropertyKey::new(property);
1157
1158        let nodes = store.nodes_by_label(label);
1159        for node_id in nodes {
1160            if let Some(Value::String(text)) = store.get_node_property(node_id, &prop_key) {
1161                index.insert(node_id, text.as_str());
1162            }
1163        }
1164
1165        store.add_text_index(label, property, Arc::new(parking_lot::RwLock::new(index)));
1166        Ok(())
1167    }
1168
1169    /// Stub for when text-index feature is not enabled.
1170    #[cfg(all(feature = "gql", not(feature = "text-index")))]
1171    fn create_text_index_on_store(_store: &LpgStore, _label: &str, _property: &str) -> Result<()> {
1172        Err(grafeo_common::utils::error::Error::Internal(
1173            "Text index support requires the 'text-index' feature".to_string(),
1174        ))
1175    }
1176
1177    /// Executes a GQL query.
1178    ///
1179    /// # Errors
1180    ///
1181    /// Returns an error if the query fails to parse or execute.
1182    ///
1183    /// # Examples
1184    ///
1185    /// ```no_run
1186    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
1187    /// use grafeo_engine::GrafeoDB;
1188    ///
1189    /// let db = GrafeoDB::new_in_memory();
1190    /// let session = db.session();
1191    ///
1192    /// // Create a node
1193    /// session.execute("INSERT (:Person {name: 'Alice', age: 30})")?;
1194    ///
1195    /// // Query nodes
1196    /// let result = session.execute("MATCH (n:Person) RETURN n.name, n.age")?;
1197    /// for row in &result.rows {
1198    ///     println!("{:?}", row);
1199    /// }
1200    /// # Ok(())
1201    /// # }
1202    /// ```
1203    #[cfg(feature = "gql")]
1204    pub fn execute(&self, query: &str) -> Result<QueryResult> {
1205        self.require_lpg("GQL")?;
1206
1207        use crate::query::{
1208            Executor, binder::Binder, cache::CacheKey, gql_translator, optimizer::Optimizer,
1209            processor::QueryLanguage,
1210        };
1211
1212        let start_time = std::time::Instant::now();
1213
1214        // Parse and translate, checking for session/schema commands first
1215        let translation = gql_translator::translate_full(query)?;
1216        let logical_plan = match translation {
1217            gql_translator::GqlTranslationResult::SessionCommand(cmd) => {
1218                return self.execute_session_command(cmd);
1219            }
1220            gql_translator::GqlTranslationResult::SchemaCommand(cmd) => {
1221                // All DDL is a write operation
1222                if *self.read_only_tx.lock() {
1223                    return Err(grafeo_common::utils::error::Error::Transaction(
1224                        grafeo_common::utils::error::TransactionError::ReadOnly,
1225                    ));
1226                }
1227                return self.execute_schema_command(cmd);
1228            }
1229            gql_translator::GqlTranslationResult::Plan(plan) => {
1230                // Block mutations in read-only transactions
1231                if *self.read_only_tx.lock() && plan.root.has_mutations() {
1232                    return Err(grafeo_common::utils::error::Error::Transaction(
1233                        grafeo_common::utils::error::TransactionError::ReadOnly,
1234                    ));
1235                }
1236                plan
1237            }
1238        };
1239
1240        // Create cache key for this query
1241        let cache_key = CacheKey::new(query, QueryLanguage::Gql);
1242
1243        // Try to get cached optimized plan, or use the plan we just translated
1244        let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
1245            cached_plan
1246        } else {
1247            // Semantic validation
1248            let mut binder = Binder::new();
1249            let _binding_context = binder.bind(&logical_plan)?;
1250
1251            // Optimize the plan
1252            let optimizer = Optimizer::from_graph_store(&*self.graph_store);
1253            let plan = optimizer.optimize(logical_plan)?;
1254
1255            // Cache the optimized plan for future use
1256            self.query_cache.put_optimized(cache_key, plan.clone());
1257
1258            plan
1259        };
1260
1261        // Get transaction context for MVCC visibility
1262        let (viewing_epoch, tx_id) = self.get_transaction_context();
1263
1264        // Convert to physical plan with transaction context
1265        // (Physical planning cannot be cached as it depends on transaction state)
1266        let planner = self.create_planner(viewing_epoch, tx_id);
1267        let mut physical_plan = planner.plan(&optimized_plan)?;
1268
1269        // Execute the plan
1270        let executor = Executor::with_columns(physical_plan.columns.clone())
1271            .with_deadline(self.query_deadline());
1272        let mut result = executor.execute(physical_plan.operator.as_mut())?;
1273
1274        // Add execution metrics
1275        let elapsed_ms = start_time.elapsed().as_secs_f64() * 1000.0;
1276        let rows_scanned = result.rows.len() as u64;
1277        result.execution_time_ms = Some(elapsed_ms);
1278        result.rows_scanned = Some(rows_scanned);
1279
1280        Ok(result)
1281    }
1282
1283    /// Executes a GQL query with visibility at the specified epoch.
1284    ///
1285    /// This enables time-travel queries: the query sees the database
1286    /// as it existed at the given epoch.
1287    ///
1288    /// # Errors
1289    ///
1290    /// Returns an error if parsing or execution fails.
1291    #[cfg(feature = "gql")]
1292    pub fn execute_at_epoch(&self, query: &str, epoch: EpochId) -> Result<QueryResult> {
1293        let previous = self.viewing_epoch_override.lock().replace(epoch);
1294        let result = self.execute(query);
1295        *self.viewing_epoch_override.lock() = previous;
1296        result
1297    }
1298
1299    /// Executes a GQL query with parameters.
1300    ///
1301    /// # Errors
1302    ///
1303    /// Returns an error if the query fails to parse or execute.
1304    #[cfg(feature = "gql")]
1305    pub fn execute_with_params(
1306        &self,
1307        query: &str,
1308        params: std::collections::HashMap<String, Value>,
1309    ) -> Result<QueryResult> {
1310        self.require_lpg("GQL")?;
1311
1312        use crate::query::processor::{QueryLanguage, QueryProcessor};
1313
1314        // Get transaction context for MVCC visibility
1315        let (viewing_epoch, tx_id) = self.get_transaction_context();
1316
1317        // Create processor with transaction context
1318        let processor = QueryProcessor::for_graph_store_with_tx(
1319            Arc::clone(&self.graph_store),
1320            Arc::clone(&self.tx_manager),
1321        );
1322
1323        // Apply transaction context if in a transaction
1324        let processor = if let Some(tx_id) = tx_id {
1325            processor.with_tx_context(viewing_epoch, tx_id)
1326        } else {
1327            processor
1328        };
1329
1330        processor.process(query, QueryLanguage::Gql, Some(&params))
1331    }
1332
1333    /// Executes a GQL query with parameters.
1334    ///
1335    /// # Errors
1336    ///
1337    /// Returns an error if no query language is enabled.
1338    #[cfg(not(any(feature = "gql", feature = "cypher")))]
1339    pub fn execute_with_params(
1340        &self,
1341        _query: &str,
1342        _params: std::collections::HashMap<String, Value>,
1343    ) -> Result<QueryResult> {
1344        Err(grafeo_common::utils::error::Error::Internal(
1345            "No query language enabled".to_string(),
1346        ))
1347    }
1348
1349    /// Executes a GQL query.
1350    ///
1351    /// # Errors
1352    ///
1353    /// Returns an error if no query language is enabled.
1354    #[cfg(not(any(feature = "gql", feature = "cypher")))]
1355    pub fn execute(&self, _query: &str) -> Result<QueryResult> {
1356        Err(grafeo_common::utils::error::Error::Internal(
1357            "No query language enabled".to_string(),
1358        ))
1359    }
1360
1361    /// Executes a Cypher query.
1362    ///
1363    /// # Errors
1364    ///
1365    /// Returns an error if the query fails to parse or execute.
1366    #[cfg(feature = "cypher")]
1367    pub fn execute_cypher(&self, query: &str) -> Result<QueryResult> {
1368        use crate::query::{
1369            Executor, binder::Binder, cache::CacheKey, cypher_translator, optimizer::Optimizer,
1370            processor::QueryLanguage,
1371        };
1372
1373        // Create cache key for this query
1374        let cache_key = CacheKey::new(query, QueryLanguage::Cypher);
1375
1376        // Try to get cached optimized plan
1377        let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
1378            cached_plan
1379        } else {
1380            // Parse and translate the query to a logical plan
1381            let logical_plan = cypher_translator::translate(query)?;
1382
1383            // Semantic validation
1384            let mut binder = Binder::new();
1385            let _binding_context = binder.bind(&logical_plan)?;
1386
1387            // Optimize the plan
1388            let optimizer = Optimizer::from_graph_store(&*self.graph_store);
1389            let plan = optimizer.optimize(logical_plan)?;
1390
1391            // Cache the optimized plan
1392            self.query_cache.put_optimized(cache_key, plan.clone());
1393
1394            plan
1395        };
1396
1397        // Get transaction context for MVCC visibility
1398        let (viewing_epoch, tx_id) = self.get_transaction_context();
1399
1400        // Convert to physical plan with transaction context
1401        let planner = self.create_planner(viewing_epoch, tx_id);
1402        let mut physical_plan = planner.plan(&optimized_plan)?;
1403
1404        // Execute the plan
1405        let executor = Executor::with_columns(physical_plan.columns.clone())
1406            .with_deadline(self.query_deadline());
1407        let result = executor.execute(physical_plan.operator.as_mut())?;
1408        Ok(result)
1409    }
1410
1411    /// Executes a Gremlin query.
1412    ///
1413    /// # Errors
1414    ///
1415    /// Returns an error if the query fails to parse or execute.
1416    ///
1417    /// # Examples
1418    ///
1419    /// ```no_run
1420    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
1421    /// use grafeo_engine::GrafeoDB;
1422    ///
1423    /// let db = GrafeoDB::new_in_memory();
1424    /// let session = db.session();
1425    ///
1426    /// // Create some nodes first
1427    /// session.create_node(&["Person"]);
1428    ///
1429    /// // Query using Gremlin
1430    /// let result = session.execute_gremlin("g.V().hasLabel('Person')")?;
1431    /// # Ok(())
1432    /// # }
1433    /// ```
1434    #[cfg(feature = "gremlin")]
1435    pub fn execute_gremlin(&self, query: &str) -> Result<QueryResult> {
1436        use crate::query::{Executor, binder::Binder, gremlin_translator, optimizer::Optimizer};
1437
1438        // Parse and translate the query to a logical plan
1439        let logical_plan = gremlin_translator::translate(query)?;
1440
1441        // Semantic validation
1442        let mut binder = Binder::new();
1443        let _binding_context = binder.bind(&logical_plan)?;
1444
1445        // Optimize the plan
1446        let optimizer = Optimizer::from_graph_store(&*self.graph_store);
1447        let optimized_plan = optimizer.optimize(logical_plan)?;
1448
1449        // Get transaction context for MVCC visibility
1450        let (viewing_epoch, tx_id) = self.get_transaction_context();
1451
1452        // Convert to physical plan with transaction context
1453        let planner = self.create_planner(viewing_epoch, tx_id);
1454        let mut physical_plan = planner.plan(&optimized_plan)?;
1455
1456        // Execute the plan
1457        let executor = Executor::with_columns(physical_plan.columns.clone())
1458            .with_deadline(self.query_deadline());
1459        let result = executor.execute(physical_plan.operator.as_mut())?;
1460        Ok(result)
1461    }
1462
1463    /// Executes a Gremlin query with parameters.
1464    ///
1465    /// # Errors
1466    ///
1467    /// Returns an error if the query fails to parse or execute.
1468    #[cfg(feature = "gremlin")]
1469    pub fn execute_gremlin_with_params(
1470        &self,
1471        query: &str,
1472        params: std::collections::HashMap<String, Value>,
1473    ) -> Result<QueryResult> {
1474        use crate::query::processor::{QueryLanguage, QueryProcessor};
1475
1476        // Get transaction context for MVCC visibility
1477        let (viewing_epoch, tx_id) = self.get_transaction_context();
1478
1479        // Create processor with transaction context
1480        let processor = QueryProcessor::for_graph_store_with_tx(
1481            Arc::clone(&self.graph_store),
1482            Arc::clone(&self.tx_manager),
1483        );
1484
1485        // Apply transaction context if in a transaction
1486        let processor = if let Some(tx_id) = tx_id {
1487            processor.with_tx_context(viewing_epoch, tx_id)
1488        } else {
1489            processor
1490        };
1491
1492        processor.process(query, QueryLanguage::Gremlin, Some(&params))
1493    }
1494
1495    /// Executes a GraphQL query against the LPG store.
1496    ///
1497    /// # Errors
1498    ///
1499    /// Returns an error if the query fails to parse or execute.
1500    ///
1501    /// # Examples
1502    ///
1503    /// ```no_run
1504    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
1505    /// use grafeo_engine::GrafeoDB;
1506    ///
1507    /// let db = GrafeoDB::new_in_memory();
1508    /// let session = db.session();
1509    ///
1510    /// // Create some nodes first
1511    /// session.create_node(&["User"]);
1512    ///
1513    /// // Query using GraphQL
1514    /// let result = session.execute_graphql("query { user { id name } }")?;
1515    /// # Ok(())
1516    /// # }
1517    /// ```
1518    #[cfg(feature = "graphql")]
1519    pub fn execute_graphql(&self, query: &str) -> Result<QueryResult> {
1520        use crate::query::{Executor, binder::Binder, graphql_translator, optimizer::Optimizer};
1521
1522        // Parse and translate the query to a logical plan
1523        let logical_plan = graphql_translator::translate(query)?;
1524
1525        // Semantic validation
1526        let mut binder = Binder::new();
1527        let _binding_context = binder.bind(&logical_plan)?;
1528
1529        // Optimize the plan
1530        let optimizer = Optimizer::from_graph_store(&*self.graph_store);
1531        let optimized_plan = optimizer.optimize(logical_plan)?;
1532
1533        // Get transaction context for MVCC visibility
1534        let (viewing_epoch, tx_id) = self.get_transaction_context();
1535
1536        // Convert to physical plan with transaction context
1537        let planner = self.create_planner(viewing_epoch, tx_id);
1538        let mut physical_plan = planner.plan(&optimized_plan)?;
1539
1540        // Execute the plan
1541        let executor = Executor::with_columns(physical_plan.columns.clone())
1542            .with_deadline(self.query_deadline());
1543        let result = executor.execute(physical_plan.operator.as_mut())?;
1544        Ok(result)
1545    }
1546
1547    /// Executes a GraphQL query with parameters.
1548    ///
1549    /// # Errors
1550    ///
1551    /// Returns an error if the query fails to parse or execute.
1552    #[cfg(feature = "graphql")]
1553    pub fn execute_graphql_with_params(
1554        &self,
1555        query: &str,
1556        params: std::collections::HashMap<String, Value>,
1557    ) -> Result<QueryResult> {
1558        use crate::query::processor::{QueryLanguage, QueryProcessor};
1559
1560        // Get transaction context for MVCC visibility
1561        let (viewing_epoch, tx_id) = self.get_transaction_context();
1562
1563        // Create processor with transaction context
1564        let processor = QueryProcessor::for_graph_store_with_tx(
1565            Arc::clone(&self.graph_store),
1566            Arc::clone(&self.tx_manager),
1567        );
1568
1569        // Apply transaction context if in a transaction
1570        let processor = if let Some(tx_id) = tx_id {
1571            processor.with_tx_context(viewing_epoch, tx_id)
1572        } else {
1573            processor
1574        };
1575
1576        processor.process(query, QueryLanguage::GraphQL, Some(&params))
1577    }
1578
1579    /// Executes a SQL/PGQ query (SQL:2023 GRAPH_TABLE).
1580    ///
1581    /// # Errors
1582    ///
1583    /// Returns an error if the query fails to parse or execute.
1584    ///
1585    /// # Examples
1586    ///
1587    /// ```no_run
1588    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
1589    /// use grafeo_engine::GrafeoDB;
1590    ///
1591    /// let db = GrafeoDB::new_in_memory();
1592    /// let session = db.session();
1593    ///
1594    /// let result = session.execute_sql(
1595    ///     "SELECT * FROM GRAPH_TABLE (
1596    ///         MATCH (n:Person)
1597    ///         COLUMNS (n.name AS name)
1598    ///     )"
1599    /// )?;
1600    /// # Ok(())
1601    /// # }
1602    /// ```
1603    #[cfg(feature = "sql-pgq")]
1604    pub fn execute_sql(&self, query: &str) -> Result<QueryResult> {
1605        use crate::query::{
1606            Executor, binder::Binder, cache::CacheKey, optimizer::Optimizer, plan::LogicalOperator,
1607            processor::QueryLanguage, sql_pgq_translator,
1608        };
1609
1610        // Parse and translate (always needed to check for DDL)
1611        let logical_plan = sql_pgq_translator::translate(query)?;
1612
1613        // Handle DDL statements directly (they don't go through the query pipeline)
1614        if let LogicalOperator::CreatePropertyGraph(ref cpg) = logical_plan.root {
1615            return Ok(QueryResult {
1616                columns: vec!["status".into()],
1617                column_types: vec![grafeo_common::types::LogicalType::String],
1618                rows: vec![vec![Value::from(format!(
1619                    "Property graph '{}' created ({} node tables, {} edge tables)",
1620                    cpg.name,
1621                    cpg.node_tables.len(),
1622                    cpg.edge_tables.len()
1623                ))]],
1624                execution_time_ms: None,
1625                rows_scanned: None,
1626                status_message: None,
1627            });
1628        }
1629
1630        // Create cache key for query plans
1631        let cache_key = CacheKey::new(query, QueryLanguage::SqlPgq);
1632
1633        // Try to get cached optimized plan
1634        let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
1635            cached_plan
1636        } else {
1637            // Semantic validation
1638            let mut binder = Binder::new();
1639            let _binding_context = binder.bind(&logical_plan)?;
1640
1641            // Optimize the plan
1642            let optimizer = Optimizer::from_graph_store(&*self.graph_store);
1643            let plan = optimizer.optimize(logical_plan)?;
1644
1645            // Cache the optimized plan
1646            self.query_cache.put_optimized(cache_key, plan.clone());
1647
1648            plan
1649        };
1650
1651        // Get transaction context for MVCC visibility
1652        let (viewing_epoch, tx_id) = self.get_transaction_context();
1653
1654        // Convert to physical plan with transaction context
1655        let planner = self.create_planner(viewing_epoch, tx_id);
1656        let mut physical_plan = planner.plan(&optimized_plan)?;
1657
1658        // Execute the plan
1659        let executor = Executor::with_columns(physical_plan.columns.clone())
1660            .with_deadline(self.query_deadline());
1661        let result = executor.execute(physical_plan.operator.as_mut())?;
1662        Ok(result)
1663    }
1664
1665    /// Executes a SQL/PGQ query with parameters.
1666    ///
1667    /// # Errors
1668    ///
1669    /// Returns an error if the query fails to parse or execute.
1670    #[cfg(feature = "sql-pgq")]
1671    pub fn execute_sql_with_params(
1672        &self,
1673        query: &str,
1674        params: std::collections::HashMap<String, Value>,
1675    ) -> Result<QueryResult> {
1676        use crate::query::processor::{QueryLanguage, QueryProcessor};
1677
1678        // Get transaction context for MVCC visibility
1679        let (viewing_epoch, tx_id) = self.get_transaction_context();
1680
1681        // Create processor with transaction context
1682        let processor = QueryProcessor::for_graph_store_with_tx(
1683            Arc::clone(&self.graph_store),
1684            Arc::clone(&self.tx_manager),
1685        );
1686
1687        // Apply transaction context if in a transaction
1688        let processor = if let Some(tx_id) = tx_id {
1689            processor.with_tx_context(viewing_epoch, tx_id)
1690        } else {
1691            processor
1692        };
1693
1694        processor.process(query, QueryLanguage::SqlPgq, Some(&params))
1695    }
1696
1697    /// Executes a SPARQL query.
1698    ///
1699    /// # Errors
1700    ///
1701    /// Returns an error if the query fails to parse or execute.
1702    #[cfg(all(feature = "sparql", feature = "rdf"))]
1703    pub fn execute_sparql(&self, query: &str) -> Result<QueryResult> {
1704        use crate::query::{
1705            Executor, optimizer::Optimizer, planner_rdf::RdfPlanner, sparql_translator,
1706        };
1707
1708        // Parse and translate the SPARQL query to a logical plan
1709        let logical_plan = sparql_translator::translate(query)?;
1710
1711        // Optimize the plan
1712        let optimizer = Optimizer::from_graph_store(&*self.graph_store);
1713        let optimized_plan = optimizer.optimize(logical_plan)?;
1714
1715        // Convert to physical plan using RDF planner
1716        let planner =
1717            RdfPlanner::new(Arc::clone(&self.rdf_store)).with_tx_id(*self.current_tx.lock());
1718        let mut physical_plan = planner.plan(&optimized_plan)?;
1719
1720        // Execute the plan
1721        let executor = Executor::with_columns(physical_plan.columns.clone())
1722            .with_deadline(self.query_deadline());
1723        executor.execute(physical_plan.operator.as_mut())
1724    }
1725
1726    /// Executes a SPARQL query with parameters.
1727    ///
1728    /// # Errors
1729    ///
1730    /// Returns an error if the query fails to parse or execute.
1731    #[cfg(all(feature = "sparql", feature = "rdf"))]
1732    pub fn execute_sparql_with_params(
1733        &self,
1734        query: &str,
1735        _params: std::collections::HashMap<String, Value>,
1736    ) -> Result<QueryResult> {
1737        // TODO: Implement parameter substitution for SPARQL
1738        // For now, just execute the query without parameters
1739        self.execute_sparql(query)
1740    }
1741
1742    /// Executes a query in the specified language by name.
1743    ///
1744    /// Supported language names: `"gql"`, `"cypher"`, `"gremlin"`, `"graphql"`,
1745    /// `"sparql"`, `"sql"`. Each requires the corresponding feature flag.
1746    ///
1747    /// # Errors
1748    ///
1749    /// Returns an error if the language is unknown/disabled or the query fails.
1750    pub fn execute_language(
1751        &self,
1752        query: &str,
1753        language: &str,
1754        params: Option<std::collections::HashMap<String, Value>>,
1755    ) -> Result<QueryResult> {
1756        match language {
1757            "gql" => {
1758                if let Some(p) = params {
1759                    self.execute_with_params(query, p)
1760                } else {
1761                    self.execute(query)
1762                }
1763            }
1764            #[cfg(feature = "cypher")]
1765            "cypher" => {
1766                if let Some(p) = params {
1767                    use crate::query::processor::{QueryLanguage, QueryProcessor};
1768                    let processor = QueryProcessor::for_graph_store_with_tx(
1769                        Arc::clone(&self.graph_store),
1770                        Arc::clone(&self.tx_manager),
1771                    );
1772                    let (viewing_epoch, tx_id) = self.get_transaction_context();
1773                    let processor = if let Some(tx_id) = tx_id {
1774                        processor.with_tx_context(viewing_epoch, tx_id)
1775                    } else {
1776                        processor
1777                    };
1778                    processor.process(query, QueryLanguage::Cypher, Some(&p))
1779                } else {
1780                    self.execute_cypher(query)
1781                }
1782            }
1783            #[cfg(feature = "gremlin")]
1784            "gremlin" => {
1785                if let Some(p) = params {
1786                    self.execute_gremlin_with_params(query, p)
1787                } else {
1788                    self.execute_gremlin(query)
1789                }
1790            }
1791            #[cfg(feature = "graphql")]
1792            "graphql" => {
1793                if let Some(p) = params {
1794                    self.execute_graphql_with_params(query, p)
1795                } else {
1796                    self.execute_graphql(query)
1797                }
1798            }
1799            #[cfg(feature = "sql-pgq")]
1800            "sql" | "sql-pgq" => {
1801                if let Some(p) = params {
1802                    self.execute_sql_with_params(query, p)
1803                } else {
1804                    self.execute_sql(query)
1805                }
1806            }
1807            #[cfg(all(feature = "sparql", feature = "rdf"))]
1808            "sparql" => {
1809                if let Some(p) = params {
1810                    self.execute_sparql_with_params(query, p)
1811                } else {
1812                    self.execute_sparql(query)
1813                }
1814            }
1815            other => Err(grafeo_common::utils::error::Error::Query(
1816                grafeo_common::utils::error::QueryError::new(
1817                    grafeo_common::utils::error::QueryErrorKind::Semantic,
1818                    format!("Unknown query language: '{other}'"),
1819                ),
1820            )),
1821        }
1822    }
1823
1824    /// Begins a new transaction.
1825    ///
1826    /// # Errors
1827    ///
1828    /// Returns an error if a transaction is already active.
1829    ///
1830    /// # Examples
1831    ///
1832    /// ```no_run
1833    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
1834    /// use grafeo_engine::GrafeoDB;
1835    ///
1836    /// let db = GrafeoDB::new_in_memory();
1837    /// let mut session = db.session();
1838    ///
1839    /// session.begin_tx()?;
1840    /// session.execute("INSERT (:Person {name: 'Alice'})")?;
1841    /// session.execute("INSERT (:Person {name: 'Bob'})")?;
1842    /// session.commit()?; // Both inserts committed atomically
1843    /// # Ok(())
1844    /// # }
1845    /// ```
1846    pub fn begin_tx(&mut self) -> Result<()> {
1847        self.begin_tx_inner(false, None)
1848    }
1849
1850    /// Begins a transaction with a specific isolation level.
1851    ///
1852    /// See [`begin_tx`](Self::begin_tx) for the default (`SnapshotIsolation`).
1853    ///
1854    /// # Errors
1855    ///
1856    /// Returns an error if a transaction is already active.
1857    pub fn begin_tx_with_isolation(
1858        &mut self,
1859        isolation_level: crate::transaction::IsolationLevel,
1860    ) -> Result<()> {
1861        self.begin_tx_inner(false, Some(isolation_level))
1862    }
1863
1864    /// Core transaction begin logic, usable from both `&mut self` and `&self` paths.
1865    fn begin_tx_inner(
1866        &self,
1867        read_only: bool,
1868        isolation_level: Option<crate::transaction::IsolationLevel>,
1869    ) -> Result<()> {
1870        let mut current = self.current_tx.lock();
1871        if current.is_some() {
1872            return Err(grafeo_common::utils::error::Error::Transaction(
1873                grafeo_common::utils::error::TransactionError::InvalidState(
1874                    "Transaction already active".to_string(),
1875                ),
1876            ));
1877        }
1878
1879        self.tx_start_node_count
1880            .store(self.store.node_count(), Ordering::Relaxed);
1881        self.tx_start_edge_count
1882            .store(self.store.edge_count(), Ordering::Relaxed);
1883        let tx_id = if let Some(level) = isolation_level {
1884            self.tx_manager.begin_with_isolation(level)
1885        } else {
1886            self.tx_manager.begin()
1887        };
1888        *current = Some(tx_id);
1889        *self.read_only_tx.lock() = read_only;
1890        Ok(())
1891    }
1892
1893    /// Commits the current transaction.
1894    ///
1895    /// Makes all changes since [`begin_tx`](Self::begin_tx) permanent.
1896    ///
1897    /// # Errors
1898    ///
1899    /// Returns an error if no transaction is active.
1900    pub fn commit(&mut self) -> Result<()> {
1901        self.commit_inner()
1902    }
1903
1904    /// Core commit logic, usable from both `&mut self` and `&self` paths.
1905    fn commit_inner(&self) -> Result<()> {
1906        let tx_id = self.current_tx.lock().take().ok_or_else(|| {
1907            grafeo_common::utils::error::Error::Transaction(
1908                grafeo_common::utils::error::TransactionError::InvalidState(
1909                    "No active transaction".to_string(),
1910                ),
1911            )
1912        })?;
1913
1914        // Commit RDF store pending operations
1915        #[cfg(feature = "rdf")]
1916        self.rdf_store.commit_tx(tx_id);
1917
1918        self.tx_manager.commit(tx_id)?;
1919
1920        // Sync the LpgStore epoch with the TxManager so that
1921        // convenience lookups (edge_type, get_edge, get_node) that use
1922        // store.current_epoch() can see versions created at the latest epoch.
1923        self.store.sync_epoch(self.tx_manager.current_epoch());
1924
1925        // Reset read-only flag
1926        *self.read_only_tx.lock() = false;
1927
1928        // Auto-GC: periodically prune old MVCC versions
1929        if self.gc_interval > 0 {
1930            let count = self.commit_counter.fetch_add(1, Ordering::Relaxed) + 1;
1931            if count.is_multiple_of(self.gc_interval) {
1932                let min_epoch = self.tx_manager.min_active_epoch();
1933                self.store.gc_versions(min_epoch);
1934                self.tx_manager.gc();
1935            }
1936        }
1937
1938        Ok(())
1939    }
1940
1941    /// Aborts the current transaction.
1942    ///
1943    /// Discards all changes since [`begin_tx`](Self::begin_tx).
1944    ///
1945    /// # Errors
1946    ///
1947    /// Returns an error if no transaction is active.
1948    ///
1949    /// # Examples
1950    ///
1951    /// ```no_run
1952    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
1953    /// use grafeo_engine::GrafeoDB;
1954    ///
1955    /// let db = GrafeoDB::new_in_memory();
1956    /// let mut session = db.session();
1957    ///
1958    /// session.begin_tx()?;
1959    /// session.execute("INSERT (:Person {name: 'Alice'})")?;
1960    /// session.rollback()?; // Insert is discarded
1961    /// # Ok(())
1962    /// # }
1963    /// ```
1964    pub fn rollback(&mut self) -> Result<()> {
1965        self.rollback_inner()
1966    }
1967
1968    /// Core rollback logic, usable from both `&mut self` and `&self` paths.
1969    fn rollback_inner(&self) -> Result<()> {
1970        let tx_id = self.current_tx.lock().take().ok_or_else(|| {
1971            grafeo_common::utils::error::Error::Transaction(
1972                grafeo_common::utils::error::TransactionError::InvalidState(
1973                    "No active transaction".to_string(),
1974                ),
1975            )
1976        })?;
1977
1978        // Reset read-only flag
1979        *self.read_only_tx.lock() = false;
1980
1981        // Discard uncommitted versions in the LPG store
1982        self.store.discard_uncommitted_versions(tx_id);
1983
1984        // Discard pending operations in the RDF store
1985        #[cfg(feature = "rdf")]
1986        self.rdf_store.rollback_tx(tx_id);
1987
1988        // Mark transaction as aborted in the manager
1989        self.tx_manager.abort(tx_id)
1990    }
1991
1992    /// Returns whether a transaction is active.
1993    #[must_use]
1994    pub fn in_transaction(&self) -> bool {
1995        self.current_tx.lock().is_some()
1996    }
1997
1998    /// Returns the current transaction ID, if any.
1999    #[must_use]
2000    pub(crate) fn current_tx_id(&self) -> Option<TxId> {
2001        *self.current_tx.lock()
2002    }
2003
2004    /// Returns a reference to the transaction manager.
2005    #[must_use]
2006    pub(crate) fn tx_manager(&self) -> &TransactionManager {
2007        &self.tx_manager
2008    }
2009
2010    /// Returns the store's current node count and the count at transaction start.
2011    #[must_use]
2012    pub(crate) fn node_count_delta(&self) -> (usize, usize) {
2013        (
2014            self.tx_start_node_count.load(Ordering::Relaxed),
2015            self.store.node_count(),
2016        )
2017    }
2018
2019    /// Returns the store's current edge count and the count at transaction start.
2020    #[must_use]
2021    pub(crate) fn edge_count_delta(&self) -> (usize, usize) {
2022        (
2023            self.tx_start_edge_count.load(Ordering::Relaxed),
2024            self.store.edge_count(),
2025        )
2026    }
2027
2028    /// Prepares the current transaction for a two-phase commit.
2029    ///
2030    /// Returns a [`PreparedCommit`](crate::transaction::PreparedCommit) that
2031    /// lets you inspect pending changes and attach metadata before finalizing.
2032    /// The mutable borrow prevents concurrent operations while the commit is
2033    /// pending.
2034    ///
2035    /// If the `PreparedCommit` is dropped without calling `commit()` or
2036    /// `abort()`, the transaction is automatically rolled back.
2037    ///
2038    /// # Errors
2039    ///
2040    /// Returns an error if no transaction is active.
2041    ///
2042    /// # Examples
2043    ///
2044    /// ```no_run
2045    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
2046    /// use grafeo_engine::GrafeoDB;
2047    ///
2048    /// let db = GrafeoDB::new_in_memory();
2049    /// let mut session = db.session();
2050    ///
2051    /// session.begin_tx()?;
2052    /// session.execute("INSERT (:Person {name: 'Alice'})")?;
2053    ///
2054    /// let mut prepared = session.prepare_commit()?;
2055    /// println!("Nodes written: {}", prepared.info().nodes_written);
2056    /// prepared.set_metadata("audit_user", "admin");
2057    /// prepared.commit()?;
2058    /// # Ok(())
2059    /// # }
2060    /// ```
2061    pub fn prepare_commit(&mut self) -> Result<crate::transaction::PreparedCommit<'_>> {
2062        crate::transaction::PreparedCommit::new(self)
2063    }
2064
2065    /// Sets auto-commit mode.
2066    pub fn set_auto_commit(&mut self, auto_commit: bool) {
2067        self.auto_commit = auto_commit;
2068    }
2069
2070    /// Returns whether auto-commit is enabled.
2071    #[must_use]
2072    pub fn auto_commit(&self) -> bool {
2073        self.auto_commit
2074    }
2075
2076    /// Computes the wall-clock deadline for query execution.
2077    #[must_use]
2078    fn query_deadline(&self) -> Option<Instant> {
2079        self.query_timeout.map(|d| Instant::now() + d)
2080    }
2081
2082    /// Evaluates a simple integer literal from a session parameter expression.
2083    fn eval_integer_literal(expr: &grafeo_adapters::query::gql::ast::Expression) -> Option<i64> {
2084        use grafeo_adapters::query::gql::ast::{Expression, Literal};
2085        match expr {
2086            Expression::Literal(Literal::Integer(n)) => Some(*n),
2087            _ => None,
2088        }
2089    }
2090
2091    /// Returns the current transaction context for MVCC visibility.
2092    ///
2093    /// Returns `(viewing_epoch, tx_id)` where:
2094    /// - `viewing_epoch` is the epoch at which to check version visibility
2095    /// - `tx_id` is the current transaction ID (if in a transaction)
2096    #[must_use]
2097    fn get_transaction_context(&self) -> (EpochId, Option<TxId>) {
2098        // Time-travel override takes precedence (read-only, no tx context)
2099        if let Some(epoch) = *self.viewing_epoch_override.lock() {
2100            return (epoch, None);
2101        }
2102
2103        if let Some(tx_id) = *self.current_tx.lock() {
2104            // In a transaction: use the transaction's start epoch
2105            let epoch = self
2106                .tx_manager
2107                .start_epoch(tx_id)
2108                .unwrap_or_else(|| self.tx_manager.current_epoch());
2109            (epoch, Some(tx_id))
2110        } else {
2111            // No transaction: use current epoch
2112            (self.tx_manager.current_epoch(), None)
2113        }
2114    }
2115
2116    /// Creates a planner with transaction context and constraint validator.
2117    fn create_planner(&self, viewing_epoch: EpochId, tx_id: Option<TxId>) -> crate::query::Planner {
2118        use crate::query::Planner;
2119
2120        let mut planner = Planner::with_context(
2121            Arc::clone(&self.graph_store),
2122            Arc::clone(&self.tx_manager),
2123            tx_id,
2124            viewing_epoch,
2125        )
2126        .with_factorized_execution(self.factorized_execution)
2127        .with_catalog(Arc::clone(&self.catalog));
2128
2129        // Attach the constraint validator for schema enforcement
2130        let validator = CatalogConstraintValidator::new(Arc::clone(&self.catalog));
2131        planner = planner.with_validator(Arc::new(validator));
2132
2133        planner
2134    }
2135
2136    /// Creates a node directly (bypassing query execution).
2137    ///
2138    /// This is a low-level API for testing and direct manipulation.
2139    /// If a transaction is active, the node will be versioned with the transaction ID.
2140    pub fn create_node(&self, labels: &[&str]) -> NodeId {
2141        let (epoch, tx_id) = self.get_transaction_context();
2142        self.store
2143            .create_node_versioned(labels, epoch, tx_id.unwrap_or(TxId::SYSTEM))
2144    }
2145
2146    /// Creates a node with properties.
2147    ///
2148    /// If a transaction is active, the node will be versioned with the transaction ID.
2149    pub fn create_node_with_props<'a>(
2150        &self,
2151        labels: &[&str],
2152        properties: impl IntoIterator<Item = (&'a str, Value)>,
2153    ) -> NodeId {
2154        let (epoch, tx_id) = self.get_transaction_context();
2155        self.store.create_node_with_props_versioned(
2156            labels,
2157            properties.into_iter().map(|(k, v)| (k, v)),
2158            epoch,
2159            tx_id.unwrap_or(TxId::SYSTEM),
2160        )
2161    }
2162
2163    /// Creates an edge between two nodes.
2164    ///
2165    /// This is a low-level API for testing and direct manipulation.
2166    /// If a transaction is active, the edge will be versioned with the transaction ID.
2167    pub fn create_edge(
2168        &self,
2169        src: NodeId,
2170        dst: NodeId,
2171        edge_type: &str,
2172    ) -> grafeo_common::types::EdgeId {
2173        let (epoch, tx_id) = self.get_transaction_context();
2174        self.store
2175            .create_edge_versioned(src, dst, edge_type, epoch, tx_id.unwrap_or(TxId::SYSTEM))
2176    }
2177
2178    // =========================================================================
2179    // Direct Lookup APIs (bypass query planning for O(1) point reads)
2180    // =========================================================================
2181
2182    /// Gets a node by ID directly, bypassing query planning.
2183    ///
2184    /// This is the fastest way to retrieve a single node when you know its ID.
2185    /// Skips parsing, binding, optimization, and physical planning entirely.
2186    ///
2187    /// # Performance
2188    ///
2189    /// - Time complexity: O(1) average case
2190    /// - No lock contention (uses DashMap internally)
2191    /// - ~20-30x faster than equivalent MATCH query
2192    ///
2193    /// # Example
2194    ///
2195    /// ```no_run
2196    /// # use grafeo_engine::GrafeoDB;
2197    /// # let db = GrafeoDB::new_in_memory();
2198    /// let session = db.session();
2199    /// let node_id = session.create_node(&["Person"]);
2200    ///
2201    /// // Direct lookup - O(1), no query planning
2202    /// let node = session.get_node(node_id);
2203    /// assert!(node.is_some());
2204    /// ```
2205    #[must_use]
2206    pub fn get_node(&self, id: NodeId) -> Option<Node> {
2207        let (epoch, tx_id) = self.get_transaction_context();
2208        self.store
2209            .get_node_versioned(id, epoch, tx_id.unwrap_or(TxId::SYSTEM))
2210    }
2211
2212    /// Gets a single property from a node by ID, bypassing query planning.
2213    ///
2214    /// More efficient than `get_node()` when you only need one property,
2215    /// as it avoids loading the full node with all properties.
2216    ///
2217    /// # Performance
2218    ///
2219    /// - Time complexity: O(1) average case
2220    /// - No query planning overhead
2221    ///
2222    /// # Example
2223    ///
2224    /// ```no_run
2225    /// # use grafeo_engine::GrafeoDB;
2226    /// # use grafeo_common::types::Value;
2227    /// # let db = GrafeoDB::new_in_memory();
2228    /// let session = db.session();
2229    /// let id = session.create_node_with_props(&["Person"], [("name", "Alice".into())]);
2230    ///
2231    /// // Direct property access - O(1)
2232    /// let name = session.get_node_property(id, "name");
2233    /// assert_eq!(name, Some(Value::String("Alice".into())));
2234    /// ```
2235    #[must_use]
2236    pub fn get_node_property(&self, id: NodeId, key: &str) -> Option<Value> {
2237        self.get_node(id)
2238            .and_then(|node| node.get_property(key).cloned())
2239    }
2240
2241    /// Gets an edge by ID directly, bypassing query planning.
2242    ///
2243    /// # Performance
2244    ///
2245    /// - Time complexity: O(1) average case
2246    /// - No lock contention
2247    #[must_use]
2248    pub fn get_edge(&self, id: EdgeId) -> Option<Edge> {
2249        let (epoch, tx_id) = self.get_transaction_context();
2250        self.store
2251            .get_edge_versioned(id, epoch, tx_id.unwrap_or(TxId::SYSTEM))
2252    }
2253
2254    /// Gets outgoing neighbors of a node directly, bypassing query planning.
2255    ///
2256    /// Returns (neighbor_id, edge_id) pairs for all outgoing edges.
2257    ///
2258    /// # Performance
2259    ///
2260    /// - Time complexity: O(degree) where degree is the number of outgoing edges
2261    /// - Uses adjacency index for direct access
2262    /// - ~10-20x faster than equivalent MATCH query
2263    ///
2264    /// # Example
2265    ///
2266    /// ```no_run
2267    /// # use grafeo_engine::GrafeoDB;
2268    /// # let db = GrafeoDB::new_in_memory();
2269    /// let session = db.session();
2270    /// let alice = session.create_node(&["Person"]);
2271    /// let bob = session.create_node(&["Person"]);
2272    /// session.create_edge(alice, bob, "KNOWS");
2273    ///
2274    /// // Direct neighbor lookup - O(degree)
2275    /// let neighbors = session.get_neighbors_outgoing(alice);
2276    /// assert_eq!(neighbors.len(), 1);
2277    /// assert_eq!(neighbors[0].0, bob);
2278    /// ```
2279    #[must_use]
2280    pub fn get_neighbors_outgoing(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
2281        self.store.edges_from(node, Direction::Outgoing).collect()
2282    }
2283
2284    /// Gets incoming neighbors of a node directly, bypassing query planning.
2285    ///
2286    /// Returns (neighbor_id, edge_id) pairs for all incoming edges.
2287    ///
2288    /// # Performance
2289    ///
2290    /// - Time complexity: O(degree) where degree is the number of incoming edges
2291    /// - Uses backward adjacency index for direct access
2292    #[must_use]
2293    pub fn get_neighbors_incoming(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
2294        self.store.edges_from(node, Direction::Incoming).collect()
2295    }
2296
2297    /// Gets outgoing neighbors filtered by edge type, bypassing query planning.
2298    ///
2299    /// # Example
2300    ///
2301    /// ```no_run
2302    /// # use grafeo_engine::GrafeoDB;
2303    /// # let db = GrafeoDB::new_in_memory();
2304    /// # let session = db.session();
2305    /// # let alice = session.create_node(&["Person"]);
2306    /// let neighbors = session.get_neighbors_outgoing_by_type(alice, "KNOWS");
2307    /// ```
2308    #[must_use]
2309    pub fn get_neighbors_outgoing_by_type(
2310        &self,
2311        node: NodeId,
2312        edge_type: &str,
2313    ) -> Vec<(NodeId, EdgeId)> {
2314        self.store
2315            .edges_from(node, Direction::Outgoing)
2316            .filter(|(_, edge_id)| {
2317                self.get_edge(*edge_id)
2318                    .is_some_and(|e| e.edge_type.as_str() == edge_type)
2319            })
2320            .collect()
2321    }
2322
2323    /// Checks if a node exists, bypassing query planning.
2324    ///
2325    /// # Performance
2326    ///
2327    /// - Time complexity: O(1)
2328    /// - Fastest existence check available
2329    #[must_use]
2330    pub fn node_exists(&self, id: NodeId) -> bool {
2331        self.get_node(id).is_some()
2332    }
2333
2334    /// Checks if an edge exists, bypassing query planning.
2335    #[must_use]
2336    pub fn edge_exists(&self, id: EdgeId) -> bool {
2337        self.get_edge(id).is_some()
2338    }
2339
2340    /// Gets the degree (number of edges) of a node.
2341    ///
2342    /// Returns (outgoing_degree, incoming_degree).
2343    #[must_use]
2344    pub fn get_degree(&self, node: NodeId) -> (usize, usize) {
2345        let out = self.store.out_degree(node);
2346        let in_degree = self.store.in_degree(node);
2347        (out, in_degree)
2348    }
2349
2350    /// Batch lookup of multiple nodes by ID.
2351    ///
2352    /// More efficient than calling `get_node()` in a loop because it
2353    /// amortizes overhead.
2354    ///
2355    /// # Performance
2356    ///
2357    /// - Time complexity: O(n) where n is the number of IDs
2358    /// - Better cache utilization than individual lookups
2359    #[must_use]
2360    pub fn get_nodes_batch(&self, ids: &[NodeId]) -> Vec<Option<Node>> {
2361        let (epoch, tx_id) = self.get_transaction_context();
2362        let tx = tx_id.unwrap_or(TxId::SYSTEM);
2363        ids.iter()
2364            .map(|&id| self.store.get_node_versioned(id, epoch, tx))
2365            .collect()
2366    }
2367
2368    // ── Change Data Capture ─────────────────────────────────────────────
2369
2370    /// Returns the full change history for an entity (node or edge).
2371    #[cfg(feature = "cdc")]
2372    pub fn history(
2373        &self,
2374        entity_id: impl Into<crate::cdc::EntityId>,
2375    ) -> Result<Vec<crate::cdc::ChangeEvent>> {
2376        Ok(self.cdc_log.history(entity_id.into()))
2377    }
2378
2379    /// Returns change events for an entity since the given epoch.
2380    #[cfg(feature = "cdc")]
2381    pub fn history_since(
2382        &self,
2383        entity_id: impl Into<crate::cdc::EntityId>,
2384        since_epoch: EpochId,
2385    ) -> Result<Vec<crate::cdc::ChangeEvent>> {
2386        Ok(self.cdc_log.history_since(entity_id.into(), since_epoch))
2387    }
2388
2389    /// Returns all change events across all entities in an epoch range.
2390    #[cfg(feature = "cdc")]
2391    pub fn changes_between(
2392        &self,
2393        start_epoch: EpochId,
2394        end_epoch: EpochId,
2395    ) -> Result<Vec<crate::cdc::ChangeEvent>> {
2396        Ok(self.cdc_log.changes_between(start_epoch, end_epoch))
2397    }
2398}
2399
2400#[cfg(test)]
2401mod tests {
2402    use crate::database::GrafeoDB;
2403
2404    #[test]
2405    fn test_session_create_node() {
2406        let db = GrafeoDB::new_in_memory();
2407        let session = db.session();
2408
2409        let id = session.create_node(&["Person"]);
2410        assert!(id.is_valid());
2411        assert_eq!(db.node_count(), 1);
2412    }
2413
2414    #[test]
2415    fn test_session_transaction() {
2416        let db = GrafeoDB::new_in_memory();
2417        let mut session = db.session();
2418
2419        assert!(!session.in_transaction());
2420
2421        session.begin_tx().unwrap();
2422        assert!(session.in_transaction());
2423
2424        session.commit().unwrap();
2425        assert!(!session.in_transaction());
2426    }
2427
2428    #[test]
2429    fn test_session_transaction_context() {
2430        let db = GrafeoDB::new_in_memory();
2431        let mut session = db.session();
2432
2433        // Without transaction - context should have current epoch and no tx_id
2434        let (_epoch1, tx_id1) = session.get_transaction_context();
2435        assert!(tx_id1.is_none());
2436
2437        // Start a transaction
2438        session.begin_tx().unwrap();
2439        let (epoch2, tx_id2) = session.get_transaction_context();
2440        assert!(tx_id2.is_some());
2441        // Transaction should have a valid epoch
2442        let _ = epoch2; // Use the variable
2443
2444        // Commit and verify
2445        session.commit().unwrap();
2446        let (epoch3, tx_id3) = session.get_transaction_context();
2447        assert!(tx_id3.is_none());
2448        // Epoch should have advanced after commit
2449        assert!(epoch3.as_u64() >= epoch2.as_u64());
2450    }
2451
2452    #[test]
2453    fn test_session_rollback() {
2454        let db = GrafeoDB::new_in_memory();
2455        let mut session = db.session();
2456
2457        session.begin_tx().unwrap();
2458        session.rollback().unwrap();
2459        assert!(!session.in_transaction());
2460    }
2461
2462    #[test]
2463    fn test_session_rollback_discards_versions() {
2464        use grafeo_common::types::TxId;
2465
2466        let db = GrafeoDB::new_in_memory();
2467
2468        // Create a node outside of any transaction (at system level)
2469        let node_before = db.store().create_node(&["Person"]);
2470        assert!(node_before.is_valid());
2471        assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");
2472
2473        // Start a transaction
2474        let mut session = db.session();
2475        session.begin_tx().unwrap();
2476        let tx_id = session.current_tx.lock().unwrap();
2477
2478        // Create a node versioned with the transaction's ID
2479        let epoch = db.store().current_epoch();
2480        let node_in_tx = db.store().create_node_versioned(&["Person"], epoch, tx_id);
2481        assert!(node_in_tx.is_valid());
2482
2483        // Should see 2 nodes at this point
2484        assert_eq!(db.node_count(), 2, "Should have 2 nodes during transaction");
2485
2486        // Rollback the transaction
2487        session.rollback().unwrap();
2488        assert!(!session.in_transaction());
2489
2490        // The node created in the transaction should be discarded
2491        // Only the first node should remain visible
2492        let count_after = db.node_count();
2493        assert_eq!(
2494            count_after, 1,
2495            "Rollback should discard uncommitted node, but got {count_after}"
2496        );
2497
2498        // The original node should still be accessible
2499        let current_epoch = db.store().current_epoch();
2500        assert!(
2501            db.store()
2502                .get_node_versioned(node_before, current_epoch, TxId::SYSTEM)
2503                .is_some(),
2504            "Original node should still exist"
2505        );
2506
2507        // The node created in the transaction should not be accessible
2508        assert!(
2509            db.store()
2510                .get_node_versioned(node_in_tx, current_epoch, TxId::SYSTEM)
2511                .is_none(),
2512            "Transaction node should be gone"
2513        );
2514    }
2515
2516    #[test]
2517    fn test_session_create_node_in_transaction() {
2518        // Test that session.create_node() is transaction-aware
2519        let db = GrafeoDB::new_in_memory();
2520
2521        // Create a node outside of any transaction
2522        let node_before = db.create_node(&["Person"]);
2523        assert!(node_before.is_valid());
2524        assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");
2525
2526        // Start a transaction and create a node through the session
2527        let mut session = db.session();
2528        session.begin_tx().unwrap();
2529
2530        // Create a node through session.create_node() - should be versioned with tx
2531        let node_in_tx = session.create_node(&["Person"]);
2532        assert!(node_in_tx.is_valid());
2533
2534        // Should see 2 nodes at this point
2535        assert_eq!(db.node_count(), 2, "Should have 2 nodes during transaction");
2536
2537        // Rollback the transaction
2538        session.rollback().unwrap();
2539
2540        // The node created via session.create_node() should be discarded
2541        let count_after = db.node_count();
2542        assert_eq!(
2543            count_after, 1,
2544            "Rollback should discard node created via session.create_node(), but got {count_after}"
2545        );
2546    }
2547
2548    #[test]
2549    fn test_session_create_node_with_props_in_transaction() {
2550        use grafeo_common::types::Value;
2551
2552        // Test that session.create_node_with_props() is transaction-aware
2553        let db = GrafeoDB::new_in_memory();
2554
2555        // Create a node outside of any transaction
2556        db.create_node(&["Person"]);
2557        assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");
2558
2559        // Start a transaction and create a node with properties
2560        let mut session = db.session();
2561        session.begin_tx().unwrap();
2562
2563        let node_in_tx =
2564            session.create_node_with_props(&["Person"], [("name", Value::String("Alice".into()))]);
2565        assert!(node_in_tx.is_valid());
2566
2567        // Should see 2 nodes
2568        assert_eq!(db.node_count(), 2, "Should have 2 nodes during transaction");
2569
2570        // Rollback the transaction
2571        session.rollback().unwrap();
2572
2573        // The node should be discarded
2574        let count_after = db.node_count();
2575        assert_eq!(
2576            count_after, 1,
2577            "Rollback should discard node created via session.create_node_with_props()"
2578        );
2579    }
2580
2581    #[cfg(feature = "gql")]
2582    mod gql_tests {
2583        use super::*;
2584
2585        #[test]
2586        fn test_gql_query_execution() {
2587            let db = GrafeoDB::new_in_memory();
2588            let session = db.session();
2589
2590            // Create some test data
2591            session.create_node(&["Person"]);
2592            session.create_node(&["Person"]);
2593            session.create_node(&["Animal"]);
2594
2595            // Execute a GQL query
2596            let result = session.execute("MATCH (n:Person) RETURN n").unwrap();
2597
2598            // Should return 2 Person nodes
2599            assert_eq!(result.row_count(), 2);
2600            assert_eq!(result.column_count(), 1);
2601            assert_eq!(result.columns[0], "n");
2602        }
2603
2604        #[test]
2605        fn test_gql_empty_result() {
2606            let db = GrafeoDB::new_in_memory();
2607            let session = db.session();
2608
2609            // No data in database
2610            let result = session.execute("MATCH (n:Person) RETURN n").unwrap();
2611
2612            assert_eq!(result.row_count(), 0);
2613        }
2614
2615        #[test]
2616        fn test_gql_parse_error() {
2617            let db = GrafeoDB::new_in_memory();
2618            let session = db.session();
2619
2620            // Invalid GQL syntax
2621            let result = session.execute("MATCH (n RETURN n");
2622
2623            assert!(result.is_err());
2624        }
2625
2626        #[test]
2627        fn test_gql_relationship_traversal() {
2628            let db = GrafeoDB::new_in_memory();
2629            let session = db.session();
2630
2631            // Create a graph: Alice -> Bob, Alice -> Charlie
2632            let alice = session.create_node(&["Person"]);
2633            let bob = session.create_node(&["Person"]);
2634            let charlie = session.create_node(&["Person"]);
2635
2636            session.create_edge(alice, bob, "KNOWS");
2637            session.create_edge(alice, charlie, "KNOWS");
2638
2639            // Execute a path query: MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b
2640            let result = session
2641                .execute("MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b")
2642                .unwrap();
2643
2644            // Should return 2 rows (Alice->Bob, Alice->Charlie)
2645            assert_eq!(result.row_count(), 2);
2646            assert_eq!(result.column_count(), 2);
2647            assert_eq!(result.columns[0], "a");
2648            assert_eq!(result.columns[1], "b");
2649        }
2650
2651        #[test]
2652        fn test_gql_relationship_with_type_filter() {
2653            let db = GrafeoDB::new_in_memory();
2654            let session = db.session();
2655
2656            // Create a graph: Alice -KNOWS-> Bob, Alice -WORKS_WITH-> Charlie
2657            let alice = session.create_node(&["Person"]);
2658            let bob = session.create_node(&["Person"]);
2659            let charlie = session.create_node(&["Person"]);
2660
2661            session.create_edge(alice, bob, "KNOWS");
2662            session.create_edge(alice, charlie, "WORKS_WITH");
2663
2664            // Query only KNOWS relationships
2665            let result = session
2666                .execute("MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b")
2667                .unwrap();
2668
2669            // Should return only 1 row (Alice->Bob)
2670            assert_eq!(result.row_count(), 1);
2671        }
2672
2673        #[test]
2674        fn test_gql_semantic_error_undefined_variable() {
2675            let db = GrafeoDB::new_in_memory();
2676            let session = db.session();
2677
2678            // Reference undefined variable 'x' in RETURN
2679            let result = session.execute("MATCH (n:Person) RETURN x");
2680
2681            // Should fail with semantic error
2682            assert!(result.is_err());
2683            let Err(err) = result else {
2684                panic!("Expected error")
2685            };
2686            assert!(
2687                err.to_string().contains("Undefined variable"),
2688                "Expected undefined variable error, got: {}",
2689                err
2690            );
2691        }
2692
2693        #[test]
2694        fn test_gql_where_clause_property_filter() {
2695            use grafeo_common::types::Value;
2696
2697            let db = GrafeoDB::new_in_memory();
2698            let session = db.session();
2699
2700            // Create people with ages
2701            session.create_node_with_props(&["Person"], [("age", Value::Int64(25))]);
2702            session.create_node_with_props(&["Person"], [("age", Value::Int64(35))]);
2703            session.create_node_with_props(&["Person"], [("age", Value::Int64(45))]);
2704
2705            // Query with WHERE clause: age > 30
2706            let result = session
2707                .execute("MATCH (n:Person) WHERE n.age > 30 RETURN n")
2708                .unwrap();
2709
2710            // Should return 2 people (ages 35 and 45)
2711            assert_eq!(result.row_count(), 2);
2712        }
2713
2714        #[test]
2715        fn test_gql_where_clause_equality() {
2716            use grafeo_common::types::Value;
2717
2718            let db = GrafeoDB::new_in_memory();
2719            let session = db.session();
2720
2721            // Create people with names
2722            session.create_node_with_props(&["Person"], [("name", Value::String("Alice".into()))]);
2723            session.create_node_with_props(&["Person"], [("name", Value::String("Bob".into()))]);
2724            session.create_node_with_props(&["Person"], [("name", Value::String("Alice".into()))]);
2725
2726            // Query with WHERE clause: name = "Alice"
2727            let result = session
2728                .execute("MATCH (n:Person) WHERE n.name = \"Alice\" RETURN n")
2729                .unwrap();
2730
2731            // Should return 2 people named Alice
2732            assert_eq!(result.row_count(), 2);
2733        }
2734
2735        #[test]
2736        fn test_gql_return_property_access() {
2737            use grafeo_common::types::Value;
2738
2739            let db = GrafeoDB::new_in_memory();
2740            let session = db.session();
2741
2742            // Create people with names and ages
2743            session.create_node_with_props(
2744                &["Person"],
2745                [
2746                    ("name", Value::String("Alice".into())),
2747                    ("age", Value::Int64(30)),
2748                ],
2749            );
2750            session.create_node_with_props(
2751                &["Person"],
2752                [
2753                    ("name", Value::String("Bob".into())),
2754                    ("age", Value::Int64(25)),
2755                ],
2756            );
2757
2758            // Query returning properties
2759            let result = session
2760                .execute("MATCH (n:Person) RETURN n.name, n.age")
2761                .unwrap();
2762
2763            // Should return 2 rows with name and age columns
2764            assert_eq!(result.row_count(), 2);
2765            assert_eq!(result.column_count(), 2);
2766            assert_eq!(result.columns[0], "n.name");
2767            assert_eq!(result.columns[1], "n.age");
2768
2769            // Check that we get actual values
2770            let names: Vec<&Value> = result.rows.iter().map(|r| &r[0]).collect();
2771            assert!(names.contains(&&Value::String("Alice".into())));
2772            assert!(names.contains(&&Value::String("Bob".into())));
2773        }
2774
2775        #[test]
2776        fn test_gql_return_mixed_expressions() {
2777            use grafeo_common::types::Value;
2778
2779            let db = GrafeoDB::new_in_memory();
2780            let session = db.session();
2781
2782            // Create a person
2783            session.create_node_with_props(&["Person"], [("name", Value::String("Alice".into()))]);
2784
2785            // Query returning both node and property
2786            let result = session
2787                .execute("MATCH (n:Person) RETURN n, n.name")
2788                .unwrap();
2789
2790            assert_eq!(result.row_count(), 1);
2791            assert_eq!(result.column_count(), 2);
2792            assert_eq!(result.columns[0], "n");
2793            assert_eq!(result.columns[1], "n.name");
2794
2795            // Second column should be the name
2796            assert_eq!(result.rows[0][1], Value::String("Alice".into()));
2797        }
2798    }
2799
2800    #[cfg(feature = "cypher")]
2801    mod cypher_tests {
2802        use super::*;
2803
2804        #[test]
2805        fn test_cypher_query_execution() {
2806            let db = GrafeoDB::new_in_memory();
2807            let session = db.session();
2808
2809            // Create some test data
2810            session.create_node(&["Person"]);
2811            session.create_node(&["Person"]);
2812            session.create_node(&["Animal"]);
2813
2814            // Execute a Cypher query
2815            let result = session.execute_cypher("MATCH (n:Person) RETURN n").unwrap();
2816
2817            // Should return 2 Person nodes
2818            assert_eq!(result.row_count(), 2);
2819            assert_eq!(result.column_count(), 1);
2820            assert_eq!(result.columns[0], "n");
2821        }
2822
2823        #[test]
2824        fn test_cypher_empty_result() {
2825            let db = GrafeoDB::new_in_memory();
2826            let session = db.session();
2827
2828            // No data in database
2829            let result = session.execute_cypher("MATCH (n:Person) RETURN n").unwrap();
2830
2831            assert_eq!(result.row_count(), 0);
2832        }
2833
2834        #[test]
2835        fn test_cypher_parse_error() {
2836            let db = GrafeoDB::new_in_memory();
2837            let session = db.session();
2838
2839            // Invalid Cypher syntax
2840            let result = session.execute_cypher("MATCH (n RETURN n");
2841
2842            assert!(result.is_err());
2843        }
2844    }
2845
2846    // ==================== Direct Lookup API Tests ====================
2847
2848    mod direct_lookup_tests {
2849        use super::*;
2850        use grafeo_common::types::Value;
2851
2852        #[test]
2853        fn test_get_node() {
2854            let db = GrafeoDB::new_in_memory();
2855            let session = db.session();
2856
2857            let id = session.create_node(&["Person"]);
2858            let node = session.get_node(id);
2859
2860            assert!(node.is_some());
2861            let node = node.unwrap();
2862            assert_eq!(node.id, id);
2863        }
2864
2865        #[test]
2866        fn test_get_node_not_found() {
2867            use grafeo_common::types::NodeId;
2868
2869            let db = GrafeoDB::new_in_memory();
2870            let session = db.session();
2871
2872            // Try to get a non-existent node
2873            let node = session.get_node(NodeId::new(9999));
2874            assert!(node.is_none());
2875        }
2876
2877        #[test]
2878        fn test_get_node_property() {
2879            let db = GrafeoDB::new_in_memory();
2880            let session = db.session();
2881
2882            let id = session
2883                .create_node_with_props(&["Person"], [("name", Value::String("Alice".into()))]);
2884
2885            let name = session.get_node_property(id, "name");
2886            assert_eq!(name, Some(Value::String("Alice".into())));
2887
2888            // Non-existent property
2889            let missing = session.get_node_property(id, "missing");
2890            assert!(missing.is_none());
2891        }
2892
2893        #[test]
2894        fn test_get_edge() {
2895            let db = GrafeoDB::new_in_memory();
2896            let session = db.session();
2897
2898            let alice = session.create_node(&["Person"]);
2899            let bob = session.create_node(&["Person"]);
2900            let edge_id = session.create_edge(alice, bob, "KNOWS");
2901
2902            let edge = session.get_edge(edge_id);
2903            assert!(edge.is_some());
2904            let edge = edge.unwrap();
2905            assert_eq!(edge.id, edge_id);
2906            assert_eq!(edge.src, alice);
2907            assert_eq!(edge.dst, bob);
2908        }
2909
2910        #[test]
2911        fn test_get_edge_not_found() {
2912            use grafeo_common::types::EdgeId;
2913
2914            let db = GrafeoDB::new_in_memory();
2915            let session = db.session();
2916
2917            let edge = session.get_edge(EdgeId::new(9999));
2918            assert!(edge.is_none());
2919        }
2920
2921        #[test]
2922        fn test_get_neighbors_outgoing() {
2923            let db = GrafeoDB::new_in_memory();
2924            let session = db.session();
2925
2926            let alice = session.create_node(&["Person"]);
2927            let bob = session.create_node(&["Person"]);
2928            let carol = session.create_node(&["Person"]);
2929
2930            session.create_edge(alice, bob, "KNOWS");
2931            session.create_edge(alice, carol, "KNOWS");
2932
2933            let neighbors = session.get_neighbors_outgoing(alice);
2934            assert_eq!(neighbors.len(), 2);
2935
2936            let neighbor_ids: Vec<_> = neighbors.iter().map(|(node_id, _)| *node_id).collect();
2937            assert!(neighbor_ids.contains(&bob));
2938            assert!(neighbor_ids.contains(&carol));
2939        }
2940
2941        #[test]
2942        fn test_get_neighbors_incoming() {
2943            let db = GrafeoDB::new_in_memory();
2944            let session = db.session();
2945
2946            let alice = session.create_node(&["Person"]);
2947            let bob = session.create_node(&["Person"]);
2948            let carol = session.create_node(&["Person"]);
2949
2950            session.create_edge(bob, alice, "KNOWS");
2951            session.create_edge(carol, alice, "KNOWS");
2952
2953            let neighbors = session.get_neighbors_incoming(alice);
2954            assert_eq!(neighbors.len(), 2);
2955
2956            let neighbor_ids: Vec<_> = neighbors.iter().map(|(node_id, _)| *node_id).collect();
2957            assert!(neighbor_ids.contains(&bob));
2958            assert!(neighbor_ids.contains(&carol));
2959        }
2960
2961        #[test]
2962        fn test_get_neighbors_outgoing_by_type() {
2963            let db = GrafeoDB::new_in_memory();
2964            let session = db.session();
2965
2966            let alice = session.create_node(&["Person"]);
2967            let bob = session.create_node(&["Person"]);
2968            let company = session.create_node(&["Company"]);
2969
2970            session.create_edge(alice, bob, "KNOWS");
2971            session.create_edge(alice, company, "WORKS_AT");
2972
2973            let knows_neighbors = session.get_neighbors_outgoing_by_type(alice, "KNOWS");
2974            assert_eq!(knows_neighbors.len(), 1);
2975            assert_eq!(knows_neighbors[0].0, bob);
2976
2977            let works_neighbors = session.get_neighbors_outgoing_by_type(alice, "WORKS_AT");
2978            assert_eq!(works_neighbors.len(), 1);
2979            assert_eq!(works_neighbors[0].0, company);
2980
2981            // No edges of this type
2982            let no_neighbors = session.get_neighbors_outgoing_by_type(alice, "LIKES");
2983            assert!(no_neighbors.is_empty());
2984        }
2985
2986        #[test]
2987        fn test_node_exists() {
2988            use grafeo_common::types::NodeId;
2989
2990            let db = GrafeoDB::new_in_memory();
2991            let session = db.session();
2992
2993            let id = session.create_node(&["Person"]);
2994
2995            assert!(session.node_exists(id));
2996            assert!(!session.node_exists(NodeId::new(9999)));
2997        }
2998
2999        #[test]
3000        fn test_edge_exists() {
3001            use grafeo_common::types::EdgeId;
3002
3003            let db = GrafeoDB::new_in_memory();
3004            let session = db.session();
3005
3006            let alice = session.create_node(&["Person"]);
3007            let bob = session.create_node(&["Person"]);
3008            let edge_id = session.create_edge(alice, bob, "KNOWS");
3009
3010            assert!(session.edge_exists(edge_id));
3011            assert!(!session.edge_exists(EdgeId::new(9999)));
3012        }
3013
3014        #[test]
3015        fn test_get_degree() {
3016            let db = GrafeoDB::new_in_memory();
3017            let session = db.session();
3018
3019            let alice = session.create_node(&["Person"]);
3020            let bob = session.create_node(&["Person"]);
3021            let carol = session.create_node(&["Person"]);
3022
3023            // Alice knows Bob and Carol (2 outgoing)
3024            session.create_edge(alice, bob, "KNOWS");
3025            session.create_edge(alice, carol, "KNOWS");
3026            // Bob knows Alice (1 incoming for Alice)
3027            session.create_edge(bob, alice, "KNOWS");
3028
3029            let (out_degree, in_degree) = session.get_degree(alice);
3030            assert_eq!(out_degree, 2);
3031            assert_eq!(in_degree, 1);
3032
3033            // Node with no edges
3034            let lonely = session.create_node(&["Person"]);
3035            let (out, in_deg) = session.get_degree(lonely);
3036            assert_eq!(out, 0);
3037            assert_eq!(in_deg, 0);
3038        }
3039
3040        #[test]
3041        fn test_get_nodes_batch() {
3042            let db = GrafeoDB::new_in_memory();
3043            let session = db.session();
3044
3045            let alice = session.create_node(&["Person"]);
3046            let bob = session.create_node(&["Person"]);
3047            let carol = session.create_node(&["Person"]);
3048
3049            let nodes = session.get_nodes_batch(&[alice, bob, carol]);
3050            assert_eq!(nodes.len(), 3);
3051            assert!(nodes[0].is_some());
3052            assert!(nodes[1].is_some());
3053            assert!(nodes[2].is_some());
3054
3055            // With non-existent node
3056            use grafeo_common::types::NodeId;
3057            let nodes_with_missing = session.get_nodes_batch(&[alice, NodeId::new(9999), carol]);
3058            assert_eq!(nodes_with_missing.len(), 3);
3059            assert!(nodes_with_missing[0].is_some());
3060            assert!(nodes_with_missing[1].is_none()); // Missing node
3061            assert!(nodes_with_missing[2].is_some());
3062        }
3063
3064        #[test]
3065        fn test_auto_commit_setting() {
3066            let db = GrafeoDB::new_in_memory();
3067            let mut session = db.session();
3068
3069            // Default is auto-commit enabled
3070            assert!(session.auto_commit());
3071
3072            session.set_auto_commit(false);
3073            assert!(!session.auto_commit());
3074
3075            session.set_auto_commit(true);
3076            assert!(session.auto_commit());
3077        }
3078
3079        #[test]
3080        fn test_transaction_double_begin_error() {
3081            let db = GrafeoDB::new_in_memory();
3082            let mut session = db.session();
3083
3084            session.begin_tx().unwrap();
3085            let result = session.begin_tx();
3086
3087            assert!(result.is_err());
3088            // Clean up
3089            session.rollback().unwrap();
3090        }
3091
3092        #[test]
3093        fn test_commit_without_transaction_error() {
3094            let db = GrafeoDB::new_in_memory();
3095            let mut session = db.session();
3096
3097            let result = session.commit();
3098            assert!(result.is_err());
3099        }
3100
3101        #[test]
3102        fn test_rollback_without_transaction_error() {
3103            let db = GrafeoDB::new_in_memory();
3104            let mut session = db.session();
3105
3106            let result = session.rollback();
3107            assert!(result.is_err());
3108        }
3109
3110        #[test]
3111        fn test_create_edge_in_transaction() {
3112            let db = GrafeoDB::new_in_memory();
3113            let mut session = db.session();
3114
3115            // Create nodes outside transaction
3116            let alice = session.create_node(&["Person"]);
3117            let bob = session.create_node(&["Person"]);
3118
3119            // Create edge in transaction
3120            session.begin_tx().unwrap();
3121            let edge_id = session.create_edge(alice, bob, "KNOWS");
3122
3123            // Edge should be visible in the transaction
3124            assert!(session.edge_exists(edge_id));
3125
3126            // Commit
3127            session.commit().unwrap();
3128
3129            // Edge should still be visible
3130            assert!(session.edge_exists(edge_id));
3131        }
3132
3133        #[test]
3134        fn test_neighbors_empty_node() {
3135            let db = GrafeoDB::new_in_memory();
3136            let session = db.session();
3137
3138            let lonely = session.create_node(&["Person"]);
3139
3140            assert!(session.get_neighbors_outgoing(lonely).is_empty());
3141            assert!(session.get_neighbors_incoming(lonely).is_empty());
3142            assert!(
3143                session
3144                    .get_neighbors_outgoing_by_type(lonely, "KNOWS")
3145                    .is_empty()
3146            );
3147        }
3148    }
3149
3150    #[test]
3151    fn test_auto_gc_triggers_on_commit_interval() {
3152        use crate::config::Config;
3153
3154        let config = Config::in_memory().with_gc_interval(2);
3155        let db = GrafeoDB::with_config(config).unwrap();
3156        let mut session = db.session();
3157
3158        // First commit: counter = 1, no GC (not a multiple of 2)
3159        session.begin_tx().unwrap();
3160        session.create_node(&["A"]);
3161        session.commit().unwrap();
3162
3163        // Second commit: counter = 2, GC should trigger (multiple of 2)
3164        session.begin_tx().unwrap();
3165        session.create_node(&["B"]);
3166        session.commit().unwrap();
3167
3168        // Verify the database is still functional after GC
3169        assert_eq!(db.node_count(), 2);
3170    }
3171
3172    #[test]
3173    fn test_query_timeout_config_propagates_to_session() {
3174        use crate::config::Config;
3175        use std::time::Duration;
3176
3177        let config = Config::in_memory().with_query_timeout(Duration::from_secs(5));
3178        let db = GrafeoDB::with_config(config).unwrap();
3179        let session = db.session();
3180
3181        // Verify the session has a query deadline (timeout was set)
3182        assert!(session.query_deadline().is_some());
3183    }
3184
3185    #[test]
3186    fn test_no_query_timeout_returns_no_deadline() {
3187        let db = GrafeoDB::new_in_memory();
3188        let session = db.session();
3189
3190        // Default config has no timeout
3191        assert!(session.query_deadline().is_none());
3192    }
3193
3194    #[test]
3195    fn test_graph_model_accessor() {
3196        use crate::config::GraphModel;
3197
3198        let db = GrafeoDB::new_in_memory();
3199        let session = db.session();
3200
3201        assert_eq!(session.graph_model(), GraphModel::Lpg);
3202    }
3203
3204    #[cfg(feature = "gql")]
3205    #[test]
3206    fn test_external_store_session() {
3207        use grafeo_core::graph::GraphStoreMut;
3208        use std::sync::Arc;
3209
3210        let config = crate::config::Config::in_memory();
3211        let store = Arc::new(grafeo_core::graph::lpg::LpgStore::new()) as Arc<dyn GraphStoreMut>;
3212        let db = GrafeoDB::with_store(store, config).unwrap();
3213
3214        let session = db.session();
3215
3216        // Create data through a query (goes through the external graph_store)
3217        session.execute("INSERT (:Test {name: 'hello'})").unwrap();
3218
3219        // Verify we can query through it
3220        let result = session.execute("MATCH (n:Test) RETURN n.name").unwrap();
3221        assert_eq!(result.row_count(), 1);
3222    }
3223
3224    // ==================== Session Command Tests ====================
3225
3226    #[cfg(feature = "gql")]
3227    mod session_command_tests {
3228        use super::*;
3229
3230        #[test]
3231        fn test_use_graph_sets_current_graph() {
3232            let db = GrafeoDB::new_in_memory();
3233            let session = db.session();
3234
3235            // Create the graph first, then USE it
3236            session.execute("CREATE GRAPH mydb").unwrap();
3237            session.execute("USE GRAPH mydb").unwrap();
3238
3239            assert_eq!(session.current_graph(), Some("mydb".to_string()));
3240        }
3241
3242        #[test]
3243        fn test_use_graph_nonexistent_errors() {
3244            let db = GrafeoDB::new_in_memory();
3245            let session = db.session();
3246
3247            let result = session.execute("USE GRAPH doesnotexist");
3248            assert!(result.is_err());
3249            let err = result.unwrap_err().to_string();
3250            assert!(
3251                err.contains("does not exist"),
3252                "Expected 'does not exist' error, got: {err}"
3253            );
3254        }
3255
3256        #[test]
3257        fn test_use_graph_default_always_valid() {
3258            let db = GrafeoDB::new_in_memory();
3259            let session = db.session();
3260
3261            // "default" is always valid, even without CREATE GRAPH
3262            session.execute("USE GRAPH default").unwrap();
3263            assert_eq!(session.current_graph(), Some("default".to_string()));
3264        }
3265
3266        #[test]
3267        fn test_session_set_graph() {
3268            let db = GrafeoDB::new_in_memory();
3269            let session = db.session();
3270
3271            // SESSION SET GRAPH does not verify existence
3272            session.execute("SESSION SET GRAPH analytics").unwrap();
3273            assert_eq!(session.current_graph(), Some("analytics".to_string()));
3274        }
3275
3276        #[test]
3277        fn test_session_set_time_zone() {
3278            let db = GrafeoDB::new_in_memory();
3279            let session = db.session();
3280
3281            assert_eq!(session.time_zone(), None);
3282
3283            session.execute("SESSION SET TIME ZONE 'UTC'").unwrap();
3284            assert_eq!(session.time_zone(), Some("UTC".to_string()));
3285
3286            session
3287                .execute("SESSION SET TIME ZONE 'America/New_York'")
3288                .unwrap();
3289            assert_eq!(session.time_zone(), Some("America/New_York".to_string()));
3290        }
3291
3292        #[test]
3293        fn test_session_set_parameter() {
3294            let db = GrafeoDB::new_in_memory();
3295            let session = db.session();
3296
3297            session
3298                .execute("SESSION SET PARAMETER $timeout = 30")
3299                .unwrap();
3300
3301            // Parameter is stored (value is Null for now, since expression
3302            // evaluation is not yet wired up)
3303            assert!(session.get_parameter("timeout").is_some());
3304        }
3305
3306        #[test]
3307        fn test_session_reset_clears_all_state() {
3308            let db = GrafeoDB::new_in_memory();
3309            let session = db.session();
3310
3311            // Set various session state
3312            session.execute("SESSION SET GRAPH analytics").unwrap();
3313            session.execute("SESSION SET TIME ZONE 'UTC'").unwrap();
3314            session
3315                .execute("SESSION SET PARAMETER $limit = 100")
3316                .unwrap();
3317
3318            // Verify state was set
3319            assert!(session.current_graph().is_some());
3320            assert!(session.time_zone().is_some());
3321            assert!(session.get_parameter("limit").is_some());
3322
3323            // Reset everything
3324            session.execute("SESSION RESET").unwrap();
3325
3326            assert_eq!(session.current_graph(), None);
3327            assert_eq!(session.time_zone(), None);
3328            assert!(session.get_parameter("limit").is_none());
3329        }
3330
3331        #[test]
3332        fn test_session_close_clears_state() {
3333            let db = GrafeoDB::new_in_memory();
3334            let session = db.session();
3335
3336            session.execute("SESSION SET GRAPH analytics").unwrap();
3337            session.execute("SESSION SET TIME ZONE 'UTC'").unwrap();
3338
3339            session.execute("SESSION CLOSE").unwrap();
3340
3341            assert_eq!(session.current_graph(), None);
3342            assert_eq!(session.time_zone(), None);
3343        }
3344
3345        #[test]
3346        fn test_create_graph() {
3347            let db = GrafeoDB::new_in_memory();
3348            let session = db.session();
3349
3350            session.execute("CREATE GRAPH mydb").unwrap();
3351
3352            // Should be able to USE it now
3353            session.execute("USE GRAPH mydb").unwrap();
3354            assert_eq!(session.current_graph(), Some("mydb".to_string()));
3355        }
3356
3357        #[test]
3358        fn test_create_graph_duplicate_errors() {
3359            let db = GrafeoDB::new_in_memory();
3360            let session = db.session();
3361
3362            session.execute("CREATE GRAPH mydb").unwrap();
3363            let result = session.execute("CREATE GRAPH mydb");
3364
3365            assert!(result.is_err());
3366            let err = result.unwrap_err().to_string();
3367            assert!(
3368                err.contains("already exists"),
3369                "Expected 'already exists' error, got: {err}"
3370            );
3371        }
3372
3373        #[test]
3374        fn test_create_graph_if_not_exists() {
3375            let db = GrafeoDB::new_in_memory();
3376            let session = db.session();
3377
3378            session.execute("CREATE GRAPH mydb").unwrap();
3379            // Should succeed silently with IF NOT EXISTS
3380            session.execute("CREATE GRAPH IF NOT EXISTS mydb").unwrap();
3381        }
3382
3383        #[test]
3384        fn test_drop_graph() {
3385            let db = GrafeoDB::new_in_memory();
3386            let session = db.session();
3387
3388            session.execute("CREATE GRAPH mydb").unwrap();
3389            session.execute("DROP GRAPH mydb").unwrap();
3390
3391            // Should no longer be usable
3392            let result = session.execute("USE GRAPH mydb");
3393            assert!(result.is_err());
3394        }
3395
3396        #[test]
3397        fn test_drop_graph_nonexistent_errors() {
3398            let db = GrafeoDB::new_in_memory();
3399            let session = db.session();
3400
3401            let result = session.execute("DROP GRAPH nosuchgraph");
3402            assert!(result.is_err());
3403            let err = result.unwrap_err().to_string();
3404            assert!(
3405                err.contains("does not exist"),
3406                "Expected 'does not exist' error, got: {err}"
3407            );
3408        }
3409
3410        #[test]
3411        fn test_drop_graph_if_exists() {
3412            let db = GrafeoDB::new_in_memory();
3413            let session = db.session();
3414
3415            // Should succeed silently with IF EXISTS
3416            session.execute("DROP GRAPH IF EXISTS nosuchgraph").unwrap();
3417        }
3418
3419        #[test]
3420        fn test_start_transaction_via_gql() {
3421            let db = GrafeoDB::new_in_memory();
3422            let session = db.session();
3423
3424            session.execute("START TRANSACTION").unwrap();
3425            assert!(session.in_transaction());
3426            session.execute("INSERT (:Person {name: 'Alice'})").unwrap();
3427            session.execute("COMMIT").unwrap();
3428            assert!(!session.in_transaction());
3429
3430            let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
3431            assert_eq!(result.rows.len(), 1);
3432        }
3433
3434        #[test]
3435        fn test_start_transaction_read_only_blocks_insert() {
3436            let db = GrafeoDB::new_in_memory();
3437            let session = db.session();
3438
3439            session.execute("START TRANSACTION READ ONLY").unwrap();
3440            let result = session.execute("INSERT (:Person {name: 'Alice'})");
3441            assert!(result.is_err());
3442            let err = result.unwrap_err().to_string();
3443            assert!(
3444                err.contains("read-only"),
3445                "Expected read-only error, got: {err}"
3446            );
3447            session.execute("ROLLBACK").unwrap();
3448        }
3449
3450        #[test]
3451        fn test_start_transaction_read_only_allows_reads() {
3452            let db = GrafeoDB::new_in_memory();
3453            let mut session = db.session();
3454            session.begin_tx().unwrap();
3455            session.execute("INSERT (:Person {name: 'Alice'})").unwrap();
3456            session.commit().unwrap();
3457
3458            session.execute("START TRANSACTION READ ONLY").unwrap();
3459            let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
3460            assert_eq!(result.rows.len(), 1);
3461            session.execute("COMMIT").unwrap();
3462        }
3463
3464        #[test]
3465        fn test_rollback_via_gql() {
3466            let db = GrafeoDB::new_in_memory();
3467            let session = db.session();
3468
3469            session.execute("START TRANSACTION").unwrap();
3470            session.execute("INSERT (:Person {name: 'Alice'})").unwrap();
3471            session.execute("ROLLBACK").unwrap();
3472
3473            let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
3474            assert!(result.rows.is_empty());
3475        }
3476
3477        #[test]
3478        fn test_start_transaction_with_isolation_level() {
3479            let db = GrafeoDB::new_in_memory();
3480            let session = db.session();
3481
3482            session
3483                .execute("START TRANSACTION ISOLATION LEVEL SERIALIZABLE")
3484                .unwrap();
3485            assert!(session.in_transaction());
3486            session.execute("ROLLBACK").unwrap();
3487        }
3488
3489        #[test]
3490        fn test_session_commands_return_empty_result() {
3491            let db = GrafeoDB::new_in_memory();
3492            let session = db.session();
3493
3494            let result = session.execute("SESSION SET GRAPH test").unwrap();
3495            assert_eq!(result.row_count(), 0);
3496            assert_eq!(result.column_count(), 0);
3497        }
3498
3499        #[test]
3500        fn test_current_graph_default_is_none() {
3501            let db = GrafeoDB::new_in_memory();
3502            let session = db.session();
3503
3504            assert_eq!(session.current_graph(), None);
3505        }
3506
3507        #[test]
3508        fn test_time_zone_default_is_none() {
3509            let db = GrafeoDB::new_in_memory();
3510            let session = db.session();
3511
3512            assert_eq!(session.time_zone(), None);
3513        }
3514
3515        #[test]
3516        fn test_session_state_independent_across_sessions() {
3517            let db = GrafeoDB::new_in_memory();
3518            let session1 = db.session();
3519            let session2 = db.session();
3520
3521            session1.execute("SESSION SET GRAPH first").unwrap();
3522            session2.execute("SESSION SET GRAPH second").unwrap();
3523
3524            assert_eq!(session1.current_graph(), Some("first".to_string()));
3525            assert_eq!(session2.current_graph(), Some("second".to_string()));
3526        }
3527    }
3528}