1use std::io;
13use std::time::Duration;
14
15use kevy_store::StoreError;
16
17use crate::pubsub::Subscription;
18use crate::store::{Store, commit_write, store_err};
19
20impl Store {
21 pub fn set(&self, key: &[u8], value: &[u8]) -> io::Result<bool> {
28 let mut g = self.wshard(key);
29 let ok = g.store.set(key, value.to_vec(), None, false, false);
30 commit_write(&mut g, &[b"SET", key, value])?;
31 Ok(ok)
32 }
33
34 pub fn set_with_ttl(&self, key: &[u8], value: &[u8], ttl: Duration) -> io::Result<bool> {
40 let mut g = self.wshard(key);
41 let ok = g.store.set(key, value.to_vec(), Some(ttl), false, false);
42 let ms = ttl.as_millis().min(u64::MAX as u128) as u64;
43 let deadline = kevy_store::now_unix_ms().saturating_add(ms);
44 commit_write(&mut g, &[b"SET", key, value])?;
45 commit_write(&mut g, &[b"PEXPIREAT", key, deadline.to_string().as_bytes()])?;
46 Ok(ok)
47 }
48
49 pub fn get(&self, key: &[u8]) -> io::Result<Option<Vec<u8>>> {
57 if self.config().maxmemory == 0 {
58 let g = self.rshard(key);
59 return Ok(g.store.get_shared(key).map_err(store_err)?.map(|v| v.to_vec()));
60 }
61 let mut g = self.wshard(key);
62 Ok(g.store.get(key).map_err(store_err)?.map(|v| v.to_vec()))
63 }
64
65 pub fn del(&self, keys: &[&[u8]]) -> io::Result<usize> {
68 let mut total = 0;
69 for k in keys {
70 let owned = vec![k.to_vec()];
71 let mut g = self.wshard(k);
72 let n = g.store.del(&owned);
73 if n > 0 {
74 total += n;
75 commit_write(&mut g, &[b"DEL", k])?;
76 }
77 }
78 Ok(total)
79 }
80
81 pub fn exists(&self, keys: &[&[u8]]) -> io::Result<usize> {
84 let mut total = 0;
85 for k in keys {
86 total += self.wshard(k).store.exists(&[k.to_vec()]);
87 }
88 Ok(total)
89 }
90
91 pub fn incr(&self, key: &[u8]) -> io::Result<i64> {
93 self.incr_by(key, 1)
94 }
95
96 pub fn incr_by(&self, key: &[u8], delta: i64) -> io::Result<i64> {
98 let mut g = self.wshard(key);
99 let n = g.store.incr_by(key, delta).map_err(store_err)?;
100 commit_write(&mut g, &[b"INCRBY", key, delta.to_string().as_bytes()])?;
101 Ok(n)
102 }
103
104 pub fn expire(&self, key: &[u8], ttl: Duration) -> io::Result<bool> {
108 let mut g = self.wshard(key);
109 let touched = g.store.expire(key, ttl);
110 if touched {
111 let ms = ttl.as_millis().min(u64::MAX as u128) as u64;
112 let deadline = kevy_store::now_unix_ms().saturating_add(ms);
113 commit_write(&mut g, &[b"PEXPIREAT", key, deadline.to_string().as_bytes()])?;
114 }
115 Ok(touched)
116 }
117
118 pub fn persist(&self, key: &[u8]) -> io::Result<bool> {
120 let mut g = self.wshard(key);
121 let touched = g.store.persist(key);
122 if touched {
123 commit_write(&mut g, &[b"PERSIST", key])?;
124 }
125 Ok(touched)
126 }
127
128 pub fn ttl_ms(&self, key: &[u8]) -> i64 {
130 self.wshard(key).store.pttl(key)
131 }
132
133 pub fn type_of(&self, key: &[u8]) -> &'static str {
135 self.wshard(key).store.type_of(key)
136 }
137
138 pub fn dbsize(&self) -> usize {
140 self.sum_shards(|i| i.store.dbsize())
141 }
142
143 pub fn flushall(&self) -> io::Result<()> {
154 self.try_for_each_shard(|inner| {
155 inner.store.flushall();
156 commit_write(inner, &[b"FLUSHALL"])
157 })
158 }
159
160 #[deprecated(
164 since = "1.2.0",
165 note = "renamed to `flushall`: `flush` collides with Write::flush (sync-to-disk); this WIPES the store"
166 )]
167 pub fn flush(&self) -> io::Result<()> {
168 self.flushall()
169 }
170
171 pub fn key_bytes(&self, key: &[u8]) -> Option<u64> {
173 self.wshard(key).store.estimate_key_bytes(key)
174 }
175
176 pub fn used_memory(&self) -> u64 {
178 self.sum_shards_u64(|i| i.store.used_memory())
179 }
180
181 pub fn evictions_total(&self) -> u64 {
183 self.sum_shards_u64(|i| i.store.evictions_total())
184 }
185
186 pub fn expired_keys_total(&self) -> u64 {
188 self.sum_shards_u64(|i| i.store.expired_keys_total())
189 }
190
191 pub fn hset(&self, key: &[u8], pairs: &[(&[u8], &[u8])]) -> io::Result<usize> {
195 let mut g = self.wshard(key);
196 let owned: Vec<(Vec<u8>, Vec<u8>)> =
197 pairs.iter().map(|(f, v)| (f.to_vec(), v.to_vec())).collect();
198 let added = g.store.hset(key, &owned).map_err(store_err)?;
199 let mut parts: Vec<&[u8]> = Vec::with_capacity(2 + pairs.len() * 2);
200 parts.push(b"HSET");
201 parts.push(key);
202 for (f, v) in pairs {
203 parts.push(f);
204 parts.push(v);
205 }
206 commit_write(&mut g, &parts)?;
207 Ok(added)
208 }
209
210 pub fn hget(&self, key: &[u8], field: &[u8]) -> io::Result<Option<Vec<u8>>> {
212 let mut g = self.wshard(key);
213 Ok(g.store
214 .hget(key, field)
215 .map_err(store_err)?
216 .map(|v| v.to_vec()))
217 }
218
219 pub fn hdel(&self, key: &[u8], fields: &[&[u8]]) -> io::Result<usize> {
221 let mut g = self.wshard(key);
222 let owned: Vec<Vec<u8>> = fields.iter().map(|f| f.to_vec()).collect();
223 let removed = g.store.hdel(key, &owned).map_err(store_err)?;
224 if removed > 0 {
225 let mut parts: Vec<&[u8]> = Vec::with_capacity(2 + fields.len());
226 parts.push(b"HDEL");
227 parts.push(key);
228 for f in fields {
229 parts.push(f);
230 }
231 commit_write(&mut g, &parts)?;
232 }
233 Ok(removed)
234 }
235
236 pub fn lpush(&self, key: &[u8], values: &[&[u8]]) -> io::Result<usize> {
240 push_helper(self, key, values, b"LPUSH", |s, k, vs| s.lpush(k, vs))
241 }
242
243 pub fn rpush(&self, key: &[u8], values: &[&[u8]]) -> io::Result<usize> {
245 push_helper(self, key, values, b"RPUSH", |s, k, vs| s.rpush(k, vs))
246 }
247
248 pub fn lpop(&self, key: &[u8], count: usize) -> io::Result<Vec<Vec<u8>>> {
250 pop_helper(self, key, count, false)
251 }
252
253 pub fn rpop(&self, key: &[u8], count: usize) -> io::Result<Vec<Vec<u8>>> {
255 pop_helper(self, key, count, true)
256 }
257
258 pub fn llen(&self, key: &[u8]) -> io::Result<usize> {
260 self.wshard(key).store.llen(key).map_err(store_err)
261 }
262
263 pub fn sadd(&self, key: &[u8], members: &[&[u8]]) -> io::Result<usize> {
267 push_helper(self, key, members, b"SADD", |s, k, ms| s.sadd(k, ms))
268 }
269
270 pub fn srem(&self, key: &[u8], members: &[&[u8]]) -> io::Result<usize> {
272 let mut g = self.wshard(key);
273 let owned: Vec<Vec<u8>> = members.iter().map(|m| m.to_vec()).collect();
274 let removed = g.store.srem(key, &owned).map_err(store_err)?;
275 if removed > 0 {
276 let mut parts: Vec<&[u8]> = Vec::with_capacity(2 + members.len());
277 parts.push(b"SREM");
278 parts.push(key);
279 for m in members {
280 parts.push(m);
281 }
282 commit_write(&mut g, &parts)?;
283 }
284 Ok(removed)
285 }
286
287 pub fn smembers(&self, key: &[u8]) -> io::Result<Vec<Vec<u8>>> {
289 self.wshard(key).store.smembers(key).map_err(store_err)
290 }
291
292 pub fn scard(&self, key: &[u8]) -> io::Result<usize> {
294 self.wshard(key).store.scard(key).map_err(store_err)
295 }
296
297 pub fn zadd(&self, key: &[u8], pairs: &[(f64, &[u8])]) -> io::Result<usize> {
301 let mut g = self.wshard(key);
302 let owned: Vec<(f64, Vec<u8>)> =
303 pairs.iter().map(|(s, m)| (*s, m.to_vec())).collect();
304 let added = g.store.zadd(key, &owned).map_err(store_err)?;
305 let mut score_strs: Vec<Vec<u8>> = Vec::with_capacity(pairs.len());
306 for (s, _) in pairs {
307 score_strs.push(format!("{s}").into_bytes());
308 }
309 let mut parts: Vec<&[u8]> = Vec::with_capacity(2 + pairs.len() * 2);
310 parts.push(b"ZADD");
311 parts.push(key);
312 for (i, (_, m)) in pairs.iter().enumerate() {
313 parts.push(&score_strs[i]);
314 parts.push(m);
315 }
316 commit_write(&mut g, &parts)?;
317 Ok(added)
318 }
319
320 pub fn zrem(&self, key: &[u8], members: &[&[u8]]) -> io::Result<usize> {
322 let mut g = self.wshard(key);
323 let owned: Vec<Vec<u8>> = members.iter().map(|m| m.to_vec()).collect();
324 let removed = g.store.zrem(key, &owned).map_err(store_err)?;
325 if removed > 0 {
326 let mut parts: Vec<&[u8]> = Vec::with_capacity(2 + members.len());
327 parts.push(b"ZREM");
328 parts.push(key);
329 for m in members {
330 parts.push(m);
331 }
332 commit_write(&mut g, &parts)?;
333 }
334 Ok(removed)
335 }
336
337 pub fn zscore(&self, key: &[u8], member: &[u8]) -> io::Result<Option<f64>> {
339 self.wshard(key).store.zscore(key, member).map_err(store_err)
340 }
341
342 pub fn zcard(&self, key: &[u8]) -> io::Result<usize> {
344 self.wshard(key).store.zcard(key).map_err(store_err)
345 }
346
347 pub fn publish(&self, channel: &[u8], payload: &[u8]) -> usize {
353 let plans = {
356 let g = self.lock();
358 g.bus.collect_delivery(channel, payload)
359 };
360 let mut count = 0;
361 for (frame, sender) in plans {
362 if sender.send(frame).is_ok() {
363 count += 1;
364 }
365 }
366 count
367 }
368
369 pub fn subscribe(&self, channels: &[&[u8]]) -> Subscription {
374 let mut sub = Subscription::new(self.inner_handle(), self.guard_handle());
375 if !channels.is_empty() {
376 sub.subscribe(channels);
377 }
378 sub
379 }
380
381 pub fn psubscribe(&self, patterns: &[&[u8]]) -> Subscription {
383 let mut sub = Subscription::new(self.inner_handle(), self.guard_handle());
384 if !patterns.is_empty() {
385 sub.psubscribe(patterns);
386 }
387 sub
388 }
389}
390
391fn push_helper<F>(
396 s: &Store,
397 key: &[u8],
398 values: &[&[u8]],
399 verb: &'static [u8],
400 op: F,
401) -> io::Result<usize>
402where
403 F: FnOnce(&mut kevy_store::Store, &[u8], &[Vec<u8>]) -> Result<usize, StoreError>,
404{
405 let mut g = s.wshard(key);
406 let owned: Vec<Vec<u8>> = values.iter().map(|v| v.to_vec()).collect();
407 let n = op(&mut g.store, key, &owned).map_err(store_err)?;
408 let mut parts: Vec<&[u8]> = Vec::with_capacity(2 + values.len());
409 parts.push(verb);
410 parts.push(key);
411 for v in values {
412 parts.push(v);
413 }
414 commit_write(&mut g, &parts)?;
415 Ok(n)
416}
417
418fn pop_helper(s: &Store, key: &[u8], count: usize, from_tail: bool) -> io::Result<Vec<Vec<u8>>> {
419 let mut g = s.wshard(key);
420 let popped = if from_tail {
421 g.store.rpop(key, count).map_err(store_err)?
422 } else {
423 g.store.lpop(key, count).map_err(store_err)?
424 };
425 if !popped.is_empty() {
426 let verb: &[u8] = if from_tail { b"RPOP" } else { b"LPOP" };
427 let count_str = popped.len().to_string();
428 let parts: [&[u8]; 3] = [verb, key, count_str.as_bytes()];
429 commit_write(&mut g, &parts)?;
430 }
431 Ok(popped)
432}