Skip to main content

noxu_db/
transaction.rs

1//! Transaction handle for Noxu DB.
2//!
3
4use crate::durability::{Durability, SyncPolicy};
5use crate::environment::ActiveTxns;
6use crate::error::{NoxuError, Result};
7use crate::transaction_config::TransactionConfig;
8use noxu_dbi::{
9    AckWaitErrorKind, DatabaseId, EnvironmentImpl, ReplicaAckPolicyKind,
10    SharedReplicaAckCoordinator,
11};
12use noxu_log::LogManager;
13use noxu_sync::Mutex as SyncMutex;
14use noxu_txn::Txn;
15use std::sync::{Arc, Mutex};
16use std::time::Instant;
17
/// Lifecycle state of a transaction handle.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum TransactionState {
    /// The transaction is live and may be used for operations.
    Open,
    /// Phase 1 of XA two-phase commit has completed.
    ///
    /// All locks remain held.  From here the only legal transitions are
    /// [`Transaction::resolved_commit_after_prepare`] and
    /// [`Transaction::resolved_abort_after_prepare`]; calling `commit()`
    /// or `abort()` directly is a protocol error.
    Prepared,
    /// The transaction finished with a commit.
    Committed,
    /// The transaction finished with an abort.
    Aborted,
    /// An error occurred; the transaction can only be aborted.
    MustAbort,
}
37
/// A transaction handle.
///
/// Transaction handles are used to protect database operations.
/// A single Transaction may be used for operations on multiple databases
/// within the same environment.
///
/// Transaction handles are free-threaded; they may be used concurrently
/// by multiple threads. Once committed or aborted, the handle must not
/// be used for any further operations.
///
/// # Example
/// ```no_run
/// use noxu_db::{Environment, EnvironmentConfig};
/// use std::path::PathBuf;
///
/// let config = EnvironmentConfig::new(PathBuf::from("/tmp/mydb"))
///     .with_allow_create(true)
///     .with_transactional(true);
/// let env = Environment::open(config).unwrap();
/// let txn = env.begin_transaction(None).unwrap();
/// // ... do operations ...
/// txn.commit().unwrap();
/// ```
pub struct Transaction {
    /// Transaction ID.
    id: u64,
    /// Current lifecycle state, guarded by a mutex because the handle is
    /// shared by reference (`&self`) across commit/abort paths.
    state: Mutex<TransactionState>,
    /// When this transaction handle was created.
    start_time: Instant,
    /// Whether this is read-only.  Read-only transactions skip the WAL
    /// write, the replica-ack wait, and the cleaner throttle on commit.
    read_only: bool,
    /// Optional caller-supplied transaction name (JE
    /// `Transaction.setName(String)`).
    ///
    /// The name is purely diagnostic: it is included in `Debug`
    /// output and structured logs, and may be queried via
    /// [`Transaction::get_name`].
    /// (transaction-env F22 `setName/getName missing`).
    name: Mutex<Option<String>>,
    /// Durability override.
    ///
    /// Both constructors visible in this file store
    /// `Some(config.durability)`.  If the value is ever `None`, `commit()`
    /// falls back to `Durability::COMMIT_SYNC`.
    durability: Option<Durability>,
    /// Lock timeout in milliseconds (0 = use environment default).
    lock_timeout_ms: Mutex<u64>,
    /// Transaction timeout in milliseconds (0 = use environment default).
    txn_timeout_ms: Mutex<u64>,
    /// Write-ahead log manager (None when created outside of an Environment).
    ///
    /// When `None`, commit and abort skip writing their WAL records.
    log_manager: Option<Arc<LogManager>>,
    /// Internal transaction for lock management and write-set tracking.
    ///
    /// When `Some`, write operations on cursors acquire per-record write locks
    /// via this `Txn` and record abort before-images.  On `abort()`, this `Txn`
    /// releases all locks and collects `UndoRecord`s.
    ///
    /// NOTE(review): this looks like the counterpart of JE's
    /// `Transaction.txnImpl` field, i.e. the link from the public
    /// `Transaction` to the internal `Txn` — confirm against the JE source.
    inner_txn: Option<Arc<Mutex<Txn>>>,
    /// Reference to the owning `EnvironmentImpl`.
    ///
    /// Used by `abort()` to look up each modified database by ID and apply
    /// undo records to the B-tree.
    ///
    /// NOTE(review): presumably mirrors the environment reference that JE's
    /// `Txn.undoLNs()` uses to call
    /// `EnvironmentImpl.getDatabase(dbId).abort(undoLsn, locker)` — confirm.
    env_impl: Option<Arc<SyncMutex<EnvironmentImpl>>>,
    /// Shared registry of active transactions on the owning
    /// `Environment`.  When the transaction reaches a terminal state
    /// (`commit`, `commit_with_durability`, or `abort`) we prune our
    /// entry here so that `Environment::close()` can succeed.
    ///
    /// Resolves F1 of the May 2026 API audit.
    active_txns: Option<Arc<ActiveTxns>>,
    /// Optional replica-ack coordinator (typically a
    /// `noxu_rep::ReplicatedEnvironment`).  When `Some`, a successful
    /// `commit_with_durability` blocks until the configured
    /// `ReplicaAckPolicy` is satisfied or the durability ack-timeout
    /// elapses, in which case `NoxuError::InsufficientReplicas` is
    /// returned.  Closes finding F1 of
    /// `docs/src/internal/api-audit-2026-05-rep.md`.
    replica_coordinator: Option<SharedReplicaAckCoordinator>,
    /// Per-commit timeout for replica acknowledgments.  Default 5s; set
    /// from the environment's `replica_ack_timeout_ms` (see
    /// `EnvironmentConfig::replica_ack_timeout_ms`) when the
    /// coordinator is installed.
    replica_ack_timeout: std::time::Duration,

    /// Callbacks to run when this transaction aborts.
    ///
    /// C-4 / JE 1-I: used to undo transactional database registrations
    /// when `open_database(Some(txn), ...)` is followed by `txn.abort()`.
    /// Each callback is a `Box<dyn FnOnce() + Send>` so it can capture
    /// shared state without requiring the caller to hold locks.
    abort_callbacks: Mutex<Vec<Box<dyn FnOnce() + Send>>>,

