// trueno_db/kv/mod.rs

//! Key-Value Store Module for PAIML Stack Integration (Phase 6)
//!
//! Provides a simple, high-performance key-value store with:
//! - SIMD-optimized key hashing via `trueno::hash`
//! - In-memory and persistent backends
//! - Async-first API compatible with pforge `StateManager`
//!
//! # Example
//!
//! ```rust,no_run
//! use trueno_db::kv::{KvStore, MemoryKvStore};
//!
//! # async fn example() -> trueno_db::Result<()> {
//! let store = MemoryKvStore::new();
//!
//! // Basic operations
//! store.set("key", b"value".to_vec()).await?;
//! let value = store.get("key").await?;
//! assert_eq!(value, Some(b"value".to_vec()));
//!
//! store.delete("key").await?;
//! assert!(!store.exists("key").await?);
//! # Ok(())
//! # }
//! ```

mod memory;

#[cfg(feature = "compression")]
mod compressed;

pub use memory::MemoryKvStore;

#[cfg(feature = "compression")]
pub use compressed::{CompressedKvStore, Compression};

// Re-export trueno hash functions for KV consumers
pub use trueno::{hash_bytes, hash_key, hash_keys_batch};

use crate::Result;
use std::future::Future;

43/// Key-value store trait for pforge state management integration.
44///
45/// This trait is designed to match pforge's `StateManager` interface
46/// for seamless integration while leveraging trueno's SIMD capabilities.
47pub trait KvStore: Send + Sync {
48    /// Get a value by key.
49    ///
50    /// Returns `None` if the key doesn't exist.
51    fn get(&self, key: &str) -> impl Future<Output = Result<Option<Vec<u8>>>> + Send;
52
53    /// Set a value for a key.
54    ///
55    /// Overwrites any existing value.
56    fn set(&self, key: &str, value: Vec<u8>) -> impl Future<Output = Result<()>> + Send;
57
58    /// Delete a key.
59    ///
60    /// No-op if the key doesn't exist.
61    fn delete(&self, key: &str) -> impl Future<Output = Result<()>> + Send;
62
63    /// Check if a key exists.
64    fn exists(&self, key: &str) -> impl Future<Output = Result<bool>> + Send;
65
66    /// Get multiple keys in a batch (SIMD-optimized).
67    ///
68    /// Returns values in the same order as keys. Missing keys return `None`.
69    fn batch_get(
70        &self,
71        keys: &[&str],
72    ) -> impl Future<Output = Result<Vec<Option<Vec<u8>>>>> + Send {
73        async move {
74            let mut results = Vec::with_capacity(keys.len());
75            for key in keys {
76                results.push(self.get(key).await?);
77            }
78            Ok(results)
79        }
80    }
81
82    /// Set multiple key-value pairs in a batch (SIMD-optimized).
83    fn batch_set(&self, pairs: Vec<(&str, Vec<u8>)>) -> impl Future<Output = Result<()>> + Send {
84        async move {
85            for (key, value) in pairs {
86                self.set(key, value).await?;
87            }
88            Ok(())
89        }
90    }
91}
92
#[cfg(test)]
mod tests {
    use super::*;

    // ============================================================
    // MemoryKvStore tests (written test-first, TDD red phase):
    // these tests define the expected behavior
    // ============================================================

    /// A value written with `set` is returned unchanged by `get`.
    #[tokio::test]
    async fn test_memory_kv_set_get() {
        let store = MemoryKvStore::new();

        store.set("key1", b"value1".to_vec()).await.unwrap();
        let value = store.get("key1").await.unwrap();

        assert_eq!(value, Some(b"value1".to_vec()));
    }

    /// Reading a key that was never written yields `None`, not an error.
    #[tokio::test]
    async fn test_memory_kv_get_nonexistent() {
        let store = MemoryKvStore::new();

        let value = store.get("nonexistent").await.unwrap();

        assert_eq!(value, None);
    }

    /// A second `set` on the same key replaces the first value.
    #[tokio::test]
    async fn test_memory_kv_overwrite() {
        let store = MemoryKvStore::new();

        store.set("key", b"value1".to_vec()).await.unwrap();
        store.set("key", b"value2".to_vec()).await.unwrap();
        let value = store.get("key").await.unwrap();

        assert_eq!(value, Some(b"value2".to_vec()));
    }

    /// After `delete`, the key reads back as `None`.
    #[tokio::test]
    async fn test_memory_kv_delete() {
        let store = MemoryKvStore::new();

        store.set("key", b"value".to_vec()).await.unwrap();
        store.delete("key").await.unwrap();
        let value = store.get("key").await.unwrap();

        assert_eq!(value, None);
    }

    /// Deleting a key that does not exist must succeed.
    #[tokio::test]
    async fn test_memory_kv_delete_nonexistent() {
        let store = MemoryKvStore::new();

        // Should not error
        store.delete("nonexistent").await.unwrap();
    }

