Skip to main content

rustvello_core/
client_data_store.rs

//! Client data store — external storage for large serialized values.
//!
//! Transparently keeps small values inline and externalizes large values
//! to a backend store with content-hash deduplication and LRU caching.
//!
//! Mirrors pynenc's `BaseClientDataStore` system.
7
8use std::num::NonZeroUsize;
9use std::sync::{Arc, Mutex};
10
11use async_trait::async_trait;
12use sha2::{Digest, Sha256};
13
14use rustvello_proto::config::ClientDataStoreConfig;
15
16use crate::error::{RustvelloError, RustvelloResult};
17
/// Marker that identifies a CDS reference key.
///
/// [`is_reference`] treats any string beginning with this prefix as a
/// reference, and [`generate_reference_key`] prepends it to the content hash.
pub const REFERENCE_PREFIX: &str = "__rustvello__cds__:";
20
21/// Backend storage trait for client data.
22///
23/// Implementations need only provide three simple operations: store, retrieve,
24/// and purge. The [`ClientDataStoreManager`] handles size routing, LRU caching,
25/// and key generation on top of any backend.
26#[async_trait]
27pub trait ClientDataStore: Send + Sync {
28    /// Store a serialized value by its content-hash key.
29    ///
30    /// Backends should use upsert semantics (INSERT OR REPLACE) so that
31    /// storing the same key twice is a no-op (content-hash deduplication).
32    async fn store(&self, key: &str, value: &str) -> RustvelloResult<()>;
33
34    /// Retrieve a serialized value by its reference key.
35    ///
36    /// Returns an error if the key is not found.
37    async fn retrieve(&self, key: &str) -> RustvelloResult<String>;
38
39    /// Remove all stored data.
40    async fn purge(&self) -> RustvelloResult<()>;
41
42    /// Human-readable name of this backend implementation.
43    fn backend_name(&self) -> &'static str {
44        "Unknown"
45    }
46
47    /// Key-value statistics about this backend's current state.
48    async fn usage_stats(&self) -> Vec<(&'static str, String)> {
49        Vec::new()
50    }
51}
52
53/// Check whether a string is a CDS reference key.
54pub fn is_reference(value: &str) -> bool {
55    value.starts_with(REFERENCE_PREFIX)
56}
57
58/// Generate a content-hash reference key from serialized data.
59pub fn generate_reference_key(data: &str) -> String {
60    let hash = Sha256::digest(data.as_bytes());
61    format!("{REFERENCE_PREFIX}{hash:x}")
62}
63
64// ---------------------------------------------------------------------------
65// ClientDataStoreManager — wraps a backend with size routing + LRU cache
66// ---------------------------------------------------------------------------
67
68/// High-level client data store manager.
69///
70/// Wraps a [`ClientDataStore`] backend with:
71/// - Size-based routing (small → inline, large → external)
72/// - Content-hash key generation (SHA-256 dedup)
73/// - Process-local LRU cache
74///
75/// This is the type that [`RustvelloApp`] owns. It corresponds to pynenc's
76/// `BaseClientDataStore` public API.
77pub struct ClientDataStoreManager {
78    backend: Arc<dyn ClientDataStore>,
79    config: ClientDataStoreConfig,
80    cache: Mutex<lru::LruCache<String, String>>,
81}
82
83impl ClientDataStoreManager {
84    /// Create a new manager wrapping the given backend.
85    pub fn new(backend: Arc<dyn ClientDataStore>, config: ClientDataStoreConfig) -> Self {
86        let cap = NonZeroUsize::new(config.local_cache_size).unwrap_or(NonZeroUsize::MIN);
87        Self {
88            backend,
89            config,
90            cache: Mutex::new(lru::LruCache::new(cap)),
91        }
92    }
93
94    /// Store a serialized value, externalizing it if it exceeds the size threshold.
95    ///
96    /// Returns either:
97    /// - The original `serialized` string (inline, if small or disabled)
98    /// - A reference key string (if externalized)
99    pub async fn store_if_large(&self, serialized: &str) -> RustvelloResult<String> {
100        if self.config.disabled {
101            return Ok(serialized.to_owned());
102        }
103
104        // Already a reference? Pass through.
105        if is_reference(serialized) {
106            return Ok(serialized.to_owned());
107        }
108
109        let size = serialized.len();
110
111        // Below minimum: inline
112        if size < self.config.min_size_to_cache {
113            return Ok(serialized.to_owned());
114        }
115
116        // Above maximum (if set): inline with warning
117        if self.config.max_size_to_cache > 0 && size > self.config.max_size_to_cache {
118            tracing::warn!(
119                "Value size ({size} bytes) exceeds max_size_to_cache ({}). Returning inline.",
120                self.config.max_size_to_cache
121            );
122            return Ok(serialized.to_owned());
123        }
124
125        if size > self.config.warn_threshold {
126            tracing::warn!(
127                "Value size ({size} bytes) exceeds warn_threshold ({}). Consider restructuring.",
128                self.config.warn_threshold
129            );
130        }
131
132        let key = generate_reference_key(serialized);
133        self.backend.store(&key, serialized).await?;
134
135        // Cache the value for fast resolution
136        self.cache
137            .lock()
138            .map_err(|e| RustvelloError::Internal {
139                message: format!("CDS cache lock poisoned: {e}"),
140            })?
141            .put(key.clone(), serialized.to_owned());
142
143        Ok(key)
144    }
145
146    /// Resolve a value — if it's a reference key, retrieve from backend (with LRU cache).
147    /// If it's an inline value, return as-is.
148    pub async fn resolve(&self, data: &str) -> RustvelloResult<String> {
149        if !is_reference(data) {
150            return Ok(data.to_owned());
151        }
152
153        // Check LRU cache first
154        {
155            let mut cache = self.cache.lock().map_err(|e| RustvelloError::Internal {
156                message: format!("CDS cache lock poisoned: {e}"),
157            })?;
158            if let Some(cached) = cache.get(data) {
159                return Ok(cached.clone());
160            }
161        }
162
163        // Cache miss — retrieve from backend
164        let value = self.backend.retrieve(data).await?;
165
166        // Cache the retrieved value
167        self.cache
168            .lock()
169            .map_err(|e| RustvelloError::Internal {
170                message: format!("CDS cache lock poisoned: {e}"),
171            })?
172            .put(data.to_owned(), value.clone());
173
174        Ok(value)
175    }
176
177    /// Store a value directly by key (delegates to backend).
178    pub async fn store(&self, key: &str, value: &str) -> RustvelloResult<()> {
179        self.backend.store(key, value).await
180    }
181
182    /// Retrieve a value directly by key (delegates to backend).
183    pub async fn retrieve(&self, key: &str) -> RustvelloResult<String> {
184        self.backend.retrieve(key).await
185    }
186
187    /// Clear the LRU cache and purge all backend data.
188    pub async fn purge(&self) -> RustvelloResult<()> {
189        self.cache
190            .lock()
191            .map_err(|e| RustvelloError::Internal {
192                message: format!("CDS cache lock poisoned: {e}"),
193            })?
194            .clear();
195        self.backend.purge().await
196    }
197
198    /// Get the current configuration.
199    pub fn config(&self) -> &ClientDataStoreConfig {
200        &self.config
201    }
202
203    /// Human-readable name of the underlying backend implementation.
204    pub fn backend_name(&self) -> &'static str {
205        self.backend.backend_name()
206    }
207
208    /// Key-value statistics about the underlying backend's current state.
209    pub async fn usage_stats(&self) -> Vec<(&'static str, String)> {
210        self.backend.usage_stats().await
211    }
212}
213
#[cfg(test)]
mod tests {
    use super::*;
    use crate::error::RustvelloError;
    use std::collections::HashMap;

