1mod memory;
28
29#[cfg(feature = "compression")]
30mod compressed;
31
32pub use memory::MemoryKvStore;
33
34#[cfg(feature = "compression")]
35pub use compressed::{CompressedKvStore, Compression};
36
37pub use trueno::{hash_bytes, hash_key, hash_keys_batch};
39
40use crate::Result;
41use std::future::Future;
42
43pub trait KvStore: Send + Sync {
48 fn get(&self, key: &str) -> impl Future<Output = Result<Option<Vec<u8>>>> + Send;
52
53 fn set(&self, key: &str, value: Vec<u8>) -> impl Future<Output = Result<()>> + Send;
57
58 fn delete(&self, key: &str) -> impl Future<Output = Result<()>> + Send;
62
63 fn exists(&self, key: &str) -> impl Future<Output = Result<bool>> + Send;
65
66 fn batch_get(
70 &self,
71 keys: &[&str],
72 ) -> impl Future<Output = Result<Vec<Option<Vec<u8>>>>> + Send {
73 async move {
74 let mut results = Vec::with_capacity(keys.len());
75 for key in keys {
76 results.push(self.get(key).await?);
77 }
78 Ok(results)
79 }
80 }
81
82 fn batch_set(&self, pairs: Vec<(&str, Vec<u8>)>) -> impl Future<Output = Result<()>> + Send {
84 async move {
85 for (key, value) in pairs {
86 self.set(key, value).await?;
87 }
88 Ok(())
89 }
90 }
91}
92
#[cfg(test)]
mod tests {
    //! Tests for the in-memory store and, when the `compression` feature is
    //! enabled, for the compressing wrapper around it.

    use super::*;

    // ------------------------------------------------------------------
    // MemoryKvStore: single-key operations
    // ------------------------------------------------------------------

    /// A value written with `set` is returned unchanged by `get`.
    #[tokio::test]
    async fn test_memory_kv_set_get() {
        let store = MemoryKvStore::new();

        store.set("key1", b"value1".to_vec()).await.unwrap();
        let value = store.get("key1").await.unwrap();

        assert_eq!(value, Some(b"value1".to_vec()));
    }

    /// Reading a key that was never written yields `None`, not an error.
    #[tokio::test]
    async fn test_memory_kv_get_nonexistent() {
        let store = MemoryKvStore::new();

        let value = store.get("nonexistent").await.unwrap();

        assert_eq!(value, None);
    }

    /// A second `set` on the same key replaces the first value.
    #[tokio::test]
    async fn test_memory_kv_overwrite() {
        let store = MemoryKvStore::new();

        store.set("key", b"value1".to_vec()).await.unwrap();
        store.set("key", b"value2".to_vec()).await.unwrap();
        let value = store.get("key").await.unwrap();

        assert_eq!(value, Some(b"value2".to_vec()));
    }

    /// After `delete`, the key is no longer readable.
    #[tokio::test]
    async fn test_memory_kv_delete() {
        let store = MemoryKvStore::new();

        store.set("key", b"value".to_vec()).await.unwrap();
        store.delete("key").await.unwrap();
        let value = store.get("key").await.unwrap();

        assert_eq!(value, None);
    }

    /// Deleting a key that does not exist must succeed (the `unwrap` is the assertion).
    #[tokio::test]
    async fn test_memory_kv_delete_nonexistent() {
        let store = MemoryKvStore::new();

        store.delete("nonexistent").await.unwrap();
    }

    /// `exists` tracks the key through its absent -> present -> absent lifecycle.
    #[tokio::test]
    async fn test_memory_kv_exists() {
        let store = MemoryKvStore::new();

        assert!(!store.exists("key").await.unwrap());

        store.set("key", b"value".to_vec()).await.unwrap();
        assert!(store.exists("key").await.unwrap());

        store.delete("key").await.unwrap();
        assert!(!store.exists("key").await.unwrap());
    }

    // ------------------------------------------------------------------
    // MemoryKvStore: batch operations
    // ------------------------------------------------------------------

    /// `batch_get` returns one entry per requested key, in request order,
    /// with `None` for keys that are missing.
    #[tokio::test]
    async fn test_memory_kv_batch_get() {
        let store = MemoryKvStore::new();

        store.set("a", b"1".to_vec()).await.unwrap();
        store.set("b", b"2".to_vec()).await.unwrap();
        let results = store.batch_get(&["a", "b", "c"]).await.unwrap();

        assert_eq!(results.len(), 3);
        assert_eq!(results[0], Some(b"1".to_vec()));
        assert_eq!(results[1], Some(b"2".to_vec()));
        assert_eq!(results[2], None);
    }

