Skip to main content

uni_db/api/
session.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4//! Session — the primary read scope for all database access.
5//!
6//! Sessions are cheap, synchronous, and infallible to create. All reads go
7//! through sessions, and sessions are the factory for transactions (writes).
8
9use std::collections::HashMap;
10use std::sync::Arc;
11use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
12use std::time::{Duration, Instant};
13
14use tokio_util::sync::CancellationToken;
15use tracing::instrument;
16use uuid::Uuid;
17
18use crate::api::UniInner;
19use crate::api::hooks::{HookContext, QueryType, SessionHook};
20use crate::api::impl_locy::LocyRuleRegistry;
21use crate::api::locy_result::LocyResult;
22use crate::api::transaction::{IsolationLevel, Transaction};
23use uni_common::{Result, UniError, Value};
24use uni_query::{ExplainOutput, ProfileOutput, QueryCursor, QueryResult, Row};
25
/// Hit/miss counters for a session's plan cache.
///
/// Both fields are atomics so they can be bumped through a shared reference
/// by the session and by its query execution helpers. `Default` produces a
/// value with both counters at zero.
#[derive(Debug, Default)]
pub(crate) struct PlanCacheMetrics {
    /// Number of lookups that found a usable cached plan.
    pub(crate) hits: AtomicU64,
    /// Number of lookups that did not find a usable cached plan.
    pub(crate) misses: AtomicU64,
}
32
33/// Describes the capabilities of a session in its current mode.
34///
35/// This is a snapshot — capabilities may change if the underlying database
36/// configuration changes (e.g., read-only mode toggled).
37#[derive(Debug, Clone)]
38pub struct SessionCapabilities {
39    /// Whether the session can create transactions and execute writes.
40    pub can_write: bool,
41    /// Whether the session supports version pinning (read-at-version).
42    pub can_pin: bool,
43    /// The isolation level used for transactions in this session.
44    pub isolation: IsolationLevel,
45    /// Whether commit notifications are available.
46    pub has_notifications: bool,
47    /// Write lease strategy in effect, if any.
48    pub write_lease: Option<WriteLeaseSummary>,
49}
50
/// Summary of the write lease strategy, suitable for capability snapshots.
///
/// This is a plain-data description of the [`WriteLease`](crate::WriteLease)
/// variant without carrying the actual provider trait object, which is why it
/// can be cloned and compared by value.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum WriteLeaseSummary {
    /// Local single-process lock.
    Local,
    /// DynamoDB-based distributed lease.
    DynamoDB {
        /// Table identifier carried over from the lease configuration.
        table: String,
    },
    /// Custom lease provider (opaque).
    Custom,
}
64
/// Internal atomic counters for session-level metrics.
///
/// Every field is an `AtomicU64`, so the derived `Default` yields a value
/// with all counters at zero.
#[derive(Debug, Default)]
pub(crate) struct SessionMetricsInner {
    /// Count of queries executed.
    pub(crate) queries_executed: AtomicU64,
    /// Count of Locy evaluations.
    pub(crate) locy_evaluations: AtomicU64,
    /// Accumulated query time, in microseconds.
    pub(crate) total_query_time_us: AtomicU64,
    /// Count of committed transactions.
    pub(crate) transactions_committed: AtomicU64,
    /// Count of rolled-back transactions.
    pub(crate) transactions_rolled_back: AtomicU64,
    /// Accumulated number of rows returned.
    pub(crate) total_rows_returned: AtomicU64,
    /// Accumulated number of rows scanned.
    pub(crate) total_rows_scanned: AtomicU64,
}

