// allsource_core/infrastructure/persistence/tenant_loader.rs

1//! Per-tenant lazy-load bookkeeping (Step 2 of the sustainable data
2//! strategy).
3//!
4//! Boot used to read every Parquet file into a single shared
5//! in-memory pile. Once the dataset crossed the available memory
6//! cap, Core OOM'd during recovery (issue #160). Step 2 replaces
7//! the boot-time full-load with on-demand per-tenant hydration:
8//! the first query for tenant `T` triggers `load_events_for_tenant(T)`,
9//! and subsequent queries hit the now-warm cache.
10//!
11//! This module owns the bookkeeping — which tenants are loaded, and a
12//! per-tenant lock so two concurrent first-queries for the same
13//! tenant don't double-load. The actual disk read lives in
14//! `ParquetStorage::load_events_for_tenant`; the side-effect of
15//! splicing loaded events into the in-memory index/projection
16//! structures lives in `EventStore::ensure_tenant_loaded`.
17//!
//! Step 3 adds a memory-budgeted cache that evicts cold tenants:
//! the byte budget, the LRU bookkeeping (`touch` /
//! `pick_lru_excluding`), and `mark_unloaded` below are that
//! machinery, so `loaded` now means "currently in memory" rather
//! than "ever loaded since boot".
22
23use dashmap::DashMap;
24use parking_lot::Mutex;
25use std::{
26    sync::{
27        Arc,
28        atomic::{AtomicU64, Ordering},
29    },
30    time::{Duration, Instant},
31};
32
/// Default ceiling on how long a query may wait for an in-flight
/// load of the same tenant before giving up. The 100k-event load
/// budget for cold tenants is single-digit seconds (per the Step 2
/// acceptance criteria); 30s gives comfortable headroom over that
/// while still failing fast enough that a stuck load is
/// distinguishable from an ordinary slow request.
pub const DEFAULT_LOAD_TIMEOUT: Duration = Duration::from_secs(30);
39
/// Tracks which tenants have been hydrated into memory, serializes
/// concurrent first-loads of the same tenant, and accounts for the
/// approximate bytes each loaded tenant occupies (Step 3 cache
/// budget input).
///
/// `loaded` is the source of truth for "is this tenant warm?".
/// Entries are inserted by `mark_loaded` and removed only by
/// `mark_unloaded` (the eviction path).
///
/// `locks` holds one Mutex per tenant ever queried. Concurrent
/// first-queries get the same Mutex; the second waiter re-checks
/// `loaded` after acquiring it (the standard double-checked-lock
/// pattern, sound here because `loaded`'s DashMap insert
/// happens-before the lock release).
///
/// `bytes` accumulates per-tenant resident-byte estimates as
/// events get spliced in via `append_loaded_event`. It's the input
/// the budget-enforcement step (Step 3 #3) keys off — the LRU
/// eviction policy compares the sum of these counters against the
/// configured byte budget.
pub struct TenantLoader {
    /// Concurrent set of warm tenants (unit value — DashMap used
    /// as a set). Written by `mark_loaded` / `mark_unloaded`.
    loaded: DashMap<String, ()>,
    /// Per-tenant singleflight Mutex. NOTE(review): entries are
    /// never removed — not even by `mark_unloaded` — so this map
    /// grows with the number of distinct tenants ever queried.
    locks: DashMap<String, Arc<Mutex<()>>>,
    /// Per-tenant resident-byte estimates, grown by `add_bytes`
    /// and reset by `mark_unloaded`.
    bytes: DashMap<String, u64>,
    /// Last-used Instant per tenant — refreshed on every query
    /// via `touch`. The eviction policy (Step 3 #3) picks the
    /// tenant with the oldest entry as the LRU victim.
    last_used: DashMap<String, Instant>,
    /// Cache byte budget. 0 disables enforcement (keep every
    /// loaded tenant resident). Production reads this from the
    /// `ALLSOURCE_CACHE_BYTES` env var.
    byte_budget: AtomicU64,
    /// Ceiling on waiting for a concurrent in-flight load of the
    /// same tenant; defaults to `DEFAULT_LOAD_TIMEOUT`.
    load_timeout: Duration,
}
73
74impl TenantLoader {
75    pub fn new() -> Self {
76        Self::with_timeout(DEFAULT_LOAD_TIMEOUT)
77    }
78
79    pub fn with_timeout(load_timeout: Duration) -> Self {
80        Self {
81            loaded: DashMap::new(),
82            locks: DashMap::new(),
83            bytes: DashMap::new(),
84            last_used: DashMap::new(),
85            byte_budget: AtomicU64::new(0),
86            load_timeout,
87        }
88    }
89
90    /// Set the cache byte budget. 0 disables enforcement.
91    pub fn set_byte_budget(&self, budget: u64) {
92        self.byte_budget.store(budget, Ordering::Relaxed);
93    }
94
95    pub fn byte_budget(&self) -> u64 {
96        self.byte_budget.load(Ordering::Relaxed)
97    }
98
99    /// True iff a budget is set AND total resident bytes exceed it.
100    pub fn over_budget(&self) -> bool {
101        let b = self.byte_budget();
102        b != 0 && self.total_bytes() > b
103    }
104
105    /// Mark `tenant_id` as just-used (LRU-Y end of the order).
106    /// Cheap — single DashMap insert. Called on every query that
107    /// touches this tenant. If the eviction policy picks an LRU
108    /// victim, the most-recently-touched tenants are last to go.
109    pub fn touch(&self, tenant_id: &str) {
110        self.last_used.insert(tenant_id.to_string(), Instant::now());
111    }
112
113    /// Pick the loaded tenant with the oldest `last_used` Instant,
114    /// excluding `excluded` (the freshly-loaded tenant we don't
115    /// want to immediately evict — would thrash). Returns None if
116    /// no other tenant is loaded — the caller should accept the
117    /// over-budget state in that case (a single tenant whose data
118    /// alone exceeds the budget can't be evicted in favor of
119    /// itself).
120    ///
121    /// `mark_loaded` always stamps `last_used`, so every loaded
122    /// tenant has a touch timestamp by construction.
123    pub fn pick_lru_excluding(&self, excluded: &str) -> Option<String> {
124        let mut victim: Option<(String, Instant)> = None;
125        for kv in &self.loaded {
126            let tenant = kv.key();
127            if tenant == excluded {
128                continue;
129            }
130            let Some(last) = self.last_used.get(tenant).map(|v| *v) else {
131                // mark_loaded guarantees a stamp; if it's missing
132                // we have a state-management bug. Skip rather than
133                // crash production.
134                tracing::warn!(
135                    tenant_id = %tenant,
136                    "loaded tenant missing last_used stamp — skipping in LRU pick"
137                );
138                continue;
139            };
140            match &victim {
141                None => victim = Some((tenant.clone(), last)),
142                Some((_, t)) if last < *t => victim = Some((tenant.clone(), last)),
143                _ => {}
144            }
145        }
146        victim.map(|(t, _)| t)
147    }
148
149    /// Fast path probe — true if a previous call to `mark_loaded`
150    /// has succeeded for this tenant.
151    pub fn is_loaded(&self, tenant_id: &str) -> bool {
152        self.loaded.contains_key(tenant_id)
153    }
154
155    /// Record that this tenant has been hydrated. Idempotent.
156    /// Also stamps a fresh `last_used` so the tenant immediately
157    /// participates in LRU ordering and isn't picked as an
158    /// eviction victim before its first explicit touch.
159    pub fn mark_loaded(&self, tenant_id: &str) {
160        self.loaded.insert(tenant_id.to_string(), ());
161        self.last_used.insert(tenant_id.to_string(), Instant::now());
162    }
163
164    /// Reverse of `mark_loaded`: forget that this tenant is in
165    /// memory and reset its byte counter to 0. The next call to
166    /// `is_loaded` returns false; the next ensure-load will
167    /// re-walk the tenant's subtree from disk. Called by the
168    /// eviction path.
169    pub fn mark_unloaded(&self, tenant_id: &str) {
170        self.loaded.remove(tenant_id);
171        self.bytes.remove(tenant_id);
172        self.last_used.remove(tenant_id);
173    }
174
175    /// Add `n` bytes to the resident-size estimate for `tenant_id`.
176    /// Called once per event spliced into memory for that tenant.
177    /// The total is what the budget check compares against.
178    pub fn add_bytes(&self, tenant_id: &str, n: u64) {
179        *self.bytes.entry(tenant_id.to_string()).or_insert(0) += n;
180    }
181
182    /// Resident-byte estimate for a single tenant. Returns 0 for
183    /// tenants that have never been loaded (or that were evicted —
184    /// once eviction lands the counter resets to 0).
185    pub fn bytes_for(&self, tenant_id: &str) -> u64 {
186        self.bytes.get(tenant_id).map_or(0, |v| *v)
187    }
188
189    /// Sum of resident-byte estimates across every loaded tenant —
190    /// the input the budget check compares against. O(loaded
191    /// tenants), expected to be small.
192    pub fn total_bytes(&self) -> u64 {
193        self.bytes.iter().map(|kv| *kv.value()).sum()
194    }
195
196    /// Snapshot of `(tenant_id, bytes)` pairs for every tenant
197    /// that has any resident bytes. Used by the eviction policy
198    /// to pick a victim and by metrics endpoints.
199    pub fn bytes_per_tenant(&self) -> Vec<(String, u64)> {
200        self.bytes
201            .iter()
202            .map(|kv| (kv.key().clone(), *kv.value()))
203            .collect()
204    }
205
206    /// Get-or-insert the per-tenant Mutex used for singleflight
207    /// loading. The first caller for a given tenant creates the
208    /// Mutex; later callers see the same instance and serialize on
209    /// it. Returns an `Arc` so the caller can hold the lock guard
210    /// without keeping a borrow into the DashMap.
211    pub fn lock_for(&self, tenant_id: &str) -> Arc<Mutex<()>> {
212        self.locks
213            .entry(tenant_id.to_string())
214            .or_insert_with(|| Arc::new(Mutex::new(())))
215            .clone()
216    }
217
218    pub fn load_timeout(&self) -> Duration {
219        self.load_timeout
220    }
221
222    /// Number of tenants currently marked loaded. Diagnostic only.
223    #[cfg(test)]
224    pub fn loaded_count(&self) -> usize {
225        self.loaded.len()
226    }
227}
228
229impl Default for TenantLoader {
230    fn default() -> Self {
231        Self::new()
232    }
233}
234
#[cfg(test)]
mod tests {
    use super::*;
    use std::{
        sync::atomic::{AtomicUsize, Ordering},
        thread,
    };

