Skip to main content

mx_cache/
local.rs

//! Local in-memory cache implementation using Moka.
//!
//! This implementation supports per-entry TTL to ensure behavioral consistency
//! with [`RedisCache`](crate::RedisCache). Each `set()` and `set_nx_px()` call
//! respects the provided TTL parameter.
//!
//! # Performance Optimizations
//!
//! This implementation uses `Bytes` for both keys and values to minimize
//! allocations on the hot path:
//! - Keys use `Bytes`, which is reference-counted and cheap to clone
//! - Values use `Bytes`, so `get_sync` hands out a shared handle instead of copying the payload
//! - Lookups can use borrowed slices via the `equivalent::Equivalent` trait
14
15use std::hash::{Hash, Hasher};
16use std::sync::Arc;
17use std::time::{Duration, Instant};
18
19use bytes::Bytes;
20use moka::Expiry;
21use moka::sync::Cache;
22
23use crate::Cache as CacheTrait;
24
25/// A wrapper around `Bytes` that supports borrowed lookups via the `equivalent` trait.
26///
27/// This allows Moka to look up keys using `&[u8]` without allocating a new `Bytes`.
28#[derive(Clone, Debug, Eq)]
29struct CacheKey(Bytes);
30
31impl CacheKey {
32    #[inline]
33    fn new(key: &[u8]) -> Self {
34        Self(Bytes::copy_from_slice(key))
35    }
36
37    #[inline]
38    fn as_slice(&self) -> &[u8] {
39        &self.0
40    }
41}
42
43impl PartialEq for CacheKey {
44    #[inline]
45    fn eq(&self, other: &Self) -> bool {
46        self.0 == other.0
47    }
48}
49
50impl Hash for CacheKey {
51    #[inline]
52    fn hash<H: Hasher>(&self, state: &mut H) {
53        self.0.hash(state);
54    }
55}
56
57/// Wrapper for borrowed key lookups without allocation.
58///
59/// This implements the `equivalent` trait pattern used by Moka to allow
60/// looking up entries using `&[u8]` without creating a `CacheKey`.
61#[derive(PartialEq, Eq)]
62struct BorrowedKey<'a>(&'a [u8]);
63
64impl Hash for BorrowedKey<'_> {
65    #[inline]
66    fn hash<H: Hasher>(&self, state: &mut H) {
67        self.0.hash(state);
68    }
69}
70
71impl equivalent::Equivalent<CacheKey> for BorrowedKey<'_> {
72    #[inline]
73    fn equivalent(&self, key: &CacheKey) -> bool {
74        self.0 == key.as_slice()
75    }
76}
77
78/// Cache entry containing value and its expiration time.
79///
80/// Uses `Bytes` for the value to avoid cloning the data on reads.
81#[derive(Clone, Debug)]
82struct CacheEntry {
83    value: Bytes,
84    expires_at: Instant,
85}
86
87/// Custom expiry policy that uses per-entry expiration times.
88///
89/// This allows each entry to have its own TTL, matching Redis behavior
90/// where each SET operation can specify a different expiration.
91struct PerEntryExpiry;
92
93impl Expiry<CacheKey, CacheEntry> for PerEntryExpiry {
94    /// Returns the TTL for a newly created entry.
95    fn expire_after_create(
96        &self,
97        _key: &CacheKey,
98        value: &CacheEntry,
99        _current_time: Instant,
100    ) -> Option<Duration> {
101        let now = Instant::now();
102        if value.expires_at > now {
103            Some(value.expires_at.duration_since(now))
104        } else {
105            // Already expired, expire immediately
106            Some(Duration::ZERO)
107        }
108    }
109
110    /// Returns the TTL after a read operation (no change).
111    fn expire_after_read(
112        &self,
113        _key: &CacheKey,
114        _value: &CacheEntry,
115        _current_time: Instant,
116        current_duration: Option<Duration>,
117        _last_modified_at: Instant,
118    ) -> Option<Duration> {
119        // Don't extend TTL on read - maintain original expiration
120        current_duration
121    }
122
123    /// Returns the TTL after an update operation.
124    fn expire_after_update(
125        &self,
126        _key: &CacheKey,
127        value: &CacheEntry,
128        _current_time: Instant,
129        _current_duration: Option<Duration>,
130    ) -> Option<Duration> {
131        let now = Instant::now();
132        if value.expires_at > now {
133            Some(value.expires_at.duration_since(now))
134        } else {
135            Some(Duration::ZERO)
136        }
137    }
138}
139
140/// Local in-memory cache with per-entry TTL support.
141///
142/// Uses Moka's sync cache with a custom expiry policy to support
143/// per-operation TTL values, ensuring behavioral consistency with
144/// [`RedisCache`](crate::RedisCache).
145///
146/// # Performance
147///
148/// This implementation is optimized for hot-path usage:
149/// - Uses `Bytes` for keys and values (reference-counted, cheap clones)
150/// - Supports borrowed lookups via the `equivalent` trait for `contains_sync`
151/// - Minimizes allocations on repeated operations with the same key
152///
153/// # Example
154///
155/// ```
156/// use std::time::Duration;
157/// use mx_cache::{LocalCache, Cache};
158///
159/// # tokio_test::block_on(async {
160/// let cache = LocalCache::new(10_000, Duration::from_secs(300));
161///
162/// // Each operation can specify its own TTL
163/// cache.set(b"short_lived", b"value1", Duration::from_secs(10)).await.unwrap();
164/// cache.set(b"long_lived", b"value2", Duration::from_secs(3600)).await.unwrap();
165/// # });
166/// ```
167#[derive(Clone, Debug)]
168pub struct LocalCache {
169    inner: Arc<Cache<CacheKey, CacheEntry>>,
170}
171
172impl LocalCache {
173    /// Creates a new local cache with the specified capacity and default TTL.
174    ///
175    /// # Arguments
176    ///
177    /// * `capacity` - Maximum number of entries in the cache
178    /// * `default_ttl` - Default TTL used as an upper bound for entries.
179    ///   Individual operations can specify shorter TTLs, but entries will
180    ///   never live longer than this default.
181    ///
182    /// # Note
183    ///
184    /// The `default_ttl` parameter sets a maximum lifetime for entries.
185    /// Per-operation TTLs (passed to `set()` and `set_nx_px()`) take
186    /// precedence and are fully respected, as long as they don't exceed
187    /// this default.
188    pub fn new(capacity: u64, default_ttl: Duration) -> Self {
189        let cache = Cache::builder()
190            .max_capacity(capacity.max(1))
191            // Set a maximum TTL as a safety bound
192            .time_to_live(default_ttl)
193            // Use custom expiry for per-entry TTL support
194            .expire_after(PerEntryExpiry)
195            .build();
196        Self {
197            inner: Arc::new(cache),
198        }
199    }
200
201    /// Synchronous check for local cache presence - fast path for deduplication.
202    ///
203    /// Returns `true` if the key exists in the cache and has not expired.
204    ///
205    /// # Performance
206    ///
207    /// This is a synchronous operation optimized for hot-path deduplication
208    /// checks. Uses borrowed key lookup via the `equivalent` trait to avoid
209    /// allocating a new `Bytes` on every call.
210    #[inline]
211    pub fn contains_sync(&self, key: &[u8]) -> bool {
212        // Use borrowed key lookup to avoid allocation
213        self.inner.contains_key(&BorrowedKey(key))
214    }
215
216    /// Synchronous get for local cache - fast path for hot data.
217    ///
218    /// Returns the value if the key exists and has not expired.
219    ///
220    /// # Performance
221    ///
222    /// This is a synchronous operation that returns a `Bytes` reference
223    /// to avoid copying the value data.
224    #[inline]
225    pub fn get_sync(&self, key: &[u8]) -> Option<Bytes> {
226        self.inner.get(&BorrowedKey(key)).map(|entry| entry.value)
227    }
228}
229
230impl CacheTrait for LocalCache {
231    /// Sets a value only if the key does not exist (NX = Not eXists).
232    ///
233    /// # Arguments
234    ///
235    /// * `key` - The cache key
236    /// * `value` - The value to store
237    /// * `ttl` - Time-to-live for this entry
238    ///
239    /// # Returns
240    ///
241    /// * `Ok(true)` - The key was newly inserted
242    /// * `Ok(false)` - The key already existed, no change made
243    fn set_nx_px(
244        &self,
245        key: &[u8],
246        value: &[u8],
247        ttl: Duration,
248    ) -> impl Future<Output = anyhow::Result<bool>> + Send {
249        // Single allocation for key, single allocation for value
250        let cache_key = CacheKey::new(key);
251        let entry = CacheEntry {
252            value: Bytes::copy_from_slice(value),
253            expires_at: Instant::now() + ttl,
254        };
255        let inner = Arc::clone(&self.inner);
256
257        async move {
258            // Use Moka's entry API for atomic insert-if-not-exists operation.
259            // This eliminates the TOCTOU race condition between contains_key and insert.
260            let result = inner.entry(cache_key).or_insert(entry);
261
262            // is_fresh() returns true if the value was just inserted (key didn't exist),
263            // false if the key already existed.
264            Ok(result.is_fresh())
265        }
266    }
267
268    /// Sets a value with the specified TTL.
269    ///
270    /// # Arguments
271    ///
272    /// * `key` - The cache key
273    /// * `value` - The value to store
274    /// * `ttl` - Time-to-live for this entry
275    fn set(
276        &self,
277        key: &[u8],
278        value: &[u8],
279        ttl: Duration,
280    ) -> impl Future<Output = anyhow::Result<()>> + Send {
281        // Single allocation for key, single allocation for value
282        let cache_key = CacheKey::new(key);
283        let entry = CacheEntry {
284            value: Bytes::copy_from_slice(value),
285            expires_at: Instant::now() + ttl,
286        };
287        let inner = Arc::clone(&self.inner);
288
289        async move {
290            inner.insert(cache_key, entry);
291            Ok(())
292        }
293    }
294
295    /// Gets a value from the cache.
296    ///
297    /// # Returns
298    ///
299    /// * `Ok(Some(value))` - The value exists and has not expired
300    /// * `Ok(None)` - The key does not exist or has expired
301    fn get(&self, key: &[u8]) -> impl Future<Output = anyhow::Result<Option<Vec<u8>>>> + Send {
302        // Use borrowed key lookup to avoid allocation on lookup
303        let result = self.inner.get(&BorrowedKey(key)).map(|entry| entry.value);
304        async move { Ok(result.map(|bytes| bytes.to_vec())) }
305    }
306
307    /// Deletes a key from the cache.
308    fn del(&self, key: &[u8]) -> impl Future<Output = anyhow::Result<()>> + Send {
309        // Use borrowed key for invalidation
310        self.inner.invalidate(&BorrowedKey(key));
311        async move { Ok(()) }
312    }
313}
314
#[cfg(test)]
mod tests {
    use super::*;

