stm_core/transaction/mod.rs
1// Copyright 2015-2016 rust-stm Developers
2//
3// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
4// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
5// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
6// option. This file may not be copied, modified, or distributed
7// except according to those terms.
8
9pub mod control_block;
10pub mod log_var;
11
12use std::collections::BTreeMap;
13use std::collections::btree_map::Entry::*;
14use std::mem;
15use std::sync::Arc;
16use std::any::Any;
17use std::cell::Cell;
18
19use self::log_var::LogVar;
20use self::log_var::LogVar::*;
21use self::control_block::ControlBlock;
22use super::tvar::{TVar, VarControlBlock};
23use super::result::*;
24use super::result::StmError::*;
25
thread_local!(static TRANSACTION_RUNNING: Cell<bool> = Cell::new(false));

/// Marker value that protects against nested STM calls on one thread.
///
/// Creating the guard raises the thread-local `TRANSACTION_RUNNING` flag,
/// dropping it lowers the flag again. Keep the guard alive for as long as
/// the transaction runs, so that the flag is reset on every exit path.
struct TransactionGuard;

impl TransactionGuard {
    /// Mark the current thread as running a transaction.
    ///
    /// # Panics
    ///
    /// Panics with `STM: Nested Transaction` if the flag is already raised,
    /// i.e. another guard is still alive on this thread.
    pub fn new() -> TransactionGuard {
        // Raise the flag and learn its previous state in a single step.
        // If it was already raised it simply stays raised.
        let already_running = TRANSACTION_RUNNING.with(|flag| flag.replace(true));
        assert!(!already_running, "STM: Nested Transaction");
        TransactionGuard
    }
}

