oxibonsai_runtime/dedup.rs
1//! Request deduplication: cache identical requests to avoid redundant inference.
2//!
//! Uses content hashing for exact deduplication. Fuzzy matching for
//! near-duplicate detection is not implemented in this module.
5//!
6//! # Overview
7//!
8//! The [`DedupCache`] computes an FNV-1a hash over the serialized request content
9//! (or a concatenation of role+content pairs for message lists) and uses that as
10//! the cache key. On a hit the cached response is returned immediately, bypassing
11//! the inference pipeline entirely. Entries carry a TTL and are lazily evicted on
12//! access as well as via the explicit [`DedupCache::evict_expired`] method.
13//!
14//! # Example
15//!
16//! ```rust
17//! use std::time::Duration;
18//! use oxibonsai_runtime::dedup::{DedupCache, RequestKey};
19//!
20//! let mut cache = DedupCache::with_capacity(128);
21//! let key = RequestKey::from_str("What is Rust?");
22//! cache.insert(key.clone(), "Rust is a systems language.".to_string());
23//!
24//! assert_eq!(cache.get(&key), Some("Rust is a systems language."));
25//! ```
26
27use std::collections::HashMap;
28use std::time::{Duration, Instant};
29
30// ─────────────────────────────────────────────────────────────────────────────
31// FNV-1a helpers (inline, no external crate)
32// ─────────────────────────────────────────────────────────────────────────────
33
34/// FNV-1a 64-bit offset basis.
35const FNV_OFFSET_BASIS: u64 = 14695981039346656037;
36/// FNV-1a 64-bit prime.
37const FNV_PRIME: u64 = 1099511628211;
38
39/// Compute an FNV-1a 64-bit hash over arbitrary bytes.
40#[inline]
41fn fnv1a_hash(bytes: &[u8]) -> u64 {
42 let mut hash = FNV_OFFSET_BASIS;
43 for &byte in bytes {
44 hash ^= u64::from(byte);
45 hash = hash.wrapping_mul(FNV_PRIME);
46 }
47 hash
48}
49
50// ─────────────────────────────────────────────────────────────────────────────
51// RequestKey
52// ─────────────────────────────────────────────────────────────────────────────
53
54/// A hashed request key (FNV-1a of serialized request content).
55///
56/// Two [`RequestKey`] values are equal iff the underlying hash values are equal.
57/// Collisions are theoretically possible but extremely rare for the prompt sizes
58/// typical in LLM serving.
59#[derive(Debug, Clone, PartialEq, Eq, Hash)]
60pub struct RequestKey(pub u64);
61
62impl RequestKey {
63 /// Compute the key from a serialized request string.
64 ///
65 /// # Example
66 /// ```
67 /// use oxibonsai_runtime::dedup::RequestKey;
68 /// let k1 = RequestKey::from_str("hello world");
69 /// let k2 = RequestKey::from_str("hello world");
70 /// assert_eq!(k1, k2);
71 /// ```
72 #[allow(clippy::should_implement_trait)]
73 pub fn from_str(s: &str) -> Self {
74 Self(fnv1a_hash(s.as_bytes()))
75 }
76
77 /// Compute from a message list: FNV-1a over the concatenation of
78 /// `role + "\x00" + content + "\x01"` for every message pair, in order.
79 ///
80 /// The sentinel bytes (`\x00` and `\x01`) prevent role/content boundary
81 /// collisions (e.g. `("ab", "c")` vs `("a", "bc")`).
82 ///
83 /// # Example
84 /// ```
85 /// use oxibonsai_runtime::dedup::RequestKey;
86 /// let msgs = [("user", "What is Rust?")];
87 /// let k = RequestKey::from_messages(&msgs);
88 /// assert_eq!(k, RequestKey::from_messages(&msgs));
89 /// ```
90 pub fn from_messages(messages: &[(&str, &str)]) -> Self {
91 let mut hash = FNV_OFFSET_BASIS;
92 for (role, content) in messages {
93 for &byte in role.as_bytes() {
94 hash ^= u64::from(byte);
95 hash = hash.wrapping_mul(FNV_PRIME);
96 }
97 // Separator between role and content
98 hash ^= 0x00;
99 hash = hash.wrapping_mul(FNV_PRIME);
100 for &byte in content.as_bytes() {
101 hash ^= u64::from(byte);
102 hash = hash.wrapping_mul(FNV_PRIME);
103 }
104 // Separator between messages
105 hash ^= 0x01;
106 hash = hash.wrapping_mul(FNV_PRIME);
107 }
108 Self(hash)
109 }
110
111 /// Return the raw 64-bit hash value.
112 pub fn value(&self) -> u64 {
113 self.0
114 }
115}
116
117// ─────────────────────────────────────────────────────────────────────────────
118// CachedResponse
119// ─────────────────────────────────────────────────────────────────────────────
120
121/// A cached response entry stored in the [`DedupCache`].
122#[derive(Debug, Clone)]
123pub struct CachedResponse {
124 /// The cached response text.
125 pub content: String,
126 /// When this entry was first created.
127 pub created_at: Instant,
128 /// How many times this entry has been returned as a cache hit.
129 pub hit_count: u64,
130 /// How long this entry is considered valid.
131 pub ttl: Duration,
132}
133
134impl CachedResponse {
135 /// Create a new entry with `hit_count = 0`.
136 pub fn new(content: String, ttl: Duration) -> Self {
137 Self {
138 content,
139 created_at: Instant::now(),
140 hit_count: 0,
141 ttl,
142 }
143 }
144
145 /// Returns `true` if this entry's TTL has been exceeded.
146 pub fn is_expired(&self) -> bool {
147 self.created_at.elapsed() > self.ttl
148 }
149
150 /// Increment the hit counter by one.
151 pub fn record_hit(&mut self) {
152 self.hit_count += 1;
153 }
154}
155
156// ─────────────────────────────────────────────────────────────────────────────
157// DedupStats
158// ─────────────────────────────────────────────────────────────────────────────
159
160/// Aggregate statistics for the deduplication cache.
161///
162/// All counters are monotonically increasing; they are never reset unless
163/// [`DedupCache::clear`] is called.
164#[derive(Debug, Clone, Default)]
165pub struct DedupStats {
166 /// Total number of [`DedupCache::get`] calls (hits + misses).
167 pub total_requests: u64,
168 /// Number of lookups that returned a cached response.
169 pub cache_hits: u64,
170 /// Number of lookups that found no valid entry.
171 pub cache_misses: u64,
172 /// Number of entries evicted due to capacity overflow or TTL expiry.
173 pub evictions: u64,
174}
175
176impl DedupStats {
177 /// Fraction of lookups that were cache hits, in `[0.0, 1.0]`.
178 ///
179 /// Returns `0.0` if no requests have been made yet.
180 pub fn hit_rate(&self) -> f64 {
181 if self.total_requests == 0 {
182 0.0
183 } else {
184 self.cache_hits as f64 / self.total_requests as f64
185 }
186 }
187
188 /// Human-readable one-line summary of the statistics.
189 pub fn summary(&self) -> String {
190 format!(
191 "requests={} hits={} misses={} evictions={} hit_rate={:.1}%",
192 self.total_requests,
193 self.cache_hits,
194 self.cache_misses,
195 self.evictions,
196 self.hit_rate() * 100.0,
197 )
198 }
199}
200
201// ─────────────────────────────────────────────────────────────────────────────
202// DedupCache
203// ─────────────────────────────────────────────────────────────────────────────
204
205/// Request deduplication cache.
206///
207/// Stores responses keyed by [`RequestKey`]. When the cache is full, the
208/// oldest entry (by insertion order, tracked via a monotonic sequence counter)
209/// is evicted to make room for the new one.
210///
211/// # Thread safety
212///
213/// `DedupCache` is **not** `Sync` by design; wrap it in a `Mutex` or
214/// `RwLock` when sharing across threads.
215pub struct DedupCache {
216 /// The backing store: key → (entry, insertion_seq).
217 cache: HashMap<RequestKey, (CachedResponse, u64)>,
218 /// Maximum number of entries before eviction kicks in.
219 capacity: usize,
220 /// Default TTL used by [`DedupCache::insert`].
221 default_ttl: Duration,
222 /// Aggregate statistics.
223 stats: DedupStats,
224 /// Monotonically increasing insertion counter.
225 next_seq: u64,
226}
227
228impl DedupCache {
229 /// Create a new cache with the given `capacity` and `default_ttl`.
230 pub fn new(capacity: usize, default_ttl: Duration) -> Self {
231 Self {
232 cache: HashMap::new(),
233 capacity,
234 default_ttl,
235 stats: DedupStats::default(),
236 next_seq: 0,
237 }
238 }
239
240 /// Shorthand constructor with a 60-second default TTL.
241 pub fn with_capacity(n: usize) -> Self {
242 Self::new(n, Duration::from_secs(60))
243 }
244
245 /// Look up a cached response.
246 ///
247 /// Returns `None` if the key is not present or if the cached entry has
248 /// expired. On a valid hit the `hit_count` of the entry is incremented
249 /// and the response string slice is returned.
250 pub fn get(&mut self, key: &RequestKey) -> Option<&str> {
251 self.stats.total_requests += 1;
252
253 // Check for expiry first without borrowing mutably into the hit path.
254 let expired = self
255 .cache
256 .get(key)
257 .map(|(entry, _)| entry.is_expired())
258 .unwrap_or(false);
259
260 if expired {
261 self.cache.remove(key);
262 self.stats.cache_misses += 1;
263 self.stats.evictions += 1;
264 return None;
265 }
266
267 match self.cache.get_mut(key) {
268 Some((entry, _seq)) => {
269 entry.record_hit();
270 self.stats.cache_hits += 1;
271 // Return a reference to the content inside the map.
272 // SAFETY: The entry lives as long as self.
273 Some(self.cache[key].0.content.as_str())
274 }
275 None => {
276 self.stats.cache_misses += 1;
277 None
278 }
279 }
280 }
281
282 /// Insert a response with the `DedupCache::default_ttl`.
283 ///
284 /// If the cache is at capacity, the entry with the smallest insertion
285 /// sequence number (i.e. the oldest entry) is evicted first.
286 pub fn insert(&mut self, key: RequestKey, response: String) {
287 let ttl = self.default_ttl;
288 self.insert_with_ttl(key, response, ttl);
289 }
290
291 /// Insert a response with a custom `ttl`.
292 ///
293 /// Evicts the oldest entry when the cache is at capacity before inserting.
294 pub fn insert_with_ttl(&mut self, key: RequestKey, response: String, ttl: Duration) {
295 // If key already exists, just update it in-place.
296 if self.cache.contains_key(&key) {
297 let seq = self.next_seq;
298 self.next_seq += 1;
299 let entry = CachedResponse::new(response, ttl);
300 self.cache.insert(key, (entry, seq));
301 return;
302 }
303
304 // Evict the oldest entry when at capacity.
305 if self.cache.len() >= self.capacity {
306 self.evict_oldest();
307 }
308
309 let seq = self.next_seq;
310 self.next_seq += 1;
311 let entry = CachedResponse::new(response, ttl);
312 self.cache.insert(key, (entry, seq));
313 }
314
315 /// Remove all expired entries.
316 ///
317 /// Returns the number of entries removed.
318 pub fn evict_expired(&mut self) -> usize {
319 let before = self.cache.len();
320 self.cache.retain(|_, (entry, _)| !entry.is_expired());
321 let removed = before - self.cache.len();
322 self.stats.evictions += removed as u64;
323 removed
324 }
325
326 /// Number of entries currently in the cache.
327 pub fn len(&self) -> usize {
328 self.cache.len()
329 }
330
331 /// Returns `true` if the cache contains no entries.
332 pub fn is_empty(&self) -> bool {
333 self.cache.is_empty()
334 }
335
336 /// Reference to the current statistics snapshot.
337 pub fn stats(&self) -> &DedupStats {
338 &self.stats
339 }
340
341 /// Remove all entries and reset statistics.
342 pub fn clear(&mut self) {
343 self.cache.clear();
344 self.stats = DedupStats::default();
345 self.next_seq = 0;
346 }
347
348 // ── Private helpers ───────────────────────────────────────────────────────
349
350 /// Evict the entry with the smallest insertion sequence number.
351 fn evict_oldest(&mut self) {
352 // Find the key with the minimum sequence number.
353 let oldest_key = self
354 .cache
355 .iter()
356 .min_by_key(|(_, (_, seq))| *seq)
357 .map(|(k, _)| k.clone());
358
359 if let Some(key) = oldest_key {
360 self.cache.remove(&key);
361 self.stats.evictions += 1;
362 }
363 }
364}
365
366// ─────────────────────────────────────────────────────────────────────────────
367// Tests (unit, inline)
368// ─────────────────────────────────────────────────────────────────────────────
369
370#[cfg(test)]
371mod tests {
372 use super::*;
373
374 #[test]
375 fn request_key_deterministic() {
376 let k1 = RequestKey::from_str("hello");
377 let k2 = RequestKey::from_str("hello");
378 assert_eq!(k1, k2);
379 }
380
381 #[test]
382 fn request_key_different_inputs() {
383 let k1 = RequestKey::from_str("foo");
384 let k2 = RequestKey::from_str("bar");
385 assert_ne!(k1, k2);
386 }
387
388 #[test]
389 fn cached_response_not_expired_immediately() {
390 let r = CachedResponse::new("hi".to_string(), Duration::from_secs(60));
391 assert!(!r.is_expired());
392 }
393
394 #[test]
395 fn dedup_cache_basic_insert_get() {
396 let mut cache = DedupCache::with_capacity(10);
397 let key = RequestKey::from_str("test");
398 cache.insert(key.clone(), "response".to_string());
399 assert_eq!(cache.get(&key), Some("response"));
400 }
401
402 #[test]
403 fn dedup_stats_hit_rate_zero_on_empty() {
404 let stats = DedupStats::default();
405 assert!((stats.hit_rate() - 0.0).abs() < f64::EPSILON);
406 }
407}