    /// Callbacks to run when this transaction commits.
    ///
    /// C-4 / JE 1-I: used to finalise transactional database registrations
    /// by moving the database name from `pending_names` to `name_map`.
    commit_callbacks: Mutex<Vec<Box<dyn FnOnce() + Send>>>,
}
140
141impl Transaction {
142    /// Create a new unconnected transaction handle.
143    ///
144    /// **Deprecated** — this constructor creates a transaction that is not
145    /// wired to a WAL, lock manager, or environment.  Commits and aborts
146    /// on such a transaction are no-ops.  Use
147    /// [`Environment::begin_transaction`][crate::environment::Environment::begin_transaction]
148    /// to obtain a fully operational handle.
149    ///
150    /// # Arguments
151    /// * `id` - Unique transaction ID
152    /// * `config` - Transaction configuration
153    #[deprecated(
154        since = "2.4.1",
155        note = "use Environment::begin_transaction() to obtain a fully wired transaction handle"
156    )]
157    pub fn new(id: u64, config: TransactionConfig) -> Self {
158        observe_gauge_inc!("noxu_db_active_transactions");
159        Self {
160            id,
161            state: Mutex::new(TransactionState::Open),
162            start_time: Instant::now(),
163            read_only: config.read_only,
164            name: Mutex::new(None),
165            durability: Some(config.durability),
166            lock_timeout_ms: Mutex::new(config.lock_timeout_ms),
167            txn_timeout_ms: Mutex::new(config.txn_timeout_ms),
168            log_manager: None,
169            inner_txn: None,
170            env_impl: None,
171            active_txns: None,
172            replica_coordinator: None,
173            replica_ack_timeout: std::time::Duration::from_secs(5),
174            abort_callbacks: Mutex::new(Vec::new()),
175            commit_callbacks: Mutex::new(Vec::new()),
176        }
177    }
178
179    /// Create a new transaction backed by a real WAL.
180    ///
181    /// Called by `Environment::begin_transaction()` to wire the transaction to
182    /// the environment's log manager so that commit/abort write WAL entries.
183    ///
184    /// **Internal** — this method is `pub` for cross-crate wiring within the
185    /// Noxu DB engine but is not part of the v3.0 stable surface.
186    /// `LogManager` is not re-exported by `noxu-db`; downstream callers
187    /// cannot use this method without adding an internal crate dependency.
188    pub fn with_log_manager(
189        id: u64,
190        config: TransactionConfig,
191        log_manager: Arc<LogManager>,
192    ) -> Self {
193        observe_gauge_inc!("noxu_db_active_transactions");
194        Self {
195            id,
196            state: Mutex::new(TransactionState::Open),
197            start_time: Instant::now(),
198            read_only: config.read_only,
199            name: Mutex::new(None),
200            durability: Some(config.durability),
201            lock_timeout_ms: Mutex::new(config.lock_timeout_ms),
202            txn_timeout_ms: Mutex::new(config.txn_timeout_ms),
203            log_manager: Some(log_manager),
204            inner_txn: None,
205            env_impl: None,
206            active_txns: None,
207            replica_coordinator: None,
208            replica_ack_timeout: std::time::Duration::from_secs(5),
209            abort_callbacks: Mutex::new(Vec::new()),
210            commit_callbacks: Mutex::new(Vec::new()),
211        }
212    }
213
214    /// Wires the `EnvironmentImpl` so that `abort()` can apply undo records.
215    ///
216    /// Called by `Environment::begin_transaction()` after constructing the
217    /// `Transaction`.
218    ///
219    /// **Internal** — `EnvironmentImpl` is not re-exported by `noxu-db`.
220    pub fn with_env_impl(
221        mut self,
222        env_impl: Arc<SyncMutex<EnvironmentImpl>>,
223    ) -> Self {
224        self.env_impl = Some(env_impl);
225        self
226    }
227
228    /// Sets the inner `Txn` for lock management and write-set tracking.
229    ///
230    /// Called by `Environment::begin_transaction()` to wire the transaction to
231    /// the environment's `TxnManager` / `LockManager`.
232    ///
233    /// **Internal** — `noxu_txn::Txn` is not re-exported by `noxu-db`.
234    pub fn with_inner_txn(mut self, txn: Arc<Mutex<Txn>>) -> Self {
235        self.inner_txn = Some(txn);
236        self
237    }
238
239    /// Removes the inner `Txn` from the environment's `TxnManager` (its
240    /// `all_txns` map and the lock manager's locker-label map) — the
241    /// counterpart to `TxnManager::begin_txn`, which the explicit-transaction
242    /// commit/abort paths previously never called (review F-5).
243    ///
244    /// Without this, `TxnManager::all_txns` and the locker-label map grow
245    /// without bound for the process lifetime, `n_active_txns()` reports a
246    /// monotonically increasing (wrong) count, and `n_commits`/`n_aborts`
247    /// undercount. The inner `Txn`'s locker id (a separate id space from
248    /// `Transaction::id`) is the `all_txns` key, so we use it here.
249    ///
250    /// Lock discipline: the inner-txn lock is read in a tight scope and
251    /// released before the (separate) environment lock is taken, and both
252    /// commit/abort paths have already released any env lock by this point.
253    fn unregister_inner_txn(&self, committed: bool) {
254        let Some(inner) = self.inner_txn.as_ref() else {
255            return;
256        };
257        let inner_id = inner.lock().unwrap().id_as_locker();
258        if let Some(env) = self.env_impl.as_ref() {
259            let guard = env.lock();
260            let tm = guard.get_txn_manager();
261            if committed {
262                tm.commit_txn(inner_id);
263            } else {
264                tm.abort_txn(inner_id);
265            }
266        }
267    }
268
269    /// Wires the shared active-transactions registry so that `commit` /
270    /// `abort` can prune their own entry on completion.
271    ///
272    /// Resolves F1 of the May 2026 API audit.
273    pub(crate) fn with_active_txns(
274        mut self,
275        registry: Arc<ActiveTxns>,
276    ) -> Self {
277        self.active_txns = Some(registry);
278        self
279    }
280
281    /// Wires the replica-ack coordinator from the owning `Environment`.
282    ///
283    /// When set, a successful `commit_with_durability` blocks until the
284    /// configured `ReplicaAckPolicy` is satisfied or `replica_ack_timeout`
285    /// elapses, in which case `NoxuError::InsufficientReplicas` is
286    /// returned.
287    ///
288    /// Closes finding F1 of `docs/src/internal/api-audit-2026-05-rep.md`.
289    pub(crate) fn with_replica_coordinator(
290        mut self,
291        coord: SharedReplicaAckCoordinator,
292        ack_timeout: std::time::Duration,
293    ) -> Self {
294        self.replica_coordinator = Some(coord);
295        self.replica_ack_timeout = ack_timeout;
296        self
297    }
298
299    /// Returns a clone of the `Arc<Mutex<Txn>>` inner transaction, if any.
300    ///
301    /// Used by `Database::make_cursor_for_txn()` to wire the cursor to the
302    /// same `Txn` so that write operations lock via the transaction.
303    ///
304    /// **Internal** — `noxu_txn::Txn` is not re-exported by `noxu-db`.
305    pub fn get_inner_txn(&self) -> Option<Arc<Mutex<Txn>>> {
306        self.inner_txn.clone()
307    }
308
309    /// Register a callback to run when this transaction aborts.
310    ///
311    /// Used by `Environment::open_database()` to roll back a transactional
312    /// database creation if the owning transaction is aborted (C-4 / JE 1-I).
313    /// The callback is invoked from within `abort()`, after WAL writes but
314    /// before the outer state is marked `Aborted`.
315    pub fn register_abort_callback<F>(&self, f: F)
316    where
317        F: FnOnce() + Send + 'static,
318    {
319        self.abort_callbacks.lock().unwrap().push(Box::new(f));
320    }
321
322    /// Register a callback to run when this transaction commits.
323    ///
324    /// Used by `Environment::open_database()` to finalise a transactional
325    /// database creation when the owning transaction commits (C-4 / JE 1-I).
326    /// The callback is invoked from within `commit_with_durability()`, after
327    /// the WAL entry is written and locks are released.
328    pub fn register_commit_callback<F>(&self, f: F)
329    where
330        F: FnOnce() + Send + 'static,
331    {
332        self.commit_callbacks.lock().unwrap().push(Box::new(f));
333    }
334
335    /// Commit the transaction.
336    ///
337    /// All operations performed under this transaction are made durable
338    /// and visible to other transactions.
339    ///
340    /// # Errors
341    /// Returns an error if:
342    /// - The transaction is not in `Open` state.
343    /// - Writing the `TxnCommit` WAL entry fails (`EnvironmentFailure`
344    ///   with reason `LogWrite`, propagated from `write_txn_end`).
345    /// - The inner-`Txn` commit fails after the WAL entry has been
346    ///   fsynced (e.g. open cursors held against this transaction, or
347    ///   inner-state inconsistency surfaced by `check_state`). When
348    ///   this happens the transaction is still durably committed; the
349    ///   error is propagated so the caller can react to the leak.
350    pub fn commit(&self) -> Result<()> {
351        observe_span!("txn_commit", txn_id = self.id);
352        let _obs_timer = observe_timer_start!();
353        observe_counter!("noxu_db_operations_total", "op" => "commit");
354        let durability = self.durability.unwrap_or(Durability::COMMIT_SYNC);
355        let result = self.commit_with_durability(durability);
356        observe_timer_record!(_obs_timer, "noxu_db_operation_duration_seconds", "op" => "commit");
357        result
358    }
359
    /// Commit the transaction with specific durability.
    ///
    /// The steps run in this order:
    /// 1. `check_open()` validates the state.
    /// 2. For writable transactions with a log manager, the `TxnCommit`
    ///    WAL entry is written (fsync/flush according to
    ///    `durability.local_sync`).
    /// 3. If a replica-ack coordinator is wired and the policy is not
    ///    `None`, wait for replica acknowledgments.
    /// 4. Apply cleaner write-path backpressure (may sleep).
    /// 5. Commit the inner `Txn`, which releases its locks.
    /// 6. Mark the outer state `Committed`, run commit callbacks, prune the
    ///    active-transaction registry entry, unregister the inner `Txn`,
    ///    and decrement the active-transactions gauge.
    ///
    /// # Arguments
    /// * `durability` - Durability settings for this commit
    ///
    /// # Errors
    /// Returns an error if:
    /// - `check_open()` rejects the call (the transaction is not in
    ///   `Open` state).
    /// - Writing the `TxnCommit` WAL entry fails (`EnvironmentFailure`
    ///   with reason `LogWrite`, propagated from `write_txn_end`).  In this
    ///   case the state is left untouched.
    /// - The inner-`Txn` commit fails after the WAL entry has been
    ///   written (e.g. open cursors held against this transaction, or
    ///   inner-state inconsistency surfaced by `check_state`). When
    ///   this happens the transaction is still marked `Committed`; the
    ///   error is propagated so the caller can react to the leak.
    /// - The replica-ack wait fails: `NoxuError::ReplicaWrite` when this
    ///   node is not the master, or `NoxuError::InsufficientReplicas` on
    ///   timeout or shutdown.  The local commit has still completed.  If
    ///   both the inner commit and the ack wait fail, the inner-commit
    ///   error is the one returned.
    pub fn commit_with_durability(&self, durability: Durability) -> Result<()> {
        self.check_open()?;

        // Write TxnCommit to the WAL before marking committed.
        // Durability controls whether we fsync, flush, or just buffer.
        if !self.read_only
            && let Some(lm) = &self.log_manager
        {
            let (fsync, flush) = match durability.local_sync {
                SyncPolicy::Sync => (true, true),
                SyncPolicy::WriteNoSync => (false, true),
                SyncPolicy::NoSync => (false, false),
            };
            self.write_txn_end(lm, true, fsync, flush)?;
        }

        // F1 (rep audit): wait for replica acknowledgments before returning
        // success.  This wait happens AFTER the local WAL write but
        // BEFORE the inner txn releases its locks, mirroring BDB-JE
        // `Txn.preLogCommitHook` / `commit(Durability)` ordering: the
        // master has logged the commit locally and replicas are notified,
        // but the commit only "returns" once `replica_ack` is satisfied.
        //
        // If no coordinator is wired (non-replicated env) or the policy
        // is `None`, the wait is skipped.  Read-only commits never need
        // replica acks.  Captured failure is propagated at the end of
        // the function after lock release so the caller observes a
        // typed error (`NoxuError::ReplicaWrite` or
        // `NoxuError::InsufficientReplicas`) rather than a state leak.
        let ack_err: Option<NoxuError> = if !self.read_only
            && durability.replica_ack
                != crate::durability::ReplicaAckPolicy::None
            && let Some(coord) = &self.replica_coordinator
        {
            match coord.await_replica_acks(
                durability.replica_ack.as_kind(),
                self.replica_ack_timeout,
            ) {
                Ok(_received) => None,
                Err(e) => match e.kind {
                    AckWaitErrorKind::NotMaster => {
                        Some(NoxuError::ReplicaWrite)
                    }
                    AckWaitErrorKind::Timeout | AckWaitErrorKind::Shutdown => {
                        Some(NoxuError::InsufficientReplicas {
                            required: e.needed,
                            available: e.received,
                        })
                    }
                },
            }
        } else {
            None
        };

        // Apply cleaner write-path backpressure: if the log write rate exceeds
        // the cleaner's capacity, sleep briefly to let cleaning catch up.
        // Implements CleanerThrottle.getWriteDelay() path in Txn.commit().
        // The env guard is a temporary that is dropped at the end of the
        // `let throttle` statement, i.e. BEFORE sleeping, so other threads
        // are not blocked on the environment mutex.
        // NOTE(review): the inner txn has not committed yet at this point,
        // so its record locks are still held for the duration of the sleep.
        if !self.read_only
            && let Some(ref env) = self.env_impl
        {
            let throttle = env.lock().get_cleaner_throttle();
            if let Some(delay) =
                throttle.and_then(|t| t.should_throttle_writer())
            {
                std::thread::sleep(delay);
            }
        }

        // Release per-record locks held by the inner Txn.
        // The inner Txn has no log_manager so it won't write duplicate WAL records.
        //
        // At this point the TxnCommit WAL entry has been written by
        // `write_txn_end` above (fsynced only when `local_sync` is
        // `SyncPolicy::Sync`), and recovery will replay this commit on the
        // next environment open once the entry is on disk. The inner
        // commit only releases `lock_manager` locks and flips the inner
        // state Open → Committed; the data path mutations were applied to
        // the in-memory tree at `db.put()` time.
        //
        // Possible failure modes for `inner.commit()`:
        //   * `has_open_cursors`: a user bug — a cursor on this transaction
        //     was not closed before `commit()`. The commit record is still
        //     in the log; the cursor's lifetime contract is
        //     violated. Surfacing this lets the caller find the leak.
        //   * `check_state` (state != Open): the inner txn was flipped to
        //     `MustAbort` by the deadlock detector after our WAL write,
        //     or somehow advanced to `Committed`/`Aborted` already. Both
        //     indicate a state-machine inconsistency that needs to be
        //     visible.
        //
        // Either way, we mark the outer state `Committed` first because
        // the log record says so — returning early before that would
        // leave the outer in `Open`, and a retried `commit()` would
        // append a *second* `TxnCommit` record to the WAL. We then
        // propagate the inner error so the caller can react.
        //
        // The inner Txn's `commit_with_durability` now drains all
        // read and write locks on every error return path, so a
        // failed inner.commit() no longer leaks lock-manager entries
        // until environment close. See `Txn::commit_with_durability`
        // and `Txn::release_all_locks` in noxu-txn for the
        // implementation of this guarantee.
        // NOTE(review): the log message below still says locks "may be
        // leaked", which predates the guarantee described above.
        let inner_err = if let Some(inner) = &self.inner_txn {
            match inner.lock().unwrap().commit() {
                Ok(_) => None,
                Err(e) => {
                    log::error!(
                        "Transaction::commit_with_durability: inner txn \
                         commit failed after WAL fsync (txn is durably \
                         committed; lock_manager locks may be leaked): {e}"
                    );
                    Some(e)
                }
            }
        } else {
            None
        };

        // Flip the outer state unconditionally; the guard is dropped
        // explicitly so callbacks below never run under the state mutex.
        let mut state = self.state.lock().unwrap();
        *state = TransactionState::Committed;
        drop(state);

        // C-4 / JE 1-I: run commit callbacks (transactional database
        // registration finalisation).  The list is taken out of the mutex
        // first, so callbacks run without the callback lock held and each
        // runs at most once.
        let callbacks: Vec<Box<dyn FnOnce() + Send>> =
            std::mem::take(&mut *self.commit_callbacks.lock().unwrap());
        for cb in callbacks {
            cb();
        }

        // Prune our entry from the environment's active-txns registry so
        // that `Environment::close()` can succeed (F1).  Decrement the
        // active-transactions gauge here (rather than in `commit()`) so
        // that callers of `commit_with_durability` directly are also
        // accounted for (resolves F9 as a side effect).
        if let Some(registry) = &self.active_txns {
            registry.mark_complete(self.id);
        }
        // F-5: counterpart to begin_txn — remove the inner Txn from
        // TxnManager (all_txns + locker label) to avoid an unbounded leak.
        self.unregister_inner_txn(true);
        observe_gauge_dec!("noxu_db_active_transactions");

        // The inner-commit error takes precedence over a replica-ack error.
        if let Some(e) = inner_err {
            return Err(NoxuError::from(e));
        }
        // F1: surface any replica-ack failure last, after the local
        // commit has fully released locks.  The local commit is complete;
        // returning this error tells the caller the durability policy
        // was not satisfied so they can retry or rollback at the
        // application layer.
        if let Some(e) = ack_err {
            return Err(e);
        }
        Ok(())
    }
