rustvello_core/
client_data_store.rs1use std::num::NonZeroUsize;
9use std::sync::{Arc, Mutex};
10
11use async_trait::async_trait;
12use sha2::{Digest, Sha256};
13
14use rustvello_proto::config::ClientDataStoreConfig;
15
16use crate::error::{RustvelloError, RustvelloResult};
17
/// Prefix that marks a string as a client-data-store reference key rather
/// than inline data. Keys produced by `generate_reference_key` are this
/// prefix followed by a lowercase-hex SHA-256 digest.
pub const REFERENCE_PREFIX: &str = "__rustvello__cds__:";
20
21#[async_trait]
27pub trait ClientDataStore: Send + Sync {
28 async fn store(&self, key: &str, value: &str) -> RustvelloResult<()>;
33
34 async fn retrieve(&self, key: &str) -> RustvelloResult<String>;
38
39 async fn purge(&self) -> RustvelloResult<()>;
41
42 fn backend_name(&self) -> &'static str {
44 "Unknown"
45 }
46
47 async fn usage_stats(&self) -> Vec<(&'static str, String)> {
49 Vec::new()
50 }
51}
52
53pub fn is_reference(value: &str) -> bool {
55 value.starts_with(REFERENCE_PREFIX)
56}
57
58pub fn generate_reference_key(data: &str) -> String {
60 let hash = Sha256::digest(data.as_bytes());
61 format!("{REFERENCE_PREFIX}{hash:x}")
62}
63
/// Front-end over a [`ClientDataStore`] backend.
///
/// It decides when a serialized value is large enough to externalize (see
/// `store_if_large`), resolves references back to values, and keeps an
/// in-memory LRU of recently stored or resolved values.
pub struct ClientDataStoreManager {
    /// Shared backend that persists externalized values.
    backend: Arc<dyn ClientDataStore>,
    /// Size thresholds and switches governing externalization.
    config: ClientDataStoreConfig,
    /// Reference key -> value. This is a `std::sync::Mutex`, so the guard must
    /// never be held across an `.await`.
    cache: Mutex<lru::LruCache<String, String>>,
}
82
83impl ClientDataStoreManager {
84 pub fn new(backend: Arc<dyn ClientDataStore>, config: ClientDataStoreConfig) -> Self {
86 let cap = NonZeroUsize::new(config.local_cache_size).unwrap_or(NonZeroUsize::MIN);
87 Self {
88 backend,
89 config,
90 cache: Mutex::new(lru::LruCache::new(cap)),
91 }
92 }
93
94 pub async fn store_if_large(&self, serialized: &str) -> RustvelloResult<String> {
100 if self.config.disabled {
101 return Ok(serialized.to_owned());
102 }
103
104 if is_reference(serialized) {
106 return Ok(serialized.to_owned());
107 }
108
109 let size = serialized.len();
110
111 if size < self.config.min_size_to_cache {
113 return Ok(serialized.to_owned());
114 }
115
116 if self.config.max_size_to_cache > 0 && size > self.config.max_size_to_cache {
118 tracing::warn!(
119 "Value size ({size} bytes) exceeds max_size_to_cache ({}). Returning inline.",
120 self.config.max_size_to_cache
121 );
122 return Ok(serialized.to_owned());
123 }
124
125 if size > self.config.warn_threshold {
126 tracing::warn!(
127 "Value size ({size} bytes) exceeds warn_threshold ({}). Consider restructuring.",
128 self.config.warn_threshold
129 );
130 }
131
132 let key = generate_reference_key(serialized);
133 self.backend.store(&key, serialized).await?;
134
135 self.cache
137 .lock()
138 .map_err(|e| RustvelloError::Internal {
139 message: format!("CDS cache lock poisoned: {e}"),
140 })?
141 .put(key.clone(), serialized.to_owned());
142
143 Ok(key)
144 }
145
146 pub async fn resolve(&self, data: &str) -> RustvelloResult<String> {
149 if !is_reference(data) {
150 return Ok(data.to_owned());
151 }
152
153 {
155 let mut cache = self.cache.lock().map_err(|e| RustvelloError::Internal {
156 message: format!("CDS cache lock poisoned: {e}"),
157 })?;
158 if let Some(cached) = cache.get(data) {
159 return Ok(cached.clone());
160 }
161 }
162
163 let value = self.backend.retrieve(data).await?;
165
166 self.cache
168 .lock()
169 .map_err(|e| RustvelloError::Internal {
170 message: format!("CDS cache lock poisoned: {e}"),
171 })?
172 .put(data.to_owned(), value.clone());
173
174 Ok(value)
175 }
176
177 pub async fn store(&self, key: &str, value: &str) -> RustvelloResult<()> {
179 self.backend.store(key, value).await
180 }
181
182 pub async fn retrieve(&self, key: &str) -> RustvelloResult<String> {
184 self.backend.retrieve(key).await
185 }
186
187 pub async fn purge(&self) -> RustvelloResult<()> {
189 self.cache
190 .lock()
191 .map_err(|e| RustvelloError::Internal {
192 message: format!("CDS cache lock poisoned: {e}"),
193 })?
194 .clear();
195 self.backend.purge().await
196 }
197
198 pub fn config(&self) -> &ClientDataStoreConfig {
200 &self.config
201 }
202
203 pub fn backend_name(&self) -> &'static str {
205 self.backend.backend_name()
206 }
207
208 pub async fn usage_stats(&self) -> Vec<(&'static str, String)> {
210 self.backend.usage_stats().await
211 }
212}
213
#[cfg(test)]
mod tests {
    use super::*;
    use crate::error::RustvelloError;
    use std::collections::HashMap;

