Skip to main content

crabtalk_proxy/storage/
mod.rs

1use crabtalk_core::{BoxFuture, Error, KvPairs, Prefix, Storage};
2use dashmap::DashMap;
3use std::sync::atomic::{AtomicI64, Ordering};
4
5#[cfg(feature = "storage-redis")]
6mod redis;
7#[cfg(feature = "storage-sqlite")]
8mod sqlite;
9
10#[cfg(feature = "storage-redis")]
11pub use self::redis::RedisStorage;
12#[cfg(feature = "storage-sqlite")]
13pub use self::sqlite::SqliteStorage;
14
15/// In-memory storage backend using `DashMap`.
16pub struct MemoryStorage {
17    data: DashMap<Vec<u8>, Vec<u8>>,
18    counters: DashMap<Vec<u8>, AtomicI64>,
19}
20
21impl MemoryStorage {
22    pub fn new() -> Self {
23        Self {
24            data: DashMap::new(),
25            counters: DashMap::new(),
26        }
27    }
28}
29
30impl Default for MemoryStorage {
31    fn default() -> Self {
32        Self::new()
33    }
34}
35
36impl Storage for MemoryStorage {
37    fn get(&self, key: &[u8]) -> BoxFuture<'_, Result<Option<Vec<u8>>, Error>> {
38        let result = self.data.get(key).map(|v| v.value().clone());
39        Box::pin(async move { Ok(result) })
40    }
41
42    fn set(&self, key: &[u8], value: Vec<u8>) -> BoxFuture<'_, Result<(), Error>> {
43        let key = key.to_vec();
44        Box::pin(async move {
45            self.data.insert(key, value);
46            Ok(())
47        })
48    }
49
50    fn increment(&self, key: &[u8], delta: i64) -> BoxFuture<'_, Result<i64, Error>> {
51        let key = key.to_vec();
52        Box::pin(async move {
53            let entry = self
54                .counters
55                .entry(key)
56                .or_insert_with(|| AtomicI64::new(0));
57            let new_val = entry.value().fetch_add(delta, Ordering::Relaxed) + delta;
58            Ok(new_val)
59        })
60    }
61
62    fn list(&self, prefix: &Prefix) -> BoxFuture<'_, Result<KvPairs, Error>> {
63        let prefix = *prefix;
64        Box::pin(async move {
65            let mut result = Vec::new();
66            for entry in self.data.iter() {
67                if entry.key().starts_with(&prefix) {
68                    result.push((entry.key().clone(), entry.value().clone()));
69                }
70            }
71            // Also include counter keys (used by rate_limit, usage, budget).
72            for entry in self.counters.iter() {
73                if entry.key().starts_with(&prefix) {
74                    let val = entry.value().load(Ordering::Relaxed);
75                    result.push((entry.key().clone(), val.to_le_bytes().to_vec()));
76                }
77            }
78            Ok(result)
79        })
80    }
81
82    fn delete(&self, key: &[u8]) -> BoxFuture<'_, Result<(), Error>> {
83        let key = key.to_vec();
84        Box::pin(async move {
85            self.data.remove(&key);
86            self.counters.remove(&key);
87            Ok(())
88        })
89    }
90}