stm_core/tvar.rs
1// Copyright 2015-2016 rust-stm Developers
2//
3// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
4// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
5// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
6// option. This file may not be copied, modified, or distributed
7// except according to those terms.
8
9use std::sync::{Arc, Weak};
10use parking_lot::{Mutex, RwLock};
11use std::mem;
12use std::sync::atomic::{self, AtomicUsize};
13use std::cmp;
14use std::any::Any;
15use std::marker::PhantomData;
16use std::fmt::{Debug, self};
17
18use super::result::*;
19use super::transaction::control_block::ControlBlock;
20use super::Transaction;
21
/// `VarControlBlock` contains all the useful data for a `Var` while being the
/// same (non-generic) type for every value type.
///
/// The stored value is type-erased behind `Any`, so control blocks of
/// variables with different value types can be handled uniformly.
///
/// The control block is accessed from other threads directly whereas `Var`
/// is just a typesafe wrapper around it.
pub struct VarControlBlock {
    /// `waiting_threads` is a list of all waiting threads protected by a mutex.
    ///
    /// Only weak references are stored, so an entry does not keep the
    /// `ControlBlock` of a waiting thread alive.
    waiting_threads: Mutex<Vec<Weak<ControlBlock>>>,

    /// `dead_threads` is a counter for all dead threads.
    ///
    /// When there are many dead threads waiting for a change, but
    /// nobody changes the value, then an automatic collection is
    /// performed (see `set_dead`).
    dead_threads: AtomicUsize,

    /// The inner value of the Var.
    ///
    /// It can be shared through an `Arc` without copying it too often.
    ///
    /// The `Arc` is also used by the threads to detect changes.
    /// The value in it should not be changed or locked because
    /// that may cause multiple threads to block unforeseen as well as
    /// causing deadlocks.
    ///
    /// The shared reference is protected by a `RwLock` so that multiple
    /// threads can safely block it. This ensures consistency, without
    /// preventing other threads from accessing the values.
    ///
    /// Starvation may occur, if one thread wants to write-lock but others
    /// keep holding read-locks.
    /// NOTE(review): whether writers can actually starve depends on the
    /// fairness policy of the `RwLock` implementation in use — confirm.
    pub value: RwLock<Arc<Any + Send + Sync>>,
}
54
55
56impl VarControlBlock {
57 /// create a new empty `VarControlBlock`
58 pub fn new<T>(val: T) -> Arc<VarControlBlock>
59 where T: Any + Sync + Send
60 {
61 let ctrl = VarControlBlock {
62 waiting_threads: Mutex::new(Vec::new()),
63 dead_threads: AtomicUsize::new(0),
64 value: RwLock::new(Arc::new(val)),
65 };
66 Arc::new(ctrl)
67 }
68
69 /// Wake all threads that are waiting for this block.
70 pub fn wake_all(&self) {
71 // Atomically take all waiting threads from the value.
72 let threads = {
73 let mut guard = self.waiting_threads.lock();
74 let inner: &mut Vec<_> = &mut guard;
75 mem::replace(inner, Vec::new())
76 };
77
78 // Take all, that are still alive.
79 let threads = threads.iter()
80 .filter_map(Weak::upgrade);
81
82 // Release all the semaphores to start the thread.
83 for thread in threads {
84 // Inform thread that this var has changed.
85 thread.set_changed();
86 }
87 }
88
89 /// Add another thread, that waits for mutations of `self`.
90 pub fn wait(&self, thread: &Arc<ControlBlock>) {
91 let mut guard = self.waiting_threads.lock();
92
93 guard.push(Arc::downgrade(thread));
94 }
95
96 /// Mark another `StmControlBlock` as dead.
97 ///
98 /// If the count of dead control blocks is too high,
99 /// perform a cleanup.
100 /// This prevents masses of old `StmControlBlock` to
101 /// pile up when a variable is often read but rarely written.
102 pub fn set_dead(&self) {
103 // Increase by one.
104 let deads = self.dead_threads.fetch_add(1, atomic::Ordering::Relaxed);
105
106 // If there are too many then cleanup.
107
108 // There is a potential data race that may occure when
109 // one thread reads the number and then operates on
110 // outdated data, but no serious mistakes may happen.
111 if deads >= 64 {
112 let mut guard = self.waiting_threads.lock();
113 self.dead_threads.store(0, atomic::Ordering::SeqCst);
114
115 // Remove all dead ones. Possibly free up the memory.
116 guard.retain(|t| t.upgrade().is_some());
117 }
118 }
119
120 fn get_address(&self) -> usize {
121 self as *const VarControlBlock as usize
122 }
123}
124
125
126// Implement some operators so that VarControlBlocks can be sorted.
127
128impl PartialEq for VarControlBlock {
129 fn eq(&self, other: &Self) -> bool {
130 self.get_address() == other.get_address()
131 }
132}
133
134impl Eq for VarControlBlock {}
135
136impl Ord for VarControlBlock {
137 fn cmp(&self, other: &Self) -> cmp::Ordering {
138 self.get_address().cmp(&other.get_address())
139 }
140}
141
142impl PartialOrd for VarControlBlock {
143 fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
144 Some(self.cmp(other))
145 }
146}
147
148
149
/// A variable that can be used in a STM-Block
///
/// Cloning a `TVar` clones the `Arc` of its control block, so the clone
/// refers to the same underlying variable (see `TVar::ref_eq`).
/// Note that the derived `Clone` impl is only available for `T: Clone`.
#[derive(Clone)]
pub struct TVar<T> {
    /// The control block is the inner of the variable.
    ///
    /// The rest of `TVar` is just the typesafe interface.
    control_block: Arc<VarControlBlock>,