    /// TTL that comfortably outlives any single test.
    const LONG_TTL: Duration = Duration::from_secs(60);
    /// TTL short enough to lapse within a test.
    const SHORT_TTL: Duration = Duration::from_millis(50);
    /// Sleep long enough for `SHORT_TTL` entries to have expired.
    const PAST_SHORT_TTL: Duration = Duration::from_millis(100);

    /// Cache configuration shared by every test: 100 entries, 60 s ceiling.
    fn new_cache() -> LocalCache {
        LocalCache::new(100, LONG_TTL)
    }

    #[tokio::test]
    async fn test_set_and_get() {
        let cache = new_cache();

        cache.set(b"key1", b"value1", LONG_TTL).await.unwrap();

        let stored = cache.get(b"key1").await.unwrap();
        assert_eq!(stored, Some(b"value1".to_vec()));
    }

    #[tokio::test]
    async fn test_get_nonexistent() {
        let cache = new_cache();

        let missing = cache.get(b"nonexistent").await.unwrap();
        assert_eq!(missing, None);
    }

    #[tokio::test]
    async fn test_set_nx_px_new_key() {
        let cache = new_cache();

        let inserted = cache.set_nx_px(b"key1", b"value1", LONG_TTL).await.unwrap();
        assert!(inserted, "Expected key to be set (new key)");

        let stored = cache.get(b"key1").await.unwrap();
        assert_eq!(stored, Some(b"value1".to_vec()));
    }

