Skip to main content

orpc_client/
rpc_link.rs

1use crate::envelope::RpcEnvelope;
2use crate::error::ClientError;
3use crate::link::{Link, ValueStream};
4use crate::sse;
5
6/// HTTP transport for the oRPC RPC protocol.
7///
8/// Converts dot-separated procedure paths to URL paths and sends
9/// HTTP POST requests matching the `@orpc/client` wire format.
10///
11/// # Example
12/// ```ignore
13/// let link = RpcLink::new("http://localhost:3000/rpc");
14/// let client = Client::with_link(link);
15/// ```
16pub struct RpcLink {
17    http: reqwest::Client,
18    base_url: String,
19}
20
21impl RpcLink {
22    /// Create a new RPC link with the given base URL.
23    ///
24    /// Uses a default `reqwest::Client`.
25    pub fn new(base_url: impl Into<String>) -> Self {
26        RpcLink {
27            http: reqwest::Client::new(),
28            base_url: base_url.into(),
29        }
30    }
31
32    /// Use a custom `reqwest::Client` (for proxy, TLS, auth, etc.).
33    pub fn with_client(mut self, client: reqwest::Client) -> Self {
34        self.http = client;
35        self
36    }
37
38    fn build_url(&self, path: &str) -> String {
39        let path_url = path.replace('.', "/");
40        let base = self.base_url.trim_end_matches('/');
41        format!("{base}/{path_url}")
42    }
43
44    fn build_body(input: serde_json::Value) -> serde_json::Value {
45        if input.is_null() {
46            // No input: send {} (matches @orpc/client behavior)
47            serde_json::json!({})
48        } else {
49            serde_json::json!({ "json": input })
50        }
51    }
52}
53
54impl Link for RpcLink {
55    async fn call(
56        &self,
57        path: &str,
58        input: serde_json::Value,
59    ) -> Result<serde_json::Value, ClientError> {
60        let url = self.build_url(path);
61        let body = Self::build_body(input);
62
63        let response = self
64            .http
65            .post(&url)
66            .header("content-type", "application/json")
67            .json(&body)
68            .send()
69            .await?;
70
71        let status = response.status();
72        let bytes = response.bytes().await?;
73
74        if status.is_success() {
75            let envelope: RpcEnvelope<serde_json::Value> =
76                serde_json::from_slice(&bytes).map_err(ClientError::Deserialize)?;
77            Ok(envelope.json)
78        } else {
79            let envelope: RpcEnvelope<orpc::ORPCError> =
80                serde_json::from_slice(&bytes).map_err(ClientError::Deserialize)?;
81            Err(ClientError::Rpc(envelope.json))
82        }
83    }
84
85    async fn subscribe(
86        &self,
87        path: &str,
88        input: serde_json::Value,
89        last_event_id: Option<u64>,
90    ) -> Result<ValueStream, ClientError> {
91        let url = self.build_url(path);
92        let body = Self::build_body(input);
93
94        let mut request = self
95            .http
96            .post(&url)
97            .header("content-type", "application/json")
98            .json(&body);
99
100        if let Some(id) = last_event_id {
101            request = request.header("last-event-id", id.to_string());
102        }
103
104        let response = request.send().await?;
105        let status = response.status();
106
107        if !status.is_success() {
108            let bytes = response.bytes().await?;
109            let envelope: RpcEnvelope<orpc::ORPCError> =
110                serde_json::from_slice(&bytes).map_err(ClientError::Deserialize)?;
111            return Err(ClientError::Rpc(envelope.json));
112        }
113
114        let byte_stream = response.bytes_stream();
115        Ok(sse::sse_to_values(byte_stream))
116    }
117}
118
119#[cfg(test)]
120mod tests {
121    use super::*;
122
123    #[test]
124    fn build_url_simple() {
125        let link = RpcLink::new("http://localhost:3000/rpc");
126        assert_eq!(link.build_url("ping"), "http://localhost:3000/rpc/ping");
127    }
128
129    #[test]
130    fn build_url_nested() {
131        let link = RpcLink::new("http://localhost:3000/rpc");
132        assert_eq!(
133            link.build_url("planet.find"),
134            "http://localhost:3000/rpc/planet/find"
135        );
136    }
137
138    #[test]
139    fn build_url_trailing_slash() {
140        let link = RpcLink::new("http://localhost:3000/rpc/");
141        assert_eq!(link.build_url("ping"), "http://localhost:3000/rpc/ping");
142    }
143
144    #[test]
145    fn build_body_null_input() {
146        let body = RpcLink::build_body(serde_json::Value::Null);
147        assert_eq!(body, serde_json::json!({}));
148    }
149
150    #[test]
151    fn build_body_with_input() {
152        let body = RpcLink::build_body(serde_json::json!("Earth"));
153        assert_eq!(body, serde_json::json!({"json": "Earth"}));
154    }
155
156    #[test]
157    fn build_body_with_object() {
158        let body = RpcLink::build_body(serde_json::json!({"name": "Earth"}));
159        assert_eq!(body, serde_json::json!({"json": {"name": "Earth"}}));
160    }
161}