Skip to main content

oxibonsai_runtime/
dedup.rs

1//! Request deduplication: cache identical requests to avoid redundant inference.
2//!
3//! Uses content hashing for exact deduplication and optional
4//! fuzzy matching for near-duplicate detection.
5//!
6//! # Overview
7//!
8//! The [`DedupCache`] computes an FNV-1a hash over the serialized request content
9//! (or a concatenation of role+content pairs for message lists) and uses that as
10//! the cache key.  On a hit the cached response is returned immediately, bypassing
11//! the inference pipeline entirely.  Entries carry a TTL and are lazily evicted on
12//! access as well as via the explicit [`DedupCache::evict_expired`] method.
13//!
14//! # Example
15//!
16//! ```rust
17//! use std::time::Duration;
18//! use oxibonsai_runtime::dedup::{DedupCache, RequestKey};
19//!
20//! let mut cache = DedupCache::with_capacity(128);
21//! let key = RequestKey::from_str("What is Rust?");
22//! cache.insert(key.clone(), "Rust is a systems language.".to_string());
23//!
24//! assert_eq!(cache.get(&key), Some("Rust is a systems language."));
25//! ```
26
27use std::collections::HashMap;
28use std::time::{Duration, Instant};
29
30// ─────────────────────────────────────────────────────────────────────────────
31// FNV-1a helpers (inline, no external crate)
32// ─────────────────────────────────────────────────────────────────────────────
33
/// FNV-1a 64-bit offset basis: the starting state, and therefore the hash of
/// an empty input.
const FNV_OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
/// FNV-1a 64-bit prime: the multiplier applied after each byte is mixed in.
const FNV_PRIME: u64 = 0x0100_0000_01b3;

/// Hash `bytes` with 64-bit FNV-1a.
///
/// Starting from [`FNV_OFFSET_BASIS`], every input byte is XOR-ed into the
/// running state, which is then multiplied by [`FNV_PRIME`] with wrapping
/// (mod 2^64) arithmetic.  An empty slice returns the offset basis unchanged.
#[inline]
fn fnv1a_hash(bytes: &[u8]) -> u64 {
    bytes.iter().fold(FNV_OFFSET_BASIS, |state, &byte| {
        // XOR first, multiply second: this order is what makes it FNV-1a
        // rather than FNV-1.
        (state ^ u64::from(byte)).wrapping_mul(FNV_PRIME)
    })
}
49
50// ─────────────────────────────────────────────────────────────────────────────
51// RequestKey
52// ─────────────────────────────────────────────────────────────────────────────
53
54/// A hashed request key (FNV-1a of serialized request content).
55///
56/// Two [`RequestKey`] values are equal iff the underlying hash values are equal.
57/// Collisions are theoretically possible but extremely rare for the prompt sizes
58/// typical in LLM serving.
59#[derive(Debug, Clone, PartialEq, Eq, Hash)]
60pub struct RequestKey(pub u64);
61
62impl RequestKey {
63    /// Compute the key from a serialized request string.
64    ///
65    /// # Example
66    /// ```
67    /// use oxibonsai_runtime::dedup::RequestKey;
68    /// let k1 = RequestKey::from_str("hello world");
69    /// let k2 = RequestKey::from_str("hello world");
70    /// assert_eq!(k1, k2);
71    /// ```
72    #[allow(clippy::should_implement_trait)]
73    pub fn from_str(s: &str) -> Self {
74        Self(fnv1a_hash(s.as_bytes()))
75    }
76
77    /// Compute from a message list: FNV-1a over the concatenation of
78    /// `role + "\x00" + content + "\x01"` for every message pair, in order.
79    ///
80    /// The sentinel bytes (`\x00` and `\x01`) prevent role/content boundary
81    /// collisions (e.g. `("ab", "c")` vs `("a", "bc")`).
82    ///
83    /// # Example
84    /// ```
85    /// use oxibonsai_runtime::dedup::RequestKey;
86    /// let msgs = [("user", "What is Rust?")];
87    /// let k = RequestKey::from_messages(&msgs);
88    /// assert_eq!(k, RequestKey::from_messages(&msgs));
89    /// ```
90    pub fn from_messages(messages: &[(&str, &str)]) -> Self {
91        let mut hash = FNV_OFFSET_BASIS;
92        for (role, content) in messages {
93            for &byte in role.as_bytes() {
94                hash ^= u64::from(byte);
95                hash = hash.wrapping_mul(FNV_PRIME);
96            }
97            // Separator between role and content
98            hash ^= 0x00;
99            hash = hash.wrapping_mul(FNV_PRIME);
100            for &byte in content.as_bytes() {
101                hash ^= u64::from(byte);
102                hash = hash.wrapping_mul(FNV_PRIME);
103            }
104            // Separator between messages
105            hash ^= 0x01;
106            hash = hash.wrapping_mul(FNV_PRIME);
107        }
108        Self(hash)
109    }
110
111    /// Return the raw 64-bit hash value.
112    pub fn value(&self) -> u64 {
113        self.0
114    }
115}
116
117// ─────────────────────────────────────────────────────────────────────────────
118// CachedResponse
119// ─────────────────────────────────────────────────────────────────────────────
120
/// One stored response together with the bookkeeping the [`DedupCache`] keeps
/// for it.
#[derive(Debug, Clone)]
pub struct CachedResponse {
    /// The response text handed back on a cache hit.
    pub content: String,
    /// Moment this entry was constructed; expiry is measured from here.
    pub created_at: Instant,
    /// Number of hits recorded against this entry via [`CachedResponse::record_hit`].
    pub hit_count: u64,
    /// Lifetime of the entry, counted from `created_at`.
    pub ttl: Duration,
}

