1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
use std::collections::HashMap;
use slab::Slab;

use std::thread;
use service::Service;
use session::{self, Alternative, Context, Output, Builder, Session};
use worker::{Realize, Shortcut};
use flow::Flow;

// TODO Change services on the fly
/// A set of named services together with the builder used to create
/// per-connection session state.
///
/// `T` is the session type and `B` the builder producing it.
pub struct Suite<T, B>
    where T: Session, B: Builder<T> {
    /// Produces a fresh session value for every processed connection.
    builder: B,
    /// Registered services, looked up by the name carried in a request.
    services: HashMap<String, Box<Service<T>>>,
}

impl<T, B> Suite<T, B>
    where T: Session, B: Builder<T> {

    /// Creates a suite without any registered services.
    ///
    /// `builder` is stored and later used to build the session of every
    /// processed connection.
    pub fn new(builder: B) -> Self {
        let services = HashMap::new();
        Suite {
            builder: builder,
            services: services,
        }
    }

    /// Registers `service` under `name`.
    ///
    /// The service is boxed and inserted into the internal map; inserting
    /// under a name that is already present replaces the previous entry.
    pub fn register<S: Service<T>>(&mut self, name: &str, service: S) {
        let key = name.to_string();
        let handler = Box::new(service);
        self.services.insert(key, handler);
    }

}

