libmdbx_remote/
txn_manager.rs

1use crate::{
2    environment::EnvPtr,
3    error::{mdbx_result, Result},
4    CommitLatency,
5};
6use std::{
7    ptr,
8    sync::mpsc::{sync_channel, Receiver, SyncSender},
9};
10
11#[derive(Copy, Clone, Debug)]
12pub(crate) struct TxnPtr(pub(crate) *mut ffi::MDBX_txn);
13unsafe impl Send for TxnPtr {}
14unsafe impl Sync for TxnPtr {}
15
16pub(crate) enum TxnManagerMessage {
17    Begin {
18        parent: TxnPtr,
19        flags: ffi::MDBX_txn_flags_t,
20        sender: SyncSender<Result<TxnPtr>>,
21    },
22    Abort {
23        tx: TxnPtr,
24        sender: SyncSender<Result<bool>>,
25    },
26    Commit {
27        tx: TxnPtr,
28        sender: SyncSender<Result<(bool, CommitLatency)>>,
29    },
30}
31
32/// Manages transactions by doing two things:
33/// - Opening, aborting, and committing transactions using [`TxnManager::send_message`] with the
34///   corresponding [`TxnManagerMessage`]
35/// - Aborting long-lived read transactions (if the `read-tx-timeouts` feature is enabled and
36///   `TxnManager::with_max_read_transaction_duration` is called)
37#[derive(Debug)]
38pub(crate) struct TxnManager {
39    sender: SyncSender<TxnManagerMessage>,
40    #[cfg(feature = "read-tx-timeouts")]
41    read_transactions: Option<std::sync::Arc<read_transactions::ReadTransactions>>,
42}
43
44impl TxnManager {
45    pub(crate) fn new(env: EnvPtr) -> Self {
46        let (tx, rx) = sync_channel(0);
47        let txn_manager = Self {
48            sender: tx,
49            #[cfg(feature = "read-tx-timeouts")]
50            read_transactions: None,
51        };
52
53        txn_manager.start_message_listener(env, rx);
54
55        txn_manager
56    }
57
58    /// Spawns a new [`std::thread`] that listens to incoming [`TxnManagerMessage`] messages,
59    /// executes an FFI function, and returns the result on the provided channel.
60    ///
61    /// - [`TxnManagerMessage::Begin`] opens a new transaction with [`ffi::mdbx_txn_begin_ex`]
62    /// - [`TxnManagerMessage::Abort`] aborts a transaction with [`ffi::mdbx_txn_abort`]
63    /// - [`TxnManagerMessage::Commit`] commits a transaction with [`ffi::mdbx_txn_commit_ex`]
64    fn start_message_listener(&self, env: EnvPtr, rx: Receiver<TxnManagerMessage>) {
65        let task = move || {
66            #[allow(clippy::redundant_locals)]
67            let env = env;
68            loop {
69                match rx.recv() {
70                    Ok(msg) => match msg {
71                        TxnManagerMessage::Begin {
72                            parent,
73                            flags,
74                            sender,
75                        } => {
76                            let mut txn: *mut ffi::MDBX_txn = ptr::null_mut();
77                            let res = mdbx_result(unsafe {
78                                ffi::mdbx_txn_begin_ex(
79                                    env.0,
80                                    parent.0,
81                                    flags,
82                                    &mut txn,
83                                    ptr::null_mut(),
84                                )
85                            })
86                            .map(|_| TxnPtr(txn));
87                            sender.send(res).unwrap();
88                        }
89                        TxnManagerMessage::Abort { tx, sender } => {
90                            sender
91                                .send(mdbx_result(unsafe { ffi::mdbx_txn_abort(tx.0) }))
92                                .unwrap();
93                        }
94                        TxnManagerMessage::Commit { tx, sender } => {
95                            sender
96                                .send({
97                                    let mut latency = CommitLatency::new();
98                                    mdbx_result(unsafe {
99                                        ffi::mdbx_txn_commit_ex(tx.0, latency.mdb_commit_latency())
100                                    })
101                                    .map(|v| (v, latency))
102                                })
103                                .unwrap();
104                        }
105                    },
106                    Err(_) => return,
107                }
108            }
109        };
110        std::thread::Builder::new()
111            .name("mbdx-rs-txn-manager".to_string())
112            .spawn(task)
113            .unwrap();
114    }
115
116    pub(crate) fn send_message(&self, message: TxnManagerMessage) {
117        self.sender.send(message).unwrap()
118    }
119}
120
#[cfg(feature = "read-tx-timeouts")]
mod read_transactions {
    use crate::{
        environment::EnvPtr, error::mdbx_result, transaction::TransactionPtr,
        txn_manager::TxnManager,
    };
    use dashmap::{DashMap, DashSet};
    use std::{
        sync::{mpsc::sync_channel, Arc},
        time::{Duration, Instant},
    };
    use tracing::{error, trace, warn};

