1use std::io;
13use std::time::Duration;
14
15use kevy_store::StoreError;
16
17use crate::pubsub::Subscription;
18use crate::store::{Store, commit_write, store_err};
19
20impl Store {
21 pub fn set(&self, key: &[u8], value: &[u8]) -> io::Result<bool> {
28 let mut g = self.lock();
29 let ok = g.store.set(key, value.to_vec(), None, false, false);
30 commit_write(&mut g, &[b"SET", key, value])?;
31 Ok(ok)
32 }
33
34 pub fn set_with_ttl(&self, key: &[u8], value: &[u8], ttl: Duration) -> io::Result<bool> {
40 let mut g = self.lock();
41 let ok = g.store.set(key, value.to_vec(), Some(ttl), false, false);
42 let ms = ttl.as_millis().min(u64::MAX as u128) as u64;
43 let deadline = kevy_store::now_unix_ms().saturating_add(ms);
44 commit_write(&mut g, &[b"SET", key, value])?;
45 commit_write(&mut g, &[b"PEXPIREAT", key, deadline.to_string().as_bytes()])?;
46 Ok(ok)
47 }
48
49 pub fn get(&self, key: &[u8]) -> io::Result<Option<Vec<u8>>> {
51 let mut g = self.lock();
52 Ok(g.store.get(key).map_err(store_err)?.map(|v| v.to_vec()))
53 }
54
55 pub fn del(&self, keys: &[&[u8]]) -> io::Result<usize> {
57 let mut g = self.lock();
58 let owned: Vec<Vec<u8>> = keys.iter().map(|k| k.to_vec()).collect();
59 let n = g.store.del(&owned);
60 if n > 0 {
61 let mut parts: Vec<&[u8]> = Vec::with_capacity(keys.len() + 1);
62 parts.push(b"DEL");
63 for k in keys {
64 parts.push(k);
65 }
66 commit_write(&mut g, &parts)?;
67 }
68 Ok(n)
69 }
70
71 pub fn exists(&self, keys: &[&[u8]]) -> io::Result<usize> {
74 let mut g = self.lock();
75 let owned: Vec<Vec<u8>> = keys.iter().map(|k| k.to_vec()).collect();
76 Ok(g.store.exists(&owned))
77 }
78
79 pub fn incr(&self, key: &[u8]) -> io::Result<i64> {
81 self.incr_by(key, 1)
82 }
83
84 pub fn incr_by(&self, key: &[u8], delta: i64) -> io::Result<i64> {
86 let mut g = self.lock();
87 let n = g.store.incr_by(key, delta).map_err(store_err)?;
88 commit_write(&mut g, &[b"INCRBY", key, delta.to_string().as_bytes()])?;
89 Ok(n)
90 }
91
92 pub fn expire(&self, key: &[u8], ttl: Duration) -> io::Result<bool> {
96 let mut g = self.lock();
97 let touched = g.store.expire(key, ttl);
98 if touched {
99 let ms = ttl.as_millis().min(u64::MAX as u128) as u64;
100 let deadline = kevy_store::now_unix_ms().saturating_add(ms);
101 commit_write(&mut g, &[b"PEXPIREAT", key, deadline.to_string().as_bytes()])?;
102 }
103 Ok(touched)
104 }
105
106 pub fn persist(&self, key: &[u8]) -> io::Result<bool> {
108 let mut g = self.lock();
109 let touched = g.store.persist(key);
110 if touched {
111 commit_write(&mut g, &[b"PERSIST", key])?;
112 }
113 Ok(touched)
114 }
115
116 pub fn ttl_ms(&self, key: &[u8]) -> i64 {
118 self.lock().store.pttl(key)
119 }
120
121 pub fn type_of(&self, key: &[u8]) -> &'static str {
123 self.lock().store.type_of(key)
124 }
125
126 pub fn dbsize(&self) -> usize {
128 self.lock().store.dbsize()
129 }
130
131 pub fn flush(&self) -> io::Result<()> {
134 let mut g = self.lock();
135 g.store.flush();
136 commit_write(&mut g, &[b"FLUSHALL"])?;
137 Ok(())
138 }
139
140 pub fn key_bytes(&self, key: &[u8]) -> Option<u64> {
142 self.lock().store.estimate_key_bytes(key)
143 }
144
145 pub fn used_memory(&self) -> u64 {
147 self.lock().store.used_memory()
148 }
149
150 pub fn evictions_total(&self) -> u64 {
152 self.lock().store.evictions_total()
153 }
154
155 pub fn expired_keys_total(&self) -> u64 {
157 self.lock().store.expired_keys_total()
158 }
159
160 pub fn hset(&self, key: &[u8], pairs: &[(&[u8], &[u8])]) -> io::Result<usize> {
164 let mut g = self.lock();
165 let owned: Vec<(Vec<u8>, Vec<u8>)> =
166 pairs.iter().map(|(f, v)| (f.to_vec(), v.to_vec())).collect();
167 let added = g.store.hset(key, &owned).map_err(store_err)?;
168 let mut parts: Vec<&[u8]> = Vec::with_capacity(2 + pairs.len() * 2);
169 parts.push(b"HSET");
170 parts.push(key);
171 for (f, v) in pairs {
172 parts.push(f);
173 parts.push(v);
174 }
175 commit_write(&mut g, &parts)?;
176 Ok(added)
177 }
178
179 pub fn hget(&self, key: &[u8], field: &[u8]) -> io::Result<Option<Vec<u8>>> {
181 let mut g = self.lock();
182 Ok(g.store
183 .hget(key, field)
184 .map_err(store_err)?
185 .map(|v| v.to_vec()))
186 }
187
188 pub fn hdel(&self, key: &[u8], fields: &[&[u8]]) -> io::Result<usize> {
190 let mut g = self.lock();
191 let owned: Vec<Vec<u8>> = fields.iter().map(|f| f.to_vec()).collect();
192 let removed = g.store.hdel(key, &owned).map_err(store_err)?;
193 if removed > 0 {
194 let mut parts: Vec<&[u8]> = Vec::with_capacity(2 + fields.len());
195 parts.push(b"HDEL");
196 parts.push(key);
197 for f in fields {
198 parts.push(f);
199 }
200 commit_write(&mut g, &parts)?;
201 }
202 Ok(removed)
203 }
204
205 pub fn lpush(&self, key: &[u8], values: &[&[u8]]) -> io::Result<usize> {
209 push_helper(self, key, values, b"LPUSH", |s, k, vs| s.lpush(k, vs))
210 }
211
212 pub fn rpush(&self, key: &[u8], values: &[&[u8]]) -> io::Result<usize> {
214 push_helper(self, key, values, b"RPUSH", |s, k, vs| s.rpush(k, vs))
215 }
216
217 pub fn lpop(&self, key: &[u8], count: usize) -> io::Result<Vec<Vec<u8>>> {
219 pop_helper(self, key, count, false)
220 }
221
222 pub fn rpop(&self, key: &[u8], count: usize) -> io::Result<Vec<Vec<u8>>> {
224 pop_helper(self, key, count, true)
225 }
226
227 pub fn llen(&self, key: &[u8]) -> io::Result<usize> {
229 self.lock().store.llen(key).map_err(store_err)
230 }
231
232 pub fn sadd(&self, key: &[u8], members: &[&[u8]]) -> io::Result<usize> {
236 push_helper(self, key, members, b"SADD", |s, k, ms| s.sadd(k, ms))
237 }
238
239 pub fn srem(&self, key: &[u8], members: &[&[u8]]) -> io::Result<usize> {
241 let mut g = self.lock();
242 let owned: Vec<Vec<u8>> = members.iter().map(|m| m.to_vec()).collect();
243 let removed = g.store.srem(key, &owned).map_err(store_err)?;
244 if removed > 0 {
245 let mut parts: Vec<&[u8]> = Vec::with_capacity(2 + members.len());
246 parts.push(b"SREM");
247 parts.push(key);
248 for m in members {
249 parts.push(m);
250 }
251 commit_write(&mut g, &parts)?;
252 }
253 Ok(removed)
254 }
255
256 pub fn smembers(&self, key: &[u8]) -> io::Result<Vec<Vec<u8>>> {
258 self.lock().store.smembers(key).map_err(store_err)
259 }
260
261 pub fn scard(&self, key: &[u8]) -> io::Result<usize> {
263 self.lock().store.scard(key).map_err(store_err)
264 }
265
266 pub fn zadd(&self, key: &[u8], pairs: &[(f64, &[u8])]) -> io::Result<usize> {
270 let mut g = self.lock();
271 let owned: Vec<(f64, Vec<u8>)> =
272 pairs.iter().map(|(s, m)| (*s, m.to_vec())).collect();
273 let added = g.store.zadd(key, &owned).map_err(store_err)?;
274 let mut score_strs: Vec<Vec<u8>> = Vec::with_capacity(pairs.len());
275 for (s, _) in pairs {
276 score_strs.push(format!("{s}").into_bytes());
277 }
278 let mut parts: Vec<&[u8]> = Vec::with_capacity(2 + pairs.len() * 2);
279 parts.push(b"ZADD");
280 parts.push(key);
281 for (i, (_, m)) in pairs.iter().enumerate() {
282 parts.push(&score_strs[i]);
283 parts.push(m);
284 }
285 commit_write(&mut g, &parts)?;
286 Ok(added)
287 }
288
289 pub fn zrem(&self, key: &[u8], members: &[&[u8]]) -> io::Result<usize> {
291 let mut g = self.lock();
292 let owned: Vec<Vec<u8>> = members.iter().map(|m| m.to_vec()).collect();
293 let removed = g.store.zrem(key, &owned).map_err(store_err)?;
294 if removed > 0 {
295 let mut parts: Vec<&[u8]> = Vec::with_capacity(2 + members.len());
296 parts.push(b"ZREM");
297 parts.push(key);
298 for m in members {
299 parts.push(m);
300 }
301 commit_write(&mut g, &parts)?;
302 }
303 Ok(removed)
304 }
305
306 pub fn zscore(&self, key: &[u8], member: &[u8]) -> io::Result<Option<f64>> {
308 self.lock().store.zscore(key, member).map_err(store_err)
309 }
310
311 pub fn zcard(&self, key: &[u8]) -> io::Result<usize> {
313 self.lock().store.zcard(key).map_err(store_err)
314 }
315
316 pub fn publish(&self, channel: &[u8], payload: &[u8]) -> usize {
322 let plans = {
325 let g = self.lock();
326 g.bus.collect_delivery(channel, payload)
327 };
328 let mut count = 0;
329 for (frame, sender) in plans {
330 if sender.send(frame).is_ok() {
331 count += 1;
332 }
333 }
334 count
335 }
336
337 pub fn subscribe(&self, channels: &[&[u8]]) -> Subscription {
342 let mut sub = Subscription::new(self.inner_handle(), self.guard_handle());
343 if !channels.is_empty() {
344 sub.subscribe(channels);
345 }
346 sub
347 }
348
349 pub fn psubscribe(&self, patterns: &[&[u8]]) -> Subscription {
351 let mut sub = Subscription::new(self.inner_handle(), self.guard_handle());
352 if !patterns.is_empty() {
353 sub.psubscribe(patterns);
354 }
355 sub
356 }
357}
358
359fn push_helper<F>(
364 s: &Store,
365 key: &[u8],
366 values: &[&[u8]],
367 verb: &'static [u8],
368 op: F,
369) -> io::Result<usize>
370where
371 F: FnOnce(&mut kevy_store::Store, &[u8], &[Vec<u8>]) -> Result<usize, StoreError>,
372{
373 let mut g = s.lock();
374 let owned: Vec<Vec<u8>> = values.iter().map(|v| v.to_vec()).collect();
375 let n = op(&mut g.store, key, &owned).map_err(store_err)?;
376 let mut parts: Vec<&[u8]> = Vec::with_capacity(2 + values.len());
377 parts.push(verb);
378 parts.push(key);
379 for v in values {
380 parts.push(v);
381 }
382 commit_write(&mut g, &parts)?;
383 Ok(n)
384}
385
386fn pop_helper(s: &Store, key: &[u8], count: usize, from_tail: bool) -> io::Result<Vec<Vec<u8>>> {
387 let mut g = s.lock();
388 let popped = if from_tail {
389 g.store.rpop(key, count).map_err(store_err)?
390 } else {
391 g.store.lpop(key, count).map_err(store_err)?
392 };
393 if !popped.is_empty() {
394 let verb: &[u8] = if from_tail { b"RPOP" } else { b"LPOP" };
395 let count_str = popped.len().to_string();
396 let parts: [&[u8]; 3] = [verb, key, count_str.as_bytes()];
397 commit_write(&mut g, &parts)?;
398 }
399 Ok(popped)
400}