1use std::io;
13use std::time::Duration;
14
15use kevy_store::StoreError;
16
17use crate::pubsub::Subscription;
18use crate::store::{Store, commit_write, store_err};
19
20impl Store {
21 pub fn set(&self, key: &[u8], value: &[u8]) -> io::Result<bool> {
28 let mut g = self.lock();
29 let ok = g.store.set(key, value.to_vec(), None, false, false);
30 commit_write(&mut g, &[b"SET", key, value])?;
31 Ok(ok)
32 }
33
34 pub fn set_with_ttl(&self, key: &[u8], value: &[u8], ttl: Duration) -> io::Result<bool> {
36 let mut g = self.lock();
37 let ok = g.store.set(key, value.to_vec(), Some(ttl), false, false);
38 let ms = ttl.as_millis().min(u64::MAX as u128) as u64;
39 commit_write(&mut g, &[b"SET", key, value])?;
40 commit_write(&mut g, &[b"PEXPIRE", key, ms.to_string().as_bytes()])?;
41 Ok(ok)
42 }
43
44 pub fn get(&self, key: &[u8]) -> io::Result<Option<Vec<u8>>> {
46 let mut g = self.lock();
47 Ok(g.store.get(key).map_err(store_err)?.map(|v| v.to_vec()))
48 }
49
50 pub fn del(&self, keys: &[&[u8]]) -> io::Result<usize> {
52 let mut g = self.lock();
53 let owned: Vec<Vec<u8>> = keys.iter().map(|k| k.to_vec()).collect();
54 let n = g.store.del(&owned);
55 if n > 0 {
56 let mut parts: Vec<&[u8]> = Vec::with_capacity(keys.len() + 1);
57 parts.push(b"DEL");
58 for k in keys {
59 parts.push(k);
60 }
61 commit_write(&mut g, &parts)?;
62 }
63 Ok(n)
64 }
65
66 pub fn exists(&self, keys: &[&[u8]]) -> io::Result<usize> {
69 let mut g = self.lock();
70 let owned: Vec<Vec<u8>> = keys.iter().map(|k| k.to_vec()).collect();
71 Ok(g.store.exists(&owned))
72 }
73
74 pub fn incr(&self, key: &[u8]) -> io::Result<i64> {
76 self.incr_by(key, 1)
77 }
78
79 pub fn incr_by(&self, key: &[u8], delta: i64) -> io::Result<i64> {
81 let mut g = self.lock();
82 let n = g.store.incr_by(key, delta).map_err(store_err)?;
83 commit_write(&mut g, &[b"INCRBY", key, delta.to_string().as_bytes()])?;
84 Ok(n)
85 }
86
87 pub fn expire(&self, key: &[u8], ttl: Duration) -> io::Result<bool> {
89 let mut g = self.lock();
90 let touched = g.store.expire(key, ttl);
91 if touched {
92 let ms = ttl.as_millis().min(u64::MAX as u128) as u64;
93 commit_write(&mut g, &[b"PEXPIRE", key, ms.to_string().as_bytes()])?;
94 }
95 Ok(touched)
96 }
97
98 pub fn persist(&self, key: &[u8]) -> io::Result<bool> {
100 let mut g = self.lock();
101 let touched = g.store.persist(key);
102 if touched {
103 commit_write(&mut g, &[b"PERSIST", key])?;
104 }
105 Ok(touched)
106 }
107
108 pub fn ttl_ms(&self, key: &[u8]) -> i64 {
110 self.lock().store.pttl(key)
111 }
112
113 pub fn type_of(&self, key: &[u8]) -> &'static str {
115 self.lock().store.type_of(key)
116 }
117
118 pub fn dbsize(&self) -> usize {
120 self.lock().store.dbsize()
121 }
122
123 pub fn flush(&self) -> io::Result<()> {
126 let mut g = self.lock();
127 g.store.flush();
128 commit_write(&mut g, &[b"FLUSHALL"])?;
129 Ok(())
130 }
131
132 pub fn key_bytes(&self, key: &[u8]) -> Option<u64> {
134 self.lock().store.estimate_key_bytes(key)
135 }
136
137 pub fn used_memory(&self) -> u64 {
139 self.lock().store.used_memory()
140 }
141
142 pub fn evictions_total(&self) -> u64 {
144 self.lock().store.evictions_total()
145 }
146
147 pub fn expired_keys_total(&self) -> u64 {
149 self.lock().store.expired_keys_total()
150 }
151
152 pub fn hset(&self, key: &[u8], pairs: &[(&[u8], &[u8])]) -> io::Result<usize> {
156 let mut g = self.lock();
157 let owned: Vec<(Vec<u8>, Vec<u8>)> =
158 pairs.iter().map(|(f, v)| (f.to_vec(), v.to_vec())).collect();
159 let added = g.store.hset(key, &owned).map_err(store_err)?;
160 let mut parts: Vec<&[u8]> = Vec::with_capacity(2 + pairs.len() * 2);
161 parts.push(b"HSET");
162 parts.push(key);
163 for (f, v) in pairs {
164 parts.push(f);
165 parts.push(v);
166 }
167 commit_write(&mut g, &parts)?;
168 Ok(added)
169 }
170
171 pub fn hget(&self, key: &[u8], field: &[u8]) -> io::Result<Option<Vec<u8>>> {
173 let mut g = self.lock();
174 Ok(g.store
175 .hget(key, field)
176 .map_err(store_err)?
177 .map(|v| v.to_vec()))
178 }
179
180 pub fn hdel(&self, key: &[u8], fields: &[&[u8]]) -> io::Result<usize> {
182 let mut g = self.lock();
183 let owned: Vec<Vec<u8>> = fields.iter().map(|f| f.to_vec()).collect();
184 let removed = g.store.hdel(key, &owned).map_err(store_err)?;
185 if removed > 0 {
186 let mut parts: Vec<&[u8]> = Vec::with_capacity(2 + fields.len());
187 parts.push(b"HDEL");
188 parts.push(key);
189 for f in fields {
190 parts.push(f);
191 }
192 commit_write(&mut g, &parts)?;
193 }
194 Ok(removed)
195 }
196
197 pub fn lpush(&self, key: &[u8], values: &[&[u8]]) -> io::Result<usize> {
201 push_helper(self, key, values, b"LPUSH", |s, k, vs| s.lpush(k, vs))
202 }
203
204 pub fn rpush(&self, key: &[u8], values: &[&[u8]]) -> io::Result<usize> {
206 push_helper(self, key, values, b"RPUSH", |s, k, vs| s.rpush(k, vs))
207 }
208
209 pub fn lpop(&self, key: &[u8], count: usize) -> io::Result<Vec<Vec<u8>>> {
211 pop_helper(self, key, count, false)
212 }
213
214 pub fn rpop(&self, key: &[u8], count: usize) -> io::Result<Vec<Vec<u8>>> {
216 pop_helper(self, key, count, true)
217 }
218
219 pub fn llen(&self, key: &[u8]) -> io::Result<usize> {
221 self.lock().store.llen(key).map_err(store_err)
222 }
223
224 pub fn sadd(&self, key: &[u8], members: &[&[u8]]) -> io::Result<usize> {
228 push_helper(self, key, members, b"SADD", |s, k, ms| s.sadd(k, ms))
229 }
230
231 pub fn srem(&self, key: &[u8], members: &[&[u8]]) -> io::Result<usize> {
233 let mut g = self.lock();
234 let owned: Vec<Vec<u8>> = members.iter().map(|m| m.to_vec()).collect();
235 let removed = g.store.srem(key, &owned).map_err(store_err)?;
236 if removed > 0 {
237 let mut parts: Vec<&[u8]> = Vec::with_capacity(2 + members.len());
238 parts.push(b"SREM");
239 parts.push(key);
240 for m in members {
241 parts.push(m);
242 }
243 commit_write(&mut g, &parts)?;
244 }
245 Ok(removed)
246 }
247
248 pub fn smembers(&self, key: &[u8]) -> io::Result<Vec<Vec<u8>>> {
250 self.lock().store.smembers(key).map_err(store_err)
251 }
252
253 pub fn scard(&self, key: &[u8]) -> io::Result<usize> {
255 self.lock().store.scard(key).map_err(store_err)
256 }
257
258 pub fn zadd(&self, key: &[u8], pairs: &[(f64, &[u8])]) -> io::Result<usize> {
262 let mut g = self.lock();
263 let owned: Vec<(f64, Vec<u8>)> =
264 pairs.iter().map(|(s, m)| (*s, m.to_vec())).collect();
265 let added = g.store.zadd(key, &owned).map_err(store_err)?;
266 let mut score_strs: Vec<Vec<u8>> = Vec::with_capacity(pairs.len());
267 for (s, _) in pairs {
268 score_strs.push(format!("{s}").into_bytes());
269 }
270 let mut parts: Vec<&[u8]> = Vec::with_capacity(2 + pairs.len() * 2);
271 parts.push(b"ZADD");
272 parts.push(key);
273 for (i, (_, m)) in pairs.iter().enumerate() {
274 parts.push(&score_strs[i]);
275 parts.push(m);
276 }
277 commit_write(&mut g, &parts)?;
278 Ok(added)
279 }
280
281 pub fn zrem(&self, key: &[u8], members: &[&[u8]]) -> io::Result<usize> {
283 let mut g = self.lock();
284 let owned: Vec<Vec<u8>> = members.iter().map(|m| m.to_vec()).collect();
285 let removed = g.store.zrem(key, &owned).map_err(store_err)?;
286 if removed > 0 {
287 let mut parts: Vec<&[u8]> = Vec::with_capacity(2 + members.len());
288 parts.push(b"ZREM");
289 parts.push(key);
290 for m in members {
291 parts.push(m);
292 }
293 commit_write(&mut g, &parts)?;
294 }
295 Ok(removed)
296 }
297
298 pub fn zscore(&self, key: &[u8], member: &[u8]) -> io::Result<Option<f64>> {
300 self.lock().store.zscore(key, member).map_err(store_err)
301 }
302
303 pub fn zcard(&self, key: &[u8]) -> io::Result<usize> {
305 self.lock().store.zcard(key).map_err(store_err)
306 }
307
308 pub fn publish(&self, channel: &[u8], payload: &[u8]) -> usize {
314 let plans = {
317 let g = self.lock();
318 g.bus.collect_delivery(channel, payload)
319 };
320 let mut count = 0;
321 for (frame, sender) in plans {
322 if sender.send(frame).is_ok() {
323 count += 1;
324 }
325 }
326 count
327 }
328
329 pub fn subscribe(&self, channels: &[&[u8]]) -> Subscription {
334 let mut sub = Subscription::new(self.inner_handle(), self.guard_handle());
335 if !channels.is_empty() {
336 sub.subscribe(channels);
337 }
338 sub
339 }
340
341 pub fn psubscribe(&self, patterns: &[&[u8]]) -> Subscription {
343 let mut sub = Subscription::new(self.inner_handle(), self.guard_handle());
344 if !patterns.is_empty() {
345 sub.psubscribe(patterns);
346 }
347 sub
348 }
349}
350
351fn push_helper<F>(
356 s: &Store,
357 key: &[u8],
358 values: &[&[u8]],
359 verb: &'static [u8],
360 op: F,
361) -> io::Result<usize>
362where
363 F: FnOnce(&mut kevy_store::Store, &[u8], &[Vec<u8>]) -> Result<usize, StoreError>,
364{
365 let mut g = s.lock();
366 let owned: Vec<Vec<u8>> = values.iter().map(|v| v.to_vec()).collect();
367 let n = op(&mut g.store, key, &owned).map_err(store_err)?;
368 let mut parts: Vec<&[u8]> = Vec::with_capacity(2 + values.len());
369 parts.push(verb);
370 parts.push(key);
371 for v in values {
372 parts.push(v);
373 }
374 commit_write(&mut g, &parts)?;
375 Ok(n)
376}
377
378fn pop_helper(s: &Store, key: &[u8], count: usize, from_tail: bool) -> io::Result<Vec<Vec<u8>>> {
379 let mut g = s.lock();
380 let popped = if from_tail {
381 g.store.rpop(key, count).map_err(store_err)?
382 } else {
383 g.store.lpop(key, count).map_err(store_err)?
384 };
385 if !popped.is_empty() {
386 let verb: &[u8] = if from_tail { b"RPOP" } else { b"LPOP" };
387 let count_str = popped.len().to_string();
388 let parts: [&[u8]; 3] = [verb, key, count_str.as_bytes()];
389 commit_write(&mut g, &parts)?;
390 }
391 Ok(popped)
392}