    /// `exists` tracks the key through absent -> set -> deleted.
    #[tokio::test]
    async fn test_memory_kv_exists() {
        let store = MemoryKvStore::new();

        assert!(!store.exists("key").await.unwrap());

        store.set("key", b"value".to_vec()).await.unwrap();
        assert!(store.exists("key").await.unwrap());

        store.delete("key").await.unwrap();
        assert!(!store.exists("key").await.unwrap());
    }

    /// `batch_get` returns one slot per requested key, in request order,
    /// with `None` for keys that were never written.
    #[tokio::test]
    async fn test_memory_kv_batch_get() {
        let store = MemoryKvStore::new();

        store.set("a", b"1".to_vec()).await.unwrap();
        store.set("b", b"2".to_vec()).await.unwrap();
        // "c" intentionally not set

        let results = store.batch_get(&["a", "b", "c"]).await.unwrap();

        assert_eq!(results, vec![Some(b"1".to_vec()), Some(b"2".to_vec()), None]);
    }

    /// Every pair handed to `batch_set` is readable afterwards.
    #[tokio::test]
    async fn test_memory_kv_batch_set() {
        let store = MemoryKvStore::new();

        store
            .batch_set(vec![("a", b"1".to_vec()), ("b", b"2".to_vec()), ("c", b"3".to_vec())])
            .await
            .unwrap();

        assert_eq!(store.get("a").await.unwrap(), Some(b"1".to_vec()));
        assert_eq!(store.get("b").await.unwrap(), Some(b"2".to_vec()));
        assert_eq!(store.get("c").await.unwrap(), Some(b"3".to_vec()));
    }

    /// 100 tasks write distinct keys through a shared `Arc`; every write must be visible.
    #[tokio::test]
    async fn test_memory_kv_concurrent_access() {
        use std::sync::Arc;

        let store = Arc::new(MemoryKvStore::new());

        // Spawn 100 concurrent writers
        let handles: Vec<_> = (0..100)
            .map(|i| {
                let store = Arc::clone(&store);
                tokio::spawn(async move {
                    let key = format!("key{i}");
                    let value = format!("value{i}").into_bytes();
                    store.set(&key, value).await.unwrap();
                })
            })
            .collect();

        for handle in handles {
            handle.await.unwrap();
        }

        // Verify all writes succeeded
        for i in 0..100 {
            let key = format!("key{i}");
            let expected = format!("value{i}").into_bytes();
            assert_eq!(store.get(&key).await.unwrap(), Some(expected));
        }
    }

    /// The empty string is a valid key.
    #[tokio::test]
    async fn test_memory_kv_empty_key() {
        let store = MemoryKvStore::new();

        store.set("", b"empty_key_value".to_vec()).await.unwrap();
        assert_eq!(store.get("").await.unwrap(), Some(b"empty_key_value".to_vec()));
    }

    /// An empty value is stored as `Some(vec![])`, distinct from a missing key.
    #[tokio::test]
    async fn test_memory_kv_empty_value() {
        let store = MemoryKvStore::new();

        store.set("key", vec![]).await.unwrap();
        assert_eq!(store.get("key").await.unwrap(), Some(vec![]));
    }

    /// A 1 MiB value round-trips unchanged.
    #[tokio::test]
    async fn test_memory_kv_large_value() {
        let store = MemoryKvStore::new();

        let large_value = vec![0u8; 1024 * 1024]; // 1 MiB
        store.set("large", large_value.clone()).await.unwrap();

        assert_eq!(store.get("large").await.unwrap(), Some(large_value));
    }

    /// A store built with `with_capacity` behaves like one built with `new`.
    #[tokio::test]
    async fn test_memory_kv_with_capacity() {
        let store = MemoryKvStore::with_capacity(100);
        store.set("key", b"value".to_vec()).await.unwrap();
        assert_eq!(store.get("key").await.unwrap(), Some(b"value".to_vec()));
    }

    /// `len` and `is_empty` follow the number of distinct keys written.
    #[tokio::test]
    async fn test_memory_kv_len_and_is_empty() {
        let store = MemoryKvStore::new();

        assert!(store.is_empty());
        assert_eq!(store.len(), 0);

        store.set("key1", b"value1".to_vec()).await.unwrap();
        assert!(!store.is_empty());
        assert_eq!(store.len(), 1);

        store.set("key2", b"value2".to_vec()).await.unwrap();
        assert_eq!(store.len(), 2);
    }

    /// `clear` removes every entry.
    #[tokio::test]
    async fn test_memory_kv_clear() {
        let store = MemoryKvStore::new();

        store.set("key1", b"value1".to_vec()).await.unwrap();
        store.set("key2", b"value2".to_vec()).await.unwrap();
        assert_eq!(store.len(), 2);

        store.clear();
        assert!(store.is_empty());
        assert_eq!(store.len(), 0);
        assert_eq!(store.get("key1").await.unwrap(), None);
    }