    /// Minimal in-memory [`ClientDataStore`] used to drive the manager in tests.
    struct FakeBackend {
        data: Mutex<HashMap<String, String>>,
    }

    impl FakeBackend {
        fn new() -> Self {
            Self {
                data: Mutex::new(HashMap::new()),
            }
        }
    }

    #[async_trait]
    impl ClientDataStore for FakeBackend {
        async fn store(&self, key: &str, value: &str) -> RustvelloResult<()> {
            self.data
                .lock()
                .unwrap()
                .insert(key.to_owned(), value.to_owned());
            Ok(())
        }

        async fn retrieve(&self, key: &str) -> RustvelloResult<String> {
            self.data
                .lock()
                .unwrap()
                .get(key)
                .cloned()
                .ok_or_else(|| RustvelloError::state_backend(format!("key not found: {key}")))
        }

        async fn purge(&self) -> RustvelloResult<()> {
            self.data.lock().unwrap().clear();
            Ok(())
        }
    }

    fn make_manager(config: ClientDataStoreConfig) -> ClientDataStoreManager {
        ClientDataStoreManager::new(Arc::new(FakeBackend::new()), config)
    }

    /// Default config that externalizes values of at least `min` bytes.
    fn config_with_min(min: usize) -> ClientDataStoreConfig {
        let mut cds_config = ClientDataStoreConfig::default();
        cds_config.min_size_to_cache = min;
        cds_config
    }

    #[test]
    fn reference_key_format() {
        let key = generate_reference_key("hello world");
        assert!(key.starts_with(REFERENCE_PREFIX));
        assert!(is_reference(&key));
        assert!(!is_reference("just a normal string"));
    }

    #[test]
    fn reference_key_deterministic() {
        let k1 = generate_reference_key("same content");
        let k2 = generate_reference_key("same content");
        assert_eq!(k1, k2);
    }

    #[test]
    fn reference_key_differs_for_different_content() {
        let k1 = generate_reference_key("content A");
        let k2 = generate_reference_key("content B");
        assert_ne!(k1, k2);
    }

    #[tokio::test]
    async fn inline_below_threshold() {
        let mgr = make_manager(config_with_min(100));
        let small = "short";
        let result = mgr.store_if_large(small).await.unwrap();
        assert_eq!(result, small);
        assert!(!is_reference(&result));
    }

    #[tokio::test]
    async fn externalize_above_threshold() {
        let mgr = make_manager(config_with_min(10));
        let large = "a".repeat(40);
        let result = mgr.store_if_large(&large).await.unwrap();
        assert!(is_reference(&result));

        let resolved = mgr.resolve(&result).await.unwrap();
        assert_eq!(resolved, large);
    }

    #[tokio::test]
    async fn externalize_at_exact_min_size() {
        // The lower bound is inclusive: only `size < min` stays inline.
        let mgr = make_manager(config_with_min(10));
        let data = "x".repeat(10);
        let result = mgr.store_if_large(&data).await.unwrap();
        assert!(is_reference(&result));
        assert_eq!(mgr.resolve(&result).await.unwrap(), data);
    }

    #[tokio::test]
    async fn inline_when_disabled() {
        let mut cds_config = config_with_min(1);
        cds_config.disabled = true;
        let mgr = make_manager(cds_config);
        let data = "x".repeat(100);
        let result = mgr.store_if_large(&data).await.unwrap();
        assert!(!is_reference(&result));
        assert_eq!(result, data);
    }

