1use std::sync::RwLock;
16use std::time::Duration;
17use std::time::Instant;
18
19use fastrace_reqwest::traceparent_headers;
20use reqwest::StatusCode;
21use reqwest::Url;
22use reqwest::redirect::Policy;
23use serde::Deserialize;
24use uuid::Uuid;
25
26use crate::Error;
27use crate::protos::Version;
28use crate::route::RouteTable;
29
30const UPDATE_ROUTE_TABLE_INTERVAL: Duration = Duration::from_secs(10);
31
32fn make_opaque_error(msg: impl ToString) -> Error {
33 Error::Opaque(msg.to_string())
34}
35
36#[derive(Debug, Clone)]
38pub struct ClientBuilder {
39 data_url: String,
40 ctrl_url: String,
41 client: Option<reqwest::Client>,
42}
43
44impl ClientBuilder {
45 pub fn new(data_url: impl Into<String>, ctrl_url: impl Into<String>) -> Self {
57 Self {
58 data_url: data_url.into(),
59 ctrl_url: ctrl_url.into(),
60 client: None,
61 }
62 }
63
64 pub fn http_client(mut self, client: reqwest::Client) -> Self {
66 self.client = Some(client);
67 self
68 }
69
70 pub fn build(self) -> Result<Client, Error> {
72 let Self {
73 data_url,
74 ctrl_url,
75 client,
76 } = self;
77
78 let data_url = Url::parse(&data_url).map_err(make_opaque_error)?;
79 let ctrl_url = Url::parse(&ctrl_url).map_err(make_opaque_error)?;
80 let client = match client {
81 Some(client) => client,
82 None => reqwest::ClientBuilder::new()
83 .no_proxy()
84 .redirect(Policy::limited(2))
85 .build()
86 .map_err(make_opaque_error)?,
87 };
88
89 let last_updated = Instant::now() - UPDATE_ROUTE_TABLE_INTERVAL - Duration::from_secs(1);
91 Ok(Client {
92 client,
93 data_url,
94 ctrl_url,
95 last_updated: RwLock::new(last_updated),
96 route_table: RwLock::new(None),
97 })
98 }
99}
100
101pub struct Client {
103 client: reqwest::Client,
104 data_url: Url,
105 ctrl_url: Url,
106 last_updated: RwLock<Instant>,
107 route_table: RwLock<Option<RouteTable>>,
108}
109
110impl Client {
111 pub async fn get(&self, key: &str) -> Result<Option<Vec<u8>>, Error> {
113 self.update_route_table_if_needed().await?;
114
115 let url = self.route(key).join(key).map_err(make_opaque_error)?;
116
117 let resp = self
118 .client
119 .get(url)
120 .headers(traceparent_headers())
121 .send()
122 .await
123 .map_err(make_opaque_error)?;
124
125 match resp.status() {
126 StatusCode::NOT_FOUND => Ok(None),
127 StatusCode::OK => {
128 let body = resp.bytes().await.map_err(make_opaque_error)?;
129 Ok(Some(body.to_vec()))
130 }
131 StatusCode::TOO_MANY_REQUESTS => Err(Error::TooManyRequests),
132 _ => Err(make_opaque_error(resp.status())),
133 }
134 }
135
136 pub async fn put(&self, key: &str, value: &[u8]) -> Result<(), Error> {
138 self.update_route_table_if_needed().await?;
139
140 let url = self.route(key).join(key).map_err(make_opaque_error)?;
141
142 let resp = self
143 .client
144 .put(url)
145 .headers(traceparent_headers())
146 .body(value.to_vec())
147 .send()
148 .await
149 .map_err(make_opaque_error)?;
150
151 match resp.status() {
152 StatusCode::OK | StatusCode::CREATED => Ok(()),
153 StatusCode::TOO_MANY_REQUESTS => Err(Error::TooManyRequests),
154 status => Err(make_opaque_error(status)),
155 }
156 }
157
158 pub async fn delete(&self, key: &str) -> Result<(), Error> {
160 self.update_route_table_if_needed().await?;
161
162 let url = self.route(key).join(key).map_err(make_opaque_error)?;
163
164 let resp = self
165 .client
166 .delete(url)
167 .headers(traceparent_headers())
168 .send()
169 .await
170 .map_err(make_opaque_error)?;
171
172 match resp.status() {
173 StatusCode::OK | StatusCode::NO_CONTENT => Ok(()),
174 StatusCode::TOO_MANY_REQUESTS => Err(Error::TooManyRequests),
175 status => Err(make_opaque_error(status)),
176 }
177 }
178
179 pub async fn version(&self) -> Result<Version, Error> {
181 let url = self.ctrl_url.join("version").map_err(make_opaque_error)?;
182
183 let resp = self
184 .client
185 .get(url)
186 .headers(traceparent_headers())
187 .send()
188 .await
189 .map_err(make_opaque_error)?;
190
191 match resp.status() {
192 StatusCode::OK => resp.json::<Version>().await.map_err(make_opaque_error),
193 StatusCode::TOO_MANY_REQUESTS => Err(Error::TooManyRequests),
194 status => Err(make_opaque_error(status)),
195 }
196 }
197}
198
199impl Client {
200 fn route(&self, key: &str) -> Url {
201 if let Some(route_table) = &*self.route_table.read().unwrap()
202 && let Some((_, url)) = route_table.lookup(key)
203 {
204 url.clone()
205 } else {
206 self.data_url.clone()
207 }
208 }
209
210 async fn update_route_table_if_needed(&self) -> Result<(), Error> {
211 let url = self.ctrl_url.join("members").map_err(make_opaque_error)?;
212
213 if self.last_updated.read().unwrap().elapsed() > UPDATE_ROUTE_TABLE_INTERVAL {
214 #[derive(Deserialize)]
215 #[expect(dead_code)] struct Member {
217 node_id: Uuid,
218 advertise_data_url: Url,
219 advertise_ctrl_url: Url,
220 incarnation: u64,
221 vnodes: Vec<u32>,
222 }
223
224 #[derive(Deserialize)]
225 struct ListMembersResponse {
226 members: Vec<Member>,
227 }
228
229 let resp = self
230 .client
231 .get(url)
232 .headers(traceparent_headers())
233 .send()
234 .await
235 .map_err(make_opaque_error)?;
236
237 let members = match resp.status() {
238 StatusCode::OK => {
239 resp.json::<ListMembersResponse>()
240 .await
241 .map_err(make_opaque_error)?
242 .members
243 }
244 StatusCode::TOO_MANY_REQUESTS => return Err(Error::TooManyRequests),
245 status => return Err(make_opaque_error(status)),
246 };
247
248 let mut route_table = RouteTable::default();
249 for member in members {
250 for vnode in member.vnodes {
251 route_table.insert(vnode, member.node_id, member.advertise_data_url.clone());
252 }
253 }
254 *self.route_table.write().unwrap() = Some(route_table);
255 *self.last_updated.write().unwrap() = Instant::now();
256 }
257 Ok(())
258 }
259}