async_stm/
ops.rs

1use std::mem;
2
3use crate::{auxtx::*, StmControl};
4use crate::{
5    transaction::{with_tx, Transaction, TX},
6    Stm, StmError, StmResult,
7};
8
9/// Abandon the transaction and retry after some of the variables read have changed.
10pub fn retry<T>() -> Stm<T> {
11    Err(StmControl::Retry)
12}
13
14/// Retry unless a given condition has been met.
15pub fn guard(cond: bool) -> Stm<()> {
16    if cond {
17        Ok(())
18    } else {
19        retry()
20    }
21}
22
23/// Abort the transaction with an error.
24///
25/// Use this with [atomically_or_err] and a method that returns [StmResult] instead of [Stm].
26pub fn abort<T, E1, E2>(e: E1) -> StmResult<T, E2>
27where
28    E1: Into<E2>,
29{
30    Err(StmError::Abort(e.into()))
31}
32
33/// Run the first function; if it returns a [StmControl::Retry],
34/// run the second function; if that too returns [StmControl::Retry]
35/// then combine the values they have read, so that
36/// the overall retry will react to any change.
37///
38/// If they return [StmControl::Failure] then just return that result,
39/// since the transaction can be retried right now.
40pub fn or<F, G, T>(f: F, g: G) -> Stm<T>
41where
42    F: FnOnce() -> Stm<T>,
43    G: FnOnce() -> Stm<T>,
44{
45    let mut snapshot = with_tx(|tx| tx.clone());
46
47    match f() {
48        Err(StmControl::Retry) => {
49            // Restore the original transaction state.
50            with_tx(|tx| {
51                mem::swap(tx, &mut snapshot);
52            });
53
54            match g() {
55                retry @ Err(StmControl::Retry) =>
56                // Add any variable read in the first attempt.
57                {
58                    with_tx(|tx| {
59                        for (id, lvar) in snapshot.log.into_iter() {
60                            match tx.log.get(&id) {
61                                Some(lvar) if lvar.read => {}
62                                _ => {
63                                    tx.log.insert(id, lvar);
64                                }
65                            }
66                        }
67                    });
68                    retry
69                }
70                other => other,
71            }
72        }
73        other => other,
74    }
75}
76
77/// Create a new transaction and run `f` until it returns a successful result and
78/// can be committed without running into version conflicts.
79///
80/// Make sure `f` is free of any side effects, because it can be called repeatedly.
81pub async fn atomically<T, F>(f: F) -> T
82where
83    F: Fn() -> Stm<T>,
84{
85    atomically_aux(|| NoAux, |_| f()).await
86}
87
88/// Like [atomically], but this version also takes an auxiliary transaction system
89/// that gets committed or rolled back together with the STM transaction.
90pub async fn atomically_aux<T, F, A, X>(aux: A, f: F) -> T
91where
92    X: Aux,
93    A: Fn() -> X,
94    F: Fn(&mut X) -> Stm<T>,
95{
96    atomically_or_err_aux::<_, (), _, _, _>(aux, |atx| f(atx).map_err(StmError::Control))
97        .await
98        .expect("Didn't expect `abort`. Use `atomically_or_err` instead.")
99}
100
101/// Create a new transaction and run `f` until it returns a successful result and
102/// can be committed without running into version conflicts, or until it returns
103/// an [StmError::Abort] in which case the contained error is returned.
104///
105/// Make sure `f` is free of any side effects, becuase it can be called repeatedly
106/// and also be aborted.
107pub async fn atomically_or_err<T, E, F>(f: F) -> Result<T, E>
108where
109    F: Fn() -> StmResult<T, E>,
110{
111    atomically_or_err_aux(|| NoAux, |_| f()).await
112}
113
114/// Like [atomically_or_err], but this version also takes an auxiliary transaction system.
115///
116/// Aux is passed explicitly to the closure as it's more important to see which methods
117/// use it and which don't, because it is ultimately an external dependency that needs to
118/// be carefully managed.
119///
120/// For example the method might need only read-only access, in which case a more
121/// permissive transaction can be constructed, than if we need write access to arbitrary
122/// data managed by that system.
123pub async fn atomically_or_err_aux<T, E, F, A, X>(aux: A, f: F) -> Result<T, E>
124where
125    X: Aux,
126    A: Fn() -> X,
127    F: Fn(&mut X) -> StmResult<T, E>,
128{
129    loop {
130        match exec_once(&aux, &f) {
131            ExecResult::Committed(value) => return Ok(value),
132            ExecResult::Abort(e) => return Err(e),
133            ExecResult::Restart => {}
134            ExecResult::Wait(tx) => tx.wait().await,
135        }
136    }
137}
138
139enum ExecResult<T, E> {
140    Committed(T),
141    Abort(E),
142    Restart,
143    Wait(Transaction),
144}
145
146/// Execute the STM transaction once, returning the outcome.
147///
148/// This method is synchronous, which has the effect of not requiring `X` to be `Send` and `Sync`,
149/// which would be the case if it was created in [atomically_or_err_aux].
150fn exec_once<T, E, F, A, X>(aux: &A, f: &F) -> ExecResult<T, E>
151where
152    X: Aux,
153    A: Fn() -> X,
154    F: Fn(&mut X) -> StmResult<T, E>,
155{
156    // Install a new transaction into the thread local context.
157    TX.with(|tref| {
158        let mut t = tref.borrow_mut();
159        if t.is_some() {
160            // Nesting is not supported. Use `or` instead.
161            panic!("Already running in an atomic transaction!")
162        }
163        *t = Some(Transaction::new());
164    });
165
166    // Create a new auxiliary transaction.
167    let mut atx = aux();
168
169    // Run one attempt of the atomic operation.
170    let result = f(&mut atx);
171
172    // Take the transaction from the thread local, leaving it empty.
173    let tx = TX.with(|tref| tref.borrow_mut().take().unwrap());
174
175    // See if we manage to commit some value.
176    match result {
177        Ok(value) => {
178            if let Some(version) = tx.commit(atx) {
179                tx.notify(version);
180                ExecResult::Committed(value)
181            } else {
182                ExecResult::Restart
183            }
184        }
185        Err(err) => {
186            atx.rollback();
187            match err {
188                StmError::Control(StmControl::Failure) => {
189                    // We can retry straight away.
190                    ExecResult::Restart
191                }
192                StmError::Control(StmControl::Retry) => {
193                    // Wait until there's a change.
194                    ExecResult::Wait(tx)
195                }
196                StmError::Abort(e) => {
197                    // Don't retry, return the error to the caller.
198                    ExecResult::Abort(e)
199                }
200            }
201        }
202    }
203}