skiff_rs/client.rs
1use std::net::{Ipv4Addr, SocketAddrV4};
2
3use serde::{de::DeserializeOwned, Serialize};
4use tonic::{transport::Channel, Request};
5use uuid::Uuid;
6
7use crate::{
8 error::Error,
9 skiff::skiff_proto::{
10 skiff_client::SkiffClient, DeleteRequest, Empty, GetRequest, InsertRequest,
11 ListKeysRequest, ServerRequest, SubscribeRequest,
12 },
13 Subscriber,
14};
15
16/// A client for interacting with a skiff cluster.
17///
18/// `Client` maintains a single gRPC connection to one cluster node. All
19/// write requests are automatically forwarded to the current leader by the
20/// node, so the client does not need to track leadership itself.
21///
22/// # Example
23///
24/// ```no_run
25/// use skiff_rs::Client;
26///
27/// # #[tokio::main]
28/// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
29/// let mut client = Client::new(vec!["127.0.0.1".parse()?]);
30/// client.connect().await?;
31///
32/// client.insert("key", "value").await?;
33/// let val: Option<String> = client.get("key").await?;
34/// # Ok(())
35/// # }
36/// ```
37#[derive(Debug)]
38pub struct Client {
39 conn: Option<SkiffClient<Channel>>,
40 cluster: Vec<Ipv4Addr>,
41 port: u16,
42}
43impl Client {
44 /// Create a new client that will try each address in `cluster` in order
45 /// when connecting.
46 pub fn new(cluster: Vec<Ipv4Addr>) -> Self {
47 Self {
48 conn: None,
49 cluster,
50 port: 9400,
51 }
52 }
53
54 /// Override the port used to connect to cluster nodes. Defaults to `9400`.
55 pub fn with_port(mut self, port: u16) -> Self {
56 self.port = port;
57 self
58 }
59
60 /// Establish a gRPC connection to the first reachable node in the cluster.
61 ///
62 /// This is called automatically by the other methods, but can be called
63 /// explicitly to verify connectivity before performing operations.
64 ///
65 /// # Errors
66 ///
67 /// Returns [`Error::ClientConnectFailed`] if no node in the cluster list
68 /// can be reached.
69 pub async fn connect(&mut self) -> Result<(), Error> {
70 if self.conn.is_some() {
71 return Ok(());
72 }
73
74 for peer in self.cluster.iter() {
75 match SkiffClient::connect(format!("http://{}", SocketAddrV4::new(*peer, self.port)))
76 .await
77 {
78 Ok(conn) => {
79 self.conn = Some(conn);
80 return Ok(());
81 }
82 Err(_) => continue,
83 }
84 }
85
86 Err(Error::ClientConnectFailed)
87 }
88
89 /// Retrieve the value stored at `key`, deserializing it as `T`.
90 ///
91 /// Returns `Ok(None)` if the key does not exist.
92 ///
93 /// # Errors
94 ///
95 /// Returns an error if the RPC fails or the stored bytes cannot be
96 /// deserialized into `T`.
97 pub async fn get<T: DeserializeOwned>(&mut self, key: &str) -> Result<Option<T>, Error> {
98 self.connect().await?;
99 let response = self
100 .conn
101 .as_mut()
102 .unwrap()
103 .get(Request::new(GetRequest {
104 key: key.to_string(),
105 }))
106 .await;
107
108 match response {
109 Ok(resp) => match resp.into_inner().value {
110 Some(value) => match bincode::deserialize::<T>(value.as_slice()) {
111 Ok(value2) => Ok(Some(value2)),
112 Err(_) => Err(Error::DeserializeFailed),
113 },
114 None => Ok(None),
115 },
116 Err(_) => Err(Error::RPCCallFailed),
117 }
118 }
119
120 /// Insert or overwrite `key` with `value`.
121 ///
122 /// The call blocks until the entry has been committed to a majority of
123 /// cluster nodes.
124 ///
125 /// # Errors
126 ///
127 /// Returns an error if serialization fails, the RPC fails, or the cluster
128 /// does not commit the entry within the timeout.
129 pub async fn insert<T: Serialize>(&mut self, key: &str, value: T) -> Result<(), Error> {
130 self.connect().await?;
131 let response = self
132 .conn
133 .as_mut()
134 .unwrap()
135 .insert(Request::new(InsertRequest {
136 key: key.to_string(),
137 value: bincode::serialize(&value)?.to_vec(),
138 }))
139 .await;
140
141 match response {
142 Ok(resp) => match resp.into_inner().success {
143 true => Ok(()),
144 false => Err(Error::RPCCallFailed),
145 },
146 Err(_) => Err(Error::RPCCallFailed),
147 }
148 }
149
150 /// Remove `key` from the store.
151 ///
152 /// The call blocks until the deletion has been committed to a majority of
153 /// cluster nodes. Returns `Ok(())` even if the key did not exist.
154 ///
155 /// # Errors
156 ///
157 /// Returns an error if the RPC fails or the cluster does not commit the
158 /// deletion within the timeout.
159 pub async fn remove(&mut self, key: &str) -> Result<(), Error> {
160 self.connect().await?;
161 let response = self
162 .conn
163 .as_mut()
164 .unwrap()
165 .delete(Request::new(DeleteRequest {
166 key: key.to_string(),
167 }))
168 .await;
169
170 match response {
171 Ok(resp) => match resp.into_inner().success {
172 true => Ok(()),
173 false => Err(Error::RPCCallFailed),
174 },
175 Err(_) => Err(Error::RPCCallFailed),
176 }
177 }
178
179 /// Return all key prefixes (namespace segments) currently in the store.
180 ///
181 /// Keys are organised hierarchically using `/` as a separator. This
182 /// method returns the distinct prefix segments, e.g. `["users", "posts"]`
183 /// for a store containing `"users/alice"` and `"posts/hello"`.
184 ///
185 /// # Errors
186 ///
187 /// Returns an error if the RPC fails.
188 pub async fn get_prefixes(&mut self) -> Result<Vec<String>, Error> {
189 self.connect().await?;
190 let response = self
191 .conn
192 .as_mut()
193 .unwrap()
194 .get_prefixes(Request::new(Empty {}))
195 .await;
196
197 match response {
198 Ok(resp) => Ok(resp.into_inner().prefixes),
199 Err(_) => Err(Error::RPCCallFailed),
200 }
201 }
202
203 /// Return all keys whose path starts with `prefix`.
204 ///
205 /// Pass an empty string or `"/"` to list all keys at the root level.
206 ///
207 /// # Errors
208 ///
209 /// Returns an error if the RPC fails.
210 pub async fn list_keys(&mut self, prefix: &str) -> Result<Vec<String>, Error> {
211 self.connect().await?;
212 let response = self
213 .conn
214 .as_mut()
215 .unwrap()
216 .list_keys(Request::new(ListKeysRequest {
217 prefix: String::from(prefix),
218 }))
219 .await;
220
221 match response {
222 Ok(resp) => Ok(resp.into_inner().keys),
223 Err(_) => Err(Error::RPCCallFailed),
224 }
225 }
226
227 /// Remove a node from the cluster configuration.
228 ///
229 /// `id` and `address` must match an existing member. The removal is
230 /// propagated through the Raft log so all surviving nodes eventually drop
231 /// the node from their cluster view.
232 ///
233 /// # Errors
234 ///
235 /// Returns an error if the RPC fails or the node is not found.
236 pub async fn remove_node(&mut self, id: Uuid, address: Ipv4Addr) -> Result<(), Error> {
237 self.connect().await?;
238 let response = self
239 .conn
240 .as_mut()
241 .unwrap()
242 .remove_server(Request::new(ServerRequest {
243 id: id.to_string(),
244 address: address.to_string(),
245 }))
246 .await;
247
248 match response {
249 Ok(resp) => match resp.into_inner().success {
250 true => Ok(()),
251 false => Err(Error::RPCCallFailed),
252 },
253 Err(_) => Err(Error::RPCCallFailed),
254 }
255 }
256
257 /// Subscribe to changes under `prefix`.
258 ///
259 /// Returns a [`Subscriber`] that yields a `(key, value)` pair each time
260 /// an entry whose path starts with `prefix` is inserted. The stream
261 /// remains open until the connection is dropped or the server closes it.
262 ///
263 /// # Errors
264 ///
265 /// Returns an error if the RPC fails to establish the stream.
266 pub async fn watch(&mut self, prefix: &str) -> Result<Subscriber, Error> {
267 self.connect().await?;
268 let response = self
269 .conn
270 .as_mut()
271 .unwrap()
272 .subscribe(Request::new(SubscribeRequest {
273 prefix: String::from(prefix),
274 }))
275 .await;
276
277 match response {
278 Ok(resp) => Ok(Subscriber::new(resp.into_inner())),
279 Err(_) => Err(Error::RPCCallFailed),
280 }
281 }
282}