    /// Upper bound on how long the monitor thread sleeps between two scans of the active read
    /// transactions.
    const READ_TRANSACTIONS_CHECK_INTERVAL: Duration = Duration::from_secs(5);

    impl TxnManager {
        /// Returns a new instance for which the maximum duration that a read transaction can be
        /// open is set.
        ///
        /// Besides the message listener thread, this also starts the monitor thread that times
        /// out read transactions open for longer than `duration`.
        ///
        /// # Panics
        ///
        /// Panics if either background thread cannot be spawned.
        pub(crate) fn new_with_max_read_transaction_duration(
            env: EnvPtr,
            duration: Duration,
        ) -> Self {
            let read_transactions = Arc::new(ReadTransactions::new(duration));
            read_transactions.clone().start_monitor();

            // Zero capacity: `send` returns only once the listener thread took the message.
            let (tx, rx) = sync_channel(0);

            let txn_manager = Self {
                sender: tx,
                read_transactions: Some(read_transactions),
            };

            txn_manager.start_message_listener(env, rx);

            txn_manager
        }

        /// Adds a new transaction to the list of active read transactions.
        ///
        /// Does nothing when read-transaction timeouts are not configured.
        pub(crate) fn add_active_read_transaction(
            &self,
            ptr: *mut ffi::MDBX_txn,
            tx: TransactionPtr,
        ) {
            if let Some(read_transactions) = &self.read_transactions {
                read_transactions.add_active(ptr, tx);
            }
        }

        /// Removes a transaction from the list of active read transactions.
        ///
        /// Returns the removed entry, or `None` if timeouts are not configured or the
        /// transaction was not in the list.
        pub(crate) fn remove_active_read_transaction(
            &self,
            ptr: *mut ffi::MDBX_txn,
        ) -> Option<(usize, (TransactionPtr, Instant))> {
            self.read_transactions.as_ref()?.remove_active(ptr)
        }

        /// Returns the number of timed out transactions that were not aborted by the user yet.
        ///
        /// Returns `None` when read-transaction timeouts are not configured.
        pub(crate) fn timed_out_not_aborted_read_transactions(&self) -> Option<usize> {
            self.read_transactions
                .as_ref()
                .map(|read_transactions| read_transactions.timed_out_not_aborted())
        }
    }

    /// Shared book-keeping for read transactions that are subject to the timeout.
    #[derive(Debug, Default)]
    pub(super) struct ReadTransactions {
        /// Maximum duration that a read transaction can be open until the
        /// [`ReadTransactions::start_monitor`] aborts it.
        max_duration: Duration,
        /// List of currently active read transactions.
        ///
        /// The key is the transaction's raw pointer cast to `usize`: a raw pointer key is
        /// neither `Send` nor `Sync`, which would prevent sharing this map with the monitor
        /// thread. The value holds the transaction together with the time it was registered.
        active: DashMap<usize, (TransactionPtr, Instant)>,
        /// List of timed out transactions that were not aborted by the user yet, hence have a
        /// dangling read transaction pointer.
        timed_out_not_aborted: DashSet<usize>,
    }

    impl ReadTransactions {
        /// Creates empty book-keeping with the given timeout.
        pub(super) fn new(max_duration: Duration) -> Self {
            Self {
                max_duration,
                ..Default::default()
            }
        }

        /// Adds a new transaction to the list of active read transactions.
        ///
        /// The current time is recorded as the moment the transaction was opened. An existing
        /// entry for the same pointer is replaced.
        pub(super) fn add_active(&self, ptr: *mut ffi::MDBX_txn, tx: TransactionPtr) {
            let _ = self.active.insert(ptr as usize, (tx, Instant::now()));
        }

        /// Removes a transaction from the list of active read transactions.
        ///
        /// The pointer is also dropped from the set of timed out but not yet aborted
        /// transactions. Returns the entry removed from the active list, if there was one.
        pub(super) fn remove_active(
            &self,
            ptr: *mut ffi::MDBX_txn,
        ) -> Option<(usize, (TransactionPtr, Instant))> {
            self.timed_out_not_aborted.remove(&(ptr as usize));
            self.active.remove(&(ptr as usize))
        }

