1use std::fmt::Debug;
2use std::error::Error;
3
4use rotor::{Machine, EventSet, PollOpt, Scope, Response, Time};
5use rotor::void::{unreachable, Void};
6use rotor::{GenericScope};
7
8use {ActiveStream, Protocol, Stream, ProtocolStop, Transport};
9use extensions::{ResponseExt, ScopeExt};
10
11
/// Delay handed to `scope.after` to schedule the next connection attempt;
/// used whenever the machine enters `Fsm::Sleeping`.
///
/// NOTE(review): the unit is presumably milliseconds -- confirm against
/// `ScopeExt::after`.
pub const RECONNECT_TIMEOUT: u64 = 200;

/// Delay handed to `scope.after` to compute the deadline of a pending
/// connection attempt (`Fsm::Connecting`).
///
/// NOTE(review): the unit is presumably milliseconds -- confirm against
/// `ScopeExt::after`.
pub const CONNECT_TIMEOUT: u64 = 1_000;
21
22
/// State machine that maintains a client connection to a single address,
/// going back to sleep and reconnecting whenever the stream stops.
///
/// The tuple fields are, in order:
///
/// 0. the address passed to `ActiveStream::connect`,
/// 1. the protocol seed, cloned each time a new `Stream` is created,
/// 2. the current connection state (`Fsm`).
pub struct Persistent<P>(<P::Socket as ActiveStream>::Address,
                         P::Seed, Fsm<P>)
    where P: Protocol, P::Socket: ActiveStream;
