Skip to main content

synapse_rpc/
http_client.rs

1//! HTTP RPC client for calling gateway over HTTP
2//!
3//! This client allows services to make RPC calls to the gateway
4//! using HTTP as the transport. Supports both plain HTTP (dev) and mTLS (production).
5
6use crate::{
7    ContentType,
8    codec::{decode_message, encode_message},
9    message::create_rpc_request,
10};
11use anyhow::{Context, Result};
12use bytes::Bytes;
13use synapse_primitives::Uuid;
14use synapse_proto::{RpcRequest, RpcResponse, SynapseMessage, synapse_message};
15
16/// HTTP RPC client
17///
18/// Makes RPC calls to the gateway over HTTP POST requests.
19/// Supports both plain HTTP (dev mode) and mTLS (production).
20pub struct HttpRpcClient {
21    gateway_url: String,
22    content_type: ContentType,
23    client: reqwest::Client,
24    timeout: std::time::Duration,
25}
26
27impl HttpRpcClient {
28    /// Default request timeout (30 seconds)
29    const DEFAULT_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(30);
30
31    /// Create a new HTTP RPC client (plain HTTP, no TLS)
32    pub fn new(gateway_url: impl Into<String>, content_type: ContentType) -> Self {
33        Self {
34            gateway_url: gateway_url.into(),
35            content_type,
36            client: reqwest::Client::new(),
37            timeout: Self::DEFAULT_TIMEOUT,
38        }
39    }
40
41    /// Set the request timeout
42    pub fn with_timeout(mut self, timeout: std::time::Duration) -> Self {
43        self.timeout = timeout;
44        self
45    }
46
47    /// Create a new HTTP RPC client with mTLS
48    pub fn with_mtls(
49        gateway_url: impl Into<String>,
50        content_type: ContentType,
51        cert_path: impl AsRef<std::path::Path>,
52        key_path: impl AsRef<std::path::Path>,
53        ca_cert_path: impl AsRef<std::path::Path>,
54    ) -> Result<Self> {
55        use std::io::BufReader;
56
57        tracing::info!(
58            "Creating mTLS client: cert={}, key={}, ca={}",
59            cert_path.as_ref().display(),
60            key_path.as_ref().display(),
61            ca_cert_path.as_ref().display()
62        );
63
64        // Load client certificate chain
65        let cert_file =
66            std::fs::File::open(cert_path.as_ref()).context("Failed to open client certificate")?;
67        let mut cert_reader = BufReader::new(cert_file);
68        let certs: Vec<_> = rustls_pemfile::certs(&mut cert_reader)
69            .collect::<std::result::Result<Vec<_>, _>>()
70            .context("Failed to parse client certificates")?;
71        tracing::info!("Loaded {} client certificate(s)", certs.len());
72
73        // Load client private key
74        let key_file =
75            std::fs::File::open(key_path.as_ref()).context("Failed to open client key")?;
76        let mut key_reader = BufReader::new(key_file);
77        let key = rustls_pemfile::private_key(&mut key_reader)
78            .context("Failed to read private key")?
79            .context("No private key found")?;
80        tracing::info!("Client private key loaded");
81
82        // Load CA certificate for server verification
83        let ca_file =
84            std::fs::File::open(ca_cert_path.as_ref()).context("Failed to open CA certificate")?;
85        let mut ca_reader = BufReader::new(ca_file);
86        let ca_certs: Vec<_> = rustls_pemfile::certs(&mut ca_reader)
87            .collect::<std::result::Result<Vec<_>, _>>()
88            .context("Failed to parse CA certificates")?;
89        tracing::info!("Loaded {} CA certificate(s)", ca_certs.len());
90
91        // Build root cert store for verifying the server's certificate
92        let mut root_store = rustls::RootCertStore::empty();
93        for cert in ca_certs {
94            root_store.add(cert).context("Failed to add CA cert")?;
95        }
96        tracing::info!("Root store has {} certificates", root_store.len());
97
98        // Build TLS config with client auth
99        let tls_config = rustls::ClientConfig::builder()
100            .with_root_certificates(root_store)
101            .with_client_auth_cert(certs, key)
102            .context("Failed to build TLS config")?;
103        tracing::info!("TLS client config built successfully");
104
105        // Build reqwest client with custom TLS
106        let client = reqwest::Client::builder()
107            .use_preconfigured_tls(tls_config)
108            .build()
109            .context("Failed to build HTTP client")?;
110
111        Ok(Self {
112            gateway_url: gateway_url.into(),
113            content_type,
114            client,
115            timeout: Self::DEFAULT_TIMEOUT,
116        })
117    }
118
119    /// Create a JSON client (human-readable, debugging)
120    pub fn json(gateway_url: impl Into<String>) -> Self {
121        Self::new(gateway_url, ContentType::Json)
122    }
123
124    /// Create a Protobuf client (efficient, production)
125    pub fn protobuf(gateway_url: impl Into<String>) -> Self {
126        Self::new(gateway_url, ContentType::Protobuf)
127    }
128
129    /// Create a JSON client with mTLS
130    pub fn json_mtls(
131        gateway_url: impl Into<String>,
132        cert_path: impl AsRef<std::path::Path>,
133        key_path: impl AsRef<std::path::Path>,
134        ca_cert_path: impl AsRef<std::path::Path>,
135    ) -> Result<Self> {
136        Self::with_mtls(
137            gateway_url,
138            ContentType::Json,
139            cert_path,
140            key_path,
141            ca_cert_path,
142        )
143    }
144
145    /// Create a Protobuf client with mTLS
146    pub fn protobuf_mtls(
147        gateway_url: impl Into<String>,
148        cert_path: impl AsRef<std::path::Path>,
149        key_path: impl AsRef<std::path::Path>,
150        ca_cert_path: impl AsRef<std::path::Path>,
151    ) -> Result<Self> {
152        Self::with_mtls(
153            gateway_url,
154            ContentType::Protobuf,
155            cert_path,
156            key_path,
157            ca_cert_path,
158        )
159    }
160
161    /// Make an RPC call
162    pub async fn call(&self, request: RpcRequest) -> Result<RpcResponse> {
163        let request_id = Uuid::new_v4();
164
165        // Encode the request into a SynapseMessage
166        let request_body = create_rpc_request(request_id, request, self.content_type)?;
167
168        // Make HTTP POST request
169        let url = format!("{}/rpc", self.gateway_url);
170        let http_response = self
171            .client
172            .post(&url)
173            .header("Content-Type", self.content_type.mime_type())
174            .timeout(self.timeout)
175            .body(request_body.to_vec())
176            .send()
177            .await
178            .context("HTTP request failed")?;
179
180        // Check status
181        if !http_response.status().is_success() {
182            let status = http_response.status();
183            let body = http_response.text().await.unwrap_or_default();
184            anyhow::bail!("HTTP error {}: {}", status, body);
185        }
186
187        // Read response body
188        let response_body = http_response
189            .bytes()
190            .await
191            .context("Failed to read response body")?;
192
193        // Decode response
194        let synapse_msg = decode_message(&response_body, self.content_type)?;
195
196        // Extract RpcResponse from SynapseMessage
197        match synapse_msg.message {
198            Some(synapse_message::Message::RpcResponse(resp)) => Ok(resp),
199            _ => anyhow::bail!(
200                "Expected RPC_RESPONSE message, got kind {:?}",
201                synapse_msg.kind
202            ),
203        }
204    }
205
206    /// Get the gateway URL
207    pub fn gateway_url(&self) -> &str {
208        &self.gateway_url
209    }
210
211    /// Get the content type
212    pub fn content_type(&self) -> ContentType {
213        self.content_type
214    }
215
216    /// Send a raw SynapseMessage and receive a response
217    pub async fn send(&self, message: &SynapseMessage) -> Result<SynapseMessage> {
218        // Encode the message
219        let request_body = encode_message(message, self.content_type)?;
220
221        // Make HTTP POST request
222        let url = format!("{}/rpc", self.gateway_url);
223        tracing::debug!("Sending SynapseMessage to: {}", url);
224
225        let http_response = self
226            .client
227            .post(&url)
228            .header("Content-Type", self.content_type.mime_type())
229            .timeout(self.timeout)
230            .body(request_body.to_vec())
231            .send()
232            .await
233            .context("HTTP request failed")?;
234
235        // Check status
236        if !http_response.status().is_success() {
237            let status = http_response.status();
238            let body = http_response.text().await.unwrap_or_default();
239            anyhow::bail!("HTTP error {}: {}", status, body);
240        }
241
242        // Read response body
243        let response_body = http_response
244            .bytes()
245            .await
246            .context("Failed to read response body")?;
247
248        // Decode response
249        decode_message(&response_body, self.content_type)
250    }
251
252    /// Send raw bytes and receive raw bytes
253    pub async fn send_raw(&self, body: Bytes) -> Result<Bytes> {
254        // Make HTTP POST request
255        let url = format!("{}/rpc", self.gateway_url);
256        let http_response = self
257            .client
258            .post(&url)
259            .header("Content-Type", self.content_type.mime_type())
260            .timeout(self.timeout)
261            .body(body.to_vec())
262            .send()
263            .await
264            .context("HTTP request failed")?;
265
266        // Check status
267        if !http_response.status().is_success() {
268            let status = http_response.status();
269            let body = http_response.text().await.unwrap_or_default();
270            anyhow::bail!("HTTP error {}: {}", status, body);
271        }
272
273        // Read response body
274        let response_body = http_response
275            .bytes()
276            .await
277            .context("Failed to read response body")?;
278
279        Ok(Bytes::from(response_body.to_vec()))
280    }
281}