noxu_db/transaction.rs
1//! Transaction handle for Noxu DB.
2//!
3
4use crate::durability::{Durability, SyncPolicy};
5use crate::environment::ActiveTxns;
6use crate::error::{NoxuError, Result};
7use crate::transaction_config::TransactionConfig;
8use noxu_dbi::{
9 AckWaitErrorKind, DatabaseId, EnvironmentImpl, ReplicaAckPolicyKind,
10 SharedReplicaAckCoordinator,
11};
12use noxu_log::LogManager;
13use noxu_sync::Mutex as SyncMutex;
14use noxu_txn::Txn;
15use std::sync::{Arc, Mutex};
16use std::time::Instant;
17
18/// Transaction state.
19#[derive(Debug, Clone, Copy, PartialEq, Eq)]
20pub enum TransactionState {
21 /// Transaction is open and can be used for operations.
22 Open,
23 /// Transaction has been prepared (XA two-phase commit phase 1).
24 ///
25 /// Locks are still held; the only valid transitions are
26 /// [`Transaction::resolved_commit_after_prepare`] and
27 /// [`Transaction::resolved_abort_after_prepare`]. Direct
28 /// `commit()` / `abort()` are protocol errors.
29 Prepared,
30 /// Transaction has been committed.
31 Committed,
32 /// Transaction has been aborted.
33 Aborted,
34 /// Transaction must be aborted (error occurred).
35 MustAbort,
36}
37
38/// A transaction handle.
39///
40///
41///
42/// Transaction handles are used to protect database operations.
43/// A single Transaction may be used for operations on multiple databases
44/// within the same environment.
45///
46/// Transaction handles are free-threaded; they may be used concurrently
47/// by multiple threads. Once committed or aborted, the handle must not
48/// be used for any further operations.
49///
50/// # Example
51/// ```no_run
52/// use noxu_db::{Environment, EnvironmentConfig};
53/// use std::path::PathBuf;
54///
55/// let config = EnvironmentConfig::new(PathBuf::from("/tmp/mydb"))
56/// .with_allow_create(true)
57/// .with_transactional(true);
58/// let env = Environment::open(config).unwrap();
59/// let txn = env.begin_transaction(None).unwrap();
60/// // ... do operations ...
61/// txn.commit().unwrap();
62/// ```
63pub struct Transaction {
64 /// Transaction ID
65 id: u64,
66 /// Current state
67 state: Mutex<TransactionState>,
68 /// When this transaction was created
69 start_time: Instant,
70 /// Whether this is read-only
71 read_only: bool,
72 /// Optional caller-supplied transaction name (JE
73 /// `Transaction.setName(String)`).
74 ///
75 /// The name is purely diagnostic: it is included in `Debug`
76 /// output and structured logs, and may be queried via
77 /// [`Transaction::get_name`].
78 /// (transaction-env F22 `setName/getName missing`).
79 name: Mutex<Option<String>>,
80 /// Durability override (None = use environment default)
81 durability: Option<Durability>,
82 /// Lock timeout in milliseconds (0 = use environment default)
83 lock_timeout_ms: Mutex<u64>,
84 /// Transaction timeout in milliseconds (0 = use environment default)
85 txn_timeout_ms: Mutex<u64>,
86 /// Write-ahead log manager (None when created outside of an Environment).
87 log_manager: Option<Arc<LogManager>>,
88 /// Internal transaction for lock management and write-set tracking.
89 ///
90 /// When `Some`, write operations on cursors acquire per-record write locks
91 /// via this `Txn` and record abort before-images. On `abort()`, this `Txn`
92 /// releases all locks and collects `UndoRecord`s.
93 ///
94 /// Relationship between `Transaction` (public) and `Txn` (internal)
95 /// in the: `Transaction.txnImpl` field.
96 inner_txn: Option<Arc<Mutex<Txn>>>,
97 /// Reference to the owning `EnvironmentImpl`.
98 ///
99 /// Used by `abort()` to look up each modified database by ID and apply
100 /// undo records to the B-tree.
101 ///
102 /// which is used by `Txn.undoLNs()` to call
103 /// `EnvironmentImpl.getDatabase(dbId).abort(undoLsn, locker)`.
104 env_impl: Option<Arc<SyncMutex<EnvironmentImpl>>>,
105 /// Shared registry of active transactions on the owning
106 /// `Environment`. When the transaction reaches a terminal state
107 /// (`commit`, `commit_with_durability`, or `abort`) we prune our
108 /// entry here so that `Environment::close()` can succeed.
109 ///
110 /// Resolves F1 of the May 2026 API audit.
111 active_txns: Option<Arc<ActiveTxns>>,
112 /// Optional replica-ack coordinator (typically a
113 /// `noxu_rep::ReplicatedEnvironment`). When `Some`, a successful
114 /// `commit_with_durability` blocks until the configured
115 /// `ReplicaAckPolicy` is satisfied or the durability ack-timeout
116 /// elapses, in which case `NoxuError::InsufficientReplicas` is
117 /// returned. Closes finding F1 of
118 /// `docs/src/internal/api-audit-2026-05-rep.md`.
119 replica_coordinator: Option<SharedReplicaAckCoordinator>,
120 /// Per-commit timeout for replica acknowledgments. Default 5s; set
121 /// from the environment's `replica_ack_timeout_ms` (see
122 /// `EnvironmentConfig::replica_ack_timeout_ms`) when the
123 /// coordinator is installed.
124 replica_ack_timeout: std::time::Duration,
125
126 /// Callbacks to run when this transaction aborts.
127 ///
128 /// C-4 / JE 1-I: used to undo transactional database registrations
129 /// when `open_database(Some(txn), ...)` is followed by `txn.abort()`.
130 /// Each callback is a `Box<dyn FnOnce() + Send>` so it can capture
131 /// shared state without requiring the caller to hold locks.
132 abort_callbacks: Mutex<Vec<Box<dyn FnOnce() + Send>>>,
133
134 /// Callbacks to run when this transaction commits.
135 ///
136 /// C-4 / JE 1-I: used to finalise transactional database registrations
137 /// by moving the database name from `pending_names` to `name_map`.
138 commit_callbacks: Mutex<Vec<Box<dyn FnOnce() + Send>>>,
139}
140
141impl Transaction {
142 /// Create a new unconnected transaction handle.
143 ///
144 /// **Deprecated** — this constructor creates a transaction that is not
145 /// wired to a WAL, lock manager, or environment. Commits and aborts
146 /// on such a transaction are no-ops. Use
147 /// [`Environment::begin_transaction`][crate::environment::Environment::begin_transaction]
148 /// to obtain a fully operational handle.
149 ///
150 /// # Arguments
151 /// * `id` - Unique transaction ID
152 /// * `config` - Transaction configuration
153 #[deprecated(
154 since = "2.4.1",
155 note = "use Environment::begin_transaction() to obtain a fully wired transaction handle"
156 )]
157 pub fn new(id: u64, config: TransactionConfig) -> Self {
158 observe_gauge_inc!("noxu_db_active_transactions");
159 Self {
160 id,
161 state: Mutex::new(TransactionState::Open),
162 start_time: Instant::now(),
163 read_only: config.read_only,
164 name: Mutex::new(None),
165 durability: Some(config.durability),
166 lock_timeout_ms: Mutex::new(config.lock_timeout_ms),
167 txn_timeout_ms: Mutex::new(config.txn_timeout_ms),
168 log_manager: None,
169 inner_txn: None,
170 env_impl: None,
171 active_txns: None,
172 replica_coordinator: None,
173 replica_ack_timeout: std::time::Duration::from_secs(5),
174 abort_callbacks: Mutex::new(Vec::new()),
175 commit_callbacks: Mutex::new(Vec::new()),
176 }
177 }
178
179 /// Create a new transaction backed by a real WAL.
180 ///
181 /// Called by `Environment::begin_transaction()` to wire the transaction to
182 /// the environment's log manager so that commit/abort write WAL entries.
183 ///
184 /// **Internal** — this method is `pub` for cross-crate wiring within the
185 /// Noxu DB engine but is not part of the v3.0 stable surface.
186 /// `LogManager` is not re-exported by `noxu-db`; downstream callers
187 /// cannot use this method without adding an internal crate dependency.
188 pub fn with_log_manager(
189 id: u64,
190 config: TransactionConfig,
191 log_manager: Arc<LogManager>,
192 ) -> Self {
193 observe_gauge_inc!("noxu_db_active_transactions");
194 Self {
195 id,
196 state: Mutex::new(TransactionState::Open),
197 start_time: Instant::now(),
198 read_only: config.read_only,
199 name: Mutex::new(None),
200 durability: Some(config.durability),
201 lock_timeout_ms: Mutex::new(config.lock_timeout_ms),
202 txn_timeout_ms: Mutex::new(config.txn_timeout_ms),
203 log_manager: Some(log_manager),
204 inner_txn: None,
205 env_impl: None,
206 active_txns: None,
207 replica_coordinator: None,
208 replica_ack_timeout: std::time::Duration::from_secs(5),
209 abort_callbacks: Mutex::new(Vec::new()),
210 commit_callbacks: Mutex::new(Vec::new()),
211 }
212 }
213
214 /// Wires the `EnvironmentImpl` so that `abort()` can apply undo records.
215 ///
216 /// Called by `Environment::begin_transaction()` after constructing the
217 /// `Transaction`.
218 ///
219 /// **Internal** — `EnvironmentImpl` is not re-exported by `noxu-db`.
220 pub fn with_env_impl(
221 mut self,
222 env_impl: Arc<SyncMutex<EnvironmentImpl>>,
223 ) -> Self {
224 self.env_impl = Some(env_impl);
225 self
226 }
227
228 /// Sets the inner `Txn` for lock management and write-set tracking.
229 ///
230 /// Called by `Environment::begin_transaction()` to wire the transaction to
231 /// the environment's `TxnManager` / `LockManager`.
232 ///
233 /// **Internal** — `noxu_txn::Txn` is not re-exported by `noxu-db`.
234 pub fn with_inner_txn(mut self, txn: Arc<Mutex<Txn>>) -> Self {
235 self.inner_txn = Some(txn);
236 self
237 }
238
239 /// Removes the inner `Txn` from the environment's `TxnManager` (its
240 /// `all_txns` map and the lock manager's locker-label map) — the
241 /// counterpart to `TxnManager::begin_txn`, which the explicit-transaction
242 /// commit/abort paths previously never called (review F-5).
243 ///
244 /// Without this, `TxnManager::all_txns` and the locker-label map grow
245 /// without bound for the process lifetime, `n_active_txns()` reports a
246 /// monotonically increasing (wrong) count, and `n_commits`/`n_aborts`
247 /// undercount. The inner `Txn`'s locker id (a separate id space from
248 /// `Transaction::id`) is the `all_txns` key, so we use it here.
249 ///
250 /// Lock discipline: the inner-txn lock is read in a tight scope and
251 /// released before the (separate) environment lock is taken, and both
252 /// commit/abort paths have already released any env lock by this point.
253 fn unregister_inner_txn(&self, committed: bool) {
254 let Some(inner) = self.inner_txn.as_ref() else {
255 return;
256 };
257 let inner_id = inner.lock().unwrap().id_as_locker();
258 if let Some(env) = self.env_impl.as_ref() {
259 let guard = env.lock();
260 let tm = guard.get_txn_manager();
261 if committed {
262 tm.commit_txn(inner_id);
263 } else {
264 tm.abort_txn(inner_id);
265 }
266 }
267 }
268
269 /// Wires the shared active-transactions registry so that `commit` /
270 /// `abort` can prune their own entry on completion.
271 ///
272 /// Resolves F1 of the May 2026 API audit.
273 pub(crate) fn with_active_txns(
274 mut self,
275 registry: Arc<ActiveTxns>,
276 ) -> Self {
277 self.active_txns = Some(registry);
278 self
279 }
280
281 /// Wires the replica-ack coordinator from the owning `Environment`.
282 ///
283 /// When set, a successful `commit_with_durability` blocks until the
284 /// configured `ReplicaAckPolicy` is satisfied or `replica_ack_timeout`
285 /// elapses, in which case `NoxuError::InsufficientReplicas` is
286 /// returned.
287 ///
288 /// Closes finding F1 of `docs/src/internal/api-audit-2026-05-rep.md`.
289 pub(crate) fn with_replica_coordinator(
290 mut self,
291 coord: SharedReplicaAckCoordinator,
292 ack_timeout: std::time::Duration,
293 ) -> Self {
294 self.replica_coordinator = Some(coord);
295 self.replica_ack_timeout = ack_timeout;
296 self
297 }
298
299 /// Returns a clone of the `Arc<Mutex<Txn>>` inner transaction, if any.
300 ///
301 /// Used by `Database::make_cursor_for_txn()` to wire the cursor to the
302 /// same `Txn` so that write operations lock via the transaction.
303 ///
304 /// **Internal** — `noxu_txn::Txn` is not re-exported by `noxu-db`.
305 pub fn get_inner_txn(&self) -> Option<Arc<Mutex<Txn>>> {
306 self.inner_txn.clone()
307 }
308
309 /// Register a callback to run when this transaction aborts.
310 ///
311 /// Used by `Environment::open_database()` to roll back a transactional
312 /// database creation if the owning transaction is aborted (C-4 / JE 1-I).
313 /// The callback is invoked from within `abort()`, after WAL writes but
314 /// before the outer state is marked `Aborted`.
315 pub fn register_abort_callback<F>(&self, f: F)
316 where
317 F: FnOnce() + Send + 'static,
318 {
319 self.abort_callbacks.lock().unwrap().push(Box::new(f));
320 }
321
322 /// Register a callback to run when this transaction commits.
323 ///
324 /// Used by `Environment::open_database()` to finalise a transactional
325 /// database creation when the owning transaction commits (C-4 / JE 1-I).
326 /// The callback is invoked from within `commit_with_durability()`, after
327 /// the WAL entry is written and locks are released.
328 pub fn register_commit_callback<F>(&self, f: F)
329 where
330 F: FnOnce() + Send + 'static,
331 {
332 self.commit_callbacks.lock().unwrap().push(Box::new(f));
333 }
334
335 /// Commit the transaction.
336 ///
337 /// All operations performed under this transaction are made durable
338 /// and visible to other transactions.
339 ///
340 /// # Errors
341 /// Returns an error if:
342 /// - The transaction is not in `Open` state.
343 /// - Writing the `TxnCommit` WAL entry fails (`EnvironmentFailure`
344 /// with reason `LogWrite`, propagated from `write_txn_end`).
345 /// - The inner-`Txn` commit fails after the WAL entry has been
346 /// fsynced (e.g. open cursors held against this transaction, or
347 /// inner-state inconsistency surfaced by `check_state`). When
348 /// this happens the transaction is still durably committed; the
349 /// error is propagated so the caller can react to the leak.
350 pub fn commit(&self) -> Result<()> {
351 observe_span!("txn_commit", txn_id = self.id);
352 let _obs_timer = observe_timer_start!();
353 observe_counter!("noxu_db_operations_total", "op" => "commit");
354 let durability = self.durability.unwrap_or(Durability::COMMIT_SYNC);
355 let result = self.commit_with_durability(durability);
356 observe_timer_record!(_obs_timer, "noxu_db_operation_duration_seconds", "op" => "commit");
357 result
358 }
359
360 /// Commit the transaction with specific durability.
361 ///
362 /// # Arguments
363 /// * `durability` - Durability settings for this commit
364 ///
365 /// # Errors
366 /// Returns an error if:
367 /// - The transaction is not in `Open` state.
368 /// - Writing the `TxnCommit` WAL entry fails (`EnvironmentFailure`
369 /// with reason `LogWrite`, propagated from `write_txn_end`).
370 /// - The inner-`Txn` commit fails after the WAL entry has been
371 /// fsynced (e.g. open cursors held against this transaction, or
372 /// inner-state inconsistency surfaced by `check_state`). When
373 /// this happens the transaction is still durably committed; the
374 /// error is propagated so the caller can react to the leak.
375 pub fn commit_with_durability(&self, durability: Durability) -> Result<()> {
376 self.check_open()?;
377
378 // Write TxnCommit to the WAL before marking committed.
379 // Durability controls whether we fsync, flush, or just buffer.
380 if !self.read_only
381 && let Some(lm) = &self.log_manager
382 {
383 let (fsync, flush) = match durability.local_sync {
384 SyncPolicy::Sync => (true, true),
385 SyncPolicy::WriteNoSync => (false, true),
386 SyncPolicy::NoSync => (false, false),
387 };
388 self.write_txn_end(lm, true, fsync, flush)?;
389 }
390
391 // F1 (rep audit): wait for replica acknowledgments before returning
392 // success. This wait happens AFTER the local WAL is durable but
393 // BEFORE the inner txn releases its locks, mirroring BDB-JE
394 // `Txn.preLogCommitHook` / `commit(Durability)` ordering: the
395 // master is durable locally and replicas are notified, but the
396 // commit only "returns" once `replica_ack` is satisfied.
397 //
398 // If no coordinator is wired (non-replicated env) or the policy
399 // is `None`, the wait is skipped. Read-only commits never need
400 // replica acks. Captured failure is propagated at the end of
401 // the function after lock release so the caller observes a
402 // typed `NoxuError::InsufficientReplicas` rather than a state
403 // leak.
404 let ack_err: Option<NoxuError> = if !self.read_only
405 && durability.replica_ack
406 != crate::durability::ReplicaAckPolicy::None
407 && let Some(coord) = &self.replica_coordinator
408 {
409 match coord.await_replica_acks(
410 durability.replica_ack.as_kind(),
411 self.replica_ack_timeout,
412 ) {
413 Ok(_received) => None,
414 Err(e) => match e.kind {
415 AckWaitErrorKind::NotMaster => {
416 Some(NoxuError::ReplicaWrite)
417 }
418 AckWaitErrorKind::Timeout | AckWaitErrorKind::Shutdown => {
419 Some(NoxuError::InsufficientReplicas {
420 required: e.needed,
421 available: e.received,
422 })
423 }
424 },
425 }
426 } else {
427 None
428 };
429
430 // Apply cleaner write-path backpressure: if the log write rate exceeds
431 // the cleaner's capacity, sleep briefly to let cleaning catch up.
432 // Implements CleanerThrottle.getWriteDelay() path in Txn.commit().
433 // Extract the throttle Arc while holding the env lock, then
434 // drop the lock BEFORE sleeping to avoid blocking other threads.
435 if !self.read_only
436 && let Some(ref env) = self.env_impl
437 {
438 let throttle = env.lock().get_cleaner_throttle();
439 if let Some(delay) =
440 throttle.and_then(|t| t.should_throttle_writer())
441 {
442 std::thread::sleep(delay);
443 }
444 }
445
446 // Release per-record locks held by the inner Txn.
447 // The inner Txn has no log_manager so it won't write duplicate WAL records.
448 //
449 // At this point the commit is *durable* on disk: `write_txn_end`
450 // above has already fsynced the TxnCommit WAL entry, and recovery
451 // will replay this commit on the next environment open. The inner
452 // commit only releases `lock_manager` locks and flips the inner
453 // state Open → Committed; the data path mutations were applied to
454 // the in-memory tree at `db.put()` time.
455 //
456 // Possible failure modes for `inner.commit()`:
457 // * `has_open_cursors`: a user bug — a cursor on this transaction
458 // was not closed before `commit()`. The data is still
459 // durably committed; the cursor's lifetime contract is
460 // violated. Surfacing this lets the caller find the leak.
461 // * `check_state` (state != Open): the inner txn was flipped to
462 // `MustAbort` by the deadlock detector after our WAL fsync,
463 // or somehow advanced to `Committed`/`Aborted` already. Both
464 // indicate a state-machine inconsistency that needs to be
465 // visible.
466 //
467 // Either way, we mark the outer state `Committed` first because
468 // the durable record says so — returning early before that would
469 // leave the outer in `Open`, and a retried `commit()` would
470 // append a *second* `TxnCommit` record to the WAL. We then
471 // propagate the inner error so the caller can react.
472 //
473 // The inner Txn's `commit_with_durability` now drains all
474 // read and write locks on every error return path, so a
475 // failed inner.commit() no longer leaks lock-manager entries
476 // until environment close. See `Txn::commit_with_durability`
477 // and `Txn::release_all_locks` in noxu-txn for the
478 // implementation of this guarantee.
479 let inner_err = if let Some(inner) = &self.inner_txn {
480 match inner.lock().unwrap().commit() {
481 Ok(_) => None,
482 Err(e) => {
483 log::error!(
484 "Transaction::commit_with_durability: inner txn \
485 commit failed after WAL fsync (txn is durably \
486 committed; lock_manager locks may be leaked): {e}"
487 );
488 Some(e)
489 }
490 }
491 } else {
492 None
493 };
494
495 let mut state = self.state.lock().unwrap();
496 *state = TransactionState::Committed;
497 drop(state);
498
499 // C-4 / JE 1-I: run commit callbacks (transactional database
500 // registration finalisation).
501 let callbacks: Vec<Box<dyn FnOnce() + Send>> =
502 std::mem::take(&mut *self.commit_callbacks.lock().unwrap());
503 for cb in callbacks {
504 cb();
505 }
506
507 // Prune our entry from the environment's active-txns registry so
508 // that `Environment::close()` can succeed (F1). Decrement the
509 // active-transactions gauge here (rather than in `commit()`) so
510 // that callers of `commit_with_durability` directly are also
511 // accounted for (resolves F9 as a side effect).
512 if let Some(registry) = &self.active_txns {
513 registry.mark_complete(self.id);
514 }
515 // F-5: counterpart to begin_txn — remove the inner Txn from
516 // TxnManager (all_txns + locker label) to avoid an unbounded leak.
517 self.unregister_inner_txn(true);
518 observe_gauge_dec!("noxu_db_active_transactions");
519
520 if let Some(e) = inner_err {
521 return Err(NoxuError::from(e));
522 }
523 // F1: surface any replica-ack failure last, after the local
524 // commit has fully released locks. The local commit is durable;
525 // returning this error tells the caller the durability policy
526 // was not satisfied so they can retry or rollback at the
527 // application layer.
528 if let Some(e) = ack_err {
529 return Err(e);
530 }
531 Ok(())
532 }
533
534 /// Abort the transaction.
535 ///
536 /// All operations performed under this transaction are rolled back.
537 ///
538 /// # Errors
539 /// Returns an error if:
540 /// - The transaction is already committed or aborted.
541 /// - Writing the `TxnAbort` WAL entry fails (`EnvironmentFailure`
542 /// with reason `LogWrite`, propagated from `write_txn_end`).
543 /// This path is taken only when the transaction is not read-only
544 /// and a `LogManager` is configured on the environment.
545 pub fn abort(&self) -> Result<()> {
546 observe_span!("txn_abort", txn_id = self.id);
547 observe_counter!("noxu_db_operations_total", "op" => "abort");
548 {
549 let state = self.state.lock().unwrap();
550 match *state {
551 TransactionState::Committed => {
552 return Err(NoxuError::OperationNotAllowed(
553 "Cannot abort a committed transaction".to_string(),
554 ));
555 }
556 TransactionState::Aborted => {
557 return Err(NoxuError::OperationNotAllowed(
558 "Transaction already aborted".to_string(),
559 ));
560 }
561 TransactionState::Open | TransactionState::MustAbort => {}
562 TransactionState::Prepared => {
563 return Err(NoxuError::OperationNotAllowed(
564 "Cannot abort a prepared transaction directly; \
565 use xa_rollback / resolved_abort_after_prepare"
566 .to_string(),
567 ));
568 }
569 }
570 }
571
572 // Write TxnAbort to WAL before marking aborted (no fsync needed).
573 if !self.read_only
574 && let Some(lm) = &self.log_manager
575 {
576 self.write_txn_end(lm, false, false, false)?;
577 }
578
579 // Apply undo records to the B-tree to restore before-images, then
580 // release write locks. The two steps must happen in this order: while
581 // write locks are still held, no reader can observe the in-flight value;
582 // once release_all_locks() is called, blocked readers unblock and must
583 // already see the restored before-image.
584 if let Some(inner) = &self.inner_txn {
585 // Phase 1: collect undo records without releasing write locks.
586 let mut undo_records =
587 inner.lock().unwrap().abort_collect_undo().unwrap_or_default();
588
589 // Apply undo in reverse-operation order (newest LSN first).
590 //
591 // The in-memory write-lock map is a HashMap (no order), so the
592 // raw `undo_records` order is non-deterministic. When the same
593 // key is touched multiple times in one txn (e.g. delete →
594 // re-insert in SR9465), the undo records carry conflicting
595 // intents:
596 // - DELETE undo (abort_data=orig) : restore the slot
597 // - INSERT undo (abort_known_deleted=t) : remove the slot
598 // Applying these in arbitrary order can leave the tree in either
599 // "correct" or "slot deleted" depending on iteration luck.
600 //
601 // The recovery path's backward log scan already applies undo
602 // newest-first; we mirror that here so the in-memory abort and
603 // crash-recovery undo are observationally identical. Sorting by
604 // `current_lsn` descending is sufficient because LSNs are
605 // monotonic per-WAL-write.
606 undo_records.sort_by_key(|r| std::cmp::Reverse(r.current_lsn));
607
608 // Phase 2: apply undo to the B-tree (write locks still held).
609 //
610 // H-1 (audit-2026-05-keith.md F-2.2): acquire env lock only for
611 // the fast database-handle lookup, then drop it immediately.
612 // This prevents the entire abort undo loop from serialising all
613 // concurrent readers/writers against the EnvironmentImpl mutex.
614 //
615 // Algorithm:
616 // a) Collect unique database IDs referenced by the undo set.
617 // b) For each ID, briefly lock env, clone the Arc<RwLock<DatabaseImpl>>,
618 // and immediately release the env lock.
619 // c) Apply all undo records without ever holding the env lock.
620 //
621 // Safety: the database Arcs are ref-counted; even if a concurrent
622 // `env.remove_database()` call drops the EnvironmentImpl's own Arc,
623 // our cloned Arc keeps the DatabaseImpl alive for the duration of
624 // the undo loop.
625 if let Some(env) = &self.env_impl {
626 // Step (a+b): collect database handles with minimal lock hold time.
627 use std::collections::HashMap;
628 let mut db_handles: HashMap<
629 i64,
630 Arc<noxu_sync::RwLock<noxu_dbi::DatabaseImpl>>,
631 > = HashMap::new();
632 for undo in &undo_records {
633 let db_id_raw = undo.database_id as i64;
634 if db_handles.contains_key(&db_id_raw) {
635 continue;
636 }
637 // Brief env lock: lookup only.
638 let guard = env.lock();
639 if let Some(arc) =
640 guard.get_database_by_id(DatabaseId::new(db_id_raw))
641 {
642 db_handles.insert(db_id_raw, arc);
643 }
644 // env lock released here — drop(guard) implicit at end of block
645 }
646
647 // Step (c): apply undo records without holding env lock.
648 for undo in undo_records {
649 let Some(abort_key) = undo.abort_key else { continue };
650 let db_id_raw = undo.database_id as i64;
651 let Some(db_arc) = db_handles.get(&db_id_raw) else {
652 continue;
653 };
654 let db_guard = db_arc.read();
655 if let Some(tree) = db_guard.get_real_tree() {
656 if undo.abort_known_deleted {
657 if tree.delete(&abort_key) {
658 db_guard.decrement_entry_count();
659 }
660 } else if let Some(abort_data) = undo.abort_data {
661 let lsn = noxu_util::Lsn::from_u64(undo.abort_lsn);
662 if let Ok(is_new) =
663 tree.insert(abort_key, abort_data, lsn)
664 && is_new
665 {
666 // Restoring a slot that the aborted txn had
667 // deleted: the in-memory delete already
668 // decremented the counter, so the restore
669 // must re-bump it.
670 db_guard.increment_entry_count();
671 }
672 }
673 }
674 }
675 }
676
677 // Phase 3: release write locks — blocked readers now unblock and
678 // see the restored before-image.
679 inner.lock().unwrap().release_all_locks();
680 }
681
682 let mut state = self.state.lock().unwrap();
683 *state = TransactionState::Aborted;
684 drop(state);
685
686 // C-4 / JE 1-I: run abort callbacks (transactional database
687 // registration rollback).
688 let callbacks: Vec<Box<dyn FnOnce() + Send>> =
689 std::mem::take(&mut *self.abort_callbacks.lock().unwrap());
690 for cb in callbacks {
691 cb();
692 }
693
694 // Prune our entry from the environment's active-txns registry so
695 // that `Environment::close()` can succeed (F1).
696 if let Some(registry) = &self.active_txns {
697 registry.mark_complete(self.id);
698 }
699 // F-5: counterpart to begin_txn — remove the inner Txn from
700 // TxnManager (all_txns + locker label) to avoid an unbounded leak.
701 self.unregister_inner_txn(false);
702 observe_gauge_dec!("noxu_db_active_transactions");
703 Ok(())
704 }
705
706 /// Prepares the transaction for the second phase of XA two-phase
707 /// commit.
708 ///
709 /// Implements the crash-durable contract introduced in wave 3-2:
710 ///
711 /// 1. Writes a `TxnPrepare` WAL frame containing the txn id, the
712 /// first / last LSN logged by this transaction, and the supplied
713 /// XID components (format_id, gtrid, bqual). The frame is
714 /// fsynced before this method returns, so a crash immediately
715 /// afterwards still allows recovery to resurrect the prepared
716 /// state.
717 /// 2. Marks the inner `Txn` as PREPARED — direct `commit()` and
718 /// `abort()` calls now return `OperationNotAllowed`; only
719 /// `resolved_commit_after_prepare` and
720 /// `resolved_abort_after_prepare` may finalise the transaction.
721 /// 3. Locks are RETAINED — prepared transactions hold every lock
722 /// until xa_commit / xa_rollback so concurrent readers cannot
723 /// observe in-flight state.
724 /// 4. The persistent prepared-log entry (the `noxu-xa::PreparedLog`
725 /// XID -> timestamp record) is the responsibility of the XA
726 /// layer and is written *after* this method returns. The WAL
727 /// `TxnPrepare` frame is the source of truth for crash
728 /// durability; the prepared-log database is a convenience for
729 /// operators inspecting in-doubt XIDs without scanning the WAL.
730 ///
731 /// Returns `Ok(())` on success. Read-only transactions (no LN
732 /// frames written) still take a code path here so the inner Txn
733 /// flips to PREPARED, but no `TxnPrepare` frame is emitted — the
734 /// XA layer should take its `PrepareResult::ReadOnly` shortcut
735 /// rather than calling this method on read-only branches.
736 ///
737 /// # Errors
738 /// * `OperationNotAllowed` if the transaction is not Open.
739 /// * `EnvironmentFailure { reason: LogWrite }` if the WAL write or
740 /// fsync fails.
741 pub fn prepare(
742 &self,
743 xid_format_id: i32,
744 xid_gtrid: &[u8],
745 xid_bqual: &[u8],
746 ) -> Result<()> {
747 self.check_open()?;
748
749 // Capture first / last LSN from the inner Txn so the recovery
750 // code can chain them. For read-only branches both are
751 // NULL_LSN, in which case we skip writing the frame entirely
752 // (the prepared XID will appear in the persistent prepared-log
753 // database but recovery has nothing to do for it).
754 let (first_lsn, last_lsn) = match &self.inner_txn {
755 Some(inner) => {
756 let g = inner.lock().unwrap();
757 (g.first_lsn(), g.last_lsn())
758 }
759 None => {
760 (noxu_util::NULL_LSN.as_u64(), noxu_util::NULL_LSN.as_u64())
761 }
762 };
763
764 // Write the durable TxnPrepare frame. Skipped for read-only
765 // txns (no inner Txn or no LN frames) to avoid recording an
766 // empty prepare that recovery would have nothing to do with.
767 if !self.read_only
768 && let Some(lm) = &self.log_manager
769 && first_lsn != noxu_util::NULL_LSN.as_u64()
770 {
771 self.write_txn_prepare(
772 lm,
773 first_lsn,
774 last_lsn,
775 xid_format_id,
776 xid_gtrid,
777 xid_bqual,
778 )?;
779 }
780
781 // Flip the inner Txn into PREPARED state so direct
782 // `inner.commit()` / `inner.abort()` are protocol errors.
783 // The inner Txn has no `log_manager` of its own (the outer
784 // Transaction owns the only LM reference), so its `prepare`
785 // call is a pure flag-flip; it does not write a duplicate
786 // TxnPrepare frame.
787 if let Some(inner) = &self.inner_txn {
788 inner
789 .lock()
790 .unwrap()
791 .prepare(xid_format_id, xid_gtrid.to_vec(), xid_bqual.to_vec())
792 .map_err(NoxuError::from)?;
793 }
794
795 let mut state = self.state.lock().unwrap();
796 *state = TransactionState::Prepared;
797 Ok(())
798 }
799
800 /// Resolves a prepared transaction with a commit.
801 ///
802 /// Used by the XA `xa_commit` path. Bypasses the
803 /// `TransactionState::Prepared` guard in `commit_with_durability`
804 /// because the prepare already established the commit decision.
805 ///
806 /// Steps:
807 /// 1. Verifies the txn is Prepared.
808 /// 2. Writes a `TxnCommit` WAL frame (mirrors `commit()`).
809 /// 3. Releases the inner Txn's locks via `resolved_commit_after_prepare`.
810 /// 4. Transitions state to Committed and prunes from the active-txns
811 /// registry.
812 pub fn resolved_commit_after_prepare(&self) -> Result<()> {
813 {
814 let state = self.state.lock().unwrap();
815 if !matches!(*state, TransactionState::Prepared) {
816 return Err(NoxuError::OperationNotAllowed(format!(
817 "resolved_commit_after_prepare: expected Prepared, got {:?}",
818 *state
819 )));
820 }
821 }
822
823 // Write the TxnCommit frame.
824 if !self.read_only
825 && let Some(lm) = &self.log_manager
826 {
827 self.write_txn_end(lm, true /* is_commit */, true, true)?;
828 }
829
830 // Inner-side resolution: clear IS_PREPARED and run the standard
831 // commit path (which releases locks and flips inner state).
832 if let Some(inner) = &self.inner_txn {
833 inner
834 .lock()
835 .unwrap()
836 .resolved_commit_after_prepare()
837 .map_err(NoxuError::from)?;
838 }
839
840 let mut state = self.state.lock().unwrap();
841 *state = TransactionState::Committed;
842 drop(state);
843 // Run commit callbacks (e.g. transactional database registration).
844 let cbs: Vec<Box<dyn FnOnce() + Send>> =
845 std::mem::take(&mut *self.commit_callbacks.lock().unwrap());
846 for cb in cbs {
847 cb();
848 }
849 if let Some(registry) = &self.active_txns {
850 registry.mark_complete(self.id);
851 }
852 // F-5: counterpart to begin_txn — remove the inner Txn from
853 // TxnManager (all_txns + locker label) to avoid an unbounded leak.
854 self.unregister_inner_txn(true);
855 observe_gauge_dec!("noxu_db_active_transactions");
856 Ok(())
857 }
858
859 /// Resolves a prepared transaction with an abort.
860 pub fn resolved_abort_after_prepare(&self) -> Result<()> {
861 {
862 let state = self.state.lock().unwrap();
863 if !matches!(*state, TransactionState::Prepared) {
864 return Err(NoxuError::OperationNotAllowed(format!(
865 "resolved_abort_after_prepare: expected Prepared, got {:?}",
866 *state
867 )));
868 }
869 }
870
871 if !self.read_only
872 && let Some(lm) = &self.log_manager
873 {
874 self.write_txn_end(lm, false /* is_commit */, false, false)?;
875 }
876
877 // Apply undo records to the B-tree to restore before-images, then
878 // release write locks. Same 3-phase ordering as `Transaction::abort()`:
879 // collect undo → apply → release locks. See the matching comment
880 // in `Transaction::abort` for the rationale (no reader sees the
881 // in-flight value until the before-image is back in the tree).
882 if let Some(inner) = &self.inner_txn {
883 // First, clear the IS_PREPARED flag on the inner Txn so that
884 // `abort_collect_undo()` (which calls into `Txn::abort`) does
885 // not refuse with InvalidTransaction { state: PREPARED }.
886 // We do this via the inner's resolved-abort path, which
887 // performs `txn_flags &= !IS_PREPARED` and then runs `abort()`.
888 // Unfortunately that consumes the locks; we want the
889 // pre-release behaviour instead, so flip the flag manually
890 // and then run abort_collect_undo.
891 let mut undo_records = {
892 let mut g = inner.lock().unwrap();
893 // Undo the IS_PREPARED flag so abort_collect_undo doesn't
894 // refuse. Inner state is still Open at this point.
895 g.clear_prepared_flag();
896 g.abort_collect_undo().unwrap_or_default()
897 };
898
899 // See `abort()` above: undo must be applied newest-LSN first so
900 // that delete-then-reinsert sequences in the same txn are
901 // unwound in reverse-operation order, matching the recovery
902 // path's backward log scan.
903 undo_records.sort_by_key(|r| std::cmp::Reverse(r.current_lsn));
904
905 if let Some(env) = &self.env_impl {
906 let env_guard = env.lock();
907 for undo in undo_records {
908 let Some(abort_key) = undo.abort_key else { continue };
909 let db_id =
910 noxu_dbi::DatabaseId::new(undo.database_id as i64);
911 let Some(db_arc) = env_guard.get_database_by_id(db_id)
912 else {
913 continue;
914 };
915 let db_guard = db_arc.read();
916 if let Some(tree) = db_guard.get_real_tree() {
917 if undo.abort_known_deleted {
918 if tree.delete(&abort_key) {
919 db_guard.decrement_entry_count();
920 }
921 } else if let Some(abort_data) = undo.abort_data {
922 let lsn = noxu_util::Lsn::from_u64(undo.abort_lsn);
923 if let Ok(is_new) =
924 tree.insert(abort_key, abort_data, lsn)
925 && is_new
926 {
927 db_guard.increment_entry_count();
928 }
929 }
930 }
931 }
932 }
933
934 inner.lock().unwrap().release_all_locks();
935 }
936
937 let mut state = self.state.lock().unwrap();
938 *state = TransactionState::Aborted;
939 drop(state);
940 // Run abort callbacks (e.g. transactional database registration rollback).
941 let cbs: Vec<Box<dyn FnOnce() + Send>> =
942 std::mem::take(&mut *self.abort_callbacks.lock().unwrap());
943 for cb in cbs {
944 cb();
945 }
946 if let Some(registry) = &self.active_txns {
947 registry.mark_complete(self.id);
948 }
949 // F-5: counterpart to begin_txn — remove the inner Txn from
950 // TxnManager (all_txns + locker label) to avoid an unbounded leak.
951 self.unregister_inner_txn(false);
952 observe_gauge_dec!("noxu_db_active_transactions");
953 Ok(())
954 }
955
956 /// Serializes a `TxnPrepareEntry` and writes it to the WAL with fsync.
957 fn write_txn_prepare(
958 &self,
959 lm: &LogManager,
960 first_lsn: u64,
961 last_lsn: u64,
962 xid_format_id: i32,
963 xid_gtrid: &[u8],
964 xid_bqual: &[u8],
965 ) -> Result<()> {
966 use noxu_log::{LogEntryType, Provisional, entry::TxnPrepareEntry};
967
968 let timestamp_ms = std::time::SystemTime::now()
969 .duration_since(std::time::UNIX_EPOCH)
970 .unwrap_or_default()
971 .as_millis() as u64;
972
973 let entry = TxnPrepareEntry::new(
974 self.id as i64,
975 timestamp_ms,
976 first_lsn,
977 last_lsn,
978 xid_format_id,
979 xid_gtrid.to_vec(),
980 xid_bqual.to_vec(),
981 )
982 .map_err(|e| {
983 NoxuError::environment_with_reason(
984 crate::error::EnvironmentFailureReason::LogWrite,
985 format!("prepare entry encode: {e}"),
986 )
987 })?;
988
989 let mut buf = Vec::with_capacity(entry.log_size());
990 entry.write_to_log(&mut buf);
991
992 // fsync=true, flush=true: prepare must be durable before
993 // returning so a subsequent crash sees the prepare frame.
994 lm.log(LogEntryType::TxnPrepare, &buf, Provisional::No, true, true)
995 .map(|_| ())
996 .map_err(|e| {
997 NoxuError::environment_with_reason(
998 crate::error::EnvironmentFailureReason::LogWrite,
999 e.to_string(),
1000 )
1001 })
1002 }
1003
1004 /// Serializes a TxnCommit or TxnAbort entry and writes it to `lm`.
1005 fn write_txn_end(
1006 &self,
1007 lm: &LogManager,
1008 is_commit: bool,
1009 fsync: bool,
1010 flush: bool,
1011 ) -> Result<()> {
1012 use bytes::BytesMut;
1013 use noxu_log::{LogEntryType, Provisional, entry::TxnEndEntry};
1014 use noxu_util::{lsn::NULL_LSN, vlsn::NULL_VLSN};
1015
1016 let timestamp = std::time::SystemTime::now()
1017 .duration_since(std::time::UNIX_EPOCH)
1018 .unwrap_or_default()
1019 .as_millis() as u64;
1020
1021 let entry = if is_commit {
1022 TxnEndEntry::new_commit(
1023 self.id as i64,
1024 NULL_LSN,
1025 timestamp,
1026 0,
1027 NULL_VLSN,
1028 )
1029 } else {
1030 TxnEndEntry::new_abort(
1031 self.id as i64,
1032 NULL_LSN,
1033 timestamp,
1034 0,
1035 NULL_VLSN,
1036 )
1037 };
1038
1039 let entry_type = if is_commit {
1040 LogEntryType::TxnCommit
1041 } else {
1042 LogEntryType::TxnAbort
1043 };
1044
1045 let mut buf = BytesMut::with_capacity(entry.log_size());
1046 entry.write_to_log(&mut buf);
1047
1048 lm.log(entry_type, &buf, Provisional::No, flush, fsync)
1049 .map(|_| ())
1050 .map_err(|e| {
1051 NoxuError::environment_with_reason(
1052 crate::error::EnvironmentFailureReason::LogWrite,
1053 e.to_string(),
1054 )
1055 })
1056 }
1057
1058 /// Get the transaction ID.
1059 pub fn get_id(&self) -> u64 {
1060 self.id
1061 }
1062
1063 /// Set the human-readable name of this transaction.
1064 ///
1065 /// Mirrors `Transaction.setName(String)`. The name is purely
1066 /// diagnostic — it appears in `Debug` output, structured log
1067 /// records, and lock-conflict reports.
1068 /// (transaction-env F22).
1069 pub fn set_name<S: Into<String>>(&self, name: S) {
1070 *self.name.lock().unwrap() = Some(name.into());
1071 }
1072
1073 /// Returns the caller-supplied transaction name, if any.
1074 ///
1075 /// Mirrors `Transaction.getName()`.
1076 pub fn get_name(&self) -> Option<String> {
1077 self.name.lock().unwrap().clone()
1078 }
1079
1080 /// Returns the number of locks currently held by this transaction.
1081 ///
1082 /// Mirrors `Transaction.getLockStat()` / `Transaction.getNumWriteLocks() +
1083 /// getNumReadLocks()` (the JE API exposes both counts; we return
1084 /// the sum because the lock manager partitions reads / writes per
1085 /// LSN rather than per record). Returns `0` for transactions that
1086 /// have not acquired any locks (or for read-only transactions
1087 /// running with read-uncommitted isolation, which skip lock
1088 /// acquisition entirely).
1089 /// (transaction-env F23 "lock-stat reporting missing").
1090 pub fn lock_count(&self) -> usize {
1091 match &self.inner_txn {
1092 Some(txn) => {
1093 let g = txn.lock().unwrap();
1094 g.read_lock_count() + g.write_lock_count()
1095 }
1096 None => 0,
1097 }
1098 }
1099
1100 /// Returns `(read_lock_count, write_lock_count)` for this
1101 /// transaction's lock set.
1102 ///
1103 /// Mirrors JE's `Transaction.getNumReadLocks()` /
1104 /// `getNumWriteLocks()` accessors. Returns `(0, 0)` for a
1105 /// transaction that has not acquired any locks.
1106 pub fn lock_counts(&self) -> (usize, usize) {
1107 match &self.inner_txn {
1108 Some(txn) => {
1109 let g = txn.lock().unwrap();
1110 (g.read_lock_count(), g.write_lock_count())
1111 }
1112 None => (0, 0),
1113 }
1114 }
1115
1116 /// Get the current transaction state.
1117 pub fn get_state(&self) -> TransactionState {
1118 *self.state.lock().unwrap()
1119 }
1120
1121 /// Check if the transaction is valid (in Open state).
1122 pub fn is_valid(&self) -> bool {
1123 matches!(self.get_state(), TransactionState::Open)
1124 }
1125
1126 /// Set the lock timeout for this transaction.
1127 ///
1128 /// # Arguments
1129 /// * `timeout_ms` - Lock timeout in milliseconds (0 = use environment default)
1130 pub fn set_lock_timeout(&self, timeout_ms: u64) {
1131 *self.lock_timeout_ms.lock().unwrap() = timeout_ms;
1132 }
1133
1134 /// Get the lock timeout for this transaction.
1135 pub fn get_lock_timeout(&self) -> u64 {
1136 *self.lock_timeout_ms.lock().unwrap()
1137 }
1138
1139 /// Set the transaction timeout.
1140 ///
1141 /// # Arguments
1142 /// * `timeout_ms` - Transaction timeout in milliseconds (0 = use environment default)
1143 pub fn set_txn_timeout(&self, timeout_ms: u64) {
1144 *self.txn_timeout_ms.lock().unwrap() = timeout_ms;
1145 }
1146
1147 /// Get the transaction timeout for this transaction.
1148 pub fn get_txn_timeout(&self) -> u64 {
1149 *self.txn_timeout_ms.lock().unwrap()
1150 }
1151
1152 /// Get the durability setting for this transaction.
1153 pub fn get_durability(&self) -> Option<Durability> {
1154 self.durability
1155 }
1156
1157 /// Check if this is a read-only transaction.
1158 pub fn is_read_only(&self) -> bool {
1159 self.read_only
1160 }
1161
1162 /// Get the elapsed time since transaction start.
1163 pub fn elapsed(&self) -> std::time::Duration {
1164 self.start_time.elapsed()
1165 }
1166
1167 /// Check that the transaction is in Open state.
1168 ///
1169 /// # Errors
1170 /// Returns error if the transaction is not Open.
1171 fn check_open(&self) -> Result<()> {
1172 let state = self.get_state();
1173 match state {
1174 TransactionState::Open => Ok(()),
1175 TransactionState::Prepared => Err(NoxuError::OperationNotAllowed(
1176 "Transaction has been prepared; use xa_commit / xa_rollback"
1177 .to_string(),
1178 )),
1179 TransactionState::Committed => Err(NoxuError::OperationNotAllowed(
1180 "Transaction has been committed".to_string(),
1181 )),
1182 TransactionState::Aborted => Err(NoxuError::OperationNotAllowed(
1183 "Transaction has been aborted".to_string(),
1184 )),
1185 TransactionState::MustAbort => Err(NoxuError::OperationNotAllowed(
1186 "Transaction must be aborted due to previous error".to_string(),
1187 )),
1188 }
1189 }
1190}
1191
1192impl Drop for Transaction {
1193 fn drop(&mut self) {
1194 // Audit transaction-env F10 (Wave 2C-4): if the txn is still in
1195 // a non-terminal state at drop time, perform an actual abort
1196 // (release locks, apply undo, prune from active-txn registry,
1197 // decrement gauge) instead of just logging a warning.
1198 let state = *self.state.lock().unwrap();
1199 if matches!(state, TransactionState::Open | TransactionState::MustAbort)
1200 {
1201 log::warn!(
1202 "Transaction {} dropped without commit or abort, \
1203 implicitly aborting",
1204 self.id
1205 );
1206 // Best-effort abort. Errors are swallowed because Drop
1207 // cannot return Result; any failure (e.g., WAL write error)
1208 // is still observable through the abort path's logging.
1209 if let Err(e) = self.abort() {
1210 log::error!(
1211 "Transaction {} implicit abort on drop failed: {e}",
1212 self.id,
1213 );
1214 }
1215 } else if matches!(state, TransactionState::Prepared) {
1216 // Prepared txns dropped without resolution simulate a crash
1217 // — the durable TxnPrepare frame on disk is recovered on the
1218 // next environment open, where xa_recover() will surface the
1219 // XID for resolution. This is intentional and supports the
1220 // crash-durable XA contract introduced in wave 3-2.
1221 log::info!(
1222 "Transaction {} dropped while prepared; XID will be \
1223 surfaced via xa_recover() on next open",
1224 self.id
1225 );
1226 }
1227 }
1228}
1229
1230#[cfg(test)]
1231#[allow(deprecated)] // tests use Transaction::new directly to test state-machine logic in isolation
1232mod tests {
1233 use super::*;
1234
1235 #[test]
1236 fn test_new_transaction() {
1237 let config = TransactionConfig::default();
1238 let txn = Transaction::new(1, config);
1239 assert_eq!(txn.get_id(), 1);
1240 assert_eq!(txn.get_state(), TransactionState::Open);
1241 assert!(txn.is_valid());
1242 assert!(!txn.is_read_only());
1243 }
1244
1245 /// `set_name` / `get_name`
1246 /// round-trip and survives commit (the JE shape stays valid until
1247 /// the txn is dropped).
1248 #[test]
1249 fn test_set_name_get_name_round_trip() {
1250 let config = TransactionConfig::default();
1251 let txn = Transaction::new(1, config);
1252 assert_eq!(txn.get_name(), None);
1253
1254 txn.set_name("workload-import");
1255 assert_eq!(txn.get_name().as_deref(), Some("workload-import"));
1256
1257 // Setting again replaces.
1258 txn.set_name("workload-import-2");
1259 assert_eq!(txn.get_name().as_deref(), Some("workload-import-2"));
1260 }
1261
1262 /// `lock_count` and
1263 /// lock_counts return zero when there is no inner Txn (i.e., the
1264 /// transaction is decorative — unit-test mode without an
1265 /// EnvironmentImpl wired in).
1266 #[test]
1267 fn test_lock_counts_without_inner_txn_are_zero() {
1268 let config = TransactionConfig::default();
1269 let txn = Transaction::new(1, config);
1270 assert_eq!(txn.lock_count(), 0);
1271 assert_eq!(txn.lock_counts(), (0, 0));
1272 }
1273
1274 #[test]
1275 fn test_read_only_transaction() {
1276 let config = TransactionConfig::default().with_read_only(true);
1277 let txn = Transaction::new(2, config);
1278 assert!(txn.is_read_only());
1279 assert!(txn.is_valid());
1280 }
1281
1282 #[test]
1283 fn test_commit() {
1284 let config = TransactionConfig::default();
1285 let txn = Transaction::new(3, config);
1286 assert!(txn.commit().is_ok());
1287 assert_eq!(txn.get_state(), TransactionState::Committed);
1288 assert!(!txn.is_valid());
1289 }
1290
1291 #[test]
1292 fn test_commit_twice_fails() {
1293 let config = TransactionConfig::default();
1294 let txn = Transaction::new(4, config);
1295 assert!(txn.commit().is_ok());
1296 let result = txn.commit();
1297 assert!(result.is_err());
1298 assert!(matches!(
1299 result.unwrap_err(),
1300 NoxuError::OperationNotAllowed(_)
1301 ));
1302 }
1303
1304 #[test]
1305 fn test_abort() {
1306 let config = TransactionConfig::default();
1307 let txn = Transaction::new(5, config);
1308 assert!(txn.abort().is_ok());
1309 assert_eq!(txn.get_state(), TransactionState::Aborted);
1310 assert!(!txn.is_valid());
1311 }
1312
1313 #[test]
1314 fn test_abort_twice_fails() {
1315 let config = TransactionConfig::default();
1316 let txn = Transaction::new(6, config);
1317 assert!(txn.abort().is_ok());
1318 let result = txn.abort();
1319 assert!(result.is_err());
1320 }
1321
1322 #[test]
1323 fn test_commit_after_abort_fails() {
1324 let config = TransactionConfig::default();
1325 let txn = Transaction::new(7, config);
1326 assert!(txn.abort().is_ok());
1327 let result = txn.commit();
1328 assert!(result.is_err());
1329 }
1330
1331 #[test]
1332 fn test_abort_after_commit_fails() {
1333 let config = TransactionConfig::default();
1334 let txn = Transaction::new(8, config);
1335 assert!(txn.commit().is_ok());
1336 let result = txn.abort();
1337 assert!(result.is_err());
1338 }
1339
1340 #[test]
1341 fn test_lock_timeout() {
1342 let config = TransactionConfig::default();
1343 let txn = Transaction::new(9, config);
1344 assert_eq!(txn.get_lock_timeout(), 0);
1345 txn.set_lock_timeout(5000);
1346 assert_eq!(txn.get_lock_timeout(), 5000);
1347 }
1348
1349 #[test]
1350 fn test_txn_timeout() {
1351 let config = TransactionConfig::default();
1352 let txn = Transaction::new(10, config);
1353 assert_eq!(txn.get_txn_timeout(), 0);
1354 txn.set_txn_timeout(10000);
1355 assert_eq!(txn.get_txn_timeout(), 10000);
1356 }
1357
1358 #[test]
1359 fn test_durability() {
1360 let dur = Durability::COMMIT_SYNC;
1361 let config = TransactionConfig::default().with_durability(dur);
1362 let txn = Transaction::new(11, config);
1363 assert_eq!(txn.get_durability(), Some(dur));
1364 }
1365
1366 #[test]
1367 fn test_elapsed_time() {
1368 let config = TransactionConfig::default();
1369 let txn = Transaction::new(12, config);
1370 std::thread::sleep(std::time::Duration::from_millis(10));
1371 let elapsed = txn.elapsed();
1372 assert!(elapsed.as_millis() >= 10);
1373 }
1374
1375 #[test]
1376 fn test_commit_with_durability() {
1377 let config = TransactionConfig::default();
1378 let txn = Transaction::new(13, config);
1379 let dur = Durability::COMMIT_NO_SYNC;
1380 assert!(txn.commit_with_durability(dur).is_ok());
1381 assert_eq!(txn.get_state(), TransactionState::Committed);
1382 }
1383
1384 #[test]
1385 fn test_must_abort_state() {
1386 let config = TransactionConfig::default();
1387 let txn = Transaction::new(14, config);
1388 {
1389 let mut state = txn.state.lock().unwrap();
1390 *state = TransactionState::MustAbort;
1391 }
1392 assert_eq!(txn.get_state(), TransactionState::MustAbort);
1393 assert!(!txn.is_valid());
1394
1395 // Can still abort a MustAbort transaction
1396 assert!(txn.abort().is_ok());
1397 assert_eq!(txn.get_state(), TransactionState::Aborted);
1398 }
1399
1400 #[test]
1401 fn test_must_abort_cannot_commit() {
1402 let config = TransactionConfig::default();
1403 let txn = Transaction::new(15, config);
1404 {
1405 let mut state = txn.state.lock().unwrap();
1406 *state = TransactionState::MustAbort;
1407 }
1408
1409 let result = txn.commit();
1410 assert!(result.is_err());
1411 assert!(matches!(
1412 result.unwrap_err(),
1413 NoxuError::OperationNotAllowed(_)
1414 ));
1415 }
1416
1417 #[test]
1418 fn test_state_transitions() {
1419 let config = TransactionConfig::default();
1420 // Open -> Committed
1421 let txn1 = Transaction::new(16, config.clone());
1422 assert_eq!(txn1.get_state(), TransactionState::Open);
1423 txn1.commit().unwrap();
1424 assert_eq!(txn1.get_state(), TransactionState::Committed);
1425
1426 // Open -> Aborted
1427 let txn2 = Transaction::new(17, config);
1428 assert_eq!(txn2.get_state(), TransactionState::Open);
1429 txn2.abort().unwrap();
1430 assert_eq!(txn2.get_state(), TransactionState::Aborted);
1431 }
1432
1433 #[test]
1434 fn test_transaction_id_uniqueness() {
1435 let config = TransactionConfig::default();
1436 let txn1 = Transaction::new(100, config.clone());
1437 let txn2 = Transaction::new(101, config);
1438 assert_ne!(txn1.get_id(), txn2.get_id());
1439 }
1440}