    /// In-memory backend backed by a `HashMap`, used to exercise the manager.
    #[derive(Default)]
    struct FakeBackend {
        entries: Mutex<HashMap<String, String>>,
    }

    impl FakeBackend {
        fn new() -> Self {
            Self::default()
        }
    }

    #[async_trait]
    impl ClientDataStore for FakeBackend {
        async fn store(&self, key: &str, value: &str) -> RustvelloResult<()> {
            let mut entries = self.entries.lock().unwrap();
            entries.insert(key.to_owned(), value.to_owned());
            Ok(())
        }

        async fn retrieve(&self, key: &str) -> RustvelloResult<String> {
            let entries = self.entries.lock().unwrap();
            match entries.get(key) {
                Some(found) => Ok(found.clone()),
                None => Err(RustvelloError::state_backend(format!(
                    "key not found: {key}"
                ))),
            }
        }

        async fn purge(&self) -> RustvelloResult<()> {
            let mut entries = self.entries.lock().unwrap();
            entries.clear();
            Ok(())
        }
    }

    /// Build a manager over a fresh [`FakeBackend`].
    fn make_manager(config: ClientDataStoreConfig) -> ClientDataStoreManager {
        let backend: Arc<dyn ClientDataStore> = Arc::new(FakeBackend::new());
        ClientDataStoreManager::new(backend, config)
    }

    /// Build a manager from the default config after applying `configure` to it.
    fn manager_with(
        configure: impl FnOnce(&mut ClientDataStoreConfig),
    ) -> ClientDataStoreManager {
        let mut config = ClientDataStoreConfig::default();
        configure(&mut config);
        make_manager(config)
    }

    #[test]
    fn reference_key_format() {
        let generated = generate_reference_key("hello world");
        assert!(generated.starts_with(REFERENCE_PREFIX));
        assert!(is_reference(&generated));
        assert!(!is_reference("just a normal string"));
    }

    #[test]
    fn reference_key_deterministic() {
        assert_eq!(
            generate_reference_key("same content"),
            generate_reference_key("same content")
        );
    }

    #[test]
    fn reference_key_differs_for_different_content() {
        assert_ne!(
            generate_reference_key("content A"),
            generate_reference_key("content B")
        );
    }

