rotor_tools/
sync.rs

//! Provides a state machine wrapper that is guarded by a `Mutex`
//!
//! This allows the machine to be manipulated from multiple threads. But you
//! must be careful not to break the state machine
5use std::mem;
6use std::sync::{Arc, Mutex};
7
8use rotor::{Machine, EventSet, Scope, Response};
9use rotor::{Void};
10
/// A state machine `M` stored behind an `Arc<Mutex<M>>`
///
/// The inner `Arc` is a public field, so code that holds a clone of it can
/// lock the same mutex and manipulate the machine directly.
pub struct Mutexed<M>(pub Arc<Mutex<M>>);
12
/// A trait which allows replacing the state machine with a dummy/null/None
/// value
///
/// Since the machine (used with `Mutexed`) lives inside an `Arc<Mutex<M>>`,
/// we need to put some other value in its place while the real value is
/// moved out, so that the action methods can take `self` by value.
///
/// When the thread working with the state machine panics while the machine
/// is moved out of its place, the poisoned lock is left holding the
/// `Replaceable::empty` value. When the lock is poisoned we unwrap that value
/// and use it as a valid state machine. So you can continue to work after a
/// panic with a clean state (perhaps if the panic was in a different thread).
///
/// The text above means you should be able to "restart" the state machine
/// from the `empty()` state.
pub trait Replaceable: Machine {
    /// Return the empty value that may be used as a replacement
    ///
    /// **The method must be cheap to compute**, because it's executed on
    /// every action of a state machine (for `mem::replace`).
    ///
    /// Note that in case the lock is poisoned (a panic happened while the
    /// state machine was locked, perhaps in another thread), `empty` is
    /// called on the poisoned value before `restart()` is called. This
    /// means you only want to use read-only parts of `self` here (better
    /// none at all, but that's not always possible).
    fn empty(&self) -> Self;
    /// Restart a state machine from the `empty()` state
    ///
    /// This method is called instead of the regular action method when the
    /// lock holding the state machine was found poisoned.
    ///
    /// Note that after `restart` the current event is discarded; it's
    /// assumed that the state machine is already arranged to receive some
    /// new events (i.e. it's useless to keep an old `ready()` event if a new
    /// connection is just being established).
    ///
    /// While you can check the state of the old machine (`self`), and even
    /// return it as is, it's strongly discouraged, as you can't know the
    /// exact kind of failure that happened in the other thread (when the
    /// lock was poisoned). But in case the protocol is super-simple (like
    /// line-based without exceptions) and it's not security critical (i.e.
    /// monitoring using graphite), you may reuse the old state machine or
    /// parts thereof.
    ///
    /// The default implementation just panics.
    fn restart(self, _scope: &mut Scope<Self::Context>)
        -> Response<Self, Self::Seed>
    {
        panic!("State machine has been poisoned");
    }
}
63
64#[inline]
65fn locked_call<M, F>(scope: &mut Scope<M::Context>, me: Mutexed<M>,
66    fun: F)
67    -> Response<Mutexed<M>, M::Seed>
68    where M: Replaceable,
69          F: FnOnce(M, &mut Scope<M::Context>) -> Response<M, M::Seed>
70{
71    let fake_result = match me.0.lock() {
72        Ok(mut guard) => {
73            let empty = guard.empty();
74            let fsm = mem::replace(&mut *guard, empty);
75            let res = fun(fsm, scope);
76            res.wrap(|new_machine| {
77                // thows off an `empty()` instance
78                mem::replace(&mut *guard, new_machine);
79                ()
80            })
81        }
82        Err(poisoned) => {
83            let mut guard = poisoned.into_inner();
84            let empty = guard.empty();
85            let fsm = mem::replace(&mut *guard, empty);
86            let res = fsm.restart(scope);
87            res.wrap(|new_machine| {
88                // thows off an `empty()` instance
89                mem::replace(&mut *guard, new_machine);
90                ()
91            })
92        }
93    };
94    fake_result.wrap(|()| me)
95}
96
97impl<M: Replaceable> Machine for Mutexed<M> {
98    type Context = M::Context;
99    type Seed = M::Seed;
100    fn create(seed: Self::Seed, scope: &mut Scope<M::Context>)
101        -> Response<Self, Void>
102    {
103        M::create(seed, scope).wrap(Mutex::new).wrap(Arc::new).wrap(Mutexed)
104    }
105    fn ready(self, events: EventSet, scope: &mut Scope<M::Context>)
106        -> Response<Self, Self::Seed>
107    {
108        locked_call(scope, self, |fsm, scope| fsm.ready(events, scope))
109    }
110    fn spawned(self, scope: &mut Scope<M::Context>)
111        -> Response<Self, Self::Seed>
112    {
113        locked_call(scope, self, |fsm, scope| fsm.spawned(scope))
114    }
115    fn timeout(self, scope: &mut Scope<M::Context>)
116        -> Response<Self, Self::Seed>
117    {
118        locked_call(scope, self, |fsm, scope| fsm.timeout(scope))
119    }
120    fn wakeup(self, scope: &mut Scope<M::Context>)
121        -> Response<Self, Self::Seed>
122    {
123        locked_call(scope, self, |fsm, scope| fsm.wakeup(scope))
124    }
125}