    /// This marker is needed so that the variable can be used in a typesafe
    /// manner.
    ///
    /// The control block stores the value type-erased, so `T` is only
    /// recorded here at the type level.
    _marker: PhantomData<T>,
}
162
163impl<T> TVar<T>
164 where T: Any + Sync + Send + Clone
165{
166 /// Create a new `TVar`.
167 pub fn new(val: T) -> TVar<T> {
168 TVar {
169 control_block: VarControlBlock::new(val),
170 _marker: PhantomData,
171 }
172 }
173
174 /// `read_atomic` reads a value atomically, without starting a transaction.
175 ///
176 /// It is semantically equivalent to
177 ///
178 /// ```
179 /// # use stm_core::*;
180 ///
181 /// let var = TVar::new(0);
182 /// atomically(|trans| var.read(trans));
183 /// ```
184 ///
185 /// but more efficient.
186 ///
187 /// `read_atomic` returns a clone of the value.
188 pub fn read_atomic(&self) -> T {
189 let val = self.read_ref_atomic();
190
191 (&*val as &Any)
192 .downcast_ref::<T>()
193 .expect("wrong type in Var<T>")
194 .clone()
195 }
196
197 /// Read a value atomically but return a reference.
198 ///
199 /// This is mostly used internally, but can be useful in
200 /// some cases, because `read_atomic` clones the
201 /// inner value, which may be expensive.
202 pub fn read_ref_atomic(&self) -> Arc<Any + Send + Sync> {
203 self.control_block
204 .value
205 .read()
206 .clone()
207 }
208
209 /// The normal way to access a var.
210 ///
211 /// It is equivalent to `transaction.read(&var)`, but more
212 /// convenient.
213 pub fn read(&self, transaction: &mut Transaction) -> StmResult<T> {
214 transaction.read(self)
215 }
216
217 /// The normal way to write a var.
218 ///
219 /// It is equivalent to `transaction.write(&var, value)`, but more
220 /// convenient.
221 pub fn write(&self, transaction: &mut Transaction, value: T) -> StmResult<()> {
222 transaction.write(self, value)
223 }
224
225 /// Modify the content of a `TVar` with the function f.
226 ///
227 /// ```
228 /// # use stm_core::*;
229 ///
230 ///
231 /// let var = TVar::new(21);
232 /// atomically(|trans|
233 /// var.modify(trans, |x| x*2)
234 /// );
235 ///
236 /// assert_eq!(var.read_atomic(), 42);
237 /// ```
238 pub fn modify<F>(&self, transaction: &mut Transaction, f: F) -> StmResult<()>
239 where F: FnOnce(T) -> T
240 {
241 let old = self.read(transaction)?;
242 self.write(transaction, f(old))
243 }
244
245 /// Replaces the value of a `TVar` with a new one, returning
246 /// the old one.
247 ///
248 /// ```
249 /// # use stm_core::*;
250 ///
251 /// let var = TVar::new(0);
252 /// let x = atomically(|trans|
253 /// var.replace(trans, 42)
254 /// );
255 ///
256 /// assert_eq!(x, 0);
257 /// assert_eq!(var.read_atomic(), 42);
258 /// ```
259 pub fn replace(&self, transaction: &mut Transaction, value: T) -> StmResult<T> {
260 let old = self.read(transaction)?;
261 self.write(transaction, value)?;
262 Ok(old)
263 }
264
265 /// Check if two `TVar`s refer to the same position.
266 pub fn ref_eq(this: &TVar<T>, other: &TVar<T>) -> bool {
267 Arc::ptr_eq(&this.control_block, &other.control_block)
268 }
269
270 /// Access the control block of the var.
271 ///
272 /// Internal use only!
273 pub fn control_block(&self) -> &Arc<VarControlBlock> {
274 &self.control_block
275 }
276}
277
278/// Debug output a struct.
279///
280/// Note that this function does not print the state atomically.
281/// If another thread modifies the datastructure at the same time, it may print an inconsistent state.
282/// If you need an accurate view, that reflects current thread-local state, you can implement it easily yourself with
283/// atomically.
284///
285/// Running `atomically` inside a running transaction panics. Therefore `fmt` uses
286/// prints the state.
287impl<T> Debug for TVar<T>
288 where T: Any + Sync + Send + Clone,
289 T: Debug,
290{
291 #[inline(never)]
292 fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
293 let x = self.read_atomic();
294 f.debug_struct("TVar")
295 .field("value", &x)
296 .finish()
297 }
298}
299
300
301
#[test]
// Check that a freshly created TVar hands back its initial value.
fn test_read_atomic() {
    let initial = 42;
    let var = TVar::new(initial);

    let observed = var.read_atomic();
    assert_eq!(42, observed);
}
309
310
311// More tests are in lib.rs.