1use std::sync::RwLock;
16use std::time::Duration;
17use std::time::Instant;
18
19use fastrace_reqwest::traceparent_headers;
20use reqwest::Body;
21use reqwest::StatusCode;
22use reqwest::Url;
23use reqwest::redirect::Policy;
24use serde::Deserialize;
25use uuid::Uuid;
26
27use crate::Error;
28use crate::protos::Version;
29use crate::route::RouteTable;
30
31const UPDATE_ROUTE_TABLE_INTERVAL: Duration = Duration::from_secs(10);
32
33fn make_opaque_error(msg: impl ToString) -> Error {
34 Error::Opaque(msg.to_string())
35}
36
37#[derive(Debug, Clone)]
39pub struct ClientBuilder {
40 data_url: String,
41 ctrl_url: String,
42 client: Option<reqwest::Client>,
43}
44
45impl ClientBuilder {
46 pub fn new(data_url: impl Into<String>, ctrl_url: impl Into<String>) -> Self {
58 Self {
59 data_url: data_url.into(),
60 ctrl_url: ctrl_url.into(),
61 client: None,
62 }
63 }
64
65 pub fn http_client(mut self, client: reqwest::Client) -> Self {
67 self.client = Some(client);
68 self
69 }
70
71 pub fn build(self) -> Result<Client, Error> {
73 let Self {
74 data_url,
75 ctrl_url,
76 client,
77 } = self;
78
79 let data_url = Url::parse(&data_url).map_err(make_opaque_error)?;
80 let ctrl_url = Url::parse(&ctrl_url).map_err(make_opaque_error)?;
81 let client = match client {
82 Some(client) => client,
83 None => reqwest::ClientBuilder::new()
84 .no_proxy()
85 .redirect(Policy::limited(2))
86 .build()
87 .map_err(make_opaque_error)?,
88 };
89
90 let last_updated = Instant::now() - UPDATE_ROUTE_TABLE_INTERVAL - Duration::from_secs(1);
92 Ok(Client {
93 client,
94 data_url,
95 ctrl_url,
96 last_updated: RwLock::new(last_updated),
97 route_table: RwLock::new(None),
98 })
99 }
100}
101
102pub struct Client {
104 client: reqwest::Client,
105 data_url: Url,
106 ctrl_url: Url,
107 last_updated: RwLock<Instant>,
108 route_table: RwLock<Option<RouteTable>>,
109}
110
111impl Client {
112 pub async fn get(&self, key: &str) -> Result<Option<Vec<u8>>, Error> {
114 self.update_route_table_if_needed().await?;
115
116 let url = self.route(key).join(key).map_err(make_opaque_error)?;
117
118 let resp = self
119 .client
120 .get(url)
121 .headers(traceparent_headers())
122 .send()
123 .await
124 .map_err(make_opaque_error)?;
125
126 match resp.status() {
127 StatusCode::NOT_FOUND => Ok(None),
128 StatusCode::OK => {
129 let body = resp.bytes().await.map_err(make_opaque_error)?;
130 Ok(Some(body.to_vec()))
131 }
132 StatusCode::TOO_MANY_REQUESTS => Err(Error::TooManyRequests),
133 _ => Err(make_opaque_error(resp.status())),
134 }
135 }
136
137 pub async fn put(&self, key: &str, value: &[u8]) -> Result<(), Error> {
139 self.update_route_table_if_needed().await?;
140
141 let url = self.route(key).join(key).map_err(make_opaque_error)?;
142
143 let resp = self
144 .client
145 .put(url)
146 .headers(traceparent_headers())
147 .body(value.to_vec())
148 .send()
149 .await
150 .map_err(make_opaque_error)?;
151
152 match resp.status() {
153 StatusCode::OK | StatusCode::CREATED => Ok(()),
154 StatusCode::TOO_MANY_REQUESTS => Err(Error::TooManyRequests),
155 status => Err(make_opaque_error(status)),
156 }
157 }
158
159 pub async fn put_owned<T: Into<Body>>(&self, key: &str, value: T) -> Result<(), Error> {
168 self.update_route_table_if_needed().await?;
169
170 let url = self.route(key).join(key).map_err(make_opaque_error)?;
171
172 let resp = self
173 .client
174 .put(url)
175 .headers(traceparent_headers())
176 .body(value)
177 .send()
178 .await
179 .map_err(make_opaque_error)?;
180
181 match resp.status() {
182 StatusCode::OK | StatusCode::CREATED => Ok(()),
183 StatusCode::TOO_MANY_REQUESTS => Err(Error::TooManyRequests),
184 status => Err(make_opaque_error(status)),
185 }
186 }
187
188 pub async fn delete(&self, key: &str) -> Result<(), Error> {
190 self.update_route_table_if_needed().await?;
191
192 let url = self.route(key).join(key).map_err(make_opaque_error)?;
193
194 let resp = self
195 .client
196 .delete(url)
197 .headers(traceparent_headers())
198 .send()
199 .await
200 .map_err(make_opaque_error)?;
201
202 match resp.status() {
203 StatusCode::OK | StatusCode::NO_CONTENT => Ok(()),
204 StatusCode::TOO_MANY_REQUESTS => Err(Error::TooManyRequests),
205 status => Err(make_opaque_error(status)),
206 }
207 }
208
209 pub async fn version(&self) -> Result<Version, Error> {
211 let url = self.ctrl_url.join("version").map_err(make_opaque_error)?;
212
213 let resp = self
214 .client
215 .get(url)
216 .headers(traceparent_headers())
217 .send()
218 .await
219 .map_err(make_opaque_error)?;
220
221 match resp.status() {
222 StatusCode::OK => resp.json::<Version>().await.map_err(make_opaque_error),
223 StatusCode::TOO_MANY_REQUESTS => Err(Error::TooManyRequests),
224 status => Err(make_opaque_error(status)),
225 }
226 }
227}
228
229impl Client {
230 fn route(&self, key: &str) -> Url {
231 if let Some(route_table) = &*self.route_table.read().unwrap()
232 && let Some((_, url)) = route_table.lookup(key)
233 {
234 url.clone()
235 } else {
236 self.data_url.clone()
237 }
238 }
239
240 async fn update_route_table_if_needed(&self) -> Result<(), Error> {
241 let url = self.ctrl_url.join("members").map_err(make_opaque_error)?;
242
243 if self.last_updated.read().unwrap().elapsed() > UPDATE_ROUTE_TABLE_INTERVAL {
244 #[derive(Deserialize)]
245 #[expect(dead_code)] struct Member {
247 node_id: Uuid,
248 advertise_data_url: Url,
249 advertise_ctrl_url: Url,
250 incarnation: u64,
251 vnodes: Vec<u32>,
252 }
253
254 #[derive(Deserialize)]
255 struct ListMembersResponse {
256 members: Vec<Member>,
257 }
258
259 let resp = self
260 .client
261 .get(url)
262 .headers(traceparent_headers())
263 .send()
264 .await
265 .map_err(make_opaque_error)?;
266
267 let members = match resp.status() {
268 StatusCode::OK => {
269 resp.json::<ListMembersResponse>()
270 .await
271 .map_err(make_opaque_error)?
272 .members
273 }
274 StatusCode::TOO_MANY_REQUESTS => return Err(Error::TooManyRequests),
275 status => return Err(make_opaque_error(status)),
276 };
277
278 let mut route_table = RouteTable::default();
279 for member in members {
280 for vnode in member.vnodes {
281 route_table.insert(vnode, member.node_id, member.advertise_data_url.clone());
282 }
283 }
284 *self.route_table.write().unwrap() = Some(route_table);
285 *self.last_updated.write().unwrap() = Instant::now();
286 }
287 Ok(())
288 }
289}