jsonrpc_quic/
server_endpoint.rs1use super::{
11 jsonrpc::parse_jsonrpc_request, Error, JsonRpcRequest, JsonRpcResponse, Result, ALPN_QUIC_HTTP,
12};
13use crate::utils;
14use futures::StreamExt;
15use log::{debug, info, warn};
16use std::{fs, io, net::SocketAddr, str, sync::Arc};
17
18pub struct Endpoint {
20 config: quinn::ServerConfig,
21}
22
23impl Endpoint {
24 pub fn new(cert_base_path: &str, idle_timeout: Option<u64>) -> Result<Self> {
27 let mut server_config = quinn::ServerConfig::default();
28 if let Some(timeout) = idle_timeout {
29 server_config.transport = Arc::new(utils::new_transport_cfg(timeout)?)
30 };
31
32 let mut server_config = quinn::ServerConfigBuilder::new(server_config);
33 server_config.protocols(ALPN_QUIC_HTTP);
34
35 let cert_path = std::path::Path::new(&cert_base_path).join("cert.der");
36 let key_path = std::path::Path::new(&cert_base_path).join("key.der");
37 let (cert, key) = match fs::read(&cert_path).and_then(|x| Ok((x, fs::read(&key_path)?))) {
38 Ok(x) => x,
39 Err(ref e) if e.kind() == io::ErrorKind::NotFound => {
40 info!("Generating self-signed certificate...");
41 let cert = rcgen::generate_simple_self_signed(vec!["localhost".into()]).map_err(
42 |err| {
43 Error::GeneralError(format!(
44 "Failed to generate self-signed certificate: {}",
45 err
46 ))
47 },
48 )?;
49 let key = cert.serialize_private_key_der();
50 let cert = cert.serialize_der().map_err(|err| {
51 Error::GeneralError(format!("Failed to serialise certificate: {}", err))
52 })?;
53 fs::create_dir_all(&std::path::Path::new(&cert_base_path)).map_err(|err| {
54 Error::GeneralError(format!("Failed to create certificate directory: {}", err))
55 })?;
56 fs::write(&cert_path, &cert).map_err(|err| {
57 Error::GeneralError(format!("Failed to write certificate: {}", err))
58 })?;
59 fs::write(&key_path, &key).map_err(|err| {
60 Error::GeneralError(format!("Failed to write private key: {}", err))
61 })?;
62 (cert, key)
63 }
64 Err(e) => {
65 return Err(Error::GeneralError(format!(
66 "Failed to read certificate: {}",
67 e
68 )));
69 }
70 };
71 let key = quinn::PrivateKey::from_der(&key).map_err(|err| {
72 Error::GeneralError(format!("Failed parse private key from file: {}", err))
73 })?;
74 let cert = quinn::Certificate::from_der(&cert).map_err(|err| {
75 Error::GeneralError(format!("Failed to parse certificate from file: {}", err))
76 })?;
77 server_config
78 .certificate(quinn::CertificateChain::from_certs(vec![cert]), key)
79 .map_err(|err| {
80 Error::GeneralError(format!(
81 "Failed to set certificate for communication: {}",
82 err
83 ))
84 })?;
85
86 let config = server_config.build();
87 Ok(Self { config })
88 }
89
90 pub fn bind(&self, listen_socket_addr: &SocketAddr) -> Result<IncomingConn> {
92 let mut quinn_endpoint_builder = quinn::Endpoint::builder();
93 quinn_endpoint_builder.listen(self.config.clone());
94
95 let (_endpoint, incoming) = quinn_endpoint_builder
96 .bind(&listen_socket_addr)
97 .map_err(|err| Error::GeneralError(format!("Failed to bind QUIC endpoint: {}", err)))?;
98
99 Ok(IncomingConn::new(incoming))
100 }
101}
102
103pub struct IncomingConn {
105 quinn_incoming: quinn::Incoming,
106}
107
108impl IncomingConn {
109 pub(crate) fn new(quinn_incoming: quinn::Incoming) -> Self {
110 Self { quinn_incoming }
111 }
112
113 pub async fn get_next(&mut self) -> Option<IncomingJsonRpcRequest> {
115 match self.quinn_incoming.next().await {
116 Some(quinn_conn) => match quinn_conn.await {
117 Ok(quinn::NewConnection { bi_streams, .. }) => {
118 Some(IncomingJsonRpcRequest::new(bi_streams))
119 }
120 Err(_err) => None,
121 },
122 None => None,
123 }
124 }
125}
126
127pub struct IncomingJsonRpcRequest {
129 bi_streams: quinn::IncomingBiStreams,
130}
131
132impl IncomingJsonRpcRequest {
133 pub(crate) fn new(bi_streams: quinn::IncomingBiStreams) -> Self {
134 Self { bi_streams }
135 }
136
137 pub async fn get_next(&mut self) -> Option<(JsonRpcRequest, JsonRpcResponseStream)> {
139 match self.bi_streams.next().await {
141 None => None,
142 Some(stream) => {
143 let (send, recv): (quinn::SendStream, quinn::RecvStream) = match stream {
144 Err(quinn::ConnectionError::ApplicationClosed { .. }) => {
145 debug!("Connection terminated");
146 return None;
147 }
148 Err(err) => {
149 warn!("Failed to read incoming request: {}", err);
150 return None;
151 }
152 Ok(bi_stream) => bi_stream,
153 };
154
155 match recv
156 .read_to_end(64 * 1024) .await
158 {
159 Ok(req_bytes) => {
160 debug!("Got new request's bytes");
161 match parse_jsonrpc_request(req_bytes) {
162 Ok(jsonrpc_req) => {
163 debug!("Request parsed successfully");
164 Some((jsonrpc_req, JsonRpcResponseStream::new(send)))
165 }
166 Err(err) => {
167 warn!("Failed to parse request as JSON-RPC: {}", err);
168 None
169 }
170 }
171 }
172 Err(err) => {
173 warn!("Failed reading request's bytes: {}", err);
174 None
175 }
176 }
177 }
178 }
179 }
180}
181
182pub struct JsonRpcResponseStream {
184 quinn_send_stream: quinn::SendStream,
185}
186
187impl JsonRpcResponseStream {
188 pub(crate) fn new(quinn_send_stream: quinn::SendStream) -> Self {
189 Self { quinn_send_stream }
190 }
191
192 pub async fn respond(&mut self, response: &JsonRpcResponse) -> Result<()> {
194 let serialised_res = serde_json::to_string(response).map_err(|err| {
195 Error::GeneralError(format!("Failed to serialise response: {:?}", err))
196 })?;
197
198 self.quinn_send_stream
199 .write_all(&serialised_res.into_bytes())
200 .await
201 .map_err(|err| {
202 Error::GeneralError(format!(
203 "Failed to write entire buffer to response stream: {}",
204 err
205 ))
206 })
207 }
208
209 pub async fn finish(&mut self) -> Result<()> {
211 self.quinn_send_stream.finish().await.map_err(|err| {
212 Error::GeneralError(format!(
213 "Failed to shutdown the response stream gracefully: {}",
214 err
215 ))
216 })
217 }
218}