533
534    /// Abort the transaction.
535    ///
536    /// All operations performed under this transaction are rolled back.
537    ///
538    /// # Errors
539    /// Returns an error if:
540    /// - The transaction is already committed or aborted.
541    /// - Writing the `TxnAbort` WAL entry fails (`EnvironmentFailure`
542    ///   with reason `LogWrite`, propagated from `write_txn_end`).
543    ///   This path is taken only when the transaction is not read-only
544    ///   and a `LogManager` is configured on the environment.
545    pub fn abort(&self) -> Result<()> {
546        observe_span!("txn_abort", txn_id = self.id);
547        observe_counter!("noxu_db_operations_total", "op" => "abort");
548        {
549            let state = self.state.lock().unwrap();
550            match *state {
551                TransactionState::Committed => {
552                    return Err(NoxuError::OperationNotAllowed(
553                        "Cannot abort a committed transaction".to_string(),
554                    ));
555                }
556                TransactionState::Aborted => {
557                    return Err(NoxuError::OperationNotAllowed(
558                        "Transaction already aborted".to_string(),
559                    ));
560                }
561                TransactionState::Open | TransactionState::MustAbort => {}
562                TransactionState::Prepared => {
563                    return Err(NoxuError::OperationNotAllowed(
564                        "Cannot abort a prepared transaction directly; \
565                         use xa_rollback / resolved_abort_after_prepare"
566                            .to_string(),
567                    ));
568                }
569            }
570        }
571
572        // Write TxnAbort to WAL before marking aborted (no fsync needed).
573        if !self.read_only
574            && let Some(lm) = &self.log_manager
575        {
576            self.write_txn_end(lm, false, false, false)?;
577        }
578
579        // Apply undo records to the B-tree to restore before-images, then
580        // release write locks.  The two steps must happen in this order: while
581        // write locks are still held, no reader can observe the in-flight value;
582        // once release_all_locks() is called, blocked readers unblock and must
583        // already see the restored before-image.
584        if let Some(inner) = &self.inner_txn {
585            // Phase 1: collect undo records without releasing write locks.
586            let mut undo_records =
587                inner.lock().unwrap().abort_collect_undo().unwrap_or_default();
588
589            // Apply undo in reverse-operation order (newest LSN first).
590            //
591            // The in-memory write-lock map is a HashMap (no order), so the
592            // raw `undo_records` order is non-deterministic.  When the same
593            // key is touched multiple times in one txn (e.g. delete →
594            // re-insert in SR9465), the undo records carry conflicting
595            // intents:
596            //   - DELETE undo  (abort_data=orig)        : restore the slot
597            //   - INSERT undo  (abort_known_deleted=t)  : remove the slot
598            // Applying these in arbitrary order can leave the tree in either
599            // "correct" or "slot deleted" depending on iteration luck.
600            //
601            // The recovery path's backward log scan already applies undo
602            // newest-first; we mirror that here so the in-memory abort and
603            // crash-recovery undo are observationally identical.  Sorting by
604            // `current_lsn` descending is sufficient because LSNs are
605            // monotonic per-WAL-write.
606            undo_records.sort_by_key(|r| std::cmp::Reverse(r.current_lsn));
607
608            // Phase 2: apply undo to the B-tree (write locks still held).
609            //
610            // H-1 (audit-2026-05-keith.md F-2.2): acquire env lock only for
611            // the fast database-handle lookup, then drop it immediately.
612            // This prevents the entire abort undo loop from serialising all
613            // concurrent readers/writers against the EnvironmentImpl mutex.
614            //
615            // Algorithm:
616            //   a) Collect unique database IDs referenced by the undo set.
617            //   b) For each ID, briefly lock env, clone the Arc<RwLock<DatabaseImpl>>,
618            //      and immediately release the env lock.
619            //   c) Apply all undo records without ever holding the env lock.
620            //
621            // Safety: the database Arcs are ref-counted; even if a concurrent
622            // `env.remove_database()` call drops the EnvironmentImpl's own Arc,
623            // our cloned Arc keeps the DatabaseImpl alive for the duration of
624            // the undo loop.
625            if let Some(env) = &self.env_impl {
626                // Step (a+b): collect database handles with minimal lock hold time.
627                use std::collections::HashMap;
628                let mut db_handles: HashMap<
629                    i64,
630                    Arc<noxu_sync::RwLock<noxu_dbi::DatabaseImpl>>,
631                > = HashMap::new();
632                for undo in &undo_records {
633                    let db_id_raw = undo.database_id as i64;
634                    if db_handles.contains_key(&db_id_raw) {
635                        continue;
636                    }
637                    // Brief env lock: lookup only.
638                    let guard = env.lock();
639                    if let Some(arc) =
640                        guard.get_database_by_id(DatabaseId::new(db_id_raw))
641                    {
642                        db_handles.insert(db_id_raw, arc);
643                    }
644                    // env lock released here — drop(guard) implicit at end of block
645                }
646
647                // Step (c): apply undo records without holding env lock.
648                for undo in undo_records {
649                    let Some(abort_key) = undo.abort_key else { continue };
650                    let db_id_raw = undo.database_id as i64;
651                    let Some(db_arc) = db_handles.get(&db_id_raw) else {
652                        continue;
653                    };
654                    let db_guard = db_arc.read();
655                    if let Some(tree) = db_guard.get_real_tree() {
656                        if undo.abort_known_deleted {
657                            if tree.delete(&abort_key) {
658                                db_guard.decrement_entry_count();
659                            }
660                        } else if let Some(abort_data) = undo.abort_data {
661                            let lsn = noxu_util::Lsn::from_u64(undo.abort_lsn);
662                            if let Ok(is_new) =
663                                tree.insert(abort_key, abort_data, lsn)
664                                && is_new
665                            {
666                                // Restoring a slot that the aborted txn had
667                                // deleted: the in-memory delete already
668                                // decremented the counter, so the restore
669                                // must re-bump it.
670                                db_guard.increment_entry_count();
671                            }
672                        }
673                    }
674                }
675            }
676
677            // Phase 3: release write locks — blocked readers now unblock and
678            // see the restored before-image.
679            inner.lock().unwrap().release_all_locks();
680        }
681
682        let mut state = self.state.lock().unwrap();
683        *state = TransactionState::Aborted;
684        drop(state);
685
686        // C-4 / JE 1-I: run abort callbacks (transactional database
687        // registration rollback).
688        let callbacks: Vec<Box<dyn FnOnce() + Send>> =
689            std::mem::take(&mut *self.abort_callbacks.lock().unwrap());
690        for cb in callbacks {
691            cb();
692        }
693
694        // Prune our entry from the environment's active-txns registry so
695        // that `Environment::close()` can succeed (F1).
696        if let Some(registry) = &self.active_txns {
697            registry.mark_complete(self.id);
698        }
699        // F-5: counterpart to begin_txn — remove the inner Txn from
700        // TxnManager (all_txns + locker label) to avoid an unbounded leak.
701        self.unregister_inner_txn(false);
702        observe_gauge_dec!("noxu_db_active_transactions");
703        Ok(())
704    }
705
706    /// Prepares the transaction for the second phase of XA two-phase
707    /// commit.
708    ///
709    /// Implements the crash-durable contract introduced in wave 3-2:
710    ///
711    /// 1. Writes a `TxnPrepare` WAL frame containing the txn id, the
712    ///    first / last LSN logged by this transaction, and the supplied
713    ///    XID components (format_id, gtrid, bqual).  The frame is
714    ///    fsynced before this method returns, so a crash immediately
715    ///    afterwards still allows recovery to resurrect the prepared
716    ///    state.
717    /// 2. Marks the inner `Txn` as PREPARED — direct `commit()` and
718    ///    `abort()` calls now return `OperationNotAllowed`; only
719    ///    `resolved_commit_after_prepare` and
720    ///    `resolved_abort_after_prepare` may finalise the transaction.
721    /// 3. Locks are RETAINED — prepared transactions hold every lock
722    ///    until xa_commit / xa_rollback so concurrent readers cannot
723    ///    observe in-flight state.
724    /// 4. The persistent prepared-log entry (the `noxu-xa::PreparedLog`
725    ///    XID -> timestamp record) is the responsibility of the XA
726    ///    layer and is written *after* this method returns.  The WAL
727    ///    `TxnPrepare` frame is the source of truth for crash
728    ///    durability; the prepared-log database is a convenience for
729    ///    operators inspecting in-doubt XIDs without scanning the WAL.
730    ///
731    /// Returns `Ok(())` on success.  Read-only transactions (no LN
732    /// frames written) still take a code path here so the inner Txn
733    /// flips to PREPARED, but no `TxnPrepare` frame is emitted — the
734    /// XA layer should take its `PrepareResult::ReadOnly` shortcut
735    /// rather than calling this method on read-only branches.
736    ///
737    /// # Errors
738    /// * `OperationNotAllowed` if the transaction is not Open.
739    /// * `EnvironmentFailure { reason: LogWrite }` if the WAL write or
740    ///   fsync fails.
741    pub fn prepare(
742        &self,
743        xid_format_id: i32,
744        xid_gtrid: &[u8],
745        xid_bqual: &[u8],
746    ) -> Result<()> {
747        self.check_open()?;
748
749        // Capture first / last LSN from the inner Txn so the recovery
750        // code can chain them.  For read-only branches both are
751        // NULL_LSN, in which case we skip writing the frame entirely
752        // (the prepared XID will appear in the persistent prepared-log
753        // database but recovery has nothing to do for it).
754        let (first_lsn, last_lsn) = match &self.inner_txn {
755            Some(inner) => {
756                let g = inner.lock().unwrap();
757                (g.first_lsn(), g.last_lsn())
758            }
759            None => {
760                (noxu_util::NULL_LSN.as_u64(), noxu_util::NULL_LSN.as_u64())
761            }
762        };
763
764        // Write the durable TxnPrepare frame.  Skipped for read-only
765        // txns (no inner Txn or no LN frames) to avoid recording an
766        // empty prepare that recovery would have nothing to do with.
767        if !self.read_only
768            && let Some(lm) = &self.log_manager
769            && first_lsn != noxu_util::NULL_LSN.as_u64()
770        {
771            self.write_txn_prepare(
772                lm,
773                first_lsn,
774                last_lsn,
775                xid_format_id,
776                xid_gtrid,
777                xid_bqual,
778            )?;
779        }
780
781        // Flip the inner Txn into PREPARED state so direct
782        // `inner.commit()` / `inner.abort()` are protocol errors.
783        // The inner Txn has no `log_manager` of its own (the outer
784        // Transaction owns the only LM reference), so its `prepare`
785        // call is a pure flag-flip; it does not write a duplicate
786        // TxnPrepare frame.
787        if let Some(inner) = &self.inner_txn {
788            inner
789                .lock()
790                .unwrap()
791                .prepare(xid_format_id, xid_gtrid.to_vec(), xid_bqual.to_vec())
792                .map_err(NoxuError::from)?;
793        }
794
795        let mut state = self.state.lock().unwrap();
796        *state = TransactionState::Prepared;
797        Ok(())
798    }
799
800    /// Resolves a prepared transaction with a commit.
801    ///
802    /// Used by the XA `xa_commit` path.  Bypasses the
803    /// `TransactionState::Prepared` guard in `commit_with_durability`
804    /// because the prepare already established the commit decision.
805    ///
806    /// Steps:
807    /// 1. Verifies the txn is Prepared.
808    /// 2. Writes a `TxnCommit` WAL frame (mirrors `commit()`).
809    /// 3. Releases the inner Txn's locks via `resolved_commit_after_prepare`.
810    /// 4. Transitions state to Committed and prunes from the active-txns
811    ///    registry.
812    pub fn resolved_commit_after_prepare(&self) -> Result<()> {
813        {
814            let state = self.state.lock().unwrap();
815            if !matches!(*state, TransactionState::Prepared) {
816                return Err(NoxuError::OperationNotAllowed(format!(
817                    "resolved_commit_after_prepare: expected Prepared, got {:?}",
818                    *state
819                )));
820            }
821        }
822
823        // Write the TxnCommit frame.
824        if !self.read_only
825            && let Some(lm) = &self.log_manager
826        {
827            self.write_txn_end(lm, true /* is_commit */, true, true)?;
828        }
829
830        // Inner-side resolution: clear IS_PREPARED and run the standard
831        // commit path (which releases locks and flips inner state).
832        if let Some(inner) = &self.inner_txn {
833            inner
834                .lock()
835                .unwrap()
836                .resolved_commit_after_prepare()
837                .map_err(NoxuError::from)?;
838        }
839
840        let mut state = self.state.lock().unwrap();
841        *state = TransactionState::Committed;
842        drop(state);
843        // Run commit callbacks (e.g. transactional database registration).
844        let cbs: Vec<Box<dyn FnOnce() + Send>> =
845            std::mem::take(&mut *self.commit_callbacks.lock().unwrap());
846        for cb in cbs {
847            cb();
848        }
849        if let Some(registry) = &self.active_txns {
850            registry.mark_complete(self.id);
851        }
852        // F-5: counterpart to begin_txn — remove the inner Txn from
853        // TxnManager (all_txns + locker label) to avoid an unbounded leak.
854        self.unregister_inner_txn(true);
855        observe_gauge_dec!("noxu_db_active_transactions");
856        Ok(())
857    }
858
859    /// Resolves a prepared transaction with an abort.
860    pub fn resolved_abort_after_prepare(&self) -> Result<()> {
861        {
862            let state = self.state.lock().unwrap();
863            if !matches!(*state, TransactionState::Prepared) {
864                return Err(NoxuError::OperationNotAllowed(format!(
865                    "resolved_abort_after_prepare: expected Prepared, got {:?}",
866                    *state
867                )));
868            }
869        }
870
871        if !self.read_only
872            && let Some(lm) = &self.log_manager
873        {
874            self.write_txn_end(lm, false /* is_commit */, false, false)?;
875        }
876
877        // Apply undo records to the B-tree to restore before-images, then
878        // release write locks.  Same 3-phase ordering as `Transaction::abort()`:
879        // collect undo → apply → release locks.  See the matching comment
880        // in `Transaction::abort` for the rationale (no reader sees the
881        // in-flight value until the before-image is back in the tree).
882        if let Some(inner) = &self.inner_txn {
883            // First, clear the IS_PREPARED flag on the inner Txn so that
884            // `abort_collect_undo()` (which calls into `Txn::abort`) does
885            // not refuse with InvalidTransaction { state: PREPARED }.
886            // We do this via the inner's resolved-abort path, which
887            // performs `txn_flags &= !IS_PREPARED` and then runs `abort()`.
888            // Unfortunately that consumes the locks; we want the
889            // pre-release behaviour instead, so flip the flag manually
890            // and then run abort_collect_undo.
891            let mut undo_records = {
892                let mut g = inner.lock().unwrap();
893                // Undo the IS_PREPARED flag so abort_collect_undo doesn't
894                // refuse.  Inner state is still Open at this point.
895                g.clear_prepared_flag();
896                g.abort_collect_undo().unwrap_or_default()
897            };
898
899            // See `abort()` above: undo must be applied newest-LSN first so
900            // that delete-then-reinsert sequences in the same txn are
901            // unwound in reverse-operation order, matching the recovery
902            // path's backward log scan.
903            undo_records.sort_by_key(|r| std::cmp::Reverse(r.current_lsn));
904
905            if let Some(env) = &self.env_impl {
906                let env_guard = env.lock();
907                for undo in undo_records {
908                    let Some(abort_key) = undo.abort_key else { continue };
909                    let db_id =
910                        noxu_dbi::DatabaseId::new(undo.database_id as i64);
911                    let Some(db_arc) = env_guard.get_database_by_id(db_id)
912                    else {
913                        continue;
914                    };
915                    let db_guard = db_arc.read();
916                    if let Some(tree) = db_guard.get_real_tree() {
917                        if undo.abort_known_deleted {
918                            if tree.delete(&abort_key) {
919                                db_guard.decrement_entry_count();
920                            }
921                        } else if let Some(abort_data) = undo.abort_data {
922                            let lsn = noxu_util::Lsn::from_u64(undo.abort_lsn);
923                            if let Ok(is_new) =
924                                tree.insert(abort_key, abort_data, lsn)
925                                && is_new
926                            {
927                                db_guard.increment_entry_count();
928                            }
929                        }
930                    }
931                }
932            }
933
934            inner.lock().unwrap().release_all_locks();
935        }
936
937        let mut state = self.state.lock().unwrap();
938        *state = TransactionState::Aborted;
939        drop(state);
940        // Run abort callbacks (e.g. transactional database registration rollback).
941        let cbs: Vec<Box<dyn FnOnce() + Send>> =
942            std::mem::take(&mut *self.abort_callbacks.lock().unwrap());
943        for cb in cbs {
944            cb();
945        }
946        if let Some(registry) = &self.active_txns {
947            registry.mark_complete(self.id);
948        }
949        // F-5: counterpart to begin_txn — remove the inner Txn from
950        // TxnManager (all_txns + locker label) to avoid an unbounded leak.
951        self.unregister_inner_txn(false);
952        observe_gauge_dec!("noxu_db_active_transactions");
953        Ok(())
954    }
955
956    /// Serializes a `TxnPrepareEntry` and writes it to the WAL with fsync.
957    fn write_txn_prepare(
958        &self,
959        lm: &LogManager,
960        first_lsn: u64,
961        last_lsn: u64,
962        xid_format_id: i32,
963        xid_gtrid: &[u8],
964        xid_bqual: &[u8],
965    ) -> Result<()> {
966        use noxu_log::{LogEntryType, Provisional, entry::TxnPrepareEntry};
967
968        let timestamp_ms = std::time::SystemTime::now()
969            .duration_since(std::time::UNIX_EPOCH)
970            .unwrap_or_default()
971            .as_millis() as u64;
972
973        let entry = TxnPrepareEntry::new(
974            self.id as i64,
975            timestamp_ms,
976            first_lsn,
977            last_lsn,
978            xid_format_id,
979            xid_gtrid.to_vec(),
980            xid_bqual.to_vec(),
981        )
982        .map_err(|e| {
983            NoxuError::environment_with_reason(
984                crate::error::EnvironmentFailureReason::LogWrite,
985                format!("prepare entry encode: {e}"),
986            )
987        })?;
988
989        let mut buf = Vec::with_capacity(entry.log_size());
990        entry.write_to_log(&mut buf);
991
992        // fsync=true, flush=true: prepare must be durable before
993        // returning so a subsequent crash sees the prepare frame.
994        lm.log(LogEntryType::TxnPrepare, &buf, Provisional::No, true, true)
995            .map(|_| ())
996            .map_err(|e| {
997                NoxuError::environment_with_reason(
998                    crate::error::EnvironmentFailureReason::LogWrite,
999                    e.to_string(),
1000                )
1001            })
1002    }
1003
1004    /// Serializes a TxnCommit or TxnAbort entry and writes it to `lm`.
1005    fn write_txn_end(
1006        &self,
1007        lm: &LogManager,
1008        is_commit: bool,
1009        fsync: bool,
1010        flush: bool,
1011    ) -> Result<()> {
1012        use bytes::BytesMut;
1013        use noxu_log::{LogEntryType, Provisional, entry::TxnEndEntry};
1014        use noxu_util::{lsn::NULL_LSN, vlsn::NULL_VLSN};
1015
1016        let timestamp = std::time::SystemTime::now()
1017            .duration_since(std::time::UNIX_EPOCH)
1018            .unwrap_or_default()
1019            .as_millis() as u64;
1020
1021        let entry = if is_commit {
1022            TxnEndEntry::new_commit(
1023                self.id as i64,
1024                NULL_LSN,
1025                timestamp,
1026                0,
1027                NULL_VLSN,
1028            )
1029        } else {
1030            TxnEndEntry::new_abort(
1031                self.id as i64,
1032                NULL_LSN,
1033                timestamp,
1034                0,
1035                NULL_VLSN,
1036            )
1037        };
1038
1039        let entry_type = if is_commit {
1040            LogEntryType::TxnCommit
1041        } else {
1042            LogEntryType::TxnAbort
1043        };
1044
1045        let mut buf = BytesMut::with_capacity(entry.log_size());
1046        entry.write_to_log(&mut buf);
1047
1048        lm.log(entry_type, &buf, Provisional::No, flush, fsync)
1049            .map(|_| ())
1050            .map_err(|e| {
1051                NoxuError::environment_with_reason(
1052                    crate::error::EnvironmentFailureReason::LogWrite,
1053                    e.to_string(),
1054                )
1055            })
1056    }
1057
1058    /// Get the transaction ID.
1059    pub fn get_id(&self) -> u64 {
1060        self.id
1061    }
1062
1063    /// Set the human-readable name of this transaction.
1064    ///
1065    /// Mirrors `Transaction.setName(String)`.  The name is purely
1066    /// diagnostic — it appears in `Debug` output, structured log
1067    /// records, and lock-conflict reports.
1068    /// (transaction-env F22).
1069    pub fn set_name<S: Into<String>>(&self, name: S) {
1070        *self.name.lock().unwrap() = Some(name.into());
1071    }
1072
1073    /// Returns the caller-supplied transaction name, if any.
1074    ///
1075    /// Mirrors `Transaction.getName()`.
1076    pub fn get_name(&self) -> Option<String> {
1077        self.name.lock().unwrap().clone()
1078    }
1079
1080    /// Returns the number of locks currently held by this transaction.
1081    ///
1082    /// Mirrors `Transaction.getLockStat()` / `Transaction.getNumWriteLocks() +
1083    /// getNumReadLocks()` (the JE API exposes both counts; we return
1084    /// the sum because the lock manager partitions reads / writes per
1085    /// LSN rather than per record).  Returns `0` for transactions that
1086    /// have not acquired any locks (or for read-only transactions
1087    /// running with read-uncommitted isolation, which skip lock
1088    /// acquisition entirely).
1089    /// (transaction-env F23 "lock-stat reporting missing").
1090    pub fn lock_count(&self) -> usize {
1091        match &self.inner_txn {
1092            Some(txn) => {
1093                let g = txn.lock().unwrap();
1094                g.read_lock_count() + g.write_lock_count()
1095            }
1096            None => 0,
1097        }
1098    }
1099
1100    /// Returns `(read_lock_count, write_lock_count)` for this
1101    /// transaction's lock set.
1102    ///
1103    /// Mirrors JE's `Transaction.getNumReadLocks()` /
1104    /// `getNumWriteLocks()` accessors.  Returns `(0, 0)` for a
1105    /// transaction that has not acquired any locks.
1106    pub fn lock_counts(&self) -> (usize, usize) {
1107        match &self.inner_txn {
1108            Some(txn) => {
1109                let g = txn.lock().unwrap();
1110                (g.read_lock_count(), g.write_lock_count())
1111            }
1112            None => (0, 0),
1113        }
1114    }
1115
1116    /// Get the current transaction state.
1117    pub fn get_state(&self) -> TransactionState {
1118        *self.state.lock().unwrap()
1119    }
1120
1121    /// Check if the transaction is valid (in Open state).
1122    pub fn is_valid(&self) -> bool {
1123        matches!(self.get_state(), TransactionState::Open)
1124    }
1125
1126    /// Set the lock timeout for this transaction.
1127    ///
1128    /// # Arguments
1129    /// * `timeout_ms` - Lock timeout in milliseconds (0 = use environment default)
1130    pub fn set_lock_timeout(&self, timeout_ms: u64) {
1131        *self.lock_timeout_ms.lock().unwrap() = timeout_ms;
1132    }
1133
1134    /// Get the lock timeout for this transaction.
1135    pub fn get_lock_timeout(&self) -> u64 {
1136        *self.lock_timeout_ms.lock().unwrap()
1137    }
1138
1139    /// Set the transaction timeout.
1140    ///
1141    /// # Arguments
1142    /// * `timeout_ms` - Transaction timeout in milliseconds (0 = use environment default)
1143    pub fn set_txn_timeout(&self, timeout_ms: u64) {
1144        *self.txn_timeout_ms.lock().unwrap() = timeout_ms;
1145    }
1146
1147    /// Get the transaction timeout for this transaction.
1148    pub fn get_txn_timeout(&self) -> u64 {
1149        *self.txn_timeout_ms.lock().unwrap()
1150    }
1151
1152    /// Get the durability setting for this transaction.
1153    pub fn get_durability(&self) -> Option<Durability> {
1154        self.durability
1155    }
1156
1157    /// Check if this is a read-only transaction.
1158    pub fn is_read_only(&self) -> bool {
1159        self.read_only
1160    }
1161
1162    /// Get the elapsed time since transaction start.
1163    pub fn elapsed(&self) -> std::time::Duration {
1164        self.start_time.elapsed()
1165    }
1166
1167    /// Check that the transaction is in Open state.
1168    ///
1169    /// # Errors
1170    /// Returns error if the transaction is not Open.
1171    fn check_open(&self) -> Result<()> {
1172        let state = self.get_state();
1173        match state {
1174            TransactionState::Open => Ok(()),
1175            TransactionState::Prepared => Err(NoxuError::OperationNotAllowed(
1176                "Transaction has been prepared; use xa_commit / xa_rollback"
1177                    .to_string(),
1178            )),
1179            TransactionState::Committed => Err(NoxuError::OperationNotAllowed(
1180                "Transaction has been committed".to_string(),
1181            )),
1182            TransactionState::Aborted => Err(NoxuError::OperationNotAllowed(
1183                "Transaction has been aborted".to_string(),
1184            )),
1185            TransactionState::MustAbort => Err(NoxuError::OperationNotAllowed(
1186                "Transaction must be aborted due to previous error".to_string(),
1187            )),
1188        }
1189    }
1190}
1191
1192impl Drop for Transaction {
1193    fn drop(&mut self) {
1194        // Audit transaction-env F10 (Wave 2C-4): if the txn is still in
1195        // a non-terminal state at drop time, perform an actual abort
1196        // (release locks, apply undo, prune from active-txn registry,
1197        // decrement gauge) instead of just logging a warning.
1198        let state = *self.state.lock().unwrap();
1199        if matches!(state, TransactionState::Open | TransactionState::MustAbort)
1200        {
1201            log::warn!(
1202                "Transaction {} dropped without commit or abort, \
1203                 implicitly aborting",
1204                self.id
1205            );
1206            // Best-effort abort.  Errors are swallowed because Drop
1207            // cannot return Result; any failure (e.g., WAL write error)
1208            // is still observable through the abort path's logging.
1209            if let Err(e) = self.abort() {
1210                log::error!(
1211                    "Transaction {} implicit abort on drop failed: {e}",
1212                    self.id,
1213                );
1214            }
1215        } else if matches!(state, TransactionState::Prepared) {
1216            // Prepared txns dropped without resolution simulate a crash
1217            // — the durable TxnPrepare frame on disk is recovered on the
1218            // next environment open, where xa_recover() will surface the
1219            // XID for resolution.  This is intentional and supports the
1220            // crash-durable XA contract introduced in wave 3-2.
1221            log::info!(
1222                "Transaction {} dropped while prepared; XID will be \
1223                 surfaced via xa_recover() on next open",
1224                self.id
1225            );
1226        }
1227    }
1228}
1229
#[cfg(test)]
#[allow(deprecated)] // tests use Transaction::new directly to test state-machine logic in isolation
mod tests {
    use super::*;

