Skip to main content

selene_gql/runtime/
session.rs

1//! Statement-session state for explicit transaction control.
2
3use std::{
4    borrow::Cow, cell::RefCell, collections::BTreeMap, num::NonZeroUsize, sync::Arc, time::Instant,
5};
6
7use selene_core::{CancellationToken, Change, DbString, Value};
8use selene_graph::{CommitOutcome, SharedGraph, WriteTxn};
9
10use crate::{
11    GqlStatus, SourceSpan,
12    plan::ImplDefinedCaps,
13    runtime::{
14        BindingTable, BindingTableRegistry, CallPlanCache, ExecutorError, ExecutorWarning,
15        PlanCache, PlanCacheStats, SharedPlanCache, WarningSink, WriteOutcome,
16    },
17};
18
/// Session-local query parameter value.
///
/// Scalar parameters are visible to statements as regular [`Value`]s. Table
/// parameters are registered in that statement's binding-table registry and
/// materialized as request-scoped `TableRef` values.
///
/// Table payloads are held behind an [`Arc`], so cloning a `Table` variant
/// shares the underlying [`BindingTable`] rather than copying it.
#[derive(Clone, Debug, PartialEq)]
pub enum SessionParameterValue {
    /// Scalar query parameter value.
    Scalar(Value),
    /// Binding-table query parameter value.
    Table(Arc<BindingTable>),
}
31
/// Caller-owned executor session bound to one shared graph.
///
/// Holds the session-scoped state used while executing statements: bound
/// query parameters, optional plan caches, the explicit-transaction window,
/// per-statement limits, and session characteristics such as the time zone
/// and the closed flag.
pub struct Session<'g> {
    /// Shared graph this session executes against.
    graph: &'g SharedGraph,
    /// Opaque principal bytes forwarded to commits; `None` when the session
    /// was created with [`Session::new`].
    principal: Option<Arc<[u8]>>,
    /// Every bound session parameter (scalar and table), keyed by name.
    pub(crate) parameters: BTreeMap<DbString, SessionParameterValue>,
    /// Scalar-only mirror of `parameters`, updated together with it by the
    /// bind/clear methods so an all-scalar session can be borrowed as-is
    /// instead of materializing a fresh map.
    pub(crate) scalar_parameters: BTreeMap<DbString, Value>,
    /// Session-local plan cache; `None` until [`Session::with_plan_cache`].
    pub(crate) plan_cache: Option<PlanCache>,
    /// Shared non-CALL plan cache; `None` until
    /// [`Session::with_shared_plan_cache`].
    pub(crate) shared_plan_cache: Option<Arc<SharedPlanCache>>,
    /// Shared procedure-CALL plan cache; `None` until
    /// [`Session::with_call_plan_cache`].
    pub(crate) call_plan_cache: Option<Arc<CallPlanCache>>,
    /// Open explicit write transaction, if any.
    pub(crate) active_txn: Option<WriteTxn<'g>>,
    /// True while the explicit transaction is in the failed (aborted) state;
    /// reported by [`Session::is_aborted`].
    pub(crate) aborted: bool,
    /// Instant recorded when the explicit transaction was started; used to
    /// compute the transaction duration reported on commit/rollback.
    pub(crate) tx_started_at: Option<Instant>,
    /// Statement counter for the current transaction window, reported as
    /// `statement_count` on commit/rollback.
    pub(crate) tx_statement_count: u32,
    /// Cooperative cancellation token attached via
    /// [`Session::with_cancellation_token`].
    pub(crate) cancellation: Option<CancellationToken>,
    /// Absolute per-statement deadline attached via
    /// [`Session::with_deadline`].
    pub(crate) deadline: Option<Instant>,
    /// Per-statement node-scan budget attached via
    /// [`Session::with_max_nodes_scanned`].
    pub(crate) max_nodes_scanned: Option<usize>,
    /// Outermost result-row cap attached via [`Session::with_row_cap`].
    pub(crate) row_cap: Option<usize>,
    /// Opt-in warning sink attached via [`Session::with_warning_sink`]; kept
    /// in a `RefCell` so warnings can be emitted through a shared borrow.
    pub(crate) warning_sink: Option<RefCell<Box<dyn WarningSink>>>,
    /// When set, `execute_source` runs the optimizer with a snapshot-pinned
    /// [`LiveIndexCatalog`] so label / typed / composite index access paths are
    /// selected. Default `true` (greenfield default-on). Toggle off via
    /// [`Session::without_index_selection`] to lower the byte-identical Linear
    /// plan (perf-baseline pinning / debugging).
    pub(crate) index_selection: bool,
    /// Session-local time-zone displacement (ISO/IEC 39075:2024 section 4.5.2.1).
    ///
    /// `None` is the ID048 default (UTC); `SESSION SET TIME ZONE` sets it and
    /// `SESSION RESET TIME ZONE` clears it back to `None`. The threaded value
    /// is consumed by the section 20.27 current-datetime functions.
    pub(crate) time_zone: Option<jiff::tz::TimeZone>,
    /// Session termination flag (ISO/IEC 39075:2024 section 7.3).
    ///
    /// Set by `SESSION CLOSE`; once set, every subsequent `execute_source`
    /// request returns [`ExecutorError::SessionClosed`].
    pub(crate) closed: bool,
    /// Embedder-configured implementation-defined planning/runtime caps
    /// (ISO IL013/IL015/IL018 limit surfaces). Defaults to
    /// [`ImplDefinedCaps::DEFAULT`]; overridden via
    /// [`Session::with_impl_defined_caps`]. Passed into `build_plan`, so it is
    /// baked into every lowered plan and consulted by the plan-time quantifier
    /// gate as well as the runtime/optimizer cap checks.
    pub(crate) caps: ImplDefinedCaps,
}
75
76pub(crate) fn materialize_parameter_values<'a>(
77    parameters: &'a BTreeMap<DbString, SessionParameterValue>,
78    scalar_parameters: &'a BTreeMap<DbString, Value>,
79    registry: &BindingTableRegistry,
80) -> Cow<'a, BTreeMap<DbString, Value>> {
81    if parameters
82        .values()
83        .all(|value| matches!(value, SessionParameterValue::Scalar(_)))
84    {
85        return Cow::Borrowed(scalar_parameters);
86    }
87
88    let mut materialized = scalar_parameters.clone();
89    for (name, value) in parameters {
90        if let SessionParameterValue::Table(table) = value {
91            materialized.insert(
92                name.clone(),
93                Value::TableRef(registry.register(Arc::clone(table))),
94            );
95        }
96    }
97    Cow::Owned(materialized)
98}
99
/// Metadata returned after committing an explicit transaction through a [`Session`].
///
/// Marked `#[non_exhaustive]`, so further fields may be added over time.
#[derive(Clone, Debug, PartialEq)]
#[non_exhaustive]
pub struct TransactionOutcome {
    /// Total changes aggregated across all statements in the transaction.
    pub changes: Vec<Change>,
    /// Graph generation published by the commit.
    pub generation: u64,
    /// Next node ID after the commit.
    pub next_node_id: u64,
    /// Next edge ID after the commit.
    pub next_edge_id: u64,
    /// Highest sequence reported by commit-critical durable providers.
    pub durable_at: Option<u64>,
    /// Wall-clock duration, in microseconds, from `start_transaction` to
    /// commit completion.
    pub duration_micros: u64,
    /// Number of accepted non-control statements in the transaction window.
    pub statement_count: u32,
}
119
120impl TransactionOutcome {
121    pub(crate) fn into_write_outcome(self) -> WriteOutcome {
122        WriteOutcome {
123            rows: None,
124            changes: self.changes,
125            generation: self.generation,
126            next_node_id: self.next_node_id,
127            next_edge_id: self.next_edge_id,
128            durable_at: self.durable_at,
129        }
130    }
131}
132
/// Metadata returned after rolling back an explicit transaction through a [`Session`].
///
/// Marked `#[non_exhaustive]`, so further fields may be added over time.
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub struct RollbackOutcome {
    /// Count of changes discarded by the rollback.
    pub discarded_changes: usize,
    /// Number of accepted non-control statements in the transaction window.
    pub statement_count: u32,
    /// Wall-clock duration, in microseconds, from `start_transaction` to
    /// rollback completion.
    pub duration_micros: u64,
}
144
145impl<'g> Session<'g> {
146    /// Create a session without commit-principal bytes.
147    #[must_use]
148    pub const fn new(graph: &'g SharedGraph) -> Self {
149        Self {
150            graph,
151            principal: None,
152            parameters: BTreeMap::new(),
153            scalar_parameters: BTreeMap::new(),
154            plan_cache: None,
155            shared_plan_cache: None,
156            call_plan_cache: None,
157            active_txn: None,
158            aborted: false,
159            tx_started_at: None,
160            tx_statement_count: 0,
161            cancellation: None,
162            deadline: None,
163            max_nodes_scanned: None,
164            row_cap: None,
165            warning_sink: None,
166            index_selection: true,
167            time_zone: None,
168            closed: false,
169            caps: ImplDefinedCaps::DEFAULT,
170        }
171    }
172
173    /// Create a session that forwards opaque principal bytes to commits.
174    #[must_use]
175    pub fn with_principal(graph: &'g SharedGraph, principal: Arc<[u8]>) -> Self {
176        Self {
177            graph,
178            principal: Some(principal),
179            parameters: BTreeMap::new(),
180            scalar_parameters: BTreeMap::new(),
181            plan_cache: None,
182            shared_plan_cache: None,
183            call_plan_cache: None,
184            active_txn: None,
185            aborted: false,
186            tx_started_at: None,
187            tx_statement_count: 0,
188            cancellation: None,
189            deadline: None,
190            max_nodes_scanned: None,
191            row_cap: None,
192            warning_sink: None,
193            index_selection: true,
194            time_zone: None,
195            closed: false,
196            caps: ImplDefinedCaps::DEFAULT,
197        }
198    }
199
200    /// Attach a cooperative cancellation token to subsequent statements.
201    ///
202    /// Cancellation is cooperative: statements observe the token at executor,
203    /// built-in-procedure, and algorithm checkpoints. If a statement inside an
204    /// explicit transaction returns `Cancelled`, the transaction enters the
205    /// failed state until `ROLLBACK`.
206    #[must_use]
207    pub fn with_cancellation_token(mut self, token: CancellationToken) -> Self {
208        self.cancellation = Some(token);
209        self
210    }
211
212    /// Attach an absolute per-statement deadline to subsequent statements.
213    ///
214    /// The deadline is compared with `Instant::now()` at the same cooperative
215    /// checkpoints as cancellation. Expiry returns `Timeout`; inside an
216    /// explicit transaction that also marks the transaction failed until
217    /// `ROLLBACK`.
218    #[must_use]
219    pub fn with_deadline(mut self, deadline: Instant) -> Self {
220        self.deadline = Some(deadline);
221        self
222    }
223
224    /// Attach a deterministic per-statement node-scan budget.
225    ///
226    /// Scan-heavy graph and procedure loops debit this budget at batch
227    /// boundaries. Exceeding it returns GQLSTATUS `5GQL1`
228    /// (program-limit-exceeded); inside an explicit transaction that also
229    /// marks the transaction failed until `ROLLBACK`.
230    #[must_use]
231    pub fn with_max_nodes_scanned(mut self, max_nodes: usize) -> Self {
232        self.max_nodes_scanned = Some(max_nodes);
233        self
234    }
235
236    /// Set the implementation-defined planning/runtime caps for subsequent
237    /// statements (ISO IL013/IL015/IL018 limit surfaces — e.g.
238    /// [`max_quantifier`](ImplDefinedCaps::max_quantifier), set-op / `GROUP BY`
239    /// key caps, optimizer-iteration, string/byte-string concat length,
240    /// path-length, and list-cardinality bounds).
241    ///
242    /// The caps are baked into every plan lowered for this session, so they are
243    /// honored by both the plan-time variable-length quantifier gate and the
244    /// runtime/optimizer cap checks. Defaults to [`ImplDefinedCaps::DEFAULT`].
245    #[must_use]
246    pub fn with_impl_defined_caps(mut self, caps: ImplDefinedCaps) -> Self {
247        self.caps = caps;
248        self
249    }
250
251    /// Attach an outermost result-row cap to subsequent statements.
252    ///
253    /// The cap is enforced only at the statement output boundary. Intermediate
254    /// rows produced by scans, joins, `FOR`, or other pipeline operators do
255    /// not count against it. Exceeding the cap returns `RowCapExceeded`; inside
256    /// an explicit transaction that marks the transaction failed until
257    /// `ROLLBACK`.
258    #[must_use]
259    pub fn with_row_cap(mut self, max_rows: usize) -> Self {
260        self.row_cap = Some(max_rows);
261        self
262    }
263
264    /// Attach an opt-in runtime warning sink to subsequent statements.
265    ///
266    /// Sessions without a sink silently discard warnings. The sink currently
267    /// receives ISO warning records such as `01G11` for aggregate NULL
268    /// elimination and `01N01` for relaxed validation-mode writes; see
269    /// `docs/embedding-guide.md` for an embedder-side collection example.
270    #[must_use]
271    pub fn with_warning_sink(mut self, sink: impl WarningSink + 'static) -> Self {
272        self.warning_sink = Some(RefCell::new(Box::new(sink)));
273        self
274    }
275
276    /// Bind or replace a session-local query parameter.
277    ///
278    /// Parameters are named without the leading `$` and are resolved by
279    /// `$name` references during statement execution. Binding is an upsert:
280    /// rebinding a name replaces the prior value and affects subsequent
281    /// statements only. Parameters are session-level metadata, so transaction
282    /// boundaries and [`Self::abort`] preserve the map. Parameters not
283    /// referenced by a statement are ignored. Session plan-cache keys remain
284    /// source-only; parameter values and runtime types are checked during each
285    /// execution.
286    ///
287    /// Runtime positions that require a specific type validate strictly; for
288    /// example, `LIMIT $n` accepts only non-negative integer values and returns
289    /// [`ExecutorError::InvalidParameterType`] for mismatches.
290    ///
291    /// If `name` previously held a table binding, the table is replaced and
292    /// `None` is returned. Use [`Self::bind_table_parameter`] when callers need
293    /// table-aware replacement information.
294    pub fn bind_parameter(&mut self, name: DbString, value: Value) -> Option<Value> {
295        self.scalar_parameters.insert(name.clone(), value.clone());
296        match self
297            .parameters
298            .insert(name, SessionParameterValue::Scalar(value))
299        {
300            Some(SessionParameterValue::Scalar(prior)) => Some(prior),
301            Some(SessionParameterValue::Table(_)) | None => None,
302        }
303    }
304
305    /// Bind or replace a session-local query parameter with a binding table.
306    ///
307    /// The table is stored at session scope and materialized into a fresh
308    /// request-scoped table reference for each statement execution.
309    pub fn bind_table_parameter(
310        &mut self,
311        name: DbString,
312        table: BindingTable,
313    ) -> Option<SessionParameterValue> {
314        self.scalar_parameters.remove(&name);
315        self.parameters
316            .insert(name, SessionParameterValue::Table(Arc::new(table)))
317    }
318
319    /// Remove one session-local query parameter and return its prior scalar value.
320    ///
321    /// If `name` held a table binding, the table is removed and `None` is
322    /// returned.
323    pub fn clear_parameter(&mut self, name: &DbString) -> Option<Value> {
324        self.scalar_parameters.remove(name);
325        match self.parameters.remove(name) {
326            Some(SessionParameterValue::Scalar(prior)) => Some(prior),
327            Some(SessionParameterValue::Table(_)) | None => None,
328        }
329    }
330
331    /// Remove all session-local query parameters.
332    pub fn clear_parameters(&mut self) {
333        self.parameters.clear();
334        self.scalar_parameters.clear();
335    }
336
337    /// True when a session-local parameter named `name` is currently bound.
338    ///
339    /// Used to honor `SESSION SET VALUE IF NOT EXISTS` (ISO section 7.4): an
340    /// existing binding is left untouched.
341    #[must_use]
342    pub(crate) fn has_parameter(&self, name: &DbString) -> bool {
343        self.parameters.contains_key(name)
344    }
345
346    /// Set the session-local time-zone displacement (ISO feature GS15).
347    ///
348    /// Consumed by the section 20.27 current-datetime functions; persists across
349    /// transaction boundaries like the other session characteristics.
350    pub(crate) fn set_time_zone(&mut self, zone: jiff::tz::TimeZone) {
351        self.time_zone = Some(zone);
352    }
353
354    /// Reset the session time zone to the ID048 default, UTC (ISO feature GS07).
355    pub(crate) fn reset_time_zone(&mut self) {
356        self.time_zone = None;
357    }
358
359    /// Return the time zone temporal evaluation should use for this session.
360    ///
361    /// `None` maps to the ID048 default of UTC.
362    #[must_use]
363    pub(crate) fn effective_time_zone(&self) -> jiff::tz::TimeZone {
364        self.time_zone.clone().unwrap_or(jiff::tz::TimeZone::UTC)
365    }
366
367    /// Reset every session characteristic (ISO feature GS04).
368    ///
369    /// Clears all session parameters and resets the time zone to its default.
370    pub(crate) fn reset_characteristics(&mut self) {
371        self.clear_parameters();
372        self.reset_time_zone();
373    }
374
375    /// Reset all session parameters, leaving other characteristics (ISO feature GS08).
376    pub(crate) fn reset_parameters(&mut self) {
377        self.clear_parameters();
378    }
379
380    /// Reset one named session parameter (ISO feature GS16).
381    pub(crate) fn reset_parameter(&mut self, name: &DbString) {
382        self.clear_parameter(name);
383    }
384
385    /// Mark this session closed (ISO/IEC 39075:2024 section 7.3).
386    ///
387    /// Any active explicit transaction is rolled back first so a closed session
388    /// leaves no dangling write lock.
389    pub(crate) fn close(&mut self) {
390        self.abort();
391        self.closed = true;
392    }
393
394    /// True when `SESSION CLOSE` has terminated this session.
395    #[must_use]
396    pub const fn is_closed(&self) -> bool {
397        self.closed
398    }
399
400    /// Borrow the session-local query-parameter map used for statement execution.
401    #[must_use]
402    #[cfg(test)]
403    pub(crate) fn parameters(&self) -> &BTreeMap<DbString, SessionParameterValue> {
404        &self.parameters
405    }
406
407    #[cfg(test)]
408    pub(crate) fn materialize_parameters<'a>(
409        &'a self,
410        registry: &BindingTableRegistry,
411    ) -> Cow<'a, BTreeMap<DbString, Value>> {
412        materialize_parameter_values(&self.parameters, &self.scalar_parameters, registry)
413    }
414
415    /// Disable optimizer index selection; all scans fall back to
416    /// [`ScanAccess::Linear`](crate::ScanAccess::Linear).
417    ///
418    /// With index selection off, `execute_source` skips the optimizer entirely
419    /// and lowers the byte-identical Linear plan (and EXPLAIN output) of
420    /// pre-optimizer-wiring HEAD. This is the escape hatch for committed
421    /// perf-baseline reproduction and access-path debugging.
422    #[must_use]
423    pub const fn without_index_selection(mut self) -> Self {
424        self.index_selection = false;
425        self
426    }
427
428    /// (Re-)enable optimizer index selection (the default).
429    ///
430    /// When enabled, `execute_source` builds a snapshot-pinned
431    /// [`LiveIndexCatalog`](crate::LiveIndexCatalog) per cache-miss statement
432    /// and runs the optimizer so label / typed / composite index access paths
433    /// are selected. Linear remains the always-correct fallback inside every
434    /// rule, so results are byte-identical to the disabled path.
435    #[must_use]
436    pub const fn with_index_selection(mut self) -> Self {
437        self.index_selection = true;
438        self
439    }
440
441    /// Enable this session's source-string plan cache with the given capacity.
442    ///
443    /// The cache is Session-local and invalidates entries when the backing
444    /// graph's schema-version epoch changes.
445    #[must_use]
446    pub fn with_plan_cache(mut self, capacity: NonZeroUsize) -> Self {
447        self.plan_cache = Some(PlanCache::new(capacity));
448        self
449    }
450
451    /// Enable this session's shared non-CALL source-string plan cache.
452    ///
453    /// Embedders should pass one shared cache per graph so short-lived
454    /// sessions can reuse read and write plans across requests. The cache key
455    /// includes graph ID, schema-version epoch, procedure-registry version,
456    /// source text, implementation-defined caps, and optimizer
457    /// index-selection mode.
458    #[must_use]
459    pub fn with_shared_plan_cache(mut self, cache: Arc<SharedPlanCache>) -> Self {
460        self.shared_plan_cache = Some(cache);
461        self
462    }
463
464    /// Enable this session's shared procedure-CALL plan cache.
465    ///
466    /// Embedders should pass one shared cache per graph so short-lived
467    /// sessions can reuse procedure-call plans across requests. The cache key
468    /// includes the graph ID, schema-version epoch, and procedure-registry
469    /// version.
470    #[must_use]
471    pub fn with_call_plan_cache(mut self, cache: Arc<CallPlanCache>) -> Self {
472        self.call_plan_cache = Some(cache);
473        self
474    }
475
476    /// Return this session's plan-cache counters, if caching is enabled.
477    #[must_use]
478    pub fn plan_cache_stats(&self) -> Option<PlanCacheStats> {
479        self.plan_cache.as_ref().map(PlanCache::stats)
480    }
481
482    /// Clear this session's cached plans without resetting counters.
483    pub fn clear_plan_cache(&mut self) {
484        if let Some(cache) = self.plan_cache.as_mut() {
485            cache.clear();
486        }
487    }
488
489    /// Borrow the graph this session executes against.
490    #[must_use]
491    pub(crate) const fn graph(&self) -> &'g SharedGraph {
492        self.graph
493    }
494
495    /// Clone the principal bytes for a commit boundary.
496    #[must_use]
497    pub(crate) fn principal(&self) -> Option<Arc<[u8]>> {
498        self.principal.clone()
499    }
500
501    /// Return true when the session owns an explicit write transaction.
502    #[must_use]
503    pub const fn has_active_txn(&self) -> bool {
504        self.active_txn.is_some()
505    }
506
507    /// Return true when the active explicit transaction is aborted.
508    #[must_use]
509    pub const fn is_aborted(&self) -> bool {
510        self.aborted
511    }
512
513    /// Open an explicit write transaction.
514    ///
515    /// Subsequent non-control statements executed through this session run
516    /// inside the transaction until [`Self::commit_transaction`] or
517    /// [`Self::rollback_transaction`] closes it.
518    ///
519    /// # Errors
520    ///
521    /// Returns [`ExecutorError::TransactionAlreadyActive`] when this session
522    /// already owns an explicit transaction.
523    pub fn start_transaction(&mut self) -> Result<(), ExecutorError> {
524        if self.active_txn.is_some() {
525            return Err(ExecutorError::TransactionAlreadyActive {
526                span: SourceSpan::default(),
527            });
528        }
529        self.active_txn = Some(self.graph.begin_write());
530        self.tx_started_at = Some(Instant::now());
531        self.tx_statement_count = 0;
532        self.aborted = false;
533        Ok(())
534    }
535
536    /// Commit the open explicit transaction.
537    ///
538    /// # Errors
539    ///
540    /// Returns [`ExecutorError::NoActiveTransaction`] when no explicit
541    /// transaction is open, [`ExecutorError::InFailedTransaction`] when the
542    /// transaction has been aborted by a failed statement, or
543    /// [`ExecutorError::GraphMutation`] when the graph commit is rejected.
544    pub fn commit_transaction(&mut self) -> Result<TransactionOutcome, ExecutorError> {
545        if self.aborted {
546            if let Some(txn) = self.active_txn.take() {
547                txn.rollback();
548            }
549            self.clear_tx_state();
550            return Err(ExecutorError::InFailedTransaction {
551                span: SourceSpan::default(),
552            });
553        }
554        let txn = self
555            .active_txn
556            .take()
557            .ok_or(ExecutorError::NoActiveTransaction {
558                span: SourceSpan::default(),
559            })?;
560        let statement_count = self.tx_statement_count;
561        let outcome = txn.commit_with_principal(self.principal.clone());
562        let duration_micros = self.tx_duration_micros();
563        self.clear_tx_state();
564        let outcome = outcome.map_err(|source| ExecutorError::GraphMutation {
565            source,
566            span: SourceSpan::default(),
567        })?;
568        emit_commit_warnings(&outcome, self.warning_sink.as_ref());
569        Ok(TransactionOutcome {
570            changes: outcome.changes,
571            generation: outcome.generation,
572            next_node_id: outcome.next_node_id,
573            next_edge_id: outcome.next_edge_id,
574            durable_at: outcome.durable_at,
575            duration_micros,
576            statement_count,
577        })
578    }
579
580    /// Roll back the open explicit transaction.
581    ///
582    /// # Errors
583    ///
584    /// Returns [`ExecutorError::NoActiveTransaction`] when no explicit
585    /// transaction is open.
586    pub fn rollback_transaction(&mut self) -> Result<RollbackOutcome, ExecutorError> {
587        let txn = self
588            .active_txn
589            .take()
590            .ok_or(ExecutorError::NoActiveTransaction {
591                span: SourceSpan::default(),
592            })?;
593        let discarded_changes = txn.change_count();
594        let statement_count = self.tx_statement_count;
595        let duration_micros = self.tx_duration_micros();
596        txn.rollback();
597        self.clear_tx_state();
598        Ok(RollbackOutcome {
599            discarded_changes,
600            statement_count,
601            duration_micros,
602        })
603    }
604
605    /// Flush every commit-critical durable provider registered on this graph.
606    ///
607    /// Returns the highest durable sequence reported by providers, or `None`
608    /// when the graph has no durable providers.
609    ///
610    /// # Errors
611    ///
612    /// Returns [`ExecutorError::Flush`] when any provider-owned flush fails.
613    pub fn flush(&self) -> Result<Option<u64>, ExecutorError> {
614        let mut highest = None;
615        for provider in self.graph.durable_providers() {
616            let tag = provider.provider_tag();
617            let seq = provider.flush().map_err(|error| ExecutorError::Flush {
618                provider_tag: tag,
619                reason: error.to_string(),
620            })?;
621            if let Some(seq) = seq {
622                highest = Some(highest.map_or(seq, |current: u64| current.max(seq)));
623            }
624        }
625        Ok(highest)
626    }
627
628    /// Roll back and clear the explicit transaction, when one is active.
629    pub fn abort(&mut self) {
630        if let Some(txn) = self.active_txn.take() {
631            txn.rollback();
632        }
633        self.clear_tx_state();
634    }
635
636    fn tx_duration_micros(&self) -> u64 {
637        self.tx_started_at
638            .map_or(0, |started| started.elapsed().as_micros() as u64)
639    }
640
641    fn clear_tx_state(&mut self) {
642        self.aborted = false;
643        self.tx_started_at = None;
644        self.tx_statement_count = 0;
645    }
646}
647
648fn emit_commit_warnings(
649    outcome: &CommitOutcome,
650    warning_sink: Option<&RefCell<Box<dyn WarningSink>>>,
651) {
652    let Some(sink) = warning_sink else {
653        return;
654    };
655    for warning in &outcome.warnings {
656        sink.borrow_mut().emit(ExecutorWarning {
657            code: GqlStatus::VALIDATION_MODE_RELAXED_WRITE,
658            message: warning.warning.violation.to_string(),
659            span: SourceSpan::default(),
660        });
661    }
662}
663
664#[cfg(test)]
665#[path = "session_tests.rs"]
666mod session_tests;