Skip to main content

skiff_rs/
client.rs

1use std::net::{Ipv4Addr, SocketAddrV4};
2
3use serde::{de::DeserializeOwned, Serialize};
4use tonic::{transport::Channel, Request};
5use uuid::Uuid;
6
7use crate::{
8    error::Error,
9    skiff::skiff_proto::{
10        skiff_client::SkiffClient, DeleteRequest, Empty, GetRequest, InsertRequest,
11        ListKeysRequest, ServerRequest, SubscribeRequest,
12    },
13    Subscriber,
14};
15
/// A client for interacting with a skiff cluster.
///
/// `Client` maintains a single gRPC connection to one cluster node.  All
/// write requests are automatically forwarded to the current leader by the
/// node, so the client does not need to track leadership itself.
///
/// The connection is opened lazily: a freshly constructed `Client` holds no
/// connection until [`Client::connect`] (or any method that calls it)
/// succeeds, after which the same connection is reused for later calls.
///
/// # Example
///
/// ```no_run
/// use skiff_rs::Client;
///
/// # #[tokio::main]
/// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
/// let mut client = Client::new(vec!["127.0.0.1".parse()?]);
/// client.connect().await?;
///
/// client.insert("key", "value").await?;
/// let val: Option<String> = client.get("key").await?;
/// # Ok(())
/// # }
/// ```
#[derive(Debug)]
pub struct Client {
    /// Cached gRPC client; `None` until a connection attempt has succeeded.
    conn: Option<SkiffClient<Channel>>,
    /// Candidate node addresses, tried in list order when connecting.
    cluster: Vec<Ipv4Addr>,
    /// TCP port combined with each address in `cluster` to form the endpoint.
    port: u16,
}
43impl Client {
44    /// Create a new client that will try each address in `cluster` in order
45    /// when connecting.
46    pub fn new(cluster: Vec<Ipv4Addr>) -> Self {
47        Self {
48            conn: None,
49            cluster,
50            port: 9400,
51        }
52    }
53
54    /// Override the port used to connect to cluster nodes. Defaults to `9400`.
55    pub fn with_port(mut self, port: u16) -> Self {
56        self.port = port;
57        self
58    }
59
60    /// Establish a gRPC connection to the first reachable node in the cluster.
61    ///
62    /// This is called automatically by the other methods, but can be called
63    /// explicitly to verify connectivity before performing operations.
64    ///
65    /// # Errors
66    ///
67    /// Returns [`Error::ClientConnectFailed`] if no node in the cluster list
68    /// can be reached.
69    pub async fn connect(&mut self) -> Result<(), Error> {
70        if self.conn.is_some() {
71            return Ok(());
72        }
73
74        for peer in self.cluster.iter() {
75            match SkiffClient::connect(format!("http://{}", SocketAddrV4::new(*peer, self.port)))
76                .await
77            {
78                Ok(conn) => {
79                    self.conn = Some(conn);
80                    return Ok(());
81                }
82                Err(_) => continue,
83            }
84        }
85
86        Err(Error::ClientConnectFailed)
87    }
88
89    /// Retrieve the value stored at `key`, deserializing it as `T`.
90    ///
91    /// Returns `Ok(None)` if the key does not exist.
92    ///
93    /// # Errors
94    ///
95    /// Returns an error if the RPC fails or the stored bytes cannot be
96    /// deserialized into `T`.
97    pub async fn get<T: DeserializeOwned>(&mut self, key: &str) -> Result<Option<T>, Error> {
98        self.connect().await?;
99        let response = self
100            .conn
101            .as_mut()
102            .unwrap()
103            .get(Request::new(GetRequest {
104                key: key.to_string(),
105            }))
106            .await;
107
108        match response {
109            Ok(resp) => match resp.into_inner().value {
110                Some(value) => match bincode::deserialize::<T>(value.as_slice()) {
111                    Ok(value2) => Ok(Some(value2)),
112                    Err(_) => Err(Error::DeserializeFailed),
113                },
114                None => Ok(None),
115            },
116            Err(_) => Err(Error::RPCCallFailed),
117        }
118    }
119
120    /// Insert or overwrite `key` with `value`.
121    ///
122    /// The call blocks until the entry has been committed to a majority of
123    /// cluster nodes.
124    ///
125    /// # Errors
126    ///
127    /// Returns an error if serialization fails, the RPC fails, or the cluster
128    /// does not commit the entry within the timeout.
129    pub async fn insert<T: Serialize>(&mut self, key: &str, value: T) -> Result<(), Error> {
130        self.connect().await?;
131        let response = self
132            .conn
133            .as_mut()
134            .unwrap()
135            .insert(Request::new(InsertRequest {
136                key: key.to_string(),
137                value: bincode::serialize(&value)?.to_vec(),
138            }))
139            .await;
140
141        match response {
142            Ok(resp) => match resp.into_inner().success {
143                true => Ok(()),
144                false => Err(Error::RPCCallFailed),
145            },
146            Err(_) => Err(Error::RPCCallFailed),
147        }
148    }
149
150    /// Remove `key` from the store.
151    ///
152    /// The call blocks until the deletion has been committed to a majority of
153    /// cluster nodes.  Returns `Ok(())` even if the key did not exist.
154    ///
155    /// # Errors
156    ///
157    /// Returns an error if the RPC fails or the cluster does not commit the
158    /// deletion within the timeout.
159    pub async fn remove(&mut self, key: &str) -> Result<(), Error> {
160        self.connect().await?;
161        let response = self
162            .conn
163            .as_mut()
164            .unwrap()
165            .delete(Request::new(DeleteRequest {
166                key: key.to_string(),
167            }))
168            .await;
169
170        match response {
171            Ok(resp) => match resp.into_inner().success {
172                true => Ok(()),
173                false => Err(Error::RPCCallFailed),
174            },
175            Err(_) => Err(Error::RPCCallFailed),
176        }
177    }
178
179    /// Return all key prefixes (namespace segments) currently in the store.
180    ///
181    /// Keys are organised hierarchically using `/` as a separator.  This
182    /// method returns the distinct prefix segments, e.g. `["users", "posts"]`
183    /// for a store containing `"users/alice"` and `"posts/hello"`.
184    ///
185    /// # Errors
186    ///
187    /// Returns an error if the RPC fails.
188    pub async fn get_prefixes(&mut self) -> Result<Vec<String>, Error> {
189        self.connect().await?;
190        let response = self
191            .conn
192            .as_mut()
193            .unwrap()
194            .get_prefixes(Request::new(Empty {}))
195            .await;
196
197        match response {
198            Ok(resp) => Ok(resp.into_inner().prefixes),
199            Err(_) => Err(Error::RPCCallFailed),
200        }
201    }
202
203    /// Return all keys whose path starts with `prefix`.
204    ///
205    /// Pass an empty string or `"/"` to list all keys at the root level.
206    ///
207    /// # Errors
208    ///
209    /// Returns an error if the RPC fails.
210    pub async fn list_keys(&mut self, prefix: &str) -> Result<Vec<String>, Error> {
211        self.connect().await?;
212        let response = self
213            .conn
214            .as_mut()
215            .unwrap()
216            .list_keys(Request::new(ListKeysRequest {
217                prefix: String::from(prefix),
218            }))
219            .await;
220
221        match response {
222            Ok(resp) => Ok(resp.into_inner().keys),
223            Err(_) => Err(Error::RPCCallFailed),
224        }
225    }
226
227    /// Remove a node from the cluster configuration.
228    ///
229    /// `id` and `address` must match an existing member.  The removal is
230    /// propagated through the Raft log so all surviving nodes eventually drop
231    /// the node from their cluster view.
232    ///
233    /// # Errors
234    ///
235    /// Returns an error if the RPC fails or the node is not found.
236    pub async fn remove_node(&mut self, id: Uuid, address: Ipv4Addr) -> Result<(), Error> {
237        self.connect().await?;
238        let response = self
239            .conn
240            .as_mut()
241            .unwrap()
242            .remove_server(Request::new(ServerRequest {
243                id: id.to_string(),
244                address: address.to_string(),
245            }))
246            .await;
247
248        match response {
249            Ok(resp) => match resp.into_inner().success {
250                true => Ok(()),
251                false => Err(Error::RPCCallFailed),
252            },
253            Err(_) => Err(Error::RPCCallFailed),
254        }
255    }
256
257    /// Subscribe to changes under `prefix`.
258    ///
259    /// Returns a [`Subscriber`] that yields a `(key, value)` pair each time
260    /// an entry whose path starts with `prefix` is inserted.  The stream
261    /// remains open until the connection is dropped or the server closes it.
262    ///
263    /// # Errors
264    ///
265    /// Returns an error if the RPC fails to establish the stream.
266    pub async fn watch(&mut self, prefix: &str) -> Result<Subscriber, Error> {
267        self.connect().await?;
268        let response = self
269            .conn
270            .as_mut()
271            .unwrap()
272            .subscribe(Request::new(SubscribeRequest {
273                prefix: String::from(prefix),
274            }))
275            .await;
276
277        match response {
278            Ok(resp) => Ok(Subscriber::new(resp.into_inner())),
279            Err(_) => Err(Error::RPCCallFailed),
280        }
281    }
282}