impl CachedResponse {
    /// Build a fresh entry stamped with the current instant and a zero hit
    /// count.
    pub fn new(content: String, ttl: Duration) -> Self {
        let created_at = Instant::now();
        Self {
            content,
            created_at,
            hit_count: 0,
            ttl,
        }
    }

    /// Returns `true` once strictly more than `ttl` has elapsed since
    /// `created_at`.
    pub fn is_expired(&self) -> bool {
        let age = self.created_at.elapsed();
        age > self.ttl
    }

    /// Count one more cache hit against this entry.
    pub fn record_hit(&mut self) {
        self.hit_count += 1;
    }
}
155
156// ─────────────────────────────────────────────────────────────────────────────
157// DedupStats
158// ─────────────────────────────────────────────────────────────────────────────
159
/// Running counters describing how the deduplication cache has been used.
///
/// The counters only ever grow; they go back to zero only when
/// [`DedupCache::clear`] replaces them with a default instance.
#[derive(Debug, Clone, Default)]
pub struct DedupStats {
    /// Every [`DedupCache::get`] call, whether it hit or missed.
    pub total_requests: u64,
    /// Lookups that returned a cached response.
    pub cache_hits: u64,
    /// Lookups that found no valid entry.
    pub cache_misses: u64,
    /// Entries removed because of capacity overflow or TTL expiry.
    pub evictions: u64,
}

impl DedupStats {
    /// Share of lookups that were hits, as a fraction in `[0.0, 1.0]`.
    ///
    /// Yields `0.0` while no lookup has been recorded, avoiding a division by
    /// zero.
    pub fn hit_rate(&self) -> f64 {
        match self.total_requests {
            0 => 0.0,
            total => self.cache_hits as f64 / total as f64,
        }
    }