impl Drop for TransactionGuard {
    /// Lower the flag so that the thread may start a new transaction.
    fn drop(&mut self) {
        TRANSACTION_RUNNING.with(|flag| flag.set(false));
    }
}
50
/// Answer of the control function passed to `Transaction::with_control`.
///
/// The control function is asked every time the transaction body
/// returns an error.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TransactionControl {
    /// Keep going: the transaction body is run again.
    Retry,
    /// Give up: `with_control` returns `None`.
    Abort,
}
55
56/// Transaction tracks all the read and written variables.
57///
58/// It is used for checking vars, to ensure atomicity.
59pub struct Transaction {
60
61 /// Map of all vars that map the `VarControlBlock` of a var to a `LogVar`.
62 /// The `VarControlBlock` is unique because it uses it's address for comparing.
63 ///
64 /// The logs need to be accessed in a order to prevend dead-locks on locking.
65 vars: BTreeMap<Arc<VarControlBlock>, LogVar>,
66}
67
68impl Transaction {
69 /// Create a new log.
70 ///
71 /// Normally you don't need to call this directly.
72 /// Use `atomically` instead.
73 fn new() -> Transaction {
74 Transaction { vars: BTreeMap::new() }
75 }
76
77 /// Run a function with a transaction.
78 ///
79 /// It is equivalent to `atomically`.
80 pub fn with<T, F>(f: F) -> T
81 where F: Fn(&mut Transaction) -> StmResult<T>,
82 {
83 match Transaction::with_control(|_| TransactionControl::Retry, f) {
84 Some(t) => t,
85 None => unreachable!()
86 }
87 }
88
89 /// Run a function with a transaction.
90 ///
91 /// `with_control` takes another control function, that
92 /// can steer the control flow and possible terminate early.
93 ///
94 /// `control` can react to counters, timeouts or external inputs.
95 ///
96 /// It allows the user to fall back to another strategy, like a global lock
97 /// in the case of too much contention.
98 ///
99 /// Please not, that the transaction may still infinitely wait for changes when `retry` is
100 /// called and `control` does not abort.
101 /// If you need a timeout, another thread should signal this through a TVar.
102 pub fn with_control<T, F, C>(mut control: C, f: F) -> Option<T>
103 where F: Fn(&mut Transaction) -> StmResult<T>,
104 C: FnMut(StmError) -> TransactionControl,
105 {
106 let _guard = TransactionGuard::new();
107
108 // create a log guard for initializing and cleaning up
109 // the log
110 let mut transaction = Transaction::new();
111
112 // loop until success
113 loop {
114 // run the computation
115 match f(&mut transaction) {
116 // on success exit loop
117 Ok(t) => {
118 if transaction.commit() {
119 return Some(t);
120 }
121 }
122
123 Err(e) => {
124 // Check if the user wants to abort the transaction.
125 if let TransactionControl::Abort = control(e) {
126 return None;
127 }
128
129 // on retry wait for changes
130 if let Retry = e {
131 transaction.wait_for_change();
132 }
133 }
134 }
135
136 // clear log before retrying computation
137 transaction.clear();
138 }
139 }
140
141 /// Perform a downcast on a var.
142 fn downcast<T: Any + Clone>(var: Arc<Any>) -> T {
143 match var.downcast_ref::<T>() {
144 Some(s) => s.clone(),
145 None => unreachable!("TVar has wrong type")
146 }
147 }
148
149 /// Read a variable and return the value.
150 ///
151 /// The returned value is not always consistent with the current value of the var,
152 /// but may be an outdated or or not yet commited value.
153 ///
154 /// The used code should be capable of handling inconsistent states
155 /// without running into infinite loops.
156 /// Just the commit of wrong values is prevented by STM.
157 pub fn read<T: Send + Sync + Any + Clone>(&mut self, var: &TVar<T>) -> StmResult<T> {
158 let ctrl = var.control_block().clone();
159 // Check if the same var was written before.
160 let value = match self.vars.entry(ctrl) {
161
162 // If the variable has been accessed before, then load that value.
163 Occupied(mut entry) => entry.get_mut().read(),
164
165 // Else load the variable statically.
166 Vacant(entry) => {
167 // Read the value from the var.
168 let value = var.read_ref_atomic();
169
170 // Store in in an entry.
171 entry.insert(Read(value.clone()));
172 value
173 }
174 };
175
176 // For now always succeeds, but that may change later.
177 Ok(Transaction::downcast(value))
178 }
179
180 /// Write a variable.
181 ///
182 /// The write is not immediately visible to other threads,
183 /// but atomically commited at the end of the computation.
184 pub fn write<T: Any + Send + Sync + Clone>(&mut self, var: &TVar<T>, value: T) -> StmResult<()> {
185 // box the value
186 let boxed = Arc::new(value);
187
188 // new control block
189 let ctrl = var.control_block().clone();
190 // update or create new entry
191 match self.vars.entry(ctrl) {
192 Occupied(mut entry) => entry.get_mut().write(boxed),
193 Vacant(entry) => { entry.insert(Write(boxed)); }
194 }
195
196 // For now always succeeds, but that may change later.
197 Ok(())
198 }
199
200 /// Combine two calculations. When one blocks with `retry`,
201 /// run the other, but don't commit the changes in the first.
202 ///
203 /// If both block, `Transaction::or` still waits for `TVar`s in both functions.
204 /// Use `Transaction::or` instead of handling errors directly with the `Result::or`.
205 /// The later does not handle all the blocking correctly.
206 pub fn or<T, F1, F2>(&mut self, first: F1, second: F2) -> StmResult<T>
207 where F1: Fn(&mut Transaction) -> StmResult<T>,
208 F2: Fn(&mut Transaction) -> StmResult<T>,
209 {
210 // Create a backup of the log.
211 let mut copy = Transaction {
212 vars: self.vars.clone()
213 };
214
215 // Run the first computation.
216 let f = first(self);
217
218 match f {
219 // Run other on manual retry call.
220 Err(Retry) => {
221 // swap, so that self is the current run
222 mem::swap(self, &mut copy);
223
224 // Run other action.
225 let s = second(self);
226
227 // If both called retry then exit.
228 match s {
229 Err(Failure) => Err(Failure),
230 s => {
231 self.combine(copy);
232 s
233 }
234 }
235 }
236
237 // Return success and failure directly
238 x => x,
239 }
240 }
241
242 /// Combine two logs into a single log, to allow waiting for all reads.
243 fn combine(&mut self, other: Transaction) {
244 // combine reads
245 for (var, value) in other.vars {
246 // only insert new values
247 if let Some(value) = value.obsolete() {
248 self.vars.entry(var).or_insert(value);
249 }
250 }
251 }
252
253 /// Clear the log's data.
254 ///
255 /// This should be used before redoing a computation, but
256 /// nowhere else.
257 fn clear(&mut self) {
258 self.vars.clear();
259 }
260
261 /// Wait for any variable to change,
262 /// because the change may lead to a new calculation result.
263 fn wait_for_change(&mut self) {
264 // Create control block for waiting.
265 let ctrl = Arc::new(ControlBlock::new());
266
267 let vars = mem::replace(&mut self.vars, BTreeMap::new());
268 let mut reads = Vec::with_capacity(self.vars.len());
269
270 let blocking = vars.into_iter()
271 .filter_map(|(a, b)| {
272 b.into_read_value()
273 .map(|b| (a, b))
274 })
275 // Check for consistency.
276 .all(|(var, value)| {
277 var.wait(&ctrl);
278 let x = {
279 // Take read lock and read value.
280 let guard = var.value.read();
281 Arc::ptr_eq(&value, &guard)
282 };
283 reads.push(var);
284 x
285 });
286
287 // If no var has changed, then block.
288 if blocking {
289 // Propably wait until one var has changed.
290 ctrl.wait();
291 }
292
293 // Let others know that ctrl is dead.
294 // It does not matter, if we set too many
295 // to dead since it may slightly reduce performance
296 // but not break the semantics.
297 for var in &reads {
298 var.set_dead();
299 }
300 }
301
302 /// Write the log back to the variables.
303 ///
304 /// Return true for success and false, if a read var has changed
305 fn commit(&mut self) -> bool {
306 // Use two phase locking for safely writing data back to the vars.
307
308 // First phase: acquire locks.
309 // Check for consistency of all the reads and perform
310 // an early return if something is not consistent.
311
312 // Created arrays for storing the locks
313 // vector of locks.
314 let mut read_vec = Vec::with_capacity(self.vars.len());
315
316 // vector of tuple (value, lock)
317 let mut write_vec = Vec::with_capacity(self.vars.len());
318
319 // vector of written variables
320 let mut written = Vec::with_capacity(self.vars.len());
321
322
323 for (var, value) in &self.vars {
324 // lock the variable and read the value
325
326 match *value {
327 // We need to take a write lock.
328 Write(ref w) | ReadObsoleteWrite(_,ref w)=> {
329 // take write lock
330 let lock = var.value.write();
331 // add all data to the vector
332 write_vec.push((w, lock));
333 written.push(var);
334 }
335
336 // We need to check for consistency and
337 // take a write lock.
338 ReadWrite(ref original,ref w) => {
339 // take write lock
340 let lock = var.value.write();
341
342 if !Arc::ptr_eq(&lock, original) {
343 return false;
344 }
345 // add all data to the vector
346 write_vec.push((w, lock));
347 written.push(var);
348 }
349 // Nothing to do. ReadObsolete is only needed for blocking, not
350 // for consistency checks.
351 ReadObsolete(_) => { }
352 // Take read lock and check for consistency.
353 Read(ref original) => {
354 // Take a read lock.
355 let lock = var.value.read();
356
357 if !Arc::ptr_eq(&lock, original) {
358 return false;
359 }
360
361 read_vec.push(lock);
362 }
363 }
364 }
365
366 // Second phase: write back and release
367
368 // Release the reads first.
369 // This allows other threads to continue quickly.
370 drop(read_vec);
371
372 for (value, mut lock) in write_vec {
373 // Commit value.
374 *lock = value.clone();
375 }
376
377 for var in written {
378 // Unblock all threads waiting for it.
379 var.wake_all();
380 }
381
382 // Commit succeded.
383 true
384 }
385}
386
#[cfg(test)]
mod test {
    use super::*;

