percas_client/
client.rs

1// Copyright 2025 ScopeDB <contact@scopedb.io>
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::RwLock;
16use std::time::Duration;
17use std::time::Instant;
18
19use fastrace_reqwest::traceparent_headers;
20use reqwest::StatusCode;
21use reqwest::Url;
22use reqwest::redirect::Policy;
23use serde::Deserialize;
24use uuid::Uuid;
25
26use crate::Error;
27use crate::protos::Version;
28use crate::route::RouteTable;
29
/// Minimum age the cached route table must exceed before it is fetched again
/// from the control server.
const UPDATE_ROUTE_TABLE_INTERVAL: Duration = Duration::from_millis(10_000);
31
32fn make_opaque_error(msg: impl ToString) -> Error {
33    Error::Opaque(msg.to_string())
34}
35
36/// A builder for creating a `Client`.
37#[derive(Debug, Clone)]
38pub struct ClientBuilder {
39    data_url: String,
40    ctrl_url: String,
41    client: Option<reqwest::Client>,
42}
43
44impl ClientBuilder {
45    /// Create a new client builder with the given data server url and control server url.
46    ///
47    /// # Examples
48    ///
49    /// ```rust
50    /// use percas_client::ClientBuilder;
51    ///
52    /// let builder = ClientBuilder::new("http://percas-data:8080", "http://percas-ctrl:8081");
53    /// let client = builder.build().unwrap();
54    /// let _ = client; // use client
55    /// ```
56    pub fn new(data_url: impl Into<String>, ctrl_url: impl Into<String>) -> Self {
57        Self {
58            data_url: data_url.into(),
59            ctrl_url: ctrl_url.into(),
60            client: None,
61        }
62    }
63
64    /// Set a custom HTTP client. If not set, a default client will be used.
65    pub fn http_client(mut self, client: reqwest::Client) -> Self {
66        self.client = Some(client);
67        self
68    }
69
70    /// Build the client.
71    pub fn build(self) -> Result<Client, Error> {
72        let Self {
73            data_url,
74            ctrl_url,
75            client,
76        } = self;
77
78        let data_url = Url::parse(&data_url).map_err(make_opaque_error)?;
79        let ctrl_url = Url::parse(&ctrl_url).map_err(make_opaque_error)?;
80        let client = match client {
81            Some(client) => client,
82            None => reqwest::ClientBuilder::new()
83                .no_proxy()
84                .redirect(Policy::limited(2))
85                .build()
86                .map_err(make_opaque_error)?,
87        };
88
89        // force an initial route table update on first use
90        let last_updated = Instant::now() - UPDATE_ROUTE_TABLE_INTERVAL - Duration::from_secs(1);
91        Ok(Client {
92            client,
93            data_url,
94            ctrl_url,
95            last_updated: RwLock::new(last_updated),
96            route_table: RwLock::new(None),
97        })
98    }
99}
100
101/// A client for interacting with a Percas cluster.
102pub struct Client {
103    client: reqwest::Client,
104    data_url: Url,
105    ctrl_url: Url,
106    last_updated: RwLock<Instant>,
107    route_table: RwLock<Option<RouteTable>>,
108}
109
110impl Client {
111    /// Get the value associated with the given key.
112    pub async fn get(&self, key: &str) -> Result<Option<Vec<u8>>, Error> {
113        self.update_route_table_if_needed().await?;
114
115        let url = self.route(key).join(key).map_err(make_opaque_error)?;
116
117        let resp = self
118            .client
119            .get(url)
120            .headers(traceparent_headers())
121            .send()
122            .await
123            .map_err(make_opaque_error)?;
124
125        match resp.status() {
126            StatusCode::NOT_FOUND => Ok(None),
127            StatusCode::OK => {
128                let body = resp.bytes().await.map_err(make_opaque_error)?;
129                Ok(Some(body.to_vec()))
130            }
131            StatusCode::TOO_MANY_REQUESTS => Err(Error::TooManyRequests),
132            _ => Err(make_opaque_error(resp.status())),
133        }
134    }
135
136    /// Set the value associated with the given key.
137    pub async fn put(&self, key: &str, value: &[u8]) -> Result<(), Error> {
138        self.update_route_table_if_needed().await?;
139
140        let url = self.route(key).join(key).map_err(make_opaque_error)?;
141
142        let resp = self
143            .client
144            .put(url)
145            .headers(traceparent_headers())
146            .body(value.to_vec())
147            .send()
148            .await
149            .map_err(make_opaque_error)?;
150
151        match resp.status() {
152            StatusCode::OK | StatusCode::CREATED => Ok(()),
153            StatusCode::TOO_MANY_REQUESTS => Err(Error::TooManyRequests),
154            status => Err(make_opaque_error(status)),
155        }
156    }
157
158    /// Delete the value associated with the given key.
159    pub async fn delete(&self, key: &str) -> Result<(), Error> {
160        self.update_route_table_if_needed().await?;
161
162        let url = self.route(key).join(key).map_err(make_opaque_error)?;
163
164        let resp = self
165            .client
166            .delete(url)
167            .headers(traceparent_headers())
168            .send()
169            .await
170            .map_err(make_opaque_error)?;
171
172        match resp.status() {
173            StatusCode::OK | StatusCode::NO_CONTENT => Ok(()),
174            StatusCode::TOO_MANY_REQUESTS => Err(Error::TooManyRequests),
175            status => Err(make_opaque_error(status)),
176        }
177    }
178
179    /// Get the version of the Percas server.
180    pub async fn version(&self) -> Result<Version, Error> {
181        let url = self.ctrl_url.join("version").map_err(make_opaque_error)?;
182
183        let resp = self
184            .client
185            .get(url)
186            .headers(traceparent_headers())
187            .send()
188            .await
189            .map_err(make_opaque_error)?;
190
191        match resp.status() {
192            StatusCode::OK => resp.json::<Version>().await.map_err(make_opaque_error),
193            StatusCode::TOO_MANY_REQUESTS => Err(Error::TooManyRequests),
194            status => Err(make_opaque_error(status)),
195        }
196    }
197}
198
199impl Client {
200    fn route(&self, key: &str) -> Url {
201        if let Some(route_table) = &*self.route_table.read().unwrap()
202            && let Some((_, url)) = route_table.lookup(key)
203        {
204            url.clone()
205        } else {
206            self.data_url.clone()
207        }
208    }
209
210    async fn update_route_table_if_needed(&self) -> Result<(), Error> {
211        let url = self.ctrl_url.join("members").map_err(make_opaque_error)?;
212
213        if self.last_updated.read().unwrap().elapsed() > UPDATE_ROUTE_TABLE_INTERVAL {
214            #[derive(Deserialize)]
215            #[expect(dead_code)] // some fields may be unused
216            struct Member {
217                node_id: Uuid,
218                advertise_data_url: Url,
219                advertise_ctrl_url: Url,
220                incarnation: u64,
221                vnodes: Vec<u32>,
222            }
223
224            #[derive(Deserialize)]
225            struct ListMembersResponse {
226                members: Vec<Member>,
227            }
228
229            let resp = self
230                .client
231                .get(url)
232                .headers(traceparent_headers())
233                .send()
234                .await
235                .map_err(make_opaque_error)?;
236
237            let members = match resp.status() {
238                StatusCode::OK => {
239                    resp.json::<ListMembersResponse>()
240                        .await
241                        .map_err(make_opaque_error)?
242                        .members
243                }
244                StatusCode::TOO_MANY_REQUESTS => return Err(Error::TooManyRequests),
245                status => return Err(make_opaque_error(status)),
246            };
247
248            let mut route_table = RouteTable::default();
249            for member in members {
250                for vnode in member.vnodes {
251                    route_table.insert(vnode, member.node_id, member.advertise_data_url.clone());
252                }
253            }
254            *self.route_table.write().unwrap() = Some(route_table);
255            *self.last_updated.write().unwrap() = Instant::now();
256        }
257        Ok(())
258    }
259}