    /// Render all counters plus the hit rate (as a percentage with one
    /// decimal) on a single line.
    pub fn summary(&self) -> String {
        let Self {
            total_requests,
            cache_hits,
            cache_misses,
            evictions,
        } = self;
        let hit_pct = self.hit_rate() * 100.0;
        format!(
            "requests={} hits={} misses={} evictions={} hit_rate={:.1}%",
            total_requests, cache_hits, cache_misses, evictions, hit_pct,
        )
    }
}
200
201// ─────────────────────────────────────────────────────────────────────────────
202// DedupCache
203// ─────────────────────────────────────────────────────────────────────────────
204
205/// Request deduplication cache.
206///
207/// Stores responses keyed by [`RequestKey`].  When the cache is full, the
208/// oldest entry (by insertion order, tracked via a monotonic sequence counter)
209/// is evicted to make room for the new one.
210///
211/// # Thread safety
212///
213/// `DedupCache` is **not** `Sync` by design; wrap it in a `Mutex` or
214/// `RwLock` when sharing across threads.
215pub struct DedupCache {
216    /// The backing store: key → (entry, insertion_seq).
217    cache: HashMap<RequestKey, (CachedResponse, u64)>,
218    /// Maximum number of entries before eviction kicks in.
219    capacity: usize,
220    /// Default TTL used by [`DedupCache::insert`].
221    default_ttl: Duration,
222    /// Aggregate statistics.
223    stats: DedupStats,
224    /// Monotonically increasing insertion counter.
225    next_seq: u64,
226}
227
228impl DedupCache {
229    /// Create a new cache with the given `capacity` and `default_ttl`.
230    pub fn new(capacity: usize, default_ttl: Duration) -> Self {
231        Self {
232            cache: HashMap::new(),
233            capacity,
234            default_ttl,
235            stats: DedupStats::default(),
236            next_seq: 0,
237        }
238    }
239
240    /// Shorthand constructor with a 60-second default TTL.
241    pub fn with_capacity(n: usize) -> Self {
242        Self::new(n, Duration::from_secs(60))
243    }
244
245    /// Look up a cached response.
246    ///
247    /// Returns `None` if the key is not present or if the cached entry has
248    /// expired.  On a valid hit the `hit_count` of the entry is incremented
249    /// and the response string slice is returned.
250    pub fn get(&mut self, key: &RequestKey) -> Option<&str> {
251        self.stats.total_requests += 1;
252
253        // Check for expiry first without borrowing mutably into the hit path.
254        let expired = self
255            .cache
256            .get(key)
257            .map(|(entry, _)| entry.is_expired())
258            .unwrap_or(false);
259
260        if expired {
261            self.cache.remove(key);
262            self.stats.cache_misses += 1;
263            self.stats.evictions += 1;
264            return None;
265        }
266
267        match self.cache.get_mut(key) {
268            Some((entry, _seq)) => {
269                entry.record_hit();
270                self.stats.cache_hits += 1;
271                // Return a reference to the content inside the map.
272                // SAFETY: The entry lives as long as self.
273                Some(self.cache[key].0.content.as_str())
274            }
275            None => {
276                self.stats.cache_misses += 1;
277                None
278            }
279        }
280    }
281
282    /// Insert a response with the `DedupCache::default_ttl`.
283    ///
284    /// If the cache is at capacity, the entry with the smallest insertion
285    /// sequence number (i.e. the oldest entry) is evicted first.
286    pub fn insert(&mut self, key: RequestKey, response: String) {
287        let ttl = self.default_ttl;
288        self.insert_with_ttl(key, response, ttl);
289    }
290
291    /// Insert a response with a custom `ttl`.
292    ///
293    /// Evicts the oldest entry when the cache is at capacity before inserting.
294    pub fn insert_with_ttl(&mut self, key: RequestKey, response: String, ttl: Duration) {
295        // If key already exists, just update it in-place.
296        if self.cache.contains_key(&key) {
297            let seq = self.next_seq;
298            self.next_seq += 1;
299            let entry = CachedResponse::new(response, ttl);
300            self.cache.insert(key, (entry, seq));
301            return;
302        }
303
304        // Evict the oldest entry when at capacity.
305        if self.cache.len() >= self.capacity {
306            self.evict_oldest();
307        }
308
309        let seq = self.next_seq;
310        self.next_seq += 1;
311        let entry = CachedResponse::new(response, ttl);
312        self.cache.insert(key, (entry, seq));
313    }
314
315    /// Remove all expired entries.
316    ///
317    /// Returns the number of entries removed.
318    pub fn evict_expired(&mut self) -> usize {
319        let before = self.cache.len();
320        self.cache.retain(|_, (entry, _)| !entry.is_expired());
321        let removed = before - self.cache.len();
322        self.stats.evictions += removed as u64;
323        removed
324    }
325
326    /// Number of entries currently in the cache.
327    pub fn len(&self) -> usize {
328        self.cache.len()
329    }
330
331    /// Returns `true` if the cache contains no entries.
332    pub fn is_empty(&self) -> bool {
333        self.cache.is_empty()
334    }
335
336    /// Reference to the current statistics snapshot.
337    pub fn stats(&self) -> &DedupStats {
338        &self.stats
339    }
340
341    /// Remove all entries and reset statistics.
342    pub fn clear(&mut self) {
343        self.cache.clear();
344        self.stats = DedupStats::default();
345        self.next_seq = 0;
346    }
347
348    // ── Private helpers ───────────────────────────────────────────────────────
349
350    /// Evict the entry with the smallest insertion sequence number.
351    fn evict_oldest(&mut self) {
352        // Find the key with the minimum sequence number.
353        let oldest_key = self
354            .cache
355            .iter()
356            .min_by_key(|(_, (_, seq))| *seq)
357            .map(|(k, _)| k.clone());
358
359        if let Some(key) = oldest_key {
360            self.cache.remove(&key);
361            self.stats.evictions += 1;
362        }
363    }
364}
365
366// ─────────────────────────────────────────────────────────────────────────────
367// Tests (unit, inline)
368// ─────────────────────────────────────────────────────────────────────────────
369
#[cfg(test)]
mod tests {
    use super::*;

    /// Hashing the same string twice yields equal keys.
    #[test]
    fn request_key_deterministic() {
        assert_eq!(RequestKey::from_str("hello"), RequestKey::from_str("hello"));
    }

    /// Two different strings are expected to produce different keys.
    #[test]
    fn request_key_different_inputs() {
        assert_ne!(RequestKey::from_str("foo"), RequestKey::from_str("bar"));
    }

    /// A freshly created entry with a long TTL is still valid.
    #[test]
    fn cached_response_not_expired_immediately() {
        let fresh = CachedResponse::new("hi".to_string(), Duration::from_secs(60));
        assert!(!fresh.is_expired());
    }

    /// An inserted response can be read back under the same key.
    #[test]
    fn dedup_cache_basic_insert_get() {
        let mut cache = DedupCache::with_capacity(10);
        let key = RequestKey::from_str("test");
        cache.insert(key.clone(), "response".to_string());

        let looked_up = cache.get(&key);
        assert_eq!(looked_up, Some("response"));
    }

    /// With no recorded lookups the hit rate is reported as zero.
    #[test]
    fn dedup_stats_hit_rate_zero_on_empty() {
        let rate = DedupStats::default().hit_rate();
        assert!(rate.abs() < f64::EPSILON);
    }
}