    /// A fresh log reads the value stored in the variable.
    #[test]
    fn read() {
        let mut log = Transaction::new();
        let var = TVar::new(vec![1, 2, 3, 4]);

        // The variable can be read.
        assert_eq!(&*log.read(&var).unwrap(), &[1, 2, 3, 4]);
    }

    /// A write is visible to later reads of the same log, but not outside of it.
    #[test]
    fn write_read() {
        let mut log = Transaction::new();
        let var = TVar::new(vec![1, 2]);

        log.write(&var, vec![1, 2, 3, 4]).unwrap();

        // Consecutive reads get the updated version.
        assert_eq!(log.read(&var).unwrap(), [1, 2, 3, 4]);

        // The original value is still preserved.
        assert_eq!(var.read_atomic(), [1, 2]);
    }

    /// A transaction that touches no variable returns its result.
    #[test]
    fn transaction_simple() {
        let x = Transaction::with(|_| Ok(42));
        assert_eq!(x, 42);
    }

    #[test]
    fn transaction_read() {
        let read = TVar::new(42);

        let x = Transaction::with(|trans| {
            read.read(trans)
        });

        assert_eq!(x, 42);
    }

    /// Run a transaction with a control function, that always aborts.
    /// The transaction still tries to run a single time and should successfully
    /// commit in this test.
    #[test]
    fn transaction_with_control_abort_on_single_run() {
        let read = TVar::new(42);

        let x = Transaction::with_control(|_| TransactionControl::Abort, |tx| {
            read.read(tx)
        });

        assert_eq!(x, Some(42));
    }

