jsonrpc_quic/
client_endpoint.rs1use super::{jsonrpc::parse_jsonrpc_response, Error, JsonRpcRequest, Result, ALPN_QUIC_HTTP};
11use crate::utils;
12use log::debug;
13use serde::de::DeserializeOwned;
14use std::{fs, path::PathBuf, str, sync::Arc, time::Instant};
15use url::Url;
16
17pub struct ClientEndpoint {
19 config: quinn::ClientConfig,
20}
21
22impl ClientEndpoint {
23 pub fn new(cert_base_path: &str, idle_timeout: Option<u64>, keylog: bool) -> Result<Self> {
27 let mut client_config = quinn::ClientConfigBuilder::default();
28 client_config.protocols(ALPN_QUIC_HTTP);
29
30 if keylog {
31 client_config.enable_keylog();
32 }
33
34 let ca_path = PathBuf::from(cert_base_path).join("cert.der");
35
36 let ca_certificate = fs::read(&ca_path).map_err(|err| {
37 Error::ClientError(format!(
38 "Failed to read certificate from '{}': {}",
39 ca_path.display(),
40 err
41 ))
42 })?;
43 let ca_authority = quinn::Certificate::from_der(&ca_certificate).map_err(|err| {
44 Error::ClientError(format!(
45 "Failed to obtain CA authority from certificate found at '{}': {}",
46 ca_path.display(),
47 err
48 ))
49 })?;
50
51 client_config
52 .add_certificate_authority(ca_authority)
53 .map_err(|err| {
54 Error::ClientError(format!(
55 "Failed to add CA authority to QUIC client configuration: {}",
56 err
57 ))
58 })?;
59
60 let mut config = client_config.build();
61 if let Some(timeout) = idle_timeout {
62 config.transport = Arc::new(utils::new_transport_cfg(timeout)?)
63 };
64 Ok(Self { config })
65 }
66
67 pub fn bind(&self) -> Result<OutgoingConn> {
68 let mut quinn_endpoint_builder = quinn::Endpoint::builder();
69 quinn_endpoint_builder.default_client_config(self.config.clone());
70
71 let socket_addr = "[::]:0".parse().map_err(|err| {
72 Error::ClientError(format!("Failed to parse client endpoint address: {}", err))
73 })?;
74
75 let (endpoint, _) = quinn_endpoint_builder.bind(&socket_addr).map_err(|err| {
76 Error::ClientError(format!("Failed to bind client endpoint: {}", err))
77 })?;
78
79 Ok(OutgoingConn::new(endpoint))
80 }
81}
82
83pub struct OutgoingConn {
85 pub quinn_endpoint: quinn::Endpoint,
86}
87
88impl OutgoingConn {
89 pub(crate) fn new(quinn_endpoint: quinn::Endpoint) -> Self {
90 Self { quinn_endpoint }
91 }
92
93 pub async fn connect(
97 &mut self,
98 dest_endpoint: &str,
99 cert_host: Option<&str>,
100 ) -> Result<OutgoingJsonRpcRequest> {
101 let start = Instant::now();
102 let url = Url::parse(dest_endpoint).map_err(|_| {
103 Error::ClientError("Failed to parse remote end point address".to_string())
104 })?;
105 let remote = url
106 .socket_addrs(|| None)
107 .map_err(|_| Error::ClientError("Invalid remote end point address".to_string()))?[0];
108 let host = cert_host
109 .as_ref()
110 .map_or_else(|| url.host_str(), |x| Some(&x))
111 .ok_or_else(|| Error::ClientError("No certificate hostname specified".to_string()))?;
112
113 let new_conn = self
114 .quinn_endpoint
115 .connect(&remote, &host)
116 .map_err(|err| {
117 Error::ClientError(format!(
118 "Failed when attempting to create a connection with remote QUIC endpoint: {}",
119 err
120 ))
121 })?
122 .await
123 .map_err(|err| {
124 Error::ClientError(format!(
125 "Failed to establish connection with remote QUIC endpoint: {}",
126 err
127 ))
128 })?;
129
130 debug!(
131 "Connected with remote QUIC endpoint at {:?}",
132 start.elapsed()
133 );
134 let quinn::NewConnection {
135 connection: conn, ..
136 } = { new_conn };
137
138 Ok(OutgoingJsonRpcRequest::new(conn))
139 }
140}
141
142pub struct OutgoingJsonRpcRequest {
144 quinn_connection: quinn::Connection,
145}
146
147impl OutgoingJsonRpcRequest {
148 pub(crate) fn new(quinn_connection: quinn::Connection) -> Self {
149 Self { quinn_connection }
150 }
151
152 pub async fn send<T>(&mut self, method: &str, params: serde_json::Value) -> Result<T>
157 where
158 T: DeserializeOwned,
159 {
160 let (mut send, recv) = self.quinn_connection.open_bi().await.map_err(|err| {
161 Error::ClientError(format!("Failed to open communication stream: {}", err))
162 })?;
163
164 let jsonrpc_req = JsonRpcRequest::new(method, params);
165
166 let serialised_req = serde_json::to_string(&jsonrpc_req).map_err(|err| {
167 Error::ClientError(format!("Failed to serialise request to be sent: {}", err))
168 })?;
169
170 send.write_all(serialised_req.as_bytes())
172 .await
173 .map_err(|err| Error::ClientError(format!("Failed to send request: {}", err)))?;
174
175 send.finish().await.map_err(|err| {
176 Error::ClientError(format!(
177 "Failed to gracefully shutdown communication stream: {}",
178 err
179 ))
180 })?;
181
182 debug!("Request sent to remote endpoint");
183 let received_bytes = recv
184 .read_to_end(usize::max_value())
185 .await
186 .map_err(|err| Error::ClientError(format!("Response not received: {}", err)))?;
187
188 self.quinn_connection.close(0u32.into(), b"");
189
190 parse_jsonrpc_response(received_bytes.as_slice())
191 }
192}