    #[test]
    fn test_is_loaded_false_until_marked() {
        let loader = TenantLoader::new();
        assert!(!loader.is_loaded("alice"));
        loader.mark_loaded("alice");
        assert!(loader.is_loaded("alice"));
        // Other tenants stay cold — independent state per tenant.
        assert!(!loader.is_loaded("bob"));
    }

    #[test]
    fn test_mark_loaded_is_idempotent() {
        // Double-marking must collapse to a single entry, not two.
        let loader = TenantLoader::new();
        loader.mark_loaded("alice");
        loader.mark_loaded("alice");
        assert_eq!(loader.loaded_count(), 1);
    }

    #[test]
    fn test_lock_for_returns_same_mutex_per_tenant() {
        // Two callers for the same tenant must get the same lock —
        // otherwise they don't serialize and the singleflight guarantee
        // is broken.
        let loader = TenantLoader::new();
        let lock_a1 = loader.lock_for("alice");
        let lock_a2 = loader.lock_for("alice");
        let lock_b = loader.lock_for("bob");

        assert!(Arc::ptr_eq(&lock_a1, &lock_a2));
        assert!(!Arc::ptr_eq(&lock_a1, &lock_b));
    }

    #[test]
    fn test_singleflight_blocks_second_caller_until_first_releases() {
        // Mimics the lazy-load path: caller 1 holds the Mutex while
        // "loading", caller 2 must wait until caller 1 releases.
        // We assert ordering by recording each thread's progress in a
        // counter.
        let loader = Arc::new(TenantLoader::new());
        let progress = Arc::new(AtomicUsize::new(0));

        let l1 = loader.clone();
        let p1 = progress.clone();
        let t1 = thread::spawn(move || {
            let lock = l1.lock_for("alice");
            let _g = lock.lock();
            // Caller 1 is now "loading" — record that we have the lock.
            p1.store(1, Ordering::SeqCst);
            thread::sleep(Duration::from_millis(80));
            p1.store(2, Ordering::SeqCst);
            l1.mark_loaded("alice");
        });

        // Make sure thread 1 has acquired the lock before thread 2
        // starts. Avoids a flaky race where t2 acquires first.
        while progress.load(Ordering::SeqCst) < 1 {
            thread::sleep(Duration::from_millis(5));
        }

        let l2 = loader.clone();
        let p2 = progress.clone();
        let t2 = thread::spawn(move || {
            let lock = l2.lock_for("alice");
            let _g = lock.lock();
            // When we get here, t1 must already have advanced to 2.
            assert_eq!(
                p2.load(Ordering::SeqCst),
                2,
                "second caller acquired lock before first finished — singleflight broken"
            );
            // And the tenant must now be marked loaded.
            assert!(l2.is_loaded("alice"));
        });

        t1.join().unwrap();
        t2.join().unwrap();
    }

