Skip to main content

kevy_client_async/
cluster.rs

1//! Async cluster-aware client: one connection per shard, CRC16
2//! routing per key. Mirror of `kevy_client::ClusterClient`.
3//!
4//! Topology discovered once at connect via `CLUSTER SLOTS`; subsequent
5//! key-routed commands go straight to the owner shard — `-MOVED` never
6//! fires for correct routing.
7
8use std::io;
9use std::time::Duration;
10
11use kevy_hash::key_hash_slot;
12use kevy_resp::Reply;
13
14use crate::cluster_topology::{build_topology, parse_cluster_slots};
15use crate::codec::AsyncRespCodec;
16use crate::reply::{string, unexpected, vec2, vec3};
17
// Runtime selection: each supported async runtime contributes a
// `DefaultTransport` alias and a matching `connect_default` function, both
// gated on that runtime's cargo feature.
// NOTE(review): presumably exactly one of these features is enabled per
// build; enabling two would define both items twice — confirm in Cargo.toml.

/// TCP stream type used when the `tokio` feature is enabled.
#[cfg(feature = "tokio")]
type DefaultTransport = tokio::net::TcpStream;

/// Opens a connection to `host:port` through the tokio runtime adapter.
///
/// # Errors
/// Returns whatever `crate::rt_tokio::connect` reports.
#[cfg(feature = "tokio")]
async fn connect_default(host: &str, port: u16) -> io::Result<DefaultTransport> {
    let stream = crate::rt_tokio::connect(host, port).await?;
    Ok(stream)
}

/// TCP stream type used when the `smol` feature is enabled.
#[cfg(feature = "smol")]
type DefaultTransport = smol::net::TcpStream;

/// Opens a connection to `host:port` through the smol runtime adapter.
///
/// # Errors
/// Returns whatever `crate::rt_smol::connect` reports.
#[cfg(feature = "smol")]
async fn connect_default(host: &str, port: u16) -> io::Result<DefaultTransport> {
    let stream = crate::rt_smol::connect(host, port).await?;
    Ok(stream)
}

/// TCP stream type used when the `async-std` feature is enabled.
#[cfg(feature = "async-std")]
type DefaultTransport = async_std::net::TcpStream;

