Skip to main content

oxirs_arq/jit/
jit_cache.rs

//! Thread-safe cache of compiled JIT filter functions.
//!
//! [`JitFilterCache`] wraps a [`FilterCompiler`] and a size-bounded cache keyed by a
//! `u64` plan fingerprint.  Compilation requires exclusive access (via a `Mutex`)
//! because each [`FilterCompiler::compile`] call creates and finalizes a `JITModule`.
//! The cached functions live behind a separate `RwLock` that is never held while
//! compiling, so cache lookups do not wait for an in-progress compilation.
//!
//! # Eviction
//!
//! When the cache reaches `max_entries`, the **oldest** entry is evicted (FIFO order,
//! tracked by insertion order in a `VecDeque`).  Lookups do not refresh an entry's
//! position, so this is simpler than a full LRU, but it is sufficient for the hot-path
//! use case where the set of hot queries is small.
13
use std::collections::{HashMap, VecDeque};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

use parking_lot::{Mutex, RwLock};

use super::filter_compiler::{
    CompiledFilter, FilterCompiler, FilterCompilerError, FilterExpr, VarIndexMap,
};
22
23/// Thread-safe, bounded cache of compiled JIT filter functions.
24///
25/// # Thread safety
26///
27/// - Cache reads use a shared `RwLock` — many readers can proceed concurrently.
28/// - Compilation uses an exclusive `Mutex<FilterCompiler>` — only one thread compiles
29///   at a time, but reads are never blocked during compilation.
30pub struct JitFilterCache {
31    /// Exclusive access for compilation (one `JITModule` at a time).
32    compiler: Mutex<FilterCompiler>,
33    /// Shared read access to the compiled-function cache.
34    cache: RwLock<CacheInner>,
35}
36
37struct CacheInner {
38    map: HashMap<u64, Arc<CompiledFilter>>,
39    /// Insertion-order queue for FIFO eviction.
40    order: VecDeque<u64>,
41    max_entries: usize,
42    /// Total compilations performed (hits excluded).
43    compile_count: usize,
44    /// Total cache hits.
45    hit_count: usize,
46}
47
48impl CacheInner {
49    fn new(max_entries: usize) -> Self {
50        Self {
51            map: HashMap::with_capacity(max_entries.min(256)),
52            order: VecDeque::with_capacity(max_entries.min(256)),
53            max_entries,
54            compile_count: 0,
55            hit_count: 0,
56        }
57    }
58
59    fn get(&mut self, key: u64) -> Option<Arc<CompiledFilter>> {
60        let result = self.map.get(&key).cloned();
61        if result.is_some() {
62            self.hit_count += 1;
63        }
64        result
65    }
66
67    fn insert(&mut self, key: u64, compiled: Arc<CompiledFilter>) {
68        if self.map.contains_key(&key) {
69            // Already inserted by a concurrent compile — do nothing
70            return;
71        }
72        // Evict oldest if at capacity
73        while self.order.len() >= self.max_entries {
74            if let Some(old_key) = self.order.pop_front() {
75                self.map.remove(&old_key);
76            }
77        }
78        self.map.insert(key, compiled);
79        self.order.push_back(key);
80        self.compile_count += 1;
81    }
82}
83
84/// Statistics snapshot from a [`JitFilterCache`].
85#[derive(Debug, Clone, Copy)]
86pub struct CacheStats {
87    /// Number of entries currently in the cache.
88    pub len: usize,
89    /// Maximum number of entries (capacity bound).
90    pub max_entries: usize,
91    /// Total cache hits since creation.
92    pub hit_count: usize,
93    /// Total successful compilations since creation.
94    pub compile_count: usize,
95}
96
97impl JitFilterCache {
98    /// Create a new cache with the given capacity bound.
99    ///
100    /// `max_entries` must be at least 1; values of `0` are clamped to `1`.
101    pub fn new(max_entries: usize) -> Result<Self, FilterCompilerError> {
102        let max_entries = max_entries.max(1);
103        Ok(Self {
104            compiler: Mutex::new(FilterCompiler::new()),
105            cache: RwLock::new(CacheInner::new(max_entries)),
106        })
107    }
108
109    /// Return the compiled filter for `key` if it is already cached.
110    pub fn get(&self, key: u64) -> Option<Arc<CompiledFilter>> {
111        // Acquire write lock so we can atomically read + update hit_count.
112        // The write lock is only contested during compilation (rare); reads are
113        // otherwise uncontested on the hot path because compilations are infrequent.
114        let mut write = self.cache.write();
115        write.get(key)
116    }
117
118    /// Compile `expr` + `var_map` and insert the result under `key`.
119    ///
120    /// If `key` is already in the cache (from a concurrent compile), the existing
121    /// entry is returned without re-compiling.
122    ///
123    /// Returns `Ok(None)` if the expression is not in the JIT-supported subset.
124    pub fn compile_and_insert(
125        &self,
126        key: u64,
127        expr: &FilterExpr,
128        var_map: VarIndexMap,
129    ) -> Result<Option<Arc<CompiledFilter>>, FilterCompilerError> {
130        // Check again before compiling (double-checked locking)
131        {
132            let mut write = self.cache.write();
133            if let Some(existing) = write.get(key) {
134                return Ok(Some(existing));
135            }
136        }
137
138        // Compile (exclusive lock — one compilation at a time)
139        let compiled_opt = {
140            let compiler = self.compiler.lock();
141            compiler.compile(expr, var_map)?
142        };
143
144        match compiled_opt {
145            None => Ok(None),
146            Some(compiled) => {
147                let arc = Arc::new(compiled);
148                let mut write = self.cache.write();
149                write.insert(key, arc.clone());
150                Ok(Some(arc))
151            }
152        }
153    }
154
155    /// Return the number of entries currently in the cache.
156    pub fn len(&self) -> usize {
157        self.cache.read().map.len()
158    }
159
160    /// Return `true` if the cache is empty.
161    pub fn is_empty(&self) -> bool {
162        self.len() == 0
163    }
164
165    /// Return a snapshot of cache statistics.
166    pub fn stats(&self) -> CacheStats {
167        let inner = self.cache.read();
168        CacheStats {
169            len: inner.map.len(),
170            max_entries: inner.max_entries,
171            hit_count: inner.hit_count,
172            compile_count: inner.compile_count,
173        }
174    }
175}
176
#[cfg(test)]
mod tests {
    use super::*;
    use crate::jit::filter_compiler::{BinOp, FilterExpr};