/// Serves one client connection until it is closed or broken.
///
/// A session context is created from `rut` (the transport) and a fresh
/// session value built by `suite.builder`. The function then runs two nested
/// loops:
///
/// * the *session loop* restarts request processing after every recoverable
///   error;
/// * the *request loop* (inside a closure, so that `try!` can be used)
///   receives either a new request or a "resume" command, obtains a worker
///   and drives it until it is done, rejected or suspended.
///
/// Error handling of the session loop:
///
/// * `session::Error::Canceled` - silently start waiting for the next request;
/// * `session::Error::ConnectorFail(_)` / `session::Error::ConnectionClosed` -
///   leave the loop and finish the session;
/// * any other error - log a warning and report it to the client as
///   `Output::Fail`. If even that report cannot be delivered, the session is
///   finished instead of panicking.
///
/// # Parameters
///
/// * `suite` - registry of services and the session builder.
/// * `rut` - the flow (transport) used to talk to the client.
pub fn process_session<T, B, R>(suite: &Suite<T, B>, rut: R)
    where B: Builder<T>, T: Session, R: Flow {

    // Remember the peer label before `rut` is moved into the context.
    let who = rut.who();

    debug!("Start session with {}", who);

    let mut session: Context<T, R> = Context::new(rut, suite.builder.build());
    // TODO Determine handler by action name (refactoring handler needed)

    // Workers parked by the client; the slab key is the task id sent back
    // in `Output::Suspended` and expected later in a resume command.
    let mut suspended_workers = Slab::with_capacity(10);
    loop { // Session loop
        debug!("Begin new request processing for {}", who);
        // The closure never returns `Ok`: the request loop only ends by
        // propagating an error.
        let result: Result<(), session::Error> = (|session: &mut Context<T, R>| {
            loop { // Request loop
                let mut worker = match try!(session.recv_request_or_resume()) {
                    // A brand new request: find the service and prepare a worker.
                    Alternative::Usual((service_name, request)) => {
                        let service = match suite.services.get(&service_name) {
                            Some(value) => value,
                            None => return Err(session::Error::ServiceNotFound),
                        };

                        let mut worker = service.route(&request);

                        match try!(worker.prepare(session, request)) {
                            // The worker finished during preparation.
                            Shortcut::Done => {
                                try!(session.send(Output::Done));
                                continue;
                            },
                            // The worker refused the request.
                            Shortcut::Reject(reason) => {
                                try!(session.send(Output::Reject(reason)));
                                continue;
                            },
                            // Prepared successfully, continue with realization.
                            Shortcut::Tuned => (),
                        }
                        worker
                    },
                    // A resume command: take the parked worker back.
                    Alternative::Unusual(task_id) => {
                        match suspended_workers.remove(task_id) {
                            Some(worker) => {
                                worker
                            },
                            None => {
                                return Err(session::Error::WorkerNotFound);
                            },
                        }
                    },
                };
                // Realization loop: one `Ready` per step, then the client
                // either asks for the next portion or suspends the worker.
                loop {
                    try!(session.send(Output::Ready));
                    match try!(session.recv_next_or_suspend()) {
                        Alternative::Usual(option_request) => {
                            match try!(worker.realize(session, option_request)) {
                                Realize::OneItem(item) => {
                                    try!(session.send(Output::Item(item)));
                                },
                                Realize::OneItemAndDone(item) => {
                                    try!(session.send(Output::Item(item)));
                                    try!(session.send(Output::Done));
                                    break;
                                },
                                Realize::ManyItems(iter) => {
                                    for item in iter {
                                        try!(session.send(Output::Item(item)));
                                    }
                                },
                                Realize::ManyItemsAndDone(iter) => {
                                    for item in iter {
                                        try!(session.send(Output::Item(item)));
                                    }
                                    try!(session.send(Output::Done));
                                    break;
                                },
                                Realize::Reject(reason) => {
                                    try!(session.send(Output::Reject(reason)));
                                    break;
                                },
                                // Nothing to send at this step: give other
                                // threads a chance and ask the client again.
                                Realize::Empty => {
                                    thread::yield_now();
                                },
                                Realize::Done => {
                                    try!(session.send(Output::Done));
                                    break;
                                },
                            }
                        },
                        // The client asked to suspend the current worker.
                        Alternative::Unusual(()) => {
                            match suspended_workers.insert(worker) {
                                Ok(task_id) => {
                                    try!(session.send(Output::Suspended(task_id)));
                                    break;
                                },
                                Err(_) => {
                                    // TODO Consider to continue worker (don't fail)
                                    return Err(session::Error::CannotSuspend);
                                },
                            }
                        },
                    }
                }
            }
        })(&mut session);
        // Inform the user if request processing has failed
        if let Err(reason) = result {
            let output = match reason {
                // TODO Refactor cancel (rename to StopAll and add CancelWorker)
                session::Error::Canceled => continue,
                session::Error::ConnectorFail(_) => break,
                session::Error::ConnectionClosed => break,
                _ => {
                    warn!("Request processing {} have catch an error {:?}", who, reason);
                    Output::Fail(reason.to_string())
                },
            };
            // The connection may already be unusable at this point; a failed
            // delivery must end the session, not panic the serving thread.
            if session.send(output).is_err() {
                warn!("Can't deliver failure report to {}, closing session", who);
                break;
            }
        }
    }
    debug!("Ends session with {}", who);

    // Standard sequence! Only one task simultaneous!
    // Simple to debug, Simple to implement client, corresponds to websocket main principle!

}

#[cfg(feature = "wsmould")]
pub mod wsmould {
    use std::thread;
    use std::io::ErrorKind;
    use std::sync::Arc;
    use std::net::ToSocketAddrs;
    use std::str::{self, Utf8Error};
    use std::time::{SystemTime, Duration};
    use websocket::Server;
    use websocket::message::{Message, Type};
    use websocket::client::Client as WSClient;
    use websocket::stream::TcpStream;
    use websocket::result::{WebSocketResult, WebSocketError};
    use session::{Builder, Session};
    use flow::{self, Flow};

    impl From<WebSocketError> for flow::Error {
        fn from(_: WebSocketError) -> Self {
            flow::Error::ConnectionBroken
        }
    }

    impl From<Utf8Error> for flow::Error {
        fn from(_: Utf8Error) -> Self {
            flow::Error::BadMessageEncoding
        }
    }

    /// Websocket client running over a TCP stream; this is the type the
    /// `Flow` trait is implemented for in this module.
    pub type Client = WSClient<TcpStream>;