    #[tokio::test]
    async fn inline_above_max_size() {
        let mut cds_config = config_with_min(10);
        cds_config.max_size_to_cache = 50;
        let mgr = make_manager(cds_config);
        let huge = "x".repeat(100);
        let result = mgr.store_if_large(&huge).await.unwrap();
        assert!(!is_reference(&result));
        assert_eq!(result, huge);
    }

    #[tokio::test]
    async fn externalize_at_exact_max_size() {
        // The upper bound is inclusive: only `size > max` is kept inline.
        let mut cds_config = config_with_min(10);
        cds_config.max_size_to_cache = 50;
        let mgr = make_manager(cds_config);
        let data = "x".repeat(50);
        let result = mgr.store_if_large(&data).await.unwrap();
        assert!(is_reference(&result));
    }

    #[tokio::test]
    async fn resolve_inline_passthrough() {
        let mgr = make_manager(ClientDataStoreConfig::default());
        let data = "not a reference";
        let result = mgr.resolve(data).await.unwrap();
        assert_eq!(result, data);
    }

    #[tokio::test]
    async fn resolve_unknown_reference_fails() {
        let mgr = make_manager(ClientDataStoreConfig::default());
        let key = generate_reference_key("never stored");
        assert!(mgr.resolve(&key).await.is_err());
    }

    #[tokio::test]
    async fn content_hash_dedup() {
        let mgr = make_manager(config_with_min(5));
        let data = "deduplicate me";
        let k1 = mgr.store_if_large(data).await.unwrap();
        let k2 = mgr.store_if_large(data).await.unwrap();
        assert!(is_reference(&k1));
        assert_eq!(k1, k2);
        assert_eq!(k1, generate_reference_key(data));
    }

    #[tokio::test]
    async fn purge_clears_cache_and_backend() {
        let mgr = make_manager(config_with_min(5));
        let data = "some large payload here";
        let key = mgr.store_if_large(data).await.unwrap();
        assert!(is_reference(&key));

        mgr.purge().await.unwrap();

        // Neither the local cache nor the backend may still hold the value.
        let err = mgr.resolve(&key).await;
        assert!(err.is_err());
    }

    #[tokio::test]
    async fn lru_cache_hit() {
        let mut cds_config = config_with_min(5);
        cds_config.local_cache_size = 10;
        let mgr = make_manager(cds_config);
        let data = "cached payload data";
        let key = mgr.store_if_large(data).await.unwrap();
        assert!(is_reference(&key));

        let r1 = mgr.resolve(&key).await.unwrap();
        assert_eq!(r1, data);

        // Wipe only the backend; the manager's cache must still answer.
        mgr.backend.purge().await.unwrap();

        let r2 = mgr.resolve(&key).await.unwrap();
        assert_eq!(r2, data);
    }

    #[tokio::test]
    async fn zero_cache_size_is_clamped() {
        // `local_cache_size = 0` must not panic; the cache still holds one entry.
        let mut cds_config = config_with_min(5);
        cds_config.local_cache_size = 0;
        let mgr = make_manager(cds_config);
        let data = "payload kept in a one-slot cache";
        let key = mgr.store_if_large(data).await.unwrap();

        mgr.backend.purge().await.unwrap();

        assert_eq!(mgr.resolve(&key).await.unwrap(), data);
    }

    #[tokio::test]
    async fn passthrough_existing_reference() {
        let mgr = make_manager(config_with_min(5));
        let ref_key = format!("{REFERENCE_PREFIX}abc123");
        let result = mgr.store_if_large(&ref_key).await.unwrap();
        assert_eq!(result, ref_key);
    }

    #[test]
    fn lru_cache_eviction() {
        let cap = NonZeroUsize::new(3).unwrap();
        let mut cache = lru::LruCache::<String, String>::new(cap);
        cache.put("a".into(), "1".into());
        cache.put("b".into(), "2".into());
        cache.put("c".into(), "3".into());
        assert_eq!(cache.len(), 3);

        // Inserting past capacity evicts the least-recently-used entry ("a").
        cache.put("d".into(), "4".into());
        assert_eq!(cache.len(), 3);
        assert!(cache.get("a").is_none());
        assert!(cache.get("d").is_some());
    }

    #[test]
    fn lru_cache_access_refreshes() {
        let cap = NonZeroUsize::new(3).unwrap();
        let mut cache = lru::LruCache::<String, String>::new(cap);
        cache.put("a".into(), "1".into());
        cache.put("b".into(), "2".into());
        cache.put("c".into(), "3".into());

        // Touching "a" makes "b" the least-recently-used entry.
        cache.get("a");

        cache.put("d".into(), "4".into());
        assert!(cache.get("a").is_some());
        assert!(cache.get("b").is_none());
    }
}