// uni_db/api/transaction.rs
// SPDX-License-Identifier: Apache-2.0
// Copyright 2024-2026 Dragonscale Team

//! Transaction — the explicit write scope.
//!
//! Transactions provide ACID guarantees for multi-statement writes.
//! Changes are isolated until commit.
9use std::collections::HashMap;
10use std::sync::Arc;
11use std::sync::atomic::Ordering;
12use std::time::{Duration, Instant};
13
14use metrics;
15use tokio_util::sync::CancellationToken;
16use tracing::{info, instrument, warn};
17use uuid::Uuid;
18
19use crate::api::UniInner;
20use crate::api::impl_locy::{self, LocyRuleRegistry};
21use crate::api::session::Session;
22use uni_common::{Result, UniError};
23use uni_locy::DerivedFactSet;
24
25use crate::api::locy_result::LocyResult;
26use uni_query::{ExecuteResult, QueryCursor, QueryResult, Row, Value};
27
/// Snapshot of L0 mutation state, used for before/after comparison in execute operations.
struct L0Snapshot {
    /// Total number of mutations recorded in the private L0 buffer at snapshot time.
    mutation_count: usize,
    /// Per-kind mutation statistics; diffed against a later snapshot to build an `ExecuteResult`.
    mutation_stats: uni_store::runtime::l0::MutationStats,
}
33
/// Transaction isolation level.
///
/// Uses commit-time serialization: `tx()` allocates a private L0 buffer
/// without acquiring the writer lock; the writer lock is only acquired
/// at commit time for WAL + merge.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
#[non_exhaustive]
pub enum IsolationLevel {
    /// Serialized isolation via commit-time serialization: writes go to a
    /// private L0 buffer and the writer lock is acquired only at commit
    /// (for WAL + merge), not at transaction begin.
    #[default]
    Serialized,
}
46
47impl std::fmt::Display for IsolationLevel {
48    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
49        match self {
50            IsolationLevel::Serialized => write!(f, "Serialized"),
51        }
52    }
53}
54
/// Result of committing a transaction.
#[derive(Debug)]
pub struct CommitResult {
    /// Number of mutations committed (read from the transaction's private L0).
    pub mutations_committed: usize,
    /// Number of rules promoted to the parent session.
    pub rules_promoted: usize,
    /// Database version after commit.
    pub version: u64,
    /// Database version when the transaction was created.
    pub started_at_version: u64,
    /// WAL log sequence number of the commit (0 when no WAL is configured).
    pub wal_lsn: u64,
    /// Duration of the commit operation (lock + WAL + merge).
    pub duration: Duration,
    /// Errors encountered during rule promotion (promotion is best-effort;
    /// these do not fail the commit).
    pub rule_promotion_errors: Vec<RulePromotionError>,
}
73
74impl CommitResult {
75    /// Number of versions that committed between tx start and commit.
76    /// 0 means no concurrent commits occurred.
77    pub fn version_gap(&self) -> u64 {
78        self.version.saturating_sub(self.started_at_version + 1)
79    }
80}
81
/// Error encountered during rule promotion at commit time.
///
/// Promotion is best-effort: failures are reported in
/// `CommitResult::rule_promotion_errors` rather than failing the commit.
#[derive(Debug, Clone)]
pub struct RulePromotionError {
    /// Text identifying the rule that failed to promote; "<all>" when the
    /// whole registry was inaccessible (e.g. a poisoned lock).
    pub rule_text: String,
    /// Human-readable description of the failure.
    pub error: String,
}
88
/// A database transaction — the explicit write scope.
///
/// Transactions provide ACID guarantees for multiple operations.
/// Changes are isolated until [`commit()`](Self::commit).
///
/// # Concurrency
///
/// Uses commit-time serialization: each transaction owns a private L0 buffer.
/// `tx()` only takes a reader lock (to snapshot the version); the writer lock
/// is acquired briefly per-mutation and once at commit for WAL + merge.
/// Multiple transactions can coexist; isolation is provided by private L0 buffers.
///
/// # Drop Behavior
///
/// If dropped without calling `commit()` or `rollback()`, the private L0 is
/// simply discarded (no writer lock needed) and a warning is logged if dirty.
pub struct Transaction {
    pub(crate) db: Arc<UniInner>,
    /// Private L0 buffer — mutations within this transaction are routed here.
    pub(crate) tx_l0: Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>,
    /// Session-level write guard (set false on complete)
    session_write_guard: Arc<std::sync::atomic::AtomicBool>,
    /// Session's rule registry (promotion target for rules on commit)
    session_rule_registry: Arc<std::sync::RwLock<LocyRuleRegistry>>,
    /// Transaction-scoped rule registry (seeded from the session's at begin)
    rule_registry: Arc<std::sync::RwLock<LocyRuleRegistry>>,
    /// Session's metrics counters (for commit/rollback tracking)
    session_metrics: Arc<crate::api::session::SessionMetricsInner>,
    /// True once commit() or rollback() has run; all further ops are rejected.
    completed: bool,
    /// Unique transaction ID (UUID v4), used in logs and error hints.
    id: String,
    /// Session ID (for commit notifications and hooks).
    session_id: String,
    /// When the transaction was created; used for duration metrics.
    start_time: Instant,
    /// Database version observed at transaction begin.
    started_at_version: u64,
    /// Optional deadline for the transaction.
    deadline: Option<Instant>,
    /// Child cancellation token derived from the session's parent token.
    cancellation_token: CancellationToken,
    /// Hooks inherited from the session.
    hooks: Vec<Arc<dyn crate::api::hooks::SessionHook>>, // Flattened from session's HashMap
}
130
131impl Transaction {
132    pub(crate) async fn new(session: &Session) -> Result<Self> {
133        Self::new_with_options(session, None, IsolationLevel::default()).await
134    }
135
    /// Create a transaction on `session`, optionally with a `timeout` deadline.
    ///
    /// Takes the session's exclusive write-context guard, then allocates a
    /// private L0 buffer under a writer READ lock only (commit-time
    /// serialization). `_isolation` is currently unused: only
    /// [`IsolationLevel::Serialized`] exists.
    pub(crate) async fn new_with_options(
        session: &Session,
        timeout: Option<Duration>,
        _isolation: IsolationLevel,
    ) -> Result<Self> {
        // Ensure no other write context is active on this session
        if session
            .active_write_guard()
            .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
            .is_err()
        {
            return Err(UniError::WriteContextAlreadyActive {
                session_id: session.id().to_string(),
                hint: "Only one Transaction, BulkWriter, or Appender can be active per Session at a time. Commit or rollback the active one first, or create a separate Session for concurrent writes.",
            });
        }

        // Panic safety: if anything between the compare_exchange above and the
        // Transaction construction below panics, this scopeguard ensures the
        // write guard is cleared so the Session isn't permanently locked.
        // Once the Transaction is successfully constructed, we forget the guard —
        // Transaction's Drop impl takes over cleanup responsibility.
        let write_guard_cleanup = scopeguard::guard(session.active_write_guard().clone(), |g| {
            g.store(false, Ordering::SeqCst);
        });

        let db = session.db().clone();
        let writer_lock = db.writer.clone().ok_or_else(|| {
            // No need to manually clear — scopeguard handles it on early return
            UniError::ReadOnly {
                operation: "start_transaction".to_string(),
            }
        })?;

        // READ lock only — create a private L0 buffer without blocking other writers.
        // This is the key commit-time serialization change: no writer WRITE lock
        // is taken at transaction begin; it's deferred to commit().
        let (started_at_version, tx_l0) = {
            let writer = writer_lock.read().await;
            let l0 = writer.create_transaction_l0();
            let version = l0.read().current_version;
            (version, l0)
        };

        let id = Uuid::new_v4().to_string();
        info!(transaction_id = %id, "Transaction started");

        // Clone session's rule registry for transaction-scoped modifications
        let session_registry = session.rule_registry().read().unwrap().clone();

        let deadline = timeout.map(|d| Instant::now() + d);
        // Child token from session — cancelled when session.cancel() fires
        let cancellation_token = session.cancellation_token().child_token();

        let tx = Self {
            db,
            tx_l0,
            session_write_guard: session.active_write_guard().clone(),
            session_rule_registry: session.rule_registry().clone(),
            rule_registry: Arc::new(std::sync::RwLock::new(session_registry)),
            session_metrics: session.metrics_inner.clone(),
            completed: false,
            id,
            session_id: session.id().to_string(),
            start_time: Instant::now(),
            started_at_version,
            deadline,
            cancellation_token,
            hooks: session.hooks.values().cloned().collect(),
        };

        // Transaction constructed successfully — its Drop impl will clear the
        // write guard, so we disarm the scopeguard.
        std::mem::forget(write_guard_cleanup);

        Ok(tx)
    }
213
214    // ── Cypher Reads (sees shared DB + uncommitted writes) ────────────
215
216    /// Execute a Cypher query within the transaction.
217    /// Reads see the private L0 buffer (uncommitted writes).
218    #[instrument(skip(self), fields(transaction_id = %self.id))]
219    pub async fn query(&self, cypher: &str) -> Result<QueryResult> {
220        self.check_completed()?;
221        self.db
222            .execute_internal_with_tx_l0(cypher, HashMap::new(), self.tx_l0.clone())
223            .await
224    }
225
226    /// Execute a Cypher query with parameters.
227    pub fn query_with(&self, cypher: &str) -> TxQueryBuilder<'_> {
228        TxQueryBuilder {
229            tx: self,
230            cypher: cypher.to_string(),
231            params: HashMap::new(),
232            cancellation_token: None,
233            timeout: None,
234        }
235    }
236
237    // ── Cypher Writes ─────────────────────────────────────────────────
238
239    /// Execute a Cypher mutation within the transaction.
240    /// Mutation count is read from the private L0 (not the global writer).
241    #[instrument(skip(self), fields(transaction_id = %self.id))]
242    pub async fn execute(&self, cypher: &str) -> Result<ExecuteResult> {
243        self.check_completed()?;
244        let before = self.snapshot_l0();
245        let result = self.query(cypher).await?;
246        let after = self.snapshot_l0();
247        Ok(Self::compute_execute_result(&before, &after, &result))
248    }
249
250    /// Execute a mutation with parameters using a builder.
251    ///
252    /// Returns an [`ExecuteBuilder`] that provides `.param()` chaining
253    /// and a `.run()` method that returns [`ExecuteResult`].
254    pub fn execute_with(&self, cypher: &str) -> ExecuteBuilder<'_> {
255        ExecuteBuilder {
256            tx: self,
257            cypher: cypher.to_string(),
258            params: HashMap::new(),
259            timeout: None,
260        }
261    }
262
263    // ── DerivedFactSet Application ─────────────────────────────────────
264
265    /// Apply a `DerivedFactSet` (from a session-level DERIVE) to this transaction.
266    ///
267    /// Replays the collected Cypher mutation ASTs against the transaction's
268    /// private L0 buffer. Logs an info-level warning if the database version
269    /// has advanced since the DERIVE was evaluated (version gap > 0).
270    #[instrument(skip(self, derived), fields(transaction_id = %self.id))]
271    pub async fn apply(&self, derived: DerivedFactSet) -> Result<ApplyResult> {
272        self.apply_internal(derived, false, None).await
273    }
274
275    /// Start building an apply operation with staleness controls.
276    pub fn apply_with(&self, derived: DerivedFactSet) -> ApplyBuilder<'_> {
277        ApplyBuilder {
278            tx: self,
279            derived,
280            require_fresh: false,
281            max_version_gap: None,
282        }
283    }
284
    /// Shared implementation for [`apply`](Self::apply) and `ApplyBuilder::run`.
    ///
    /// Computes the gap between the transaction's current L0 version and the
    /// version the facts were derived at, enforces the optional staleness
    /// controls, then replays each mutation AST against the private L0.
    async fn apply_internal(
        &self,
        derived: DerivedFactSet,
        require_fresh: bool,
        max_gap: Option<u64>,
    ) -> Result<ApplyResult> {
        self.check_completed()?;
        let current_version = self.tx_l0.read().current_version;
        let version_gap = current_version.saturating_sub(derived.evaluated_at_version);

        // Staleness controls: strict freshness first, then the bounded gap.
        if require_fresh && version_gap > 0 {
            return Err(UniError::StaleDerivedFacts { version_gap });
        }
        if let Some(max) = max_gap
            && version_gap > max
        {
            return Err(UniError::StaleDerivedFacts { version_gap });
        }
        if version_gap > 0 {
            info!(
                transaction_id = %self.id,
                version_gap,
                "Applying DerivedFactSet with version gap"
            );
        }

        // Replay each collected mutation AST against the private L0; the first
        // failure aborts the apply (already-replayed facts remain in tx L0).
        let mut facts_applied = 0;
        for query in derived.mutation_queries {
            self.db
                .execute_ast_internal_with_tx_l0(
                    query,
                    "<locy-apply>",
                    HashMap::new(),
                    self.db.config.clone(),
                    self.tx_l0.clone(),
                )
                .await?;
            facts_applied += 1;
        }

        Ok(ApplyResult {
            facts_applied,
            version_gap,
        })
    }