    /// Every pair passed to `batch_set` is readable afterwards.
    #[tokio::test]
    async fn test_memory_kv_batch_set() {
        let store = MemoryKvStore::new();

        store
            .batch_set(vec![("a", b"1".to_vec()), ("b", b"2".to_vec()), ("c", b"3".to_vec())])
            .await
            .unwrap();

        assert_eq!(store.get("a").await.unwrap(), Some(b"1".to_vec()));
        assert_eq!(store.get("b").await.unwrap(), Some(b"2".to_vec()));
        assert_eq!(store.get("c").await.unwrap(), Some(b"3".to_vec()));
    }

    // ------------------------------------------------------------------
    // MemoryKvStore: concurrency and edge cases
    // ------------------------------------------------------------------

    /// 100 spawned tasks each write a distinct key through a shared `Arc`;
    /// once all tasks have finished, every write must be visible.
    #[tokio::test]
    async fn test_memory_kv_concurrent_access() {
        use std::sync::Arc;

        let store = Arc::new(MemoryKvStore::new());
        let mut handles = vec![];

        for i in 0..100 {
            let store = Arc::clone(&store);
            handles.push(tokio::spawn(async move {
                let key = format!("key{i}");
                let value = format!("value{i}").into_bytes();
                store.set(&key, value).await.unwrap();
            }));
        }

        // Wait for every writer before reading anything back.
        for handle in handles {
            handle.await.unwrap();
        }

        for i in 0..100 {
            let key = format!("key{i}");
            let expected = format!("value{i}").into_bytes();
            assert_eq!(store.get(&key).await.unwrap(), Some(expected));
        }
    }

    /// The empty string is accepted as a key.
    #[tokio::test]
    async fn test_memory_kv_empty_key() {
        let store = MemoryKvStore::new();

        store.set("", b"empty_key_value".to_vec()).await.unwrap();
        assert_eq!(store.get("").await.unwrap(), Some(b"empty_key_value".to_vec()));
    }

    /// An empty value is stored as `Some(vec![])`, distinct from a missing key.
    #[tokio::test]
    async fn test_memory_kv_empty_value() {
        let store = MemoryKvStore::new();

        store.set("key", vec![]).await.unwrap();
        assert_eq!(store.get("key").await.unwrap(), Some(vec![]));
    }

    /// A 1 MiB value round-trips unchanged.
    #[tokio::test]
    async fn test_memory_kv_large_value() {
        let store = MemoryKvStore::new();

        let large_value = vec![0u8; 1024 * 1024];
        store.set("large", large_value.clone()).await.unwrap();

        assert_eq!(store.get("large").await.unwrap(), Some(large_value));
    }

    // ------------------------------------------------------------------
    // MemoryKvStore: constructors and inherent helpers
    // ------------------------------------------------------------------

    /// A store built with `with_capacity` behaves like one built with `new`.
    #[tokio::test]
    async fn test_memory_kv_with_capacity() {
        let store = MemoryKvStore::with_capacity(100);
        store.set("key", b"value".to_vec()).await.unwrap();
        assert_eq!(store.get("key").await.unwrap(), Some(b"value".to_vec()));
    }

    /// `len` and `is_empty` follow the number of distinct keys stored.
    #[tokio::test]
    async fn test_memory_kv_len_and_is_empty() {
        let store = MemoryKvStore::new();

        assert!(store.is_empty());
        assert_eq!(store.len(), 0);

        store.set("key1", b"value1".to_vec()).await.unwrap();
        assert!(!store.is_empty());
        assert_eq!(store.len(), 1);

        store.set("key2", b"value2".to_vec()).await.unwrap();
        assert_eq!(store.len(), 2);
    }

    /// `clear` removes every entry.
    #[tokio::test]
    async fn test_memory_kv_clear() {
        let store = MemoryKvStore::new();

        store.set("key1", b"value1".to_vec()).await.unwrap();
        store.set("key2", b"value2".to_vec()).await.unwrap();
        assert_eq!(store.len(), 2);

        store.clear();
        assert!(store.is_empty());
        assert_eq!(store.len(), 0);
        assert_eq!(store.get("key1").await.unwrap(), None);
    }

    /// `Default` produces an empty store.
    #[test]
    fn test_memory_kv_default() {
        let store = MemoryKvStore::default();
        assert!(store.is_empty());
    }

    // ------------------------------------------------------------------
    // CompressedKvStore (only built with the `compression` feature)
    // ------------------------------------------------------------------

    #[cfg(feature = "compression")]
    mod compression_tests {
        use super::*;
        use crate::kv::{CompressedKvStore, Compression};