29
/// Connection state held inside a `Persistent` machine.
#[derive(Debug)]
pub enum Fsm<P: Protocol> {
    /// No socket and no deadline. Every event handler in this file leaves
    /// this state untouched.
    Idle,
    /// A socket whose connection is still being established, together with
    /// the deadline after which the attempt is abandoned.
    Connecting(P::Socket, Time),
    /// A connected stream; all events are forwarded to it.
    Established(Stream<P>),
    /// Waiting until the given time before a new connection attempt.
    Sleeping(Time),
}
37
38fn response<P>(addr: <P::Socket as ActiveStream>::Address,
39 seed: P::Seed, fsm: Fsm<P>)
40 -> Response<Persistent<P>, Void>
41 where P: Protocol, P::Socket: ActiveStream
42{
43 use self::Fsm::*;
44 let timeo = match *&fsm {
45 Idle => None,
46 Connecting(_, tm) => Some(tm),
47 Established(..) => unreachable!(),
50 Sleeping(tm) => Some(tm),
51 };
52 Response::ok(Persistent(addr, seed, fsm))
53 .deadline_opt(timeo)
54}
55
56impl<P> Persistent<P>
57 where P: Protocol,
58 P::Socket: ActiveStream,
59 <P::Socket as ActiveStream>::Address: Debug
60{
61 pub fn new<S: GenericScope>(_scope: &mut S,
62 address: <P::Socket as ActiveStream>::Address, seed: P::Seed)
63 -> Response<Persistent<P>, Void>
64 {
65 Response::ok(Persistent(address, seed, Fsm::Idle))
66 }
67
68 pub fn connect<S: GenericScope>(scope: &mut S,
69 address: <P::Socket as ActiveStream>::Address, seed: P::Seed)
70 -> Response<Persistent<P>, Void>
71 {
72 let fsm = match P::Socket::connect(&address) {
73 Ok(sock) => {
74 scope.register(&sock, EventSet::writable(), PollOpt::level())
75 .expect("Can't register socket");
76 Fsm::Connecting(sock, scope.after(CONNECT_TIMEOUT))
77 }
78 Err(e) => {
79 info!("Failed to connect to {:?}: {}", address, e);
80 Fsm::Sleeping(scope.after(RECONNECT_TIMEOUT))
81 }
82 };
83 response(address, seed, fsm)
84 }
85}
86
87impl<P> Persistent<P>
88 where P: Protocol, P::Socket: ActiveStream
89{
90 pub fn transport(&mut self) -> Option<Transport<P::Socket>> {
98 match self.2 {
99 Fsm::Established(ref mut s) => Some(s.transport()),
100 _ => None,
101 }
102 }
103}
104
105impl<P> Fsm<P>
106 where P: Protocol,
107 P::Seed: Clone,
108 P::Socket: ActiveStream,
109 <P::Socket as ActiveStream>::Address: Debug
110{
111 fn action<S: GenericScope>(resp: Response<Stream<P>, Void>,
112 addr: <P::Socket as ActiveStream>::Address,
113 seed: P::Seed, scope: &mut S)
114 -> Response<Persistent<P>, Void>
115 {
116 if resp.is_stopped() {
117 if let Some(err) = resp.cause() {
118 warn!("Connection is failed: {}", err);
119 } else {
120 warn!("Connection is stopped by protocol");
121 }
122 response(addr, seed,
123 Fsm::Sleeping(scope.after(RECONNECT_TIMEOUT)))
124 } else {
125 resp
126 .wrap(Fsm::Established)
127 .wrap(|x| Persistent(addr, seed, x))
128 }
129 }
130}
131
132impl<P: Protocol> Machine for Persistent<P>
133 where P: Protocol,
134 P::Seed: Clone,
135 P::Socket: ActiveStream,
136 <P::Socket as ActiveStream>::Address: Debug
137{
138 type Context = P::Context;
139 type Seed = Void;
140 fn create(seed: Self::Seed, _scope: &mut Scope<P::Context>)
141 -> Response<Self, Void>
142 {
143 unreachable(seed)
144 }
145 fn ready(self, events: EventSet, scope: &mut Scope<P::Context>)
146 -> Response<Self, Self::Seed>
147 {
148 use self::Fsm::*;
149 let Persistent(addr, seed, state) = self;
150 let state = match state {
151 Idle => Idle, Connecting(sock, dline) => {
153 if events.is_writable() {
154 let resp = Stream::connected(sock, seed.clone(), scope);
155 if resp.is_stopped() {
156 error!("Error creating stream FSM: {}",
157 resp.cause().unwrap_or(&ProtocolStop));
158 Fsm::Sleeping(scope.after(RECONNECT_TIMEOUT))
159 } else {
160 return Fsm::action(resp, addr, seed, scope);
161 }
162 } else if events.is_hup() {
163 error!("Connection closed immediately");
164 Fsm::Sleeping(scope.after(RECONNECT_TIMEOUT))
165 } else {
166 Connecting(sock, dline) }
168 }
169 Established(x) => {
170 return Fsm::action(x.ready(events, scope), addr, seed, scope);
171 }
172 Sleeping(dline) => Sleeping(dline), };
174 response(addr, seed, state)
175 }
176 fn spawned(self, _scope: &mut Scope<P::Context>)
177 -> Response<Self, Self::Seed>
178 {
179 unreachable!();
180 }
181 fn timeout(self, scope: &mut Scope<P::Context>)
182 -> Response<Self, Self::Seed>
183 {
184 use self::Fsm::*;
185 let Persistent(addr, seed, state) = self;
186 let state = match state {
187 Idle => Idle, Connecting(sock, dline) => {
189 if scope.now() >= dline {
190 warn!("Timeout while establishing connection");
191 Fsm::Sleeping(scope.after(RECONNECT_TIMEOUT))
192 } else { Connecting(sock, dline)
194 }
195 }
196 Established(x) => {
197 return Fsm::action(x.timeout(scope), addr, seed, scope);
198 }
199 Sleeping(dline) => {
200 if scope.now() >= dline {
201 return Self::connect(scope, addr, seed);
202 } else {
203 Sleeping(dline) }
205 }
206 };
207 response(addr, seed, state)
208 }
209 fn wakeup(self, scope: &mut Scope<P::Context>)
210 -> Response<Self, Self::Seed>
211 {
212 use self::Fsm::*;
213 let Persistent(addr, seed, state) = self;
214 let state = match state {
215 Established(x) => {
216 return Fsm::action(x.wakeup(scope), addr, seed, scope);
217 }
218 x => x, };
220 response(addr, seed, state)
221 }
222}
223
#[cfg(feature="replaceable")]
mod replaceable {

    use std::fmt::Debug;

    use {ActiveStream, Protocol, Persistent};
    use rotor_tools::sync::Replaceable;

    use super::Fsm;

    impl<P: Protocol> Replaceable for Persistent<P>
        where P: Protocol,
              P::Seed: Clone,
              <P::Socket as ActiveStream>::Address: Clone + Debug,
              P::Socket: ActiveStream
    {
        /// Produces a stand-in machine with the same address and seed
        /// (both cloned) in the `Idle` state.
        fn empty(&self) -> Self {
            let Persistent(ref address, ref seed, _) = *self;
            Persistent(address.clone(), seed.clone(), Fsm::Idle)
        }
    }
}