Skip to main content

fast_rpc/
server.rs

1// Copyright 2019 Joyent, Inc.
2
3//! This module provides the interface for creating Fast servers.
4
5use std::io::Error;
6
7use serde_json::json;
8use slog::{debug, error, o, Drain, Logger};
9use tokio;
10use tokio::codec::Decoder;
11use tokio::net::TcpStream;
12use tokio::prelude::*;
13
14use crate::protocol::{FastMessage, FastMessageData, FastRpc};
15
16/// Create a task to be used by the tokio runtime for handling responses to Fast
17/// protocol requests.
18pub fn make_task<F>(
19    socket: TcpStream,
20    mut response_handler: F,
21    log: Option<&Logger>,
22) -> impl Future<Item = (), Error = ()> + Send
23where
24    F: FnMut(&FastMessage, &Logger) -> Result<Vec<FastMessage>, Error> + Send,
25{
26    let (tx, rx) = FastRpc.framed(socket).split();
27
28    // If no logger was provided use the slog StdLog drain by default
29    let rx_log = log
30        .cloned()
31        .unwrap_or_else(|| Logger::root(slog_stdlog::StdLog.fuse(), o!()));
32
33    let tx_log = rx_log.clone();
34    tx.send_all(rx.and_then(move |x| {
35        debug!(rx_log, "processing fast message");
36        respond(x, &mut response_handler, &rx_log)
37    }))
38    .then(move |res| {
39        if let Err(e) = res {
40            error!(tx_log, "failed to process connection"; "err" => %e);
41        }
42
43        debug!(tx_log, "transmitted response to client");
44        Ok(())
45    })
46}
47
48fn respond<F>(
49    msgs: Vec<FastMessage>,
50    response_handler: &mut F,
51    log: &Logger,
52) -> impl Future<Item = Vec<FastMessage>, Error = Error> + Send
53where
54    F: FnMut(&FastMessage, &Logger) -> Result<Vec<FastMessage>, Error> + Send,
55{
56    debug!(log, "responding to {} messages", msgs.len());
57
58    let mut responses: Vec<FastMessage> = Vec::new();
59
60    for msg in msgs {
61        match response_handler(&msg, &log) {
62            Ok(mut response) => {
63                // Make sure there is room in responses to fit another response plus an
64                // end message
65                let responses_len = responses.len();
66                let response_len = response.len();
67                let responses_capacity = responses.capacity();
68                if responses_len + response_len > responses_capacity {
69                    let needed_capacity =
70                        responses_len + response_len - responses_capacity;
71                    responses.reserve(needed_capacity);
72                }
73
74                // Add all response messages for this message to the vector of
75                // all responses
76                response.drain(..).for_each(|r| {
77                    responses.push(r);
78                });
79
80                debug!(log, "generated response");
81                let method = msg.data.m.name.clone();
82                responses.push(FastMessage::end(msg.id, method));
83            }
84            Err(err) => {
85                let method = msg.data.m.name.clone();
86                let value = json!({
87                    "name": "FastError",
88                    "message": err.to_string()
89                });
90
91                let err_msg = FastMessage::error(
92                    msg.id,
93                    FastMessageData::new(method, value),
94                );
95                responses.push(err_msg);
96            }
97        }
98    }
99
100    Box::new(future::ok(responses))
101}