    /// A freshly built transaction reports its id, starts Open, and is
    /// not read-only under the default configuration.
    #[test]
    fn test_new_transaction() {
        let txn = Transaction::new(1, TransactionConfig::default());
        assert_eq!(txn.get_id(), 1);
        assert_eq!(txn.get_state(), TransactionState::Open);
        assert!(txn.is_valid());
        assert!(!txn.is_read_only());
    }

    /// `set_name` / `get_name` round-trip: the name starts unset, and a
    /// second `set_name` replaces the first.
    #[test]
    fn test_set_name_get_name_round_trip() {
        let txn = Transaction::new(1, TransactionConfig::default());
        assert!(txn.get_name().is_none());

        txn.set_name("workload-import");
        assert_eq!(txn.get_name().as_deref(), Some("workload-import"));

        // Setting again replaces.
        txn.set_name("workload-import-2");
        assert_eq!(txn.get_name().as_deref(), Some("workload-import-2"));
    }

    /// `lock_count` and `lock_counts` return zero when there is no inner
    /// Txn (i.e., the transaction is decorative — unit-test mode without
    /// an EnvironmentImpl wired in).
    #[test]
    fn test_lock_counts_without_inner_txn_are_zero() {
        let txn = Transaction::new(1, TransactionConfig::default());
        assert_eq!(txn.lock_counts(), (0, 0));
        assert_eq!(txn.lock_count(), 0);
    }