330
331    // ── Bulk Insert (admin convenience) ─────────────────────────────
332
333    /// Bulk insert vertices for a given label within this transaction.
334    ///
335    /// Mutations are written to the transaction's private L0 and become
336    /// visible on commit. Returns the allocated VIDs in input order.
337    #[instrument(skip(self, properties_list), fields(transaction_id = %self.id))]
338    pub async fn bulk_insert_vertices(
339        &self,
340        label: &str,
341        properties_list: Vec<uni_common::Properties>,
342    ) -> Result<Vec<uni_common::core::id::Vid>> {
343        self.check_completed()?;
344        let schema = self.db.schema.schema();
345        schema
346            .labels
347            .get(label)
348            .ok_or_else(|| UniError::LabelNotFound {
349                label: label.to_string(),
350            })?;
351        let writer_lock = self.db.writer.as_ref().ok_or_else(|| UniError::ReadOnly {
352            operation: "bulk_insert_vertices".to_string(),
353        })?;
354        let mut writer = writer_lock.write().await;
355        if properties_list.is_empty() {
356            return Ok(Vec::new());
357        }
358        let vids = writer
359            .allocate_vids(properties_list.len())
360            .await
361            .map_err(UniError::Internal)?;
362        // Route mutations through the transaction's private L0.
363        let result = writer
364            .insert_vertices_batch(
365                vids.clone(),
366                properties_list,
367                vec![label.to_string()],
368                Some(&self.tx_l0),
369            )
370            .await
371            .map_err(UniError::Internal);
372        result?;
373        Ok(vids)
374    }
375
376    /// Bulk insert edges for a given edge type within this transaction.
377    ///
378    /// Mutations are written to the transaction's private L0 and become
379    /// visible on commit.
380    #[instrument(skip(self, edges), fields(transaction_id = %self.id))]
381    pub async fn bulk_insert_edges(
382        &self,
383        edge_type: &str,
384        edges: Vec<(
385            uni_common::core::id::Vid,
386            uni_common::core::id::Vid,
387            uni_common::Properties,
388        )>,
389    ) -> Result<()> {
390        self.check_completed()?;
391        let schema = self.db.schema.schema();
392        let edge_meta =
393            schema
394                .edge_types
395                .get(edge_type)
396                .ok_or_else(|| UniError::EdgeTypeNotFound {
397                    edge_type: edge_type.to_string(),
398                })?;
399        let type_id = edge_meta.id;
400        let writer_lock = self.db.writer.as_ref().ok_or_else(|| UniError::ReadOnly {
401            operation: "bulk_insert_edges".to_string(),
402        })?;
403        let mut writer = writer_lock.write().await;
404        // Route mutations through the transaction's private L0.
405        let result: Result<()> = async {
406            for (src_vid, dst_vid, props) in edges {
407                let eid = writer.next_eid(type_id).await.map_err(UniError::Internal)?;
408                writer
409                    .insert_edge(
410                        src_vid,
411                        dst_vid,
412                        type_id,
413                        eid,
414                        props,
415                        Some(edge_type.to_string()),
416                        Some(&self.tx_l0),
417                    )
418                    .await
419                    .map_err(UniError::Internal)?;
420            }
421            Ok(())
422        }
423        .await;
424        result
425    }
426
427    // ── Bulk Writer / Appender ────────────────────────────────────────
428
429    /// Create a bulk writer builder for efficient data loading within this transaction.
430    ///
431    /// The bulk writer writes directly to storage (bypassing the L0 buffer).
432    /// The Transaction's write guard ensures mutual exclusion — the BulkWriter
433    /// does not manage the guard itself.
434    pub fn bulk_writer(&self) -> crate::api::bulk::BulkWriterBuilder {
435        crate::api::bulk::BulkWriterBuilder::new_unguarded(self.db.clone())
436    }
437
438    /// Create a streaming appender for row-by-row data loading within this transaction.
439    ///
440    /// The appender writes directly to storage (bypassing the L0 buffer).
441    /// The Transaction's write guard ensures mutual exclusion.
442    pub fn appender(&self, label: &str) -> crate::api::appender::AppenderBuilder {
443        crate::api::appender::AppenderBuilder::new_from_tx(self.db.clone(), label)
444    }
445
446    // ── Locy Evaluation ───────────────────────────────────────────────
447
448    /// Evaluate a Locy program within the transaction.
449    ///
450    /// DERIVE commands auto-apply to the transaction's write buffer.
451    #[instrument(skip(self), fields(transaction_id = %self.id))]
452    pub async fn locy(&self, program: &str) -> Result<LocyResult> {
453        self.check_completed()?;
454        // Create a LocyEngine directly from UniInner (which sees tx L0).
455        // Transaction path: auto-apply DERIVE mutations to the private L0.
456        let engine = impl_locy::LocyEngine {
457            db: &self.db,
458            tx_l0_override: Some(self.tx_l0.clone()),
459            locy_l0: Some(self.tx_l0.clone()),
460            collect_derive: false,
461        };
462        engine.evaluate(program).await
463    }
464
465    /// Evaluate a Locy program with parameters using a builder.
466    pub fn locy_with(&self, program: &str) -> crate::api::locy_builder::TxLocyBuilder<'_> {
467        crate::api::locy_builder::TxLocyBuilder::new(self, program)
468    }
469
470    // ── Prepared Statements ──────────────────────────────────────────
471
472    /// Prepare a Cypher query for repeated execution within this transaction.
473    ///
474    /// The query is parsed and planned once; subsequent executions skip those
475    /// phases. If the schema changes, the prepared query auto-replans.
476    #[instrument(skip(self), fields(transaction_id = %self.id))]
477    pub async fn prepare(&self, cypher: &str) -> Result<crate::api::prepared::PreparedQuery> {
478        self.check_completed()?;
479        crate::api::prepared::PreparedQuery::new(self.db.clone(), cypher).await
480    }
481
482    /// Prepare a Locy program for repeated evaluation within this transaction.
483    #[instrument(skip(self), fields(transaction_id = %self.id))]
484    pub async fn prepare_locy(&self, program: &str) -> Result<crate::api::prepared::PreparedLocy> {
485        self.check_completed()?;
486        crate::api::prepared::PreparedLocy::new(
487            self.db.clone(),
488            self.rule_registry.clone(),
489            program,
490        )
491    }
492
493    // ── Rule Management ───────────────────────────────────────────────
494
495    /// Access the transaction-scoped rule registry.
496    /// On commit, new rules are promoted to the session (best-effort).
497    pub fn rules(&self) -> super::rule_registry::RuleRegistry<'_> {
498        super::rule_registry::RuleRegistry::new(&self.rule_registry)
499    }
500
501    // ── Lifecycle ─────────────────────────────────────────────────────
502
    /// Commit the transaction.
    ///
    /// Persists all changes made during the transaction. Returns a
    /// [`CommitResult`] with commit metadata.
    ///
    /// # Errors
    /// - [`UniError::ReadOnly`] when no writer is configured.
    /// - Any error returned by a `before_commit` hook (the rejection point).
    /// - [`UniError::CommitTimeout`] when the writer lock cannot be acquired
    ///   within 5 seconds — the transaction stays active and commit can be retried.
    #[instrument(skip(self), fields(transaction_id = %self.id, duration_ms), level = "info")]
    pub async fn commit(mut self) -> Result<CommitResult> {
        self.check_completed()?;

        let writer_lock = self.db.writer.as_ref().ok_or_else(|| UniError::ReadOnly {
            operation: "commit".to_string(),
        })?;

        // Read mutation count from the private L0 (no writer lock needed)
        let mutations = self.tx_l0.read().mutation_count;

        // Run before-commit hooks BEFORE acquiring writer lock (rejection point)
        if !self.hooks.is_empty() {
            let ctx = crate::api::hooks::CommitHookContext {
                session_id: self.session_id.clone(),
                tx_id: self.id.clone(),
                mutation_count: mutations,
            };
            for hook in &self.hooks {
                hook.before_commit(&ctx)?;
            }
        }

        // Snapshot labels and edge types from L0 BEFORE commit consumes the buffer
        let (labels_affected, edge_types_affected) = {
            let l0 = self.tx_l0.read();
            // Deduplicate via HashSet — multiple mutations may touch the same label.
            let labels: Vec<String> = l0
                .vertex_labels
                .values()
                .flatten()
                .cloned()
                .collect::<std::collections::HashSet<_>>()
                .into_iter()
                .collect();
            let edge_types: Vec<String> = l0
                .edge_types
                .values()
                .cloned()
                .collect::<std::collections::HashSet<_>>()
                .into_iter()
                .collect();
            (labels, edge_types)
        };

        // Acquire writer WRITE lock for WAL + merge, bounded by a 5s timeout
        let mut writer = tokio::time::timeout(
            std::time::Duration::from_secs(5),
            writer_lock.write(),
        )
        .await
        .map_err(|_| UniError::CommitTimeout {
            tx_id: self.id.clone(),
            hint: "Another commit is in progress and taking longer than expected. Your transaction is still active \u{2014} you can retry commit().",
        })?;
        let wal_lsn = writer.commit_transaction_l0(self.tx_l0.clone()).await?;
        // Update cached metrics atomics while we still hold the writer lock
        {
            let l0 = writer.l0_manager.get_current();
            let l0_guard = l0.read();
            self.db
                .cached_l0_mutation_count
                .store(l0_guard.mutation_count, Ordering::Relaxed);
            self.db
                .cached_l0_estimated_size
                .store(l0_guard.estimated_size, Ordering::Relaxed);
        }
        self.db.cached_wal_lsn.store(wal_lsn, Ordering::Relaxed);
        let version = writer.l0_manager.get_current().read().current_version;
        drop(writer);

        // Mark completed only after the merge succeeded — any error above
        // leaves the transaction active so commit() can be retried.
        self.completed = true;

        let duration = self.start_time.elapsed();
        tracing::Span::current().record("duration_ms", duration.as_millis());
        metrics::histogram!("uni_transaction_duration_seconds").record(duration.as_secs_f64());
        metrics::counter!("uni_transaction_commits_total").increment(1);

        // Best-effort rule promotion: promote new rules from tx → session.
        // Poisoned locks are reported via rule_promotion_errors, never a panic.
        let mut rule_promotion_errors = Vec::new();
        let rules_promoted = {
            match (
                self.rule_registry.read(),
                self.session_rule_registry.write(),
            ) {
                (Ok(tx_reg), Ok(mut session_reg)) => {
                    let mut promoted = 0;
                    for (name, rule) in &tx_reg.rules {
                        if !session_reg.rules.contains_key(name) {
                            session_reg.rules.insert(name.clone(), rule.clone());
                            promoted += 1;
                        }
                    }
                    promoted
                }
                (Err(e), _) => {
                    rule_promotion_errors.push(RulePromotionError {
                        rule_text: "<all>".into(),
                        error: format!("tx rule registry lock poisoned: {e}"),
                    });
                    0
                }
                (_, Err(e)) => {
                    rule_promotion_errors.push(RulePromotionError {
                        rule_text: "<all>".into(),
                        error: format!("session rule registry lock poisoned: {e}"),
                    });
                    0
                }
            }
        };

        // Release write guard — the session may start a new write context now
        self.session_write_guard.store(false, Ordering::SeqCst);

        // Increment session-level commit counter
        self.session_metrics
            .transactions_committed
            .fetch_add(1, Ordering::Relaxed);
        self.db.total_commits.fetch_add(1, Ordering::Relaxed);

        let commit_result = CommitResult {
            mutations_committed: mutations,
            rules_promoted,
            version,
            started_at_version: self.started_at_version,
            wal_lsn,
            duration,
            rule_promotion_errors,
        };

        // Broadcast commit notification (ignore send error — no receivers is fine)
        let notif = crate::api::notifications::CommitNotification {
            version,
            mutation_count: mutations,
            labels_affected,
            edge_types_affected,
            rules_promoted,
            timestamp: chrono::Utc::now(),
            tx_id: self.id.clone(),
            session_id: self.session_id.clone(),
            causal_version: self.started_at_version,
        };
        let _ = self.db.commit_tx.send(Arc::new(notif));

        // Run after-commit hooks (infallible — panics caught and logged)
        if !self.hooks.is_empty() {
            let ctx = crate::api::hooks::CommitHookContext {
                session_id: self.session_id.clone(),
                tx_id: self.id.clone(),
                mutation_count: mutations,
            };
            for hook in &self.hooks {
                let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
                    hook.after_commit(&ctx, &commit_result);
                }));
                if let Err(e) = result {
                    tracing::error!("after_commit hook panicked: {:?}", e);
                }
            }
        }

        info!("Transaction committed");

        Ok(commit_result)
    }
