libmdbx_remote/txn_manager.rs
1use crate::{
2 environment::EnvPtr,
3 error::{mdbx_result, Result},
4 CommitLatency,
5};
6use std::{
7 ptr,
8 sync::mpsc::{sync_channel, Receiver, SyncSender},
9};
10
/// Thin wrapper around a raw `ffi::MDBX_txn` pointer.
///
/// Raw pointers are neither `Send` nor `Sync` by themselves; the wrapper opts in to both so that
/// the handle can be placed into a [`TxnManagerMessage`] and moved to the manager thread.
#[derive(Copy, Clone, Debug)]
pub(crate) struct TxnPtr(pub(crate) *mut ffi::MDBX_txn);
// NOTE(review): nothing in this file enforces exclusive access to the pointee. Soundness of the
// two impls below presumably relies on callers serializing use of the transaction -- confirm.
unsafe impl Send for TxnPtr {}
unsafe impl Sync for TxnPtr {}
15
/// Request processed by the transaction manager's listener thread.
///
/// Every variant carries a `sender` on which the listener reports the outcome of the
/// corresponding FFI call.
pub(crate) enum TxnManagerMessage {
    /// Open a transaction with `ffi::mdbx_txn_begin_ex`.
    Begin {
        /// Passed through as the `parent` argument of `ffi::mdbx_txn_begin_ex`.
        parent: TxnPtr,
        /// Passed through as the `flags` argument of `ffi::mdbx_txn_begin_ex`.
        flags: ffi::MDBX_txn_flags_t,
        /// Receives the newly opened transaction pointer, or the error.
        sender: SyncSender<Result<TxnPtr>>,
    },
    /// Abort a transaction with `ffi::mdbx_txn_abort`.
    Abort {
        /// Transaction to abort.
        tx: TxnPtr,
        /// Receives `mdbx_result` applied to the return code of the abort call.
        sender: SyncSender<Result<bool>>,
    },
    /// Commit a transaction with `ffi::mdbx_txn_commit_ex`.
    Commit {
        /// Transaction to commit.
        tx: TxnPtr,
        /// Receives `mdbx_result` applied to the return code of the commit call, paired with
        /// the latency record that was handed to that call.
        sender: SyncSender<Result<(bool, CommitLatency)>>,
    },
}
31
/// Manages transactions by doing two things:
/// - Opening, aborting, and committing transactions using [`TxnManager::send_message`] with the
///   corresponding [`TxnManagerMessage`]
/// - Timing out long-lived read transactions (if the `read-tx-timeouts` feature is enabled and
///   the manager was created with `TxnManager::new_with_max_read_transaction_duration`)
#[derive(Debug)]
pub(crate) struct TxnManager {
    /// Sending half of the channel that is drained by the listener thread.
    sender: SyncSender<TxnManagerMessage>,
    /// Tracker of active read transactions; `None` when the manager was created with
    /// [`TxnManager::new`].
    #[cfg(feature = "read-tx-timeouts")]
    read_transactions: Option<std::sync::Arc<read_transactions::ReadTransactions>>,
}
43
44impl TxnManager {
45 pub(crate) fn new(env: EnvPtr) -> Self {
46 let (tx, rx) = sync_channel(0);
47 let txn_manager = Self {
48 sender: tx,
49 #[cfg(feature = "read-tx-timeouts")]
50 read_transactions: None,
51 };
52
53 txn_manager.start_message_listener(env, rx);
54
55 txn_manager
56 }
57
58 /// Spawns a new [`std::thread`] that listens to incoming [`TxnManagerMessage`] messages,
59 /// executes an FFI function, and returns the result on the provided channel.
60 ///
61 /// - [`TxnManagerMessage::Begin`] opens a new transaction with [`ffi::mdbx_txn_begin_ex`]
62 /// - [`TxnManagerMessage::Abort`] aborts a transaction with [`ffi::mdbx_txn_abort`]
63 /// - [`TxnManagerMessage::Commit`] commits a transaction with [`ffi::mdbx_txn_commit_ex`]
64 fn start_message_listener(&self, env: EnvPtr, rx: Receiver<TxnManagerMessage>) {
65 let task = move || {
66 #[allow(clippy::redundant_locals)]
67 let env = env;
68 loop {
69 match rx.recv() {
70 Ok(msg) => match msg {
71 TxnManagerMessage::Begin {
72 parent,
73 flags,
74 sender,
75 } => {
76 let mut txn: *mut ffi::MDBX_txn = ptr::null_mut();
77 let res = mdbx_result(unsafe {
78 ffi::mdbx_txn_begin_ex(
79 env.0,
80 parent.0,
81 flags,
82 &mut txn,
83 ptr::null_mut(),
84 )
85 })
86 .map(|_| TxnPtr(txn));
87 sender.send(res).unwrap();
88 }
89 TxnManagerMessage::Abort { tx, sender } => {
90 sender
91 .send(mdbx_result(unsafe { ffi::mdbx_txn_abort(tx.0) }))
92 .unwrap();
93 }
94 TxnManagerMessage::Commit { tx, sender } => {
95 sender
96 .send({
97 let mut latency = CommitLatency::new();
98 mdbx_result(unsafe {
99 ffi::mdbx_txn_commit_ex(tx.0, latency.mdb_commit_latency())
100 })
101 .map(|v| (v, latency))
102 })
103 .unwrap();
104 }
105 },
106 Err(_) => return,
107 }
108 }
109 };
110 std::thread::Builder::new()
111 .name("mbdx-rs-txn-manager".to_string())
112 .spawn(task)
113 .unwrap();
114 }
115
116 pub(crate) fn send_message(&self, message: TxnManagerMessage) {
117 self.sender.send(message).unwrap()
118 }
119}
120
121#[cfg(feature = "read-tx-timeouts")]
122mod read_transactions {
123 use crate::{
124 environment::EnvPtr, error::mdbx_result, transaction::TransactionPtr,
125 txn_manager::TxnManager,
126 };
127 use dashmap::{DashMap, DashSet};
128 use std::{
129 sync::{mpsc::sync_channel, Arc},
130 time::{Duration, Instant},
131 };
132 use tracing::{error, trace, warn};
133
/// Upper bound on how long the read transaction monitor sleeps between two scans of the active
/// read transactions.
const READ_TRANSACTIONS_CHECK_INTERVAL: Duration = Duration::from_secs(5);
135
136 impl TxnManager {
137 /// Returns a new instance for which the maximum duration that a read transaction can be
138 /// open is set.
139 pub(crate) fn new_with_max_read_transaction_duration(
140 env: EnvPtr,
141 duration: Duration,
142 ) -> Self {
143 let read_transactions = Arc::new(ReadTransactions::new(duration));
144 read_transactions.clone().start_monitor();
145
146 let (tx, rx) = sync_channel(0);
147
148 let txn_manager = Self {
149 sender: tx,
150 read_transactions: Some(read_transactions),
151 };
152
153 txn_manager.start_message_listener(env, rx);
154
155 txn_manager
156 }
157
158 /// Adds a new transaction to the list of active read transactions.
159 pub(crate) fn add_active_read_transaction(
160 &self,
161 ptr: *mut ffi::MDBX_txn,
162 tx: TransactionPtr,
163 ) {
164 if let Some(read_transactions) = &self.read_transactions {
165 read_transactions.add_active(ptr, tx);
166 }
167 }
168
169 /// Removes a transaction from the list of active read transactions.
170 pub(crate) fn remove_active_read_transaction(
171 &self,
172 ptr: *mut ffi::MDBX_txn,
173 ) -> Option<(usize, (TransactionPtr, Instant))> {
174 self.read_transactions.as_ref()?.remove_active(ptr)
175 }
176
177 /// Returns the number of timed out transactions that were not aborted by the user yet.
178 pub(crate) fn timed_out_not_aborted_read_transactions(&self) -> Option<usize> {
179 self.read_transactions
180 .as_ref()
181 .map(|read_transactions| read_transactions.timed_out_not_aborted())
182 }
183 }
184
/// Bookkeeping for read transactions that are subject to a maximum open duration.
#[derive(Debug, Default)]
pub(super) struct ReadTransactions {
    /// Maximum duration that a read transaction can be open until the
    /// [`ReadTransactions::start_monitor`] times it out.
    max_duration: Duration,
    /// List of currently active read transactions.
    ///
    /// The key is the transaction pointer converted to `usize`: raw pointers are neither `Send`
    /// nor `Sync`, so a map keyed by them could not be shared with the monitor thread. The
    /// value holds the transaction handle and the time at which it was registered.
    active: DashMap<usize, (TransactionPtr, Instant)>,
    /// List of timed out transactions that were not aborted by the user yet, hence have a
    /// dangling read transaction pointer.
    timed_out_not_aborted: DashSet<usize>,
}
199
200 impl ReadTransactions {
201 pub(super) fn new(max_duration: Duration) -> Self {
202 Self {
203 max_duration,
204 ..Default::default()
205 }
206 }
207
208 /// Adds a new transaction to the list of active read transactions.
209 pub(super) fn add_active(&self, ptr: *mut ffi::MDBX_txn, tx: TransactionPtr) {
210 let _ = self.active.insert(ptr as usize, (tx, Instant::now()));
211 }
212
213 /// Removes a transaction from the list of active read transactions.
214 pub(super) fn remove_active(
215 &self,
216 ptr: *mut ffi::MDBX_txn,
217 ) -> Option<(usize, (TransactionPtr, Instant))> {
218 self.timed_out_not_aborted.remove(&(ptr as usize));
219 self.active.remove(&(ptr as usize))
220 }
221
222 /// Returns the number of timed out transactions that were not aborted by the user yet.
223 pub(super) fn timed_out_not_aborted(&self) -> usize {
224 self.timed_out_not_aborted.len()
225 }
226
227 /// Spawns a new [`std::thread`] that monitors the list of active read transactions and
228 /// timeouts those that are open for longer than `ReadTransactions.max_duration`.
229 pub(super) fn start_monitor(self: Arc<Self>) {
230 let task = move || {
231 let mut timed_out_active = Vec::new();
232
233 loop {
234 let now = Instant::now();
235 let mut max_active_transaction_duration = None;
236
237 // Iterate through active read transactions and time out those that's open for
238 // longer than `self.max_duration`.
239 for entry in &self.active {
240 let (tx, start) = entry.value();
241 let duration = now - *start;
242
243 if duration > self.max_duration {
244 let result = tx.txn_execute_fail_on_timeout(|txn_ptr| {
245 // Time out the transaction.
246 //
247 // We use `mdbx_txn_reset` instead of `mdbx_txn_abort` here to
248 // prevent MDBX from reusing the pointer of the aborted
249 // transaction for new read-only transactions. This is
250 // important because we store the pointer in the `active` list
251 // and assume that it is unique.
252 //
253 // See https://erthink.github.io/libmdbx/group__c__transactions.html#gae9f34737fe60b0ba538d5a09b6a25c8d for more info.
254 let result = mdbx_result(unsafe { ffi::mdbx_txn_reset(txn_ptr) });
255 if result.is_ok() {
256 tx.set_timed_out();
257 }
258 (txn_ptr, duration, result)
259 });
260
261 match result {
262 Ok((txn_ptr, duration, error)) => {
263 // Add the transaction to `timed_out_active`. We can't remove it
264 // instantly from the list of active transactions, because we
265 // iterate through it.
266 timed_out_active.push((txn_ptr, duration, error));
267 }
268 Err(err) => {
269 error!(target: "libmdbx", %err, "Failed to abort the long-lived read transaction")
270 }
271 }
272 } else {
273 max_active_transaction_duration = Some(
274 duration.max(max_active_transaction_duration.unwrap_or_default()),
275 );
276 }
277 }
278
279 // Walk through timed out transactions, and delete them from the list of active
280 // transactions.
281 for (ptr, open_duration, err) in timed_out_active.iter().copied() {
282 // Try deleting the transaction from the list of active transactions.
283 let was_in_active = self.remove_active(ptr).is_some();
284 if let Err(err) = err {
285 if was_in_active {
286 // If the transaction was in the list of active transactions,
287 // then user didn't abort it and we failed to do so.
288 error!(target: "libmdbx", %err, ?open_duration, "Failed to time out the long-lived read transaction");
289 }
290 } else {
291 // Happy path, the transaction has been timed out by us with no errors.
292 warn!(target: "libmdbx", ?open_duration, "Long-lived read transaction has been timed out");
293 // Add transaction to the list of timed out transactions that were not
294 // aborted by the user yet.
295 self.timed_out_not_aborted.insert(ptr as usize);
296 }
297 }
298
299 // Clear the list of timed out transactions, but not de-allocate the reserved
300 // capacity to save on further pushes.
301 timed_out_active.clear();
302
303 if !self.active.is_empty() {
304 trace!(
305 target: "libmdbx",
306 elapsed = ?now.elapsed(),
307 active = ?self.active.iter().map(|entry| {
308 let (tx, start) = entry.value();
309 (tx.clone(), start.elapsed())
310 }).collect::<Vec<_>>(),
311 "Read transactions"
312 );
313 }
314
315 // Sleep not more than `READ_TRANSACTIONS_CHECK_INTERVAL`, but at least until
316 // the closest deadline of an active read transaction
317 let duration_until_closest_deadline =
318 self.max_duration - max_active_transaction_duration.unwrap_or_default();
319 std::thread::sleep(
320 READ_TRANSACTIONS_CHECK_INTERVAL.min(duration_until_closest_deadline),
321 );
322 }
323 };
324 std::thread::Builder::new()
325 .name("mdbx-rs-read-tx-timeouts".to_string())
326 .spawn(task)
327 .unwrap();
328 }
329 }
330
#[cfg(test)]
mod tests {
    use super::READ_TRANSACTIONS_CHECK_INTERVAL;
    use crate::{Environment, Error, MaxReadTransactionDuration};
    use std::{thread::sleep, time::Duration};
    use tempfile::tempdir;

    /// With a maximum duration configured, read transactions are tracked while open, untracked
    /// when closed by the user, and timed out by the monitor when they live for too long.
    #[test]
    fn txn_manager_read_transactions_duration_set() {
        const MAX_DURATION: Duration = Duration::from_secs(1);

        let dir = tempdir().unwrap();
        let env = Environment::builder()
            .set_max_read_transaction_duration(MaxReadTransactionDuration::Set(MAX_DURATION))
            .open(dir.path())
            .unwrap();

        let tracker = env.txn_manager().read_transactions.as_ref().unwrap();
        let is_active = |key: usize| tracker.active.contains_key(&key);
        let is_timed_out = |key: usize| tracker.timed_out_not_aborted.contains(&key);

        // A read-only transaction that is used successfully and closed by dropping.
        {
            let txn = env.begin_ro_txn().unwrap();
            let key = txn.txn() as usize;
            assert!(is_active(key));

            txn.open_db(None).unwrap();
            drop(txn);

            assert!(!is_active(key));
        }

        // A read-only transaction that is used successfully and closed by committing.
        {
            let txn = env.begin_ro_txn().unwrap();
            let key = txn.txn() as usize;
            assert!(is_active(key));

            txn.open_db(None).unwrap();
            txn.commit().unwrap();

            assert!(!is_active(key));
        }

        // A read-only transaction that outlives the maximum duration.
        {
            // Freshly opened, it is tracked as active.
            let stale = env.begin_ro_txn().unwrap();
            let stale_key = stale.txn() as usize;
            assert!(is_active(stale_key));

            // Give the monitor enough time to time the transaction out.
            sleep(MAX_DURATION + READ_TRANSACTIONS_CHECK_INTERVAL);

            // It moved from the active list to the timed out but not aborted list.
            assert!(!is_active(stale_key));
            assert!(is_timed_out(stale_key));

            // Using it now yields `Error::ReadTransactionTimeout` and changes neither list.
            assert_eq!(stale.open_db(None).err(), Some(Error::ReadTransactionTimeout));
            assert!(!is_active(stale_key));
            assert!(is_timed_out(stale_key));

            assert_eq!(stale.id().err(), Some(Error::ReadTransactionTimeout));
            assert!(!is_active(stale_key));
            assert!(is_timed_out(stale_key));

            // A new read-only transaction must not get the pointer of the timed out one.
            let fresh = env.begin_ro_txn().unwrap();
            let fresh_key = fresh.txn() as usize;
            assert!(is_active(fresh_key));
            assert_ne!(stale_key, fresh_key);

            // Dropping the timed out transaction clears it from the timed out list.
            drop(stale);
            assert!(!is_timed_out(stale_key));
        }
    }

    /// Without a maximum duration no tracker exists and a long-lived read transaction can still
    /// be committed.
    #[test]
    fn txn_manager_read_transactions_duration_unbounded() {
        let dir = tempdir().unwrap();
        let env = Environment::builder()
            .set_max_read_transaction_duration(MaxReadTransactionDuration::Unbounded)
            .open(dir.path())
            .unwrap();

        assert!(env.txn_manager().read_transactions.is_none());

        let txn = env.begin_ro_txn().unwrap();
        sleep(READ_TRANSACTIONS_CHECK_INTERVAL);
        assert!(txn.commit().is_ok())
    }
}
429}