    /// The read-only flag from the configuration is visible on the
    /// handle, and a read-only transaction still starts out valid.
    #[test]
    fn test_read_only_transaction() {
        let txn = Transaction::new(
            2,
            TransactionConfig::default().with_read_only(true),
        );
        assert!(txn.is_read_only());
        assert!(txn.is_valid());
    }

    /// Committing an Open transaction succeeds and leaves it Committed.
    #[test]
    fn test_commit() {
        let txn = Transaction::new(3, TransactionConfig::default());
        txn.commit().expect("commit of an open transaction");
        assert_eq!(txn.get_state(), TransactionState::Committed);
        assert!(!txn.is_valid());
    }

    /// A second commit is rejected with `OperationNotAllowed`.
    #[test]
    fn test_commit_twice_fails() {
        let txn = Transaction::new(4, TransactionConfig::default());
        txn.commit().expect("first commit");
        assert!(matches!(
            txn.commit(),
            Err(NoxuError::OperationNotAllowed(_))
        ));
    }

    /// Aborting an Open transaction succeeds and leaves it Aborted.
    #[test]
    fn test_abort() {
        let txn = Transaction::new(5, TransactionConfig::default());
        txn.abort().expect("abort of an open transaction");
        assert_eq!(txn.get_state(), TransactionState::Aborted);
        assert!(!txn.is_valid());
    }

