1use std::fmt::Debug;
2use std::error::Error;
3
4use rotor::{Machine, EventSet, PollOpt, Scope, Response, Time};
5use rotor::void::{unreachable, Void};
6use rotor::{GenericScope};
7
8use {ActiveStream, Protocol, Stream, ProtocolStop, Transport};
9use extensions::{ResponseExt, ScopeExt};
10
11
/// Delay handed to `scope.after(..)` before the next connection attempt is
/// scheduled after a connection failed, was stopped, or timed out.
///
/// NOTE(review): the unit is whatever `scope.after` expects; presumably
/// milliseconds -- confirm against its definition.
pub const RECONNECT_TIMEOUT: u64 = 200;

/// Delay handed to `scope.after(..)` to compute the deadline by which a
/// freshly opened socket must finish connecting.
///
/// NOTE(review): presumably milliseconds, same unit as `RECONNECT_TIMEOUT`
/// -- confirm.
pub const CONNECT_TIMEOUT: u64 = 1_000;
21
22
23pub struct Persistent<P>(<P::Socket as ActiveStream>::Address,
27                         P::Seed, Fsm<P>)
28    where P: Protocol, P::Socket: ActiveStream;
29
30#[derive(Debug)]
31pub enum Fsm<P: Protocol> {
32    Idle,
33    Connecting(P::Socket, Time),
34    Established(Stream<P>),
35    Sleeping(Time),
36}
37
38fn response<P>(addr: <P::Socket as ActiveStream>::Address,
39    seed: P::Seed, fsm: Fsm<P>)
40    -> Response<Persistent<P>, Void>
41    where P: Protocol, P::Socket: ActiveStream
42{
43    use self::Fsm::*;
44    let timeo = match *&fsm {
45        Idle => None,
46        Connecting(_, tm) => Some(tm),
47        Established(..) => unreachable!(),
50        Sleeping(tm) => Some(tm),
51    };
52    Response::ok(Persistent(addr, seed, fsm))
53        .deadline_opt(timeo)
54}
55
56impl<P> Persistent<P>
57    where P: Protocol,
58          P::Socket: ActiveStream,
59          <P::Socket as ActiveStream>::Address: Debug
60{
61    pub fn new<S: GenericScope>(_scope: &mut S,
62            address: <P::Socket as ActiveStream>::Address, seed: P::Seed)
63        -> Response<Persistent<P>, Void>
64    {
65        Response::ok(Persistent(address, seed, Fsm::Idle))
66    }
67
68    pub fn connect<S: GenericScope>(scope: &mut S,
69            address: <P::Socket as ActiveStream>::Address, seed: P::Seed)
70        -> Response<Persistent<P>, Void>
71    {
72        let fsm = match P::Socket::connect(&address) {
73            Ok(sock) => {
74                scope.register(&sock, EventSet::writable(), PollOpt::level())
75                    .expect("Can't register socket");
76                Fsm::Connecting(sock, scope.after(CONNECT_TIMEOUT))
77            }
78            Err(e) => {
79                info!("Failed to connect to {:?}: {}", address, e);
80                Fsm::Sleeping(scope.after(RECONNECT_TIMEOUT))
81            }
82        };
83        response(address, seed, fsm)
84    }
85}
86
87impl<P> Persistent<P>
88    where P: Protocol, P::Socket: ActiveStream
89{
90    pub fn transport(&mut self) -> Option<Transport<P::Socket>> {
98        match self.2 {
99            Fsm::Established(ref mut s) => Some(s.transport()),
100            _ => None,
101        }
102    }
103}
104
105impl<P> Fsm<P>
106    where P: Protocol,
107          P::Seed: Clone,
108          P::Socket: ActiveStream,
109          <P::Socket as ActiveStream>::Address: Debug
110{
111    fn action<S: GenericScope>(resp: Response<Stream<P>, Void>,
112        addr: <P::Socket as ActiveStream>::Address,
113        seed: P::Seed, scope: &mut S)
114        -> Response<Persistent<P>, Void>
115    {
116        if resp.is_stopped() {
117            if let Some(err) = resp.cause() {
118                warn!("Connection is failed: {}", err);
119            } else {
120                warn!("Connection is stopped by protocol");
121            }
122            response(addr, seed,
123                Fsm::Sleeping(scope.after(RECONNECT_TIMEOUT)))
124        } else {
125            resp
126                .wrap(Fsm::Established)
127                .wrap(|x| Persistent(addr, seed, x))
128        }
129    }
130}
131
132impl<P: Protocol> Machine for Persistent<P>
133    where P: Protocol,
134          P::Seed: Clone,
135          P::Socket: ActiveStream,
136          <P::Socket as ActiveStream>::Address: Debug
137{
138    type Context = P::Context;
139    type Seed = Void;
140    fn create(seed: Self::Seed, _scope: &mut Scope<P::Context>)
141        -> Response<Self, Void>
142    {
143        unreachable(seed)
144    }
145    fn ready(self, events: EventSet, scope: &mut Scope<P::Context>)
146        -> Response<Self, Self::Seed>
147    {
148        use self::Fsm::*;
149        let Persistent(addr, seed, state) = self;
150        let state = match state {
151            Idle => Idle,  Connecting(sock, dline) => {
153                if events.is_writable() {
154                    let resp =  Stream::connected(sock, seed.clone(), scope);
155                    if resp.is_stopped() {
156                        error!("Error creating stream FSM: {}",
157                            resp.cause().unwrap_or(&ProtocolStop));
158                        Fsm::Sleeping(scope.after(RECONNECT_TIMEOUT))
159                    } else {
160                        return Fsm::action(resp, addr, seed, scope);
161                    }
162                } else if events.is_hup() {
163                    error!("Connection closed immediately");
164                    Fsm::Sleeping(scope.after(RECONNECT_TIMEOUT))
165                } else {
166                    Connecting(sock, dline) }
168            }
169            Established(x) => {
170                return Fsm::action(x.ready(events, scope), addr, seed, scope);
171            }
172            Sleeping(dline) => Sleeping(dline), };
174        response(addr, seed, state)
175    }
176    fn spawned(self, _scope: &mut Scope<P::Context>)
177        -> Response<Self, Self::Seed>
178    {
179        unreachable!();
180    }
181    fn timeout(self, scope: &mut Scope<P::Context>)
182        -> Response<Self, Self::Seed>
183    {
184        use self::Fsm::*;
185        let Persistent(addr, seed, state) = self;
186        let state = match state {
187            Idle => Idle,  Connecting(sock, dline) => {
189                if scope.now() >= dline {
190                    warn!("Timeout while establishing connection");
191                    Fsm::Sleeping(scope.after(RECONNECT_TIMEOUT))
192                } else {  Connecting(sock, dline)
194                }
195            }
196            Established(x) => {
197                return Fsm::action(x.timeout(scope), addr, seed, scope);
198            }
199            Sleeping(dline) => {
200                if scope.now() >= dline {
201                    return Self::connect(scope, addr, seed);
202                } else {
203                    Sleeping(dline)  }
205            }
206        };
207        response(addr, seed, state)
208    }
209    fn wakeup(self, scope: &mut Scope<P::Context>)
210        -> Response<Self, Self::Seed>
211    {
212        use self::Fsm::*;
213        let Persistent(addr, seed, state) = self;
214        let state = match state {
215            Established(x) => {
216                return Fsm::action(x.wakeup(scope), addr, seed, scope);
217            }
218            x => x, };
220        response(addr, seed, state)
221    }
222}
223
#[cfg(feature="replaceable")]
mod replaceable {

    use std::fmt::Debug;

    use rotor_tools::sync::Replaceable;

    use super::Fsm;
    use {ActiveStream, Protocol, Persistent};

    impl<P> Replaceable for Persistent<P>
        where P: Protocol,
              P::Seed: Clone,
              P::Socket: ActiveStream,
              <P::Socket as ActiveStream>::Address: Clone + Debug
    {
        /// Builds a placeholder machine: same address and seed (both
        /// cloned), but in the `Idle` state with no socket.
        fn empty(&self) -> Self {
            let address = self.0.clone();
            let seed = self.1.clone();
            Persistent(address, seed, Fsm::Idle)
        }
    }
}
245}