Skip to main content

grafeo_engine/
session.rs

//! Lightweight handles for database interaction.
//!
//! A session is your conversation with the database. Each session can have
//! its own transaction state, so concurrent sessions don't interfere with
//! each other. Sessions are cheap to create: spin up as many as you need.
6
7use std::sync::Arc;
8use std::sync::atomic::{AtomicUsize, Ordering};
9use std::time::{Duration, Instant};
10
11use grafeo_common::types::{EdgeId, EpochId, NodeId, TransactionId, Value};
12use grafeo_common::utils::error::Result;
13use grafeo_core::graph::Direction;
14use grafeo_core::graph::GraphStoreMut;
15use grafeo_core::graph::lpg::{Edge, LpgStore, Node};
16#[cfg(feature = "rdf")]
17use grafeo_core::graph::rdf::RdfStore;
18
19use crate::catalog::{Catalog, CatalogConstraintValidator};
20use crate::config::{AdaptiveConfig, GraphModel};
21use crate::database::QueryResult;
22use crate::query::cache::QueryCache;
23use crate::transaction::TransactionManager;
24
25/// Your handle to the database - execute queries and manage transactions.
26///
27/// Get one from [`GrafeoDB::session()`](crate::GrafeoDB::session). Each session
28/// tracks its own transaction state, so you can have multiple concurrent
29/// sessions without them interfering.
30pub struct Session {
31    /// The underlying store.
32    store: Arc<LpgStore>,
33    /// Graph store trait object for pluggable storage backends.
34    graph_store: Arc<dyn GraphStoreMut>,
35    /// Schema and metadata catalog shared across sessions.
36    catalog: Arc<Catalog>,
37    /// RDF triple store (if RDF feature is enabled).
38    #[cfg(feature = "rdf")]
39    rdf_store: Arc<RdfStore>,
40    /// Transaction manager.
41    transaction_manager: Arc<TransactionManager>,
42    /// Query cache shared across sessions.
43    query_cache: Arc<QueryCache>,
44    /// Current transaction ID (if any). Behind a Mutex so that GQL commands
45    /// (`START TRANSACTION`, `COMMIT`, `ROLLBACK`) can manage transactions
46    /// from within `execute(&self)`.
47    current_transaction: parking_lot::Mutex<Option<TransactionId>>,
48    /// Whether the current transaction is read-only (blocks mutations).
49    read_only_tx: parking_lot::Mutex<bool>,
50    /// Whether the session is in auto-commit mode.
51    auto_commit: bool,
52    /// Adaptive execution configuration.
53    #[allow(dead_code)] // Stored for future adaptive re-optimization during execution
54    adaptive_config: AdaptiveConfig,
55    /// Whether to use factorized execution for multi-hop queries.
56    factorized_execution: bool,
57    /// The graph data model this session operates on.
58    graph_model: GraphModel,
59    /// Maximum time a query may run before being cancelled.
60    query_timeout: Option<Duration>,
61    /// Shared commit counter for triggering auto-GC.
62    commit_counter: Arc<AtomicUsize>,
63    /// GC every N commits (0 = disabled).
64    gc_interval: usize,
65    /// Node count at the start of the current transaction (for PreparedCommit stats).
66    transaction_start_node_count: AtomicUsize,
67    /// Edge count at the start of the current transaction (for PreparedCommit stats).
68    transaction_start_edge_count: AtomicUsize,
69    /// WAL for logging schema changes.
70    #[cfg(feature = "wal")]
71    wal: Option<Arc<grafeo_adapters::storage::wal::LpgWal>>,
72    /// CDC log for change tracking.
73    #[cfg(feature = "cdc")]
74    cdc_log: Arc<crate::cdc::CdcLog>,
75    /// Current graph name (for multi-graph USE GRAPH support). None = default graph.
76    current_graph: parking_lot::Mutex<Option<String>>,
77    /// Session time zone override.
78    time_zone: parking_lot::Mutex<Option<String>>,
79    /// Session-level parameters (SET PARAMETER).
80    session_params:
81        parking_lot::Mutex<std::collections::HashMap<String, grafeo_common::types::Value>>,
82    /// Override epoch for time-travel queries (None = use transaction/current epoch).
83    viewing_epoch_override: parking_lot::Mutex<Option<EpochId>>,
84    /// Savepoints within the current transaction: name -> (next_node_id, next_edge_id) snapshot.
85    savepoints: parking_lot::Mutex<Vec<(String, u64, u64)>>,
86    /// Nesting depth for nested transactions (0 = outermost).
87    /// Nested `START TRANSACTION` creates an auto-savepoint; nested `COMMIT`
88    /// releases it, nested `ROLLBACK` rolls back to it.
89    transaction_nesting_depth: parking_lot::Mutex<u32>,
90}
91
92impl Session {
93    /// Creates a new session with adaptive execution configuration.
94    #[allow(dead_code, clippy::too_many_arguments)]
95    pub(crate) fn with_adaptive(
96        store: Arc<LpgStore>,
97        transaction_manager: Arc<TransactionManager>,
98        query_cache: Arc<QueryCache>,
99        catalog: Arc<Catalog>,
100        adaptive_config: AdaptiveConfig,
101        factorized_execution: bool,
102        graph_model: GraphModel,
103        query_timeout: Option<Duration>,
104        commit_counter: Arc<AtomicUsize>,
105        gc_interval: usize,
106    ) -> Self {
107        let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
108        Self {
109            store,
110            graph_store,
111            catalog,
112            #[cfg(feature = "rdf")]
113            rdf_store: Arc::new(RdfStore::new()),
114            transaction_manager,
115            query_cache,
116            current_transaction: parking_lot::Mutex::new(None),
117            read_only_tx: parking_lot::Mutex::new(false),
118            auto_commit: true,
119            adaptive_config,
120            factorized_execution,
121            graph_model,
122            query_timeout,
123            commit_counter,
124            gc_interval,
125            transaction_start_node_count: AtomicUsize::new(0),
126            transaction_start_edge_count: AtomicUsize::new(0),
127            #[cfg(feature = "wal")]
128            wal: None,
129            #[cfg(feature = "cdc")]
130            cdc_log: Arc::new(crate::cdc::CdcLog::new()),
131            current_graph: parking_lot::Mutex::new(None),
132            time_zone: parking_lot::Mutex::new(None),
133            session_params: parking_lot::Mutex::new(std::collections::HashMap::new()),
134            viewing_epoch_override: parking_lot::Mutex::new(None),
135            savepoints: parking_lot::Mutex::new(Vec::new()),
136            transaction_nesting_depth: parking_lot::Mutex::new(0),
137        }
138    }
139
140    /// Sets the WAL for this session (shared with the database).
141    ///
142    /// This also wraps `graph_store` in a [`WalGraphStore`] so that mutation
143    /// operators (INSERT, DELETE, SET via queries) log to the WAL.
144    #[cfg(feature = "wal")]
145    pub(crate) fn set_wal(&mut self, wal: Arc<grafeo_adapters::storage::wal::LpgWal>) {
146        // Wrap the graph store so query-engine mutations are WAL-logged
147        self.graph_store = Arc::new(crate::database::wal_store::WalGraphStore::new(
148            Arc::clone(&self.store),
149            Arc::clone(&wal),
150        ));
151        self.wal = Some(wal);
152    }
153
154    /// Sets the CDC log for this session (shared with the database).
155    #[cfg(feature = "cdc")]
156    pub(crate) fn set_cdc_log(&mut self, cdc_log: Arc<crate::cdc::CdcLog>) {
157        self.cdc_log = cdc_log;
158    }
159
160    /// Creates a new session with RDF store and adaptive configuration.
161    #[cfg(feature = "rdf")]
162    #[allow(clippy::too_many_arguments)]
163    pub(crate) fn with_rdf_store_and_adaptive(
164        store: Arc<LpgStore>,
165        rdf_store: Arc<RdfStore>,
166        transaction_manager: Arc<TransactionManager>,
167        query_cache: Arc<QueryCache>,
168        catalog: Arc<Catalog>,
169        adaptive_config: AdaptiveConfig,
170        factorized_execution: bool,
171        graph_model: GraphModel,
172        query_timeout: Option<Duration>,
173        commit_counter: Arc<AtomicUsize>,
174        gc_interval: usize,
175    ) -> Self {
176        let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
177        Self {
178            store,
179            graph_store,
180            catalog,
181            rdf_store,
182            transaction_manager,
183            query_cache,
184            current_transaction: parking_lot::Mutex::new(None),
185            read_only_tx: parking_lot::Mutex::new(false),
186            auto_commit: true,
187            adaptive_config,
188            factorized_execution,
189            graph_model,
190            query_timeout,
191            commit_counter,
192            gc_interval,
193            transaction_start_node_count: AtomicUsize::new(0),
194            transaction_start_edge_count: AtomicUsize::new(0),
195            #[cfg(feature = "wal")]
196            wal: None,
197            #[cfg(feature = "cdc")]
198            cdc_log: Arc::new(crate::cdc::CdcLog::new()),
199            current_graph: parking_lot::Mutex::new(None),
200            time_zone: parking_lot::Mutex::new(None),
201            session_params: parking_lot::Mutex::new(std::collections::HashMap::new()),
202            viewing_epoch_override: parking_lot::Mutex::new(None),
203            savepoints: parking_lot::Mutex::new(Vec::new()),
204            transaction_nesting_depth: parking_lot::Mutex::new(0),
205        }
206    }
207
208    /// Creates a session backed by an external graph store.
209    ///
210    /// The external store handles all data operations. Transaction management
211    /// (begin/commit/rollback) is not supported for external stores.
212    #[allow(clippy::too_many_arguments)]
213    pub(crate) fn with_external_store(
214        store: Arc<dyn GraphStoreMut>,
215        transaction_manager: Arc<TransactionManager>,
216        query_cache: Arc<QueryCache>,
217        catalog: Arc<Catalog>,
218        adaptive_config: AdaptiveConfig,
219        factorized_execution: bool,
220        graph_model: GraphModel,
221        query_timeout: Option<Duration>,
222        commit_counter: Arc<AtomicUsize>,
223        gc_interval: usize,
224    ) -> Self {
225        Self {
226            store: Arc::new(LpgStore::new().expect("arena allocation for dummy LpgStore")), // dummy for LpgStore-specific ops
227            graph_store: store,
228            catalog,
229            #[cfg(feature = "rdf")]
230            rdf_store: Arc::new(RdfStore::new()),
231            transaction_manager,
232            query_cache,
233            current_transaction: parking_lot::Mutex::new(None),
234            read_only_tx: parking_lot::Mutex::new(false),
235            auto_commit: true,
236            adaptive_config,
237            factorized_execution,
238            graph_model,
239            query_timeout,
240            commit_counter,
241            gc_interval,
242            transaction_start_node_count: AtomicUsize::new(0),
243            transaction_start_edge_count: AtomicUsize::new(0),
244            #[cfg(feature = "wal")]
245            wal: None,
246            #[cfg(feature = "cdc")]
247            cdc_log: Arc::new(crate::cdc::CdcLog::new()),
248            current_graph: parking_lot::Mutex::new(None),
249            time_zone: parking_lot::Mutex::new(None),
250            session_params: parking_lot::Mutex::new(std::collections::HashMap::new()),
251            viewing_epoch_override: parking_lot::Mutex::new(None),
252            savepoints: parking_lot::Mutex::new(Vec::new()),
253            transaction_nesting_depth: parking_lot::Mutex::new(0),
254        }
255    }
256
257    /// Returns the graph model this session operates on.
258    #[must_use]
259    pub fn graph_model(&self) -> GraphModel {
260        self.graph_model
261    }
262
263    // === Session State Management ===
264
265    /// Sets the current graph for this session (USE GRAPH).
266    pub fn use_graph(&self, name: &str) {
267        *self.current_graph.lock() = Some(name.to_string());
268    }
269
270    /// Returns the current graph name, if any.
271    #[must_use]
272    pub fn current_graph(&self) -> Option<String> {
273        self.current_graph.lock().clone()
274    }
275
276    /// Sets the session time zone.
277    pub fn set_time_zone(&self, tz: &str) {
278        *self.time_zone.lock() = Some(tz.to_string());
279    }
280
281    /// Returns the session time zone, if set.
282    #[must_use]
283    pub fn time_zone(&self) -> Option<String> {
284        self.time_zone.lock().clone()
285    }
286
287    /// Sets a session parameter.
288    pub fn set_parameter(&self, key: &str, value: grafeo_common::types::Value) {
289        self.session_params.lock().insert(key.to_string(), value);
290    }
291
292    /// Gets a session parameter by cloning it.
293    #[must_use]
294    pub fn get_parameter(&self, key: &str) -> Option<grafeo_common::types::Value> {
295        self.session_params.lock().get(key).cloned()
296    }
297
298    /// Resets all session state to defaults.
299    pub fn reset_session(&self) {
300        *self.current_graph.lock() = None;
301        *self.time_zone.lock() = None;
302        self.session_params.lock().clear();
303        *self.viewing_epoch_override.lock() = None;
304    }
305
306    // --- Time-travel API ---
307
308    /// Sets a viewing epoch override for time-travel queries.
309    ///
310    /// While set, all queries on this session see the database as it existed
311    /// at the given epoch. Use [`clear_viewing_epoch`](Self::clear_viewing_epoch)
312    /// to return to normal behavior.
313    pub fn set_viewing_epoch(&self, epoch: EpochId) {
314        *self.viewing_epoch_override.lock() = Some(epoch);
315    }
316
317    /// Clears the viewing epoch override, returning to normal behavior.
318    pub fn clear_viewing_epoch(&self) {
319        *self.viewing_epoch_override.lock() = None;
320    }
321
322    /// Returns the current viewing epoch override, if any.
323    #[must_use]
324    pub fn viewing_epoch(&self) -> Option<EpochId> {
325        *self.viewing_epoch_override.lock()
326    }
327
328    /// Returns all versions of a node with their creation/deletion epochs.
329    ///
330    /// Properties and labels reflect the current state (not versioned per-epoch).
331    #[must_use]
332    pub fn get_node_history(&self, id: NodeId) -> Vec<(EpochId, Option<EpochId>, Node)> {
333        self.store.get_node_history(id)
334    }
335
336    /// Returns all versions of an edge with their creation/deletion epochs.
337    ///
338    /// Properties reflect the current state (not versioned per-epoch).
339    #[must_use]
340    pub fn get_edge_history(&self, id: EdgeId) -> Vec<(EpochId, Option<EpochId>, Edge)> {
341        self.store.get_edge_history(id)
342    }
343
344    /// Checks that the session's graph model supports LPG operations.
345    fn require_lpg(&self, language: &str) -> Result<()> {
346        if self.graph_model == GraphModel::Rdf {
347            return Err(grafeo_common::utils::error::Error::Internal(format!(
348                "This is an RDF database. {language} queries require an LPG database."
349            )));
350        }
351        Ok(())
352    }
353
354    /// Executes a session or transaction command, returning an empty result.
355    #[cfg(feature = "gql")]
356    fn execute_session_command(
357        &self,
358        cmd: grafeo_adapters::query::gql::ast::SessionCommand,
359    ) -> Result<QueryResult> {
360        use grafeo_adapters::query::gql::ast::{SessionCommand, TransactionIsolationLevel};
361        use grafeo_common::utils::error::{Error, QueryError, QueryErrorKind};
362
363        match cmd {
364            SessionCommand::CreateGraph {
365                name,
366                if_not_exists,
367                typed,
368                like_graph,
369                copy_of,
370                open: _,
371            } => {
372                // Validate source graph exists for LIKE / AS COPY OF
373                if let Some(ref src) = like_graph
374                    && self.store.graph(src).is_none()
375                {
376                    return Err(Error::Query(QueryError::new(
377                        QueryErrorKind::Semantic,
378                        format!("Source graph '{src}' does not exist"),
379                    )));
380                }
381                if let Some(ref src) = copy_of
382                    && self.store.graph(src).is_none()
383                {
384                    return Err(Error::Query(QueryError::new(
385                        QueryErrorKind::Semantic,
386                        format!("Source graph '{src}' does not exist"),
387                    )));
388                }
389
390                let created = self
391                    .store
392                    .create_graph(&name)
393                    .map_err(|e| Error::Internal(e.to_string()))?;
394                if !created && !if_not_exists {
395                    return Err(Error::Query(QueryError::new(
396                        QueryErrorKind::Semantic,
397                        format!("Graph '{name}' already exists"),
398                    )));
399                }
400
401                // AS COPY OF: copy data from source graph
402                if let Some(ref src) = copy_of {
403                    self.store
404                        .copy_graph(Some(src), Some(&name))
405                        .map_err(|e| Error::Internal(e.to_string()))?;
406                }
407
408                // Bind to graph type if specified
409                if let Some(type_name) = typed
410                    && let Err(e) = self.catalog.bind_graph_type(&name, type_name.clone())
411                {
412                    return Err(Error::Query(QueryError::new(
413                        QueryErrorKind::Semantic,
414                        e.to_string(),
415                    )));
416                }
417
418                // LIKE: copy graph type binding from source
419                if let Some(ref src) = like_graph
420                    && let Some(src_type) = self.catalog.get_graph_type_binding(src)
421                {
422                    let _ = self.catalog.bind_graph_type(&name, src_type);
423                }
424
425                Ok(QueryResult::empty())
426            }
427            SessionCommand::DropGraph { name, if_exists } => {
428                let dropped = self.store.drop_graph(&name);
429                if !dropped && !if_exists {
430                    return Err(Error::Query(QueryError::new(
431                        QueryErrorKind::Semantic,
432                        format!("Graph '{name}' does not exist"),
433                    )));
434                }
435                Ok(QueryResult::empty())
436            }
437            SessionCommand::UseGraph(name) => {
438                // Verify graph exists (default graph is always valid)
439                if !name.eq_ignore_ascii_case("default") && self.store.graph(&name).is_none() {
440                    return Err(Error::Query(QueryError::new(
441                        QueryErrorKind::Semantic,
442                        format!("Graph '{name}' does not exist"),
443                    )));
444                }
445                self.use_graph(&name);
446                Ok(QueryResult::empty())
447            }
448            SessionCommand::SessionSetGraph(name) => {
449                self.use_graph(&name);
450                Ok(QueryResult::empty())
451            }
452            SessionCommand::SessionSetTimeZone(tz) => {
453                self.set_time_zone(&tz);
454                Ok(QueryResult::empty())
455            }
456            SessionCommand::SessionSetParameter(key, expr) => {
457                if key.eq_ignore_ascii_case("viewing_epoch") {
458                    match Self::eval_integer_literal(&expr) {
459                        Some(n) if n >= 0 => {
460                            self.set_viewing_epoch(EpochId::new(n as u64));
461                            Ok(QueryResult::status(format!("Set viewing_epoch to {n}")))
462                        }
463                        _ => Err(Error::Query(QueryError::new(
464                            QueryErrorKind::Semantic,
465                            "viewing_epoch must be a non-negative integer literal",
466                        ))),
467                    }
468                } else {
469                    // For now, store parameter name with Null value.
470                    // Full expression evaluation would require building and executing a plan.
471                    self.set_parameter(&key, Value::Null);
472                    Ok(QueryResult::empty())
473                }
474            }
475            SessionCommand::SessionReset => {
476                self.reset_session();
477                Ok(QueryResult::empty())
478            }
479            SessionCommand::SessionClose => {
480                self.reset_session();
481                Ok(QueryResult::empty())
482            }
483            SessionCommand::StartTransaction {
484                read_only,
485                isolation_level,
486            } => {
487                let engine_level = isolation_level.map(|l| match l {
488                    TransactionIsolationLevel::ReadCommitted => {
489                        crate::transaction::IsolationLevel::ReadCommitted
490                    }
491                    TransactionIsolationLevel::SnapshotIsolation => {
492                        crate::transaction::IsolationLevel::SnapshotIsolation
493                    }
494                    TransactionIsolationLevel::Serializable => {
495                        crate::transaction::IsolationLevel::Serializable
496                    }
497                });
498                self.begin_transaction_inner(read_only, engine_level)?;
499                Ok(QueryResult::status("Transaction started"))
500            }
501            SessionCommand::Commit => {
502                self.commit_inner()?;
503                Ok(QueryResult::status("Transaction committed"))
504            }
505            SessionCommand::Rollback => {
506                self.rollback_inner()?;
507                Ok(QueryResult::status("Transaction rolled back"))
508            }
509            SessionCommand::Savepoint(name) => {
510                self.savepoint(&name)?;
511                Ok(QueryResult::status(format!("Savepoint '{name}' created")))
512            }
513            SessionCommand::RollbackToSavepoint(name) => {
514                self.rollback_to_savepoint(&name)?;
515                Ok(QueryResult::status(format!(
516                    "Rolled back to savepoint '{name}'"
517                )))
518            }
519            SessionCommand::ReleaseSavepoint(name) => {
520                self.release_savepoint(&name)?;
521                Ok(QueryResult::status(format!("Savepoint '{name}' released")))
522            }
523        }
524    }
525
526    /// Logs a WAL record for a schema change (no-op if WAL is not enabled).
527    #[cfg(feature = "wal")]
528    fn log_schema_wal(&self, record: &grafeo_adapters::storage::wal::WalRecord) {
529        if let Some(ref wal) = self.wal
530            && let Err(e) = wal.log(record)
531        {
532            tracing::warn!("Failed to log schema change to WAL: {}", e);
533        }
534    }
535
536    /// Executes a schema DDL command, returning a status result.
537    #[cfg(feature = "gql")]
538    fn execute_schema_command(
539        &self,
540        cmd: grafeo_adapters::query::gql::ast::SchemaStatement,
541    ) -> Result<QueryResult> {
542        use crate::catalog::{
543            EdgeTypeDefinition, NodeTypeDefinition, PropertyDataType, TypedProperty,
544        };
545        use grafeo_adapters::query::gql::ast::SchemaStatement;
546        #[cfg(feature = "wal")]
547        use grafeo_adapters::storage::wal::WalRecord;
548        use grafeo_common::utils::error::{Error, QueryError, QueryErrorKind};
549
550        /// Logs a WAL record for schema changes. Compiles to nothing without `wal`.
551        macro_rules! wal_log {
552            ($self:expr, $record:expr) => {
553                #[cfg(feature = "wal")]
554                $self.log_schema_wal(&$record);
555            };
556        }
557
558        match cmd {
559            SchemaStatement::CreateNodeType(stmt) => {
560                #[cfg(feature = "wal")]
561                let props_for_wal: Vec<(String, String, bool)> = stmt
562                    .properties
563                    .iter()
564                    .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
565                    .collect();
566                let def = NodeTypeDefinition {
567                    name: stmt.name.clone(),
568                    properties: stmt
569                        .properties
570                        .iter()
571                        .map(|p| TypedProperty {
572                            name: p.name.clone(),
573                            data_type: PropertyDataType::from_type_name(&p.data_type),
574                            nullable: p.nullable,
575                            default_value: None,
576                        })
577                        .collect(),
578                    constraints: Vec::new(),
579                };
580                let result = if stmt.or_replace {
581                    let _ = self.catalog.drop_node_type(&stmt.name);
582                    self.catalog.register_node_type(def)
583                } else {
584                    self.catalog.register_node_type(def)
585                };
586                match result {
587                    Ok(()) => {
588                        wal_log!(
589                            self,
590                            WalRecord::CreateNodeType {
591                                name: stmt.name.clone(),
592                                properties: props_for_wal,
593                                constraints: Vec::new(),
594                            }
595                        );
596                        Ok(QueryResult::status(format!(
597                            "Created node type '{}'",
598                            stmt.name
599                        )))
600                    }
601                    Err(e) if stmt.if_not_exists => {
602                        let _ = e;
603                        Ok(QueryResult::status("No change"))
604                    }
605                    Err(e) => Err(Error::Query(QueryError::new(
606                        QueryErrorKind::Semantic,
607                        e.to_string(),
608                    ))),
609                }
610            }
611            SchemaStatement::CreateEdgeType(stmt) => {
612                #[cfg(feature = "wal")]
613                let props_for_wal: Vec<(String, String, bool)> = stmt
614                    .properties
615                    .iter()
616                    .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
617                    .collect();
618                let def = EdgeTypeDefinition {
619                    name: stmt.name.clone(),
620                    properties: stmt
621                        .properties
622                        .iter()
623                        .map(|p| TypedProperty {
624                            name: p.name.clone(),
625                            data_type: PropertyDataType::from_type_name(&p.data_type),
626                            nullable: p.nullable,
627                            default_value: None,
628                        })
629                        .collect(),
630                    constraints: Vec::new(),
631                };
632                let result = if stmt.or_replace {
633                    let _ = self.catalog.drop_edge_type_def(&stmt.name);
634                    self.catalog.register_edge_type_def(def)
635                } else {
636                    self.catalog.register_edge_type_def(def)
637                };
638                match result {
639                    Ok(()) => {
640                        wal_log!(
641                            self,
642                            WalRecord::CreateEdgeType {
643                                name: stmt.name.clone(),
644                                properties: props_for_wal,
645                                constraints: Vec::new(),
646                            }
647                        );
648                        Ok(QueryResult::status(format!(
649                            "Created edge type '{}'",
650                            stmt.name
651                        )))
652                    }
653                    Err(e) if stmt.if_not_exists => {
654                        let _ = e;
655                        Ok(QueryResult::status("No change"))
656                    }
657                    Err(e) => Err(Error::Query(QueryError::new(
658                        QueryErrorKind::Semantic,
659                        e.to_string(),
660                    ))),
661                }
662            }
663            SchemaStatement::CreateVectorIndex(stmt) => {
664                Self::create_vector_index_on_store(
665                    &self.store,
666                    &stmt.node_label,
667                    &stmt.property,
668                    stmt.dimensions,
669                    stmt.metric.as_deref(),
670                )?;
671                wal_log!(
672                    self,
673                    WalRecord::CreateIndex {
674                        name: stmt.name.clone(),
675                        label: stmt.node_label.clone(),
676                        property: stmt.property.clone(),
677                        index_type: "vector".to_string(),
678                    }
679                );
680                Ok(QueryResult::status(format!(
681                    "Created vector index '{}'",
682                    stmt.name
683                )))
684            }
685            SchemaStatement::DropNodeType { name, if_exists } => {
686                match self.catalog.drop_node_type(&name) {
687                    Ok(()) => {
688                        wal_log!(self, WalRecord::DropNodeType { name: name.clone() });
689                        Ok(QueryResult::status(format!("Dropped node type '{name}'")))
690                    }
691                    Err(e) if if_exists => {
692                        let _ = e;
693                        Ok(QueryResult::status("No change"))
694                    }
695                    Err(e) => Err(Error::Query(QueryError::new(
696                        QueryErrorKind::Semantic,
697                        e.to_string(),
698                    ))),
699                }
700            }
701            SchemaStatement::DropEdgeType { name, if_exists } => {
702                match self.catalog.drop_edge_type_def(&name) {
703                    Ok(()) => {
704                        wal_log!(self, WalRecord::DropEdgeType { name: name.clone() });
705                        Ok(QueryResult::status(format!("Dropped edge type '{name}'")))
706                    }
707                    Err(e) if if_exists => {
708                        let _ = e;
709                        Ok(QueryResult::status("No change"))
710                    }
711                    Err(e) => Err(Error::Query(QueryError::new(
712                        QueryErrorKind::Semantic,
713                        e.to_string(),
714                    ))),
715                }
716            }
717            SchemaStatement::CreateIndex(stmt) => {
718                use grafeo_adapters::query::gql::ast::IndexKind;
719                let index_type_str = match stmt.index_kind {
720                    IndexKind::Property => "property",
721                    IndexKind::BTree => "btree",
722                    IndexKind::Text => "text",
723                    IndexKind::Vector => "vector",
724                };
725                match stmt.index_kind {
726                    IndexKind::Property | IndexKind::BTree => {
727                        for prop in &stmt.properties {
728                            self.store.create_property_index(prop);
729                        }
730                    }
731                    IndexKind::Text => {
732                        for prop in &stmt.properties {
733                            Self::create_text_index_on_store(&self.store, &stmt.label, prop)?;
734                        }
735                    }
736                    IndexKind::Vector => {
737                        for prop in &stmt.properties {
738                            Self::create_vector_index_on_store(
739                                &self.store,
740                                &stmt.label,
741                                prop,
742                                stmt.options.dimensions,
743                                stmt.options.metric.as_deref(),
744                            )?;
745                        }
746                    }
747                }
748                #[cfg(feature = "wal")]
749                for prop in &stmt.properties {
750                    wal_log!(
751                        self,
752                        WalRecord::CreateIndex {
753                            name: stmt.name.clone(),
754                            label: stmt.label.clone(),
755                            property: prop.clone(),
756                            index_type: index_type_str.to_string(),
757                        }
758                    );
759                }
760                Ok(QueryResult::status(format!(
761                    "Created {} index '{}'",
762                    index_type_str, stmt.name
763                )))
764            }
765            SchemaStatement::DropIndex { name, if_exists } => {
766                // Try to drop property index by name
767                let dropped = self.store.drop_property_index(&name);
768                if dropped || if_exists {
769                    if dropped {
770                        wal_log!(self, WalRecord::DropIndex { name: name.clone() });
771                    }
772                    Ok(QueryResult::status(if dropped {
773                        format!("Dropped index '{name}'")
774                    } else {
775                        "No change".to_string()
776                    }))
777                } else {
778                    Err(Error::Query(QueryError::new(
779                        QueryErrorKind::Semantic,
780                        format!("Index '{name}' does not exist"),
781                    )))
782                }
783            }
784            SchemaStatement::CreateConstraint(stmt) => {
785                use grafeo_adapters::query::gql::ast::ConstraintKind;
786                let kind_str = match stmt.constraint_kind {
787                    ConstraintKind::Unique => "unique",
788                    ConstraintKind::NodeKey => "node_key",
789                    ConstraintKind::NotNull => "not_null",
790                    ConstraintKind::Exists => "exists",
791                };
792                let constraint_name = stmt
793                    .name
794                    .clone()
795                    .unwrap_or_else(|| format!("{}_{kind_str}", stmt.label));
796                wal_log!(
797                    self,
798                    WalRecord::CreateConstraint {
799                        name: constraint_name.clone(),
800                        label: stmt.label.clone(),
801                        properties: stmt.properties.clone(),
802                        kind: kind_str.to_string(),
803                    }
804                );
805                Ok(QueryResult::status(format!(
806                    "Created {kind_str} constraint '{constraint_name}'"
807                )))
808            }
809            SchemaStatement::DropConstraint { name, if_exists } => {
810                let _ = if_exists;
811                wal_log!(self, WalRecord::DropConstraint { name: name.clone() });
812                Ok(QueryResult::status(format!("Dropped constraint '{name}'")))
813            }
814            SchemaStatement::CreateGraphType(stmt) => {
815                use crate::catalog::GraphTypeDefinition;
816                use grafeo_adapters::query::gql::ast::InlineElementType;
817
818                // GG04: LIKE clause copies type from existing graph
819                let (mut node_types, mut edge_types, open) =
820                    if let Some(ref like_graph) = stmt.like_graph {
821                        // Infer types from the graph's bound type, or use its existing types
822                        if let Some(type_name) = self.catalog.get_graph_type_binding(like_graph) {
823                            if let Some(existing) = self
824                                .catalog
825                                .schema()
826                                .and_then(|s| s.get_graph_type(&type_name))
827                            {
828                                (
829                                    existing.allowed_node_types.clone(),
830                                    existing.allowed_edge_types.clone(),
831                                    existing.open,
832                                )
833                            } else {
834                                (Vec::new(), Vec::new(), true)
835                            }
836                        } else {
837                            // GG22: Infer from graph data (labels used in graph)
838                            let nt = self.catalog.all_node_type_names();
839                            let et = self.catalog.all_edge_type_names();
840                            if nt.is_empty() && et.is_empty() {
841                                (Vec::new(), Vec::new(), true)
842                            } else {
843                                (nt, et, false)
844                            }
845                        }
846                    } else {
847                        (stmt.node_types.clone(), stmt.edge_types.clone(), stmt.open)
848                    };
849
850                // GG03: Register inline element types and add their names
851                for inline in &stmt.inline_types {
852                    match inline {
853                        InlineElementType::Node {
854                            name, properties, ..
855                        } => {
856                            let def = NodeTypeDefinition {
857                                name: name.clone(),
858                                properties: properties
859                                    .iter()
860                                    .map(|p| TypedProperty {
861                                        name: p.name.clone(),
862                                        data_type: PropertyDataType::from_type_name(&p.data_type),
863                                        nullable: p.nullable,
864                                        default_value: None,
865                                    })
866                                    .collect(),
867                                constraints: Vec::new(),
868                            };
869                            // Register or replace so inline defs override existing
870                            self.catalog.register_or_replace_node_type(def);
871                            #[cfg(feature = "wal")]
872                            {
873                                let props_for_wal: Vec<(String, String, bool)> = properties
874                                    .iter()
875                                    .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
876                                    .collect();
877                                self.log_schema_wal(&WalRecord::CreateNodeType {
878                                    name: name.clone(),
879                                    properties: props_for_wal,
880                                    constraints: Vec::new(),
881                                });
882                            }
883                            if !node_types.contains(name) {
884                                node_types.push(name.clone());
885                            }
886                        }
887                        InlineElementType::Edge {
888                            name, properties, ..
889                        } => {
890                            let def = EdgeTypeDefinition {
891                                name: name.clone(),
892                                properties: properties
893                                    .iter()
894                                    .map(|p| TypedProperty {
895                                        name: p.name.clone(),
896                                        data_type: PropertyDataType::from_type_name(&p.data_type),
897                                        nullable: p.nullable,
898                                        default_value: None,
899                                    })
900                                    .collect(),
901                                constraints: Vec::new(),
902                            };
903                            self.catalog.register_or_replace_edge_type_def(def);
904                            #[cfg(feature = "wal")]
905                            {
906                                let props_for_wal: Vec<(String, String, bool)> = properties
907                                    .iter()
908                                    .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
909                                    .collect();
910                                self.log_schema_wal(&WalRecord::CreateEdgeType {
911                                    name: name.clone(),
912                                    properties: props_for_wal,
913                                    constraints: Vec::new(),
914                                });
915                            }
916                            if !edge_types.contains(name) {
917                                edge_types.push(name.clone());
918                            }
919                        }
920                    }
921                }
922
923                let def = GraphTypeDefinition {
924                    name: stmt.name.clone(),
925                    allowed_node_types: node_types.clone(),
926                    allowed_edge_types: edge_types.clone(),
927                    open,
928                };
929                let result = if stmt.or_replace {
930                    // Drop existing first, ignore error if not found
931                    let _ = self.catalog.drop_graph_type(&stmt.name);
932                    self.catalog.register_graph_type(def)
933                } else {
934                    self.catalog.register_graph_type(def)
935                };
936                match result {
937                    Ok(()) => {
938                        wal_log!(
939                            self,
940                            WalRecord::CreateGraphType {
941                                name: stmt.name.clone(),
942                                node_types,
943                                edge_types,
944                                open,
945                            }
946                        );
947                        Ok(QueryResult::status(format!(
948                            "Created graph type '{}'",
949                            stmt.name
950                        )))
951                    }
952                    Err(e) if stmt.if_not_exists => {
953                        let _ = e;
954                        Ok(QueryResult::status("No change"))
955                    }
956                    Err(e) => Err(Error::Query(QueryError::new(
957                        QueryErrorKind::Semantic,
958                        e.to_string(),
959                    ))),
960                }
961            }
962            SchemaStatement::DropGraphType { name, if_exists } => {
963                match self.catalog.drop_graph_type(&name) {
964                    Ok(()) => {
965                        wal_log!(self, WalRecord::DropGraphType { name: name.clone() });
966                        Ok(QueryResult::status(format!("Dropped graph type '{name}'")))
967                    }
968                    Err(e) if if_exists => {
969                        let _ = e;
970                        Ok(QueryResult::status("No change"))
971                    }
972                    Err(e) => Err(Error::Query(QueryError::new(
973                        QueryErrorKind::Semantic,
974                        e.to_string(),
975                    ))),
976                }
977            }
978            SchemaStatement::CreateSchema {
979                name,
980                if_not_exists,
981            } => match self.catalog.register_schema_namespace(name.clone()) {
982                Ok(()) => {
983                    wal_log!(self, WalRecord::CreateSchema { name: name.clone() });
984                    Ok(QueryResult::status(format!("Created schema '{name}'")))
985                }
986                Err(e) if if_not_exists => {
987                    let _ = e;
988                    Ok(QueryResult::status("No change"))
989                }
990                Err(e) => Err(Error::Query(QueryError::new(
991                    QueryErrorKind::Semantic,
992                    e.to_string(),
993                ))),
994            },
995            SchemaStatement::DropSchema { name, if_exists } => {
996                match self.catalog.drop_schema_namespace(&name) {
997                    Ok(()) => {
998                        wal_log!(self, WalRecord::DropSchema { name: name.clone() });
999                        Ok(QueryResult::status(format!("Dropped schema '{name}'")))
1000                    }
1001                    Err(e) if if_exists => {
1002                        let _ = e;
1003                        Ok(QueryResult::status("No change"))
1004                    }
1005                    Err(e) => Err(Error::Query(QueryError::new(
1006                        QueryErrorKind::Semantic,
1007                        e.to_string(),
1008                    ))),
1009                }
1010            }
1011            SchemaStatement::AlterNodeType(stmt) => {
1012                use grafeo_adapters::query::gql::ast::TypeAlteration;
1013                let mut wal_alts = Vec::new();
1014                for alt in &stmt.alterations {
1015                    match alt {
1016                        TypeAlteration::AddProperty(prop) => {
1017                            let typed = TypedProperty {
1018                                name: prop.name.clone(),
1019                                data_type: PropertyDataType::from_type_name(&prop.data_type),
1020                                nullable: prop.nullable,
1021                                default_value: None,
1022                            };
1023                            self.catalog
1024                                .alter_node_type_add_property(&stmt.name, typed)
1025                                .map_err(|e| {
1026                                    Error::Query(QueryError::new(
1027                                        QueryErrorKind::Semantic,
1028                                        e.to_string(),
1029                                    ))
1030                                })?;
1031                            wal_alts.push((
1032                                "add".to_string(),
1033                                prop.name.clone(),
1034                                prop.data_type.clone(),
1035                                prop.nullable,
1036                            ));
1037                        }
1038                        TypeAlteration::DropProperty(name) => {
1039                            self.catalog
1040                                .alter_node_type_drop_property(&stmt.name, name)
1041                                .map_err(|e| {
1042                                    Error::Query(QueryError::new(
1043                                        QueryErrorKind::Semantic,
1044                                        e.to_string(),
1045                                    ))
1046                                })?;
1047                            wal_alts.push(("drop".to_string(), name.clone(), String::new(), false));
1048                        }
1049                    }
1050                }
1051                wal_log!(
1052                    self,
1053                    WalRecord::AlterNodeType {
1054                        name: stmt.name.clone(),
1055                        alterations: wal_alts,
1056                    }
1057                );
1058                Ok(QueryResult::status(format!(
1059                    "Altered node type '{}'",
1060                    stmt.name
1061                )))
1062            }
1063            SchemaStatement::AlterEdgeType(stmt) => {
1064                use grafeo_adapters::query::gql::ast::TypeAlteration;
1065                let mut wal_alts = Vec::new();
1066                for alt in &stmt.alterations {
1067                    match alt {
1068                        TypeAlteration::AddProperty(prop) => {
1069                            let typed = TypedProperty {
1070                                name: prop.name.clone(),
1071                                data_type: PropertyDataType::from_type_name(&prop.data_type),
1072                                nullable: prop.nullable,
1073                                default_value: None,
1074                            };
1075                            self.catalog
1076                                .alter_edge_type_add_property(&stmt.name, typed)
1077                                .map_err(|e| {
1078                                    Error::Query(QueryError::new(
1079                                        QueryErrorKind::Semantic,
1080                                        e.to_string(),
1081                                    ))
1082                                })?;
1083                            wal_alts.push((
1084                                "add".to_string(),
1085                                prop.name.clone(),
1086                                prop.data_type.clone(),
1087                                prop.nullable,
1088                            ));
1089                        }
1090                        TypeAlteration::DropProperty(name) => {
1091                            self.catalog
1092                                .alter_edge_type_drop_property(&stmt.name, name)
1093                                .map_err(|e| {
1094                                    Error::Query(QueryError::new(
1095                                        QueryErrorKind::Semantic,
1096                                        e.to_string(),
1097                                    ))
1098                                })?;
1099                            wal_alts.push(("drop".to_string(), name.clone(), String::new(), false));
1100                        }
1101                    }
1102                }
1103                wal_log!(
1104                    self,
1105                    WalRecord::AlterEdgeType {
1106                        name: stmt.name.clone(),
1107                        alterations: wal_alts,
1108                    }
1109                );
1110                Ok(QueryResult::status(format!(
1111                    "Altered edge type '{}'",
1112                    stmt.name
1113                )))
1114            }
1115            SchemaStatement::AlterGraphType(stmt) => {
1116                use grafeo_adapters::query::gql::ast::GraphTypeAlteration;
1117                let mut wal_alts = Vec::new();
1118                for alt in &stmt.alterations {
1119                    match alt {
1120                        GraphTypeAlteration::AddNodeType(name) => {
1121                            self.catalog
1122                                .alter_graph_type_add_node_type(&stmt.name, name.clone())
1123                                .map_err(|e| {
1124                                    Error::Query(QueryError::new(
1125                                        QueryErrorKind::Semantic,
1126                                        e.to_string(),
1127                                    ))
1128                                })?;
1129                            wal_alts.push(("add_node_type".to_string(), name.clone()));
1130                        }
1131                        GraphTypeAlteration::DropNodeType(name) => {
1132                            self.catalog
1133                                .alter_graph_type_drop_node_type(&stmt.name, name)
1134                                .map_err(|e| {
1135                                    Error::Query(QueryError::new(
1136                                        QueryErrorKind::Semantic,
1137                                        e.to_string(),
1138                                    ))
1139                                })?;
1140                            wal_alts.push(("drop_node_type".to_string(), name.clone()));
1141                        }
1142                        GraphTypeAlteration::AddEdgeType(name) => {
1143                            self.catalog
1144                                .alter_graph_type_add_edge_type(&stmt.name, name.clone())
1145                                .map_err(|e| {
1146                                    Error::Query(QueryError::new(
1147                                        QueryErrorKind::Semantic,
1148                                        e.to_string(),
1149                                    ))
1150                                })?;
1151                            wal_alts.push(("add_edge_type".to_string(), name.clone()));
1152                        }
1153                        GraphTypeAlteration::DropEdgeType(name) => {
1154                            self.catalog
1155                                .alter_graph_type_drop_edge_type(&stmt.name, name)
1156                                .map_err(|e| {
1157                                    Error::Query(QueryError::new(
1158                                        QueryErrorKind::Semantic,
1159                                        e.to_string(),
1160                                    ))
1161                                })?;
1162                            wal_alts.push(("drop_edge_type".to_string(), name.clone()));
1163                        }
1164                    }
1165                }
1166                wal_log!(
1167                    self,
1168                    WalRecord::AlterGraphType {
1169                        name: stmt.name.clone(),
1170                        alterations: wal_alts,
1171                    }
1172                );
1173                Ok(QueryResult::status(format!(
1174                    "Altered graph type '{}'",
1175                    stmt.name
1176                )))
1177            }
1178            SchemaStatement::CreateProcedure(stmt) => {
1179                use crate::catalog::ProcedureDefinition;
1180
1181                let def = ProcedureDefinition {
1182                    name: stmt.name.clone(),
1183                    params: stmt
1184                        .params
1185                        .iter()
1186                        .map(|p| (p.name.clone(), p.param_type.clone()))
1187                        .collect(),
1188                    returns: stmt
1189                        .returns
1190                        .iter()
1191                        .map(|r| (r.name.clone(), r.return_type.clone()))
1192                        .collect(),
1193                    body: stmt.body.clone(),
1194                };
1195
1196                if stmt.or_replace {
1197                    self.catalog.replace_procedure(def).map_err(|e| {
1198                        Error::Query(QueryError::new(QueryErrorKind::Semantic, e.to_string()))
1199                    })?;
1200                } else {
1201                    match self.catalog.register_procedure(def) {
1202                        Ok(()) => {}
1203                        Err(_) if stmt.if_not_exists => {
1204                            return Ok(QueryResult::empty());
1205                        }
1206                        Err(e) => {
1207                            return Err(Error::Query(QueryError::new(
1208                                QueryErrorKind::Semantic,
1209                                e.to_string(),
1210                            )));
1211                        }
1212                    }
1213                }
1214
1215                wal_log!(
1216                    self,
1217                    WalRecord::CreateProcedure {
1218                        name: stmt.name.clone(),
1219                        params: stmt
1220                            .params
1221                            .iter()
1222                            .map(|p| (p.name.clone(), p.param_type.clone()))
1223                            .collect(),
1224                        returns: stmt
1225                            .returns
1226                            .iter()
1227                            .map(|r| (r.name.clone(), r.return_type.clone()))
1228                            .collect(),
1229                        body: stmt.body,
1230                    }
1231                );
1232                Ok(QueryResult::status(format!(
1233                    "Created procedure '{}'",
1234                    stmt.name
1235                )))
1236            }
1237            SchemaStatement::DropProcedure { name, if_exists } => {
1238                match self.catalog.drop_procedure(&name) {
1239                    Ok(()) => {}
1240                    Err(_) if if_exists => {
1241                        return Ok(QueryResult::empty());
1242                    }
1243                    Err(e) => {
1244                        return Err(Error::Query(QueryError::new(
1245                            QueryErrorKind::Semantic,
1246                            e.to_string(),
1247                        )));
1248                    }
1249                }
1250                wal_log!(self, WalRecord::DropProcedure { name: name.clone() });
1251                Ok(QueryResult::status(format!("Dropped procedure '{name}'")))
1252            }
1253        }
1254    }
1255
/// Builds an HNSW vector index over `property` for nodes labelled `label`
/// and registers it on `store`.
///
/// Every node yielded by `store.nodes_with_label(label)` whose `property`
/// holds a `Value::Vector` is inserted into the index; nodes without such a
/// value are skipped.
///
/// * `dimensions` - expected vector length. When `None`, the length of the
///   first vector encountered is used instead.
/// * `metric` - name of the distance metric. When `None`, cosine distance is
///   used.
///
/// # Errors
///
/// Returns `Error::Internal` when `metric` is not a recognised name, when a
/// vector's length differs from the expected dimension, or when no dimension
/// was supplied and no vector was found to infer one from.
#[cfg(all(feature = "gql", feature = "vector-index"))]
fn create_vector_index_on_store(
    store: &LpgStore,
    label: &str,
    property: &str,
    dimensions: Option<usize>,
    metric: Option<&str>,
) -> Result<()> {
    use grafeo_common::types::{PropertyKey, Value};
    use grafeo_common::utils::error::Error;
    use grafeo_core::index::vector::{DistanceMetric, HnswConfig, HnswIndex};

    // Resolve the metric up front so an invalid name fails before any node is scanned.
    let distance = if let Some(m) = metric {
        let Some(parsed) = DistanceMetric::from_str(m) else {
            return Err(Error::Internal(format!(
                "Unknown distance metric '{m}'. Use: cosine, euclidean, dot_product, manhattan"
            )));
        };
        parsed
    } else {
        DistanceMetric::Cosine
    };

    let key = PropertyKey::new(property);
    let mut dims_seen = dimensions;
    let mut pending: Vec<(grafeo_common::types::NodeId, Vec<f32>)> = Vec::new();

    for node in store.nodes_with_label(label) {
        // Nodes that lack the property, or hold a non-vector value, are ignored.
        let Some(Value::Vector(v)) = node.properties.get(&key) else {
            continue;
        };
        match dims_seen {
            // No dimension known yet: the first vector found fixes it.
            None => dims_seen = Some(v.len()),
            Some(expected) if v.len() != expected => {
                return Err(Error::Internal(format!(
                    "Vector dimension mismatch: expected {expected}, found {} on node {}",
                    v.len(),
                    node.id.0
                )));
            }
            Some(_) => {}
        }
        pending.push((node.id, v.to_vec()));
    }

    // Neither an explicit dimension nor any vector to infer it from.
    let dims = dims_seen.ok_or_else(|| {
        Error::Internal(format!(
            "No vector properties found on :{label}({property}) and no dimensions specified"
        ))
    })?;

    let index = HnswIndex::with_capacity(HnswConfig::new(dims, distance), pending.len());
    let accessor = grafeo_core::index::vector::PropertyVectorAccessor::new(store, property);
    for (node_id, vector) in &pending {
        index.insert(*node_id, vector, &accessor);
    }

    store.add_vector_index(label, property, Arc::new(index));
    Ok(())
}
1315
/// Fallback used when the crate is built without the `vector-index` feature.
///
/// # Errors
///
/// Always returns `Error::Internal`, explaining that vector indexes require
/// the `vector-index` feature; none of the arguments are inspected.
#[cfg(all(feature = "gql", not(feature = "vector-index")))]
fn create_vector_index_on_store(
    _store: &LpgStore,
    _label: &str,
    _property: &str,
    _dimensions: Option<usize>,
    _metric: Option<&str>,
) -> Result<()> {
    use grafeo_common::utils::error::Error;

    let reason = String::from("Vector index support requires the 'vector-index' feature");
    Err(Error::Internal(reason))
}
1329
/// Builds an inverted text index over `property` for nodes labelled `label`
/// and registers it on `store`.
///
/// The index is an `InvertedIndex` created with the default `BM25Config`.
/// Each node id returned by `store.nodes_by_label(label)` is looked up, and
/// only nodes whose `property` is a `Value::String` are inserted; all other
/// nodes are skipped.
///
/// # Errors
///
/// This implementation has no failing path and always returns `Ok(())`; the
/// `Result` return type matches the stub used when `text-index` is disabled.
#[cfg(all(feature = "gql", feature = "text-index"))]
fn create_text_index_on_store(store: &LpgStore, label: &str, property: &str) -> Result<()> {
    use grafeo_common::types::{PropertyKey, Value};
    use grafeo_core::index::text::{BM25Config, InvertedIndex};

    let mut inverted = InvertedIndex::new(BM25Config::default());
    let key = PropertyKey::new(property);

    for node_id in store.nodes_by_label(label) {
        // Only string-valued properties are indexed; anything else is skipped.
        let Some(Value::String(text)) = store.get_node_property(node_id, &key) else {
            continue;
        };
        inverted.insert(node_id, text.as_str());
    }

    // The store receives the index behind a shared read-write lock.
    store.add_text_index(label, property, Arc::new(parking_lot::RwLock::new(inverted)));
    Ok(())
}
1349
/// Fallback used when the crate is built without the `text-index` feature.
///
/// # Errors
///
/// Always returns `Error::Internal`, explaining that text indexes require
/// the `text-index` feature; none of the arguments are inspected.
#[cfg(all(feature = "gql", not(feature = "text-index")))]
fn create_text_index_on_store(_store: &LpgStore, _label: &str, _property: &str) -> Result<()> {
    use grafeo_common::utils::error::Error;

    let reason = String::from("Text index support requires the 'text-index' feature");
    Err(Error::Internal(reason))
}
1357
1358    /// Executes a GQL query.
1359    ///
1360    /// # Errors
1361    ///
1362    /// Returns an error if the query fails to parse or execute.
1363    ///
1364    /// # Examples
1365    ///
1366    /// ```no_run
1367    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
1368    /// use grafeo_engine::GrafeoDB;
1369    ///
1370    /// let db = GrafeoDB::new_in_memory();
1371    /// let session = db.session();
1372    ///
1373    /// // Create a node
1374    /// session.execute("INSERT (:Person {name: 'Alix', age: 30})")?;
1375    ///
1376    /// // Query nodes
1377    /// let result = session.execute("MATCH (n:Person) RETURN n.name, n.age")?;
1378    /// for row in &result.rows {
1379    ///     println!("{:?}", row);
1380    /// }
1381    /// # Ok(())
1382    /// # }
1383    /// ```
1384    #[cfg(feature = "gql")]
1385    pub fn execute(&self, query: &str) -> Result<QueryResult> {
1386        self.require_lpg("GQL")?;
1387
1388        use crate::query::{
1389            Executor, binder::Binder, cache::CacheKey, optimizer::Optimizer,
1390            processor::QueryLanguage, translators::gql,
1391        };
1392
1393        #[cfg(not(target_arch = "wasm32"))]
1394        let start_time = std::time::Instant::now();
1395
1396        // Parse and translate, checking for session/schema commands first
1397        let translation = gql::translate_full(query)?;
1398        let logical_plan = match translation {
1399            gql::GqlTranslationResult::SessionCommand(cmd) => {
1400                return self.execute_session_command(cmd);
1401            }
1402            gql::GqlTranslationResult::SchemaCommand(cmd) => {
1403                // All DDL is a write operation
1404                if *self.read_only_tx.lock() {
1405                    return Err(grafeo_common::utils::error::Error::Transaction(
1406                        grafeo_common::utils::error::TransactionError::ReadOnly,
1407                    ));
1408                }
1409                return self.execute_schema_command(cmd);
1410            }
1411            gql::GqlTranslationResult::Plan(plan) => {
1412                // Block mutations in read-only transactions
1413                if *self.read_only_tx.lock() && plan.root.has_mutations() {
1414                    return Err(grafeo_common::utils::error::Error::Transaction(
1415                        grafeo_common::utils::error::TransactionError::ReadOnly,
1416                    ));
1417                }
1418                plan
1419            }
1420        };
1421
1422        // Create cache key for this query
1423        let cache_key = CacheKey::new(query, QueryLanguage::Gql);
1424
1425        // Try to get cached optimized plan, or use the plan we just translated
1426        let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
1427            cached_plan
1428        } else {
1429            // Semantic validation
1430            let mut binder = Binder::new();
1431            let _binding_context = binder.bind(&logical_plan)?;
1432
1433            // Optimize the plan
1434            let optimizer = Optimizer::from_graph_store(&*self.graph_store);
1435            let plan = optimizer.optimize(logical_plan)?;
1436
1437            // Cache the optimized plan for future use
1438            self.query_cache.put_optimized(cache_key, plan.clone());
1439
1440            plan
1441        };
1442
1443        // EXPLAIN: annotate pushdown hints and return the plan tree
1444        if optimized_plan.explain {
1445            use crate::query::processor::{annotate_pushdown_hints, explain_result};
1446            let mut plan = optimized_plan;
1447            annotate_pushdown_hints(&mut plan.root, self.graph_store.as_ref());
1448            return Ok(explain_result(&plan));
1449        }
1450
1451        let has_mutations = optimized_plan.root.has_mutations();
1452
1453        self.with_auto_commit(has_mutations, || {
1454            // Get transaction context for MVCC visibility
1455            let (viewing_epoch, transaction_id) = self.get_transaction_context();
1456
1457            // Convert to physical plan with transaction context
1458            // (Physical planning cannot be cached as it depends on transaction state)
1459            let planner = self.create_planner(viewing_epoch, transaction_id);
1460            let mut physical_plan = planner.plan(&optimized_plan)?;
1461
1462            // Execute the plan
1463            let executor = Executor::with_columns(physical_plan.columns.clone())
1464                .with_deadline(self.query_deadline());
1465            let mut result = executor.execute(physical_plan.operator.as_mut())?;
1466
1467            // Add execution metrics
1468            let rows_scanned = result.rows.len() as u64;
1469            #[cfg(not(target_arch = "wasm32"))]
1470            {
1471                let elapsed_ms = start_time.elapsed().as_secs_f64() * 1000.0;
1472                result.execution_time_ms = Some(elapsed_ms);
1473            }
1474            result.rows_scanned = Some(rows_scanned);
1475
1476            Ok(result)
1477        })
1478    }
1479
1480    /// Executes a GQL query with visibility at the specified epoch.
1481    ///
1482    /// This enables time-travel queries: the query sees the database
1483    /// as it existed at the given epoch.
1484    ///
1485    /// # Errors
1486    ///
1487    /// Returns an error if parsing or execution fails.
1488    #[cfg(feature = "gql")]
1489    pub fn execute_at_epoch(&self, query: &str, epoch: EpochId) -> Result<QueryResult> {
1490        let previous = self.viewing_epoch_override.lock().replace(epoch);
1491        let result = self.execute(query);
1492        *self.viewing_epoch_override.lock() = previous;
1493        result
1494    }
1495
1496    /// Executes a GQL query with parameters.
1497    ///
1498    /// # Errors
1499    ///
1500    /// Returns an error if the query fails to parse or execute.
1501    #[cfg(feature = "gql")]
1502    pub fn execute_with_params(
1503        &self,
1504        query: &str,
1505        params: std::collections::HashMap<String, Value>,
1506    ) -> Result<QueryResult> {
1507        self.require_lpg("GQL")?;
1508
1509        use crate::query::processor::{QueryLanguage, QueryProcessor};
1510
1511        let has_mutations = Self::query_looks_like_mutation(query);
1512
1513        self.with_auto_commit(has_mutations, || {
1514            // Get transaction context for MVCC visibility
1515            let (viewing_epoch, transaction_id) = self.get_transaction_context();
1516
1517            // Create processor with transaction context
1518            let processor = QueryProcessor::for_graph_store_with_transaction(
1519                Arc::clone(&self.graph_store),
1520                Arc::clone(&self.transaction_manager),
1521            );
1522
1523            // Apply transaction context if in a transaction
1524            let processor = if let Some(transaction_id) = transaction_id {
1525                processor.with_transaction_context(viewing_epoch, transaction_id)
1526            } else {
1527                processor
1528            };
1529
1530            processor.process(query, QueryLanguage::Gql, Some(&params))
1531        })
1532    }
1533
1534    /// Executes a GQL query with parameters.
1535    ///
1536    /// # Errors
1537    ///
1538    /// Returns an error if no query language is enabled.
1539    #[cfg(not(any(feature = "gql", feature = "cypher")))]
1540    pub fn execute_with_params(
1541        &self,
1542        _query: &str,
1543        _params: std::collections::HashMap<String, Value>,
1544    ) -> Result<QueryResult> {
1545        Err(grafeo_common::utils::error::Error::Internal(
1546            "No query language enabled".to_string(),
1547        ))
1548    }
1549
1550    /// Executes a GQL query.
1551    ///
1552    /// # Errors
1553    ///
1554    /// Returns an error if no query language is enabled.
1555    #[cfg(not(any(feature = "gql", feature = "cypher")))]
1556    pub fn execute(&self, _query: &str) -> Result<QueryResult> {
1557        Err(grafeo_common::utils::error::Error::Internal(
1558            "No query language enabled".to_string(),
1559        ))
1560    }
1561
1562    /// Executes a Cypher query.
1563    ///
1564    /// # Errors
1565    ///
1566    /// Returns an error if the query fails to parse or execute.
1567    #[cfg(feature = "cypher")]
1568    pub fn execute_cypher(&self, query: &str) -> Result<QueryResult> {
1569        use crate::query::{
1570            Executor, binder::Binder, cache::CacheKey, optimizer::Optimizer,
1571            processor::QueryLanguage, translators::cypher,
1572        };
1573
1574        // Create cache key for this query
1575        let cache_key = CacheKey::new(query, QueryLanguage::Cypher);
1576
1577        // Try to get cached optimized plan
1578        let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
1579            cached_plan
1580        } else {
1581            // Parse and translate the query to a logical plan
1582            let logical_plan = cypher::translate(query)?;
1583
1584            // Semantic validation
1585            let mut binder = Binder::new();
1586            let _binding_context = binder.bind(&logical_plan)?;
1587
1588            // Optimize the plan
1589            let optimizer = Optimizer::from_graph_store(&*self.graph_store);
1590            let plan = optimizer.optimize(logical_plan)?;
1591
1592            // Cache the optimized plan
1593            self.query_cache.put_optimized(cache_key, plan.clone());
1594
1595            plan
1596        };
1597
1598        let has_mutations = optimized_plan.root.has_mutations();
1599
1600        self.with_auto_commit(has_mutations, || {
1601            // Get transaction context for MVCC visibility
1602            let (viewing_epoch, transaction_id) = self.get_transaction_context();
1603
1604            // Convert to physical plan with transaction context
1605            let planner = self.create_planner(viewing_epoch, transaction_id);
1606            let mut physical_plan = planner.plan(&optimized_plan)?;
1607
1608            // Execute the plan
1609            let executor = Executor::with_columns(physical_plan.columns.clone())
1610                .with_deadline(self.query_deadline());
1611            executor.execute(physical_plan.operator.as_mut())
1612        })
1613    }
1614
1615    /// Executes a Gremlin query.
1616    ///
1617    /// # Errors
1618    ///
1619    /// Returns an error if the query fails to parse or execute.
1620    ///
1621    /// # Examples
1622    ///
1623    /// ```no_run
1624    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
1625    /// use grafeo_engine::GrafeoDB;
1626    ///
1627    /// let db = GrafeoDB::new_in_memory();
1628    /// let session = db.session();
1629    ///
1630    /// // Create some nodes first
1631    /// session.create_node(&["Person"]);
1632    ///
1633    /// // Query using Gremlin
1634    /// let result = session.execute_gremlin("g.V().hasLabel('Person')")?;
1635    /// # Ok(())
1636    /// # }
1637    /// ```
1638    #[cfg(feature = "gremlin")]
1639    pub fn execute_gremlin(&self, query: &str) -> Result<QueryResult> {
1640        use crate::query::{Executor, binder::Binder, optimizer::Optimizer, translators::gremlin};
1641
1642        // Parse and translate the query to a logical plan
1643        let logical_plan = gremlin::translate(query)?;
1644
1645        // Semantic validation
1646        let mut binder = Binder::new();
1647        let _binding_context = binder.bind(&logical_plan)?;
1648
1649        // Optimize the plan
1650        let optimizer = Optimizer::from_graph_store(&*self.graph_store);
1651        let optimized_plan = optimizer.optimize(logical_plan)?;
1652
1653        let has_mutations = optimized_plan.root.has_mutations();
1654
1655        self.with_auto_commit(has_mutations, || {
1656            // Get transaction context for MVCC visibility
1657            let (viewing_epoch, transaction_id) = self.get_transaction_context();
1658
1659            // Convert to physical plan with transaction context
1660            let planner = self.create_planner(viewing_epoch, transaction_id);
1661            let mut physical_plan = planner.plan(&optimized_plan)?;
1662
1663            // Execute the plan
1664            let executor = Executor::with_columns(physical_plan.columns.clone())
1665                .with_deadline(self.query_deadline());
1666            executor.execute(physical_plan.operator.as_mut())
1667        })
1668    }
1669
1670    /// Executes a Gremlin query with parameters.
1671    ///
1672    /// # Errors
1673    ///
1674    /// Returns an error if the query fails to parse or execute.
1675    #[cfg(feature = "gremlin")]
1676    pub fn execute_gremlin_with_params(
1677        &self,
1678        query: &str,
1679        params: std::collections::HashMap<String, Value>,
1680    ) -> Result<QueryResult> {
1681        use crate::query::processor::{QueryLanguage, QueryProcessor};
1682
1683        let has_mutations = Self::query_looks_like_mutation(query);
1684
1685        self.with_auto_commit(has_mutations, || {
1686            // Get transaction context for MVCC visibility
1687            let (viewing_epoch, transaction_id) = self.get_transaction_context();
1688
1689            // Create processor with transaction context
1690            let processor = QueryProcessor::for_graph_store_with_transaction(
1691                Arc::clone(&self.graph_store),
1692                Arc::clone(&self.transaction_manager),
1693            );
1694
1695            // Apply transaction context if in a transaction
1696            let processor = if let Some(transaction_id) = transaction_id {
1697                processor.with_transaction_context(viewing_epoch, transaction_id)
1698            } else {
1699                processor
1700            };
1701
1702            processor.process(query, QueryLanguage::Gremlin, Some(&params))
1703        })
1704    }
1705
1706    /// Executes a GraphQL query against the LPG store.
1707    ///
1708    /// # Errors
1709    ///
1710    /// Returns an error if the query fails to parse or execute.
1711    ///
1712    /// # Examples
1713    ///
1714    /// ```no_run
1715    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
1716    /// use grafeo_engine::GrafeoDB;
1717    ///
1718    /// let db = GrafeoDB::new_in_memory();
1719    /// let session = db.session();
1720    ///
1721    /// // Create some nodes first
1722    /// session.create_node(&["User"]);
1723    ///
1724    /// // Query using GraphQL
1725    /// let result = session.execute_graphql("query { user { id name } }")?;
1726    /// # Ok(())
1727    /// # }
1728    /// ```
1729    #[cfg(feature = "graphql")]
1730    pub fn execute_graphql(&self, query: &str) -> Result<QueryResult> {
1731        use crate::query::{Executor, binder::Binder, optimizer::Optimizer, translators::graphql};
1732
1733        // Parse and translate the query to a logical plan
1734        let logical_plan = graphql::translate(query)?;
1735
1736        // Semantic validation
1737        let mut binder = Binder::new();
1738        let _binding_context = binder.bind(&logical_plan)?;
1739
1740        // Optimize the plan
1741        let optimizer = Optimizer::from_graph_store(&*self.graph_store);
1742        let optimized_plan = optimizer.optimize(logical_plan)?;
1743
1744        let has_mutations = optimized_plan.root.has_mutations();
1745
1746        self.with_auto_commit(has_mutations, || {
1747            // Get transaction context for MVCC visibility
1748            let (viewing_epoch, transaction_id) = self.get_transaction_context();
1749
1750            // Convert to physical plan with transaction context
1751            let planner = self.create_planner(viewing_epoch, transaction_id);
1752            let mut physical_plan = planner.plan(&optimized_plan)?;
1753
1754            // Execute the plan
1755            let executor = Executor::with_columns(physical_plan.columns.clone())
1756                .with_deadline(self.query_deadline());
1757            executor.execute(physical_plan.operator.as_mut())
1758        })
1759    }
1760
1761    /// Executes a GraphQL query with parameters.
1762    ///
1763    /// # Errors
1764    ///
1765    /// Returns an error if the query fails to parse or execute.
1766    #[cfg(feature = "graphql")]
1767    pub fn execute_graphql_with_params(
1768        &self,
1769        query: &str,
1770        params: std::collections::HashMap<String, Value>,
1771    ) -> Result<QueryResult> {
1772        use crate::query::processor::{QueryLanguage, QueryProcessor};
1773
1774        let has_mutations = Self::query_looks_like_mutation(query);
1775
1776        self.with_auto_commit(has_mutations, || {
1777            // Get transaction context for MVCC visibility
1778            let (viewing_epoch, transaction_id) = self.get_transaction_context();
1779
1780            // Create processor with transaction context
1781            let processor = QueryProcessor::for_graph_store_with_transaction(
1782                Arc::clone(&self.graph_store),
1783                Arc::clone(&self.transaction_manager),
1784            );
1785
1786            // Apply transaction context if in a transaction
1787            let processor = if let Some(transaction_id) = transaction_id {
1788                processor.with_transaction_context(viewing_epoch, transaction_id)
1789            } else {
1790                processor
1791            };
1792
1793            processor.process(query, QueryLanguage::GraphQL, Some(&params))
1794        })
1795    }
1796
1797    /// Executes a SQL/PGQ query (SQL:2023 GRAPH_TABLE).
1798    ///
1799    /// # Errors
1800    ///
1801    /// Returns an error if the query fails to parse or execute.
1802    ///
1803    /// # Examples
1804    ///
1805    /// ```no_run
1806    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
1807    /// use grafeo_engine::GrafeoDB;
1808    ///
1809    /// let db = GrafeoDB::new_in_memory();
1810    /// let session = db.session();
1811    ///
1812    /// let result = session.execute_sql(
1813    ///     "SELECT * FROM GRAPH_TABLE (
1814    ///         MATCH (n:Person)
1815    ///         COLUMNS (n.name AS name)
1816    ///     )"
1817    /// )?;
1818    /// # Ok(())
1819    /// # }
1820    /// ```
1821    #[cfg(feature = "sql-pgq")]
1822    pub fn execute_sql(&self, query: &str) -> Result<QueryResult> {
1823        use crate::query::{
1824            Executor, binder::Binder, cache::CacheKey, optimizer::Optimizer, plan::LogicalOperator,
1825            processor::QueryLanguage, translators::sql_pgq,
1826        };
1827
1828        // Parse and translate (always needed to check for DDL)
1829        let logical_plan = sql_pgq::translate(query)?;
1830
1831        // Handle DDL statements directly (they don't go through the query pipeline)
1832        if let LogicalOperator::CreatePropertyGraph(ref cpg) = logical_plan.root {
1833            return Ok(QueryResult {
1834                columns: vec!["status".into()],
1835                column_types: vec![grafeo_common::types::LogicalType::String],
1836                rows: vec![vec![Value::from(format!(
1837                    "Property graph '{}' created ({} node tables, {} edge tables)",
1838                    cpg.name,
1839                    cpg.node_tables.len(),
1840                    cpg.edge_tables.len()
1841                ))]],
1842                execution_time_ms: None,
1843                rows_scanned: None,
1844                status_message: None,
1845                gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
1846            });
1847        }
1848
1849        // Create cache key for query plans
1850        let cache_key = CacheKey::new(query, QueryLanguage::SqlPgq);
1851
1852        // Try to get cached optimized plan
1853        let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
1854            cached_plan
1855        } else {
1856            // Semantic validation
1857            let mut binder = Binder::new();
1858            let _binding_context = binder.bind(&logical_plan)?;
1859
1860            // Optimize the plan
1861            let optimizer = Optimizer::from_graph_store(&*self.graph_store);
1862            let plan = optimizer.optimize(logical_plan)?;
1863
1864            // Cache the optimized plan
1865            self.query_cache.put_optimized(cache_key, plan.clone());
1866
1867            plan
1868        };
1869
1870        let has_mutations = optimized_plan.root.has_mutations();
1871
1872        self.with_auto_commit(has_mutations, || {
1873            // Get transaction context for MVCC visibility
1874            let (viewing_epoch, transaction_id) = self.get_transaction_context();
1875
1876            // Convert to physical plan with transaction context
1877            let planner = self.create_planner(viewing_epoch, transaction_id);
1878            let mut physical_plan = planner.plan(&optimized_plan)?;
1879
1880            // Execute the plan
1881            let executor = Executor::with_columns(physical_plan.columns.clone())
1882                .with_deadline(self.query_deadline());
1883            executor.execute(physical_plan.operator.as_mut())
1884        })
1885    }
1886
1887    /// Executes a SQL/PGQ query with parameters.
1888    ///
1889    /// # Errors
1890    ///
1891    /// Returns an error if the query fails to parse or execute.
1892    #[cfg(feature = "sql-pgq")]
1893    pub fn execute_sql_with_params(
1894        &self,
1895        query: &str,
1896        params: std::collections::HashMap<String, Value>,
1897    ) -> Result<QueryResult> {
1898        use crate::query::processor::{QueryLanguage, QueryProcessor};
1899
1900        let has_mutations = Self::query_looks_like_mutation(query);
1901
1902        self.with_auto_commit(has_mutations, || {
1903            // Get transaction context for MVCC visibility
1904            let (viewing_epoch, transaction_id) = self.get_transaction_context();
1905
1906            // Create processor with transaction context
1907            let processor = QueryProcessor::for_graph_store_with_transaction(
1908                Arc::clone(&self.graph_store),
1909                Arc::clone(&self.transaction_manager),
1910            );
1911
1912            // Apply transaction context if in a transaction
1913            let processor = if let Some(transaction_id) = transaction_id {
1914                processor.with_transaction_context(viewing_epoch, transaction_id)
1915            } else {
1916                processor
1917            };
1918
1919            processor.process(query, QueryLanguage::SqlPgq, Some(&params))
1920        })
1921    }
1922
1923    /// Executes a SPARQL query.
1924    ///
1925    /// # Errors
1926    ///
1927    /// Returns an error if the query fails to parse or execute.
1928    #[cfg(all(feature = "sparql", feature = "rdf"))]
1929    pub fn execute_sparql(&self, query: &str) -> Result<QueryResult> {
1930        use crate::query::{
1931            Executor, optimizer::Optimizer, planner::rdf::RdfPlanner, translators::sparql,
1932        };
1933
1934        // Parse and translate the SPARQL query to a logical plan
1935        let logical_plan = sparql::translate(query)?;
1936
1937        // Optimize the plan
1938        let optimizer = Optimizer::from_graph_store(&*self.graph_store);
1939        let optimized_plan = optimizer.optimize(logical_plan)?;
1940
1941        // Convert to physical plan using RDF planner
1942        let planner = RdfPlanner::new(Arc::clone(&self.rdf_store))
1943            .with_transaction_id(*self.current_transaction.lock());
1944        let mut physical_plan = planner.plan(&optimized_plan)?;
1945
1946        // Execute the plan
1947        let executor = Executor::with_columns(physical_plan.columns.clone())
1948            .with_deadline(self.query_deadline());
1949        executor.execute(physical_plan.operator.as_mut())
1950    }
1951
1952    /// Executes a SPARQL query with parameters.
1953    ///
1954    /// # Errors
1955    ///
1956    /// Returns an error if the query fails to parse or execute.
1957    #[cfg(all(feature = "sparql", feature = "rdf"))]
1958    pub fn execute_sparql_with_params(
1959        &self,
1960        query: &str,
1961        _params: std::collections::HashMap<String, Value>,
1962    ) -> Result<QueryResult> {
1963        // TODO: Implement parameter substitution for SPARQL
1964        // For now, just execute the query without parameters
1965        self.execute_sparql(query)
1966    }
1967
1968    /// Executes a query in the specified language by name.
1969    ///
1970    /// Supported language names: `"gql"`, `"cypher"`, `"gremlin"`, `"graphql"`,
1971    /// `"sparql"`, `"sql"`. Each requires the corresponding feature flag.
1972    ///
1973    /// # Errors
1974    ///
1975    /// Returns an error if the language is unknown/disabled or the query fails.
1976    pub fn execute_language(
1977        &self,
1978        query: &str,
1979        language: &str,
1980        params: Option<std::collections::HashMap<String, Value>>,
1981    ) -> Result<QueryResult> {
1982        match language {
1983            "gql" => {
1984                if let Some(p) = params {
1985                    self.execute_with_params(query, p)
1986                } else {
1987                    self.execute(query)
1988                }
1989            }
1990            #[cfg(feature = "cypher")]
1991            "cypher" => {
1992                if let Some(p) = params {
1993                    use crate::query::processor::{QueryLanguage, QueryProcessor};
1994                    let has_mutations = Self::query_looks_like_mutation(query);
1995                    self.with_auto_commit(has_mutations, || {
1996                        let processor = QueryProcessor::for_graph_store_with_transaction(
1997                            Arc::clone(&self.graph_store),
1998                            Arc::clone(&self.transaction_manager),
1999                        );
2000                        let (viewing_epoch, transaction_id) = self.get_transaction_context();
2001                        let processor = if let Some(transaction_id) = transaction_id {
2002                            processor.with_transaction_context(viewing_epoch, transaction_id)
2003                        } else {
2004                            processor
2005                        };
2006                        processor.process(query, QueryLanguage::Cypher, Some(&p))
2007                    })
2008                } else {
2009                    self.execute_cypher(query)
2010                }
2011            }
2012            #[cfg(feature = "gremlin")]
2013            "gremlin" => {
2014                if let Some(p) = params {
2015                    self.execute_gremlin_with_params(query, p)
2016                } else {
2017                    self.execute_gremlin(query)
2018                }
2019            }
2020            #[cfg(feature = "graphql")]
2021            "graphql" => {
2022                if let Some(p) = params {
2023                    self.execute_graphql_with_params(query, p)
2024                } else {
2025                    self.execute_graphql(query)
2026                }
2027            }
2028            #[cfg(feature = "sql-pgq")]
2029            "sql" | "sql-pgq" => {
2030                if let Some(p) = params {
2031                    self.execute_sql_with_params(query, p)
2032                } else {
2033                    self.execute_sql(query)
2034                }
2035            }
2036            #[cfg(all(feature = "sparql", feature = "rdf"))]
2037            "sparql" => {
2038                if let Some(p) = params {
2039                    self.execute_sparql_with_params(query, p)
2040                } else {
2041                    self.execute_sparql(query)
2042                }
2043            }
2044            other => Err(grafeo_common::utils::error::Error::Query(
2045                grafeo_common::utils::error::QueryError::new(
2046                    grafeo_common::utils::error::QueryErrorKind::Semantic,
2047                    format!("Unknown query language: '{other}'"),
2048                ),
2049            )),
2050        }
2051    }
2052
2053    /// Begins a new transaction.
2054    ///
2055    /// # Errors
2056    ///
2057    /// Returns an error if a transaction is already active.
2058    ///
2059    /// # Examples
2060    ///
2061    /// ```no_run
2062    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
2063    /// use grafeo_engine::GrafeoDB;
2064    ///
2065    /// let db = GrafeoDB::new_in_memory();
2066    /// let mut session = db.session();
2067    ///
2068    /// session.begin_transaction()?;
2069    /// session.execute("INSERT (:Person {name: 'Alix'})")?;
2070    /// session.execute("INSERT (:Person {name: 'Gus'})")?;
2071    /// session.commit()?; // Both inserts committed atomically
2072    /// # Ok(())
2073    /// # }
2074    /// ```
2075    pub fn begin_transaction(&mut self) -> Result<()> {
2076        self.begin_transaction_inner(false, None)
2077    }
2078
2079    /// Begins a transaction with a specific isolation level.
2080    ///
2081    /// See [`begin_transaction`](Self::begin_transaction) for the default (`SnapshotIsolation`).
2082    ///
2083    /// # Errors
2084    ///
2085    /// Returns an error if a transaction is already active.
2086    pub fn begin_transaction_with_isolation(
2087        &mut self,
2088        isolation_level: crate::transaction::IsolationLevel,
2089    ) -> Result<()> {
2090        self.begin_transaction_inner(false, Some(isolation_level))
2091    }
2092
2093    /// Core transaction begin logic, usable from both `&mut self` and `&self` paths.
2094    fn begin_transaction_inner(
2095        &self,
2096        read_only: bool,
2097        isolation_level: Option<crate::transaction::IsolationLevel>,
2098    ) -> Result<()> {
2099        let mut current = self.current_transaction.lock();
2100        if current.is_some() {
2101            // Nested transaction: create an auto-savepoint instead of a new tx.
2102            drop(current);
2103            let mut depth = self.transaction_nesting_depth.lock();
2104            *depth += 1;
2105            let sp_name = format!("_nested_tx_{}", *depth);
2106            self.savepoint(&sp_name)?;
2107            return Ok(());
2108        }
2109
2110        self.transaction_start_node_count
2111            .store(self.store.node_count(), Ordering::Relaxed);
2112        self.transaction_start_edge_count
2113            .store(self.store.edge_count(), Ordering::Relaxed);
2114        let transaction_id = if let Some(level) = isolation_level {
2115            self.transaction_manager.begin_with_isolation(level)
2116        } else {
2117            self.transaction_manager.begin()
2118        };
2119        *current = Some(transaction_id);
2120        *self.read_only_tx.lock() = read_only;
2121        Ok(())
2122    }
2123
2124    /// Commits the current transaction.
2125    ///
2126    /// Makes all changes since [`begin_transaction`](Self::begin_transaction) permanent.
2127    ///
2128    /// # Errors
2129    ///
2130    /// Returns an error if no transaction is active.
2131    pub fn commit(&mut self) -> Result<()> {
2132        self.commit_inner()
2133    }
2134
2135    /// Core commit logic, usable from both `&mut self` and `&self` paths.
2136    fn commit_inner(&self) -> Result<()> {
2137        // Nested transaction: release the auto-savepoint (changes are preserved).
2138        {
2139            let mut depth = self.transaction_nesting_depth.lock();
2140            if *depth > 0 {
2141                let sp_name = format!("_nested_tx_{depth}");
2142                *depth -= 1;
2143                drop(depth);
2144                return self.release_savepoint(&sp_name);
2145            }
2146        }
2147
2148        let transaction_id = self.current_transaction.lock().take().ok_or_else(|| {
2149            grafeo_common::utils::error::Error::Transaction(
2150                grafeo_common::utils::error::TransactionError::InvalidState(
2151                    "No active transaction".to_string(),
2152                ),
2153            )
2154        })?;
2155
2156        // Commit RDF store pending operations
2157        #[cfg(feature = "rdf")]
2158        self.rdf_store.commit_transaction(transaction_id);
2159
2160        self.transaction_manager.commit(transaction_id)?;
2161
2162        // Sync the LpgStore epoch with the TxManager so that
2163        // convenience lookups (edge_type, get_edge, get_node) that use
2164        // store.current_epoch() can see versions created at the latest epoch.
2165        self.store
2166            .sync_epoch(self.transaction_manager.current_epoch());
2167
2168        // Reset read-only flag and clear savepoints
2169        *self.read_only_tx.lock() = false;
2170        self.savepoints.lock().clear();
2171
2172        // Auto-GC: periodically prune old MVCC versions
2173        if self.gc_interval > 0 {
2174            let count = self.commit_counter.fetch_add(1, Ordering::Relaxed) + 1;
2175            if count.is_multiple_of(self.gc_interval) {
2176                let min_epoch = self.transaction_manager.min_active_epoch();
2177                self.store.gc_versions(min_epoch);
2178                self.transaction_manager.gc();
2179            }
2180        }
2181
2182        Ok(())
2183    }
2184
2185    /// Aborts the current transaction.
2186    ///
2187    /// Discards all changes since [`begin_transaction`](Self::begin_transaction).
2188    ///
2189    /// # Errors
2190    ///
2191    /// Returns an error if no transaction is active.
2192    ///
2193    /// # Examples
2194    ///
2195    /// ```no_run
2196    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
2197    /// use grafeo_engine::GrafeoDB;
2198    ///
2199    /// let db = GrafeoDB::new_in_memory();
2200    /// let mut session = db.session();
2201    ///
2202    /// session.begin_transaction()?;
2203    /// session.execute("INSERT (:Person {name: 'Alix'})")?;
2204    /// session.rollback()?; // Insert is discarded
2205    /// # Ok(())
2206    /// # }
2207    /// ```
2208    pub fn rollback(&mut self) -> Result<()> {
2209        self.rollback_inner()
2210    }
2211
2212    /// Core rollback logic, usable from both `&mut self` and `&self` paths.
2213    fn rollback_inner(&self) -> Result<()> {
2214        // Nested transaction: rollback to the auto-savepoint.
2215        {
2216            let mut depth = self.transaction_nesting_depth.lock();
2217            if *depth > 0 {
2218                let sp_name = format!("_nested_tx_{depth}");
2219                *depth -= 1;
2220                drop(depth);
2221                return self.rollback_to_savepoint(&sp_name);
2222            }
2223        }
2224
2225        let transaction_id = self.current_transaction.lock().take().ok_or_else(|| {
2226            grafeo_common::utils::error::Error::Transaction(
2227                grafeo_common::utils::error::TransactionError::InvalidState(
2228                    "No active transaction".to_string(),
2229                ),
2230            )
2231        })?;
2232
2233        // Reset read-only flag
2234        *self.read_only_tx.lock() = false;
2235
2236        // Discard uncommitted versions in the LPG store
2237        self.store.discard_uncommitted_versions(transaction_id);
2238
2239        // Discard pending operations in the RDF store
2240        #[cfg(feature = "rdf")]
2241        self.rdf_store.rollback_transaction(transaction_id);
2242
2243        // Clear savepoints
2244        self.savepoints.lock().clear();
2245
2246        // Mark transaction as aborted in the manager
2247        self.transaction_manager.abort(transaction_id)
2248    }
2249
2250    /// Creates a named savepoint within the current transaction.
2251    ///
2252    /// The savepoint captures the current node/edge ID counters so that
2253    /// [`rollback_to_savepoint`](Self::rollback_to_savepoint) can discard
2254    /// entities created after this point.
2255    ///
2256    /// # Errors
2257    ///
2258    /// Returns an error if no transaction is active.
2259    pub fn savepoint(&self, name: &str) -> Result<()> {
2260        let _tx_id = self.current_transaction.lock().ok_or_else(|| {
2261            grafeo_common::utils::error::Error::Transaction(
2262                grafeo_common::utils::error::TransactionError::InvalidState(
2263                    "No active transaction".to_string(),
2264                ),
2265            )
2266        })?;
2267
2268        let next_node = self.store.peek_next_node_id();
2269        let next_edge = self.store.peek_next_edge_id();
2270        self.savepoints
2271            .lock()
2272            .push((name.to_string(), next_node, next_edge));
2273        Ok(())
2274    }
2275
2276    /// Rolls back to a named savepoint, undoing all writes made after it.
2277    ///
2278    /// The savepoint and any savepoints created after it are removed.
2279    /// Entities with IDs >= the savepoint snapshot are discarded.
2280    ///
2281    /// # Errors
2282    ///
2283    /// Returns an error if no transaction is active or the savepoint does not exist.
2284    pub fn rollback_to_savepoint(&self, name: &str) -> Result<()> {
2285        let transaction_id = self.current_transaction.lock().ok_or_else(|| {
2286            grafeo_common::utils::error::Error::Transaction(
2287                grafeo_common::utils::error::TransactionError::InvalidState(
2288                    "No active transaction".to_string(),
2289                ),
2290            )
2291        })?;
2292
2293        let mut savepoints = self.savepoints.lock();
2294
2295        // Find the savepoint by name (search from the end for nested savepoints)
2296        let pos = savepoints
2297            .iter()
2298            .rposition(|(n, _, _)| n == name)
2299            .ok_or_else(|| {
2300                grafeo_common::utils::error::Error::Transaction(
2301                    grafeo_common::utils::error::TransactionError::InvalidState(format!(
2302                        "Savepoint '{name}' not found"
2303                    )),
2304                )
2305            })?;
2306
2307        let (_, sp_next_node, sp_next_edge) = savepoints[pos].clone();
2308
2309        // Remove this savepoint and all later ones
2310        savepoints.truncate(pos);
2311        drop(savepoints);
2312
2313        // Discard all nodes with ID >= sp_next_node and edges with ID >= sp_next_edge
2314        let current_next_node = self.store.peek_next_node_id();
2315        let current_next_edge = self.store.peek_next_edge_id();
2316
2317        let node_ids: Vec<NodeId> = (sp_next_node..current_next_node).map(NodeId::new).collect();
2318        let edge_ids: Vec<EdgeId> = (sp_next_edge..current_next_edge).map(EdgeId::new).collect();
2319
2320        if !node_ids.is_empty() || !edge_ids.is_empty() {
2321            self.store
2322                .discard_entities_by_id(transaction_id, &node_ids, &edge_ids);
2323        }
2324
2325        Ok(())
2326    }
2327
2328    /// Releases (removes) a named savepoint without rolling back.
2329    ///
2330    /// # Errors
2331    ///
2332    /// Returns an error if no transaction is active or the savepoint does not exist.
2333    pub fn release_savepoint(&self, name: &str) -> Result<()> {
2334        let _tx_id = self.current_transaction.lock().ok_or_else(|| {
2335            grafeo_common::utils::error::Error::Transaction(
2336                grafeo_common::utils::error::TransactionError::InvalidState(
2337                    "No active transaction".to_string(),
2338                ),
2339            )
2340        })?;
2341
2342        let mut savepoints = self.savepoints.lock();
2343        let pos = savepoints
2344            .iter()
2345            .rposition(|(n, _, _)| n == name)
2346            .ok_or_else(|| {
2347                grafeo_common::utils::error::Error::Transaction(
2348                    grafeo_common::utils::error::TransactionError::InvalidState(format!(
2349                        "Savepoint '{name}' not found"
2350                    )),
2351                )
2352            })?;
2353        savepoints.remove(pos);
2354        Ok(())
2355    }
2356
2357    /// Returns whether a transaction is active.
2358    #[must_use]
2359    pub fn in_transaction(&self) -> bool {
2360        self.current_transaction.lock().is_some()
2361    }
2362
2363    /// Returns the current transaction ID, if any.
2364    #[must_use]
2365    pub(crate) fn current_transaction_id(&self) -> Option<TransactionId> {
2366        *self.current_transaction.lock()
2367    }
2368
2369    /// Returns a reference to the transaction manager.
2370    #[must_use]
2371    pub(crate) fn transaction_manager(&self) -> &TransactionManager {
2372        &self.transaction_manager
2373    }
2374
2375    /// Returns the store's current node count and the count at transaction start.
2376    #[must_use]
2377    pub(crate) fn node_count_delta(&self) -> (usize, usize) {
2378        (
2379            self.transaction_start_node_count.load(Ordering::Relaxed),
2380            self.store.node_count(),
2381        )
2382    }
2383
2384    /// Returns the store's current edge count and the count at transaction start.
2385    #[must_use]
2386    pub(crate) fn edge_count_delta(&self) -> (usize, usize) {
2387        (
2388            self.transaction_start_edge_count.load(Ordering::Relaxed),
2389            self.store.edge_count(),
2390        )
2391    }
2392
2393    /// Prepares the current transaction for a two-phase commit.
2394    ///
2395    /// Returns a [`PreparedCommit`](crate::transaction::PreparedCommit) that
2396    /// lets you inspect pending changes and attach metadata before finalizing.
2397    /// The mutable borrow prevents concurrent operations while the commit is
2398    /// pending.
2399    ///
2400    /// If the `PreparedCommit` is dropped without calling `commit()` or
2401    /// `abort()`, the transaction is automatically rolled back.
2402    ///
2403    /// # Errors
2404    ///
2405    /// Returns an error if no transaction is active.
2406    ///
2407    /// # Examples
2408    ///
2409    /// ```no_run
2410    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
2411    /// use grafeo_engine::GrafeoDB;
2412    ///
2413    /// let db = GrafeoDB::new_in_memory();
2414    /// let mut session = db.session();
2415    ///
2416    /// session.begin_transaction()?;
2417    /// session.execute("INSERT (:Person {name: 'Alix'})")?;
2418    ///
2419    /// let mut prepared = session.prepare_commit()?;
2420    /// println!("Nodes written: {}", prepared.info().nodes_written);
2421    /// prepared.set_metadata("audit_user", "admin");
2422    /// prepared.commit()?;
2423    /// # Ok(())
2424    /// # }
2425    /// ```
2426    pub fn prepare_commit(&mut self) -> Result<crate::transaction::PreparedCommit<'_>> {
2427        crate::transaction::PreparedCommit::new(self)
2428    }
2429
2430    /// Sets auto-commit mode.
2431    pub fn set_auto_commit(&mut self, auto_commit: bool) {
2432        self.auto_commit = auto_commit;
2433    }
2434
2435    /// Returns whether auto-commit is enabled.
2436    #[must_use]
2437    pub fn auto_commit(&self) -> bool {
2438        self.auto_commit
2439    }
2440
2441    /// Returns `true` if auto-commit should wrap this execution.
2442    ///
2443    /// Auto-commit kicks in when: the session is in auto-commit mode,
2444    /// no explicit transaction is active, and the query mutates data.
2445    fn needs_auto_commit(&self, has_mutations: bool) -> bool {
2446        self.auto_commit && has_mutations && self.current_transaction.lock().is_none()
2447    }
2448
2449    /// Wraps `body` in an automatic begin/commit when [`needs_auto_commit`]
2450    /// returns `true`. On error the transaction is rolled back.
2451    fn with_auto_commit<F>(&self, has_mutations: bool, body: F) -> Result<QueryResult>
2452    where
2453        F: FnOnce() -> Result<QueryResult>,
2454    {
2455        if self.needs_auto_commit(has_mutations) {
2456            self.begin_transaction_inner(false, None)?;
2457            match body() {
2458                Ok(result) => {
2459                    self.commit_inner()?;
2460                    Ok(result)
2461                }
2462                Err(e) => {
2463                    let _ = self.rollback_inner();
2464                    Err(e)
2465                }
2466            }
2467        } else {
2468            body()
2469        }
2470    }
2471
2472    /// Quick heuristic: returns `true` when the query text looks like it
2473    /// performs a mutation. Used by `_with_params` paths that go through the
2474    /// `QueryProcessor` (where the logical plan isn't available before
2475    /// execution). False negatives are harmless: the data just won't be
2476    /// auto-committed, which matches the prior behaviour.
2477    fn query_looks_like_mutation(query: &str) -> bool {
2478        let upper = query.to_ascii_uppercase();
2479        upper.contains("INSERT")
2480            || upper.contains("CREATE")
2481            || upper.contains("DELETE")
2482            || upper.contains("MERGE")
2483            || upper.contains("SET")
2484            || upper.contains("REMOVE")
2485            || upper.contains("DROP")
2486            || upper.contains("ALTER")
2487    }
2488
2489    /// Computes the wall-clock deadline for query execution.
2490    #[must_use]
2491    fn query_deadline(&self) -> Option<Instant> {
2492        #[cfg(not(target_arch = "wasm32"))]
2493        {
2494            self.query_timeout.map(|d| Instant::now() + d)
2495        }
2496        #[cfg(target_arch = "wasm32")]
2497        {
2498            let _ = &self.query_timeout;
2499            None
2500        }
2501    }
2502
2503    /// Evaluates a simple integer literal from a session parameter expression.
2504    fn eval_integer_literal(expr: &grafeo_adapters::query::gql::ast::Expression) -> Option<i64> {
2505        use grafeo_adapters::query::gql::ast::{Expression, Literal};
2506        match expr {
2507            Expression::Literal(Literal::Integer(n)) => Some(*n),
2508            _ => None,
2509        }
2510    }
2511
2512    /// Returns the current transaction context for MVCC visibility.
2513    ///
2514    /// Returns `(viewing_epoch, transaction_id)` where:
2515    /// - `viewing_epoch` is the epoch at which to check version visibility
2516    /// - `transaction_id` is the current transaction ID (if in a transaction)
2517    #[must_use]
2518    fn get_transaction_context(&self) -> (EpochId, Option<TransactionId>) {
2519        // Time-travel override takes precedence (read-only, no tx context)
2520        if let Some(epoch) = *self.viewing_epoch_override.lock() {
2521            return (epoch, None);
2522        }
2523
2524        if let Some(transaction_id) = *self.current_transaction.lock() {
2525            // In a transaction: use the transaction's start epoch
2526            let epoch = self
2527                .transaction_manager
2528                .start_epoch(transaction_id)
2529                .unwrap_or_else(|| self.transaction_manager.current_epoch());
2530            (epoch, Some(transaction_id))
2531        } else {
2532            // No transaction: use current epoch
2533            (self.transaction_manager.current_epoch(), None)
2534        }
2535    }
2536
2537    /// Creates a planner with transaction context and constraint validator.
2538    fn create_planner(
2539        &self,
2540        viewing_epoch: EpochId,
2541        transaction_id: Option<TransactionId>,
2542    ) -> crate::query::Planner {
2543        use crate::query::Planner;
2544
2545        let mut planner = Planner::with_context(
2546            Arc::clone(&self.graph_store),
2547            Arc::clone(&self.transaction_manager),
2548            transaction_id,
2549            viewing_epoch,
2550        )
2551        .with_factorized_execution(self.factorized_execution)
2552        .with_catalog(Arc::clone(&self.catalog));
2553
2554        // Attach the constraint validator for schema enforcement
2555        let validator = CatalogConstraintValidator::new(Arc::clone(&self.catalog));
2556        planner = planner.with_validator(Arc::new(validator));
2557
2558        planner
2559    }
2560
2561    /// Creates a node directly (bypassing query execution).
2562    ///
2563    /// This is a low-level API for testing and direct manipulation.
2564    /// If a transaction is active, the node will be versioned with the transaction ID.
2565    pub fn create_node(&self, labels: &[&str]) -> NodeId {
2566        let (epoch, transaction_id) = self.get_transaction_context();
2567        self.store.create_node_versioned(
2568            labels,
2569            epoch,
2570            transaction_id.unwrap_or(TransactionId::SYSTEM),
2571        )
2572    }
2573
2574    /// Creates a node with properties.
2575    ///
2576    /// If a transaction is active, the node will be versioned with the transaction ID.
2577    pub fn create_node_with_props<'a>(
2578        &self,
2579        labels: &[&str],
2580        properties: impl IntoIterator<Item = (&'a str, Value)>,
2581    ) -> NodeId {
2582        let (epoch, transaction_id) = self.get_transaction_context();
2583        self.store.create_node_with_props_versioned(
2584            labels,
2585            properties,
2586            epoch,
2587            transaction_id.unwrap_or(TransactionId::SYSTEM),
2588        )
2589    }
2590
2591    /// Creates an edge between two nodes.
2592    ///
2593    /// This is a low-level API for testing and direct manipulation.
2594    /// If a transaction is active, the edge will be versioned with the transaction ID.
2595    pub fn create_edge(
2596        &self,
2597        src: NodeId,
2598        dst: NodeId,
2599        edge_type: &str,
2600    ) -> grafeo_common::types::EdgeId {
2601        let (epoch, transaction_id) = self.get_transaction_context();
2602        self.store.create_edge_versioned(
2603            src,
2604            dst,
2605            edge_type,
2606            epoch,
2607            transaction_id.unwrap_or(TransactionId::SYSTEM),
2608        )
2609    }
2610
2611    // =========================================================================
2612    // Direct Lookup APIs (bypass query planning for O(1) point reads)
2613    // =========================================================================
2614
2615    /// Gets a node by ID directly, bypassing query planning.
2616    ///
2617    /// This is the fastest way to retrieve a single node when you know its ID.
2618    /// Skips parsing, binding, optimization, and physical planning entirely.
2619    ///
2620    /// # Performance
2621    ///
2622    /// - Time complexity: O(1) average case
2623    /// - No lock contention (uses DashMap internally)
2624    /// - ~20-30x faster than equivalent MATCH query
2625    ///
2626    /// # Example
2627    ///
2628    /// ```no_run
2629    /// # use grafeo_engine::GrafeoDB;
2630    /// # let db = GrafeoDB::new_in_memory();
2631    /// let session = db.session();
2632    /// let node_id = session.create_node(&["Person"]);
2633    ///
2634    /// // Direct lookup - O(1), no query planning
2635    /// let node = session.get_node(node_id);
2636    /// assert!(node.is_some());
2637    /// ```
2638    #[must_use]
2639    pub fn get_node(&self, id: NodeId) -> Option<Node> {
2640        let (epoch, transaction_id) = self.get_transaction_context();
2641        self.store
2642            .get_node_versioned(id, epoch, transaction_id.unwrap_or(TransactionId::SYSTEM))
2643    }
2644
2645    /// Gets a single property from a node by ID, bypassing query planning.
2646    ///
2647    /// More efficient than `get_node()` when you only need one property,
2648    /// as it avoids loading the full node with all properties.
2649    ///
2650    /// # Performance
2651    ///
2652    /// - Time complexity: O(1) average case
2653    /// - No query planning overhead
2654    ///
2655    /// # Example
2656    ///
2657    /// ```no_run
2658    /// # use grafeo_engine::GrafeoDB;
2659    /// # use grafeo_common::types::Value;
2660    /// # let db = GrafeoDB::new_in_memory();
2661    /// let session = db.session();
2662    /// let id = session.create_node_with_props(&["Person"], [("name", "Alix".into())]);
2663    ///
2664    /// // Direct property access - O(1)
2665    /// let name = session.get_node_property(id, "name");
2666    /// assert_eq!(name, Some(Value::String("Alix".into())));
2667    /// ```
2668    #[must_use]
2669    pub fn get_node_property(&self, id: NodeId, key: &str) -> Option<Value> {
2670        self.get_node(id)
2671            .and_then(|node| node.get_property(key).cloned())
2672    }
2673
2674    /// Gets an edge by ID directly, bypassing query planning.
2675    ///
2676    /// # Performance
2677    ///
2678    /// - Time complexity: O(1) average case
2679    /// - No lock contention
2680    #[must_use]
2681    pub fn get_edge(&self, id: EdgeId) -> Option<Edge> {
2682        let (epoch, transaction_id) = self.get_transaction_context();
2683        self.store
2684            .get_edge_versioned(id, epoch, transaction_id.unwrap_or(TransactionId::SYSTEM))
2685    }
2686
2687    /// Gets outgoing neighbors of a node directly, bypassing query planning.
2688    ///
2689    /// Returns (neighbor_id, edge_id) pairs for all outgoing edges.
2690    ///
2691    /// # Performance
2692    ///
2693    /// - Time complexity: O(degree) where degree is the number of outgoing edges
2694    /// - Uses adjacency index for direct access
2695    /// - ~10-20x faster than equivalent MATCH query
2696    ///
2697    /// # Example
2698    ///
2699    /// ```no_run
2700    /// # use grafeo_engine::GrafeoDB;
2701    /// # let db = GrafeoDB::new_in_memory();
2702    /// let session = db.session();
2703    /// let alix = session.create_node(&["Person"]);
2704    /// let gus = session.create_node(&["Person"]);
2705    /// session.create_edge(alix, gus, "KNOWS");
2706    ///
2707    /// // Direct neighbor lookup - O(degree)
2708    /// let neighbors = session.get_neighbors_outgoing(alix);
2709    /// assert_eq!(neighbors.len(), 1);
2710    /// assert_eq!(neighbors[0].0, gus);
2711    /// ```
2712    #[must_use]
2713    pub fn get_neighbors_outgoing(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
2714        self.store.edges_from(node, Direction::Outgoing).collect()
2715    }
2716
2717    /// Gets incoming neighbors of a node directly, bypassing query planning.
2718    ///
2719    /// Returns (neighbor_id, edge_id) pairs for all incoming edges.
2720    ///
2721    /// # Performance
2722    ///
2723    /// - Time complexity: O(degree) where degree is the number of incoming edges
2724    /// - Uses backward adjacency index for direct access
2725    #[must_use]
2726    pub fn get_neighbors_incoming(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
2727        self.store.edges_from(node, Direction::Incoming).collect()
2728    }
2729
2730    /// Gets outgoing neighbors filtered by edge type, bypassing query planning.
2731    ///
2732    /// # Example
2733    ///
2734    /// ```no_run
2735    /// # use grafeo_engine::GrafeoDB;
2736    /// # let db = GrafeoDB::new_in_memory();
2737    /// # let session = db.session();
2738    /// # let alix = session.create_node(&["Person"]);
2739    /// let neighbors = session.get_neighbors_outgoing_by_type(alix, "KNOWS");
2740    /// ```
2741    #[must_use]
2742    pub fn get_neighbors_outgoing_by_type(
2743        &self,
2744        node: NodeId,
2745        edge_type: &str,
2746    ) -> Vec<(NodeId, EdgeId)> {
2747        self.store
2748            .edges_from(node, Direction::Outgoing)
2749            .filter(|(_, edge_id)| {
2750                self.get_edge(*edge_id)
2751                    .is_some_and(|e| e.edge_type.as_str() == edge_type)
2752            })
2753            .collect()
2754    }
2755
2756    /// Checks if a node exists, bypassing query planning.
2757    ///
2758    /// # Performance
2759    ///
2760    /// - Time complexity: O(1)
2761    /// - Fastest existence check available
2762    #[must_use]
2763    pub fn node_exists(&self, id: NodeId) -> bool {
2764        self.get_node(id).is_some()
2765    }
2766
2767    /// Checks if an edge exists, bypassing query planning.
2768    #[must_use]
2769    pub fn edge_exists(&self, id: EdgeId) -> bool {
2770        self.get_edge(id).is_some()
2771    }
2772
2773    /// Gets the degree (number of edges) of a node.
2774    ///
2775    /// Returns (outgoing_degree, incoming_degree).
2776    #[must_use]
2777    pub fn get_degree(&self, node: NodeId) -> (usize, usize) {
2778        let out = self.store.out_degree(node);
2779        let in_degree = self.store.in_degree(node);
2780        (out, in_degree)
2781    }
2782
2783    /// Batch lookup of multiple nodes by ID.
2784    ///
2785    /// More efficient than calling `get_node()` in a loop because it
2786    /// amortizes overhead.
2787    ///
2788    /// # Performance
2789    ///
2790    /// - Time complexity: O(n) where n is the number of IDs
2791    /// - Better cache utilization than individual lookups
2792    #[must_use]
2793    pub fn get_nodes_batch(&self, ids: &[NodeId]) -> Vec<Option<Node>> {
2794        let (epoch, transaction_id) = self.get_transaction_context();
2795        let tx = transaction_id.unwrap_or(TransactionId::SYSTEM);
2796        ids.iter()
2797            .map(|&id| self.store.get_node_versioned(id, epoch, tx))
2798            .collect()
2799    }
2800
2801    // ── Change Data Capture ─────────────────────────────────────────────
2802
2803    /// Returns the full change history for an entity (node or edge).
2804    #[cfg(feature = "cdc")]
2805    pub fn history(
2806        &self,
2807        entity_id: impl Into<crate::cdc::EntityId>,
2808    ) -> Result<Vec<crate::cdc::ChangeEvent>> {
2809        Ok(self.cdc_log.history(entity_id.into()))
2810    }
2811
2812    /// Returns change events for an entity since the given epoch.
2813    #[cfg(feature = "cdc")]
2814    pub fn history_since(
2815        &self,
2816        entity_id: impl Into<crate::cdc::EntityId>,
2817        since_epoch: EpochId,
2818    ) -> Result<Vec<crate::cdc::ChangeEvent>> {
2819        Ok(self.cdc_log.history_since(entity_id.into(), since_epoch))
2820    }
2821
2822    /// Returns all change events across all entities in an epoch range.
2823    #[cfg(feature = "cdc")]
2824    pub fn changes_between(
2825        &self,
2826        start_epoch: EpochId,
2827        end_epoch: EpochId,
2828    ) -> Result<Vec<crate::cdc::ChangeEvent>> {
2829        Ok(self.cdc_log.changes_between(start_epoch, end_epoch))
2830    }
2831}
2832
2833#[cfg(test)]
2834mod tests {
2835    use crate::database::GrafeoDB;
2836
2837    #[test]
2838    fn test_session_create_node() {
2839        let db = GrafeoDB::new_in_memory();
2840        let session = db.session();
2841
2842        let id = session.create_node(&["Person"]);
2843        assert!(id.is_valid());
2844        assert_eq!(db.node_count(), 1);
2845    }
2846
2847    #[test]
2848    fn test_session_transaction() {
2849        let db = GrafeoDB::new_in_memory();
2850        let mut session = db.session();
2851
2852        assert!(!session.in_transaction());
2853
2854        session.begin_transaction().unwrap();
2855        assert!(session.in_transaction());
2856
2857        session.commit().unwrap();
2858        assert!(!session.in_transaction());
2859    }
2860
2861    #[test]
2862    fn test_session_transaction_context() {
2863        let db = GrafeoDB::new_in_memory();
2864        let mut session = db.session();
2865
2866        // Without transaction - context should have current epoch and no transaction_id
2867        let (_epoch1, transaction_id1) = session.get_transaction_context();
2868        assert!(transaction_id1.is_none());
2869
2870        // Start a transaction
2871        session.begin_transaction().unwrap();
2872        let (epoch2, transaction_id2) = session.get_transaction_context();
2873        assert!(transaction_id2.is_some());
2874        // Transaction should have a valid epoch
2875        let _ = epoch2; // Use the variable
2876
2877        // Commit and verify
2878        session.commit().unwrap();
2879        let (epoch3, tx_id3) = session.get_transaction_context();
2880        assert!(tx_id3.is_none());
2881        // Epoch should have advanced after commit
2882        assert!(epoch3.as_u64() >= epoch2.as_u64());
2883    }
2884
2885    #[test]
2886    fn test_session_rollback() {
2887        let db = GrafeoDB::new_in_memory();
2888        let mut session = db.session();
2889
2890        session.begin_transaction().unwrap();
2891        session.rollback().unwrap();
2892        assert!(!session.in_transaction());
2893    }
2894
2895    #[test]
2896    fn test_session_rollback_discards_versions() {
2897        use grafeo_common::types::TransactionId;
2898
2899        let db = GrafeoDB::new_in_memory();
2900
2901        // Create a node outside of any transaction (at system level)
2902        let node_before = db.store().create_node(&["Person"]);
2903        assert!(node_before.is_valid());
2904        assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");
2905
2906        // Start a transaction
2907        let mut session = db.session();
2908        session.begin_transaction().unwrap();
2909        let transaction_id = session.current_transaction.lock().unwrap();
2910
2911        // Create a node versioned with the transaction's ID
2912        let epoch = db.store().current_epoch();
2913        let node_in_tx = db
2914            .store()
2915            .create_node_versioned(&["Person"], epoch, transaction_id);
2916        assert!(node_in_tx.is_valid());
2917
2918        // Should see 2 nodes at this point
2919        assert_eq!(db.node_count(), 2, "Should have 2 nodes during transaction");
2920
2921        // Rollback the transaction
2922        session.rollback().unwrap();
2923        assert!(!session.in_transaction());
2924
2925        // The node created in the transaction should be discarded
2926        // Only the first node should remain visible
2927        let count_after = db.node_count();
2928        assert_eq!(
2929            count_after, 1,
2930            "Rollback should discard uncommitted node, but got {count_after}"
2931        );
2932
2933        // The original node should still be accessible
2934        let current_epoch = db.store().current_epoch();
2935        assert!(
2936            db.store()
2937                .get_node_versioned(node_before, current_epoch, TransactionId::SYSTEM)
2938                .is_some(),
2939            "Original node should still exist"
2940        );
2941
2942        // The node created in the transaction should not be accessible
2943        assert!(
2944            db.store()
2945                .get_node_versioned(node_in_tx, current_epoch, TransactionId::SYSTEM)
2946                .is_none(),
2947            "Transaction node should be gone"
2948        );
2949    }
2950
2951    #[test]
2952    fn test_session_create_node_in_transaction() {
2953        // Test that session.create_node() is transaction-aware
2954        let db = GrafeoDB::new_in_memory();
2955
2956        // Create a node outside of any transaction
2957        let node_before = db.create_node(&["Person"]);
2958        assert!(node_before.is_valid());
2959        assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");
2960
2961        // Start a transaction and create a node through the session
2962        let mut session = db.session();
2963        session.begin_transaction().unwrap();
2964
2965        // Create a node through session.create_node() - should be versioned with tx
2966        let node_in_tx = session.create_node(&["Person"]);
2967        assert!(node_in_tx.is_valid());
2968
2969        // Should see 2 nodes at this point
2970        assert_eq!(db.node_count(), 2, "Should have 2 nodes during transaction");
2971
2972        // Rollback the transaction
2973        session.rollback().unwrap();
2974
2975        // The node created via session.create_node() should be discarded
2976        let count_after = db.node_count();
2977        assert_eq!(
2978            count_after, 1,
2979            "Rollback should discard node created via session.create_node(), but got {count_after}"
2980        );
2981    }
2982
2983    #[test]
2984    fn test_session_create_node_with_props_in_transaction() {
2985        use grafeo_common::types::Value;
2986
2987        // Test that session.create_node_with_props() is transaction-aware
2988        let db = GrafeoDB::new_in_memory();
2989
2990        // Create a node outside of any transaction
2991        db.create_node(&["Person"]);
2992        assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");
2993
2994        // Start a transaction and create a node with properties
2995        let mut session = db.session();
2996        session.begin_transaction().unwrap();
2997
2998        let node_in_tx =
2999            session.create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))]);
3000        assert!(node_in_tx.is_valid());
3001
3002        // Should see 2 nodes
3003        assert_eq!(db.node_count(), 2, "Should have 2 nodes during transaction");
3004
3005        // Rollback the transaction
3006        session.rollback().unwrap();
3007
3008        // The node should be discarded
3009        let count_after = db.node_count();
3010        assert_eq!(
3011            count_after, 1,
3012            "Rollback should discard node created via session.create_node_with_props()"
3013        );
3014    }
3015
3016    #[cfg(feature = "gql")]
3017    mod gql_tests {
3018        use super::*;
3019
3020        #[test]
3021        fn test_gql_query_execution() {
3022            let db = GrafeoDB::new_in_memory();
3023            let session = db.session();
3024
3025            // Create some test data
3026            session.create_node(&["Person"]);
3027            session.create_node(&["Person"]);
3028            session.create_node(&["Animal"]);
3029
3030            // Execute a GQL query
3031            let result = session.execute("MATCH (n:Person) RETURN n").unwrap();
3032
3033            // Should return 2 Person nodes
3034            assert_eq!(result.row_count(), 2);
3035            assert_eq!(result.column_count(), 1);
3036            assert_eq!(result.columns[0], "n");
3037        }
3038
3039        #[test]
3040        fn test_gql_empty_result() {
3041            let db = GrafeoDB::new_in_memory();
3042            let session = db.session();
3043
3044            // No data in database
3045            let result = session.execute("MATCH (n:Person) RETURN n").unwrap();
3046
3047            assert_eq!(result.row_count(), 0);
3048        }
3049
3050        #[test]
3051        fn test_gql_parse_error() {
3052            let db = GrafeoDB::new_in_memory();
3053            let session = db.session();
3054
3055            // Invalid GQL syntax
3056            let result = session.execute("MATCH (n RETURN n");
3057
3058            assert!(result.is_err());
3059        }
3060
3061        #[test]
3062        fn test_gql_relationship_traversal() {
3063            let db = GrafeoDB::new_in_memory();
3064            let session = db.session();
3065
3066            // Create a graph: Alix -> Gus, Alix -> Vincent
3067            let alix = session.create_node(&["Person"]);
3068            let gus = session.create_node(&["Person"]);
3069            let vincent = session.create_node(&["Person"]);
3070
3071            session.create_edge(alix, gus, "KNOWS");
3072            session.create_edge(alix, vincent, "KNOWS");
3073
3074            // Execute a path query: MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b
3075            let result = session
3076                .execute("MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b")
3077                .unwrap();
3078
3079            // Should return 2 rows (Alix->Gus, Alix->Vincent)
3080            assert_eq!(result.row_count(), 2);
3081            assert_eq!(result.column_count(), 2);
3082            assert_eq!(result.columns[0], "a");
3083            assert_eq!(result.columns[1], "b");
3084        }
3085
3086        #[test]
3087        fn test_gql_relationship_with_type_filter() {
3088            let db = GrafeoDB::new_in_memory();
3089            let session = db.session();
3090
3091            // Create a graph: Alix -KNOWS-> Gus, Alix -WORKS_WITH-> Vincent
3092            let alix = session.create_node(&["Person"]);
3093            let gus = session.create_node(&["Person"]);
3094            let vincent = session.create_node(&["Person"]);
3095
3096            session.create_edge(alix, gus, "KNOWS");
3097            session.create_edge(alix, vincent, "WORKS_WITH");
3098
3099            // Query only KNOWS relationships
3100            let result = session
3101                .execute("MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b")
3102                .unwrap();
3103
3104            // Should return only 1 row (Alix->Gus)
3105            assert_eq!(result.row_count(), 1);
3106        }
3107
3108        #[test]
3109        fn test_gql_semantic_error_undefined_variable() {
3110            let db = GrafeoDB::new_in_memory();
3111            let session = db.session();
3112
3113            // Reference undefined variable 'x' in RETURN
3114            let result = session.execute("MATCH (n:Person) RETURN x");
3115
3116            // Should fail with semantic error
3117            assert!(result.is_err());
3118            let Err(err) = result else {
3119                panic!("Expected error")
3120            };
3121            assert!(
3122                err.to_string().contains("Undefined variable"),
3123                "Expected undefined variable error, got: {}",
3124                err
3125            );
3126        }
3127
3128        #[test]
3129        fn test_gql_where_clause_property_filter() {
3130            use grafeo_common::types::Value;
3131
3132            let db = GrafeoDB::new_in_memory();
3133            let session = db.session();
3134
3135            // Create people with ages
3136            session.create_node_with_props(&["Person"], [("age", Value::Int64(25))]);
3137            session.create_node_with_props(&["Person"], [("age", Value::Int64(35))]);
3138            session.create_node_with_props(&["Person"], [("age", Value::Int64(45))]);
3139
3140            // Query with WHERE clause: age > 30
3141            let result = session
3142                .execute("MATCH (n:Person) WHERE n.age > 30 RETURN n")
3143                .unwrap();
3144
3145            // Should return 2 people (ages 35 and 45)
3146            assert_eq!(result.row_count(), 2);
3147        }
3148
3149        #[test]
3150        fn test_gql_where_clause_equality() {
3151            use grafeo_common::types::Value;
3152
3153            let db = GrafeoDB::new_in_memory();
3154            let session = db.session();
3155
3156            // Create people with names
3157            session.create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))]);
3158            session.create_node_with_props(&["Person"], [("name", Value::String("Gus".into()))]);
3159            session.create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))]);
3160
3161            // Query with WHERE clause: name = "Alix"
3162            let result = session
3163                .execute("MATCH (n:Person) WHERE n.name = \"Alix\" RETURN n")
3164                .unwrap();
3165
3166            // Should return 2 people named Alix
3167            assert_eq!(result.row_count(), 2);
3168        }
3169
3170        #[test]
3171        fn test_gql_return_property_access() {
3172            use grafeo_common::types::Value;
3173
3174            let db = GrafeoDB::new_in_memory();
3175            let session = db.session();
3176
3177            // Create people with names and ages
3178            session.create_node_with_props(
3179                &["Person"],
3180                [
3181                    ("name", Value::String("Alix".into())),
3182                    ("age", Value::Int64(30)),
3183                ],
3184            );
3185            session.create_node_with_props(
3186                &["Person"],
3187                [
3188                    ("name", Value::String("Gus".into())),
3189                    ("age", Value::Int64(25)),
3190                ],
3191            );
3192
3193            // Query returning properties
3194            let result = session
3195                .execute("MATCH (n:Person) RETURN n.name, n.age")
3196                .unwrap();
3197
3198            // Should return 2 rows with name and age columns
3199            assert_eq!(result.row_count(), 2);
3200            assert_eq!(result.column_count(), 2);
3201            assert_eq!(result.columns[0], "n.name");
3202            assert_eq!(result.columns[1], "n.age");
3203
3204            // Check that we get actual values
3205            let names: Vec<&Value> = result.rows.iter().map(|r| &r[0]).collect();
3206            assert!(names.contains(&&Value::String("Alix".into())));
3207            assert!(names.contains(&&Value::String("Gus".into())));
3208        }
3209
3210        #[test]
3211        fn test_gql_return_mixed_expressions() {
3212            use grafeo_common::types::Value;
3213
3214            let db = GrafeoDB::new_in_memory();
3215            let session = db.session();
3216
3217            // Create a person
3218            session.create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))]);
3219
3220            // Query returning both node and property
3221            let result = session
3222                .execute("MATCH (n:Person) RETURN n, n.name")
3223                .unwrap();
3224
3225            assert_eq!(result.row_count(), 1);
3226            assert_eq!(result.column_count(), 2);
3227            assert_eq!(result.columns[0], "n");
3228            assert_eq!(result.columns[1], "n.name");
3229
3230            // Second column should be the name
3231            assert_eq!(result.rows[0][1], Value::String("Alix".into()));
3232        }
3233    }
3234
3235    #[cfg(feature = "cypher")]
3236    mod cypher_tests {
3237        use super::*;
3238
3239        #[test]
3240        fn test_cypher_query_execution() {
3241            let db = GrafeoDB::new_in_memory();
3242            let session = db.session();
3243
3244            // Create some test data
3245            session.create_node(&["Person"]);
3246            session.create_node(&["Person"]);
3247            session.create_node(&["Animal"]);
3248
3249            // Execute a Cypher query
3250            let result = session.execute_cypher("MATCH (n:Person) RETURN n").unwrap();
3251
3252            // Should return 2 Person nodes
3253            assert_eq!(result.row_count(), 2);
3254            assert_eq!(result.column_count(), 1);
3255            assert_eq!(result.columns[0], "n");
3256        }
3257
3258        #[test]
3259        fn test_cypher_empty_result() {
3260            let db = GrafeoDB::new_in_memory();
3261            let session = db.session();
3262
3263            // No data in database
3264            let result = session.execute_cypher("MATCH (n:Person) RETURN n").unwrap();
3265
3266            assert_eq!(result.row_count(), 0);
3267        }
3268
3269        #[test]
3270        fn test_cypher_parse_error() {
3271            let db = GrafeoDB::new_in_memory();
3272            let session = db.session();
3273
3274            // Invalid Cypher syntax
3275            let result = session.execute_cypher("MATCH (n RETURN n");
3276
3277            assert!(result.is_err());
3278        }
3279    }
3280
3281    // ==================== Direct Lookup API Tests ====================
3282
3283    mod direct_lookup_tests {
3284        use super::*;
3285        use grafeo_common::types::Value;
3286
3287        #[test]
3288        fn test_get_node() {
3289            let db = GrafeoDB::new_in_memory();
3290            let session = db.session();
3291
3292            let id = session.create_node(&["Person"]);
3293            let node = session.get_node(id);
3294
3295            assert!(node.is_some());
3296            let node = node.unwrap();
3297            assert_eq!(node.id, id);
3298        }
3299
3300        #[test]
3301        fn test_get_node_not_found() {
3302            use grafeo_common::types::NodeId;
3303
3304            let db = GrafeoDB::new_in_memory();
3305            let session = db.session();
3306
3307            // Try to get a non-existent node
3308            let node = session.get_node(NodeId::new(9999));
3309            assert!(node.is_none());
3310        }
3311
3312        #[test]
3313        fn test_get_node_property() {
3314            let db = GrafeoDB::new_in_memory();
3315            let session = db.session();
3316
3317            let id = session
3318                .create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))]);
3319
3320            let name = session.get_node_property(id, "name");
3321            assert_eq!(name, Some(Value::String("Alix".into())));
3322
3323            // Non-existent property
3324            let missing = session.get_node_property(id, "missing");
3325            assert!(missing.is_none());
3326        }
3327
3328        #[test]
3329        fn test_get_edge() {
3330            let db = GrafeoDB::new_in_memory();
3331            let session = db.session();
3332
3333            let alix = session.create_node(&["Person"]);
3334            let gus = session.create_node(&["Person"]);
3335            let edge_id = session.create_edge(alix, gus, "KNOWS");
3336
3337            let edge = session.get_edge(edge_id);
3338            assert!(edge.is_some());
3339            let edge = edge.unwrap();
3340            assert_eq!(edge.id, edge_id);
3341            assert_eq!(edge.src, alix);
3342            assert_eq!(edge.dst, gus);
3343        }
3344
3345        #[test]
3346        fn test_get_edge_not_found() {
3347            use grafeo_common::types::EdgeId;
3348
3349            let db = GrafeoDB::new_in_memory();
3350            let session = db.session();
3351
3352            let edge = session.get_edge(EdgeId::new(9999));
3353            assert!(edge.is_none());
3354        }
3355
3356        #[test]
3357        fn test_get_neighbors_outgoing() {
3358            let db = GrafeoDB::new_in_memory();
3359            let session = db.session();
3360
3361            let alix = session.create_node(&["Person"]);
3362            let gus = session.create_node(&["Person"]);
3363            let harm = session.create_node(&["Person"]);
3364
3365            session.create_edge(alix, gus, "KNOWS");
3366            session.create_edge(alix, harm, "KNOWS");
3367
3368            let neighbors = session.get_neighbors_outgoing(alix);
3369            assert_eq!(neighbors.len(), 2);
3370
3371            let neighbor_ids: Vec<_> = neighbors.iter().map(|(node_id, _)| *node_id).collect();
3372            assert!(neighbor_ids.contains(&gus));
3373            assert!(neighbor_ids.contains(&harm));
3374        }
3375
3376        #[test]
3377        fn test_get_neighbors_incoming() {
3378            let db = GrafeoDB::new_in_memory();
3379            let session = db.session();
3380
3381            let alix = session.create_node(&["Person"]);
3382            let gus = session.create_node(&["Person"]);
3383            let harm = session.create_node(&["Person"]);
3384
3385            session.create_edge(gus, alix, "KNOWS");
3386            session.create_edge(harm, alix, "KNOWS");
3387
3388            let neighbors = session.get_neighbors_incoming(alix);
3389            assert_eq!(neighbors.len(), 2);
3390
3391            let neighbor_ids: Vec<_> = neighbors.iter().map(|(node_id, _)| *node_id).collect();
3392            assert!(neighbor_ids.contains(&gus));
3393            assert!(neighbor_ids.contains(&harm));
3394        }
3395
3396        #[test]
3397        fn test_get_neighbors_outgoing_by_type() {
3398            let db = GrafeoDB::new_in_memory();
3399            let session = db.session();
3400
3401            let alix = session.create_node(&["Person"]);
3402            let gus = session.create_node(&["Person"]);
3403            let company = session.create_node(&["Company"]);
3404
3405            session.create_edge(alix, gus, "KNOWS");
3406            session.create_edge(alix, company, "WORKS_AT");
3407
3408            let knows_neighbors = session.get_neighbors_outgoing_by_type(alix, "KNOWS");
3409            assert_eq!(knows_neighbors.len(), 1);
3410            assert_eq!(knows_neighbors[0].0, gus);
3411
3412            let works_neighbors = session.get_neighbors_outgoing_by_type(alix, "WORKS_AT");
3413            assert_eq!(works_neighbors.len(), 1);
3414            assert_eq!(works_neighbors[0].0, company);
3415
3416            // No edges of this type
3417            let no_neighbors = session.get_neighbors_outgoing_by_type(alix, "LIKES");
3418            assert!(no_neighbors.is_empty());
3419        }
3420
3421        #[test]
3422        fn test_node_exists() {
3423            use grafeo_common::types::NodeId;
3424
3425            let db = GrafeoDB::new_in_memory();
3426            let session = db.session();
3427
3428            let id = session.create_node(&["Person"]);
3429
3430            assert!(session.node_exists(id));
3431            assert!(!session.node_exists(NodeId::new(9999)));
3432        }
3433
3434        #[test]
3435        fn test_edge_exists() {
3436            use grafeo_common::types::EdgeId;
3437
3438            let db = GrafeoDB::new_in_memory();
3439            let session = db.session();
3440
3441            let alix = session.create_node(&["Person"]);
3442            let gus = session.create_node(&["Person"]);
3443            let edge_id = session.create_edge(alix, gus, "KNOWS");
3444
3445            assert!(session.edge_exists(edge_id));
3446            assert!(!session.edge_exists(EdgeId::new(9999)));
3447        }
3448
3449        #[test]
3450        fn test_get_degree() {
3451            let db = GrafeoDB::new_in_memory();
3452            let session = db.session();
3453
3454            let alix = session.create_node(&["Person"]);
3455            let gus = session.create_node(&["Person"]);
3456            let harm = session.create_node(&["Person"]);
3457
3458            // Alix knows Gus and Harm (2 outgoing)
3459            session.create_edge(alix, gus, "KNOWS");
3460            session.create_edge(alix, harm, "KNOWS");
3461            // Gus knows Alix (1 incoming for Alix)
3462            session.create_edge(gus, alix, "KNOWS");
3463
3464            let (out_degree, in_degree) = session.get_degree(alix);
3465            assert_eq!(out_degree, 2);
3466            assert_eq!(in_degree, 1);
3467
3468            // Node with no edges
3469            let lonely = session.create_node(&["Person"]);
3470            let (out, in_deg) = session.get_degree(lonely);
3471            assert_eq!(out, 0);
3472            assert_eq!(in_deg, 0);
3473        }
3474
3475        #[test]
3476        fn test_get_nodes_batch() {
3477            let db = GrafeoDB::new_in_memory();
3478            let session = db.session();
3479
3480            let alix = session.create_node(&["Person"]);
3481            let gus = session.create_node(&["Person"]);
3482            let harm = session.create_node(&["Person"]);
3483
3484            let nodes = session.get_nodes_batch(&[alix, gus, harm]);
3485            assert_eq!(nodes.len(), 3);
3486            assert!(nodes[0].is_some());
3487            assert!(nodes[1].is_some());
3488            assert!(nodes[2].is_some());
3489
3490            // With non-existent node
3491            use grafeo_common::types::NodeId;
3492            let nodes_with_missing = session.get_nodes_batch(&[alix, NodeId::new(9999), harm]);
3493            assert_eq!(nodes_with_missing.len(), 3);
3494            assert!(nodes_with_missing[0].is_some());
3495            assert!(nodes_with_missing[1].is_none()); // Missing node
3496            assert!(nodes_with_missing[2].is_some());
3497        }
3498
3499        #[test]
3500        fn test_auto_commit_setting() {
3501            let db = GrafeoDB::new_in_memory();
3502            let mut session = db.session();
3503
3504            // Default is auto-commit enabled
3505            assert!(session.auto_commit());
3506
3507            session.set_auto_commit(false);
3508            assert!(!session.auto_commit());
3509
3510            session.set_auto_commit(true);
3511            assert!(session.auto_commit());
3512        }
3513
3514        #[test]
3515        fn test_transaction_double_begin_nests() {
3516            let db = GrafeoDB::new_in_memory();
3517            let mut session = db.session();
3518
3519            session.begin_transaction().unwrap();
3520            // Second begin_transaction creates a nested transaction (auto-savepoint)
3521            let result = session.begin_transaction();
3522            assert!(result.is_ok());
3523            // Commit the inner (releases savepoint)
3524            session.commit().unwrap();
3525            // Commit the outer
3526            session.commit().unwrap();
3527        }
3528
3529        #[test]
3530        fn test_commit_without_transaction_error() {
3531            let db = GrafeoDB::new_in_memory();
3532            let mut session = db.session();
3533
3534            let result = session.commit();
3535            assert!(result.is_err());
3536        }
3537
3538        #[test]
3539        fn test_rollback_without_transaction_error() {
3540            let db = GrafeoDB::new_in_memory();
3541            let mut session = db.session();
3542
3543            let result = session.rollback();
3544            assert!(result.is_err());
3545        }
3546
3547        #[test]
3548        fn test_create_edge_in_transaction() {
3549            let db = GrafeoDB::new_in_memory();
3550            let mut session = db.session();
3551
3552            // Create nodes outside transaction
3553            let alix = session.create_node(&["Person"]);
3554            let gus = session.create_node(&["Person"]);
3555
3556            // Create edge in transaction
3557            session.begin_transaction().unwrap();
3558            let edge_id = session.create_edge(alix, gus, "KNOWS");
3559
3560            // Edge should be visible in the transaction
3561            assert!(session.edge_exists(edge_id));
3562
3563            // Commit
3564            session.commit().unwrap();
3565
3566            // Edge should still be visible
3567            assert!(session.edge_exists(edge_id));
3568        }
3569
3570        #[test]
3571        fn test_neighbors_empty_node() {
3572            let db = GrafeoDB::new_in_memory();
3573            let session = db.session();
3574
3575            let lonely = session.create_node(&["Person"]);
3576
3577            assert!(session.get_neighbors_outgoing(lonely).is_empty());
3578            assert!(session.get_neighbors_incoming(lonely).is_empty());
3579            assert!(
3580                session
3581                    .get_neighbors_outgoing_by_type(lonely, "KNOWS")
3582                    .is_empty()
3583            );
3584        }
3585    }
3586
3587    #[test]
3588    fn test_auto_gc_triggers_on_commit_interval() {
3589        use crate::config::Config;
3590
3591        let config = Config::in_memory().with_gc_interval(2);
3592        let db = GrafeoDB::with_config(config).unwrap();
3593        let mut session = db.session();
3594
3595        // First commit: counter = 1, no GC (not a multiple of 2)
3596        session.begin_transaction().unwrap();
3597        session.create_node(&["A"]);
3598        session.commit().unwrap();
3599
3600        // Second commit: counter = 2, GC should trigger (multiple of 2)
3601        session.begin_transaction().unwrap();
3602        session.create_node(&["B"]);
3603        session.commit().unwrap();
3604
3605        // Verify the database is still functional after GC
3606        assert_eq!(db.node_count(), 2);
3607    }
3608
3609    #[test]
3610    fn test_query_timeout_config_propagates_to_session() {
3611        use crate::config::Config;
3612        use std::time::Duration;
3613
3614        let config = Config::in_memory().with_query_timeout(Duration::from_secs(5));
3615        let db = GrafeoDB::with_config(config).unwrap();
3616        let session = db.session();
3617
3618        // Verify the session has a query deadline (timeout was set)
3619        assert!(session.query_deadline().is_some());
3620    }
3621
3622    #[test]
3623    fn test_no_query_timeout_returns_no_deadline() {
3624        let db = GrafeoDB::new_in_memory();
3625        let session = db.session();
3626
3627        // Default config has no timeout
3628        assert!(session.query_deadline().is_none());
3629    }
3630
3631    #[test]
3632    fn test_graph_model_accessor() {
3633        use crate::config::GraphModel;
3634
3635        let db = GrafeoDB::new_in_memory();
3636        let session = db.session();
3637
3638        assert_eq!(session.graph_model(), GraphModel::Lpg);
3639    }
3640
3641    #[cfg(feature = "gql")]
3642    #[test]
3643    fn test_external_store_session() {
3644        use grafeo_core::graph::GraphStoreMut;
3645        use std::sync::Arc;
3646
3647        let config = crate::config::Config::in_memory();
3648        let store =
3649            Arc::new(grafeo_core::graph::lpg::LpgStore::new().unwrap()) as Arc<dyn GraphStoreMut>;
3650        let db = GrafeoDB::with_store(store, config).unwrap();
3651
3652        let session = db.session();
3653
3654        // Create data through a query (goes through the external graph_store)
3655        session.execute("INSERT (:Test {name: 'hello'})").unwrap();
3656
3657        // Verify we can query through it
3658        let result = session.execute("MATCH (n:Test) RETURN n.name").unwrap();
3659        assert_eq!(result.row_count(), 1);
3660    }
3661
3662    // ==================== Session Command Tests ====================
3663
3664    #[cfg(feature = "gql")]
3665    mod session_command_tests {
3666        use super::*;
3667
3668        #[test]
3669        fn test_use_graph_sets_current_graph() {
3670            let db = GrafeoDB::new_in_memory();
3671            let session = db.session();
3672
3673            // Create the graph first, then USE it
3674            session.execute("CREATE GRAPH mydb").unwrap();
3675            session.execute("USE GRAPH mydb").unwrap();
3676
3677            assert_eq!(session.current_graph(), Some("mydb".to_string()));
3678        }
3679
3680        #[test]
3681        fn test_use_graph_nonexistent_errors() {
3682            let db = GrafeoDB::new_in_memory();
3683            let session = db.session();
3684
3685            let result = session.execute("USE GRAPH doesnotexist");
3686            assert!(result.is_err());
3687            let err = result.unwrap_err().to_string();
3688            assert!(
3689                err.contains("does not exist"),
3690                "Expected 'does not exist' error, got: {err}"
3691            );
3692        }
3693
3694        #[test]
3695        fn test_use_graph_default_always_valid() {
3696            let db = GrafeoDB::new_in_memory();
3697            let session = db.session();
3698
3699            // "default" is always valid, even without CREATE GRAPH
3700            session.execute("USE GRAPH default").unwrap();
3701            assert_eq!(session.current_graph(), Some("default".to_string()));
3702        }
3703
3704        #[test]
3705        fn test_session_set_graph() {
3706            let db = GrafeoDB::new_in_memory();
3707            let session = db.session();
3708
3709            // SESSION SET GRAPH does not verify existence
3710            session.execute("SESSION SET GRAPH analytics").unwrap();
3711            assert_eq!(session.current_graph(), Some("analytics".to_string()));
3712        }
3713
3714        #[test]
3715        fn test_session_set_time_zone() {
3716            let db = GrafeoDB::new_in_memory();
3717            let session = db.session();
3718
3719            assert_eq!(session.time_zone(), None);
3720
3721            session.execute("SESSION SET TIME ZONE 'UTC'").unwrap();
3722            assert_eq!(session.time_zone(), Some("UTC".to_string()));
3723
3724            session
3725                .execute("SESSION SET TIME ZONE 'America/New_York'")
3726                .unwrap();
3727            assert_eq!(session.time_zone(), Some("America/New_York".to_string()));
3728        }
3729
3730        #[test]
3731        fn test_session_set_parameter() {
3732            let db = GrafeoDB::new_in_memory();
3733            let session = db.session();
3734
3735            session
3736                .execute("SESSION SET PARAMETER $timeout = 30")
3737                .unwrap();
3738
3739            // Parameter is stored (value is Null for now, since expression
3740            // evaluation is not yet wired up)
3741            assert!(session.get_parameter("timeout").is_some());
3742        }
3743
3744        #[test]
3745        fn test_session_reset_clears_all_state() {
3746            let db = GrafeoDB::new_in_memory();
3747            let session = db.session();
3748
3749            // Set various session state
3750            session.execute("SESSION SET GRAPH analytics").unwrap();
3751            session.execute("SESSION SET TIME ZONE 'UTC'").unwrap();
3752            session
3753                .execute("SESSION SET PARAMETER $limit = 100")
3754                .unwrap();
3755
3756            // Verify state was set
3757            assert!(session.current_graph().is_some());
3758            assert!(session.time_zone().is_some());
3759            assert!(session.get_parameter("limit").is_some());
3760
3761            // Reset everything
3762            session.execute("SESSION RESET").unwrap();
3763
3764            assert_eq!(session.current_graph(), None);
3765            assert_eq!(session.time_zone(), None);
3766            assert!(session.get_parameter("limit").is_none());
3767        }
3768
3769        #[test]
3770        fn test_session_close_clears_state() {
3771            let db = GrafeoDB::new_in_memory();
3772            let session = db.session();
3773
3774            session.execute("SESSION SET GRAPH analytics").unwrap();
3775            session.execute("SESSION SET TIME ZONE 'UTC'").unwrap();
3776
3777            session.execute("SESSION CLOSE").unwrap();
3778
3779            assert_eq!(session.current_graph(), None);
3780            assert_eq!(session.time_zone(), None);
3781        }
3782
3783        #[test]
3784        fn test_create_graph() {
3785            let db = GrafeoDB::new_in_memory();
3786            let session = db.session();
3787
3788            session.execute("CREATE GRAPH mydb").unwrap();
3789
3790            // Should be able to USE it now
3791            session.execute("USE GRAPH mydb").unwrap();
3792            assert_eq!(session.current_graph(), Some("mydb".to_string()));
3793        }
3794
3795        #[test]
3796        fn test_create_graph_duplicate_errors() {
3797            let db = GrafeoDB::new_in_memory();
3798            let session = db.session();
3799
3800            session.execute("CREATE GRAPH mydb").unwrap();
3801            let result = session.execute("CREATE GRAPH mydb");
3802
3803            assert!(result.is_err());
3804            let err = result.unwrap_err().to_string();
3805            assert!(
3806                err.contains("already exists"),
3807                "Expected 'already exists' error, got: {err}"
3808            );
3809        }
3810
3811        #[test]
3812        fn test_create_graph_if_not_exists() {
3813            let db = GrafeoDB::new_in_memory();
3814            let session = db.session();
3815
3816            session.execute("CREATE GRAPH mydb").unwrap();
3817            // Should succeed silently with IF NOT EXISTS
3818            session.execute("CREATE GRAPH IF NOT EXISTS mydb").unwrap();
3819        }
3820
3821        #[test]
3822        fn test_drop_graph() {
3823            let db = GrafeoDB::new_in_memory();
3824            let session = db.session();
3825
3826            session.execute("CREATE GRAPH mydb").unwrap();
3827            session.execute("DROP GRAPH mydb").unwrap();
3828
3829            // Should no longer be usable
3830            let result = session.execute("USE GRAPH mydb");
3831            assert!(result.is_err());
3832        }
3833
3834        #[test]
3835        fn test_drop_graph_nonexistent_errors() {
3836            let db = GrafeoDB::new_in_memory();
3837            let session = db.session();
3838
3839            let result = session.execute("DROP GRAPH nosuchgraph");
3840            assert!(result.is_err());
3841            let err = result.unwrap_err().to_string();
3842            assert!(
3843                err.contains("does not exist"),
3844                "Expected 'does not exist' error, got: {err}"
3845            );
3846        }
3847
3848        #[test]
3849        fn test_drop_graph_if_exists() {
3850            let db = GrafeoDB::new_in_memory();
3851            let session = db.session();
3852
3853            // Should succeed silently with IF EXISTS
3854            session.execute("DROP GRAPH IF EXISTS nosuchgraph").unwrap();
3855        }
3856
3857        #[test]
3858        fn test_start_transaction_via_gql() {
3859            let db = GrafeoDB::new_in_memory();
3860            let session = db.session();
3861
3862            session.execute("START TRANSACTION").unwrap();
3863            assert!(session.in_transaction());
3864            session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
3865            session.execute("COMMIT").unwrap();
3866            assert!(!session.in_transaction());
3867
3868            let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
3869            assert_eq!(result.rows.len(), 1);
3870        }
3871
3872        #[test]
3873        fn test_start_transaction_read_only_blocks_insert() {
3874            let db = GrafeoDB::new_in_memory();
3875            let session = db.session();
3876
3877            session.execute("START TRANSACTION READ ONLY").unwrap();
3878            let result = session.execute("INSERT (:Person {name: 'Alix'})");
3879            assert!(result.is_err());
3880            let err = result.unwrap_err().to_string();
3881            assert!(
3882                err.contains("read-only"),
3883                "Expected read-only error, got: {err}"
3884            );
3885            session.execute("ROLLBACK").unwrap();
3886        }
3887
3888        #[test]
3889        fn test_start_transaction_read_only_allows_reads() {
3890            let db = GrafeoDB::new_in_memory();
3891            let mut session = db.session();
3892            session.begin_transaction().unwrap();
3893            session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
3894            session.commit().unwrap();
3895
3896            session.execute("START TRANSACTION READ ONLY").unwrap();
3897            let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
3898            assert_eq!(result.rows.len(), 1);
3899            session.execute("COMMIT").unwrap();
3900        }
3901
3902        #[test]
3903        fn test_rollback_via_gql() {
3904            let db = GrafeoDB::new_in_memory();
3905            let session = db.session();
3906
3907            session.execute("START TRANSACTION").unwrap();
3908            session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
3909            session.execute("ROLLBACK").unwrap();
3910
3911            let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
3912            assert!(result.rows.is_empty());
3913        }
3914
3915        #[test]
3916        fn test_start_transaction_with_isolation_level() {
3917            let db = GrafeoDB::new_in_memory();
3918            let session = db.session();
3919
3920            session
3921                .execute("START TRANSACTION ISOLATION LEVEL SERIALIZABLE")
3922                .unwrap();
3923            assert!(session.in_transaction());
3924            session.execute("ROLLBACK").unwrap();
3925        }
3926
3927        #[test]
3928        fn test_session_commands_return_empty_result() {
3929            let db = GrafeoDB::new_in_memory();
3930            let session = db.session();
3931
3932            let result = session.execute("SESSION SET GRAPH test").unwrap();
3933            assert_eq!(result.row_count(), 0);
3934            assert_eq!(result.column_count(), 0);
3935        }
3936
3937        #[test]
3938        fn test_current_graph_default_is_none() {
3939            let db = GrafeoDB::new_in_memory();
3940            let session = db.session();
3941
3942            assert_eq!(session.current_graph(), None);
3943        }
3944
3945        #[test]
3946        fn test_time_zone_default_is_none() {
3947            let db = GrafeoDB::new_in_memory();
3948            let session = db.session();
3949
3950            assert_eq!(session.time_zone(), None);
3951        }
3952
3953        #[test]
3954        fn test_session_state_independent_across_sessions() {
3955            let db = GrafeoDB::new_in_memory();
3956            let session1 = db.session();
3957            let session2 = db.session();
3958
3959            session1.execute("SESSION SET GRAPH first").unwrap();
3960            session2.execute("SESSION SET GRAPH second").unwrap();
3961
3962            assert_eq!(session1.current_graph(), Some("first".to_string()));
3963            assert_eq!(session2.current_graph(), Some("second".to_string()));
3964        }
3965    }
3966}