async_stm/ops.rs
1use std::mem;
2
3use crate::{auxtx::*, StmControl};
4use crate::{
5 transaction::{with_tx, Transaction, TX},
6 Stm, StmError, StmResult,
7};
8
9/// Abandon the transaction and retry after some of the variables read have changed.
10pub fn retry<T>() -> Stm<T> {
11 Err(StmControl::Retry)
12}
13
14/// Retry unless a given condition has been met.
15pub fn guard(cond: bool) -> Stm<()> {
16 if cond {
17 Ok(())
18 } else {
19 retry()
20 }
21}
22
23/// Abort the transaction with an error.
24///
25/// Use this with [atomically_or_err] and a method that returns [StmResult] instead of [Stm].
26pub fn abort<T, E1, E2>(e: E1) -> StmResult<T, E2>
27where
28 E1: Into<E2>,
29{
30 Err(StmError::Abort(e.into()))
31}
32
33/// Run the first function; if it returns a [StmControl::Retry],
34/// run the second function; if that too returns [StmControl::Retry]
35/// then combine the values they have read, so that
36/// the overall retry will react to any change.
37///
38/// If they return [StmControl::Failure] then just return that result,
39/// since the transaction can be retried right now.
40pub fn or<F, G, T>(f: F, g: G) -> Stm<T>
41where
42 F: FnOnce() -> Stm<T>,
43 G: FnOnce() -> Stm<T>,
44{
45 let mut snapshot = with_tx(|tx| tx.clone());
46
47 match f() {
48 Err(StmControl::Retry) => {
49 // Restore the original transaction state.
50 with_tx(|tx| {
51 mem::swap(tx, &mut snapshot);
52 });
53
54 match g() {
55 retry @ Err(StmControl::Retry) =>
56 // Add any variable read in the first attempt.
57 {
58 with_tx(|tx| {
59 for (id, lvar) in snapshot.log.into_iter() {
60 match tx.log.get(&id) {
61 Some(lvar) if lvar.read => {}
62 _ => {
63 tx.log.insert(id, lvar);
64 }
65 }
66 }
67 });
68 retry
69 }
70 other => other,
71 }
72 }
73 other => other,
74 }
75}
76
77/// Create a new transaction and run `f` until it returns a successful result and
78/// can be committed without running into version conflicts.
79///
80/// Make sure `f` is free of any side effects, because it can be called repeatedly.
81pub async fn atomically<T, F>(f: F) -> T
82where
83 F: Fn() -> Stm<T>,
84{
85 atomically_aux(|| NoAux, |_| f()).await
86}
87
88/// Like [atomically], but this version also takes an auxiliary transaction system
89/// that gets committed or rolled back together with the STM transaction.
90pub async fn atomically_aux<T, F, A, X>(aux: A, f: F) -> T
91where
92 X: Aux,
93 A: Fn() -> X,
94 F: Fn(&mut X) -> Stm<T>,
95{
96 atomically_or_err_aux::<_, (), _, _, _>(aux, |atx| f(atx).map_err(StmError::Control))
97 .await
98 .expect("Didn't expect `abort`. Use `atomically_or_err` instead.")
99}
100
101/// Create a new transaction and run `f` until it returns a successful result and
102/// can be committed without running into version conflicts, or until it returns
103/// an [StmError::Abort] in which case the contained error is returned.
104///
105/// Make sure `f` is free of any side effects, becuase it can be called repeatedly
106/// and also be aborted.
107pub async fn atomically_or_err<T, E, F>(f: F) -> Result<T, E>
108where
109 F: Fn() -> StmResult<T, E>,
110{
111 atomically_or_err_aux(|| NoAux, |_| f()).await
112}
113
114/// Like [atomically_or_err], but this version also takes an auxiliary transaction system.
115///
116/// Aux is passed explicitly to the closure as it's more important to see which methods
117/// use it and which don't, because it is ultimately an external dependency that needs to
118/// be carefully managed.
119///
120/// For example the method might need only read-only access, in which case a more
121/// permissive transaction can be constructed, than if we need write access to arbitrary
122/// data managed by that system.
123pub async fn atomically_or_err_aux<T, E, F, A, X>(aux: A, f: F) -> Result<T, E>
124where
125 X: Aux,
126 A: Fn() -> X,
127 F: Fn(&mut X) -> StmResult<T, E>,
128{
129 loop {
130 match exec_once(&aux, &f) {
131 ExecResult::Committed(value) => return Ok(value),
132 ExecResult::Abort(e) => return Err(e),
133 ExecResult::Restart => {}
134 ExecResult::Wait(tx) => tx.wait().await,
135 }
136 }
137}
138
139enum ExecResult<T, E> {
140 Committed(T),
141 Abort(E),
142 Restart,
143 Wait(Transaction),
144}
145
146/// Execute the STM transaction once, returning the outcome.
147///
148/// This method is synchronous, which has the effect of not requiring `X` to be `Send` and `Sync`,
149/// which would be the case if it was created in [atomically_or_err_aux].
150fn exec_once<T, E, F, A, X>(aux: &A, f: &F) -> ExecResult<T, E>
151where
152 X: Aux,
153 A: Fn() -> X,
154 F: Fn(&mut X) -> StmResult<T, E>,
155{
156 // Install a new transaction into the thread local context.
157 TX.with(|tref| {
158 let mut t = tref.borrow_mut();
159 if t.is_some() {
160 // Nesting is not supported. Use `or` instead.
161 panic!("Already running in an atomic transaction!")
162 }
163 *t = Some(Transaction::new());
164 });
165
166 // Create a new auxiliary transaction.
167 let mut atx = aux();
168
169 // Run one attempt of the atomic operation.
170 let result = f(&mut atx);
171
172 // Take the transaction from the thread local, leaving it empty.
173 let tx = TX.with(|tref| tref.borrow_mut().take().unwrap());
174
175 // See if we manage to commit some value.
176 match result {
177 Ok(value) => {
178 if let Some(version) = tx.commit(atx) {
179 tx.notify(version);
180 ExecResult::Committed(value)
181 } else {
182 ExecResult::Restart
183 }
184 }
185 Err(err) => {
186 atx.rollback();
187 match err {
188 StmError::Control(StmControl::Failure) => {
189 // We can retry straight away.
190 ExecResult::Restart
191 }
192 StmError::Control(StmControl::Retry) => {
193 // Wait until there's a change.
194 ExecResult::Wait(tx)
195 }
196 StmError::Abort(e) => {
197 // Don't retry, return the error to the caller.
198 ExecResult::Abort(e)
199 }
200 }
201 }
202 }
203}