jsonrpc_quic/
server_endpoint.rs

1// Copyright 2020 MaidSafe.net limited.
2//
3// This SAFE Network Software is licensed to you under the MIT license <LICENSE-MIT
4// http://opensource.org/licenses/MIT> or the Modified BSD license <LICENSE-BSD
5// https://opensource.org/licenses/BSD-3-Clause>, at your option. This file may not be copied,
6// modified, or distributed except according to those terms. Please review the Licences for the
7// specific language governing permissions and limitations relating to use of the SAFE Network
8// Software.
9
10use super::{
11    jsonrpc::parse_jsonrpc_request, Error, JsonRpcRequest, JsonRpcResponse, Result, ALPN_QUIC_HTTP,
12};
13use crate::utils;
14use futures::StreamExt;
15use log::{debug, info, warn};
16use std::{fs, io, net::SocketAddr, str, sync::Arc};
17
18// JSON-RPC over QUIC server endpoint
19pub struct Endpoint {
20    config: quinn::ServerConfig,
21}
22
23impl Endpoint {
24    // cert_base_path: Base path where to locate custom certificate authority to trust, in DER format
25    // idle_timeout: Optional number of millis before timing out an idle connection
26    pub fn new(cert_base_path: &str, idle_timeout: Option<u64>) -> Result<Self> {
27        let mut server_config = quinn::ServerConfig::default();
28        if let Some(timeout) = idle_timeout {
29            server_config.transport = Arc::new(utils::new_transport_cfg(timeout)?)
30        };
31
32        let mut server_config = quinn::ServerConfigBuilder::new(server_config);
33        server_config.protocols(ALPN_QUIC_HTTP);
34
35        let cert_path = std::path::Path::new(&cert_base_path).join("cert.der");
36        let key_path = std::path::Path::new(&cert_base_path).join("key.der");
37        let (cert, key) = match fs::read(&cert_path).and_then(|x| Ok((x, fs::read(&key_path)?))) {
38            Ok(x) => x,
39            Err(ref e) if e.kind() == io::ErrorKind::NotFound => {
40                info!("Generating self-signed certificate...");
41                let cert = rcgen::generate_simple_self_signed(vec!["localhost".into()]).map_err(
42                    |err| {
43                        Error::GeneralError(format!(
44                            "Failed to generate self-signed certificate: {}",
45                            err
46                        ))
47                    },
48                )?;
49                let key = cert.serialize_private_key_der();
50                let cert = cert.serialize_der().map_err(|err| {
51                    Error::GeneralError(format!("Failed to serialise certificate: {}", err))
52                })?;
53                fs::create_dir_all(&std::path::Path::new(&cert_base_path)).map_err(|err| {
54                    Error::GeneralError(format!("Failed to create certificate directory: {}", err))
55                })?;
56                fs::write(&cert_path, &cert).map_err(|err| {
57                    Error::GeneralError(format!("Failed to write certificate: {}", err))
58                })?;
59                fs::write(&key_path, &key).map_err(|err| {
60                    Error::GeneralError(format!("Failed to write private key: {}", err))
61                })?;
62                (cert, key)
63            }
64            Err(e) => {
65                return Err(Error::GeneralError(format!(
66                    "Failed to read certificate: {}",
67                    e
68                )));
69            }
70        };
71        let key = quinn::PrivateKey::from_der(&key).map_err(|err| {
72            Error::GeneralError(format!("Failed parse private key from file: {}", err))
73        })?;
74        let cert = quinn::Certificate::from_der(&cert).map_err(|err| {
75            Error::GeneralError(format!("Failed to parse certificate from file: {}", err))
76        })?;
77        server_config
78            .certificate(quinn::CertificateChain::from_certs(vec![cert]), key)
79            .map_err(|err| {
80                Error::GeneralError(format!(
81                    "Failed to set certificate for communication: {}",
82                    err
83                ))
84            })?;
85
86        let config = server_config.build();
87        Ok(Self { config })
88    }
89
90    // Bind server endpoint to a socket address to start listening for connections on it
91    pub fn bind(&self, listen_socket_addr: &SocketAddr) -> Result<IncomingConn> {
92        let mut quinn_endpoint_builder = quinn::Endpoint::builder();
93        quinn_endpoint_builder.listen(self.config.clone());
94
95        let (_endpoint, incoming) = quinn_endpoint_builder
96            .bind(&listen_socket_addr)
97            .map_err(|err| Error::GeneralError(format!("Failed to bind QUIC endpoint: {}", err)))?;
98
99        Ok(IncomingConn::new(incoming))
100    }
101}
102
103// Stream of incoming QUIC connections
104pub struct IncomingConn {
105    quinn_incoming: quinn::Incoming,
106}
107
108impl IncomingConn {
109    pub(crate) fn new(quinn_incoming: quinn::Incoming) -> Self {
110        Self { quinn_incoming }
111    }
112
113    // Returns next QUIC connection established by a peer
114    pub async fn get_next(&mut self) -> Option<IncomingJsonRpcRequest> {
115        match self.quinn_incoming.next().await {
116            Some(quinn_conn) => match quinn_conn.await {
117                Ok(quinn::NewConnection { bi_streams, .. }) => {
118                    Some(IncomingJsonRpcRequest::new(bi_streams))
119                }
120                Err(_err) => None,
121            },
122            None => None,
123        }
124    }
125}
126
127// Stream of incoming JSON-RPC request messages
128pub struct IncomingJsonRpcRequest {
129    bi_streams: quinn::IncomingBiStreams,
130}
131
132impl IncomingJsonRpcRequest {
133    pub(crate) fn new(bi_streams: quinn::IncomingBiStreams) -> Self {
134        Self { bi_streams }
135    }
136
137    // Returns next JSON-RPC request sent by the peer on current QUIC connection
138    pub async fn get_next(&mut self) -> Option<(JsonRpcRequest, JsonRpcResponseStream)> {
139        // Each stream initiated by the client constitutes a new request
140        match self.bi_streams.next().await {
141            None => None,
142            Some(stream) => {
143                let (send, recv): (quinn::SendStream, quinn::RecvStream) = match stream {
144                    Err(quinn::ConnectionError::ApplicationClosed { .. }) => {
145                        debug!("Connection terminated");
146                        return None;
147                    }
148                    Err(err) => {
149                        warn!("Failed to read incoming request: {}", err);
150                        return None;
151                    }
152                    Ok(bi_stream) => bi_stream,
153                };
154
155                match recv
156                    .read_to_end(64 * 1024) // Read the request's bytes, which must be at most 64KiB
157                    .await
158                {
159                    Ok(req_bytes) => {
160                        debug!("Got new request's bytes");
161                        match parse_jsonrpc_request(req_bytes) {
162                            Ok(jsonrpc_req) => {
163                                debug!("Request parsed successfully");
164                                Some((jsonrpc_req, JsonRpcResponseStream::new(send)))
165                            }
166                            Err(err) => {
167                                warn!("Failed to parse request as JSON-RPC: {}", err);
168                                None
169                            }
170                        }
171                    }
172                    Err(err) => {
173                        warn!("Failed reading request's bytes: {}", err);
174                        None
175                    }
176                }
177            }
178        }
179    }
180}
181
182// Stream of outgoing JSON-RPC responses
183pub struct JsonRpcResponseStream {
184    quinn_send_stream: quinn::SendStream,
185}
186
187impl JsonRpcResponseStream {
188    pub(crate) fn new(quinn_send_stream: quinn::SendStream) -> Self {
189        Self { quinn_send_stream }
190    }
191
192    // Write a JsonRpcResponse into the current connection's sending stream
193    pub async fn respond(&mut self, response: &JsonRpcResponse) -> Result<()> {
194        let serialised_res = serde_json::to_string(response).map_err(|err| {
195            Error::GeneralError(format!("Failed to serialise response: {:?}", err))
196        })?;
197
198        self.quinn_send_stream
199            .write_all(&serialised_res.into_bytes())
200            .await
201            .map_err(|err| {
202                Error::GeneralError(format!(
203                    "Failed to write entire buffer to response stream: {}",
204                    err
205                ))
206            })
207    }
208
209    // Gracefully finish current connection's stream
210    pub async fn finish(&mut self) -> Result<()> {
211        self.quinn_send_stream.finish().await.map_err(|err| {
212            Error::GeneralError(format!(
213                "Failed to shutdown the response stream gracefully: {}",
214                err
215            ))
216        })
217    }
218}