1use std::io;
13use std::time::Duration;
14
15use kevy_store::StoreError;
16
17use crate::pubsub::Subscription;
18use crate::store::{Store, commit_write, store_err};
19
20impl Store {
21 pub fn set(&self, key: &[u8], value: &[u8]) -> io::Result<bool> {
28 let mut g = self.wshard(key);
29 let ok = g.store.set(key, value.to_vec(), None, false, false);
30 commit_write(&mut g, &[b"SET", key, value])?;
31 Ok(ok)
32 }
33
34 pub fn set_with_ttl(&self, key: &[u8], value: &[u8], ttl: Duration) -> io::Result<bool> {
40 let mut g = self.wshard(key);
41 let ok = g.store.set(key, value.to_vec(), Some(ttl), false, false);
42 let ms = ttl.as_millis().min(u64::MAX as u128) as u64;
43 let deadline = kevy_store::now_unix_ms().saturating_add(ms);
44 commit_write(&mut g, &[b"SET", key, value])?;
45 commit_write(&mut g, &[b"PEXPIREAT", key, deadline.to_string().as_bytes()])?;
46 Ok(ok)
47 }
48
49 pub fn get(&self, key: &[u8]) -> io::Result<Option<Vec<u8>>> {
57 if self.config().maxmemory == 0 {
58 let g = self.rshard(key);
59 return Ok(g.store.get_shared(key).map_err(store_err)?.map(|v| v.to_vec()));
60 }
61 let mut g = self.wshard(key);
62 Ok(g.store.get(key).map_err(store_err)?.map(|v| v.to_vec()))
63 }
64
65 pub fn del(&self, keys: &[&[u8]]) -> io::Result<usize> {
68 let mut total = 0;
69 for k in keys {
70 let owned = vec![k.to_vec()];
71 let mut g = self.wshard(k);
72 let n = g.store.del(&owned);
73 if n > 0 {
74 total += n;
75 commit_write(&mut g, &[b"DEL", k])?;
76 }
77 }
78 Ok(total)
79 }
80
81 pub fn exists(&self, keys: &[&[u8]]) -> io::Result<usize> {
84 let mut total = 0;
85 for k in keys {
86 total += self.wshard(k).store.exists(&[k.to_vec()]);
87 }
88 Ok(total)
89 }
90
91 pub fn incr(&self, key: &[u8]) -> io::Result<i64> {
93 self.incr_by(key, 1)
94 }
95
96 pub fn incr_by(&self, key: &[u8], delta: i64) -> io::Result<i64> {
98 let mut g = self.wshard(key);
99 let n = g.store.incr_by(key, delta).map_err(store_err)?;
100 commit_write(&mut g, &[b"INCRBY", key, delta.to_string().as_bytes()])?;
101 Ok(n)
102 }
103
104 pub fn expire(&self, key: &[u8], ttl: Duration) -> io::Result<bool> {
108 let mut g = self.wshard(key);
109 let touched = g.store.expire(key, ttl);
110 if touched {
111 let ms = ttl.as_millis().min(u64::MAX as u128) as u64;
112 let deadline = kevy_store::now_unix_ms().saturating_add(ms);
113 commit_write(&mut g, &[b"PEXPIREAT", key, deadline.to_string().as_bytes()])?;
114 }
115 Ok(touched)
116 }
117
118 pub fn persist(&self, key: &[u8]) -> io::Result<bool> {
120 let mut g = self.wshard(key);
121 let touched = g.store.persist(key);
122 if touched {
123 commit_write(&mut g, &[b"PERSIST", key])?;
124 }
125 Ok(touched)
126 }
127
128 pub fn ttl_ms(&self, key: &[u8]) -> i64 {
130 self.wshard(key).store.pttl(key)
131 }
132
133 pub fn type_of(&self, key: &[u8]) -> &'static str {
135 self.wshard(key).store.type_of(key)
136 }
137
138 pub fn dbsize(&self) -> usize {
140 self.sum_shards(|i| i.store.dbsize())
141 }
142
143 pub fn flush(&self) -> io::Result<()> {
146 self.try_for_each_shard(|inner| {
147 inner.store.flush();
148 commit_write(inner, &[b"FLUSHALL"])
149 })
150 }
151
152 pub fn key_bytes(&self, key: &[u8]) -> Option<u64> {
154 self.wshard(key).store.estimate_key_bytes(key)
155 }
156
157 pub fn used_memory(&self) -> u64 {
159 self.sum_shards_u64(|i| i.store.used_memory())
160 }
161
162 pub fn evictions_total(&self) -> u64 {
164 self.sum_shards_u64(|i| i.store.evictions_total())
165 }
166
167 pub fn expired_keys_total(&self) -> u64 {
169 self.sum_shards_u64(|i| i.store.expired_keys_total())
170 }
171
172 pub fn hset(&self, key: &[u8], pairs: &[(&[u8], &[u8])]) -> io::Result<usize> {
176 let mut g = self.wshard(key);
177 let owned: Vec<(Vec<u8>, Vec<u8>)> =
178 pairs.iter().map(|(f, v)| (f.to_vec(), v.to_vec())).collect();
179 let added = g.store.hset(key, &owned).map_err(store_err)?;
180 let mut parts: Vec<&[u8]> = Vec::with_capacity(2 + pairs.len() * 2);
181 parts.push(b"HSET");
182 parts.push(key);
183 for (f, v) in pairs {
184 parts.push(f);
185 parts.push(v);
186 }
187 commit_write(&mut g, &parts)?;
188 Ok(added)
189 }
190
191 pub fn hget(&self, key: &[u8], field: &[u8]) -> io::Result<Option<Vec<u8>>> {
193 let mut g = self.wshard(key);
194 Ok(g.store
195 .hget(key, field)
196 .map_err(store_err)?
197 .map(|v| v.to_vec()))
198 }
199
200 pub fn hdel(&self, key: &[u8], fields: &[&[u8]]) -> io::Result<usize> {
202 let mut g = self.wshard(key);
203 let owned: Vec<Vec<u8>> = fields.iter().map(|f| f.to_vec()).collect();
204 let removed = g.store.hdel(key, &owned).map_err(store_err)?;
205 if removed > 0 {
206 let mut parts: Vec<&[u8]> = Vec::with_capacity(2 + fields.len());
207 parts.push(b"HDEL");
208 parts.push(key);
209 for f in fields {
210 parts.push(f);
211 }
212 commit_write(&mut g, &parts)?;
213 }
214 Ok(removed)
215 }
216
217 pub fn lpush(&self, key: &[u8], values: &[&[u8]]) -> io::Result<usize> {
221 push_helper(self, key, values, b"LPUSH", |s, k, vs| s.lpush(k, vs))
222 }
223
224 pub fn rpush(&self, key: &[u8], values: &[&[u8]]) -> io::Result<usize> {
226 push_helper(self, key, values, b"RPUSH", |s, k, vs| s.rpush(k, vs))
227 }
228
229 pub fn lpop(&self, key: &[u8], count: usize) -> io::Result<Vec<Vec<u8>>> {
231 pop_helper(self, key, count, false)
232 }
233
234 pub fn rpop(&self, key: &[u8], count: usize) -> io::Result<Vec<Vec<u8>>> {
236 pop_helper(self, key, count, true)
237 }
238
239 pub fn llen(&self, key: &[u8]) -> io::Result<usize> {
241 self.wshard(key).store.llen(key).map_err(store_err)
242 }
243
244 pub fn sadd(&self, key: &[u8], members: &[&[u8]]) -> io::Result<usize> {
248 push_helper(self, key, members, b"SADD", |s, k, ms| s.sadd(k, ms))
249 }
250
251 pub fn srem(&self, key: &[u8], members: &[&[u8]]) -> io::Result<usize> {
253 let mut g = self.wshard(key);
254 let owned: Vec<Vec<u8>> = members.iter().map(|m| m.to_vec()).collect();
255 let removed = g.store.srem(key, &owned).map_err(store_err)?;
256 if removed > 0 {
257 let mut parts: Vec<&[u8]> = Vec::with_capacity(2 + members.len());
258 parts.push(b"SREM");
259 parts.push(key);
260 for m in members {
261 parts.push(m);
262 }
263 commit_write(&mut g, &parts)?;
264 }
265 Ok(removed)
266 }
267
268 pub fn smembers(&self, key: &[u8]) -> io::Result<Vec<Vec<u8>>> {
270 self.wshard(key).store.smembers(key).map_err(store_err)
271 }
272
273 pub fn scard(&self, key: &[u8]) -> io::Result<usize> {
275 self.wshard(key).store.scard(key).map_err(store_err)
276 }
277
278 pub fn zadd(&self, key: &[u8], pairs: &[(f64, &[u8])]) -> io::Result<usize> {
282 let mut g = self.wshard(key);
283 let owned: Vec<(f64, Vec<u8>)> =
284 pairs.iter().map(|(s, m)| (*s, m.to_vec())).collect();
285 let added = g.store.zadd(key, &owned).map_err(store_err)?;
286 let mut score_strs: Vec<Vec<u8>> = Vec::with_capacity(pairs.len());
287 for (s, _) in pairs {
288 score_strs.push(format!("{s}").into_bytes());
289 }
290 let mut parts: Vec<&[u8]> = Vec::with_capacity(2 + pairs.len() * 2);
291 parts.push(b"ZADD");
292 parts.push(key);
293 for (i, (_, m)) in pairs.iter().enumerate() {
294 parts.push(&score_strs[i]);
295 parts.push(m);
296 }
297 commit_write(&mut g, &parts)?;
298 Ok(added)
299 }
300
301 pub fn zrem(&self, key: &[u8], members: &[&[u8]]) -> io::Result<usize> {
303 let mut g = self.wshard(key);
304 let owned: Vec<Vec<u8>> = members.iter().map(|m| m.to_vec()).collect();
305 let removed = g.store.zrem(key, &owned).map_err(store_err)?;
306 if removed > 0 {
307 let mut parts: Vec<&[u8]> = Vec::with_capacity(2 + members.len());
308 parts.push(b"ZREM");
309 parts.push(key);
310 for m in members {
311 parts.push(m);
312 }
313 commit_write(&mut g, &parts)?;
314 }
315 Ok(removed)
316 }
317
318 pub fn zscore(&self, key: &[u8], member: &[u8]) -> io::Result<Option<f64>> {
320 self.wshard(key).store.zscore(key, member).map_err(store_err)
321 }
322
323 pub fn zcard(&self, key: &[u8]) -> io::Result<usize> {
325 self.wshard(key).store.zcard(key).map_err(store_err)
326 }
327
328 pub fn publish(&self, channel: &[u8], payload: &[u8]) -> usize {
334 let plans = {
337 let g = self.lock();
339 g.bus.collect_delivery(channel, payload)
340 };
341 let mut count = 0;
342 for (frame, sender) in plans {
343 if sender.send(frame).is_ok() {
344 count += 1;
345 }
346 }
347 count
348 }
349
350 pub fn subscribe(&self, channels: &[&[u8]]) -> Subscription {
355 let mut sub = Subscription::new(self.inner_handle(), self.guard_handle());
356 if !channels.is_empty() {
357 sub.subscribe(channels);
358 }
359 sub
360 }
361
362 pub fn psubscribe(&self, patterns: &[&[u8]]) -> Subscription {
364 let mut sub = Subscription::new(self.inner_handle(), self.guard_handle());
365 if !patterns.is_empty() {
366 sub.psubscribe(patterns);
367 }
368 sub
369 }
370}
371
372fn push_helper<F>(
377 s: &Store,
378 key: &[u8],
379 values: &[&[u8]],
380 verb: &'static [u8],
381 op: F,
382) -> io::Result<usize>
383where
384 F: FnOnce(&mut kevy_store::Store, &[u8], &[Vec<u8>]) -> Result<usize, StoreError>,
385{
386 let mut g = s.wshard(key);
387 let owned: Vec<Vec<u8>> = values.iter().map(|v| v.to_vec()).collect();
388 let n = op(&mut g.store, key, &owned).map_err(store_err)?;
389 let mut parts: Vec<&[u8]> = Vec::with_capacity(2 + values.len());
390 parts.push(verb);
391 parts.push(key);
392 for v in values {
393 parts.push(v);
394 }
395 commit_write(&mut g, &parts)?;
396 Ok(n)
397}
398
399fn pop_helper(s: &Store, key: &[u8], count: usize, from_tail: bool) -> io::Result<Vec<Vec<u8>>> {
400 let mut g = s.wshard(key);
401 let popped = if from_tail {
402 g.store.rpop(key, count).map_err(store_err)?
403 } else {
404 g.store.lpop(key, count).map_err(store_err)?
405 };
406 if !popped.is_empty() {
407 let verb: &[u8] = if from_tail { b"RPOP" } else { b"LPOP" };
408 let count_str = popped.len().to_string();
409 let parts: [&[u8]; 3] = [verb, key, count_str.as_bytes()];
410 commit_write(&mut g, &parts)?;
411 }
412 Ok(popped)
413}