1use crate::envelope::RpcEnvelope;
2use crate::error::ClientError;
3use crate::link::{Link, ValueStream};
4use crate::sse;
5
6pub struct RpcLink {
17 http: reqwest::Client,
18 base_url: String,
19}
20
21impl RpcLink {
22 pub fn new(base_url: impl Into<String>) -> Self {
26 RpcLink {
27 http: reqwest::Client::new(),
28 base_url: base_url.into(),
29 }
30 }
31
32 pub fn with_client(mut self, client: reqwest::Client) -> Self {
34 self.http = client;
35 self
36 }
37
38 fn build_url(&self, path: &str) -> String {
39 let path_url = path.replace('.', "/");
40 let base = self.base_url.trim_end_matches('/');
41 format!("{base}/{path_url}")
42 }
43
44 fn build_body(input: serde_json::Value) -> serde_json::Value {
45 if input.is_null() {
46 serde_json::json!({})
48 } else {
49 serde_json::json!({ "json": input })
50 }
51 }
52}
53
54impl Link for RpcLink {
55 async fn call(
56 &self,
57 path: &str,
58 input: serde_json::Value,
59 ) -> Result<serde_json::Value, ClientError> {
60 let url = self.build_url(path);
61 let body = Self::build_body(input);
62
63 let response = self
64 .http
65 .post(&url)
66 .header("content-type", "application/json")
67 .json(&body)
68 .send()
69 .await?;
70
71 let status = response.status();
72 let bytes = response.bytes().await?;
73
74 if status.is_success() {
75 let envelope: RpcEnvelope<serde_json::Value> =
76 serde_json::from_slice(&bytes).map_err(ClientError::Deserialize)?;
77 Ok(envelope.json)
78 } else {
79 let envelope: RpcEnvelope<orpc::ORPCError> =
80 serde_json::from_slice(&bytes).map_err(ClientError::Deserialize)?;
81 Err(ClientError::Rpc(envelope.json))
82 }
83 }
84
85 async fn subscribe(
86 &self,
87 path: &str,
88 input: serde_json::Value,
89 last_event_id: Option<u64>,
90 ) -> Result<ValueStream, ClientError> {
91 let url = self.build_url(path);
92 let body = Self::build_body(input);
93
94 let mut request = self
95 .http
96 .post(&url)
97 .header("content-type", "application/json")
98 .json(&body);
99
100 if let Some(id) = last_event_id {
101 request = request.header("last-event-id", id.to_string());
102 }
103
104 let response = request.send().await?;
105 let status = response.status();
106
107 if !status.is_success() {
108 let bytes = response.bytes().await?;
109 let envelope: RpcEnvelope<orpc::ORPCError> =
110 serde_json::from_slice(&bytes).map_err(ClientError::Deserialize)?;
111 return Err(ClientError::Rpc(envelope.json));
112 }
113
114 let byte_stream = response.bytes_stream();
115 Ok(sse::sse_to_values(byte_stream))
116 }
117}
118
119#[cfg(test)]
120mod tests {
121 use super::*;
122
123 #[test]
124 fn build_url_simple() {
125 let link = RpcLink::new("http://localhost:3000/rpc");
126 assert_eq!(link.build_url("ping"), "http://localhost:3000/rpc/ping");
127 }
128
129 #[test]
130 fn build_url_nested() {
131 let link = RpcLink::new("http://localhost:3000/rpc");
132 assert_eq!(
133 link.build_url("planet.find"),
134 "http://localhost:3000/rpc/planet/find"
135 );
136 }
137
138 #[test]
139 fn build_url_trailing_slash() {
140 let link = RpcLink::new("http://localhost:3000/rpc/");
141 assert_eq!(link.build_url("ping"), "http://localhost:3000/rpc/ping");
142 }
143
144 #[test]
145 fn build_body_null_input() {
146 let body = RpcLink::build_body(serde_json::Value::Null);
147 assert_eq!(body, serde_json::json!({}));
148 }
149
150 #[test]
151 fn build_body_with_input() {
152 let body = RpcLink::build_body(serde_json::json!("Earth"));
153 assert_eq!(body, serde_json::json!({"json": "Earth"}));
154 }
155
156 #[test]
157 fn build_body_with_object() {
158 let body = RpcLink::build_body(serde_json::json!({"name": "Earth"}));
159 assert_eq!(body, serde_json::json!({"json": {"name": "Earth"}}));
160 }
161}