rotor_tools/sync.rs
1//! Provides state machine that is guarded by Mutex
2//!
3//! This allows the machine to be manipulated from multiple threads. But you
4//! must be careful to not to break the state machine
5use std::mem;
6use std::sync::{Arc, Mutex};
7
8use rotor::{Machine, EventSet, Scope, Response};
9use rotor::{Void};
10
11pub struct Mutexed<M>(pub Arc<Mutex<M>>);
12
13/// A trait which allows to replace the state machine with dummy/null/None
14///
15/// Since we put machine (used with `Mutexed`) into a `Arc<Mutex<M>>` we need
16/// to replace it with some other value while the real value is moved off the
17/// state machine to be able to replace it by code, having self by-value.
18///
19/// When the thread working with the state machine panics while machine is
20/// moved off the place, the poisoned lock is left with `Replaceable::empty`
21/// value. When lock is poisoned we unwrap the value and use it as a valid
22/// state machine. So you can continue to work after panic with the clean
23/// state after a crash (perhaps if the panic was in different thread).
24///
25/// The text above means you should be able to "restart" the state machine
26/// from the `empty()` state.
27pub trait Replaceable: Machine {
28 /// Return the empty value that may be used as replacement
29 ///
30 /// **The method must be cheap to compute**. Because it's executed on
31 /// every action of a state machine (for `mem::replace`)
32 ///
33 /// Note in case the lock is poisoned (panic was received while keeping
34 /// state machine locked, perhaps in another thread), the `empty` value
35 /// is used on the poisoned value, before `restart()` is called. This
36 /// means you only want to use read-only parts of `self` here (better
37 /// none at all, but that's not always possible).
38 fn empty(&self) -> Self;
39 /// Restart a state machine from `empty()` state
40 ///
41 /// This method is called before calling any other action methods when
42 /// lock holding the state machine was poisoned.
43 ///
44 /// Note that after the `restart` current event is discarded, it's assumed
45 /// that state machine is already arranged to receive some new events
46 /// (i.e. it's useless to keep old `ready()` event if new connection is
47 /// just being established)
48 ///
49 /// While you can check the state of the old machine (a `self`), and even
50 /// return it as is, it's strongly discouraged, as you can't know exact
51 /// kind of failure that happened in other thread (when lock was
52 /// poisoned). But in case protocol is super-simple (like line-based
53 /// without exceptions) and it's not security critical (i.e. monitoring
54 /// using graphite), you may reuse old state machine or parts there of.
55 ///
56 /// Default implementation is just to panic
57 fn restart(self, _scope: &mut Scope<Self::Context>)
58 -> Response<Self, Self::Seed>
59 {
60 panic!("State machine has been poisoned");
61 }
62}
63
64#[inline]
65fn locked_call<M, F>(scope: &mut Scope<M::Context>, me: Mutexed<M>,
66 fun: F)
67 -> Response<Mutexed<M>, M::Seed>
68 where M: Replaceable,
69 F: FnOnce(M, &mut Scope<M::Context>) -> Response<M, M::Seed>
70{
71 let fake_result = match me.0.lock() {
72 Ok(mut guard) => {
73 let empty = guard.empty();
74 let fsm = mem::replace(&mut *guard, empty);
75 let res = fun(fsm, scope);
76 res.wrap(|new_machine| {
77 // thows off an `empty()` instance
78 mem::replace(&mut *guard, new_machine);
79 ()
80 })
81 }
82 Err(poisoned) => {
83 let mut guard = poisoned.into_inner();
84 let empty = guard.empty();
85 let fsm = mem::replace(&mut *guard, empty);
86 let res = fsm.restart(scope);
87 res.wrap(|new_machine| {
88 // thows off an `empty()` instance
89 mem::replace(&mut *guard, new_machine);
90 ()
91 })
92 }
93 };
94 fake_result.wrap(|()| me)
95}
96
97impl<M: Replaceable> Machine for Mutexed<M> {
98 type Context = M::Context;
99 type Seed = M::Seed;
100 fn create(seed: Self::Seed, scope: &mut Scope<M::Context>)
101 -> Response<Self, Void>
102 {
103 M::create(seed, scope).wrap(Mutex::new).wrap(Arc::new).wrap(Mutexed)
104 }
105 fn ready(self, events: EventSet, scope: &mut Scope<M::Context>)
106 -> Response<Self, Self::Seed>
107 {
108 locked_call(scope, self, |fsm, scope| fsm.ready(events, scope))
109 }
110 fn spawned(self, scope: &mut Scope<M::Context>)
111 -> Response<Self, Self::Seed>
112 {
113 locked_call(scope, self, |fsm, scope| fsm.spawned(scope))
114 }
115 fn timeout(self, scope: &mut Scope<M::Context>)
116 -> Response<Self, Self::Seed>
117 {
118 locked_call(scope, self, |fsm, scope| fsm.timeout(scope))
119 }
120 fn wakeup(self, scope: &mut Scope<M::Context>)
121 -> Response<Self, Self::Seed>
122 {
123 locked_call(scope, self, |fsm, scope| fsm.wakeup(scope))
124 }
125}