percas_client/
client.rs

1// Copyright 2025 ScopeDB <contact@scopedb.io>
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::RwLock;
16use std::time::Duration;
17use std::time::Instant;
18
19use fastrace_reqwest::traceparent_headers;
20use reqwest::Body;
21use reqwest::StatusCode;
22use reqwest::Url;
23use reqwest::redirect::Policy;
24use serde::Deserialize;
25use uuid::Uuid;
26
27use crate::Error;
28use crate::protos::Version;
29use crate::route::RouteTable;
30
/// Maximum age of the cached route table; once the last successful refresh is older than
/// this, the next request re-fetches the member list from the control server first.
const UPDATE_ROUTE_TABLE_INTERVAL: Duration = Duration::from_secs(10);
32
33fn make_opaque_error(msg: impl ToString) -> Error {
34    Error::Opaque(msg.to_string())
35}
36
37/// A builder for creating a `Client`.
38#[derive(Debug, Clone)]
39pub struct ClientBuilder {
40    data_url: String,
41    ctrl_url: String,
42    client: Option<reqwest::Client>,
43}
44
45impl ClientBuilder {
46    /// Create a new client builder with the given data server url and control server url.
47    ///
48    /// # Examples
49    ///
50    /// ```rust
51    /// use percas_client::ClientBuilder;
52    ///
53    /// let builder = ClientBuilder::new("http://percas-data:8080", "http://percas-ctrl:8081");
54    /// let client = builder.build().unwrap();
55    /// let _ = client; // use client
56    /// ```
57    pub fn new(data_url: impl Into<String>, ctrl_url: impl Into<String>) -> Self {
58        Self {
59            data_url: data_url.into(),
60            ctrl_url: ctrl_url.into(),
61            client: None,
62        }
63    }
64
65    /// Set a custom HTTP client. If not set, a default client will be used.
66    pub fn http_client(mut self, client: reqwest::Client) -> Self {
67        self.client = Some(client);
68        self
69    }
70
71    /// Build the client.
72    pub fn build(self) -> Result<Client, Error> {
73        let Self {
74            data_url,
75            ctrl_url,
76            client,
77        } = self;
78
79        let data_url = Url::parse(&data_url).map_err(make_opaque_error)?;
80        let ctrl_url = Url::parse(&ctrl_url).map_err(make_opaque_error)?;
81        let client = match client {
82            Some(client) => client,
83            None => reqwest::ClientBuilder::new()
84                .no_proxy()
85                .redirect(Policy::limited(2))
86                .build()
87                .map_err(make_opaque_error)?,
88        };
89
90        // force an initial route table update on first use
91        let last_updated = Instant::now() - UPDATE_ROUTE_TABLE_INTERVAL - Duration::from_secs(1);
92        Ok(Client {
93            client,
94            data_url,
95            ctrl_url,
96            last_updated: RwLock::new(last_updated),
97            route_table: RwLock::new(None),
98        })
99    }
100}
101
102/// A client for interacting with a Percas cluster.
103pub struct Client {
104    client: reqwest::Client,
105    data_url: Url,
106    ctrl_url: Url,
107    last_updated: RwLock<Instant>,
108    route_table: RwLock<Option<RouteTable>>,
109}
110
111impl Client {
112    /// Get the value associated with the given key.
113    pub async fn get(&self, key: &str) -> Result<Option<Vec<u8>>, Error> {
114        self.update_route_table_if_needed().await?;
115
116        let url = self.route(key).join(key).map_err(make_opaque_error)?;
117
118        let resp = self
119            .client
120            .get(url)
121            .headers(traceparent_headers())
122            .send()
123            .await
124            .map_err(make_opaque_error)?;
125
126        match resp.status() {
127            StatusCode::NOT_FOUND => Ok(None),
128            StatusCode::OK => {
129                let body = resp.bytes().await.map_err(make_opaque_error)?;
130                Ok(Some(body.to_vec()))
131            }
132            StatusCode::TOO_MANY_REQUESTS => Err(Error::TooManyRequests),
133            _ => Err(make_opaque_error(resp.status())),
134        }
135    }
136
137    /// Set the value associated with the given key.
138    pub async fn put(&self, key: &str, value: &[u8]) -> Result<(), Error> {
139        self.update_route_table_if_needed().await?;
140
141        let url = self.route(key).join(key).map_err(make_opaque_error)?;
142
143        let resp = self
144            .client
145            .put(url)
146            .headers(traceparent_headers())
147            .body(value.to_vec())
148            .send()
149            .await
150            .map_err(make_opaque_error)?;
151
152        match resp.status() {
153            StatusCode::OK | StatusCode::CREATED => Ok(()),
154            StatusCode::TOO_MANY_REQUESTS => Err(Error::TooManyRequests),
155            status => Err(make_opaque_error(status)),
156        }
157    }
158
159    /// Set the value associated with the given key.
160    ///
161    /// This method exists to avoid an extra copy when the caller has ownership of the data:
162    ///
163    /// * `&'static str`
164    /// * `Vec<u8>`
165    /// * `bytes::Bytes`
166    /// * `reqwest::Body`
167    pub async fn put_owned<T: Into<Body>>(&self, key: &str, value: T) -> Result<(), Error> {
168        self.update_route_table_if_needed().await?;
169
170        let url = self.route(key).join(key).map_err(make_opaque_error)?;
171
172        let resp = self
173            .client
174            .put(url)
175            .headers(traceparent_headers())
176            .body(value)
177            .send()
178            .await
179            .map_err(make_opaque_error)?;
180
181        match resp.status() {
182            StatusCode::OK | StatusCode::CREATED => Ok(()),
183            StatusCode::TOO_MANY_REQUESTS => Err(Error::TooManyRequests),
184            status => Err(make_opaque_error(status)),
185        }
186    }
187
188    /// Delete the value associated with the given key.
189    pub async fn delete(&self, key: &str) -> Result<(), Error> {
190        self.update_route_table_if_needed().await?;
191
192        let url = self.route(key).join(key).map_err(make_opaque_error)?;
193
194        let resp = self
195            .client
196            .delete(url)
197            .headers(traceparent_headers())
198            .send()
199            .await
200            .map_err(make_opaque_error)?;
201
202        match resp.status() {
203            StatusCode::OK | StatusCode::NO_CONTENT => Ok(()),
204            StatusCode::TOO_MANY_REQUESTS => Err(Error::TooManyRequests),
205            status => Err(make_opaque_error(status)),
206        }
207    }
208
209    /// Get the version of the Percas server.
210    pub async fn version(&self) -> Result<Version, Error> {
211        let url = self.ctrl_url.join("version").map_err(make_opaque_error)?;
212
213        let resp = self
214            .client
215            .get(url)
216            .headers(traceparent_headers())
217            .send()
218            .await
219            .map_err(make_opaque_error)?;
220
221        match resp.status() {
222            StatusCode::OK => resp.json::<Version>().await.map_err(make_opaque_error),
223            StatusCode::TOO_MANY_REQUESTS => Err(Error::TooManyRequests),
224            status => Err(make_opaque_error(status)),
225        }
226    }
227}
228
229impl Client {
230    fn route(&self, key: &str) -> Url {
231        if let Some(route_table) = &*self.route_table.read().unwrap()
232            && let Some((_, url)) = route_table.lookup(key)
233        {
234            url.clone()
235        } else {
236            self.data_url.clone()
237        }
238    }
239
240    async fn update_route_table_if_needed(&self) -> Result<(), Error> {
241        let url = self.ctrl_url.join("members").map_err(make_opaque_error)?;
242
243        if self.last_updated.read().unwrap().elapsed() > UPDATE_ROUTE_TABLE_INTERVAL {
244            #[derive(Deserialize)]
245            #[expect(dead_code)] // some fields may be unused
246            struct Member {
247                node_id: Uuid,
248                advertise_data_url: Url,
249                advertise_ctrl_url: Url,
250                incarnation: u64,
251                vnodes: Vec<u32>,
252            }
253
254            #[derive(Deserialize)]
255            struct ListMembersResponse {
256                members: Vec<Member>,
257            }
258
259            let resp = self
260                .client
261                .get(url)
262                .headers(traceparent_headers())
263                .send()
264                .await
265                .map_err(make_opaque_error)?;
266
267            let members = match resp.status() {
268                StatusCode::OK => {
269                    resp.json::<ListMembersResponse>()
270                        .await
271                        .map_err(make_opaque_error)?
272                        .members
273                }
274                StatusCode::TOO_MANY_REQUESTS => return Err(Error::TooManyRequests),
275                status => return Err(make_opaque_error(status)),
276            };
277
278            let mut route_table = RouteTable::default();
279            for member in members {
280                for vnode in member.vnodes {
281                    route_table.insert(vnode, member.node_id, member.advertise_data_url.clone());
282                }
283            }
284            *self.route_table.write().unwrap() = Some(route_table);
285            *self.last_updated.write().unwrap() = Instant::now();
286        }
287        Ok(())
288    }
289}