        /// Returns the number of timed out transactions that were not aborted by the user yet.
        pub(super) fn timed_out_not_aborted(&self) -> usize {
            self.timed_out_not_aborted.len()
        }

        /// Spawns a new [`std::thread`] that monitors the list of active read transactions and
        /// timeouts those that are open for longer than `ReadTransactions.max_duration`.
        ///
        /// The thread loops forever; there is no shutdown signal.
        ///
        /// # Panics
        ///
        /// Panics if the thread cannot be spawned.
        pub(super) fn start_monitor(self: Arc<Self>) {
            let task = move || {
                // Scratch buffer reused across iterations to avoid re-allocating.
                let mut timed_out_active = Vec::new();

                loop {
                    let now = Instant::now();
                    let mut max_active_transaction_duration = None;

                    // Iterate through active read transactions and time out those that are
                    // open for longer than `self.max_duration`.
                    for entry in &self.active {
                        let (tx, start) = entry.value();
                        // A transaction registered after `now` was sampled has `start > now`.
                        // Saturate to zero explicitly instead of relying on what `Instant`
                        // subtraction happens to do in that case.
                        let duration = now.saturating_duration_since(*start);

                        if duration > self.max_duration {
                            let result = tx.txn_execute_fail_on_timeout(|txn_ptr| {
                                // Time out the transaction.
                                //
                                // We use `mdbx_txn_reset` instead of `mdbx_txn_abort` here to
                                // prevent MDBX from reusing the pointer of the aborted
                                // transaction for new read-only transactions. This is
                                // important because we store the pointer in the `active` list
                                // and assume that it is unique.
                                //
                                // See https://erthink.github.io/libmdbx/group__c__transactions.html#gae9f34737fe60b0ba538d5a09b6a25c8d for more info.
                                let result = mdbx_result(unsafe { ffi::mdbx_txn_reset(txn_ptr) });
                                if result.is_ok() {
                                    tx.set_timed_out();
                                }
                                (txn_ptr, duration, result)
                            });

                            match result {
                                Ok(timed_out) => {
                                    // Remember the transaction in `timed_out_active`. We can't
                                    // remove it instantly from the list of active transactions,
                                    // because we iterate through it.
                                    timed_out_active.push(timed_out);
                                }
                                Err(err) => {
                                    error!(target: "libmdbx", %err, "Failed to abort the long-lived read transaction");
                                }
                            }
                        } else {
                            // Track the oldest transaction that has not hit the deadline yet;
                            // it determines how long we may sleep below.
                            max_active_transaction_duration = Some(
                                duration.max(max_active_transaction_duration.unwrap_or_default()),
                            );
                        }
                    }

                    // Walk through timed out transactions, and delete them from the list of
                    // active transactions. `drain` empties the buffer but keeps its capacity
                    // for the next iteration.
                    for (ptr, open_duration, reset_result) in timed_out_active.drain(..) {
                        // Try deleting the transaction from the list of active transactions.
                        let was_in_active = self.remove_active(ptr).is_some();
                        match reset_result {
                            Err(err) => {
                                if was_in_active {
                                    // If the transaction was in the list of active transactions,
                                    // then user didn't abort it and we failed to do so.
                                    error!(target: "libmdbx", %err, ?open_duration, "Failed to time out the long-lived read transaction");
                                }
                            }
                            Ok(_) => {
                                // Happy path, the transaction has been timed out by us with no
                                // errors.
                                warn!(target: "libmdbx", ?open_duration, "Long-lived read transaction has been timed out");
                                // Add transaction to the list of timed out transactions that
                                // were not aborted by the user yet.
                                //
                                // NOTE(review): if the user removed the transaction between the
                                // reset above and this point, the entry inserted here may never
                                // be removed again — confirm against the transaction drop path.
                                self.timed_out_not_aborted.insert(ptr as usize);
                            }
                        }
                    }

                    if !self.active.is_empty() {
                        trace!(
                            target: "libmdbx",
                            elapsed = ?now.elapsed(),
                            active = ?self.active.iter().map(|entry| {
                                let (tx, start) = entry.value();
                                (tx.clone(), start.elapsed())
                            }).collect::<Vec<_>>(),
                            "Read transactions"
                        );
                    }

                    // Sleep until the closest deadline of a still-active read transaction, but
                    // never longer than `READ_TRANSACTIONS_CHECK_INTERVAL`.
                    //
                    // The subtraction cannot underflow: only durations that do not exceed
                    // `self.max_duration` are recorded in `max_active_transaction_duration`.
                    let duration_until_closest_deadline =
                        self.max_duration - max_active_transaction_duration.unwrap_or_default();
                    std::thread::sleep(
                        READ_TRANSACTIONS_CHECK_INTERVAL.min(duration_until_closest_deadline),
                    );
                }
            };
            std::thread::Builder::new()
                .name("mdbx-rs-read-tx-timeouts".to_string())
                .spawn(task)
                .unwrap();
        }
    }