    /// `Flow` implementation that exchanges text messages over a websocket.
    impl Flow for Client {
        /// Returns a label of the peer in the form `WS IP <address>`.
        ///
        /// Querying the peer address is an I/O operation that can fail (for
        /// example when the peer has already disconnected); in that case the
        /// label `WS IP <unknown>` is returned instead of panicking.
        fn who(&self) -> String {
            match self.peer_addr() {
                Ok(ip) => format!("WS IP {}", ip),
                Err(_) => "WS IP <unknown>".to_owned(),
            }
        }

        /// Waits for the next text message from the peer.
        ///
        /// Returns `Ok(Some(text))` for a text frame and `Ok(None)` when the
        /// peer sends a close frame. Ping frames are answered with a pong,
        /// pong frames are only traced and binary frames are ignored.
        ///
        /// When receiving fails with `WouldBlock` the method sleeps for 50 ms
        /// and retries; if more than 20 seconds have passed since the last
        /// received frame (or since this call started) a ping is sent first.
        ///
        /// # Errors
        ///
        /// * `flow::Error::BadMessageEncoding` - a text frame is not valid UTF-8;
        /// * `flow::Error::ConnectionBroken` - any other websocket error,
        ///   including a failure to send a ping or pong.
        fn pull(&mut self) -> Result<Option<String>, flow::Error> {
            // NOTE(review): `SystemTime` is not monotonic; if the wall clock
            // moves backwards `elapsed()` fails and the ping is simply skipped.
            let mut last_ping = SystemTime::now();
            let ping_interval = Duration::from_secs(20);
            loop {
                let message: WebSocketResult<Message> = self.recv_message();
                match message {
                    Ok(message) => {
                        match message.opcode {
                            Type::Text => {
                                let content = str::from_utf8(&*message.payload)?;
                                return Ok(Some(content.to_owned()));
                            },
                            Type::Close => {
                                return Ok(None);
                            },
                            Type::Ping => {
                                self.send_message(&Message::pong(message.payload))?;
                            },
                            Type::Pong => {
                                trace!("pong received: {:?}", message.payload);
                            },
                            // Binary frames are not part of the protocol: skip them.
                            Type::Binary => (),
                        }
                        // No need ping if interaction was successful
                        last_ping = SystemTime::now();
                    },
                    // Nothing to read right now: keep the connection alive and wait.
                    Err(WebSocketError::IoError(ref err)) if err.kind() == ErrorKind::WouldBlock => {
                        let elapsed = last_ping.elapsed().map(|dur| dur > ping_interval).unwrap_or(false);
                        if elapsed {
                            // Reset time to stop ping flood
                            last_ping = SystemTime::now();
                            trace!("sending ping");
                            self.send_message(&Message::ping("mould-ping".as_bytes()))?;
                        }
                        thread::sleep(Duration::from_millis(50));
                    },
                    Err(err) => {
                        return Err(flow::Error::from(err));
                    },
                }
            }
        }

        /// Sends `content` to the peer as a single text frame.
        ///
        /// # Errors
        ///
        /// Returns `flow::Error::ConnectionBroken` when the frame can't be sent.
        fn push(&mut self, content: String) -> Result<(), flow::Error> {
            self.send_message(&Message::text(content)).map_err(flow::Error::from)
        }
    }



