1use std::io::Error;
6
7use serde_json::json;
8use slog::{debug, error, o, Drain, Logger};
9use tokio;
10use tokio::codec::Decoder;
11use tokio::net::TcpStream;
12use tokio::prelude::*;
13
14use crate::protocol::{FastMessage, FastMessageData, FastRpc};
15
16pub fn make_task<F>(
19 socket: TcpStream,
20 mut response_handler: F,
21 log: Option<&Logger>,
22) -> impl Future<Item = (), Error = ()> + Send
23where
24 F: FnMut(&FastMessage, &Logger) -> Result<Vec<FastMessage>, Error> + Send,
25{
26 let (tx, rx) = FastRpc.framed(socket).split();
27
28 let rx_log = log
30 .cloned()
31 .unwrap_or_else(|| Logger::root(slog_stdlog::StdLog.fuse(), o!()));
32
33 let tx_log = rx_log.clone();
34 tx.send_all(rx.and_then(move |x| {
35 debug!(rx_log, "processing fast message");
36 respond(x, &mut response_handler, &rx_log)
37 }))
38 .then(move |res| {
39 if let Err(e) = res {
40 error!(tx_log, "failed to process connection"; "err" => %e);
41 }
42
43 debug!(tx_log, "transmitted response to client");
44 Ok(())
45 })
46}
47
48fn respond<F>(
49 msgs: Vec<FastMessage>,
50 response_handler: &mut F,
51 log: &Logger,
52) -> impl Future<Item = Vec<FastMessage>, Error = Error> + Send
53where
54 F: FnMut(&FastMessage, &Logger) -> Result<Vec<FastMessage>, Error> + Send,
55{
56 debug!(log, "responding to {} messages", msgs.len());
57
58 let mut responses: Vec<FastMessage> = Vec::new();
59
60 for msg in msgs {
61 match response_handler(&msg, &log) {
62 Ok(mut response) => {
63 let responses_len = responses.len();
66 let response_len = response.len();
67 let responses_capacity = responses.capacity();
68 if responses_len + response_len > responses_capacity {
69 let needed_capacity =
70 responses_len + response_len - responses_capacity;
71 responses.reserve(needed_capacity);
72 }
73
74 response.drain(..).for_each(|r| {
77 responses.push(r);
78 });
79
80 debug!(log, "generated response");
81 let method = msg.data.m.name.clone();
82 responses.push(FastMessage::end(msg.id, method));
83 }
84 Err(err) => {
85 let method = msg.data.m.name.clone();
86 let value = json!({
87 "name": "FastError",
88 "message": err.to_string()
89 });
90
91 let err_msg = FastMessage::error(
92 msg.id,
93 FastMessageData::new(method, value),
94 );
95 responses.push(err_msg);
96 }
97 }
98 }
99
100 Box::new(future::ok(responses))
101}