    /// `?x > 3.0` with `x` bound to slot 0.
    fn simple_gt_expr() -> (FilterExpr, VarIndexMap) {
        let mut vm = VarIndexMap::new();
        vm.insert("x".to_string(), 0);
        let expr = FilterExpr::BinOp {
            op: BinOp::Gt,
            left: Box::new(FilterExpr::Variable("x".to_string())),
            right: Box::new(FilterExpr::Literal(3.0)),
        };
        (expr, vm)
    }

    /// `?y < 10.0` with `y` bound to slot 0.
    fn simple_lt_expr() -> (FilterExpr, VarIndexMap) {
        let mut vm = VarIndexMap::new();
        vm.insert("y".to_string(), 0);
        let expr = FilterExpr::BinOp {
            op: BinOp::Lt,
            left: Box::new(FilterExpr::Variable("y".to_string())),
            right: Box::new(FilterExpr::Literal(10.0)),
        };
        (expr, vm)
    }

    #[test]
    fn cache_starts_empty() {
        let cache = JitFilterCache::new(16).expect("cache init");
        assert!(cache.is_empty());
        assert_eq!(cache.len(), 0);
    }

    #[test]
    fn zero_capacity_is_clamped_to_one() {
        let cache = JitFilterCache::new(0).expect("cache init");
        assert_eq!(cache.stats().max_entries, 1);
    }

    #[test]
    fn get_before_insert_returns_none() {
        let cache = JitFilterCache::new(16).expect("cache init");
        assert!(cache.get(12345).is_none());
    }

    #[test]
    fn insert_then_get_hit() {
        let cache = JitFilterCache::new(16).expect("cache init");
        let (expr, vm) = simple_gt_expr();
        let compiled = cache
            .compile_and_insert(42, &expr, vm)
            .expect("compile ok")
            .expect("filter compiled (not unsupported)");
        assert_eq!(cache.len(), 1);

        let hit = cache.get(42).expect("should be in cache");
        assert!(Arc::ptr_eq(&compiled, &hit));
    }

    #[test]
    fn compile_same_key_twice_reuses_cached_entry() {
        let cache = JitFilterCache::new(16).expect("cache init");
        let (expr1, vm1) = simple_gt_expr();
        let (expr2, vm2) = simple_gt_expr();

        let first = cache
            .compile_and_insert(7, &expr1, vm1)
            .expect("compile ok")
            .expect("filter compiled (not unsupported)");
        let second = cache
            .compile_and_insert(7, &expr2, vm2)
            .expect("compile ok")
            .expect("filter compiled (not unsupported)");

        assert!(Arc::ptr_eq(&first, &second));
        let stats = cache.stats();
        assert_eq!(stats.compile_count, 1, "second call must not recompile");
        assert_eq!(stats.len, 1);
    }

    #[test]
    fn eviction_at_capacity() {
        let cache = JitFilterCache::new(2).expect("cache init");
        let (expr1, vm1) = simple_gt_expr();
        let (expr2, vm2) = simple_gt_expr();
        let (expr3, vm3) = simple_lt_expr();

        cache.compile_and_insert(1, &expr1, vm1).expect("ok");
        cache.compile_and_insert(2, &expr2, vm2).expect("ok");
        assert_eq!(cache.len(), 2);

        // Inserting key 3 should evict key 1 (oldest)
        cache.compile_and_insert(3, &expr3, vm3).expect("ok");
        assert_eq!(cache.len(), 2);
        assert_eq!(cache.stats().compile_count, 3);
        assert!(cache.get(1).is_none(), "key 1 should have been evicted");
        assert!(cache.get(2).is_some());
        assert!(cache.get(3).is_some());
    }

    #[test]
    fn stats_tracks_hits_and_compiles() {
        let cache = JitFilterCache::new(16).expect("cache init");
        let (expr, vm) = simple_gt_expr();
        cache.compile_and_insert(99, &expr, vm).expect("compile ok");
        let _ = cache.get(99);
        let _ = cache.get(99);

        let stats = cache.stats();
        assert_eq!(stats.len, 1);
        assert_eq!(stats.max_entries, 16);
        assert_eq!(stats.compile_count, 1);
        // The compile on an empty cache records no hit; each of the two lookups does.
        assert_eq!(stats.hit_count, 2);
    }
}