    /// `Default` produces an empty store.
    #[test]
    fn test_memory_kv_default() {
        let store = MemoryKvStore::default();
        assert!(store.is_empty());
    }

    // ============================================================
    // Compression tests (GH-5), written test-first (TDD red phase)
    // ============================================================

    #[cfg(feature = "compression")]
    mod compression_tests {
        use super::*;
        use crate::kv::{CompressedKvStore, Compression};

        /// LZ4: what goes in through the wrapper comes back out unchanged.
        #[tokio::test]
        async fn test_compressed_kv_lz4_roundtrip() {
            let inner = MemoryKvStore::new();
            let store = CompressedKvStore::new(inner, Compression::Lz4);

            let data = b"hello world compressed with lz4".to_vec();
            store.set("key", data.clone()).await.unwrap();
            let retrieved = store.get("key").await.unwrap();

            assert_eq!(retrieved, Some(data));
        }

        /// Zstd: what goes in through the wrapper comes back out unchanged.
        #[tokio::test]
        async fn test_compressed_kv_zstd_roundtrip() {
            let inner = MemoryKvStore::new();
            let store = CompressedKvStore::new(inner, Compression::Zstd);

            let data = b"hello world compressed with zstd".to_vec();
            store.set("key", data.clone()).await.unwrap();
            let retrieved = store.get("key").await.unwrap();

            assert_eq!(retrieved, Some(data));
        }

        /// The bytes held by the inner store must be much smaller than the input.
        #[tokio::test]
        async fn test_compressed_kv_reduces_size() {
            let inner = MemoryKvStore::new();
            let store = CompressedKvStore::new(inner, Compression::Lz4);

            // Highly compressible data (repeated zeros). Only its length is needed
            // after the write, so record that and move the buffer instead of cloning.
            let data = vec![0u8; 10000];
            let original_len = data.len();
            store.set("key", data).await.unwrap();

            // Get raw compressed size from inner store
            let compressed = store.inner().get("key").await.unwrap().unwrap();

            // LZ4 should compress zeros significantly
            assert!(
                compressed.len() < original_len / 5,
                "Compressed {} -> {} bytes (expected >5x reduction)",
                original_len,
                compressed.len()
            );
        }

        /// An empty value survives the compress/decompress round trip.
        #[tokio::test]
        async fn test_compressed_kv_empty_value() {
            let inner = MemoryKvStore::new();
            let store = CompressedKvStore::new(inner, Compression::Lz4);

            store.set("empty", vec![]).await.unwrap();
            assert_eq!(store.get("empty").await.unwrap(), Some(vec![]));
        }

        /// `delete` through the wrapper removes the key.
        #[tokio::test]
        async fn test_compressed_kv_delete() {
            let inner = MemoryKvStore::new();
            let store = CompressedKvStore::new(inner, Compression::Lz4);

            store.set("key", b"value".to_vec()).await.unwrap();
            store.delete("key").await.unwrap();
            assert_eq!(store.get("key").await.unwrap(), None);
        }

        /// `exists` through the wrapper reflects writes.
        #[tokio::test]
        async fn test_compressed_kv_exists() {
            let inner = MemoryKvStore::new();
            let store = CompressedKvStore::new(inner, Compression::Lz4);

            assert!(!store.exists("key").await.unwrap());
            store.set("key", b"value".to_vec()).await.unwrap();
            assert!(store.exists("key").await.unwrap());
        }

        /// Batch writes and reads work through the wrapper, including a missing key.
        #[tokio::test]
        async fn test_compressed_kv_batch_operations() {
            let inner = MemoryKvStore::new();
            let store = CompressedKvStore::new(inner, Compression::Lz4);

            store.batch_set(vec![("a", b"alpha".to_vec()), ("b", b"beta".to_vec())]).await.unwrap();

            let results = store.batch_get(&["a", "b", "c"]).await.unwrap();
            assert_eq!(results, vec![Some(b"alpha".to_vec()), Some(b"beta".to_vec()), None]);
        }

        /// Each codec reports its lowercase name. Nothing here awaits, so this is a
        /// plain synchronous test rather than a `tokio` one.
        #[test]
        fn test_compression_enum_variants() {
            assert_eq!(Compression::Lz4.as_str(), "lz4");
            assert_eq!(Compression::Zstd.as_str(), "zstd");
        }

        /// A 1 MiB patterned buffer round-trips through Zstd.
        #[tokio::test]
        async fn test_compressed_kv_large_value() {
            let inner = MemoryKvStore::new();
            let store = CompressedKvStore::new(inner, Compression::Zstd);

            // 1 MiB of a repeating 0..=255 byte ramp (deterministic, not random).
            // The `as u8` cast cannot truncate because of the preceding `% 256`.
            let data: Vec<u8> = (0..1024 * 1024).map(|i| (i % 256) as u8).collect();
            store.set("large", data.clone()).await.unwrap();

            let retrieved = store.get("large").await.unwrap();
            assert_eq!(retrieved, Some(data));
        }
    }
}