    #[cfg(test)]
    mod tests {
        use crate::{
            txn_manager::read_transactions::READ_TRANSACTIONS_CHECK_INTERVAL, Environment, Error,
            MaxReadTransactionDuration,
        };
        use std::{thread::sleep, time::Duration};
        use tempfile::tempdir;

        #[test]
        fn txn_manager_read_transactions_duration_set() {
            const MAX_DURATION: Duration = Duration::from_secs(1);

            let dir = tempdir().unwrap();
            let env = Environment::builder()
                .set_max_read_transaction_duration(MaxReadTransactionDuration::Set(MAX_DURATION))
                .open(dir.path())
                .unwrap();

            let read_transactions = env.txn_manager().read_transactions.as_ref().unwrap();

            // Create a read-only transaction, successfully use it, close it by dropping.
            {
                let tx = env.begin_ro_txn().unwrap();
                let tx_ptr = tx.txn() as usize;
                assert!(read_transactions.active.contains_key(&tx_ptr));

                tx.open_db(None).unwrap();
                drop(tx);

                assert!(!read_transactions.active.contains_key(&tx_ptr));
            }

            // Create a read-only transaction, successfully use it, close it by committing.
            {
                let tx = env.begin_ro_txn().unwrap();
                let tx_ptr = tx.txn() as usize;
                assert!(read_transactions.active.contains_key(&tx_ptr));

                tx.open_db(None).unwrap();
                tx.commit().unwrap();

                assert!(!read_transactions.active.contains_key(&tx_ptr));
            }

            {
                // Create a read-only transaction and observe it's in the list of active
                // transactions.
                let tx = env.begin_ro_txn().unwrap();
                let tx_ptr = tx.txn() as usize;
                assert!(read_transactions.active.contains_key(&tx_ptr));

                // Wait until the transaction is timed out by the manager.
                sleep(MAX_DURATION + READ_TRANSACTIONS_CHECK_INTERVAL);

                // Ensure that the transaction is not in the list of active transactions anymore,
                // and is in the list of timed out but not aborted transactions.
                assert!(!read_transactions.active.contains_key(&tx_ptr));
                assert!(read_transactions.timed_out_not_aborted.contains(&tx_ptr));

                // Use the timed out transaction and observe the `Error::ReadTransactionTimeout`
                assert_eq!(tx.open_db(None).err(), Some(Error::ReadTransactionTimeout));
                assert!(!read_transactions.active.contains_key(&tx_ptr));
                assert!(read_transactions.timed_out_not_aborted.contains(&tx_ptr));

                assert_eq!(tx.id().err(), Some(Error::ReadTransactionTimeout));
                assert!(!read_transactions.active.contains_key(&tx_ptr));
                assert!(read_transactions.timed_out_not_aborted.contains(&tx_ptr));

                // Ensure that the transaction pointer is not reused when opening a new read-only
                // transaction.
                let new_tx = env.begin_ro_txn().unwrap();
                let new_tx_ptr = new_tx.txn() as usize;
                assert!(read_transactions.active.contains_key(&new_tx_ptr));
                assert_ne!(tx_ptr, new_tx_ptr);

                // Drop the transaction and ensure that it's not in the list of timed out but not
                // aborted transactions anymore.
                drop(tx);
                assert!(!read_transactions.timed_out_not_aborted.contains(&tx_ptr));
            }
        }

        #[test]
        fn txn_manager_read_transactions_duration_unbounded() {
            let dir = tempdir().unwrap();
            let env = Environment::builder()
                .set_max_read_transaction_duration(MaxReadTransactionDuration::Unbounded)
                .open(dir.path())
                .unwrap();

            assert!(env.txn_manager().read_transactions.is_none());

            let tx = env.begin_ro_txn().unwrap();
            sleep(READ_TRANSACTIONS_CHECK_INTERVAL);
            assert!(tx.commit().is_ok())
        }
    }
}