1use std::collections::HashMap;
26use std::sync::Arc;
27
28use futures_util::{Stream, StreamExt};
29use parking_lot::RwLock;
30use tokio::sync::broadcast;
31use tokio_stream::wrappers::BroadcastStream;
32
33const DEFAULT_MAX_VALUE_SIZE: usize = 1024 * 1024;
35
36const DEFAULT_MAX_KEYS: usize = 10000;
38
39const WATCH_CHANNEL_CAPACITY: usize = 1024;
41
42#[derive(Debug, Clone, PartialEq, Eq)]
44pub enum KvError {
45 NotFound,
47 ValueTooLarge,
49 QuotaExceeded,
51 InvalidKey,
53 Storage(String),
55}
56
57impl std::fmt::Display for KvError {
58 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
59 match self {
60 KvError::NotFound => write!(f, "key not found"),
61 KvError::ValueTooLarge => write!(f, "value too large"),
62 KvError::QuotaExceeded => write!(f, "storage quota exceeded"),
63 KvError::InvalidKey => write!(f, "invalid key format"),
64 KvError::Storage(msg) => write!(f, "storage error: {msg}"),
65 }
66 }
67}
68
69impl std::error::Error for KvError {}
70
71#[derive(Debug, Clone)]
73pub struct KvEntry {
74 pub value: Vec<u8>,
76 pub expires_at: Option<std::time::Instant>,
78}
79
80impl KvEntry {
81 #[must_use]
83 pub fn new(value: Vec<u8>) -> Self {
84 Self {
85 value,
86 expires_at: None,
87 }
88 }
89
90 #[must_use]
92 pub fn with_ttl(value: Vec<u8>, ttl_ns: u64) -> Self {
93 let expires_at = Some(std::time::Instant::now() + std::time::Duration::from_nanos(ttl_ns));
94 Self { value, expires_at }
95 }
96
97 #[must_use]
99 pub fn is_expired(&self) -> bool {
100 self.expires_at
101 .is_some_and(|exp| std::time::Instant::now() >= exp)
102 }
103}
104
105#[derive(Debug, Clone, PartialEq, Eq)]
107pub enum KvEventKind {
108 Set,
110 Delete,
112}
113
114#[derive(Debug, Clone)]
116pub struct KvEvent {
117 pub key: String,
119 pub kind: KvEventKind,
121 pub value: Option<Vec<u8>>,
123}
124
125#[async_trait::async_trait]
137pub trait KvBackend: Send + Sync + std::fmt::Debug {
138 async fn get(&self, key: &str) -> Result<Option<Vec<u8>>, KvError>;
144
145 async fn set(&self, key: &str, value: &[u8]) -> Result<(), KvError>;
152
153 async fn set_with_ttl(&self, key: &str, value: &[u8], ttl_ns: u64) -> Result<(), KvError>;
160
161 async fn delete(&self, key: &str) -> Result<bool, KvError>;
167
168 async fn exists(&self, key: &str) -> Result<bool, KvError>;
174
175 async fn list_keys(&self, prefix: &str) -> Result<Vec<String>, KvError>;
181
182 async fn increment(&self, key: &str, delta: i64) -> Result<i64, KvError>;
189
190 async fn compare_and_swap(
198 &self,
199 key: &str,
200 expected: Option<&[u8]>,
201 new: &[u8],
202 ) -> Result<bool, KvError>;
203}
204
205#[derive(Clone)]
209pub struct KvStore {
210 inner: Arc<RwLock<HashMap<String, KvEntry>>>,
211 max_value_size: usize,
212 max_keys: usize,
213 events: broadcast::Sender<KvEvent>,
214 backend: Option<Arc<dyn KvBackend>>,
217}
218
219impl std::fmt::Debug for KvStore {
220 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
221 f.debug_struct("KvStore")
222 .field("len", &self.inner.read().len())
223 .field("max_value_size", &self.max_value_size)
224 .field("max_keys", &self.max_keys)
225 .field("clustered", &self.backend.is_some())
226 .finish_non_exhaustive()
227 }
228}
229
230impl Default for KvStore {
231 fn default() -> Self {
232 Self::new()
233 }
234}
235
236impl KvStore {
237 #[must_use]
239 pub fn new() -> Self {
240 let (events, _rx) = broadcast::channel(WATCH_CHANNEL_CAPACITY);
241 Self {
242 inner: Arc::new(RwLock::new(HashMap::new())),
243 max_value_size: DEFAULT_MAX_VALUE_SIZE,
244 max_keys: DEFAULT_MAX_KEYS,
245 events,
246 backend: None,
247 }
248 }
249
250 #[must_use]
257 pub fn with_backend(mut self, backend: Arc<dyn KvBackend>) -> Self {
258 self.backend = Some(backend);
259 self
260 }
261
262 #[must_use]
264 pub fn is_clustered(&self) -> bool {
265 self.backend.is_some()
266 }
267
268 #[must_use]
270 pub fn with_max_value_size(mut self, size: usize) -> Self {
271 self.max_value_size = size;
272 self
273 }
274
275 #[must_use]
277 pub fn with_max_keys(mut self, count: usize) -> Self {
278 self.max_keys = count;
279 self
280 }
281
282 pub fn set_max_value_size(&mut self, size: usize) {
284 self.max_value_size = size;
285 }
286
287 pub fn set_max_keys(&mut self, count: usize) {
289 self.max_keys = count;
290 }
291
292 #[must_use]
294 pub fn max_value_size(&self) -> usize {
295 self.max_value_size
296 }
297
298 #[must_use]
300 pub fn max_keys(&self) -> usize {
301 self.max_keys
302 }
303
304 pub fn validate_key(key: &str) -> Result<(), KvError> {
313 if key.is_empty() {
314 return Err(KvError::InvalidKey);
315 }
316 if key.len() > 1024 {
317 return Err(KvError::InvalidKey);
318 }
319 if !key
321 .chars()
322 .all(|c| c.is_alphanumeric() || "-_./:".contains(c))
323 {
324 return Err(KvError::InvalidKey);
325 }
326 Ok(())
327 }
328
329 pub fn clean_expired(&self) {
331 let mut kv = self.inner.write();
332 kv.retain(|_, entry| !entry.is_expired());
333 }
334
335 fn emit(&self, key: &str, kind: KvEventKind, value: Option<Vec<u8>>) {
337 let _ = self.events.send(KvEvent {
338 key: key.to_string(),
339 kind,
340 value,
341 });
342 }
343
344 pub fn get(&self, key: &str) -> Result<Option<Vec<u8>>, KvError> {
352 Self::validate_key(key)?;
353 self.clean_expired();
354
355 let kv = self.inner.read();
356 match kv.get(key) {
357 Some(entry) if !entry.is_expired() => Ok(Some(entry.value.clone())),
358 _ => Ok(None),
359 }
360 }
361
362 pub fn get_string(&self, key: &str) -> Result<Option<String>, KvError> {
369 match self.get(key)? {
370 Some(bytes) => String::from_utf8(bytes)
371 .map(Some)
372 .map_err(|e| KvError::Storage(format!("invalid UTF-8: {e}"))),
373 None => Ok(None),
374 }
375 }
376
377 pub fn set(&self, key: &str, value: &[u8]) -> Result<(), KvError> {
386 Self::validate_key(key)?;
387
388 if value.len() > self.max_value_size {
389 return Err(KvError::ValueTooLarge);
390 }
391
392 {
393 let mut kv = self.inner.write();
394
395 if !kv.contains_key(key) && kv.len() >= self.max_keys {
397 return Err(KvError::QuotaExceeded);
398 }
399
400 kv.insert(key.to_string(), KvEntry::new(value.to_vec()));
401 }
402
403 self.emit(key, KvEventKind::Set, Some(value.to_vec()));
404 Ok(())
405 }
406
407 pub fn set_string(&self, key: &str, value: &str) -> Result<(), KvError> {
413 self.set(key, value.as_bytes())
414 }
415
416 pub fn set_with_ttl(&self, key: &str, value: &[u8], ttl_ns: u64) -> Result<(), KvError> {
422 Self::validate_key(key)?;
423
424 if value.len() > self.max_value_size {
425 return Err(KvError::ValueTooLarge);
426 }
427
428 {
429 let mut kv = self.inner.write();
430
431 if !kv.contains_key(key) && kv.len() >= self.max_keys {
433 return Err(KvError::QuotaExceeded);
434 }
435
436 kv.insert(key.to_string(), KvEntry::with_ttl(value.to_vec(), ttl_ns));
437 }
438
439 self.emit(key, KvEventKind::Set, Some(value.to_vec()));
440 Ok(())
441 }
442
443 pub fn delete(&self, key: &str) -> Result<bool, KvError> {
451 Self::validate_key(key)?;
452
453 let removed = {
454 let mut kv = self.inner.write();
455 kv.remove(key).is_some()
456 };
457
458 if removed {
459 self.emit(key, KvEventKind::Delete, None);
460 }
461 Ok(removed)
462 }
463
464 #[must_use]
466 pub fn exists(&self, key: &str) -> bool {
467 self.clean_expired();
468 let kv = self.inner.read();
469 kv.get(key).is_some_and(|e| !e.is_expired())
470 }
471
472 pub fn list_keys(&self, prefix: &str) -> Result<Vec<String>, KvError> {
479 self.clean_expired();
480 let kv = self.inner.read();
481 Ok(kv
482 .iter()
483 .filter(|(k, entry)| k.starts_with(prefix) && !entry.is_expired())
484 .map(|(k, _)| k.clone())
485 .collect())
486 }
487
488 pub fn increment(&self, key: &str, delta: i64) -> Result<i64, KvError> {
498 Self::validate_key(key)?;
499
500 let (new_value, bytes) = {
501 let mut kv = self.inner.write();
502
503 let current: i64 = match kv.get(key) {
504 Some(entry) if !entry.is_expired() => {
505 let s = String::from_utf8(entry.value.clone())
506 .map_err(|e| KvError::Storage(format!("invalid number: {e}")))?;
507 s.parse()
508 .map_err(|e| KvError::Storage(format!("invalid number: {e}")))?
509 }
510 _ => 0,
511 };
512
513 let new_value = current.saturating_add(delta);
514 let value_str = new_value.to_string();
515
516 if !kv.contains_key(key) && kv.len() >= self.max_keys {
518 return Err(KvError::QuotaExceeded);
519 }
520
521 let bytes = value_str.into_bytes();
522 kv.insert(key.to_string(), KvEntry::new(bytes.clone()));
523 (new_value, bytes)
524 };
525
526 self.emit(key, KvEventKind::Set, Some(bytes));
527 Ok(new_value)
528 }
529
530 pub fn compare_and_swap(
543 &self,
544 key: &str,
545 expected: Option<&[u8]>,
546 new_value: &[u8],
547 ) -> Result<bool, KvError> {
548 Self::validate_key(key)?;
549
550 if new_value.len() > self.max_value_size {
551 return Err(KvError::ValueTooLarge);
552 }
553
554 let swapped = {
555 let mut kv = self.inner.write();
556
557 let current = kv.get(key).and_then(|e| {
558 if e.is_expired() {
559 None
560 } else {
561 Some(e.value.as_slice())
562 }
563 });
564
565 if current == expected {
566 if current.is_none() && kv.len() >= self.max_keys {
568 return Err(KvError::QuotaExceeded);
569 }
570 kv.insert(key.to_string(), KvEntry::new(new_value.to_vec()));
571 true
572 } else {
573 false
574 }
575 };
576
577 if swapped {
578 self.emit(key, KvEventKind::Set, Some(new_value.to_vec()));
579 }
580 Ok(swapped)
581 }
582
583 pub async fn get_async(&self, key: &str) -> Result<Option<Vec<u8>>, KvError> {
591 match &self.backend {
592 Some(b) => b.get(key).await,
593 None => self.get(key),
594 }
595 }
596
597 pub async fn set_async(&self, key: &str, value: &[u8]) -> Result<(), KvError> {
604 match &self.backend {
605 Some(b) => b.set(key, value).await,
606 None => self.set(key, value),
607 }
608 }
609
610 pub async fn set_with_ttl_async(
618 &self,
619 key: &str,
620 value: &[u8],
621 ttl_ns: u64,
622 ) -> Result<(), KvError> {
623 match &self.backend {
624 Some(b) => b.set_with_ttl(key, value, ttl_ns).await,
625 None => self.set_with_ttl(key, value, ttl_ns),
626 }
627 }
628
629 pub async fn delete_async(&self, key: &str) -> Result<bool, KvError> {
637 match &self.backend {
638 Some(b) => b.delete(key).await,
639 None => self.delete(key),
640 }
641 }
642
643 pub async fn exists_async(&self, key: &str) -> Result<bool, KvError> {
650 match &self.backend {
651 Some(b) => b.exists(key).await,
652 None => Ok(self.exists(key)),
653 }
654 }
655
656 pub async fn list_keys_async(&self, prefix: &str) -> Result<Vec<String>, KvError> {
663 match &self.backend {
664 Some(b) => b.list_keys(prefix).await,
665 None => self.list_keys(prefix),
666 }
667 }
668
669 pub async fn increment_async(&self, key: &str, delta: i64) -> Result<i64, KvError> {
677 match &self.backend {
678 Some(b) => b.increment(key, delta).await,
679 None => self.increment(key, delta),
680 }
681 }
682
683 pub async fn compare_and_swap_async(
691 &self,
692 key: &str,
693 expected: Option<&[u8]>,
694 new: &[u8],
695 ) -> Result<bool, KvError> {
696 match &self.backend {
697 Some(b) => b.compare_and_swap(key, expected, new).await,
698 None => self.compare_and_swap(key, expected, new),
699 }
700 }
701
702 pub fn clear(&self) {
704 self.inner.write().clear();
705 }
706
707 #[must_use]
712 pub fn subscribe(&self) -> broadcast::Receiver<KvEvent> {
713 self.events.subscribe()
714 }
715
716 pub fn watch_prefix(&self, prefix: impl Into<String>) -> impl Stream<Item = KvEvent> {
721 let prefix = prefix.into();
722 BroadcastStream::new(self.events.subscribe()).filter_map(move |res| {
723 let prefix = prefix.clone();
724 async move {
725 match res {
726 Ok(event) if event.key.starts_with(&prefix) => Some(event),
727 _ => None,
728 }
729 }
730 })
731 }
732}
733
734static GLOBAL_KV: std::sync::OnceLock<KvStore> = std::sync::OnceLock::new();
740
741pub fn set_global_kv(store: KvStore) {
749 if GLOBAL_KV.set(store).is_err() {
750 tracing::warn!("global KvStore already set; ignoring duplicate set_global_kv call");
751 }
752}
753
754#[must_use]
766pub fn global_kv() -> Option<KvStore> {
767 GLOBAL_KV.get().cloned()
768}
769
770#[cfg(test)]
771mod tests {
772 use super::*;
773 use futures_util::StreamExt;
774
775 #[test]
776 fn set_and_get() {
777 let store = KvStore::new();
778 store.set("foo", b"bar").unwrap();
779 assert_eq!(store.get("foo").unwrap(), Some(b"bar".to_vec()));
780 assert_eq!(store.get_string("foo").unwrap(), Some("bar".to_string()));
781 }
782
783 #[test]
784 fn get_missing_returns_none() {
785 let store = KvStore::new();
786 assert_eq!(store.get("missing").unwrap(), None);
787 }
788
789 #[test]
790 fn ttl_expiry() {
791 let store = KvStore::new();
792 store.set_with_ttl("temp", b"v", 1_000_000).unwrap();
794 std::thread::sleep(std::time::Duration::from_millis(5));
795 assert_eq!(store.get("temp").unwrap(), None);
796 assert!(!store.exists("temp"));
797 }
798
799 #[test]
800 fn delete_reports_existence() {
801 let store = KvStore::new();
802 store.set("k", b"v").unwrap();
803 assert!(store.delete("k").unwrap());
804 assert!(!store.delete("k").unwrap());
805 assert_eq!(store.get("k").unwrap(), None);
806 }
807
808 #[test]
809 fn list_keys_prefix() {
810 let store = KvStore::new();
811 store.set("a/1", b"1").unwrap();
812 store.set("a/2", b"2").unwrap();
813 store.set("b/1", b"3").unwrap();
814 let mut keys = store.list_keys("a/").unwrap();
815 keys.sort();
816 assert_eq!(keys, vec!["a/1".to_string(), "a/2".to_string()]);
817 }
818
819 #[test]
820 fn increment() {
821 let store = KvStore::new();
822 assert_eq!(store.increment("counter", 5).unwrap(), 5);
823 assert_eq!(store.increment("counter", 3).unwrap(), 8);
824 assert_eq!(store.increment("counter", -10).unwrap(), -2);
825 }
826
827 #[test]
828 fn increment_saturates() {
829 let store = KvStore::new();
830 store.set("c", i64::MAX.to_string().as_bytes()).unwrap();
831 assert_eq!(store.increment("c", 1).unwrap(), i64::MAX);
832 }
833
834 #[test]
835 fn compare_and_swap_hit_and_miss() {
836 let store = KvStore::new();
837 assert!(store.compare_and_swap("k", None, b"v1").unwrap());
839 assert!(store.compare_and_swap("k", Some(b"v1"), b"v2").unwrap());
841 assert!(!store.compare_and_swap("k", Some(b"v1"), b"v3").unwrap());
843 assert_eq!(store.get("k").unwrap(), Some(b"v2".to_vec()));
844 }
845
846 #[test]
847 fn quota_exceeded() {
848 let store = KvStore::new().with_max_keys(2);
849 store.set("a", b"1").unwrap();
850 store.set("b", b"2").unwrap();
851 assert_eq!(store.set("c", b"3"), Err(KvError::QuotaExceeded));
852 assert!(store.set("a", b"x").is_ok());
854 }
855
856 #[test]
857 fn value_too_large() {
858 let store = KvStore::new().with_max_value_size(4);
859 assert_eq!(store.set("k", b"toolong"), Err(KvError::ValueTooLarge));
860 }
861
862 #[test]
863 fn invalid_key() {
864 let store = KvStore::new();
865 assert_eq!(store.set("", b"v"), Err(KvError::InvalidKey));
866 assert_eq!(store.set("bad key", b"v"), Err(KvError::InvalidKey));
867 }
868
869 #[test]
870 fn clone_shares_state() {
871 let a = KvStore::new();
872 let b = a.clone();
873 a.set("k", b"v").unwrap();
874 assert_eq!(b.get("k").unwrap(), Some(b"v".to_vec()));
875 }
876
877 #[tokio::test]
878 async fn watch_receives_set_event() {
879 let store = KvStore::new();
880 let mut rx = store.subscribe();
881 store.set("watched", b"hello").unwrap();
882 let event = rx.recv().await.unwrap();
883 assert_eq!(event.key, "watched");
884 assert_eq!(event.kind, KvEventKind::Set);
885 assert_eq!(event.value, Some(b"hello".to_vec()));
886 }
887
888 #[tokio::test]
889 async fn watch_prefix_filters() {
890 let store = KvStore::new();
891 let mut stream = Box::pin(store.watch_prefix("user/"));
892 store.set("other/1", b"x").unwrap();
893 store.set("user/1", b"y").unwrap();
894 let event = stream.next().await.unwrap();
895 assert_eq!(event.key, "user/1");
896 assert_eq!(event.value, Some(b"y".to_vec()));
897 }
898
899 #[tokio::test]
900 async fn watch_receives_delete_event() {
901 let store = KvStore::new();
902 store.set("k", b"v").unwrap();
903 let mut rx = store.subscribe();
904 store.delete("k").unwrap();
905 let event = rx.recv().await.unwrap();
906 assert_eq!(event.kind, KvEventKind::Delete);
907 assert_eq!(event.key, "k");
908 assert_eq!(event.value, None);
909 }
910
911 #[derive(Debug, Default)]
915 struct MockBackend {
916 map: std::sync::Mutex<HashMap<String, Vec<u8>>>,
917 calls: std::sync::Mutex<Vec<String>>,
918 }
919
920 impl MockBackend {
921 fn record(&self, op: &str) {
922 self.calls.lock().unwrap().push(op.to_string());
923 }
924
925 fn calls(&self) -> Vec<String> {
926 self.calls.lock().unwrap().clone()
927 }
928 }
929
930 #[async_trait::async_trait]
931 impl KvBackend for MockBackend {
932 async fn get(&self, key: &str) -> Result<Option<Vec<u8>>, KvError> {
933 self.record("get");
934 Ok(self.map.lock().unwrap().get(key).cloned())
935 }
936
937 async fn set(&self, key: &str, value: &[u8]) -> Result<(), KvError> {
938 self.record("set");
939 self.map
940 .lock()
941 .unwrap()
942 .insert(key.to_string(), value.to_vec());
943 Ok(())
944 }
945
946 async fn set_with_ttl(&self, key: &str, value: &[u8], _ttl_ns: u64) -> Result<(), KvError> {
947 self.record("set_with_ttl");
948 self.map
949 .lock()
950 .unwrap()
951 .insert(key.to_string(), value.to_vec());
952 Ok(())
953 }
954
955 async fn delete(&self, key: &str) -> Result<bool, KvError> {
956 self.record("delete");
957 Ok(self.map.lock().unwrap().remove(key).is_some())
958 }
959
960 async fn exists(&self, key: &str) -> Result<bool, KvError> {
961 self.record("exists");
962 Ok(self.map.lock().unwrap().contains_key(key))
963 }
964
965 async fn list_keys(&self, prefix: &str) -> Result<Vec<String>, KvError> {
966 self.record("list_keys");
967 Ok(self
968 .map
969 .lock()
970 .unwrap()
971 .keys()
972 .filter(|k| k.starts_with(prefix))
973 .cloned()
974 .collect())
975 }
976
977 async fn increment(&self, key: &str, delta: i64) -> Result<i64, KvError> {
978 self.record("increment");
979 let mut map = self.map.lock().unwrap();
980 let current: i64 = map
981 .get(key)
982 .map_or(0, |v| String::from_utf8_lossy(v).parse().unwrap_or(0));
983 let new = current + delta;
984 map.insert(key.to_string(), new.to_string().into_bytes());
985 Ok(new)
986 }
987
988 async fn compare_and_swap(
989 &self,
990 key: &str,
991 expected: Option<&[u8]>,
992 new: &[u8],
993 ) -> Result<bool, KvError> {
994 self.record("compare_and_swap");
995 let mut map = self.map.lock().unwrap();
996 let current = map.get(key).map(Vec::as_slice);
997 if current == expected {
998 map.insert(key.to_string(), new.to_vec());
999 Ok(true)
1000 } else {
1001 Ok(false)
1002 }
1003 }
1004 }
1005
1006 #[test]
1007 fn is_clustered_reflects_backend() {
1008 let local = KvStore::new();
1009 assert!(!local.is_clustered());
1010 let clustered = KvStore::new().with_backend(Arc::new(MockBackend::default()));
1011 assert!(clustered.is_clustered());
1012 }
1013
1014 #[tokio::test]
1015 async fn async_routes_to_backend_when_clustered() {
1016 let backend = Arc::new(MockBackend::default());
1017 let store = KvStore::new().with_backend(backend.clone());
1018
1019 store.set_async("foo", b"bar").await.unwrap();
1021 assert_eq!(store.get_async("foo").await.unwrap(), Some(b"bar".to_vec()));
1022 assert!(store.exists_async("foo").await.unwrap());
1023
1024 store.set_with_ttl_async("ttlk", b"v", 1_000).await.unwrap();
1025 assert_eq!(store.increment_async("counter", 5).await.unwrap(), 5);
1026 assert_eq!(store.increment_async("counter", 3).await.unwrap(), 8);
1027 assert!(store
1028 .compare_and_swap_async("cas", None, b"v1")
1029 .await
1030 .unwrap());
1031
1032 let mut keys = store.list_keys_async("").await.unwrap();
1033 keys.sort();
1034 assert_eq!(
1035 keys,
1036 vec![
1037 "cas".to_string(),
1038 "counter".to_string(),
1039 "foo".to_string(),
1040 "ttlk".to_string(),
1041 ]
1042 );
1043
1044 assert!(store.delete_async("foo").await.unwrap());
1045 assert!(!store.exists_async("foo").await.unwrap());
1046
1047 assert_eq!(store.get("foo").unwrap(), None);
1049 assert!(!store.exists("counter"));
1050 assert_eq!(store.list_keys("").unwrap(), Vec::<String>::new());
1051
1052 let calls = backend.calls();
1054 for op in [
1055 "set",
1056 "get",
1057 "exists",
1058 "set_with_ttl",
1059 "increment",
1060 "compare_and_swap",
1061 "list_keys",
1062 "delete",
1063 ] {
1064 assert!(
1065 calls.contains(&op.to_string()),
1066 "missing backend call: {op}"
1067 );
1068 }
1069 }
1070
1071 #[tokio::test]
1072 async fn async_uses_local_when_not_clustered() {
1073 let store = KvStore::new();
1074
1075 store.set_async("foo", b"bar").await.unwrap();
1076 assert_eq!(store.get("foo").unwrap(), Some(b"bar".to_vec()));
1078 assert_eq!(
1080 store.get_async("foo").await.unwrap(),
1081 store.get("foo").unwrap()
1082 );
1083
1084 assert_eq!(
1085 store.exists_async("foo").await.unwrap(),
1086 store.exists("foo")
1087 );
1088
1089 store
1090 .set_with_ttl_async("ttlk", b"v", 1_000_000_000)
1091 .await
1092 .unwrap();
1093 assert!(store.exists("ttlk"));
1094
1095 assert_eq!(store.increment_async("c", 4).await.unwrap(), 4);
1096 assert_eq!(store.increment("c", 0).unwrap(), 4);
1097
1098 assert!(store
1099 .compare_and_swap_async("cas", None, b"v1")
1100 .await
1101 .unwrap());
1102 assert_eq!(store.get("cas").unwrap(), Some(b"v1".to_vec()));
1103
1104 let mut a = store.list_keys_async("").await.unwrap();
1105 let mut b = store.list_keys("").unwrap();
1106 a.sort();
1107 b.sort();
1108 assert_eq!(a, b);
1109
1110 assert!(store.delete_async("foo").await.unwrap());
1111 assert_eq!(store.get("foo").unwrap(), None);
1112 }
1113
1114 #[test]
1115 fn global_kv_accessor_shares_state() {
1116 set_global_kv(KvStore::new());
1121
1122 let a = global_kv().expect("global KvStore should be set after set_global_kv");
1123 let b = global_kv().expect("global KvStore should still be set");
1124
1125 a.set("global-kv-share-test", b"shared").unwrap();
1128 assert_eq!(
1129 b.get("global-kv-share-test").unwrap(),
1130 Some(b"shared".to_vec()),
1131 "writes through one global_kv() clone must be visible through another"
1132 );
1133 }
1134}