/// Opens a connection to `host:port` through the async-std runtime adapter.
///
/// # Errors
/// Returns whatever `crate::rt_async_std::connect` reports.
#[cfg(feature = "async-std")]
async fn connect_default(host: &str, port: u16) -> io::Result<DefaultTransport> {
    let stream = crate::rt_async_std::connect(host, port).await?;
    Ok(stream)
}
37
38/// One open connection per distinct shard node + a slot→shard table.
39pub struct AsyncClusterClient {
40    shards: Vec<AsyncRespCodec<DefaultTransport>>,
41    slot_to_shard: Vec<u16>,
42}
43
44impl AsyncClusterClient {
45    /// Connect via a seed node, discover topology, open one connection
46    /// per shard.
47    pub async fn connect(host: &str, port: u16) -> io::Result<Self> {
48        let mut seed_codec = AsyncRespCodec::new(connect_default(host, port).await?);
49        let reply = seed_codec
50            .request(&[b"CLUSTER".to_vec(), b"SLOTS".to_vec()])
51            .await?;
52        let ranges = parse_cluster_slots(reply)?;
53        let (nodes, slot_to_shard) = build_topology(&ranges)?;
54
55        let mut shards = Vec::with_capacity(nodes.len());
56        for (h, p) in &nodes {
57            let transport = connect_default(h, *p).await?;
58            shards.push(AsyncRespCodec::new(transport));
59        }
60        Ok(Self {
61            shards,
62            slot_to_shard,
63        })
64    }
65
66    /// Number of distinct shard nodes.
67    pub fn shard_count(&self) -> usize {
68        self.shards.len()
69    }
70
71    /// Route a single-key command to its owner shard.
72    pub async fn request_keyed(
73        &mut self,
74        key: &[u8],
75        args: &[Vec<u8>],
76    ) -> io::Result<Reply> {
77        let i = self.shard_for(key);
78        self.shards[i].request(args).await
79    }
80
81    /// Keyless command — answered identically by any shard.
82    pub async fn request_unkeyed(&mut self, args: &[Vec<u8>]) -> io::Result<Reply> {
83        self.shards[0].request(args).await
84    }
85
86    fn shard_for(&self, key: &[u8]) -> usize {
87        self.slot_to_shard[key_hash_slot(key) as usize] as usize
88    }
89
90    /// `PING`. Answered by any shard.
91    pub async fn ping(&mut self) -> io::Result<()> {
92        match self.request_unkeyed(&[b"PING".to_vec()]).await? {
93            Reply::Simple(s) if s == b"PONG" || s == b"OK" => Ok(()),
94            Reply::Error(e) => Err(io::Error::other(string(e))),
95            other => Err(unexpected(other)),
96        }
97    }
98
99    /// `PUBLISH channel message`. Returns subscriber count.
100    pub async fn publish(&mut self, channel: &[u8], message: &[u8]) -> io::Result<usize> {
101        match self
102            .request_unkeyed(&vec3(b"PUBLISH", channel, message))
103            .await?
104        {
105            Reply::Int(n) if n >= 0 => Ok(n as usize),
106            Reply::Error(e) => Err(io::Error::other(string(e))),
107            other => Err(unexpected(other)),
108        }
109    }
110
111    /// `SET key value`.
112    pub async fn set(&mut self, key: &[u8], value: &[u8]) -> io::Result<()> {
113        match self.request_keyed(key, &vec3(b"SET", key, value)).await? {
114            Reply::Simple(s) if s == b"OK" => Ok(()),
115            Reply::Error(e) => Err(io::Error::other(string(e))),
116            other => Err(unexpected(other)),
117        }
118    }
119
120    /// `SET key value PX ttl_ms`.
121    pub async fn set_with_ttl(
122        &mut self,
123        key: &[u8],
124        value: &[u8],
125        ttl: Duration,
126    ) -> io::Result<()> {
127        let ms = ttl.as_millis().min(i64::MAX as u128) as i64;
128        let args = vec![
129            b"SET".to_vec(),
130            key.to_vec(),
131            value.to_vec(),
132            b"PX".to_vec(),
133            ms.to_string().into_bytes(),
134        ];
135        match self.request_keyed(key, &args).await? {
136            Reply::Simple(s) if s == b"OK" => Ok(()),
137            Reply::Error(e) => Err(io::Error::other(string(e))),
138            other => Err(unexpected(other)),
139        }
140    }
141
142    /// `GET key`.
143    pub async fn get(&mut self, key: &[u8]) -> io::Result<Option<Vec<u8>>> {
144        match self.request_keyed(key, &vec2(b"GET", key)).await? {
145            Reply::Bulk(v) => Ok(Some(v)),
146            Reply::Nil => Ok(None),
147            Reply::Error(e) => Err(io::Error::other(string(e))),
148            other => Err(unexpected(other)),
149        }
150    }
151
152    /// `INCR key`.
153    pub async fn incr(&mut self, key: &[u8]) -> io::Result<i64> {
154        match self.request_keyed(key, &vec2(b"INCR", key)).await? {
155            Reply::Int(n) => Ok(n),
156            Reply::Error(e) => Err(io::Error::other(string(e))),
157            other => Err(unexpected(other)),
158        }
159    }
160
161    /// `INCRBY key delta`.
162    pub async fn incr_by(&mut self, key: &[u8], delta: i64) -> io::Result<i64> {
163        let args = vec![
164            b"INCRBY".to_vec(),
165            key.to_vec(),
166            delta.to_string().into_bytes(),
167        ];
168        match self.request_keyed(key, &args).await? {
169            Reply::Int(n) => Ok(n),
170            Reply::Error(e) => Err(io::Error::other(string(e))),
171            other => Err(unexpected(other)),
172        }
173    }
174
175    /// `PEXPIRE key ttl_ms`.
176    pub async fn expire(&mut self, key: &[u8], ttl: Duration) -> io::Result<bool> {
177        let ms = ttl.as_millis().min(i64::MAX as u128) as i64;
178        let args = vec![
179            b"PEXPIRE".to_vec(),
180            key.to_vec(),
181            ms.to_string().into_bytes(),
182        ];
183        match self.request_keyed(key, &args).await? {
184            Reply::Int(1) => Ok(true),
185            Reply::Int(0) => Ok(false),
186            Reply::Error(e) => Err(io::Error::other(string(e))),
187            other => Err(unexpected(other)),
188        }
189    }
190
191    /// `PERSIST key`.
192    pub async fn persist(&mut self, key: &[u8]) -> io::Result<bool> {
193        match self.request_keyed(key, &vec2(b"PERSIST", key)).await? {
194            Reply::Int(1) => Ok(true),
195            Reply::Int(0) => Ok(false),
196            Reply::Error(e) => Err(io::Error::other(string(e))),
197            other => Err(unexpected(other)),
198        }
199    }
200
201    /// `PTTL key`.
202    pub async fn ttl_ms(&mut self, key: &[u8]) -> io::Result<i64> {
203        match self.request_keyed(key, &vec2(b"PTTL", key)).await? {
204            Reply::Int(n) => Ok(n),
205            Reply::Error(e) => Err(io::Error::other(string(e))),
206            other => Err(unexpected(other)),
207        }
208    }
209
210    /// `DEL key [key ...]` — routed per key, summed.
211    pub async fn del(&mut self, keys: &[&[u8]]) -> io::Result<usize> {
212        let mut removed = 0;
213        for k in keys {
214            match self.request_keyed(k, &vec2(b"DEL", k)).await? {
215                Reply::Int(n) if n >= 0 => removed += n as usize,
216                Reply::Error(e) => return Err(io::Error::other(string(e))),
217                other => return Err(unexpected(other)),
218            }
219        }
220        Ok(removed)
221    }
222
223    /// `EXISTS key [key ...]` — routed per key, summed.
224    pub async fn exists(&mut self, keys: &[&[u8]]) -> io::Result<usize> {
225        let mut count = 0;
226        for k in keys {
227            match self.request_keyed(k, &vec2(b"EXISTS", k)).await? {
228                Reply::Int(n) if n >= 0 => count += n as usize,
229                Reply::Error(e) => return Err(io::Error::other(string(e))),
230                other => return Err(unexpected(other)),
231            }
232        }
233        Ok(count)
234    }
235
236    /// `DBSIZE` — cluster-wide total (server fans out internally).
237    pub async fn dbsize(&mut self) -> io::Result<usize> {
238        match self.request_unkeyed(&[b"DBSIZE".to_vec()]).await? {
239            Reply::Int(n) if n >= 0 => Ok(n as usize),
240            Reply::Error(e) => Err(io::Error::other(string(e))),
241            other => Err(unexpected(other)),
242        }
243    }
244
245    /// `FLUSHALL` — clears every shard.
246    pub async fn flushall(&mut self) -> io::Result<()> {
247        match self.request_unkeyed(&[b"FLUSHALL".to_vec()]).await? {
248            Reply::Simple(s) if s == b"OK" => Ok(()),
249            Reply::Error(e) => Err(io::Error::other(string(e))),
250            other => Err(unexpected(other)),
251        }
252    }
253}