    #[tokio::test]
    async fn test_set_nx_px_existing_key() {
        let cache = new_cache();

        // The first writer wins.
        let first = cache.set_nx_px(b"key1", b"value1", LONG_TTL).await.unwrap();
        assert!(first);

        // A second attempt on the same key is rejected.
        let second = cache.set_nx_px(b"key1", b"value2", LONG_TTL).await.unwrap();
        assert!(!second, "Expected key NOT to be set (key exists)");

        // The value from the first write is untouched.
        let stored = cache.get(b"key1").await.unwrap();
        assert_eq!(stored, Some(b"value1".to_vec()));
    }

    #[tokio::test]
    async fn test_del() {
        let cache = new_cache();

        cache.set(b"key1", b"value1", LONG_TTL).await.unwrap();
        cache.del(b"key1").await.unwrap();

        let after_delete = cache.get(b"key1").await.unwrap();
        assert_eq!(after_delete, None);
    }

    #[tokio::test]
    async fn test_contains_sync() {
        let cache = new_cache();

        assert!(!cache.contains_sync(b"key1"));

        cache.set(b"key1", b"value1", LONG_TTL).await.unwrap();

        assert!(cache.contains_sync(b"key1"));
    }

    #[tokio::test]
    async fn test_get_sync() {
        let cache = new_cache();

        assert!(cache.get_sync(b"key1").is_none());

        cache.set(b"key1", b"value1", LONG_TTL).await.unwrap();

        let stored = cache.get_sync(b"key1");
        assert_eq!(stored, Some(Bytes::from_static(b"value1")));
    }