    #[test]
    fn test_mark_unloaded_clears_loaded_and_bytes() {
        let loader = TenantLoader::new();
        loader.mark_loaded("alice");
        loader.add_bytes("alice", 100);
        loader.mark_loaded("bob");
        loader.add_bytes("bob", 200);

        loader.mark_unloaded("alice");

        // Eviction clears both the loaded flag and the byte counter.
        assert!(!loader.is_loaded("alice"));
        assert_eq!(loader.bytes_for("alice"), 0);
        // Bob untouched.
        assert!(loader.is_loaded("bob"));
        assert_eq!(loader.bytes_for("bob"), 200);
        assert_eq!(loader.total_bytes(), 200);
    }

    #[test]
    fn test_bytes_default_to_zero() {
        // Counters are implicitly zero for tenants add_bytes has
        // never seen — no entry, no bytes.
        let loader = TenantLoader::new();
        assert_eq!(loader.bytes_for("alice"), 0);
        assert_eq!(loader.total_bytes(), 0);
        assert!(loader.bytes_per_tenant().is_empty());
    }

    #[test]
    fn test_add_bytes_accumulates_per_tenant() {
        let loader = TenantLoader::new();
        loader.add_bytes("alice", 100);
        loader.add_bytes("alice", 50);
        loader.add_bytes("bob", 200);
        assert_eq!(loader.bytes_for("alice"), 150);
        assert_eq!(loader.bytes_for("bob"), 200);
        assert_eq!(loader.total_bytes(), 350);

        // Snapshot order is unspecified (concurrent map) — sort
        // before comparing.
        let mut snapshot = loader.bytes_per_tenant();
        snapshot.sort();
        assert_eq!(
            snapshot,
            vec![("alice".to_string(), 150), ("bob".to_string(), 200)]
        );
    }

