1use std::io;
9use std::time::Duration;
10
11use kevy_hash::key_hash_slot;
12use kevy_resp::Reply;
13
14use crate::cluster_topology::{build_topology, parse_cluster_slots};
15use crate::codec::AsyncRespCodec;
16use crate::reply::{string, unexpected, vec2, vec3};
17
18#[cfg(feature = "tokio")]
19type DefaultTransport = tokio::net::TcpStream;
20#[cfg(feature = "smol")]
21type DefaultTransport = smol::net::TcpStream;
22#[cfg(feature = "async-std")]
23type DefaultTransport = async_std::net::TcpStream;
24
25#[cfg(feature = "tokio")]
26async fn connect_default(host: &str, port: u16) -> io::Result<DefaultTransport> {
27 crate::rt_tokio::connect(host, port).await
28}
29#[cfg(feature = "smol")]
30async fn connect_default(host: &str, port: u16) -> io::Result<DefaultTransport> {
31 crate::rt_smol::connect(host, port).await
32}
33#[cfg(feature = "async-std")]
34async fn connect_default(host: &str, port: u16) -> io::Result<DefaultTransport> {
35 crate::rt_async_std::connect(host, port).await
36}
37
38pub struct AsyncClusterClient {
40 shards: Vec<AsyncRespCodec<DefaultTransport>>,
41 slot_to_shard: Vec<u16>,
42}
43
44impl AsyncClusterClient {
45 pub async fn connect(host: &str, port: u16) -> io::Result<Self> {
48 let mut seed_codec = AsyncRespCodec::new(connect_default(host, port).await?);
49 let reply = seed_codec
50 .request(&[b"CLUSTER".to_vec(), b"SLOTS".to_vec()])
51 .await?;
52 let ranges = parse_cluster_slots(reply)?;
53 let (nodes, slot_to_shard) = build_topology(&ranges)?;
54
55 let mut shards = Vec::with_capacity(nodes.len());
56 for (h, p) in &nodes {
57 let transport = connect_default(h, *p).await?;
58 shards.push(AsyncRespCodec::new(transport));
59 }
60 Ok(Self {
61 shards,
62 slot_to_shard,
63 })
64 }
65
66 pub fn shard_count(&self) -> usize {
68 self.shards.len()
69 }
70
71 pub async fn request_keyed(
73 &mut self,
74 key: &[u8],
75 args: &[Vec<u8>],
76 ) -> io::Result<Reply> {
77 let i = self.shard_for(key);
78 self.shards[i].request(args).await
79 }
80
81 pub async fn request_unkeyed(&mut self, args: &[Vec<u8>]) -> io::Result<Reply> {
83 self.shards[0].request(args).await
84 }
85
86 fn shard_for(&self, key: &[u8]) -> usize {
87 self.slot_to_shard[key_hash_slot(key) as usize] as usize
88 }
89
90 pub async fn ping(&mut self) -> io::Result<()> {
92 match self.request_unkeyed(&[b"PING".to_vec()]).await? {
93 Reply::Simple(s) if s == b"PONG" || s == b"OK" => Ok(()),
94 Reply::Error(e) => Err(io::Error::other(string(e))),
95 other => Err(unexpected(other)),
96 }
97 }
98
99 pub async fn publish(&mut self, channel: &[u8], message: &[u8]) -> io::Result<usize> {
101 match self
102 .request_unkeyed(&vec3(b"PUBLISH", channel, message))
103 .await?
104 {
105 Reply::Int(n) if n >= 0 => Ok(n as usize),
106 Reply::Error(e) => Err(io::Error::other(string(e))),
107 other => Err(unexpected(other)),
108 }
109 }
110
111 pub async fn set(&mut self, key: &[u8], value: &[u8]) -> io::Result<()> {
113 match self.request_keyed(key, &vec3(b"SET", key, value)).await? {
114 Reply::Simple(s) if s == b"OK" => Ok(()),
115 Reply::Error(e) => Err(io::Error::other(string(e))),
116 other => Err(unexpected(other)),
117 }
118 }
119
120 pub async fn set_with_ttl(
122 &mut self,
123 key: &[u8],
124 value: &[u8],
125 ttl: Duration,
126 ) -> io::Result<()> {
127 let ms = ttl.as_millis().min(i64::MAX as u128) as i64;
128 let args = vec![
129 b"SET".to_vec(),
130 key.to_vec(),
131 value.to_vec(),
132 b"PX".to_vec(),
133 ms.to_string().into_bytes(),
134 ];
135 match self.request_keyed(key, &args).await? {
136 Reply::Simple(s) if s == b"OK" => Ok(()),
137 Reply::Error(e) => Err(io::Error::other(string(e))),
138 other => Err(unexpected(other)),
139 }
140 }
141
142 pub async fn get(&mut self, key: &[u8]) -> io::Result<Option<Vec<u8>>> {
144 match self.request_keyed(key, &vec2(b"GET", key)).await? {
145 Reply::Bulk(v) => Ok(Some(v)),
146 Reply::Nil => Ok(None),
147 Reply::Error(e) => Err(io::Error::other(string(e))),
148 other => Err(unexpected(other)),
149 }
150 }
151
152 pub async fn incr(&mut self, key: &[u8]) -> io::Result<i64> {
154 match self.request_keyed(key, &vec2(b"INCR", key)).await? {
155 Reply::Int(n) => Ok(n),
156 Reply::Error(e) => Err(io::Error::other(string(e))),
157 other => Err(unexpected(other)),
158 }
159 }
160
161 pub async fn incr_by(&mut self, key: &[u8], delta: i64) -> io::Result<i64> {
163 let args = vec![
164 b"INCRBY".to_vec(),
165 key.to_vec(),
166 delta.to_string().into_bytes(),
167 ];
168 match self.request_keyed(key, &args).await? {
169 Reply::Int(n) => Ok(n),
170 Reply::Error(e) => Err(io::Error::other(string(e))),
171 other => Err(unexpected(other)),
172 }
173 }
174
175 pub async fn expire(&mut self, key: &[u8], ttl: Duration) -> io::Result<bool> {
177 let ms = ttl.as_millis().min(i64::MAX as u128) as i64;
178 let args = vec![
179 b"PEXPIRE".to_vec(),
180 key.to_vec(),
181 ms.to_string().into_bytes(),
182 ];
183 match self.request_keyed(key, &args).await? {
184 Reply::Int(1) => Ok(true),
185 Reply::Int(0) => Ok(false),
186 Reply::Error(e) => Err(io::Error::other(string(e))),
187 other => Err(unexpected(other)),
188 }
189 }
190
191 pub async fn persist(&mut self, key: &[u8]) -> io::Result<bool> {
193 match self.request_keyed(key, &vec2(b"PERSIST", key)).await? {
194 Reply::Int(1) => Ok(true),
195 Reply::Int(0) => Ok(false),
196 Reply::Error(e) => Err(io::Error::other(string(e))),
197 other => Err(unexpected(other)),
198 }
199 }
200
201 pub async fn ttl_ms(&mut self, key: &[u8]) -> io::Result<i64> {
203 match self.request_keyed(key, &vec2(b"PTTL", key)).await? {
204 Reply::Int(n) => Ok(n),
205 Reply::Error(e) => Err(io::Error::other(string(e))),
206 other => Err(unexpected(other)),
207 }
208 }
209
210 pub async fn del(&mut self, keys: &[&[u8]]) -> io::Result<usize> {
212 let mut removed = 0;
213 for k in keys {
214 match self.request_keyed(k, &vec2(b"DEL", k)).await? {
215 Reply::Int(n) if n >= 0 => removed += n as usize,
216 Reply::Error(e) => return Err(io::Error::other(string(e))),
217 other => return Err(unexpected(other)),
218 }
219 }
220 Ok(removed)
221 }
222
223 pub async fn exists(&mut self, keys: &[&[u8]]) -> io::Result<usize> {
225 let mut count = 0;
226 for k in keys {
227 match self.request_keyed(k, &vec2(b"EXISTS", k)).await? {
228 Reply::Int(n) if n >= 0 => count += n as usize,
229 Reply::Error(e) => return Err(io::Error::other(string(e))),
230 other => return Err(unexpected(other)),
231 }
232 }
233 Ok(count)
234 }
235
236 pub async fn dbsize(&mut self) -> io::Result<usize> {
238 match self.request_unkeyed(&[b"DBSIZE".to_vec()]).await? {
239 Reply::Int(n) if n >= 0 => Ok(n as usize),
240 Reply::Error(e) => Err(io::Error::other(string(e))),
241 other => Err(unexpected(other)),
242 }
243 }
244
245 pub async fn flushall(&mut self) -> io::Result<()> {
247 match self.request_unkeyed(&[b"FLUSHALL".to_vec()]).await? {
248 Reply::Simple(s) if s == b"OK" => Ok(()),
249 Reply::Error(e) => Err(io::Error::other(string(e))),
250 other => Err(unexpected(other)),
251 }
252 }
253}