        /// Data written through an LZ4-compressing store reads back unchanged.
        #[tokio::test]
        async fn test_compressed_kv_lz4_roundtrip() {
            let inner = MemoryKvStore::new();
            let store = CompressedKvStore::new(inner, Compression::Lz4);

            let data = b"hello world compressed with lz4".to_vec();
            store.set("key", data.clone()).await.unwrap();
            let retrieved = store.get("key").await.unwrap();

            assert_eq!(retrieved, Some(data));
        }

        /// Data written through a Zstd-compressing store reads back unchanged.
        #[tokio::test]
        async fn test_compressed_kv_zstd_roundtrip() {
            let inner = MemoryKvStore::new();
            let store = CompressedKvStore::new(inner, Compression::Zstd);

            let data = b"hello world compressed with zstd".to_vec();
            store.set("key", data.clone()).await.unwrap();
            let retrieved = store.get("key").await.unwrap();

            assert_eq!(retrieved, Some(data));
        }

        /// The bytes held by the wrapped store are more than 5x smaller than
        /// the 10 000 zero bytes that were written through the wrapper.
        #[tokio::test]
        async fn test_compressed_kv_reduces_size() {
            let inner = MemoryKvStore::new();
            let store = CompressedKvStore::new(inner, Compression::Lz4);

            let data = vec![0u8; 10000];
            // Record the length up front so the buffer can be moved into `set`.
            let original_len = data.len();
            store.set("key", data).await.unwrap();

            // Read from the wrapped store directly to see the stored representation.
            let compressed = store.inner().get("key").await.unwrap().unwrap();

            assert!(
                compressed.len() < original_len / 5,
                "Compressed {} -> {} bytes (expected >5x reduction)",
                original_len,
                compressed.len()
            );
        }

        /// An empty value survives the compress/decompress round trip.
        #[tokio::test]
        async fn test_compressed_kv_empty_value() {
            let inner = MemoryKvStore::new();
            let store = CompressedKvStore::new(inner, Compression::Lz4);

            store.set("empty", vec![]).await.unwrap();
            assert_eq!(store.get("empty").await.unwrap(), Some(vec![]));
        }

        /// `delete` through the wrapper removes the key.
        #[tokio::test]
        async fn test_compressed_kv_delete() {
            let inner = MemoryKvStore::new();
            let store = CompressedKvStore::new(inner, Compression::Lz4);

            store.set("key", b"value".to_vec()).await.unwrap();
            store.delete("key").await.unwrap();
            assert_eq!(store.get("key").await.unwrap(), None);
        }

        /// `exists` through the wrapper reflects whether the key was written.
        #[tokio::test]
        async fn test_compressed_kv_exists() {
            let inner = MemoryKvStore::new();
            let store = CompressedKvStore::new(inner, Compression::Lz4);

            assert!(!store.exists("key").await.unwrap());
            store.set("key", b"value".to_vec()).await.unwrap();
            assert!(store.exists("key").await.unwrap());
        }

        /// Batch writes followed by batch reads return the original values in
        /// request order, with `None` for the key that was never written.
        #[tokio::test]
        async fn test_compressed_kv_batch_operations() {
            let inner = MemoryKvStore::new();
            let store = CompressedKvStore::new(inner, Compression::Lz4);

            store.batch_set(vec![("a", b"alpha".to_vec()), ("b", b"beta".to_vec())]).await.unwrap();

            let results = store.batch_get(&["a", "b", "c"]).await.unwrap();
            // Check the length first so a short result fails with a clear
            // message instead of an index-out-of-bounds panic.
            assert_eq!(results.len(), 3);
            assert_eq!(results[0], Some(b"alpha".to_vec()));
            assert_eq!(results[1], Some(b"beta".to_vec()));
            assert_eq!(results[2], None);
        }

        /// Each compression variant reports its expected string name.
        ///
        /// Nothing here is awaited, so this is a plain synchronous test and
        /// does not need an async runtime.
        #[test]
        fn test_compression_enum_variants() {
            assert_eq!(Compression::Lz4.as_str(), "lz4");
            assert_eq!(Compression::Zstd.as_str(), "zstd");
        }

        /// A 1 MiB buffer with a repeating 0..=255 byte pattern round-trips
        /// through Zstd unchanged.
        #[tokio::test]
        async fn test_compressed_kv_large_value() {
            let inner = MemoryKvStore::new();
            let store = CompressedKvStore::new(inner, Compression::Zstd);

            let data: Vec<u8> = (0..1024 * 1024).map(|i| (i % 256) as u8).collect();
            store.set("large", data.clone()).await.unwrap();

            let retrieved = store.get("large").await.unwrap();
            assert_eq!(retrieved, Some(data));
        }
    }
}