672
673    /// Rollback the transaction, discarding all changes.
674    ///
675    /// No writer lock needed — the private L0 is simply dropped. This method
676    /// is infallible and synchronous. If the transaction is already completed,
677    /// this is a silent no-op (idempotent).
678    pub fn rollback(mut self) {
679        if self.completed {
680            return;
681        }
682        self.completed = true;
683
684        // Release write guard
685        self.session_write_guard.store(false, Ordering::SeqCst);
686
687        let duration = self.start_time.elapsed();
688        metrics::histogram!("uni_transaction_duration_seconds").record(duration.as_secs_f64());
689        metrics::counter!("uni_transaction_rollbacks_total").increment(1);
690
691        // Increment session-level rollback counter
692        self.session_metrics
693            .transactions_rolled_back
694            .fetch_add(1, Ordering::Relaxed);
695
696        info!("Transaction rolled back");
697    }
698
699    /// Check if the transaction has uncommitted changes.
700    pub fn is_dirty(&self) -> bool {
701        self.tx_l0.read().mutation_count > 0
702    }
703
704    /// Get the transaction ID.
705    pub fn id(&self) -> &str {
706        &self.id
707    }
708
709    /// Database version when this transaction was started.
710    pub fn started_at_version(&self) -> u64 {
711        self.started_at_version
712    }
713
714    /// Cancel all in-flight queries in this transaction.
715    #[instrument(skip(self), fields(transaction_id = %self.id))]
716    pub fn cancel(&self) {
717        self.cancellation_token.cancel();
718    }
719
720    /// Get a clone of this transaction's cancellation token.
721    pub fn cancellation_token(&self) -> CancellationToken {
722        self.cancellation_token.clone()
723    }
724
725    /// Snapshot the current L0 mutation count and stats for before/after comparison.
726    fn snapshot_l0(&self) -> L0Snapshot {
727        let l0 = self.tx_l0.read();
728        L0Snapshot {
729            mutation_count: l0.mutation_count,
730            mutation_stats: l0.mutation_stats.clone(),
731        }
732    }
733
734    /// Compute an `ExecuteResult` by comparing L0 snapshots before and after a query.
735    fn compute_execute_result(
736        before: &L0Snapshot,
737        after: &L0Snapshot,
738        result: &QueryResult,
739    ) -> ExecuteResult {
740        let affected_rows = if result.is_empty() {
741            after.mutation_count.saturating_sub(before.mutation_count)
742        } else {
743            result.len()
744        };
745        let diff = after.mutation_stats.diff(&before.mutation_stats);
746        ExecuteResult::with_details(affected_rows, &diff, result.metrics().clone())
747    }
748
749    fn check_completed(&self) -> Result<()> {
750        if self.completed {
751            return Err(UniError::TransactionAlreadyCompleted);
752        }
753        if let Some(deadline) = self.deadline
754            && Instant::now() > deadline
755        {
756            return Err(UniError::TransactionExpired {
757                tx_id: self.id.clone(),
758                hint: "Transaction exceeded its timeout. All operations are rejected. The transaction will auto-rollback on drop.",
759            });
760        }
761        Ok(())
762    }
763}
764
765impl Drop for Transaction {
766    fn drop(&mut self) {
767        if !self.completed {
768            if self.is_dirty() {
769                warn!(
770                    transaction_id = %self.id,
771                    "Transaction dropped with uncommitted writes — discarding private L0"
772                );
773            }
774            // No writer lock needed — the private L0 drops with the Transaction.
775            // Release write guard
776            self.session_write_guard.store(false, Ordering::SeqCst);
777        }
778    }
779}
780
/// Builder for parameterized mutations within a transaction.
///
/// Created by [`Transaction::execute_with()`]. Chain `.param()` calls to bind
/// parameters, then call `.run()` to execute and get an [`ExecuteResult`].
pub struct ExecuteBuilder<'a> {
    /// Transaction this mutation executes against.
    tx: &'a Transaction,
    /// Cypher text to execute.
    cypher: String,
    /// Bound query parameters.
    params: HashMap<String, Value>,
    /// Optional maximum execution time for `.run()`.
    timeout: Option<Duration>,
}
791
792impl<'a> ExecuteBuilder<'a> {
793    /// Bind a parameter to the mutation.
794    pub fn param<K: Into<String>, V: Into<Value>>(mut self, key: K, value: V) -> Self {
795        self.params.insert(key.into(), value.into());
796        self
797    }
798
799    /// Bind multiple parameters from an iterator.
800    pub fn params<'p>(mut self, params: impl IntoIterator<Item = (&'p str, Value)>) -> Self {
801        for (k, v) in params {
802            self.params.insert(k.to_string(), v);
803        }
804        self
805    }
806
807    /// Set maximum execution time for this mutation.
808    pub fn timeout(mut self, duration: Duration) -> Self {
809        self.timeout = Some(duration);
810        self
811    }
812
813    /// Execute the mutation and return affected row count with detailed stats.
814    pub async fn run(self) -> Result<ExecuteResult> {
815        self.tx.check_completed()?;
816        let before = self.tx.snapshot_l0();
817        let fut = self.tx.db.execute_internal_with_tx_l0(
818            &self.cypher,
819            self.params,
820            self.tx.tx_l0.clone(),
821        );
822        let result = if let Some(t) = self.timeout {
823            tokio::time::timeout(t, fut)
824                .await
825                .map_err(|_| UniError::Timeout {
826                    timeout_ms: t.as_millis() as u64,
827                })??
828        } else {
829            fut.await?
830        };
831        let after = self.tx.snapshot_l0();
832        Ok(Transaction::compute_execute_result(
833            &before, &after, &result,
834        ))
835    }
836}
837
/// Builder for parameterized queries within a transaction.
pub struct TxQueryBuilder<'a> {
    // Transaction the statement executes against.
    tx: &'a Transaction,
    // Statement text to execute.
    cypher: String,
    // Named parameters accumulated via `param()`.
    params: HashMap<String, Value>,
    // NOTE(review): stored by `cancellation_token()` but not consumed by any
    // execution path in this file (`execute`, `fetch_all`, `cursor`) —
    // confirm it is wired into cancellation elsewhere, or the token is
    // silently ignored.
    cancellation_token: Option<CancellationToken>,
    // Optional wall-clock cap; `None` means unbounded.
    timeout: Option<Duration>,
}
846
847impl<'a> TxQueryBuilder<'a> {
848    /// Bind a parameter to the mutation.
849    pub fn param(mut self, name: &str, value: impl Into<Value>) -> Self {
850        self.params.insert(name.to_string(), value.into());
851        self
852    }
853
854    /// Attach a cancellation token for cooperative query cancellation.
855    pub fn cancellation_token(mut self, token: CancellationToken) -> Self {
856        self.cancellation_token = Some(token);
857        self
858    }
859
860    /// Set maximum execution time for this query.
861    pub fn timeout(mut self, duration: Duration) -> Self {
862        self.timeout = Some(duration);
863        self
864    }
865
866    /// Execute the mutation and return affected row count with detailed stats.
867    pub async fn execute(self) -> Result<ExecuteResult> {
868        self.tx.check_completed()?;
869        let before = self.tx.snapshot_l0();
870        let fut = self.tx.db.execute_internal_with_tx_l0(
871            &self.cypher,
872            self.params,
873            self.tx.tx_l0.clone(),
874        );
875        let result = if let Some(t) = self.timeout {
876            tokio::time::timeout(t, fut)
877                .await
878                .map_err(|_| UniError::Timeout {
879                    timeout_ms: t.as_millis() as u64,
880                })??
881        } else {
882            fut.await?
883        };
884        let after = self.tx.snapshot_l0();
885        Ok(Transaction::compute_execute_result(
886            &before, &after, &result,
887        ))
888    }
889
890    /// Execute as a query and return rows.
891    pub async fn fetch_all(self) -> Result<QueryResult> {
892        self.tx.check_completed()?;
893        let fut = self.tx.db.execute_internal_with_tx_l0(
894            &self.cypher,
895            self.params,
896            self.tx.tx_l0.clone(),
897        );
898        if let Some(t) = self.timeout {
899            tokio::time::timeout(t, fut)
900                .await
901                .map_err(|_| UniError::Timeout {
902                    timeout_ms: t.as_millis() as u64,
903                })?
904        } else {
905            fut.await
906        }
907    }
908
909    /// Execute the query and return the first row, or `None` if empty.
910    pub async fn fetch_one(self) -> Result<Option<Row>> {
911        let result = self.fetch_all().await?;
912        Ok(result.into_rows().into_iter().next())
913    }
914
915    /// Execute the query and return a cursor for streaming results.
916    pub async fn cursor(self) -> Result<QueryCursor> {
917        self.tx.check_completed()?;
918        self.tx
919            .db
920            .execute_cursor_internal_with_tx_l0(&self.cypher, self.params, self.tx.tx_l0.clone())
921            .await
922    }
923}
924
/// Result of applying a `DerivedFactSet` to a transaction.
#[derive(Debug)]
pub struct ApplyResult {
    /// Number of mutation queries replayed.
    pub facts_applied: usize,
    /// Number of versions that committed between DERIVE evaluation and apply.
    /// 0 means the data was fresh. Compare against the thresholds set via
    /// [`ApplyBuilder::require_fresh`] / [`ApplyBuilder::max_version_gap`].
    pub version_gap: u64,
}
934
/// Builder for applying a `DerivedFactSet` with staleness controls.
pub struct ApplyBuilder<'a> {
    // Transaction the fact set is applied to.
    tx: &'a Transaction,
    // Derived facts to replay.
    derived: DerivedFactSet,
    // When true, any version gap > 0 is rejected as stale.
    require_fresh: bool,
    // When set, version gaps above this bound are rejected as stale.
    max_version_gap: Option<u64>,
}
942
943impl<'a> ApplyBuilder<'a> {
944    /// Require that no commits occurred between DERIVE evaluation and apply.
945    /// Returns `StaleDerivedFacts` if the version gap is > 0.
946    pub fn require_fresh(mut self) -> Self {
947        self.require_fresh = true;
948        self
949    }
950
951    /// Allow up to `n` versions of gap between evaluation and apply.
952    /// Returns `StaleDerivedFacts` if the gap exceeds `n`.
953    pub fn max_version_gap(mut self, n: u64) -> Self {
954        self.max_version_gap = Some(n);
955        self
956    }
957
958    /// Execute the apply operation.
959    pub async fn run(self) -> Result<ApplyResult> {
960        self.tx
961            .apply_internal(self.derived, self.require_fresh, self.max_version_gap)
962            .await
963    }
964}