1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
//! Provides state machine that is guarded by Mutex
//!
//! This allows the machine to be manipulated from multiple threads. But you
//! must be careful to not to break the state machine
use std::mem;
use std::sync::{Arc, Mutex};

use rotor::{Machine, EventSet, Scope, Response};
use rotor::{Void};

pub struct Mutexed<M>(pub Arc<Mutex<M>>);

/// A trait which allows to replace the state machine with dummy/null/None
///
/// Since we put machine (used with `Mutexed`) into a `Arc<Mutex<M>>` we need
/// to replace it with some other value while the real value is moved off the
/// state machine to be able to replace it by code, having self by-value.
///
/// When the thread working with the state machine panics while machine is
/// moved off the place, the poisoned lock is left with `Replaceable::empty`
/// value. When lock is poisoned we unwrap the value and use it as a valid
/// state machine. So you can continue to work after panic with the clean
/// state after a crash (perhaps if the panic was in different thread).
///
/// The text above means you should be able to "restart" the state machine
/// from the `empty()` state.
pub trait Replaceable: Machine {
    /// Return the empty value that may be used as replacement
    ///
    /// **The method must be cheap to compute**. Because it's executed on
    /// every action of a state machine (for `mem::replace`)
    ///
    /// Note in case the lock is poisoned (panic was received while keeping
    /// state machine locked, perhaps in another thread), the `empty` value
    /// is used on the poisoned value, before `restart()` is called. This
    /// means you only want to use read-only parts of `self` here (better
    /// none at all, but that's not always possible).
    fn empty(&self) -> Self;
    /// Restart a state machine from `empty()` state
    ///
    /// This method is called before calling any other action methods when
    /// lock holding the state machine was poisoned.
    ///
    /// Note that after the `restart` current event is discarded, it's assumed
    /// that state machine is already arranged to receive some new events
    /// (i.e. it's useless to keep old `ready()` event if new connection is
    /// just being established)
    ///
    /// While you can check the state of the old machine (a `self`), and even
    /// return it as is, it's strongly discouraged, as you can't know exact
    /// kind of failure that happened in other thread (when lock was
    /// poisoned). But in case protocol is super-simple (like line-based
    /// without exceptions) and it's not security critical (i.e. monitoring
    /// using graphite), you may reuse old state machine or parts there of.
    ///
    /// Default implementation is just to panic
    fn restart(self, _scope: &mut Scope<Self::Context>)
        -> Response<Self, Self::Seed>
    {
        panic!("State machine has been poisoned");
    }
}

#[inline]
fn locked_call<M, F>(scope: &mut Scope<M::Context>, me: Mutexed<M>,
    fun: F)
    -> Response<Mutexed<M>, M::Seed>
    where M: Replaceable,
          F: FnOnce(M, &mut Scope<M::Context>) -> Response<M, M::Seed>
{
    let fake_result = match me.0.lock() {
        Ok(mut guard) => {
            let empty = guard.empty();
            let fsm = mem::replace(&mut *guard, empty);
            let res = fun(fsm, scope);
            res.wrap(|new_machine| {
                // thows off an `empty()` instance
                mem::replace(&mut *guard, new_machine);
                ()
            })
        }
        Err(poisoned) => {
            let mut guard = poisoned.into_inner();
            let empty = guard.empty();
            let fsm = mem::replace(&mut *guard, empty);
            let res = fsm.restart(scope);
            res.wrap(|new_machine| {
                // thows off an `empty()` instance
                mem::replace(&mut *guard, new_machine);
                ()
            })
        }
    };
    fake_result.wrap(|()| me)
}

impl<M: Replaceable> Machine for Mutexed<M> {
    type Context = M::Context;
    type Seed = M::Seed;
    fn create(seed: Self::Seed, scope: &mut Scope<M::Context>)
        -> Response<Self, Void>
    {
        M::create(seed, scope).wrap(Mutex::new).wrap(Arc::new).wrap(Mutexed)
    }
    fn ready(self, events: EventSet, scope: &mut Scope<M::Context>)
        -> Response<Self, Self::Seed>
    {
        locked_call(scope, self, |fsm, scope| fsm.ready(events, scope))
    }
    fn spawned(self, scope: &mut Scope<M::Context>)
        -> Response<Self, Self::Seed>
    {
        locked_call(scope, self, |fsm, scope| fsm.spawned(scope))
    }
    fn timeout(self, scope: &mut Scope<M::Context>)
        -> Response<Self, Self::Seed>
    {
        locked_call(scope, self, |fsm, scope| fsm.timeout(scope))
    }
    fn wakeup(self, scope: &mut Scope<M::Context>)
        -> Response<Self, Self::Seed>
    {
        locked_call(scope, self, |fsm, scope| fsm.wakeup(scope))
    }
}