stm_core/
tvar.rs

1// Copyright 2015-2016 rust-stm Developers
2//
3// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
4// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
5// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
6// option. This file may not be copied, modified, or distributed
7// except according to those terms.
8
9use std::sync::{Arc, Weak};
10use parking_lot::{Mutex, RwLock};
11use std::mem;
12use std::sync::atomic::{self, AtomicUsize};
13use std::cmp;
14use std::any::Any;
15use std::marker::PhantomData;
16use std::fmt::{Debug, self};
17
18use super::result::*;
19use super::transaction::control_block::ControlBlock;
20use super::Transaction;
21
22/// `VarControlBlock` contains all the useful data for a `Var` while beeing the same type.
23///
24/// The control block is accessed from other threads directly whereas `Var`
25/// is just a typesafe wrapper around it.
26pub struct VarControlBlock {
27    /// `waiting_threads` is a list of all waiting threads protected by a mutex.
28    waiting_threads: Mutex<Vec<Weak<ControlBlock>>>,
29
30    /// `dead_threads` is a counter for all dead threads.
31    ///
32    /// When there are many dead threads waiting for a change, but
33    /// nobody changes the value, then an automatic collection is
34    /// performed.
35    dead_threads: AtomicUsize,
36
37    /// The inner value of the Var.
38    ///
39    /// It can be shared through a Arc without copying it too often.
40    ///
41    /// The Arc is also used by the threads to detect changes.
42    /// The value in it should not be changed or locked because
43    /// that may cause multiple threads to block unforeseen as well as
44    /// causing deadlocks.
45    ///
46    /// The shared reference is protected by a `RWLock` so that multiple
47    /// threads can safely block it. This ensures consistency, without
48    /// preventing other threads from accessing the values.
49    ///
50    /// Starvation may occur, if one thread wants to write-lock but others
51    /// keep holding read-locks.
52    pub value: RwLock<Arc<Any + Send + Sync>>,
53}
54
55
56impl VarControlBlock {
57    /// create a new empty `VarControlBlock`
58    pub fn new<T>(val: T) -> Arc<VarControlBlock>
59        where T: Any + Sync + Send
60    {
61        let ctrl = VarControlBlock {
62            waiting_threads: Mutex::new(Vec::new()),
63            dead_threads: AtomicUsize::new(0),
64            value: RwLock::new(Arc::new(val)),
65        };
66        Arc::new(ctrl)
67    }
68
69    /// Wake all threads that are waiting for this block.
70    pub fn wake_all(&self) {
71        // Atomically take all waiting threads from the value.
72        let threads = {
73            let mut guard = self.waiting_threads.lock();
74            let inner: &mut Vec<_> = &mut guard;
75            mem::replace(inner, Vec::new())
76        };
77
78        // Take all, that are still alive.
79        let threads = threads.iter()
80            .filter_map(Weak::upgrade);
81
82        // Release all the semaphores to start the thread.
83        for thread in threads {
84            // Inform thread that this var has changed.
85            thread.set_changed();
86        }
87    }
88
89    /// Add another thread, that waits for mutations of `self`.
90    pub fn wait(&self, thread: &Arc<ControlBlock>) {
91        let mut guard = self.waiting_threads.lock();
92
93        guard.push(Arc::downgrade(thread));
94    }
95
96    /// Mark another `StmControlBlock` as dead.
97    ///
98    /// If the count of dead control blocks is too high,
99    /// perform a cleanup.
100    /// This prevents masses of old `StmControlBlock` to
101    /// pile up when a variable is often read but rarely written.
102    pub fn set_dead(&self) {
103        // Increase by one.
104        let deads = self.dead_threads.fetch_add(1, atomic::Ordering::Relaxed);
105
106        // If there are too many then cleanup.
107
108        // There is a potential data race that may occure when
109        // one thread reads the number and then operates on
110        // outdated data, but no serious mistakes may happen.
111        if deads >= 64 {
112            let mut guard = self.waiting_threads.lock();
113            self.dead_threads.store(0, atomic::Ordering::SeqCst);
114
115            // Remove all dead ones. Possibly free up the memory.
116            guard.retain(|t| t.upgrade().is_some());
117        }
118    }
119
120    fn get_address(&self) -> usize {
121        self as *const VarControlBlock as usize
122    }
123}
124
125
126// Implement some operators so that VarControlBlocks can be sorted.
127
128impl PartialEq for VarControlBlock {
129    fn eq(&self, other: &Self) -> bool {
130        self.get_address() == other.get_address()
131    }
132}
133
134impl Eq for VarControlBlock {}
135
136impl Ord for VarControlBlock {
137    fn cmp(&self, other: &Self) -> cmp::Ordering {
138        self.get_address().cmp(&other.get_address())
139    }
140}
141
142impl PartialOrd for VarControlBlock {
143    fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
144        Some(self.cmp(other))
145    }
146}
147
148
149
150/// A variable that can be used in a STM-Block
151#[derive(Clone)]
152pub struct TVar<T> {
153    /// The control block is the inner of the variable.
154    /// 
155    /// The rest of `TVar` is just the typesafe interface.
156    control_block: Arc<VarControlBlock>,
157
158    /// This marker is needed so that the variable can be used in a typesafe
159    /// manner.
160    _marker: PhantomData<T>,
161}
162
163impl<T> TVar<T>
164    where T: Any + Sync + Send + Clone
165{
166    /// Create a new `TVar`.
167    pub fn new(val: T) -> TVar<T> {
168        TVar {
169            control_block: VarControlBlock::new(val),
170            _marker: PhantomData,
171        }
172    }
173
174    /// `read_atomic` reads a value atomically, without starting a transaction.
175    ///
176    /// It is semantically equivalent to 
177    ///
178    /// ```
179    /// # use stm_core::*;
180    ///
181    /// let var = TVar::new(0);
182    /// atomically(|trans| var.read(trans));
183    /// ```
184    ///
185    /// but more efficient.
186    ///
187    /// `read_atomic` returns a clone of the value.
188    pub fn read_atomic(&self) -> T {
189        let val = self.read_ref_atomic();
190
191        (&*val as &Any)
192            .downcast_ref::<T>()
193            .expect("wrong type in Var<T>")
194            .clone()
195    }
196
197    /// Read a value atomically but return a reference.
198    ///
199    /// This is mostly used internally, but can be useful in
200    /// some cases, because `read_atomic` clones the
201    /// inner value, which may be expensive.
202    pub fn read_ref_atomic(&self) -> Arc<Any + Send + Sync> {
203        self.control_block
204            .value
205            .read()
206            .clone()
207    }
208
209    /// The normal way to access a var.
210    ///
211    /// It is equivalent to `transaction.read(&var)`, but more
212    /// convenient.
213    pub fn read(&self, transaction: &mut Transaction) -> StmResult<T> {
214        transaction.read(self)
215    }
216
217    /// The normal way to write a var.
218    ///
219    /// It is equivalent to `transaction.write(&var, value)`, but more
220    /// convenient.
221    pub fn write(&self, transaction: &mut Transaction, value: T) -> StmResult<()> {
222        transaction.write(self, value)
223    }
224
225    /// Modify the content of a `TVar` with the function f.
226    ///
227    /// ```
228    /// # use stm_core::*;
229    ///
230    ///
231    /// let var = TVar::new(21);
232    /// atomically(|trans| 
233    ///     var.modify(trans, |x| x*2)
234    /// );
235    ///
236    /// assert_eq!(var.read_atomic(), 42);
237    /// ```
238    pub fn modify<F>(&self, transaction: &mut Transaction, f: F) -> StmResult<()> 
239    where F: FnOnce(T) -> T
240    {
241        let old = self.read(transaction)?;
242        self.write(transaction, f(old))
243    }
244    
245    /// Replaces the value of a `TVar` with a new one, returning
246    /// the old one.
247    ///
248    /// ```
249    /// # use stm_core::*;
250    ///
251    /// let var = TVar::new(0);
252    /// let x = atomically(|trans| 
253    ///     var.replace(trans, 42)
254    /// );
255    ///
256    /// assert_eq!(x, 0);
257    /// assert_eq!(var.read_atomic(), 42);
258    /// ```
259    pub fn replace(&self, transaction: &mut Transaction, value: T) -> StmResult<T> {
260        let old = self.read(transaction)?;
261        self.write(transaction, value)?;
262        Ok(old)
263    }
264
265    /// Check if two `TVar`s refer to the same position.
266    pub fn ref_eq(this: &TVar<T>, other: &TVar<T>) -> bool {
267        Arc::ptr_eq(&this.control_block, &other.control_block)
268    }
269    
270    /// Access the control block of the var.
271    ///
272    /// Internal use only!
273    pub fn control_block(&self) -> &Arc<VarControlBlock> {
274        &self.control_block
275    }
276}
277
278/// Debug output a struct.
279///
280/// Note that this function does not print the state atomically.
281/// If another thread modifies the datastructure at the same time, it may print an inconsistent state.
282/// If you need an accurate view, that reflects current thread-local state, you can implement it easily yourself with 
283/// atomically.
284///
285/// Running `atomically` inside a running transaction panics. Therefore `fmt` uses
286/// prints the state.
287impl<T> Debug for TVar<T>
288    where T: Any + Sync + Send + Clone,
289          T: Debug,
290{
291    #[inline(never)]
292    fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
293        let x = self.read_atomic();
294        f.debug_struct("TVar")
295            .field("value", &x)
296            .finish()
297    }
298}
299
300
301
#[test]
// A freshly created TVar hands back its initial value through `read_atomic`.
fn test_read_atomic() {
    let var = TVar::new(42);
    let observed = var.read_atomic();

    assert_eq!(42, observed);
}
309
310
311// More tests are in lib.rs.