impl SessionMetricsInner {
    /// Create a counter set with every counter at zero.
    fn new() -> Self {
        Self::default()
    }
}
89
/// Point-in-time copy of a session's metrics.
///
/// All values are plain data; the snapshot does not change after it is taken.
#[derive(Debug, Clone)]
pub struct SessionMetrics {
    /// Identifier of the session the snapshot was taken from.
    pub session_id: String,
    /// Instant at which the session was created.
    pub active_since: Instant,
    /// How many queries have been executed.
    pub queries_executed: u64,
    /// How many Locy evaluations have been run.
    pub locy_evaluations: u64,
    /// Accumulated time spent executing queries.
    pub total_query_time: Duration,
    /// How many transactions were committed.
    pub transactions_committed: u64,
    /// How many transactions were rolled back.
    pub transactions_rolled_back: u64,
    /// Rows returned, summed over all queries.
    pub total_rows_returned: u64,
    /// Rows scanned, summed over all queries (0 until executor instrumentation).
    pub total_rows_scanned: u64,
    /// Plan cache lookups that hit.
    pub plan_cache_hits: u64,
    /// Plan cache lookups that missed.
    pub plan_cache_misses: u64,
    /// Number of entries in the plan cache when the snapshot was taken.
    pub plan_cache_size: usize,
}
118
119/// A database session — the primary scope for reads.
120///
121/// All data access goes through sessions. Sessions hold scoped query parameters
122/// and a private copy of the Locy rule registry. They are the factory for
123/// [`Transaction`]s (write scope).
124///
125/// Sessions are cheap to create (sync, no I/O) and cheap to clone (Arc-based).
126///
127/// # Examples
128///
129/// ```no_run
130/// # use uni_db::Uni;
131/// # async fn example(db: &Uni) -> uni_db::Result<()> {
132/// let mut session = db.session();
133/// session.set("tenant", 42);
134///
135/// let rows = session.query("MATCH (n) WHERE n.tenant = $tenant RETURN n").await?;
136///
137/// // Transactions for writes
138/// let tx = session.tx().await?;
139/// tx.execute("CREATE (:Person {name: 'Alice'})").await?;
140/// tx.commit().await?;
141/// # Ok(())
142/// # }
143/// ```
144pub struct Session {
145    pub(crate) db: Arc<UniInner>,
146    /// When pinned via `pin_to_version`/`pin_to_timestamp`, holds the original
147    /// (live) db reference so `refresh()` can restore it.
148    original_db: Option<Arc<UniInner>>,
149    id: String,
150    params: Arc<std::sync::RwLock<HashMap<String, Value>>>,
151    rule_registry: Arc<std::sync::RwLock<LocyRuleRegistry>>,
152    /// Mutual exclusion for write contexts (transaction, bulk writer).
153    /// Only one write context can be active per session.
154    active_write_guard: Arc<AtomicBool>,
155    /// Atomic session-level metrics counters.
156    pub(crate) metrics_inner: Arc<SessionMetricsInner>,
157    /// Timestamp when this session was created.
158    created_at: Instant,
159    /// Cancellation token for cooperative query cancellation.
160    /// Behind `Arc<RwLock<>>` so `cancel()` can take `&self`.
161    cancellation_token: Arc<std::sync::RwLock<CancellationToken>>,
162    /// Transparent plan cache for parsed/planned queries (shared across clones).
163    plan_cache: Arc<std::sync::Mutex<PlanCache>>,
164    /// Atomic plan cache hit/miss counters.
165    plan_cache_metrics: Arc<PlanCacheMetrics>,
166    /// Session-level hooks for query/commit interception, keyed by name.
167    pub(crate) hooks: HashMap<String, Arc<dyn SessionHook>>,
168    /// Default query timeout (from template or explicit configuration).
169    pub(crate) query_timeout: Option<Duration>,
170    /// Default transaction timeout (from template or explicit configuration).
171    pub(crate) transaction_timeout: Option<Duration>,
172}
173
174impl Session {
175    /// Create a new session from a shared database reference.
176    pub(crate) fn new(db: Arc<UniInner>) -> Self {
177        // Clone the global rule registry into this session
178        let global_registry = db.locy_rule_registry.read().unwrap();
179        let session_registry = global_registry.clone();
180        drop(global_registry);
181
182        db.active_session_count.fetch_add(1, Ordering::Relaxed);
183
184        Self {
185            db,
186            original_db: None,
187            id: Uuid::new_v4().to_string(),
188            params: Arc::new(std::sync::RwLock::new(HashMap::new())),
189            rule_registry: Arc::new(std::sync::RwLock::new(session_registry)),
190            active_write_guard: Arc::new(AtomicBool::new(false)),
191            metrics_inner: Arc::new(SessionMetricsInner::new()),
192            created_at: Instant::now(),
193            cancellation_token: Arc::new(std::sync::RwLock::new(CancellationToken::new())),
194            plan_cache: Arc::new(std::sync::Mutex::new(PlanCache::new(1000))),
195            plan_cache_metrics: Arc::new(PlanCacheMetrics {
196                hits: AtomicU64::new(0),
197                misses: AtomicU64::new(0),
198            }),
199            hooks: HashMap::new(),
200            query_timeout: None,
201            transaction_timeout: None,
202        }
203    }
204
205    /// Create a new session from a template's pre-compiled state.
206    pub(crate) fn new_from_template(
207        db: Arc<UniInner>,
208        params: HashMap<String, Value>,
209        rule_registry: LocyRuleRegistry,
210        hooks: HashMap<String, Arc<dyn SessionHook>>,
211        query_timeout: Option<Duration>,
212        transaction_timeout: Option<Duration>,
213    ) -> Self {
214        db.active_session_count.fetch_add(1, Ordering::Relaxed);
215
216        Self {
217            db,
218            original_db: None,
219            id: Uuid::new_v4().to_string(),
220            params: Arc::new(std::sync::RwLock::new(params)),
221            rule_registry: Arc::new(std::sync::RwLock::new(rule_registry)),
222            active_write_guard: Arc::new(AtomicBool::new(false)),
223            metrics_inner: Arc::new(SessionMetricsInner::new()),
224            created_at: Instant::now(),
225            cancellation_token: Arc::new(std::sync::RwLock::new(CancellationToken::new())),
226            plan_cache: Arc::new(std::sync::Mutex::new(PlanCache::new(1000))),
227            plan_cache_metrics: Arc::new(PlanCacheMetrics {
228                hits: AtomicU64::new(0),
229                misses: AtomicU64::new(0),
230            }),
231            hooks,
232            query_timeout,
233            transaction_timeout,
234        }
235    }
236
237    // ── Scoped Parameters ─────────────────────────────────────────────
238
239    /// Access the session-scoped parameter store.
240    pub fn params(&self) -> Params<'_> {
241        Params {
242            store: &self.params,
243        }
244    }
245
246    // ── Cypher Reads ──────────────────────────────────────────────────
247
248    /// Execute a read-only Cypher query.
249    ///
250    /// Uses the transparent plan cache: repeated queries with the same text
251    /// skip parsing and planning. Cache entries auto-invalidate on schema
252    /// changes.
253    #[instrument(skip(self), fields(session_id = %self.id))]
254    pub async fn query(&self, cypher: &str) -> Result<QueryResult> {
255        let params = self.merge_params(HashMap::new());
256        self.run_before_query_hooks(cypher, QueryType::Cypher, &params)?;
257        let start = Instant::now();
258        let result = self.execute_cached(cypher, params.clone()).await;
259        self.metrics_inner
260            .queries_executed
261            .fetch_add(1, Ordering::Relaxed);
262        self.db.total_queries.fetch_add(1, Ordering::Relaxed);
263        self.metrics_inner
264            .total_query_time_us
265            .fetch_add(start.elapsed().as_micros() as u64, Ordering::Relaxed);
266        if let Ok(ref qr) = result {
267            self.metrics_inner
268                .total_rows_returned
269                .fetch_add(qr.len() as u64, Ordering::Relaxed);
270            self.run_after_query_hooks(cypher, QueryType::Cypher, &params, qr.metrics());
271        }
272        result
273    }
274
275    /// Execute a read-only Cypher query with a builder for parameters.
276    pub fn query_with(&self, cypher: &str) -> QueryBuilder<'_> {
277        QueryBuilder {
278            session: self,
279            cypher: cypher.to_string(),
280            params: HashMap::new(),
281            timeout: self.query_timeout,
282            max_memory: None,
283            cancellation_token: None,
284        }
285    }
286
287    // ── Locy Evaluation ───────────────────────────────────────────────
288
289    /// Evaluate a Locy program with default configuration.
290    #[instrument(skip(self), fields(session_id = %self.id))]
291    pub async fn locy(&self, program: &str) -> Result<LocyResult> {
292        self.run_before_query_hooks(program, QueryType::Locy, &HashMap::new())?;
293        let result = self.locy_with(program).run().await;
294        self.metrics_inner
295            .locy_evaluations
296            .fetch_add(1, Ordering::Relaxed);
297        result
298    }
299
300    /// Evaluate a Locy program with parameters using a builder.
301    pub fn locy_with(&self, program: &str) -> crate::api::locy_builder::LocyBuilder<'_> {
302        crate::api::locy_builder::LocyBuilder::new(self, program)
303    }
304
305    // ── Rule Management ───────────────────────────────────────────────
306
307    /// Access the session-scoped rule registry.
308    pub fn rules(&self) -> super::rule_registry::RuleRegistry<'_> {
309        super::rule_registry::RuleRegistry::new(&self.rule_registry)
310    }
311
312    /// Compile a Locy program without executing it, using this session's rule registry.
313    #[instrument(skip(self), fields(session_id = %self.id))]
314    pub fn compile_locy(&self, program: &str) -> Result<uni_locy::CompiledProgram> {
315        let ast = uni_cypher::parse_locy(program).map_err(|e| UniError::Parse {
316            message: format!("LocyParseError: {e}"),
317            position: None,
318            line: None,
319            column: None,
320            context: None,
321        })?;
322        let registry = self.rule_registry.read().unwrap();
323        if registry.rules.is_empty() {
324            drop(registry);
325            uni_locy::compile(&ast).map_err(|e| UniError::Query {
326                message: format!("LocyCompileError: {e}"),
327                query: None,
328            })
329        } else {
330            let external_names: Vec<String> = registry.rules.keys().cloned().collect();
331            drop(registry);
332            uni_locy::compile_with_external_rules(&ast, &external_names).map_err(|e| {
333                UniError::Query {
334                    message: format!("LocyCompileError: {e}"),
335                    query: None,
336                }
337            })
338        }
339    }
340
341    // ── Transaction & Writer Factories ────────────────────────────────
342
343    /// Create a new transaction for multi-statement writes.
344    ///
345    /// Only one write context (transaction or bulk writer) can be active
346    /// per session at a time. Returns `ReadOnly` if the session is pinned.
347    #[instrument(skip(self), fields(session_id = %self.id))]
348    pub async fn tx(&self) -> Result<Transaction> {
349        if self.is_pinned() {
350            return Err(UniError::ReadOnly {
351                operation: "start_transaction".to_string(),
352            });
353        }
354        Transaction::new(self).await
355    }
356
357    /// Create a transaction with builder options (timeout, isolation level).
358    pub fn tx_with(&self) -> TransactionBuilder<'_> {
359        TransactionBuilder {
360            session: self,
361            timeout: self.transaction_timeout,
362            isolation: IsolationLevel::default(),
363        }
364    }
365
366    // ── Version Pinning ──────────────────────────────────────────────
367
368    /// Pin this session to a specific snapshot version.
369    ///
370    /// All subsequent reads see data as of that version. Writes are rejected.
371    #[instrument(skip(self), fields(session_id = %self.id))]
372    pub async fn pin_to_version(&mut self, snapshot_id: &str) -> Result<()> {
373        let pinned = self.live_db().at_snapshot(snapshot_id).await?;
374        if self.original_db.is_none() {
375            self.original_db = Some(self.db.clone());
376        }
377        self.db = Arc::new(pinned);
378        Ok(())
379    }
380
381    /// Pin this session to a specific timestamp.
382    ///
383    /// Resolves the closest snapshot at or before the given timestamp, then
384    /// pins the session to that snapshot. Writes are rejected while pinned.
385    #[instrument(skip(self), fields(session_id = %self.id))]
386    pub async fn pin_to_timestamp(&mut self, ts: chrono::DateTime<chrono::Utc>) -> Result<()> {
387        let snapshot_id = self.live_db().resolve_time_travel_timestamp(ts).await?;
388        self.pin_to_version(&snapshot_id).await
389    }
390
391    /// Refresh: unpin the session, returning to the live database state.
392    ///
393    /// In single-process mode, this simply unpins the session.
394    /// In multi-agent mode (Phase 2), this picks up the latest
395    /// committed version from storage.
396    pub async fn refresh(&mut self) -> Result<()> {
397        if let Some(original) = self.original_db.take() {
398            self.db = original;
399        }
400        Ok(())
401    }
402
403    /// Returns `true` if the session is pinned to a specific version.
404    pub fn is_pinned(&self) -> bool {
405        self.original_db.is_some()
406    }
407
408    /// Get the live (unpinned) db reference for resolving snapshots.
409    fn live_db(&self) -> &Arc<UniInner> {
410        self.original_db.as_ref().unwrap_or(&self.db)
411    }
412
413    // ── Cancellation ─────────────────────────────────────────────────
414
415    /// Cancel all in-flight queries in this session.
416    ///
417    /// Queries check `is_cancelled()` at each operator boundary via
418    /// `check_timeout()`. After cancellation, a fresh token is created
419    /// so the session remains usable.
420    #[instrument(skip(self), fields(session_id = %self.id))]
421    pub fn cancel(&self) {
422        let mut token = self.cancellation_token.write().unwrap();
423        token.cancel();
424        *token = CancellationToken::new();
425    }
426
427    /// Get a clone of this session's cancellation token.
428    ///
429    /// Useful for external cancellation (e.g. from a timeout task).
430    pub fn cancellation_token(&self) -> CancellationToken {
431        self.cancellation_token.read().unwrap().clone()
432    }
433
434    // ── Prepared Statements ──────────────────────────────────────────
435
436    /// Prepare a Cypher query for repeated execution.
437    ///
438    /// The query is parsed and planned once; subsequent executions skip those
439    /// phases. If the schema changes, the prepared query auto-replans.
440    #[instrument(skip(self), fields(session_id = %self.id))]
441    pub async fn prepare(&self, cypher: &str) -> Result<crate::api::prepared::PreparedQuery> {
442        crate::api::prepared::PreparedQuery::new(self.db.clone(), cypher).await
443    }
444
445    /// Prepare a Locy program for repeated evaluation.
446    #[instrument(skip(self), fields(session_id = %self.id))]
447    pub async fn prepare_locy(&self, program: &str) -> Result<crate::api::prepared::PreparedLocy> {
448        crate::api::prepared::PreparedLocy::new(
449            self.db.clone(),
450            self.rule_registry.clone(),
451            program,
452        )
453    }
454
455    // ── Hooks ─────────────────────────────────────────────────────────
456
457    /// Add a named session hook for query/commit interception.
458    pub fn add_hook(&mut self, name: impl Into<String>, hook: impl SessionHook + 'static) {
459        self.hooks.insert(name.into(), Arc::new(hook));
460    }
461
462    /// Remove a hook by name. Returns true if it existed.
463    pub fn remove_hook(&mut self, name: &str) -> bool {
464        self.hooks.remove(name).is_some()
465    }
466
467    /// List names of all registered hooks.
468    pub fn list_hooks(&self) -> Vec<String> {
469        self.hooks.keys().cloned().collect()
470    }
471
472    /// Remove all hooks.
473    pub fn clear_hooks(&mut self) {
474        self.hooks.clear();
475    }
476
477    /// Run before-query hooks. Returns `Err(HookRejected)` if any hook rejects.
478    pub(crate) fn run_before_query_hooks(
479        &self,
480        query_text: &str,
481        query_type: QueryType,
482        params: &HashMap<String, Value>,
483    ) -> Result<()> {
484        if self.hooks.is_empty() {
485            return Ok(());
486        }
487        let ctx = HookContext {
488            session_id: self.id.clone(),
489            query_text: query_text.to_string(),
490            query_type,
491            params: params.clone(),
492        };
493        for hook in self.hooks.values() {
494            hook.before_query(&ctx)?;
495        }
496        Ok(())
497    }
498
499    /// Run after-query hooks. Panics in hooks are caught and logged.
500    pub(crate) fn run_after_query_hooks(
501        &self,
502        query_text: &str,
503        query_type: QueryType,
504        params: &HashMap<String, Value>,
505        metrics: &uni_query::QueryMetrics,
506    ) {
507        if self.hooks.is_empty() {
508            return;
509        }
510        let ctx = HookContext {
511            session_id: self.id.clone(),
512            query_text: query_text.to_string(),
513            query_type,
514            params: params.clone(),
515        };
516        for hook in self.hooks.values() {
517            let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
518                hook.after_query(&ctx, metrics);
519            }));
520            if let Err(e) = result {
521                tracing::error!("after_query hook panicked: {:?}", e);
522            }
523        }
524    }
525
526    // ── Commit Notifications ─────────────────────────────────────────
527
528    /// Watch for all commit notifications.
529    pub fn watch(&self) -> crate::api::notifications::CommitStream {
530        let rx = self.db.commit_tx.subscribe();
531        crate::api::notifications::WatchBuilder::new(rx).build()
532    }
533
534    /// Watch for commit notifications with filters.
535    pub fn watch_with(&self) -> crate::api::notifications::WatchBuilder {
536        let rx = self.db.commit_tx.subscribe();
537        crate::api::notifications::WatchBuilder::new(rx)
538    }
539
540    // ── Lifecycle & Observability ──────────────────────────────────────
541
542    /// Get the session ID.
543    pub fn id(&self) -> &str {
544        &self.id
545    }
546
547    /// Query the capabilities of this session.
548    ///
549    /// Returns a snapshot of what the session can do in its current mode.
550    pub fn capabilities(&self) -> SessionCapabilities {
551        use crate::api::multi_agent::WriteLease;
552        let write_lease = self.db.write_lease.as_ref().map(|wl| match wl {
553            WriteLease::Local => WriteLeaseSummary::Local,
554            WriteLease::DynamoDB { table } => WriteLeaseSummary::DynamoDB {
555                table: table.clone(),
556            },
557            WriteLease::Custom(_) => WriteLeaseSummary::Custom,
558        });
559        SessionCapabilities {
560            can_write: self.db.writer.is_some() && !self.is_pinned(),
561            can_pin: true,
562            isolation: IsolationLevel::default(),
563            has_notifications: true,
564            write_lease,
565        }
566    }
567
568    /// Snapshot the session's accumulated metrics.
569    pub fn metrics(&self) -> SessionMetrics {
570        let m = &self.metrics_inner;
571        SessionMetrics {
572            session_id: self.id.clone(),
573            active_since: self.created_at,
574            queries_executed: m.queries_executed.load(Ordering::Relaxed),
575            locy_evaluations: m.locy_evaluations.load(Ordering::Relaxed),
576            total_query_time: Duration::from_micros(m.total_query_time_us.load(Ordering::Relaxed)),
577            transactions_committed: m.transactions_committed.load(Ordering::Relaxed),
578            transactions_rolled_back: m.transactions_rolled_back.load(Ordering::Relaxed),
579            total_rows_returned: m.total_rows_returned.load(Ordering::Relaxed),
580            total_rows_scanned: m.total_rows_scanned.load(Ordering::Relaxed),
581            plan_cache_hits: self.plan_cache_metrics.hits.load(Ordering::Relaxed),
582            plan_cache_misses: self.plan_cache_metrics.misses.load(Ordering::Relaxed),
583            plan_cache_size: self.plan_cache.lock().map(|c| c.len()).unwrap_or(0),
584        }
585    }
586
587    // ── Internal Helpers ──────────────────────────────────────────────
588
589    /// Execute a query using the transparent plan cache.
590    ///
591    /// On cache hit: reuses the parsed AST and logical plan, skipping parse
592    /// and planning phases entirely. On cache miss: parses, plans, caches,
593    /// then executes normally. Cache entries are invalidated when the schema
594    /// version changes.
595    pub(crate) async fn execute_cached(
596        &self,
597        cypher: &str,
598        params: HashMap<String, Value>,
599    ) -> Result<QueryResult> {
600        let schema_version = self.db.schema.schema().schema_version;
601        let cache_key = plan_cache_key(cypher);
602
603        // Try cache lookup (brief lock, then release)
604        let cached = self.plan_cache.lock().ok().and_then(|mut cache| {
605            cache
606                .get(cache_key, schema_version)
607                .map(|entry| (entry.ast.clone(), entry.plan.clone()))
608        });
609
610        if let Some((_ast, plan)) = cached {
611            // Cache hit — skip parse and plan, execute the cached plan directly
612            self.plan_cache_metrics.hits.fetch_add(1, Ordering::Relaxed);
613            return self
614                .db
615                .execute_plan_internal(plan, cypher, params, self.db.config.clone(), None)
616                .await;
617        }
618
619        // Cache miss — parse, plan, cache, execute via the normal path
620        self.plan_cache_metrics
621            .misses
622            .fetch_add(1, Ordering::Relaxed);
623
624        // Parse
625        let ast = uni_cypher::parse(cypher).map_err(crate::api::impl_query::into_parse_error)?;
626
627        // Enforce read-only semantics for session queries — mutations require
628        // a transaction for isolation, WAL protection, and commit hooks.
629        uni_query::validate_read_only(&ast).map_err(|_| UniError::Query {
630            message: "Session.query() is read-only. Mutation clauses (CREATE, MERGE, DELETE, SET, \
631                 REMOVE) require a transaction. Use session.tx() to start one."
632                .to_string(),
633            query: Some(cypher.to_string()),
634        })?;
635
636        // Time-travel queries bypass the cache entirely
637        if matches!(ast, uni_cypher::ast::Query::TimeTravel { .. }) {
638            return self
639                .db
640                .execute_internal_with_config(cypher, params, self.db.config.clone())
641                .await;
642        }
643
644        // Plan
645        let planner = uni_query::QueryPlanner::new(self.db.schema.schema().clone())
646            .with_params(params.clone());
647        let plan = planner
648            .plan(ast.clone())
649            .map_err(|e| crate::api::impl_query::into_query_error(e, cypher))?;
650
651        // Cache the entry
652        if let Ok(mut cache) = self.plan_cache.lock() {
653            cache.insert(
654                cache_key,
655                PlanCacheEntry {
656                    ast,
657                    plan: plan.clone(),
658                    schema_version,
659                    hit_count: 0,
660                },
661            );
662        }
663
664        // Execute the freshly planned query
665        self.db
666            .execute_plan_internal(plan, cypher, params, self.db.config.clone(), None)
667            .await
668    }
669
670    /// Get the database inner reference (for Transaction, LocyEngine, etc.)
671    pub(crate) fn db(&self) -> &Arc<UniInner> {
672        &self.db
673    }
674
675    /// Get the session's rule registry.
676    pub(crate) fn rule_registry(&self) -> &Arc<std::sync::RwLock<LocyRuleRegistry>> {
677        &self.rule_registry
678    }
679
680    /// Get the active write guard.
681    pub(crate) fn active_write_guard(&self) -> &Arc<AtomicBool> {
682        &self.active_write_guard
683    }
684
685    /// Merge session params with per-query params (per-query takes precedence).
686    pub(crate) fn merge_params(
687        &self,
688        mut query_params: HashMap<String, Value>,
689    ) -> HashMap<String, Value> {
690        let session_params = self.params.read().unwrap();
691        if !session_params.is_empty() {
692            let session_map: HashMap<String, Value> = session_params.clone();
693            if let Some(Value::Map(existing)) = query_params.get_mut("session") {
694                for (k, v) in session_map {
695                    existing.entry(k).or_insert(v);
696                }
697            } else {
698                query_params.insert("session".to_string(), Value::Map(session_map));
699            }
700        }
701        query_params
702    }
703}
704
705/// Facade for session-scoped parameters.
706///
707/// Obtained via `session.params()`.
708pub struct Params<'a> {
709    store: &'a Arc<std::sync::RwLock<HashMap<String, Value>>>,
710}
711
712impl<'a> Params<'a> {
713    /// Set a parameter.
714    pub fn set<K: Into<String>, V: Into<Value>>(&self, key: K, value: V) {
715        self.store.write().unwrap().insert(key.into(), value.into());
716    }
717
718    /// Get a parameter value by key.
719    pub fn get(&self, key: &str) -> Option<Value> {
720        self.store.read().unwrap().get(key).cloned()
721    }
722
723    /// Remove a parameter. Returns the previous value if it existed.
724    pub fn unset(&self, key: &str) -> Option<Value> {
725        self.store.write().unwrap().remove(key)
726    }
727
728    /// Get a snapshot of all parameters.
729    pub fn get_all(&self) -> HashMap<String, Value> {
730        self.store.read().unwrap().clone()
731    }
732
733    /// Set multiple parameters.
734    pub fn set_all<I, K, V>(&self, params: I)
735    where
736        I: IntoIterator<Item = (K, V)>,
737        K: Into<String>,
738        V: Into<Value>,
739    {
740        let mut store = self.store.write().unwrap();
741        for (k, v) in params {
742            store.insert(k.into(), v.into());
743        }
744    }
745
746    /// Clone the underlying store Arc for use in Python bindings.
747    pub fn clone_store_arc(&self) -> Arc<std::sync::RwLock<HashMap<String, Value>>> {
748        self.store.clone()
749    }
750}
751
752/// Builder for parameterized queries within a session.
753pub struct QueryBuilder<'a> {
754    session: &'a Session,
755    cypher: String,
756    params: HashMap<String, Value>,
757    timeout: Option<std::time::Duration>,
758    max_memory: Option<usize>,
759    cancellation_token: Option<CancellationToken>,
760}
761
762impl<'a> QueryBuilder<'a> {
763    /// Bind a parameter to the query.
764    pub fn param<K: Into<String>, V: Into<Value>>(mut self, key: K, value: V) -> Self {
765        self.params.insert(key.into(), value.into());
766        self
767    }
768
769    /// Bind multiple parameters from an iterator.
770    pub fn params<'p>(mut self, params: impl IntoIterator<Item = (&'p str, Value)>) -> Self {
771        for (k, v) in params {
772            self.params.insert(k.to_string(), v);
773        }
774        self
775    }
776
777    /// Set maximum execution time for this query.
778    pub fn timeout(mut self, duration: std::time::Duration) -> Self {
779        self.timeout = Some(duration);
780        self
781    }
782
783    /// Set maximum memory per query in bytes.
784    pub fn max_memory(mut self, bytes: usize) -> Self {
785        self.max_memory = Some(bytes);
786        self
787    }
788
789    /// Attach a cancellation token for cooperative query cancellation.
790    pub fn cancellation_token(mut self, token: CancellationToken) -> Self {
791        self.cancellation_token = Some(token);
792        self
793    }
794
795    /// Execute the query and fetch all results.
796    ///
797    /// Uses the session's transparent plan cache when no custom timeout or
798    /// memory limit is set.
799    pub async fn fetch_all(self) -> Result<QueryResult> {
800        let has_overrides = self.timeout.is_some()
801            || self.max_memory.is_some()
802            || self.cancellation_token.is_some();
803        if has_overrides {
804            // Validate read-only before bypassing the cache (which has its
805            // own validation). Parse is cheap relative to execution.
806            let ast = uni_cypher::parse(&self.cypher)
807                .map_err(crate::api::impl_query::into_parse_error)?;
808            uni_query::validate_read_only(&ast).map_err(|_| UniError::Query {
809                message: "Session.query() is read-only. Mutation clauses (CREATE, MERGE, DELETE, \
810                     SET, REMOVE) require a transaction. Use session.tx() to start one."
811                    .to_string(),
812                query: Some(self.cypher.clone()),
813            })?;
814
815            // Custom config — bypass cache and use the config-aware path
816            let mut db_config = self.session.db.config.clone();
817            if let Some(t) = self.timeout {
818                db_config.query_timeout = t;
819            }
820            if let Some(m) = self.max_memory {
821                db_config.max_query_memory = m;
822            }
823            let params = self.session.merge_params(self.params);
824            self.session
825                .db
826                .execute_internal_with_config_and_token(
827                    &self.cypher,
828                    params,
829                    db_config,
830                    self.cancellation_token,
831                )
832                .await
833        } else {
834            // Default config — use the plan cache
835            let params = self.session.merge_params(self.params);
836            self.session.execute_cached(&self.cypher, params).await
837        }
838    }
839
840    /// Execute the query and return the first row, or `None` if empty.
841    pub async fn fetch_one(self) -> Result<Option<Row>> {
842        let result = self.fetch_all().await?;
843        Ok(result.into_rows().into_iter().next())
844    }
845
846    /// Execute the query and return a cursor for streaming results.
847    pub async fn cursor(self) -> Result<QueryCursor> {
848        let mut db_config = self.session.db.config.clone();
849        if let Some(t) = self.timeout {
850            db_config.query_timeout = t;
851        }
852        if let Some(m) = self.max_memory {
853            db_config.max_query_memory = m;
854        }
855        let params = self.session.merge_params(self.params);
856        self.session
857            .db
858            .execute_cursor_internal_with_config(&self.cypher, params, db_config)
859            .await
860    }
861
862    /// Explain the query plan without executing it.
863    pub async fn explain(self) -> Result<ExplainOutput> {
864        self.session.db.explain_internal(&self.cypher).await
865    }
866
867    /// Profile the query execution, returning results with profiling output.
868    pub async fn profile(self) -> Result<(QueryResult, ProfileOutput)> {
869        let params = self.session.merge_params(self.params);
870        self.session.db.profile_internal(&self.cypher, params).await
871    }
872}
873
874/// Builder for starting a transaction with options.
875pub struct TransactionBuilder<'a> {
876    session: &'a Session,
877    timeout: Option<Duration>,
878    isolation: IsolationLevel,
879}
880
881impl<'a> TransactionBuilder<'a> {
882    /// Set the transaction timeout. The transaction will expire if operations
883    /// are attempted after this duration.
884    pub fn timeout(mut self, d: Duration) -> Self {
885        self.timeout = Some(d);
886        self
887    }
888
889    /// Set the isolation level for the transaction.
890    pub fn isolation(mut self, level: IsolationLevel) -> Self {
891        self.isolation = level;
892        self
893    }
894
895    /// Start the transaction.
896    pub async fn start(self) -> Result<Transaction> {
897        if self.session.is_pinned() {
898            return Err(UniError::ReadOnly {
899                operation: "start_transaction".to_string(),
900            });
901        }
902        Transaction::new_with_options(self.session, self.timeout, self.isolation).await
903    }
904}
905
906impl Clone for Session {
907    /// Clone the session, sharing the plan cache with the original.
908    ///
909    /// The cloned session gets a fresh ID, fresh metrics counters, and a fresh
910    /// cancellation token, but shares the plan cache so cache hits benefit all
911    /// clones. The database's active session count is incremented.
912    fn clone(&self) -> Self {
913        self.db.active_session_count.fetch_add(1, Ordering::Relaxed);
914        Self {
915            db: self.db.clone(),
916            original_db: self.original_db.clone(),
917            id: Uuid::new_v4().to_string(),
918            params: Arc::new(std::sync::RwLock::new(self.params.read().unwrap().clone())),
919            rule_registry: Arc::new(std::sync::RwLock::new(
920                self.rule_registry.read().unwrap().clone(),
921            )),
922            active_write_guard: Arc::new(AtomicBool::new(false)),
923            metrics_inner: Arc::new(SessionMetricsInner::new()),
924            created_at: Instant::now(),
925            cancellation_token: Arc::new(std::sync::RwLock::new(CancellationToken::new())),
926            plan_cache: self.plan_cache.clone(),
927            plan_cache_metrics: self.plan_cache_metrics.clone(),
928            hooks: self.hooks.clone(),
929            query_timeout: self.query_timeout,
930            transaction_timeout: self.transaction_timeout,
931        }
932    }
933}
934
935impl Drop for Session {
936    fn drop(&mut self) {
937        self.db.active_session_count.fetch_sub(1, Ordering::Relaxed);
938    }
939}
940
941// ── Plan Cache (internal) ─────────────────────────────────────────────
942
943/// Entry in the transparent plan cache.
944struct PlanCacheEntry {
945    ast: uni_query::CypherQuery,
946    plan: uni_query::LogicalPlan,
947    schema_version: u32,
948    hit_count: u64,
949}
950
951/// Transparent plan cache keyed by query text hash.
952///
953/// Caches parsed ASTs and logical plans to skip parsing and planning for
954/// repeated queries. Entries are evicted LFU-style when the cache is full.
955struct PlanCache {
956    entries: HashMap<u64, PlanCacheEntry>,
957    max_entries: usize,
958}
959
960impl PlanCache {
961    fn new(max_entries: usize) -> Self {
962        Self {
963            entries: HashMap::new(),
964            max_entries,
965        }
966    }
967
968    fn get(&mut self, key: u64, current_schema_version: u32) -> Option<&PlanCacheEntry> {
969        if let Some(entry) = self.entries.get_mut(&key) {
970            if entry.schema_version == current_schema_version {
971                entry.hit_count += 1;
972                return self.entries.get(&key);
973            }
974            // Schema changed — evict stale entry
975            self.entries.remove(&key);
976        }
977        None
978    }
979
980    fn insert(&mut self, key: u64, entry: PlanCacheEntry) {
981        if self.entries.len() >= self.max_entries {
982            // Evict entry with lowest hit_count
983            if let Some((&evict_key, _)) = self.entries.iter().min_by_key(|(_, e)| e.hit_count) {
984                self.entries.remove(&evict_key);
985            }
986        }
987        self.entries.insert(key, entry);
988    }
989
990    /// Number of cached plans.
991    fn len(&self) -> usize {
992        self.entries.len()
993    }
994}
995
/// Derive the plan-cache key for a query by hashing its text.
///
/// The text is fed through the standard library's `DefaultHasher` exactly
/// as given — no trimming or normalization — so queries that differ only in
/// whitespace or letter case produce different keys.
fn plan_cache_key(cypher: &str) -> u64 {
    use std::collections::hash_map::DefaultHasher;
    use std::hash::{Hash, Hasher};

    let mut state = DefaultHasher::new();
    Hash::hash(cypher, &mut state);
    Hasher::finish(&state)
}