    /// A second abort is rejected.
    #[test]
    fn test_abort_twice_fails() {
        let txn = Transaction::new(6, TransactionConfig::default());
        txn.abort().expect("first abort");
        assert!(txn.abort().is_err());
    }

    /// Commit is rejected once the transaction has been aborted.
    #[test]
    fn test_commit_after_abort_fails() {
        let txn = Transaction::new(7, TransactionConfig::default());
        txn.abort().expect("abort");
        assert!(txn.commit().is_err());
    }

    /// Abort is rejected once the transaction has been committed.
    #[test]
    fn test_abort_after_commit_fails() {
        let txn = Transaction::new(8, TransactionConfig::default());
        txn.commit().expect("commit");
        assert!(txn.abort().is_err());
    }

    /// The lock timeout defaults to 0 and reflects the last value set.
    #[test]
    fn test_lock_timeout() {
        let txn = Transaction::new(9, TransactionConfig::default());
        assert_eq!(txn.get_lock_timeout(), 0);
        txn.set_lock_timeout(5000);
        assert_eq!(txn.get_lock_timeout(), 5000);
    }

    /// The txn timeout defaults to 0 and reflects the last value set.
    #[test]
    fn test_txn_timeout() {
        let txn = Transaction::new(10, TransactionConfig::default());
        assert_eq!(txn.get_txn_timeout(), 0);
        txn.set_txn_timeout(10000);
        assert_eq!(txn.get_txn_timeout(), 10000);
    }