    #[tokio::test]
    async fn inline_below_threshold() {
        let manager = manager_with(|cfg| cfg.min_size_to_cache = 100);
        let payload = "short";
        let stored = manager.store_if_large(payload).await.unwrap();
        // Shorter than the minimum, so the value comes back unchanged.
        assert_eq!(stored, payload);
        assert!(!is_reference(&stored));
    }

    #[tokio::test]
    async fn externalize_above_threshold() {
        let manager = manager_with(|cfg| cfg.min_size_to_cache = 10);
        let payload = "a]".repeat(20); // 40 bytes, above 10
        let reference = manager.store_if_large(&payload).await.unwrap();
        assert!(is_reference(&reference));

        // Resolving the reference yields the original payload.
        let round_tripped = manager.resolve(&reference).await.unwrap();
        assert_eq!(round_tripped, payload);
    }

    #[tokio::test]
    async fn inline_when_disabled() {
        let manager = manager_with(|cfg| {
            cfg.disabled = true;
            cfg.min_size_to_cache = 1; // Would externalize if the store were enabled
        });
        let payload = "x".repeat(100);
        let stored = manager.store_if_large(&payload).await.unwrap();
        assert!(!is_reference(&stored));
        assert_eq!(stored, payload);
    }

    #[tokio::test]
    async fn inline_above_max_size() {
        let manager = manager_with(|cfg| {
            cfg.min_size_to_cache = 10;
            cfg.max_size_to_cache = 50;
        });
        let payload = "x".repeat(100); // 100 > max 50
        let stored = manager.store_if_large(&payload).await.unwrap();
        assert!(!is_reference(&stored));
        assert_eq!(stored, payload);
    }

    #[tokio::test]
    async fn resolve_inline_passthrough() {
        let manager = make_manager(ClientDataStoreConfig::default());
        let payload = "not a reference";
        let resolved = manager.resolve(payload).await.unwrap();
        assert_eq!(resolved, payload);
    }

    #[tokio::test]
    async fn content_hash_dedup() {
        let manager = manager_with(|cfg| cfg.min_size_to_cache = 5);
        let payload = "deduplicate me";
        let first = manager.store_if_large(payload).await.unwrap();
        let second = manager.store_if_large(payload).await.unwrap();
        // Identical content must map to the identical key.
        assert_eq!(first, second);
    }

    #[tokio::test]
    async fn purge_clears_cache_and_backend() {
        let manager = manager_with(|cfg| cfg.min_size_to_cache = 5);
        let reference = manager
            .store_if_large("some large payload here")
            .await
            .unwrap();
        assert!(is_reference(&reference));

        manager.purge().await.unwrap();

        // Neither the cache nor the backend holds the value any more.
        let outcome = manager.resolve(&reference).await;
        assert!(outcome.is_err());
    }

    #[tokio::test]
    async fn lru_cache_hit() {
        let manager = manager_with(|cfg| {
            cfg.min_size_to_cache = 5;
            cfg.local_cache_size = 10;
        });
        let payload = "cached payload data";
        let reference = manager.store_if_large(payload).await.unwrap();
        assert!(is_reference(&reference));

        // `store_if_large` already cached the value, so this resolves normally.
        let first = manager.resolve(&reference).await.unwrap();
        assert_eq!(first, payload);

        // Empty the backend only, leaving the manager's cache untouched.
        manager.backend.purge().await.unwrap();

        // The value must now be served from the cache, not the backend.
        let second = manager.resolve(&reference).await.unwrap();
        assert_eq!(second, payload);
    }

    #[tokio::test]
    async fn passthrough_existing_reference() {
        let manager = manager_with(|cfg| cfg.min_size_to_cache = 5);
        // A value that already looks like a reference is returned untouched.
        let existing = format!("{REFERENCE_PREFIX}abc123");
        let stored = manager.store_if_large(&existing).await.unwrap();
        assert_eq!(stored, existing);
    }

    #[test]
    fn lru_cache_eviction() {
        let mut cache = lru::LruCache::<String, String>::new(NonZeroUsize::new(3).unwrap());
        for (key, value) in [("a", "1"), ("b", "2"), ("c", "3")] {
            cache.put(key.to_owned(), value.to_owned());
        }
        assert_eq!(cache.len(), 3);

        // A fourth entry pushes out the least recently used one ("a").
        cache.put("d".to_owned(), "4".to_owned());
        assert_eq!(cache.len(), 3);
        assert!(cache.get("a").is_none());
        assert!(cache.get("d").is_some());
    }

    #[test]
    fn lru_cache_access_refreshes() {
        let mut cache = lru::LruCache::<String, String>::new(NonZeroUsize::new(3).unwrap());
        for (key, value) in [("a", "1"), ("b", "2"), ("c", "3")] {
            cache.put(key.to_owned(), value.to_owned());
        }

        // Reading "a" marks it as most recently used.
        cache.get("a");

        // "b" is now the least recently used entry, so inserting "d" evicts it.
        cache.put("d".to_owned(), "4".to_owned());
        assert!(cache.get("a").is_some());
        assert!(cache.get("b").is_none());
    }
}