allsource_core/infrastructure/persistence/tenant_loader.rs
//! Per-tenant lazy-load bookkeeping (Step 2 of the sustainable data
//! strategy).
//!
//! Boot used to read every Parquet file into a single shared
//! in-memory pile. Once the dataset crossed the available memory
//! cap, Core OOM'd during recovery (issue #160). Step 2 replaces
//! the boot-time full-load with on-demand per-tenant hydration:
//! the first query for tenant `T` triggers `load_events_for_tenant(T)`,
//! and subsequent queries hit the now-warm cache.
//!
//! This module owns the bookkeeping — which tenants are loaded, and a
//! per-tenant lock so two concurrent first-queries for the same
//! tenant don't double-load. The actual disk read lives in
//! `ParquetStorage::load_events_for_tenant`; the side-effect of
//! splicing loaded events into the in-memory index/projection
//! structures lives in `EventStore::ensure_tenant_loaded`.
//!
//! Step 3 adds a memory-budgeted cache that evicts cold tenants. The
//! LRU bookkeeping (`touch`, `pick_lru_excluding`) and the eviction
//! hook (`mark_unloaded`) below are its inputs; once eviction is
//! wired up, the `loaded` set means "currently in memory" rather
//! than "ever loaded since boot".
//!
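//! A minimal sketch of the intended first-query flow (the real disk
//! read and splice live in `ParquetStorage` / `EventStore`, so
//! `read_and_splice` here is a hypothetical stand-in):
//!
//! ```ignore
//! fn ensure_tenant_loaded(loader: &TenantLoader, tenant_id: &str) {
//!     if loader.is_loaded(tenant_id) {
//!         loader.touch(tenant_id); // warm cache: refresh the LRU stamp
//!         return;
//!     }
//!     let lock = loader.lock_for(tenant_id); // singleflight per tenant
//!     let _guard = lock.lock();
//!     if loader.is_loaded(tenant_id) {
//!         return; // another caller loaded while we waited
//!     }
//!     read_and_splice(tenant_id); // hypothetical: Parquet read + splice
//!     loader.mark_loaded(tenant_id);
//! }
//! ```
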
use dashmap::DashMap;
use parking_lot::Mutex;
use std::{
    sync::{
        Arc,
        atomic::{AtomicU64, Ordering},
    },
    time::{Duration, Instant},
};

/// Default ceiling on how long a query may wait for an in-flight
/// load of the same tenant before giving up. The 100k-event load
/// budget for cold tenants is single-digit seconds (per the Step 2
/// acceptance criteria); 30s gives comfortable headroom over that
/// while staying well short of a request timeout that would be
/// indistinguishable from a hang.
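///
/// A sketch of how a caller might honor this ceiling using
/// `parking_lot`'s timed-lock API (`TimeoutError` is a hypothetical
/// error variant, not defined in this crate):
///
/// ```ignore
/// let lock = loader.lock_for(tenant_id);
/// let Some(_guard) = lock.try_lock_for(loader.load_timeout()) else {
///     return Err(TimeoutError); // in-flight load exceeded the ceiling
/// };
/// ```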
pub const DEFAULT_LOAD_TIMEOUT: Duration = Duration::from_secs(30);

/// Tracks which tenants have been hydrated into memory, serializes
/// concurrent first-loads of the same tenant, and accounts for the
/// approximate bytes each loaded tenant occupies (Step 3 cache
/// budget input).
///
/// `loaded` is the source of truth for "is this tenant warm?". It's
/// insert-only until the eviction path calls `mark_unloaded`.
///
/// `locks` holds one Mutex per tenant ever queried. Concurrent
/// first-queries get the same Mutex; the second waiter re-checks
/// `loaded` after acquiring it (the standard double-checked-lock
/// pattern, sound here because `loaded`'s DashMap insert
/// happens-before the lock release).
///
/// `bytes` accumulates per-tenant resident-byte estimates as
/// events get spliced in via `append_loaded_event`. It's the input
/// the budget-enforcement step (Step 3 #3) keys off — the LRU
/// eviction policy compares the sum of these counters against the
/// configured byte budget.
pub struct TenantLoader {
    loaded: DashMap<String, ()>,
    locks: DashMap<String, Arc<Mutex<()>>>,
    bytes: DashMap<String, u64>,
    /// Last-used Instant per tenant — refreshed on every query
    /// via `touch`. The eviction policy (Step 3 #3) picks the
    /// tenant with the oldest entry as the LRU victim.
    last_used: DashMap<String, Instant>,
    /// Cache byte budget. 0 disables enforcement (keep every
    /// loaded tenant resident). Production reads this from the
    /// `ALLSOURCE_CACHE_BYTES` env var.
    byte_budget: AtomicU64,
    load_timeout: Duration,
}

impl TenantLoader {
    pub fn new() -> Self {
        Self::with_timeout(DEFAULT_LOAD_TIMEOUT)
    }

    pub fn with_timeout(load_timeout: Duration) -> Self {
        Self {
            loaded: DashMap::new(),
            locks: DashMap::new(),
            bytes: DashMap::new(),
            last_used: DashMap::new(),
            byte_budget: AtomicU64::new(0),
            load_timeout,
        }
    }

    /// Set the cache byte budget. 0 disables enforcement.
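    ///
    /// A sketch of the intended production wiring, reading the
    /// `ALLSOURCE_CACHE_BYTES` env var documented on the field (the
    /// exact parse/error handling in the boot path may differ):
    ///
    /// ```ignore
    /// if let Ok(raw) = std::env::var("ALLSOURCE_CACHE_BYTES") {
    ///     if let Ok(bytes) = raw.parse::<u64>() {
    ///         loader.set_byte_budget(bytes);
    ///     }
    /// }
    /// ```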
    pub fn set_byte_budget(&self, budget: u64) {
        self.byte_budget.store(budget, Ordering::Relaxed);
    }

    pub fn byte_budget(&self) -> u64 {
        self.byte_budget.load(Ordering::Relaxed)
    }

    /// True iff a budget is set AND total resident bytes exceed it.
    pub fn over_budget(&self) -> bool {
        let b = self.byte_budget();
        b != 0 && self.total_bytes() > b
    }

    /// Mark `tenant_id` as just-used (the most-recently-used end of
    /// the LRU order). Cheap — single DashMap insert. Called on every
    /// query that touches this tenant. If the eviction policy picks
    /// an LRU victim, the most-recently-touched tenants are last to
    /// go.
    pub fn touch(&self, tenant_id: &str) {
        self.last_used.insert(tenant_id.to_string(), Instant::now());
    }

    /// Pick the loaded tenant with the oldest `last_used` Instant,
    /// excluding `excluded` (the freshly-loaded tenant we don't
    /// want to immediately evict — that would thrash). Returns None
    /// if no other tenant is loaded — the caller should accept the
    /// over-budget state in that case (a single tenant whose data
    /// alone exceeds the budget can't be evicted in favor of
    /// itself).
    ///
    /// `mark_loaded` always stamps `last_used`, so every loaded
    /// tenant has a touch timestamp by construction.
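    ///
    /// A sketch of the eviction loop this feeds (hypothetical
    /// `evict_tenant` doing the actual memory release):
    ///
    /// ```ignore
    /// while loader.over_budget() {
    ///     let Some(victim) = loader.pick_lru_excluding(&fresh_tenant) else {
    ///         break; // nothing else loaded; accept the over-budget state
    ///     };
    ///     evict_tenant(&victim); // hypothetical: drop spliced events
    ///     loader.mark_unloaded(&victim);
    /// }
    /// ```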
    pub fn pick_lru_excluding(&self, excluded: &str) -> Option<String> {
        let mut victim: Option<(String, Instant)> = None;
        for kv in self.loaded.iter() {
            let tenant = kv.key();
            if tenant == excluded {
                continue;
            }
            let Some(last) = self.last_used.get(tenant).map(|v| *v) else {
                // mark_loaded guarantees a stamp; if it's missing
                // we have a state-management bug. Skip rather than
                // crash production.
                tracing::warn!(
                    tenant_id = %tenant,
                    "loaded tenant missing last_used stamp — skipping in LRU pick"
                );
                continue;
            };
            match &victim {
                None => victim = Some((tenant.clone(), last)),
                Some((_, t)) if last < *t => victim = Some((tenant.clone(), last)),
                _ => {}
            }
        }
        victim.map(|(t, _)| t)
    }

    /// Fast-path probe — true if a previous call to `mark_loaded`
    /// has succeeded for this tenant.
    pub fn is_loaded(&self, tenant_id: &str) -> bool {
        self.loaded.contains_key(tenant_id)
    }

    /// Record that this tenant has been hydrated. Idempotent.
    /// Also stamps a fresh `last_used` so the tenant immediately
    /// participates in LRU ordering and isn't picked as an
    /// eviction victim before its first explicit touch.
    pub fn mark_loaded(&self, tenant_id: &str) {
        self.loaded.insert(tenant_id.to_string(), ());
        self.last_used.insert(tenant_id.to_string(), Instant::now());
    }

    /// Reverse of `mark_loaded`: forget that this tenant is in
    /// memory, reset its byte counter to 0, and drop its `last_used`
    /// stamp. The next call to `is_loaded` returns false; the next
    /// ensure-load will re-walk the tenant's subtree from disk.
    /// Called by the eviction path.
    pub fn mark_unloaded(&self, tenant_id: &str) {
        self.loaded.remove(tenant_id);
        self.bytes.remove(tenant_id);
        self.last_used.remove(tenant_id);
    }

    /// Add `n` bytes to the resident-size estimate for `tenant_id`.
    /// Called once per event spliced into memory for that tenant.
    /// The total is what the budget check compares against.
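    ///
    /// A sketch of the expected call site (hypothetical
    /// `approx_event_bytes` estimator and `index` splice target):
    ///
    /// ```ignore
    /// for event in loaded_events {
    ///     loader.add_bytes(&tenant_id, approx_event_bytes(&event));
    ///     index.insert(event);
    /// }
    /// loader.mark_loaded(&tenant_id);
    /// ```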
    pub fn add_bytes(&self, tenant_id: &str, n: u64) {
        *self.bytes.entry(tenant_id.to_string()).or_insert(0) += n;
    }

    /// Resident-byte estimate for a single tenant. Returns 0 for
    /// tenants that have never been loaded or that have been evicted
    /// (`mark_unloaded` clears the counter).
    pub fn bytes_for(&self, tenant_id: &str) -> u64 {
        self.bytes.get(tenant_id).map_or(0, |v| *v)
    }

    /// Sum of resident-byte estimates across every loaded tenant —
    /// the input the budget check compares against. O(loaded
    /// tenants), expected to be small.
    pub fn total_bytes(&self) -> u64 {
        self.bytes.iter().map(|kv| *kv.value()).sum()
    }

    /// Snapshot of `(tenant_id, bytes)` pairs for every tenant
    /// that has any resident bytes. Used by the eviction policy
    /// to pick a victim and by metrics endpoints.
    pub fn bytes_per_tenant(&self) -> Vec<(String, u64)> {
        self.bytes
            .iter()
            .map(|kv| (kv.key().clone(), *kv.value()))
            .collect()
    }

    /// Get-or-insert the per-tenant Mutex used for singleflight
    /// loading. The first caller for a given tenant creates the
    /// Mutex; later callers see the same instance and serialize on
    /// it. Returns an `Arc` so the caller can hold the lock guard
    /// without keeping a borrow into the DashMap.
    pub fn lock_for(&self, tenant_id: &str) -> Arc<Mutex<()>> {
        self.locks
            .entry(tenant_id.to_string())
            .or_insert_with(|| Arc::new(Mutex::new(())))
            .clone()
    }

    pub fn load_timeout(&self) -> Duration {
        self.load_timeout
    }

    /// Number of tenants currently marked loaded. Diagnostic only.
    #[cfg(test)]
    pub fn loaded_count(&self) -> usize {
        self.loaded.len()
    }
}

impl Default for TenantLoader {
    fn default() -> Self {
        Self::new()
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::{
        sync::atomic::{AtomicUsize, Ordering},
        thread,
    };

    #[test]
    fn test_is_loaded_false_until_marked() {
        let loader = TenantLoader::new();
        assert!(!loader.is_loaded("alice"));
        loader.mark_loaded("alice");
        assert!(loader.is_loaded("alice"));
        // Other tenants stay cold — independent state per tenant.
        assert!(!loader.is_loaded("bob"));
    }

    #[test]
    fn test_mark_loaded_is_idempotent() {
        let loader = TenantLoader::new();
        loader.mark_loaded("alice");
        loader.mark_loaded("alice");
        assert_eq!(loader.loaded_count(), 1);
    }

    #[test]
    fn test_lock_for_returns_same_mutex_per_tenant() {
        // Two callers for the same tenant must get the same lock —
        // otherwise they don't serialize and the singleflight
        // guarantee is broken.
        let loader = TenantLoader::new();
        let lock_a1 = loader.lock_for("alice");
        let lock_a2 = loader.lock_for("alice");
        let lock_b = loader.lock_for("bob");

        assert!(Arc::ptr_eq(&lock_a1, &lock_a2));
        assert!(!Arc::ptr_eq(&lock_a1, &lock_b));
    }

    #[test]
    fn test_singleflight_blocks_second_caller_until_first_releases() {
        // Mimics the lazy-load path: caller 1 holds the Mutex while
        // "loading"; caller 2 must wait until caller 1 releases.
        // We assert ordering by recording each thread's progress in
        // a counter.
        let loader = Arc::new(TenantLoader::new());
        let progress = Arc::new(AtomicUsize::new(0));

        let l1 = loader.clone();
        let p1 = progress.clone();
        let t1 = thread::spawn(move || {
            let lock = l1.lock_for("alice");
            let _g = lock.lock();
            // Caller 1 is now "loading" — record that we have the lock.
            p1.store(1, Ordering::SeqCst);
            thread::sleep(Duration::from_millis(80));
            p1.store(2, Ordering::SeqCst);
            l1.mark_loaded("alice");
        });

        // Make sure thread 1 has acquired the lock before thread 2
        // starts. Avoids a flaky race where t2 acquires first.
        while progress.load(Ordering::SeqCst) < 1 {
            thread::sleep(Duration::from_millis(5));
        }

        let l2 = loader.clone();
        let p2 = progress.clone();
        let t2 = thread::spawn(move || {
            let lock = l2.lock_for("alice");
            let _g = lock.lock();
            // When we get here, t1 must already have advanced to 2.
            assert_eq!(
                p2.load(Ordering::SeqCst),
                2,
                "second caller acquired lock before first finished — singleflight broken"
            );
            // And the tenant must now be marked loaded.
            assert!(l2.is_loaded("alice"));
        });

        t1.join().unwrap();
        t2.join().unwrap();
    }

    #[test]
    fn test_mark_unloaded_clears_loaded_and_bytes() {
        let loader = TenantLoader::new();
        loader.mark_loaded("alice");
        loader.add_bytes("alice", 100);
        loader.mark_loaded("bob");
        loader.add_bytes("bob", 200);

        loader.mark_unloaded("alice");

        assert!(!loader.is_loaded("alice"));
        assert_eq!(loader.bytes_for("alice"), 0);
        // Bob untouched.
        assert!(loader.is_loaded("bob"));
        assert_eq!(loader.bytes_for("bob"), 200);
        assert_eq!(loader.total_bytes(), 200);
    }

    #[test]
    fn test_bytes_default_to_zero() {
        let loader = TenantLoader::new();
        assert_eq!(loader.bytes_for("alice"), 0);
        assert_eq!(loader.total_bytes(), 0);
        assert!(loader.bytes_per_tenant().is_empty());
    }

    #[test]
    fn test_add_bytes_accumulates_per_tenant() {
        let loader = TenantLoader::new();
        loader.add_bytes("alice", 100);
        loader.add_bytes("alice", 50);
        loader.add_bytes("bob", 200);
        assert_eq!(loader.bytes_for("alice"), 150);
        assert_eq!(loader.bytes_for("bob"), 200);
        assert_eq!(loader.total_bytes(), 350);

        let mut snapshot = loader.bytes_per_tenant();
        snapshot.sort();
        assert_eq!(
            snapshot,
            vec![("alice".to_string(), 150), ("bob".to_string(), 200)]
        );
    }

    #[test]
    fn test_pick_lru_returns_oldest_excluding_target() {
        // Touch alice, then bob, then carol — alice is oldest.
        // Pick excluding "carol" (the freshly-loaded one) → alice.
        let loader = TenantLoader::new();
        loader.mark_loaded("alice");
        thread::sleep(Duration::from_millis(5));
        loader.mark_loaded("bob");
        thread::sleep(Duration::from_millis(5));
        loader.mark_loaded("carol");

        assert_eq!(
            loader.pick_lru_excluding("carol"),
            Some("alice".to_string())
        );

        // After touching alice, bob becomes oldest.
        loader.touch("alice");
        assert_eq!(loader.pick_lru_excluding("carol"), Some("bob".to_string()));
    }

    #[test]
    fn test_pick_lru_returns_none_when_only_excluded_is_loaded() {
        let loader = TenantLoader::new();
        loader.mark_loaded("alice");
        // Excluding alice from a single-tenant set → no victim.
        assert_eq!(loader.pick_lru_excluding("alice"), None);
    }

    #[test]
    fn test_pick_lru_returns_none_when_nothing_loaded() {
        let loader = TenantLoader::new();
        assert_eq!(loader.pick_lru_excluding("anyone"), None);
    }

    #[test]
    fn test_over_budget_zero_budget_means_disabled() {
        let loader = TenantLoader::new();
        loader.mark_loaded("alice");
        loader.add_bytes("alice", 1_000_000_000);
        // No budget set (default 0) → over_budget is false even
        // with massive resident bytes.
        assert!(!loader.over_budget());

        loader.set_byte_budget(100);
        assert!(loader.over_budget());
        loader.set_byte_budget(2_000_000_000);
        assert!(!loader.over_budget());
    }

    #[test]
    fn test_lock_for_distinct_tenants_does_not_serialize() {
        // Two tenants must get distinct locks — alice loading must
        // not block bob's queries.
        let loader = TenantLoader::new();

        let alice_lock = loader.lock_for("alice");
        let _alice_held = alice_lock.lock();

        // bob's lock should be acquirable immediately even while
        // alice's is held. try_lock returns Some(_) on success.
        let bob_lock = loader.lock_for("bob");
        let bob_held = bob_lock.try_lock();
        assert!(
            bob_held.is_some(),
            "bob's lock should not be blocked by alice's"
        );
    }
}
434}