    /// A durability supplied through the configuration is reported back
    /// by the handle.
    #[test]
    fn test_durability() {
        let expected = Durability::COMMIT_SYNC;
        let txn = Transaction::new(
            11,
            TransactionConfig::default().with_durability(expected),
        );
        assert_eq!(txn.get_durability(), Some(expected));
    }

    /// `elapsed` covers at least the time slept since construction.
    #[test]
    fn test_elapsed_time() {
        let pause = std::time::Duration::from_millis(10);
        let txn = Transaction::new(12, TransactionConfig::default());
        std::thread::sleep(pause);
        assert!(txn.elapsed() >= pause);
    }

    /// Committing with an explicit durability leaves the transaction
    /// Committed.
    #[test]
    fn test_commit_with_durability() {
        let txn = Transaction::new(13, TransactionConfig::default());
        txn.commit_with_durability(Durability::COMMIT_NO_SYNC)
            .expect("commit with explicit durability");
        assert_eq!(txn.get_state(), TransactionState::Committed);
    }

    /// A transaction forced into MustAbort is invalid but can still be
    /// aborted.
    #[test]
    fn test_must_abort_state() {
        let txn = Transaction::new(14, TransactionConfig::default());
        *txn.state.lock().unwrap() = TransactionState::MustAbort;
        assert_eq!(txn.get_state(), TransactionState::MustAbort);
        assert!(!txn.is_valid());

        // Can still abort a MustAbort transaction
        txn.abort().expect("abort from MustAbort");
        assert_eq!(txn.get_state(), TransactionState::Aborted);
    }