    /// Binds a websocket server to `addr` and serves every incoming
    /// connection in its own thread with `process_session`.
    ///
    /// The function iterates over incoming connections of the server and
    /// skips connections that failed before the handshake. A connection whose
    /// handshake or socket setup fails is logged and dropped; it does not
    /// panic its thread.
    ///
    /// # Panics
    ///
    /// Panics if the server can't be bound to `addr`.
    pub fn start<T, A, B>(addr: A, suite: Arc<super::Suite<T, B>>)
        where A: ToSocketAddrs, B: Builder<T>, T: Session {
        // CLIENTS HANDLING
        // Nothing can be served without a listening socket: fail loudly.
        let server = Server::bind(addr).unwrap();

        for connection in server.filter_map(Result::ok) {
            let suite = suite.clone();
            thread::spawn(move || {
                // The handshake depends on the remote peer, so a failure is an
                // expected runtime condition rather than a bug.
                let client = match connection.accept() {
                    Ok(client) => client,
                    Err(_) => {
                        debug!("Can't accept websocket connection: handshake failed");
                        return;
                    },
                };
                // `pull` relies on `WouldBlock` errors to send keep-alive
                // pings, so the socket has to be non-blocking.
                if client.set_nonblocking(true).is_err() {
                    debug!("Can't use non-blocking websocket for {}", client.who());
                    return;
                }
                debug!("Connection from {}", client.who());
                super::process_session(suite.as_ref(), client);
            });
        }
    }
}

#[cfg(feature = "iomould")]
pub mod iomould {
    use std::sync::Arc;
    use std::io::{self, Read, Write, BufRead, BufReader, BufWriter};
    use session::{Builder, Session};
    use flow::{self, Flow};

    impl From<io::Error> for flow::Error {
        fn from(_: io::Error) -> Self {
            flow::Error::ConnectionBroken
        }
    }


    /// Line-oriented flow built on top of an arbitrary reader/writer pair.
    pub struct IoFlow<R, W>
        where R: Read, W: Write {
        /// Label returned by `Flow::who`.
        who: String,
        /// Buffered source of incoming lines.
        reader: BufReader<R>,
        /// Buffered sink for outgoing lines.
        writer: BufWriter<W>,
    }

    // Works with stdin, files, sockets, etc!
    // It's simpler to implement an async reactor with this flow
    impl<R, W> IoFlow<R, W>
        where R: Read, W: Write {
        /// Creates a flow labelled `who` that reads from `reader` and writes
        /// to `writer`; both sides are wrapped into buffered adapters.
        pub fn new(who: &str, reader: R, writer: W) -> Self {
            let label = who.to_string();
            let buffered_reader = BufReader::new(reader);
            let buffered_writer = BufWriter::new(writer);
            IoFlow {
                who: label,
                reader: buffered_reader,
                writer: buffered_writer,
            }
        }
    }

    impl IoFlow<io::Stdin, io::Stdout> {
        /// Creates a flow labelled `STDIO` that reads from the standard input
        /// and writes to the standard output of the process.
        pub fn stdio() -> Self {
            let (input, output) = (io::stdin(), io::stdout());
            IoFlow::new("STDIO", input, output)
        }
    }


    /// `Flow` implementation that exchanges one message per text line.
    impl<R, W> Flow for IoFlow<R, W>
        where R: Read, W: Write {
        /// Returns a copy of the label given at construction time.
        fn who(&self) -> String {
            self.who.to_owned()
        }

        /// Reads the next line from the reader.
        ///
        /// Returns `Ok(None)` when nothing could be read (end of input),
        /// otherwise `Ok(Some(line))`; the line is returned exactly as read,
        /// including its line terminator if there was one.
        fn pull(&mut self) -> Result<Option<String>, flow::Error> {
            let mut line = String::new();
            match self.reader.read_line(&mut line)? {
                0 => Ok(None),
                _ => Ok(Some(line)),
            }
        }

        /// Writes `content` followed by a newline and flushes the writer.
        fn push(&mut self, content: String) -> Result<(), flow::Error> {
            self.writer.write_all(content.as_bytes())?;
            self.writer.write_all(b"\n")?;
            self.writer.flush()?;
            Ok(())
        }
    }

    /// Serves a single session over the standard input/output of the process.
    ///
    /// The call returns when `process_session` returns.
    pub fn start<T, B>(suite: Arc<super::Suite<T, B>>)
        where T: Session, B: Builder<T> {
        // The suite comes in an Arc so that different start functions can share it
        let stdio_flow = IoFlow::stdio();
        debug!("Connection from {}", stdio_flow.who());
        super::process_session(&*suite, stdio_flow);
    }
}