    #[test]
    fn test_pick_lru_returns_oldest_excluding_target() {
        // Touch alice, then bob, then carol — alice is oldest.
        // Pick excluding "carol" (the freshly-loaded one) → alice.
        // The 5ms sleeps keep the three Instant stamps strictly ordered.
        let loader = TenantLoader::new();
        loader.mark_loaded("alice");
        std::thread::sleep(std::time::Duration::from_millis(5));
        loader.mark_loaded("bob");
        std::thread::sleep(std::time::Duration::from_millis(5));
        loader.mark_loaded("carol");

        assert_eq!(
            loader.pick_lru_excluding("carol"),
            Some("alice".to_string())
        );

        // After touching alice, bob becomes oldest.
        loader.touch("alice");
        assert_eq!(loader.pick_lru_excluding("carol"), Some("bob".to_string()));
    }

    #[test]
    fn test_pick_lru_returns_none_when_only_excluded_is_loaded() {
        let loader = TenantLoader::new();
        loader.mark_loaded("alice");
        // Excluding alice from a single-tenant set → no victim.
        assert_eq!(loader.pick_lru_excluding("alice"), None);
    }

    #[test]
    fn test_pick_lru_returns_none_when_nothing_loaded() {
        let loader = TenantLoader::new();
        assert_eq!(loader.pick_lru_excluding("anyone"), None);
    }

    #[test]
    fn test_over_budget_zero_budget_means_disabled() {
        let loader = TenantLoader::new();
        loader.mark_loaded("alice");
        loader.add_bytes("alice", 1_000_000_000);
        // No budget set (default 0) → over_budget is false even
        // with massive resident bytes.
        assert!(!loader.over_budget());

        loader.set_byte_budget(100);
        assert!(loader.over_budget());
        loader.set_byte_budget(2_000_000_000);
        assert!(!loader.over_budget());
    }

    #[test]
    fn test_lock_for_distinct_tenants_does_not_serialize() {
        // Two tenants must get distinct locks — alice loading must
        // not block bob's queries.
        let loader = Arc::new(TenantLoader::new());

        let l1 = loader.clone();
        let alice_lock = l1.lock_for("alice");
        let _alice_held = alice_lock.lock();

        // bob's lock should be acquirable immediately even while
        // alice's is held. try_lock returns Some(_) on success.
        let bob_lock = loader.lock_for("bob");
        let bob_held = bob_lock.try_lock();
        assert!(
            bob_held.is_some(),
            "bob's lock should not be blocked by alice's"
        );
    }
}