    /// A transaction forced into MustAbort rejects commit with
    /// `OperationNotAllowed`.
    #[test]
    fn test_must_abort_cannot_commit() {
        let txn = Transaction::new(15, TransactionConfig::default());
        *txn.state.lock().unwrap() = TransactionState::MustAbort;

        assert!(matches!(
            txn.commit(),
            Err(NoxuError::OperationNotAllowed(_))
        ));
    }

    /// Covers both terminal transitions out of Open.
    #[test]
    fn test_state_transitions() {
        // Open -> Committed
        let committed = Transaction::new(16, TransactionConfig::default());
        assert_eq!(committed.get_state(), TransactionState::Open);
        committed.commit().unwrap();
        assert_eq!(committed.get_state(), TransactionState::Committed);

        // Open -> Aborted
        let aborted = Transaction::new(17, TransactionConfig::default());
        assert_eq!(aborted.get_state(), TransactionState::Open);
        aborted.abort().unwrap();
        assert_eq!(aborted.get_state(), TransactionState::Aborted);
    }

    /// Handles built with different ids report different ids.
    #[test]
    fn test_transaction_id_uniqueness() {
        let first = Transaction::new(100, TransactionConfig::default());
        let second = Transaction::new(101, TransactionConfig::default());
        assert_ne!(first.get_id(), second.get_id());
    }
}