// rusmes_search/cache.rs
//! LRU result cache for the search index.
//!
//! Caches `(normalized_query, user)` -> `Vec<MessageId>` pairs to short-circuit
//! repeated identical queries. Invalidation is global: every write to the index
//! bumps a version counter, and cache entries stamped with an older version are
//! treated as stale on lookup.
//!
//! The version-stamp approach was chosen over per-user walk-and-evict for two
//! reasons:
//! 1. `index_message` does not carry a user identity (the rusmes-search
//!    `SearchIndex` trait is decoupled from the storage account model), so
//!    walking the cache by user is not directly possible at the public API.
//! 2. A stale entry can otherwise miss a freshly-indexed message, breaking the
//!    primary correctness invariant of the cache.
//!
//! Capacity defaults to 256 entries.
//!
//! # Concurrency
//!
//! The internal `LruCache` is not `Sync` on its own (mutation requires `&mut`),
//! so it is wrapped in a `std::sync::Mutex`; lock poisoning is recovered from
//! via `into_inner`, since no cache operation can leave an entry half-written.
//! The version counter is an `AtomicU64` so writers can bump it without taking
//! the cache lock.
use lru::LruCache;
use rusmes_proto::MessageId;
use std::num::NonZeroUsize;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Mutex, MutexGuard};
30
/// Default cache capacity (entries).
pub const DEFAULT_CAPACITY: usize = 256;

/// Cache key: normalized query string + user (or empty string for global).
///
/// The query component is produced by [`ResultCache::normalize_query`], so two
/// textually different but equivalent queries share a single cache slot.
pub type CacheKey = (String, String);
36
/// Cached value: list of matching message IDs plus the version stamp at insert
/// time. A lookup that finds an entry whose version is below the current
/// invalidation version treats the entry as stale.
#[derive(Clone, Debug)]
struct CacheValue {
    /// Matching message IDs, in the order they were inserted.
    ids: Vec<MessageId>,
    /// Global invalidation counter value at the time of insertion.
    version: u64,
}
45
/// LRU + version-stamped result cache.
pub struct ResultCache {
    /// LRU map guarded by a mutex. `LruCache` lookups take `&mut self`
    /// (they update recency), so even reads must hold the lock.
    inner: Mutex<LruCache<CacheKey, CacheValue>>,
    /// Monotonically increasing invalidation counter. Bumped on every write
    /// (`index_message` / `delete_message`).
    version: AtomicU64,
}
53
54impl ResultCache {
55    /// Create a new cache with the default capacity.
56    pub fn new_default() -> Self {
57        // `DEFAULT_CAPACITY` is non-zero; if a future change sets it to zero,
58        // fall back to a 1-entry cache instead of panicking.
59        let cap = NonZeroUsize::new(DEFAULT_CAPACITY).unwrap_or(NonZeroUsize::MIN);
60        Self::with_capacity(cap)
61    }
62
63    /// Create a new cache with the given capacity.
64    pub fn with_capacity(cap: NonZeroUsize) -> Self {
65        Self {
66            inner: Mutex::new(LruCache::new(cap)),
67            version: AtomicU64::new(0),
68        }
69    }
70
71    /// Normalize a query string: lowercase + collapse whitespace.
72    pub fn normalize_query(query: &str) -> String {
73        let lower = query.to_lowercase();
74        lower.split_whitespace().collect::<Vec<_>>().join(" ")
75    }
76
77    /// Build a cache key from a query and an optional user.
78    pub fn make_key(query: &str, user: Option<&str>) -> CacheKey {
79        (Self::normalize_query(query), user.unwrap_or("").to_string())
80    }
81
82    /// Look up an entry. Returns `Some(ids)` only if the entry exists AND its
83    /// stamp matches the current global version (i.e. it has not been
84    /// invalidated by a write).
85    pub fn get(&self, key: &CacheKey) -> Option<Vec<MessageId>> {
86        let current = self.version.load(Ordering::Acquire);
87        let mut guard = match self.inner.lock() {
88            Ok(g) => g,
89            Err(poisoned) => poisoned.into_inner(),
90        };
91        let value = guard.get(key)?;
92        if value.version == current {
93            Some(value.ids.clone())
94        } else {
95            // Stale: drop it now to free the slot.
96            guard.pop(key);
97            None
98        }
99    }
100
101    /// Insert (or replace) an entry stamped at the current version.
102    pub fn put(&self, key: CacheKey, ids: Vec<MessageId>) {
103        let current = self.version.load(Ordering::Acquire);
104        let mut guard = match self.inner.lock() {
105            Ok(g) => g,
106            Err(poisoned) => poisoned.into_inner(),
107        };
108        guard.put(
109            key,
110            CacheValue {
111                ids,
112                version: current,
113            },
114        );
115    }
116
117    /// Bump the invalidation version. Called after any successful index
118    /// mutation (add or delete). All subsequent lookups will treat existing
119    /// entries as stale until they are re-populated.
120    pub fn invalidate_all(&self) {
121        // wrapping_add so we never panic; the cycle is 2^64 invalidations.
122        self.version.fetch_add(1, Ordering::AcqRel);
123    }
124
125    /// Return the current number of entries in the cache.
126    pub fn len(&self) -> usize {
127        let guard = match self.inner.lock() {
128            Ok(g) => g,
129            Err(poisoned) => poisoned.into_inner(),
130        };
131        guard.len()
132    }
133
134    /// Return the current invalidation version (test helper / observability).
135    pub fn version(&self) -> u64 {
136        self.version.load(Ordering::Acquire)
137    }
138
139    /// Whether the cache currently holds zero entries.
140    pub fn is_empty(&self) -> bool {
141        self.len() == 0
142    }
143}
144
145impl Default for ResultCache {
146    fn default() -> Self {
147        Self::new_default()
148    }
149}
150
#[cfg(test)]
mod tests {
    use super::*;
    use rusmes_proto::MessageId;

    #[test]
    fn normalize_lowercases_and_collapses_whitespace() {
        assert_eq!(
            ResultCache::normalize_query("  Hello   WORLD\t\nfoo  "),
            "hello world foo"
        );
    }

    #[test]
    fn put_get_roundtrip_returns_ids() {
        let cache = ResultCache::new_default();
        let key = ResultCache::make_key("hello world", Some("alice"));
        let (first, second) = (MessageId::new(), MessageId::new());
        cache.put(key.clone(), vec![first, second]);
        let hit = cache.get(&key).expect("entry should be present");
        assert_eq!(hit, vec![first, second]);
    }

    #[test]
    fn invalidate_all_makes_existing_entries_stale() {
        let cache = ResultCache::new_default();
        let key = ResultCache::make_key("q", None);
        cache.put(key.clone(), vec![MessageId::new()]);
        // Fresh entry is served…
        assert!(cache.get(&key).is_some());
        cache.invalidate_all();
        // …and becomes invisible after a version bump.
        assert!(cache.get(&key).is_none());
    }

    #[test]
    fn key_is_user_aware() {
        let cache = ResultCache::new_default();
        let for_alice = ResultCache::make_key("foo", Some("alice"));
        let for_bob = ResultCache::make_key("foo", Some("bob"));
        cache.put(for_alice.clone(), vec![MessageId::new()]);
        assert!(cache.get(&for_alice).is_some());
        assert!(cache.get(&for_bob).is_none());
    }

    #[test]
    fn make_key_normalizes_query_text() {
        assert_eq!(
            ResultCache::make_key("Hello World", Some("u")),
            ResultCache::make_key("hello   world", Some("u"))
        );
    }
}