synapse_rpc/
http_client.rs1use crate::{
7 ContentType,
8 codec::{decode_message, encode_message},
9 message::create_rpc_request,
10};
11use anyhow::{Context, Result};
12use bytes::Bytes;
13use synapse_primitives::Uuid;
14use synapse_proto::{RpcRequest, RpcResponse, SynapseMessage, synapse_message};
15
16pub struct HttpRpcClient {
21 gateway_url: String,
22 content_type: ContentType,
23 client: reqwest::Client,
24 timeout: std::time::Duration,
25}
26
27impl HttpRpcClient {
28 const DEFAULT_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(30);
30
31 pub fn new(gateway_url: impl Into<String>, content_type: ContentType) -> Self {
33 Self {
34 gateway_url: gateway_url.into(),
35 content_type,
36 client: reqwest::Client::new(),
37 timeout: Self::DEFAULT_TIMEOUT,
38 }
39 }
40
41 pub fn with_timeout(mut self, timeout: std::time::Duration) -> Self {
43 self.timeout = timeout;
44 self
45 }
46
47 pub fn with_mtls(
49 gateway_url: impl Into<String>,
50 content_type: ContentType,
51 cert_path: impl AsRef<std::path::Path>,
52 key_path: impl AsRef<std::path::Path>,
53 ca_cert_path: impl AsRef<std::path::Path>,
54 ) -> Result<Self> {
55 use std::io::BufReader;
56
57 tracing::info!(
58 "Creating mTLS client: cert={}, key={}, ca={}",
59 cert_path.as_ref().display(),
60 key_path.as_ref().display(),
61 ca_cert_path.as_ref().display()
62 );
63
64 let cert_file =
66 std::fs::File::open(cert_path.as_ref()).context("Failed to open client certificate")?;
67 let mut cert_reader = BufReader::new(cert_file);
68 let certs: Vec<_> = rustls_pemfile::certs(&mut cert_reader)
69 .collect::<std::result::Result<Vec<_>, _>>()
70 .context("Failed to parse client certificates")?;
71 tracing::info!("Loaded {} client certificate(s)", certs.len());
72
73 let key_file =
75 std::fs::File::open(key_path.as_ref()).context("Failed to open client key")?;
76 let mut key_reader = BufReader::new(key_file);
77 let key = rustls_pemfile::private_key(&mut key_reader)
78 .context("Failed to read private key")?
79 .context("No private key found")?;
80 tracing::info!("Client private key loaded");
81
82 let ca_file =
84 std::fs::File::open(ca_cert_path.as_ref()).context("Failed to open CA certificate")?;
85 let mut ca_reader = BufReader::new(ca_file);
86 let ca_certs: Vec<_> = rustls_pemfile::certs(&mut ca_reader)
87 .collect::<std::result::Result<Vec<_>, _>>()
88 .context("Failed to parse CA certificates")?;
89 tracing::info!("Loaded {} CA certificate(s)", ca_certs.len());
90
91 let mut root_store = rustls::RootCertStore::empty();
93 for cert in ca_certs {
94 root_store.add(cert).context("Failed to add CA cert")?;
95 }
96 tracing::info!("Root store has {} certificates", root_store.len());
97
98 let tls_config = rustls::ClientConfig::builder()
100 .with_root_certificates(root_store)
101 .with_client_auth_cert(certs, key)
102 .context("Failed to build TLS config")?;
103 tracing::info!("TLS client config built successfully");
104
105 let client = reqwest::Client::builder()
107 .use_preconfigured_tls(tls_config)
108 .build()
109 .context("Failed to build HTTP client")?;
110
111 Ok(Self {
112 gateway_url: gateway_url.into(),
113 content_type,
114 client,
115 timeout: Self::DEFAULT_TIMEOUT,
116 })
117 }
118
119 pub fn json(gateway_url: impl Into<String>) -> Self {
121 Self::new(gateway_url, ContentType::Json)
122 }
123
124 pub fn protobuf(gateway_url: impl Into<String>) -> Self {
126 Self::new(gateway_url, ContentType::Protobuf)
127 }
128
129 pub fn json_mtls(
131 gateway_url: impl Into<String>,
132 cert_path: impl AsRef<std::path::Path>,
133 key_path: impl AsRef<std::path::Path>,
134 ca_cert_path: impl AsRef<std::path::Path>,
135 ) -> Result<Self> {
136 Self::with_mtls(
137 gateway_url,
138 ContentType::Json,
139 cert_path,
140 key_path,
141 ca_cert_path,
142 )
143 }
144
145 pub fn protobuf_mtls(
147 gateway_url: impl Into<String>,
148 cert_path: impl AsRef<std::path::Path>,
149 key_path: impl AsRef<std::path::Path>,
150 ca_cert_path: impl AsRef<std::path::Path>,
151 ) -> Result<Self> {
152 Self::with_mtls(
153 gateway_url,
154 ContentType::Protobuf,
155 cert_path,
156 key_path,
157 ca_cert_path,
158 )
159 }
160
161 pub async fn call(&self, request: RpcRequest) -> Result<RpcResponse> {
163 let request_id = Uuid::new_v4();
164
165 let request_body = create_rpc_request(request_id, request, self.content_type)?;
167
168 let url = format!("{}/rpc", self.gateway_url);
170 let http_response = self
171 .client
172 .post(&url)
173 .header("Content-Type", self.content_type.mime_type())
174 .timeout(self.timeout)
175 .body(request_body.to_vec())
176 .send()
177 .await
178 .context("HTTP request failed")?;
179
180 if !http_response.status().is_success() {
182 let status = http_response.status();
183 let body = http_response.text().await.unwrap_or_default();
184 anyhow::bail!("HTTP error {}: {}", status, body);
185 }
186
187 let response_body = http_response
189 .bytes()
190 .await
191 .context("Failed to read response body")?;
192
193 let synapse_msg = decode_message(&response_body, self.content_type)?;
195
196 match synapse_msg.message {
198 Some(synapse_message::Message::RpcResponse(resp)) => Ok(resp),
199 _ => anyhow::bail!(
200 "Expected RPC_RESPONSE message, got kind {:?}",
201 synapse_msg.kind
202 ),
203 }
204 }
205
206 pub fn gateway_url(&self) -> &str {
208 &self.gateway_url
209 }
210
211 pub fn content_type(&self) -> ContentType {
213 self.content_type
214 }
215
216 pub async fn send(&self, message: &SynapseMessage) -> Result<SynapseMessage> {
218 let request_body = encode_message(message, self.content_type)?;
220
221 let url = format!("{}/rpc", self.gateway_url);
223 tracing::debug!("Sending SynapseMessage to: {}", url);
224
225 let http_response = self
226 .client
227 .post(&url)
228 .header("Content-Type", self.content_type.mime_type())
229 .timeout(self.timeout)
230 .body(request_body.to_vec())
231 .send()
232 .await
233 .context("HTTP request failed")?;
234
235 if !http_response.status().is_success() {
237 let status = http_response.status();
238 let body = http_response.text().await.unwrap_or_default();
239 anyhow::bail!("HTTP error {}: {}", status, body);
240 }
241
242 let response_body = http_response
244 .bytes()
245 .await
246 .context("Failed to read response body")?;
247
248 decode_message(&response_body, self.content_type)
250 }
251
252 pub async fn send_raw(&self, body: Bytes) -> Result<Bytes> {
254 let url = format!("{}/rpc", self.gateway_url);
256 let http_response = self
257 .client
258 .post(&url)
259 .header("Content-Type", self.content_type.mime_type())
260 .timeout(self.timeout)
261 .body(body.to_vec())
262 .send()
263 .await
264 .context("HTTP request failed")?;
265
266 if !http_response.status().is_success() {
268 let status = http_response.status();
269 let body = http_response.text().await.unwrap_or_default();
270 anyhow::bail!("HTTP error {}: {}", status, body);
271 }
272
273 let response_body = http_response
275 .bytes()
276 .await
277 .context("Failed to read response body")?;
278
279 Ok(Bytes::from(response_body.to_vec()))
280 }
281}