    /// Run a transaction with a control function, that always aborts.
    /// The transaction retries infinitely often. The control function will abort this loop.
    #[test]
    fn transaction_with_control_abort_on_retry() {
        let x: Option<i32> = Transaction::with_control(|_| TransactionControl::Abort, |_| {
            Err(Retry)
        });

        assert_eq!(x, None);
    }

    /// A committed write is visible outside of the transaction.
    #[test]
    fn transaction_write() {
        let write = TVar::new(42);

        Transaction::with(|trans| {
            write.write(trans, 0)
        });

        assert_eq!(write.read_atomic(), 0);
    }

    /// Copy the value of one variable into another one.
    #[test]
    fn transaction_copy() {
        let read = TVar::new(42);
        let write = TVar::new(0);

        Transaction::with(|trans| {
            let r = read.read(trans)?;
            write.write(trans, r)
        });

        assert_eq!(write.read_atomic(), 42);
    }

    /// Same copy as in `transaction_copy`, but run through `with_control`
    /// with a control function that never aborts.
    /// The transaction commits and the result is wrapped in `Some`.
    #[test]
    fn transaction_with_control_retry_copy() {
        let read = TVar::new(42);
        let write = TVar::new(0);

        let x = Transaction::with_control(|_| TransactionControl::Retry, |trans| {
            let r = read.read(trans)?;
            write.write(trans, r)
        });

        assert_eq!(x, Some(()));
        assert_eq!(write.read_atomic(), 42);
    }

    /// Test if nested transactions are correctly detected.
    ///
    /// The expected message makes sure that the test only passes for the
    /// nesting check and not for an unrelated panic.
    #[test]
    #[should_panic(expected = "STM: Nested Transaction")]
    fn transaction_nested_fail() {
        Transaction::with(|_| {
            Transaction::with(|_| Ok(42));
            Ok(1)
        });
    }
}