    #[tokio::test]
    async fn test_per_entry_ttl_respected() {
        let cache = new_cache();

        cache.set(b"short_ttl", b"value", SHORT_TTL).await.unwrap();

        // Visible right after the write.
        let before = cache.get(b"short_ttl").await.unwrap();
        assert_eq!(before, Some(b"value".to_vec()));

        // Let the entry's TTL lapse.
        tokio::time::sleep(PAST_SHORT_TTL).await;

        let after = cache.get(b"short_ttl").await.unwrap();
        assert_eq!(after, None, "Entry should have expired after TTL");
    }

    #[tokio::test]
    async fn test_different_ttls_for_different_keys() {
        let cache = new_cache();

        cache.set(b"short", b"value1", SHORT_TTL).await.unwrap();
        cache
            .set(b"long", b"value2", Duration::from_secs(10))
            .await
            .unwrap();

        // Both entries are readable straight away.
        assert!(cache.get(b"short").await.unwrap().is_some());
        assert!(cache.get(b"long").await.unwrap().is_some());

        // Only the short-lived entry's TTL lapses during this sleep.
        tokio::time::sleep(PAST_SHORT_TTL).await;

        assert!(
            cache.get(b"short").await.unwrap().is_none(),
            "Short TTL entry should have expired"
        );
        assert!(
            cache.get(b"long").await.unwrap().is_some(),
            "Long TTL entry should still exist"
        );
    }

    #[tokio::test]
    async fn test_set_nx_px_ttl_respected() {
        let cache = new_cache();

        let inserted = cache.set_nx_px(b"key", b"value", SHORT_TTL).await.unwrap();
        assert!(inserted);

        // Let the entry's TTL lapse.
        tokio::time::sleep(PAST_SHORT_TTL).await;

        let expired = cache.get(b"key").await.unwrap();
        assert_eq!(expired, None, "Entry should have expired after TTL");

        // With the old entry gone, NX must succeed again.
        let reinserted = cache
            .set_nx_px(b"key", b"new_value", LONG_TTL)
            .await
            .unwrap();
        assert!(
            reinserted,
            "Should be able to set after previous entry expired"
        );

        let stored = cache.get(b"key").await.unwrap();
        assert_eq!(stored, Some(b"new_value".to_vec()));
    }
}
505}